본문 바로가기

Data_Analysis_Track_33/Python

Python_Deeplearning_pytorch_06(딥러닝모델_성능개선)

성능개선 기법 실습

모듈 정의

 

train.py

  • 모델 학습과 검증 함수 정의

 

module dir 생성

import os
os.makedirs('module', exist_ok=True)

 

모델 학습과 검증 함수 정의

%%writefile module/train.py

import torch
import time

# multi와 binary 정확도 계산이 다르다.

def test_multi_classification(dataloader, model, loss_fn, device="cpu") -> tuple:
    """
    다중 분류 검증/평가 함수
    
    [parameter]
        dataloader: DataLoader - 검증할 대상 데이터로더
        model: 검증할 모델
        loss_fn: 모델 추정값과 정답의 차이를 계산할 loss 함수.
        device: str - 연산을 처리할 장치. default-"cpu", gpu-"cuda"
    [return]
        tuple: (loss, accuracy)
    """
    model.eval() # 모델을 평가모드로 변환
    size = len(dataloader.dataset) # 전체 데이터수
    num_batches = len(dataloader)  #  step 수
    
    test_loss, test_accuracy = 0., 0.
    
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            # 정확도 계산
            pred_label = torch.argmax(pred, axis=-1)
            test_accuracy += torch.sum(pred_label == y).item()
            
        test_loss /= num_batches
        test_accuracy /= size  #전체 개수로 나눈다.
    return test_loss, test_accuracy

def test_binary_classification(dataloader, model, loss_fn, device="cpu") -> tuple:
    """
    이진 분류 검증/평가 함수
    
    [parameter]
        dataloader: DataLoader - 검증할 대상 데이터로더
        model: 검증할 모델
        loss_fn: 모델 추정값과 정답의 차이를 계산할 loss 함수.
        device: str - 연산을 처리할 장치. default-"cpu", gpu-"cuda"
    [return]
        tuple: (loss, accuracy)
    """
    model.eval() # 모델을 평가모드로 변환
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    
    test_loss, test_accuracy = 0., 0.
    
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            ## 정확도 계산
            pred_label = (pred >= 0.5).type(torch.int32)
            test_accuracy += (pred_label == y).sum().item() 
            
        test_loss /= num_batches
        test_accuracy /= size   #전체 개수로 나눈다.
    return test_loss, test_accuracy    

def train(dataloader, model, loss_fn, optimizer, device="cpu", mode:"binary or multi"='binary'):
    """
    모델을 1 epoch 학습시키는 함수

    [parameter]
        dataloader: DataLoader - 학습데이터셋을 제공하는 DataLoader
        model - 학습대상 모델
        loss_fn: 모델 추정값과 정답의 차이를 계산할 loss 함수.
        optimizer - 최적화 함수
        device: str - 연산을 처리할 장치. default-"cpu", gpu-"cuda"
        mode: str - 분류 종류. binary 또는 multi
    [return]
        tuple: 학습후 계산한 Train set에 대한  train_loss, train_accuracy
    """
    model.train()
    size = len(dataloader.dataset) #총 데이터수

    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)
        pred = model(X)

        loss = loss_fn(pred, y)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
    if mode == 'binary':
        train_loss, train_accuracy = test_binary_classification(dataloader, model, loss_fn, device)
    else:
        train_loss, train_accuracy = test_multi_classification(dataloader, model, loss_fn, device)
    return train_loss, train_accuracy



