下面我将详细介绍使用 PyTorch 处理 MNIST 手写数字数据集的完整流程,包括数据加载、模型定义、训练和评估,并解释每一行代码的含义和注意事项。

整个流程可以分为五个主要步骤:准备工作、数据加载与预处理、模型定义、模型训练和模型评估

# MNIST手写数字数据集完整处理流程
# 包含数据加载、模型定义、训练和评估的全步骤

# 1. 导入必要的库
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import torchvision.datasets as datasets
import torchvision.transforms as transforms
import matplotlib.pyplot as plt

# 2. 设置超参数
batch_size = 64       # 每次训练的样本数量
learning_rate = 0.001 # 学习率
num_epochs = 5        # 训练轮数
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# 注意:如果有GPU,会使用cuda加速训练,否则使用CPU

# 3. 数据预处理与加载
# 定义数据变换:将图像转为Tensor并标准化
transform = transforms.Compose([
    transforms.ToTensor(),  # 转换为Tensor格式,像素值从0-255归一化到0-1
    # 标准化处理:均值为0.1307,标准差为0.3081(MNIST数据集的统计特性)
    transforms.Normalize((0.1307,), (0.3081,))
])

# 加载训练集
train_dataset = datasets.MNIST(
    root='./data',        # 数据保存路径
    train=True,           # True表示加载训练集
    download=True,        # 如果数据不存在则自动下载
    transform=transform   # 应用上面定义的数据变换
)

# 加载测试集
test_dataset = datasets.MNIST(
    root='./data',
    train=False,          # False表示加载测试集
    download=True,
    transform=transform
)

# 创建数据加载器,用于批量加载数据
train_loader = DataLoader(
    dataset=train_dataset,
    batch_size=batch_size,
    shuffle=True          # 训练时打乱数据顺序
)

test_loader = DataLoader(
    dataset=test_dataset,
    batch_size=batch_size,
    shuffle=False         # 测试时不需要打乱顺序
)

# 4. 可视化样本数据(可选,用于理解数据)
def show_samples():
    # 获取一些随机的训练样本
    dataiter = iter(train_loader)
    images, labels = next(dataiter)
    
    # 显示6个样本
    plt.figure(figsize=(10, 4))
    for i in range(6):
        plt.subplot(1, 6, i+1)
        plt.imshow(images[i].numpy().squeeze(), cmap='gray')
        plt.title(f'Label: {labels[i].item()}')
        plt.axis('off')
    plt.show()

# 调用函数显示样本
show_samples()

# 5. 定义神经网络模型
class SimpleCNN(nn.Module):
    def __init__(self):
        super(SimpleCNN, self).__init__()
        # 第一个卷积块:卷积层 + 激活函数 + 池化层
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1)
        self.relu1 = nn.ReLU()
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)
        
        # 第二个卷积块
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        self.relu2 = nn.ReLU()
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)
        
        # 全连接层
        self.fc1 = nn.Linear(7 * 7 * 64, 128)  # 经过两次池化后,28x28变为7x7
        self.relu3 = nn.ReLU()
        self.fc2 = nn.Linear(128, 10)  # 10个输出,对应0-9十个数字
        
    def forward(self, x):
        # 前向传播过程
        x = self.pool1(self.relu1(self.conv1(x)))
        x = self.pool2(self.relu2(self.conv2(x)))
        x = x.view(-1, 7 * 7 * 64)  # 展平操作
        x = self.relu3(self.fc1(x))
        x = self.fc2(x)
        return x

# 初始化模型并移动到设备上
model = SimpleCNN().to(device)

# 6. 定义损失函数和优化器
criterion = nn.CrossEntropyLoss()  # 交叉熵损失,适合分类问题
optimizer = optim.Adam(model.parameters(), lr=learning_rate)  # Adam优化器

# 7. 训练模型
def train_model():
    # 记录训练过程中的损失和准确率
    train_losses = []
    train_accuracies = []
    
    # 开始训练
    model.train()  # 设置为训练模式
    for epoch in range(num_epochs):
        running_loss = 0.0
        correct = 0
        total = 0
        
        # 遍历训练数据
        for i, (images, labels) in enumerate(train_loader):
            # 将数据移动到设备上
            images = images.to(device)
            labels = labels.to(device)
            
            # 清零梯度
            optimizer.zero_grad()
            
            # 前向传播
            outputs = model(images)
            loss = criterion(outputs, labels)
            
            # 反向传播和优化
            loss.backward()  # 计算梯度
            optimizer.step()  # 更新参数
            
            # 统计损失和准确率
            running_loss += loss.item()
            
            # 计算预测结果
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            
            # 每100个批次打印一次信息
            if (i + 1) % 100 == 0:
                print(f'Epoch [{epoch+1}/{num_epochs}], Step [{i+1}/{len(train_loader)}], '
                      f'Loss: {running_loss/100:.4f}, Accuracy: {100*correct/total:.2f}%')
                running_loss = 0.0
        
        # 记录每个epoch的平均损失和准确率
        epoch_loss = running_loss / len(train_loader)
        epoch_acc = 100 * correct / total
        train_losses.append(epoch_loss)
        train_accuracies.append(epoch_acc)
        print(f'Epoch [{epoch+1}/{num_epochs}] completed. Training Accuracy: {epoch_acc:.2f}%')
    
    print('训练完成!')
    return train_losses, train_accuracies

