카테고리 없음

Multi-class Weather 데이터셋

dldbwls0818 2025. 2. 2. 03:17

1. Multi-class Weather DataSet

- Multi-class Weather DataSet은 다양한 기상 조건을 포함하는 이미지 데이터셋으로, 주로 기계 학습 및 딥러닝 모델을 학습하거나 평가하는데에 사용됩니다.

- 맑음, 눈, 비, 흐림과 같은 여러 날씨 유형으로 label이 지정된 Multi-class Classification 문제를 다룹니다.

- 각 class는 다양한 시간대, 계절, 지역에서 촬영된 이미지를 포함하여 현실 세계의 다양성을 반영하도록 설계되었습니다.

- 이를 통해 모델은 날씨 조건을 정확히 분류하고, 기상 관측, 자동화된 날씨 보고, 혹은 자율주행 차량의 환경 인식 시스템과 같은 다양한 응용분야에서 활용될 수 있습니다.

 

※ kaggle 데이터셋 다운로드

링크 클릭 → 로그인 후 우측상단 다운로드 클릭 → Kaggle CLI 선택 후 리눅스 명령어 복사 → !로 리눅스 명령어 실행

! kaggle datasets download pratik2901/multiclass-weather-dataset

 

※ 데이터셋을 Colab의 세션저장소로 저장하기(세션이 종료되면 사라짐)

import os
import zipfile
import random
from shutil import copyfile, rmtree

zip_file = 'multiclass-weather-dataset.zip'
base_dir = './Multi-class Weather Dataset'
train_dir = './train'
test_dir = './test'
extract_path = '.'

with zipfile.ZipFile(zip_file, 'r') as zip_ref :
    zip_ref.extractall(extract_path)
    
if os.path.exists(train_dir) :
    rmtree(train_dir)
if os.path.exists(test_dir) :
    rmtree(test_dir)
    
# exist_ok=True : 파일이 존재해도 오류가 발생하지 않게 함
os.makedirs(train_dir, exist_ok=True)
os.makedirs(test_dir, exist_ok=True)

for category in categories :
    os.makedirs(os.path.join(train_dir, category), exist_ok=True)
    os.makedirs(os.path.join(test_dir, category), exist_ok=True)
    
# 각 카테고리별 데이터 파일 나누기
for category in categories :
    category_path = os.path.join(base_dir, category)
    # os.listdir(파일이 담긴 폴더경로) : 안에 들어있는 파일 이름들이 담긴 리스트를 반환
    files = os.listdir(category_path)

    # 데이터 섞기
    random.shuffle(files)

    # 데이터를 8:2로 나누기
    split_idx = int(len(files) * 0.8)
    train_files = files[:split_idx]
    test_files = files[split_idx:]

    for file in train_files :
        src = os.path.join(category_path, file)
        dst = os.path.join(train_dir, category, file)
        copyfile(src, dst)

    for file in test_files :
        src = os.path.join(category_path, file)
        dst = os.path.join(test_dir, category, file)
        copyfile(src, dst)

print('데이터 분리가 완료되었습니다.')

 

@ 사용되는 모듈 전부 불러오기

import torch
import torchvision
import torchvision.transforms as transforms
import torchvision.models as models
import torchvision.datasets as datasets
from torchvision.utils import make_grid
import torch.optim as optim
import torch.nn as nn
# 딥러닝 관련한 함수들을 모아둠.
import torch.nn.functional as F
from torch.utils.data import random_split
from torch.utils.data import DataLoader

import matplotlib.pyplot as plt
import matplotlib.image as image
import numpy as np

 

※ transforms.ToTensor()

- 이미지를 PyTorch Tensor형태로 변환합니다

- 이미지의 픽셀 값을 [0, 255] 범위에서 [0.0, 1.0] 범위로 정규화 합니다.

- 이미지 차원을 (H, W, C) 형식에서 PyTorch에서 사용하는 (C, H, W) 형식으로 변환합니다

  • H : 이미지의 높이(Height)
  • W : 이미지의 너비(Width)
  • C : 채널(Channel, ex : RGB 이미지의 경우 3, 흑백 이미지의 경우 1)

※ transforms.Normalize(mean, std)

- 텐서로 변환된 이미지의 픽셀 값을 -1 ~ 1로 정규화(Normalization)합니다.

  • mean : 각 채널(R, G, B)의 평균값
  • std : 각 채널의 표준편차
  • mean = [0.5, 0.5, 0.5] : R, G, B 채널 각각의 평균을 0.5로 설정
  • std = [0.5, 0.5, 0.5] : R, G, B 채널 각각의 표준편차를 0.5로 설정

- 이 정규화는 일반적으로 픽셀값의 범위를 [-1, 1][-1, 1][-1, 1]로 조정하기 위해 사용됩니다.

( -1 ~ 1 사이로 정규화 시켜주는 것이 더 좋은 방법입니다. )

# Compose : 순차적으로 적용시키게 함
transform_train = transforms.Compose([
	# Resize() : 무조건 적용, 이미지를 입력한 px로 바꿔줌(모델에 학습시킬 때 일정한 크기로 학습시키기 위해)
	transforms.Resize((256, 256)),
    # RandomHorizontalFlip() : 50% 확률로 좌우 반전시킴 
    transforms.RandomHorizontalFlip(),
    # ToTensor() : 무조건 적용됨
    transforms.ToTensor(),
    transforms.Normalize(
    	mean = [0.5, 0.5, 0.5],
        std = [0.5, 0.5, 0.5]
    )
])

# 테스트 데이터에는 노이즈를 줄 필요가 없기 때문에 학습 데이터와 다르게 적용
transform_test = transforms.Compose([
	transforms.Resize((256, 256)),
    transforms.ToTensor(),
    transforms.Normalize(
    	mean = [0.5, 0.5, 0.5],
        std = [0.5, 0.5, 0.5]
    )
])

※ ImageFolder

- datasets.ImageFolder는 이미지 데이터를 특정 디렉터리 구조에서 로드하는 클래스입니다.

- 디렉터리 이름을 label로 간주하며, 각 디렉터리 내의 이미지 파일들을 해당 label에 할당합니다. (지도학습과 비슷하다고 생각하시면 편하실겁니다.)

- ImageFolder는 이미지 데이터를 PyTorch 데이터셋 형식으로 변환하므로, DataLoader와 함께 사용하여 배치 처리 및 데이터 증강을 쉽게 적용할 수 있습니다.

train_dataset = datasets.ImageFolder(
    root='train/',
    transform=transform_train,
)

dataset_size = len(train_dataset)
train_size = int(dataset_size * 0.8) # valid(검증)을 사용하기 위해
val_size = dataset_size - train_size

# train 데이터셋을 train과 valid 데이터셋으로 나누기
train_dataset, val_dataset = random_split(train_dataset, [train_size, val_size])

test_dataset = datasets.ImageFolder(
    root='test/',
    transform=transform_test,
)

train_dataloader = DataLoader(train_dataset, batch_size = 64, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=64, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=64, shuffle=False)

 

※ make_grid

- 여러 이미지를 하나의 격자(grid) 형태로 합칩니다.

- 출력 이미지의 기본 배치는 가로로 나열된 이미지이며, 간격은 기본적으로 2px입니다.

# 이미지 확인하기
plt.rcParams['figure.figsize'] = [12, 8]
plt.rcParams['figure.dpi'] = 60
plt.rcParams.update({'font.size': 20})

def imshow(input):
    # transpose : 처음 (H, W, C) 형식으로 바꿔줌, transforms를 통해 (C, H, W) 형식으로 변환되었기 때문
    input = input.numpy().transpose((1, 2, 0)) 
    mean = np.array([0.5, 0.5, 0.5])
    std = np.array([0.5, 0.5, 0.5])
    input = std * input + mean # 정규화 해제 (역정규화)
     # clip() : 값이 0보다 작은 경우 0, 1보다 큰 경우 1로 변환
    input = np.clip(input, 0, 1)
    plt.imshow(input)
    plt.show()
    
class_names = { 0:'Cloudy', 1:'Rain', 2:'Shine', 3:'Sunrise' }

iterator = iter(train_dataloader)
imgs, labels = next(iterator)
out = make_grid(imgs[:4])
imshow(out)

print([class_names[labels[i].item()] for i in range(4)])

2. 다양한 모델 만들기

2-1. nn.Module 상속

  1. 모델 구성 요소 관리 : layer와 parameter를 자동으로 관리합니다.
  2. 순전파(Forward) 정의 : forward() 메서드를 통해 간단하고 일관된 순전파 과정을 정의합니다.
  3. 계층적 설계 : 서브 모듈을 활용해 복잡한 모델을 쉽게 설계 할 수 있습니다.
  4. 유틸리티 제공 : parameter 저장/로드, 학습/추론 모드 전환 등 다양한 기능을 제공합니다.
  5. PyTorch 호환성 : 최적화, 데이터 로더 등 PyTorch의 다른 기능과 손쉽게 통합 할 수 있습니다.
  6. 추상화 : 저수준 작업을 추상화하여 개발자의 생산성을 향상시킬 수 있습니다.
class Model1(nn.Module) :
    def __init__(self) :
        super(Model1, self).__init__()
        self.linear1 = nn.Linear(256*256*3, 4)
        self.flatten = nn.Flatten()

    def forward(self, x) :
        x = self.flatten(x)
        x = self.linear1(x)
        return x

 

@ Model1

- 특징 : 단일 선형 계층만 포함되어 있습니다.

- 활성화 함수나 추가 계층이 없으므로 모델이 표현할 수 있는 함수는 단순한 선형 변환에 제한됩니다.

- 표현력 부족으로 복잡한 데이터(비선형 데이터)를 학습하지 못할 가능성이 높습니다.

class Model2(nn.Module) :
    def __init__(self) :
        super(Model2, self).__init__()
        self.linear1 = nn.Linear(256*256*3, 64)
        self.linear2 = nn.Linear(64, 4)
        self.flatten = nn.Flatten()

    def forward(self, x) :
        x = self.flatten(x)
        x = self.linear1(x)
        x = self.linear2(x)
        return x

 

@ Model2

- 특징 : 두 개의 선형 계층을 사용하여 입력 데이터를 단계적으로 압축합니다.

- 활성화 함수가 없으므로 각 계층 간의 변환은 선형적입니다.

- Model1에 비해 표현력이 풍부하지만 여전히 부족하고 활성화 함수가 없기 때문에 비선형 데이터를 학습하지 못할 가능성이 있습니다.

2-2. nn.Dropout()

- nn.Dropout()은 PyTorch에서 제공하는 Overfitting을 방지하기 위한 layer입니다.

- Dropout은 학습 과정 중 일부 뉴런을 무작위로 '비활성화' 함으로써, 모델이 특정 뉴런에 지나치게 의존하지 않도록 와줍니다.

- 이를 통해 일반화 성능이 향상됩니다.

class Model3(nn.Module) :
    def __init__(self) :
        super(Model3, self).__init__()
        self.linear1 = nn.Linear(256*256*3, 128)
        self.dropout1 = nn.Dropout(0.5)
        self.linear2 = nn.Linear(128, 64)
        self.dropout2 = nn.Dropout(0.5)
        self.linear3 = nn.Linear(64, 32)
        self.dropout3 = nn.Dropout(0.5)
        self.linear4 = nn.Linear(32, 4)
        self.flatten = nn.Flatten()

    def forward(self, x) :
        x = self.flatten(x) 
        x = F.relu(self.linear1(x)) 
        x = self.dropout1(x) # 50%의 뉴런을 무작위로 비활성화
        x = F.relu(self.linear2(x))
        x = self.dropout2(x)
        x = F.relu(self.linear3(x)) 
        x = self.dropout3(x)
        x = self.linear4(x)
        return x

 

@ Model3

- 특징 : 다중 구조와 ReLU 활성화 함수를 사용하여 비선형적 특징을 학습할 수 있습니다.

- Dropout을 사용하여 Overfitting을 방지하였습니다.

( 활성화 함수를 섞어 쓰지 않는 이유 : 성능적인 측면 + 기울기 소실 문제를 야기 할 수 있기 때문에 역전파에서 문제가 생길 수 있습니다.)

 

※ 학습, 검증, 테스트를 수행하는 함수 정의하기

import time

