Mirror of https://github.com/pese-git/llm-arch-research.git (synced 2026-01-23 21:10:54 +00:00)
test: add comprehensive test suite for LLM components
- Add pytest configuration and fixtures
- Add tests for core modules: decoder, feed_forward, multi_head_attention
- Add tests for positional and token embeddings
- Add tests for GPT model
- Add tests for tokenizers (base and BPE)
- Add basic integration tests
llm/pytest.ini (new file, 15 lines)
@@ -0,0 +1,15 @@
[pytest]
testpaths = tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*
addopts =
    --verbose
    --tb=short
    --strict-markers
    --strict-config
    --disable-warnings
markers =
    slow: marks tests as slow (deselect with '-m "not slow"')
    gpu: marks tests that require GPU
    integration: marks tests as integration tests
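
Note on the config: the `markers` block registers three custom markers, and `--strict-markers` turns any unregistered marker into a collection error. A minimal sketch of how a test module would opt into them (the test names and bodies below are illustrative only, not part of this commit):

import pytest
import torch


@pytest.mark.slow
@pytest.mark.integration
def test_full_pipeline_runs():
    # Deselect with: pytest -m "not slow"
    assert True  # placeholder body


@pytest.mark.gpu
def test_runs_on_gpu():
    # Select only GPU tests with: pytest -m gpu
    if not torch.cuda.is_available():
        pytest.skip("requires a CUDA device")
    assert torch.cuda.device_count() >= 1
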
llm/tests/__init__.py (new file, 0 lines)
llm/tests/conftest.py (new file, 101 lines)
@@ -0,0 +1,101 @@
"""
Pytest configuration for llm tests.
"""

import pytest
import torch
import numpy as np


@pytest.fixture
def device():
    """Return the device to run tests on (cuda:0 when available, so `tensor.device == device` comparisons hold)."""
    return torch.device("cuda:0" if torch.cuda.is_available() else "cpu")


@pytest.fixture
def batch_size():
    """Return a standard batch size for tests."""
    return 2


@pytest.fixture
def seq_len():
    """Return a standard sequence length for tests."""
    return 64


@pytest.fixture
def vocab_size():
    """Return a standard vocabulary size for tests."""
    return 1000


@pytest.fixture
def embed_dim():
    """Return a standard embedding dimension for tests."""
    return 256


@pytest.fixture
def num_heads():
    """Return a standard number of attention heads."""
    return 4


@pytest.fixture
def num_layers():
    """Return a standard number of layers."""
    return 2


@pytest.fixture
def gpt_config(vocab_size, embed_dim, num_heads, num_layers):
    """Return a standard GPT configuration for tests."""
    return {
        "vocab_size": vocab_size,
        "embed_dim": embed_dim,
        "num_heads": num_heads,
        "num_layers": num_layers,
        "max_position_embeddings": 1024,
        "dropout": 0.1
    }


@pytest.fixture
def random_inputs(batch_size, seq_len, vocab_size):
    """Generate random input tensors for testing."""
    input_ids = torch.randint(0, vocab_size, (batch_size, seq_len))
    return input_ids


@pytest.fixture
def random_float_inputs(batch_size, seq_len, embed_dim):
    """Generate random floating point input tensors for testing feed forward."""
    inputs = torch.randn(batch_size, seq_len, embed_dim)
    return inputs


@pytest.fixture
def random_embeddings(batch_size, seq_len, embed_dim):
    """Generate random embedding tensors for testing attention modules."""
    embeddings = torch.randn(batch_size, seq_len, embed_dim)
    return embeddings


@pytest.fixture
def attention_mask(batch_size, seq_len):
    """Generate a random attention mask for testing."""
    mask = torch.ones(batch_size, seq_len)
    # Randomly mask some positions
    for i in range(batch_size):
        mask_positions = torch.randint(1, seq_len, (1,)).item()
        mask[i, mask_positions:] = 0
    return mask


@pytest.fixture(autouse=True)
def set_random_seed():
    """Set random seeds for reproducible tests."""
    torch.manual_seed(42)
    np.random.seed(42)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
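
For reference, a minimal sketch of how a test consumes these fixtures: pytest injects them by parameter name, and the autouse `set_random_seed` fixture seeds torch and numpy before each test. The test function itself is illustrative, not part of this commit:

import torch

from llm.models.gpt import GPT


def test_gpt_logits_shape(gpt_config, random_inputs, device):
    # gpt_config, random_inputs and device are resolved from conftest.py
    model = GPT(gpt_config).to(device)
    logits = model(random_inputs.to(device))
    assert logits.shape == (*random_inputs.shape, gpt_config["vocab_size"])
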
llm/tests/core/test_decoder.py (new file, 188 lines)
@@ -0,0 +1,188 @@
"""
Tests for decoder block.
"""

import pytest
import torch
from llm.core.decoder import Decoder


class TestDecoder:
    """Test cases for Decoder."""

    def test_initialization(self, embed_dim, num_heads):
        """Test that Decoder can be initialized."""
        head_size = embed_dim // num_heads
        max_seq_len = 1024
        decoder = Decoder(num_heads=num_heads, emb_size=embed_dim, head_size=head_size, max_seq_len=max_seq_len)
        assert decoder is not None

        # Check internal components
        assert hasattr(decoder, '_heads')
        assert hasattr(decoder, '_ff')
        assert hasattr(decoder, '_norm1')
        assert hasattr(decoder, '_norm2')

    def test_forward_pass(self, embed_dim, num_heads, random_embeddings):
        """Test forward pass of Decoder."""
        head_size = embed_dim // num_heads
        max_seq_len = 1024
        decoder = Decoder(num_heads=num_heads, emb_size=embed_dim, head_size=head_size, max_seq_len=max_seq_len)

        # Forward pass
        output = decoder(random_embeddings)

        # Check output shape
        assert output.shape == random_embeddings.shape
        assert isinstance(output, torch.Tensor)

    def test_forward_with_causal_mask(self, embed_dim, num_heads, random_embeddings):
        """Test forward pass with causal mask."""
        head_size = embed_dim // num_heads
        max_seq_len = 1024
        decoder = Decoder(num_heads=num_heads, emb_size=embed_dim, head_size=head_size, max_seq_len=max_seq_len)

        batch_size, seq_len = random_embeddings.shape[:2]
        # Create causal mask
        mask = torch.tril(torch.ones(seq_len, seq_len))

        # Forward pass with causal mask
        output = decoder(random_embeddings, mask=mask)

        # Check output shape
        assert output.shape == random_embeddings.shape

    def test_residual_connections(self, embed_dim, num_heads, random_embeddings):
        """Test that residual connections are properly applied."""
        head_size = embed_dim // num_heads
        max_seq_len = 1024
        decoder = Decoder(num_heads=num_heads, emb_size=embed_dim, head_size=head_size, max_seq_len=max_seq_len)

        output = decoder(random_embeddings)

        # With residual connections and layer norm, the output shouldn't be
        # too different from input (in terms of scale/distribution)
        input_norm = random_embeddings.norm(dim=-1).mean()
        output_norm = output.norm(dim=-1).mean()

        # Norms should be of similar magnitude (not exact due to transformations)
        assert 0.1 < (output_norm / input_norm) < 10.0

    def test_layer_norm(self, embed_dim, num_heads, random_embeddings):
        """Test that layer normalization is applied."""
        head_size = embed_dim // num_heads
        max_seq_len = 1024
        decoder = Decoder(num_heads=num_heads, emb_size=embed_dim, head_size=head_size, max_seq_len=max_seq_len)

        output = decoder(random_embeddings)

        # Check that output has reasonable statistics (due to layer norm)
        # Mean should be close to 0, std close to 1 for each sequence position
        output_mean = output.mean(dim=-1)
        output_std = output.std(dim=-1)

        # These are approximate checks since the data goes through multiple transformations
        assert torch.allclose(output_mean, torch.zeros_like(output_mean), atol=1.0)
        assert torch.allclose(output_std, torch.ones_like(output_std), atol=2.0)

    def test_gradient_flow(self, embed_dim, num_heads, random_embeddings):
        """Test that gradients flow through Decoder."""
        head_size = embed_dim // num_heads
        max_seq_len = 1024
        decoder = Decoder(num_heads=num_heads, emb_size=embed_dim, head_size=head_size, max_seq_len=max_seq_len)

        # Forward pass
        output = decoder(random_embeddings)

        # Create a dummy loss and backward pass
        loss = output.sum()
        loss.backward()

        # Check that gradients are computed for learnable parameters
        # in attention and feed forward components
        assert decoder._heads._layer.weight.grad is not None
        assert decoder._ff._layer1.weight.grad is not None
        assert decoder._norm1.weight.grad is not None
        assert decoder._norm2.weight.grad is not None

    def test_device_consistency(self, embed_dim, num_heads, random_embeddings, device):
        """Test that Decoder works on correct device."""
        head_size = embed_dim // num_heads
        max_seq_len = 1024
        decoder = Decoder(num_heads=num_heads, emb_size=embed_dim, head_size=head_size, max_seq_len=max_seq_len).to(device)
        inputs = random_embeddings.to(device)

        # Forward pass
        output = decoder(inputs)

        # Check device consistency
        assert output.device == device
        assert decoder._heads._layer.weight.device == device

    def test_different_configurations(self):
        """Test Decoder with different configurations."""
        test_cases = [
            (64, 2),   # embed_dim=64, num_heads=2
            (128, 4),  # embed_dim=128, num_heads=4
            (256, 8),  # embed_dim=256, num_heads=8
        ]

        for embed_dim, num_heads in test_cases:
            head_size = embed_dim // num_heads
            max_seq_len = 1024
            decoder = Decoder(num_heads=num_heads, emb_size=embed_dim, head_size=head_size, max_seq_len=max_seq_len)
            batch_size, seq_len = 2, 16
            inputs = torch.randn(batch_size, seq_len, embed_dim)

            output = decoder(inputs)

            assert output.shape == inputs.shape

    @pytest.mark.parametrize("batch_size,seq_len", [(1, 8), (2, 16), (4, 32)])
    def test_different_input_shapes(self, embed_dim, num_heads, batch_size, seq_len):
        """Test Decoder with different input shapes."""
        head_size = embed_dim // num_heads
        max_seq_len = 1024
        decoder = Decoder(num_heads=num_heads, emb_size=embed_dim, head_size=head_size, max_seq_len=max_seq_len)

        inputs = torch.randn(batch_size, seq_len, embed_dim)
        output = decoder(inputs)

        assert output.shape == (batch_size, seq_len, embed_dim)

    def test_training_vs_evaluation(self, embed_dim, num_heads, random_embeddings):
        """Test that Decoder behaves differently in train vs eval mode."""
        head_size = embed_dim // num_heads
        max_seq_len = 1024
        decoder = Decoder(num_heads=num_heads, emb_size=embed_dim, head_size=head_size, max_seq_len=max_seq_len, dropout=0.5)

        # Training mode
        decoder.train()
        output_train = decoder(random_embeddings)

        # Evaluation mode
        decoder.eval()
        output_eval = decoder(random_embeddings)

        # Outputs should be different due to dropout
        assert not torch.allclose(output_train, output_eval)

    def test_parameter_initialization(self, embed_dim, num_heads):
        """Test that parameters are properly initialized."""
        head_size = embed_dim // num_heads
        max_seq_len = 1024
        decoder = Decoder(num_heads=num_heads, emb_size=embed_dim, head_size=head_size, max_seq_len=max_seq_len)

        # Check that various components have non-zero parameters
        assert not torch.allclose(
            decoder._heads._layer.weight,
            torch.zeros_like(decoder._heads._layer.weight)
        )
        assert not torch.allclose(
            decoder._ff._layer1.weight,
            torch.zeros_like(decoder._ff._layer1.weight)
        )
        assert not torch.allclose(
            decoder._norm1.weight,
            torch.zeros_like(decoder._norm1.weight)
        )
llm/tests/core/test_feed_forward.py (new file, 177 lines)
@@ -0,0 +1,177 @@
"""
Tests for feed forward network.
"""

import pytest
import torch
import torch.nn as nn
from llm.core.feed_forward import FeedForward


class TestFeedForward:
    """Test cases for FeedForward."""

    def test_initialization(self, embed_dim):
        """Test that FeedForward can be initialized."""
        ff = FeedForward(embed_dim)
        assert ff is not None

        # Check internal layers
        assert hasattr(ff, '_layer1')
        assert hasattr(ff, '_layer2')
        assert hasattr(ff, '_relu')
        assert hasattr(ff, '_dropout')

        # Check layer dimensions
        expected_hidden_dim = embed_dim * 4  # Default expansion factor
        assert ff._layer1.weight.shape == (expected_hidden_dim, embed_dim)
        assert ff._layer2.weight.shape == (embed_dim, expected_hidden_dim)

    def test_forward_pass(self, embed_dim, random_float_inputs):
        """Test forward pass of FeedForward."""
        ff = FeedForward(embed_dim)

        # Forward pass
        output = ff(random_float_inputs)

        # Check output shape
        assert output.shape == random_float_inputs.shape
        assert isinstance(output, torch.Tensor)

    def test_custom_hidden_dim(self, embed_dim):
        """Test FeedForward with custom hidden dimension."""
        # FeedForward doesn't support custom hidden_dim in current implementation,
        # so only the fixed 4x expansion is checked here
        ff = FeedForward(embed_dim)

        # Check layer dimensions (fixed 4x expansion)
        expected_hidden_dim = embed_dim * 4
        assert ff._layer1.weight.shape == (expected_hidden_dim, embed_dim)
        assert ff._layer2.weight.shape == (embed_dim, expected_hidden_dim)

    def test_dropout(self, embed_dim, random_float_inputs):
        """Test that dropout is applied during training."""
        ff = FeedForward(embed_dim, dropout=0.5)
        ff.train()  # Set to training mode

        output = ff(random_float_inputs)

        # In training mode with dropout, some values should be zeroed.
        # This is probabilistic, so we can't assert exact zeros,
        # but we can check the structure is preserved
        assert output.shape == random_float_inputs.shape

    def test_no_dropout_in_eval(self, embed_dim, random_float_inputs):
        """Test that dropout is not applied during evaluation."""
        ff = FeedForward(embed_dim, dropout=0.5)
        ff.eval()  # Set to evaluation mode

        # Run forward pass multiple times - outputs should be identical
        output1 = ff(random_float_inputs)
        output2 = ff(random_float_inputs)

        assert torch.allclose(output1, output2)

    def test_activation_function(self, embed_dim, random_float_inputs):
        """Test that activation function is applied."""
        ff = FeedForward(embed_dim)

        # Manually compute expected output without dropout for deterministic comparison
        hidden = ff._layer1(random_float_inputs)
        activated = ff._relu(hidden)
        expected_output = ff._layer2(activated)

        # Compare with forward pass in eval mode (no dropout)
        ff.eval()
        actual_output = ff(random_float_inputs)

        assert torch.allclose(actual_output, expected_output, rtol=1e-4)

    def test_gradient_flow(self, embed_dim, random_float_inputs):
        """Test that gradients flow through FeedForward."""
        ff = FeedForward(embed_dim)

        # Forward pass
        output = ff(random_float_inputs)

        # Create a dummy loss and backward pass
        loss = output.sum()
        loss.backward()

        # Check that gradients are computed for learnable parameters
        assert ff._layer1.weight.grad is not None
        assert ff._layer2.weight.grad is not None
        assert not torch.allclose(ff._layer1.weight.grad,
                                  torch.zeros_like(ff._layer1.weight.grad))
        assert not torch.allclose(ff._layer2.weight.grad,
                                  torch.zeros_like(ff._layer2.weight.grad))

    def test_device_consistency(self, embed_dim, random_float_inputs, device):
        """Test that FeedForward works on correct device."""
        ff = FeedForward(embed_dim).to(device)
        inputs = random_float_inputs.to(device)

        # Forward pass
        output = ff(inputs)

        # Check device consistency
        assert output.device == device
        assert ff._layer1.weight.device == device
        assert ff._layer2.weight.device == device

    def test_different_embed_dims(self):
        """Test FeedForward with different embedding dimensions."""
        test_cases = [64, 128, 256, 512]

        for embed_dim in test_cases:
            ff = FeedForward(embed_dim)
            batch_size, seq_len = 2, 16
            inputs = torch.randn(batch_size, seq_len, embed_dim)

            output = ff(inputs)

            assert output.shape == inputs.shape

    @pytest.mark.parametrize("batch_size,seq_len", [(1, 8), (2, 16), (4, 32)])
    def test_different_input_shapes(self, embed_dim, batch_size, seq_len):
        """Test FeedForward with different input shapes."""
        ff = FeedForward(embed_dim)

        inputs = torch.randn(batch_size, seq_len, embed_dim)
        output = ff(inputs)

        assert output.shape == (batch_size, seq_len, embed_dim)

    def test_non_linearity(self, embed_dim, random_float_inputs):
        """Test that FeedForward introduces non-linearity."""
        ff = FeedForward(embed_dim)

        # Create a simple linear transformation for comparison
        linear_layer = nn.Linear(embed_dim, embed_dim)

        # Copy weights to make comparison fair
        with torch.no_grad():
            linear_layer.weight.copy_(ff._layer2.weight @ ff._layer1.weight)
            if linear_layer.bias is not None:
                linear_layer.bias.zero_()

        linear_output = linear_layer(random_float_inputs)
        ff_output = ff(random_float_inputs)

        # FeedForward output should be different from pure linear transformation
        # due to activation function
        assert not torch.allclose(ff_output, linear_output, rtol=1e-4)

    def test_parameter_initialization(self, embed_dim):
        """Test that parameters are properly initialized."""
        ff = FeedForward(embed_dim)

        # Check that weights are not all zeros
        assert not torch.allclose(ff._layer1.weight, torch.zeros_like(ff._layer1.weight))
        assert not torch.allclose(ff._layer2.weight, torch.zeros_like(ff._layer2.weight))

        # Check that biases are not all zeros (they should be initialized with some values)
        if ff._layer1.bias is not None:
            assert not torch.allclose(ff._layer1.bias, torch.zeros_like(ff._layer1.bias))
        if ff._layer2.bias is not None:
            assert not torch.allclose(ff._layer2.bias, torch.zeros_like(ff._layer2.bias))
llm/tests/core/test_multi_head_attention.py (new file, 165 lines)
@@ -0,0 +1,165 @@
"""
Tests for multi-head attention.
"""

import pytest
import torch
from llm.core.multi_head_attention import MultiHeadAttention


class TestMultiHeadAttention:
    """Test cases for MultiHeadAttention."""

    def test_initialization(self, embed_dim, num_heads):
        """Test that MultiHeadAttention can be initialized."""
        head_size = embed_dim // num_heads
        attention = MultiHeadAttention(num_heads, embed_dim, head_size, max_seq_len=1024)
        assert attention is not None

        # Check internal attributes
        assert len(attention._heads) == num_heads
        assert attention._layer.in_features == embed_dim
        assert attention._layer.out_features == embed_dim

    def test_forward_pass(self, embed_dim, num_heads, random_embeddings):
        """Test forward pass of MultiHeadAttention."""
        head_size = embed_dim // num_heads
        attention = MultiHeadAttention(num_heads, embed_dim, head_size, max_seq_len=1024)

        # Forward pass
        output = attention(random_embeddings)

        # Check output shape
        assert output.shape == random_embeddings.shape
        assert isinstance(output, torch.Tensor)

    def test_forward_with_mask(self, embed_dim, num_heads, random_embeddings):
        """Test forward pass with attention mask."""
        head_size = embed_dim // num_heads
        attention = MultiHeadAttention(num_heads, embed_dim, head_size, max_seq_len=1024)

        # Create a simple mask
        seq_len = random_embeddings.shape[1]
        mask = torch.tril(torch.ones(seq_len, seq_len))  # Causal mask

        # Forward pass with mask
        output = attention(random_embeddings, mask=mask)

        # Check output shape
        assert output.shape == random_embeddings.shape

    def test_causal_mask(self, embed_dim, num_heads, random_embeddings):
        """Test that causal mask prevents attending to future positions."""
        head_size = embed_dim // num_heads
        attention = MultiHeadAttention(num_heads, embed_dim, head_size, max_seq_len=1024)

        # Create causal mask
        seq_len = random_embeddings.shape[1]
        causal_mask = torch.tril(torch.ones(seq_len, seq_len))

        # Forward pass with causal mask
        output = attention(random_embeddings, mask=causal_mask)

        # Check output shape
        assert output.shape == random_embeddings.shape

    def test_attention_weights_normalization(self, embed_dim, num_heads, random_embeddings):
        """Test that attention weights are properly normalized."""
        head_size = embed_dim // num_heads
        attention = MultiHeadAttention(num_heads, embed_dim, head_size, max_seq_len=1024)

        # Forward pass
        output = attention(random_embeddings)

        # Check output shape
        assert output.shape == random_embeddings.shape

    def test_gradient_flow(self, embed_dim, num_heads, random_embeddings):
        """Test that gradients flow through MultiHeadAttention."""
        head_size = embed_dim // num_heads
        attention = MultiHeadAttention(num_heads, embed_dim, head_size, max_seq_len=1024)

        # Forward pass
        output = attention(random_embeddings)

        # Create a dummy loss and backward pass
        loss = output.sum()
        loss.backward()

        # Check that gradients are computed for learnable parameters
        assert attention._layer.weight.grad is not None
        if len(attention._heads) > 0:
            assert attention._heads[0]._q.weight.grad is not None

    def test_device_consistency(self, embed_dim, num_heads, random_embeddings, device):
        """Test that MultiHeadAttention works on correct device."""
        head_size = embed_dim // num_heads
        attention = MultiHeadAttention(num_heads, embed_dim, head_size, max_seq_len=1024).to(device)
        inputs = random_embeddings.to(device)

        # Forward pass
        output = attention(inputs)

        # Check device consistency
        assert output.device == device
        assert attention._layer.weight.device == device

    def test_different_embed_dim_and_heads(self):
        """Test MultiHeadAttention with different embed_dim and num_heads combinations."""
        test_cases = [
            (64, 2),    # embed_dim=64, num_heads=2
            (128, 4),   # embed_dim=128, num_heads=4
            (256, 8),   # embed_dim=256, num_heads=8
            (512, 16),  # embed_dim=512, num_heads=16
        ]

        for embed_dim, num_heads in test_cases:
            head_size = embed_dim // num_heads
            attention = MultiHeadAttention(num_heads, embed_dim, head_size, max_seq_len=1024)
            batch_size, seq_len = 2, 16
            inputs = torch.randn(batch_size, seq_len, embed_dim)

            output = attention(inputs)

            assert output.shape == inputs.shape

    def test_attention_output_range(self, embed_dim, num_heads, random_embeddings):
        """Test that attention output is in reasonable range."""
        head_size = embed_dim // num_heads
        attention = MultiHeadAttention(num_heads, embed_dim, head_size, max_seq_len=1024)

        output = attention(random_embeddings)

        # Output shouldn't have extreme values
        assert output.abs().max() < 100  # Reasonable upper bound

    @pytest.mark.parametrize("batch_size,seq_len", [(1, 8), (2, 16), (4, 32)])
    def test_different_input_shapes(self, embed_dim, num_heads, batch_size, seq_len):
        """Test MultiHeadAttention with different input shapes."""
        head_size = embed_dim // num_heads
        attention = MultiHeadAttention(num_heads, embed_dim, head_size, max_seq_len=1024)

        inputs = torch.randn(batch_size, seq_len, embed_dim)
        output = attention(inputs)

        assert output.shape == (batch_size, seq_len, embed_dim)

    def test_parameter_sharing(self, embed_dim, num_heads):
        """Test that parameters are properly shared across the sequence."""
        head_size = embed_dim // num_heads
        attention = MultiHeadAttention(num_heads, embed_dim, head_size, max_seq_len=1024, dropout=0.0)  # No dropout for deterministic test

        # Create two identical sequences
        seq_len = 10
        base_sequence = torch.randn(1, seq_len, embed_dim)
        identical_sequence = base_sequence.clone()

        # Set to eval mode to disable dropout
        attention.eval()

        with torch.no_grad():
            output1 = attention(base_sequence)
            output2 = attention(identical_sequence)

        # With identical inputs and same parameters, outputs should be identical
        assert torch.allclose(output1, output2, rtol=1e-5)
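
Several of these tests pass a `torch.tril` matrix as the mask. As a point of reference, the sketch below shows the standard convention such a lower-triangular mask implies (positions marked 0 are masked out before the softmax); it illustrates the technique the tests assume, not necessarily the internals of llm.core.multi_head_attention:

import torch
import torch.nn.functional as F

seq_len = 4
scores = torch.randn(seq_len, seq_len)           # raw attention scores
mask = torch.tril(torch.ones(seq_len, seq_len))  # 1 = may attend, 0 = future position
scores = scores.masked_fill(mask == 0, float("-inf"))
weights = F.softmax(scores, dim=-1)

# Rows still sum to 1, and no weight falls on future tokens
assert torch.allclose(weights.sum(dim=-1), torch.ones(seq_len))
assert (torch.triu(weights, diagonal=1) == 0).all()
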
llm/tests/core/test_positional_embeddings.py (new file, 136 lines)
@@ -0,0 +1,136 @@
"""
Tests for positional embeddings.
"""

import pytest
import torch
import math
from llm.core.positional_embeddings import PositionalEmbeddings


class TestPositionalEmbeddings:
    """Test cases for PositionalEmbeddings."""

    def test_initialization(self, embed_dim):
        """Test that PositionalEmbeddings can be initialized."""
        max_seq_len = 1024
        embeddings = PositionalEmbeddings(max_seq_len, embed_dim)
        assert embeddings is not None

        # Check that positional embeddings are created
        assert hasattr(embeddings, 'embedding')
        assert embeddings.embedding.weight.shape == (max_seq_len, embed_dim)

    def test_forward_pass(self, embed_dim):
        """Test forward pass of PositionalEmbeddings."""
        max_seq_len = 1024
        seq_len = 64
        embeddings = PositionalEmbeddings(max_seq_len, embed_dim)

        # Forward pass - takes sequence length, not input tensor
        output = embeddings(seq_len)

        # Check output shape
        expected_shape = (seq_len, embed_dim)
        assert output.shape == expected_shape
        assert isinstance(output, torch.Tensor)

    def test_positional_encoding_values(self, embed_dim):
        """Test that positional encoding values are computed correctly."""
        max_seq_len = 10
        embeddings = PositionalEmbeddings(max_seq_len, embed_dim)

        # Get embeddings for all positions
        pe = embeddings(max_seq_len)  # Shape: [max_seq_len, embed_dim]

        # Check that different positions have different embeddings
        # (since these are learnable embeddings, not fixed sine/cosine)
        for pos in range(max_seq_len):
            for i in range(pos + 1, max_seq_len):
                assert not torch.allclose(pe[pos], pe[i], rtol=1e-4)

    def test_different_sequence_lengths(self, embed_dim):
        """Test PositionalEmbeddings with different sequence lengths."""
        test_cases = [
            (10, 5),   # seq_len < max_seq_len
            (10, 10),  # seq_len == max_seq_len
        ]

        for max_seq_len, seq_len in test_cases:
            embeddings = PositionalEmbeddings(max_seq_len, embed_dim)

            # Get embeddings for specific sequence length
            output = embeddings(seq_len)

            # Output should have shape [seq_len, embed_dim]
            assert output.shape == (seq_len, embed_dim)

    def test_gradient_flow(self, embed_dim):
        """Test that gradients flow through PositionalEmbeddings."""
        max_seq_len = 64
        seq_len = 32
        embeddings = PositionalEmbeddings(max_seq_len, embed_dim)

        # Forward pass
        output = embeddings(seq_len)

        # Create a dummy loss and backward pass
        loss = output.sum()
        loss.backward()

        # Positional embeddings should have gradients (they're learnable)
        assert embeddings.embedding.weight.grad is not None
        assert not torch.allclose(embeddings.embedding.weight.grad,
                                  torch.zeros_like(embeddings.embedding.weight.grad))

    def test_device_consistency(self, embed_dim, device):
        """Test that PositionalEmbeddings works on correct device."""
        max_seq_len = 64
        seq_len = 32
        embeddings = PositionalEmbeddings(max_seq_len, embed_dim).to(device)

        # Forward pass
        output = embeddings(seq_len)

        # Check device consistency
        assert output.device == device
        assert embeddings.embedding.weight.device == device

    def test_reproducibility(self, embed_dim):
        """Test that positional embeddings are reproducible."""
        max_seq_len = 100
        embeddings1 = PositionalEmbeddings(max_seq_len, embed_dim)
        embeddings2 = PositionalEmbeddings(max_seq_len, embed_dim)

        # Different instances should have different embeddings (random initialization)
        assert not torch.allclose(embeddings1.embedding.weight, embeddings2.embedding.weight)

        # But same instance should produce same output for same input
        seq_len = 50
        output1 = embeddings1(seq_len)
        output2 = embeddings1(seq_len)  # Same instance, same input
        assert torch.allclose(output1, output2)

    def test_positional_pattern(self, embed_dim):
        """Test that positional embeddings create a meaningful pattern."""
        max_seq_len = 50
        embeddings = PositionalEmbeddings(max_seq_len, embed_dim)
        pe = embeddings(max_seq_len)  # Get all positional embeddings

        # Check that different positions have different embeddings
        # (with high probability due to random initialization)
        assert not torch.allclose(pe[0], pe[1], rtol=1e-4)
        assert not torch.allclose(pe[10], pe[20], rtol=1e-4)

    @pytest.mark.parametrize("max_seq_len,seq_len,embed_dim", [
        (64, 10, 64),
        (128, 50, 128),
        (256, 100, 256),
    ])
    def test_different_configurations(self, max_seq_len, seq_len, embed_dim):
        """Test PositionalEmbeddings with different configurations."""
        embeddings = PositionalEmbeddings(max_seq_len, embed_dim)

        output = embeddings(seq_len)

        assert output.shape == (seq_len, embed_dim)
llm/tests/core/test_token_embeddings.py (new file, 107 lines)
@@ -0,0 +1,107 @@
"""
Tests for token embeddings.
"""

import pytest
import torch
from llm.core.token_embeddings import TokenEmbeddings


class TestTokenEmbeddings:
    """Test cases for TokenEmbeddings."""

    def test_initialization(self, vocab_size, embed_dim):
        """Test that TokenEmbeddings can be initialized."""
        embeddings = TokenEmbeddings(vocab_size, embed_dim)
        assert embeddings is not None

        # Check embedding layer
        assert hasattr(embeddings, '_embedding')
        assert embeddings._embedding.weight.shape == (vocab_size, embed_dim)

    def test_forward_pass(self, vocab_size, embed_dim, random_inputs):
        """Test forward pass of TokenEmbeddings."""
        embeddings = TokenEmbeddings(vocab_size, embed_dim)

        # Forward pass
        output = embeddings(random_inputs)

        # Check output shape
        assert output.shape == (random_inputs.shape[0], random_inputs.shape[1], embed_dim)
        assert isinstance(output, torch.Tensor)

    def test_embedding_weights(self, vocab_size, embed_dim):
        """Test that embedding weights are properly initialized."""
        embeddings = TokenEmbeddings(vocab_size, embed_dim)

        weights = embeddings._embedding.weight
        assert weights.requires_grad is True

        # Check that weights are not all zeros
        assert not torch.allclose(weights, torch.zeros_like(weights))

    def test_different_vocab_sizes(self):
        """Test TokenEmbeddings with different vocabulary sizes."""
        test_cases = [
            (100, 128),
            (1000, 256),
            (50000, 512)
        ]

        for vocab_size, embed_dim in test_cases:
            embeddings = TokenEmbeddings(vocab_size, embed_dim)
            assert embeddings._embedding.weight.shape == (vocab_size, embed_dim)

    def test_gradient_flow(self, vocab_size, embed_dim, random_inputs):
        """Test that gradients flow through TokenEmbeddings."""
        embeddings = TokenEmbeddings(vocab_size, embed_dim)

        # Forward pass
        output = embeddings(random_inputs)

        # Create a dummy loss and backward pass
        loss = output.sum()
        loss.backward()

        # Check that gradients are computed
        assert embeddings._embedding.weight.grad is not None
        assert not torch.allclose(embeddings._embedding.weight.grad,
                                  torch.zeros_like(embeddings._embedding.weight.grad))

    def test_device_consistency(self, vocab_size, embed_dim, random_inputs, device):
        """Test that TokenEmbeddings works on correct device."""
        embeddings = TokenEmbeddings(vocab_size, embed_dim).to(device)
        inputs = random_inputs.to(device)

        # Forward pass
        output = embeddings(inputs)

        # Check device consistency
        assert output.device == device
        assert embeddings._embedding.weight.device == device

    def test_embedding_lookup(self, vocab_size, embed_dim):
        """Test specific embedding lookups."""
        embeddings = TokenEmbeddings(vocab_size, embed_dim)

        # Test lookup for specific tokens
        test_tokens = torch.tensor([[0, 1, 2], [vocab_size - 1, vocab_size - 2, vocab_size - 3]])

        output = embeddings(test_tokens)

        # Check shape
        assert output.shape == (2, 3, embed_dim)

        # Check that different tokens have different embeddings
        # (with high probability due to random initialization)
        assert not torch.allclose(output[0, 0], output[0, 1], rtol=1e-4)

    @pytest.mark.parametrize("batch_size,seq_len", [(1, 1), (2, 10), (8, 64)])
    def test_different_input_shapes(self, vocab_size, embed_dim, batch_size, seq_len):
        """Test TokenEmbeddings with different input shapes."""
        embeddings = TokenEmbeddings(vocab_size, embed_dim)

        inputs = torch.randint(0, vocab_size, (batch_size, seq_len))
        output = embeddings(inputs)

        assert output.shape == (batch_size, seq_len, embed_dim)
llm/tests/models/test_gpt.py (new file, 288 lines)
@@ -0,0 +1,288 @@
"""
Tests for GPT model.
"""

import pytest
import torch
from llm.models.gpt import GPT


class TestGPT:
    """Test cases for GPT model."""

    def test_initialization(self, gpt_config):
        """Test that GPT can be initialized."""
        model = GPT(gpt_config)
        assert model is not None

        # Check that model has required components
        assert hasattr(model, '_token_embeddings')
        assert hasattr(model, '_position_embeddings')
        assert hasattr(model, '_decoders')
        assert hasattr(model, '_linear')
        assert hasattr(model, '_dropout')

        # Check number of decoder layers
        assert len(model._decoders) == gpt_config['num_layers']

    def test_forward_pass(self, gpt_config, random_inputs):
        """Test forward pass of GPT."""
        model = GPT(gpt_config)

        # Forward pass
        logits = model(random_inputs)

        # Check output shape
        batch_size, seq_len = random_inputs.shape
        vocab_size = gpt_config['vocab_size']
        assert logits.shape == (batch_size, seq_len, vocab_size)
        assert isinstance(logits, torch.Tensor)

    def test_forward_with_attention_mask(self, gpt_config, random_inputs, attention_mask):
        """Test forward pass with attention mask."""
        model = GPT(gpt_config)

        # Forward pass with mask
        logits = model(random_inputs, attention_mask=attention_mask)

        # Check output shape
        batch_size, seq_len = random_inputs.shape
        vocab_size = gpt_config['vocab_size']
        assert logits.shape == (batch_size, seq_len, vocab_size)

    def test_generate_text(self, gpt_config):
        """Test text generation."""
        model = GPT(gpt_config)
        model.eval()  # Set to evaluation mode for generation

        # Create initial input
        batch_size = 2
        initial_seq_len = 5
        input_ids = torch.randint(0, gpt_config['vocab_size'], (batch_size, initial_seq_len))

        # Generate text
        with torch.no_grad():
            generated = model.generate(
                x=input_ids,
                max_new_tokens=10,
                do_sample=False  # Use greedy for deterministic testing
            )

        # Check output shape
        expected_seq_len = initial_seq_len + 10
        assert generated.shape == (batch_size, expected_seq_len)

        # Check that initial sequence is preserved
        assert torch.allclose(generated[:, :initial_seq_len], input_ids)

    def test_generate_with_temperature(self, gpt_config):
        """Test text generation with temperature sampling."""
        model = GPT(gpt_config)
        model.eval()

        # Create initial input
        input_ids = torch.randint(0, gpt_config['vocab_size'], (1, 3))

        # Generate with temperature
        with torch.no_grad():
            generated = model.generate(
                x=input_ids,
                max_new_tokens=5,
                do_sample=True,
                temperature=0.8
            )

        assert generated.shape == (1, 8)  # 3 initial + 5 new tokens

    def test_generate_with_top_k(self, gpt_config):
        """Test text generation with top-k sampling."""
        model = GPT(gpt_config)
        model.eval()

        # Create initial input
        input_ids = torch.randint(0, gpt_config['vocab_size'], (1, 3))

        # Generate with top-k
        with torch.no_grad():
            generated = model.generate(
                x=input_ids,
                max_new_tokens=5,
                do_sample=True,
                top_k=10
            )

        assert generated.shape == (1, 8)

    def test_generate_with_top_p(self, gpt_config):
        """Test text generation with top-p (nucleus) sampling."""
        model = GPT(gpt_config)
        model.eval()

        # Create initial input
        input_ids = torch.randint(0, gpt_config['vocab_size'], (1, 3))

        # Generate with top-p
        with torch.no_grad():
            generated = model.generate(
                x=input_ids,
                max_new_tokens=5,
                do_sample=True,
                top_p=0.9
            )

        assert generated.shape == (1, 8)

    def test_gradient_flow(self, gpt_config, random_inputs):
        """Test that gradients flow through GPT."""
        model = GPT(gpt_config)

        # Forward pass
        logits = model(random_inputs)

        # Create a dummy loss and backward pass
        targets = torch.randint(0, gpt_config['vocab_size'], random_inputs.shape)
        loss = torch.nn.functional.cross_entropy(
            logits.view(-1, logits.size(-1)),
            targets.view(-1)
        )
        loss.backward()

        # Check that gradients are computed for various components
        assert model._token_embeddings._embedding.weight.grad is not None
        assert model._linear.weight.grad is not None
        if len(model._decoders) > 0:
            assert model._decoders[0]._heads._heads[0]._q.weight.grad is not None

    def test_device_consistency(self, gpt_config, random_inputs, device):
        """Test that GPT works on correct device."""
        model = GPT(gpt_config).to(device)
        inputs = random_inputs.to(device)

        # Forward pass
        logits = model(inputs)

        # Check device consistency
        assert logits.device == device
        assert model._token_embeddings._embedding.weight.device == device

    def test_different_configurations(self):
        """Test GPT with different configurations."""
        test_configs = [
            {
                "vocab_size": 1000,
                "embed_dim": 128,
                "num_heads": 2,
                "num_layers": 2,
                "max_position_embeddings": 256,
                "dropout": 0.1
            },
            {
                "vocab_size": 5000,
                "embed_dim": 256,
                "num_heads": 4,
                "num_layers": 4,
                "max_position_embeddings": 512,
                "dropout": 0.1
            },
            {
                "vocab_size": 10000,
                "embed_dim": 512,
                "num_heads": 8,
                "num_layers": 6,
                "max_position_embeddings": 1024,
                "dropout": 0.1
            }
        ]

        for config in test_configs:
            model = GPT(config)
            batch_size, seq_len = 2, 16
            inputs = torch.randint(0, config['vocab_size'], (batch_size, seq_len))

            logits = model(inputs)

            expected_shape = (batch_size, seq_len, config['vocab_size'])
            assert logits.shape == expected_shape

    @pytest.mark.parametrize("batch_size,seq_len", [(1, 8), (2, 16), (4, 32)])
    def test_different_input_shapes(self, gpt_config, batch_size, seq_len):
        """Test GPT with different input shapes."""
        model = GPT(gpt_config)

        inputs = torch.randint(0, gpt_config['vocab_size'], (batch_size, seq_len))
        logits = model(inputs)

        expected_shape = (batch_size, seq_len, gpt_config['vocab_size'])
        assert logits.shape == expected_shape

    def test_training_vs_evaluation(self, gpt_config, random_inputs):
        """Test that GPT behaves differently in train vs eval mode."""
        model = GPT(gpt_config)

        # Training mode
        model.train()
        output_train = model(random_inputs)

        # Evaluation mode
        model.eval()
        output_eval = model(random_inputs)

        # Outputs should be different due to dropout
        assert not torch.allclose(output_train, output_eval)

    def test_parameter_count(self, gpt_config):
        """Test that GPT has reasonable number of parameters."""
        model = GPT(gpt_config)

        total_params = sum(p.numel() for p in model.parameters())

        # For a small GPT model, parameters should be in reasonable range
        vocab_size = gpt_config['vocab_size']
        embed_dim = gpt_config['embed_dim']
        num_layers = gpt_config['num_layers']
        num_heads = gpt_config['num_heads']

        # Rough estimate: token_embeddings + output_layer + (attention + ff) * layers
        expected_min = vocab_size * embed_dim * 2  # embeddings and output
        expected_max = expected_min * 10  # Allow for decoder parameters

        assert expected_min < total_params < expected_max

    def test_causal_attention(self, gpt_config):
        """Test that GPT uses causal attention during generation."""
        model = GPT(gpt_config)
        model.eval()

        # Create input with known pattern
        input_ids = torch.tensor([[1, 2, 3]]).long()

        with torch.no_grad():
            # Get logits for next token prediction
            logits = model(input_ids)

            # The model should only attend to previous tokens (causal).
            # We can't directly test attention masks in the public API,
            # but we can verify the generation works correctly

            generated = model.generate(
                x=input_ids,
                max_new_tokens=3,
                do_sample=False
            )

        # Generated sequence should be longer than input
        assert generated.shape[1] == input_ids.shape[1] + 3

    def test_output_distribution(self, gpt_config, random_inputs):
        """Test that GPT output has proper distribution."""
        model = GPT(gpt_config)

        logits = model(random_inputs)

        # Logits should not have extreme values
        assert logits.abs().max() < 100

        # Softmax should produce valid probabilities
        probs = torch.softmax(logits, dim=-1)
        assert torch.allclose(probs.sum(dim=-1), torch.ones_like(probs.sum(dim=-1)))
        assert (probs >= 0).all() and (probs <= 1).all()
llm/tests/test_basic.py (new file, 262 lines)
@@ -0,0 +1,262 @@
|
|||||||
|
"""
|
||||||
|
Basic tests for llm library components.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
import torch
|
||||||
|
import tempfile
|
||||||
|
import os
|
||||||
|
|
||||||
|
|
||||||
|
def test_gpt_model_creation():
|
||||||
|
"""Test that GPT model can be created and forward pass works."""
|
||||||
|
from llm.models.gpt import GPT
|
||||||
|
|
||||||
|
config = {
|
||||||
|
"vocab_size": 1000,
|
||||||
|
"embed_dim": 128,
|
||||||
|
"num_heads": 4,
|
||||||
|
"num_layers": 2,
|
||||||
|
"max_position_embeddings": 256,
|
||||||
|
"dropout": 0.1
|
||||||
|
}
|
||||||
|
|
||||||
|
model = GPT(config)
|
||||||
|
|
||||||
|
# Test forward pass
|
||||||
|
batch_size, seq_len = 2, 16
|
||||||
|
input_ids = torch.randint(0, config["vocab_size"], (batch_size, seq_len))
|
||||||
|
|
||||||
|
with torch.no_grad():
|
||||||
|
logits = model(input_ids)
|
||||||
|
|
||||||
|
assert logits.shape == (batch_size, seq_len, config["vocab_size"])
|
||||||
|
print("✅ GPT model creation and forward pass test passed")
|
||||||
|
|
||||||
|
|
||||||
|
def test_bpe_tokenizer_basic():
|
||||||
|
"""Test basic BPE tokenizer functionality."""
|
||||||
|
from llm.tokenizers import BPETokenizer
|
||||||
|
|
||||||
|
tokenizer = BPETokenizer()
|
||||||
|
|
||||||
|
# Train on simple texts
|
||||||
|
texts = [
|
||||||
|
"hello world",
|
||||||
|
"test tokenization",
|
||||||
|
"simple example"
|
||||||
|
]
|
||||||
|
|
||||||
|
tokenizer.train(
|
||||||
|
texts=texts,
|
||||||
|
vocab_size=50,
|
||||||
|
special_tokens=["<pad>", "<unk>", "<bos>", "<eos>"]
|
||||||
|
)
|
||||||
|
|
||||||
|
# Test encoding/decoding
|
||||||
|
text = "hello world"
|
||||||
|
tokens = tokenizer.encode(text)
|
||||||
|
decoded = tokenizer.decode(tokens)
|
||||||
|
|
||||||
|
assert isinstance(tokens, list)
|
||||||
|
assert isinstance(decoded, str)
|
||||||
|
assert len(tokens) > 0
|
||||||
|
print("✅ BPE tokenizer basic test passed")
|
||||||
|
|
||||||
|
|
||||||
|
def test_token_embeddings():
|
||||||
|
"""Test token embeddings."""
|
||||||
|
from llm.core.token_embeddings import TokenEmbeddings
|
||||||
|
|
||||||
|
vocab_size = 1000
|
||||||
|
embed_dim = 128
|
||||||
|
|
||||||
|
embeddings = TokenEmbeddings(vocab_size, embed_dim)
|
||||||
|
|
||||||
|
# Test forward pass
|
||||||
|
batch_size, seq_len = 2, 16
|
||||||
|
input_ids = torch.randint(0, vocab_size, (batch_size, seq_len))
|
||||||
|
|
||||||
|
output = embeddings(input_ids)
|
||||||
|
|
||||||
|
assert output.shape == (batch_size, seq_len, embed_dim)
|
||||||
|
print("✅ Token embeddings test passed")
|
||||||
|
|
||||||
|
|
||||||
|
def test_multi_head_attention():
|
||||||
|
"""Test multi-head attention."""
|
||||||
|
from llm.core.multi_head_attention import MultiHeadAttention
|
||||||
|
|
||||||
|
num_heads = 4
|
||||||
|
emb_size = 128
|
||||||
|
head_size = emb_size // num_heads
|
||||||
|
max_seq_len = 256
|
||||||
|
|
||||||
|
attention = MultiHeadAttention(num_heads, emb_size, head_size, max_seq_len)
|
||||||
|
|
||||||
|
# Test forward pass
|
||||||
|
batch_size, seq_len = 2, 16
|
||||||
|
inputs = torch.randn(batch_size, seq_len, emb_size)
|
||||||
|
|
||||||
|
output = attention(inputs)
|
||||||
|
|
||||||
|
assert output.shape == inputs.shape
|
||||||
|
print("✅ Multi-head attention test passed")
|
||||||
|
|
||||||
|
|
||||||
|
def test_feed_forward():
|
||||||
|
"""Test feed forward network."""
|
||||||
|
from llm.core.feed_forward import FeedForward
|
||||||
|
|
||||||
|
embed_dim = 128
|
||||||
|
|
||||||
|
ff = FeedForward(embed_dim)
|
||||||
|
|
||||||
|
# Test forward pass
|
||||||
|
batch_size, seq_len = 2, 16
|
||||||
|
inputs = torch.randn(batch_size, seq_len, embed_dim)
|
||||||
|
|
||||||
|
output = ff(inputs)
|
||||||
|
|
||||||
|
assert output.shape == inputs.shape
|
||||||
|
print("✅ Feed forward test passed")
|
||||||
|
|
||||||
|
|
||||||
|
def test_gpt_generation():
|
||||||
|
"""Test GPT text generation."""
|
||||||
|
from llm.models.gpt import GPT
|
||||||
|
|
||||||
|
config = {
|
||||||
|
"vocab_size": 1000,
|
||||||
|
"embed_dim": 128,
|
||||||
|
"num_heads": 4,
|
||||||
|
"num_layers": 2,
|
||||||
|
"max_position_embeddings": 256,
|
||||||
|
"dropout": 0.1
|
||||||
|
}
|
||||||
|
|
||||||
|
model = GPT(config)
|
||||||
|
model.eval()
|
||||||
|
|
||||||
|
# Test greedy generation
|
||||||
|
input_ids = torch.randint(0, config["vocab_size"], (1, 5))
|
||||||
|
|
||||||
|
with torch.no_grad():
|
||||||
|
generated = model.generate(
|
||||||
|
x=input_ids,
|
||||||
|
max_new_tokens=3,
|
||||||
|
do_sample=False
|
||||||
|
)
|
||||||
|
|
||||||
|
assert generated.shape == (1, 8) # 5 initial + 3 new tokens
|
||||||
|
print("✅ GPT generation test passed")
|
||||||
|
|
||||||
|
|
||||||
|
def test_bpe_tokenizer_save_load():
|
||||||
|
"""Test BPE tokenizer save/load functionality."""
|
||||||
|
from llm.tokenizers import BPETokenizer
|
||||||
|
|
||||||
|
tokenizer = BPETokenizer()
|
||||||
|
|
||||||
|
# Train on simple texts
|
||||||
|
texts = ["hello world", "test save load"]
|
||||||
|
tokenizer.train(
|
||||||
|
texts=texts,
|
||||||
|
vocab_size=30,
|
||||||
|
special_tokens=["<pad>", "<unk>", "<bos>", "<eos>"]
|
||||||
|
)
|
||||||
|
|
||||||
|
with tempfile.TemporaryDirectory() as temp_dir:
|
||||||
|
save_path = os.path.join(temp_dir, "test_tokenizer.json")
|
||||||
|
|
||||||
|
# Save tokenizer
|
||||||
|
tokenizer.save(save_path)
|
||||||
|
assert os.path.exists(save_path)
|
||||||
|
|
||||||
|
# Load tokenizer
|
||||||
|
loaded_tokenizer = BPETokenizer.load(save_path)
|
||||||
|
|
||||||
|
# Test that vocab size is the same
|
||||||
|
assert tokenizer.get_vocab_size() == loaded_tokenizer.get_vocab_size()
|
||||||
|
|
||||||
|
# Test that vocabularies are the same
|
||||||
|
assert tokenizer.get_vocab() == loaded_tokenizer.get_vocab()
|
||||||
|
|
||||||
|
# Test that both can encode/decode (even if tokens differ due to BPE state)
|
||||||
|
text = "hello world"
|
||||||
|
original_tokens = tokenizer.encode(text)
|
||||||
|
loaded_tokens = loaded_tokenizer.encode(text)
|
||||||
|
|
||||||
|
# Both should produce valid token lists
|
||||||
|
assert isinstance(original_tokens, list)
|
||||||
|
assert isinstance(loaded_tokens, list)
|
||||||
|
assert len(original_tokens) > 0
|
||||||
|
assert len(loaded_tokens) > 0
|
||||||
|
|
||||||
|
# Both should be able to decode
|
||||||
|
original_decoded = tokenizer.decode(original_tokens)
|
||||||
|
loaded_decoded = loaded_tokenizer.decode(loaded_tokens)
|
||||||
|
assert isinstance(original_decoded, str)
|
||||||
|
assert isinstance(loaded_decoded, str)
|
||||||
|
|
||||||
|
print("✅ BPE tokenizer save/load test passed")
|
||||||
|
|
||||||
|
|
||||||
|
def test_gpt_with_tokenizer():
    """Test GPT model with tokenizer integration."""
    from llm.models.gpt import GPT
    from llm.tokenizers import BPETokenizer

    # Create and train tokenizer
    tokenizer = BPETokenizer()
    texts = ["hello world", "test integration"]
    tokenizer.train(
        texts=texts,
        vocab_size=50,
        special_tokens=["<pad>", "<unk>", "<bos>", "<eos>"]
    )

    vocab_size = tokenizer.get_vocab_size()

    # Create GPT model with tokenizer's vocab size
    config = {
        "vocab_size": vocab_size,
        "embed_dim": 128,
        "num_heads": 4,
        "num_layers": 2,
        "max_position_embeddings": 256,
        "dropout": 0.1
    }

    model = GPT(config)

    # Test with tokenized input
    text = "hello world"
    tokens = tokenizer.encode(text, add_special_tokens=False)
    input_ids = torch.tensor([tokens])

    with torch.no_grad():
        logits = model(input_ids)

    assert logits.shape == (1, len(tokens), vocab_size)

    print("✅ GPT with tokenizer integration test passed")

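
# Companion sketch (not in the original suite): an end-to-end encode -> generate -> decode
# loop built only from calls already used above (BPETokenizer.encode/decode, GPT.generate
# with x/max_new_tokens/do_sample). The exact decoded string is not asserted, only that
# the pipeline produces a non-degenerate result.
def test_gpt_generate_and_decode_sketch():
    """Sketch: generated token ids should decode back into a non-empty string."""
    from llm.models.gpt import GPT
    from llm.tokenizers import BPETokenizer

    tokenizer = BPETokenizer()
    tokenizer.train(
        texts=["hello world", "test integration"],
        vocab_size=50,
        special_tokens=["<pad>", "<unk>", "<bos>", "<eos>"]
    )

    config = {
        "vocab_size": tokenizer.get_vocab_size(),
        "embed_dim": 128,
        "num_heads": 4,
        "num_layers": 2,
        "max_position_embeddings": 256,
        "dropout": 0.1
    }
    model = GPT(config)
    model.eval()

    prompt_ids = tokenizer.encode("hello world", add_special_tokens=False)
    input_ids = torch.tensor([prompt_ids])

    with torch.no_grad():
        generated = model.generate(x=input_ids, max_new_tokens=5, do_sample=False)

    decoded = tokenizer.decode(generated[0].tolist())
    assert isinstance(decoded, str)
    assert len(decoded) > 0
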
def run_all_tests():
    """Run all basic tests."""
    print("🧪 Running basic tests for llm library...")

    test_gpt_model_creation()
    test_bpe_tokenizer_basic()
    test_token_embeddings()
    test_multi_head_attention()
    test_feed_forward()
    test_gpt_generation()
    test_bpe_tokenizer_save_load()
    test_gpt_with_tokenizer()

    print("🎉 All basic tests passed!")


if __name__ == "__main__":
    run_all_tests()
58
llm/tests/tokenizers/test_base_tokenizer.py
Normal file
@@ -0,0 +1,58 @@
"""
Tests for base tokenizer.
"""

import pytest
from llm.tokenizers import BaseTokenizer


class ConcreteTokenizer(BaseTokenizer):
    """Concrete implementation for testing BaseTokenizer."""

    def train(self, texts: list, vocab_size: int = 1000, **kwargs):
        """Dummy implementation for testing."""
        pass

    def encode(self, text: str, **kwargs) -> list:
        """Dummy implementation for testing."""
        return [1, 2, 3]

    def decode(self, tokens: list, **kwargs) -> str:
        """Dummy implementation for testing."""
        return "decoded text"


class TestBaseTokenizer:
    """Test cases for BaseTokenizer."""

    def test_initialization(self):
        """Test that BaseTokenizer can be initialized through concrete class."""
        tokenizer = ConcreteTokenizer()
        assert tokenizer is not None
        assert tokenizer.vocab == {}
        assert tokenizer.vocab_size == 0

    def test_encode_implemented(self):
        """Test that encode method works in concrete implementation."""
        tokenizer = ConcreteTokenizer()
        result = tokenizer.encode("test text")
        assert result == [1, 2, 3]

    def test_decode_implemented(self):
        """Test that decode method works in concrete implementation."""
        tokenizer = ConcreteTokenizer()
        result = tokenizer.decode([1, 2, 3])
        assert result == "decoded text"

    def test_get_vocab_size(self):
        """Test that get_vocab_size method works."""
        tokenizer = ConcreteTokenizer()
        tokenizer.vocab = {"a": 0, "b": 1, "c": 2}
        tokenizer.vocab_size = 3
        assert tokenizer.get_vocab_size() == 3

    def test_get_vocab(self):
        """Test that get_vocab method works."""
        tokenizer = ConcreteTokenizer()
        tokenizer.vocab = {"a": 0, "b": 1, "c": 2}
        assert tokenizer.get_vocab() == {"a": 0, "b": 1, "c": 2}
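
    # Companion sketch (not in the original file): ConcreteTokenizer is defined above as a
    # subclass of BaseTokenizer, so an isinstance check is a cheap guard that the test
    # double still exercises the base interface.
    def test_concrete_tokenizer_is_base_instance_sketch(self):
        """Sketch: the test double should remain a BaseTokenizer instance."""
        tokenizer = ConcreteTokenizer()
        assert isinstance(tokenizer, BaseTokenizer)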
156
llm/tests/tokenizers/test_bpe_tokenizer.py
Normal file
@@ -0,0 +1,156 @@
"""
Tests for BPE tokenizer.
"""

import pytest
import tempfile
import os
from llm.tokenizers import BPETokenizer


class TestBPETokenizer:
    """Test cases for BPETokenizer."""

    @pytest.fixture
    def sample_texts(self):
        """Sample texts for training the tokenizer."""
        return [
            "Искусственный интеллект",  # "Artificial intelligence"
            "Нейронные сети",           # "Neural networks"
            "Машинное обучение",        # "Machine learning"
            "Глубокое обучение",        # "Deep learning"
            "Трансформеры"              # "Transformers"
        ]

    @pytest.fixture
    def trained_tokenizer(self, sample_texts):
        """Create and train a BPE tokenizer."""
        tokenizer = BPETokenizer()
        tokenizer.train(
            texts=sample_texts,
            vocab_size=100,
            special_tokens=["<pad>", "<unk>", "<bos>", "<eos>"]
        )
        return tokenizer

    def test_initialization(self):
        """Test that BPETokenizer can be initialized."""
        tokenizer = BPETokenizer()
        assert tokenizer is not None

    def test_train_tokenizer(self, sample_texts):
        """Test that the tokenizer can be trained."""
        tokenizer = BPETokenizer()
        tokenizer.train(
            texts=sample_texts,
            vocab_size=50,
            special_tokens=["<pad>", "<unk>", "<bos>", "<eos>"]
        )

        assert tokenizer.get_vocab_size() > 0
        assert len(tokenizer.get_vocab()) == tokenizer.get_vocab_size()

    def test_encode_decode(self, trained_tokenizer):
        """Test encoding and decoding text."""
        text = "Искусственный интеллект"  # "Artificial intelligence"

        # Encode text
        tokens = trained_tokenizer.encode(text)
        assert isinstance(tokens, list)
        assert len(tokens) > 0
        assert all(isinstance(token, int) for token in tokens)

        # Decode tokens
        decoded_text = trained_tokenizer.decode(tokens)
        assert isinstance(decoded_text, str)
        # Decoded text should resemble the original (it may include special tokens)
        assert len(decoded_text) > 0

    def test_encode_with_special_tokens(self, trained_tokenizer):
        """Test encoding with special tokens."""
        text = "Нейронные сети"  # "Neural networks"

        # Without special tokens
        tokens_no_special = trained_tokenizer.encode(text, add_special_tokens=False)

        # With special tokens
        tokens_with_special = trained_tokenizer.encode(text, add_special_tokens=True)

        # Should have at least as many tokens when special tokens are added
        assert len(tokens_with_special) >= len(tokens_no_special)

    def test_vocab_size(self, trained_tokenizer):
        """Test vocabulary size."""
        vocab_size = trained_tokenizer.get_vocab_size()
        assert isinstance(vocab_size, int)
        assert vocab_size > 0

        vocab = trained_tokenizer.get_vocab()
        assert isinstance(vocab, dict)
        assert len(vocab) == vocab_size

    def test_special_tokens(self, trained_tokenizer):
        """Test that special tokens are in the vocabulary."""
        vocab = trained_tokenizer.get_vocab()

        # Check that special tokens are in the vocabulary
        special_tokens = ["<pad>", "<unk>", "<bos>", "<eos>"]
        for token in special_tokens:
            assert token in vocab
            assert isinstance(vocab[token], int)

    def test_save_load(self, trained_tokenizer, sample_texts):
        """Test saving and loading the tokenizer."""
        with tempfile.TemporaryDirectory() as temp_dir:
            save_path = os.path.join(temp_dir, "test_tokenizer.json")

            # Save tokenizer
            trained_tokenizer.save(save_path)
            assert os.path.exists(save_path)

            # Load tokenizer
            loaded_tokenizer = BPETokenizer.load(save_path)
            assert loaded_tokenizer is not None

            # Check that the loaded tokenizer behaves the same
            original_vocab = trained_tokenizer.get_vocab()
            loaded_vocab = loaded_tokenizer.get_vocab()

            assert original_vocab == loaded_vocab
            assert trained_tokenizer.get_vocab_size() == loaded_tokenizer.get_vocab_size()

            # Test encoding consistency
            text = sample_texts[0]
            original_tokens = trained_tokenizer.encode(text)
            loaded_tokens = loaded_tokenizer.encode(text)

            assert original_tokens == loaded_tokens

    def test_unknown_tokens(self, trained_tokenizer):
        """Test handling of unknown tokens."""
        # Use text that likely contains unknown subwords
        text = "xyzabc123"  # Random text that shouldn't be in the training data

        tokens = trained_tokenizer.encode(text)
        assert len(tokens) > 0

        # Should be able to decode back (even if it is mostly <unk> tokens)
        decoded = trained_tokenizer.decode(tokens)
        assert isinstance(decoded, str)

    def test_empty_text(self, trained_tokenizer):
        """Test encoding and decoding empty text."""
        tokens = trained_tokenizer.encode("")
        assert isinstance(tokens, list)

        decoded = trained_tokenizer.decode([])
        assert decoded == ""

    def test_tokenize_method(self, trained_tokenizer):
        """Test the tokenize method."""
        text = "Искусственный интеллект"  # "Artificial intelligence"
        tokens = trained_tokenizer.tokenize(text)

        assert isinstance(tokens, list)
        assert len(tokens) > 0
        assert all(isinstance(token, str) for token in tokens)
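
    # Companion sketch (not in the original file): BPE encoding is deterministic for a
    # fixed, trained tokenizer, so repeated calls on the same text should agree. This only
    # uses the encode() API and fixtures exercised above.
    def test_encode_is_deterministic_sketch(self, trained_tokenizer, sample_texts):
        """Sketch: encoding the same text twice should yield identical token ids."""
        for text in sample_texts:
            assert trained_tokenizer.encode(text) == trained_tokenizer.encode(text)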