def fit(train_loader, val_loader, model, loss_fn, optimizer, epochs, save_best_model=True, save_model_path=None, early_stopping=True, patience=10, device='cpu',  mode:"binary or multi"='binary'):
    """
    모델을 학습시키는 함수

    [parameter]
        train_loader (Dataloader): Train dataloader
        test_loader (Dataloader): validation dataloader
        model (Module): 학습시킬 모델
        loss_fn (_Loss): Loss function
        optimizer (Optimizer): Optimizer
        epochs (int): epoch수
        save_best_model (bool, optional): 학습도중 성능개선시 모델 저장 여부. Defaults to True.
        save_model_path (str, optional): save_best_model=True일 때 모델저장할 파일 경로. Defaults to None.
        early_stopping (bool, optional): 조기 종료 여부. Defaults to True.
        patience (int, optional): 조기종료 True일 때 종료전에 성능이 개선될지 몇 epoch까지 기다릴지 epoch수. Defaults to 10.
        device (str, optional): device. Defaults to 'cpu'.
        mode(str, optinal): 분류 종류. "binary(default) or multi
    [return]
        tuple: 에폭 별 성능 리스트. (train_loss_list, train_accuracy_list, validation_loss_list, validataion_accuracy_list)
    """

    train_loss_list = []
    train_accuracy_list = []
    val_loss_list = []
    val_accuracy_list = []
    
        
    if save_best_model:
        best_score_save = torch.inf

    ############################
    # early stopping
    #############################
    if early_stopping:
        trigger_count = 0
        best_score_es = torch.inf
    
    # 모델 device로 옮기기
    model = model.to(device)
    s = time.time()
    for epoch in range(epochs):
        # train() 함수 호출
        train_loss, train_accuracy = train(train_loader, model, loss_fn, optimizer, device=device, mode=mode)
        # 검증 - test_xxxx_classification() 함수 호출
        if mode == "binary":
            val_loss, val_accuracy = test_binary_classification(val_loader, model, loss_fn, device=device)
        else:
            val_loss, val_accuracy = test_multi_classification(val_loader, model, loss_fn, device=device)

        # 현 에폭의 검증결과를 각 list에 추가.
        train_loss_list.append(train_loss)
        train_accuracy_list.append(train_accuracy)
        val_loss_list.append(val_loss)
        val_accuracy_list.append(val_accuracy)
        
        # 로그 출력
        print(f"Epoch[{epoch+1}/{epochs}] - Train loss: {train_loss:.5f} Train Accucracy: {train_accuracy:.5f} || Validation Loss: {val_loss:.5f} Validation Accuracy: {val_accuracy:.5f}")
        print('='*100)
        
        # 모델 저장
        if save_best_model:
            if val_loss < best_score_save:
                torch.save(model, save_model_path)
                print(f"저장: {epoch+1} - 이전 : {best_score_save}, 현재: {val_loss}")
                best_score_save = val_loss
        
        # early stopping 처리            
        if early_stopping:
            if val_loss < best_score_es: 
                best_score_es = val_loss  
                trigger_count = 0
                                
            else:
                trigger_count += 1                
                if patience == trigger_count:
                    print(f"Early stopping: Epoch - {epoch}")
                    break
            
    e = time.time()
    print(e-s, "초")
    return train_loss_list, train_accuracy_list, val_loss_list, val_accuracy_list

 

data.py

  • dataset 생성 함수 제공 모듈

 

%%writefile module/data.py
from torchvision import datasets, transforms
from torch.utils.data import DataLoader

def load_mnist_dataset(root_path, batch_size, is_train=True):
    """
    mnist dataset dataloader 제공 함수
    [parameter]
        root_path: str|Path - 데이터파일 저장 디렉토리
        batch_size: int
        is_train: bool = True - True: Train dataset, False - Test dataset
    [return]
        DataLoader 
    """
    transform = transforms.Compose([
        transforms.ToTensor()
    ])
    dataset = datasets.MNIST(root=root_path, train=is_train, download=True, transform=transform)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=is_train)  # shuffle: train이면 True, test면 False 할 것이므로 is_train을 넣음.
    
    return dataloader

def load_fashion_mnist_dataset(root_path, batch_size, is_train=True):
    """
    fashion mnist dataset dataloader 제공 함수
    [parameter]
        root_path: str|Path - 데이터파일 저장 디렉토리
        batch_size: int
        is_train: bool = True - True: Train dataset, False - Test dataset
    [return]
        DataLoader
    """
    transform = transforms.Compose([
        transforms.ToTensor()
    ])
    dataset = datasets.FashionMNIST(root=root_path, train=is_train, download=True, transform=transform)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=is_train)  # shuffle: train이면 True, test면 False 할 것이므로 is_train을 넣음.
    
    return dataloader

 

 

 

module에서 위에서 정의한 data 불러온다.

- mnist_trainloader, testloader 정의

from module import data

mnist_trainloader = data.load_mnist_dataset("datasets", 200, True) # (root, batch_size, train)
mnist_testloader = data.load_mnist_dataset("datasets", 200, False)

 

데이터수 & step수

# 데이터수
len(mnist_trainloader.dataset), len(mnist_testloader.dataset)

# step수
len(mnist_trainloader), len(mnist_testloader)

 

import

import torch
import torch.nn as nn

import torchinfo

from module import train, data

import os
import matplotlib.pyplot as plt

device = "cuda" if torch.cuda.is_available() else "cpu"
device

 

하이퍼파라미터, 변수 정의

LR = 0.001
N_EPOCH = 10
BATCH_SIZE = 200

# DATASET, MODEL 저장할 ROOT 디렉토리 경로
DATASET_ROOT_PATH = 'datasets'
MODEL_SAVE_ROOT_PATH = 'models'

 

Data 준비

- mnist 데이터 로딩

train_loader = data.load_mnist_dataset(DATASET_ROOT_PATH, BATCH_SIZE, True)
test_loader = data.load_mnist_dataset(DATASET_ROOT_PATH, BATCH_SIZE, False)

 

모델의 크기 변경에 따른 성능변화

 

모델정의

class SmallSizeModel(nn.Module):
    
    def __init__(self):
        super().__init__()
        self.output = nn.Linear(784, 10) # in: feature수 - (1*28*28), out: class개수(10)
        
    def forward(self, X):
        out = nn.Flatten()(X)
        out = self.output(out)
        return out

 

모델생성

# 모델생성
small_model = SmallSizeModel()
torchinfo.summary(small_model, (BATCH_SIZE, 1, 28, 28))

 

loss, optimizer

# loss
loss_fn = nn.CrossEntropyLoss()
# optimizer
optimizer = torch.optim.Adam(small_model.parameters(), lr=LR)

 

train

# train -> module.fit()
train_loss_list, train_acc_list, valid_loss_list, valid_acc_list = train.fit(train_loader
                                                                             ,test_loader
                                                                             ,small_model
                                                                             ,loss_fn
                                                                             ,optimizer
                                                                             ,N_EPOCH
                                                                             ,save_best_model=True
                                                                             ,save_model_path=os.path.join(MODEL_SAVE_ROOT_PATH, "small_model.pth")
                                                                             ,device=device
                                                                             ,mode="multi"
                                                                            )

 

trainset, validationset의 loss 그래프로 확인하기

plt.figure(figsize =(10, 10))
plt.subplot(1, 2, 1)
plt.plot(train_loss_list, label = "trainset")
plt.plot(valid_loss_list, label = "validationset")
plt.title("Loss")
plt.legend()

plt.subplot(1, 2, 2)
plt.plot(train_acc_list, label = "trainset")
plt.plot(valid_acc_list, label = "validationset")
plt.title("accuracy")
plt.legend()

plt.tight_layout()
plt.show()

 

작성할 layer가 너무 많은 경우 묶어서 정의

# nn.Sequential()을 이용해 Layer block 정의하기.
# 여러 Layer들을 묶어준다.
layer_block = nn.Sequential(
    nn.Linear(20, 30)
    ,nn.ReLU()
)
a = torch.ones(1, 20)
r = layer_block(a)
print(r.shape)
r

 

새로운 모델 정의(모델의 크기를 키움)

class BigSizeModel(nn.Module):
    
    def __init__(self):
        super().__init__()
        self.b1 = nn.Sequential(nn.Linear(784, 2048), nn.ReLU())
        self.b2 = nn.Sequential(nn.Linear(2048, 1024), nn.ReLU())
        self.b3 = nn.Sequential(nn.Linear(1024, 512), nn.ReLU())
        self.b4 = nn.Sequential(nn.Linear(512, 256), nn.ReLU())
        self.b5 = nn.Sequential(nn.Linear(256, 128), nn.ReLU())
        self.b6 = nn.Sequential(nn.Linear(128, 64), nn.ReLU())
        
        self.output = nn.Linear(64, 10)
        
    def forward(self, X):
        X = nn.Flatten()(X)
        out = self.b1(X)
        out = self.b2(out)
        out = self.b3(out)
        out = self.b4(out)
        out = self.b5(out)
        out = self.b6(out)
        return self.output(out)

 

big_model - summary

big_model = BigSizeModel()
torchinfo.summary(big_model, (BATCH_SIZE, 1, 28, 28))

 

loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(big_model.parameters(), lr=LR)

 

새로운 모델(BigSizeModel)로 학습

# 학습
train_loss_list2, train_acc_list2, valid_loss_list2, valid_acc_list2 = train.fit(train_loader
                                                                             ,test_loader
                                                                             ,big_model
                                                                             ,loss_fn
                                                                             ,optimizer
                                                                             ,N_EPOCH
                                                                             ,save_best_model=True
                                                                             ,save_model_path=os.path.join(MODEL_SAVE_ROOT_PATH, "big_model.pth")
                                                                             ,device=device
                                                                             ,mode="multi"
                                                                            )

 

결과확인

- 더 좋은 효과를 보인다.

plt.figure(figsize =(10, 10))
plt.subplot(1, 2, 1)
plt.plot(train_loss_list2, label = "trainset")
plt.plot(valid_loss_list2, label = "validationset")
plt.title("Loss")
plt.legend()

plt.subplot(1, 2, 2)
plt.plot(train_acc_list2, label = "trainset")
plt.plot(valid_acc_list2, label = "validationset")
plt.title("accuracy")
plt.legend()

plt.tight_layout()
plt.show()

 

Dropout 예제

  • dropout 각 레이어에 적용
        - dropout은 nn.Dropout 객체를 사용
        - 객체 생성시 dropout_rate 설정: 0.2 ~ 0.5
        - Drop시킬 노드를 가진 Layer 뒤에 추가한다.

 

Dropout 모델 정의

class DropoutModel(nn.Module):

    def __init__(self, drop_rate=0.5):
        super().__init__()
        self.b1 = nn.Sequential(nn.Linear(784, 256)       # 선형함수
                                ,nn.ReLU()                # 비선형함수
                                ,nn.Dropout(p=drop_rate)  # dropout
                               )
        self.b2 = nn.Sequential(nn.Linear(256, 256)
                                ,nn.ReLU()
                                ,nn.Dropout(p=drop_rate)
                               )
        self.b3 = nn.Sequential(nn.Linear(256, 128)
                                ,nn.ReLU()
                                ,nn.Dropout(p=drop_rate)
                               )
        self.b4 = nn.Sequential(nn.Linear(128, 128)
                                ,nn.ReLU()
                                ,nn.Dropout(p=drop_rate)
                               )
        self.output = nn.Sequential(nn.Linear(128, 10)
                                ,nn.Dropout(p=drop_rate)
                               )
        
    def forward(self, X):
        out = nn.Flatten()(X)
        out = self.b1(out)
        out = self.b2(out)
        out = self.b3(out)
        out = self.b4(out)
        out = self.output(out)
        return out

 

Dropout 모델 d_model 생성 및 summary

d_model = DropoutModel().to(device) # drop_rate=0.5(default)
torchinfo.summary(d_model, (BATCH_SIZE, 1, 28, 28))

 

Dropout 모델 d_model로 학습

loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(d_model.parameters(), lr=LR)

result = train.fit(train_loader, test_loader, d_model, loss_fn, optimizer, N_EPOCH, save_best_model=False, early_stopping=False, device=device, mode="multi")

 

Dropout 이해 프로그램 

