Data_Analysis_Track_33/Python

Python_Deeplearning_pytorch_03(딥러닝-MLP구현)

lsc99 2023. 10. 17. 17:52

Pytorch 개발 Process

1. 데이터 준비

  • Dataset 준비
  • Dataloader 생성

 

2. 입력과 출력을 연결하는 Layer(층)으로 이뤄진 네트워크(모델)을 정의

  • Sequential 방식: 순서대로 쌓아올린 네트워크로 이뤄진 모델을 생성하는 방식
            - layer를 순서대로 쌓은 모델을 구현할때 간단히 모델을 정의할 수 있다.
            - layer block을 정의하는데 사용할 수 있다.
  • Subclass 방식: 네트워크를 정의하는 클래스를 구현.
            - 다양한 구조의 모델을 정의할 수 있다.
            - inializer에서 필요한 layer들을 생성한다.
            - forward(self, X) 메소드에 forward propagation 계산을 구현한다.

 

3. train(학습)

  • train 함수, test 함수 정의

 

4. test set 최종평가


MNIST 이미지 분류

  • [MNIST] (Modified National Institute of Standards and Technology) database
  • 흑백 손글씨 숫자 0-9까지 10개의 범주로 구분해놓은 데이터셋
  • 하나의 이미지는 28 * 28 pixel 의 크기
  • 6만개의 Train 이미지와 1만개의 Test 이미지로 구성됨.

 

import

# torch: 공통 구현체
# torchvision: 영상처리를 위한 구현체
import torch
import torch.nn as nn # 딥러닝 모델을 구성하는 함수들의 모듈.
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

import matplotlib.pyplot as plt
import numpy as np
import os

print("파이토치 버전:", torch.__version__)

 

device 설정

# 처리를 cpu를 이용할지 gpu를 이용할지 설정
device = "cuda" if torch.cuda.is_available() else "cpu"
print("device:", device)

 

변수 설정 (BATCH_SIZE, N_EPOCH, LR ....)

# 뒤에서 여러번 사용할 값들을 저장할 변수 + 
# 모델에 설정할 값(모델자체, 학습할 때 필요한 값 등등)으로 성능에 영향을 주는 값들을 저장할 변수
      # === 하이퍼 파라미터(Hyper Parameter)
BATCH_SIZE = 256 #  모델의 파라미터를 업데이트할 때 사용할 데이터의 개수. 한번에 몇개 데이터를 입력할 지
N_EPOCH = 20     # 1 epoch : 전체 train dataset을 한번 학습한 것
LR = 0.001       # 학습률. 파라미터 update할 때 gradient값에 곱해줄값.
                    # (gradient를 새로운 파라미터 계산할 때 얼마나 반영할지 비율)
# step : 파라미터를 1번 update하는 단위. 1step당 학습데이터수 : batch_size
# epoch : 전체 학습데이터셋을 한번 학습한 단위
#         1epoc당 step횟수 = ceil(총 데이터수/batch_size)

DATASET_SAVE_PATH = 'datasets' # 데이터셋을 저장할 디렉토리 경로
MODEL_SAVE_PATH = 'models'      # 

os.makedirs(DATASET_SAVE_PATH, exist_ok=True)
os.makedirs(DATASET_SAVE_PATH, exist_ok=True)

 

MNIST dataset Loading

Dataset

  • Dataset: input/output data를 묶어서 관리.
  • DataLoader : Dataset의 데이터들을 모델에 어떻게 제공할지를 관리.

 

훈련용(train_set), 테스트용(test_set) 데이터셋 가져오기

# MNIST Dataset을 다운로드 + Dataset 객체를 생성
# train dataset - 6만장
# torchvision.datasets
train_set = datasets.MNIST(root=DATASET_SAVE_PATH # 데이터셋을 저장할 디렉토리 경로
                           ,train=True # trainset(학습용): True, testset(검증용): False
                           ,download=True # root에 저장된 데이터파일들이 없을때 다운로드 받을지 여부
                           ,transform=transforms.ToTensor() # 데이터 전처리.
                          )
# ToTensor(): ndarray, PIL.Image객체를 torch.Tensor로 변환. Pixcel값 정규화(normalize): 0 ~ 1 실수로 변환
test_set = datasets.MNIST(root = DATASET_SAVE_PATH
                          ,train=False
                          ,download=True
                          ,transform=transforms.ToTensor()
                         )

 

train_set과 test_set 기본 정보

print(type(train_set), type(test_set)) # Dataset 타입
# Dataset 조회
print(train_set)
print(test_set)

# 데이터 포인트 개수
print(len(train_set), len(test_set))

 

DataLoader

