一个基于Transformer模型的中文问答系统

这个代码实现了一个基于Transformer模型的中文问答系统。以下是代码的主要功能和可能的完善方向:
主要功能
- 数据处理:代码首先定义了处理中文文本的函数,包括分词、构建词汇表、将句子转换为张量等。
- 数据加载:从.jsonl或.json文件中加载问题和答案数据,并进行数据增强。
- 模型定义:定义了Transformer模型,包括编码器、解码器和位置编码。
- 训练过程:使用PyTorch进行模型训练,包括动态调整批处理大小和隐藏层大小以适应GPU内存限制。
- 预测功能:实现了一个预测函数,用于生成对输入问题的答案。
- 图形界面:使用Tkinter创建了一个简单的图形用户界面,用户可以输入问题并查看生成的答案。
# Standard library
import json
import math
import os
import random
import tkinter as tk

# Third-party
import jieba
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
from nltk.translate.bleu_score import corpus_bleu
from rouge import Rouge
from torch.cuda.amp import GradScaler, autocast
from torch.utils.data import DataLoader, Dataset
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
# Special vocabulary tokens.
PAD_TOKEN = "<PAD>"  # padding filler
UNK_TOKEN = "<UNK>"  # out-of-vocabulary word
SOS_TOKEN = "<SOS>"  # start-of-sequence marker
EOS_TOKEN = "<EOS>"  # end-of-sequence marker
# Chinese vocabulary and index mappings; grown in place by build_vocab().
word2index = {PAD_TOKEN: 0, UNK_TOKEN: 1, SOS_TOKEN: 2, EOS_TOKEN: 3}
index2word = {0: PAD_TOKEN, 1: UNK_TOKEN, 2: SOS_TOKEN, 3: EOS_TOKEN}
# Chinese word segmentation via jieba
def tokenize_chinese(sentence):
    """Split a Chinese sentence into a list of word tokens."""
    return jieba.lcut(sentence)
# Vocabulary construction
def build_vocab(sentences):
    """Extend the global word2index/index2word maps with every unseen token.

    Returns the vocabulary size after all sentences have been processed.
    """
    global word2index, index2word
    next_index = len(word2index)
    for sentence in sentences:
        for token in tokenize_chinese(sentence):
            if token in word2index:
                continue
            word2index[token] = next_index
            index2word[next_index] = token
            next_index += 1
    return next_index
# Sentence -> fixed-length index tensor
def sentence_to_tensor(sentence, max_length=50):
    """Convert a sentence into a fixed-length LongTensor of vocabulary indices.

    The token sequence is wrapped in <SOS>/<EOS> and right-padded with <PAD>.
    BUG FIX: tokens are now truncated to max_length - 2 first — the original
    produced tensors longer than max_length for long sentences, overflowing
    the model's 50-position positional encoding.

    Returns (tensor, length); length equals the post-padding tensor length,
    matching the original return semantics.
    """
    tokens = tokenize_chinese(sentence)
    # Reserve two slots for the <SOS>/<EOS> markers.
    tokens = tokens[:max_length - 2]
    indices = [word2index.get(token, word2index[UNK_TOKEN]) for token in tokens]
    indices = [word2index[SOS_TOKEN]] + indices + [word2index[EOS_TOKEN]]
    indices += [word2index[PAD_TOKEN]] * (max_length - len(indices))
    return torch.tensor(indices, dtype=torch.long), len(indices)
