Commit e84f5d00 by 前钰

Upload New File

parent 51ac9b7d
import argparse # 用于解析命令行参数
import argparse # 用于解析命令行参数
import torch
import torch.optim as optim # PyTorch中的优化器
from torch.utils.data import DataLoader # PyTorch中用于加载数据的工具
from tqdm import tqdm # 用于在循环中显示进度条
from torch.optim.lr_scheduler import CosineAnnealingLR # 余弦退火学习率调度器
import torch.nn.functional as F # PyTorch中的函数库
from torchvision import datasets # PyTorch中的视觉数据集
import torchvision.transforms as transforms # PyTorch中的数据变换操作
from tensorboardX import SummaryWriter # 用于创建TensorBoard日志的工具
import os # Python中的操作系统相关功能
from utils import AverageMeter, accuracy # 自定义工具模块,用于计算模型的平均值和准确度
from models import model_dict # 自定义模型字典,包含了各种模型的定义
import numpy as np # NumPy库,用于数值计算
import time # Python中的时间相关功能
import random # Python中的随机数生成器
import torch.nn as nn
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# 定义加权 Focal Loss 类
class WeightedFocalLoss(nn.Module):
def __init__(self, alpha=1, gamma=2, weights=None, reduction='mean'):
"""
alpha: Focal Loss 的缩放系数,控制整体 loss 大小
gamma: Focal Loss 的聚焦系数,gamma越大,对难分类样本关注越多
weights: 每个类别的权重 tensor,用于处理类别不平衡
reduction: 返回 loss 的方式,'mean' 取平均,'sum' 求和,否则返回每个样本的 loss
"""
super(WeightedFocalLoss, self).__init__()
self.alpha = alpha
self.gamma = gamma
self.weights = weights
self.reduction = reduction
def forward(self, inputs, targets):
"""
inputs: 模型输出 logits,形状 [batch_size, num_classes]
targets: 真实标签,形状 [batch_size],取值为类别索引
"""
# 计算标准交叉熵损失,但不进行 reduction,保持每个样本单独 loss
ce_loss = F.cross_entropy(inputs, targets, weight=self.weights, reduction='none')
# 计算 pt = exp(-ce_loss),即模型对真实类别的预测概率
pt = torch.exp(-ce_loss)
# 计算 Focal Loss 核心公式
# alpha * (1-pt)^gamma * CE
# - (1-pt)^gamma 用于降低易分类样本的 loss,聚焦难分类样本
focal_loss = self.alpha * (1 - pt) ** self.gamma * ce_loss
# 根据 reduction 决定返回方式
if self.reduction == 'mean':
return focal_loss.mean() # 返回 batch 平均 loss
elif self.reduction == 'sum':
return focal_loss.sum() # 返回 batch 总和 loss
return focal_loss # 返回每个样本的 loss,形状 [batch_size]
# ------------------- 类别权重计算示例 -------------------
# 四个类别的样本比例分别为 26, 43, 73, 10
samples_per_class = torch.tensor([26, 43, 73, 10], dtype=torch.float32)
# 计算每个类别的权重,样本少的类别权重大
weights = 1.0 / samples_per_class
# 归一化,使所有权重加起来为 1
weights = weights / weights.sum()
# 将权重移动到模型所在设备(CPU 或 GPU)
weights = weights.to(device)
# ====== 单独的损失函数 ======
ce_loss_fn = nn.CrossEntropyLoss(weight=weights)
focal_loss_fn = WeightedFocalLoss(alpha=1, gamma=2, weights=weights)
# ====== 组合损失函数 ======
def combined_loss(outputs, targets, alpha=0.5):
"""
outputs: 模型预测 (batch_size, num_classes)
targets: 真实标签 (batch_size)
alpha: CE 与 FocalLoss 的比例系数,0.5 表示各占一半
"""
ce = ce_loss_fn(outputs, targets)
focal = focal_loss_fn(outputs, targets)
return alpha * ce + (1 - alpha) * focal
parser = argparse.ArgumentParser() # 导入argparse模块,用于解析命令行参数
parser.add_argument("--model_names", type=str, default="resnet18") # 添加命令行参数,指定模型名称,默认为"resnet18"
parser.add_argument("--pre_trained", type=bool, default=False) #指定是否使用预训练模型,默认为False
parser.add_argument("--classes_num", type=int, default=4) # 指定类别数,默认为4
parser.add_argument("--dataset", type=str, default="new_COVID_19_Radiography_Dataset") # 指定数据集名称,默认为"new_COVID_19_Radiography_Dataset"
parser.add_argument("--batch_size", type=int, default=32) # 指定批量大小,默认为64
parser.add_argument("--epoch", type=int, default=100) # 指定训练轮次数,默认为20
parser.add_argument("--lr", type=float, default=0.01) # 指定学习率,默认为0.01
parser.add_argument("--momentum", type=float, default=0.9) # 优化器的动量,默认为 0.9
parser.add_argument("--weight-decay", type=float, default=1e-4) # 权重衰减(正则化项),默认为 5e-4
parser.add_argument("--seed", type=int, default=33) # 指定随机种子,默认为33
parser.add_argument("--gpu-id", type=int, default=0) # 指定GPU编号,默认为0
parser.add_argument("--print_freq", type=int, default=1) # 打印训练信息的频率,默认为 1(每个轮次打印一次)
parser.add_argument("--exp_postfix", type=str, default="seed33") # 实验结果文件夹的后缀,默认为 "seed33"
parser.add_argument("--txt_name", type=str, default="lr0.01_wd1e-4_early_stop_lrs") # 文本文件名称,默认为 "lr0.01_wd5e-4"
parser.add_argument("--weights", type=str, default="resnet18-5c106cde.pth", help="预训练权重文件路径")
# ====== argparse 新增参数 ======
parser.add_argument("--patience", type=int, default=10, help="验证集性能无提升的最大容忍epoch数")
parser.add_argument("--min_delta", type=float, default=1e-3, help="认为有提升的最小差值")
# python train.py --modele_names vgg --batch_size 64 --lr 0.001
args = parser.parse_args()
# ====== 定义 EarlyStopping 类 ====== pytorch.lighting
class EarlyStopping: # 早停器:当验证指标长时间不提升时,提前结束训练
def __init__(self, patience=5, min_delta=0.0):
self.patience = patience # 容忍“无提升”的最大 epoch 数,超过就早停
self.min_delta = min_delta # 认为“有提升”的最小幅度阈值(小于等于它视为没提升)
self.best_score = None # 记录历史最佳的验证指标(如 val_acc / val_dice)
self.counter = 0 # 已连续“无提升”的轮数计数器
self.early_stop = False # 是否触发早停的标志位
def step(self, metric): # 每个 epoch 后调用:metric 为本轮验证指标(假定越大越好)
if self.best_score is None: # 第一次调用,还没有历史最佳
self.best_score = metric # 初始化最佳指标为当前值
return False # 第一轮不可能早停,直接继续训练
if metric - self.best_score > self.min_delta: # 若相对最佳值有“足够提升”(超过 min_delta)
self.best_score = metric # 更新最佳指标
self.counter = 0 # 重置“无提升”计数器
else: # 否则认为本轮没有有效提升
self.counter += 1 # “无提升”计数 +1
if self.counter >= self.patience: # 连续无提升的轮数达到/超过耐心值
self.early_stop = True # 触发早停
return self.early_stop # 返回是否需要早停(True=应停止训练)
def seed_torch(seed=74):
random.seed(seed) # Python random module.
os.environ['PYTHONHASHSEED'] = str(seed) # 为了禁止hash随机化,使得实验可复现
np.random.seed(seed) # Numpy module.
torch.manual_seed(seed) # 为CPU设置随机种子
torch.cuda.manual_seed(seed) # 为当前GPU设置随机种子
torch.cuda.manual_seed_all(seed) # if you are using multi-GPU.
# 设置cuDNN:cudnn中对卷积操作进行了优化,牺牲了精度来换取计算效率。如果需要保证可重复性,可以使用如下设置:
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True
# 实际上这个设置对精度影响不大,仅仅是小数点后几位的差别。所以如果不是对精度要求极高,其实不太建议修改,因为会使计算效率降低。
print('random seed has been fixed')
seed_torch(seed=args.seed)
os.environ["CUDA_VISIBLE_DEVICES"] = str(args.gpu_id) # 设置环境变量 CUDA_VISIBLE_DEVICES,指定可见的 GPU 设备,仅在需要时使用特定的 GPU 设备进行训练
exp_name = args.exp_postfix # 从命令行参数中获取实验名称后缀
exp_path = "./report/{}/{}/{}".format(args.dataset, args.model_names, exp_name) # 创建实验结果文件夹的路径
os.makedirs(exp_path, exist_ok=True)
# dataloader
transform_train = transforms.Compose([# transforms.RandomRotation(90), # 随机旋转图像
transforms.Resize([256, 256]), # # 调整图像大小为 256x256 像素
transforms.RandomCrop(224), # 随机裁剪图像为 224x224 大小
transforms.ToTensor(),
transforms.Normalize((0.3738, 0.3738, 0.3738), # # 对图像进行标准化
(0.3240, 0.3240, 0.3240))])
transform_test = transforms.Compose([transforms.Resize([224, 224]),
transforms.ToTensor(),
transforms.Normalize((0.3738, 0.3738, 0.3738),
(0.3240, 0.3240, 0.3240))])
trainset = datasets.ImageFolder(root=os.path.join('new_COVID_19_Radiography_Dataset', 'train'),
transform=transform_train)
testset = datasets.ImageFolder(root=os.path.join('new_COVID_19_Radiography_Dataset', 'val'),
transform=transform_test)
# 创建训练数据加载器
train_loader = DataLoader(trainset, batch_size=args.batch_size, num_workers=4,
# 后台工作线程数量,可以并行加载数据以提高效率
shuffle=True, pin_memory=True) # 如果可用,将数据加载到 GPU 内存中以提高训练速度
# 创建测试数据加载器
test_loader = DataLoader(testset, batch_size=args.batch_size, num_workers=4,
shuffle=False, pin_memory=True)
# train
def train_one_epoch(model, optimizer, train_loader):
model.train()
acc_recorder = AverageMeter() # 用于记录精度的工具
loss_recorder = AverageMeter() # 用于记录损失的工具
for (inputs, targets) in tqdm(train_loader, desc="train"): # 遍历训练数据加载器 train_loader 中的每个批次数据,使用 tqdm 包装以显示进度条。
# for i, (inputs, targets) in enumerate(train_loader):
# 如果当前设备支持 CUDA 加速,则将输入数据和目标数据送到 GPU 上进行计算,设置 non_blocking=True 可以使数据异步加载,提高效率。
if torch.cuda.is_available():
inputs = inputs.cuda(non_blocking=True)
targets = targets.cuda(non_blocking=True)
out = model(inputs)
# loss = F.cross_entropy(out, targets) # 计算损失(交叉熵损失)
loss = combined_loss(out, targets)
# 记录损失值 # 调用 update 方法,传入当前批次的损失值 loss.item() 和该批次的样本数量 inputs.size(0)。
# 这样做是为了根据样本数量加权计算损失的平均值,确保不同批次的损失贡献相等,而不受批次大小的影响。
loss_recorder.update(loss.item(), n=inputs.size(0))
acc = accuracy(out, targets)[0] # 计算精度 # 取出 top-1 精度
acc_recorder.update(acc.item(), n=inputs.size(0)) # 记录精度值 按样本数量加权平均
optimizer.zero_grad() # 清零之前的梯度
loss.backward() # 反向传播,计算梯度
optimizer.step() # 更新模型参数
losses = loss_recorder.avg # 计算平均损失
acces = acc_recorder.avg # 计算平均精度
return losses, acces # 返回平均损失和平均精度
def evaluation(model, test_loader):
# 将模型设置为评估模式,不会进行参数更新
model.eval()
acc_recorder = AverageMeter() # 初始化两个计量器,用于记录准确度和损失
loss_recorder = AverageMeter()
with torch.no_grad():
for img, label in tqdm(test_loader, desc="Evaluating"):
# for img, label in test_loader: # 迭代测试数据加载器中的每个批次
if torch.cuda.is_available():
img = img.cuda()
label = label.cuda()
out = model(img)
acc = accuracy(out, label)[0] # 计算准确度
# loss = F.cross_entropy(out, label) # 计算交叉熵损失
loss = combined_loss(out, label)
acc_recorder.update(acc.item(), img.size(0)) # 更新准确率记录器,记录当前批次的准确率 img.size(0)表示批次中的样本数量
loss_recorder.update(loss.item(), img.size(0)) # 更新损失记录器,记录当前批次的损失
losses = loss_recorder.avg # 计算所有批次的平均损失
acces = acc_recorder.avg # 计算所有批次的平均准确率
return losses, acces # 返回平均损失和准确率
def train(model, optimizer, train_loader, test_loader, scheduler):
since = time.time() # 记录训练开始时间
best_acc = -1 # 初始化最佳准确度为-1,以便跟踪最佳模型
f = open(os.path.join(exp_path, "{}.txt".format(args.txt_name)), "w") # 打开一个用于写入训练过程信息的文件
early_stopper = EarlyStopping(patience=args.patience, min_delta=args.min_delta)
for epoch in range(args.epoch):
# 在训练集上执行一个周期的训练,并获取训练损失和准确度
train_losses, train_acces = train_one_epoch(
model, optimizer, train_loader
)
# 在测试集上评估模型性能,获取测试损失和准确度
test_losses, test_acces = evaluation(model, test_loader)
# 如果当前测试准确度高于历史最佳准确度,更新最佳准确度并保存模型参数
if test_acces > best_acc:
best_acc = test_acces
state_dict = dict(epoch=epoch + 1, model=model.state_dict(), acc=test_acces)
name = os.path.join(exp_path, "ckpt", "best.pth")
os.makedirs(os.path.dirname(name), exist_ok=True)
torch.save(state_dict, name)
scheduler.step() # 更新学习率调度器
# 定义一个包含4个字符串的列表,用来给 TensorBoard 中记录的曲线命名
tags = ['train_losses', # 训练损失(loss)
'train_acces',
'test_losses',
'test_acces']
tb_writer.add_scalar(tags[0], train_losses, epoch + 1) # 将训练损失记录到TensorBoard中,对应 tag: 'train_losses'
tb_writer.add_scalar(tags[1], train_acces, epoch + 1) # 将训练准确率记录到 TensorBoard 中,对应 tag: 'train_acces'
tb_writer.add_scalar(tags[2], test_losses, epoch + 1)
tb_writer.add_scalar(tags[3], test_acces, epoch + 1)
# # 使用tqdm包装循环以查看进度
# for tag in tqdm(tags, desc=f"Epoch {epoch + 1}/{args.epoch}"):
# tb_writer.add_scalar(tag, train_losses, epoch + 1)
# 打印训练过程信息,以及将信息写入文件
# 说明:args.print_freq 是参数中设定的打印频率,例如为 1 表示每轮都打印,为 5 表示每 5 轮打印一次
if (epoch + 1) % args.print_freq == 0:
# 构建要打印/写入文件的字符串,包含 epoch 编号、模型名称、训练与验证的 loss 和 accuracy
msg = "epoch:{} model:{} train loss:{:.2f} acc:{:.2f} test loss{:.2f} acc:{:.2f}\n".format(
epoch + 1,
args.model_names,
train_losses,
train_acces,
test_losses,
test_acces,
)
print(msg) # 将训练信息打印到控制台
f.write(msg) # 将信息写入日志文件 f
f.flush()
# ====== 早停检查 ======
if early_stopper.step(test_acces):
print(f"早停触发:验证集准确率在 {args.patience} 个 epoch 内未提升")
break
# 输出训练结束后的最佳准确度和总训练时间
msg_best = "model:{} best acc:{:.2f}\n".format(args.model_names, best_acc)
time_elapsed = "traninng time: {}".format(time.time() - since)
print(msg_best)
f.write(msg_best)
f.write(time_elapsed)
f.close()
if __name__ == "__main__":
# 创建 TensorBoard 日志目录路径,目录结构为 runs/数据集名称/模型名称/实验后缀
tb_path = "runs/{}/{}/{}".format(args.dataset, args.model_names,
args.exp_postfix)
# 创建 TensorBoard 写入器,指定日志保存目录为 tb_path,用于后续写入训练信息以便可视化
tb_writer = SummaryWriter(log_dir=tb_path)
lr = args.lr
model = model_dict[args.model_names](num_classes=args.classes_num, pretrained=args.pre_trained) # 根据命令行参数创建神经网络模型
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)
# # 如果指定了预训练权重路径,则加载权重
# if args.weights is not None:
# print(f"Loading pretrained weights from {args.weights}")
# weights_dict = torch.load(args.weights, map_location=device) # vit
# # 如果权重是保存的完整checkpoint(dict),尝试取'model'键
# if isinstance(weights_dict, dict) and 'model' in weights_dict:
# weights_dict = weights_dict['model']
# # 删除分类头权重,防止类别数不匹配(一般head层名可能是'head.weight', 'head.bias')
# for k in ['head.weight', 'head.bias']:
# if k in weights_dict:
# del weights_dict[k]
# # 加载权重,strict=False表示允许忽略不匹配的权重
# load_result = model.load_state_dict(weights_dict, strict=False)
# print("Loaded weights:", load_result)
# 如果指定了预训练权重路径,则加载权重 resnet
if args.weights is not None:
print(f"Loading pretrained weights from {args.weights}")
weights_dict = torch.load(args.weights, map_location=torch.device('cpu')) # resnet
# 删除分类头权重
for k in ['fc.weight', 'fc.bias']:
if k in weights_dict:
del weights_dict[k]
# 加载权重,strict=False表示允许忽略不匹配的权重
load_result = model.load_state_dict(weights_dict, strict=False)
model = model.to(device) # resnet
print("Loaded weights:", load_result)
# 创建随机梯度下降(SGD)优化器,传入模型参数和超参数配置
optimizer = optim.SGD(
model.parameters(), # 需要更新的模型参数
lr=lr, # 学习率
momentum=args.momentum, # 动量因子,帮助加速收敛
nesterov=True, # 是否使用Nesterov动量
weight_decay=args.weight_decay, # 权重衰减(L2正则化)
)
scheduler = CosineAnnealingLR(optimizer, T_max=args.epoch) # 创建余弦退火学习率调度器 自动调整lr
train(model, optimizer, train_loader, test_loader, scheduler) # 开始训练过程
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment