본문 바로가기

Data_Analysis_Track_33/Python

Python_Deeplearning_pytorch_11(LSTM을 활용한 주가예측)

삼성전자 주가 데이터 다운로드

  • Yahoo Finance 에서 주가 데이터 다운로드
        - 검색 키워드 '005930.KS' 입력
  • 검색 후 Historical Data 선택
  • Start Date: 2000년 1월 4일 End Date: 오늘날짜 선택
  • Apply 클릭 후 다운로드

 

import

import os
import time

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader
import torchinfo

from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.model_selection import train_test_split  

device = 'cuda' if torch.cuda.is_available() else 'cpu'
device

 

 

Data loading

- 다운 받은 주가 데이터 csv파일 읽어오기

# Data loading + EDA
df = pd.read_csv("datasets/005930.KS.csv")
df.shape

 

 

시계열 데이터

- Date -> datetime 타입으로 변경 -> index 전환.

# Date -> datetime 타입으로 변경 -> index 전환.
## 시계열 데이터는 (행) index로 일시를 가지도록 구성.
df['Date'] = pd.to_datetime(df['Date'])
df.set_index("Date", inplace=True)

 

 

결측치 확인 -> 없다

# 결측치 확인
df.isnull().sum()

 

 

시가, 종가의 흐름을 선그래프로 보기

# 시가, 종가의 흐름을 선그래프로 보기
df[['Open', 'Close']].plot(figsize=(20, 5), alpha=0.5);

df[['Open', 'Close']][:50].plot(figsize=(20, 5), alpha=0.5, marker='.');

 

 

X와 y를 만들기

  • X (input) feature 구성: Open, High, Low, Close, Adj Close, Volumn
  • y (output)

 

- output = df['Close']

- input = 나머지 columns

y_df = df['Close'].to_frame() # (총데이터수, 1)
X_df = df
X_df.shape, y_df.shape

 

 

전처리

  • feature scling - featire 간의 scaling(단위)를 맞추는 작업.
  • X: Standard Scling (평균: 0, 표준편차= 1)
  • y: MinMax Scaling (최소, 최대) => X의 scale과 비슷한 값으로 변환.

 

# 객체생성 -> fit() -> transform()
X_scaler = StandardScaler()
y_scaler = MinMaxScaler()

X = X_scaler.fit_transform(X_df)
y = y_scaler.fit_transform(y_df)

print(type(X), type(y))
X.shape, y.shape

 

 

Sequential Data  구성

 

  • X: 50일치 데이터(ex:1일 ~ 50일), y: 51일째 주가. (ex: 51일)
        - 50일의 연속된 주식데이터를 학습하여 51일째 주가를 예측한다.
        - X의 한개의 데이터가 50일치 주가데이터가 된다.

연속된 날짜가 5인 경우

 

timestep = 50 # seq length
data_X = [] # X 데이터를 모을 리스트. X: (50, 6)
data_y = [] # y 값을 모을 리스트
for i in range(0, y.size - timestep): # 총 데이터 개수 - seq_length: 이 이후 반복시에는 남은 데이터가 51개가 안되서 데이터 구성이 안된다.
    # X: 0 ~ 50-1, y: 50 (1씩 증가)
    _X = X[i:i+timestep]
    _y = y[i+timestep]
    data_X.append(_X)
    data_y.append(_y)

 

 

Train / test set 분리

X_train, X_test, y_train, y_test = train_test_split(data_X, data_y, test_size = 0.2)
np.shape(X_train), np.shape(X_test)

 

 

Dataset, DataLoader 구성

 

Tensor 타입으로 변경

# Tensor 변환: 
# X_train: List -> ndarray -> torch.Tensor 타입으로 변경
X_train_tensor = torch.tensor(np.array(X_train), dtype=torch.float32)
X_test_tensor = torch.tensor(np.array(X_test), dtype=torch.float32)
y_train_tensor = torch.tensor(np.array(y_train), dtype=torch.float32)
y_test_tensor = torch.tensor(np.array(y_test), dtype=torch.float32)

 

 

Dataset 생성

# Dataset 생성 -> raw 데이터: 메모리상의 Tensor ===> TensorDataset
trainset = TensorDataset(X_train_tensor, y_train_tensor)
testset = TensorDataset(X_test_tensor, y_test_tensor)

print('데이터개수: ', len(trainset), len(testset))

 

 

Dataloader 생성

# DataLoader 생성
trainloader = DataLoader(trainset, batch_size=200, shuffle=True, drop_last=True)
testloader = DataLoader(testset, batch_size=200)

print("에폭당 step수:", len(trainloader), len(testloader))

 

 