# Load question/answer records from a .jsonl or .json file
def load_data(file_path):
    """Load QA pairs from `file_path` (.jsonl: one JSON object per line; .json: a list).

    Each record must carry 'question', 'human_answers' and 'chatgpt_answers';
    one answer is sampled uniformly from the combined answer lists.
    BUG FIX: blank lines in a .jsonl file (e.g. a trailing newline) no longer
    crash json.loads — they are skipped, and the file is streamed instead of
    being read fully into memory via readlines().

    Returns (questions, answers) as two parallel lists.
    Raises ValueError for any other file extension.
    """
    if file_path.endswith('.jsonl'):
        with open(file_path, 'r', encoding='utf-8') as f:
            lines = [json.loads(line) for line in f if line.strip()]
    elif file_path.endswith('.json'):
        with open(file_path, 'r', encoding='utf-8') as f:
            lines = json.load(f)
    else:
        raise ValueError("不支持的文件格式。请使用 .jsonl 或 .json。")
    questions = [line['question'] for line in lines]
    answers = [random.choice(line['human_answers'] + line['chatgpt_answers']) for line in lines]
    return questions, answers
# Data augmentation
def data_augmentation(sentence):
    """Return a lightly perturbed copy of `sentence` for training-data augmentation.

    Applies, each with 10% probability: random token insertion, random
    deletion, random swap of two tokens, synonym replacement (via the
    get_synonyms stub) and sentence rewriting (via the rewrite_sentence
    stub). Tokens are re-joined without separators, matching Chinese text
    where words are not space-delimited.
    """
    tokens = tokenize_chinese(sentence)
    augmented_sentence = []  # NOTE: unused initialisation; reassigned as a string below
    # Random insertion of an existing vocabulary word.
    if random.random() < 0.1:
        insert_token = random.choice(list(word2index.keys())[4:])  # skip the 4 special tokens
        insert_index = random.randint(0, len(tokens))
        tokens.insert(insert_index, insert_token)
    # Random deletion (only when more than one token remains).
    if random.random() < 0.1 and len(tokens) > 1:
        delete_index = random.randint(0, len(tokens) - 1)
        del tokens[delete_index]
    # Random swap of two distinct positions.
    if len(tokens) > 1 and random.random() < 0.1:
        index1, index2 = random.sample(range(len(tokens)), 2)
        tokens[index1], tokens[index2] = tokens[index2], tokens[index1]
    # Synonym replacement; get_synonyms currently returns [], so this is a no-op.
    if random.random() < 0.1:
        for i in range(len(tokens)):
            if random.random() < 0.1:
                synonyms = get_synonyms(tokens[i])
                if synonyms:
                    tokens[i] = random.choice(synonyms)
    # Semantics-preserving rewrite; rewrite_sentence is currently an identity stub.
    if random.random() < 0.1:
        tokens = rewrite_sentence(tokens)
    augmented_sentence = ''.join(tokens)
    return augmented_sentence
# Synonym lookup
def get_synonyms(word):
    """Return a list of synonyms for `word`.

    Stub implementation: always returns an empty list. Plug in an external
    thesaurus library or API here to enable synonym replacement.
    """
    return []
# Semantics-preserving sentence rewriting
def rewrite_sentence(tokens):
    """Rewrite a token list while preserving its meaning.

    Stub implementation: returns the tokens unchanged. Plug in an external
    paraphrasing library or API here to enable rewriting.
    """
    return tokens
# Dataset of question/answer pairs
class ChatDataset(Dataset):
    """Paired question/answer dataset yielding padded index tensors."""

    def __init__(self, questions, answers):
        self.questions = questions
        self.answers = answers

    def __len__(self):
        return len(self.questions)

    def __getitem__(self, idx):
        question_tensor, question_len = sentence_to_tensor(self.questions[idx])
        answer_tensor, answer_len = sentence_to_tensor(self.answers[idx])
        return question_tensor, answer_tensor, question_len, answer_len
# Custom collate: pad a batch to common sequence lengths
def collate_fn(batch):
    """Pad a batch of (input, target, input_len, target_len) samples.

    Returns batch-first padded input/target tensors plus the length tensors.
    """
    inputs, targets, input_lengths, target_lengths = zip(*batch)
    pad_id = word2index[PAD_TOKEN]
    padded_inputs = nn.utils.rnn.pad_sequence(inputs, batch_first=True, padding_value=pad_id)
    padded_targets = nn.utils.rnn.pad_sequence(targets, batch_first=True, padding_value=pad_id)
    return padded_inputs, padded_targets, torch.tensor(input_lengths), torch.tensor(target_lengths)