# Dataset을 모델에 어떻게 제공할지를 설정. -> 학습/평가시 설정된대로 데이터를 loading
# 훈련용 DataLoader
train_loader = DataLoader(train_set # Dataset
                          ,batch_size=BATCH_SIZE # batch_size를 설정
                          ,shuffle=True # 한 epoch이 끝나면 다음 epoch전에 데이터를 섞을지 여부. (섞는다 -> True)
                          ,drop_last=True # 마지막 batch의 데이터수가 batch_size보다 적을 경우 버릴지(학습에 사용안함 -> True) 여부
                         )
# 평가용 DataLoader
test_loader = DataLoader(test_set
                         ,batch_size=BATCH_SIZE
                        )

 

데이터셋의 개수를 batch_size로 나눈 값 -> 1 epoch당 step 수

- trainset의 경우 : 60000/256

- testset의 경우 : 10000/256

print("1 epoch당 step 수")
print("trainset:", len(train_loader))
print("testset:", len(test_loader))

print(np.ceil(60000/256), np.ceil(10000/256))

 

네트워크(모델) 정의

  • Network : 전체 모델 구조

 

# class로 정의: nn.Module을 상속해서 정의
class MnistModel(nn.Module):

    def __init__(self):
        """
        모델 객체 생성시 모델을 구현(정의)할 때 필요한 것들을 초기화.
        필요한 것: Layer들. 등등
        """
        super().__init__()

        # 784(pixcel수) -> 128 개로 축소
        self.lr1 = nn.Linear(28 * 28, 128) # input feature크기, output size
        # 128 feature -> 64 축소
        self.lr2 = nn.Linear(128, 64)
        # 64 feature -> 출력결과 10 (각 범주의 확률)
        self.lr3 = nn.Linear(64, 10)
        # Activation(활성) 함수 -> 비선형함수 : ReLU
        self.relu = nn.ReLU() # f(x) = max(x, 0)
        
    def forward(self, x):
        """
        input data를 입력 받아서 output 데이터를 만들때 까지의 계산 흐름을 정의
        ===> forward propagation
        parameter
            x : 입력데이터
        return
            torch.Tensor: 출력데이터(모델 예측결과.)
        """
        # init에서 생성한 함수들을 이용해서 계산
        # x -> 1차원으로 변환 -> lr1 -> relu -> lr2 -> relu -> lr3 -> output
        # input (batch_size, channel, height, width) => (batch_size, 전체pixcel)
        x = torch.flatten(x, start_dim=1) # (b, c, h, w) -> (b, c*h*w)
        
        x = self.lr1(x)
        x = self.relu(x)
        
        x = self.lr2(x)
        x = self.relu(x)

        output = self.lr3(x)
        return output

 

reshape : 배열의 형태 변환

flatten : axis 합치기

i = torch.arange(28 * 28).reshape(1, 1, 28, 28)
print(i.shape) # [1:데이터개수, 1:channel, 28:height, 28:width]
# -> 1차원 => 데이터개수는 유지 [1, 784]
# 다차원 Tensor => 1차원 => flatten([start_dim=axis]) # 지정한 axis부터 합친다.
print(torch.flatten(i).shape)
torch.flatten(i, start_dim=1).shape # axis 0은 유지하고, axis 1부터 나머지를 합친다.

 

모델 객체 생성

# 정의한 모델 클래스로부터 모델 객체를 생성
model = MnistModel()
print(model)

 

torchinfo 패키지 설치

# 모델의 연산 흐름 및 정보를 확인=> torchinfo 패키지를 사용
!pip install torchinfo

 

summary

from torchinfo import summary
summary(model, (100, 784)) # summary(모델객체, input shape: tuple)

 

첫번째 이미지(pred_batch[0])에 대한 추론해보기

# train dataset의 첫번째 배치를 이용해서 모델에 추론.
x_batch, y_batch = next(iter(train_loader)) # (input, output)
print(x_batch.shape, y_batch.shape)

 

# 추론
pred_batch = model(x_batch) # model의 forward() 메소드가 실행.
pred_batch.shape

 

결과값 비교

- 자신의 프로그램의 결과값을 비교해보기

# 첫번째 이미지에 대한 추론결과 -> class별 확률 => 가장 큰 확률의 index를 조회
print(pred_batch[0])
pred_batch[0].argmax() # 추론결과

 

# 추론 결과와 실제값 y_batch[i]의 값과 비교
y_batch[0] # 정답이 아니다. 틀림

 

 

torch.sum(pred_batch.argmax(dim=1) == y_batch) # 추론을 맞힌 개수

 

정확도 -> 추론을 맞힌 개수 / 256(batch_size -> 한번에 입력한 데이터의 수)

 

train

모델, loss function, optimizer 생성

# 모델을 device로 옮긴다. (모델을 이용한 계산을 CPU에서 할지 GPU에서 할지)
# 참고: device로 옮기는 것 => model, 모델에 넣을 input data, output data
# model = MnistModel()
model = model.to(device)

# loss function
# 다중분류문제 : crossentropy, 이진분류 문제 : binary crossentropy ==> log loss
# 다중분류: label이 여러개, 이진분류: yes/no 분류
loss_fn = nn.CrossEntropyLoss() #객체,  nn.functional.cross_entropy() 함수

# Optimizer ==> 모델 파라미터들 최적화 ==> 경사하강법을 구현
optimizer = torch.optim.Adam(model.parameters() # 최적화 대상 파라미터들
                            ,lr = LR # 학습률
                            )

 

# model.parameters() : generator
a = next(model.parameters())
a.shape
a

 

학습 및 검증

- epoch 횟수가 늘어날 때마다 Loss의 값은 줄고, Accuracy의 값은 늘어나는 것을 확인할 수 있다.

import time # 학습 시간 체크
## 학습 => train(훈련) + validation(1 epoch 학습한 모델성능 검증)
# 에폭(epoch)별 학습결과를 저장할 리스트들
train_loss_list = [] # train set으로 검증했을 때 loss (loss_fn계산값)
val_loss_list = []   # test set으로 검증했을 때 loss
val_accuracy_list = [] # test set으로 검증했을 때 accuracy(정확도)-전체중 맞은 비율
start = time.time()
# Train
for epoch in range(N_EPOCH):
    #################################
    # Train
    #################################
    model.train() # 모델을 train모드로 변경.
    train_loss = 0.0 # 현재 epoch의 학습 결과 loss를 저장할 변수.
    # 배치단위로 학습
    for X_train, y_train in train_loader:  # batch 단위 (input, output) 튜플로 반환.
        # 1. X, y를 device로 옮긴다. (model, X, y는 같은 device상에 위치해야한다.)
        X_train, y_train = X_train.to(device), y_train.to(device)
        # 2. 추론
        pred = model(X_train)
        # 3. Loss 계산
        loss = loss_fn(pred, y_train) # args 순서: (모델예측값,  정답)
        # 4. 모델의 파라미터 업데이트(최적화)
        ## 1. 파라미터의 gradient값들을 초기화
        optimizer.zero_grad()
        ## 2. gradient 계산 ===> 계산결과는 파라미터.grad 속성에 저장.
        loss.backward()
        ## 3. 파라미터(weight, bias) 업데이트 ( 파라미터 - 학습률*grad)
        optimizer.step()
        #### 현재 batch의 loss값을 train_loss변수에 누적
        train_loss += loss.item()  # Tensor -> 파이썬 값
    # 1 epoch학습 종료
    # epoch의 평균 loss를 계산해서 리스트에 저장. (train_loss: step별 loss를 누적)
    train_loss_list.append(train_loss / len(train_loader))  #step수 나눔.
    ########################################
    # validate(검증) - test(validation) set(학습할 때 사용하지 않았던 데이터셋)
    ########################################
    model.eval() # 모델을 검증(평가) 모드로 전환.
    ## 현재 epoch대한 검증결과(loss, accuracy)를 저장할 변수
    val_loss = 0.0
    val_acc = 0.0
    ### 모델 추정을 위한 연산 - forward propagation
    #### 검증/평가/서비스 -> gradient계산이 필요없다. => 도함수를 계산할 필요 없다.
    with torch.no_grad():
        ## batch 단위로 검증
        for X_val, y_val in test_loader:
            # 1. device로 옮기기
            X_val, y_val = X_val.to(device), y_val.to(device)
            # 2. 모델을 이용해 추론
            pred_val = model(X_val)
            # 3. 검증
            ## 1. loss 계산 + val_loss에 누적
            val_loss = val_loss + loss_fn(pred_val, y_val).item()
            ## 2. 정확도(accuarcy): 맞은것개수/전체개수
            val_acc = val_acc + torch.sum(pred_val.argmax(axis=-1) == y_val).item()
        # test set 전체에대한 검증이 완료 => 현 epoch 에 대한 검증 완료
        ## val_loss, val_acc 값을 리스트에 저장.
        val_loss_list.append(val_loss / len(test_loader))  # loss 는 step 수 나눔.
        val_accuracy_list.append(val_acc / len(test_loader.dataset) ) # 전체 데이터 개수로 나눈다.
    ## 현재 epoch train 결과를 출력
    print(f"[{epoch+1:2d}/{N_EPOCH:2d}] Train Loss: {train_loss_list[-1]},\
 Val Loss: {val_loss_list[-1]}, Val Accuracy: {val_accuracy_list[-1]}")
end = time.time()
print(f"학습에 걸린시간: {end-start}초")

 

학습 로그 시각화

- Train Loss는 epoch 횟수에 따라 계속 작아지는 반면, Validation Loss와 Validation accuracy는 작아지다가 특정 구간에서 커지게 된다.

    - 이를 과적합이라고 한다. 때문에 과적합 방지를 위해 적당한 epoch 횟수를 결정해야 한다.

# epoch별 loss, accuracy의 변화흐름을 시각화
plt.figure(figsize=(10, 5))
plt.subplot(1, 2, 1)
plt.plot(range(N_EPOCH), train_loss_list, label="train")
plt.plot(range(N_EPOCH), val_loss_list, label="Validation")
plt.title("Loss")
plt.legend()

plt.subplot(1, 2, 2)
plt.plot(range(N_EPOCH), val_accuracy_list)
plt.title("Validation accuracy")

plt.tight_layout()
plt.show()

 

학습된 모델 저장 및 불러오기

새로운 디렉토리를 만들어 저장할 경로를 지정한다.

save_path = os.path.join(MODEL_SAVE_PATH, "mnist")
os.makedirs(save_path, exist_ok=True)

save_file_path = os.path.join(save_path, "mnist_mlp.pth")
print(save_file_path)

 

저장

torch.save(model, save_file_path)

 

모델 불러오기 & 정보 확인

# 모델 불러오기
load_model = torch.load(save_file_path)
load_model

 

summary

from torchinfo import summary
summary(load_model, (BATCH_SIZE, 1, 28, 28))

 

모델 평가

# device로 옮기기
load_model = load_model.to(device)
# 평가모드로 변환
load_model.eval()

test_loss, test_acc = 0.0, 0.0
with torch.no_grad():
    for X, y in test_loader:
        # device 옮기기
        X, y = X.to(device), y.to(device)
        # 추정
        pred = load_model(X)
        # 평가 - loss, accuracy
        test_loss += loss_fn(pred, y).item()
        test_acc += torch.sum(pred.argmax(axis=-1) == y).item()
    test_loss /= len(test_loader) # step수로 나누기
    test_acc /= len(test_loader.dataset) # 총 데이터수로 나누기
print(f"Test loss: {test_loss}, Test accuracy: {test_acc}")

 

새로운 데이터 추론

import

from glob import glob
import cv2

 

새로운 데이터들을 리스트 변수로 선언

img_file_list = glob("test_img/num/*.png") # 확장자가 png인 모든 파일, *.* : 디렉토리 안의 모든 파일
img_file_list

 

test_img들을 분류할 수 있도록 resize 및 tensor로 변형하여 input_tensor에 저장

file_cnt = len(img_file_list)
print("개수:", file_cnt)
input_tensor = torch.zeros((file_cnt, 28, 28)) # 분류할 이미지를 저장할 변수.

for i in range(file_cnt):
    # image 읽기
    test_img = cv2.imread(img_file_list[i], cv2.IMREAD_GRAYSCALE)
    # 28 * 28 resize
    test_img = cv2.resize(test_img, (28, 28))
    # ndarray -> torch.Tensor + 정규화 (0~1) => input_tensor에 추가
    input_tensor[i] = transforms.ToTensor()(test_img)

 

print(input_tensor.shape)
# print(input_tensor.shape)
print(input_tensor.min(), input_tensor.max())

 

추론 및 확률로 변환(softmax)

# 추론
load_model.to(device)
load_model.eval()

input_tensor = input_tensor.to(device)

pred_new = load_model(input_tensor)
# 모델이 예측한 값을 확률로 변환 => softmax 함수
pred_proba = nn.Softmax(dim=-1)(pred_new)
print(pred_new.shape)
pred_new

 

pred_proba.argmax -> 확률중 가장 높은 확률의 값을 가져온다 -> 새로운 데이터들의 예측값들을 확인

pred_label = pred_proba.argmax(dim=-1)
pred_label

 

예측한 가장 높은 확률의 값의 확률

- 예측한 값이 맞을 확률이라고 생각하면 쉽다.

pred_label_proba = pred_proba.max(dim=-1).values
pred_label_proba

 

그래프로 표현 

plt.figure(figsize = (15, 7))
for i in range(file_cnt):
    plt.subplot(3, 4, i+1)
    plt.imshow(input_tensor[i].to("cpu").numpy(), cmap='gray')
    plt.title(f"{pred_label[i].item()}-{pred_label_proba[i].item():.2f}")

plt.tight_layout()
plt.show()