# Llama

![](https://ucarecdn.com/05af6071-73b2-4067-9a39-632fcb2f24e9/)


Llama 1 вышла в феврале 2023 года. Это уже подальше, чем GPT-2. И в ее архитектуре появилось уже больше серьезных изменений:

- Нормализация RMSNorm (вместе с pre-norm).
- Функция активации SwiGLU.
- Новый способ кодирования позиций — Rotary Positional Embeddings.

# RMSNorm

![2\_rmsnorm.png](https://ucarecdn.com/2975a217-27ff-4d26-b4a1-cc48a8de1e45/)

В Llama используется более быстрая и эффективная нормализация — **RMSNorm (Root Mean Square Normalization)**.
И, также как в GPT-2, используется *pre-norm* нормализация, то есть слои нормализации располагаются **перед блоками внимания и FNN**.

RMSNorm отличается от обычной нормализации только одним: в нём исключен этап центрирования (вычитание среднего) и используется только масштабирование по RMS.
Это сокращает вычислительные затраты (на 7–64%) без существенной потери качества.
На картинке показана разница в распределении после применения RMSNorm и LayerNorm к исходным данным — RMSNorm не разбросан вокруг нуля.

<p align="center">
  <img src="https://ucarecdn.com/cbfbb78e-e2b0-40e2-ba56-73e5114d54f6/" width="350" alt="RMSNorm vs LayerNorm">
</p>

## Этапы вычисления RMSNorm

1. **Вычисление среднеквадратичного значения:**

   $$\text{RMS}(\mathbf{x}) = \sqrt{\frac{1}{d} \sum_{j=1}^{d} x_j^2}$$

2. **Нормализация входящего вектора:**

   $$\hat{x}_i = \frac{x_i}{\text{RMS}(\mathbf{x})}$$

3. **Применение масштабирования:**

   $$y_i = w_i \cdot \hat{x}_i$$

---

**Где:**

* $x_i$ — *i*-й элемент входящего вектора.
* $w_i$ — *i*-й элемент обучаемого вектора весов.
  Использование весов позволяет модели адаптивно регулировать амплитуду признаков.
  Без них нормализация была бы слишком «жёсткой» и могла бы ограничить качество модели.
* $d$ — размерность входящего вектора.
* $\varepsilon$ — малая константа (например, 1e-6), предотвращает деление на ноль.

---

Так как на вход подаётся тензор, то в векторной форме RMSNorm вычисляется так:

$$
RMSNorm(x) = w ⊙ \frac{x}{\sqrt{mean(x^2) + ϵ}}
$$

**Где:**

* $x$ — входящий тензор размера `batch_size × ...`



In [None]:
import torch
from torch import nn

class RMSNorm(nn.Module):
    def __init__(self, dim: int, eps: float = 1e-6):
        super().__init__()
        self._eps = eps
        self._w = nn.Parameter(torch.ones(dim))
    
    def forward(self, x: torch.Tensor): # [batch_size × seq_len × emb_size]
        rms = (x.pow(2).mean(-1, keepdim=True) + self._eps) ** 0.5
        norm_x = x / rms
        return self._w * norm_x

# SwiGLU

![3\_swiglu.png](https://ucarecdn.com/120dea77-2bf2-455f-9b54-c35c4efddc9e/)

В **Llama** ввели новую функцию активации — **SwiGLU (Swish-Gated Linear Unit)** — это гибридная функция активации, которая представляет собой комбинацию трёх линейных преобразований и функции активации **SiLU (Sigmoid Linear Unit)**, она же *Swish* в терминологии Google.

Формула SwiGLU выглядит так:

$$
\text{SwiGLU}(x) = \text{down}(\text{SiLU}(\text{gate}(x)) \otimes \text{up}(x))
$$

где:

* $x$ — входящий тензор.
* $\text{gate}(x)$ — линейный слой для гейтового механизма. Преобразует вход `x` размерностью `emb_size` в промежуточное представление размерности `4 * emb_size`.
* $\text{up}(x)$ — линейный слой для увеличения размерности. Также преобразует `x` в размерность `4 * emb_size`.
* $\text{SiLU}(x) = x \cdot \sigma(x)$ — функция активации, где $\sigma$ — сигмоида.
* $\otimes$ — поэлементное умножение.
* $\text{down}(x)$ — линейный слой для уменьшения промежуточного представления до исходного размера (`emb_size`).

> **Гейтинг** (от слова *gate* — «врата») — это механизм, который позволяет сети динамически фильтровать, какая информация должна проходить дальше.
> При гейтинге создаются как бы два независимых потока:
>
> * один предназначен для прямой передачи информации (*up-down*),
> * другой — для контроля передаваемой информации (*gate*).
>
> Это позволяет сети учить более сложные паттерны.
> Например, гейт может научиться:
> «если признак A активен, то пропусти признак B»,
> что невозможно с простой функцией активации между линейными слоями.
>
> Также гейтинг помогает с затуханием градиентов: вместо простого обнуления (как в ReLU), гейт может тонко модулировать силу сигнала.

SwiGLU более сложная (дорогая), чем ReLU/GELU, так как требует больше вычислений (три линейных преобразования вместо двух).
Но при этом показывает лучшее качество по сравнению с ReLU и GELU.

График **SiLU** похож на **GELU**, но более гладкий:

<p align="center">
  <img src="https://ucarecdn.com/6683e0c8-96b7-4389-826a-a73708b4a835/" width="500" alt="SiLU vs GELU">
</p>


In [16]:
import torch
from torch import nn
import torch.nn.functional as F

class SiLU(nn.Module):
    def forward(self, x: torch.Tensor): # [batch_size × seq_len × emb_size]
        return torch.sigmoid(x) * x

## SwiGLU

In [None]:
import torch
from torch import nn

class SwiGLU(nn.Module):
    def __init__(self, emb_size: int, dropout: float = 0.1):
        super().__init__()

        self._gate = nn.Linear(emb_size, 4 * emb_size)
        self._up = nn.Linear(emb_size, 4 * emb_size)
        self._down = nn.Linear(4 * emb_size, emb_size)
        self._actvation = SiLU()
        self._dropout = nn.Dropout(dropout)

    def forward(self, x: torch.Tensor): # [batch_size × seq_len × emb_size].
        gate_out = self._gate(x)                          # [batch, seq, 4*emb]
        activation_out = self._activation(gate_out)       # [batch, seq, 4*emb]
        up_out = self._up(x)                              # [batch, seq, 4*emb]
        out = up_out * activation_out                     # поэлементное!
        out = self._down(out)                             # [batch, seq, emb]
        return self._dropout(out)

        

In [11]:
import torch
from torch import nn
import torch.nn.functional as F
from math import sqrt
import torch
from torch import nn
from torch import Tensor

class TokenEmbeddings(nn.Module):
    def __init__(self, vocab_size: int, emb_size: int):
        super().__init__()
        self._embedding = nn.Embedding(
            num_embeddings=vocab_size,
            embedding_dim=emb_size
        )

    def forward(self, x: Tensor) -> Tensor:
        return self._embedding(x)

    @property
    def num_embeddings(self) -> int:
        return self._embedding.num_embeddings

    @property
    def embedding_dim(self) -> int:
        return self._embedding.embedding_dim


import torch
from torch import nn, Tensor

class PositionalEmbeddings(nn.Module):
    def __init__(self, max_seq_len: int, emb_size: int):
        super().__init__()
        self.max_seq_len = max_seq_len
        self.emb_size = emb_size
        self.embedding = nn.Embedding(
            num_embeddings=max_seq_len,
            embedding_dim=emb_size
        )

    def forward(self, seq_len: int, start_pos: int = 0) -> Tensor:
        if seq_len < 1 or seq_len > self.max_seq_len:
            raise IndexError(f"Длина {seq_len} должна быть от 1 до {self.max_seq_len}")
        if start_pos == 0:
            positions = torch.arange(seq_len, device=self.embedding.weight.device)
        else:
            positions = torch.arange(start=start_pos, end=start_pos + seq_len, device=self.embedding.weight.device)
        return self.embedding(positions)
    
    
class HeadAttention(nn.Module):

    def __init__(self, emb_size: int, head_size: int, max_seq_len: int):
        super().__init__()
        self._emb_size = emb_size
        self._head_size = head_size
        self._max_seq_len = max_seq_len

        self._k = nn.Linear(emb_size, head_size)
        self._q = nn.Linear(emb_size, head_size)
        self._v = nn.Linear(emb_size, head_size)

        mask = torch.tril(torch.ones(max_seq_len, max_seq_len))
        self.register_buffer('_tril_mask', mask.bool() if hasattr(torch, 'bool') else mask.byte())

    def forward(self, x: torch.Tensor, use_cache: bool = True, cache: tuple = None) -> tuple:
        seq_len = x.shape[1]
        if seq_len > self._max_seq_len:
            raise ValueError(f"Длина последовательности {seq_len} превышает максимум {self._max_seq_len}")

        k = self._k(x)  # [B, T, hs]
        q = self._q(x)  # [B, T, hs]
        v = self._v(x)  # [B, T, hs]

        if cache is not None:
            k_cache, v_cache = cache
            k = torch.cat([k_cache, k], dim=1)  # [B, cache_len + T, hs]
            v = torch.cat([v_cache, v], dim=1)  # [B, cache_len + T, hs]
        
        scores = q @ k.transpose(-2, -1) / sqrt(self._head_size)
        
        if cache is None:
            scores = scores.masked_fill(~self._tril_mask[:seq_len, :seq_len], float('-inf'))
        
        weights = F.softmax(scores, dim=-1)
        x_out = weights @ v  # [B, T, hs]

        if use_cache is True:
            return (x_out, (k, v))
        else:
            return (x_out, None)
        
from torch import nn
import torch
import math

class MultiHeadAttention(nn.Module):
    def __init__(self, num_heads: int, emb_size: int, head_size: int, max_seq_len: int, dropout: float = 0.1):

        super().__init__()
        self._heads = nn.ModuleList([
            HeadAttention(
                emb_size=emb_size, 
                head_size=head_size, 
                max_seq_len=max_seq_len
            ) for _ in range(num_heads)
        ])
        self._layer = nn.Linear(head_size * num_heads, emb_size)
        self._dropout = nn.Dropout(dropout)

    def forward(self, x: torch.Tensor, mask: torch.Tensor = None, use_cache: bool = True, cache: list = None):

        attention_results = []
        for i, head in enumerate(self._heads):
            head_cache = cache[i] if cache is not None else None
            result = head(x, use_cache=use_cache, cache=head_cache)
            attention_results.append(result)
        
        outputs, caches = zip(*attention_results)
        attention_outputs = list(outputs)
        kv_caches = list(caches)
 
        concatenated_attention = torch.cat(attention_outputs, dim=-1)

        projected_output = self._layer(concatenated_attention)
        
        final_output = self._dropout(projected_output)
        
        if use_cache is True:
            return (final_output, kv_caches)
        else:
            return (final_output, None)


class GELU(nn.Module):
    def __init__(self):
        super().__init__()
        self.sqrt_2_over_pi = torch.sqrt(torch.tensor(2.0) / math.pi)
    
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return 0.5 * x * (1 + torch.tanh(
            self.sqrt_2_over_pi * (x + 0.044715 * torch.pow(x, 3))
        ))

class FeedForward(nn.Module):

    def __init__(self, emb_size: int, dropout: float = 0.1):
        super().__init__()
        self._layer1 = nn.Linear(emb_size, emb_size * 4)
        self._gelu = GELU()
        self._layer2 = nn.Linear(emb_size * 4, emb_size)
        self._dropout = nn.Dropout(dropout)

    def forward(self, x: torch.Tensor):
        input_dtype = x.dtype
        
        if input_dtype != self._layer1.weight.dtype:
            self._layer1 = self._layer1.to(dtype=input_dtype)
            self._layer2 = self._layer2.to(dtype=input_dtype)
            
        x = self._layer1(x)
        x = self._gelu(x)
        x = self._layer2(x)
        return self._dropout(x)
    
class Decoder(nn.Module):
    def __init__(self, 
        num_heads: int,
        emb_size: int,
        head_size: int,
        max_seq_len: int,
        dropout: float = 0.1
    ):
        super().__init__()
        self._heads = MultiHeadAttention(
            num_heads=num_heads, 
            emb_size=emb_size, 
            head_size=head_size, 
            max_seq_len=max_seq_len, 
            dropout=dropout
        )
        self._ff = FeedForward(emb_size=emb_size, dropout=dropout)
        self._norm1 = RMSNorm(emb_size)
        self._norm2 = RMSNorm(emb_size)

    def forward(self, x: torch.Tensor, mask: torch.Tensor = None, use_cache: bool = True, cache: list = None) -> torch.Tensor:
        norm1_out = self._norm1(x)
        attention, kv_caches = self._heads(norm1_out, mask, use_cache=use_cache, cache=cache)
        out = attention + x
        
        norm2_out = self._norm2(out)
        ffn_out = self._ff(norm2_out)

        if use_cache is True:
            return (ffn_out + out, kv_caches)
        else:
            return (ffn_out + out, None)



from torch import nn
import torch
import torch.nn.functional as F

class Llama(nn.Module):
    def __init__(self,
        vocab_size: int,
        max_seq_len: int,
        emb_size: int,
        num_heads: int,
        head_size: int,
        num_layers: int,
        dropout: float = 0.1,
        device: str = 'cpu'
    ):
        super().__init__()
        self._vocab_size = vocab_size
        self._max_seq_len = max_seq_len
        self._emb_size = emb_size
        self._num_heads = num_heads
        self._head_size = head_size
        self._num_layers = num_layers
        self._dropout = dropout
        self._device = device
        
        self.validation_loss = None

        # Инициализация слоев
        self._token_embeddings = TokenEmbeddings(
            vocab_size=vocab_size, 
            emb_size=emb_size
        )
        self._position_embeddings = PositionalEmbeddings(
            max_seq_len=max_seq_len, 
            emb_size=emb_size
        )
        self._dropout = nn.Dropout(dropout)
        self._decoders = nn.ModuleList([Decoder(
            num_heads=num_heads,
            emb_size=emb_size,
            head_size=head_size,
            max_seq_len=max_seq_len,
            dropout=dropout 
        ) for _ in range(num_layers)])
        self._norm = RMSNorm(emb_size)
        self._linear = nn.Linear(emb_size, vocab_size)

    def forward(self, x: torch.Tensor, use_cache: bool = True, cache: list = None) -> tuple:
        # Проверка длины последовательности (только при отсутствии кэша)
        if cache is None and x.size(1) > self._max_seq_len:
            raise ValueError(f"Длина последовательности {x.size(1)} превышает максимальную {self.max_seq_len}")
        
        
        # Вычисление start_pos из кэша (если кэш передан)
        if cache is not None:
            # При кэше обрабатываем только один токен (последний)
            seq_len = 1
            # Вычисляем start_pos из самого нижнего уровня кэша
            if cache and cache[0] and cache[0][0]:
                key_cache, _ = cache[0][0]  # Первый декодер, первая голова
                start_pos = key_cache.size(1)  # cache_len
            else:
                start_pos = 0
        else:
            # Без кэша работаем как раньше
            start_pos = 0
            seq_len = x.size(1)

        # Эмбеддинги токенов и позиций
        tok_out = self._token_embeddings(x)  # [batch, seq_len, emb_size]
        pos_out = self._position_embeddings(seq_len, start_pos=start_pos)  # [seq_len, emb_size]
        
        # Комбинирование
        out = self._dropout(tok_out + pos_out.unsqueeze(0))  # [batch, seq_len, emb_size]
        
        # Стек декодеров с передачей кэша
        new_cache = []
        for i, decoder in enumerate(self._decoders):
            decoder_cache = cache[i] if cache is not None else None
            decoder_result = decoder(out, use_cache=use_cache, cache=decoder_cache)

            # Извлекаем результат из кортежа
            if use_cache:
                out, decoder_new_cache = decoder_result
                new_cache.append(decoder_new_cache)
            else:
                out = decoder_result[0]

        out = self._norm(out)
        logits = self._linear(out)
            
        # Возвращаем результат с учетом use_cache
        if use_cache:
            return (logits, new_cache)
        else:
            return (logits, None)

    def generate(self,
        x: torch.Tensor, 
        max_new_tokens: int, 
        do_sample: bool,
        temperature: float = 1.0,
        top_k: int = None,
        top_p: float = None,
        use_cache: bool = True
    ) -> torch.Tensor:
        cache = None

        for _ in range(max_new_tokens):
            if use_cache and cache is not None:
                # Используем кэш - передаем только последний токен
                x_input = x[:, -1:]  # [batch_size, 1]
            else:
                # Первая итерация или кэш отключен - передаем всю последовательность
                x_input = x
            
            # Прямой проход с кэшем
            logits, new_cache = self.forward(x_input, use_cache=use_cache, cache=cache)
            
            # Обновляем кэш для следующей итерации
            if use_cache:
                cache = new_cache

            last_logits = logits[:, -1, :]  # [batch_size, vocab_size]

            # Масштабируем логиты температурой
            if temperature > 0:
                logits_scaled = last_logits / temperature
            else:
                logits_scaled = last_logits

            if do_sample == True and top_k != None:
                _, topk_indices = torch.topk(logits_scaled, top_k, dim=-1)

                # # Заменим все НЕ top-k логиты на -inf
                masked_logits = logits_scaled.clone()
                vocab_size = logits_scaled.size(-1)

                # создаём маску: 1, если токен НЕ в topk_indices
                mask = torch.ones_like(logits_scaled, dtype=torch.uint8)
                mask.scatter_(1, topk_indices, 0)  # 0 там, где top-k индексы
                masked_logits[mask.byte()] = float('-inf')

                logits_scaled = masked_logits

            if do_sample == True and top_p != None:
                # 1. Применим softmax, чтобы получить вероятности:
                probs = F.softmax(logits_scaled, dim=-1)  # [B, vocab_size]
                # 2. Отсортируем токены по убыванию вероятностей:
                sorted_probs, sorted_indices = torch.sort(probs, descending=True, dim=-1)
                # 3. Посчитаем кумулятивную сумму вероятностей:
                cum_probs = torch.cumsum(sorted_probs, dim=-1)  # [B, vocab_size]
                # 4. Определим маску: оставить токены, пока сумма < top_p
                sorted_mask = (cum_probs <= top_p).byte()  # [B, vocab_size]
                # Гарантируем, что хотя бы первый токен останется
                sorted_mask[:, 0] = 1
                # 5. Преобразуем маску обратно в оригинальный порядок:
                # Создаём полную маску из 0
                mask = torch.zeros_like(probs, dtype=torch.uint8)
                # Устанавливаем 1 в местах нужных токенов
                mask.scatter_(dim=1, index=sorted_indices, src=sorted_mask)
                # 6. Зануляем логиты токенов вне топ-p:
                logits_scaled[~mask] = float('-inf')

            # 4. Применяем Softmax
            probs = F.softmax(logits_scaled, dim=-1)  # [batch_size, vocab_size]


            if do_sample == True:
                # 5. Если do_sample равен True, то отбираем токен случайно с помощью torch.multinomial
                next_token = torch.multinomial(probs, num_samples=1)  # [batch_size, 1]
            else:
                # 5. Если do_sample равен False, то выбираем токен с максимальной вероятностью
                next_token = torch.argmax(probs, dim=-1, keepdim=True)  # [batch_size, 1]
            
            # 6. Добавляем его к последовательности
            x = torch.cat([x, next_token], dim=1)  # [batch_size, seq_len+1]
        return x

    def save(self, path):
        torch.save({
            'model_state_dict': self.state_dict(),
            'vocab_size': self._vocab_size,
            'max_seq_len': self._max_seq_len,
            'emb_size': self._emb_size,
            'num_heads': self._num_heads,
            'head_size': self._head_size,
            'num_layers': self._num_layers
        }, path)

    @classmethod
    def load(cls, path, device):
        checkpoint = torch.load(path, map_location=device)
        model = cls(
            vocab_size=checkpoint['vocab_size'],
            max_seq_len=checkpoint['max_seq_len'],
            emb_size=checkpoint['emb_size'],
            num_heads=checkpoint['num_heads'],
            head_size=checkpoint['head_size'],
            num_layers=checkpoint['num_layers']
        )
        model.load_state_dict(checkpoint['model_state_dict'])
        model.to(device)
        return model

    @property
    def max_seq_len(self) -> int:
        return self._max_seq_len