# Dropout 원리 이해
a = torch.randn(5, 3) # random (5, 3) 배열 a 생성
# print(a)
# a.shape
ln = nn.Linear(3, 4)
b = ln(a) # (input = 3, output = 4) Linear 실행 b 생성
# print(b)
do = nn.Dropout(0.5) # 0.5 -> dropout 비율
c = do(b) # b에 대한 Dropout 실행 c
c

 

Batch Normalization

  • Dense => BN => Activation
  • 각 Layer 에서 출력된 값을 평균=0, 표준편차=1로 정규화 하여 각 Layer의 입력분포를 균일하게 만들어 준다.

 

실습코드(BatchNorm1d) 간단이해

a = torch.rand(10, 5) # (10-data수, 5-feature수)
# BatchNorm1d(feature수), BatchNorm2d(channel수)
bn = nn.BatchNorm1d(5) # features => 1차원: BatchNorm1d(), 3차원(이미지): BatchNorm2d()
b = bn(a)
b

 

 

# feature별 평균/표준편차
print("a의 평균:", a.mean(axis=0))
print("표준편차:", a.std(axis=0))

# BatchNorm1d의 결과의 feature별 평균/표준편차
print("b의 평균:", b.mean(axis=0))
print("표준편차:", b.std(axis=0))

 

BatchNorm 모델 BNModel 정의

# 순서 : Linear -> BatchNorm -> Activation (-> Dropout)
class BNModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.b1 = nn.Sequential(nn.Linear(784, 256)
                                ,nn.BatchNorm1d(256) # 입력 : 1차원 -> feature
                                ,nn.ReLU()
                               )
        self.b2 = nn.Sequential(nn.Linear(256, 128)
                                ,nn.BatchNorm1d(128)
                                ,nn.ReLU()
                               )
        self.b3 = nn.Sequential(nn.Linear(128, 64)
                                ,nn.BatchNorm1d(64)
                                ,nn.ReLU()
                               )
        self.output = nn.Linear(64, 10)
        
    def forward(self, X):
        out = nn.Flatten()(X)
        out = self.b1(out)
        out = self.b2(out)
        out = self.b3(out)
        out = self.output(out)
        return out

 

 

BatchNorm(BNModel) 모델 bn_model 생성 및 summary

# 모델 생성
bn_model = BNModel().to(device)
torchinfo.summary(bn_model)

 

BatchNorm(BNModel) 모델 bn_model 학습

# train
# loss
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(bn_model.parameters(), lr=LR)

result_bn = train.fit(train_loader, test_loader, bn_model, loss_fn, optimizer, N_EPOCH, save_best_model=False, early_stopping=False, device=device, mode="multi")

 

Learning rate decay

  • Optimizer와 Learning rate scheduler의 속성, 메소드 확인
  • 파이토치는 torch.optim 모듈에서 다양한 Learning rate 알고리즘을 제공한다.


optimizer.param_groups

optimizer = torch.optim.Adam(bn_model.parameters(), lr=0.001)

# 옵티마이저 객체관련 여러 정보들을 조회 => optimizer.param_groups
print(type(optimizer.param_groups), len(optimizer.param_groups))
print(type(optimizer.param_groups[0]))
info_dict = optimizer.param_groups[0]
info_dict.keys()

 

현재 learning rate 확인

info_dict['lr'] # 현재 learning rate

 

학습률의 변화흐름을 시각화하는 함수 정의

# 학습률의 변화흐름을 시각화하는 함수
def plot_lr(title, lr_list):
    # title - 그래프 제목
    # lr_list - 에폭별로 적용된 Learning rate
    plt.figure(figsize=(10, 5))
    plt.plot(range(len(lr_list)), lr_list)

    plt.title(title)
    plt.xticks([x for x in range(len(lr_list)) if x % 5 == 0], rotation=45)
    plt.xlabel("Epoch")
    plt.ylabel("LR")
    plt.grid(True
             ,axis="x" # x축 기준 그리드만 표시
             ,linestyle=":"
            )
    plt.show()

 

