文章目录
-
- 前言
- CIFAR10简介
- Backbone选择
- 训练+测试
-
- 训练环境及超参设置
- 完整代码
- 部分测试结果
- 完整工程文件
- Reference
前言
分享一下本人去年入门深度学习时,在CIFAR10数据集上做的图像分类任务,使用了多个主流的backbone网络,希望可以为同样想入门深度学习的同志们,提供一个方便上手、容易理解的参考教程。
CIFAR10简介
CIFAR-10数据集是图像分类领域经典的数据集,由 Hinton 的学生 Alex Krizhevsky 和 Ilya Sutskever 整理得到,一共包含10个类别的 RGB彩色图片:飞机( airplane )、汽车( automobile )、鸟类( bird )、猫( cat )、鹿( deer )、狗( dog )、蛙类( frog )、马( horse )、船( ship )和卡车( truck ),图片的尺寸为 32×32 ,数据集中一共有 50000 张训练圄片和 10000 张测试图片。 CIFAR-10 的图片样例如图所示
Pytorch中提供了如下命令可以直接将CIFAR10数据集下载到本地:
import torchvision
dataset = torchvision.datasets.CIFAR10(root, train=True, download=True, transform)
- root:数据集加载到本地的路径
- train=True:True表示加载训练集,False加载测试集
- download=True:True表示加载数据集到root,若数据集已经存在,则不会再加载
- transform:数据增强
这里分享一个加载CIFAR10数据集的完整代码:
# 设置数据增强
print('==> Preparing data..')
transform_train = transforms.Compose([
transforms.RandomCrop(32, padding=4),
transforms.RandomHorizontalFlip(),
transforms.ToTensor(),
transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])
transform_test = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])
# 加载CIFAR10数据集
trainset = torchvision.datasets.CIFAR10(
root=opt.data, train=True, download=True, transform=transform_train)
trainloader = torch.utils.data.DataLoader(
trainset, batch_size=opt.batch_size, shuffle=True, num_workers=2)
testset = torchvision.datasets.CIFAR10(
root=opt.data, train=False, download=True, transform=transform_test)
testloader = torch.utils.data.DataLoader(
testset, batch_size=100, shuffle=False, num_workers=2)
Backbone选择
本文主要尝试了以下几个主流的backbone网络,并在CIFAR10上实现了图像分类任务:
- LetNet
- AlexNet
- VGG
- GoogLeNet(InceptionNet)
- ResNet
- DenseNet
- ResNeXt
- SENet
- MobileNetv2-v3
- ShuffleNetv2
- EfficientNetB0
- Darknet53
- CSPDarknet53
这里放上测试结果最好的ResNet模块的构建代码,其他代码放到最后完整工程backbone文件夹中:
"""
pytorch实现ResNet50、ResNet101和ResNet152:
"""
import torch
import torch.nn as nn
import torchvision
import torch.nn.functional as F
# conv1 7 x 7 64 stride=2
def Conv1(channel_in, channel_out, stride=2):
return nn.Sequential(
nn.Conv2d(
channel_in,
channel_out,
kernel_size=7,
stride=stride,
padding=3,
bias=False
),
nn.BatchNorm2d(channel_out),
# 会改变输入数据的值
# 节省反复申请与释放内存的空间与时间
# 只是将原来的地址传递,效率更好
nn.ReLU(inplace=True),
nn.MaxPool2d(kernel_size=3, stride=stride, padding=1)
)
# 构建ResNet18-34的网络基础模块
class BasicBlock(nn.Module):
expansion = 1
def __init__(self, in_planes, planes, stride=1):
super(BasicBlock, self).__init__()
self.conv1 = nn.Conv2d(
in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
self.bn1 = nn.BatchNorm2d(planes)
self.conv2 = nn.Conv2d(planes, planes, kernel_size=3,
stride=1, padding=1, bias=False)
self.bn2 = nn.BatchNorm2d(planes)
self.shortcut = nn.Sequential()
if stride != 1 or in_planes != self.expansion * planes:
self.shortcut = nn.Sequential(
nn.Conv2d(in_planes, self.expansion * planes,
kernel_size=1, stride=stride, bias=False),
nn.BatchNorm2d(self.expansion * planes)
)
def forward(self, x):
out = F.relu(self.bn1(self.conv1(x)))
out = self.bn2(self.conv2(out))
out += self.shortcut(x)
out = F.relu(out)
return out
# 构建ResNet50-101-152的网络基础模块
class Bottleneck(nn.Module):
expansion = 4
def __init__(self, in_planes, planes, stride=1):
super(Bottleneck, self).__init__()
# 构建 1x1, 3x3, 1x1的核心卷积块
self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=1, bias=False)
self.bn1 = nn.BatchNorm2d(planes)
self.conv2 = nn.Conv2d(planes, planes, kernel_size=3,
stride=stride, padding=1, bias=False)
self.bn2 = nn.BatchNorm2d(planes)
self.conv3 = nn.Conv2d(planes, self.expansion *
planes, kernel_size=1, bias=False)
self.bn3 = nn.BatchNorm2d(self.expansion * planes)
# 采用1x1的kernel,构建shout cut
# 注意这里除了第一个bottleblock之外,都需要下采样,所以步长要设置为stride=2
self.shortcut = nn.Sequential()
if stride != 1 or in_planes != self.expansion * planes:
self.shortcut = nn.Sequential(
nn.Conv2d(in_planes, self.expansion * planes,
kernel_size=1, stride=stride, bias=False),
nn.BatchNorm2d(self.expansion * planes)
)
def forward(self, x):
out = F.relu(self.bn1(self.conv1(x)))
out = F.relu(self.bn2(self.conv2(out)))
out = self.bn3(self.conv3(out))
out += self.shortcut(x)
out = F.relu(out)
return out
# 搭建ResNet模板块
class ResNet(nn.Module):
def __init__(self, block, num_blocks, num_classes=10):
super(ResNet, self).__init__()
self.in_planes = 64
self.conv1 = nn.Conv2d(3, 64, kernel_size=3,
stride=1, padding=1, bias=False)
self.bn1 = nn.BatchNorm2d(64)
# 逐层搭建ResNet
self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)
self.linear = nn.Linear(512 * block.expansion, num_classes)
# 参数初始化
# for m in self.modules():
# if isinstance(m, nn.Conv2d):
# nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
# elif isinstance(m, nn.BatchNorm2d):
# nn.init.constant_(m.weight, 1)
# nn.init.constant_(m.bias, 0)
def _make_layer(self, block, planes, num_blocks, stride):
strides = [stride] + [1] * (num_blocks - 1)
# layers = [ ] 是一个列表
# 通过下面的for循环遍历配置列表,可以得到一个由 卷积操作、池化操作等 组成的一个列表layers
# return nn.Sequential(*layers),即通过nn.Sequential函数将列表通过非关键字参数的形式传入(列表layers前有一个星号)
layers = []
for stride in strides:
layers.append(block(self.in_planes, planes, stride))
self.in_planes = planes * block.expansion
return nn.Sequential(*layers)
def forward(self, x):
out = F.relu(self.bn1(self.conv1(x)))
out = self.layer1(out)
out = self.layer2(out)
out = self.layer3(out)
out = self.layer4(out)
out = F.avg_pool2d(out, 4)
out = out.view(out.size(0), -1)
out = self.linear(out)
return out
def ResNet18():
return ResNet(BasicBlock, [2, 2, 2, 2])
def ResNet34():
return ResNet(BasicBlock, [3, 4, 6, 3])
def ResNet50():
return ResNet(Bottleneck, [3, 4, 6, 3])
def ResNet101():
return ResNet(Bottleneck, [3, 4, 23, 3])
def ResNet152():
return ResNet(Bottleneck, [3, 8, 36, 3])
# 测试
# if __name__ == '__main__':
# model = ResNet50()
# print(model)
#
# input = torch.randn(1, 3, 32, 32)
# out = model(input)
# print(out.shape)
训练+测试
训练环境及超参设置
本文的训练环境和超参数设置如下:
- 1块1080 Ti GPU
- epoch为100
- batch-size为128
- 优化器:SGD
- 学习率:余弦退火有序调整学习率
主要步骤如下:
-
加载数据集
- 将数据集加载到本地
- 按batch-size加载到dataLoader
-
设置相关参数
- 指定GPU
- 训练相关参数
- 断点续训
- 模型保存参数
- 设置优化器
- 设置学习率
-
循环每个epoch
- 开启训练
- 开启测试
- 学习率调整
- 数据可视化
- 打印结果
完整代码
'''Train CIFAR10 with PyTorch.'''
import torchvision.transforms as transforms
import time
import torch
import torchvision
import torch.nn as nn
import torch.optim as optim
import torch.backends.cudnn as cudnn
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt
import os
import argparse
# 导入模型
from backbones.ResNet import ResNet18
# 指定GPU
os.environ['CUDA_VISIBLE_DEVICES'] = '1'
# 用于计算GPU运行时间
def time_sync():
# pytorch-accurate time
if torch.cuda.is_available():
torch.cuda.synchronize()
return time.time()
# Training
def train(epoch):
model.train()
train_loss = 0
correct = 0
total = 0
train_acc = 0
# 开始迭代每个batch中的数据
for batch_idx, (inputs, targets) in enumerate(trainloader):
# inputs:[b,3,32,32], targets:[b]
# train_outputs:[b,10]
inputs, targets = inputs.to(device), targets.to(device)
# print(inputs.shape)
optimizer.zero_grad()
outputs = model(inputs)
loss = criterion(outputs, targets)
loss.backward()
optimizer.step()
# 计算损失
train_loss += loss.item()
_, predicted = outputs.max(1)
total += targets.size(0)
correct += predicted.eq(targets).sum().item()
# 计算准确率
train_acc = correct / total
# 每训练100个batch打印一次训练集的loss和准确率
if (batch_idx + 1) % 100 == 0:
print('[INFO] Epoch-{}-Batch-{}: Train: Loss-{:.4f}, Accuracy-{:.4f}'.format(epoch + 1,
batch_idx + 1,
loss.item(),
train_acc))
# 计算每个epoch内训练集的acc
total_train_acc.append(train_acc)
# Testing
def test(epoch, ckpt):
global best_acc
model.eval()
test_loss = 0
correct = 0
total = 0
test_acc = 0
with torch.no_grad():
for batch_idx, (inputs, targets) in enumerate(testloader):
inputs, targets = inputs.to(device), targets.to(device)
outputs = model(inputs)
loss = criterion(outputs, targets)
test_loss += loss.item()
_, predicted = outputs.max(1)
total += targets.size(0)
correct += predicted.eq(targets).sum().item()
test_acc = correct / total
print(
'[INFO] Epoch-{}-Test Accurancy: {:.3f}'.format(epoch + 1, test_acc), '\n')
total_test_acc.append(test_acc)
# 保存权重文件
acc = 100. * correct / total
if acc > best_acc:
print('Saving..')
state = {
'net': model.state_dict(),
'acc': acc,
'epoch': epoch,
}
if not os.path.isdir('checkpoint'):
os.mkdir('checkpoint')
torch.save(state, ckpt)
best_acc = acc
if __name__ == '__main__':
# 设置超参
parser = argparse.ArgumentParser(description='PyTorch CIFAR10 Training')
parser.add_argument('--epochs', type=int, default=100)
parser.add_argument('--batch_size', type=int, default=128)
parser.add_argument('--data', type=str, default='cifar10')
parser.add_argument('--T_max', type=int, default=100)
parser.add_argument('--lr', default=0.1, type=float, help='learning rate')
parser.add_argument('--resume', '-r', action='store_true', help='resume from checkpoint')
parser.add_argument('--checkpoint', type=str, default='checkpoint/ResNet18-CIFAR10.pth')
opt = parser.parse_args()
# 设置相关参数
device = torch.device('cuda:0') if torch.cuda.is_available() else 'cpu'
best_acc = 0 # best test accuracy
start_epoch = 0 # start from epoch 0 or last checkpoint epoch
classes = ('plane', 'car', 'bird', 'cat', 'deer',
'dog', 'frog', 'horse', 'ship', 'truck')
# 设置数据增强
print('==> Preparing data..')
transform_train = transforms.Compose([
transforms.RandomCrop(32, padding=4),
transforms.RandomHorizontalFlip(),
transforms.ToTensor(),
transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])
transform_test = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])
# 加载CIFAR10数据集
trainset = torchvision.datasets.CIFAR10(
root=opt.data, train=True, download=True, transform=transform_train)
trainloader = torch.utils.data.DataLoader(
trainset, batch_size=opt.batch_size, shuffle=True, num_workers=2)
testset = torchvision.datasets.CIFAR10(
root=opt.data, train=False, download=True, transform=transform_test)
testloader = torch.utils.data.DataLoader(
testset, batch_size=100, shuffle=False, num_workers=2)
# print(trainloader.dataset.shape)
# 加载模型
print('==> Building model..')
model = ResNet18().to(device)
# DP训练
if device == 'cuda':
model = torch.nn.DataParallel(model)
cudnn.benchmark = True
# 加载之前训练的参数
if opt.resume:
# Load checkpoint.
print('==> Resuming from checkpoint..')
assert os.path.isdir('checkpoint'), 'Error: no checkpoint directory found!'
checkpoint = torch.load(opt.checkpoint)
model.load_state_dict(checkpoint['net'])
best_acc = checkpoint['acc']
start_epoch = checkpoint['epoch']
# 设置损失函数与优化器
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=opt.lr,
momentum=0.9, weight_decay=5e-4)
# 余弦退火有序调整学习率
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=opt.T_max)
# ReduceLROnPlateau(自适应调整学习率)
# scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=10)
# 记录training和testing的acc
total_test_acc = []
total_train_acc = []
# 记录训练时间
tic = time_sync()
# 开始训练
for epoch in range(opt.epochs):
train(epoch)
test(epoch, opt.checkpoint)
# 动态调整学习率
scheduler.step()
# ReduceLROnPlateau(自适应调整学习率)
# scheduler.step(loss_val)
# 数据可视化
plt.figure()
plt.plot(range(opt.epochs), total_train_acc, label='Train Accurancy')
plt.plot(range(opt.epochs), total_test_acc, label='Test Accurancy')
plt.xlabel('Epoch')
plt.ylabel('Accurancy')
plt.title('ResNet18-CIFAR10-Accurancy')
plt.legend()
plt.savefig('output/ResNet18-CIFAR10-Accurancy.jpg') # 自动保存plot出来的图片
plt.show()
# 输出best_acc
print(f'Best Acc: {best_acc * 100}%')
toc = time_sync()
# 计算本次运行时间
t = (toc - tic) / 3600
print(f'Training Done. ({t:.3f}s)')
部分测试结果
Backbone | Best Acc |
---|---|
MobileNetv2 | 93.37% |
VGG16 | 93.80% |
DenseNet121 | 94.55% |
GoogLeNet | 95.02% |
ResNeXt29_32×4d | 95.18% |
ResNet50 | 95.20% |
SENet18 | 95.22% |
ResNet18 | 95.23% |
完整工程文件
Pytorch实现CIFAR10图像分类任务测试集准确率达95%
Reference
CIFAR-10 数据集
深度学习入门基础教程(二) CNN做CIFAR10数据集图像分类 pytorch版代码
Pytorch CIFAR10 图像分类篇 汇总
pytorch-cifar:使用PyTorch在CIFAR10上为95.47%