멀티프로세싱
DB CAFE
thumb_up 추천메뉴 바로가기
- DBA { Oracle DBA 명령어 > DBA 초급 과정 > DBA 고급 과정 }
- 튜닝 { 오라클 튜닝 목록 }
- 모델링 { 데이터 모델링 가이드 }
3 예제[편집]
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import sqlite3
import multiprocessing as mp
import time
class get_pattern():
def __init__(self, stock_code_list):
self.db = sqlite3.connect("stock_price(day).db")
self.stock_code_list = stock_code_list
### Query data
def query_data(self, code):
## Pass ETN
if code[0] == 'Q':
pass
cursor = self.db.cursor()
data = cursor.execute("SELECT * FROM %s WHERE date > 20170101" %code)
cols = [column[0] for column in data.description]
query_data = pd.DataFrame.from_records(data=data.fetchall(), columns=cols)
return query_data
### Search Pattern
def get_pattern(self, queried_data):
queried_data['close_D-1'] = queried_data['close'].shift(1) * 0.99
queried_data['Pattern_Today'] = (queried_data['low'] < queried_data['close_D-1']) & (queried_data['volume'] * queried_data['close'] > 10000000000)
queried_data['Profit'] = (queried_data['close'] / queried_data['close_D-1'] - 1) * 100
return queried_data['Profit'][queried_data['Pattern_Today'] == True]
## run
def run(self):
filtered_profit = pd.Series()
for code in self.stock_code_list:
history_data = self.query_data(code)
filtered_data = self.get_pattern(history_data)
filtered_profit = pd.concat([filtered_profit, filtered_data])
return filtered_profit
# Multiprocessing
def multi_run(code_list):
get_p = get_pattern(code_list)
profit = get_p.run()
return profit
# parallelize dataframe
def parallelize_dataframe(df, func, cores):
list_split = np.array_split(df, cores)
pool = mp.Pool(num_cores) ## pool 객체 생성
parallelized_df = pd.concat(pool.map(func, list_split)) ## 멀티프로세싱 후 데이터프레임 합체
pool.close()
pool.join()
return parallelized_df
if __name__ == '__main__':
# import stock_code_list
stock_code_list = pd.read_hdf('code_list.h5')
# 내 PC의 cpu 코어 갯수를 확인한다.
print(mp.cpu_count())
### (1) 멀티프로세싱으로 작동
### 멀티프로세싱 (코어 갯수 : 8)
###
start = time.time() # 시작 시간
total_profit = parallelize_dataframe(stock_code_list, multi_run, mp.cpu_count()) # storing result
print("time :", time.time() - start) # 현재시각 - 시작시간 = 실행 시간
plt.hist(total_profit, bins=60, range=(-30, 30))
plt.show()
### (2) 시간 비교를 위해 단일 프로세싱으로 작동
### 단일 프로세스로 작동
###
start = time.time() # 시작 시간
get_p = get_pattern(code_list)
total_profit = get_p.run()
print("time :", time.time() - start) # 현재시각 - 시작시간 = 실행 시간
plt.hist(total_profit, bins=60, range=(-30, 30))
plt.show()
- 다음 예제에서는 parallelize_dataframe 함수에서 멀티프로세싱을 수행한다.
- 결과 데이터의 사이즈는 88460행이다.
- 코드를 작동해서 확인해 볼 수는 없으나, 실질적으로 쿼리되는 데이터의 양은 약 25만 행으로 추정한다.
(추출 근거는 17년 1월 1일부터 영업일(약 900일) * 검색종목수(테이블 수 2981개)를 근사치로 설정하여 25만행으로 산출하였다.)
실행을 해보니 결과는 다음과 같았다.
단일 프로세싱 time : 30.910 2코어 멀티 프로세싱 time : 15.925 4코어 멀티 프로세싱 time : 11.130 8코어 멀티 프로세싱 time : 10.087
- 속도의 변화만 보기 위해 평균치는 내지 않았다.
시간 차이가 작동때마다 차이가 발생할 수 있음.
- 8코어는 실제코어가 8코어가 아니고, 하이퍼쓰레딩이 적용되어있어 8개로 인식한다. 실제 코어 수는 4개이다.
우리는 이 테스트 코드를 통해 다음과 같은 결과를 도출해 낼 수 있다.
- 2개의 코어로 분산만 해줘도 속도 개선이 확연히 개선되었다.
- 코어를 n배한다고 해서 정확히 속도가 1/n배로는 개선되지 않는다.
따라서 적당한 코어 갯수 설정을 통한 멀티프로세싱을 하면, 데이터 분석 속도를 가시적으로 개선해 볼 수 있을 것이다.
3.1 MPIRE 를 이용한 멀티 프로세싱[편집]
from mpire import WorkerPool
from bigO import BigO
lib = BigO()
def time_consuming_function(x):
arr = lib.genRandomArray(100)
print(f"sum: {sum(arr)}")
if __name__ == "__main__": # Windows에서 필수
with WorkerPool(n_jobs=5) as pool:
results = pool.map(time_consuming_function, range(10))
""" 결과
랜덤 배열을 매번 생성해서 합을 출력
sum: -107
sum: -1012
sum: 976
sum: 974
sum: -1509
sum: -294
sum: -831
sum: -403
sum: -924
sum: -57
"""
3.2 JOBLIB 와 RAY 비교[편집]
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Pool
import ray
from dask.distributed import Client
from joblib import Parallel, delayed
from mpire import WorkerPool
# Serial processing
results = [time_consuming_function(x) for x in data]
# Multiprocessing
with Pool(processes=5) as pool:
results = pool.map(time_consuming_function, data)
# ProcessPoolExecutor
with ProcessPoolExecutor(max_workers=5) as pool:
results = list(pool.map(time_consuming_function, data))
# Joblib
results = Parallel(n_jobs=5)(delayed(time_consuming_function)(x) for x in data)
# Dask
client = Client(n_workers=5)
results = client.gather([client.submit(time_consuming_function, x) for x in data])
client.close()
# Ray
ray.init(num_cpus=5)
remote_function = ray.remote(time_consuming_function)
results = ray.get([remote_function.remote(x) for x in data])
# MPIRE
with WorkerPool(n_jobs=5) as pool:
results = pool.map(time_consuming_function, data)