Рефакторинг: единообразие оформления кода (пробелы, кавычки, пустые строки), без изменения логики по всему проекту.

This commit is contained in:
Sergey Penkovsky
2025-10-06 22:57:19 +03:00
parent 332cad6159
commit 712278e33c
49 changed files with 2324 additions and 2004 deletions

View File

@@ -19,23 +19,25 @@ from abc import ABC, abstractmethod
from typing import Optional, Tuple
import torch
class BaseModel(nn.Module, ABC):
"""
Абстрактный класс — стандарт для всех архитектур LLM.
Научная идея:
Реализация унифицированного входа/выхода для поддержки построения и обучения любых современных языковых моделей.
Args:
config (dict): Параметры архитектуры (размерность эмбеддингов, число слоев, heads и т.д.)
Attributes:
config (dict): Конфиг модели
"""
def __init__(self, config: dict):
"""
Инициализация модели.
Args:
config (dict): Настройки архитектуры модели (размеры слоев, типы блоков и т.д.)
"""
@@ -43,10 +45,12 @@ class BaseModel(nn.Module, ABC):
self.config = config
@abstractmethod
def forward(self, input_ids: torch.Tensor, attention_mask: Optional[torch.Tensor] = None) -> torch.Tensor:
def forward(
self, input_ids: torch.Tensor, attention_mask: Optional[torch.Tensor] = None
) -> torch.Tensor:
"""
Прямой проход — получение логитов для входных токенов.
Args:
input_ids (Tensor[int]): Индексы токенов [batch, seq_len]
attention_mask (Optional[Tensor[bool]]): Маска разрешенных позиций (если требуется) [batch, seq_len]
@@ -59,7 +63,7 @@ class BaseModel(nn.Module, ABC):
def generate(self, input_ids: torch.Tensor, max_length: int = 50) -> torch.Tensor:
"""
Генерация текста (авторегрессивно, greedy или sampling).
Args:
input_ids (Tensor[int]): Начальные токены [batch, start_len]
max_length (int): Максимальная длина последовательности

View File

@@ -6,6 +6,7 @@ from .feed_forward import FeedForward
from .multi_head_attention import MultiHeadAttention
from .rope import RoPE
class CachedDecoder(nn.Module):
"""
Универсальный декодерный блок для современных LLM (GPT, LLaMA, др.), поддерживает кэширование key-value для эффективной генерации.
@@ -28,7 +29,7 @@ class CachedDecoder(nn.Module):
norm_layer (тип nn.Module): Normalization слой (LayerNorm или RMSNorm)
dropout (float): Dropout
rope (RoPE|None): Экземпляр RoPE (для LLaMA)
Пример (GPT2 style):
>>> decoder = CachedDecoder(
... feed_forward_layer=FeedForward(...),
@@ -36,6 +37,7 @@ class CachedDecoder(nn.Module):
... num_heads=4, emb_size=256, head_size=64, max_seq_len=128)
>>> out, cache = decoder(x, use_cache=True)
"""
def __init__(
self,
feed_forward_layer: nn.Module,
@@ -49,7 +51,7 @@ class CachedDecoder(nn.Module):
):
"""
Инициализация декодера с кэшированием.
Поведение аналогично блоку TransformerDecoderLayer,
но с гибкой возможностью подмены любых подкомпонент (активация, norm, позиции).
@@ -85,7 +87,7 @@ class CachedDecoder(nn.Module):
):
"""
Прямой проход с поддержкой кэша.
Args:
x (Tensor[float]): [batch, seq_len, emb_size] — скрытые состояния
mask (Optional[Tensor]): маска внимания (или causal mask), shape [seq_len, seq_len]
@@ -111,4 +113,4 @@ class CachedDecoder(nn.Module):
if use_cache:
return (result, kv_caches)
else:
return (result, None)
return (result, None)

View File

