Python_Deeplearning_pytorch_03(딥러닝-MLP구현)
Pytorch 개발 Process
1. 데이터 준비
- Dataset 준비
- Dataloader 생성
2. 입력과 출력을 연결하는 Layer(층)으로 이뤄진 네트워크(모델)을 정의
- Sequential 방식: 순서대로 쌓아올린 네트워크로 이뤄진 모델을 생성하는 방식
- layer를 순서대로 쌓은 모델을 구현할때 간단히 모델을 정의할 수 있다.
- layer block을 정의하는데 사용할 수 있다. - Subclass 방식: 네트워크를 정의하는 클래스를 구현.
- 다양한 구조의 모델을 정의할 수 있다.
- inializer에서 필요한 layer들을 생성한다.
- forward(self, X) 메소드에 forward propagation 계산을 구현한다.
3. train(학습)
- train 함수, test 함수 정의
4. test set 최종평가
MNIST 이미지 분류
- [MNIST] (Modified National Institute of Standards and Technology) database
- 흑백 손글씨 숫자 0-9까지 10개의 범주로 구분해놓은 데이터셋
- 하나의 이미지는 28 * 28 pixel 의 크기
- 6만개의 Train 이미지와 1만개의 Test 이미지로 구성됨.
import
# torch: 공통 구현체
# torchvision: 영상처리를 위한 구현체
import torch
import torch.nn as nn # 딥러닝 모델을 구성하는 함수들의 모듈.
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import matplotlib.pyplot as plt
import numpy as np
import os
print("파이토치 버전:", torch.__version__)
device 설정
# 처리를 cpu를 이용할지 gpu를 이용할지 설정
device = "cuda" if torch.cuda.is_available() else "cpu"
print("device:", device)
변수 설정 (BATCH_SIZE, N_EPOCH, LR ....)
# 뒤에서 여러번 사용할 값들을 저장할 변수 +
# 모델에 설정할 값(모델자체, 학습할 때 필요한 값 등등)으로 성능에 영향을 주는 값들을 저장할 변수
# === 하이퍼 파라미터(Hyper Parameter)
BATCH_SIZE = 256 # 모델의 파라미터를 업데이트할 때 사용할 데이터의 개수. 한번에 몇개 데이터를 입력할 지
N_EPOCH = 20 # 1 epoch : 전체 train dataset을 한번 학습한 것
LR = 0.001 # 학습률. 파라미터 update할 때 gradient값에 곱해줄값.
# (gradient를 새로운 파라미터 계산할 때 얼마나 반영할지 비율)
# step : 파라미터를 1번 update하는 단위. 1step당 학습데이터수 : batch_size
# epoch : 전체 학습데이터셋을 한번 학습한 단위
# 1epoc당 step횟수 = ceil(총 데이터수/batch_size)
DATASET_SAVE_PATH = 'datasets' # 데이터셋을 저장할 디렉토리 경로
MODEL_SAVE_PATH = 'models' #
os.makedirs(DATASET_SAVE_PATH, exist_ok=True)
os.makedirs(DATASET_SAVE_PATH, exist_ok=True)
MNIST dataset Loading
Dataset
- Dataset: input/output data를 묶어서 관리.
- DataLoader : Dataset의 데이터들을 모델에 어떻게 제공할지를 관리.
훈련용(train_set), 테스트용(test_set) 데이터셋 가져오기
# MNIST Dataset을 다운로드 + Dataset 객체를 생성
# train dataset - 6만장
# torchvision.datasets
train_set = datasets.MNIST(root=DATASET_SAVE_PATH # 데이터셋을 저장할 디렉토리 경로
,train=True # trainset(학습용): True, testset(검증용): False
,download=True # root에 저장된 데이터파일들이 없을때 다운로드 받을지 여부
,transform=transforms.ToTensor() # 데이터 전처리.
)
# ToTensor(): ndarray, PIL.Image객체를 torch.Tensor로 변환. Pixcel값 정규화(normalize): 0 ~ 1 실수로 변환
test_set = datasets.MNIST(root = DATASET_SAVE_PATH
,train=False
,download=True
,transform=transforms.ToTensor()
)
train_set과 test_set 기본 정보
print(type(train_set), type(test_set)) # Dataset 타입
# Dataset 조회
print(train_set)
print(test_set)
# 데이터 포인트 개수
print(len(train_set), len(test_set))
DataLoader
# Dataset을 모델에 어떻게 제공할지를 설정. -> 학습/평가시 설정된대로 데이터를 loading
# 훈련용 DataLoader
train_loader = DataLoader(train_set # Dataset
,batch_size=BATCH_SIZE # batch_size를 설정
,shuffle=True # 한 epoch이 끝나면 다음 epoch전에 데이터를 섞을지 여부. (섞는다 -> True)
,drop_last=True # 마지막 batch의 데이터수가 batch_size보다 적을 경우 버릴지(학습에 사용안함 -> True) 여부
)
# 평가용 DataLoader
test_loader = DataLoader(test_set
,batch_size=BATCH_SIZE
)
데이터셋의 개수를 batch_size로 나눈 값 -> 1 epoch당 step 수
- trainset의 경우 : 60000/256
- testset의 경우 : 10000/256
print("1 epoch당 step 수")
print("trainset:", len(train_loader))
print("testset:", len(test_loader))
print(np.ceil(60000/256), np.ceil(10000/256))
네트워크(모델) 정의
- Network : 전체 모델 구조
# class로 정의: nn.Module을 상속해서 정의
class MnistModel(nn.Module):
def __init__(self):
"""
모델 객체 생성시 모델을 구현(정의)할 때 필요한 것들을 초기화.
필요한 것: Layer들. 등등
"""
super().__init__()
# 784(pixcel수) -> 128 개로 축소
self.lr1 = nn.Linear(28 * 28, 128) # input feature크기, output size
# 128 feature -> 64 축소
self.lr2 = nn.Linear(128, 64)
# 64 feature -> 출력결과 10 (각 범주의 확률)
self.lr3 = nn.Linear(64, 10)
# Activation(활성) 함수 -> 비선형함수 : ReLU
self.relu = nn.ReLU() # f(x) = max(x, 0)
def forward(self, x):
"""
input data를 입력 받아서 output 데이터를 만들때 까지의 계산 흐름을 정의
===> forward propagation
parameter
x : 입력데이터
return
torch.Tensor: 출력데이터(모델 예측결과.)
"""
# init에서 생성한 함수들을 이용해서 계산
# x -> 1차원으로 변환 -> lr1 -> relu -> lr2 -> relu -> lr3 -> output
# input (batch_size, channel, height, width) => (batch_size, 전체pixcel)
x = torch.flatten(x, start_dim=1) # (b, c, h, w) -> (b, c*h*w)
x = self.lr1(x)
x = self.relu(x)
x = self.lr2(x)
x = self.relu(x)
output = self.lr3(x)
return output
reshape : 배열의 형태 변환
flatten : axis 합치기
i = torch.arange(28 * 28).reshape(1, 1, 28, 28)
print(i.shape) # [1:데이터개수, 1:channel, 28:height, 28:width]
# -> 1차원 => 데이터개수는 유지 [1, 784]
# 다차원 Tensor => 1차원 => flatten([start_dim=axis]) # 지정한 axis부터 합친다.
print(torch.flatten(i).shape)
torch.flatten(i, start_dim=1).shape # axis 0은 유지하고, axis 1부터 나머지를 합친다.
모델 객체 생성
# 정의한 모델 클래스로부터 모델 객체를 생성
model = MnistModel()
print(model)
torchinfo 패키지 설치
# 모델의 연산 흐름 및 정보를 확인=> torchinfo 패키지를 사용
!pip install torchinfo
summary
from torchinfo import summary
summary(model, (100, 784)) # summary(모델객체, input shape: tuple)
첫번째 이미지(pred_batch[0])에 대한 추론해보기
# train dataset의 첫번째 배치를 이용해서 모델에 추론.
x_batch, y_batch = next(iter(train_loader)) # (input, output)
print(x_batch.shape, y_batch.shape)
# 추론
pred_batch = model(x_batch) # model의 forward() 메소드가 실행.
pred_batch.shape
결과값 비교
- 자신의 프로그램의 결과값을 비교해보기
# 첫번째 이미지에 대한 추론결과 -> class별 확률 => 가장 큰 확률의 index를 조회
print(pred_batch[0])
pred_batch[0].argmax() # 추론결과
# 추론 결과와 실제값 y_batch[i]의 값과 비교
y_batch[0] # 정답이 아니다. 틀림
torch.sum(pred_batch.argmax(dim=1) == y_batch) # 추론을 맞힌 개수
정확도 -> 추론을 맞힌 개수 / 256(batch_size -> 한번에 입력한 데이터의 수)
train
모델, loss function, optimizer 생성
# 모델을 device로 옮긴다. (모델을 이용한 계산을 CPU에서 할지 GPU에서 할지)
# 참고: device로 옮기는 것 => model, 모델에 넣을 input data, output data
# model = MnistModel()
model = model.to(device)
# loss function
# 다중분류문제 : crossentropy, 이진분류 문제 : binary crossentropy ==> log loss
# 다중분류: label이 여러개, 이진분류: yes/no 분류
loss_fn = nn.CrossEntropyLoss() #객체, nn.functional.cross_entropy() 함수
# Optimizer ==> 모델 파라미터들 최적화 ==> 경사하강법을 구현
optimizer = torch.optim.Adam(model.parameters() # 최적화 대상 파라미터들
,lr = LR # 학습률
)
# model.parameters() : generator
a = next(model.parameters())
a.shape
a
학습 및 검증
- epoch 횟수가 늘어날 때마다 Loss의 값은 줄고, Accuracy의 값은 늘어나는 것을 확인할 수 있다.
import time # 학습 시간 체크
## 학습 => train(훈련) + validation(1 epoch 학습한 모델성능 검증)
# 에폭(epoch)별 학습결과를 저장할 리스트들
train_loss_list = [] # train set으로 검증했을 때 loss (loss_fn계산값)
val_loss_list = [] # test set으로 검증했을 때 loss
val_accuracy_list = [] # test set으로 검증했을 때 accuracy(정확도)-전체중 맞은 비율
start = time.time()
# Train
for epoch in range(N_EPOCH):
#################################
# Train
#################################
model.train() # 모델을 train모드로 변경.
train_loss = 0.0 # 현재 epoch의 학습 결과 loss를 저장할 변수.
# 배치단위로 학습
for X_train, y_train in train_loader: # batch 단위 (input, output) 튜플로 반환.
# 1. X, y를 device로 옮긴다. (model, X, y는 같은 device상에 위치해야한다.)
X_train, y_train = X_train.to(device), y_train.to(device)
# 2. 추론
pred = model(X_train)
# 3. Loss 계산
loss = loss_fn(pred, y_train) # args 순서: (모델예측값, 정답)
# 4. 모델의 파라미터 업데이트(최적화)
## 1. 파라미터의 gradient값들을 초기화
optimizer.zero_grad()
## 2. gradient 계산 ===> 계산결과는 파라미터.grad 속성에 저장.
loss.backward()
## 3. 파라미터(weight, bias) 업데이트 ( 파라미터 - 학습률*grad)
optimizer.step()
#### 현재 batch의 loss값을 train_loss변수에 누적
train_loss += loss.item() # Tensor -> 파이썬 값
# 1 epoch학습 종료
# epoch의 평균 loss를 계산해서 리스트에 저장. (train_loss: step별 loss를 누적)
train_loss_list.append(train_loss / len(train_loader)) #step수 나눔.
########################################
# validate(검증) - test(validation) set(학습할 때 사용하지 않았던 데이터셋)
########################################
model.eval() # 모델을 검증(평가) 모드로 전환.
## 현재 epoch대한 검증결과(loss, accuracy)를 저장할 변수
val_loss = 0.0
val_acc = 0.0
### 모델 추정을 위한 연산 - forward propagation
#### 검증/평가/서비스 -> gradient계산이 필요없다. => 도함수를 계산할 필요 없다.
with torch.no_grad():
## batch 단위로 검증
for X_val, y_val in test_loader:
# 1. device로 옮기기
X_val, y_val = X_val.to(device), y_val.to(device)
# 2. 모델을 이용해 추론
pred_val = model(X_val)
# 3. 검증
## 1. loss 계산 + val_loss에 누적
val_loss = val_loss + loss_fn(pred_val, y_val).item()
## 2. 정확도(accuarcy): 맞은것개수/전체개수
val_acc = val_acc + torch.sum(pred_val.argmax(axis=-1) == y_val).item()
# test set 전체에대한 검증이 완료 => 현 epoch 에 대한 검증 완료
## val_loss, val_acc 값을 리스트에 저장.
val_loss_list.append(val_loss / len(test_loader)) # loss 는 step 수 나눔.
val_accuracy_list.append(val_acc / len(test_loader.dataset) ) # 전체 데이터 개수로 나눈다.
## 현재 epoch train 결과를 출력
print(f"[{epoch+1:2d}/{N_EPOCH:2d}] Train Loss: {train_loss_list[-1]},\
Val Loss: {val_loss_list[-1]}, Val Accuracy: {val_accuracy_list[-1]}")
end = time.time()
print(f"학습에 걸린시간: {end-start}초")
학습 로그 시각화
- Train Loss는 epoch 횟수에 따라 계속 작아지는 반면, Validation Loss와 Validation accuracy는 작아지다가 특정 구간에서 커지게 된다.
- 이를 과적합이라고 한다. 때문에 과적합 방지를 위해 적당한 epoch 횟수를 결정해야 한다.
# epoch별 loss, accuracy의 변화흐름을 시각화
plt.figure(figsize=(10, 5))
plt.subplot(1, 2, 1)
plt.plot(range(N_EPOCH), train_loss_list, label="train")
plt.plot(range(N_EPOCH), val_loss_list, label="Validation")
plt.title("Loss")
plt.legend()
plt.subplot(1, 2, 2)
plt.plot(range(N_EPOCH), val_accuracy_list)
plt.title("Validation accuracy")
plt.tight_layout()
plt.show()
학습된 모델 저장 및 불러오기
새로운 디렉토리를 만들어 저장할 경로를 지정한다.
save_path = os.path.join(MODEL_SAVE_PATH, "mnist")
os.makedirs(save_path, exist_ok=True)
save_file_path = os.path.join(save_path, "mnist_mlp.pth")
print(save_file_path)
저장
torch.save(model, save_file_path)
모델 불러오기 & 정보 확인
# 모델 불러오기
load_model = torch.load(save_file_path)
load_model
summary
from torchinfo import summary
summary(load_model, (BATCH_SIZE, 1, 28, 28))
모델 평가
# device로 옮기기
load_model = load_model.to(device)
# 평가모드로 변환
load_model.eval()
test_loss, test_acc = 0.0, 0.0
with torch.no_grad():
for X, y in test_loader:
# device 옮기기
X, y = X.to(device), y.to(device)
# 추정
pred = load_model(X)
# 평가 - loss, accuracy
test_loss += loss_fn(pred, y).item()
test_acc += torch.sum(pred.argmax(axis=-1) == y).item()
test_loss /= len(test_loader) # step수로 나누기
test_acc /= len(test_loader.dataset) # 총 데이터수로 나누기
print(f"Test loss: {test_loss}, Test accuracy: {test_acc}")
새로운 데이터 추론
import
from glob import glob
import cv2
새로운 데이터들을 리스트 변수로 선언
img_file_list = glob("test_img/num/*.png") # 확장자가 png인 모든 파일, *.* : 디렉토리 안의 모든 파일
img_file_list
test_img들을 분류할 수 있도록 resize 및 tensor로 변형하여 input_tensor에 저장
file_cnt = len(img_file_list)
print("개수:", file_cnt)
input_tensor = torch.zeros((file_cnt, 28, 28)) # 분류할 이미지를 저장할 변수.
for i in range(file_cnt):
# image 읽기
test_img = cv2.imread(img_file_list[i], cv2.IMREAD_GRAYSCALE)
# 28 * 28 resize
test_img = cv2.resize(test_img, (28, 28))
# ndarray -> torch.Tensor + 정규화 (0~1) => input_tensor에 추가
input_tensor[i] = transforms.ToTensor()(test_img)
print(input_tensor.shape)
# print(input_tensor.shape)
print(input_tensor.min(), input_tensor.max())
추론 및 확률로 변환(softmax)
# 추론
load_model.to(device)
load_model.eval()
input_tensor = input_tensor.to(device)
pred_new = load_model(input_tensor)
# 모델이 예측한 값을 확률로 변환 => softmax 함수
pred_proba = nn.Softmax(dim=-1)(pred_new)
print(pred_new.shape)
pred_new
pred_proba.argmax -> 확률중 가장 높은 확률의 값을 가져온다 -> 새로운 데이터들의 예측값들을 확인
pred_label = pred_proba.argmax(dim=-1)
pred_label
예측한 가장 높은 확률의 값의 확률
- 예측한 값이 맞을 확률이라고 생각하면 쉽다.
pred_label_proba = pred_proba.max(dim=-1).values
pred_label_proba
그래프로 표현
plt.figure(figsize = (15, 7))
for i in range(file_cnt):
plt.subplot(3, 4, i+1)
plt.imshow(input_tensor[i].to("cpu").numpy(), cmap='gray')
plt.title(f"{pred_label[i].item()}-{pred_label_proba[i].item():.2f}")
plt.tight_layout()
plt.show()
