1. Multi-class Weather DataSet
- Multi-class Weather DataSet은 다양한 기상 조건을 포함하는 이미지 데이터셋으로, 주로 기계 학습 및 딥러닝 모델을 학습하거나 평가하는데에 사용됩니다.
- 맑음, 눈, 비, 흐림과 같은 여러 날씨 유형으로 label이 지정된 Multi-class Classification 문제를 다룹니다.
- 각 class는 다양한 시간대, 계절, 지역에서 촬영된 이미지를 포함하여 현실 세계의 다양성을 반영하도록 설계되었습니다.
- 이를 통해 모델은 날씨 조건을 정확히 분류하고, 기상 관측, 자동화된 날씨 보고, 혹은 자율주행 차량의 환경 인식 시스템과 같은 다양한 응용분야에서 활용될 수 있습니다.
※ kaggle 데이터셋 다운로드
링크 클릭 → 로그인 후 우측상단 다운로드 클릭 → Kaggle CLI 선택 후 리눅스 명령어 복사 → !로 리눅스 명령어 실행
! kaggle datasets download pratik2901/multiclass-weather-dataset
※ 데이터셋을 Colab의 세션저장소로 저장하기(세션이 종료되면 사라짐)
import os
import zipfile
import random
from shutil import copyfile, rmtree
zip_file = 'multiclass-weather-dataset.zip'
base_dir = './Multi-class Weather Dataset'
train_dir = './train'
test_dir = './test'
extract_path = '.'
with zipfile.ZipFile(zip_file, 'r') as zip_ref :
zip_ref.extractall(extract_path)
if os.path.exists(train_dir) :
rmtree(train_dir)
if os.path.exists(test_dir) :
rmtree(test_dir)
# exist_ok=True : 파일이 존재해도 오류가 발생하지 않게 함
os.makedirs(train_dir, exist_ok=True)
os.makedirs(test_dir, exist_ok=True)
for category in categories :
os.makedirs(os.path.join(train_dir, category), exist_ok=True)
os.makedirs(os.path.join(test_dir, category), exist_ok=True)
# 각 카테고리별 데이터 파일 나누기
for category in categories :
category_path = os.path.join(base_dir, category)
# os.listdir(파일이 담긴 폴더경로) : 안에 들어있는 파일 이름들이 담긴 리스트를 반환
files = os.listdir(category_path)
# 데이터 섞기
random.shuffle(files)
# 데이터를 8:2로 나누기
split_idx = int(len(files) * 0.8)
train_files = files[:split_idx]
test_files = files[split_idx:]
for file in train_files :
src = os.path.join(category_path, file)
dst = os.path.join(train_dir, category, file)
copyfile(src, dst)
for file in test_files :
src = os.path.join(category_path, file)
dst = os.path.join(test_dir, category, file)
copyfile(src, dst)
print('데이터 분리가 완료되었습니다.')
@ 사용되는 모듈 전부 불러오기
import torch
import torchvision
import torchvision.transforms as transforms
import torchvision.models as models
import torchvision.datasets as datasets
from torchvision.utils import make_grid
import torch.optim as optim
import torch.nn as nn
# 딥러닝 관련한 함수들을 모아둠.
import torch.nn.functional as F
from torch.utils.data import random_split
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt
import matplotlib.image as image
import numpy as np
※ transforms.ToTensor()
- 이미지를 PyTorch Tensor형태로 변환합니다
- 이미지의 픽셀 값을 [0, 255] 범위에서 [0.0, 1.0] 범위로 정규화 합니다.
- 이미지 차원을 (H, W, C) 형식에서 PyTorch에서 사용하는 (C, H, W) 형식으로 변환합니다
- H : 이미지의 높이(Height)
- W : 이미지의 너비(Width)
- C : 채널(Channel, ex : RGB 이미지의 경우 3, 흑백 이미지의 경우 1)
※ transforms.Normalize(mean, std)
- 텐서로 변환된 이미지의 픽셀 값을 -1 ~ 1로 정규화(Normalization)합니다.
- mean : 각 채널(R, G, B)의 평균값
- std : 각 채널의 표준편차
- mean = [0.5, 0.5, 0.5] : R, G, B 채널 각각의 평균을 0.5로 설정
- std = [0.5, 0.5, 0.5] : R, G, B 채널 각각의 표준편차를 0.5로 설정
- 이 정규화는 일반적으로 픽셀값의 범위를 [-1, 1][-1, 1][-1, 1]로 조정하기 위해 사용됩니다.
( -1 ~ 1 사이로 정규화 시켜주는 것이 더 좋은 방법입니다. )
# Compose : 순차적으로 적용시키게 함
transform_train = transforms.Compose([
# Resize() : 무조건 적용, 이미지를 입력한 px로 바꿔줌(모델에 학습시킬 때 일정한 크기로 학습시키기 위해)
transforms.Resize((256, 256)),
# RandomHorizontalFlip() : 50% 확률로 좌우 반전시킴
transforms.RandomHorizontalFlip(),
# ToTensor() : 무조건 적용됨
transforms.ToTensor(),
transforms.Normalize(
mean = [0.5, 0.5, 0.5],
std = [0.5, 0.5, 0.5]
)
])
# 테스트 데이터에는 노이즈를 줄 필요가 없기 때문에 학습 데이터와 다르게 적용
transform_test = transforms.Compose([
transforms.Resize((256, 256)),
transforms.ToTensor(),
transforms.Normalize(
mean = [0.5, 0.5, 0.5],
std = [0.5, 0.5, 0.5]
)
])
※ ImageFolder
- datasets.ImageFolder는 이미지 데이터를 특정 디렉터리 구조에서 로드하는 클래스입니다.
- 디렉터리 이름을 label로 간주하며, 각 디렉터리 내의 이미지 파일들을 해당 label에 할당합니다. (지도학습과 비슷하다고 생각하시면 편하실겁니다.)
- ImageFolder는 이미지 데이터를 PyTorch 데이터셋 형식으로 변환하므로, DataLoader와 함께 사용하여 배치 처리 및 데이터 증강을 쉽게 적용할 수 있습니다.
train_dataset = datasets.ImageFolder(
root='train/',
transform=transform_train,
)
dataset_size = len(train_dataset)
train_size = int(dataset_size * 0.8) # valid(검증)을 사용하기 위해
val_size = dataset_size - train_size
# train 데이터셋을 train과 valid 데이터셋으로 나누기
train_dataset, val_dataset = random_split(train_dataset, [train_size, val_size])
test_dataset = datasets.ImageFolder(
root='test/',
transform=transform_test,
)
train_dataloader = DataLoader(train_dataset, batch_size = 64, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=64, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=64, shuffle=False)
※ make_grid
- 여러 이미지를 하나의 격자(grid) 형태로 합칩니다.
- 출력 이미지의 기본 배치는 가로로 나열된 이미지이며, 간격은 기본적으로 2px입니다.
# 이미지 확인하기
plt.rcParams['figure.figsize'] = [12, 8]
plt.rcParams['figure.dpi'] = 60
plt.rcParams.update({'font.size': 20})
def imshow(input):
# transpose : 처음 (H, W, C) 형식으로 바꿔줌, transforms를 통해 (C, H, W) 형식으로 변환되었기 때문
input = input.numpy().transpose((1, 2, 0))
mean = np.array([0.5, 0.5, 0.5])
std = np.array([0.5, 0.5, 0.5])
input = std * input + mean # 정규화 해제 (역정규화)
# clip() : 값이 0보다 작은 경우 0, 1보다 큰 경우 1로 변환
input = np.clip(input, 0, 1)
plt.imshow(input)
plt.show()
class_names = { 0:'Cloudy', 1:'Rain', 2:'Shine', 3:'Sunrise' }
iterator = iter(train_dataloader)
imgs, labels = next(iterator)
out = make_grid(imgs[:4])
imshow(out)
print([class_names[labels[i].item()] for i in range(4)])
2. 다양한 모델 만들기
2-1. nn.Module 상속
- 모델 구성 요소 관리 : layer와 parameter를 자동으로 관리합니다.
- 순전파(Forward) 정의 : forward() 메서드를 통해 간단하고 일관된 순전파 과정을 정의합니다.
- 계층적 설계 : 서브 모듈을 활용해 복잡한 모델을 쉽게 설계 할 수 있습니다.
- 유틸리티 제공 : parameter 저장/로드, 학습/추론 모드 전환 등 다양한 기능을 제공합니다.
- PyTorch 호환성 : 최적화, 데이터 로더 등 PyTorch의 다른 기능과 손쉽게 통합 할 수 있습니다.
- 추상화 : 저수준 작업을 추상화하여 개발자의 생산성을 향상시킬 수 있습니다.
class Model1(nn.Module) :
def __init__(self) :
super(Model1, self).__init__()
self.linear1 = nn.Linear(256*256*3, 4)
self.flatten = nn.Flatten()
def forward(self, x) :
x = self.flatten(x)
x = self.linear1(x)
return x
@ Model1
- 특징 : 단일 선형 계층만 포함되어 있습니다.
- 활성화 함수나 추가 계층이 없으므로 모델이 표현할 수 있는 함수는 단순한 선형 변환에 제한됩니다.
- 표현력 부족으로 복잡한 데이터(비선형 데이터)를 학습하지 못할 가능성이 높습니다.
class Model2(nn.Module) :
def __init__(self) :
super(Model2, self).__init__()
self.linear1 = nn.Linear(256*256*3, 64)
self.linear2 = nn.Linear(64, 4)
self.flatten = nn.Flatten()
def forward(self, x) :
x = self.flatten(x)
x = self.linear1(x)
x = self.linear2(x)
return x
@ Model2
- 특징 : 두 개의 선형 계층을 사용하여 입력 데이터를 단계적으로 압축합니다.
- 활성화 함수가 없으므로 각 계층 간의 변환은 선형적입니다.
- Model1에 비해 표현력이 풍부하지만 여전히 부족하고 활성화 함수가 없기 때문에 비선형 데이터를 학습하지 못할 가능성이 있습니다.
2-2. nn.Dropout()
- nn.Dropout()은 PyTorch에서 제공하는 Overfitting을 방지하기 위한 layer입니다.
- Dropout은 학습 과정 중 일부 뉴런을 무작위로 '비활성화' 함으로써, 모델이 특정 뉴런에 지나치게 의존하지 않도록 와줍니다.
- 이를 통해 일반화 성능이 향상됩니다.
class Model3(nn.Module) :
def __init__(self) :
super(Model3, self).__init__()
self.linear1 = nn.Linear(256*256*3, 128)
self.dropout1 = nn.Dropout(0.5)
self.linear2 = nn.Linear(128, 64)
self.dropout2 = nn.Dropout(0.5)
self.linear3 = nn.Linear(64, 32)
self.dropout3 = nn.Dropout(0.5)
self.linear4 = nn.Linear(32, 4)
self.flatten = nn.Flatten()
def forward(self, x) :
x = self.flatten(x)
x = F.relu(self.linear1(x))
x = self.dropout1(x) # 50%의 뉴런을 무작위로 비활성화
x = F.relu(self.linear2(x))
x = self.dropout2(x)
x = F.relu(self.linear3(x))
x = self.dropout3(x)
x = self.linear4(x)
return x
@ Model3
- 특징 : 다중 구조와 ReLU 활성화 함수를 사용하여 비선형적 특징을 학습할 수 있습니다.
- Dropout을 사용하여 Overfitting을 방지하였습니다.
( 활성화 함수를 섞어 쓰지 않는 이유 : 성능적인 측면 + 기울기 소실 문제를 야기 할 수 있기 때문에 역전파에서 문제가 생길 수 있습니다.)
※ 학습, 검증, 테스트를 수행하는 함수 정의하기
import time
def train() :
start_time = time.time()
print(f'***[Epoch: {epoch + 1} - Training]***')
# model.train() : 모델을 학습모드로 전환함
# 학습, 드롭아웃, 배치 정규화 등 학습시에만 활성화하도록 설정
# Gradient Decent(경사 하강)를 활성화 함
model.train()
total = 0 # 학습 데이터 수
running_loss = 0.0 # 배치 단위 loss 값
running_corrects = 0 # 배치 단위 맞춘 예측 계수
for i, batch in enumerate(train_dataloader) :
imgs, labels = batch # batch(imgs, labels)
imgs, labels = imgs.cuda(), labels.cuda()
outputs = model(imgs)
optimizer.zero_grad()
# 확률값으로 변환하기 전 값(로짓 값)
# 하나의 배치에 img값도 나오는데 굳이 필요 없이 때문에 _를 사용
_, preds = torch.max(outputs, 1)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
total += labels.shape[0]
running_loss += loss.item()
running_corrects += torch.sum(preds == labels.data)
if (i == 0) or (i % log_step == log_step - 1) :
print(f'[Batch: {i+1}] running train loss:{running_loss / total}, running train accuracy: {running_corrects / total}')
print(f'train loss: {running_loss / total}, accuracy: {running_corrects / total}')
print('elapsed time:', time.time() - start_time)
return running_loss / total, (running_corrects / total).item()
def validate() :
start_time = time.time()
print(f'--[Epoch: {epoch + 1} - Validation]***')
# model.eval() : 모델을 검증모드로 전환함.
# 데이터 증강을 비활성화
# Gradient Decent를 비활성화 함.
model.eval()
total = 0
running_loss = 0.0
running_corrects = 0
for i, batch in enumerate(val_dataloader) :
imgs, labels = batch # batch(imgs, labels)
imgs, labels = imgs.cuda(), labels.cuda()
# with torch.no_grad() : gradient decent가 동작하지 않는 영역을 만듬.
with torch.no_grad() :
outputs = model(imgs)
_, preds = torch.max(outputs, 1)
loss = criterion(outputs, labels)
total += labels.shape[0]
running_loss += loss.item()
running_corrects += torch.sum(preds == labels.data)
if (i == 0) or (i % log_step == log_step - 1) :
print(f'[Batch: {i+1}] running val loss:{running_loss / total}, running val accuracy: {running_corrects / total}')
print(f'val loss: {running_loss / total}, accuracy: {running_corrects / total}')
print('elapsed time:', time.time() - start_time)
return running_loss / total, (running_corrects / total).item()
def test() :
start_time = time.time()
print(f'***[test]***')
# model.eval() : 모델을 검증모드로 전환함.
# 데이터 증강을 비활성화
# Gradient Decent를 비활성화 함.
model.eval()
total = 0 # 학습 데이터 수
running_loss = 0.0 # 배치 단위 loss 값
running_corrects = 0 # 배치 단위 맞춘 예측 계수
# i = index
# 아래 for문은 11개의 배치 세트가 있어서 11번만 돈다.
for i, batch in enumerate(test_dataloader) :
imgs, labels = batch # batch(imgs, labels)
imgs, labels = imgs.cuda(), labels.cuda()
# with torch.no_grad() : gradient decent가 동작하지 않는 영역을 만듬.
with torch.no_grad() :
outputs = model(imgs)
_, preds = torch.max(outputs, 1) # model 출력값 중에서 최댓값이 나옴(로짓) # [0.1, 0.2, 0.1, 0.6] 이런식으로 그중에 큰값. _는 img가 굳이 필요없어서
loss = criterion(outputs, labels)
total += labels.shape[0]
running_loss += loss.item()
running_corrects += torch.sum(preds == labels.data)
if (i == 0) or (i % log_step == log_step - 1) :
print(f'[Batch: {i+1}] running test loss:{running_loss / total}, running test accuracy: {running_corrects / total}')
print(f'test loss: {running_loss / total}, accuracy: {running_corrects / total}')
print('elapsed time:', time.time() - start_time)
return running_loss / total, (running_corrects / total).item()
※ learning rate를 동적으로 관리하는 함수 정의하기
def adjust_learning_rate(optimizer, epoch) :
lr = learning_rate
if epoch >= 3 :
lr /= 10
if epoch >= 7 :
lr /= 10
# optimizer.param_groups : 옵티마이저는 학습률과 관련된 파라미터 그룹을 관리
# 'lr'키로 학습률 값을 설정함
for param_group in optimizer.param_groups :
param_group['lr'] = lr
※ 초기화(model1)
learning_rate = 0.01
log_step = 11
model = Model1()
model = model.cuda()
criterion = nn.CrossEntropyLoss()
# momentum : 중력, Adam에 설정된 값, lr에 입력값을 곱함
optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum = 0.9)
num_epochs = 20
best_val_acc = 0
best_epoch = 0
history = []
accuracy = []
※ Model1 실행 및 최적의 파라미터 저장하기
os.makedirs("weights", exist_ok=True)
for epoch in range(num_epochs) :
adjust_learning_rate(optimizer, epoch)
train_loss, train_acc = train()
val_loss, val_acc = validate()
history.append((train_loss, val_loss))
accuracy.append((train_acc, val_acc))
if val_acc > best_val_acc :
print('[info] best validation accuarcy!')
best_val_acc = val_acc
best_epoch = epoch
# torch.save() : 파일로 세이브
# model.state_dic() : 가중치를 저장
# pth 또는 pt 확장자면 파이토치 파일
# h5 또는 h 확장자면 텐서플로우 파일
torch.save(model.state_dict(), f'weights/best_checkpoint_epoch_{epoch + 1}.pth')
torch.save(model.state_dict(), f'weights/last_checkpoint_epoch{epoch + 1}.pth')
※ Epoch에 따른 Accuracy값 시각화
plt.plot([x[0] for x in accuracy], 'b', label='train')
plt.plot([x[1] for x in accuracy], 'r--',label='validation')
plt.xlabel("Epochs")
plt.ylabel("Accuracy")
plt.legend()
※ Model1의 test 실행
test_loss, test_accuracy = test()
print(f"Test loss: {test_loss:.8f}")
print(f"Test accuracy: {test_accuracy * 100.:.2f}%")
※ Model2로 같은과정 진행
os.makedirs("weights", exist_ok=True)
learning_rate = 0.01
log_step = 20
model = Model2()
model = model.cuda()
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=0.9)
num_epochs = 20
best_val_acc = 0
best_epoch = 0
history = []
accuracy = []
for epoch in range(num_epochs):
adjust_learning_rate(optimizer, epoch)
train_loss, train_acc = train()
val_loss, val_acc = validate()
history.append((train_loss, val_loss))
accuracy.append((train_acc, val_acc))
if val_acc > best_val_acc:
print("[Info] best validation accuracy!")
best_val_acc = val_acc
best_epoch = epoch
torch.save(model.state_dict(), f"weights/best_checkpoint_epoch_{epoch + 1}.pth")
torch.save(model.state_dict(), f"weights/last_checkpoint_epoch_{num_epochs}.pth")
plt.plot([x[0] for x in accuracy], 'b', label='train')
plt.plot([x[1] for x in accuracy], 'r--',label='validation')
plt.xlabel("Epochs")
plt.ylabel("Accuracy")
plt.legend()
test_loss, test_accuracy = test()
print(f"Test loss: {test_loss:.8f}")
print(f"Test accuracy: {test_accuracy * 100.:.2f}%")
※ Model3로 같은과정 진행
os.makedirs("weights", exist_ok=True)
learning_rate = 0.01
log_step = 20
model = Model3()
model = model.cuda()
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=0.9)
num_epochs = 20
best_val_acc = 0
best_epoch = 0
history = []
accuracy = []
for epoch in range(num_epochs):
adjust_learning_rate(optimizer, epoch)
train_loss, train_acc = train()
val_loss, val_acc = validate()
history.append((train_loss, val_loss))
accuracy.append((train_acc, val_acc))
if val_acc > best_val_acc:
print("[Info] best validation accuracy!")
best_val_acc = val_acc
best_epoch = epoch
torch.save(model.state_dict(), f"weights/best_checkpoint_epoch_{epoch + 1}.pth")
torch.save(model.state_dict(), f"weights/last_checkpoint_epoch_{num_epochs}.pth")
plt.plot([x[0] for x in accuracy], 'b', label='train')
plt.plot([x[1] for x in accuracy], 'r--',label='validation')
plt.xlabel("Epochs")
plt.ylabel("Accuracy")
plt.legend()
test_loss, test_accuracy = test()
print(f"Test loss: {test_loss:.8f}")
print(f"Test accuracy: {test_accuracy * 100.:.2f}%")