feat: initial project setup with LLM architecture and HF integration

- Add LLM library with GPT model implementation
- Add hf-proxy for HuggingFace integration
- Add experiments for training and generation
- Add comprehensive documentation and examples
- Configure uv workspace with proper dependencies
This commit is contained in:
Sergey Penkovsky
2025-10-04 22:40:21 +03:00
commit ec07546ea8
54 changed files with 9337 additions and 0 deletions

10
llm/.gitignore vendored Normal file
View File

@@ -0,0 +1,10 @@
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info
# Virtual environments
.venv

1
llm/.python-version Normal file
View File

@@ -0,0 +1 @@
3.10

0
llm/README.md Normal file
View File

17
llm/pyproject.toml Normal file
View File

@@ -0,0 +1,17 @@
[project]
name = "llm"
version = "0.1.0"
description = "Research library for LLM architectures"
readme = "README.md"
authors = [
{ name = "Sergey Penkovsky", email = "sergey.penkovsky@gmail.com" }
]
requires-python = ">=3.10"
dependencies = [
"torch>=2.3.0",
"numpy>=1.24.0",
]
[build-system]
requires = ["uv_build>=0.8.22,<0.9.0"]
build-backend = "uv_build"

2
llm/src/llm/__init__.py Normal file
View File

@@ -0,0 +1,2 @@
def hello() -> str:
return "Hello from llm!"

View File

View File

@@ -0,0 +1,2 @@
def hello() -> str:
return "Hello from llm!"

View File

@@ -0,0 +1,20 @@
# llm/core/base_model.py
import torch.nn as nn
from abc import ABC, abstractmethod
class BaseModel(nn.Module, ABC):
"""Базовый класс для всех LLM."""
def __init__(self, config):
super().__init__()
self.config = config
@abstractmethod
def forward(self, input_ids, attention_mask=None):
"""Прямой проход модели."""
pass
@abstractmethod
def generate(self, input_ids, max_length=50):
"""Генерация текста (greedy или sampling)."""
pass

View File

@@ -0,0 +1,96 @@
from torch import nn
import torch
from .feed_forward import FeedForward
from .multi_head_attention import MultiHeadAttention
class Decoder(nn.Module):
"""
Декодер трансформера - ключевой компонент архитектуры Transformer.
Предназначен для:
- Обработки последовательностей с учетом контекста (самовнимание)
- Постепенного генерирования выходной последовательности
- Учета масок для предотвращения "заглядывания в будущее"
Алгоритм работы:
1. Входной тензор (batch_size, seq_len, emb_size)
2. Многоголовое внимание с residual connection и LayerNorm
3. FeedForward сеть с residual connection и LayerNorm
4. Выходной тензор (batch_size, seq_len, emb_size)
Основные характеристики:
- Поддержка масок внимания
- Residual connections для стабилизации градиентов
- Layer Normalization после каждого sub-layer
- Конфигурируемые параметры внимания
Примеры использования:
1. Базовый случай:
>>> decoder = Decoder(num_heads=8, emb_size=512, head_size=64, max_seq_len=1024)
>>> x = torch.randn(1, 10, 512) # [batch, seq_len, emb_size]
>>> output = decoder(x)
>>> print(output.shape)
torch.Size([1, 10, 512])
2. С маской внимания:
>>> mask = torch.tril(torch.ones(10, 10)) # Нижнетреугольная маска
>>> output = decoder(x, mask)
3. Инкрементальное декодирование:
>>> for i in range(10):
>>> output = decoder(x[:, :i+1, :], mask[:i+1, :i+1])
"""
def __init__(self,
num_heads: int,
emb_size: int,
head_size: int,
max_seq_len: int,
dropout: float = 0.1
):
"""
Инициализация декодера.
Параметры:
num_heads: int - количество голов внимания
emb_size: int - размерность эмбеддингов
head_size: int - размерность каждой головы внимания
max_seq_len: int - максимальная длина последовательности
dropout: float (default=0.1) - вероятность dropout
"""
super().__init__()
self._heads = MultiHeadAttention(
num_heads=num_heads,
emb_size=emb_size,
head_size=head_size,
max_seq_len=max_seq_len,
dropout=dropout
)
self._ff = FeedForward(emb_size=emb_size, dropout=dropout)
self._norm1 = nn.LayerNorm(emb_size)
self._norm2 = nn.LayerNorm(emb_size)
def forward(self, x: torch.Tensor, mask: torch.Tensor = None) -> torch.Tensor:
"""
Прямой проход через декодер.
Вход:
x: torch.Tensor - входной тензор [batch_size, seq_len, emb_size]
mask: torch.Tensor (optional) - маска внимания [seq_len, seq_len]
Возвращает:
torch.Tensor - выходной тензор [batch_size, seq_len, emb_size]
Алгоритм forward:
1. Применяем MultiHeadAttention к входу
2. Добавляем residual connection и LayerNorm
3. Применяем FeedForward сеть
4. Добавляем residual connection и LayerNorm
"""
# Self-Attention блок
attention = self._heads(x, mask)
out = self._norm1(attention + x)
# FeedForward блок
ffn_out = self._ff(out)
return self._norm2(ffn_out + out)

View File

@@ -0,0 +1,80 @@
from torch import nn
import torch
class FeedForward(nn.Module):
"""
Слой прямой связи (Feed Forward Network) для архитектуры трансформеров.
Этот слой состоит из двух линейных преобразований с расширением внутренней размерности
в 4 раза и механизмом dropout для регуляризации. Между линейными слоями применяется
активация ReLU.
Алгоритм работы:
1. Входной тензор x (размерность: [batch_size, seq_len, emb_size])
2. Линейное преобразование: emb_size -> 4*emb_size
3. Активация ReLU
4. Линейное преобразование: 4*emb_size -> emb_size
5. Применение dropout
6. Возврат результата (размерность: [batch_size, seq_len, emb_size])
Предназначение:
- Добавляет нелинейность в архитектуру трансформера
- Обеспечивает взаимодействие между различными размерностями эмбеддингов
- Работает независимо для каждого токена в последовательности
Примеры использования:
>>> # Инициализация слоя
>>> ff = FeedForward(emb_size=512, dropout=0.1)
>>>
>>> # Прямой проход
>>> x = torch.randn(32, 10, 512) # [batch_size, seq_len, emb_size]
>>> output = ff(x)
>>> print(output.shape) # torch.Size([32, 10, 512])
>>>
>>> # Работа с разными типами данных
>>> x_double = torch.randn(32, 10, 512, dtype=torch.float64)
>>> output_double = ff(x_double)
>>> print(output_double.dtype) # torch.float64
"""
def __init__(self, emb_size: int, dropout: float = 0.1):
"""
Инициализация слоя Feed Forward Network.
Args:
emb_size: Размерность входных эмбеддингов
dropout: Вероятность dropout для регуляризации (по умолчанию: 0.1)
"""
super().__init__()
# Первый линейный слой (расширение размерности)
self._layer1 = nn.Linear(emb_size, emb_size * 4)
# ReLU активация
self._relu = nn.ReLU()
# Второй линейный слой (сжатие обратно)
self._layer2 = nn.Linear(emb_size * 4, emb_size)
# Dropout
self._dropout = nn.Dropout(dropout)
def forward(self, x: torch.Tensor):
"""
Прямой проход через слой Feed Forward Network.
Args:
x: Входной тензор размерности [batch_size, seq_len, emb_size]
Returns:
Тензор той же размерности, что и входной
"""
# Сохраняем dtype входных данных
input_dtype = x.dtype
# Приводим веса к нужному типу если необходимо
if input_dtype != self._layer1.weight.dtype:
self._layer1 = self._layer1.to(dtype=input_dtype)
self._layer2 = self._layer2.to(dtype=input_dtype)
# Пропустим тензор x по очереди через все созданные слои
x = self._layer1(x)
x = self._relu(x)
x = self._layer2(x)
return self._dropout(x)

View File

