행위

"멀티프로세싱"의 두 판 사이의 차이

DB CAFE

(새 문서: == 파이썬 멀티프로세싱 == {{틀:타이틀 투명 |제목= # multiprocessing 이라는 모듈을 이용해 멀티프로세싱을 수행. # multiprocessing 은 threading 모...)
 
 
(같은 사용자의 중간 판 15개는 보이지 않습니다)
1번째 줄: 1번째 줄:
 
== 파이썬 멀티프로세싱 ==
 
== 파이썬 멀티프로세싱 ==
{{틀:타이틀 투명
+
{{틀:고지상자
|제목=
+
|제목= - 주요 사항 
# multiprocessing 이라는  모듈을 이용해 멀티프로세싱을 수행.
+
# multiprocessing 모듈 이용 멀티프로세싱 수행.
 
# multiprocessing 은 threading 모듈과 유사한 API를 사용하여 프로세스 스포닝(spawning)을 지원하는 패키지임.  
 
# multiprocessing 은 threading 모듈과 유사한 API를 사용하여 프로세스 스포닝(spawning)을 지원하는 패키지임.  
 
# multiprocessing 패키지는 지역과 원격 동시성을 모두 제공하며 스레드 대신 서브 프로세스를 사용하여 전역 인터프리터 록 을 효과적으로 피합니다. 이것 때문에, multiprocessing 모듈은 프로그래머가 주어진 기계에서 다중 프로세서를 최대한 활용할 수 있게 합니다.  
 
# multiprocessing 패키지는 지역과 원격 동시성을 모두 제공하며 스레드 대신 서브 프로세스를 사용하여 전역 인터프리터 록 을 효과적으로 피합니다. 이것 때문에, multiprocessing 모듈은 프로그래머가 주어진 기계에서 다중 프로세서를 최대한 활용할 수 있게 합니다.  
9번째 줄: 9번째 줄:
  
 
==  Python multiprocessing 모듈 사용법 ==
 
==  Python multiprocessing 모듈 사용법 ==
 +
{{틀:고지상자
 +
|아이콘=alarm
 +
|제목= - 사용 방법
 
# 파이썬에서 multiprocessing 모둘을 불러온다.(멀티프로세싱 모듈은 pip를 통해 따로 설치해 줄 필요 없음)
 
# 파이썬에서 multiprocessing 모둘을 불러온다.(멀티프로세싱 모듈은 pip를 통해 따로 설치해 줄 필요 없음)
 
# pool 객체를 생성
 
# pool 객체를 생성
 
## 이 객체는 여러 입력 값에 걸쳐 함수의 실행을 병렬 처리하고 입력 데이터를 프로세스에 분산시키는 편리한 방법을 제공합니다(데이터 병렬 처리)
 
## 이 객체는 여러 입력 값에 걸쳐 함수의 실행을 병렬 처리하고 입력 데이터를 프로세스에 분산시키는 편리한 방법을 제공합니다(데이터 병렬 처리)
* pool 객체를 공식 문서 설명
+
##* pool 객체를 공식 문서 설명
** pool 객체를 이용해 멀티프로세싱을 할 수 있게 객체를 생성해준다.
+
##** pool 객체를 이용해 멀티프로세싱을 할 수 있게 객체를 생성해준다.
** pool(코어 갯수) 로 만들 수 있다.
+
##** pool(코어 갯수) 로 만들 수 있다.
 
# pool 객체에 작업을 매핑해준다.
 
# pool 객체에 작업을 매핑해준다.
* pool.map(함수, (인풋 데이터)) 형식으로 작업을 매핑해준다.
+
#* pool.map(함수, (인풋 데이터)) 형식으로 작업을 매핑해준다.
* 코어 갯수로 나누어 작업을 처리하게 된다.
+
#* 코어 갯수로 나누어 작업을 처리하게 된다.
 
# 실행
 
# 실행
객체를 생성한 뒤, 실제로 속도가 빨라지는지 한번 확인해본다.
+
#* 객체를 생성한 뒤, 실제로 속도가 빨라지는지 한번 확인해본다.
 +
}}
  
== 예제 ==
+
== 예제 ==
 +
{{틀:고지상자
 +
|제목=다음 코드는 2017년부터 특정 가격(전일종가 -1%) 에 해당되는 주식 종목을 검색하여,
 +
당일 종가기준 수익률이 얼마나 되는지 검색하는 코드이다.
  
다음 코드는 2017년부터 특정 가격(전일종가 -1%) 에 해당되는 주식 종목을 검색하여, 당일 종가기준 수익률이 얼마나 되는지 검색하는 코드이다.
 
 
작동 플로우는 일봉데이터DB에서 쿼리 → 조건에 맞는지 확인 → 조건 충족하는 데이터를 수합하여 히스토그램으로 시현 순으로 작동된다.
 
작동 플로우는 일봉데이터DB에서 쿼리 → 조건에 맞는지 확인 → 조건 충족하는 데이터를 수합하여 히스토그램으로 시현 순으로 작동된다.
 +
}}
 +
<source lang=python>
  
<source lang=python>
 
 
import numpy  as np
 
import numpy  as np
 
import pandas as pd
 
import pandas as pd
130번째 줄: 137번째 줄:
 
     plt.hist(total_profit, bins=60, range=(-30, 30))
 
     plt.hist(total_profit, bins=60, range=(-30, 30))
 
     plt.show()
 
     plt.show()
</python>
+
</source>
  
 
* 다음 예제에서는 parallelize_dataframe 함수에서 멀티프로세싱을 수행한다.
 
* 다음 예제에서는 parallelize_dataframe 함수에서 멀티프로세싱을 수행한다.
156번째 줄: 163번째 줄:
 
데이터 분석 속도를 가시적으로 개선해 볼 수 있을 것이다.
 
데이터 분석 속도를 가시적으로 개선해 볼 수 있을 것이다.
  
+
=== MPIRE 를 이용한 멀티 프로세싱 ===
 +
<source lang=python>
 +
from mpire import WorkerPool
 +
 
 +
from bigO import BigO
 +
 
 +
lib = BigO()
 +
 
 +
 
 +
def time_consuming_function(x):
 +
    arr = lib.genRandomArray(100)
 +
    print(f"sum: {sum(arr)}")
 +
 
 +
 
 +
if __name__ == "__main__":  # Windows에서 필수
 +
    with WorkerPool(n_jobs=5) as pool:
 +
        results = pool.map(time_consuming_function, range(10))
 +
 
 +
 
 +
""" 결과
 +
랜덤 배열을 매번 생성해서 합을 출력
 +
 
 +
sum: -107
 +
sum: -1012
 +
sum: 976
 +
sum: 974
 +
sum: -1509
 +
sum: -294
 +
sum: -831
 +
sum: -403
 +
sum: -924
 +
sum: -57
 +
 
 +
"""
 +
 
 +
</source>
 +
 
 +
=== JOBLIB 와 RAY 비교 ===
 +
<source lang=python>
 +
from concurrent.futures import ProcessPoolExecutor
 +
from multiprocessing import Pool
 +
 
 +
import ray
 +
from dask.distributed import Client
 +
from joblib import Parallel, delayed
 +
from mpire import WorkerPool
 +
 
 +
 
 +
# Serial processing
 +
results = [time_consuming_function(x) for x in data]
 +
 
 +
# Multiprocessing
 +
with Pool(processes=5) as pool:
 +
    results = pool.map(time_consuming_function, data)
 +
   
 +
# ProcessPoolExecutor
 +
with ProcessPoolExecutor(max_workers=5) as pool:
 +
    results = list(pool.map(time_consuming_function, data))
 +
 
 +
# Joblib
 +
results = Parallel(n_jobs=5)(delayed(time_consuming_function)(x) for x in data)
 +
 
 +
# Dask
 +
client = Client(n_workers=5)
 +
results = client.gather([client.submit(time_consuming_function, x) for x in data])
 +
client.close()
 +
 
 +
# Ray
 +
ray.init(num_cpus=5)
 +
remote_function = ray.remote(time_consuming_function)
 +
results = ray.get([remote_function.remote(x) for x in data])
 +
 
 +
# MPIRE
 +
with WorkerPool(n_jobs=5) as pool:
 +
    results = pool.map(time_consuming_function, data)
 +
 
 +
</source>
 +
[[category:python]]

2023년 5월 25일 (목) 01:09 기준 최신판

thumb_up 추천메뉴 바로가기


1 파이썬 멀티프로세싱[편집]




2 Python multiprocessing 모듈 사용법[편집]




3 예제[편집]



import numpy  as np
import pandas as pd
import matplotlib.pyplot as plt
import sqlite3
import multiprocessing as mp
import time

class get_pattern():
    def __init__(self, stock_code_list):
        self.db = sqlite3.connect("stock_price(day).db")
        self.stock_code_list = stock_code_list

    ### Query data
    def query_data(self, code):
        ## Pass ETN
        if code[0] == 'Q':
            pass

        cursor = self.db.cursor()
        
        data = cursor.execute("SELECT * FROM %s WHERE date > 20170101" %code)
        cols = [column[0] for column in data.description]
        
        query_data = pd.DataFrame.from_records(data=data.fetchall(), columns=cols)

        return query_data


    ### Search Pattern
    def get_pattern(self, queried_data):
        queried_data['close_D-1'] = queried_data['close'].shift(1) * 0.99
        queried_data['Pattern_Today'] = (queried_data['low'] < queried_data['close_D-1']) & (queried_data['volume'] * queried_data['close'] > 10000000000)
        queried_data['Profit'] = (queried_data['close'] / queried_data['close_D-1'] - 1) * 100

        return queried_data['Profit'][queried_data['Pattern_Today'] == True]
 

    ## run
    def run(self):
        filtered_profit = pd.Series()

        for code in self.stock_code_list:
            history_data = self.query_data(code)
            filtered_data = self.get_pattern(history_data)

            filtered_profit = pd.concat([filtered_profit, filtered_data])

        return filtered_profit
    

# Multiprocessing
def multi_run(code_list):
    get_p = get_pattern(code_list)
    profit = get_p.run()

    return profit


# parallelize dataframe
def parallelize_dataframe(df, func, cores):

    list_split = np.array_split(df, cores)
    pool = mp.Pool(num_cores) ## pool 객체 생성
    parallelized_df = pd.concat(pool.map(func, list_split)) ## 멀티프로세싱 후 데이터프레임 합체

    pool.close()
    pool.join()

    return parallelized_df

if __name__ == '__main__':   
    
    # import stock_code_list
    stock_code_list = pd.read_hdf('code_list.h5')
    
    # 내 PC의 cpu 코어 갯수를 확인한다.
    print(mp.cpu_count())

    ### (1) 멀티프로세싱으로 작동
    ### 멀티프로세싱 (코어 갯수 : 8)
    ###

    start = time.time()  # 시작 시간
    total_profit = parallelize_dataframe(stock_code_list, multi_run, mp.cpu_count()) # storing result
    print("time :", time.time() - start)  # 현재시각 - 시작시간 = 실행 시간

    plt.hist(total_profit, bins=60, range=(-30, 30))
    plt.show()


    ### (2) 시간 비교를 위해 단일 프로세싱으로 작동
    ### 단일 프로세스로 작동
    ###

    start = time.time()  # 시작 시간

    get_p = get_pattern(code_list)
    total_profit = get_p.run()

    print("time :", time.time() - start)  # 현재시각 - 시작시간 = 실행 시간

    plt.hist(total_profit, bins=60, range=(-30, 30))
    plt.show()
  • 다음 예제에서는 parallelize_dataframe 함수에서 멀티프로세싱을 수행한다.
  • 결과 데이터의 사이즈는 88460행이다.
  • 코드를 작동해서 확인해 볼 수는 없으나, 실질적으로 쿼리되는 데이터의 양은 약 25만 행으로 추정한다.
 (추출 근거는 17년 1월 1일부터 영업일(약 900일) * 검색종목수(테이블 수 2981개)를 근사치로 설정하여 25만행으로 산출하였다.)​

실행을 해보니 결과는 다음과 같았다.

단일 프로세싱 time : 30.910 2코어 멀티 프로세싱 time : 15.925 4코어 멀티 프로세싱 time : 11.130 8코어 멀티 프로세싱 time : 10.087

  • 속도의 변화만 보기 위해 평균치는 내지 않았다.

시간 차이가 작동때마다 차이가 발생할 수 있음.

  • 8코어는 실제코어가 8코어가 아니고, 하이퍼쓰레딩이 적용되어있어 8개로 인식한다. 실제 코어 수는 4개이다.

우리는 이 테스트 코드를 통해 다음과 같은 결과를 도출해 낼 수 있다.

  • 2개의 코어로 분산만 해줘도 속도 개선이 확연히 개선되었다.
  • 코어를 n배한다고 해서 정확히 속도가 1/n배로는 개선되지 않는다.

​따라서 적당한 코어 갯수 설정을 통한 멀티프로세싱을 하면, 데이터 분석 속도를 가시적으로 개선해 볼 수 있을 것이다.

3.1 MPIRE 를 이용한 멀티 프로세싱[편집]

from mpire import WorkerPool

from bigO import BigO

lib = BigO()


def time_consuming_function(x):
    arr = lib.genRandomArray(100)
    print(f"sum: {sum(arr)}")


if __name__ == "__main__":  # Windows에서 필수
    with WorkerPool(n_jobs=5) as pool:
        results = pool.map(time_consuming_function, range(10))


""" 결과
랜덤 배열을 매번 생성해서 합을 출력

sum: -107
sum: -1012
sum: 976
sum: 974
sum: -1509
sum: -294
sum: -831
sum: -403
sum: -924
sum: -57

"""

3.2 JOBLIB 와 RAY 비교[편집]

from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Pool

import ray
from dask.distributed import Client
from joblib import Parallel, delayed
from mpire import WorkerPool


# Serial processing
results = [time_consuming_function(x) for x in data]

# Multiprocessing
with Pool(processes=5) as pool:
    results = pool.map(time_consuming_function, data)
    
# ProcessPoolExecutor
with ProcessPoolExecutor(max_workers=5) as pool:
    results = list(pool.map(time_consuming_function, data))

# Joblib
results = Parallel(n_jobs=5)(delayed(time_consuming_function)(x) for x in data)

# Dask
client = Client(n_workers=5)
results = client.gather([client.submit(time_consuming_function, x) for x in data])
client.close()

# Ray
ray.init(num_cpus=5)
remote_function = ray.remote(time_consuming_function)
results = ray.get([remote_function.remote(x) for x in data])

# MPIRE
with WorkerPool(n_jobs=5) as pool:
    results = pool.map(time_consuming_function, data)