@@ -3,6 +3,7 @@ import torch
from .feed_forward import FeedForward
from .multi_head_attention import MultiHeadAttention
class Decoder(nn.Module):
"""
Базовый автогерессивный блок-декодер трансформера (без кэша KV).
@@ -24,12 +25,14 @@ class Decoder(nn.Module):
>>> out = decoder(x)
>>> print(out.shape) # torch.Size([1, 10, 512])
"""
def __init__(self,
def __init__(
self,
num_heads: int,
emb_size: int,
head_size: int,
max_seq_len: int,
dropout: float = 0.1
dropout: float = 0.1,
):
"""
Инициализация декодера.
@@ -43,11 +46,11 @@ class Decoder(nn.Module):
"""
super().__init__()
self._heads = MultiHeadAttention(
num_heads=num_heads,
emb_size=emb_size,
head_size=head_size,
max_seq_len=max_seq_len,
dropout=dropout
num_heads=num_heads,
emb_size=emb_size,
head_size=head_size,
max_seq_len=max_seq_len,
dropout=dropout,
)
self._ff = FeedForward(emb_size=emb_size, dropout=dropout)
self._norm1 = nn.LayerNorm(emb_size)
@@ -73,7 +76,7 @@ class Decoder(nn.Module):
# Self-Attention блок
attention, _ = self._heads(x, mask, use_cache=False, cache=None)
out = self._norm1(attention + x)
# FeedForward блок
ffn_out = self._ff(out)
return self._norm2(ffn_out + out)
return self._norm2(ffn_out + out)

View File

@@ -16,7 +16,7 @@ class FeedForward(nn.Module):
- После внимания каждому токену применяется одинаковая двухслойная нейросеть.
- Дает глубокую нелинейность; позволяет модели не только сопоставлять, но и моделировать сложные связи между токенами.
- Изначально предложен в «Attention is All You Need» (Vaswani et al., 2017).
Формула:
FFN(x) = Dropout(W2·act(W1·x))
где act — ReLU, GELU и др., обычно expansion x4.
@@ -33,22 +33,23 @@ class FeedForward(nn.Module):
- Добавляет нелинейность в архитектуру трансформера
- Обеспечивает взаимодействие между различными размерностями эмбеддингов
- Работает независимо для каждого токена в последовательности
Args:
emb_size (int): размерность входных эмбеддингов
dropout (float): вероятность(dropout)
activation (str): нелинейная функция (relu, gelu, gelu_exact)
Пример:
>>> ff = FeedForward(emb_size=512, dropout=0.1)
>>> x = torch.randn(32, 10, 512)
>>> output = ff(x)
>>> print(output.shape) # torch.Size([32, 10, 512])
"""
def __init__(self, emb_size: int, dropout: float = 0.1, activation: str = "relu"):
"""
Инициализация слоя Feed Forward Network.
Args:
emb_size: Размерность входных эмбеддингов
dropout: Вероятность dropout для регуляризации (по умолчанию: 0.1)
@@ -73,23 +74,23 @@ class FeedForward(nn.Module):
def forward(self, x: torch.Tensor):
"""
Прямой проход через слой Feed Forward Network.
Args:
x: Входной тензор размерности [batch_size, seq_len, emb_size]
Returns:
Тензор той же размерности, что и входной
"""
# Сохраняем dtype входных данных
input_dtype = x.dtype
# Приводим веса к нужному типу если необходимо
if input_dtype != self._layer1.weight.dtype:
self._layer1 = self._layer1.to(dtype=input_dtype)
self._layer2 = self._layer2.to(dtype=input_dtype)
# Пропустим тензор x по очереди через все созданные слои
x = self._layer1(x)
x = self._activation(x)
x = self._layer2(x)
return self._dropout(x)
return self._dropout(x)

View File

@@ -1,6 +1,7 @@
import torch
from torch import nn
class GELU(nn.Module):
"""
Гауссовская Эрф-активация (GELU, Gaussian Error Linear Unit).
@@ -17,11 +18,14 @@ class GELU(nn.Module):
>>> y = gelu(torch.tensor([-1.0, 0.0, 1.0]))
>>> print(y)
"""
def __init__(self):
super().__init__()
self.sqrt_2_over_pi = torch.sqrt(torch.tensor(2.0) / math.pi)
def forward(self, x: torch.Tensor) -> torch.Tensor:
return 0.5 * x * (1 + torch.tanh(
self.sqrt_2_over_pi * (x + 0.044715 * torch.pow(x, 3))
))
return (
0.5
* x
* (1 + torch.tanh(self.sqrt_2_over_pi * (x + 0.044715 * torch.pow(x, 3))))
)

View File

@@ -4,6 +4,7 @@ import torch.nn.functional as F
from math import sqrt
from .rope import RoPE
class HeadAttention(nn.Module):
"""
Одноголовый механизм внимания (scaled dot-product attention) — фундаментальный строительный блок всех современных Transformer.
@@ -11,11 +12,11 @@ class HeadAttention(nn.Module):
Научная суть:
- Attention учит модель самостоятельно "выбирать" важные связи между словами, независимо от их положения.
- Механизм causal mask гарантирует невозможность "заглядывания в будущее" при генерации (авторегрессия).
Формула:
Attention(Q, K, V) = softmax(QK^T / sqrt(d_k)) · V
(Q — запросы, K — ключи, V — значения; d_k — размерность ключа)
Поддерживает Rotary Position Encoding (RoPE) для относительного позиционного кодирования.
Args:
@@ -28,14 +29,17 @@ class HeadAttention(nn.Module):
- Использует нижнетреугольную маску для предотвращения "заглядывания в будущее"
- Автоматически адаптируется к разным версиям PyTorch
- Поддерживает batch-обработку входных данных
Пример использования:
>>> attention = HeadAttention(emb_size=64, head_size=32, max_seq_len=128)
>>> x = torch.randn(1, 10, 64)
>>> output, _ = attention(x)
>>> print(output.shape) # torch.Size([1, 10, 32])
"""
def __init__(self, emb_size: int, head_size: int, max_seq_len: int, rope: RoPE = None):
def __init__(
self, emb_size: int, head_size: int, max_seq_len: int, rope: RoPE = None
):
super().__init__()
self._emb_size = emb_size
self._head_size = head_size
@@ -49,21 +53,25 @@ class HeadAttention(nn.Module):
# Создание causal маски
mask = torch.tril(torch.ones(max_seq_len, max_seq_len))
self.register_buffer('_tril_mask', mask.bool() if hasattr(torch, 'bool') else mask.byte())
self.register_buffer(
"_tril_mask", mask.bool() if hasattr(torch, "bool") else mask.byte()
)
def forward(self, x: torch.Tensor, use_cache: bool = True, cache: tuple = None) -> tuple:
def forward(
self, x: torch.Tensor, use_cache: bool = True, cache: tuple = None
) -> tuple:
"""
Прямой проход через слой внимания.
Аргументы:
x (torch.Tensor): Входной тензор формы [batch_size, seq_len, emb_size]
Возвращает:
torch.Tensor: Выходной тензор формы [batch_size, seq_len, head_size]
Исключения:
ValueError: Если длина последовательности превышает max_seq_len
Пример внутренних преобразований:
Для входа x.shape = [2, 5, 64]:
1. Q/K/V преобразования -> [2, 5, 32]
@@ -73,7 +81,9 @@ class HeadAttention(nn.Module):
"""
seq_len = x.shape[1]
if seq_len > self._max_seq_len:
raise ValueError(f"Длина последовательности {seq_len} превышает максимум {self._max_seq_len}")
raise ValueError(
f"Длина последовательности {seq_len} превышает максимум {self._max_seq_len}"
)
k = self._k(x) # [B, T, hs]
q = self._q(x) # [B, T, hs]
@@ -88,16 +98,18 @@ class HeadAttention(nn.Module):
k_cache, v_cache = cache
k = torch.cat([k_cache, k], dim=1) # [B, cache_len + T, hs]
v = torch.cat([v_cache, v], dim=1) # [B, cache_len + T, hs]
scores = q @ k.transpose(-2, -1) / sqrt(self._head_size)
if cache is None:
scores = scores.masked_fill(~self._tril_mask[:seq_len, :seq_len], float('-inf'))
scores = scores.masked_fill(
~self._tril_mask[:seq_len, :seq_len], float("-inf")
)
weights = F.softmax(scores, dim=-1)
x_out = weights @ v # [B, T, hs]
if use_cache is True:
return (x_out, (k, v))
else:
return (x_out, None)
return (x_out, None)

View File

@@ -3,6 +3,7 @@ import torch
from .head_attention import HeadAttention
from .rope import RoPE
class MultiHeadAttention(nn.Module):
"""
Мультиголовый (многоголовый) механизм внимания — ключевой компонент любого Transformer.
@@ -12,7 +13,7 @@ class MultiHeadAttention(nn.Module):
чтобы видеть разные связи в последовательности (разный контекст, локально/глобально).
- Каждый attention блок работает независимо, выход конкатенируется.
- Механизм предложен в статье "Attention is All You Need" (Vaswani et al., 2017).
Формула внимания для одной головы:
Attention(Q, K, V) = softmax(QK^T/sqrt(d_k))·V
Мультиголовый:
@@ -32,7 +33,16 @@ class MultiHeadAttention(nn.Module):
>>> out, cache = mha(x)
>>> print(out.shape)
"""
def __init__(self, num_heads: int, emb_size: int, head_size: int, max_seq_len: int, rope: RoPE = None, dropout: float = 0.1):
def __init__(
self,
num_heads: int,
emb_size: int,
head_size: int,
max_seq_len: int,
rope: RoPE = None,
dropout: float = 0.1,
):
"""
Инициализация многоголового внимания.
@@ -49,18 +59,27 @@ class MultiHeadAttention(nn.Module):
- max_seq_len зависит от задачи (512 для BERT, 2048 для GPT-3)
"""
super().__init__()
self._heads = nn.ModuleList([
HeadAttention(
emb_size=emb_size,
head_size=head_size,
max_seq_len=max_seq_len,
rope=rope,
) for _ in range(num_heads)
])
self._heads = nn.ModuleList(
[
HeadAttention(
emb_size=emb_size,
head_size=head_size,
max_seq_len=max_seq_len,
rope=rope,
)
for _ in range(num_heads)
]
)
self._layer = nn.Linear(head_size * num_heads, emb_size)
self._dropout = nn.Dropout(dropout)
def forward(self, x: torch.Tensor, mask: torch.Tensor = None, use_cache: bool = True, cache: list = None):
def forward(
self,
x: torch.Tensor,
mask: torch.Tensor = None,
use_cache: bool = True,
cache: list = None,
):
"""
Прямой проход (forward):
Для каждого токена оценивает "важность" остальных токенов сразу через несколько attention-блоков.
@@ -76,7 +95,7 @@ class MultiHeadAttention(nn.Module):
4. Линейная проекция:
- Выход: [batch_size, seq_len, emb_size]
5. Применение dropout
Args:
x (Tensor[float]): [batch, seq_len, emb_size] — вход
mask (Optional[Tensor[bool]]): маска позиции [seq_len, seq_len]
@@ -94,11 +113,11 @@ class MultiHeadAttention(nn.Module):
Пример преобразований для emb_size=512, num_heads=8:
Вход: [4, 100, 512]
-> Каждая голова: [4, 100, 64]
-> После внимания: 8 x [4, 100, 64]
-> После внимания: 8 x [4, 100, 64]
-> Конкатенация: [4, 100, 512]
-> Проекция: [4, 100, 512]
-> Dropout: [4, 100, 512]
Пример:
>>> out, caches = mha(x)
>>> out.shape # [batch, seq_len, emb_size]
@@ -109,20 +128,20 @@ class MultiHeadAttention(nn.Module):
head_cache = cache[i] if cache is not None else None
result = head(x, use_cache=use_cache, cache=head_cache)
attention_results.append(result)
outputs, caches = zip(*attention_results)
attention_outputs = list(outputs)
kv_caches = list(caches)
# 2. Объединяем результаты всех голов
concatenated_attention = torch.cat(attention_outputs, dim=-1)
# 3. Проецируем в пространство эмбеддингов
projected_output = self._layer(concatenated_attention)
# 4. Применяем dropout для регуляризации
final_output = self._dropout(projected_output)
if use_cache is True:
return (final_output, kv_caches)
else:

View File

@@ -1,13 +1,14 @@
import torch
from torch import nn, Tensor
class PositionalEmbeddings(nn.Module):
"""
Обучаемые позиционные эмбеддинги (learnable positional embeddings).
Позиционные эмбеддинги используются в нейросетях для передачи информации
Позиционные эмбеддинги используются в нейросетях для передачи информации
о позиции элементов в последовательности (например, в Transformer).
Научная суть:
- Трансформеры не используют рекуррентность, а значит сами по себе не различают порядок слов.
- Позиционные эмбеддинги добавляются к токеновым, чтобы сеть понимала, в каком месте последовательности находится каждый токен.
@@ -16,7 +17,7 @@ class PositionalEmbeddings(nn.Module):
Args:
max_seq_len (int): максимальная длина последовательности
emb_size (int): размер вектора позиции
Пример использования:
>>> pos_encoder = PositionalEmbeddings(max_seq_len=100, emb_size=256)
>>> # Получить эмбеддинги для последовательности из 10 элементов
@@ -36,23 +37,22 @@ class PositionalEmbeddings(nn.Module):
self.max_seq_len = max_seq_len
self.emb_size = emb_size
self.embedding = nn.Embedding(
num_embeddings=max_seq_len,
embedding_dim=emb_size
num_embeddings=max_seq_len, embedding_dim=emb_size
)
def forward(self, seq_len: int, start_pos: int = 0) -> Tensor:
"""
Возвращает позиционные эмбеддинги для заданной длины последовательности.
Args:
seq_len (int): Длина последовательности (1 <= seq_len <= max_seq_len)
Returns:
Tensor: Тензор позиционных эмбеддингов формы [seq_len, emb_size]
Raises:
IndexError: Если seq_len выходит за допустимые границы
Пример:
>>> pos_encoder = PositionalEmbeddings(100, 64)
>>> emb = pos_encoder(10) # Тензор 10x64
@@ -62,5 +62,9 @@ class PositionalEmbeddings(nn.Module):
if start_pos == 0:
positions = torch.arange(seq_len, device=self.embedding.weight.device)
else:
positions = torch.arange(start=start_pos, end=start_pos + seq_len, device=self.embedding.weight.device)
positions = torch.arange(
start=start_pos,
end=start_pos + seq_len,
device=self.embedding.weight.device,
)
return self.embedding(positions)

View File

@@ -33,23 +33,23 @@ class RMSNorm(nn.Module):
- Упрощенный вариант LayerNorm без вычисления среднего, только деление на rms.
- Лучшая численная стабильность на больших моделях, меньше вычислений.
- Применяется в LLaMA, PaLM и др.
Формула:
RMSNorm(x) = (x / sqrt(mean(x²) + eps)) * w (w — обучаемый вектор)
Args:
dim (int): размер последнего измерения (обычно emb_size)
eps (float): для численной устойчивости
Пример:
>>> norm = RMSNorm(emb_size)
>>> out = norm(x)
"""
def __init__(self, dim: int, eps: float = 1e-6):
"""
Инициализация RMSNorm слоя.
Args:
dim: Размерность нормализуемого измерения
eps: Малое значение для численной стабильности (по умолчанию 1e-6)
@@ -57,27 +57,27 @@ class RMSNorm(nn.Module):
super().__init__()
self._eps = eps
self._w = nn.Parameter(torch.ones(dim))
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""
Прямой проход через RMSNorm слой.
Args:
x: Входной тензор формы [..., dim]
Returns:
Нормализованный тензор той же формы, что и входной
Формула:
output = w * (x / sqrt(mean(x²) + eps))
"""
# Вычисление RMS (Root Mean Square) по последнему измерению
rms = (x.pow(2).mean(-1, keepdim=True) + self._eps) ** 0.5
# Нормализация и масштабирование
norm_x = x / rms
return self._w * norm_x
def extra_repr(self) -> str:
"""Строковое представление для отладки."""
return f'dim={self._w.shape[0]}, eps={self._eps}'
return f"dim={self._w.shape[0]}, eps={self._eps}"

View File

@@ -26,72 +26,72 @@ from typing import Optional
class RoPE(nn.Module):
"""
Rotary Positional Embeddings (RoPE) для механизма внимания.
Кодирует позиционную информацию через вращение векторов запросов и ключей
в многомерном пространстве с использованием синусов и косинусов.
Args:
head_size: Размерность головы внимания (должен быть четным)
max_seq_len: Максимальная длина последовательности
base: Базовое значение для вычисления частот (по умолчанию 10000)
Attributes:
cos_matrix: Буферизованная матрица косинусов формы [max_seq_len, head_size//2]
sin_matrix: Буферизованная матрица синусов формы [max_seq_len, head_size//2]
"""
def __init__(self, head_size: int, max_seq_len: int, base: int = 10_000):
"""
Инициализация RoPE эмбеддингов.
Args:
head_size: Размерность головы внимания (должен быть четным)
max_seq_len: Максимальная поддерживаемая длина последовательности
base: Базовое значение для вычисления частот (типично 10000)
Raises:
AssertionError: Если head_size не четный
"""
super().__init__()
assert head_size % 2 == 0, "head_size должен быть четным"
# Вычисление частот: θ_i = base^(-2i/d) для i ∈ [0, d/2-1]
freqs = 1.0 / (base ** (2 * torch.arange(head_size // 2).float() / head_size))
# Позиции от 0 до max_seq_len-1
positions = torch.arange(max_seq_len).float()
# Внешнее произведение: m * θ_i для всех позиций и частот
freq_matrix = positions.unsqueeze(1) * freqs.unsqueeze(0)
# Предвычисление матриц косинусов и синусов
self.register_buffer('cos_matrix', torch.cos(freq_matrix))
self.register_buffer('sin_matrix', torch.sin(freq_matrix))
self.register_buffer("cos_matrix", torch.cos(freq_matrix))
self.register_buffer("sin_matrix", torch.sin(freq_matrix))
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""
Применение ротационного позиционного кодирования к входному тензору.
Args:
x: Входной тензор формы [batch_size, seq_len, head_size]
Returns:
Тензор с примененным RoPE формы [batch_size, seq_len, head_size]
Алгоритм:
1. Разделение векторов на четные и нечетные компоненты
2. Применение вращения через синусы и косинусы
3. Объединение компонент обратно
"""
seq_len = x.size(1)
# Берем нужную часть матриц и приводим к типу x
cos = self.cos_matrix[:seq_len].to(x.dtype) # [seq_len, head_size//2]
sin = self.sin_matrix[:seq_len].to(x.dtype) # [seq_len, head_size//2]
# Разделяем на четные и нечетные компоненты
x_even = x[:, :, 0::2] # [batch_size, seq_len, head_size//2]
x_odd = x[:, :, 1::2] # [batch_size, seq_len, head_size//2]
x_odd = x[:, :, 1::2] # [batch_size, seq_len, head_size//2]
# Применяем поворот: q' = q * cos(mθ) + rotate(q) * sin(mθ)
x_rotated_even = x_even * cos - x_odd * sin
@@ -101,4 +101,4 @@ class RoPE(nn.Module):
x_rotated = torch.stack([x_rotated_even, x_rotated_odd], dim=-1)
x_rotated = x_rotated.flatten(-2) # [batch_size, seq_len, head_size]
return x_rotated
return x_rotated

View File

@@ -1,19 +1,21 @@
import torch
from torch import nn
class SiLU(nn.Module):
"""
SiLU (Swish) — современная активационная функция для нейросетей.
Научная суть:
- Формула: $SiLU(x) = x * \sigm(x)$, где $\sigm(x)$ — сигмоида.
- Более гладкая альтернатива ReLU, улучшает поток градиентов в глубоких сетях.
- Используется во многих «state-of-the-art» архитектурах (SwiGLU, PaLM, LLaMA).
- Также известна как Swish (Ramachandran et al, 2017).
- Также известна как Swish (Ramachandran et al, 2017).
Пример:
>>> act = SiLU()
>>> x = torch.tensor([-1.0, 0.0, 1.0])
>>> print(act(x))
"""
def forward(self, x: torch.Tensor):
return torch.sigmoid(x) * x
return torch.sigmoid(x) * x

View File

@@ -27,13 +27,13 @@ class SwiGLU(nn.Module):
SwiGLU (Swish-Gated Linear Unit) — современная нелинейность для архитектур LLM (LLaMA, PaLM).
Реализация SwiGLU активационной функции.
Состоит из трех линейных слоев и активации SiLU:
1. Gate слой + SiLU активация
2. Up слой (линейное преобразование)
3. Element-wise multiplication gate и up
4. Down слой (линейная проекция)
Научная суть:
- Сохраняет преимущества GLU (раздельные гейтом и телом) + мощность Swish/SiLU активации.
- Дает надежную гладкую активацию, хорошо работает на больших масштабах.
@@ -50,11 +50,11 @@ class SwiGLU(nn.Module):
>>> ff = SwiGLU(emb_size=512, dropout=0.1)
>>> y = ff(torch.randn(2,10,512))
"""
def __init__(self, emb_size: int, dropout: float = 0.1):
"""
Инициализация SwiGLU слоя.
Args:
emb_size: Размерность входных/выходных эмбеддингов
dropout: Вероятность dropout (по умолчанию 0.1)
@@ -69,13 +69,13 @@ class SwiGLU(nn.Module):
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""
Прямой проход через SwiGLU слой.
Args:
x: Входной тензор формы [batch_size, seq_len, emb_size]
Returns:
Выходной тензор формы [batch_size, seq_len, emb_size]
Алгоритм:
1. gate = SiLU(linear_gate(x))
2. up = linear_up(x)
@@ -83,19 +83,19 @@ class SwiGLU(nn.Module):
4. apply dropout
"""
# Gate ветвь: линейное преобразование + активация
gate_out = self._gate(x) # [batch, seq, 4*emb]
activation_out = self._activation(gate_out) # [batch, seq, 4*emb]
gate_out = self._gate(x) # [batch, seq, 4*emb]
activation_out = self._activation(gate_out) # [batch, seq, 4*emb]
# Up ветвь: линейное преобразование
up_out = self._up(x) # [batch, seq, 4*emb]
up_out = self._up(x) # [batch, seq, 4*emb]
# Element-wise multiplication (gating mechanism)
out = up_out * activation_out # поэлементное умножение!
out = up_out * activation_out # поэлементное умножение!
# Final projection and dropout
out = self._down(out) # [batch, seq, emb]
out = self._down(out) # [batch, seq, emb]
return self._dropout(out)
def extra_repr(self) -> str:
"""Строковое представление для отладки."""
return f'emb_size={self._gate.in_features}, dropout={self._dropout.p}'
return f"emb_size={self._gate.in_features}, dropout={self._dropout.p}"

View File

@@ -2,13 +2,14 @@ import torch
from torch import nn
from torch import Tensor
class TokenEmbeddings(nn.Module):
"""
Токеновые эмбеддинги — обучаемые векторные представления для каждого токена словаря.
Преобразует целочисленные индексы токенов в обучаемые векторные представления фиксированного размера.
Обычно используется как первый слой в нейронных сетях для задач NLP.
Научная суть:
- Первый шаг для любого NLP-модуля: вместо индекса токена подаём его dense-вектор.
- Эти вектора изучаются в процессе обучения и отражают скрытые взаимосвязи между токенами.
@@ -22,18 +23,18 @@ class TokenEmbeddings(nn.Module):
Примечание:
- Индексы должны быть в диапазоне [0, vocab_size-1]
- Эмбеддинги инициализируются случайно и обучаются в процессе тренировки модели
Пример:
>>> emb = TokenEmbeddings(vocab_size=10000, emb_size=256)
>>> tokens = torch.tensor([[1, 2, 3]])
>>> vecs = emb(tokens)
>>> vecs.shape # torch.Size([1, 3, 256])
"""
def __init__(self, vocab_size: int, emb_size: int):
super().__init__()
self._embedding = nn.Embedding(
num_embeddings=vocab_size,
embedding_dim=emb_size
num_embeddings=vocab_size, embedding_dim=emb_size
)
def forward(self, x: Tensor) -> Tensor:
@@ -55,10 +56,7 @@ if __name__ == "__main__":
embedding = TokenEmbeddings(vocab_size=100, emb_size=128)
# Создаем тензор с индексами в пределах vocab_size (0-99)
tensor = torch.tensor([
[11, 45, 76, 34],
[34, 67, 45, 54]
])
tensor = torch.tensor([[11, 45, 76, 34], [34, 67, 45, 54]])
# Проверяем индексы
if (tensor >= 100).any():
@@ -66,4 +64,4 @@ if __name__ == "__main__":
output = embedding(tensor)
print("Embeddings shape:", output.shape)
print(f"{output.shape} | {output.mean().item():.11f}") # Формат как в ТЗ
print(f"{output.shape} | {output.mean().item():.11f}") # Формат как в ТЗ

View File

@@ -34,10 +34,10 @@ from llm.core.positional_embeddings import PositionalEmbeddings
class GPT(BaseModel):
"""
Original GPT (Generative Pre-trained Transformer) модель.
Первая версия трансформерной архитектуры от OpenAI, предназначенная
для генеративного предобучения на текстовых данных.
Args:
config: Словарь конфигурации с параметрами:
- vocab_size: Размер словаря токенов
@@ -46,7 +46,7 @@ class GPT(BaseModel):
- num_layers: Количество декодерных слоев
- max_position_embeddings: Максимальная длина последовательности
- dropout: Вероятность dropout
Attributes:
_token_embeddings: Слой векторных представлений токенов
_position_embeddings: Слой позиционных эмбеддингов
@@ -54,30 +54,34 @@ class GPT(BaseModel):
_norm: Финальный слой нормализации
_linear: Выходной линейный слой
"""
def __init__(self, config):
super().__init__(config)
# Инициализация слоев
self._max_seq_len = config["max_position_embeddings"]
self._token_embeddings = TokenEmbeddings(
vocab_size=config["vocab_size"],
emb_size=config["embed_dim"]
vocab_size=config["vocab_size"], emb_size=config["embed_dim"]
)
self._position_embeddings = PositionalEmbeddings(
max_seq_len=config["max_position_embeddings"],
emb_size=config["embed_dim"]
max_seq_len=config["max_position_embeddings"], emb_size=config["embed_dim"]
)
self._dropout = nn.Dropout(config["dropout"])
# head_size = emb_size // num_heads
self._decoders = nn.ModuleList([Decoder(
num_heads=config["num_heads"],
emb_size=config["embed_dim"],
head_size=config["embed_dim"] // config["num_heads"],
max_seq_len=config["max_position_embeddings"],
dropout=config["dropout"]
) for _ in range(config["num_layers"])])
self._decoders = nn.ModuleList(
[
Decoder(
num_heads=config["num_heads"],
emb_size=config["embed_dim"],
head_size=config["embed_dim"] // config["num_heads"],
max_seq_len=config["max_position_embeddings"],
dropout=config["dropout"],
)
for _ in range(config["num_layers"])
]
)
self._linear = nn.Linear(config["embed_dim"], config["vocab_size"])
@property
def max_seq_len(self):
"""Возвращает максимальную длину последовательности."""
@@ -85,57 +89,60 @@ class GPT(BaseModel):
def forward(self, x: torch.Tensor, attention_mask=None) -> torch.Tensor:
"""Прямой проход через GPT
Args:
x: Входной тензор [batch_size, seq_len]
Returns:
Тензор логитов [batch_size, seq_len, vocab_size]
"""
# Проверка длины последовательности
if x.size(1) > self._max_seq_len:
raise ValueError(f"Длина последовательности {x.size(1)} превышает максимальную {self._max_seq_len}")
raise ValueError(
f"Длина последовательности {x.size(1)} превышает максимальную {self._max_seq_len}"
)
# Эмбеддинги токенов и позиций
tok_out = self._token_embeddings(x) # [batch, seq_len, emb_size]
pos_out = self._position_embeddings(x.size(1)) # [seq_len, emb_size]
# Комбинирование
out = self._dropout(tok_out + pos_out.unsqueeze(0)) # [batch, seq_len, emb_size]
out = self._dropout(
tok_out + pos_out.unsqueeze(0)
) # [batch, seq_len, emb_size]
# Стек декодеров
for decoder in self._decoders:
out = decoder(out)
return self._linear(out) # [batch, seq_len, vocab_size]
# def forward(self, input_ids, attention_mask=None):
# B, T = input_ids.size()
# pos = torch.arange(0, T, device=input_ids.device).unsqueeze(0)
#
# x = self.token_emb(input_ids) + self.pos_emb(pos)
#
# for block in self.blocks:
# x = block(x, attention_mask)
#
# x = self.ln_f(x)
# logits = self.head(x)
# return logits
# def forward(self, input_ids, attention_mask=None):
# B, T = input_ids.size()
# pos = torch.arange(0, T, device=input_ids.device).unsqueeze(0)
#
# x = self.token_emb(input_ids) + self.pos_emb(pos)
#
# for block in self.blocks:
# x = block(x, attention_mask)
#
# x = self.ln_f(x)
# logits = self.head(x)
# return logits
def generate(self,
x: torch.Tensor,
max_new_tokens: int,
def generate(
self,
x: torch.Tensor,
max_new_tokens: int,
do_sample: bool,
temperature: float = 1.0,
top_k: int = None,
top_p: float = None,
attention_mask: torch.Tensor = None, # Добавляем для совместимости с HF
**kwargs # Игнорируем остальные параметры
**kwargs, # Игнорируем остальные параметры
) -> torch.Tensor:
"""Авторегрессивная генерация текста.
Параметры:
x: Входной тензор с индексами токенов формы [batch_size, seq_len],
где batch_size - размер батча, seq_len - длина последовательности.
@@ -157,9 +164,9 @@ class GPT(BaseModel):
- Гарантируется, что хотя бы один токен остаётся (даже если его вероятность > top_p)
- None: отключено (по умолчанию)
- Должен быть в диапазоне (0, 1]
Возвращает:
torch.Tensor: Тензор с расширенной последовательностью токенов формы
torch.Tensor: Тензор с расширенной последовательностью токенов формы
[batch_size, seq_len + max_new_tokens]
Исключения:
@@ -172,7 +179,7 @@ class GPT(BaseModel):
Примеры:
>>> # Жадная генерация
>>> output = model.generate(input_ids, max_new_tokens=10, do_sample=False)
>>>
>>>
>>> # Вероятностная генерация с top-k
>>> output = model.generate(input_ids, max_new_tokens=10, do_sample=True, top_k=50)
>>>
@@ -180,11 +187,11 @@ class GPT(BaseModel):
>>> output = model.generate(input_ids, max_new_tokens=10, do_sample=True, top_p=0.9)
>>>
>>> # Комбинация температуры и top-k
>>> output = model.generate(input_ids, max_new_tokens=10, do_sample=True,
>>> output = model.generate(input_ids, max_new_tokens=10, do_sample=True,
... temperature=0.7, top_k=50)
Примечания:
1. Для детерминированных результатов в режиме сэмплирования
1. Для детерминированных результатов в режиме сэмплирования
зафиксируйте random seed (torch.manual_seed).
2. Температура влияет только на режим сэмплирования (do_sample=True).
3. Одновременное использование top_k и top_p запрещено.
@@ -204,7 +211,7 @@ class GPT(BaseModel):
Должна быть > 0 (по умолчанию: 1.0)
Returns:
torch.Tensor: Тензор с расширенной последовательностью токенов формы
torch.Tensor: Тензор с расширенной последовательностью токенов формы
[batch_size, seq_len + max_new_tokens]
Raises:
@@ -222,13 +229,13 @@ class GPT(BaseModel):
>>> output = model.generate(input_ids, max_new_tokens=10, do_sample=True, temperature=1.5)
Note:
Для детерминированных результатов в режиме сэмплирования
Для детерминированных результатов в режиме сэмплирования
зафиксируйте random seed (torch.manual_seed).
Температура влияет только на режим сэмплирования (do_sample=True).
"""
for _ in range(max_new_tokens):
# 1. Обрезаем вход, если последовательность слишком длинная
x_cond = x[:, -self._max_seq_len:]
x_cond = x[:, -self._max_seq_len :]
# 2. Передаем последовательность в метод forward класса GPT и полуаем логиты.
logits = self.forward(x_cond)
@@ -250,9 +257,14 @@ class GPT(BaseModel):
vocab_size = logits_scaled.size(-1)
# создаём маску: True, если токен НЕ в topk_indices
mask = torch.ones_like(logits_scaled, dtype=torch.bool if hasattr(torch, 'bool') else torch.uint8)
mask.scatter_(1, topk_indices, False if hasattr(torch, 'bool') else 0) # False там, где top-k индексы
masked_logits[mask] = float('-inf')
mask = torch.ones_like(
logits_scaled,
dtype=torch.bool if hasattr(torch, "bool") else torch.uint8,
)
mask.scatter_(
1, topk_indices, False if hasattr(torch, "bool") else 0
) # False там, где top-k индексы
masked_logits[mask] = float("-inf")
logits_scaled = masked_logits
@@ -260,36 +272,42 @@ class GPT(BaseModel):
# 1. Применим softmax, чтобы получить вероятности:
probs = F.softmax(logits_scaled, dim=-1) # [B, vocab_size]
# 2. Отсортируем токены по убыванию вероятностей:
sorted_probs, sorted_indices = torch.sort(probs, descending=True, dim=-1)
sorted_probs, sorted_indices = torch.sort(
probs, descending=True, dim=-1
)
# 3. Посчитаем кумулятивную сумму вероятностей:
cum_probs = torch.cumsum(sorted_probs, dim=-1) # [B, vocab_size]
# 4. Определим маску: оставить токены, пока сумма < top_p
sorted_mask = (cum_probs <= top_p) # [B, vocab_size]
sorted_mask = cum_probs <= top_p # [B, vocab_size]
# Гарантируем, что хотя бы первый токен останется
sorted_mask[:, 0] = True
# 5. Преобразуем маску обратно в оригинальный порядок:
# Создаём полную маску из False
mask = torch.zeros_like(probs, dtype=torch.bool if hasattr(torch, 'bool') else torch.uint8)
mask = torch.zeros_like(
probs, dtype=torch.bool if hasattr(torch, "bool") else torch.uint8
)
# Устанавливаем True в местах нужных токенов
mask.scatter_(dim=1, index=sorted_indices, src=sorted_mask)
# 6. Зануляем логиты токенов вне топ-p:
logits_scaled[~mask] = float('-inf')
logits_scaled[~mask] = float("-inf")
# 4. Применяем Softmax
probs = F.softmax(logits_scaled, dim=-1) # [batch_size, vocab_size]
if do_sample == True:
# 5. Если do_sample равен True, то отбираем токен случайно с помощью torch.multinomial
next_token = torch.multinomial(probs, num_samples=1) # [batch_size, 1]
else:
# 5. Если do_sample равен False, то выбираем токен с максимальной вероятностью
next_token = torch.argmax(probs, dim=-1, keepdim=True) # [batch_size, 1]
next_token = torch.argmax(
probs, dim=-1, keepdim=True
) # [batch_size, 1]
# 6. Добавляем его к последовательности
x = torch.cat([x, next_token], dim=1) # [batch_size, seq_len+1]
return x
# def generate(self, input_ids, max_length=50):
# for _ in range(max_length):
# logits = self.forward(input_ids)

View File

@@ -27,6 +27,7 @@ from llm.core.positional_embeddings import PositionalEmbeddings
from llm.core.cached_decoder import CachedDecoder
from llm.core.feed_forward import FeedForward
class GPT2(BaseModel):
"""
GPT2 — автогерессивная языковая модель, архитектура Transformer, предложенная OpenAI.
@@ -35,7 +36,7 @@ class GPT2(BaseModel):
- Масштабируемый автогерессивный трансформер для предсказания токенов слева направо.
- Главное отличие от классической GPT: порядок layer normalization ПЕРЕД attention и FFN.
- Используется GELU, efficient KV-cache, несет наследие классической GPT, но делает архитектуру глубже/шире.
Args:
config (dict): параметры архитектуры (vocab_size, embed_dim, num_heads, num_layers, max_position_embeddings, dropout)
@@ -44,37 +45,43 @@ class GPT2(BaseModel):
>>> logits = model(input_ids)
>>> out = model.generate(input_ids, max_length=20)
"""
def __init__(self, config):
super().__init__(config)
# Инициализация слоев
self._max_seq_len = config["max_position_embeddings"]
self._token_embeddings = TokenEmbeddings(
vocab_size=config["vocab_size"],
emb_size=config["embed_dim"]
vocab_size=config["vocab_size"], emb_size=config["embed_dim"]
)
self._position_embeddings = PositionalEmbeddings(
max_seq_len=config["max_position_embeddings"],
emb_size=config["embed_dim"]
max_seq_len=config["max_position_embeddings"], emb_size=config["embed_dim"]
)
self._dropout = nn.Dropout(config["dropout"])
# head_size = emb_size // num_heads
self._decoders = nn.ModuleList([CachedDecoder(
num_heads=config["num_heads"],
emb_size=config["embed_dim"],
head_size=config["embed_dim"] // config["num_heads"],
feed_forward_layer=FeedForward(
emb_size=config["embed_dim"],
dropout=config["dropout"],
activation="gelu"
),
max_seq_len=config["max_position_embeddings"],
dropout=config["dropout"]
) for _ in range(config["num_layers"])])
self._decoders = nn.ModuleList(
[
CachedDecoder(
num_heads=config["num_heads"],
emb_size=config["embed_dim"],
head_size=config["embed_dim"] // config["num_heads"],
feed_forward_layer=FeedForward(
emb_size=config["embed_dim"],
dropout=config["dropout"],
activation="gelu",
),
max_seq_len=config["max_position_embeddings"],
dropout=config["dropout"],
)
for _ in range(config["num_layers"])
]
)
self._norm = nn.LayerNorm(config["embed_dim"])
self._linear = nn.Linear(config["embed_dim"], config["vocab_size"])
def forward(self, x: torch.Tensor, use_cache: bool = True, cache: list = None) -> tuple:
def forward(
self, x: torch.Tensor, use_cache: bool = True, cache: list = None
) -> tuple:
"""
Прямой проход GPT2:
- Все слои работают как autoregressive transformer (masked self-attention).
@@ -91,9 +98,10 @@ class GPT2(BaseModel):
"""
# Проверка длины последовательности (только при отсутствии кэша)
if cache is None and x.size(1) > self._max_seq_len:
raise ValueError(f"Длина последовательности {x.size(1)} превышает максимальную {self.max_seq_len}")
raise ValueError(
f"Длина последовательности {x.size(1)} превышает максимальную {self.max_seq_len}"
)
# Вычисление start_pos из кэша (если кэш передан)
if cache is not None:
# При кэше обрабатываем только один токен (последний)
@@ -111,11 +119,15 @@ class GPT2(BaseModel):
# Эмбеддинги токенов и позиций
tok_out = self._token_embeddings(x) # [batch, seq_len, emb_size]
pos_out = self._position_embeddings(seq_len, start_pos=start_pos) # [seq_len, emb_size]
pos_out = self._position_embeddings(
seq_len, start_pos=start_pos
) # [seq_len, emb_size]
# Комбинирование
out = self._dropout(tok_out + pos_out.unsqueeze(0)) # [batch, seq_len, emb_size]
out = self._dropout(
tok_out + pos_out.unsqueeze(0)
) # [batch, seq_len, emb_size]
# Стек декодеров с передачей кэша
new_cache = []
for i, decoder in enumerate(self._decoders):
@@ -131,21 +143,22 @@ class GPT2(BaseModel):
out = self._norm(out)
logits = self._linear(out)
# Возвращаем результат с учетом use_cache
if use_cache:
return (logits, new_cache)
else:
return (logits, None)
def generate(self,
x: torch.Tensor,
max_new_tokens: int,
def generate(
self,
x: torch.Tensor,
max_new_tokens: int,
do_sample: bool,
temperature: float = 1.0,
top_k: int = None,
top_p: float = None,
use_cache: bool = True
use_cache: bool = True,
) -> torch.Tensor:
"""
Генерация текста с использованием autoregressive трансформера (GPT2).
@@ -174,10 +187,10 @@ class GPT2(BaseModel):
else:
# Первая итерация или кэш отключен - передаем всю последовательность
x_input = x
# Прямой проход с кэшем
logits, new_cache = self.forward(x_input, use_cache=use_cache, cache=cache)
# Обновляем кэш для следующей итерации
if use_cache:
cache = new_cache
@@ -200,7 +213,7 @@ class GPT2(BaseModel):
# создаём маску: 1, если токен НЕ в topk_indices
mask = torch.ones_like(logits_scaled, dtype=torch.uint8)
mask.scatter_(1, topk_indices, 0) # 0 там, где top-k индексы
masked_logits[mask.byte()] = float('-inf')
masked_logits[mask.byte()] = float("-inf")
logits_scaled = masked_logits
@@ -208,7 +221,9 @@ class GPT2(BaseModel):
# 1. Применим softmax, чтобы получить вероятности:
probs = F.softmax(logits_scaled, dim=-1) # [B, vocab_size]
# 2. Отсортируем токены по убыванию вероятностей:
sorted_probs, sorted_indices = torch.sort(probs, descending=True, dim=-1)
sorted_probs, sorted_indices = torch.sort(
probs, descending=True, dim=-1
)
# 3. Посчитаем кумулятивную сумму вероятностей:
cum_probs = torch.cumsum(sorted_probs, dim=-1) # [B, vocab_size]
# 4. Определим маску: оставить токены, пока сумма < top_p
@@ -221,23 +236,24 @@ class GPT2(BaseModel):
# Устанавливаем 1 в местах нужных токенов
mask.scatter_(dim=1, index=sorted_indices, src=sorted_mask)
# 6. Зануляем логиты токенов вне топ-p:
logits_scaled[~mask] = float('-inf')
logits_scaled[~mask] = float("-inf")
# 4. Применяем Softmax
probs = F.softmax(logits_scaled, dim=-1) # [batch_size, vocab_size]
if do_sample == True:
# 5. Если do_sample равен True, то отбираем токен случайно с помощью torch.multinomial
next_token = torch.multinomial(probs, num_samples=1) # [batch_size, 1]
else:
# 5. Если do_sample равен False, то выбираем токен с максимальной вероятностью
next_token = torch.argmax(probs, dim=-1, keepdim=True) # [batch_size, 1]
next_token = torch.argmax(
probs, dim=-1, keepdim=True
) # [batch_size, 1]
# 6. Добавляем его к последовательности
x = torch.cat([x, next_token], dim=1) # [batch_size, seq_len+1]
return x
@property
def max_seq_len(self) -> int:
return self._max_seq_len
return self._max_seq_len

View File

@@ -10,7 +10,6 @@ from llm.core.rope import RoPE
from llm.core.cached_decoder import CachedDecoder
class Llama(BaseModel):
"""
LLaMA (Large Language Model Meta AI) — высокоэффективная масштабируемая языковая модель, разработанная Meta AI Research.
@@ -29,38 +28,45 @@ class Llama(BaseModel):
>>> logits, cache = model(input_ids, use_cache=True)
>>> out = model.generate(input_ids, max_new_tokens=20)
"""
def __init__(self,config):
def __init__(self, config):
super().__init__(config)
# Инициализация слоев
self._max_seq_len = config["max_position_embeddings"]
self._token_embeddings = TokenEmbeddings(
vocab_size=config["vocab_size"],
emb_size=config["embed_dim"]
vocab_size=config["vocab_size"], emb_size=config["embed_dim"]
)
self._position_embeddings = RoPE(
head_size=config["embed_dim"] // config["num_heads"],
max_seq_len=config["max_position_embeddings"]
max_seq_len=config["max_position_embeddings"],
)
self._dropout = nn.Dropout(config["dropout"])
self._decoders = nn.ModuleList([CachedDecoder(
norm_layer=RMSNorm,
num_heads=config["num_heads"],
emb_size=config["embed_dim"],
head_size=config["embed_dim"] // config["num_heads"],
feed_forward_layer=SwiGLU(
emb_size=config["embed_dim"],
dropout=config["dropout"],
),
max_seq_len=config["max_position_embeddings"],
rope=self._position_embeddings,
dropout=config["dropout"],
) for _ in range(config["num_layers"])])
self._decoders = nn.ModuleList(
[
CachedDecoder(
norm_layer=RMSNorm,
num_heads=config["num_heads"],
emb_size=config["embed_dim"],
head_size=config["embed_dim"] // config["num_heads"],
feed_forward_layer=SwiGLU(
emb_size=config["embed_dim"],
dropout=config["dropout"],
),
max_seq_len=config["max_position_embeddings"],
rope=self._position_embeddings,
dropout=config["dropout"],
)
for _ in range(config["num_layers"])
]
)
self._norm = RMSNorm(config["embed_dim"])
self._linear = nn.Linear(config["embed_dim"], config["vocab_size"])
def forward(self, x: torch.Tensor, use_cache: bool = True, cache: list = None) -> tuple:
def forward(
self, x: torch.Tensor, use_cache: bool = True, cache: list = None
) -> tuple:
"""
Прямой проход через LLaMA (inference/train): авторегрессионное предсказание токенов.
@@ -76,11 +82,12 @@ class Llama(BaseModel):
"""
# Проверка длины последовательности (только при отсутствии кэша)
if cache is None and x.size(1) > self._max_seq_len:
raise ValueError(f"Длина последовательности {x.size(1)} превышает максимальную {self.max_seq_len}")
raise ValueError(
f"Длина последовательности {x.size(1)} превышает максимальную {self.max_seq_len}"
)
# Вычисление start_pos из кэша (если кэш передан)
#if cache is not None:
# if cache is not None:
# # При кэше обрабатываем только один токен (последний)
# seq_len = 1
# # Вычисляем start_pos из самого нижнего уровня кэша
@@ -89,18 +96,18 @@ class Llama(BaseModel):
# start_pos = key_cache.size(1) # cache_len
# else:
# start_pos = 0
#else:
# else:
# # Без кэша работаем как раньше
# start_pos = 0
# seq_len = x.size(1)
# Эмбеддинги токенов и позиций
tok_out = self._token_embeddings(x) # [batch, seq_len, emb_size]
#pos_out = self._position_embeddings(x) # [batch, seq_len, emb_size]
# pos_out = self._position_embeddings(x) # [batch, seq_len, emb_size]
# Комбинирование
out = self._dropout(tok_out) # [batch, seq_len, emb_size]
# Стек декодеров с передачей кэша
new_cache = []
for i, decoder in enumerate(self._decoders):
@@ -116,42 +123,43 @@ class Llama(BaseModel):
out = self._norm(out)
logits = self._linear(out)
# Возвращаем результат с учетом use_cache
if use_cache:
return (logits, new_cache)
else:
return (logits, None)
def generate(self,
x: torch.Tensor,
max_new_tokens: int,
def generate(
self,
x: torch.Tensor,
max_new_tokens: int,
do_sample: bool,
temperature: float = 1.0,
top_k: int = None,
top_p: float = None,
use_cache: bool = True
use_cache: bool = True,
) -> torch.Tensor:
"""
Генерация текста c помощью LLaMA (autoregressive Transformer).
Поддерживается:
- greedy и вероятностное сэмплирование (top-k, top-p, temperature)
- кэш attention для ускорения генерации длинных последовательностей
Генерация текста c помощью LLaMA (autoregressive Transformer).
Поддерживается:
- greedy и вероятностное сэмплирование (top-k, top-p, temperature)
- кэш attention для ускорения генерации длинных последовательностей
Args:
x (Tensor[int]): начальная последовательность [batch, seq_len]
max_new_tokens (int): сколько новых токенов сгенерировать
do_sample (bool): использовать стохастику (True) или жадный выбор (False)
temperature (float): масштаб для softmax (важно для sampling)
top_k (int|None): ограничение на количество кандидатов (top-k sampling)
top_p (float|None): nucleus sampling
use_cache (bool): ускоряет autoregressive при длинной генерации
Returns:
output (Tensor[int]): [batch, seq_len + max_new_tokens]
Пример:
>>> prompt = tokenizer.encode('Meta AI', return_tensors="pt")
>>> generated = model.generate(prompt, max_new_tokens=30, do_sample=True)
>>> print(tokenizer.decode(generated[0]))
Args:
x (Tensor[int]): начальная последовательность [batch, seq_len]
max_new_tokens (int): сколько новых токенов сгенерировать
do_sample (bool): использовать стохастику (True) или жадный выбор (False)
temperature (float): масштаб для softmax (важно для sampling)
top_k (int|None): ограничение на количество кандидатов (top-k sampling)
top_p (float|None): nucleus sampling
use_cache (bool): ускоряет autoregressive при длинной генерации
Returns:
output (Tensor[int]): [batch, seq_len + max_new_tokens]
Пример:
>>> prompt = tokenizer.encode('Meta AI', return_tensors="pt")
>>> generated = model.generate(prompt, max_new_tokens=30, do_sample=True)
>>> print(tokenizer.decode(generated[0]))
"""
cache = None
@@ -162,10 +170,10 @@ class Llama(BaseModel):
else:
# Первая итерация или кэш отключен - передаем всю последовательность
x_input = x
# Прямой проход с кэшем
logits, new_cache = self.forward(x_input, use_cache=use_cache, cache=cache)
# Обновляем кэш для следующей итерации
if use_cache:
cache = new_cache
@@ -188,7 +196,7 @@ class Llama(BaseModel):
# создаём маску: 1, если токен НЕ в topk_indices
mask = torch.ones_like(logits_scaled, dtype=torch.uint8)
mask.scatter_(1, topk_indices, 0) # 0 там, где top-k индексы
masked_logits[mask.byte()] = float('-inf')
masked_logits[mask.byte()] = float("-inf")
logits_scaled = masked_logits
@@ -196,7 +204,9 @@ class Llama(BaseModel):
# 1. Применим softmax, чтобы получить вероятности:
probs = F.softmax(logits_scaled, dim=-1) # [B, vocab_size]
# 2. Отсортируем токены по убыванию вероятностей:
sorted_probs, sorted_indices = torch.sort(probs, descending=True, dim=-1)
sorted_probs, sorted_indices = torch.sort(
probs, descending=True, dim=-1
)
# 3. Посчитаем кумулятивную сумму вероятностей:
cum_probs = torch.cumsum(sorted_probs, dim=-1) # [B, vocab_size]
# 4. Определим маску: оставить токены, пока сумма < top_p
@@ -209,25 +219,24 @@ class Llama(BaseModel):
# Устанавливаем 1 в местах нужных токенов
mask.scatter_(dim=1, index=sorted_indices, src=sorted_mask)
# 6. Зануляем логиты токенов вне топ-p:
logits_scaled[~mask] = float('-inf')
logits_scaled[~mask] = float("-inf")
# 4. Применяем Softmax
probs = F.softmax(logits_scaled, dim=-1) # [batch_size, vocab_size]
if do_sample == True:
# 5. Если do_sample равен True, то отбираем токен случайно с помощью torch.multinomial
next_token = torch.multinomial(probs, num_samples=1) # [batch_size, 1]
else:
# 5. Если do_sample равен False, то выбираем токен с максимальной вероятностью
next_token = torch.argmax(probs, dim=-1, keepdim=True) # [batch_size, 1]
next_token = torch.argmax(
probs, dim=-1, keepdim=True
) # [batch_size, 1]
# 6. Добавляем его к последовательности
x = torch.cat([x, next_token], dim=1) # [batch_size, seq_len+1]
return x
@property
def max_seq_len(self) -> int:
return self._max_seq_len
return self._max_seq_len

View File

@@ -10,92 +10,94 @@ import json
class BaseTokenizer(ABC):
"""
Абстрактный базовый класс для всех токенизаторов.
Определяет общий интерфейс для токенизации текста.
"""
def __init__(self):
self.vocab: Dict[str, int] = {}
self.inverse_vocab: Dict[int, str] = {}
self.vocab_size: int = 0
# Специальные токены
self.pad_token = "<pad>"
self.unk_token = "<unk>"
self.bos_token = "<bos>"
self.eos_token = "<eos>"
self.pad_token_id: Optional[int] = None
self.unk_token_id: Optional[int] = None
self.bos_token_id: Optional[int] = None
self.eos_token_id: Optional[int] = None
@abstractmethod
def train(self, texts: List[str], vocab_size: int = 1000, **kwargs):
"""
Обучение токенизатора на текстах.
Args:
texts: Список текстов для обучения
vocab_size: Желаемый размер словаря
**kwargs: Дополнительные параметры обучения
"""
pass
@abstractmethod
def encode(self, text: str, **kwargs) -> List[int]:
"""
Кодирование текста в последовательность токенов.
Args:
text: Входной текст
**kwargs: Дополнительные параметры кодирования
Returns:
List[int]: Список идентификаторов токенов
"""
pass
@abstractmethod
def decode(self, tokens: List[int], **kwargs) -> str:
"""
Декодирование последовательности токенов в текст.
Args:
tokens: Список идентификаторов токенов
**kwargs: Дополнительные параметры декодирования
Returns:
str: Декодированный текст
"""
pass
def tokenize(self, text: str, **kwargs) -> List[str]:
"""
Токенизация текста в список строковых токенов.
Args:
text: Входной текст
**kwargs: Дополнительные параметры
Returns:
List[str]: Список токенов
"""
token_ids = self.encode(text, **kwargs)
return [self.inverse_vocab.get(token_id, self.unk_token) for token_id in token_ids]
return [
self.inverse_vocab.get(token_id, self.unk_token) for token_id in token_ids
]
def get_vocab(self) -> Dict[str, int]:
"""Возвращает словарь токенизатора."""
return self.vocab.copy()
def get_vocab_size(self) -> int:
"""Возвращает размер словаря."""
return self.vocab_size
def add_special_tokens(self, special_tokens: List[str]):
"""
Добавляет специальные токены в словарь.
Args:
special_tokens: Список специальных токенов
"""
@@ -105,70 +107,70 @@ class BaseTokenizer(ABC):
self.vocab[token] = token_id
self.inverse_vocab[token_id] = token
self.vocab_size += 1
# Обновляем ID специальных токенов
self.pad_token_id = self.vocab.get(self.pad_token)
self.unk_token_id = self.vocab.get(self.unk_token)
self.bos_token_id = self.vocab.get(self.bos_token)
self.eos_token_id = self.vocab.get(self.eos_token)
def save(self, filepath: str):
"""
Сохраняет токенизатор в файл.
Args:
filepath: Путь для сохранения
"""
config = {
'vocab': self.vocab,
'vocab_size': self.vocab_size,
'pad_token': self.pad_token,
'unk_token': self.unk_token,
'bos_token': self.bos_token,
'eos_token': self.eos_token,
'tokenizer_type': self.__class__.__name__
"vocab": self.vocab,
"vocab_size": self.vocab_size,
"pad_token": self.pad_token,
"unk_token": self.unk_token,
"bos_token": self.bos_token,
"eos_token": self.eos_token,
"tokenizer_type": self.__class__.__name__,
}
with open(filepath, 'w', encoding='utf-8') as f:
with open(filepath, "w", encoding="utf-8") as f:
json.dump(config, f, ensure_ascii=False, indent=2)
@classmethod
def load(cls, filepath: str):
"""
Загружает токенизатор из файла.
Args:
filepath: Путь к файлу
Returns:
BaseTokenizer: Загруженный токенизатор
"""
with open(filepath, 'r', encoding='utf-8') as f:
with open(filepath, "r", encoding="utf-8") as f:
config = json.load(f)
# Создаем экземпляр токенизатора
tokenizer = cls()
tokenizer.vocab = config['vocab']
tokenizer.vocab_size = config['vocab_size']
tokenizer.pad_token = config['pad_token']
tokenizer.unk_token = config['unk_token']
tokenizer.bos_token = config['bos_token']
tokenizer.eos_token = config['eos_token']
tokenizer.vocab = config["vocab"]
tokenizer.vocab_size = config["vocab_size"]
tokenizer.pad_token = config["pad_token"]
tokenizer.unk_token = config["unk_token"]
tokenizer.bos_token = config["bos_token"]
tokenizer.eos_token = config["eos_token"]
# Создаем обратный словарь
tokenizer.inverse_vocab = {v: k for k, v in tokenizer.vocab.items()}
# Обновляем ID специальных токенов
tokenizer.pad_token_id = tokenizer.vocab.get(tokenizer.pad_token)
tokenizer.unk_token_id = tokenizer.vocab.get(tokenizer.unk_token)
tokenizer.bos_token_id = tokenizer.vocab.get(tokenizer.bos_token)
tokenizer.eos_token_id = tokenizer.vocab.get(tokenizer.eos_token)
return tokenizer
def __len__(self) -> int:
"""Возвращает размер словаря."""
return self.vocab_size
def __repr__(self) -> str:
return f"{self.__class__.__name__}(vocab_size={self.vocab_size})"

View File

@@ -13,26 +13,26 @@ from .base_tokenizer import BaseTokenizer
class BPETokenizer(BaseTokenizer):
"""
BPE токенизатор для обработки текста.
Реализует алгоритм Byte Pair Encoding для создания субсловных токенов.
Примеры использования:
>>> tokenizer = BPETokenizer()
>>> tokenizer.train(["пример текста для обучения"], vocab_size=1000)
>>> tokens = tokenizer.encode("новый текст")
>>> text = tokenizer.decode(tokens)
"""
def __init__(self):
super().__init__()
self.merges: Dict[Tuple[str, str], int] = {}
self.pattern = r"""'s|'t|'re|'ve|'m|'ll|'d| ?\p{L}+| ?\p{N}+| ?[^\s\p{L}\p{N}]+|\s+(?!\S)|\s+"""
self.compiled_pattern = re.compile(self.pattern, re.UNICODE)
def train(self, texts: List[str], vocab_size: int = 1000, **kwargs):
"""
Обучение BPE токенизатора на текстах.
Args:
texts: Список текстов для обучения
vocab_size: Желаемый размер словаря
@@ -42,37 +42,40 @@ class BPETokenizer(BaseTokenizer):
"""
# Инициализация базового словаря
self._initialize_vocab()
# Добавляем специальные токены если указаны
special_tokens = kwargs.get('special_tokens', [self.pad_token, self.unk_token, self.bos_token, self.eos_token])
special_tokens = kwargs.get(
"special_tokens",
[self.pad_token, self.unk_token, self.bos_token, self.eos_token],
)
self.add_special_tokens(special_tokens)
# Предобработка текстов
words = self._preprocess_texts(texts)
# Получаем начальные токены
vocab = self._get_initial_vocab(words)
# Выполняем BPE мерджи
self._perform_merges(vocab, vocab_size, kwargs.get('min_frequency', 2))
self._perform_merges(vocab, vocab_size, kwargs.get("min_frequency", 2))
# Строим финальный словарь
self._build_final_vocab()
def _initialize_vocab(self):
"""Инициализирует базовый словарь."""
self.vocab.clear()
self.inverse_vocab.clear()
self.merges.clear()
self.vocab_size = 0
def _preprocess_texts(self, texts: List[str]) -> List[List[str]]:
"""
Предобработка текстов для обучения.
Args:
texts: Список текстов
Returns:
List[List[str]]: Предобработанные слова
"""
@@ -84,14 +87,14 @@ class BPETokenizer(BaseTokenizer):
tokens = self.compiled_pattern.findall(text)
words.append(tokens)
return words
def _get_initial_vocab(self, words: List[List[str]]) -> Dict[str, int]:
"""
Создает начальный словарь из символов.
Args:
words: Список токенизированных текстов
Returns:
Dict[str, int]: Начальный словарь частот
"""
@@ -99,43 +102,45 @@ class BPETokenizer(BaseTokenizer):
for word_list in words:
for word in word_list:
# Разбиваем слово на символы и добавляем специальный символ конца слова
chars = list(word) + ['</w>']
vocab.update([''.join(chars[i:i+1]) for i in range(len(chars))])
chars = list(word) + ["</w>"]
vocab.update(["".join(chars[i : i + 1]) for i in range(len(chars))])
return vocab
def _perform_merges(self, vocab: Dict[str, int], target_vocab_size: int, min_frequency: int):
def _perform_merges(
self, vocab: Dict[str, int], target_vocab_size: int, min_frequency: int
):
"""
Выполняет BPE мерджи до достижения целевого размера словаря.
Args:
vocab: Начальный словарь
target_vocab_size: Целевой размер словаря
min_frequency: Минимальная частота для мерджа
"""
current_vocab_size = len(vocab) + len(self.vocab)
while current_vocab_size < target_vocab_size:
# Находим наиболее частую пару
pairs = self._get_stats(vocab)
if not pairs:
break
best_pair = max(pairs, key=pairs.get)
if pairs[best_pair] < min_frequency:
break
# Выполняем мердж
vocab = self._merge_vocab(vocab, best_pair)
self.merges[best_pair] = len(self.merges)
current_vocab_size += 1
def _get_stats(self, vocab: Dict[str, int]) -> Dict[Tuple[str, str], int]:
"""
Собирает статистику по парам символов.
Args:
vocab: Словарь токенов
Returns:
Dict[Tuple[str, str], int]: Частоты пар
"""
@@ -145,107 +150,113 @@ class BPETokenizer(BaseTokenizer):
for i in range(len(symbols) - 1):
pairs[symbols[i], symbols[i + 1]] += freq
return pairs
def _merge_vocab(self, vocab: Dict[str, int], pair: Tuple[str, str]) -> Dict[str, int]:
def _merge_vocab(
self, vocab: Dict[str, int], pair: Tuple[str, str]
) -> Dict[str, int]:
"""
Объединяет пару символов в словаре.
Args:
vocab: Исходный словарь
pair: Пара для объединения
Returns:
Dict[str, int]: Обновленный словарь
"""
new_vocab = {}
bigram = re.compile(r'(?<!\\S)' + re.escape(pair[0]) + r' ' + re.escape(pair[1]) + r'(?!\\S)')
bigram = re.compile(
r"(?<!\\S)" + re.escape(pair[0]) + r" " + re.escape(pair[1]) + r"(?!\\S)"
)
replacement = pair[0] + pair[1]
for word in vocab:
new_word = bigram.sub(replacement, word)
new_vocab[new_word] = vocab[word]
return new_vocab
def _build_final_vocab(self):
"""Строит финальный словарь токенизатора."""
# Собираем все уникальные токены из мерджей
all_tokens = set()
# Добавляем специальные токены
all_tokens.update([self.pad_token, self.unk_token, self.bos_token, self.eos_token])
all_tokens.update(
[self.pad_token, self.unk_token, self.bos_token, self.eos_token]
)
# Добавляем токены из мерджей
for pair in self.merges:
all_tokens.update(pair)
# Создаем словарь
for i, token in enumerate(sorted(all_tokens)):
self.vocab[token] = i
self.inverse_vocab = {v: k for k, v in self.vocab.items()}
self.vocab_size = len(self.vocab)
# Обновляем ID специальных токенов
self.pad_token_id = self.vocab.get(self.pad_token)
self.unk_token_id = self.vocab.get(self.unk_token)
self.bos_token_id = self.vocab.get(self.bos_token)
self.eos_token_id = self.vocab.get(self.eos_token)
def encode(self, text: str, **kwargs) -> List[int]:
"""
Кодирует текст в последовательность токенов.
Args:
text: Входной текст
**kwargs: Дополнительные параметры
- add_special_tokens: Добавлять специальные токены
Returns:
List[int]: Список идентификаторов токенов
"""
add_special_tokens = kwargs.get('add_special_tokens', False)
add_special_tokens = kwargs.get("add_special_tokens", False)
# Токенизация текста
tokens = self.compiled_pattern.findall(text)
# Применяем BPE к каждому токену
bpe_tokens = []
for token in tokens:
# Преобразуем токен в BPE представление
bpe_token = self._apply_bpe(token)
bpe_tokens.extend(bpe_token)
# Конвертируем в ID
token_ids = []
for token in bpe_tokens:
token_id = self.vocab.get(token, self.unk_token_id)
if token_id is not None:
token_ids.append(token_id)
# Добавляем специальные токены если нужно
if add_special_tokens:
if self.bos_token_id is not None:
token_ids.insert(0, self.bos_token_id)
if self.eos_token_id is not None:
token_ids.append(self.eos_token_id)
return token_ids
def _apply_bpe(self, token: str) -> List[str]:
"""
Применяет BPE к одному токену.
Args:
token: Входной токен
Returns:
List[str]: Список BPE токенов
"""
# Простая реализация - в реальной реализации нужно применять обученные мерджи
word = token + '</w>'
tokens = [word[i:i+1] for i in range(len(word))]
word = token + "</w>"
tokens = [word[i : i + 1] for i in range(len(word))]
# Применяем мерджи (упрощенная версия)
# В полной реализации нужно применять все обученные мерджи
for pair in self.merges:
@@ -256,109 +267,114 @@ class BPETokenizer(BaseTokenizer):
del tokens[i + 1]
else:
i += 1
return tokens
def decode(self, tokens: List[int], **kwargs) -> str:
"""
Декодирует последовательность токенов в текст.
Args:
tokens: Список идентификаторов токенов
**kwargs: Дополнительные параметры
- skip_special_tokens: Пропускать специальные токены
Returns:
str: Декодированный текст
"""
skip_special_tokens = kwargs.get('skip_special_tokens', True)
skip_special_tokens = kwargs.get("skip_special_tokens", True)
# Конвертируем ID в токены
token_strings = []
for token_id in tokens:
token = self.inverse_vocab.get(token_id, self.unk_token)
# Пропускаем специальные токены если нужно
if skip_special_tokens and token in [self.pad_token, self.unk_token, self.bos_token, self.eos_token]:
if skip_special_tokens and token in [
self.pad_token,
self.unk_token,
self.bos_token,
self.eos_token,
]:
continue
token_strings.append(token)
# Объединяем токены в текст
text = ''.join(token_strings)
text = "".join(token_strings)
# Убираем маркер конца слова
text = text.replace('</w>', ' ')
text = text.replace("</w>", " ")
return text.strip()
def save(self, filepath: str):
"""
Сохраняет BPE токенизатор в файл.
Args:
filepath: Путь для сохранения
"""
import json
config = {
'vocab': self.vocab,
'merges': {f"{k[0]} {k[1]}": v for k, v in self.merges.items()},
'vocab_size': self.vocab_size,
'pad_token': self.pad_token,
'unk_token': self.unk_token,
'bos_token': self.bos_token,
'eos_token': self.eos_token,
'pattern': self.pattern,
'tokenizer_type': self.__class__.__name__
"vocab": self.vocab,
"merges": {f"{k[0]} {k[1]}": v for k, v in self.merges.items()},
"vocab_size": self.vocab_size,
"pad_token": self.pad_token,
"unk_token": self.unk_token,
"bos_token": self.bos_token,
"eos_token": self.eos_token,
"pattern": self.pattern,
"tokenizer_type": self.__class__.__name__,
}
with open(filepath, 'w', encoding='utf-8') as f:
with open(filepath, "w", encoding="utf-8") as f:
json.dump(config, f, ensure_ascii=False, indent=2)
@classmethod
def load(cls, filepath: str):
"""
Загружает BPE токенизатор из файла.
Args:
filepath: Путь к файлу
Returns:
BPETokenizer: Загруженный токенизатор
"""
import json
with open(filepath, 'r', encoding='utf-8') as f:
with open(filepath, "r", encoding="utf-8") as f:
config = json.load(f)
tokenizer = cls()
tokenizer.vocab = config['vocab']
tokenizer.vocab_size = config['vocab_size']
tokenizer.pad_token = config['pad_token']
tokenizer.unk_token = config['unk_token']
tokenizer.bos_token = config['bos_token']
tokenizer.eos_token = config['eos_token']
tokenizer.pattern = config.get('pattern', tokenizer.pattern)
tokenizer.vocab = config["vocab"]
tokenizer.vocab_size = config["vocab_size"]
tokenizer.pad_token = config["pad_token"]
tokenizer.unk_token = config["unk_token"]
tokenizer.bos_token = config["bos_token"]
tokenizer.eos_token = config["eos_token"]
tokenizer.pattern = config.get("pattern", tokenizer.pattern)
tokenizer.compiled_pattern = re.compile(tokenizer.pattern, re.UNICODE)
# Восстанавливаем мерджи
merges = config.get('merges', {})
merges = config.get("merges", {})
tokenizer.merges = {}
for k, v in merges.items():
parts = k.split()
if len(parts) == 2:
tokenizer.merges[(parts[0], parts[1])] = v
# Создаем обратный словарь
tokenizer.inverse_vocab = {v: k for k, v in tokenizer.vocab.items()}
# Обновляем ID специальных токенов
tokenizer.pad_token_id = tokenizer.vocab.get(tokenizer.pad_token)
tokenizer.unk_token_id = tokenizer.vocab.get(tokenizer.unk_token)
tokenizer.bos_token_id = tokenizer.vocab.get(tokenizer.bos_token)
tokenizer.eos_token_id = tokenizer.vocab.get(tokenizer.eos_token)
return tokenizer
@@ -367,62 +383,72 @@ class SimpleBPETokenizer(BPETokenizer):
"""
Упрощенная версия BPE токенизатора для демонстрации.
"""
def train(self, texts: List[str], vocab_size: int = 1000, **kwargs):
"""Упрощенное обучение для демонстрации."""
# Инициализация базового словаря
self._initialize_vocab()
# Добавляем базовые токены
special_tokens = [self.pad_token, self.unk_token, self.bos_token, self.eos_token]
special_tokens = [
self.pad_token,
self.unk_token,
self.bos_token,
self.eos_token,
]
self.add_special_tokens(special_tokens)
# Простая реализация - собираем все символы
all_chars = set()
for text in texts:
all_chars.update(text)
# Добавляем символы в словарь
for char in sorted(all_chars):
if char not in self.vocab:
self.vocab[char] = len(self.vocab)
self.inverse_vocab = {v: k for k, v in self.vocab.items()}
self.vocab_size = len(self.vocab)
# Обновляем ID специальных токенов
self.pad_token_id = self.vocab.get(self.pad_token)
self.unk_token_id = self.vocab.get(self.unk_token)
self.bos_token_id = self.vocab.get(self.bos_token)
self.eos_token_id = self.vocab.get(self.eos_token)
def encode(self, text: str, **kwargs) -> List[int]:
"""Упрощенное кодирование - разбиваем на символы."""
add_special_tokens = kwargs.get('add_special_tokens', False)
add_special_tokens = kwargs.get("add_special_tokens", False)
token_ids = []
for char in text:
token_id = self.vocab.get(char, self.unk_token_id)
if token_id is not None:
token_ids.append(token_id)
if add_special_tokens:
if self.bos_token_id is not None:
token_ids.insert(0, self.bos_token_id)
if self.eos_token_id is not None:
token_ids.append(self.eos_token_id)
return token_ids
def decode(self, tokens: List[int], **kwargs) -> str:
"""Упрощенное декодирование."""
skip_special_tokens = kwargs.get('skip_special_tokens', True)
skip_special_tokens = kwargs.get("skip_special_tokens", True)
chars = []
for token_id in tokens:
char = self.inverse_vocab.get(token_id, self.unk_token)
if skip_special_tokens and char in [self.pad_token, self.unk_token, self.bos_token, self.eos_token]:
if skip_special_tokens and char in [
self.pad_token,
self.unk_token,
self.bos_token,
self.eos_token,
]:
continue
chars.append(char)
return ''.join(chars)
return "".join(chars)

View File

@@ -11,26 +11,26 @@ from .base_tokenizer import BaseTokenizer
class BPETokenizer(BaseTokenizer):
"""
BPE токенизатор для обработки текста.
Реализует алгоритм Byte Pair Encoding для создания субсловных токенов.
Использует вашу реализацию BPE.
Примеры использования:
>>> tokenizer = BPETokenizer()
>>> tokenizer.train(["пример текста для обучения"], vocab_size=1000)
>>> tokens = tokenizer.encode("новый текст")
>>> text = tokenizer.decode(tokens)
"""
def __init__(self):
super().__init__()
self.merges: Dict[Tuple[str, str], int] = {}
self.vocab_list: List[str] = []
def train(self, texts: List[str], vocab_size: int = 1000, **kwargs):
"""
Обучение BPE токенизатора на текстах.
Args:
texts: Список текстов для обучения
vocab_size: Желаемый размер словаря
@@ -39,7 +39,7 @@ class BPETokenizer(BaseTokenizer):
"""
# Объединяем все тексты в одну строку для обучения
combined_text = " ".join(texts)
# 1. Получаем уникальные токены (символы)
unique_tokens = sorted(set(combined_text))
tokens = unique_tokens.copy()
@@ -61,7 +61,10 @@ class BPETokenizer(BaseTokenizer):
break # нет пар — выходим
# Находим самую частую пару (в случае равенства — та, что встретилась первой)
most_frequent_pair = max(pair_freq.items(), key=lambda x: (x[1], -self._pair_first_index(sequence, x[0])))[0]
most_frequent_pair = max(
pair_freq.items(),
key=lambda x: (x[1], -self._pair_first_index(sequence, x[0])),
)[0]
# Создаем новый токен
new_token = most_frequent_pair[0] + most_frequent_pair[1]
@@ -71,45 +74,51 @@ class BPETokenizer(BaseTokenizer):
new_sequence = []
while i < len(sequence):
if i < len(sequence) - 1 and (sequence[i], sequence[i + 1]) == most_frequent_pair:
if (
i < len(sequence) - 1
and (sequence[i], sequence[i + 1]) == most_frequent_pair
):
new_sequence.append(new_token)
i += 2 # пропускаем два символа — заменённую пару
else:
new_sequence.append(sequence[i])
i += 1
sequence = new_sequence
# 4. Создаем словари
self.vocab_list = tokens.copy()
self.vocab = dict(zip(tokens, range(vocab_size)))
self.inverse_vocab = dict(zip(range(vocab_size), tokens))
self.vocab_size = len(self.vocab)
# Добавляем специальные токены если указаны
special_tokens = kwargs.get('special_tokens', [self.pad_token, self.unk_token, self.bos_token, self.eos_token])
special_tokens = kwargs.get(
"special_tokens",
[self.pad_token, self.unk_token, self.bos_token, self.eos_token],
)
self.add_special_tokens(special_tokens)
def _pair_first_index(self, sequence, pair):
"""Находит первый индекс пары в последовательности."""
for i in range(len(sequence) - 1):
if (sequence[i], sequence[i + 1]) == pair:
return i
return float('inf') # если пара не найдена (в теории не должно случиться)
return float("inf") # если пара не найдена (в теории не должно случиться)
def encode(self, text: str, **kwargs) -> List[int]:
"""
Кодирует текст в последовательность токенов.
Args:
text: Входной текст
**kwargs: Дополнительные параметры
- add_special_tokens: Добавлять специальные токены
Returns:
List[int]: Список идентификаторов токенов
"""
add_special_tokens = kwargs.get('add_special_tokens', False)
add_special_tokens = kwargs.get("add_special_tokens", False)
# 1. Разбиваем текст на токены-символы
sequence = list(text)
# 2. Инициализация пустого списка токенов
@@ -119,7 +128,9 @@ class BPETokenizer(BaseTokenizer):
while i < len(text):
# 3.1 Найти все токены в словаре, начинающиеся с text[i]
start_char = text[i]
result = [token for token in self.vocab_list if token.startswith(start_char)]
result = [
token for token in self.vocab_list if token.startswith(start_char)
]
# 3.2 Выбрать самый длинный подходящий токен
find_token = self._find_max_matching_token(text[i:], result)
if find_token is None:
@@ -134,19 +145,19 @@ class BPETokenizer(BaseTokenizer):
# 4. Заменить токены на их ID
token_ids = self._tokens_to_ids(tokens)
# Заменяем -1 на unk_token_id
token_ids = [tid if tid != -1 else self.unk_token_id for tid in token_ids]
# Добавляем специальные токены если нужно
if add_special_tokens:
if self.bos_token_id is not None:
token_ids.insert(0, self.bos_token_id)
if self.eos_token_id is not None:
token_ids.append(self.eos_token_id)
return token_ids
def _find_max_matching_token(self, text: str, tokens: list) -> Optional[str]:
"""Находит самый длинный токен из списка, с которого начинается текст"""
matching = [token for token in tokens if text.startswith(token)]
@@ -161,33 +172,41 @@ class BPETokenizer(BaseTokenizer):
else:
ids.append(-1) # Специальное значение
return ids
def decode(self, tokens: List[int], **kwargs) -> str:
"""
Декодирует последовательность токенов в текст.
Args:
tokens: Список идентификаторов токенов
**kwargs: Дополнительные параметры
- skip_special_tokens: Пропускать специальные токены
Returns:
str: Декодированный текст
"""
skip_special_tokens = kwargs.get('skip_special_tokens', True)
skip_special_tokens = kwargs.get("skip_special_tokens", True)
# Фильтруем специальные токены если нужно
if skip_special_tokens:
tokens = [tid for tid in tokens if tid not in [
self.pad_token_id, self.unk_token_id, self.bos_token_id, self.eos_token_id
]]
tokens = [
tid
for tid in tokens
if tid
not in [
self.pad_token_id,
self.unk_token_id,
self.bos_token_id,
self.eos_token_id,
]
]
# Конвертируем ID в токены
token_strings = self._ids_to_tokens(tokens)
# Объединяем токены в текст
return ''.join(token_strings)
return "".join(token_strings)
def _ids_to_tokens(self, ids: List[int]) -> List[str]:
"""Конвертирует список Ids в их tokens"""
tokens = []
@@ -197,76 +216,76 @@ class BPETokenizer(BaseTokenizer):
else:
tokens.append(self.unk_token) # Специальное значение
return tokens
def save(self, filepath: str):
"""
Сохраняет токенизатор в файл.
Args:
filepath: Путь для сохранения
"""
import json
# Преобразуем кортежи в строки для JSON сериализации
merges_serializable = {f"{k[0]},{k[1]}": v for k, v in self.merges.items()}
config = {
'vocab': self.vocab,
'vocab_size': self.vocab_size,
'pad_token': self.pad_token,
'unk_token': self.unk_token,
'bos_token': self.bos_token,
'eos_token': self.eos_token,
'tokenizer_type': self.__class__.__name__,
'merges': merges_serializable,
'vocab_list': self.vocab_list
"vocab": self.vocab,
"vocab_size": self.vocab_size,
"pad_token": self.pad_token,
"unk_token": self.unk_token,
"bos_token": self.bos_token,
"eos_token": self.eos_token,
"tokenizer_type": self.__class__.__name__,
"merges": merges_serializable,
"vocab_list": self.vocab_list,
}
with open(filepath, 'w', encoding='utf-8') as f:
with open(filepath, "w", encoding="utf-8") as f:
json.dump(config, f, ensure_ascii=False, indent=2)
@classmethod
def load(cls, filepath: str):
"""
Загружает токенизатор из файла.
Args:
filepath: Путь к файлу
Returns:
BPETokenizer: Загруженный токенизатор
"""
import json
with open(filepath, 'r', encoding='utf-8') as f:
with open(filepath, "r", encoding="utf-8") as f:
config = json.load(f)
# Создаем экземпляр токенизатора
tokenizer = cls()
tokenizer.vocab = config['vocab']
tokenizer.vocab_size = config['vocab_size']
tokenizer.pad_token = config['pad_token']
tokenizer.unk_token = config['unk_token']
tokenizer.bos_token = config['bos_token']
tokenizer.eos_token = config['eos_token']
tokenizer.vocab_list = config['vocab_list']
tokenizer.vocab = config["vocab"]
tokenizer.vocab_size = config["vocab_size"]
tokenizer.pad_token = config["pad_token"]
tokenizer.unk_token = config["unk_token"]
tokenizer.bos_token = config["bos_token"]
tokenizer.eos_token = config["eos_token"]
tokenizer.vocab_list = config["vocab_list"]
# Восстанавливаем кортежи из строк
tokenizer.merges = {}
for k, v in config['merges'].items():
parts = k.split(',')
for k, v in config["merges"].items():
parts = k.split(",")
if len(parts) == 2:
tokenizer.merges[(parts[0], parts[1])] = v
# Создаем обратный словарь
tokenizer.inverse_vocab = {v: k for k, v in tokenizer.vocab.items()}
# Обновляем ID специальных токенов
tokenizer.pad_token_id = tokenizer.vocab.get(tokenizer.pad_token)
tokenizer.unk_token_id = tokenizer.vocab.get(tokenizer.unk_token)
tokenizer.bos_token_id = tokenizer.vocab.get(tokenizer.bos_token)
tokenizer.eos_token_id = tokenizer.vocab.get(tokenizer.eos_token)
return tokenizer
@@ -275,4 +294,5 @@ class SimpleBPETokenizer(BPETokenizer):
Упрощенная версия BPE токенизатора для демонстрации.
Наследует вашу реализацию, но может быть упрощена при необходимости.
"""
pass

View File

@@ -12,7 +12,7 @@ class TextDataset(Dataset):
def __init__(self, texts: List[str], tokenizer: Any, block_size: int = 128):
"""
Инициализация датасета.
Args:
texts: Список текстов для обучения
tokenizer: Токенизатор с методами encode/decode
@@ -25,15 +25,15 @@ class TextDataset(Dataset):
for text in texts:
# Кодируем текст в токены
input_ids = tokenizer.encode(text, add_special_tokens=False)
# Обрезаем или дополняем до нужной длины
if len(input_ids) > block_size:
input_ids = input_ids[:block_size]
else:
# Дополняем pad_token_id
pad_token_id = getattr(tokenizer, 'pad_token_id', 0)
pad_token_id = getattr(tokenizer, "pad_token_id", 0)
input_ids = input_ids + [pad_token_id] * (block_size - len(input_ids))
self.examples.append(input_ids)
def __len__(self):
@@ -50,33 +50,35 @@ class StreamingTextDataset(Dataset):
Датасет для потоковой обработки больших текстов.
Токенизация происходит на лету, что экономит память.
"""
def __init__(self, texts: List[str], tokenizer: Any, block_size: int = 128):
self.texts = texts
self.tokenizer = tokenizer
self.block_size = block_size
# Получаем pad_token_id из токенизатора
self.pad_token_id = getattr(tokenizer, 'pad_token_id', 0)
self.pad_token_id = getattr(tokenizer, "pad_token_id", 0)
def __len__(self):
return len(self.texts)
def __getitem__(self, idx):
text = self.texts[idx]
# Токенизация на лету
input_ids = self.tokenizer.encode(text, add_special_tokens=False)
# Обрезаем или дополняем до нужной длины
if len(input_ids) > self.block_size:
input_ids = input_ids[:self.block_size]
input_ids = input_ids[: self.block_size]
else:
input_ids = input_ids + [self.pad_token_id] * (self.block_size - len(input_ids))
input_ids = input_ids + [self.pad_token_id] * (
self.block_size - len(input_ids)
)
input_ids = torch.tensor(input_ids, dtype=torch.long)
labels = input_ids.clone()
return {"input_ids": input_ids, "labels": labels}
@@ -84,9 +86,15 @@ class TextDatasetWithSpecialTokens(TextDataset):
"""
Расширенная версия TextDataset с поддержкой специальных токенов.
"""
def __init__(self, texts: List[str], tokenizer: Any, block_size: int = 128,
add_bos: bool = False, add_eos: bool = False):
def __init__(
self,
texts: List[str],
tokenizer: Any,
block_size: int = 128,
add_bos: bool = False,
add_eos: bool = False,
):
"""
Args:
texts: Список текстов
@@ -104,33 +112,38 @@ class TextDatasetWithSpecialTokens(TextDataset):
for text in texts:
# Кодируем с специальными токенами
input_ids = tokenizer.encode(
text,
add_special_tokens=True,
add_bos_token=add_bos,
add_eos_token=eos
text, add_special_tokens=True, add_bos_token=add_bos, add_eos_token=eos
)
# Учитываем специальные токены при обрезке/дополнении
effective_block_size = block_size
if add_bos:
effective_block_size -= 1
if add_eos:
effective_block_size -= 1
if len(input_ids) > effective_block_size:
input_ids = input_ids[:effective_block_size]
# Добавляем специальные токены если нужно
if add_bos and hasattr(tokenizer, 'bos_token_id') and tokenizer.bos_token_id is not None:
if (
add_bos
and hasattr(tokenizer, "bos_token_id")
and tokenizer.bos_token_id is not None
):
input_ids = [tokenizer.bos_token_id] + input_ids
if add_eos and hasattr(tokenizer, 'eos_token_id') and tokenizer.eos_token_id is not None:
if (
add_eos
and hasattr(tokenizer, "eos_token_id")
and tokenizer.eos_token_id is not None
):
input_ids = input_ids + [tokenizer.eos_token_id]
# Дополняем до полной длины
pad_token_id = getattr(tokenizer, 'pad_token_id', 0)
pad_token_id = getattr(tokenizer, "pad_token_id", 0)
if len(input_ids) < block_size:
input_ids = input_ids + [pad_token_id] * (block_size - len(input_ids))
self.examples.append(input_ids)
def __len__(self):

View File

@@ -1,5 +1,6 @@
import torch.optim as optim
def get_optimizer(model, lr=3e-4, weight_decay=0.01, optimizer_type="adamw"):
"""
Возвращает оптимизатор для обучения модели.

View File

@@ -1,5 +1,6 @@
from torch.optim.lr_scheduler import LambdaLR
def get_linear_schedule_with_warmup(optimizer, num_warmup_steps, num_training_steps):
"""
Линейный планировщик обучения с warmup.
@@ -8,6 +9,10 @@ def get_linear_schedule_with_warmup(optimizer, num_warmup_steps, num_training_st
def lr_lambda(current_step):
if current_step < num_warmup_steps:
return float(current_step) / float(max(1, num_warmup_steps))
return max(0.0, float(num_training_steps - current_step) / float(max(1, num_training_steps - num_warmup_steps)))
return max(
0.0,
float(num_training_steps - current_step)
/ float(max(1, num_training_steps - num_warmup_steps)),
)
return LambdaLR(optimizer, lr_lambda)

View File

@@ -5,15 +5,29 @@ from tqdm import tqdm
from llm.training.optimizer import get_optimizer
from llm.training.scheduler import get_linear_schedule_with_warmup
class Trainer:
"""
Универсальный класс обучения LLM (GPT, LLaMA, Mistral и т.д.)
"""
def __init__(self, model, train_dataset, val_dataset=None, lr=3e-4, batch_size=8, num_epochs=3, warmup_steps=100):
def __init__(
self,
model,
train_dataset,
val_dataset=None,
lr=3e-4,
batch_size=8,
num_epochs=3,
warmup_steps=100,
):
self.model = model
self.train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
self.val_loader = DataLoader(val_dataset, batch_size=batch_size) if val_dataset else None
self.train_loader = DataLoader(
train_dataset, batch_size=batch_size, shuffle=True
)
self.val_loader = (
DataLoader(val_dataset, batch_size=batch_size) if val_dataset else None
)
self.optimizer = get_optimizer(model, lr=lr)
self.scheduler = None
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
@@ -29,24 +43,28 @@ class Trainer:
# Сдвигаем логиты и метки для языкового моделирования
shift_logits = logits[..., :-1, :].contiguous()
shift_labels = labels[..., 1:].contiguous()
# Вычисляем cross-entropy loss
loss = F.cross_entropy(
shift_logits.view(-1, shift_logits.size(-1)),
shift_labels.view(-1),
ignore_index=-100 # Игнорируем padding tokens
ignore_index=-100, # Игнорируем padding tokens
)
return loss
def train(self):
total_steps = len(self.train_loader) * self.num_epochs
self.scheduler = get_linear_schedule_with_warmup(self.optimizer, self.warmup_steps, total_steps)
self.scheduler = get_linear_schedule_with_warmup(
self.optimizer, self.warmup_steps, total_steps
)
for epoch in range(self.num_epochs):
self.model.train()
total_loss = 0
progress_bar = tqdm(self.train_loader, desc=f"Epoch {epoch+1}/{self.num_epochs}")
progress_bar = tqdm(
self.train_loader, desc=f"Epoch {epoch+1}/{self.num_epochs}"
)
for batch in progress_bar:
self.optimizer.zero_grad()
@@ -59,7 +77,7 @@ class Trainer:
logits = outputs[0]
else:
logits = outputs
# Trainer вычисляет loss
loss = self.compute_lm_loss(logits, labels)
loss.backward()
@@ -85,7 +103,7 @@ class Trainer:
for batch in self.val_loader:
input_ids = batch["input_ids"].to(self.device)
labels = batch["labels"].to(self.device)
outputs = self.model(input_ids)
if isinstance(outputs, tuple):
logits = outputs[0]

View File

@@ -58,7 +58,7 @@ def gpt_config(vocab_size, embed_dim, num_heads, num_layers):
"num_heads": num_heads,
"num_layers": num_layers,
"max_position_embeddings": 1024,
"dropout": 0.1
"dropout": 0.1,
}
@@ -68,12 +68,14 @@ def random_inputs(batch_size, seq_len, vocab_size):
input_ids = torch.randint(0, vocab_size, (batch_size, seq_len))
return input_ids
@pytest.fixture
def random_float_inputs(batch_size, seq_len, embed_dim):
"""Generate random floating point input tensors for testing feed forward."""
inputs = torch.randn(batch_size, seq_len, embed_dim)
return inputs
@pytest.fixture
def random_embeddings(batch_size, seq_len, embed_dim):
"""Generate random embedding tensors for testing attention modules."""

View File

@@ -9,180 +9,233 @@ from llm.core.decoder import Decoder
class TestDecoder:
"""Test cases for Decoder."""
def test_initialization(self, embed_dim, num_heads):
"""Test that Decoder can be initialized."""
head_size = embed_dim // num_heads
max_seq_len = 1024
decoder = Decoder(num_heads=num_heads, emb_size=embed_dim, head_size=head_size, max_seq_len=max_seq_len)
decoder = Decoder(
num_heads=num_heads,
emb_size=embed_dim,
head_size=head_size,
max_seq_len=max_seq_len,
)
assert decoder is not None
# Check internal components
assert hasattr(decoder, '_heads')
assert hasattr(decoder, '_ff')
assert hasattr(decoder, '_norm1')
assert hasattr(decoder, '_norm2')
assert hasattr(decoder, "_heads")
assert hasattr(decoder, "_ff")
assert hasattr(decoder, "_norm1")
assert hasattr(decoder, "_norm2")
def test_forward_pass(self, embed_dim, num_heads, random_embeddings):
"""Test forward pass of Decoder."""
head_size = embed_dim // num_heads
max_seq_len = 1024
decoder = Decoder(num_heads=num_heads, emb_size=embed_dim, head_size=head_size, max_seq_len=max_seq_len)
decoder = Decoder(
num_heads=num_heads,
emb_size=embed_dim,
head_size=head_size,
max_seq_len=max_seq_len,
)
# Forward pass
output = decoder(random_embeddings)
# Check output shape
assert output.shape == random_embeddings.shape
assert isinstance(output, torch.Tensor)
def test_forward_with_causal_mask(self, embed_dim, num_heads, random_embeddings):
"""Test forward pass with causal mask."""
head_size = embed_dim // num_heads
max_seq_len = 1024
decoder = Decoder(num_heads=num_heads, emb_size=embed_dim, head_size=head_size, max_seq_len=max_seq_len)
decoder = Decoder(
num_heads=num_heads,
emb_size=embed_dim,
head_size=head_size,
max_seq_len=max_seq_len,
)
batch_size, seq_len = random_embeddings.shape[:2]
# Create causal mask
mask = torch.tril(torch.ones(seq_len, seq_len))
# Forward pass with causal mask
output = decoder(random_embeddings, mask=mask)
# Check output shape
assert output.shape == random_embeddings.shape
def test_residual_connections(self, embed_dim, num_heads, random_embeddings):
"""Test that residual connections are properly applied."""
head_size = embed_dim // num_heads
max_seq_len = 1024
decoder = Decoder(num_heads=num_heads, emb_size=embed_dim, head_size=head_size, max_seq_len=max_seq_len)
decoder = Decoder(
num_heads=num_heads,
emb_size=embed_dim,
head_size=head_size,
max_seq_len=max_seq_len,
)
output = decoder(random_embeddings)
# With residual connections and layer norm, the output shouldn't be
# too different from input (in terms of scale/distribution)
input_norm = random_embeddings.norm(dim=-1).mean()
output_norm = output.norm(dim=-1).mean()
# Norms should be of similar magnitude (not exact due to transformations)
assert 0.1 < (output_norm / input_norm) < 10.0
def test_layer_norm(self, embed_dim, num_heads, random_embeddings):
"""Test that layer normalization is applied."""
head_size = embed_dim // num_heads
max_seq_len = 1024
decoder = Decoder(num_heads=num_heads, emb_size=embed_dim, head_size=head_size, max_seq_len=max_seq_len)
decoder = Decoder(
num_heads=num_heads,
emb_size=embed_dim,
head_size=head_size,
max_seq_len=max_seq_len,
)
output = decoder(random_embeddings)
# Check that output has reasonable statistics (due to layer norm)
# Mean should be close to 0, std close to 1 for each sequence position
output_mean = output.mean(dim=-1)
output_std = output.std(dim=-1)
# These are approximate checks since the data goes through multiple transformations
assert torch.allclose(output_mean, torch.zeros_like(output_mean), atol=1.0)
assert torch.allclose(output_std, torch.ones_like(output_std), atol=2.0)
def test_gradient_flow(self, embed_dim, num_heads, random_embeddings):
"""Test that gradients flow through Decoder."""
head_size = embed_dim // num_heads
max_seq_len = 1024
decoder = Decoder(num_heads=num_heads, emb_size=embed_dim, head_size=head_size, max_seq_len=max_seq_len)
decoder = Decoder(
num_heads=num_heads,
emb_size=embed_dim,
head_size=head_size,
max_seq_len=max_seq_len,
)
# Forward pass
output = decoder(random_embeddings)
# Create a dummy loss and backward pass
loss = output.sum()
loss.backward()
# Check that gradients are computed for learnable parameters
# in attention and feed forward components
assert decoder._heads._layer.weight.grad is not None
assert decoder._ff._layer1.weight.grad is not None
assert decoder._norm1.weight.grad is not None
assert decoder._norm2.weight.grad is not None
def test_device_consistency(self, embed_dim, num_heads, random_embeddings, device):
"""Test that Decoder works on correct device."""
head_size = embed_dim // num_heads
max_seq_len = 1024
decoder = Decoder(num_heads=num_heads, emb_size=embed_dim, head_size=head_size, max_seq_len=max_seq_len).to(device)
decoder = Decoder(
num_heads=num_heads,
emb_size=embed_dim,
head_size=head_size,
max_seq_len=max_seq_len,
).to(device)
inputs = random_embeddings.to(device)
# Forward pass
output = decoder(inputs)
# Check device consistency
assert output.device == device
assert decoder._heads._layer.weight.device == device
def test_different_configurations(self):
"""Test Decoder with different configurations."""
test_cases = [
(64, 2), # embed_dim=64, num_heads=2
(64, 2), # embed_dim=64, num_heads=2
(128, 4), # embed_dim=128, num_heads=4
(256, 8), # embed_dim=256, num_heads=8
]
for embed_dim, num_heads in test_cases:
head_size = embed_dim // num_heads
max_seq_len = 1024
decoder = Decoder(num_heads=num_heads, emb_size=embed_dim, head_size=head_size, max_seq_len=max_seq_len)
decoder = Decoder(
num_heads=num_heads,
emb_size=embed_dim,
head_size=head_size,
max_seq_len=max_seq_len,
)
batch_size, seq_len = 2, 16
inputs = torch.randn(batch_size, seq_len, embed_dim)
output = decoder(inputs)
assert output.shape == inputs.shape
@pytest.mark.parametrize("batch_size,seq_len", [(1, 8), (2, 16), (4, 32)])
def test_different_input_shapes(self, embed_dim, num_heads, batch_size, seq_len):
"""Test Decoder with different input shapes."""
head_size = embed_dim // num_heads
max_seq_len = 1024
decoder = Decoder(num_heads=num_heads, emb_size=embed_dim, head_size=head_size, max_seq_len=max_seq_len)
decoder = Decoder(
num_heads=num_heads,
emb_size=embed_dim,
head_size=head_size,
max_seq_len=max_seq_len,
)
inputs = torch.randn(batch_size, seq_len, embed_dim)
output = decoder(inputs)
assert output.shape == (batch_size, seq_len, embed_dim)
def test_training_vs_evaluation(self, embed_dim, num_heads, random_embeddings):
"""Test that Decoder behaves differently in train vs eval mode."""
head_size = embed_dim // num_heads
max_seq_len = 1024
decoder = Decoder(num_heads=num_heads, emb_size=embed_dim, head_size=head_size, max_seq_len=max_seq_len, dropout=0.5)
decoder = Decoder(
num_heads=num_heads,
emb_size=embed_dim,
head_size=head_size,
max_seq_len=max_seq_len,
dropout=0.5,
)
# Training mode
decoder.train()
output_train = decoder(random_embeddings)
# Evaluation mode
decoder.eval()
output_eval = decoder(random_embeddings)
# Outputs should be different due to dropout
assert not torch.allclose(output_train, output_eval)
def test_parameter_initialization(self, embed_dim, num_heads):
"""Test that parameters are properly initialized."""
head_size = embed_dim // num_heads
max_seq_len = 1024
decoder = Decoder(num_heads=num_heads, emb_size=embed_dim, head_size=head_size, max_seq_len=max_seq_len)
decoder = Decoder(
num_heads=num_heads,
emb_size=embed_dim,
head_size=head_size,
max_seq_len=max_seq_len,
)
# Check that various components have non-zero parameters
assert not torch.allclose(
decoder._heads._layer.weight,
torch.zeros_like(decoder._heads._layer.weight)
decoder._heads._layer.weight, torch.zeros_like(decoder._heads._layer.weight)
)
assert not torch.allclose(
decoder._ff._layer1.weight,
torch.zeros_like(decoder._ff._layer1.weight)
decoder._ff._layer1.weight, torch.zeros_like(decoder._ff._layer1.weight)
)
assert not torch.allclose(
decoder._norm1.weight,
torch.zeros_like(decoder._norm1.weight)
decoder._norm1.weight, torch.zeros_like(decoder._norm1.weight)
)

View File

@@ -10,168 +10,178 @@ from llm.core.feed_forward import FeedForward
class TestFeedForward:
"""Test cases for FeedForward."""
def test_initialization(self, embed_dim):
"""Test that FeedForward can be initialized."""
ff = FeedForward(embed_dim)
assert ff is not None
# Check internal layers
assert hasattr(ff, '_layer1')
assert hasattr(ff, '_layer2')
assert hasattr(ff, '_activation')
assert hasattr(ff, '_dropout')
assert hasattr(ff, "_layer1")
assert hasattr(ff, "_layer2")
assert hasattr(ff, "_activation")
assert hasattr(ff, "_dropout")
# Check layer dimensions
expected_hidden_dim = embed_dim * 4 # Default expansion factor
assert ff._layer1.weight.shape == (expected_hidden_dim, embed_dim)
assert ff._layer2.weight.shape == (embed_dim, expected_hidden_dim)
def test_forward_pass(self, embed_dim, random_float_inputs):
"""Test forward pass of FeedForward."""
ff = FeedForward(embed_dim)
# Forward pass
output = ff(random_float_inputs)
# Check output shape
assert output.shape == random_float_inputs.shape
assert isinstance(output, torch.Tensor)
def test_custom_hidden_dim(self, embed_dim):
"""Test FeedForward with custom hidden dimension."""
# FeedForward doesn't support custom hidden_dim in current implementation
# This test is not applicable
ff = FeedForward(embed_dim)
# Check layer dimensions (fixed 4x expansion)
expected_hidden_dim = embed_dim * 4
assert ff._layer1.weight.shape == (expected_hidden_dim, embed_dim)
assert ff._layer2.weight.shape == (embed_dim, expected_hidden_dim)
def test_dropout(self, embed_dim, random_float_inputs):
"""Test that dropout is applied during training."""
ff = FeedForward(embed_dim, dropout=0.5)
ff.train() # Set to training mode
output = ff(random_float_inputs)
# In training mode with dropout, some values should be zeroed
# This is probabilistic, so we can't assert exact zeros,
# but we can check the structure is preserved
assert output.shape == random_float_inputs.shape
def test_no_dropout_in_eval(self, embed_dim, random_float_inputs):
"""Test that dropout is not applied during evaluation."""
ff = FeedForward(embed_dim, dropout=0.5)
ff.eval() # Set to evaluation mode
# Run forward pass multiple times - outputs should be identical
output1 = ff(random_float_inputs)
output2 = ff(random_float_inputs)
assert torch.allclose(output1, output2)
def test_activation_function(self, embed_dim, random_float_inputs):
"""Test that activation function is applied."""
ff = FeedForward(embed_dim)
# Manually compute expected output without dropout for deterministic comparison
hidden = ff._layer1(random_float_inputs)
activated = ff._activation(hidden)
expected_output = ff._layer2(activated)
# Compare with forward pass in eval mode (no dropout)
ff.eval()
actual_output = ff(random_float_inputs)
assert torch.allclose(actual_output, expected_output, rtol=1e-4)
def test_gradient_flow(self, embed_dim, random_float_inputs):
"""Test that gradients flow through FeedForward."""
ff = FeedForward(embed_dim)
# Forward pass
output = ff(random_float_inputs)
# Create a dummy loss and backward pass
loss = output.sum()
loss.backward()
# Check that gradients are computed for learnable parameters
assert ff._layer1.weight.grad is not None
assert ff._layer2.weight.grad is not None
assert not torch.allclose(ff._layer1.weight.grad,
torch.zeros_like(ff._layer1.weight.grad))
assert not torch.allclose(ff._layer2.weight.grad,
torch.zeros_like(ff._layer2.weight.grad))
assert not torch.allclose(
ff._layer1.weight.grad, torch.zeros_like(ff._layer1.weight.grad)
)
assert not torch.allclose(
ff._layer2.weight.grad, torch.zeros_like(ff._layer2.weight.grad)
)
def test_device_consistency(self, embed_dim, random_float_inputs, device):
"""Test that FeedForward works on correct device."""
ff = FeedForward(embed_dim).to(device)
inputs = random_float_inputs.to(device)
# Forward pass
output = ff(inputs)
# Check device consistency
assert output.device == device
assert ff._layer1.weight.device == device
assert ff._layer2.weight.device == device
def test_different_embed_dims(self):
"""Test FeedForward with different embedding dimensions."""
test_cases = [64, 128, 256, 512]
for embed_dim in test_cases:
ff = FeedForward(embed_dim)
batch_size, seq_len = 2, 16
inputs = torch.randn(batch_size, seq_len, embed_dim)
output = ff(inputs)
assert output.shape == inputs.shape
@pytest.mark.parametrize("batch_size,seq_len", [(1, 8), (2, 16), (4, 32)])
def test_different_input_shapes(self, embed_dim, batch_size, seq_len):
"""Test FeedForward with different input shapes."""
ff = FeedForward(embed_dim)
inputs = torch.randn(batch_size, seq_len, embed_dim)
output = ff(inputs)
assert output.shape == (batch_size, seq_len, embed_dim)
def test_non_linearity(self, embed_dim, random_float_inputs):
"""Test that FeedForward introduces non-linearity."""
ff = FeedForward(embed_dim)
# Create a simple linear transformation for comparison
linear_layer = nn.Linear(embed_dim, embed_dim)
# Copy weights to make comparison fair
with torch.no_grad():
linear_layer.weight.copy_(ff._layer2.weight @ ff._layer1.weight)
if linear_layer.bias is not None:
linear_layer.bias.zero_()
linear_output = linear_layer(random_float_inputs)
ff_output = ff(random_float_inputs)
# FeedForward output should be different from pure linear transformation
# due to activation function
assert not torch.allclose(ff_output, linear_output, rtol=1e-4)
def test_parameter_initialization(self, embed_dim):
"""Test that parameters are properly initialized."""
ff = FeedForward(embed_dim)
# Check that weights are not all zeros
assert not torch.allclose(ff._layer1.weight, torch.zeros_like(ff._layer1.weight))
assert not torch.allclose(ff._layer2.weight, torch.zeros_like(ff._layer2.weight))
assert not torch.allclose(
ff._layer1.weight, torch.zeros_like(ff._layer1.weight)
)
assert not torch.allclose(
ff._layer2.weight, torch.zeros_like(ff._layer2.weight)
)
# Check that biases are not all zeros (they should be initialized with some values)
if ff._layer1.bias is not None:
assert not torch.allclose(ff._layer1.bias, torch.zeros_like(ff._layer1.bias))
assert not torch.allclose(
ff._layer1.bias, torch.zeros_like(ff._layer1.bias)
)
if ff._layer2.bias is not None:
assert not torch.allclose(ff._layer2.bias, torch.zeros_like(ff._layer2.bias))
assert not torch.allclose(
ff._layer2.bias, torch.zeros_like(ff._layer2.bias)
)

View File

@@ -9,157 +9,181 @@ from llm.core.multi_head_attention import MultiHeadAttention
class TestMultiHeadAttention:
"""Test cases for MultiHeadAttention."""
def test_initialization(self, embed_dim, num_heads):
"""Test that MultiHeadAttention can be initialized."""
head_size = embed_dim // num_heads
attention = MultiHeadAttention(num_heads, embed_dim, head_size, max_seq_len=1024)
attention = MultiHeadAttention(
num_heads, embed_dim, head_size, max_seq_len=1024
)
assert attention is not None
# Check internal attributes
assert len(attention._heads) == num_heads
assert attention._layer.in_features == embed_dim
assert attention._layer.out_features == embed_dim
def test_forward_pass(self, embed_dim, num_heads, random_embeddings):
"""Test forward pass of MultiHeadAttention."""
head_size = embed_dim // num_heads
attention = MultiHeadAttention(num_heads, embed_dim, head_size, max_seq_len=1024)
attention = MultiHeadAttention(
num_heads, embed_dim, head_size, max_seq_len=1024
)
# Forward pass
output, _ = attention(random_embeddings)
# Check output shape
assert output.shape == random_embeddings.shape
assert isinstance(output, torch.Tensor)
def test_forward_with_mask(self, embed_dim, num_heads, random_embeddings):
"""Test forward pass with attention mask."""
head_size = embed_dim // num_heads
attention = MultiHeadAttention(num_heads, embed_dim, head_size, max_seq_len=1024)
attention = MultiHeadAttention(
num_heads, embed_dim, head_size, max_seq_len=1024
)
# Create a simple mask
seq_len = random_embeddings.shape[1]
mask = torch.tril(torch.ones(seq_len, seq_len)) # Causal mask
# Forward pass with mask
output, _ = attention(random_embeddings, mask=mask)
# Check output shape
assert output.shape == random_embeddings.shape
def test_causal_mask(self, embed_dim, num_heads, random_embeddings):
"""Test that causal mask prevents attending to future positions."""
head_size = embed_dim // num_heads
attention = MultiHeadAttention(num_heads, embed_dim, head_size, max_seq_len=1024)
attention = MultiHeadAttention(
num_heads, embed_dim, head_size, max_seq_len=1024
)
# Create causal mask
seq_len = random_embeddings.shape[1]
causal_mask = torch.tril(torch.ones(seq_len, seq_len))
# Forward pass with causal mask
output, _ = attention(random_embeddings, mask=causal_mask)
# Check output shape
assert output.shape == random_embeddings.shape
def test_attention_weights_normalization(self, embed_dim, num_heads, random_embeddings):
def test_attention_weights_normalization(
self, embed_dim, num_heads, random_embeddings
):
"""Test that attention weights are properly normalized."""
head_size = embed_dim // num_heads
attention = MultiHeadAttention(num_heads, embed_dim, head_size, max_seq_len=1024)
attention = MultiHeadAttention(
num_heads, embed_dim, head_size, max_seq_len=1024
)
# Forward pass
output, _ = attention(random_embeddings)
# Check output shape
assert output.shape == random_embeddings.shape
def test_gradient_flow(self, embed_dim, num_heads, random_embeddings):
"""Test that gradients flow through MultiHeadAttention."""
head_size = embed_dim // num_heads
attention = MultiHeadAttention(num_heads, embed_dim, head_size, max_seq_len=1024)
attention = MultiHeadAttention(
num_heads, embed_dim, head_size, max_seq_len=1024
)
# Forward pass
output, _ = attention(random_embeddings)
# Create a dummy loss and backward pass
loss = output.sum()
loss.backward()
# Check that gradients are computed for learnable parameters
assert attention._layer.weight.grad is not None
if len(attention._heads) > 0:
assert attention._heads[0]._q.weight.grad is not None
def test_device_consistency(self, embed_dim, num_heads, random_embeddings, device):
"""Test that MultiHeadAttention works on correct device."""
head_size = embed_dim // num_heads
attention = MultiHeadAttention(num_heads, embed_dim, head_size, max_seq_len=1024).to(device)
attention = MultiHeadAttention(
num_heads, embed_dim, head_size, max_seq_len=1024
).to(device)
inputs = random_embeddings.to(device)
# Forward pass
output, _ = attention(inputs)
# Check device consistency
assert output.device == device
assert attention._layer.weight.device == device
def test_different_embed_dim_and_heads(self):
"""Test MultiHeadAttention with different embed_dim and num_heads combinations."""
test_cases = [
(64, 2), # embed_dim=64, num_heads=2
(64, 2), # embed_dim=64, num_heads=2
(128, 4), # embed_dim=128, num_heads=4
(256, 8), # embed_dim=256, num_heads=8
(512, 16), # embed_dim=512, num_heads=16
(512, 16), # embed_dim=512, num_heads=16
]
for embed_dim, num_heads in test_cases:
head_size = embed_dim // num_heads
attention = MultiHeadAttention(num_heads, embed_dim, head_size, max_seq_len=1024)
attention = MultiHeadAttention(
num_heads, embed_dim, head_size, max_seq_len=1024
)
batch_size, seq_len = 2, 16
inputs = torch.randn(batch_size, seq_len, embed_dim)
output, _ = attention(inputs)
assert output.shape == inputs.shape
def test_attention_output_range(self, embed_dim, num_heads, random_embeddings):
"""Test that attention output is in reasonable range."""
head_size = embed_dim // num_heads
attention = MultiHeadAttention(num_heads, embed_dim, head_size, max_seq_len=1024)
attention = MultiHeadAttention(
num_heads, embed_dim, head_size, max_seq_len=1024
)
output, _ = attention(random_embeddings)
# Output shouldn't have extreme values
assert output.abs().max() < 100 # Reasonable upper bound
@pytest.mark.parametrize("batch_size,seq_len", [(1, 8), (2, 16), (4, 32)])
def test_different_input_shapes(self, embed_dim, num_heads, batch_size, seq_len):
"""Test MultiHeadAttention with different input shapes."""
head_size = embed_dim // num_heads
attention = MultiHeadAttention(num_heads, embed_dim, head_size, max_seq_len=1024)
attention = MultiHeadAttention(
num_heads, embed_dim, head_size, max_seq_len=1024
)
inputs = torch.randn(batch_size, seq_len, embed_dim)
output, _ = attention(inputs)
assert output.shape == (batch_size, seq_len, embed_dim)
def test_parameter_sharing(self, embed_dim, num_heads):
"""Test that parameters are properly shared across the sequence."""
head_size = embed_dim // num_heads
attention = MultiHeadAttention(num_heads, embed_dim, head_size, max_seq_len=1024, dropout=0.0) # No dropout for deterministic test
attention = MultiHeadAttention(
num_heads, embed_dim, head_size, max_seq_len=1024, dropout=0.0
) # No dropout for deterministic test
# Create two identical sequences
seq_len = 10
base_sequence = torch.randn(1, seq_len, embed_dim)
identical_sequence = base_sequence.clone()
# Set to eval mode to disable dropout
attention.eval()
with torch.no_grad():
output1, _ = attention(base_sequence)
output2, _ = attention(identical_sequence)
# With identical inputs and same parameters, outputs should be identical
assert torch.allclose(output1, output2, rtol=1e-5)

View File

@@ -10,127 +10,134 @@ from llm.core.positional_embeddings import PositionalEmbeddings
class TestPositionalEmbeddings:
"""Test cases for PositionalEmbeddings."""
def test_initialization(self, embed_dim):
"""Test that PositionalEmbeddings can be initialized."""
max_seq_len = 1024
embeddings = PositionalEmbeddings(max_seq_len, embed_dim)
assert embeddings is not None
# Check that positional embeddings are created
assert hasattr(embeddings, 'embedding')
assert hasattr(embeddings, "embedding")
assert embeddings.embedding.weight.shape == (max_seq_len, embed_dim)
def test_forward_pass(self, embed_dim):
"""Test forward pass of PositionalEmbeddings."""
max_seq_len = 1024
seq_len = 64
embeddings = PositionalEmbeddings(max_seq_len, embed_dim)
# Forward pass - takes sequence length, not input tensor
output = embeddings(seq_len)
# Check output shape
expected_shape = (seq_len, embed_dim)
assert output.shape == expected_shape
assert isinstance(output, torch.Tensor)
def test_positional_encoding_values(self, embed_dim):
"""Test that positional encoding values are computed correctly."""
max_seq_len = 10
embeddings = PositionalEmbeddings(max_seq_len, embed_dim)
# Get embeddings for all positions
pe = embeddings(max_seq_len) # Shape: [max_seq_len, embed_dim]
# Check that different positions have different embeddings
# (since these are learnable embeddings, not fixed sine/cosine)
for pos in range(max_seq_len):
for i in range(pos + 1, max_seq_len):
assert not torch.allclose(pe[pos], pe[i], rtol=1e-4)
def test_different_sequence_lengths(self, embed_dim):
"""Test PositionalEmbeddings with different sequence lengths."""
test_cases = [
(10, 5), # seq_len < max_seq_len
(10, 5), # seq_len < max_seq_len
(10, 10), # seq_len == max_seq_len
]
for max_seq_len, seq_len in test_cases:
embeddings = PositionalEmbeddings(max_seq_len, embed_dim)
# Get embeddings for specific sequence length
output = embeddings(seq_len)
# Output should have shape [seq_len, embed_dim]
assert output.shape == (seq_len, embed_dim)
def test_gradient_flow(self, embed_dim):
"""Test that gradients flow through PositionalEmbeddings."""
max_seq_len = 64
seq_len = 32
embeddings = PositionalEmbeddings(max_seq_len, embed_dim)
# Forward pass
output = embeddings(seq_len)
# Create a dummy loss and backward pass
loss = output.sum()
loss.backward()
# Positional embeddings should have gradients (they're learnable)
assert embeddings.embedding.weight.grad is not None
assert not torch.allclose(embeddings.embedding.weight.grad,
torch.zeros_like(embeddings.embedding.weight.grad))
assert not torch.allclose(
embeddings.embedding.weight.grad,
torch.zeros_like(embeddings.embedding.weight.grad),
)
def test_device_consistency(self, embed_dim, device):
"""Test that PositionalEmbeddings works on correct device."""
max_seq_len = 64
seq_len = 32
embeddings = PositionalEmbeddings(max_seq_len, embed_dim).to(device)
# Forward pass
output = embeddings(seq_len)
# Check device consistency
assert output.device == device
assert embeddings.embedding.weight.device == device
def test_reproducibility(self, embed_dim):
"""Test that positional embeddings are reproducible."""
max_seq_len = 100
embeddings1 = PositionalEmbeddings(max_seq_len, embed_dim)
embeddings2 = PositionalEmbeddings(max_seq_len, embed_dim)
# Different instances should have different embeddings (random initialization)
assert not torch.allclose(embeddings1.embedding.weight, embeddings2.embedding.weight)
assert not torch.allclose(
embeddings1.embedding.weight, embeddings2.embedding.weight
)
# But same instance should produce same output for same input
seq_len = 50
output1 = embeddings1(seq_len)
output2 = embeddings1(seq_len) # Same instance, same input
assert torch.allclose(output1, output2)
def test_positional_pattern(self, embed_dim):
"""Test that positional embeddings create a meaningful pattern."""
max_seq_len = 50
embeddings = PositionalEmbeddings(max_seq_len, embed_dim)
pe = embeddings(max_seq_len) # Get all positional embeddings
# Check that different positions have different embeddings
# (with high probability due to random initialization)
assert not torch.allclose(pe[0], pe[1], rtol=1e-4)
assert not torch.allclose(pe[10], pe[20], rtol=1e-4)
@pytest.mark.parametrize("max_seq_len,seq_len,embed_dim", [
(64, 10, 64),
(128, 50, 128),
(256, 100, 256),
])
@pytest.mark.parametrize(
"max_seq_len,seq_len,embed_dim",
[
(64, 10, 64),
(128, 50, 128),
(256, 100, 256),
],
)
def test_different_configurations(self, max_seq_len, seq_len, embed_dim):
"""Test PositionalEmbeddings with different configurations."""
embeddings = PositionalEmbeddings(max_seq_len, embed_dim)
output = embeddings(seq_len)
assert output.shape == (seq_len, embed_dim)

View File

@@ -9,99 +9,103 @@ from llm.core.token_embeddings import TokenEmbeddings
class TestTokenEmbeddings:
"""Test cases for TokenEmbeddings."""
def test_initialization(self, vocab_size, embed_dim):
"""Test that TokenEmbeddings can be initialized."""
embeddings = TokenEmbeddings(vocab_size, embed_dim)
assert embeddings is not None
# Check embedding layer
assert hasattr(embeddings, '_embedding')
assert hasattr(embeddings, "_embedding")
assert embeddings._embedding.weight.shape == (vocab_size, embed_dim)
def test_forward_pass(self, vocab_size, embed_dim, random_inputs):
"""Test forward pass of TokenEmbeddings."""
embeddings = TokenEmbeddings(vocab_size, embed_dim)
# Forward pass
output = embeddings(random_inputs)
# Check output shape
assert output.shape == (random_inputs.shape[0], random_inputs.shape[1], embed_dim)
assert output.shape == (
random_inputs.shape[0],
random_inputs.shape[1],
embed_dim,
)
assert isinstance(output, torch.Tensor)
def test_embedding_weights(self, vocab_size, embed_dim):
"""Test that embedding weights are properly initialized."""
embeddings = TokenEmbeddings(vocab_size, embed_dim)
weights = embeddings._embedding.weight
assert weights.requires_grad is True
# Check that weights are not all zeros
assert not torch.allclose(weights, torch.zeros_like(weights))
def test_different_vocab_sizes(self):
"""Test TokenEmbeddings with different vocabulary sizes."""
test_cases = [
(100, 128),
(1000, 256),
(50000, 512)
]
test_cases = [(100, 128), (1000, 256), (50000, 512)]
for vocab_size, embed_dim in test_cases:
embeddings = TokenEmbeddings(vocab_size, embed_dim)
assert embeddings._embedding.weight.shape == (vocab_size, embed_dim)
def test_gradient_flow(self, vocab_size, embed_dim, random_inputs):
"""Test that gradients flow through TokenEmbeddings."""
embeddings = TokenEmbeddings(vocab_size, embed_dim)
# Forward pass
output = embeddings(random_inputs)
# Create a dummy loss and backward pass
loss = output.sum()
loss.backward()
# Check that gradients are computed
assert embeddings._embedding.weight.grad is not None
assert not torch.allclose(embeddings._embedding.weight.grad,
torch.zeros_like(embeddings._embedding.weight.grad))
assert not torch.allclose(
embeddings._embedding.weight.grad,
torch.zeros_like(embeddings._embedding.weight.grad),
)
def test_device_consistency(self, vocab_size, embed_dim, random_inputs, device):
"""Test that TokenEmbeddings works on correct device."""
embeddings = TokenEmbeddings(vocab_size, embed_dim).to(device)
inputs = random_inputs.to(device)
# Forward pass
output = embeddings(inputs)
# Check device consistency
assert output.device == device
assert embeddings._embedding.weight.device == device
def test_embedding_lookup(self, vocab_size, embed_dim):
"""Test specific embedding lookups."""
embeddings = TokenEmbeddings(vocab_size, embed_dim)
# Test lookup for specific tokens
test_tokens = torch.tensor([[0, 1, 2], [vocab_size - 1, vocab_size - 2, vocab_size - 3]])
test_tokens = torch.tensor(
[[0, 1, 2], [vocab_size - 1, vocab_size - 2, vocab_size - 3]]
)
output = embeddings(test_tokens)
# Check shape
assert output.shape == (2, 3, embed_dim)
# Check that different tokens have different embeddings
# (with high probability due to random initialization)
assert not torch.allclose(output[0, 0], output[0, 1], rtol=1e-4)
@pytest.mark.parametrize("batch_size,seq_len", [(1, 1), (2, 10), (8, 64)])
def test_different_input_shapes(self, vocab_size, embed_dim, batch_size, seq_len):
"""Test TokenEmbeddings with different input shapes."""
embeddings = TokenEmbeddings(vocab_size, embed_dim)
inputs = torch.randint(0, vocab_size, (batch_size, seq_len))
output = embeddings(inputs)
assert output.shape == (batch_size, seq_len, embed_dim)

View File

@@ -9,162 +9,156 @@ from llm.models.gpt import GPT
class TestGPT:
"""Test cases for GPT model."""
def test_initialization(self, gpt_config):
"""Test that GPT can be initialized."""
model = GPT(gpt_config)
assert model is not None
# Check that model has required components
assert hasattr(model, '_token_embeddings')
assert hasattr(model, '_position_embeddings')
assert hasattr(model, '_decoders')
assert hasattr(model, '_linear')
assert hasattr(model, '_dropout')
assert hasattr(model, "_token_embeddings")
assert hasattr(model, "_position_embeddings")
assert hasattr(model, "_decoders")
assert hasattr(model, "_linear")
assert hasattr(model, "_dropout")
# Check number of decoder layers
assert len(model._decoders) == gpt_config['num_layers']
assert len(model._decoders) == gpt_config["num_layers"]
def test_forward_pass(self, gpt_config, random_inputs):
"""Test forward pass of GPT."""
model = GPT(gpt_config)
# Forward pass
logits = model(random_inputs)
# Check output shape
batch_size, seq_len = random_inputs.shape
vocab_size = gpt_config['vocab_size']
vocab_size = gpt_config["vocab_size"]
assert logits.shape == (batch_size, seq_len, vocab_size)
assert isinstance(logits, torch.Tensor)
def test_forward_with_attention_mask(self, gpt_config, random_inputs, attention_mask):
def test_forward_with_attention_mask(
self, gpt_config, random_inputs, attention_mask
):
"""Test forward pass with attention mask."""
model = GPT(gpt_config)
# Forward pass with mask
logits = model(random_inputs, attention_mask=attention_mask)
# Check output shape
batch_size, seq_len = random_inputs.shape
vocab_size = gpt_config['vocab_size']
vocab_size = gpt_config["vocab_size"]
assert logits.shape == (batch_size, seq_len, vocab_size)
def test_generate_text(self, gpt_config):
"""Test text generation."""
model = GPT(gpt_config)
model.eval() # Set to evaluation mode for generation
# Create initial input
batch_size = 2
initial_seq_len = 5
input_ids = torch.randint(0, gpt_config['vocab_size'], (batch_size, initial_seq_len))
input_ids = torch.randint(
0, gpt_config["vocab_size"], (batch_size, initial_seq_len)
)
# Generate text
with torch.no_grad():
generated = model.generate(
x=input_ids,
max_new_tokens=10,
do_sample=False # Use greedy for deterministic testing
do_sample=False, # Use greedy for deterministic testing
)
# Check output shape
expected_seq_len = initial_seq_len + 10
assert generated.shape == (batch_size, expected_seq_len)
# Check that initial sequence is preserved
assert torch.allclose(generated[:, :initial_seq_len], input_ids)
def test_generate_with_temperature(self, gpt_config):
"""Test text generation with temperature sampling."""
model = GPT(gpt_config)
model.eval()
# Create initial input
input_ids = torch.randint(0, gpt_config['vocab_size'], (1, 3))
input_ids = torch.randint(0, gpt_config["vocab_size"], (1, 3))
# Generate with temperature
with torch.no_grad():
generated = model.generate(
x=input_ids,
max_new_tokens=5,
do_sample=True,
temperature=0.8
x=input_ids, max_new_tokens=5, do_sample=True, temperature=0.8
)
assert generated.shape == (1, 8) # 3 initial + 5 new tokens
def test_generate_with_top_k(self, gpt_config):
"""Test text generation with top-k sampling."""
model = GPT(gpt_config)
model.eval()
# Create initial input
input_ids = torch.randint(0, gpt_config['vocab_size'], (1, 3))
input_ids = torch.randint(0, gpt_config["vocab_size"], (1, 3))
# Generate with top-k
with torch.no_grad():
generated = model.generate(
x=input_ids,
max_new_tokens=5,
do_sample=True,
top_k=10
x=input_ids, max_new_tokens=5, do_sample=True, top_k=10
)
assert generated.shape == (1, 8)
def test_generate_with_top_p(self, gpt_config):
"""Test text generation with top-p (nucleus) sampling."""
model = GPT(gpt_config)
model.eval()
# Create initial input
input_ids = torch.randint(0, gpt_config['vocab_size'], (1, 3))
input_ids = torch.randint(0, gpt_config["vocab_size"], (1, 3))
# Generate with top-p
with torch.no_grad():
generated = model.generate(
x=input_ids,
max_new_tokens=5,
do_sample=True,
top_p=0.9
x=input_ids, max_new_tokens=5, do_sample=True, top_p=0.9
)
assert generated.shape == (1, 8)
def test_gradient_flow(self, gpt_config, random_inputs):
"""Test that gradients flow through GPT."""
model = GPT(gpt_config)
# Forward pass
logits = model(random_inputs)
# Create a dummy loss and backward pass
targets = torch.randint(0, gpt_config['vocab_size'], random_inputs.shape)
targets = torch.randint(0, gpt_config["vocab_size"], random_inputs.shape)
loss = torch.nn.functional.cross_entropy(
logits.view(-1, logits.size(-1)),
targets.view(-1)
logits.view(-1, logits.size(-1)), targets.view(-1)
)
loss.backward()
# Check that gradients are computed for various components
assert model._token_embeddings._embedding.weight.grad is not None
assert model._linear.weight.grad is not None
if len(model._decoders) > 0:
assert model._decoders[0]._heads._heads[0]._q.weight.grad is not None
def test_device_consistency(self, gpt_config, random_inputs, device):
"""Test that GPT works on correct device."""
model = GPT(gpt_config).to(device)
inputs = random_inputs.to(device)
# Forward pass
logits = model(inputs)
# Check device consistency
assert logits.device == device
assert model._token_embeddings._embedding.weight.device == device
def test_different_configurations(self):
"""Test GPT with different configurations."""
test_configs = [
@@ -174,7 +168,7 @@ class TestGPT:
"num_heads": 2,
"num_layers": 2,
"max_position_embeddings": 256,
"dropout": 0.1
"dropout": 0.1,
},
{
"vocab_size": 5000,
@@ -182,7 +176,7 @@ class TestGPT:
"num_heads": 4,
"num_layers": 4,
"max_position_embeddings": 512,
"dropout": 0.1
"dropout": 0.1,
},
{
"vocab_size": 10000,
@@ -190,98 +184,94 @@ class TestGPT:
"num_heads": 8,
"num_layers": 6,
"max_position_embeddings": 1024,
"dropout": 0.1
}
"dropout": 0.1,
},
]
for config in test_configs:
model = GPT(config)
batch_size, seq_len = 2, 16
inputs = torch.randint(0, config['vocab_size'], (batch_size, seq_len))
inputs = torch.randint(0, config["vocab_size"], (batch_size, seq_len))
logits = model(inputs)
expected_shape = (batch_size, seq_len, config['vocab_size'])
expected_shape = (batch_size, seq_len, config["vocab_size"])
assert logits.shape == expected_shape
@pytest.mark.parametrize("batch_size,seq_len", [(1, 8), (2, 16), (4, 32)])
def test_different_input_shapes(self, gpt_config, batch_size, seq_len):
"""Test GPT with different input shapes."""
model = GPT(gpt_config)
inputs = torch.randint(0, gpt_config['vocab_size'], (batch_size, seq_len))
inputs = torch.randint(0, gpt_config["vocab_size"], (batch_size, seq_len))
logits = model(inputs)
expected_shape = (batch_size, seq_len, gpt_config['vocab_size'])
expected_shape = (batch_size, seq_len, gpt_config["vocab_size"])
assert logits.shape == expected_shape
def test_training_vs_evaluation(self, gpt_config, random_inputs):
"""Test that GPT behaves differently in train vs eval mode."""
model = GPT(gpt_config)
# Training mode
model.train()
output_train = model(random_inputs)
# Evaluation mode
model.eval()
output_eval = model(random_inputs)
# Outputs should be different due to dropout
assert not torch.allclose(output_train, output_eval)
def test_parameter_count(self, gpt_config):
"""Test that GPT has reasonable number of parameters."""
model = GPT(gpt_config)
total_params = sum(p.numel() for p in model.parameters())
# For a small GPT model, parameters should be in reasonable range
vocab_size = gpt_config['vocab_size']
embed_dim = gpt_config['embed_dim']
num_layers = gpt_config['num_layers']
num_heads = gpt_config['num_heads']
vocab_size = gpt_config["vocab_size"]
embed_dim = gpt_config["embed_dim"]
num_layers = gpt_config["num_layers"]
num_heads = gpt_config["num_heads"]
# Rough estimate: token_embeddings + output_layer + (attention + ff) * layers
expected_min = vocab_size * embed_dim * 2 # embeddings and output
expected_max = expected_min * 10 # Allow for decoder parameters
assert expected_min < total_params < expected_max
def test_causal_attention(self, gpt_config):
"""Test that GPT uses causal attention during generation."""
model = GPT(gpt_config)
model.eval()
# Create input with known pattern
input_ids = torch.tensor([[1, 2, 3]]).long()
with torch.no_grad():
# Get logits for next token prediction
logits = model(input_ids)
# The model should only attend to previous tokens (causal)
# We can't directly test attention masks in the public API,
# but we can verify the generation works correctly
generated = model.generate(
x=input_ids,
max_new_tokens=3,
do_sample=False
)
generated = model.generate(x=input_ids, max_new_tokens=3, do_sample=False)
# Generated sequence should be longer than input
assert generated.shape[1] == input_ids.shape[1] + 3
def test_output_distribution(self, gpt_config, random_inputs):
"""Test that GPT output has proper distribution."""
model = GPT(gpt_config)
logits = model(random_inputs)
# Logits should not have extreme values
assert logits.abs().max() < 100
# Softmax should produce valid probabilities
probs = torch.softmax(logits, dim=-1)
assert torch.allclose(probs.sum(dim=-1), torch.ones_like(probs.sum(dim=-1)))

View File

@@ -11,25 +11,25 @@ import os
def test_gpt_model_creation():
"""Test that GPT model can be created and forward pass works."""
from llm.models.gpt import GPT
config = {
"vocab_size": 1000,
"embed_dim": 128,
"num_heads": 4,
"num_layers": 2,
"max_position_embeddings": 256,
"dropout": 0.1
"dropout": 0.1,
}
model = GPT(config)
# Test forward pass
batch_size, seq_len = 2, 16
input_ids = torch.randint(0, config["vocab_size"], (batch_size, seq_len))
with torch.no_grad():
logits = model(input_ids)
assert logits.shape == (batch_size, seq_len, config["vocab_size"])
print("✅ GPT model creation and forward pass test passed")
@@ -37,27 +37,21 @@ def test_gpt_model_creation():
def test_bpe_tokenizer_basic():
"""Test basic BPE tokenizer functionality."""
from llm.tokenizers import BPETokenizer
tokenizer = BPETokenizer()
# Train on simple texts
texts = [
"hello world",
"test tokenization",
"simple example"
]
texts = ["hello world", "test tokenization", "simple example"]
tokenizer.train(
texts=texts,
vocab_size=50,
special_tokens=["<pad>", "<unk>", "<bos>", "<eos>"]
texts=texts, vocab_size=50, special_tokens=["<pad>", "<unk>", "<bos>", "<eos>"]
)
# Test encoding/decoding
text = "hello world"
tokens = tokenizer.encode(text)
decoded = tokenizer.decode(tokens)
assert isinstance(tokens, list)
assert isinstance(decoded, str)
assert len(tokens) > 0
@@ -67,18 +61,18 @@ def test_bpe_tokenizer_basic():
def test_token_embeddings():
"""Test token embeddings."""
from llm.core.token_embeddings import TokenEmbeddings
vocab_size = 1000
embed_dim = 128
embeddings = TokenEmbeddings(vocab_size, embed_dim)
# Test forward pass
batch_size, seq_len = 2, 16
input_ids = torch.randint(0, vocab_size, (batch_size, seq_len))
output = embeddings(input_ids)
assert output.shape == (batch_size, seq_len, embed_dim)
print("✅ Token embeddings test passed")
@@ -86,20 +80,20 @@ def test_token_embeddings():
def test_multi_head_attention():
"""Test multi-head attention."""
from llm.core.multi_head_attention import MultiHeadAttention
num_heads = 4
emb_size = 128
head_size = emb_size // num_heads
max_seq_len = 256
attention = MultiHeadAttention(num_heads, emb_size, head_size, max_seq_len)
# Test forward pass
batch_size, seq_len = 2, 16
inputs = torch.randn(batch_size, seq_len, emb_size)
output, _ = attention(inputs)
assert output.shape == inputs.shape
print("✅ Multi-head attention test passed")
@@ -107,17 +101,17 @@ def test_multi_head_attention():
def test_feed_forward():
"""Test feed forward network."""
from llm.core.feed_forward import FeedForward
embed_dim = 128
ff = FeedForward(embed_dim)
# Test forward pass
batch_size, seq_len = 2, 16
inputs = torch.randn(batch_size, seq_len, embed_dim)
output = ff(inputs)
assert output.shape == inputs.shape
print("✅ Feed forward test passed")
@@ -125,29 +119,25 @@ def test_feed_forward():
def test_gpt_generation():
"""Test GPT text generation."""
from llm.models.gpt import GPT
config = {
"vocab_size": 1000,
"embed_dim": 128,
"num_heads": 4,
"num_layers": 2,
"max_position_embeddings": 256,
"dropout": 0.1
"dropout": 0.1,
}
model = GPT(config)
model.eval()
# Test greedy generation
input_ids = torch.randint(0, config["vocab_size"], (1, 5))
with torch.no_grad():
generated = model.generate(
x=input_ids,
max_new_tokens=3,
do_sample=False
)
generated = model.generate(x=input_ids, max_new_tokens=3, do_sample=False)
assert generated.shape == (1, 8) # 5 initial + 3 new tokens
print("✅ GPT generation test passed")
@@ -155,50 +145,48 @@ def test_gpt_generation():
def test_bpe_tokenizer_save_load():
"""Test BPE tokenizer save/load functionality."""
from llm.tokenizers import BPETokenizer
tokenizer = BPETokenizer()
# Train on simple texts
texts = ["hello world", "test save load"]
tokenizer.train(
texts=texts,
vocab_size=30,
special_tokens=["<pad>", "<unk>", "<bos>", "<eos>"]
texts=texts, vocab_size=30, special_tokens=["<pad>", "<unk>", "<bos>", "<eos>"]
)
with tempfile.TemporaryDirectory() as temp_dir:
save_path = os.path.join(temp_dir, "test_tokenizer.json")
# Save tokenizer
tokenizer.save(save_path)
assert os.path.exists(save_path)
# Load tokenizer
loaded_tokenizer = BPETokenizer.load(save_path)
# Test that vocab size is the same
assert tokenizer.get_vocab_size() == loaded_tokenizer.get_vocab_size()
# Test that vocabularies are the same
assert tokenizer.get_vocab() == loaded_tokenizer.get_vocab()
# Test that both can encode/decode (even if tokens differ due to BPE state)
text = "hello world"
original_tokens = tokenizer.encode(text)
loaded_tokens = loaded_tokenizer.encode(text)
# Both should produce valid token lists
assert isinstance(original_tokens, list)
assert isinstance(loaded_tokens, list)
assert len(original_tokens) > 0
assert len(loaded_tokens) > 0
# Both should be able to decode
original_decoded = tokenizer.decode(original_tokens)
loaded_decoded = loaded_tokenizer.decode(loaded_tokens)
assert isinstance(original_decoded, str)
assert isinstance(loaded_decoded, str)
print("✅ BPE tokenizer save/load test passed")
@@ -206,18 +194,16 @@ def test_gpt_with_tokenizer():
"""Test GPT model with tokenizer integration."""
from llm.models.gpt import GPT
from llm.tokenizers import BPETokenizer
# Create and train tokenizer
tokenizer = BPETokenizer()
texts = ["hello world", "test integration"]
tokenizer.train(
texts=texts,
vocab_size=50,
special_tokens=["<pad>", "<unk>", "<bos>", "<eos>"]
texts=texts, vocab_size=50, special_tokens=["<pad>", "<unk>", "<bos>", "<eos>"]
)
vocab_size = tokenizer.get_vocab_size()
# Create GPT model with tokenizer's vocab size
config = {
"vocab_size": vocab_size,
@@ -225,19 +211,19 @@ def test_gpt_with_tokenizer():
"num_heads": 4,
"num_layers": 2,
"max_position_embeddings": 256,
"dropout": 0.1
"dropout": 0.1,
}
model = GPT(config)
# Test with tokenized input
text = "hello world"
tokens = tokenizer.encode(text, add_special_tokens=False)
input_ids = torch.tensor([tokens])
with torch.no_grad():
logits = model(input_ids)
assert logits.shape == (1, len(tokens), vocab_size)
print("✅ GPT with tokenizer integration test passed")
@@ -245,7 +231,7 @@ def test_gpt_with_tokenizer():
def run_all_tests():
"""Run all basic tests."""
print("🧪 Running basic tests for llm library...")
test_gpt_model_creation()
test_bpe_tokenizer_basic()
test_token_embeddings()
@@ -254,7 +240,7 @@ def run_all_tests():
test_gpt_generation()
test_bpe_tokenizer_save_load()
test_gpt_with_tokenizer()
print("🎉 All basic tests passed!")

View File

@@ -8,15 +8,15 @@ from llm.tokenizers import BaseTokenizer
class ConcreteTokenizer(BaseTokenizer):
"""Concrete implementation for testing BaseTokenizer."""
def train(self, texts: list, vocab_size: int = 1000, **kwargs):
"""Dummy implementation for testing."""
pass
def encode(self, text: str, **kwargs) -> list:
"""Dummy implementation for testing."""
return [1, 2, 3]
def decode(self, tokens: list, **kwargs) -> str:
"""Dummy implementation for testing."""
return "decoded text"
@@ -24,33 +24,33 @@ class ConcreteTokenizer(BaseTokenizer):
class TestBaseTokenizer:
"""Test cases for BaseTokenizer."""
def test_initialization(self):
"""Test that BaseTokenizer can be initialized through concrete class."""
tokenizer = ConcreteTokenizer()
assert tokenizer is not None
assert tokenizer.vocab == {}
assert tokenizer.vocab_size == 0
def test_encode_implemented(self):
"""Test that encode method works in concrete implementation."""
tokenizer = ConcreteTokenizer()
result = tokenizer.encode("test text")
assert result == [1, 2, 3]
def test_decode_implemented(self):
"""Test that decode method works in concrete implementation."""
tokenizer = ConcreteTokenizer()
result = tokenizer.decode([1, 2, 3])
assert result == "decoded text"
def test_get_vocab_size(self):
"""Test that get_vocab_size method works."""
tokenizer = ConcreteTokenizer()
tokenizer.vocab = {"a": 0, "b": 1, "c": 2}
tokenizer.vocab_size = 3
assert tokenizer.get_vocab_size() == 3
def test_get_vocab(self):
"""Test that get_vocab method works."""
tokenizer = ConcreteTokenizer()

View File

@@ -10,18 +10,18 @@ from llm.tokenizers import BPETokenizer
class TestBPETokenizer:
"""Test cases for BPETokenizer."""
@pytest.fixture
def sample_texts(self):
"""Sample texts for training tokenizer."""
return [
"Искусственный интеллект",
"Нейронные сети",
"Нейронные сети",
"Машинное обучение",
"Глубокое обучение",
"Трансформеры"
"Трансформеры",
]
@pytest.fixture
def trained_tokenizer(self, sample_texts):
"""Create and train a BPE tokenizer."""
@@ -29,128 +29,130 @@ class TestBPETokenizer:
tokenizer.train(
texts=sample_texts,
vocab_size=100,
special_tokens=["<pad>", "<unk>", "<bos>", "<eos>"]
special_tokens=["<pad>", "<unk>", "<bos>", "<eos>"],
)
return tokenizer
def test_initialization(self):
"""Test that BPETokenizer can be initialized."""
tokenizer = BPETokenizer()
assert tokenizer is not None
def test_train_tokenizer(self, sample_texts):
"""Test that tokenizer can be trained."""
tokenizer = BPETokenizer()
tokenizer.train(
texts=sample_texts,
vocab_size=50,
special_tokens=["<pad>", "<unk>", "<bos>", "<eos>"]
special_tokens=["<pad>", "<unk>", "<bos>", "<eos>"],
)
assert tokenizer.get_vocab_size() > 0
assert len(tokenizer.get_vocab()) == tokenizer.get_vocab_size()
def test_encode_decode(self, trained_tokenizer):
"""Test encoding and decoding text."""
text = "Искусственный интеллект"
# Encode text
tokens = trained_tokenizer.encode(text)
assert isinstance(tokens, list)
assert len(tokens) > 0
assert all(isinstance(token, int) for token in tokens)
# Decode tokens
decoded_text = trained_tokenizer.decode(tokens)
assert isinstance(decoded_text, str)
# Decoded text should be similar to original (may have special tokens)
assert len(decoded_text) > 0
def test_encode_with_special_tokens(self, trained_tokenizer):
"""Test encoding with special tokens."""
text = "Нейронные сети"
# Without special tokens
tokens_no_special = trained_tokenizer.encode(text, add_special_tokens=False)
# With special tokens
tokens_with_special = trained_tokenizer.encode(text, add_special_tokens=True)
# Should have more tokens when special tokens are added
assert len(tokens_with_special) >= len(tokens_no_special)
def test_vocab_size(self, trained_tokenizer):
"""Test vocabulary size."""
vocab_size = trained_tokenizer.get_vocab_size()
assert isinstance(vocab_size, int)
assert vocab_size > 0
vocab = trained_tokenizer.get_vocab()
assert isinstance(vocab, dict)
assert len(vocab) == vocab_size
def test_special_tokens(self, trained_tokenizer):
"""Test that special tokens are in vocabulary."""
vocab = trained_tokenizer.get_vocab()
# Check that special tokens are in vocabulary
special_tokens = ["<pad>", "<unk>", "<bos>", "<eos>"]
for token in special_tokens:
assert token in vocab
assert isinstance(vocab[token], int)
def test_save_load(self, trained_tokenizer, sample_texts):
"""Test saving and loading tokenizer."""
with tempfile.TemporaryDirectory() as temp_dir:
save_path = os.path.join(temp_dir, "test_tokenizer.json")
# Save tokenizer
trained_tokenizer.save(save_path)
assert os.path.exists(save_path)
# Load tokenizer
loaded_tokenizer = BPETokenizer.load(save_path)
assert loaded_tokenizer is not None
# Check that loaded tokenizer works the same
original_vocab = trained_tokenizer.get_vocab()
loaded_vocab = loaded_tokenizer.get_vocab()
assert original_vocab == loaded_vocab
assert trained_tokenizer.get_vocab_size() == loaded_tokenizer.get_vocab_size()
assert (
trained_tokenizer.get_vocab_size() == loaded_tokenizer.get_vocab_size()
)
# Test encoding consistency
text = sample_texts[0]
original_tokens = trained_tokenizer.encode(text)
loaded_tokens = loaded_tokenizer.encode(text)
assert original_tokens == loaded_tokens
def test_unknown_tokens(self, trained_tokenizer):
"""Test handling of unknown tokens."""
# Use text that likely contains unknown subwords
text = "xyzabc123" # Random text that shouldn't be in training data
tokens = trained_tokenizer.encode(text)
assert len(tokens) > 0
# Should be able to decode back (even if it's mostly unk tokens)
decoded = trained_tokenizer.decode(tokens)
assert isinstance(decoded, str)
def test_empty_text(self, trained_tokenizer):
"""Test encoding and decoding empty text."""
tokens = trained_tokenizer.encode("")
assert isinstance(tokens, list)
decoded = trained_tokenizer.decode([])
assert decoded == ""
def test_tokenize_method(self, trained_tokenizer):
"""Test the tokenize method."""
text = "Искусственный интеллект"
tokens = trained_tokenizer.tokenize(text)
assert isinstance(tokens, list)
assert len(tokens) > 0
assert all(isinstance(token, str) for token in tokens)