diff --git a/aiServer/stock/ai_model.py b/aiServer/stock/ai_model.py index 2866a31..7144ef4 100644 --- a/aiServer/stock/ai_model.py +++ b/aiServer/stock/ai_model.py @@ -1,5 +1,8 @@ +import datetime +import math +import pickle from typing import List, Dict, Any -from datetime import datetime + import FinanceDataReader as fdr import joblib import numpy as np @@ -8,63 +11,51 @@ from bs4 import BeautifulSoup from sklearn.linear_model import LinearRegression from sklearn.model_selection import train_test_split -from .models import NewsData -import pickle -from .models import StockModelInfo +from common.exceptions import BadRequest, InternalServerError +from .models import NewsData, StockModelInfo # NewsData, StockModelInfo 모델 임포트 + + + +# Output 클래스는 주식 주문 결과를 나타내는 데이터 구조를 제공합니다. class Output: def __init__(self, product_number, name, quantity): self.product_number = product_number self.name = name self.quantity = quantity - def to_dict(self): + def to_dict(self) -> Dict[str, Any]: return { 'productNumber': self.product_number, 'name': self.name, - "quantity": self.quantity, + 'quantity': self.quantity, } -open_dif_data_list = [] -newslabel_match_openchange = [] -predicted_results = [] -stock_orders = [] - - +# 총 자본금(amount)과 주식 목록(stocks)을 기반으로 주문 비율을 계산합니다. def get_stock_order_ratio(amount: int, stocks: List[Dict[str, Any]]) -> List[Dict[str, Any]]: - for stock in stocks: - stock_code = stock['productNumber'] - today_price = get_today_open_price(stock_code) - stock['today_price'] = today_price - - - open_dif_data_list = prepare_stock_data(stocks) - print("open_dif_data_list" + str(open_dif_data_list)) - - # 모델 만드는 코드 - for i in range(len(stocks)): - make_model(next((item[1] for item in open_dif_data_list if item[0] == stocks[i]['name']), None), stocks[i]['name']) - - newslabel_match_openchange = predict_add_news_label(open_dif_data_list) - print("newslabel_match_openchange=" + str(newslabel_match_openchange)) + try: + for stock in stocks: + stock_code = stock['productNumber'] + today_price = get_today_open_price(stock_code) + if today_price is None: + raise BadRequest(f"오늘의 시가 데이터를 찾을 수 없습니다: {stock_code}") + stock['today_price'] = today_price - predicted_results = predict_result(newslabel_match_openchange, open_dif_data_list, stocks) - print("predicted_result=" + str(predicted_results)) - stock_orders = calculate_stock_amounts(predicted_results, amount, stocks) - print("stock_orders=" + str(stock_orders)) + open_dif_data_list = prepare_stock_data(stocks) + newslabel_match_openchange = predict_add_news_label(open_dif_data_list) + predicted_results = predict_result(newslabel_match_openchange, open_dif_data_list, stocks) + stock_orders = calculate_stock_amounts(predicted_results, amount, stocks) - outputs = [] - for order in stock_orders: - output = Output(order['productNumber'], order['name'], order['quantity']) - outputs.append(output.to_dict()) + return [Output(order['productNumber'], order['name'], order['quantity']).to_dict() for order in stock_orders] - return outputs + except Exception as e: + raise InternalServerError(f"주문 비율 계산 중 오류 발생: {str(e)}") +# 주식 목록을 기반으로 오픈 데이터 준비 def prepare_stock_data(stocks: List[Dict[str, Any]]) -> List[tuple]: - start = (2000, 1, 1) - start = datetime.datetime(*start) + start = datetime.datetime(2000, 1, 1) end = datetime.date.today() open_dif_data_list = [] @@ -77,278 +68,120 @@ def prepare_stock_data(stocks: List[Dict[str, Any]]) -> List[tuple]: return open_dif_data_list +# 주어진 데이터프레임에서 'Open' 가격의 변화량을 계산합니다. def calculate_open_diff(df: pd.DataFrame) -> pd.DataFrame: - data = df['Open'][df['Volume'] != 0] - data = data.to_frame() - open_dif = [] - - for i in range(len(data)): - if i == 0: - open_dif.append(0) - else: - open_dif.append(data['Open'].iloc[i] - data['Open'].iloc[i - 1]) - data['Change'] = open_dif - + data = df[['Open']].loc[df['Volume'] != 0] + data['Change'] = data['Open'].diff().fillna(0) return data -def scrape_news_titles_and_dates(): - title_list = [] - date_list = [] - url_base = 'https://www.sedaily.com/NewsList/GD05' - - # Get existing data from the database - existing_news = NewsData.objects.all() - existing_titles_dates = set((news.title, news.date) for news in existing_news) - - for i in range(1, 5): - url = url_base if i == 1 else f'{url_base}/New/{i}' - response = requests.get(url) - response.raise_for_status() - soup = BeautifulSoup(response.content, 'html.parser') - - titles = soup.select('.article_tit') - dates = soup.select('.date') - - for title in titles: - if title not in existing_titles_dates: - title_list.append(title.get_text()) - for date in dates: - if date not in existing_titles_dates: - date = datetime.strptime(date.get_text(), '%Y.%m.%d').date() - date_list.append(date) - - for i in range(min(len(title_list), len(date_list))): - # 기존 데이터 확인 - if not NewsData.objects.filter(title=title_list[i], date=date_list[i]).exists(): - # 데이터가 없을 때만 저장 - news_data = NewsData(title=title_list[i], date=date_list[i]) - news_data.save() - else: - print(f"Duplicate entry found for title: {title_list[i]}, date: {date_list[i]}. Skipping insertion.") - - return title_list, date_list - -# news_url_base = 'https://www.sedaily.com/NewsList/GD05' - -# title_list, date_list = scrape_news_titles_and_dates(news_url_base, 100) -# makemodel_title_list, makemodel_date_list = scrape_news_titles_and_dates(news_url_base, 200) - - +# 주식 데이터에 뉴스 레이블을 추가합니다. def predict_add_news_label(open_dif_data_list: List[tuple]) -> Dict[str, pd.DataFrame]: - newslabel_match_openchange = {} - for company_name, data in open_dif_data_list: - labeled_data = add_news_label(data, company_name) - newslabel_match_openchange[company_name] = labeled_data - - return newslabel_match_openchange + return {company_name: add_news_label(data, company_name) for company_name, data in open_dif_data_list} +# 뉴스 데이터를 기반으로 주식 데이터에 레이블을 추가합니다. def add_news_label(data: pd.DataFrame, name: str) -> pd.DataFrame: - today_news_title = [] - today_date_list = [] - - scrape_news_titles_and_dates() - - # 데이터베이스의 데이터 중 절반만 쿼리 - half_data = NewsData.objects.all()[:NewsData.objects.count() // 2] - - # 절반 데이터를 출력하거나 처리 - for data in half_data: - # print(data.title, data.date) - if name in data.title: - today_news_title.append(data.title) - today_date_list.append(data.date) + try: + today_news_data = NewsData.objects.filter(title__contains=name) + today_news_title_date = pd.DataFrame(list(today_news_data.values('title', 'date'))) + today_news_title_date.rename(columns={'date': 'Date'}, inplace=True) - today_news_title_date = pd.DataFrame({'title': today_news_title, 'Date': today_date_list}) - today_news_title_date['Date'] = pd.to_datetime(today_news_title_date['Date']) + if today_news_title_date.empty: + data['title_label'] = 0 + return data - if len(today_news_title) == 0: - today_news_title_date['title_label'] = 0 - newslabel_match_openchange = pd.merge(today_news_title_date, data, on='Date') - return newslabel_match_openchange + SA_lr_best = joblib.load('./static/SA_lr_best.pkl') + tfidf = joblib.load('./static/tfidf.pkl') + today_data_title_tfidf = tfidf.transform(today_news_title_date['title']) + today_data_title_predict = SA_lr_best.predict(today_data_title_tfidf) + today_news_title_date['title_label'] = today_data_title_predict - SA_lr_best = joblib.load('./static/SA_lr_best.pkl') - tfidf = joblib.load('./static/tfidf.pkl') - today_data_title_tfidf = tfidf.transform(today_news_title_date['title']) - today_data_title_predict = SA_lr_best.predict(today_data_title_tfidf) - today_news_title_date['title_label'] = today_data_title_predict + return pd.merge(today_news_title_date[['Date', 'title_label']], data, on='Date', how='right') - newslabel_match_openchange = pd.merge(today_news_title_date, data, on='Date') - print(newslabel_match_openchange) - return newslabel_match_openchange + except Exception as e: + raise InternalServerError(f"뉴스 레이블 추가 중 오류 발생: {str(e)}") +# 주어진 데이터에 대해 선형 회귀 모델을 만듭니다. def make_model(data: pd.DataFrame, name: str) -> LinearRegression: - today_news_title = [] - today_date_list = [] - - all_news_data = NewsData.objects.all() - - for data in all_news_data: - if name in data.title: - today_news_title.append(data.title) - today_date_list.append(data.date) - - today_news_title_date = pd.DataFrame({'title': today_news_title, 'Date': today_date_list}) - today_news_title_date['Date'] = pd.to_datetime(today_news_title_date['Date']) - - if len(today_news_title) == 0: - tomorrow_stock = joblib.load('./static/tomorrow_stock.pkl') - return tomorrow_stock - - SA_lr_best = joblib.load('./static/SA_lr_best.pkl') - tfidf = joblib.load('./static/tfidf.pkl') - today_data_title_tfidf = tfidf.transform(today_news_title_date['title']) - today_data_title_predict = SA_lr_best.predict(today_data_title_tfidf) - today_news_title_date['title_label'] = today_data_title_predict - - newslabel_match_openchange = pd.merge(today_news_title_date, data, on='Date') - sentiments = newslabel_match_openchange['title_label'] - weights = np.random.rand(len(newslabel_match_openchange)) - - if sum(weights) == 0: - weights[0] = weights[0] + 0.5104 - weighted_avg = calculate_weighted_average(sentiments, weights) - - label_dif = list() - - for i in range(len(newslabel_match_openchange)): - label_dif.append([newslabel_match_openchange['title_label'][i], newslabel_match_openchange['Change'][i]]) - - ylist = list() - - for i in range(len(newslabel_match_openchange)): - ylist.append(newslabel_match_openchange['Open'][i]) - - X = np.array(label_dif) - y = np.array(ylist) - - if len(X) <= 1: - tomorrow_stock = joblib.load('./static/tomorrow_stock.pkl') - return tomorrow_stock - - X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42) - - model = LinearRegression() - model.fit(X_train, y_train) - - if StockModelInfo.objects.filter(stock_name=name).exists(): - print(f"Model for stock {name} already exists. Skipping save.") - return - - # 데이터베이스에 모델 저장 - model_file_path = f'models/{name}_model.pkl' - - # Save the model to a .pkl file - with open(model_file_path, 'wb') as f: - pickle.dump(model, f) + try: + if data is None or len(data) <= 1: + raise ValueError(f"Insufficient data for training model for {name}") - # Save the model information to the database - StockModelInfo.objects.update_or_create( - stock_name=name, - defaults={'model_file_path': model_file_path} - ) + X = data[['title_label', 'Change']].values + y = data['Open'].values + X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42) -def load_stock_model(name: str): - try: - # Retrieve the model information from the database - stock_model_info = StockModelInfo.objects.get(stock_name=name) - model_file_path = stock_model_info.model_file_path + model = LinearRegression() + model.fit(X_train, y_train) - # Load the model from the file - with open(model_file_path, 'rb') as f: - model = pickle.load(f) + # 모델을 DB에 저장 + model_data = pickle.dumps(model) + StockModelInfo.objects.update_or_create(stock_name=name, defaults={'model_data': model_data}) return model - except StockModelInfo.DoesNotExist: - print(f"No model found for stock: {name}") - return None + except Exception as e: + raise InternalServerError(f"모델 생성 중 오류 발생: {str(e)}") -def predict_result(newslabel_match_openchange: Dict[str, pd.DataFrame], open_dif_data_list: List[tuple], stocks) -> Dict[str, Dict[str, float]]: + +# 주어진 데이터에 대해 예측 결과를 계산합니다. +def predict_result(newslabel_match_openchange: Dict[str, pd.DataFrame], open_dif_data_list: List[tuple], stocks) -> \ +Dict[str, Dict[str, float]]: predicted_stock_openprice = {} - stock_code = '' for company_name, data in newslabel_match_openchange.items(): - # open_dif_data_list에서 company_name에 맞는 데이터를 찾아서 사용 - for stock in stocks: - if company_name == stock['name']: - stock_code = stock['productNumber'] - - raw_data = next((item[1] for item in open_dif_data_list if item[0] == company_name), None) - - if raw_data is None: - print(f"Warning: Data for {company_name} not found in open_dif_data_list") - continue + stock_code = next((stock['productNumber'] for stock in stocks if stock['name'] == company_name), None) + if not stock_code: + raise BadRequest(f"Stock code for {company_name} not found.") try: - model = load_stock_model(company_name) - predicted_price = predicted_tomorrow_openprice(data, company_name, model) - today_price = get_today_open_price(stock_code) # 오늘의 시가 가져오기 추가함 - if predicted_price != 0: + model = make_model(data, company_name) + predicted_price = predicted_tomorrow_openprice(data, model) + today_price = get_today_open_price(stock_code) + if today_price is not None: predicted_stock_openprice[company_name] = { "predicted_price": predicted_price, "today_price": today_price } - except (IndexError, KeyError) as e: - print(f"Warning: Skipping {company_name} due to error: {e}") + except Exception as e: + raise InternalServerError(f"{company_name} 예측 중 오류 발생: {str(e)}") return predicted_stock_openprice -def predicted_tomorrow_openprice(data: pd.DataFrame, name: str, model: LinearRegression) -> int: - if np.isnan(data['title_label'].iloc[0]): +# 주어진 모델을 사용하여 다음 날의 주가를 예측합니다. +def predicted_tomorrow_openprice(data: pd.DataFrame, model: LinearRegression) -> int: + if data['title_label'].isna().any(): return 0 - weight_seed = len(data['title_label']) - sentiments = data['title_label'] - weights = np.random.rand(weight_seed) - - if sum(weights) == 0: - weights[0] = weights[0] + 0.5104 - weighted_avg = calculate_weighted_average(sentiments, weights) - input_data = np.array([weighted_avg, weights if np.isscalar(weights) else weights[0]]).reshape(1, -1) - predicted_price = model.predict(input_data) + X_last = data[['title_label', 'Change']].values[-1].reshape(1, -1) + predicted_price = model.predict(X_last) return int(predicted_price[0]) -def calculate_weighted_average(sentiments: np.ndarray, weights: np.ndarray) -> float: - if len(sentiments) != len(weights): - raise ValueError("감정 점수 리스트와 가중치 리스트의 길이가 일치해야 합니다.") - - weighted_sum = sum(s * w for s, w in zip(sentiments, weights)) - total_weight = sum(weights) - - if total_weight == 0: - raise ValueError("총 가중치의 합이 0이 될 수 없습니다.") - - return weighted_sum / total_weight - - -def get_today_open_price(stock_code: str) -> float: - today = datetime.datetime.today().strftime('%Y-%m-%d') - df = fdr.DataReader(stock_code, today, today) - - if not df.empty: - return df['Open'].iloc[0] - else: - print(f"오늘의 시가 데이터를 찾을 수 없습니다: {stock_code}") - return None - - -def calculate_stock_amounts(predicted_results: Dict[str, Dict[str, float]], amount: float, stocks) -> List[Dict[str, Any]]: +# 주어진 예측 결과를 기반으로 주식 주문량을 계산합니다. +def calculate_stock_amounts(predicted_results: Dict[str, Dict[str, float]], amount: float, + stocks: List[Dict[str, Any]]) -> List[Dict[str, Any]]: stock_orders = [] - stock_code = '' + + # 상승 비율 계산 및 필터링 (0 이하 제거) for company_name, prices in predicted_results.items(): predicted_price = prices['predicted_price'] today_price = prices['today_price'] + if today_price == 0: + print(f"Skipping {company_name} due to today_price being 0.") + continue + + # 상승률 계산 increase_decrease_rate = (predicted_price - today_price) / today_price * 100 + # 상승률이 0 이하인 경우는 제외 if increase_decrease_rate > 0: stock_order = { 'name': company_name, @@ -356,30 +189,82 @@ def calculate_stock_amounts(predicted_results: Dict[str, Dict[str, float]], amou } stock_orders.append(stock_order) + # 상승률로 정렬 (내림차순) stock_orders.sort(key=lambda x: x['increase_decrease_rate'], reverse=True) total_increase_decrease_rate = sum(order['increase_decrease_rate'] for order in stock_orders) + # 상승률이 양수인 주식이 없는 경우 처리 + if total_increase_decrease_rate == 0: + print("No stocks with a positive increase rate. Check model predictions and data.") + return [] + for order in stock_orders: company_name = order['name'] rate = order['increase_decrease_rate'] / total_increase_decrease_rate allocated_amount = amount * rate - quantity = allocated_amount // predicted_results[company_name]['today_price'] + today_price = predicted_results[company_name]['today_price'] - for stock in stocks: - if stock['name'] == company_name: - stock_code = stock['productNumber'] + # 주식 수량 계산 (할당된 금액 기준) + quantity = math.floor(allocated_amount / today_price) + stock_code = next(stock['productNumber'] for stock in stocks if stock['name'] == company_name) - # order['rate'] = rate * 100 - # order['stock_amount'] = allocated_amount order['quantity'] = int(quantity) order['productNumber'] = stock_code - order['name'] = company_name + + # 디버깅 정보 출력 + print( + f"Order for {company_name}: Quantity: {order['quantity']}, Allocated Amount: {allocated_amount:.2f}, Rate: {rate:.2f}") return stock_orders -def get_stocklist_top50(predicted_results: Dict[str, Dict[str, float]], stocks: List[Dict[str, Any]]) -> List[Dict[str, Any]]: - sorted_stock_orders = calculate_stock_amounts(predicted_results, 0, stocks) - top_50_stocks = sorted_stock_orders[:50] +# 주어진 주식 코드에 대한 오늘의 시가를 반환합니다. +def get_today_open_price(stock_code: str) -> float: + today = datetime.date.today().strftime('%Y-%m-%d') + df = fdr.DataReader(stock_code, today, today) - return top_50_stocks \ No newline at end of file + if not df.empty: + return df['Open'].iloc[0] + else: + raise BadRequest(f"오늘의 시가 데이터를 찾을 수 없습니다: {stock_code}") + + +# 감정 점수와 가중치를 기반으로 가중 평균을 계산합니다. +def calculate_weighted_average(sentiments: np.ndarray, weights: np.ndarray) -> float: + if len(sentiments) != len(weights): + raise ValueError("감정 점수 리스트와 가중치 리스트의 길이가 일치해야 합니다.") + + weighted_sum = sum(s * w for s, w in zip(sentiments, weights)) + total_weight = sum(weights) + + if total_weight == 0: + raise ValueError("총 가중치의 합이 0이 될 수 없습니다.") + + return weighted_sum / total_weight + + +# 주어진 URL 베이스와 페이지 수를 기반으로 뉴스를 크롤링하고 DB에 저장하는 함수 +def crawl_and_store_news_data(url_base: str, page_count: int) -> (List[str], List[str]): + title_list = [] + date_list = [] + + for i in range(1, page_count + 1): + url = url_base if i == 1 else f'{url_base}/New/{i}' + response = requests.get(url) + response.raise_for_status() + soup = BeautifulSoup(response.content, 'html.parser') + + titles = soup.select('.article_tit') + rel_times = soup.select('.rel_time') + + for title, date in zip(titles, rel_times): + title_text = title.get_text().strip() + date_text = datetime.datetime.strptime(date.get_text().strip(), '%Y.%m.%d').date() + + # 중복 검사 후 DB에 저장 + if not NewsData.objects.filter(title=title_text, date=date_text).exists(): + NewsData.objects.create(title=title_text, date=date_text) + title_list.append(title_text) + date_list.append(date_text) + + return title_list, date_list diff --git a/aiServer/stock/models.py b/aiServer/stock/models.py index f2d0104..61590a8 100644 --- a/aiServer/stock/models.py +++ b/aiServer/stock/models.py @@ -1,19 +1,18 @@ from django.db import models -# Create your models here. class NewsData(models.Model): title = models.CharField(max_length=255) date = models.DateField() - + class Meta: unique_together = ('title', 'date') class StockModelInfo(models.Model): stock_name = models.CharField(max_length=100, unique=True) - model_file_path = models.CharField(max_length=255) + model_file_path = models.CharField(max_length=255, default='') # 기본값 추가 created_at = models.DateTimeField(auto_now_add=True) def __str__(self): - return self.stock_name \ No newline at end of file + return self.stock_name diff --git a/aiServer/stock/views.py b/aiServer/stock/views.py index eed0812..66d558e 100644 --- a/aiServer/stock/views.py +++ b/aiServer/stock/views.py @@ -1,7 +1,6 @@ from django.http import JsonResponse from rest_framework.decorators import api_view - -from stock.ai_model import Output, get_stock_order_ratio +from stock.ai_model import Output, get_stock_order_ratio, crawl_and_store_news_data @api_view(['GET']) @@ -17,22 +16,18 @@ def stock_evaluate(request) -> JsonResponse: response = get_stock_order_ratio(amount, stocks) - # output1 = Output( - # product_number="005930", # 회사코드 - # name="삼성전자", # 회사 이름 - # quantity=1 # 몇 주 구매할지 나타남 - # ) - # - # output2 = Output( - # product_number="066570", # 회사코드 - # name="LG전자", # 회사 이름 - # quantity=1 # 몇 주 구매할지 나타남 - # ) - # - # response = [] - # response.append(output1.to_dict()) - # response.append(output2.to_dict()) - # - # print(response) + print(response) return JsonResponse({"stocks": response}, safe=True) + + +@api_view(['GET']) +def crawl_news(request) -> JsonResponse: + """ + 뉴스 크롤링을 수행하고 결과를 DB에 저장하는 API 엔드포인트 + """ + news_url_base = 'https://www.sedaily.com/NewsList/GD05' + page_count = 200 # 크롤링할 페이지 수 + title_list, date_list = crawl_and_store_news_data(news_url_base, page_count) + + return JsonResponse({"message": "News data crawled and stored successfully.", "titles": title_list}, safe=True)