Merge pull request #1 from pese-git/feature/gpt2

Feature/gpt2
This commit is contained in:
Sergey Penkovsky
2025-10-05 21:30:33 +03:00
committed by GitHub
17 changed files with 899 additions and 71 deletions

View File

@@ -6,7 +6,7 @@
Проект организован как монорепозиторий с использованием **uv** workspace:
- **`llm`** — основная библиотека с реализацией архитектур LLM
- **`llm`** — основная библиотека с реализацией архитектур LLM (GPT, GPT-2)
- **`hf-proxy`** — адаптер для интеграции с HuggingFace
- **`experiments`** — скрипты обучения и экспериментов
- **`notebooks`** — исследовательские ноутбуки
@@ -30,8 +30,9 @@ llm-arch-research/
│ │ ├── feed_forward.py
│ │ ├── token_embeddings.py
│ │ └── positional_embeddings.py
│ ├── models/gpt/ # GPT реализация
│ ├── models/gpt/ # GPT и GPT-2 реализация
│ │ ├── gpt.py
│ │ ├── gpt2.py
│ │ └── __init__.py
│ ├── training/ # утилиты обучения
│ │ ├── dataset.py
@@ -103,11 +104,11 @@ uv run python experiments/hf_integration/generate_with_hf_tools.py
### Использование в коде
```python
from llm.models.gpt import GPT
from llm.models.gpt import GPT, GPT2
from llm.tokenizers import BPETokenizer
from hf_proxy import HFAdapter, HFTokenizerAdapter
# Создание модели
# Создание GPT модели
config = {
"vocab_size": 50257,
"embed_dim": 256,
@@ -118,6 +119,17 @@ config = {
}
model = GPT(config)
# Создание GPT-2 модели (пример)
gpt2_config = {
"vocab_size": 50257,
"embed_dim": 768,
"num_heads": 12,
"num_layers": 12,
"max_position_embeddings": 1024,
"dropout": 0.1
}
gpt2_model = GPT2(gpt2_config)
# Генерация текста
generated = model.generate(
input_ids,
@@ -190,12 +202,13 @@ dependencies = [
## 🎯 Реализованные возможности
### Архитектура GPT
### Архитектуры GPT и GPT-2
- ✅ Токенные и позиционные эмбеддинги
- ✅ Многоголовое внимание с causal mask
- ✅ Декодерные блоки с residual connections
- ✅ Layer normalization
- ✅ Dropout регуляризация
- ✅ Отдельные реализации GPT и GPT-2 (различия в масштабе и деталях архитектуры)
### Генерация текста
- ✅ Жадный поиск (greedy decoding)

View File

@@ -0,0 +1,313 @@
#!/usr/bin/env python3
"""
Experiment: generate_gpt_bpe.py
Description: Генерация текста обученной GPT моделью с BPE токенизатором.
Использует только библиотеку llm без зависимостей от HuggingFace.
"""
import torch
import os
import sys
# Добавляем путь к shared модулям
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from llm.models.gpt import GPT2
from llm.tokenizers import BPETokenizer
from shared.configs import (
BASE_GPT_CONFIG, TEST_PROMPTS, GENERATION_CONFIG, PATHS
)
from shared.data import (
print_experiment_info, ensure_directories, ExperimentLogger
)
def load_model_and_tokenizer() -> tuple:
"""
Загружает обученную модель и токенизатор.
Returns:
tuple: (модель, токенизатор, конфигурация)
"""
# Проверяем существование файлов
if not os.path.exists(PATHS["gpt_bpe_model"]):
raise FileNotFoundError(
f"Модель не найдена: {PATHS['gpt_bpe_model']}\n"
f"Сначала обучите модель: uv run python experiments/llm_only/train_gpt_bpe.py"
)
if not os.path.exists(PATHS["bpe_tokenizer"]):
raise FileNotFoundError(
f"Токенизатор не найден: {PATHS['bpe_tokenizer']}"
)
# Загружаем конфигурацию модели
import json
with open(PATHS["gpt_bpe_config"], 'r', encoding='utf-8') as f:
model_config = json.load(f)
# Загружаем токенизатор
print("🔧 Загрузка BPE токенизатора...")
tokenizer = BPETokenizer.load(PATHS["bpe_tokenizer"])
print(f"✅ Токенизатор загружен (vocab_size={tokenizer.get_vocab_size()})")
# Загружаем модель
print("🔧 Загрузка GPT2 модели...")
model = GPT2(model_config)
model.load_state_dict(torch.load(PATHS["gpt_bpe_model"], map_location='cpu'))
model.eval()
print("✅ Модель загружена")
return model, tokenizer, model_config
def generate_text(
model: GPT2,
tokenizer: BPETokenizer,
prompt: str,
config: dict
) -> str:
"""
Генерирует текст на основе промпта.
Args:
model: Обученная GPT модель
tokenizer: BPE токенизатор
prompt: Входной текст
config: Конфигурация генерации
Returns:
str: Сгенерированный текст
"""
print(f"🔤 Промпт: '{prompt}'")
print(f"📊 Параметры: max_tokens={config['max_new_tokens']}, "
f"temp={config['temperature']}, sample={config['do_sample']}")
# Кодируем промпт
input_ids = tokenizer.encode(prompt, add_special_tokens=False)
input_tensor = torch.tensor([input_ids], dtype=torch.long)
print(f"🎯 Токены промпта: {input_ids}")
print(f"🎯 Токены (текст): {tokenizer.tokenize(prompt)}")
print("🔄 Генерация...")
# Генерируем текст
with torch.no_grad():
generated_ids = model.generate(
x=input_tensor,
max_new_tokens=config["max_new_tokens"],
do_sample=config["do_sample"],
temperature=config["temperature"],
top_k=config["top_k"],
top_p=config["top_p"]
)
# Декодируем результат
generated_text = tokenizer.decode(generated_ids[0].tolist())
return generated_text
def test_different_strategies(model: GPT2, tokenizer: BPETokenizer, prompt: str):
"""
Тестирует разные стратегии генерации на одном промпте.
Args:
model: Обученная модель
tokenizer: BPE токенизатор
prompt: Тестовый промпт
"""
print(f"\n🎭 Сравнение стратегий генерации для промпта: '{prompt}'")
print("=" * 60)
strategies = [
{"name": "🎯 Жадный поиск", "do_sample": False, "temperature": 1.0},
{"name": "🎲 Вероятностная (temp=0.7)", "do_sample": True, "temperature": 0.7},
{"name": "🔥 Случайная (temp=1.2)", "do_sample": True, "temperature": 1.2},
{"name": "❄️ Детерминированная (temp=0.3)", "do_sample": True, "temperature": 0.3},
]
for strategy in strategies:
print(f"\n{strategy['name']}:")
try:
config = GENERATION_CONFIG.copy()
config.update({
"do_sample": strategy["do_sample"],
"temperature": strategy["temperature"],
"max_new_tokens": 20
})
generated = generate_text(model, tokenizer, prompt, config)
# Выделяем сгенерированную часть
generated_part = generated[len(prompt):]
print(f" 📤 Промпт: '{prompt}'")
print(f" 🎯 Сгенерировано: '{generated_part}'")
print(f" 📄 Полный текст: '{generated}'")
except Exception as e:
print(f" ❌ Ошибка: {e}")
def analyze_tokenization(tokenizer: BPETokenizer, texts: list):
"""
Анализирует токенизацию различных текстов.
Args:
tokenizer: BPE токенизатор
texts: Список текстов для анализа
"""
print(f"\n🔍 Анализ токенизации BPE:")
print("=" * 50)
for i, text in enumerate(texts):
print(f"\nТекст {i+1}: '{text}'")
# Токенизация
tokens = tokenizer.encode(text, add_special_tokens=False)
token_strings = tokenizer.tokenize(text)
print(f" Токены (ID): {tokens}")
print(f" Токены (текст): {token_strings}")
print(f" Количество токенов: {len(tokens)}")
print(f" Эффективность: {len(text)} символов → {len(tokens)} токенов")
# Декодирование обратно
decoded = tokenizer.decode(tokens)
if text == decoded:
print(f" ✅ Декодирование корректно")
else:
print(f" ⚠️ Расхождения: '{decoded}'")
def interactive_generation(model: GPT2, tokenizer: BPETokenizer):
"""
Режим интерактивной генерации.
Args:
model: Обученная модель
tokenizer: BPE токенизатор
"""
print(f"\n💬 Интерактивная генерация (для выхода введите 'exit')")
print("-" * 50)
while True:
try:
user_input = input("\n🔤 Введите промпт: ").strip()
if user_input.lower() in ['exit', 'quit', 'выход']:
break
if not user_input:
continue
# Запрашиваем параметры
try:
max_tokens = int(input("📏 Макс. токенов [50]: ") or "50")
temperature = float(input("🌡️ Температура [0.7]: ") or "0.7")
do_sample_input = input("🎲 Сэмплирование (y/n) [y]: ").lower()
do_sample = do_sample_input != 'n'
except:
max_tokens = 50
temperature = 0.7
do_sample = True
print("⚠️ Использую параметры по умолчанию")
config = GENERATION_CONFIG.copy()
config.update({
"max_new_tokens": max_tokens,
"temperature": temperature,
"do_sample": do_sample
})
generated = generate_text(model, tokenizer, user_input, config)
generated_part = generated[len(user_input):]
print(f"\n🎯 Результат:")
print(f" 📤 Промпт: '{user_input}'")
print(f" 🎯 Сгенерировано: '{generated_part}'")
print(f" 📄 Полный текст: '{generated}'")
except KeyboardInterrupt:
print("\n👋 Завершение работы...")
break
except Exception as e:
print(f"❌ Ошибка: {e}")
def main():
"""Основная функция эксперимента."""
# === Настройка эксперимента ===
experiment_name = "Генерация текста GPT2 + BPE (только llm)"
experiment_config = {
"model": "GPT2 с BPE токенизатором",
"стратегия": "автономная генерация",
"вход": "промпты",
"выход": "сгенерированный текст"
}
print_experiment_info(experiment_name, experiment_config)
ensure_directories()
logger = ExperimentLogger(experiment_name)
try:
# Загружаем модель и токенизатор
model, tokenizer, model_config = load_model_and_tokenizer()
# === Анализ токенизации ===
analysis_texts = [
"Искусственный интеллект",
"Нейронные сети",
"Машинное обучение",
]
analyze_tokenization(tokenizer, analysis_texts)
# === Генерация с разными промптами ===
print(f"\n🎯 Генерация текста с разными промптами")
print("=" * 60)
for i, prompt in enumerate(TEST_PROMPTS):
print(f"\n📝 Пример {i+1}/{len(TEST_PROMPTS)}")
print("-" * 40)
try:
generated = generate_text(model, tokenizer, prompt, GENERATION_CONFIG)
# Выделяем сгенерированную часть
generated_part = generated[len(prompt):]
print(f"📤 Промпт: '{prompt}'")
print(f"🎯 Сгенерировано: '{generated_part}'")
print(f"📄 Полный текст: '{generated}'")
print(f"📏 Длина: {len(generated)} символов")
# Логируем успешную генерацию
logger.log_metric(f"generation_length_{i}", len(generated))
except Exception as e:
print(f"❌ Ошибка при генерации: {e}")
continue
# === Сравнение стратегий генерации ===
test_prompt = "Искусственный"
test_different_strategies(model, tokenizer, test_prompt)
# === Интерактивная генерация ===
interactive_generation(model, tokenizer)
# === Сохранение результатов ===
logger.save_logs("checkpoints/llm_only_generation_logs.json")
print(f"\n🎉 Эксперимент генерации завершен успешно!")
except FileNotFoundError as e:
print(f"{e}")
except Exception as e:
print(f"❌ Ошибка в эксперименте: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,231 @@
#!/usr/bin/env python3
"""
Experiment: train_gpt_bpe.py
Description: Обучение GPT модели с собственным BPE токенизатором.
Использует только библиотеку llm без зависимостей от HuggingFace.
"""
import torch
import os
import sys
# Добавляем путь к shared модулям
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from llm.models.gpt import GPT2
from llm.tokenizers import BPETokenizer
from llm.training.dataset import TextDataset
from llm.training.trainer import Trainer
from shared.configs import (
TRAIN_TEXTS, BASE_GPT_CONFIG, BPE_CONFIG,
TRAINING_CONFIG, PATHS, TEST_PROMPTS
)
from shared.data import (
load_training_data, ensure_directories,
print_experiment_info, ExperimentLogger
)
def train_bpe_tokenizer(texts: list, config: dict) -> BPETokenizer:
"""
Обучает BPE токенизатор на текстах.
Args:
texts: Список текстов для обучения
config: Конфигурация токенизатора
Returns:
BPETokenizer: Обученный токенизатор
"""
print("🔧 Обучение BPE токенизатора...")
tokenizer = BPETokenizer()
tokenizer.train(
texts=texts,
vocab_size=config["vocab_size"],
special_tokens=config["special_tokens"]
)
# Сохраняем токенизатор
os.makedirs(os.path.dirname(PATHS["bpe_tokenizer"]), exist_ok=True)
tokenizer.save(PATHS["bpe_tokenizer"])
print(f"✅ BPE токенизатор обучен и сохранен: {PATHS['bpe_tokenizer']}")
print(f"📊 Размер словаря: {tokenizer.get_vocab_size()}")
return tokenizer
def test_tokenizer(tokenizer: BPETokenizer, texts: list):
"""
Тестирует токенизатор на примерах.
Args:
tokenizer: Обученный токенизатор
texts: Список тестовых текстов
"""
print("\n🧪 Тестирование токенизатора:")
for i, text in enumerate(texts[:3]):
print(f"\nПример {i+1}:")
print(f" Исходный текст: '{text}'")
# Кодирование
tokens = tokenizer.encode(text)
token_strings = tokenizer.tokenize(text)
print(f" Токены (ID): {tokens}")
print(f" Токены (текст): {token_strings}")
print(f" Количество токенов: {len(tokens)}")
# Декодирование
decoded = tokenizer.decode(tokens)
print(f" Декодированный: '{decoded}'")
if text == decoded:
print(" ✅ Кодирование/декодирование корректно")
else:
print(" ⚠️ Небольшие расхождения")
def main():
"""Основная функция эксперимента."""
# === Настройка эксперимента ===
experiment_name = "Обучение GPT2 с BPE токенизатором (только llm)"
experiment_config = {
"model": "GPT2",
"tokenizer": "BPE",
"vocab_size": BPE_CONFIG["vocab_size"],
"training_epochs": TRAINING_CONFIG["num_epochs"],
"batch_size": TRAINING_CONFIG["batch_size"],
"learning_rate": TRAINING_CONFIG["learning_rate"]
}
print_experiment_info(experiment_name, experiment_config)
ensure_directories()
logger = ExperimentLogger(experiment_name)
try:
# === Подготовка данных ===
train_texts, val_texts = load_training_data()
print(f"📊 Данные: {len(train_texts)} train, {len(val_texts)} validation")
# === Обучение токенизатора ===
if os.path.exists(PATHS["bpe_tokenizer"]):
print("📝 Загрузка предварительно обученного токенизатора...")
tokenizer = BPETokenizer.load(PATHS["bpe_tokenizer"])
print(f"✅ Токенизатор загружен (vocab_size={tokenizer.get_vocab_size()})")
else:
tokenizer = train_bpe_tokenizer(TRAIN_TEXTS, BPE_CONFIG)
# Тестируем токенизатор
test_tokenizer(tokenizer, TEST_PROMPTS[:3])
# === Инициализация модели ===
model_config = BASE_GPT_CONFIG.copy()
model_config["vocab_size"] = tokenizer.get_vocab_size()
print(f"\n🔧 Инициализация GPT2 модели...")
print(f" Размер словаря: {model_config['vocab_size']}")
print(f" Размер эмбеддингов: {model_config['embed_dim']}")
print(f" Количество слоев: {model_config['num_layers']}")
print(f" Количество голов внимания: {model_config['num_heads']}")
model = GPT2(model_config)
# === Подготовка датасета ===
print(f"\n📊 Подготовка датасета...")
train_dataset = TextDataset(
train_texts,
tokenizer,
block_size=model_config["max_position_embeddings"]
)
print(f" Размер train датасета: {len(train_dataset)} примеров")
# === Обучение модели ===
print(f"\n🎯 Начало обучения GPT2 модели...")
trainer = Trainer(
model=model,
train_dataset=train_dataset,
lr=TRAINING_CONFIG["learning_rate"],
batch_size=TRAINING_CONFIG["batch_size"],
num_epochs=TRAINING_CONFIG["num_epochs"],
warmup_steps=TRAINING_CONFIG["warmup_steps"]
)
# Запускаем обучение
trainer.train()
# === Сохранение модели ===
print(f"\n💾 Сохранение модели...")
os.makedirs(os.path.dirname(PATHS["gpt_bpe_model"]), exist_ok=True)
# Сохраняем модель
torch.save(model.state_dict(), PATHS["gpt_bpe_model"])
# Сохраняем конфигурацию
import json
with open(PATHS["gpt_bpe_config"], 'w', encoding='utf-8') as f:
json.dump(model_config, f, indent=2, ensure_ascii=False)
print(f"✅ Модель сохранена:")
print(f" - {PATHS['gpt_bpe_model']}: веса модели")
print(f" - {PATHS['gpt_bpe_config']}: конфигурация модели")
print(f" - {PATHS['bpe_tokenizer']}: токенизатор")
# === Тестирование генерации ===
print(f"\n🧪 Тестирование генерации текста...")
model.eval()
for prompt in TEST_PROMPTS[:3]:
print(f"\n🔤 Промпт: '{prompt}'")
try:
# Кодируем промпт
input_ids = tokenizer.encode(prompt, add_special_tokens=False)
input_tensor = torch.tensor([input_ids], dtype=torch.long)
# Генерируем текст
with torch.no_grad():
generated_ids = model.generate(
x=input_tensor,
max_new_tokens=20,
do_sample=True,
temperature=0.8
)
# Декодируем результат
generated_text = tokenizer.decode(generated_ids[0].tolist())
generated_part = generated_text[len(prompt):]
print(f"🎯 Сгенерировано: '{generated_part}'")
print(f"📄 Полный текст: '{generated_text}'")
except Exception as e:
print(f"❌ Ошибка генерации: {e}")
# === Сохранение результатов ===
results = {
"experiment": experiment_name,
"model_config": model_config,
"training_config": TRAINING_CONFIG,
"tokenizer_vocab_size": tokenizer.get_vocab_size(),
"final_loss": "см. логи обучения" # В реальном эксперименте можно сохранить final loss
}
logger.save_logs("checkpoints/llm_only_training_logs.json")
print(f"\n🎉 Эксперимент завершен успешно!")
print(f"\n💡 Для использования обученной модели:")
print(f" uv run python experiments/llm_only/generate_gpt_bpe.py")
except Exception as e:
print(f"❌ Ошибка в эксперименте: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
main()

View File

@@ -99,7 +99,11 @@ class HFGPTAdapter(PreTrainedModel):
return_dict = return_dict if return_dict is not None else self.config.use_return_dict
# Основной forward pass
logits = self.llm_model(input_ids)
outputs = self.llm_model(input_ids)
if isinstance(outputs, tuple):
logits = outputs[0]
else:
logits = outputs
loss = None
if labels is not None:

View File

@@ -56,10 +56,24 @@ class HFTokenizerAdapter:
add_special_tokens = kwargs.get('add_special_tokens', True)
# Кодируем текст
#input_ids = self.llm_tokenizer.encode(
# text,
# add_special_tokens=add_special_tokens
#)
if isinstance(text, str):
input_ids = self.llm_tokenizer.encode(
text,
add_special_tokens=add_special_tokens
)
input_ids = [input_ids] # <-- оборачиваем в batch
else:
# Список строк, батч-режим!
input_ids = [
self.llm_tokenizer.encode(
t,
add_special_tokens=add_special_tokens
) for t in text
]
# Применяем truncation
if truncation and max_length is not None and len(input_ids) > max_length:

View File

@@ -0,0 +1,61 @@
# llm/src/llm/core/cached_decoder.py
import torch
from torch import nn
from .feed_forward import FeedForward
from .multi_head_attention import MultiHeadAttention
class CachedDecoder(nn.Module):
"""
Универсальный декодер с поддержкой кэша для autoregressive использования (GPT, LLAMA и пр).
- Поддерживает использование past_key_values для быстрого генеративного инференса.
"""
def __init__(
self,
num_heads: int,
emb_size: int,
head_size: int,
max_seq_len: int,
dropout: float = 0.1,
activation: str = "gelu",
):
super().__init__()
self._heads = MultiHeadAttention(
num_heads=num_heads,
emb_size=emb_size,
head_size=head_size,
max_seq_len=max_seq_len,
dropout=dropout,
)
self._ff = FeedForward(emb_size=emb_size, dropout=dropout, activation=activation)
self._norm1 = nn.LayerNorm(emb_size)
self._norm2 = nn.LayerNorm(emb_size)
def forward(
self,
x: torch.Tensor,
mask: torch.Tensor = None,
use_cache: bool = True,
cache: list = None,
):
"""
x: [batch, seq_len, emb_size]
mask: (optional)
use_cache: использовать ли кэширование KV-слоев (инкрементальный генератив, GPT-style)
cache: список кэшей для голов (или None)
Возвращает: (output, new_cache) если use_cache=True, иначе (output, None)
"""
norm1_out = self._norm1(x)
# Передаём все cache/use_cache дальше в attention
attention, kv_caches = self._heads(
norm1_out, mask=mask, use_cache=use_cache, cache=cache
)
out = attention + x
norm2_out = self._norm2(out)
ffn_out = self._ff(norm2_out)
result = ffn_out + out
if use_cache:
return (result, kv_caches)
else:
return (result, None)

View File

@@ -88,7 +88,7 @@ class Decoder(nn.Module):
4. Добавляем residual connection и LayerNorm
"""
# Self-Attention блок
attention = self._heads(x, mask)
attention, _ = self._heads(x, mask, use_cache=False, cache=None)
out = self._norm1(attention + x)
# FeedForward блок

View File

@@ -1,5 +1,16 @@
from torch import nn
import torch
import math
class GELU(nn.Module):
def __init__(self):
super().__init__()
self.sqrt_2_over_pi = torch.sqrt(torch.tensor(2.0) / math.pi)
def forward(self, x: torch.Tensor) -> torch.Tensor:
return 0.5 * x * (1 + torch.tanh(
self.sqrt_2_over_pi * (x + 0.044715 * torch.pow(x, 3))
))
class FeedForward(nn.Module):
"""
@@ -37,7 +48,7 @@ class FeedForward(nn.Module):
>>> output_double = ff(x_double)
>>> print(output_double.dtype) # torch.float64
"""
def __init__(self, emb_size: int, dropout: float = 0.1):
def __init__(self, emb_size: int, dropout: float = 0.1, activation: str = "relu"):
"""
Инициализация слоя Feed Forward Network.
@@ -49,7 +60,14 @@ class FeedForward(nn.Module):
# Первый линейный слой (расширение размерности)
self._layer1 = nn.Linear(emb_size, emb_size * 4)
# ReLU активация
self._relu = nn.ReLU()
if activation == "relu":
self._activation = nn.ReLU()
elif activation == "gelu":
self._activation = nn.GELU()
elif activation == "gelu_exact":
self._activation = GELU()
else:
raise ValueError(f"Unknown activation: {activation}")
# Второй линейный слой (сжатие обратно)
self._layer2 = nn.Linear(emb_size * 4, emb_size)
# Dropout
@@ -75,6 +93,6 @@ class FeedForward(nn.Module):
# Пропустим тензор x по очереди через все созданные слои
x = self._layer1(x)
x = self._relu(x)
x = self._activation(x)
x = self._layer2(x)
return self._dropout(x)

View File

@@ -45,7 +45,7 @@ class HeadAttention(nn.Module):
mask = torch.tril(torch.ones(max_seq_len, max_seq_len))
self.register_buffer('_tril_mask', mask.bool() if hasattr(torch, 'bool') else mask.byte())
def forward(self, x: torch.Tensor) -> torch.Tensor:
def forward(self, x: torch.Tensor, use_cache: bool = True, cache: tuple = None) -> tuple:
"""
Прямой проход через слой внимания.
@@ -69,16 +69,24 @@ class HeadAttention(nn.Module):
if seq_len > self._max_seq_len:
raise ValueError(f"Длина последовательности {seq_len} превышает максимум {self._max_seq_len}")
# 1. Линейные преобразования
k = self._k(x) # [B, T, hs]
q = self._q(x) # [B, T, hs]
v = self._v(x) # [B, T, hs]
if cache is not None:
k_cache, v_cache = cache
k = torch.cat([k_cache, k], dim=1) # [B, cache_len + T, hs]
v = torch.cat([v_cache, v], dim=1) # [B, cache_len + T, hs]
# 2. Вычисление scores
scores = q @ k.transpose(-2, -1) / sqrt(self._head_size)
# 3. Применение causal маски
if cache is None:
scores = scores.masked_fill(~self._tril_mask[:seq_len, :seq_len], float('-inf'))
# 4. Softmax и умножение на V
weights = F.softmax(scores, dim=-1)
return weights @ self._v(x)
x_out = weights @ v # [B, T, hs]
if use_cache is True:
return (x_out, (k, v))
else:
return (x_out, None)

View File

@@ -58,7 +58,7 @@ class MultiHeadAttention(nn.Module):
self._layer = nn.Linear(head_size * num_heads, emb_size)
self._dropout = nn.Dropout(dropout)
def forward(self, x: torch.Tensor, mask: torch.Tensor = None):
def forward(self, x: torch.Tensor, mask: torch.Tensor = None, use_cache: bool = True, cache: list = None):
"""
Прямой проход через слой многоголового внимания.
@@ -90,7 +90,15 @@ class MultiHeadAttention(nn.Module):
-> Dropout: [4, 100, 512]
"""
# 1. Вычисляем attention для каждой головы
attention_outputs = [head(x) for head in self._heads]
attention_results = []
for i, head in enumerate(self._heads):
head_cache = cache[i] if cache is not None else None
result = head(x, use_cache=use_cache, cache=head_cache)
attention_results.append(result)
outputs, caches = zip(*attention_results)
attention_outputs = list(outputs)
kv_caches = list(caches)
# 2. Объединяем результаты всех голов
concatenated_attention = torch.cat(attention_outputs, dim=-1)
@@ -101,4 +109,7 @@ class MultiHeadAttention(nn.Module):
# 4. Применяем dropout для регуляризации
final_output = self._dropout(projected_output)
return final_output
if use_cache is True:
return (final_output, kv_caches)
else:
return (final_output, None)

View File

@@ -40,7 +40,7 @@ class PositionalEmbeddings(nn.Module):
embedding_dim=emb_size
)
def forward(self, seq_len: int) -> Tensor:
def forward(self, seq_len: int, start_pos: int = 0) -> Tensor:
"""
Возвращает позиционные эмбеддинги для заданной длины последовательности.
@@ -59,32 +59,8 @@ class PositionalEmbeddings(nn.Module):
"""
if seq_len < 1 or seq_len > self.max_seq_len:
raise IndexError(f"Длина {seq_len} должна быть от 1 до {self.max_seq_len}")
if start_pos == 0:
positions = torch.arange(seq_len, device=self.embedding.weight.device)
else:
positions = torch.arange(start=start_pos, end=start_pos + seq_len, device=self.embedding.weight.device)
return self.embedding(positions)
if __name__ == "__main__":
# Демонстрация работы
print("Пример использования PositionalEmbeddings:")
pos_emb = PositionalEmbeddings(max_seq_len=50, emb_size=128)
# Пример 1: Базовое использование
print("\n1. Базовый пример:")
emb = pos_emb(10)
print(f"Форма выходного тензора: {emb.shape}")
print(f"Среднее значение: {emb.mean().item():.4f}")
# Пример 2: Интеграция с моделью
print("\n2. Пример интеграции с моделью:")
class DemoModel(nn.Module):
def __init__(self):
super().__init__()
self.pos_emb = PositionalEmbeddings(50, 128)
def forward(self, x):
pos = self.pos_emb(x.size(1))
return x + pos # Добавляем позиционную информацию
model = DemoModel()
input_tensor = torch.randn(2, 10, 128) # [batch, seq, features]
output = model(input_tensor)
print(f"Вход: {input_tensor.shape}, Выход: {output.shape}")

View File

@@ -1,3 +1,4 @@
from .gpt import GPT
from .gpt2 import GPT2
__all__ = ["GPT"]
__all__ = ["GPT", "GPT2"]

View File

@@ -0,0 +1,170 @@
import torch
from torch import nn, Tensor
import torch.nn.functional as F
from llm.core.base_model import BaseModel
from llm.core.token_embeddings import TokenEmbeddings
from llm.core.positional_embeddings import PositionalEmbeddings
from llm.core.cached_decoder import CachedDecoder
class GPT2(BaseModel):
def __init__(self, config):
super().__init__(config)
# Инициализация слоев
self._max_seq_len = config["max_position_embeddings"]
self._token_embeddings = TokenEmbeddings(
vocab_size=config["vocab_size"],
emb_size=config["embed_dim"]
)
self._position_embeddings = PositionalEmbeddings(
max_seq_len=config["max_position_embeddings"],
emb_size=config["embed_dim"]
)
self._dropout = nn.Dropout(config["dropout"])
# head_size = emb_size // num_heads
self._decoders = nn.ModuleList([CachedDecoder(
num_heads=config["num_heads"],
emb_size=config["embed_dim"],
head_size=config["embed_dim"] // config["num_heads"],
max_seq_len=config["max_position_embeddings"],
dropout=config["dropout"]
) for _ in range(config["num_layers"])])
self._norm = nn.LayerNorm(config["embed_dim"])
self._linear = nn.Linear(config["embed_dim"], config["vocab_size"])
def forward(self, x: torch.Tensor, use_cache: bool = True, cache: list = None) -> tuple:
# Проверка длины последовательности (только при отсутствии кэша)
if cache is None and x.size(1) > self._max_seq_len:
raise ValueError(f"Длина последовательности {x.size(1)} превышает максимальную {self.max_seq_len}")
# Вычисление start_pos из кэша (если кэш передан)
if cache is not None:
# При кэше обрабатываем только один токен (последний)
seq_len = 1
# Вычисляем start_pos из самого нижнего уровня кэша
if cache and cache[0] and cache[0][0]:
key_cache, _ = cache[0][0] # Первый декодер, первая голова
start_pos = key_cache.size(1) # cache_len
else:
start_pos = 0
else:
# Без кэша работаем как раньше
start_pos = 0
seq_len = x.size(1)
# Эмбеддинги токенов и позиций
tok_out = self._token_embeddings(x) # [batch, seq_len, emb_size]
pos_out = self._position_embeddings(seq_len, start_pos=start_pos) # [seq_len, emb_size]
# Комбинирование
out = self._dropout(tok_out + pos_out.unsqueeze(0)) # [batch, seq_len, emb_size]
# Стек декодеров с передачей кэша
new_cache = []
for i, decoder in enumerate(self._decoders):
decoder_cache = cache[i] if cache is not None else None
decoder_result = decoder(out, use_cache=use_cache, cache=decoder_cache)
# Извлекаем результат из кортежа
if use_cache:
out, decoder_new_cache = decoder_result
new_cache.append(decoder_new_cache)
else:
out = decoder_result[0]
out = self._norm(out)
logits = self._linear(out)
# Возвращаем результат с учетом use_cache
if use_cache:
return (logits, new_cache)
else:
return (logits, None)
def generate(self,
x: torch.Tensor,
max_new_tokens: int,
do_sample: bool,
temperature: float = 1.0,
top_k: int = None,
top_p: float = None,
use_cache: bool = True
) -> torch.Tensor:
cache = None
for _ in range(max_new_tokens):
if use_cache and cache is not None:
# Используем кэш - передаем только последний токен
x_input = x[:, -1:] # [batch_size, 1]
else:
# Первая итерация или кэш отключен - передаем всю последовательность
x_input = x
# Прямой проход с кэшем
logits, new_cache = self.forward(x_input, use_cache=use_cache, cache=cache)
# Обновляем кэш для следующей итерации
if use_cache:
cache = new_cache
last_logits = logits[:, -1, :] # [batch_size, vocab_size]
# Масштабируем логиты температурой
if temperature > 0:
logits_scaled = last_logits / temperature
else:
logits_scaled = last_logits
if do_sample == True and top_k != None:
_, topk_indices = torch.topk(logits_scaled, top_k, dim=-1)
# # Заменим все НЕ top-k логиты на -inf
masked_logits = logits_scaled.clone()
vocab_size = logits_scaled.size(-1)
# создаём маску: 1, если токен НЕ в topk_indices
mask = torch.ones_like(logits_scaled, dtype=torch.uint8)
mask.scatter_(1, topk_indices, 0) # 0 там, где top-k индексы
masked_logits[mask.byte()] = float('-inf')
logits_scaled = masked_logits
if do_sample == True and top_p != None:
# 1. Применим softmax, чтобы получить вероятности:
probs = F.softmax(logits_scaled, dim=-1) # [B, vocab_size]
# 2. Отсортируем токены по убыванию вероятностей:
sorted_probs, sorted_indices = torch.sort(probs, descending=True, dim=-1)
# 3. Посчитаем кумулятивную сумму вероятностей:
cum_probs = torch.cumsum(sorted_probs, dim=-1) # [B, vocab_size]
# 4. Определим маску: оставить токены, пока сумма < top_p
sorted_mask = (cum_probs <= top_p).byte() # [B, vocab_size]
# Гарантируем, что хотя бы первый токен останется
sorted_mask[:, 0] = 1
# 5. Преобразуем маску обратно в оригинальный порядок:
# Создаём полную маску из 0
mask = torch.zeros_like(probs, dtype=torch.uint8)
# Устанавливаем 1 в местах нужных токенов
mask.scatter_(dim=1, index=sorted_indices, src=sorted_mask)
# 6. Зануляем логиты токенов вне топ-p:
logits_scaled[~mask] = float('-inf')
# 4. Применяем Softmax
probs = F.softmax(logits_scaled, dim=-1) # [batch_size, vocab_size]
if do_sample == True:
# 5. Если do_sample равен True, то отбираем токен случайно с помощью torch.multinomial
next_token = torch.multinomial(probs, num_samples=1) # [batch_size, 1]
else:
# 5. Если do_sample равен False, то выбираем токен с максимальной вероятностью
next_token = torch.argmax(probs, dim=-1, keepdim=True) # [batch_size, 1]
# 6. Добавляем его к последовательности
x = torch.cat([x, next_token], dim=1) # [batch_size, seq_len+1]
return x
@property
def max_seq_len(self) -> int:
return self._max_seq_len

View File

@@ -53,8 +53,12 @@ class Trainer:
input_ids = batch["input_ids"].to(self.device)
labels = batch["labels"].to(self.device)
# Модель возвращает только логиты
logits = self.model(input_ids)
# Универсально обрабатываем выход (tuple/logits)
outputs = self.model(input_ids)
if isinstance(outputs, tuple):
logits = outputs[0]
else:
logits = outputs
# Trainer вычисляет loss
loss = self.compute_lm_loss(logits, labels)
@@ -82,7 +86,11 @@ class Trainer:
input_ids = batch["input_ids"].to(self.device)
labels = batch["labels"].to(self.device)
logits = self.model(input_ids)
outputs = self.model(input_ids)
if isinstance(outputs, tuple):
logits = outputs[0]
else:
logits = outputs
loss = self.compute_lm_loss(logits, labels)
total_loss += loss.item()

View File

@@ -19,7 +19,7 @@ class TestFeedForward:
# Check internal layers
assert hasattr(ff, '_layer1')
assert hasattr(ff, '_layer2')
assert hasattr(ff, '_relu')
assert hasattr(ff, '_activation')
assert hasattr(ff, '_dropout')
# Check layer dimensions
@@ -78,7 +78,7 @@ class TestFeedForward:
# Manually compute expected output without dropout for deterministic comparison
hidden = ff._layer1(random_float_inputs)
activated = ff._relu(hidden)
activated = ff._activation(hidden)
expected_output = ff._layer2(activated)
# Compare with forward pass in eval mode (no dropout)

View File

@@ -27,7 +27,7 @@ class TestMultiHeadAttention:
attention = MultiHeadAttention(num_heads, embed_dim, head_size, max_seq_len=1024)
# Forward pass
output = attention(random_embeddings)
output, _ = attention(random_embeddings)
# Check output shape
assert output.shape == random_embeddings.shape
@@ -43,7 +43,7 @@ class TestMultiHeadAttention:
mask = torch.tril(torch.ones(seq_len, seq_len)) # Causal mask
# Forward pass with mask
output = attention(random_embeddings, mask=mask)
output, _ = attention(random_embeddings, mask=mask)
# Check output shape
assert output.shape == random_embeddings.shape
@@ -58,7 +58,7 @@ class TestMultiHeadAttention:
causal_mask = torch.tril(torch.ones(seq_len, seq_len))
# Forward pass with causal mask
output = attention(random_embeddings, mask=causal_mask)
output, _ = attention(random_embeddings, mask=causal_mask)
# Check output shape
assert output.shape == random_embeddings.shape
@@ -69,7 +69,7 @@ class TestMultiHeadAttention:
attention = MultiHeadAttention(num_heads, embed_dim, head_size, max_seq_len=1024)
# Forward pass
output = attention(random_embeddings)
output, _ = attention(random_embeddings)
# Check output shape
assert output.shape == random_embeddings.shape
@@ -80,7 +80,7 @@ class TestMultiHeadAttention:
attention = MultiHeadAttention(num_heads, embed_dim, head_size, max_seq_len=1024)
# Forward pass
output = attention(random_embeddings)
output, _ = attention(random_embeddings)
# Create a dummy loss and backward pass
loss = output.sum()
@@ -98,7 +98,7 @@ class TestMultiHeadAttention:
inputs = random_embeddings.to(device)
# Forward pass
output = attention(inputs)
output, _ = attention(inputs)
# Check device consistency
assert output.device == device
@@ -119,7 +119,7 @@ class TestMultiHeadAttention:
batch_size, seq_len = 2, 16
inputs = torch.randn(batch_size, seq_len, embed_dim)
output = attention(inputs)
output, _ = attention(inputs)
assert output.shape == inputs.shape
@@ -128,7 +128,7 @@ class TestMultiHeadAttention:
head_size = embed_dim // num_heads
attention = MultiHeadAttention(num_heads, embed_dim, head_size, max_seq_len=1024)
output = attention(random_embeddings)
output, _ = attention(random_embeddings)
# Output shouldn't have extreme values
assert output.abs().max() < 100 # Reasonable upper bound
@@ -140,7 +140,7 @@ class TestMultiHeadAttention:
attention = MultiHeadAttention(num_heads, embed_dim, head_size, max_seq_len=1024)
inputs = torch.randn(batch_size, seq_len, embed_dim)
output = attention(inputs)
output, _ = attention(inputs)
assert output.shape == (batch_size, seq_len, embed_dim)
@@ -158,8 +158,8 @@ class TestMultiHeadAttention:
attention.eval()
with torch.no_grad():
output1 = attention(base_sequence)
output2 = attention(identical_sequence)
output1, _ = attention(base_sequence)
output2, _ = attention(identical_sequence)
# With identical inputs and same parameters, outputs should be identical
assert torch.allclose(output1, output2, rtol=1e-5)

View File

@@ -98,7 +98,7 @@ def test_multi_head_attention():
batch_size, seq_len = 2, 16
inputs = torch.randn(batch_size, seq_len, emb_size)
output = attention(inputs)
output, _ = attention(inputs)
assert output.shape == inputs.shape
print("✅ Multi-head attention test passed")