@@ -0,0 +1,84 @@
import torch
from torch import nn
import torch.nn.functional as F
from math import sqrt
class HeadAttention(nn.Module):
"""
Реализация одного головного механизма внимания из архитектуры Transformer.
Выполняет scaled dot-product attention с маскированием будущих позиций (causal attention).
Основной алгоритм:
1. Линейные преобразования входных данных в Q (query), K (key), V (value)
2. Вычисление scores = Q·K^T / sqrt(d_k)
3. Применение causal маски (заполнение -inf будущих позиций)
4. Softmax для получения весов внимания
5. Умножение весов на значения V
Пример использования:
>>> attention = HeadAttention(emb_size=64, head_size=32, max_seq_len=128)
>>> x = torch.randn(1, 10, 64) # [batch_size, seq_len, emb_size]
>>> output = attention(x) # [1, 10, 32]
Параметры:
emb_size (int): Размер входного эмбеддинга
head_size (int): Размерность выхода головы внимания
max_seq_len (int): Максимальная длина последовательности
Примечания:
- Использует нижнетреугольную маску для предотвращения "заглядывания в будущее"
- Автоматически адаптируется к разным версиям PyTorch
- Поддерживает batch-обработку входных данных
"""
def __init__(self, emb_size: int, head_size: int, max_seq_len: int):
super().__init__()
self._emb_size = emb_size
self._head_size = head_size
self._max_seq_len = max_seq_len
# Линейные преобразования для Q, K, V
self._k = nn.Linear(emb_size, head_size)
self._q = nn.Linear(emb_size, head_size)
self._v = nn.Linear(emb_size, head_size)
# Создание causal маски
mask = torch.tril(torch.ones(max_seq_len, max_seq_len))
self.register_buffer('_tril_mask', mask.bool() if hasattr(torch, 'bool') else mask.byte())
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""
Прямой проход через слой внимания.
Аргументы:
x (torch.Tensor): Входной тензор формы [batch_size, seq_len, emb_size]
Возвращает:
torch.Tensor: Выходной тензор формы [batch_size, seq_len, head_size]
Исключения:
ValueError: Если длина последовательности превышает max_seq_len
Пример внутренних преобразований:
Для входа x.shape = [2, 5, 64]:
1. Q/K/V преобразования -> [2, 5, 32]
2. Scores = Q·K^T -> [2, 5, 5]
3. После маски и softmax -> [2, 5, 5]
4. Умножение на V -> [2, 5, 32]
"""
seq_len = x.shape[1]
if seq_len > self._max_seq_len:
raise ValueError(f"Длина последовательности {seq_len} превышает максимум {self._max_seq_len}")
# 1. Линейные преобразования
k = self._k(x) # [B, T, hs]
q = self._q(x) # [B, T, hs]
# 2. Вычисление scores
scores = q @ k.transpose(-2, -1) / sqrt(self._head_size)
# 3. Применение causal маски
scores = scores.masked_fill(~self._tril_mask[:seq_len, :seq_len], float('-inf'))
# 4. Softmax и умножение на V
weights = F.softmax(scores, dim=-1)
return weights @ self._v(x)

View File

@@ -0,0 +1,104 @@
from torch import nn
import torch
from .head_attention import HeadAttention
class MultiHeadAttention(nn.Module):
"""
Реализация механизма многоголового внимания (Multi-Head Attention) из архитектуры Transformer.
Основные характеристики:
- Параллельная обработка входных данных несколькими головами внимания
- Поддержка маскирования (causal mask и пользовательские маски)
- Финальная проекция с dropout регуляризацией
Математическое описание:
MultiHead(Q, K, V) = Concat(head_1, ..., head_h)W^O
где head_i = Attention(QW_i^Q, KW_i^K, VW_i^V)
Примеры использования:
1. Базовый пример:
>>> mha = MultiHeadAttention(num_heads=8, emb_size=512, head_size=64, max_seq_len=1024)
>>> x = torch.randn(2, 50, 512) # [batch_size, seq_len, emb_size]
>>> output = mha(x) # [2, 50, 512]
2. С использованием маски:
>>> mask = torch.tril(torch.ones(50, 50)) # Causal mask
>>> output = mha(x, mask)
3. Интеграция в Transformer:
>>> # В составе Transformer слоя
>>> self.attention = MultiHeadAttention(...)
>>> x = self.attention(x, mask)
"""
def __init__(self, num_heads: int, emb_size: int, head_size: int, max_seq_len: int, dropout: float = 0.1):
"""
Инициализация многоголового внимания.
Параметры:
num_heads (int): Количество голов внимания. Типичные значения: 4-16
emb_size (int): Размерность входных и выходных эмбеддингов
head_size (int): Размерность каждой головы внимания (обычно emb_size // num_heads)
max_seq_len (int): Максимальная длина последовательности
dropout (float): Вероятность dropout (по умолчанию 0.1)
Контрольные значения:
- num_heads * head_size должно равняться emb_size
- head_size обычно выбирают 32-128
- max_seq_len зависит от задачи (512 для BERT, 2048 для GPT-3)
"""
super().__init__()
self._heads = nn.ModuleList([
HeadAttention(
emb_size=emb_size,
head_size=head_size,
max_seq_len=max_seq_len
) for _ in range(num_heads)
])
self._layer = nn.Linear(head_size * num_heads, emb_size)
self._dropout = nn.Dropout(dropout)
def forward(self, x: torch.Tensor, mask: torch.Tensor = None):
"""
Прямой проход через слой многоголового внимания.
Подробное описание преобразований тензоров:
1. Входной тензор [batch_size, seq_len, emb_size] разделяется на N голов:
- Каждая голова получает тензор [batch_size, seq_len, head_size]
2. Каждая голова вычисляет attention:
- Вход: [batch_size, seq_len, head_size]
- Выход: [batch_size, seq_len, head_size]
3. Конкатенация результатов:
- Объединенный выход: [batch_size, seq_len, num_heads * head_size]
4. Линейная проекция:
- Выход: [batch_size, seq_len, emb_size]
5. Применение dropout
Аргументы:
x (torch.Tensor): Входной тензор формы [batch_size, seq_len, emb_size]
mask (torch.Tensor, optional): Маска внимания формы [seq_len, seq_len]
Возвращает:
torch.Tensor: Выходной тензор формы [batch_size, seq_len, emb_size]
Пример преобразований для emb_size=512, num_heads=8:
Вход: [4, 100, 512]
-> Каждая голова: [4, 100, 64]
-> После внимания: 8 x [4, 100, 64]
-> Конкатенация: [4, 100, 512]
-> Проекция: [4, 100, 512]
-> Dropout: [4, 100, 512]
"""
# 1. Вычисляем attention для каждой головы
attention_outputs = [head(x) for head in self._heads]
# 2. Объединяем результаты всех голов
concatenated_attention = torch.cat(attention_outputs, dim=-1)
# 3. Проецируем в пространство эмбеддингов
projected_output = self._layer(concatenated_attention)
# 4. Применяем dropout для регуляризации
final_output = self._dropout(projected_output)
return final_output

View File

@@ -0,0 +1,90 @@
import torch
from torch import nn, Tensor
class PositionalEmbeddings(nn.Module):
"""
Класс для создания позиционных эмбеддингов через nn.Embedding.
Позиционные эмбеддинги используются в нейросетях для передачи информации
о позиции элементов в последовательности (например, в Transformer).
Особенности:
- Создаёт обучаемые позиционные эмбеддинги фиксированной длины
- Поддерживает обработку последовательностей переменной длины
- Автоматически размещает вычисления на том же устройстве, что и параметры
Args:
max_seq_len (int): Максимальная длина последовательности
emb_size (int): Размерность векторного представления позиций
Пример использования:
>>> pos_encoder = PositionalEmbeddings(max_seq_len=100, emb_size=256)
>>> # Получить эмбеддинги для последовательности из 10 элементов
>>> embeddings = pos_encoder(10) # Tensor shape: [10, 256]
>>> # Использование в модели
>>> class MyModel(nn.Module):
... def __init__(self):
... super().__init__()
... self.pos_emb = PositionalEmbeddings(100, 256)
... def forward(self, x):
... pos = self.pos_emb(x.size(1))
... return x + pos # Добавляем позиционную информацию
"""
def __init__(self, max_seq_len: int, emb_size: int):
super().__init__()
self.max_seq_len = max_seq_len
self.emb_size = emb_size
self.embedding = nn.Embedding(
num_embeddings=max_seq_len,
embedding_dim=emb_size
)
def forward(self, seq_len: int) -> Tensor:
"""
Возвращает позиционные эмбеддинги для заданной длины последовательности.
Args:
seq_len (int): Длина последовательности (1 <= seq_len <= max_seq_len)
Returns:
Tensor: Тензор позиционных эмбеддингов формы [seq_len, emb_size]
Raises:
IndexError: Если seq_len выходит за допустимые границы
Пример:
>>> pos_encoder = PositionalEmbeddings(100, 64)
>>> emb = pos_encoder(10) # Тензор 10x64
"""
if seq_len < 1 or seq_len > self.max_seq_len:
raise IndexError(f"Длина {seq_len} должна быть от 1 до {self.max_seq_len}")
positions = torch.arange(seq_len, device=self.embedding.weight.device)
return self.embedding(positions)
if __name__ == "__main__":
# Демонстрация работы
print("Пример использования PositionalEmbeddings:")
pos_emb = PositionalEmbeddings(max_seq_len=50, emb_size=128)
# Пример 1: Базовое использование
print("\n1. Базовый пример:")
emb = pos_emb(10)
print(f"Форма выходного тензора: {emb.shape}")
print(f"Среднее значение: {emb.mean().item():.4f}")
# Пример 2: Интеграция с моделью
print("\n2. Пример интеграции с моделью:")
class DemoModel(nn.Module):
def __init__(self):
super().__init__()
self.pos_emb = PositionalEmbeddings(50, 128)
def forward(self, x):
pos = self.pos_emb(x.size(1))
return x + pos # Добавляем позиционную информацию
model = DemoModel()
input_tensor = torch.randn(2, 10, 128) # [batch, seq, features]
output = model(input_tensor)
print(f"Вход: {input_tensor.shape}, Выход: {output.shape}")

View File

@@ -0,0 +1,68 @@
import torch
from torch import nn
from torch import Tensor
class TokenEmbeddings(nn.Module):
"""
Модуль PyTorch для преобразования индексов токенов в векторные представления (эмбеддинги).
Преобразует целочисленные индексы токенов в обучаемые векторные представления фиксированного размера.
Обычно используется как первый слой в нейронных сетях для задач NLP.
Аргументы:
vocab_size (int): Размер словаря (количество уникальных токенов)
emb_size (int): Размерность векторных представлений
Форматы данных:
- Вход: тензор (batch_size, seq_len) индексов токенов
- Выход: тензор (batch_size, seq_len, emb_size) векторных представлений
Примеры использования:
>>> embedding_layer = TokenEmbeddings(vocab_size=10000, emb_size=256)
>>> tokens = torch.tensor([[1, 2, 3], [4, 5, 6]]) # batch_size=2, seq_len=3
>>> embeddings = embedding_layer(tokens)
>>> embeddings.shape
torch.Size([2, 3, 256])
Примечание:
- Индексы должны быть в диапазоне [0, vocab_size-1]
- Эмбеддинги инициализируются случайно и обучаются в процессе тренировки модели
"""
def __init__(self, vocab_size: int, emb_size: int):
super().__init__()
self._embedding = nn.Embedding(
num_embeddings=vocab_size,
embedding_dim=emb_size
)
def forward(self, x: Tensor) -> Tensor:
return self._embedding(x)
@property
def num_embeddings(self) -> int:
"""Возвращает размер словаря"""
return self._embedding.num_embeddings
@property
def embedding_dim(self) -> int:
"""Возвращает размерность эмбеддингов"""
return self._embedding.embedding_dim
if __name__ == "__main__":
# Пример использования
embedding = TokenEmbeddings(vocab_size=100, emb_size=128)
# Создаем тензор с индексами в пределах vocab_size (0-99)
tensor = torch.tensor([
[11, 45, 76, 34],
[34, 67, 45, 54]
])
# Проверяем индексы
if (tensor >= 100).any():
raise ValueError("Some indices are out of vocabulary range (vocab_size=100)")
output = embedding(tensor)
print("Embeddings shape:", output.shape)
print(f"{output.shape} | {output.mean().item():.11f}") # Формат как в ТЗ

View File

@@ -0,0 +1,2 @@
def hello() -> str:
return "Hello from llm!"

View File

View File

View File

View File

@@ -0,0 +1,3 @@
from .gpt import GPT
__all__ = ["GPT"]

View File

@@ -0,0 +1,264 @@
# llm/models/gpt/gpt2.py
import torch
import torch.nn as nn
import torch.nn.functional as F
from llm.core.base_model import BaseModel
from llm.core.decoder import Decoder
from llm.core.token_embeddings import TokenEmbeddings
from llm.core.positional_embeddings import PositionalEmbeddings
class GPT(BaseModel):
"""GPT-like трансформер для генерации текста
Args:
vocab_size: Размер словаря
max_seq_len: Макс. длина последовательности
emb_size: Размерность эмбеддингов
num_heads: Количество голов внимания
head_size: Размерность голов внимания
num_layers: Количество слоёв декодера
dropout: Вероятность dropout (default=0.1)
device: Устройство (default='cpu')
"""
def __init__(self, config):
super().__init__(config)
# Инициализация слоев
self._max_seq_len = config["max_position_embeddings"]
self._token_embeddings = TokenEmbeddings(
vocab_size=config["vocab_size"],
emb_size=config["embed_dim"]
)
self._position_embeddings = PositionalEmbeddings(
max_seq_len=config["max_position_embeddings"],
emb_size=config["embed_dim"]
)
self._dropout = nn.Dropout(config["dropout"])
# head_size = emb_size // num_heads
self._decoders = nn.ModuleList([Decoder(
num_heads=config["num_heads"],
emb_size=config["embed_dim"],
head_size=config["embed_dim"] // config["num_heads"],
max_seq_len=config["max_position_embeddings"],
dropout=config["dropout"]
) for _ in range(config["num_layers"])])
self._linear = nn.Linear(config["embed_dim"], config["vocab_size"])
@property
def max_seq_len(self):
"""Возвращает максимальную длину последовательности."""
return self._max_seq_len
def forward(self, x: torch.Tensor, attention_mask=None) -> torch.Tensor:
"""Прямой проход через GPT
Args:
x: Входной тензор [batch_size, seq_len]
Returns:
Тензор логитов [batch_size, seq_len, vocab_size]
"""
# Проверка длины последовательности
if x.size(1) > self._max_seq_len:
raise ValueError(f"Длина последовательности {x.size(1)} превышает максимальную {self._max_seq_len}")
# Эмбеддинги токенов и позиций
tok_out = self._token_embeddings(x) # [batch, seq_len, emb_size]
pos_out = self._position_embeddings(x.size(1)) # [seq_len, emb_size]
# Комбинирование
out = self._dropout(tok_out + pos_out.unsqueeze(0)) # [batch, seq_len, emb_size]
# Стек декодеров
for decoder in self._decoders:
out = decoder(out)
return self._linear(out) # [batch, seq_len, vocab_size]
# def forward(self, input_ids, attention_mask=None):
# B, T = input_ids.size()
# pos = torch.arange(0, T, device=input_ids.device).unsqueeze(0)
#
# x = self.token_emb(input_ids) + self.pos_emb(pos)
#
# for block in self.blocks:
# x = block(x, attention_mask)
#
# x = self.ln_f(x)
# logits = self.head(x)
# return logits
def generate(self,
x: torch.Tensor,
max_new_tokens: int,
do_sample: bool,
temperature: float = 1.0,
top_k: int = None,
top_p: float = None,
attention_mask: torch.Tensor = None, # Добавляем для совместимости с HF
**kwargs # Игнорируем остальные параметры
) -> torch.Tensor:
"""Авторегрессивная генерация текста.
Параметры:
x: Входной тензор с индексами токенов формы [batch_size, seq_len],
где batch_size - размер батча, seq_len - длина последовательности.
max_new_tokens: Максимальное количество новых токенов для генерации.
do_sample: Флаг выбора режима генерации:
- True: вероятностное сэмплирование
- False: жадный поиск (argmax)
temperature: Параметр температуры для сэмплирования:
- >1.0 - более случайные результаты
- 1.0 - нейтральное значение
- <1.0 - более предсказуемые результаты
Должна быть > 0 (по умолчанию: 1.0)
top_k: Если задан (и do_sample=True), используется top-k сэмплирование:
- Выбираются только top_k самых вероятных токенов
- Остальным токенам устанавливается вероятность 0
- None: отключено (по умолчанию)
top_p: Если задан (и do_sample=True), используется nucleus (top-p) сэмплирование:
- Выбираются токены с кумулятивной вероятностью ≤ top_p
- Гарантируется, что хотя бы один токен остаётся (даже если его вероятность > top_p)
- None: отключено (по умолчанию)
- Должен быть в диапазоне (0, 1]
Возвращает:
torch.Tensor: Тензор с расширенной последовательностью токенов формы
[batch_size, seq_len + max_new_tokens]
Исключения:
ValueError: Если входная последовательность длиннее max_seq_len
ValueError: Если temperature <= 0
ValueError: Если одновременно заданы top_k и top_p
ValueError: Если top_k задан и ≤ 0
ValueError: Если top_p задан и не в диапазоне (0, 1]
Примеры:
>>> # Жадная генерация
>>> output = model.generate(input_ids, max_new_tokens=10, do_sample=False)
>>>
>>> # Вероятностная генерация с top-k
>>> output = model.generate(input_ids, max_new_tokens=10, do_sample=True, top_k=50)
>>>
>>> # Nucleus sampling (top-p)
>>> output = model.generate(input_ids, max_new_tokens=10, do_sample=True, top_p=0.9)
>>>
>>> # Комбинация температуры и top-k
>>> output = model.generate(input_ids, max_new_tokens=10, do_sample=True,
... temperature=0.7, top_k=50)
Примечания:
1. Для детерминированных результатов в режиме сэмплирования
зафиксируйте random seed (torch.manual_seed).
2. Температура влияет только на режим сэмплирования (do_sample=True).
3. Одновременное использование top_k и top_p запрещено.
4. При do_sample=False параметры top_k, top_p и temperature игнорируются.
Args:
x (torch.Tensor): Входной тензор с индексами токенов формы [batch_size, seq_len],
где batch_size - размер батча, seq_len - длина последовательности.
max_new_tokens (int): Максимальное количество новых токенов для генерации.
do_sample (bool): Флаг выбора режима генерации:
- True: вероятностное сэмплирование
- False: жадный поиск (argmax)
temperature (float): Параметр температуры для сэмплирования:
- >1.0 - более случайные результаты
- 1.0 - нейтральное значение
- <1.0 - более предсказуемые результаты
Должна быть > 0 (по умолчанию: 1.0)
Returns:
torch.Tensor: Тензор с расширенной последовательностью токенов формы
[batch_size, seq_len + max_new_tokens]
Raises:
ValueError: Если входная последовательность длиннее max_seq_len
ValueError: Если temperature <= 0
Examples:
>>> # Жадная генерация
>>> output = model.generate(input_ids, max_new_tokens=10, do_sample=False)
>>>
>>> # Вероятностная генерация с температурой
>>> output = model.generate(input_ids, max_new_tokens=10, do_sample=True, temperature=0.7)
>>>
>>> # Более случайная генерация
>>> output = model.generate(input_ids, max_new_tokens=10, do_sample=True, temperature=1.5)
Note:
Для детерминированных результатов в режиме сэмплирования
зафиксируйте random seed (torch.manual_seed).
Температура влияет только на режим сэмплирования (do_sample=True).
"""
for _ in range(max_new_tokens):
# 1. Обрезаем вход, если последовательность слишком длинная
x_cond = x[:, -self._max_seq_len:]
# 2. Передаем последовательность в метод forward класса GPT и полуаем логиты.
logits = self.forward(x_cond)
# 3. Берем логиты для последнего токена
last_logits = logits[:, -1, :] # [batch_size, vocab_size]
# Масштабируем логиты температурой
if temperature > 0:
logits_scaled = last_logits / temperature
else:
logits_scaled = last_logits
if do_sample == True and top_k != None:
_, topk_indices = torch.topk(logits_scaled, top_k, dim=-1)
# # Заменим все НЕ top-k логиты на -inf
masked_logits = logits_scaled.clone()
vocab_size = logits_scaled.size(-1)
# создаём маску: 1, если токен НЕ в topk_indices
mask = torch.ones_like(logits_scaled, dtype=torch.uint8)
mask.scatter_(1, topk_indices, 0) # 0 там, где top-k индексы
masked_logits[mask.byte()] = float('-inf')
logits_scaled = masked_logits
if do_sample == True and top_p != None:
# 1. Применим softmax, чтобы получить вероятности:
probs = F.softmax(logits_scaled, dim=-1) # [B, vocab_size]
# 2. Отсортируем токены по убыванию вероятностей:
sorted_probs, sorted_indices = torch.sort(probs, descending=True, dim=-1)
# 3. Посчитаем кумулятивную сумму вероятностей:
cum_probs = torch.cumsum(sorted_probs, dim=-1) # [B, vocab_size]
# 4. Определим маску: оставить токены, пока сумма < top_p
sorted_mask = (cum_probs <= top_p).byte() # [B, vocab_size]
# Гарантируем, что хотя бы первый токен останется
sorted_mask[:, 0] = 1
# 5. Преобразуем маску обратно в оригинальный порядок:
# Создаём полную маску из 0
mask = torch.zeros_like(probs, dtype=torch.uint8)
# Устанавливаем 1 в местах нужных токенов
mask.scatter_(dim=1, index=sorted_indices, src=sorted_mask)
# 6. Зануляем логиты токенов вне топ-p:
logits_scaled[~mask] = float('-inf')
# 4. Применяем Softmax
probs = F.softmax(logits_scaled, dim=-1) # [batch_size, vocab_size]
if do_sample == True:
# 5. Если do_sample равен True, то отбираем токен случайно с помощью torch.multinomial
next_token = torch.multinomial(probs, num_samples=1) # [batch_size, 1]
else:
# 5. Если do_sample равен False, то выбираем токен с максимальной вероятностью
next_token = torch.argmax(probs, dim=-1, keepdim=True) # [batch_size, 1]
# 6. Добавляем его к последовательности
x = torch.cat([x, next_token], dim=1) # [batch_size, seq_len+1]
return x
# def generate(self, input_ids, max_length=50):
# for _ in range(max_length):
# logits = self.forward(input_ids)
# next_token = torch.argmax(logits[:, -1, :], dim=-1, keepdim=True)
# input_ids = torch.cat([input_ids, next_token], dim=1)
# return input_ids

0
llm/src/llm/py.typed Normal file
View File

View File

@@ -0,0 +1,19 @@
"""
Модуль токенизаторов для библиотеки llm.
Предоставляет различные реализации токенизаторов:
- BPE (Byte Pair Encoding) токенизатор
- Базовый интерфейс для создания собственных токенизаторов
Примеры использования:
>>> from llm.tokenizers import BPETokenizer, SimpleBPETokenizer
>>> tokenizer = BPETokenizer()
>>> tokenizer.train(["текст для обучения", "еще текст"])
>>> tokens = tokenizer.encode("привет мир")
>>> text = tokenizer.decode(tokens)
"""
from .base_tokenizer import BaseTokenizer
from .bpe_tokenizer import BPETokenizer, SimpleBPETokenizer
__all__ = ["BaseTokenizer", "BPETokenizer", "SimpleBPETokenizer"]

View File

@@ -0,0 +1,174 @@
"""
Базовый класс для токенизаторов.
"""
from abc import ABC, abstractmethod
from typing import List, Dict, Any, Optional
import json
class BaseTokenizer(ABC):
"""
Абстрактный базовый класс для всех токенизаторов.
Определяет общий интерфейс для токенизации текста.
"""
def __init__(self):
self.vocab: Dict[str, int] = {}
self.inverse_vocab: Dict[int, str] = {}
self.vocab_size: int = 0
# Специальные токены
self.pad_token = "<pad>"
self.unk_token = "<unk>"
self.bos_token = "<bos>"
self.eos_token = "<eos>"
self.pad_token_id: Optional[int] = None
self.unk_token_id: Optional[int] = None
self.bos_token_id: Optional[int] = None
self.eos_token_id: Optional[int] = None
@abstractmethod
def train(self, texts: List[str], vocab_size: int = 1000, **kwargs):
"""
Обучение токенизатора на текстах.
Args:
texts: Список текстов для обучения
vocab_size: Желаемый размер словаря
**kwargs: Дополнительные параметры обучения
"""
pass
@abstractmethod
def encode(self, text: str, **kwargs) -> List[int]:
"""
Кодирование текста в последовательность токенов.
Args:
text: Входной текст
**kwargs: Дополнительные параметры кодирования
Returns:
List[int]: Список идентификаторов токенов
"""
pass
@abstractmethod
def decode(self, tokens: List[int], **kwargs) -> str:
"""
Декодирование последовательности токенов в текст.
Args:
tokens: Список идентификаторов токенов
**kwargs: Дополнительные параметры декодирования
Returns:
str: Декодированный текст
"""
pass
def tokenize(self, text: str, **kwargs) -> List[str]:
"""
Токенизация текста в список строковых токенов.
Args:
text: Входной текст
**kwargs: Дополнительные параметры
Returns:
List[str]: Список токенов
"""
token_ids = self.encode(text, **kwargs)
return [self.inverse_vocab.get(token_id, self.unk_token) for token_id in token_ids]
def get_vocab(self) -> Dict[str, int]:
"""Возвращает словарь токенизатора."""
return self.vocab.copy()
def get_vocab_size(self) -> int:
"""Возвращает размер словаря."""
return self.vocab_size
def add_special_tokens(self, special_tokens: List[str]):
"""
Добавляет специальные токены в словарь.
Args:
special_tokens: Список специальных токенов
"""
for token in special_tokens:
if token not in self.vocab:
token_id = len(self.vocab)
self.vocab[token] = token_id
self.inverse_vocab[token_id] = token
self.vocab_size += 1
# Обновляем ID специальных токенов
self.pad_token_id = self.vocab.get(self.pad_token)
self.unk_token_id = self.vocab.get(self.unk_token)
self.bos_token_id = self.vocab.get(self.bos_token)
self.eos_token_id = self.vocab.get(self.eos_token)
def save(self, filepath: str):
"""
Сохраняет токенизатор в файл.
Args:
filepath: Путь для сохранения
"""
config = {
'vocab': self.vocab,
'vocab_size': self.vocab_size,
'pad_token': self.pad_token,
'unk_token': self.unk_token,
'bos_token': self.bos_token,
'eos_token': self.eos_token,
'tokenizer_type': self.__class__.__name__
}
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(config, f, ensure_ascii=False, indent=2)
@classmethod
def load(cls, filepath: str):
"""
Загружает токенизатор из файла.
Args:
filepath: Путь к файлу
Returns:
BaseTokenizer: Загруженный токенизатор
"""
with open(filepath, 'r', encoding='utf-8') as f:
config = json.load(f)
# Создаем экземпляр токенизатора
tokenizer = cls()
tokenizer.vocab = config['vocab']
tokenizer.vocab_size = config['vocab_size']
tokenizer.pad_token = config['pad_token']
tokenizer.unk_token = config['unk_token']
tokenizer.bos_token = config['bos_token']
tokenizer.eos_token = config['eos_token']
# Создаем обратный словарь
tokenizer.inverse_vocab = {v: k for k, v in tokenizer.vocab.items()}
# Обновляем ID специальных токенов
tokenizer.pad_token_id = tokenizer.vocab.get(tokenizer.pad_token)
tokenizer.unk_token_id = tokenizer.vocab.get(tokenizer.unk_token)
tokenizer.bos_token_id = tokenizer.vocab.get(tokenizer.bos_token)
tokenizer.eos_token_id = tokenizer.vocab.get(tokenizer.eos_token)
return tokenizer
def __len__(self) -> int:
"""Возвращает размер словаря."""
return self.vocab_size
def __repr__(self) -> str:
return f"{self.__class__.__name__}(vocab_size={self.vocab_size})"

View File

@@ -0,0 +1,428 @@
"""
BPE (Byte Pair Encoding) токенизатор.
Реализация алгоритма BPE для токенизации текста.
"""
import re
from collections import defaultdict, Counter
from typing import List, Dict, Tuple, Optional
from .base_tokenizer import BaseTokenizer
class BPETokenizer(BaseTokenizer):
"""
BPE токенизатор для обработки текста.
Реализует алгоритм Byte Pair Encoding для создания субсловных токенов.
Примеры использования:
>>> tokenizer = BPETokenizer()
>>> tokenizer.train(["пример текста для обучения"], vocab_size=1000)
>>> tokens = tokenizer.encode("новый текст")
>>> text = tokenizer.decode(tokens)
"""
def __init__(self):
super().__init__()
self.merges: Dict[Tuple[str, str], int] = {}
self.pattern = r"""'s|'t|'re|'ve|'m|'ll|'d| ?\p{L}+| ?\p{N}+| ?[^\s\p{L}\p{N}]+|\s+(?!\S)|\s+"""
self.compiled_pattern = re.compile(self.pattern, re.UNICODE)
def train(self, texts: List[str], vocab_size: int = 1000, **kwargs):
"""
Обучение BPE токенизатора на текстах.
Args:
texts: Список текстов для обучения
vocab_size: Желаемый размер словаря
**kwargs: Дополнительные параметры
- min_frequency: Минимальная частота для мерджа
- special_tokens: Список специальных токенов
"""
# Инициализация базового словаря
self._initialize_vocab()
# Добавляем специальные токены если указаны
special_tokens = kwargs.get('special_tokens', [self.pad_token, self.unk_token, self.bos_token, self.eos_token])
self.add_special_tokens(special_tokens)
# Предобработка текстов
words = self._preprocess_texts(texts)
# Получаем начальные токены
vocab = self._get_initial_vocab(words)
# Выполняем BPE мерджи
self._perform_merges(vocab, vocab_size, kwargs.get('min_frequency', 2))
# Строим финальный словарь
self._build_final_vocab()
def _initialize_vocab(self):
"""Инициализирует базовый словарь."""
self.vocab.clear()
self.inverse_vocab.clear()
self.merges.clear()
self.vocab_size = 0
def _preprocess_texts(self, texts: List[str]) -> List[List[str]]:
"""
Предобработка текстов для обучения.
Args:
texts: Список текстов
Returns:
List[List[str]]: Предобработанные слова
"""
words = []
for text in texts:
# Базовая нормализация
text = text.lower().strip()
# Токенизация на слова
tokens = self.compiled_pattern.findall(text)
words.append(tokens)
return words
def _get_initial_vocab(self, words: List[List[str]]) -> Dict[str, int]:
"""
Создает начальный словарь из символов.
Args:
words: Список токенизированных текстов
Returns:
Dict[str, int]: Начальный словарь частот
"""
vocab = Counter()
for word_list in words:
for word in word_list:
# Разбиваем слово на символы и добавляем специальный символ конца слова
chars = list(word) + ['</w>']
vocab.update([''.join(chars[i:i+1]) for i in range(len(chars))])
return vocab
def _perform_merges(self, vocab: Dict[str, int], target_vocab_size: int, min_frequency: int):
"""
Выполняет BPE мерджи до достижения целевого размера словаря.
Args:
vocab: Начальный словарь
target_vocab_size: Целевой размер словаря
min_frequency: Минимальная частота для мерджа
"""
current_vocab_size = len(vocab) + len(self.vocab)
while current_vocab_size < target_vocab_size:
# Находим наиболее частую пару
pairs = self._get_stats(vocab)
if not pairs:
break
best_pair = max(pairs, key=pairs.get)
if pairs[best_pair] < min_frequency:
break
# Выполняем мердж
vocab = self._merge_vocab(vocab, best_pair)
self.merges[best_pair] = len(self.merges)
current_vocab_size += 1
def _get_stats(self, vocab: Dict[str, int]) -> Dict[Tuple[str, str], int]:
"""
Собирает статистику по парам символов.
Args:
vocab: Словарь токенов
Returns:
Dict[Tuple[str, str], int]: Частоты пар
"""
pairs = defaultdict(int)
for word, freq in vocab.items():
symbols = word.split()
for i in range(len(symbols) - 1):
pairs[symbols[i], symbols[i + 1]] += freq
return pairs
def _merge_vocab(self, vocab: Dict[str, int], pair: Tuple[str, str]) -> Dict[str, int]:
"""
Объединяет пару символов в словаре.
Args:
vocab: Исходный словарь
pair: Пара для объединения
Returns:
Dict[str, int]: Обновленный словарь
"""
new_vocab = {}
bigram = re.compile(r'(?<!\\S)' + re.escape(pair[0]) + r' ' + re.escape(pair[1]) + r'(?!\\S)')
replacement = pair[0] + pair[1]
for word in vocab:
new_word = bigram.sub(replacement, word)
new_vocab[new_word] = vocab[word]
return new_vocab
def _build_final_vocab(self):
"""Строит финальный словарь токенизатора."""
# Собираем все уникальные токены из мерджей
all_tokens = set()
# Добавляем специальные токены
all_tokens.update([self.pad_token, self.unk_token, self.bos_token, self.eos_token])
# Добавляем токены из мерджей
for pair in self.merges:
all_tokens.update(pair)
# Создаем словарь
for i, token in enumerate(sorted(all_tokens)):
self.vocab[token] = i
self.inverse_vocab = {v: k for k, v in self.vocab.items()}
self.vocab_size = len(self.vocab)
# Обновляем ID специальных токенов
self.pad_token_id = self.vocab.get(self.pad_token)
self.unk_token_id = self.vocab.get(self.unk_token)
self.bos_token_id = self.vocab.get(self.bos_token)
self.eos_token_id = self.vocab.get(self.eos_token)
def encode(self, text: str, **kwargs) -> List[int]:
"""
Кодирует текст в последовательность токенов.
Args:
text: Входной текст
**kwargs: Дополнительные параметры
- add_special_tokens: Добавлять специальные токены
Returns:
List[int]: Список идентификаторов токенов
"""
add_special_tokens = kwargs.get('add_special_tokens', False)
# Токенизация текста
tokens = self.compiled_pattern.findall(text)
# Применяем BPE к каждому токену
bpe_tokens = []
for token in tokens:
# Преобразуем токен в BPE представление
bpe_token = self._apply_bpe(token)
bpe_tokens.extend(bpe_token)
# Конвертируем в ID
token_ids = []
for token in bpe_tokens:
token_id = self.vocab.get(token, self.unk_token_id)
if token_id is not None:
token_ids.append(token_id)
# Добавляем специальные токены если нужно
if add_special_tokens:
if self.bos_token_id is not None:
token_ids.insert(0, self.bos_token_id)
if self.eos_token_id is not None:
token_ids.append(self.eos_token_id)
return token_ids
def _apply_bpe(self, token: str) -> List[str]:
"""
Применяет BPE к одному токену.
Args:
token: Входной токен
Returns:
List[str]: Список BPE токенов
"""
# Простая реализация - в реальной реализации нужно применять обученные мерджи
word = token + '</w>'
tokens = [word[i:i+1] for i in range(len(word))]
# Применяем мерджи (упрощенная версия)
# В полной реализации нужно применять все обученные мерджи
for pair in self.merges:
i = 0
while i < len(tokens) - 1:
if tokens[i] == pair[0] and tokens[i + 1] == pair[1]:
tokens[i] = tokens[i] + tokens[i + 1]
del tokens[i + 1]
else:
i += 1
return tokens
def decode(self, tokens: List[int], **kwargs) -> str:
"""
Декодирует последовательность токенов в текст.
Args:
tokens: Список идентификаторов токенов
**kwargs: Дополнительные параметры
- skip_special_tokens: Пропускать специальные токены
Returns:
str: Декодированный текст
"""
skip_special_tokens = kwargs.get('skip_special_tokens', True)
# Конвертируем ID в токены
token_strings = []
for token_id in tokens:
token = self.inverse_vocab.get(token_id, self.unk_token)
# Пропускаем специальные токены если нужно
if skip_special_tokens and token in [self.pad_token, self.unk_token, self.bos_token, self.eos_token]:
continue
token_strings.append(token)
# Объединяем токены в текст
text = ''.join(token_strings)
# Убираем маркер конца слова
text = text.replace('</w>', ' ')
return text.strip()
def save(self, filepath: str):
"""
Сохраняет BPE токенизатор в файл.
Args:
filepath: Путь для сохранения
"""
import json
config = {
'vocab': self.vocab,
'merges': {f"{k[0]} {k[1]}": v for k, v in self.merges.items()},
'vocab_size': self.vocab_size,
'pad_token': self.pad_token,
'unk_token': self.unk_token,
'bos_token': self.bos_token,
'eos_token': self.eos_token,
'pattern': self.pattern,
'tokenizer_type': self.__class__.__name__
}
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(config, f, ensure_ascii=False, indent=2)
@classmethod
def load(cls, filepath: str):
"""
Загружает BPE токенизатор из файла.
Args:
filepath: Путь к файлу
Returns:
BPETokenizer: Загруженный токенизатор
"""
import json
with open(filepath, 'r', encoding='utf-8') as f:
config = json.load(f)
tokenizer = cls()
tokenizer.vocab = config['vocab']
tokenizer.vocab_size = config['vocab_size']
tokenizer.pad_token = config['pad_token']
tokenizer.unk_token = config['unk_token']
tokenizer.bos_token = config['bos_token']
tokenizer.eos_token = config['eos_token']
tokenizer.pattern = config.get('pattern', tokenizer.pattern)
tokenizer.compiled_pattern = re.compile(tokenizer.pattern, re.UNICODE)
# Восстанавливаем мерджи
merges = config.get('merges', {})
tokenizer.merges = {}
for k, v in merges.items():
parts = k.split()
if len(parts) == 2:
tokenizer.merges[(parts[0], parts[1])] = v
# Создаем обратный словарь
tokenizer.inverse_vocab = {v: k for k, v in tokenizer.vocab.items()}
# Обновляем ID специальных токенов
tokenizer.pad_token_id = tokenizer.vocab.get(tokenizer.pad_token)
tokenizer.unk_token_id = tokenizer.vocab.get(tokenizer.unk_token)
tokenizer.bos_token_id = tokenizer.vocab.get(tokenizer.bos_token)
tokenizer.eos_token_id = tokenizer.vocab.get(tokenizer.eos_token)
return tokenizer
# Упрощенная версия для быстрого старта
class SimpleBPETokenizer(BPETokenizer):
"""
Упрощенная версия BPE токенизатора для демонстрации.
"""
def train(self, texts: List[str], vocab_size: int = 1000, **kwargs):
"""Упрощенное обучение для демонстрации."""
# Инициализация базового словаря
self._initialize_vocab()
# Добавляем базовые токены
special_tokens = [self.pad_token, self.unk_token, self.bos_token, self.eos_token]
self.add_special_tokens(special_tokens)
# Простая реализация - собираем все символы
all_chars = set()
for text in texts:
all_chars.update(text)
# Добавляем символы в словарь
for char in sorted(all_chars):
if char not in self.vocab:
self.vocab[char] = len(self.vocab)
self.inverse_vocab = {v: k for k, v in self.vocab.items()}
self.vocab_size = len(self.vocab)
# Обновляем ID специальных токенов
self.pad_token_id = self.vocab.get(self.pad_token)
self.unk_token_id = self.vocab.get(self.unk_token)
self.bos_token_id = self.vocab.get(self.bos_token)
self.eos_token_id = self.vocab.get(self.eos_token)
def encode(self, text: str, **kwargs) -> List[int]:
"""Упрощенное кодирование - разбиваем на символы."""
add_special_tokens = kwargs.get('add_special_tokens', False)
token_ids = []
for char in text:
token_id = self.vocab.get(char, self.unk_token_id)
if token_id is not None:
token_ids.append(token_id)
if add_special_tokens:
if self.bos_token_id is not None:
token_ids.insert(0, self.bos_token_id)
if self.eos_token_id is not None:
token_ids.append(self.eos_token_id)
return token_ids
def decode(self, tokens: List[int], **kwargs) -> str:
"""Упрощенное декодирование."""
skip_special_tokens = kwargs.get('skip_special_tokens', True)
chars = []
for token_id in tokens:
char = self.inverse_vocab.get(token_id, self.unk_token)
if skip_special_tokens and char in [self.pad_token, self.unk_token, self.bos_token, self.eos_token]:
continue
chars.append(char)
return ''.join(chars)

View File

@@ -0,0 +1,207 @@
"""
BPE (Byte Pair Encoding) токенизатор.
Реализация алгоритма BPE для токенизации текста.
"""
from typing import List, Dict, Tuple, Optional
from .base_tokenizer import BaseTokenizer
class BPETokenizer(BaseTokenizer):
"""
BPE токенизатор для обработки текста.
Реализует алгоритм Byte Pair Encoding для создания субсловных токенов.
Использует вашу реализацию BPE.
Примеры использования:
>>> tokenizer = BPETokenizer()
>>> tokenizer.train(["пример текста для обучения"], vocab_size=1000)
>>> tokens = tokenizer.encode("новый текст")
>>> text = tokenizer.decode(tokens)
"""
def __init__(self):
super().__init__()
self.merges: Dict[Tuple[str, str], int] = {}
self.vocab_list: List[str] = []
def train(self, texts: List[str], vocab_size: int = 1000, **kwargs):
"""
Обучение BPE токенизатора на текстах.
Args:
texts: Список текстов для обучения
vocab_size: Желаемый размер словаря
**kwargs: Дополнительные параметры
- special_tokens: Список специальных токенов
"""
# Объединяем все тексты в одну строку для обучения
combined_text = " ".join(texts)
# 1. Получаем уникальные токены (символы)
unique_tokens = sorted(set(combined_text))
tokens = unique_tokens.copy()
# 2. Разбиваем текст на токены-символы
sequence = list(combined_text)
# 3. Объединяем токены до достижения нужного размера словаря
while len(tokens) < vocab_size:
# Считаем частоты пар
pair_freq = {}
for i in range(len(sequence) - 1):
pair = (sequence[i], sequence[i + 1])
if pair not in pair_freq:
pair_freq[pair] = 0
pair_freq[pair] += 1
if not pair_freq:
break # нет пар — выходим
# Находим самую частую пару (в случае равенства — та, что встретилась первой)
most_frequent_pair = max(pair_freq.items(), key=lambda x: (x[1], -self._pair_first_index(sequence, x[0])))[0]
# Создаем новый токен
new_token = most_frequent_pair[0] + most_frequent_pair[1]
tokens.append(new_token)
i = 0
new_sequence = []
while i < len(sequence):
if i < len(sequence) - 1 and (sequence[i], sequence[i + 1]) == most_frequent_pair:
new_sequence.append(new_token)
i += 2 # пропускаем два символа — заменённую пару
else:
new_sequence.append(sequence[i])
i += 1
sequence = new_sequence
# 4. Создаем словари
self.vocab_list = tokens.copy()
self.vocab = dict(zip(tokens, range(vocab_size)))
self.inverse_vocab = dict(zip(range(vocab_size), tokens))
self.vocab_size = len(self.vocab)
# Добавляем специальные токены если указаны
special_tokens = kwargs.get('special_tokens', [self.pad_token, self.unk_token, self.bos_token, self.eos_token])
self.add_special_tokens(special_tokens)
def _pair_first_index(self, sequence, pair):
"""Находит первый индекс пары в последовательности."""
for i in range(len(sequence) - 1):
if (sequence[i], sequence[i + 1]) == pair:
return i
return float('inf') # если пара не найдена (в теории не должно случиться)
def encode(self, text: str, **kwargs) -> List[int]:
"""
Кодирует текст в последовательность токенов.
Args:
text: Входной текст
**kwargs: Дополнительные параметры
- add_special_tokens: Добавлять специальные токены
Returns:
List[int]: Список идентификаторов токенов
"""
add_special_tokens = kwargs.get('add_special_tokens', False)
# 1. Разбиваем текст на токены-символы
sequence = list(text)
# 2. Инициализация пустого списка токенов
tokens = []
# 3. Установить i = 0
i = 0
while i < len(text):
# 3.1 Найти все токены в словаре, начинающиеся с text[i]
start_char = text[i]
result = [token for token in self.vocab_list if token.startswith(start_char)]
# 3.2 Выбрать самый длинный подходящий токен
find_token = self._find_max_matching_token(text[i:], result)
if find_token is None:
# Обработка неизвестного символа
tokens.append(text[i]) # Добавляем сам символ как токен
i += 1
else:
# 3.3 Добавить токен в результат
tokens.append(find_token)
# 3.4 Увеличить i на длину токена
i += len(find_token)
# 4. Заменить токены на их ID
token_ids = self._tokens_to_ids(tokens)
# Заменяем -1 на unk_token_id
token_ids = [tid if tid != -1 else self.unk_token_id for tid in token_ids]
# Добавляем специальные токены если нужно
if add_special_tokens:
if self.bos_token_id is not None:
token_ids.insert(0, self.bos_token_id)
if self.eos_token_id is not None:
token_ids.append(self.eos_token_id)
return token_ids
def _find_max_matching_token(self, text: str, tokens: list) -> Optional[str]:
"""Находит самый длинный токен из списка, с которого начинается текст"""
matching = [token for token in tokens if text.startswith(token)]
return max(matching, key=len) if matching else None
def _tokens_to_ids(self, tokens: List[str]) -> List[int]:
"""Конвертирует список токенов в их ID с обработкой неизвестных токенов"""
ids = []
for token in tokens:
if token in self.vocab:
ids.append(self.vocab[token])
else:
ids.append(-1) # Специальное значение
return ids
def decode(self, tokens: List[int], **kwargs) -> str:
"""
Декодирует последовательность токенов в текст.
Args:
tokens: Список идентификаторов токенов
**kwargs: Дополнительные параметры
- skip_special_tokens: Пропускать специальные токены
Returns:
str: Декодированный текст
"""
skip_special_tokens = kwargs.get('skip_special_tokens', True)
# Фильтруем специальные токены если нужно
if skip_special_tokens:
tokens = [tid for tid in tokens if tid not in [
self.pad_token_id, self.unk_token_id, self.bos_token_id, self.eos_token_id
]]
# Конвертируем ID в токены
token_strings = self._ids_to_tokens(tokens)
# Объединяем токены в текст
return ''.join(token_strings)
def _ids_to_tokens(self, ids: List[int]) -> List[str]:
"""Конвертирует список Ids в их tokens"""
tokens = []
for token_id in ids:
if token_id in self.inverse_vocab:
tokens.append(self.inverse_vocab[token_id])
else:
tokens.append(self.unk_token) # Специальное значение
return tokens
class SimpleBPETokenizer(BPETokenizer):
"""
Упрощенная версия BPE токенизатора для демонстрации.
Наследует вашу реализацию, но может быть упрощена при необходимости.
"""
pass

View File

@@ -0,0 +1,2 @@
def hello() -> str:
return "Hello from llm!"

View File

@@ -0,0 +1,142 @@
import torch
from torch.utils.data import Dataset
from typing import List, Any
class TextDataset(Dataset):
"""
Простой датасет для языкового моделирования (LLM).
Работает с любым токенизатором, реализующим интерфейс BaseTokenizer.
"""
def __init__(self, texts: List[str], tokenizer: Any, block_size: int = 128):
"""
Инициализация датасета.
Args:
texts: Список текстов для обучения
tokenizer: Токенизатор с методами encode/decode
block_size: Максимальная длина последовательности
"""
self.examples = []
self.tokenizer = tokenizer
self.block_size = block_size
for text in texts:
# Кодируем текст в токены
input_ids = tokenizer.encode(text, add_special_tokens=False)
# Обрезаем или дополняем до нужной длины
if len(input_ids) > block_size:
input_ids = input_ids[:block_size]
else:
# Дополняем pad_token_id
pad_token_id = getattr(tokenizer, 'pad_token_id', 0)
input_ids = input_ids + [pad_token_id] * (block_size - len(input_ids))
self.examples.append(input_ids)
def __len__(self):
return len(self.examples)
def __getitem__(self, idx):
input_ids = torch.tensor(self.examples[idx], dtype=torch.long)
labels = input_ids.clone()
return {"input_ids": input_ids, "labels": labels}
class StreamingTextDataset(Dataset):
"""
Датасет для потоковой обработки больших текстов.
Токенизация происходит на лету, что экономит память.
"""
def __init__(self, texts: List[str], tokenizer: Any, block_size: int = 128):
self.texts = texts
self.tokenizer = tokenizer
self.block_size = block_size
# Получаем pad_token_id из токенизатора
self.pad_token_id = getattr(tokenizer, 'pad_token_id', 0)
def __len__(self):
return len(self.texts)
def __getitem__(self, idx):
text = self.texts[idx]
# Токенизация на лету
input_ids = self.tokenizer.encode(text, add_special_tokens=False)
# Обрезаем или дополняем до нужной длины
if len(input_ids) > self.block_size:
input_ids = input_ids[:self.block_size]
else:
input_ids = input_ids + [self.pad_token_id] * (self.block_size - len(input_ids))
input_ids = torch.tensor(input_ids, dtype=torch.long)
labels = input_ids.clone()
return {"input_ids": input_ids, "labels": labels}
class TextDatasetWithSpecialTokens(TextDataset):
"""
Расширенная версия TextDataset с поддержкой специальных токенов.
"""
def __init__(self, texts: List[str], tokenizer: Any, block_size: int = 128,
add_bos: bool = False, add_eos: bool = False):
"""
Args:
texts: Список текстов
tokenizer: Токенизатор
block_size: Максимальная длина
add_bos: Добавлять токен начала последовательности
add_eos: Добавлять токен конца последовательности
"""
self.examples = []
self.tokenizer = tokenizer
self.block_size = block_size
self.add_bos = add_bos
self.add_eos = add_eos
for text in texts:
# Кодируем с специальными токенами
input_ids = tokenizer.encode(
text,
add_special_tokens=True,
add_bos_token=add_bos,
add_eos_token=eos
)
# Учитываем специальные токены при обрезке/дополнении
effective_block_size = block_size
if add_bos:
effective_block_size -= 1
if add_eos:
effective_block_size -= 1
if len(input_ids) > effective_block_size:
input_ids = input_ids[:effective_block_size]
# Добавляем специальные токены если нужно
if add_bos and hasattr(tokenizer, 'bos_token_id') and tokenizer.bos_token_id is not None:
input_ids = [tokenizer.bos_token_id] + input_ids
if add_eos and hasattr(tokenizer, 'eos_token_id') and tokenizer.eos_token_id is not None:
input_ids = input_ids + [tokenizer.eos_token_id]
# Дополняем до полной длины
pad_token_id = getattr(tokenizer, 'pad_token_id', 0)
if len(input_ids) < block_size:
input_ids = input_ids + [pad_token_id] * (block_size - len(input_ids))
self.examples.append(input_ids)
def __len__(self):
return len(self.examples)
def __getitem__(self, idx):
input_ids = torch.tensor(self.examples[idx], dtype=torch.long)
labels = input_ids.clone()
return {"input_ids": input_ids, "labels": labels}

View File

@@ -0,0 +1,14 @@
import torch.optim as optim
def get_optimizer(model, lr=3e-4, weight_decay=0.01, optimizer_type="adamw"):
"""
Возвращает оптимизатор для обучения модели.
"""
if optimizer_type.lower() == "adamw":
return optim.AdamW(model.parameters(), lr=lr, weight_decay=weight_decay)
elif optimizer_type.lower() == "adam":
return optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)
elif optimizer_type.lower() == "sgd":
return optim.SGD(model.parameters(), lr=lr, momentum=0.9)
else:
raise ValueError(f"Неизвестный тип оптимизатора: {optimizer_type}")

