{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Complete the transformer architecture" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# set up the env\n", "\n", "import pytest\n", "import ipytest\n", "import unittest\n", "\n", "ipytest.autoconfig()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Transformer Model\n", "\n", "The encoder-decoder architecture based on the Transformer structure is illustrated in the figure below. The left and right sides correspond to the encoder and decoder, respectively. Each consists of a basic Transformer block (the gray boxes in the figure) stacked N times.\n", "\n", "Here's an overview of the key components and processes involved in the semantic abstraction process from input to output:\n", "\n", "Encoder:\n", "\n", "The encoder takes an input sequence $x_1, \\dots, x_t$, where each $x_i$ is the representation of a word in the text sequence.\n", "It consists of stacked Transformer blocks. 
Each block includes:\n", "\n", "- Attention Layer: uses multi-head attention to capture dependencies between words in the input sequence, modeling long-range dependencies without recurrent structures.\n", "- Position-wise Feedforward Layer: applies the same non-linear transformation to the representation of each word independently.\n", "- Residual Connections: add the input of the attention and feedforward layers to their output, which eases information flow and optimization.\n", "- Layer Normalization: normalizes the representations around the attention and feedforward layers, stabilizing optimization.\n", "\n", "Decoder:\n", "\n", "The decoder generates an output sequence $y_1, \\dots, y_t$ based on the representations learned by the encoder.\n", "Like the encoder, it consists of stacked Transformer blocks, each including the same components as described above.\n", "In addition, each decoder block has a second attention mechanism that attends to the encoder's output, incorporating source context during sequence generation.\n", "\n", "Overall, the Transformer encoder-decoder architecture combines attention mechanisms, position-wise feedforward layers, residual connections, and layer normalization. This lets the model capture complex dependencies between words in the input sequence and generate outputs for a variety of sequence-to-sequence tasks.\n", "\n", ":::{figure} https://static-1300131294.cos.ap-shanghai.myqcloud.com/images/llm/Transformer-python-%281%29.png\n", "Transformer-based encoder and decoder Architecture\n", ":::\n", "\n", "Next, we'll discuss the specific functionalities and implementation methods of each module in detail." 
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Embedding Layer\n", "\n", "The Embedding Layer in the Transformer model converts discrete token indices into continuous vector representations. Each token index is mapped to a high-dimensional vector that is learned during training. These embeddings capture semantic and syntactic information about the tokens. Because attention by itself is insensitive to token order, a positional encoding is added to the embeddings so that the model can tell positions apart.\n", "\n", "Implementation in PyTorch:\n", "\n", "We define a PositionalEncoder class that inherits from nn.Module.\n", "The constructor builds the positional encoding matrix (pe) from d_model (dimension of the model) and max_seq_len (maximum sequence length).\n", "The forward method scales the input embeddings (x) by the square root of the model dimension and adds the first seq_len rows of pe to them.\n", "The matrix is registered as a buffer, so it is saved and moved between devices together with the module but is not updated by backpropagation.\n", "Finally, the PositionalEncoder class can be used within a larger PyTorch model to incorporate positional information into word embeddings." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import torch\n", "import torch.nn as nn\n", "import math\n", "import copy\n", "import time\n", "import torch.optim as optim\n", "import torch.nn.functional as F\n", "import numpy as np\n", "\n", "class PositionalEncoder(nn.Module):\n", "    def __init__(self, d_model, max_seq_len=80):\n", "        super().__init__()\n", "        self.d_model = d_model\n", "        # Constant PE matrix: sine on even dimensions, cosine on odd ones.\n", "        # i steps over the even indices (i = 2k), so the exponent 2k / d_model equals i / d_model.\n", "        pe = torch.zeros(max_seq_len, d_model)\n", "        for pos in range(max_seq_len):\n", "            for i in range(0, d_model, 2):\n", "                angle = pos / (10000 ** (i / d_model))\n", "                pe[pos, i] = math.sin(angle)\n", "                pe[pos, i + 1] = math.cos(angle)\n", "        # Shape (1, max_seq_len, d_model) so it broadcasts over the batch dimension\n", "        pe = pe.unsqueeze(0)\n", "        self.register_buffer('pe', pe)\n", "\n", "    def forward(self, x):\n", "        # Scaling word embeddings to make them relatively larger\n", "        x = x * math.sqrt(self.d_model)\n", "        # Adding positional constants to word embedding representations.\n", "        # The buffer already lives on the module's device and carries no gradient,\n", "        # so the deprecated Variable wrapper and a hard-coded .cuda() call are not needed.\n", "        seq_len = x.size(1)\n", "        x = x + self.pe[:, :seq_len]\n", "        return x\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
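To make the contents of the encoding matrix concrete, here is a minimal pure-Python sketch of a single row, assuming the standard sinusoidal formula with sine on even dimensions and cosine on odd ones. The helper name sinusoidal_row is hypothetical and not part of the notebook's classes; it only illustrates the pattern and is not a replacement for PositionalEncoder.

```python
import math

def sinusoidal_row(pos, d_model):
    # Encoding vector for one position; d_model is assumed to be even.
    row = []
    for i in range(0, d_model, 2):
        angle = pos / (10000 ** (i / d_model))
        row.append(math.sin(angle))  # even dimension i
        row.append(math.cos(angle))  # odd dimension i + 1
    return row

row0 = sinusoidal_row(0, 8)
row3 = sinusoidal_row(3, 8)
print(row0)  # at position 0 every sine is 0.0 and every cosine is 1.0
print([round(v, 4) for v in row3])
```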
Check result by executing below... 📝
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "jupyter": { "source_hidden": true }, "tags": [ "hide-input" ] }, "outputs": [], "source": [ "%%ipytest -qq\n", "\n", "class TestPositionalEncoder(unittest.TestCase):\n", "    def setUp(self):\n", "        self.d_model = 512\n", "        self.max_seq_len = 10  # Maximum sequence length for testing\n", "        self.positional_encoder = PositionalEncoder(self.d_model, self.max_seq_len)\n", "\n", "    def test_forward(self):\n", "        # Create a sample input tensor representing word embeddings\n", "        batch_size = 2\n", "        seq_length = 5\n", "        word_embeddings = torch.randn(batch_size, seq_length, self.d_model)\n", "\n", "        # Forward pass through the PositionalEncoder module\n", "        output = self.positional_encoder(word_embeddings)\n", "\n", "        # Check if the output shape matches the input shape\n", "        assert output.shape == (batch_size, seq_length, self.d_model)\n", "\n", "        # Check if positional encoding is correctly applied:\n", "        # the first element is the scaled embedding plus pe[0, 0, 0] = sin(0) = 0\n", "        expected_first_element = word_embeddings[0, 0, 0].item() * math.sqrt(self.d_model) + math.sin(0.0)\n", "        assert math.isclose(output[0, 0, 0].item(), expected_first_element, rel_tol=1e-5, abs_tol=1e-6)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Attention Layer\n", "\n", "The Attention Layer in the Transformer model lets the model focus on different parts of the input sequence when processing each token. It computes attention scores between each pair of tokens in the input sequence and generates a context vector for each token based on the importance of other tokens. 
This mechanism allows the model to capture long-range dependencies in the input sequence effectively.\n", "\n", "Implementation in PyTorch:\n", "\n", "The MultiHeadAttention class defines a multi-head attention layer.\n", "The forward method applies linear projections to the queries, keys, and values, splits them into multiple heads, computes scaled dot-product attention per head, and concatenates the heads before a final linear layer." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class MultiHeadAttention(nn.Module):\n", "    def __init__(self, heads, d_model, dropout=0.1):\n", "        super().__init__()\n", "        assert d_model % heads == 0, 'd_model must be divisible by heads'\n", "        self.d_model = d_model\n", "        self.d_k = d_model // heads\n", "        self.h = heads\n", "        self.q_linear = nn.Linear(d_model, d_model)\n", "        self.v_linear = nn.Linear(d_model, d_model)\n", "        self.k_linear = nn.Linear(d_model, d_model)\n", "        self.dropout = nn.Dropout(dropout)\n", "        self.out = nn.Linear(d_model, d_model)\n", "\n", "    def attention(self, q, k, v, d_k, mask=None, dropout=None):\n", "        # Scaled dot-product scores, shape (batch, heads, len_q, len_k)\n", "        scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(d_k)\n", "        # Masked positions (e.g. padding) get a large negative score,\n", "        # so their weight is effectively zero after the softmax\n", "        if mask is not None:\n", "            mask = mask.unsqueeze(1)  # add a head dimension for broadcasting\n", "            scores = scores.masked_fill(mask == 0, -1e9)\n", "        scores = F.softmax(scores, dim=-1)\n", "        if dropout is not None:\n", "            scores = dropout(scores)\n", "        output = torch.matmul(scores, v)\n", "        return output\n", "\n", "    def forward(self, q, k, v, mask=None):\n", "        bs = q.size(0)\n", "        # Linear projections, then split the last dimension into h heads of size d_k\n", "        k = self.k_linear(k).view(bs, -1, self.h, self.d_k)\n", "        q = self.q_linear(q).view(bs, -1, self.h, self.d_k)\n", "        v = self.v_linear(v).view(bs, -1, self.h, self.d_k)\n", "        # Move the head dimension forward: (batch, heads, seq_len, d_k)\n", "        k = k.transpose(1, 2)\n", "        q = q.transpose(1, 2)\n", "        v = v.transpose(1, 2)\n", "        # Computing attention\n", "        scores = self.attention(q, k, v, self.d_k, mask, self.dropout)\n", "        # Concatenating multiple heads and feeding into the final linear layer\n", "        concat = scores.transpose(1, 2).contiguous().view(bs, -1, self.d_model)\n", "        output = self.out(concat)\n", "        return output\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
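The core computation inside each head can be sketched without PyTorch. The snippet below is a minimal illustration for a single query vector, assuming plain Python lists and a mask in which 1 means keep and 0 means hide; the names softmax and attend are hypothetical helpers, not part of the MultiHeadAttention class.

```python
import math

def softmax(xs):
    # Numerically stable softmax over a list of floats.
    m = max(xs)
    exps = [math.exp(x - m) for x in xs]
    total = sum(exps)
    return [e / total for e in exps]

def attend(q, keys, values, mask=None):
    # q: one query vector; keys and values: lists of vectors; mask: 1 = keep, 0 = hide.
    d_k = len(q)
    scores = [sum(a * b for a, b in zip(q, k)) / math.sqrt(d_k) for k in keys]
    if mask is not None:
        # Hidden positions get a large negative score before the softmax.
        scores = [s if keep else -1e9 for s, keep in zip(scores, mask)]
    weights = softmax(scores)
    dim = len(values[0])
    context = [sum(w * v[j] for w, v in zip(weights, values)) for j in range(dim)]
    return weights, context

keys = [[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]]
values = [[1.0, 0.0], [0.0, 1.0], [0.5, 0.5]]
weights, context = attend([1.0, 0.0], keys, values, mask=[1, 1, 0])
print(weights)  # the masked third position receives a weight of 0.0
print(context)
```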
Check result by executing below... 📝
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "jupyter": { "source_hidden": true }, "tags": [ "hide-input" ] }, "outputs": [], "source": [ "%%ipytest -qq\n", "\n", "class TestMultiHeadAttention(unittest.TestCase):\n", "    def test_forward(self):\n", "        # Instantiate MultiHeadAttention module\n", "        heads = 4\n", "        d_model = 64\n", "        dropout = 0.1\n", "        multihead_attn = MultiHeadAttention(heads, d_model, dropout)\n", "\n", "        # Create sample input tensors\n", "        batch_size = 2\n", "        seq_length = 5\n", "        q = torch.randn(batch_size, seq_length, d_model)\n", "        k = torch.randn(batch_size, seq_length, d_model)\n", "        v = torch.randn(batch_size, seq_length, d_model)\n", "        mask = torch.randint(0, 2, (batch_size, 1, seq_length))  # Example mask tensor\n", "\n", "        # Forward pass through the MultiHeadAttention module\n", "        output = multihead_attn(q, k, v, mask)\n", "\n", "        # Check output shape\n", "        self.assertEqual(output.shape, (batch_size, seq_length, d_model))\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Feedforward Layer\n", "\n", "The Position-wise Feedforward Layer in the Transformer model applies a simple feedforward neural network independently to each position in the sequence. It consists of two linear transformations with a non-linear activation function (commonly ReLU) applied in between. This layer helps capture complex interactions between different dimensions of the input embeddings.\n", "\n", "Implementation in PyTorch:\n", "\n", "The FeedForward class defines a feedforward layer.\n", "The forward method applies ReLU activation to the output of the first linear transformation, followed by dropout, and then performs the second linear transformation to produce the final output." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class FeedForward(nn.Module):\n", "    def __init__(self, d_model, d_ff=2048, dropout=0.1):\n", "        super().__init__()\n", "        # Setting d_ff default to 2048\n", "        self.linear_1 = nn.Linear(d_model, d_ff)\n", "        self.dropout = nn.Dropout(dropout)\n", "        self.linear_2 = nn.Linear(d_ff, d_model)\n", "\n", "    def forward(self, x):\n", "        # Expand to d_ff, apply ReLU and dropout, then project back to d_model\n", "        x = self.dropout(F.relu(self.linear_1(x)))\n", "        x = self.linear_2(x)\n", "        return x\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
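The word position-wise means that the same two linear maps are applied to every token separately. A minimal pure-Python sketch, assuming hand-picked toy weights with d_model = 2 and d_ff = 3 and no dropout; the helper names linear, relu, and feed_forward are hypothetical and only mirror the structure of the FeedForward class.

```python
def linear(x, weight, bias):
    # weight holds one row of input weights per output unit.
    return [sum(w * xi for w, xi in zip(row, x)) + b for row, b in zip(weight, bias)]

def relu(x):
    return [max(0.0, v) for v in x]

def feed_forward(x, w1, b1, w2, b2):
    # Two linear maps with a ReLU in between, applied to one position.
    return linear(relu(linear(x, w1, b1)), w2, b2)

# Toy weights (d_model = 2, d_ff = 3), chosen by hand for illustration only.
w1 = [[1.0, 0.0], [0.0, 1.0], [-1.0, -1.0]]
b1 = [0.0, 0.0, 0.0]
w2 = [[1.0, 1.0, 1.0], [1.0, -1.0, 0.0]]
b2 = [0.0, 0.5]

# Each token of the sequence goes through the same weights independently.
sequence = [[1.0, 2.0], [-1.0, 0.5]]
outputs = [feed_forward(token, w1, b1, w2, b2) for token in sequence]
print(outputs)  # [[3.0, -0.5], [1.0, 0.0]]
```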
Check result by executing below... 📝
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "jupyter": { "source_hidden": true }, "tags": [ "hide-input" ] }, "outputs": [], "source": [ "%%ipytest -qq\n", "\n", "class TestFeedForward(unittest.TestCase):\n", "    def test_forward(self):\n", "        # Instantiate FeedForward module\n", "        d_model = 512\n", "        d_ff = 2048\n", "        dropout = 0.1\n", "        feed_forward = FeedForward(d_model, d_ff, dropout)\n", "\n", "        # Create sample input tensor\n", "        batch_size = 2\n", "        seq_length = 5\n", "        input_tensor = torch.randn(batch_size, seq_length, d_model)\n", "\n", "        # Forward pass through the FeedForward module\n", "        output = feed_forward(input_tensor)\n", "\n", "        # Check output shape\n", "        self.assertEqual(output.shape, (batch_size, seq_length, d_model))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Residual Connection and Layer Normalization\n", "\n", "Residual Connection:\n", "The Residual Connection, also known as a skip connection, is a technique used in deep neural networks to mitigate the vanishing gradient problem and facilitate the flow of information through the network. In the Transformer model, a residual connection is placed around each sub-layer (such as the attention and feedforward layers): the input of the sub-layer is added to its output. This allows the model to learn residual representations and thus eases the optimization process.\n", "\n", "Layer Normalization:\n", "Layer Normalization is a technique used to stabilize the training of deep neural networks by normalizing the activations of each layer. The original Transformer applies it to the sum of a sub-layer's output and its residual input (post-norm), whereas the implementation in this notebook applies it to the input of each sub-layer, before the residual addition (pre-norm). 
It normalizes the activations along the feature dimension, allowing the model to learn more robust representations and accelerate convergence during training.\n", "\n", "Implementation in PyTorch:\n", "\n", "The NormLayer class defines a layer normalization layer.\n", "The forward method normalizes the input tensor x over its last dimension and then applies a learnable scale and shift." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class NormLayer(nn.Module):\n", "    def __init__(self, d_model, eps=1e-6):\n", "        super().__init__()\n", "        self.size = d_model\n", "        # Layer normalization includes two learnable parameters\n", "        self.alpha = nn.Parameter(torch.ones(self.size))\n", "        self.bias = nn.Parameter(torch.zeros(self.size))\n", "        self.eps = eps\n", "\n", "    def forward(self, x):\n", "        # Normalize over the last (feature) dimension, then rescale and shift.\n", "        # Note: x.std is the sample standard deviation and eps is added to it,\n", "        # so the values differ slightly from those of nn.LayerNorm.\n", "        mean = x.mean(dim=-1, keepdim=True)\n", "        std = x.std(dim=-1, keepdim=True)\n", "        norm = self.alpha * (x - mean) / (std + self.eps) + self.bias\n", "        return norm\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
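As a quick numeric illustration of what normalizing along the feature dimension does, here is a minimal pure-Python sketch for one token vector. It assumes the same convention as the NormLayer class (sample standard deviation, eps added to it); the function name layer_norm is a hypothetical helper used only for this example.

```python
import math

def layer_norm(x, alpha, bias, eps=1e-6):
    # Normalize one token vector over its features, then rescale and shift.
    n = len(x)
    mean = sum(x) / n
    # Sample standard deviation (divides by n - 1), mirroring the default of torch.std.
    std = math.sqrt(sum((v - mean) ** 2 for v in x) / (n - 1))
    return [a * (v - mean) / (std + eps) + b for v, a, b in zip(x, alpha, bias)]

x = [1.0, 2.0, 3.0, 4.0]
out = layer_norm(x, alpha=[1.0] * 4, bias=[0.0] * 4)
# With alpha = 1 and bias = 0 the result has mean 0 and sample standard deviation close to 1.
print([round(v, 4) for v in out])
```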
Check result by executing below... 📝
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "jupyter": { "source_hidden": true }, "tags": [ "hide-input" ] }, "outputs": [], "source": [ "%%ipytest -qq\n", "\n", "class TestNormLayer(unittest.TestCase):\n", "    def test_forward(self):\n", "        # Instantiate NormLayer module\n", "        d_model = 512\n", "        eps = 1e-6\n", "        norm_layer = NormLayer(d_model, eps)\n", "\n", "        # Create sample input tensor\n", "        batch_size = 2\n", "        seq_length = 5\n", "        input_tensor = torch.randn(batch_size, seq_length, d_model)\n", "\n", "        # Forward pass through the NormLayer module\n", "        output = norm_layer(input_tensor)\n", "\n", "        # Check output shape\n", "        self.assertEqual(output.shape, (batch_size, seq_length, d_model))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Encoder and Decoder Structure\n", "\n", "Encoder Structure:\n", "The Encoder in the Transformer model consists of multiple stacked Encoder layers. Each Encoder layer contains a Multi-Head Attention sub-layer followed by a FeedForward sub-layer, each with Residual Connection and Layer Normalization.\n", "\n", "Decoder Structure:\n", "Similarly, the Decoder in the Transformer model consists of multiple stacked Decoder layers. Each Decoder layer contains three sub-layers:\n", "\n", "- Masked Multi-Head Attention sub-layer to attend to previous tokens in the output sequence.\n", "- Multi-Head Attention sub-layer that attends to the encoder's output.\n", "- FeedForward sub-layer. 
\n", "\n", "Again, each sub-layer is wrapped in a Residual Connection with Layer Normalization.\n", "\n", "Below are the Python implementations for the Encoder and Decoder structures:\n", "\n", "The EncoderLayer and DecoderLayer classes define encoder and decoder layers, respectively.\n", "The Encoder and Decoder classes define the encoder and decoder modules, each composed of N stacked layers.\n", "These classes follow the architecture described in the text, including the use of multi-head attention, feedforward layers, residual connections, and layer normalization." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class Embedder(nn.Module):\n", "    def __init__(self, vocab_size, d_model):\n", "        super().__init__()\n", "        self.embed = nn.Embedding(vocab_size, d_model)\n", "        self.d_model = d_model\n", "\n", "    def forward(self, x):\n", "        # Scale embeddings by sqrt(d_model) before positional encodings are added\n", "        return self.embed(x) * math.sqrt(self.d_model)\n", "\n", "def get_clones(module, N):\n", "    return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])\n", "\n", "# Vectorized positional encoder with dropout. It reuses the name PositionalEncoder,\n", "# so from this cell on it replaces the class defined earlier in the notebook.\n", "class PositionalEncoder(nn.Module):\n", "    def __init__(self, d_model, dropout, max_len=5000):\n", "        super().__init__()\n", "        self.dropout = nn.Dropout(p=dropout)\n", "\n", "        pe = torch.zeros(max_len, d_model)\n", "        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)\n", "        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))\n", "        pe[:, 0::2] = torch.sin(position * div_term)\n", "        pe[:, 1::2] = torch.cos(position * div_term)\n", "        # Shape (1, max_len, d_model): inputs are batch-first, (batch, seq_len, d_model)\n", "        pe = pe.unsqueeze(0)\n", "        self.register_buffer('pe', pe)\n", "\n", "    def forward(self, x):\n", "        # Slice along the sequence dimension (dim 1), not the batch dimension\n", "        x = x + self.pe[:, :x.size(1)]\n", "        return self.dropout(x)\n", "\n", "class EncoderLayer(nn.Module):\n", "    def __init__(self, d_model, heads, dropout=0.1):\n", "        super().__init__()\n", "        self.norm_1 = NormLayer(d_model)\n", "        self.norm_2 = NormLayer(d_model)\n", "        self.attn = MultiHeadAttention(heads, d_model, dropout=dropout)\n", "        self.ff = FeedForward(d_model, dropout=dropout)\n", "        self.dropout_1 = nn.Dropout(dropout)\n", "        self.dropout_2 = nn.Dropout(dropout)\n", "\n", "    def forward(self, x, mask):\n", "        # Pre-norm: normalize, apply the sub-layer, then add the residual\n", "        x2 = self.norm_1(x)\n", "        x = x + self.dropout_1(self.attn(x2, x2, x2, mask))\n", "        x2 = self.norm_2(x)\n", "        x = x + self.dropout_2(self.ff(x2))\n", "        return x\n", "\n", "\n", "class Encoder(nn.Module):\n", "    def __init__(self, vocab_size, d_model, N, heads, dropout):\n", "        super().__init__()\n", "        self.N = N\n", "        self.embed = Embedder(vocab_size, d_model)\n", "        self.pe = PositionalEncoder(d_model, dropout=dropout)\n", "        self.layers = get_clones(EncoderLayer(d_model, heads, dropout), N)\n", "        self.norm = NormLayer(d_model)\n", "\n", "    def forward(self, src, mask):\n", "        x = self.embed(src)\n", "        x = self.pe(x)\n", "        for i in range(self.N):\n", "            x = self.layers[i](x, mask)\n", "        return self.norm(x)\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "\n", "class DecoderLayer(nn.Module):\n", "    def __init__(self, d_model, heads, dropout=0.1):\n", "        super().__init__()\n", "        self.norm_1 = NormLayer(d_model)\n", "        self.norm_2 = NormLayer(d_model)\n", "        self.norm_3 = NormLayer(d_model)\n", "        self.dropout_1 = nn.Dropout(dropout)\n", "        self.dropout_2 = nn.Dropout(dropout)\n", "        self.dropout_3 = nn.Dropout(dropout)\n", "        self.attn_1 = MultiHeadAttention(heads, d_model, dropout=dropout)\n", "        self.attn_2 = MultiHeadAttention(heads, d_model, dropout=dropout)\n", "        self.ff = FeedForward(d_model, dropout=dropout)\n", "\n", "    def forward(self, x, e_outputs, src_mask, trg_mask):\n", "        # Masked self-attention over the target tokens generated so far\n", "        x2 = self.norm_1(x)\n", "        x = x + self.dropout_1(self.attn_1(x2, x2, x2, trg_mask))\n", "        # Cross-attention: queries from the decoder, keys and values from the encoder output\n", "        x2 = self.norm_2(x)\n", "        x = x + self.dropout_2(self.attn_2(x2, e_outputs, e_outputs, src_mask))\n", "        x2 = self.norm_3(x)\n", "        x = x + self.dropout_3(self.ff(x2))\n", "        return x\n", "\n", "\n", "class Decoder(nn.Module):\n", "    def __init__(self, vocab_size, d_model, N, heads, dropout):\n", "        super().__init__()\n", "        self.N = N\n", "        self.embed = Embedder(vocab_size, d_model)\n", "        self.pe = PositionalEncoder(d_model, dropout=dropout)\n", "        self.layers = get_clones(DecoderLayer(d_model, heads, dropout), N)\n", "        self.norm = NormLayer(d_model)\n", "\n", "    def forward(self, trg, e_outputs, src_mask, trg_mask):\n", "        x = self.embed(trg)\n", "        x = self.pe(x)\n", "        for i in range(self.N):\n", "            x = self.layers[i](x, e_outputs, src_mask, trg_mask)\n", "        return self.norm(x)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The overall implementation of the Transformer encoder and decoder structure:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class Transformer(nn.Module):\n", "    def __init__(self, src_vocab, trg_vocab, d_model, N, heads, dropout):\n", "        super().__init__()\n", "        self.encoder = Encoder(src_vocab, d_model, N, heads, dropout)\n", "        self.decoder = Decoder(trg_vocab, d_model, N, heads, dropout)\n", "        # Project decoder states to logits over the target vocabulary\n", "        self.out = nn.Linear(d_model, trg_vocab)\n", "\n", "    def forward(self, src, trg, src_mask, trg_mask):\n", "        e_outputs = self.encoder(src, src_mask)\n", "        d_output = self.decoder(trg, e_outputs, src_mask, trg_mask)\n", "        output = self.out(d_output)\n", "        return output\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The training process for the Transformer model:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Sample English and French text data\n", "en_data = [\n", "    \"I love coding.\",\n", "    \"Machine learning is fascinating.\",\n", "    \"Natural language processing is fun.\"\n", "]\n", "\n", "fr_data = [\n", "    \"J'adore coder.\",\n", "    \"L'apprentissage automatique est fascinant.\",\n", "    \"Le traitement du langage naturel est amusant.\"\n", "]\n", "\n", "def tokenize_en(sentence):\n", "    # You can implement a more sophisticated tokenizer here if needed\n", "    
return sentence.lower().split()  # Simple tokenizer, converts to lowercase and splits by space\n", "\n", "def tokenize_fr(sentence):\n", "    # You can implement a more sophisticated tokenizer here if needed\n", "    return sentence.lower().split()  # Simple tokenizer, converts to lowercase and splits by space\n", "\n", "# Tokenize English and French text\n", "en_sentences = [tokenize_en(sentence) for sentence in en_data]\n", "# Wrap each target sentence in <sos> ... <eos> so the decoder learns where to start and stop\n", "fr_sentences = [['<sos>'] + tokenize_fr(sentence) + ['<eos>'] for sentence in fr_data]\n", "\n", "# Create English and French vocabularies\n", "# (special-token names and their order are a conventional choice)\n", "en_vocab = {'<unk>': 0, '<pad>': 1, '<sos>': 2, '<eos>': 3}  # Initialize with special tokens\n", "fr_vocab = {'<unk>': 0, '<pad>': 1, '<sos>': 2, '<eos>': 3}  # Initialize with special tokens\n", "\n", "# Build English vocabulary\n", "for sentence in en_sentences:\n", "    for word in sentence:\n", "        if word not in en_vocab:\n", "            en_vocab[word] = len(en_vocab)\n", "\n", "# Build French vocabulary\n", "for sentence in fr_sentences:\n", "    for word in sentence:\n", "        if word not in fr_vocab:\n", "            fr_vocab[word] = len(fr_vocab)\n", "\n", "# Reverse vocabularies to get index-to-token mappings\n", "en_index_to_word = {index: word for word, index in en_vocab.items()}\n", "fr_index_to_word = {index: word for word, index in fr_vocab.items()}\n", "\n", "def create_masks(src, trg_input):\n", "    # Source mask hides padding, shape (batch, 1, src_len)\n", "    src_mask = (src != en_vocab['<pad>']).unsqueeze(-2)\n", "    # Target mask hides padding and future positions, shape (batch, trg_len, trg_len)\n", "    trg_len = trg_input.size(1)\n", "    no_peek = torch.tril(torch.ones((1, trg_len, trg_len), device=trg_input.device)).bool()\n", "    trg_mask = (trg_input != fr_vocab['<pad>']).unsqueeze(-2) & no_peek\n", "    return src_mask, trg_mask\n", "\n", "# Model parameters\n", "d_model = 512\n", "heads = 8\n", "N = 6\n", "src_vocab = len(en_vocab)\n", "trg_vocab = len(fr_vocab)\n", "dropout = 0.1\n", "\n", "# Initialize the model\n", "model = Transformer(src_vocab, trg_vocab, d_model, N, heads, dropout)\n", "for p in model.parameters():\n", "    if p.dim() > 1:\n", "        nn.init.xavier_uniform_(p)\n", "\n", "# Optimizer (named optimizer so it does not shadow the torch.optim import)\n", "optimizer = torch.optim.Adam(model.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9)\n", "\n", "\n", "# Training the model\n", "def train_model(epochs, en_sentences, fr_sentences, print_every=100):\n", "    model.train()\n", "    start = time.time()\n", "    temp = start\n", "    total_loss = 0\n", "    step = 0  # Global step counter, so logging also works with very few sentences\n", "\n", "    for epoch in range(epochs):\n", "        for i in range(len(en_sentences)):\n", "            src_sentence = en_sentences[i]\n", "            trg_sentence = fr_sentences[i]\n", "\n", "            src_tensor = torch.LongTensor([en_vocab[word] for word in src_sentence])\n", "            trg_tensor = torch.LongTensor([fr_vocab[word] for word in trg_sentence])\n", "\n", "            src = src_tensor.unsqueeze(0)  # Add batch dimension\n", "            trg = trg_tensor.unsqueeze(0)  # Add batch dimension\n", "\n", "            # Teacher forcing: the decoder sees tokens up to t and predicts token t + 1\n", "            trg_input = trg[:, :-1]\n", "            targets = trg[:, 1:].contiguous().view(-1)\n", "\n", "            src_mask, trg_mask = create_masks(src, trg_input)\n", "\n", "            preds = model(src, trg_input, src_mask, trg_mask)\n", "\n", "            optimizer.zero_grad()\n", "            loss = F.cross_entropy(preds.view(-1, preds.size(-1)), targets, ignore_index=fr_vocab['<pad>'])\n", "            loss.backward()\n", "            optimizer.step()\n", "\n", "            total_loss += loss.item()\n", "            step += 1\n", "\n", "            if step % print_every == 0:\n", "                loss_avg = total_loss / print_every\n", "                print(\"time = %dm, epoch %d, iter = %d, loss = %.3f, %ds per %d iters\" % ((time.time() - start) // 60, epoch + 1, step, loss_avg, time.time() - temp, print_every))\n", "                total_loss = 0\n", "                temp = time.time()\n", "\n", "train_model(1000, en_sentences, fr_sentences, 100)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Test the trained model:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Test the model\n", "def translate(model, src_sentence, en_vocab, fr_vocab, max_len=80):\n", "    model.eval()\n", "\n", "    # Tokenize the source sentence\n", "    src_tokens = tokenize_en(src_sentence)\n", "\n", "    # Convert tokens to indices using the English vocabulary\n", "    src_indices = [en_vocab.get(token, en_vocab['<unk>']) for token in src_tokens]\n", "\n", "    # Convert indices to tensor and add batch dimension\n", "    src_tensor = torch.LongTensor(src_indices).unsqueeze(0)\n", "\n", "    # Initialize target input with '<sos>' token\n", "    trg_input = torch.LongTensor([[fr_vocab['<sos>']]])\n", "\n", "    # Initialize list to store the generated translation\n", "    translation = []\n", "\n", "    with torch.no_grad():\n", "        # Generate mask for source sentence (constant during decoding)\n", "        src_mask = (src_tensor != en_vocab['<pad>']).unsqueeze(-2)\n", "\n", "        for i in range(max_len):\n", "            # Generate mask for target sentence: lower-triangular, so each position\n", "            # attends only to itself and to earlier positions\n", "            trg_mask = torch.tril(torch.ones((1, i + 1, i + 1), device=src_tensor.device)).bool()\n", "\n", "            # Generate predictions for next token\n", "            preds = model(src_tensor, trg_input, src_mask, trg_mask)\n", "\n", "            # Get predicted token (index)\n", "            pred_token = preds.argmax(dim=-1)[:, -1].item()\n", "\n", "            # Append predicted token to translation list\n", "            translation.append(pred_token)\n", "\n", "            # If predicted token is end-of-sentence token, stop\n", "            if pred_token == fr_vocab['<eos>']:\n", "                break\n", "\n", "            # Append predicted token to target input for next iteration\n", "            trg_input = torch.cat([trg_input, torch.LongTensor([[pred_token]])], dim=-1)\n", "\n", "    # Convert indices back to tokens using the French vocabulary\n", "    translated_sentence = [fr_index_to_word[token] for token in translation]\n", "\n", "    return ' '.join(translated_sentence)\n", "\n", "for src_sentence in en_data:\n", "    translation = translate(model, src_sentence, en_vocab, fr_vocab)\n", "    print(\"Source:\", src_sentence)\n", "    print(\"Translation:\", translation)\n", "    print()" ] } ], "metadata": { "kernelspec": { "display_name": "open-machine-learning-jupyter-book", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.18" } }, "nbformat": 4, "nbformat_minor": 2 }