From fb74dc7c17e39b4d611128e902f90693ecdf3bcd Mon Sep 17 00:00:00 2001 From: Sergey Penkovsky Date: Sun, 5 Oct 2025 08:11:18 +0300 Subject: [PATCH] test: add comprehensive test suite for LLM components - Add pytest configuration and fixtures - Add tests for core modules: decoder, feed_forward, multi_head_attention - Add tests for positional and token embeddings - Add tests for GPT model - Add tests for tokenizers (base and BPE) - Add basic integration tests --- llm/pytest.ini | 15 + llm/tests/__init__.py | 0 llm/tests/conftest.py | 101 +++++++ llm/tests/core/test_decoder.py | 188 ++++++++++++ llm/tests/core/test_feed_forward.py | 177 ++++++++++++ llm/tests/core/test_multi_head_attention.py | 165 +++++++++++ llm/tests/core/test_positional_embeddings.py | 136 +++++++++ llm/tests/core/test_token_embeddings.py | 107 +++++++ llm/tests/models/test_gpt.py | 288 +++++++++++++++++++ llm/tests/test_basic.py | 262 +++++++++++++++++ llm/tests/tokenizers/test_base_tokenizer.py | 58 ++++ llm/tests/tokenizers/test_bpe_tokenizer.py | 156 ++++++++++ 12 files changed, 1653 insertions(+) create mode 100644 llm/pytest.ini create mode 100644 llm/tests/__init__.py create mode 100644 llm/tests/conftest.py create mode 100644 llm/tests/core/test_decoder.py create mode 100644 llm/tests/core/test_feed_forward.py create mode 100644 llm/tests/core/test_multi_head_attention.py create mode 100644 llm/tests/core/test_positional_embeddings.py create mode 100644 llm/tests/core/test_token_embeddings.py create mode 100644 llm/tests/models/test_gpt.py create mode 100644 llm/tests/test_basic.py create mode 100644 llm/tests/tokenizers/test_base_tokenizer.py create mode 100644 llm/tests/tokenizers/test_bpe_tokenizer.py diff --git a/llm/pytest.ini b/llm/pytest.ini new file mode 100644 index 0000000..6e6a8a9 --- /dev/null +++ b/llm/pytest.ini @@ -0,0 +1,15 @@ +[pytest] +testpaths = tests +python_files = test_*.py +python_classes = Test* +python_functions = test_* +addopts = + --verbose + --tb=short + 
--strict-markers + --strict-config + --disable-warnings +markers = + slow: marks tests as slow (deselect with '-m "not slow"') + gpu: marks tests that require GPU + integration: marks tests as integration tests diff --git a/llm/tests/__init__.py b/llm/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/llm/tests/conftest.py b/llm/tests/conftest.py new file mode 100644 index 0000000..b3bfbac --- /dev/null +++ b/llm/tests/conftest.py @@ -0,0 +1,101 @@ +""" +Pytest configuration for llm tests. +""" + +import pytest +import torch +import numpy as np + + +@pytest.fixture +def device(): + """Return the device to run tests on.""" + return torch.device("cuda" if torch.cuda.is_available() else "cpu") + + +@pytest.fixture +def batch_size(): + """Return a standard batch size for tests.""" + return 2 + + +@pytest.fixture +def seq_len(): + """Return a standard sequence length for tests.""" + return 64 + + +@pytest.fixture +def vocab_size(): + """Return a standard vocabulary size for tests.""" + return 1000 + + +@pytest.fixture +def embed_dim(): + """Return a standard embedding dimension for tests.""" + return 256 + + +@pytest.fixture +def num_heads(): + """Return a standard number of attention heads.""" + return 4 + + +@pytest.fixture +def num_layers(): + """Return a standard number of layers.""" + return 2 + + +@pytest.fixture +def gpt_config(vocab_size, embed_dim, num_heads, num_layers): + """Return a standard GPT configuration for tests.""" + return { + "vocab_size": vocab_size, + "embed_dim": embed_dim, + "num_heads": num_heads, + "num_layers": num_layers, + "max_position_embeddings": 1024, + "dropout": 0.1 + } + + +@pytest.fixture +def random_inputs(batch_size, seq_len, vocab_size): + """Generate random input tensors for testing.""" + input_ids = torch.randint(0, vocab_size, (batch_size, seq_len)) + return input_ids + +@pytest.fixture +def random_float_inputs(batch_size, seq_len, embed_dim): + """Generate random floating point input tensors for 
testing feed forward.""" + inputs = torch.randn(batch_size, seq_len, embed_dim) + return inputs + +@pytest.fixture +def random_embeddings(batch_size, seq_len, embed_dim): + """Generate random embedding tensors for testing attention modules.""" + embeddings = torch.randn(batch_size, seq_len, embed_dim) + return embeddings + + +@pytest.fixture +def attention_mask(batch_size, seq_len): + """Generate a random attention mask for testing.""" + mask = torch.ones(batch_size, seq_len) + # Randomly mask some positions + for i in range(batch_size): + mask_positions = torch.randint(1, seq_len, (1,)).item() + mask[i, mask_positions:] = 0 + return mask + + +@pytest.fixture(autouse=True) +def set_random_seed(): + """Set random seeds for reproducible tests.""" + torch.manual_seed(42) + np.random.seed(42) + torch.backends.cudnn.deterministic = True + torch.backends.cudnn.benchmark = False diff --git a/llm/tests/core/test_decoder.py b/llm/tests/core/test_decoder.py new file mode 100644 index 0000000..a710632 --- /dev/null +++ b/llm/tests/core/test_decoder.py @@ -0,0 +1,188 @@ +""" +Tests for decoder block. 
+""" + +import pytest +import torch +from llm.core.decoder import Decoder + + +class TestDecoder: + """Test cases for Decoder.""" + + def test_initialization(self, embed_dim, num_heads): + """Test that Decoder can be initialized.""" + head_size = embed_dim // num_heads + max_seq_len = 1024 + decoder = Decoder(num_heads=num_heads, emb_size=embed_dim, head_size=head_size, max_seq_len=max_seq_len) + assert decoder is not None + + # Check internal components + assert hasattr(decoder, '_heads') + assert hasattr(decoder, '_ff') + assert hasattr(decoder, '_norm1') + assert hasattr(decoder, '_norm2') + + def test_forward_pass(self, embed_dim, num_heads, random_embeddings): + """Test forward pass of Decoder.""" + head_size = embed_dim // num_heads + max_seq_len = 1024 + decoder = Decoder(num_heads=num_heads, emb_size=embed_dim, head_size=head_size, max_seq_len=max_seq_len) + + # Forward pass + output = decoder(random_embeddings) + + # Check output shape + assert output.shape == random_embeddings.shape + assert isinstance(output, torch.Tensor) + + def test_forward_with_causal_mask(self, embed_dim, num_heads, random_embeddings): + """Test forward pass with causal mask.""" + head_size = embed_dim // num_heads + max_seq_len = 1024 + decoder = Decoder(num_heads=num_heads, emb_size=embed_dim, head_size=head_size, max_seq_len=max_seq_len) + + batch_size, seq_len = random_embeddings.shape[:2] + # Create causal mask + mask = torch.tril(torch.ones(seq_len, seq_len)) + + # Forward pass with causal mask + output = decoder(random_embeddings, mask=mask) + + # Check output shape + assert output.shape == random_embeddings.shape + + def test_residual_connections(self, embed_dim, num_heads, random_embeddings): + """Test that residual connections are properly applied.""" + head_size = embed_dim // num_heads + max_seq_len = 1024 + decoder = Decoder(num_heads=num_heads, emb_size=embed_dim, head_size=head_size, max_seq_len=max_seq_len) + + output = decoder(random_embeddings) + + # With residual 
connections and layer norm, the output shouldn't be + # too different from input (in terms of scale/distribution) + input_norm = random_embeddings.norm(dim=-1).mean() + output_norm = output.norm(dim=-1).mean() + + # Norms should be of similar magnitude (not exact due to transformations) + assert 0.1 < (output_norm / input_norm) < 10.0 + + def test_layer_norm(self, embed_dim, num_heads, random_embeddings): + """Test that layer normalization is applied.""" + head_size = embed_dim // num_heads + max_seq_len = 1024 + decoder = Decoder(num_heads=num_heads, emb_size=embed_dim, head_size=head_size, max_seq_len=max_seq_len) + + output = decoder(random_embeddings) + + # Check that output has reasonable statistics (due to layer norm) + # Mean should be close to 0, std close to 1 for each sequence position + output_mean = output.mean(dim=-1) + output_std = output.std(dim=-1) + + # These are approximate checks since the data goes through multiple transformations + assert torch.allclose(output_mean, torch.zeros_like(output_mean), atol=1.0) + assert torch.allclose(output_std, torch.ones_like(output_std), atol=2.0) + + def test_gradient_flow(self, embed_dim, num_heads, random_embeddings): + """Test that gradients flow through Decoder.""" + head_size = embed_dim // num_heads + max_seq_len = 1024 + decoder = Decoder(num_heads=num_heads, emb_size=embed_dim, head_size=head_size, max_seq_len=max_seq_len) + + # Forward pass + output = decoder(random_embeddings) + + # Create a dummy loss and backward pass + loss = output.sum() + loss.backward() + + # Check that gradients are computed for learnable parameters + # in attention and feed forward components + assert decoder._heads._layer.weight.grad is not None + assert decoder._ff._layer1.weight.grad is not None + assert decoder._norm1.weight.grad is not None + assert decoder._norm2.weight.grad is not None + + def test_device_consistency(self, embed_dim, num_heads, random_embeddings, device): + """Test that Decoder works on correct 
device.""" + head_size = embed_dim // num_heads + max_seq_len = 1024 + decoder = Decoder(num_heads=num_heads, emb_size=embed_dim, head_size=head_size, max_seq_len=max_seq_len).to(device) + inputs = random_embeddings.to(device) + + # Forward pass + output = decoder(inputs) + + # Check device consistency + assert output.device == device + assert decoder._heads._layer.weight.device == device + + def test_different_configurations(self): + """Test Decoder with different configurations.""" + test_cases = [ + (64, 2), # embed_dim=64, num_heads=2 + (128, 4), # embed_dim=128, num_heads=4 + (256, 8), # embed_dim=256, num_heads=8 + ] + + for embed_dim, num_heads in test_cases: + head_size = embed_dim // num_heads + max_seq_len = 1024 + decoder = Decoder(num_heads=num_heads, emb_size=embed_dim, head_size=head_size, max_seq_len=max_seq_len) + batch_size, seq_len = 2, 16 + inputs = torch.randn(batch_size, seq_len, embed_dim) + + output = decoder(inputs) + + assert output.shape == inputs.shape + + @pytest.mark.parametrize("batch_size,seq_len", [(1, 8), (2, 16), (4, 32)]) + def test_different_input_shapes(self, embed_dim, num_heads, batch_size, seq_len): + """Test Decoder with different input shapes.""" + head_size = embed_dim // num_heads + max_seq_len = 1024 + decoder = Decoder(num_heads=num_heads, emb_size=embed_dim, head_size=head_size, max_seq_len=max_seq_len) + + inputs = torch.randn(batch_size, seq_len, embed_dim) + output = decoder(inputs) + + assert output.shape == (batch_size, seq_len, embed_dim) + + def test_training_vs_evaluation(self, embed_dim, num_heads, random_embeddings): + """Test that Decoder behaves differently in train vs eval mode.""" + head_size = embed_dim // num_heads + max_seq_len = 1024 + decoder = Decoder(num_heads=num_heads, emb_size=embed_dim, head_size=head_size, max_seq_len=max_seq_len, dropout=0.5) + + # Training mode + decoder.train() + output_train = decoder(random_embeddings) + + # Evaluation mode + decoder.eval() + output_eval = 
decoder(random_embeddings) + + # Outputs should be different due to dropout + assert not torch.allclose(output_train, output_eval) + + def test_parameter_initialization(self, embed_dim, num_heads): + """Test that parameters are properly initialized.""" + head_size = embed_dim // num_heads + max_seq_len = 1024 + decoder = Decoder(num_heads=num_heads, emb_size=embed_dim, head_size=head_size, max_seq_len=max_seq_len) + + # Check that various components have non-zero parameters + assert not torch.allclose( + decoder._heads._layer.weight, + torch.zeros_like(decoder._heads._layer.weight) + ) + assert not torch.allclose( + decoder._ff._layer1.weight, + torch.zeros_like(decoder._ff._layer1.weight) + ) + assert not torch.allclose( + decoder._norm1.weight, + torch.zeros_like(decoder._norm1.weight) + ) diff --git a/llm/tests/core/test_feed_forward.py b/llm/tests/core/test_feed_forward.py new file mode 100644 index 0000000..f894331 --- /dev/null +++ b/llm/tests/core/test_feed_forward.py @@ -0,0 +1,177 @@ +""" +Tests for feed forward network. 
+""" + +import pytest +import torch +import torch.nn as nn +from llm.core.feed_forward import FeedForward + + +class TestFeedForward: + """Test cases for FeedForward.""" + + def test_initialization(self, embed_dim): + """Test that FeedForward can be initialized.""" + ff = FeedForward(embed_dim) + assert ff is not None + + # Check internal layers + assert hasattr(ff, '_layer1') + assert hasattr(ff, '_layer2') + assert hasattr(ff, '_relu') + assert hasattr(ff, '_dropout') + + # Check layer dimensions + expected_hidden_dim = embed_dim * 4 # Default expansion factor + assert ff._layer1.weight.shape == (expected_hidden_dim, embed_dim) + assert ff._layer2.weight.shape == (embed_dim, expected_hidden_dim) + + def test_forward_pass(self, embed_dim, random_float_inputs): + """Test forward pass of FeedForward.""" + ff = FeedForward(embed_dim) + + # Forward pass + output = ff(random_float_inputs) + + # Check output shape + assert output.shape == random_float_inputs.shape + assert isinstance(output, torch.Tensor) + + def test_custom_hidden_dim(self, embed_dim): + """Test FeedForward with custom hidden dimension.""" + # FeedForward doesn't support custom hidden_dim in current implementation + # This test is not applicable + ff = FeedForward(embed_dim) + + # Check layer dimensions (fixed 4x expansion) + expected_hidden_dim = embed_dim * 4 + assert ff._layer1.weight.shape == (expected_hidden_dim, embed_dim) + assert ff._layer2.weight.shape == (embed_dim, expected_hidden_dim) + + def test_dropout(self, embed_dim, random_float_inputs): + """Test that dropout is applied during training.""" + ff = FeedForward(embed_dim, dropout=0.5) + ff.train() # Set to training mode + + output = ff(random_float_inputs) + + # In training mode with dropout, some values should be zeroed + # This is probabilistic, so we can't assert exact zeros, + # but we can check the structure is preserved + assert output.shape == random_float_inputs.shape + + def test_no_dropout_in_eval(self, embed_dim, 
random_float_inputs): + """Test that dropout is not applied during evaluation.""" + ff = FeedForward(embed_dim, dropout=0.5) + ff.eval() # Set to evaluation mode + + # Run forward pass multiple times - outputs should be identical + output1 = ff(random_float_inputs) + output2 = ff(random_float_inputs) + + assert torch.allclose(output1, output2) + + def test_activation_function(self, embed_dim, random_float_inputs): + """Test that activation function is applied.""" + ff = FeedForward(embed_dim) + + # Manually compute expected output without dropout for deterministic comparison + hidden = ff._layer1(random_float_inputs) + activated = ff._relu(hidden) + expected_output = ff._layer2(activated) + + # Compare with forward pass in eval mode (no dropout) + ff.eval() + actual_output = ff(random_float_inputs) + + assert torch.allclose(actual_output, expected_output, rtol=1e-4) + + def test_gradient_flow(self, embed_dim, random_float_inputs): + """Test that gradients flow through FeedForward.""" + ff = FeedForward(embed_dim) + + # Forward pass + output = ff(random_float_inputs) + + # Create a dummy loss and backward pass + loss = output.sum() + loss.backward() + + # Check that gradients are computed for learnable parameters + assert ff._layer1.weight.grad is not None + assert ff._layer2.weight.grad is not None + assert not torch.allclose(ff._layer1.weight.grad, + torch.zeros_like(ff._layer1.weight.grad)) + assert not torch.allclose(ff._layer2.weight.grad, + torch.zeros_like(ff._layer2.weight.grad)) + + def test_device_consistency(self, embed_dim, random_float_inputs, device): + """Test that FeedForward works on correct device.""" + ff = FeedForward(embed_dim).to(device) + inputs = random_float_inputs.to(device) + + # Forward pass + output = ff(inputs) + + # Check device consistency + assert output.device == device + assert ff._layer1.weight.device == device + assert ff._layer2.weight.device == device + + def test_different_embed_dims(self): + """Test FeedForward with different 
embedding dimensions.""" + test_cases = [64, 128, 256, 512] + + for embed_dim in test_cases: + ff = FeedForward(embed_dim) + batch_size, seq_len = 2, 16 + inputs = torch.randn(batch_size, seq_len, embed_dim) + + output = ff(inputs) + + assert output.shape == inputs.shape + + @pytest.mark.parametrize("batch_size,seq_len", [(1, 8), (2, 16), (4, 32)]) + def test_different_input_shapes(self, embed_dim, batch_size, seq_len): + """Test FeedForward with different input shapes.""" + ff = FeedForward(embed_dim) + + inputs = torch.randn(batch_size, seq_len, embed_dim) + output = ff(inputs) + + assert output.shape == (batch_size, seq_len, embed_dim) + + def test_non_linearity(self, embed_dim, random_float_inputs): + """Test that FeedForward introduces non-linearity.""" + ff = FeedForward(embed_dim) + + # Create a simple linear transformation for comparison + linear_layer = nn.Linear(embed_dim, embed_dim) + + # Copy weights to make comparison fair + with torch.no_grad(): + linear_layer.weight.copy_(ff._layer2.weight @ ff._layer1.weight) + if linear_layer.bias is not None: + linear_layer.bias.zero_() + + linear_output = linear_layer(random_float_inputs) + ff_output = ff(random_float_inputs) + + # FeedForward output should be different from pure linear transformation + # due to activation function + assert not torch.allclose(ff_output, linear_output, rtol=1e-4) + + def test_parameter_initialization(self, embed_dim): + """Test that parameters are properly initialized.""" + ff = FeedForward(embed_dim) + + # Check that weights are not all zeros + assert not torch.allclose(ff._layer1.weight, torch.zeros_like(ff._layer1.weight)) + assert not torch.allclose(ff._layer2.weight, torch.zeros_like(ff._layer2.weight)) + + # Check that biases are not all zeros (they should be initialized with some values) + if ff._layer1.bias is not None: + assert not torch.allclose(ff._layer1.bias, torch.zeros_like(ff._layer1.bias)) + if ff._layer2.bias is not None: + assert not 
torch.allclose(ff._layer2.bias, torch.zeros_like(ff._layer2.bias)) diff --git a/llm/tests/core/test_multi_head_attention.py b/llm/tests/core/test_multi_head_attention.py new file mode 100644 index 0000000..9134879 --- /dev/null +++ b/llm/tests/core/test_multi_head_attention.py @@ -0,0 +1,165 @@ +""" +Tests for multi-head attention. +""" + +import pytest +import torch +from llm.core.multi_head_attention import MultiHeadAttention + + +class TestMultiHeadAttention: + """Test cases for MultiHeadAttention.""" + + def test_initialization(self, embed_dim, num_heads): + """Test that MultiHeadAttention can be initialized.""" + head_size = embed_dim // num_heads + attention = MultiHeadAttention(num_heads, embed_dim, head_size, max_seq_len=1024) + assert attention is not None + + # Check internal attributes + assert len(attention._heads) == num_heads + assert attention._layer.in_features == embed_dim + assert attention._layer.out_features == embed_dim + + def test_forward_pass(self, embed_dim, num_heads, random_embeddings): + """Test forward pass of MultiHeadAttention.""" + head_size = embed_dim // num_heads + attention = MultiHeadAttention(num_heads, embed_dim, head_size, max_seq_len=1024) + + # Forward pass + output = attention(random_embeddings) + + # Check output shape + assert output.shape == random_embeddings.shape + assert isinstance(output, torch.Tensor) + + def test_forward_with_mask(self, embed_dim, num_heads, random_embeddings): + """Test forward pass with attention mask.""" + head_size = embed_dim // num_heads + attention = MultiHeadAttention(num_heads, embed_dim, head_size, max_seq_len=1024) + + # Create a simple mask + seq_len = random_embeddings.shape[1] + mask = torch.tril(torch.ones(seq_len, seq_len)) # Causal mask + + # Forward pass with mask + output = attention(random_embeddings, mask=mask) + + # Check output shape + assert output.shape == random_embeddings.shape + + def test_causal_mask(self, embed_dim, num_heads, random_embeddings): + """Test that causal 
mask prevents attending to future positions.""" + head_size = embed_dim // num_heads + attention = MultiHeadAttention(num_heads, embed_dim, head_size, max_seq_len=1024) + + # Create causal mask + seq_len = random_embeddings.shape[1] + causal_mask = torch.tril(torch.ones(seq_len, seq_len)) + + # Forward pass with causal mask + output = attention(random_embeddings, mask=causal_mask) + + # Check output shape + assert output.shape == random_embeddings.shape + + def test_attention_weights_normalization(self, embed_dim, num_heads, random_embeddings): + """Test that attention weights are properly normalized.""" + head_size = embed_dim // num_heads + attention = MultiHeadAttention(num_heads, embed_dim, head_size, max_seq_len=1024) + + # Forward pass + output = attention(random_embeddings) + + # Check output shape + assert output.shape == random_embeddings.shape + + def test_gradient_flow(self, embed_dim, num_heads, random_embeddings): + """Test that gradients flow through MultiHeadAttention.""" + head_size = embed_dim // num_heads + attention = MultiHeadAttention(num_heads, embed_dim, head_size, max_seq_len=1024) + + # Forward pass + output = attention(random_embeddings) + + # Create a dummy loss and backward pass + loss = output.sum() + loss.backward() + + # Check that gradients are computed for learnable parameters + assert attention._layer.weight.grad is not None + if len(attention._heads) > 0: + assert attention._heads[0]._q.weight.grad is not None + + def test_device_consistency(self, embed_dim, num_heads, random_embeddings, device): + """Test that MultiHeadAttention works on correct device.""" + head_size = embed_dim // num_heads + attention = MultiHeadAttention(num_heads, embed_dim, head_size, max_seq_len=1024).to(device) + inputs = random_embeddings.to(device) + + # Forward pass + output = attention(inputs) + + # Check device consistency + assert output.device == device + assert attention._layer.weight.device == device + + def 
test_different_embed_dim_and_heads(self): + """Test MultiHeadAttention with different embed_dim and num_heads combinations.""" + test_cases = [ + (64, 2), # embed_dim=64, num_heads=2 + (128, 4), # embed_dim=128, num_heads=4 + (256, 8), # embed_dim=256, num_heads=8 + (512, 16), # embed_dim=512, num_heads=16 + ] + + for embed_dim, num_heads in test_cases: + head_size = embed_dim // num_heads + attention = MultiHeadAttention(num_heads, embed_dim, head_size, max_seq_len=1024) + batch_size, seq_len = 2, 16 + inputs = torch.randn(batch_size, seq_len, embed_dim) + + output = attention(inputs) + + assert output.shape == inputs.shape + + def test_attention_output_range(self, embed_dim, num_heads, random_embeddings): + """Test that attention output is in reasonable range.""" + head_size = embed_dim // num_heads + attention = MultiHeadAttention(num_heads, embed_dim, head_size, max_seq_len=1024) + + output = attention(random_embeddings) + + # Output shouldn't have extreme values + assert output.abs().max() < 100 # Reasonable upper bound + + @pytest.mark.parametrize("batch_size,seq_len", [(1, 8), (2, 16), (4, 32)]) + def test_different_input_shapes(self, embed_dim, num_heads, batch_size, seq_len): + """Test MultiHeadAttention with different input shapes.""" + head_size = embed_dim // num_heads + attention = MultiHeadAttention(num_heads, embed_dim, head_size, max_seq_len=1024) + + inputs = torch.randn(batch_size, seq_len, embed_dim) + output = attention(inputs) + + assert output.shape == (batch_size, seq_len, embed_dim) + + def test_parameter_sharing(self, embed_dim, num_heads): + """Test that parameters are properly shared across the sequence.""" + head_size = embed_dim // num_heads + attention = MultiHeadAttention(num_heads, embed_dim, head_size, max_seq_len=1024, dropout=0.0) # No dropout for deterministic test + + # Create two identical sequences + seq_len = 10 + base_sequence = torch.randn(1, seq_len, embed_dim) + identical_sequence = base_sequence.clone() + + # Set to eval 
mode to disable dropout + attention.eval() + + with torch.no_grad(): + output1 = attention(base_sequence) + output2 = attention(identical_sequence) + + # With identical inputs and same parameters, outputs should be identical + assert torch.allclose(output1, output2, rtol=1e-5) diff --git a/llm/tests/core/test_positional_embeddings.py b/llm/tests/core/test_positional_embeddings.py new file mode 100644 index 0000000..a31df24 --- /dev/null +++ b/llm/tests/core/test_positional_embeddings.py @@ -0,0 +1,136 @@ +""" +Tests for positional embeddings. +""" + +import pytest +import torch +import math +from llm.core.positional_embeddings import PositionalEmbeddings + + +class TestPositionalEmbeddings: + """Test cases for PositionalEmbeddings.""" + + def test_initialization(self, embed_dim): + """Test that PositionalEmbeddings can be initialized.""" + max_seq_len = 1024 + embeddings = PositionalEmbeddings(max_seq_len, embed_dim) + assert embeddings is not None + + # Check that positional embeddings are created + assert hasattr(embeddings, 'embedding') + assert embeddings.embedding.weight.shape == (max_seq_len, embed_dim) + + def test_forward_pass(self, embed_dim): + """Test forward pass of PositionalEmbeddings.""" + max_seq_len = 1024 + seq_len = 64 + embeddings = PositionalEmbeddings(max_seq_len, embed_dim) + + # Forward pass - takes sequence length, not input tensor + output = embeddings(seq_len) + + # Check output shape + expected_shape = (seq_len, embed_dim) + assert output.shape == expected_shape + assert isinstance(output, torch.Tensor) + + def test_positional_encoding_values(self, embed_dim): + """Test that positional encoding values are computed correctly.""" + max_seq_len = 10 + embeddings = PositionalEmbeddings(max_seq_len, embed_dim) + + # Get embeddings for all positions + pe = embeddings(max_seq_len) # Shape: [max_seq_len, embed_dim] + + # Check that different positions have different embeddings + # (since these are learnable embeddings, not fixed sine/cosine) + 
for pos in range(max_seq_len): + for i in range(pos + 1, max_seq_len): + assert not torch.allclose(pe[pos], pe[i], rtol=1e-4) + + def test_different_sequence_lengths(self, embed_dim): + """Test PositionalEmbeddings with different sequence lengths.""" + test_cases = [ + (10, 5), # seq_len < max_seq_len + (10, 10), # seq_len == max_seq_len + ] + + for max_seq_len, seq_len in test_cases: + embeddings = PositionalEmbeddings(max_seq_len, embed_dim) + + # Get embeddings for specific sequence length + output = embeddings(seq_len) + + # Output should have shape [seq_len, embed_dim] + assert output.shape == (seq_len, embed_dim) + + def test_gradient_flow(self, embed_dim): + """Test that gradients flow through PositionalEmbeddings.""" + max_seq_len = 64 + seq_len = 32 + embeddings = PositionalEmbeddings(max_seq_len, embed_dim) + + # Forward pass + output = embeddings(seq_len) + + # Create a dummy loss and backward pass + loss = output.sum() + loss.backward() + + # Positional embeddings should have gradients (they're learnable) + assert embeddings.embedding.weight.grad is not None + assert not torch.allclose(embeddings.embedding.weight.grad, + torch.zeros_like(embeddings.embedding.weight.grad)) + + def test_device_consistency(self, embed_dim, device): + """Test that PositionalEmbeddings works on correct device.""" + max_seq_len = 64 + seq_len = 32 + embeddings = PositionalEmbeddings(max_seq_len, embed_dim).to(device) + + # Forward pass + output = embeddings(seq_len) + + # Check device consistency + assert output.device == device + assert embeddings.embedding.weight.device == device + + def test_reproducibility(self, embed_dim): + """Test that positional embeddings are reproducible.""" + max_seq_len = 100 + embeddings1 = PositionalEmbeddings(max_seq_len, embed_dim) + embeddings2 = PositionalEmbeddings(max_seq_len, embed_dim) + + # Different instances should have different embeddings (random initialization) + assert not torch.allclose(embeddings1.embedding.weight, 
embeddings2.embedding.weight) + + # But same instance should produce same output for same input + seq_len = 50 + output1 = embeddings1(seq_len) + output2 = embeddings1(seq_len) # Same instance, same input + assert torch.allclose(output1, output2) + + def test_positional_pattern(self, embed_dim): + """Test that positional embeddings create a meaningful pattern.""" + max_seq_len = 50 + embeddings = PositionalEmbeddings(max_seq_len, embed_dim) + pe = embeddings(max_seq_len) # Get all positional embeddings + + # Check that different positions have different embeddings + # (with high probability due to random initialization) + assert not torch.allclose(pe[0], pe[1], rtol=1e-4) + assert not torch.allclose(pe[10], pe[20], rtol=1e-4) + + @pytest.mark.parametrize("max_seq_len,seq_len,embed_dim", [ + (64, 10, 64), + (128, 50, 128), + (256, 100, 256), + ]) + def test_different_configurations(self, max_seq_len, seq_len, embed_dim): + """Test PositionalEmbeddings with different configurations.""" + embeddings = PositionalEmbeddings(max_seq_len, embed_dim) + + output = embeddings(seq_len) + + assert output.shape == (seq_len, embed_dim) diff --git a/llm/tests/core/test_token_embeddings.py b/llm/tests/core/test_token_embeddings.py new file mode 100644 index 0000000..c613aae --- /dev/null +++ b/llm/tests/core/test_token_embeddings.py @@ -0,0 +1,107 @@ +""" +Tests for token embeddings. 
+""" + +import pytest +import torch +from llm.core.token_embeddings import TokenEmbeddings + + +class TestTokenEmbeddings: + """Test cases for TokenEmbeddings.""" + + def test_initialization(self, vocab_size, embed_dim): + """Test that TokenEmbeddings can be initialized.""" + embeddings = TokenEmbeddings(vocab_size, embed_dim) + assert embeddings is not None + + # Check embedding layer + assert hasattr(embeddings, '_embedding') + assert embeddings._embedding.weight.shape == (vocab_size, embed_dim) + + def test_forward_pass(self, vocab_size, embed_dim, random_inputs): + """Test forward pass of TokenEmbeddings.""" + embeddings = TokenEmbeddings(vocab_size, embed_dim) + + # Forward pass + output = embeddings(random_inputs) + + # Check output shape + assert output.shape == (random_inputs.shape[0], random_inputs.shape[1], embed_dim) + assert isinstance(output, torch.Tensor) + + def test_embedding_weights(self, vocab_size, embed_dim): + """Test that embedding weights are properly initialized.""" + embeddings = TokenEmbeddings(vocab_size, embed_dim) + + weights = embeddings._embedding.weight + assert weights.requires_grad is True + + # Check that weights are not all zeros + assert not torch.allclose(weights, torch.zeros_like(weights)) + + def test_different_vocab_sizes(self): + """Test TokenEmbeddings with different vocabulary sizes.""" + test_cases = [ + (100, 128), + (1000, 256), + (50000, 512) + ] + + for vocab_size, embed_dim in test_cases: + embeddings = TokenEmbeddings(vocab_size, embed_dim) + assert embeddings._embedding.weight.shape == (vocab_size, embed_dim) + + def test_gradient_flow(self, vocab_size, embed_dim, random_inputs): + """Test that gradients flow through TokenEmbeddings.""" + embeddings = TokenEmbeddings(vocab_size, embed_dim) + + # Forward pass + output = embeddings(random_inputs) + + # Create a dummy loss and backward pass + loss = output.sum() + loss.backward() + + # Check that gradients are computed + assert embeddings._embedding.weight.grad is 
not None + assert not torch.allclose(embeddings._embedding.weight.grad, + torch.zeros_like(embeddings._embedding.weight.grad)) + + def test_device_consistency(self, vocab_size, embed_dim, random_inputs, device): + """Test that TokenEmbeddings works on correct device.""" + embeddings = TokenEmbeddings(vocab_size, embed_dim).to(device) + inputs = random_inputs.to(device) + + # Forward pass + output = embeddings(inputs) + + # Check device consistency + assert output.device == device + assert embeddings._embedding.weight.device == device + + def test_embedding_lookup(self, vocab_size, embed_dim): + """Test specific embedding lookups.""" + embeddings = TokenEmbeddings(vocab_size, embed_dim) + + # Test lookup for specific tokens + test_tokens = torch.tensor([[0, 1, 2], [vocab_size - 1, vocab_size - 2, vocab_size - 3]]) + + output = embeddings(test_tokens) + + # Check shape + assert output.shape == (2, 3, embed_dim) + + # Check that different tokens have different embeddings + # (with high probability due to random initialization) + assert not torch.allclose(output[0, 0], output[0, 1], rtol=1e-4) + + @pytest.mark.parametrize("batch_size,seq_len", [(1, 1), (2, 10), (8, 64)]) + def test_different_input_shapes(self, vocab_size, embed_dim, batch_size, seq_len): + """Test TokenEmbeddings with different input shapes.""" + embeddings = TokenEmbeddings(vocab_size, embed_dim) + + inputs = torch.randint(0, vocab_size, (batch_size, seq_len)) + output = embeddings(inputs) + + assert output.shape == (batch_size, seq_len, embed_dim) diff --git a/llm/tests/models/test_gpt.py b/llm/tests/models/test_gpt.py new file mode 100644 index 0000000..7ae2d9d --- /dev/null +++ b/llm/tests/models/test_gpt.py @@ -0,0 +1,288 @@ +""" +Tests for GPT model. 
+""" + +import pytest +import torch +from llm.models.gpt import GPT + + +class TestGPT: + """Test cases for GPT model.""" + + def test_initialization(self, gpt_config): + """Test that GPT can be initialized.""" + model = GPT(gpt_config) + assert model is not None + + # Check that model has required components + assert hasattr(model, '_token_embeddings') + assert hasattr(model, '_position_embeddings') + assert hasattr(model, '_decoders') + assert hasattr(model, '_linear') + assert hasattr(model, '_dropout') + + # Check number of decoder layers + assert len(model._decoders) == gpt_config['num_layers'] + + def test_forward_pass(self, gpt_config, random_inputs): + """Test forward pass of GPT.""" + model = GPT(gpt_config) + + # Forward pass + logits = model(random_inputs) + + # Check output shape + batch_size, seq_len = random_inputs.shape + vocab_size = gpt_config['vocab_size'] + assert logits.shape == (batch_size, seq_len, vocab_size) + assert isinstance(logits, torch.Tensor) + + def test_forward_with_attention_mask(self, gpt_config, random_inputs, attention_mask): + """Test forward pass with attention mask.""" + model = GPT(gpt_config) + + # Forward pass with mask + logits = model(random_inputs, attention_mask=attention_mask) + + # Check output shape + batch_size, seq_len = random_inputs.shape + vocab_size = gpt_config['vocab_size'] + assert logits.shape == (batch_size, seq_len, vocab_size) + + def test_generate_text(self, gpt_config): + """Test text generation.""" + model = GPT(gpt_config) + model.eval() # Set to evaluation mode for generation + + # Create initial input + batch_size = 2 + initial_seq_len = 5 + input_ids = torch.randint(0, gpt_config['vocab_size'], (batch_size, initial_seq_len)) + + # Generate text + with torch.no_grad(): + generated = model.generate( + x=input_ids, + max_new_tokens=10, + do_sample=False # Use greedy for deterministic testing + ) + + # Check output shape + expected_seq_len = initial_seq_len + 10 + assert generated.shape == 
(batch_size, expected_seq_len) + + # Check that initial sequence is preserved + assert torch.allclose(generated[:, :initial_seq_len], input_ids) + + def test_generate_with_temperature(self, gpt_config): + """Test text generation with temperature sampling.""" + model = GPT(gpt_config) + model.eval() + + # Create initial input + input_ids = torch.randint(0, gpt_config['vocab_size'], (1, 3)) + + # Generate with temperature + with torch.no_grad(): + generated = model.generate( + x=input_ids, + max_new_tokens=5, + do_sample=True, + temperature=0.8 + ) + + assert generated.shape == (1, 8) # 3 initial + 5 new tokens + + def test_generate_with_top_k(self, gpt_config): + """Test text generation with top-k sampling.""" + model = GPT(gpt_config) + model.eval() + + # Create initial input + input_ids = torch.randint(0, gpt_config['vocab_size'], (1, 3)) + + # Generate with top-k + with torch.no_grad(): + generated = model.generate( + x=input_ids, + max_new_tokens=5, + do_sample=True, + top_k=10 + ) + + assert generated.shape == (1, 8) + + def test_generate_with_top_p(self, gpt_config): + """Test text generation with top-p (nucleus) sampling.""" + model = GPT(gpt_config) + model.eval() + + # Create initial input + input_ids = torch.randint(0, gpt_config['vocab_size'], (1, 3)) + + # Generate with top-p + with torch.no_grad(): + generated = model.generate( + x=input_ids, + max_new_tokens=5, + do_sample=True, + top_p=0.9 + ) + + assert generated.shape == (1, 8) + + def test_gradient_flow(self, gpt_config, random_inputs): + """Test that gradients flow through GPT.""" + model = GPT(gpt_config) + + # Forward pass + logits = model(random_inputs) + + # Create a dummy loss and backward pass + targets = torch.randint(0, gpt_config['vocab_size'], random_inputs.shape) + loss = torch.nn.functional.cross_entropy( + logits.view(-1, logits.size(-1)), + targets.view(-1) + ) + loss.backward() + + # Check that gradients are computed for various components + assert 
model._token_embeddings._embedding.weight.grad is not None + assert model._linear.weight.grad is not None + if len(model._decoders) > 0: + assert model._decoders[0]._heads._heads[0]._q.weight.grad is not None + + def test_device_consistency(self, gpt_config, random_inputs, device): + """Test that GPT works on correct device.""" + model = GPT(gpt_config).to(device) + inputs = random_inputs.to(device) + + # Forward pass + logits = model(inputs) + + # Check device consistency + assert logits.device == device + assert model._token_embeddings._embedding.weight.device == device + + def test_different_configurations(self): + """Test GPT with different configurations.""" + test_configs = [ + { + "vocab_size": 1000, + "embed_dim": 128, + "num_heads": 2, + "num_layers": 2, + "max_position_embeddings": 256, + "dropout": 0.1 + }, + { + "vocab_size": 5000, + "embed_dim": 256, + "num_heads": 4, + "num_layers": 4, + "max_position_embeddings": 512, + "dropout": 0.1 + }, + { + "vocab_size": 10000, + "embed_dim": 512, + "num_heads": 8, + "num_layers": 6, + "max_position_embeddings": 1024, + "dropout": 0.1 + } + ] + + for config in test_configs: + model = GPT(config) + batch_size, seq_len = 2, 16 + inputs = torch.randint(0, config['vocab_size'], (batch_size, seq_len)) + + logits = model(inputs) + + expected_shape = (batch_size, seq_len, config['vocab_size']) + assert logits.shape == expected_shape + + @pytest.mark.parametrize("batch_size,seq_len", [(1, 8), (2, 16), (4, 32)]) + def test_different_input_shapes(self, gpt_config, batch_size, seq_len): + """Test GPT with different input shapes.""" + model = GPT(gpt_config) + + inputs = torch.randint(0, gpt_config['vocab_size'], (batch_size, seq_len)) + logits = model(inputs) + + expected_shape = (batch_size, seq_len, gpt_config['vocab_size']) + assert logits.shape == expected_shape + + def test_training_vs_evaluation(self, gpt_config, random_inputs): + """Test that GPT behaves differently in train vs eval mode.""" + model = 
GPT(gpt_config) + + # Training mode + model.train() + output_train = model(random_inputs) + + # Evaluation mode + model.eval() + output_eval = model(random_inputs) + + # Outputs should be different due to dropout + assert not torch.allclose(output_train, output_eval) + + def test_parameter_count(self, gpt_config): + """Test that GPT has reasonable number of parameters.""" + model = GPT(gpt_config) + + total_params = sum(p.numel() for p in model.parameters()) + + # For a small GPT model, parameters should be in reasonable range + vocab_size = gpt_config['vocab_size'] + embed_dim = gpt_config['embed_dim'] + num_layers = gpt_config['num_layers'] + num_heads = gpt_config['num_heads'] + + # Rough estimate: token_embeddings + output_layer + (attention + ff) * layers + expected_min = vocab_size * embed_dim * 2 # embeddings and output + expected_max = expected_min * 10 # Allow for decoder parameters + + assert expected_min < total_params < expected_max + + def test_causal_attention(self, gpt_config): + """Test that GPT uses causal attention during generation.""" + model = GPT(gpt_config) + model.eval() + + # Create input with known pattern + input_ids = torch.tensor([[1, 2, 3]]).long() + + with torch.no_grad(): + # Get logits for next token prediction + logits = model(input_ids) + + # The model should only attend to previous tokens (causal) + # We can't directly test attention masks in the public API, + # but we can verify the generation works correctly + + generated = model.generate( + x=input_ids, + max_new_tokens=3, + do_sample=False + ) + + # Generated sequence should be longer than input + assert generated.shape[1] == input_ids.shape[1] + 3 + + def test_output_distribution(self, gpt_config, random_inputs): + """Test that GPT output has proper distribution.""" + model = GPT(gpt_config) + + logits = model(random_inputs) + + # Logits should not have extreme values + assert logits.abs().max() < 100 + + # Softmax should produce valid probabilities + probs = 
torch.softmax(logits, dim=-1) + assert torch.allclose(probs.sum(dim=-1), torch.ones_like(probs.sum(dim=-1))) + assert (probs >= 0).all() and (probs <= 1).all() diff --git a/llm/tests/test_basic.py b/llm/tests/test_basic.py new file mode 100644 index 0000000..1bc2360 --- /dev/null +++ b/llm/tests/test_basic.py @@ -0,0 +1,262 @@ +""" +Basic tests for llm library components. +""" + +import pytest +import torch +import tempfile +import os + + +def test_gpt_model_creation(): + """Test that GPT model can be created and forward pass works.""" + from llm.models.gpt import GPT + + config = { + "vocab_size": 1000, + "embed_dim": 128, + "num_heads": 4, + "num_layers": 2, + "max_position_embeddings": 256, + "dropout": 0.1 + } + + model = GPT(config) + + # Test forward pass + batch_size, seq_len = 2, 16 + input_ids = torch.randint(0, config["vocab_size"], (batch_size, seq_len)) + + with torch.no_grad(): + logits = model(input_ids) + + assert logits.shape == (batch_size, seq_len, config["vocab_size"]) + print("✅ GPT model creation and forward pass test passed") + + +def test_bpe_tokenizer_basic(): + """Test basic BPE tokenizer functionality.""" + from llm.tokenizers import BPETokenizer + + tokenizer = BPETokenizer() + + # Train on simple texts + texts = [ + "hello world", + "test tokenization", + "simple example" + ] + + tokenizer.train( + texts=texts, + vocab_size=50, + special_tokens=["", "", "", ""] + ) + + # Test encoding/decoding + text = "hello world" + tokens = tokenizer.encode(text) + decoded = tokenizer.decode(tokens) + + assert isinstance(tokens, list) + assert isinstance(decoded, str) + assert len(tokens) > 0 + print("✅ BPE tokenizer basic test passed") + + +def test_token_embeddings(): + """Test token embeddings.""" + from llm.core.token_embeddings import TokenEmbeddings + + vocab_size = 1000 + embed_dim = 128 + + embeddings = TokenEmbeddings(vocab_size, embed_dim) + + # Test forward pass + batch_size, seq_len = 2, 16 + input_ids = torch.randint(0, vocab_size, 
(batch_size, seq_len)) + + output = embeddings(input_ids) + + assert output.shape == (batch_size, seq_len, embed_dim) + print("✅ Token embeddings test passed") + + +def test_multi_head_attention(): + """Test multi-head attention.""" + from llm.core.multi_head_attention import MultiHeadAttention + + num_heads = 4 + emb_size = 128 + head_size = emb_size // num_heads + max_seq_len = 256 + + attention = MultiHeadAttention(num_heads, emb_size, head_size, max_seq_len) + + # Test forward pass + batch_size, seq_len = 2, 16 + inputs = torch.randn(batch_size, seq_len, emb_size) + + output = attention(inputs) + + assert output.shape == inputs.shape + print("✅ Multi-head attention test passed") + + +def test_feed_forward(): + """Test feed forward network.""" + from llm.core.feed_forward import FeedForward + + embed_dim = 128 + + ff = FeedForward(embed_dim) + + # Test forward pass + batch_size, seq_len = 2, 16 + inputs = torch.randn(batch_size, seq_len, embed_dim) + + output = ff(inputs) + + assert output.shape == inputs.shape + print("✅ Feed forward test passed") + + +def test_gpt_generation(): + """Test GPT text generation.""" + from llm.models.gpt import GPT + + config = { + "vocab_size": 1000, + "embed_dim": 128, + "num_heads": 4, + "num_layers": 2, + "max_position_embeddings": 256, + "dropout": 0.1 + } + + model = GPT(config) + model.eval() + + # Test greedy generation + input_ids = torch.randint(0, config["vocab_size"], (1, 5)) + + with torch.no_grad(): + generated = model.generate( + x=input_ids, + max_new_tokens=3, + do_sample=False + ) + + assert generated.shape == (1, 8) # 5 initial + 3 new tokens + print("✅ GPT generation test passed") + + +def test_bpe_tokenizer_save_load(): + """Test BPE tokenizer save/load functionality.""" + from llm.tokenizers import BPETokenizer + + tokenizer = BPETokenizer() + + # Train on simple texts + texts = ["hello world", "test save load"] + tokenizer.train( + texts=texts, + vocab_size=30, + special_tokens=["", "", "", ""] + ) + + with 
tempfile.TemporaryDirectory() as temp_dir: + save_path = os.path.join(temp_dir, "test_tokenizer.json") + + # Save tokenizer + tokenizer.save(save_path) + assert os.path.exists(save_path) + + # Load tokenizer + loaded_tokenizer = BPETokenizer.load(save_path) + + # Test that vocab size is the same + assert tokenizer.get_vocab_size() == loaded_tokenizer.get_vocab_size() + + # Test that vocabularies are the same + assert tokenizer.get_vocab() == loaded_tokenizer.get_vocab() + + # Test that both can encode/decode (even if tokens differ due to BPE state) + text = "hello world" + original_tokens = tokenizer.encode(text) + loaded_tokens = loaded_tokenizer.encode(text) + + # Both should produce valid token lists + assert isinstance(original_tokens, list) + assert isinstance(loaded_tokens, list) + assert len(original_tokens) > 0 + assert len(loaded_tokens) > 0 + + # Both should be able to decode + original_decoded = tokenizer.decode(original_tokens) + loaded_decoded = loaded_tokenizer.decode(loaded_tokens) + assert isinstance(original_decoded, str) + assert isinstance(loaded_decoded, str) + + print("✅ BPE tokenizer save/load test passed") + + +def test_gpt_with_tokenizer(): + """Test GPT model with tokenizer integration.""" + from llm.models.gpt import GPT + from llm.tokenizers import BPETokenizer + + # Create and train tokenizer + tokenizer = BPETokenizer() + texts = ["hello world", "test integration"] + tokenizer.train( + texts=texts, + vocab_size=50, + special_tokens=["", "", "", ""] + ) + + vocab_size = tokenizer.get_vocab_size() + + # Create GPT model with tokenizer's vocab size + config = { + "vocab_size": vocab_size, + "embed_dim": 128, + "num_heads": 4, + "num_layers": 2, + "max_position_embeddings": 256, + "dropout": 0.1 + } + + model = GPT(config) + + # Test with tokenized input + text = "hello world" + tokens = tokenizer.encode(text, add_special_tokens=False) + input_ids = torch.tensor([tokens]) + + with torch.no_grad(): + logits = model(input_ids) + + assert 
logits.shape == (1, len(tokens), vocab_size) + print("✅ GPT with tokenizer integration test passed") + + +def run_all_tests(): + """Run all basic tests.""" + print("🧪 Running basic tests for llm library...") + + test_gpt_model_creation() + test_bpe_tokenizer_basic() + test_token_embeddings() + test_multi_head_attention() + test_feed_forward() + test_gpt_generation() + test_bpe_tokenizer_save_load() + test_gpt_with_tokenizer() + + print("🎉 All basic tests passed!") + + +if __name__ == "__main__": + run_all_tests() diff --git a/llm/tests/tokenizers/test_base_tokenizer.py b/llm/tests/tokenizers/test_base_tokenizer.py new file mode 100644 index 0000000..629aca3 --- /dev/null +++ b/llm/tests/tokenizers/test_base_tokenizer.py @@ -0,0 +1,58 @@ +""" +Tests for base tokenizer. +""" + +import pytest +from llm.tokenizers import BaseTokenizer + + +class ConcreteTokenizer(BaseTokenizer): + """Concrete implementation for testing BaseTokenizer.""" + + def train(self, texts: list, vocab_size: int = 1000, **kwargs): + """Dummy implementation for testing.""" + pass + + def encode(self, text: str, **kwargs) -> list: + """Dummy implementation for testing.""" + return [1, 2, 3] + + def decode(self, tokens: list, **kwargs) -> str: + """Dummy implementation for testing.""" + return "decoded text" + + +class TestBaseTokenizer: + """Test cases for BaseTokenizer.""" + + def test_initialization(self): + """Test that BaseTokenizer can be initialized through concrete class.""" + tokenizer = ConcreteTokenizer() + assert tokenizer is not None + assert tokenizer.vocab == {} + assert tokenizer.vocab_size == 0 + + def test_encode_implemented(self): + """Test that encode method works in concrete implementation.""" + tokenizer = ConcreteTokenizer() + result = tokenizer.encode("test text") + assert result == [1, 2, 3] + + def test_decode_implemented(self): + """Test that decode method works in concrete implementation.""" + tokenizer = ConcreteTokenizer() + result = tokenizer.decode([1, 2, 3]) + assert 
result == "decoded text" + + def test_get_vocab_size(self): + """Test that get_vocab_size method works.""" + tokenizer = ConcreteTokenizer() + tokenizer.vocab = {"a": 0, "b": 1, "c": 2} + tokenizer.vocab_size = 3 + assert tokenizer.get_vocab_size() == 3 + + def test_get_vocab(self): + """Test that get_vocab method works.""" + tokenizer = ConcreteTokenizer() + tokenizer.vocab = {"a": 0, "b": 1, "c": 2} + assert tokenizer.get_vocab() == {"a": 0, "b": 1, "c": 2} diff --git a/llm/tests/tokenizers/test_bpe_tokenizer.py b/llm/tests/tokenizers/test_bpe_tokenizer.py new file mode 100644 index 0000000..1644ecd --- /dev/null +++ b/llm/tests/tokenizers/test_bpe_tokenizer.py @@ -0,0 +1,156 @@ +""" +Tests for BPE tokenizer. +""" + +import pytest +import tempfile +import os +from llm.tokenizers import BPETokenizer + + +class TestBPETokenizer: + """Test cases for BPETokenizer.""" + + @pytest.fixture + def sample_texts(self): + """Sample texts for training tokenizer.""" + return [ + "Искусственный интеллект", + "Нейронные сети", + "Машинное обучение", + "Глубокое обучение", + "Трансформеры" + ] + + @pytest.fixture + def trained_tokenizer(self, sample_texts): + """Create and train a BPE tokenizer.""" + tokenizer = BPETokenizer() + tokenizer.train( + texts=sample_texts, + vocab_size=100, + special_tokens=["", "", "", ""] + ) + return tokenizer + + def test_initialization(self): + """Test that BPETokenizer can be initialized.""" + tokenizer = BPETokenizer() + assert tokenizer is not None + + def test_train_tokenizer(self, sample_texts): + """Test that tokenizer can be trained.""" + tokenizer = BPETokenizer() + tokenizer.train( + texts=sample_texts, + vocab_size=50, + special_tokens=["", "", "", ""] + ) + + assert tokenizer.get_vocab_size() > 0 + assert len(tokenizer.get_vocab()) == tokenizer.get_vocab_size() + + def test_encode_decode(self, trained_tokenizer): + """Test encoding and decoding text.""" + text = "Искусственный интеллект" + + # Encode text + tokens = 
trained_tokenizer.encode(text) + assert isinstance(tokens, list) + assert len(tokens) > 0 + assert all(isinstance(token, int) for token in tokens) + + # Decode tokens + decoded_text = trained_tokenizer.decode(tokens) + assert isinstance(decoded_text, str) + # Decoded text should be similar to original (may have special tokens) + assert len(decoded_text) > 0 + + def test_encode_with_special_tokens(self, trained_tokenizer): + """Test encoding with special tokens.""" + text = "Нейронные сети" + + # Without special tokens + tokens_no_special = trained_tokenizer.encode(text, add_special_tokens=False) + + # With special tokens + tokens_with_special = trained_tokenizer.encode(text, add_special_tokens=True) + + # Should have more tokens when special tokens are added + assert len(tokens_with_special) >= len(tokens_no_special) + + def test_vocab_size(self, trained_tokenizer): + """Test vocabulary size.""" + vocab_size = trained_tokenizer.get_vocab_size() + assert isinstance(vocab_size, int) + assert vocab_size > 0 + + vocab = trained_tokenizer.get_vocab() + assert isinstance(vocab, dict) + assert len(vocab) == vocab_size + + def test_special_tokens(self, trained_tokenizer): + """Test that special tokens are in vocabulary.""" + vocab = trained_tokenizer.get_vocab() + + # Check that special tokens are in vocabulary + special_tokens = ["", "", "", ""] + for token in special_tokens: + assert token in vocab + assert isinstance(vocab[token], int) + + def test_save_load(self, trained_tokenizer, sample_texts): + """Test saving and loading tokenizer.""" + with tempfile.TemporaryDirectory() as temp_dir: + save_path = os.path.join(temp_dir, "test_tokenizer.json") + + # Save tokenizer + trained_tokenizer.save(save_path) + assert os.path.exists(save_path) + + # Load tokenizer + loaded_tokenizer = BPETokenizer.load(save_path) + assert loaded_tokenizer is not None + + # Check that loaded tokenizer works the same + original_vocab = trained_tokenizer.get_vocab() + loaded_vocab = 
loaded_tokenizer.get_vocab() + + assert original_vocab == loaded_vocab + assert trained_tokenizer.get_vocab_size() == loaded_tokenizer.get_vocab_size() + + # Test encoding consistency + text = sample_texts[0] + original_tokens = trained_tokenizer.encode(text) + loaded_tokens = loaded_tokenizer.encode(text) + + assert original_tokens == loaded_tokens + + def test_unknown_tokens(self, trained_tokenizer): + """Test handling of unknown tokens.""" + # Use text that likely contains unknown subwords + text = "xyzabc123" # Random text that shouldn't be in training data + + tokens = trained_tokenizer.encode(text) + assert len(tokens) > 0 + + # Should be able to decode back (even if it's mostly unk tokens) + decoded = trained_tokenizer.decode(tokens) + assert isinstance(decoded, str) + + def test_empty_text(self, trained_tokenizer): + """Test encoding and decoding empty text.""" + tokens = trained_tokenizer.encode("") + assert isinstance(tokens, list) + + decoded = trained_tokenizer.decode([]) + assert decoded == "" + + def test_tokenize_method(self, trained_tokenizer): + """Test the tokenize method.""" + text = "Искусственный интеллект" + tokens = trained_tokenizer.tokenize(text) + + assert isinstance(tokens, list) + assert len(tokens) > 0 + assert all(isinstance(token, str) for token in tokens)