1. seq2seq
一般情况下,encoder 为RNN网络,decoder 也为RNN网络
给定句子对<Source, Target>, 输入为Source = <x1, x2, , xn>, Target=<y1, y2, , yn>
经过Encoder编码输出为C=F(x1, x2, , xn)
解码过程为:
yi = G(C, y1, y2, , yi-1)
class Seq2Seq(nn.Module):
def __init__(self):
super(Seq2Seq, self).__init__()
self.enc_cell = nn.RNN(input_size=n_class, hidden_size=n_hidden, dropout=0.5)
self.dec_cell = nn.RNN(input_size=n_class, hidden_size=n_hidden, dropout=0.5)
self.fc = nn.Linear(n_hidden, n_class)
def forward(self, enc_input, enc_hidden, dec_input):
enc_input = enc_input.transpose(0, 1) # enc_input: [max_len(=n_step, time step), batch_size, n_class]
dec_input = dec_input.transpose(0, 1) # dec_input: [max_len(=n_step, time step), batch_size, n_class]
# enc_states : [num_layers(=1) * num_directions(=1), batch_size, n_hidden]
_, enc_states = self.enc_cell(enc_input, enc_hidden)
# outputs : [max_len+1(=6), batch_size, num_directions(=1) * n_hidden(=128)]
outputs, _ = self.dec_cell(dec_input, enc_states)
model = self.fc(outputs) # model : [max_len+1(=6), batch_size, n_class]
return model
心得: 注意在seq2seq中,需要对输入单元长度进行规定,不足max_len需要进行补全
Decoder的输入为Encoder隐层的状态enc_states,训练过程中输入为targets,测过程需要进行初始化。
2. attention 机制
隐层的hidden_state实际上记录了在单词在上下文中的语义,可以用来消歧
传统的seq2seq中丢弃了编码器的中间状态,而仅使用最终输出,在句子长度比较长时,会导致开始的输入被忽略。
注意力机制:即将“注意力”集中与对应的或者相关的部分,通过gradient-descent 和back_propogation学习“映射”
在每个时间步,解码器的输入为解码器的当前状态以及编码器的隐藏状态,需要执行以下操作:
2.1 计算每个编码器状态的分数
使用每个编码器以及解码器的当前状态(即最后一个编码器),训练一个前馈神经网络,得到状态<h1, h2, h3, , h5>对应的分数<s1, s2, s3, , ,s5>
目的是寻找所有与目标值相关的状态
2.2 计算每个编码器状态的权重
利用softmax计算分数权重e1, e2, , , e5
2.3 计算上下文向量
context_score = e1 * h1 + e2 * h2 +...... +e5 * h5
2.4 将上下文向量与前一时间输出相连
合并向量送至解码器
在每个时间步,
基于解码器上一个时间步隐藏状态和匹配的输入,得到当前输出,
计算当前输出与编码器输出的匹配程度,得到权重,进一步计算上下文向量
再将上下文向量与输出相连
for i in range(n_step): # each time step
# dec_output : [n_step(=1), batch_size(=1), num_directions(=1) * n_hidden]
# hidden : [num_layers(=1) * num_directions(=1), batch_size(=1), n_hidden]
dec_output, hidden = self.dec_cell(dec_inputs[i].unsqueeze(0), hidden)
attn_weights = self.get_att_weight(dec_output, enc_outputs) # attn_weights : [1, 1, n_step]
trained_attn.append(attn_weights.squeeze().data.numpy())
# matrix-matrix product of matrices [1,1,n_step] x [1,n_step,n_hidden] = [1,1,n_hidden]
context = attn_weights.bmm(enc_outputs.transpose(0, 1))
dec_output = dec_output.squeeze(0) # dec_output : [batch_size(=1), num_directions(=1) * n_hidden]
context = context.squeeze(1) # [1, num_directions(=1) * n_hidden]
model[i] = self.out(torch.cat((dec_output, context), 1))
2.5 解码输出
代码
import numpy as np
import torch
import torch.nn as nn
from torch.autograd import Variable
import torch.nn.functional as F
import matplotlib.pyplot as plt
dtype = torch.FloatTensor
# S: Symbol that shows starting of decoding input
# E: Symbol that shows starting of decoding output
# P: Symbol that will fill in blank sequence if current batch data size is short than time steps
# Parameter
n_hidden = 128
class Attention(nn.Module):
def __init__(self):
super(Attention, self).__init__()
self.enc_cell = nn.RNN(input_size=n_class, hidden_size=n_hidden, dropout=0.5)
self.dec_cell = nn.RNN(input_size=n_class, hidden_size=n_hidden, dropout=0.5)
# Linear for attention
self.attn = nn.Linear(n_hidden, n_hidden)
self.out = nn.Linear(n_hidden * 2, n_class)
def forward(self, enc_inputs, hidden, dec_inputs):
enc_inputs = enc_inputs.transpose(0, 1) # enc_inputs: [n_step(=n_step, time step), batch_size, n_class]
dec_inputs = dec_inputs.transpose(0, 1) # dec_inputs: [n_step(=n_step, time step), batch_size, n_class]
# enc_outputs : [n_step, batch_size, num_directions(=1) * n_hidden], matrix F
# enc_hidden : [num_layers(=1) * num_directions(=1), batch_size, n_hidden]
enc_outputs, enc_hidden = self.enc_cell(enc_inputs, hidden)
trained_attn = []
hidden = enc_hidden
n_step = len(dec_inputs)
model = Variable(torch.empty([n_step, 1, n_class]))
for i in range(n_step): # each time step
# dec_output : [n_step(=1), batch_size(=1), num_directions(=1) * n_hidden]
# hidden : [num_layers(=1) * num_directions(=1), batch_size(=1), n_hidden]
dec_output, hidden = self.dec_cell(dec_inputs[i].unsqueeze(0), hidden)
attn_weights = self.get_att_weight(dec_output, enc_outputs) # attn_weights : [1, 1, n_step]
trained_attn.append(attn_weights.squeeze().data.numpy())
# matrix-matrix product of matrices [1,1,n_step] x [1,n_step,n_hidden] = [1,1,n_hidden]
context = attn_weights.bmm(enc_outputs.transpose(0, 1))
dec_output = dec_output.squeeze(0) # dec_output : [batch_size(=1), num_directions(=1) * n_hidden]
context = context.squeeze(1) # [1, num_directions(=1) * n_hidden]
model[i] = self.out(torch.cat((dec_output, context), 1))
# make model shape [n_step, n_class]
return model.transpose(0, 1).squeeze(0), trained_attn
def get_att_weight(self, dec_output, enc_outputs): # get attention weight one 'dec_output' with 'enc_outputs'
n_step = len(enc_outputs)
attn_scores = Variable(torch.zeros(n_step)) # attn_scores : [n_step]
for i in range(n_step):
attn_scores[i] = self.get_att_score(dec_output, enc_outputs[i])
# Normalize scores to weights in range 0 to 1
return F.softmax(attn_scores).view(1, 1, -1)
def get_att_score(self, dec_output, enc_output): # enc_outputs [batch_size, num_directions(=1) * n_hidden]
score = self.attn(enc_output) # score : [batch_size, n_hidden]
return torch.dot(dec_output.view(-1), score.view(-1)) # inner product make scalar value
训练
input_batch, output_batch, target_batch = make_batch(sentences)
# hidden : [num_layers(=1) * num_directions(=1), batch_size, n_hidden]
hidden = Variable(torch.zeros(1, 1, n_hidden))
model = Attention()
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
# Train
for epoch in range(100):
optimizer.zero_grad()
output, _ = model(input_batch, hidden, output_batch)
loss = criterion(output, target_batch.squeeze(0))
if (epoch + 1) % 400 == 0:
print('Epoch:', '%04d' % (epoch + 1), 'cost =', '{:.6f}'.format(loss))
loss.backward()
optimizer.step()
# Test
test_batch = [np.eye(n_class)[[word_dict[n] for n in 'SPPPP']]]
test_batch = Variable(torch.Tensor(test_batch))
predict, trained_attn = model(input_batch, hidden, test_batch)
predict = predict.data.max(1, keepdim=True)[1]
print(sentences[0], '->', [number_dict[n.item()] for n in predict.squeeze()])
# Show Attention
fig = plt.figure(figsize=(5, 5))
ax = fig.add_subplot(1, 1, 1)
ax.matshow(trained_attn, cmap='viridis')
ax.set_xticklabels([''] + sentences[0].split(), fontdict={'fontsize': 14})
ax.set_yticklabels([''] + sentences[2].split(), fontdict={'fontsize': 14})
plt.show()
3. BilSTM Attention 机制
3.1 model
注意双向LSTM的输出为两倍
[batch_num, seq_len, hidden_size] -> [batch_size, seq_len * direction, hidden_size]
class BiLSTM_Attention(nn.Module):
def __init__(self):
super(BiLSTM_Attention, self).__init__()
self.embedding = nn.Embedding(vocab_size, embedding_dim)
self.lstm = nn.LSTM(embedding_dim, n_hidden, bidirectional=True, batch_first=True)
self.out = nn.Linear(n_hidden * 2, num_classes)
# lstm_output : [batch_size, n_step, n_hidden * num_directions(=2)], F matrix
def attention_net(self, lstm_output, final_state):
hidden = final_state.view(-1, n_hidden * 2, 1) # hidden : [batch_size, n_hidden * num_directions(=2), 1(=n_layer)]
attn_weights = torch.bmm(lstm_output, hidden).squeeze(2) # attn_weights : [batch_size, n_step]
soft_attn_weights = F.softmax(attn_weights, 1)
# [batch_size, n_hidden * num_directions(=2), n_step] * [batch_size, n_step, 1] = [batch_size, n_hidden * num_directions(=2), 1]
context = torch.bmm(lstm_output.transpose(1, 2), soft_attn_weights.unsqueeze(2)).squeeze(2)
return context, soft_attn_weights.data.numpy() # context : [batch_size, n_hidden * num_directions(=2)]
def forward(self, X, batch_size):
input = self.embedding(X) # input : [batch_size, len_seq, embedding_dim]
#input = input.permute(1, 0, 2) # input : [len_seq, batch_size, embedding_dim]
hidden_state = Variable(torch.zeros(1*2, batch_size, n_hidden)) # [num_layers(=1) * num_directions(=2), batch_size, n_hidden]
cell_state = Variable(torch.zeros(1*2, batch_size, n_hidden)) # [num_layers(=1) * num_directions(=2), batch_size, n_hidden]
# final_hidden_state, final_cell_state : [num_layers(=1) * num_directions(=2), batch_size, n_hidden]
output, (final_hidden_state, final_cell_state) = self.lstm(input, (hidden_state, cell_state))
#output = output.permute(1, 0, 2) # output : [batch_size, len_seq, n_hidden]
attn_output, attention = self.attention_net(output, final_hidden_state)
return self.out(attn_output), attention # model : [batch_size, num_classes], attention : [batch_size, n_step]
3.1 train
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.autograd import Variable
import torch.nn.functional as F
import matplotlib.pyplot as plt
dtype = torch.FloatTensor
# Bi-LSTM(Attention) Parameters
embedding_dim = 2
n_hidden = 5 # number of hidden units in one cell
num_classes = 2 # 0 or 1
# 3 words sentences (=sequence_length is 3)
sentences = ["i love you", "he loves me", "she likes baseball", "i hate you", "sorry for that", "this is awful"]
labels = [1, 1, 1, 0, 0, 0] # 1 is good, 0 is not good.
word_list = " ".join(sentences).split()
word_list = list(set(word_list))
word_dict = {w: i for i, w in enumerate(word_list)}
vocab_size = len(word_dict)
inputs = []
for sen in sentences:
inputs.append(np.asarray([word_dict[n] for n in sen.split()]))
targets = []
for out in labels:
targets.append(out) # To using Torch Softmax Loss function
input_batch = Variable(torch.LongTensor(inputs))
target_batch = Variable(torch.LongTensor(targets))
model = BiLSTM_Attention()
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
# Training
for epoch in range(500):
optimizer.zero_grad()
output, attention = model(input_batch, batch_size=6)
loss = criterion(output, target_batch)
if (epoch + 1) % 100 == 0:
print('Epoch:', '%04d' % (epoch + 1), 'cost =', '{:.6f}'.format(loss))
loss.backward()
optimizer.step()
# Test
test_text = 'sorry hate you'
tests = [np.asarray([word_dict[n] for n in test_text.split()])]
test_batch = Variable(torch.LongTensor(tests))
# Predict
predict, _ = model(test_batch, 1)
predict = predict.data.max(1, keepdim=True)[1]
if predict[0][0] == 0:
print(test_text,"is Bad Mean...")
else:
print(test_text,"is Good Mean!!")
fig = plt.figure(figsize=(6, 3)) # [batch_size, n_step]
ax = fig.add_subplot(1, 1, 1)
ax.matshow(attention, cmap='viridis')
ax.set_xticklabels(['']+['first_word', 'second_word', 'third_word'], fontdict={'fontsize': 14}, rotation=90)
ax.set_yticklabels(['']+['batch_1', 'batch_2', 'batch_3', 'batch_4', 'batch_5', 'batch_6'], fontdict={'fontsize': 14})
plt.show()
报错:IndexError: Dimension out of range (expected to be in range of [-1, 0], but got 1)
原因通常为loss = criterion(out, target) 输入参数放反