# 调用训练函数
train_losses, train_accuracies = train_model()

# 8. 绘制训练曲线
def plot_training_curves(losses, accuracies):
    plt.figure(figsize=(12, 5))
    
    # 损失曲线
    plt.subplot(1, 2, 1)
    plt.plot(range(1, num_epochs+1), losses)
    plt.title('Training Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    
    # 准确率曲线
    plt.subplot(1, 2, 2)
    plt.plot(range(1, num_epochs+1), accuracies)
    plt.title('Training Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy (%)')
    
    plt.tight_layout()
    plt.show()

# 绘制训练曲线
plot_training_curves(train_losses, train_accuracies)

# 9. 评估模型
def evaluate_model():
    model.eval()  # 设置为评估模式
    correct = 0
    total = 0
    
    # 不需要计算梯度
    with torch.no_grad():
        for images, labels in test_loader:
            images = images.to(device)
            labels = labels.to(device)
            
            outputs = model(images)
            _, predicted = torch.max(outputs.data, 1)
            
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    
    print(f'测试集准确率: {100 * correct / total:.2f}%')
    return 100 * correct / total

# 评估模型在测试集上的表现
test_accuracy = evaluate_model()

# 10. 可视化预测结果(可选)
def show_predictions():
    model.eval()
    dataiter = iter(test_loader)
    images, labels = next(dataiter)
    
    # 选择前6个样本进行预测
    images = images[:6].to(device)
    labels = labels[:6]
    
    with torch.no_grad():
        outputs = model(images)
        _, predicted = torch.max(outputs.data, 1)
    
    # 显示结果
    plt.figure(figsize=(10, 4))
    for i in range(6):
        plt.subplot(1, 6, i+1)
        plt.imshow(images[i].cpu().numpy().squeeze(), cmap='gray')
        plt.title(f'真实: {labels[i].item()}\n预测: {predicted[i].item()}')
        plt.axis('off')
    plt.tight_layout()
    plt.show()

# 显示预测结果
show_predictions()

# 11. 保存模型(可选)
torch.save(model.state_dict(), 'mnist_cnn_model.pth')
print("模型已保存为 'mnist_cnn_model.pth'")

代码详细解释与注意事项

1. 准备工作

  • 我们导入了 PyTorch 的核心库以及处理图像所需的工具
  • device设置会自动检测是否有可用的 GPU,如果有则使用 GPU 加速训练,否则使用 CPU

2. 数据加载与预处理

  • 数据变换 (transforms)

    • ToTensor()将图像从 PIL 格式转换为 PyTorch 的 Tensor 格式,并将像素值从 0-255 归一化到 0-1 范围
    • Normalize()进行标准化,使用的均值和标准差是 MNIST 数据集的统计特性,这有助于模型更快收敛
  • 数据集加载

    • datasets.MNIST会自动下载数据(如果本地没有)并加载
    • train=True加载训练集(60,000 张图片),train=False加载测试集(10,000 张图片)
  • DataLoader

    • 用于批量加载数据,支持自动打乱数据顺序
    • batch_size=64表示每次处理 64 张图片
    • 训练时shuffle=True打乱数据顺序,测试时shuffle=False保持顺序

3. 模型定义

  • 我们定义了一个简单的卷积神经网络 (SimpleCNN),包含:

    • 两个卷积块:每个卷积块由卷积层、ReLU 激活函数和池化层组成
    • 两个全连接层:最后一层输出 10 个值,对应 0-9 十个数字的预测概率
  • 卷积操作的作用:

    • 提取图像的局部特征,如边缘、纹理等
    • 池化层用于降低特征图尺寸,减少计算量

4. 模型训练

  • 损失函数:使用CrossEntropyLoss,适合多分类问题

  • 优化器:使用Adam优化器,比传统的 SGD 收敛更快

  • 训练过程中的关键步骤:

    1. 清零梯度:optimizer.zero_grad()
    2. 前向传播:计算模型输出和损失
    3. 反向传播:loss.backward()计算梯度
    4. 更新参数:optimizer.step()应用梯度更新
  • 注意事项:

    • 训练前调用model.train()设置为训练模式
    • 定期打印损失和准确率,监控训练进度
    • 将数据和模型移动到相同的设备上(CPU 或 GPU)

5. 模型评估

  • 评估时调用model.eval()设置为评估模式,这会关闭 dropout 等训练特有的操作
  • 使用torch.no_grad()关闭梯度计算,节省内存并加速计算
  • 计算测试集上的准确率,评估模型的泛化能力

6. 常见问题与解决方法

  1. 训练速度慢

    • 检查是否使用了 GPU(代码会自动检测,但需要正确安装 PyTorch GPU 版本)
    • 尝试调大batch_size(受限于 GPU 内存)
  2. 过拟合

    • 增加训练轮数
    • 添加正则化(如 Dropout)
    • 增加数据增强
  3. 准确率低

    • 检查模型结构是否合理
    • 尝试调整学习率
    • 增加训练轮数

通过这个完整流程,你可以加载 MNIST 数据集,训练一个卷积神经网络对手写数字进行分类,并评估模型性能。对于初学者来说,这个例子涵盖了深度学习的基本流程和关键概念,是一个很好的入门练习。

更多推荐