본문 바로가기
인공지능

[인공지능개론] Transformer②

by 반달링 2024. 8. 20.

이제 진짜 Transformer에 대해 알아보자. Transformer는 NLP 분야에서 많이 쓰이던 RNN 구조가 아닌 Attention 매커니즘을 사용하여 번역 부분에서 SOTA를 차지하였고 현재는 다양한 분야에서 우수한 성능을 보여주고 있다. 

Transformer model의 전체 구조는 아래와 같다. 여기서 주의할 점은 내부적으로는 Self Attention을 수행하지만 Encoder의 결과를 Decoder의 입력으로 전달할 때에는 Cross Attention을 한다는 점이다.

sequence data의 패턴을 찾는 예제와 함께 Transformer  구조를 분석해보자. 이는  input sequence로 1,1,1,1이 들어온다면 1이 반복되는 패턴을 인식하여 1,1,1,1,1,1,1,1,1을 출력하고, input sequence로 0,1,0,1,0이 들어온다면 0과 1이 번갈아가며 변하는 패턴을 인식하여 이어서 1,0,1,0,1,0,1,0,1을 출력하는 간단한 task이다.

먼저 필요한 라이브러리를 import 해준다.

import torch
import torch.nn as nn
import torch.optim as optim

import math
import numpy as np
import random

Transformer를 구현하기 이전에 task에 필요한 data 생성을 먼저 구현하자.

generate_random_data 함수는 random data를 생성하는 것으로, 데이터의 형태는 각각의 sequence가 SOS token, 1 또는 0으로 이루어진 8개의 요소, EOS token으로 이루어진 쌍이다. 이때, SOS token은 Start of Sentence, EOS token은 End of Sentence이다. 데이터의 전체 개수는 입력으로 주어진 n의 1/3로 설정되고 각각의 sequence는 list로 표현되며, random하게 섞여 반환된다.

batchify_data 함수는 입력으로 받은 data를 지정된 batch 크기로 나누어 반환하며, 각 batch는 padding을 추가해 최대 길이에 맞출 수 있다. 이 또한 batch list로 반환한다. 이렇게 생성된 data들은 train_dataloader와 val_dataloader로 저장한다.