def train() :
    start_time = time.time()
    print(f'***[Epoch: {epoch + 1} - Training]***')
    # model.train() : 모델을 학습모드로 전환함
    # 학습, 드롭아웃, 배치 정규화 등 학습시에만 활성화하도록 설정
    # Gradient Decent(경사 하강)를 활성화 함
    model.train()
    total = 0               # 학습 데이터 수
    running_loss = 0.0      # 배치 단위 loss 값
    running_corrects = 0    # 배치 단위 맞춘 예측 계수

    for i, batch in enumerate(train_dataloader) :
        imgs, labels = batch # batch(imgs, labels)
        imgs, labels = imgs.cuda(), labels.cuda()
        outputs = model(imgs)
        optimizer.zero_grad()
        # 확률값으로 변환하기 전 값(로짓 값)
        # 하나의 배치에 img값도 나오는데 굳이 필요 없이 때문에 _를 사용
        _, preds = torch.max(outputs, 1)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        total += labels.shape[0]
        running_loss += loss.item()
        running_corrects += torch.sum(preds == labels.data)

        if (i == 0) or (i % log_step == log_step - 1) :
            print(f'[Batch: {i+1}] running train loss:{running_loss / total}, running train accuracy: {running_corrects / total}')

    print(f'train loss: {running_loss / total}, accuracy: {running_corrects / total}')
    print('elapsed time:', time.time() - start_time)
    return running_loss / total, (running_corrects / total).item()
    
def validate() :
    start_time = time.time()
    print(f'--[Epoch: {epoch + 1} - Validation]***')
    # model.eval() : 모델을 검증모드로 전환함.
    # 데이터 증강을 비활성화
    # Gradient Decent를 비활성화 함.
    model.eval()
    total = 0
    running_loss = 0.0
    running_corrects = 0

    for i, batch in enumerate(val_dataloader) :
        imgs, labels = batch # batch(imgs, labels)
        imgs, labels = imgs.cuda(), labels.cuda()

        # with torch.no_grad() : gradient decent가 동작하지 않는 영역을 만듬.
        with torch.no_grad() :
            outputs = model(imgs)
            _, preds = torch.max(outputs, 1)
            loss = criterion(outputs, labels)

        total += labels.shape[0]
        running_loss += loss.item()
        running_corrects += torch.sum(preds == labels.data)

        if (i == 0) or (i % log_step == log_step - 1) :
            print(f'[Batch: {i+1}] running val loss:{running_loss / total}, running val accuracy: {running_corrects / total}')

    print(f'val loss: {running_loss / total}, accuracy: {running_corrects / total}')
    print('elapsed time:', time.time() - start_time)
    return running_loss / total, (running_corrects / total).item()
    
def test() :
    start_time = time.time()
    print(f'***[test]***')
    # model.eval() : 모델을 검증모드로 전환함.
    # 데이터 증강을 비활성화
    # Gradient Decent를 비활성화 함.
    model.eval()
    total = 0               # 학습 데이터 수
    running_loss = 0.0      # 배치 단위 loss 값
    running_corrects = 0    # 배치 단위 맞춘 예측 계수

    # i = index
    # 아래 for문은 11개의 배치 세트가 있어서 11번만 돈다.
    for i, batch in enumerate(test_dataloader) :
        imgs, labels = batch # batch(imgs, labels)
        imgs, labels = imgs.cuda(), labels.cuda()

        # with torch.no_grad() : gradient decent가 동작하지 않는 영역을 만듬.
        with torch.no_grad() :
            outputs = model(imgs)
            _, preds = torch.max(outputs, 1) # model 출력값 중에서 최댓값이 나옴(로짓) # [0.1, 0.2, 0.1, 0.6] 이런식으로 그중에 큰값. _는 img가 굳이 필요없어서
            loss = criterion(outputs, labels)

        total += labels.shape[0]
        running_loss += loss.item()
        running_corrects += torch.sum(preds == labels.data)

        if (i == 0) or (i % log_step == log_step - 1) :
            print(f'[Batch: {i+1}] running test loss:{running_loss / total}, running test accuracy: {running_corrects / total}')

    print(f'test loss: {running_loss / total}, accuracy: {running_corrects / total}')
    print('elapsed time:', time.time() - start_time)
    return running_loss / total, (running_corrects / total).item()

※ learning rate를 동적으로 관리하는 함수 정의하기

def adjust_learning_rate(optimizer, epoch) :
    lr = learning_rate
    if epoch >= 3 :
        lr /= 10
    if epoch >= 7 :
        lr /= 10
    # optimizer.param_groups : 옵티마이저는 학습률과 관련된 파라미터 그룹을 관리
    # 'lr'키로 학습률 값을 설정함
    for param_group in optimizer.param_groups :
        param_group['lr'] = lr

※ 초기화(model1)