# Build datasets and dataloaders
def create_dataset_and_dataloader(questions_file, answers_file, batch_size=10, shuffle=True, split_ratio=0.8):
    """Load data, build the vocabulary and return train/val datasets and loaders.

    NOTE: `answers_file` is accepted for interface compatibility but unused —
    both questions and answers are read from `questions_file`.

    Returns (train_dataset, train_dataloader, val_dataset, val_dataloader, vocab_size).
    """
    questions, answers = load_data(questions_file)
    vocab_size = build_vocab(questions + answers)
    full_dataset = ChatDataset(questions, answers)
    # Split into train/validation parts by `split_ratio`.
    train_len = int(split_ratio * len(full_dataset))
    val_len = len(full_dataset) - train_len
    train_dataset, val_dataset = torch.utils.data.random_split(full_dataset, [train_len, val_len])
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=shuffle, collate_fn=collate_fn)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)
    return train_dataset, train_loader, val_dataset, val_loader, vocab_size
# Model definition
class TransformerModel(nn.Module):
    """Seq2seq Transformer with a learned positional encoding.

    All tensors are batch-first: src/tgt are (batch, seq) index tensors and
    `forward` returns logits of shape (batch, tgt_len, vocab_size).

    BUG FIX: the original passed batch-first (batch, seq, d_model) tensors
    into a default (sequence-first) nn.Transformer, so the (batch, seq)
    padding masks did not match the expected (N, S) layout; the transformer
    is now constructed with batch_first=True. The positional-encoding length
    is also parameterized (default 50, matching the original).
    """

    def __init__(self, vocab_size, d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward, dropout=0.1, max_len=50):
        super(TransformerModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        # Learned positional encoding; caps usable sequence length at max_len.
        self.positional_encoding = nn.Parameter(torch.zeros(1, max_len, d_model))
        self.transformer = nn.Transformer(d_model, nhead, num_encoder_layers, num_decoder_layers,
                                          dim_feedforward, dropout, batch_first=True)
        self.fc_out = nn.Linear(d_model, vocab_size)
        self.d_model = d_model

    def forward(self, src, tgt, src_mask, tgt_mask, src_padding_mask, tgt_padding_mask):
        """Run the encoder/decoder stack and project to vocabulary logits.

        src/tgt: (batch, seq) Long tensors; masks follow nn.Transformer
        conventions (additive attention masks, boolean key-padding masks).
        """
        # Scale embeddings by sqrt(d_model) per the original Transformer paper.
        src = self.embedding(src) * math.sqrt(self.d_model) + self.positional_encoding[:, :src.size(1), :]
        tgt = self.embedding(tgt) * math.sqrt(self.d_model) + self.positional_encoding[:, :tgt.size(1), :]
        # Positional args: src_mask, tgt_mask, memory_mask=None, then key-padding masks.
        output = self.transformer(src, tgt, src_mask, tgt_mask, None, src_padding_mask, tgt_padding_mask)
        output = self.fc_out(output)
        return output
# Causal attention mask
def generate_square_subsequent_mask(sz):
    """Return an additive (sz, sz) causal mask: 0 on/below the diagonal, -inf above."""
    full = torch.full((sz, sz), float('-inf'))
    return torch.triu(full, diagonal=1)
# Dynamically adjust batch_size and hidden_size to fit GPU limits
def adjust_batch_size_and_hidden_size(initial_batch_size, initial_hidden_size, dataloader, vocab_size):
    """Probe increasing batch/hidden sizes until GPU memory or utilisation limits are hit.

    Doubles both values while allocated memory stays under 70% of device
    memory and utilisation under 70%; on a RuntimeError (typically CUDA OOM)
    halves both, stopping if either drops below 1.

    Returns (batch_size, hidden_size).

    NOTE(review): assumes a CUDA device is present —
    torch.cuda.get_device_properties raises on CPU-only machines; confirm
    before running without a GPU. torch.cuda.utilization also needs pynvml.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    max_memory = torch.cuda.get_device_properties(device).total_memory
    current_memory = 0
    batch_size = initial_batch_size
    hidden_size = initial_hidden_size
    while True:
        try:
            # Build a throwaway model and run one forward pass at the current sizes.
            model = TransformerModel(vocab_size, hidden_size, 8, 6, 6, 2048, 0.1).to(device)
            dataloader = DataLoader(dataloader.dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
            inputs, targets, input_lengths, target_lengths = next(iter(dataloader))
            inputs, targets = inputs.to(device), targets.to(device)
            input_lengths = input_lengths.cpu().clone().detach()
            target_lengths = target_lengths.cpu().clone().detach()
            src_mask = generate_square_subsequent_mask(inputs.size(1)).to(device)
            tgt_mask = generate_square_subsequent_mask(targets.size(1)).to(device)
            src_padding_mask = (inputs == word2index[PAD_TOKEN]).to(device)
            tgt_padding_mask = (targets == word2index[PAD_TOKEN]).to(device)
            outputs = model(inputs, targets, src_mask, tgt_mask, src_padding_mask, tgt_padding_mask)
            current_memory = torch.cuda.memory_allocated(device)
            current_utilization = torch.cuda.utilization(device)
            if current_memory < max_memory * 0.7 and current_utilization < 70:
                # Headroom remains: try doubling both knobs.
                batch_size *= 2
                hidden_size *= 2
            else:
                break
        except RuntimeError:
            # Likely CUDA out-of-memory: back off by halving.
            batch_size //= 2
            hidden_size //= 2
            if batch_size < 1 or hidden_size < 1:
                break
    return batch_size, hidden_size
# ---- Instantiate the model and optimizer (module-level script) ----
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
train_dataset, train_dataloader, val_dataset, val_dataloader, vocab_size = create_dataset_and_dataloader('data.jsonl', 'data.jsonl')
initial_batch_size = 80
initial_hidden_size = 256
# Probe the GPU to pick a workable batch size / hidden size.
batch_size, hidden_size = adjust_batch_size_and_hidden_size(initial_batch_size, initial_hidden_size, train_dataloader, vocab_size)
print(f"最优批处理大小: {batch_size}, 最优隐藏层大小: {hidden_size}")
model = TransformerModel(vocab_size, hidden_size, 8, 6, 6, 2048, 0.1).to(device)
# Load previously saved model/tokenizer checkpoints if both exist.
model_path = './models/model.pth'
tokenizer_path = './models/tokenizer.pth'
if os.path.exists(model_path) and os.path.exists(tokenizer_path):
    print("加载现有的模型和分词器...")
    # NOTE(review): torch.load unpickles arbitrary objects — only load
    # checkpoints from a trusted source.
    model = torch.load(model_path, map_location=device)
    tokenizer = torch.load(tokenizer_path, map_location=device)
    word2index = tokenizer['word2index']
    index2word = tokenizer['index2word']
else:
    print("创建新的模型和分词器...")
# Training loop
def train(model, train_dataloader, val_dataloader, num_epochs, learning_rate=0.001, save_path='model.pth'):
    """Train `model` with mixed precision, gradient accumulation and LR decay.

    Each epoch runs a training pass then a validation pass reporting loss,
    exact-match accuracy, BLEU and ROUGE-1 F1. The best model (lowest summed
    validation loss) is saved to `save_path`; the scheduler steps after
    `patience` epochs without improvement. A loss curve is plotted at the end.

    Bug fixes vs. the original:
      * optimizer.zero_grad() was called every iteration, discarding the
        gradients of all but the last batch of each accumulation window
        while still stepping only every 4th batch — gradients are now
        zeroed only after an optimizer step.
      * The loss is divided by gradient_accumulation_steps so accumulated
        gradients average over the window, and leftover gradients are
        flushed when the batch count isn't a multiple of the window.
      * Validation previously iterated over a *flattened* targets tensor,
        slicing 0-d tensors (a runtime error); predictions are now compared
        per sequence with the batch dimension intact.
    """
    criterion = nn.CrossEntropyLoss(ignore_index=word2index[PAD_TOKEN])
    optimizer = optim.AdamW(model.parameters(), lr=learning_rate, weight_decay=1e-5)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=2, verbose=True)
    scaler = GradScaler()
    loss_values = []
    best_val_loss = float('inf')
    patience = 5
    no_improvement_count = 0
    gradient_accumulation_steps = 4  # batches per optimizer step
    for epoch in range(num_epochs):
        model.train()
        total_loss = 0
        optimizer.zero_grad()
        for i, (inputs, targets, input_lengths, target_lengths) in enumerate(train_dataloader):
            inputs, targets = inputs.to(device), targets.to(device)
            src_mask = generate_square_subsequent_mask(inputs.size(1)).to(device)
            tgt_mask = generate_square_subsequent_mask(targets.size(1)).to(device)
            src_padding_mask = (inputs == word2index[PAD_TOKEN]).to(device)
            tgt_padding_mask = (targets == word2index[PAD_TOKEN]).to(device)
            with autocast():
                outputs = model(inputs, targets, src_mask, tgt_mask, src_padding_mask, tgt_padding_mask)
                # Flatten to [batch * tgt_len, vocab] / [batch * tgt_len] for CE.
                loss = criterion(outputs.view(-1, vocab_size), targets.view(-1))
            # Average gradients over the accumulation window.
            scaler.scale(loss / gradient_accumulation_steps).backward()
            if (i + 1) % gradient_accumulation_steps == 0:
                scaler.step(optimizer)
                scaler.update()
                optimizer.zero_grad()
            total_loss += loss.item()
            print(f"Batch [{i + 1}/{len(train_dataloader)}], Loss: {loss.item():.20f}, Outputs shape: {outputs.shape}, Targets shape: {targets.shape}")
        # Flush gradients left over from an incomplete final window.
        if len(train_dataloader) % gradient_accumulation_steps != 0:
            scaler.step(optimizer)
            scaler.update()
            optimizer.zero_grad()
        avg_loss = total_loss / len(train_dataloader)
        loss_values.append(avg_loss)
        print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {avg_loss:.20f}")
        # ---- Validation ----
        model.eval()
        with torch.no_grad():
            val_loss = 0
            correct_predictions = 0
            total_samples = 0
            references = []
            hypotheses = []
            for inputs, targets, input_lengths, target_lengths in val_dataloader:
                inputs, targets = inputs.to(device), targets.to(device)
                src_mask = generate_square_subsequent_mask(inputs.size(1)).to(device)
                tgt_mask = generate_square_subsequent_mask(targets.size(1)).to(device)
                src_padding_mask = (inputs == word2index[PAD_TOKEN]).to(device)
                tgt_padding_mask = (targets == word2index[PAD_TOKEN]).to(device)
                outputs = model(inputs, targets, src_mask, tgt_mask, src_padding_mask, tgt_padding_mask)
                loss = criterion(outputs.view(-1, vocab_size), targets.view(-1))
                val_loss += loss.item()
                # Keep the batch dimension: one predicted sequence per sample.
                predicted_indices = outputs.argmax(dim=-1)
                for pred, target, target_len in zip(predicted_indices, targets, target_lengths):
                    if target_len > 0:  # guard against empty targets
                        pred = pred[:target_len]
                        target = target[:target_len]
                        if (pred == target).all().item():
                            correct_predictions += 1
                        total_samples += 1
                        references.append([[index2word[t.item()] for t in target]])
                        hypotheses.append([index2word[p.item()] for p in pred])
            val_accuracy = correct_predictions / total_samples if total_samples > 0 else 0
            bleu_score = corpus_bleu(references, hypotheses)
            rouge = Rouge()
            rouge_scores = rouge.get_scores([' '.join(h) for h in hypotheses], [' '.join(r[0]) for r in references])
            rouge_1_f1 = sum([s['rouge-1']['f'] for s in rouge_scores]) / len(rouge_scores)
            print(f"验证 Loss: {val_loss / len(val_dataloader):.20f}, 验证 Accuracy: {val_accuracy:.20f}, BLEU: {bleu_score:.20f}, ROUGE-1 F1: {rouge_1_f1:.20f}")
        # Checkpoint on improvement; decay LR after `patience` stale epochs.
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            no_improvement_count = 0
            torch.save(model, save_path)
            print("保存新的最佳模型。")
        else:
            no_improvement_count += 1
            if no_improvement_count >= patience:
                scheduler.step(val_loss)
                no_improvement_count = 0
    plt.plot(range(1, num_epochs + 1), loss_values)
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('训练 Loss 曲线')
    plt.show()
# Bundle the vocabulary mappings so they can be saved with the model.
tokenizer = {'word2index': word2index, 'index2word': index2word}
# Persist the tokenizer dict
def save_tokenizer(tokenizer, save_path='tokenizer.pth'):
    """Save the tokenizer dict (word2index/index2word) via torch.save."""
    torch.save(tokenizer, save_path)
# Train the model, checkpointing the best weights, then save the tokenizer.
train(model, train_dataloader, val_dataloader, num_epochs=5, save_path='./models/model.pth')
# Save the tokenizer alongside the model checkpoint.
save_tokenizer(tokenizer, save_path='./models/tokenizer.pth')
# Inference: greedy decoding
def predict(question, max_length=100):
    """Greedily decode an answer for `question` using the global model.

    Encodes the question once, then feeds the growing decoder input back in
    one token at a time, stopping at <EOS> or after `max_length` steps.
    PAD/UNK tokens and immediate repeats of the previous word are dropped
    from the output to reduce degenerate repetition loops.
    """
    model.eval()
    with torch.no_grad():
        input_tensor, input_length = sentence_to_tensor(question)
        input_tensor = input_tensor.unsqueeze(0).to(device)  # add batch dimension
        input_length = [input_length]
        src_mask = generate_square_subsequent_mask(input_tensor.size(1)).to(device)
        src_padding_mask = (input_tensor == word2index[PAD_TOKEN]).to(device)
        # Encode once; reuse the memory for every decoding step.
        memory = model.transformer.encoder(model.embedding(input_tensor) * math.sqrt(model.d_model) + model.positional_encoding[:, :input_tensor.size(1), :], src_mask, src_padding_mask)
        decoder_input = torch.tensor([[word2index[SOS_TOKEN]]], device=device)
        decoded_words = []
        last_word = None
        for _ in range(max_length):  # hard cap to avoid infinite loops
            tgt_mask = generate_square_subsequent_mask(decoder_input.size(1)).to(device)
            tgt_padding_mask = (decoder_input == word2index[PAD_TOKEN]).to(device)
            # NOTE(review): tgt_mask is also passed in the memory_mask
            # position of TransformerDecoder.forward — confirm this is
            # intentional; memory_mask is usually None.
            decoder_output = model.transformer.decoder(model.embedding(decoder_input) * math.sqrt(model.d_model) + model.positional_encoding[:, :decoder_input.size(1), :], memory, tgt_mask, tgt_mask, None, tgt_padding_mask)
            decoder_output = model.fc_out(decoder_output[:, -1, :])  # logits for the next token only
            top1 = decoder_output.argmax(1).item()
            if top1 == word2index[EOS_TOKEN]:
                break
            elif top1 != word2index[PAD_TOKEN] and top1 != word2index[UNK_TOKEN] and (last_word is None or top1 != last_word):
                decoded_words.append(index2word[top1])
                last_word = top1
            else:
                # Skipped token: reset the repeat tracker.
                last_word = None
            decoder_input = torch.cat([decoder_input, torch.tensor([[top1]], device=device)], dim=1)
        return ''.join(decoded_words)
# 创建图形界面
def on_predict():
    """Handle the 'generate answer' button: run predict() and display the result."""
    question = question_entry.get()
    if not question.strip():
        result_label.config(text="请输入有效的问题。")
        return
    try:
        answer = predict(question)
        # Collapse redundant whitespace in the generated answer.
        answer = " ".join(answer.split())
        result_label.config(text=f'答案: {answer}')
        conversation_text.insert(tk.END, f"问: {question}\n答: {answer}\n\n")
    except Exception as e:
        result_label.config(text=f"生成答案时发生错误: {str(e)}")
def on_clear():
    """Clear the question entry, the result label and the conversation history."""
    question_entry.delete(0, 'end')
    result_label.config(text="")
    conversation_text.delete(1.0, tk.END)
# ---- Build the Tkinter GUI (module-level script) ----
root = tk.Tk()
root.title("羲和")
root.geometry("600x600")  # fixed window size
# Question input row
question_label = tk.Label(root, text="请输入你的问题:", font=("Arial", 14))
question_label.pack(pady=10)
question_entry = tk.Entry(root, width=50, font=("Arial", 12))
question_entry.pack(pady=10)
# Button row
button_frame = tk.Frame(root)
button_frame.pack(pady=10)
# "Generate answer" button
generate_button = tk.Button(button_frame, text="生成答案", command=on_predict, font=("Arial", 12), bg="#4CAF50", fg="white")
generate_button.pack(side=tk.LEFT, padx=10)
# "Clear" button
clear_button = tk.Button(button_frame, text="清除", command=on_clear, font=("Arial", 12), bg="#F44336", fg="white")
clear_button.pack(side=tk.LEFT, padx=10)
# Result label
result_label = tk.Label(root, text="", font=("Arial", 12), wraplength=500, justify="left")
result_label.pack(pady=20)
# Conversation history box
conversation_label = tk.Label(root, text="对话记录:", font=("Arial", 14))
conversation_label.pack(pady=10)
conversation_text = tk.Text(root, height=10, width=50, font=("Arial", 12), yscrollcommand=True)
conversation_text.pack(pady=10)
# Disclaimer
tip_label = tk.Label(root, text="提示:本模型可能存在一定的局限性,答案仅供参考。", font=("Arial", 10), fg="gray")
tip_label.pack(pady=10)
question_entry.focus_set()  # focus the input box on startup
# Main event loop (blocks until the window is closed)
root.mainloop()
# Release cached GPU memory when the program exits.
if torch.cuda.is_available():
    torch.cuda.empty_cache()
完善方向
- 数据增强:当前的数据增强功能较为基础,可以考虑引入更复杂的增强策略,如同义词替换、语义保持的句子重写等。
- 模型优化:可以尝试使用更先进的模型结构,如Transformer-XL、XLNet等,或者使用预训练模型进行微调。
- 训练策略:当前已使用混合精度训练(autocast/GradScaler)和梯度累积,可以进一步引入分布式训练、学习率预热、课程学习等更复杂的训练策略。
- 评估指标:当前的评估已包含准确率、BLEU和ROUGE-1 F1,可以考虑引入更多评估指标,如METEOR、BERTScore等,以更全面地评估模型性能。
- 用户界面:当前的用户界面较为简单,可以考虑增加更多功能,如历史对话记录、问题分类等。
- 错误处理:当前的错误处理较为简单,可以考虑增加更详细的错误信息和处理策略。
- 性能优化:可以考虑优化代码性能,如使用更高效的分词器、优化数据加载和处理过程等。
更多推荐
所有评论(0)