- 1 step input: (seq, batch, feature) -> (50, 200, 6)

 

LSTM layer 생성후 추론

# LSTM layer 생성후 추론
a, b = next(iter(trainloader)) # 배치 (X, y)
a.shape, b.shape, a.dtype

 

 

lstm = nn.LSTM(input_size=6
               ,hidden_size=10
               # ,batch_first=True # (batch, seq, feature)
              ) 
a = a.permute(1, 0, 2)
o, (h, c) = lstm(a)
o.shape, h.shape, c.shape

 

 

모델 정의

 

class StockLSTM(nn.Module):

    def __init__(self, input_size:"feature수", hidden_size:"hidden(유닛)수", num_layers:"레이어개수"=1, bidirectional:"양방향여부"=False):
        super().__init__()

        self.lstm = nn.LSTM(input_size=input_size, hidden_size=hidden_size, num_layers=num_layers, bidirectional=bidirectional)

        D = 2 if bidirectional else 1
        self.output = nn.Linear(in_features= D * num_layers * hidden_size
                                ,out_features=1 # 출력
                               ) 
    
    def forward(self, X):
        # LSTM(특성을 추출) --hidden state--> Linear -> 예측결과
        # X: (batch, seq, feature) -> (seq, batch, feature)
        X = X.permute(1, 0, 2)
        out, (hidden_state, cell_state) = self.lstm(X) # hidden state (seq, batch, hidden)
        hidden_state = hidden_state.permute(1, 0, 2)
        hidden_state = nn.Flatten()(hidden_state)
        pred = self.output(hidden_state) # W@X + b
        # nn.Sigmoid()(pred) # 0 ~ 1 맞춰줌
        return pred

 

 

모델 생성

model = StockLSTM(input_size=6, hidden_size=50)

 

 

summary

torchinfo.summary(model, (200, 50, 6))

 

 

Train

하이퍼파라미터 선언

# 하이퍼파라미터
N_EPOCH = 1000
LR = 0.0001

INPUT_SIZE = 6
HIDDEN_SIZE = 30
NUM_LAYERS = 1
BIDIRECTIONAL = False

 

 

 

model = StockLSTM(INPUT_SIZE, HIDDEN_SIZE, NUM_LAYERS, BIDIRECTIONAL)
model = model.to(device)

# loss fn = 회귀: MSE
loss_fn = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=LR)

 

 

Train 코드

train_loss_list = []
valid_loss_list = []
for epoch in range(N_EPOCH):

    model.train()
    train_loss = 0.0
    for X, y in trainloader:
        # 1. device로 이동
        X, y = X.to(device), y.to(device)
        # 2. 추정
        pred = model(X)
        # 3. Loss 계산
        loss = loss_fn(pred, y)
        # 4. grad 계산
        loss.backward()
        # 5. layer의 파라미터들 update
        optimizer.step()
        # 6. gradient 초기화
        optimizer.zero_grad()
        train_loss += loss.item()
    train_loss /= len(trainloader) # train_loss 평균 계산
    train_loss_list.append(train_loss)

    # 검증
    model.eval()
    valid_loss = 0.0
    for X_valid, y_valid in testloader:
        # 1. device 이동
        X_valid, y_valid = X_valid.to(device), y_valid.to(device)
        with torch.no_grad():
            # 2. 추정
            pred_valid = model(X_valid)
            # 3. loss 계산
            valid_loss += loss_fn(pred_valid, y_valid).item()
    valid_loss /= len(testloader) # valid_loss의 평균
    valid_loss_list.append(valid_loss)
    if epoch % 100 == 0 or epoch == (N_EPOCH-1):
        print(f"[{epoch+1}/{N_EPOCH}] train loss: {train_loss}, valid loss: {valid_loss}")

 

 

1000 epoch 실행결과

valid loss: 3.870847588890077e-05

- 전처리되어 있는 y의 값을 역으로 변환(inverse_transform) -> array([[2733.416049]])

- 2700원 정도의 오차

y_scaler.inverse_transform([[0.0000387]])

 

 

 

new_X = torch.tensor(np.expand_dims(data_X[-1], axis=0), dtype=torch.float32)
new_X.shape

 

 

예측 결과(pred_new)를 모델에 넣어 알아보기

- 실행 결과 -> tensor([[0.7285]]

pred_new = model(new_X.to(device))
pred_new

 

 

전처리되어 있는 y의 값을 역으로 변환(inverse_transform)

- 예측 가격 출력 -> array([[67036.93]]

y_scaler.inverse_transform(pred_new.detach().numpy())
# 예측 결과가 출력된다