def generate_random_data(n):
    SOS_token = np.array([2])
    EOS_token = np.array([3])
    length = 8

    data = []

    # 1,1,1,1,1 -> 1,1,1,1,1
    for i in range(n // 3):
        X = np.concatenate((SOS_token, np.ones(length), EOS_token))
        y = np.concatenate((SOS_token, np.ones(length), EOS_token))
        data.append([X, y])

    # 0,0,0,0 -> 0,0,0,0
    for i in range(n // 3):
        X = np.concatenate((SOS_token, np.zeros(length), EOS_token))
        y = np.concatenate((SOS_token, np.zeros(length), EOS_token))
        data.append([X, y])

    # 1,0,1,0 -> 1,0,1,0,1
    for i in range(n // 3):
        X = np.zeros(length)
        start = random.randint(0, 1)

        X[start::2] = 1

        y = np.zeros(length)
        if X[-1] == 0:
            y[::2] = 1
        else:
            y[1::2] = 1

        X = np.concatenate((SOS_token, X, EOS_token))
        y = np.concatenate((SOS_token, y, EOS_token))
        data.append([X, y])

    np.random.shuffle(data)

    return data

#크기가 16인 배치 형태로 만들어 줍니다.
def batchify_data(data, batch_size=16, padding=False, padding_token=-1):
    batches = []
    for idx in range(0, len(data), batch_size):
        # batch_size 크기가 아닌 경우 마지막 비트를 얻지 않도록 합니다.
        if idx + batch_size < len(data):
            # 여기서 배치의 최대 길이를 가져와 PAD 토큰으로 길이를 정규화해야 합니다.
            if padding:
                max_batch_length = 0
                # batch에서 가장 긴 문장 가져오기
                for seq in data[idx : idx + batch_size]:
                    if len(seq) > max_batch_length:
                        max_batch_length = len(seq)

                # 최대 길이에 도달할 때까지 X 패딩 토큰을 추가합니다.
                for seq_idx in range(batch_size):
                    remaining_length = max_bath_length - len(data[idx + seq_idx])
                    data[idx + seq_idx] += [padding_token] * remaining_length

            batches.append(np.array(data[idx : idx + batch_size]).astype(np.int64))

    print(f"{len(batches)} batches of size {batch_size}")

    return batches


train_data = generate_random_data(9000)
val_data = generate_random_data(3000)

train_dataloader = batchify_data(train_data)
val_dataloader = batchify_data(val_data)

Transformer는 torch.nn.Transformer를 통해 쉽게 구현할 수 있다.

class Transformer(nn.Module):
    def __init__(self, num_tokens, dim_model, num_heads, num_encoder_layers, num_decoder_layers, dropout_p, ):
        super().__init__()

        # Layers
        self.transformer = nn.Transformer(
            d_model=dim_model,                      # embedding vector의 크기
            nhead=num_heads,                        # multi-head attention에서의 head 수
            num_encoder_layers=num_encoder_layers,  # encoder layers의 수
            num_decoder_layers=num_decoder_layers,  # decoder layers의 수
            dropout=dropout_p,                      # dropout 비율
        )

    def forward(self):
        pass

torch.nn.Transformer 모델의 주요 파라미터를 살펴보자.

d_model : embedding vector의 크기로, transformer의 encoder와 decoder에서의 정해진 입출력의 크기를 의미하며, default 값은 512이다.

nhead : multi-head attention 모델의 head 수로, default값은 8이다.

num_encoder_layers : encoder가 총 몇 층으로 구성되어 있는지를 나타내며, default 값은 6이다.

num_decoder_layers : decoder가 총 몇 층으로 구성되어 있는지를 나타내며, default 값은 6이다.

dim_feedforward : FeedForward Neural Network 은닉층의 크기로, default = 2048이다.

class PositionalEncoding(nn.Module):
    def __init__(self, dim_model, dropout_p, max_len):
        super().__init__()

        self.dropout = nn.Dropout(dropout_p)

        # Encoding - From formula
        pos_encoding = torch.zeros(max_len, dim_model)
        positions_list = torch.arange(0, max_len, dtype=torch.float).view(-1, 1) # 0, 1, 2, 3, 4, 5
        division_term = torch.exp(torch.arange(0, dim_model, 2).float() * (-math.log(10000.0)) / dim_model) # 1/10000^(2i/dim_model)

        pos_encoding[:, 0::2] = torch.sin(positions_list * division_term)
        pos_encoding[:, 1::2] = torch.cos(positions_list * division_term)

        # Saving buffer (same as parameter without gradients needed)
        pos_encoding = pos_encoding.unsqueeze(0).transpose(0, 1)
        self.register_buffer("pos_encoding", pos_encoding)

    def forward(self, token_embedding: torch.tensor) -> torch.tensor:
        # Residual connection + pos encoding
        return self.dropout(token_embedding + self.pos_encoding[:token_embedding.size(0), :])

Transformer는 단어의 입력을 순차적으로 받는 방식이 아니지만 sequential data를 다루기 때문에, positional encoding을 이용하여 각 token의 위치 정보를 encoding하여 입력으로 전달한다. embedding vector가 모여 만들어진 sequence 행렬에 sin함수와 cos함수의 값을 더해주는 방식으로, embedding vector 내의 짝수 index에는 sin 값을, 홀수 index에는 cos 값을 사용한다.

positions_list : 위치를 나타내는 리스트로, 입력 sequence에서의 embedding vector의 위치를 나타낸다.

max_len : 사용할 sequence의 최대 길이를 의미한다.

register_buffer로 layer를 등록하면 optimizer가 업데이트하지 않는다. forward에서는 input token embedding에 positional encoing을 더하고 dropout을 적용한다.

이제 Transformer 모델을 구체화해보자.

Transformer의 Encoder Block에는 Multi-Head Attention과 Position-Wise Feed Forward Network로 구성되어 있다.

Multi-Head Attention은 weight와의 vector 곱 연산을 수행하여 Scaled Dot-Product Attention, 또는 자기 자신에게 Attention을 수행하여 Self Attention이라고도 한다. 같은 입력 sequence로부터 Encoder Block에 입력된 vector들이 weight와의 vector 곱을 통해 Query, Key, Value를 생성한다. 주어진 Query에 대해 모든 Key와의 유사도를 계산한 후에 softmax를 적용한다. autocorrelation으로 자기자신과 곱할 때 가장 큰 값을 도출하게 된다. 해당 유사도를 Key와 mapping되어 있는 각각의 Value에 반영하는 매커니즘이다.

Position-Wise Feed Forward Network는 Encoder와 Decoder에서 공통적으로 가지고 있는 sub-layer로, attention layer를 통과한 값들을 입력으로 받아 fully connected layer로 작용한다.

Transformer의 Decoder Block에는 Masked Multi-Head Attention, Multi-Head Attention, Position-Wise Feed Forward Network로 구성되어 있다. 앞선 Encoder와 다르게 특이한 점은 Masked Multi-Head Attention이다. Multi-Head Attention 앞에 Masked가 붙는 이유는 Transformer는 전체 sequence 행렬을 입력으로 받기 때문에, 현재 시점의 단어를 예측하고자 할때 입력 sequecne 행렬로부터 미래 시점의 단어까지도 참고할 수 있는 현상이 발생하므로, 현 시점보다 미래의 단어를 참고하지 못하도록 look-ahead mask를 도입하는 것이다. 즉, inference를 수행할 때 cheating하지 못하도록 Masking하기 때문이다. 연산은 Encoder의 Multi-Head Attention과 동일하며, 계산된 Attention score(유사도) 행렬에 Masking을 적용한다.

class Transformer(nn.Module):
    # Constructor
    def __init__(self, num_tokens, dim_model, num_heads, num_encoder_layers, num_decoder_layers, dropout_p,):
        super().__init__()

        # INFO
        self.model_type = "Transformer"
        self.dim_model = dim_model

        # LAYERS
        self.positional_encoder = PositionalEncoding(dim_model=dim_model, dropout_p=dropout_p, max_len=5000)
        self.embedding = nn.Embedding(num_tokens, dim_model)
        self.transformer = nn.Transformer(
            d_model=dim_model,
            nhead=num_heads,
            num_encoder_layers=num_encoder_layers,
            num_decoder_layers=num_decoder_layers,
            dropout=dropout_p,
        )
        self.out = nn.Linear(dim_model, num_tokens)

    def forward(self, src, tgt, tgt_mask=None, src_pad_mask=None, tgt_pad_mask=None):
        # src, Tgt size -> (batch_size, src sequence length)

        # Embedding + positional encoding - Out size = (batch_size, sequence length, dim_model)
        src = self.embedding(src) * math.sqrt(self.dim_model)
        tgt = self.embedding(tgt) * math.sqrt(self.dim_model)
        src = self.positional_encoder(src)
        tgt = self.positional_encoder(tgt)

        # Dimension -> (sequence length, batch_size, dim_model)
        src = src.permute(1,0,2)
        tgt = tgt.permute(1,0,2)

        # Transformer blocks - Out size = (sequence length, batch_size, num_tokens)
        transformer_out = self.transformer(src, tgt, tgt_mask=tgt_mask, src_key_padding_mask=src_pad_mask, tgt_key_padding_mask=tgt_pad_mask)
        out = self.out(transformer_out)

        return out

    def get_tgt_mask(self, size) -> torch.tensor:
        mask = torch.tril(torch.ones(size, size) == 1) # Lower triangular matrix
        mask = mask.float()
        mask = mask.masked_fill(mask == 0, float('-inf')) # Convert zeros to -inf
        mask = mask.masked_fill(mask == 1, float(0.0)) # Convert ones to 0

        return mask

    def create_pad_mask(self, matrix: torch.tensor, pad_token: int) -> torch.tensor:
        return (matrix == pad_token)
  1. Input Embedding :입력 sequence src와 tgt를 embedding layer에 통과시키고 균형잡힌 gradient 흐름을 유지하기 위해 dim_model의 제곱근으로 scaling한다.
  2. Positional Encoding : embedding에 positional Encoding을 추가하여 sequence의 위치 정보를 추가한다.
  3. Encoder, Decoder Block : self.transformer를 통해 PyTorch의 nn.Transformer class를 인스턴스화하여 내부적으로 Encoder Block과 Decoder Block을 구성하고 있다.
  4. Output : self.out을 통해 Linear Layer를 지나 출력한다.

이 중에 masking하는 함수에 대해 좀더 자세히 살펴보자. get_tgt_mask는 target sequence(tgt)에서 미래 시점의 token을 참고하지 못하도록 하삼각 행렬을 생성한다. 이는 모델이 현 시점과 과거 시점의 token만을 참고하도록 masking하는 역할을 한다. create_pad_mask는 padding token을 masking하여 모델이 padding을 무시하도록 한다. 해당 함수들에서 만든 mask를 transformer 함수에 함께 전달하여 성공적으로 Masked Multi-Head Attention을 이루도록 한다.

device = "cuda" if torch.cuda.is_available() else "cpu"
model = Transformer(num_tokens=4,
                    dim_model=8,
                    num_heads=2,
                    num_encoder_layers=3,
                    num_decoder_layers=3,
                    dropout_p=0.1).to(device)
opt = torch.optim.SGD(model.parameters(), lr=0.01)
loss_fn = nn.CrossEntropyLoss()

device를 정의하여 torch의 cuda 이용이 가능한 경우는 GPU를 사용하고 그렇지 않은 경우는 CPU를 사용하도록 장치를 설정한다.

optimizer : SGD(Stochastic Gradient Descent)

loss function : CrossEntropyLoss

def train_loop(model, opt, loss_fn, dataloader):
    model.train()
    total_loss = 0

    for batch in dataloader:
        X, y = batch[:, 0], batch[:, 1]
        X, y = torch.tensor(X).to(device), torch.tensor(y).to(device)

        # 이제 tgt를 1만큼 이동하여 <SOS>를 사용하여 pos 1에서 토큰을 예측
        y_input = y[:,:-1]
        y_expected = y[:,1:]

        # 다음 단어를 마스킹하려면 마스크 가져오기
        sequence_length = y_input.size(1)
        tgt_mask = model.get_tgt_mask(sequence_length).to(device)

        # X, y_input 및 tgt_mask를 전달하여 표준 training
        pred = model(X, y_input, tgt_mask)

        # Permute 를 수행하여 batch first
        pred = pred.permute(1, 2, 0)
        loss = loss_fn(pred, y_expected)

        opt.zero_grad()
        loss.backward()
        opt.step()

        total_loss += loss.detach().item()

    return total_loss / len(dataloader)

model.train()으로 모델을 학습 모드로 설정한다. dataloader에서 minibatch를 가져와 입력데이터(X)와 label(y)를 추출하고 앞서 구성한 transformer model을 이용하여 학습한다. loss를 구하고 계산된 loss를 사용하여 SGD로 모델의 파라미터를 업데이트하며 학습을 진행한다. 손실은 각 minibatch에 대한 평균으로 반환한다.

def validation_loop(model, loss_fn, dataloader):
    model.eval()
    total_loss = 0

    with torch.no_grad():
        for batch in dataloader:
            X, y = batch[:, 0], batch[:, 1]
            X, y = torch.tensor(X, dtype=torch.long, device=device), torch.tensor(y, dtype=torch.long, device=device)

            y_input = y[:,:-1]
            y_expected = y[:,1:]

            sequence_length = y_input.size(1)
            tgt_mask = model.get_tgt_mask(sequence_length).to(device)

            pred = model(X, y_input, tgt_mask)

            pred = pred.permute(1, 2, 0)
            loss = loss_fn(pred, y_expected)
            total_loss += loss.detach().item()

    return total_loss / len(dataloader)

model.eval()로 모델을 검증 모드로 설정한다. 이외의 과정은 학습 모드일 때와 동일하다.

def fit(model, opt, loss_fn, train_dataloader, val_dataloader, epochs):
    # plotting하기 위한 리스트 생성
    train_loss_list, validation_loss_list = [], []

    print("Training and validating model")
    for epoch in range(epochs):
        print("-"*25, f"Epoch {epoch + 1}","-"*25)

        train_loss = train_loop(model, opt, loss_fn, train_dataloader)
        train_loss_list += [train_loss]

        validation_loss = validation_loop(model, loss_fn, val_dataloader)
        validation_loss_list += [validation_loss]

        print(f"Training loss: {train_loss:.4f}")
        print(f"Validation loss: {validation_loss:.4f}")
        print()

    return train_loss_list, validation_loss_list

train_loss_list, validation_loss_list = fit(model, opt, loss_fn, train_dataloader, val_dataloader, 10)

각 epoch에서 학습 손실과 검증 손실이 모두 감소하고 있으며, 이는 모델이 점진적으로 학습 데이터에 더 잘 적합되고 있으며, 검증 데이터셋에 대한 일반화 성능 또한 향상되고 있다는 것을 확인할 수 있다.

import matplotlib.pyplot as plt

plt.plot(train_loss_list, label = "Train loss")
plt.plot(validation_loss_list, label = "Validation loss")
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Loss vs Epoch')
plt.legend()
plt.show()

def predict(model, input_sequence, max_length=15, SOS_token=2, EOS_token=3):
    model.eval()

    y_input = torch.tensor([[SOS_token]], dtype=torch.long, device=device)

    num_tokens = len(input_sequence[0])

    for _ in range(max_length):
        # Get source mask
        tgt_mask = model.get_tgt_mask(y_input.size(1)).to(device)

        pred = model(input_sequence, y_input, tgt_mask)

        next_item = pred.topk(1)[1].view(-1)[-1].item() # num with highest probability
        next_item = torch.tensor([[next_item]], device=device)

        # Concatenate previous input with predicted best word
        y_input = torch.cat((y_input, next_item), dim=1)

        # Stop if model predicts end of sentence
        if next_item.view(-1).item() == EOS_token:
            break

    return y_input.view(-1).tolist()


# Here we test some examples to observe how the model predicts
examples = [
    torch.tensor([[2, 0, 0, 0, 0, 0, 0, 0, 0, 3]], dtype=torch.long, device=device),
    torch.tensor([[2, 1, 1, 1, 1, 1, 1, 1, 1, 3]], dtype=torch.long, device=device),
    torch.tensor([[2, 1, 0, 1, 0, 1, 0, 1, 0, 3]], dtype=torch.long, device=device),
    torch.tensor([[2, 0, 1, 0, 1, 0, 1, 0, 1, 3]], dtype=torch.long, device=device),
    torch.tensor([[2, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 3]], dtype=torch.long, device=device),
    torch.tensor([[2, 0, 1, 3]], dtype=torch.long, device=device)
]

for idx, example in enumerate(examples):
    result = predict(model, example)
    print(f"Example {idx}")
    print(f"Input: {example.view(-1).tolist()[1:-1]}")
    print(f"Continuation: {result[1:-1]}")
    print()

 

[출처1] 한양대학교 장준혁 교수님 인공지능개론 수업

[출처2] A detailed guide to PyTorch's nn.Transformer() module by Daniel Melchor

댓글