View File

@@ -0,0 +1,13 @@
from torch.optim.lr_scheduler import LambdaLR
def get_linear_schedule_with_warmup(optimizer, num_warmup_steps, num_training_steps):
"""
Линейный планировщик обучения с warmup.
"""
def lr_lambda(current_step):
if current_step < num_warmup_steps:
return float(current_step) / float(max(1, num_warmup_steps))
return max(0.0, float(num_training_steps - current_step) / float(max(1, num_training_steps - num_warmup_steps)))
return LambdaLR(optimizer, lr_lambda)

View File

@@ -0,0 +1,90 @@
import torch
import torch.nn.functional as F
from torch.utils.data import DataLoader
from tqdm import tqdm
from llm.training.optimizer import get_optimizer
from llm.training.scheduler import get_linear_schedule_with_warmup
class Trainer:
"""
Универсальный класс обучения LLM (GPT, LLaMA, Mistral и т.д.)
"""
def __init__(self, model, train_dataset, val_dataset=None, lr=3e-4, batch_size=8, num_epochs=3, warmup_steps=100):
self.model = model
self.train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
self.val_loader = DataLoader(val_dataset, batch_size=batch_size) if val_dataset else None
self.optimizer = get_optimizer(model, lr=lr)
self.scheduler = None
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
self.model.to(self.device)
self.num_epochs = num_epochs
self.warmup_steps = warmup_steps
def compute_lm_loss(self, logits, labels):
"""
Вычисляет loss для языкового моделирования.
Сдвигает логиты и метки для предсказания следующего токена.
"""
# Сдвигаем логиты и метки для языкового моделирования
shift_logits = logits[..., :-1, :].contiguous()
shift_labels = labels[..., 1:].contiguous()
# Вычисляем cross-entropy loss
loss = F.cross_entropy(
shift_logits.view(-1, shift_logits.size(-1)),
shift_labels.view(-1),
ignore_index=-100 # Игнорируем padding tokens
)
return loss
def train(self):
total_steps = len(self.train_loader) * self.num_epochs
self.scheduler = get_linear_schedule_with_warmup(self.optimizer, self.warmup_steps, total_steps)
for epoch in range(self.num_epochs):
self.model.train()
total_loss = 0
progress_bar = tqdm(self.train_loader, desc=f"Epoch {epoch+1}/{self.num_epochs}")
for batch in progress_bar:
self.optimizer.zero_grad()
input_ids = batch["input_ids"].to(self.device)
labels = batch["labels"].to(self.device)
# Модель возвращает только логиты
logits = self.model(input_ids)
# Trainer вычисляет loss
loss = self.compute_lm_loss(logits, labels)
loss.backward()
torch.nn.utils.clip_grad_norm_(self.model.parameters(), 1.0)
self.optimizer.step()
self.scheduler.step()
total_loss += loss.item()
progress_bar.set_postfix(loss=loss.item())
avg_loss = total_loss / len(self.train_loader)
print(f"Epoch {epoch+1} finished — avg loss: {avg_loss:.4f}")
if self.val_loader:
self.evaluate()
def evaluate(self):
self.model.eval()
total_loss = 0
with torch.no_grad():
for batch in self.val_loader:
input_ids = batch["input_ids"].to(self.device)
labels = batch["labels"].to(self.device)
logits = self.model(input_ids)
loss = self.compute_lm_loss(logits, labels)
total_loss += loss.item()
avg_loss = total_loss / len(self.val_loader)
print(f"Validation loss: {avg_loss:.4f}")