메뉴 여닫기
개인 메뉴 토글
로그인하지 않음
만약 지금 편집한다면 당신의 IP 주소가 공개될 수 있습니다.

멀티프로세싱

DB CAFE

파이썬 멀티프로세싱




Python multiprocessing 모듈 사용법




예제




import numpy  as np
import pandas as pd
import matplotlib.pyplot as plt
import sqlite3
import multiprocessing as mp
import time

class get_pattern():
    def __init__(self, stock_code_list):
        self.db = sqlite3.connect("stock_price(day).db")
        self.stock_code_list = stock_code_list

    ### Query data
    def query_data(self, code):
        ## Pass ETN
        if code[0] == 'Q':
            pass

        cursor = self.db.cursor()
        
        data = cursor.execute("SELECT * FROM %s WHERE date > 20170101" %code)
        cols = [column[0] for column in data.description]
        
        query_data = pd.DataFrame.from_records(data=data.fetchall(), columns=cols)

        return query_data


    ### Search Pattern
    def get_pattern(self, queried_data):
        queried_data['close_D-1'] = queried_data['close'].shift(1) * 0.99
        queried_data['Pattern_Today'] = (queried_data['low'] < queried_data['close_D-1']) & (queried_data['volume'] * queried_data['close'] > 10000000000)
        queried_data['Profit'] = (queried_data['close'] / queried_data['close_D-1'] - 1) * 100

        return queried_data['Profit'][queried_data['Pattern_Today'] == True]
 

    ## run
    def run(self):
        filtered_profit = pd.Series()

        for code in self.stock_code_list:
            history_data = self.query_data(code)
            filtered_data = self.get_pattern(history_data)

            filtered_profit = pd.concat([filtered_profit, filtered_data])

        return filtered_profit
    

# Multiprocessing
def multi_run(code_list):
    get_p = get_pattern(code_list)
    profit = get_p.run()

    return profit


# parallelize dataframe
def parallelize_dataframe(df, func, cores):

    list_split = np.array_split(df, cores)
    pool = mp.Pool(num_cores) ## pool 객체 생성
    parallelized_df = pd.concat(pool.map(func, list_split)) ## 멀티프로세싱 후 데이터프레임 합체

    pool.close()
    pool.join()

    return parallelized_df

if __name__ == '__main__':   
    
    # import stock_code_list
    stock_code_list = pd.read_hdf('code_list.h5')
    
    # 내 PC의 cpu 코어 갯수를 확인한다.
    print(mp.cpu_count())

    ### (1) 멀티프로세싱으로 작동
    ### 멀티프로세싱 (코어 갯수 : 8)
    ###

    start = time.time()  # 시작 시간
    total_profit = parallelize_dataframe(stock_code_list, multi_run, mp.cpu_count()) # storing result
    print("time :", time.time() - start)  # 현재시각 - 시작시간 = 실행 시간

    plt.hist(total_profit, bins=60, range=(-30, 30))
    plt.show()


    ### (2) 시간 비교를 위해 단일 프로세싱으로 작동
    ### 단일 프로세스로 작동
    ###

    start = time.time()  # 시작 시간

    get_p = get_pattern(code_list)
    total_profit = get_p.run()

    print("time :", time.time() - start)  # 현재시각 - 시작시간 = 실행 시간

    plt.hist(total_profit, bins=60, range=(-30, 30))
    plt.show()
  • 다음 예제에서는 parallelize_dataframe 함수에서 멀티프로세싱을 수행한다.
  • 결과 데이터의 사이즈는 88460행이다.
  • 코드를 작동해서 확인해 볼 수는 없으나, 실질적으로 쿼리되는 데이터의 양은 약 25만 행으로 추정한다.
 (추출 근거는 17년 1월 1일부터 영업일(약 900일) * 검색종목수(테이블 수 2981개)를 근사치로 설정하여 25만행으로 산출하였다.)​

실행을 해보니 결과는 다음과 같았다.

단일 프로세싱 time : 30.910 2코어 멀티 프로세싱 time : 15.925 4코어 멀티 프로세싱 time : 11.130 8코어 멀티 프로세싱 time : 10.087

  • 속도의 변화만 보기 위해 평균치는 내지 않았다.

시간 차이가 작동때마다 차이가 발생할 수 있음.

  • 8코어는 실제코어가 8코어가 아니고, 하이퍼쓰레딩이 적용되어있어 8개로 인식한다. 실제 코어 수는 4개이다.

우리는 이 테스트 코드를 통해 다음과 같은 결과를 도출해 낼 수 있다.

  • 2개의 코어로 분산만 해줘도 속도 개선이 확연히 개선되었다.
  • 코어를 n배한다고 해서 정확히 속도가 1/n배로는 개선되지 않는다.

​따라서 적당한 코어 갯수 설정을 통한 멀티프로세싱을 하면, 데이터 분석 속도를 가시적으로 개선해 볼 수 있을 것이다.

MPIRE 를 이용한 멀티 프로세싱

from mpire import WorkerPool

from bigO import BigO

lib = BigO()


def time_consuming_function(x):
    arr = lib.genRandomArray(100)
    print(f"sum: {sum(arr)}")


if __name__ == "__main__":  # Windows에서 필수
    with WorkerPool(n_jobs=5) as pool:
        results = pool.map(time_consuming_function, range(10))


""" 결과
랜덤 배열을 매번 생성해서 합을 출력

sum: -107
sum: -1012
sum: 976
sum: 974
sum: -1509
sum: -294
sum: -831
sum: -403
sum: -924
sum: -57

"""

JOBLIB 와 RAY 비교

from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Pool

import ray
from dask.distributed import Client
from joblib import Parallel, delayed
from mpire import WorkerPool


# Serial processing
results = [time_consuming_function(x) for x in data]

# Multiprocessing
with Pool(processes=5) as pool:
    results = pool.map(time_consuming_function, data)
    
# ProcessPoolExecutor
with ProcessPoolExecutor(max_workers=5) as pool:
    results = list(pool.map(time_consuming_function, data))

# Joblib
results = Parallel(n_jobs=5)(delayed(time_consuming_function)(x) for x in data)

# Dask
client = Client(n_workers=5)
results = client.gather([client.submit(time_consuming_function, x) for x in data])
client.close()

# Ray
ray.init(num_cpus=5)
remote_function = ray.remote(time_consuming_function)
results = ray.get([remote_function.remote(x) for x in data])

# MPIRE
with WorkerPool(n_jobs=5) as pool:
    results = pool.map(time_consuming_function, data)