learning_rate = 0.01
log_step = 11
model = Model1()
model = model.cuda()
criterion = nn.CrossEntropyLoss()
# momentum : 중력, Adam에 설정된 값, lr에 입력값을 곱함
optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum = 0.9)
num_epochs = 20
best_val_acc = 0
best_epoch = 0

history = []
accuracy = []

※ Model1 실행 및 최적의 파라미터 저장하기

os.makedirs("weights", exist_ok=True)

for epoch in range(num_epochs) :
    adjust_learning_rate(optimizer, epoch)
    train_loss, train_acc = train()
    val_loss, val_acc = validate()
    history.append((train_loss, val_loss))
    accuracy.append((train_acc, val_acc))
    if val_acc > best_val_acc :
        print('[info] best validation accuarcy!')
        best_val_acc = val_acc
        best_epoch = epoch
        # torch.save() : 파일로 세이브
        # model.state_dic() : 가중치를 저장
        # pth 또는 pt 확장자면 파이토치 파일
        # h5 또는 h 확장자면 텐서플로우 파일
        torch.save(model.state_dict(), f'weights/best_checkpoint_epoch_{epoch + 1}.pth')

torch.save(model.state_dict(), f'weights/last_checkpoint_epoch{epoch + 1}.pth')

※ Epoch에 따른 Accuracy값 시각화

plt.plot([x[0] for x in accuracy], 'b', label='train')
plt.plot([x[1] for x in accuracy], 'r--',label='validation')
plt.xlabel("Epochs")
plt.ylabel("Accuracy")
plt.legend()

※ Model1의 test 실행

test_loss, test_accuracy = test()
print(f"Test loss: {test_loss:.8f}")
print(f"Test accuracy: {test_accuracy * 100.:.2f}%")

※ Model2로 같은과정 진행

os.makedirs("weights", exist_ok=True)

learning_rate = 0.01
log_step = 20
model = Model2()
model = model.cuda()
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=0.9)
num_epochs = 20
best_val_acc = 0
best_epoch = 0

history = []
accuracy = []

for epoch in range(num_epochs):
    adjust_learning_rate(optimizer, epoch)
    train_loss, train_acc = train()
    val_loss, val_acc = validate()
    history.append((train_loss, val_loss))
    accuracy.append((train_acc, val_acc))

    if val_acc > best_val_acc:
        print("[Info] best validation accuracy!")
        best_val_acc = val_acc
        best_epoch = epoch
        torch.save(model.state_dict(), f"weights/best_checkpoint_epoch_{epoch + 1}.pth")

torch.save(model.state_dict(), f"weights/last_checkpoint_epoch_{num_epochs}.pth")

plt.plot([x[0] for x in accuracy], 'b', label='train')
plt.plot([x[1] for x in accuracy], 'r--',label='validation')
plt.xlabel("Epochs")
plt.ylabel("Accuracy")
plt.legend()

test_loss, test_accuracy = test()
print(f"Test loss: {test_loss:.8f}")
print(f"Test accuracy: {test_accuracy * 100.:.2f}%")

※ Model3로 같은과정 진행

os.makedirs("weights", exist_ok=True)

learning_rate = 0.01
log_step = 20
model = Model3()
model = model.cuda()
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=0.9)
num_epochs = 20
best_val_acc = 0
best_epoch = 0

history = []
accuracy = []

for epoch in range(num_epochs):
    adjust_learning_rate(optimizer, epoch)
    train_loss, train_acc = train()
    val_loss, val_acc = validate()
    history.append((train_loss, val_loss))
    accuracy.append((train_acc, val_acc))

    if val_acc > best_val_acc:
        print("[Info] best validation accuracy!")
        best_val_acc = val_acc
        best_epoch = epoch
        torch.save(model.state_dict(), f"weights/best_checkpoint_epoch_{epoch + 1}.pth")

torch.save(model.state_dict(), f"weights/last_checkpoint_epoch_{num_epochs}.pth")

plt.plot([x[0] for x in accuracy], 'b', label='train')
plt.plot([x[1] for x in accuracy], 'r--',label='validation')
plt.xlabel("Epochs")
plt.ylabel("Accuracy")
plt.legend()

test_loss, test_accuracy = test()
print(f"Test loss: {test_loss:.8f}")
print(f"Test accuracy: {test_accuracy * 100.:.2f}%")