Speech Recognition with a CTC Model
This article implements an automatic speech recognition (ASR) system based on the CTC loss, using a DeepSpeech2-style architecture that combines 2D CNNs with RNNs. The system is trained and evaluated on the LJSpeech dataset, with performance measured by word error rate (WER). The article walks through the data pipeline, model construction, and training process, including audio preprocessing, spectrogram computation, the CTC loss implementation, and assembly of the DeepSpeech2 model. A complete code listing is provided, from data loading and preprocessing to training and evaluation, serving as a reference for building an end-to-end speech recognition system.

Project Overview
This project implements an automatic speech recognition system based on the Connectionist Temporal Classification (CTC) loss. It combines 2D CNNs, RNNs, and the CTC loss to build an ASR system similar to the DeepSpeech2 architecture.
The system is trained and evaluated on the LJSpeech dataset (derived from LibriVox recordings), with model performance measured by word error rate (WER).
Environment Setup
The project requires the following Python libraries:
import pandas as pd                   # data handling and analysis
import numpy as np                    # numerical computing
import tensorflow as tf               # deep learning framework
from tensorflow import keras          # Keras API for building and training models
from tensorflow.keras import layers   # Keras layers module
import matplotlib.pyplot as plt       # visualization
from IPython import display           # displaying audio and other media in Jupyter
from jiwer import wer                 # computing the word error rate (WER)
Install the one dependency not bundled with TensorFlow:
pip install jiwer
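As a quick illustration of the metric, here is a minimal, hypothetical example of computing WER with jiwer (the sentences are invented purely for illustration):

from jiwer import wer

# One substitution out of four words gives WER = 0.25
reference = "the quick brown fox"
hypothesis = "the quick brown box"
print(wer(reference, hypothesis))  # 0.25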
Loading the LJSpeech Dataset
The project uses the LJSpeech dataset, which contains 13,100 audio clips. The following code downloads and extracts it:
# Download URL for the LJSpeech dataset
data_url = "https://data.keithito.com/data/speech/LJSpeech-1.1.tar.bz2"
# Download and extract the dataset using Keras' utility function
data_path = keras.utils.get_file("LJSpeech-1.1", data_url, untar=True)
# Directory containing the audio files
wavs_path = data_path + "/wavs/"
# Path to the metadata file
metadata_path = data_path + "/metadata.csv"
Read and parse the metadata file:
# Read and parse the metadata file
metadata_df = pd.read_csv(metadata_path, sep="|", header=None, quoting=3)
# Name the columns
metadata_df.columns = ["file_name", "transcription", "normalized_transcription"]
# Keep only the file name and the normalized transcription
metadata_df = metadata_df[["file_name", "normalized_transcription"]]
# Shuffle the rows and reset the index
metadata_df = metadata_df.sample(frac=1).reset_index(drop=True)
The dataset is split 90% / 10% into training and validation sets:
# Split into training and validation sets (90% / 10%)
split = int(len(metadata_df) * 0.90)
df_train = metadata_df[:split]  # training set (first 90%)
df_val = metadata_df[split:]    # validation set (last 10%)
Preprocessing
Define the vocabulary and the character-mapping layers:
# The set of characters accepted in the transcriptions
characters = [x for x in "abcdefghijklmnopqrstuvwxyz'?! "]
# Layer mapping characters to integers
char_to_num = keras.layers.StringLookup(vocabulary=characters, oov_token="")
# Inverse layer mapping integers back to characters
num_to_char = keras.layers.StringLookup(
    vocabulary=char_to_num.get_vocabulary(), oov_token="", invert=True
)
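To sanity-check the two mappings, a minimal round-trip sketch (run eagerly, using the imports above):

# Encode a string into integer IDs, one per character
ids = char_to_num(tf.strings.unicode_split("hello world", "UTF-8"))
# Decode the IDs back into the original text
text = tf.strings.reduce_join(num_to_char(ids)).numpy().decode("utf-8")
print(text)  # hello world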
STFT parameters:
# STFT parameters
# Window length in samples
frame_length = 256
# Hop length in samples
frame_step = 160
# FFT size
fft_length = 384
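With these settings, tf.signal.stft yields fft_length // 2 + 1 = 193 frequency bins per frame. As a rough sketch, the frame count for one second of LJSpeech audio (22,050 Hz sample rate) can be estimated as follows:

# Frame-count estimate; tf.signal.stft drops the final partial frame by default
samples = 22050  # one second at LJSpeech's 22,050 Hz
num_frames = 1 + (samples - frame_length) // frame_step
print(num_frames)           # 137 frames
print(fft_length // 2 + 1)  # 193 frequency bins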
The per-sample processing function:
def encode_single_sample(wav_file, label):
    """Process a single audio sample and its label."""
    # Read and decode the audio file
    file = tf.io.read_file(wavs_path + wav_file + ".wav")
    audio, _ = tf.audio.decode_wav(file)
    audio = tf.squeeze(audio, axis=-1)
    audio = tf.cast(audio, tf.float32)
    # Compute the short-time Fourier transform (STFT) to get the spectrogram
    spectrogram = tf.signal.stft(
        audio, frame_length=frame_length, frame_step=frame_step, fft_length=fft_length
    )
    # Keep the magnitude and compress it with a square root
    spectrogram = tf.abs(spectrogram)
    spectrogram = tf.math.pow(spectrogram, 0.5)
    # Normalize each frame to zero mean and unit variance
    means = tf.math.reduce_mean(spectrogram, 1, keepdims=True)
    stddevs = tf.math.reduce_std(spectrogram, 1, keepdims=True)
    spectrogram = (spectrogram - means) / (stddevs + 1e-10)
    # Process the label: lowercase, split into characters, map to integers
    label = tf.strings.lower(label)
    label = tf.strings.unicode_split(label, input_encoding="UTF-8")
    label = char_to_num(label)
    return spectrogram, label
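To inspect a single sample outside the tf.data pipeline, the function can also be called eagerly (a sketch; it assumes the df_train dataframe from above):

# Eager sanity check on the first training sample
spec, lab = encode_single_sample(
    df_train["file_name"].iloc[0], df_train["normalized_transcription"].iloc[0]
)
print(spec.shape)   # (num_frames, 193)
print(lab.numpy())  # integer-encoded characters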
Creating the Dataset Objects
# Batch size
batch_size = 32
# Create the training dataset
train_dataset = tf.data.Dataset.from_tensor_slices(
    (list(df_train["file_name"]), list(df_train["normalized_transcription"]))
)
# Apply the transformation, padded batching, and prefetching
train_dataset = (
    train_dataset.map(encode_single_sample, num_parallel_calls=tf.data.AUTOTUNE)
    .padded_batch(batch_size)
    .prefetch(buffer_size=tf.data.AUTOTUNE)
)
# Create the validation dataset
validation_dataset = tf.data.Dataset.from_tensor_slices(
    (list(df_val["file_name"]), list(df_val["normalized_transcription"]))
)
validation_dataset = (
    validation_dataset.map(encode_single_sample, num_parallel_calls=tf.data.AUTOTUNE)
    .padded_batch(batch_size)
    .prefetch(buffer_size=tf.data.AUTOTUNE)
)
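A quick check that the padded batches have the expected shapes:

# Each batch: (batch, max_frames, 193) spectrograms and (batch, max_label_len) labels
for spectrograms, labels in train_dataset.take(1):
    print(spectrograms.shape, labels.shape)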
Model Construction
Define the CTC loss function:
def CTCLoss(y_true, y_pred):
    """Compute the Connectionist Temporal Classification (CTC) loss."""
    # Batch size, model output length, and label length
    batch_len = tf.cast(tf.shape(y_true)[0], dtype="int64")
    input_length = tf.cast(tf.shape(y_pred)[1], dtype="int64")
    label_length = tf.cast(tf.shape(y_true)[1], dtype="int64")
    # Broadcast the lengths to one entry per sample in the batch
    input_length = input_length * tf.ones(shape=(batch_len, 1), dtype="int64")
    label_length = label_length * tf.ones(shape=(batch_len, 1), dtype="int64")
    # Compute the per-sample CTC cost
    loss = keras.backend.ctc_batch_cost(y_true, y_pred, input_length, label_length)
    return loss
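A small smoke test of the loss on random tensors confirms the expected shapes (the values themselves are meaningless; the dimensions are arbitrary):

# Dummy batch: 2 samples, 50 time steps, 32 output classes, labels of length 10
y_pred = tf.nn.softmax(tf.random.normal((2, 50, 32)))
y_true = tf.random.uniform((2, 10), minval=1, maxval=31, dtype=tf.int64)
print(CTCLoss(y_true, y_pred).shape)  # (2, 1): one loss value per sample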
Build the DeepSpeech2-style model:
def build_model(input_dim, output_dim, rnn_layers=5, rnn_units=128):
    """Build a speech recognition model similar to DeepSpeech2."""
    # Model input layer
    input_spectrogram = layers.Input((None, input_dim), name="input")
    # Add a channel dimension so 2D CNNs can be applied
    x = layers.Reshape((-1, input_dim, 1), name="expand_dim")(input_spectrogram)
    # First convolutional layer
    x = layers.Conv2D(
        filters=32,
        kernel_size=[11, 41],
        strides=[2, 2],
        padding="same",
        use_bias=False,
        name="conv_1",
    )(x)
    x = layers.BatchNormalization(name="conv_1_bn")(x)
    x = layers.ReLU(name="conv_1_relu")(x)
    # Second convolutional layer
    x = layers.Conv2D(
        filters=32,
        kernel_size=[11, 21],
        strides=[1, 2],
        padding="same",
        use_bias=False,
        name="conv_2",
    )(x)
    x = layers.BatchNormalization(name="conv_2_bn")(x)
    x = layers.ReLU(name="conv_2_relu")(x)
    # Reshape the resulting volume to feed the RNN layers
    x = layers.Reshape((-1, x.shape[-2] * x.shape[-1]))(x)
    # Stack bidirectional GRU layers
    for i in range(1, rnn_layers + 1):
        recurrent = layers.GRU(
            units=rnn_units,
            activation="tanh",
            recurrent_activation="sigmoid",
            use_bias=True,
            return_sequences=True,
            reset_after=True,
            name=f"gru_{i}",
        )
        x = layers.Bidirectional(
            recurrent, name=f"bidirectional_{i}", merge_mode="concat"
        )(x)
        if i < rnn_layers:
            x = layers.Dropout(rate=0.5)(x)
    # Fully connected layer
    x = layers.Dense(units=rnn_units * 2, name="dense_1")(x)
    x = layers.ReLU(name="dense_1_relu")(x)
    x = layers.Dropout(rate=0.5)(x)
    # Classification layer: per-time-step character distribution (+1 for the CTC blank)
    output = layers.Dense(units=output_dim + 1, activation="softmax")(x)
    # Assemble and compile the model
    model = keras.Model(input_spectrogram, output, name="DeepSpeech_2")
    opt = keras.optimizers.Adam(learning_rate=1e-4)
    model.compile(optimizer=opt, loss=CTCLoss)
    return model
Create a model instance:
# Instantiate the model
model = build_model(
    input_dim=fft_length // 2 + 1,             # number of frequency bins in the spectrogram
    output_dim=char_to_num.vocabulary_size(),  # output dimension equals the vocabulary size
    rnn_units=512,                             # GRU units
)
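Printing a summary verifies the layer stack and parameter count:

model.summary(line_length=110)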
Model Training
Define a callback that reports results after each epoch. (It relies on decode_batch_predictions, defined in the Decoding and Inference section below.)
class CallbackEval(keras.callbacks.Callback):
    """Displays a batch of outputs after every epoch."""

    def __init__(self, dataset):
        super().__init__()
        self.dataset = dataset

    def on_epoch_end(self, epoch: int, logs=None):
        predictions = []
        targets = []
        for batch in self.dataset:
            X, y = batch
            batch_predictions = model.predict(X)
            batch_predictions = decode_batch_predictions(batch_predictions)
            predictions.extend(batch_predictions)
            for label in y:
                label = tf.strings.reduce_join(num_to_char(label)).numpy().decode("utf-8")
                targets.append(label)
        # Compute the word error rate (WER)
        wer_score = wer(targets, predictions)
        print(f"Word Error Rate: {wer_score:.4f}")
        # Show a couple of random sample predictions
        for i in np.random.randint(0, len(predictions), 2):
            print(f"Target    : {targets[i]}")
            print(f"Prediction: {predictions[i]}")
Start training:
# Number of training epochs
epochs = 1  # note: in practice, train for around 50 epochs or more
# Create the evaluation callback
validation_callback = CallbackEval(validation_dataset)
# Train the model
history = model.fit(
    train_dataset,
    validation_data=validation_dataset,
    epochs=epochs,
    callbacks=[validation_callback],
)
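After training, the loss curves can be plotted from the returned history object (a sketch using the matplotlib import from the setup section):

# Plot training vs. validation loss per epoch
plt.plot(history.history["loss"], label="training loss")
plt.plot(history.history["val_loss"], label="validation loss")
plt.xlabel("epoch")
plt.ylabel("CTC loss")
plt.legend()
plt.show()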
Decoding and Inference
A utility function to decode the network output:
def decode_batch_predictions(pred):
    """Decode the model's predictions into text."""
    # One input length per sample in the batch
    input_len = np.ones(pred.shape[0]) * pred.shape[1]
    # Greedy (best-path) decoding
    results = keras.backend.ctc_decode(pred, input_length=input_len, greedy=True)[0][0]
    # Map the integer sequences back to text
    output_text = []
    for result in results:
        result = tf.strings.reduce_join(num_to_char(result)).numpy().decode("utf-8")
        output_text.append(result)
    return output_text
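For single-file inference, the pieces above compose into a small helper. This wrapper is hypothetical (not part of the original code) and assumes the model and preprocessing defined earlier:

def transcribe(wav_file):
    """Transcribe one LJSpeech file name (without the .wav extension); hypothetical helper."""
    spectrogram, _ = encode_single_sample(wav_file, "")
    # Add a batch dimension before prediction
    pred = model.predict(tf.expand_dims(spectrogram, axis=0))
    return decode_batch_predictions(pred)[0]

print(transcribe(df_val["file_name"].iloc[0]))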
Evaluate the model on the validation set:
# Check the results on more validation samples
predictions = []
targets = []
for batch in validation_dataset:
    X, y = batch
    batch_predictions = model.predict(X)
    batch_predictions = decode_batch_predictions(batch_predictions)
    predictions.extend(batch_predictions)
    for label in y:
        label = tf.strings.reduce_join(num_to_char(label)).numpy().decode("utf-8")
        targets.append(label)
# Compute the word error rate (WER)
wer_score = wer(targets, predictions)
print(f"Word Error Rate: {wer_score:.4f}")
Results and Performance
In practice, training for about 50 epochs on a GeForce RTX 2080 Ti GPU takes roughly 5-6 minutes per epoch, and the final model reaches a word error rate (WER) of about 16-17%.
Here are a few example transcriptions:
Audio file: LJ017-0009.wav
- Target    : sir thomas overbury was undoubtedly poisoned by lord rochester in the reign of james the first
- Prediction: cer thomas overbery was undoubtedly poisoned by lordrochester in the reign of james the first

Audio file: LJ003-0340.wav
- Target    : the committee does not seem to have yet understood that newgate could be only and properly replaced
- Prediction: the committee does not seem to have yet understood that newgate could be only and proberly replace

Audio file: LJ011-0136.wav
- Target    : still no sentence of death was carried out for the offense and in eighteen thirtytwo
- Prediction: still no sentence of death was carried out for the offense and in eighteen thirtytwo
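If you train to convergence, it is worth saving the weights so inference does not require retraining (a minimal sketch; the file name is an arbitrary example):

# Save and later restore the trained weights (example path)
model.save_weights("ctc_asr.h5")
model.load_weights("ctc_asr.h5")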
Pretrained Model
A trained model and an interactive demo for this example are published on Hugging Face.
Complete Code
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import matplotlib.pyplot as plt
from IPython import display
from jiwer import wer
data_url = "https://data.keithito.com/data/speech/LJSpeech-1.1.tar.bz2"
data_path = keras.utils.get_file("LJSpeech-1.1", data_url, untar=True)
wavs_path = data_path + "/wavs/"
metadata_path = data_path + "/metadata.csv"
metadata_df = pd.read_csv(metadata_path, sep="|", header=None, quoting=3)
metadata_df.columns = ["file_name", "transcription", "normalized_transcription"]
metadata_df = metadata_df[["file_name", "normalized_transcription"]]
metadata_df = metadata_df.sample(frac=1).reset_index(drop=True)
split = int(len(metadata_df) * 0.90)
df_train = metadata_df[:split]
df_val = metadata_df[split:]
print(f"Size of the training set: {len(df_train)}")
print(f"Size of the validation set: {len(df_val)}")
characters = [x for x in "abcdefghijklmnopqrstuvwxyz'?! "]
char_to_num = keras.layers.StringLookup(vocabulary=characters, oov_token="")
num_to_char = keras.layers.StringLookup(
    vocabulary=char_to_num.get_vocabulary(), oov_token="", invert=True
)
print(
    f"The vocabulary is: {char_to_num.get_vocabulary()} "
    f"(size ={char_to_num.vocabulary_size()})"
)
frame_length = 256
frame_step = 160
fft_length = 384


def encode_single_sample(wav_file, label):
    file = tf.io.read_file(wavs_path + wav_file + ".wav")
    audio, _ = tf.audio.decode_wav(file)
    audio = tf.squeeze(audio, axis=-1)
    audio = tf.cast(audio, tf.float32)
    spectrogram = tf.signal.stft(
        audio, frame_length=frame_length, frame_step=frame_step, fft_length=fft_length
    )
    spectrogram = tf.abs(spectrogram)
    spectrogram = tf.math.pow(spectrogram, 0.5)
    means = tf.math.reduce_mean(spectrogram, 1, keepdims=True)
    stddevs = tf.math.reduce_std(spectrogram, 1, keepdims=True)
    spectrogram = (spectrogram - means) / (stddevs + 1e-10)
    label = tf.strings.lower(label)
    label = tf.strings.unicode_split(label, input_encoding="UTF-8")
    label = char_to_num(label)
    return spectrogram, label
batch_size = 32
train_dataset = tf.data.Dataset.from_tensor_slices(
    (list(df_train["file_name"]), list(df_train["normalized_transcription"]))
)
train_dataset = (
    train_dataset.map(encode_single_sample, num_parallel_calls=tf.data.AUTOTUNE)
    .padded_batch(batch_size)
    .prefetch(buffer_size=tf.data.AUTOTUNE)
)
validation_dataset = tf.data.Dataset.from_tensor_slices(
    (list(df_val["file_name"]), list(df_val["normalized_transcription"]))
)
validation_dataset = (
    validation_dataset.map(encode_single_sample, num_parallel_calls=tf.data.AUTOTUNE)
    .padded_batch(batch_size)
    .prefetch(buffer_size=tf.data.AUTOTUNE)
)

# Visualize one spectrogram/label pair along with the raw waveform
fig = plt.figure(figsize=(8, 5))
for batch in train_dataset.take(1):
    spectrogram = batch[0][0].numpy()
    spectrogram = np.array([np.trim_zeros(x) for x in np.transpose(spectrogram)])
    label = batch[1][0]
    label = tf.strings.reduce_join(num_to_char(label)).numpy().decode("utf-8")
    ax = plt.subplot(2, 1, 1)
    ax.imshow(spectrogram, vmax=1)
    ax.set_title(label)
    ax.axis("off")
    file = tf.io.read_file(wavs_path + list(df_train["file_name"])[0] + ".wav")
    audio, _ = tf.audio.decode_wav(file)
    audio = audio.numpy()
    ax = plt.subplot(2, 1, 2)
    plt.plot(audio)
    ax.set_title("Signal Wave")
    ax.set_xlim(0, len(audio))
    display.display(display.Audio(np.transpose(audio), rate=16000))
plt.show()
def CTCLoss(y_true, y_pred):
    batch_len = tf.cast(tf.shape(y_true)[0], dtype="int64")
    input_length = tf.cast(tf.shape(y_pred)[1], dtype="int64")
    label_length = tf.cast(tf.shape(y_true)[1], dtype="int64")
    input_length = input_length * tf.ones(shape=(batch_len, 1), dtype="int64")
    label_length = label_length * tf.ones(shape=(batch_len, 1), dtype="int64")
    loss = keras.backend.ctc_batch_cost(y_true, y_pred, input_length, label_length)
    return loss


def build_model(input_dim, output_dim, rnn_layers=5, rnn_units=128):
    input_spectrogram = layers.Input((None, input_dim), name="input")
    x = layers.Reshape((-1, input_dim, 1), name="expand_dim")(input_spectrogram)
    x = layers.Conv2D(
        filters=32,
        kernel_size=[11, 41],
        strides=[2, 2],
        padding="same",
        use_bias=False,
        name="conv_1",
    )(x)
    x = layers.BatchNormalization(name="conv_1_bn")(x)
    x = layers.ReLU(name="conv_1_relu")(x)
    x = layers.Conv2D(
        filters=32,
        kernel_size=[11, 21],
        strides=[1, 2],
        padding="same",
        use_bias=False,
        name="conv_2",
    )(x)
    x = layers.BatchNormalization(name="conv_2_bn")(x)
    x = layers.ReLU(name="conv_2_relu")(x)
    x = layers.Reshape((-1, x.shape[-2] * x.shape[-1]))(x)
    for i in range(1, rnn_layers + 1):
        recurrent = layers.GRU(
            units=rnn_units,
            activation="tanh",
            recurrent_activation="sigmoid",
            use_bias=True,
            return_sequences=True,
            reset_after=True,
            name=f"gru_{i}",
        )
        x = layers.Bidirectional(
            recurrent, name=f"bidirectional_{i}", merge_mode="concat"
        )(x)
        if i < rnn_layers:
            x = layers.Dropout(rate=0.5)(x)
    x = layers.Dense(units=rnn_units * 2, name="dense_1")(x)
    x = layers.ReLU(name="dense_1_relu")(x)
    x = layers.Dropout(rate=0.5)(x)
    output = layers.Dense(units=output_dim + 1, activation="softmax")(x)
    model = keras.Model(input_spectrogram, output, name="DeepSpeech_2")
    opt = keras.optimizers.Adam(learning_rate=1e-4)
    model.compile(optimizer=opt, loss=CTCLoss)
    return model


model = build_model(
    input_dim=fft_length // 2 + 1,
    output_dim=char_to_num.vocabulary_size(),
    rnn_units=512,
)
model.summary(line_length=110)
def decode_batch_predictions(pred):
    input_len = np.ones(pred.shape[0]) * pred.shape[1]
    results = keras.backend.ctc_decode(pred, input_length=input_len, greedy=True)[0][0]
    output_text = []
    for result in results:
        result = tf.strings.reduce_join(num_to_char(result)).numpy().decode("utf-8")
        output_text.append(result)
    return output_text


class CallbackEval(keras.callbacks.Callback):
    def __init__(self, dataset):
        super().__init__()
        self.dataset = dataset

    def on_epoch_end(self, epoch: int, logs=None):
        predictions = []
        targets = []
        for batch in self.dataset:
            X, y = batch
            batch_predictions = model.predict(X)
            batch_predictions = decode_batch_predictions(batch_predictions)
            predictions.extend(batch_predictions)
            for label in y:
                label = (
                    tf.strings.reduce_join(num_to_char(label)).numpy().decode("utf-8")
                )
                targets.append(label)
        wer_score = wer(targets, predictions)
        print("-" * 100)
        print(f"Word Error Rate: {wer_score:.4f}")
        print("-" * 100)
        for i in np.random.randint(0, len(predictions), 2):
            print(f"Target    : {targets[i]}")
            print(f"Prediction: {predictions[i]}")
            print("-" * 100)


epochs = 1
validation_callback = CallbackEval(validation_dataset)
history = model.fit(
    train_dataset,
    validation_data=validation_dataset,
    epochs=epochs,
    callbacks=[validation_callback],
)

predictions = []
targets = []
for batch in validation_dataset:
    X, y = batch
    batch_predictions = model.predict(X)
    batch_predictions = decode_batch_predictions(batch_predictions)
    predictions.extend(batch_predictions)
    for label in y:
        label = tf.strings.reduce_join(num_to_char(label)).numpy().decode("utf-8")
        targets.append(label)
wer_score = wer(targets, predictions)
print("-" * 100)
print(f"Word Error Rate: {wer_score:.4f}")
print("-" * 100)
for i in np.random.randint(0, len(predictions), 5):
    print(f"Target    : {targets[i]}")
    print(f"Prediction: {predictions[i]}")
    print("-" * 100)