확인

# 테스트
plot_lr("My Scheduler", list(range(1, 1000, 10)))


Optimizer의 Learning rate(학습율) 조정을 통한 성능향상

StepLR

 

optimizer = torch.optim.Adam(bn_model.parameters(), lr=0.001)
# lr_scheduler : optimizer의 lr는 초기 학습률
step_scheduler = torch.optim.lr_scheduler.StepLR(optimizer # 학습률을 변화시킬 optimizer
                                                 ,step_size=30 # 몇 step/epoch마다 LR을 변경시킬지 간격
                                                 ,gamma=0.5 # 변경 비율. new_lr = 현재lr * gamma
                                                )
# optimizer의 lr을 30 에폭/step 마다 0.5배씩 변경

 

# 현재 LR 조회
optimizer.param_groups[0]['lr'], step_scheduler.get_last_lr()

 

N_EPOCHS = 200
STEP_SIZE = 10
steplr_list = [] # epoch별 lr를 저장할 변수.

for epoch in range(N_EPOCH):

    # step
    for step in range(STEP_SIZE): # for x,y in dataloader: 대신 사용
        # 학습
        # 모델 예측 -> Loss 계산 -> 파라미터 업데이트
        optimizer.step()
        # learning rate 변경 요청. => 30번 요청이 들어오면 변경 * gamma
        # step단위로 변경
        # step_scheduler.step()
    step_scheduler.step()  # epoch 단위로 변경.
    steplr_list.append(step_scheduler.get_last_lr()[0])     #  현재 epoch의 lr을 저장

 

결과 확인

plot_lr("Step LR", steplr_list)

 

CosineAnnealingLR

  • cosine 그래프를 그리면서 learning rate를 변경 하는 방식. 
  • 최근에는 learning rate를 단순히 감소시키기 보다는 감소와 증가를 반복하여 진동하는 방식으로 최적점을 찾아가는 알고리즘을 많이 사용한다. 이러한 방법 중 가장 간단하면서도 많이 사용되는 방법이 CosineAnnealingLR이다.

 

# Annealing : 가열 <-> 냉각
optimizer = torch.optim.Adam(bn_model.parameters(), lr=0.001)
ca_scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer
                                                          ,T_max=10 # 한 cycle 주기(단위: 에폭,step)
                                                          ,eta_min=0.0001 # 최소 학습률                                                        
                                                         )
# 범위: 초기 LR ~ eta_min

 

ca_lr_list = []
for epoch in range(N_EPOCH):
    # 한 에폭
    for step in range(STEP_SIZE):
        # 한 스텝
        optimizer.step()
        
    ca_scheduler.step()
    ca_lr_list.append(ca_scheduler.get_last_lr()[0])

 

결과 확인

plot_lr("CosineAnnealingLR", ca_lr_list)

 

CosineAnnealingWarmRestarts

  • cosine annealing의 스케쥴링에 cosine 주기의 에폭을 점점 늘리거나 줄일 수 있다. (보통 늘린다.)

 

optimizer = torch.optim.Adam(bn_model.parameters(), lr=0.001)
caws_scheduler = torch.optim.lr_scheduler.CosineAnnealingWarmRestarts(optimizer
                                                                      ,T_0=10 # 초기 변화 주기(cycle)
                                                                      ,T_mult=2  # 변화주기를 어떤 비율로 변경할지. T_0 * T_mult = 새주기
                                                                      ,eta_min=1e-5 # 0.00001. 최소학습률
                                                                     )
# 변화 범위: 초기학습률 ~ eta_min

 

caws_lr_list = []

for epoch in range(N_EPOCH):
    # 한 에폭
    for step in range(STEP_SIZE):
        # 한 스텝
        optimizer.step()
    caws_scheduler.step()
    caws_lr_list.append(caws_scheduler.get_last_lr()[0])

 

결과 확인

plot_lr("CosineAnnealingWarmRestart", caws_lr_list)