먹고 기도하고 코딩하라

[인프런 리프 2기] 4주 - 파이썬 병렬성과 AsyncIO 본문

일상

[인프런 리프 2기] 4주 - 파이썬 병렬성과 AsyncIO

2G Dev 2021. 4. 2. 21:58
728x90
728x90

이전 글 보기

 

[인프런 리프 2기] 4주 - 파이썬 병행성

이전 글 보기 [인프런 리프 2기] 3주 - 파이썬 일급 함수 오늘은 인프런 리프 3주차 미션인 섹션 5를 정리해 보겠습니다. 제가 수강하는 강의는 [우리를 위한 프로그래밍], 파이썬 중급 과정 인프

dev-dain.tistory.com

 

 

오늘은 인프런 리프 4주차 미션인 섹션 6, 7을 정리해 보겠습니다.

 

 

제가 수강하는 강의는 [우리를 위한 프로그래밍], 파이썬 중급 과정 인프런 오리지널 강의입니다.

 

 

 

 

섹션 7 : 최종 실습

이 포스팅에서는 저번에 못 다룬 병렬성 (futures) 부분을 다루고, 최종 실습이라고 되어 있는 AsyncIO 부분과 beautifulsoup4 일부분을 다루는데요. 자세한 내용은 다음과 같습니다:

(1) 병렬성 : 비동기 작업을 위한 futures
(2) asyncIO 사용법 (비동기 I/O 코루틴 작업)

 

 

 

 

7-1. 병렬성 : 비동기 작업을 위한 futures

futures는 비동기 작업 실행을 위한 것인데요. 비동기 작업으로 잘 하면 file을 쓰는 작업(I/O) 등으로 block되거나 시간이 많이 지연되는 동안 CPU 및 리소스가 지나치게 낭비되는 것을 막을 수 있습니다. 비동기 작업과 적합하게 코드를 짤 경우 성능 향상 효과를 많이 볼 수도 있다고 하고요. 최대한 자원을 알뜰히 잘 활용해서 동시성을 실현하는 것이 futures의 목표입니다.

futures는 비동기 실행을 위한 API를 고수준으로 작성해 사용하기 쉽도록 개선한 것입니다. Python 3.2 이전에는 비동기 프로그래밍을 위해 threads, multiprocess 등을 import해서 사용해야 했지만, 이제는 concurrent.futures만 import해 오면 더 간단하게 사용할 수 있습니다.

concurrent.futures의 장점으로는 1번째, 멀티 스레딩/멀티 프로세싱 API가 통일되어 사용이 아주 쉽다는 점입니다. 아래 코드에서 확인해 보겠지만 사용 방법이 거의 똑같고 ThreadPool을 할지 ProcessPool을 할지 정도 차이만 있어서 바꿔 쓰기도 쉽습니다. 2번째, 실행 중인 작업이 취소됐거나 완료됐는지 등의 여부를 확인하기 쉽고, 타임아웃 옵션도 있으며 동기화 코드를 쉽게 작성할 수 있다는 점입니다.

 

그럼 시작해 보겠습니다. futures를 써서 동시성 프로그래밍을 하는 방법은 2가지가 있는데 첫 번째는 concurrent.futures의 map 함수를 쓰는 것이고, 두 번째는 wait과 as_completed를 쓰는 것입니다. 첫 번째 방법부터 봅시다.

1부터 10만, 100만, 1000만, 1억까지 계속 숫자를 누적해가며 더하는 작업을 해야 한다고 생각해 봅시다. 순차적으로 실행한다면 시간이 꽤 걸리겠죠? 사실 다른 건 다 실행할만하지만 1억까지 더하는 게 시간이 제일 많이 걸리긴 할 겁니다. 이 일들이 서로 어떤 영향을 미치는 건 없기 때문에 각자 따로 실행해도 괜찮은 작업입니다.

 

import os
import time
from concurrent import futures

WORK_LIST = [100000, 1000000, 10000000, 100000000]

# 누적 합계 함수(제너레이터)
def sum_generator(n):
  return sum(n for n in range(1, n + 1))

시작 시간과 끝나는 시간을 재기 위해 time 모듈을 import한 다음, concurrent 라이브러리에서 futures 모듈도 import해 옵니다. WORK_LIST에는 각 스레드(혹은 프로세스)들이 해야 할 일들을 담아 두었습니다.

 

def main():
  # Worker count
  worker = min(10, len(WORK_LIST))  # 10개거나 작업의 개수 중 작은 수로 스레드 만들기
  
  # 시작시간
  start_tm = time.time()

  with futures.ThreadPoolExecutor() as executor:
    # 저절로 맵핑됨. map -> 작업 순서 유지, 즉시 실행
    result = executor.map(sum_generator, WORK_LIST)

  # 종료시간
  end_tm = time.time() - start_tm
  # 출력 포맷
  msg = '\n Result -> {} Time : {:.2f}s'
  # 최종 결과 출력
  print(msg.format(list(result), end_tm))


# main 파일에서 실행하는 거라면.. (시작점이 있어야 함)
if __name__ == '__main__':
  main()

다음 main 함수를 작성합니다. 지금까지는 main 함수를 따로 작성하지 않고 그냥 전역 스코프에서 모든 함수를 불러왔지만, 시작점이 있어야 futures 정상 사용이 가능하기 때문에 따로 함수를 만들었습니다. 그리고 맨 아래에서 현재 실행하는 모듈의 이름을 체크해서 메인 루틴이라면 main()을 실행하도록 합니다. 이게 왜 필요한지 말로는 설명이 쉬운데 글로 풀어 쓰려니 어렵네요. 🤣 코딩도장의 이 글을 보는 게 도움이 될 것 같습니다.

worker는 작업자 개수를 가리키도록 하는데요, 10개나 WORK_LIST 중 더 작은 것을 선택합니다.

with futures.ThreadPoolExecutor() as executor: 이 문장은 futures의 ThreadPoolExecutor, 즉 스레드를 사용해서 해당 작업을 하겠다는 의미입니다. executor에는 map이라는 메소드가 있는데, 첫 번째로 작업이 있는 함수를 받고 두 번째로는 스레드를 몇 개나 풀어서 할지 그 개수를 쓰게 되어 있습니다. 그 결과는 저절로 맵핑되어 result에 반환받게 됩니다.

실행 결과는 다음과 같습니다.

 

# Result -> [5000050000, 500000500000, 50000005000000, 5000000050000000] Time : 16.24s

result 리스트에 각 WORK_LIST 요소에 sum_generator를 돌린 결과값이 순서대로 잘 들어가 있고, 작업을 실행한 시간도 잘 나와 있습니다.

이는 스레드를 이용해서 작업한 예시인데요, 프로세스를 이용해서 작업할 수도 있습니다. 직접 해 보니 프로세스를 이용한 경우가 아주 약간 더 빠를 때가 많았습니다.

바꿔주는 방법은 간단합니다. ThreadPoolExecutor라고 된 부분을 ProcessPoolExecutor로 바꿔주면 됩니다.

 

  with futures.ProcessPoolExecutor() as executor:
    result = executor.map(sum_generator, WORK_LIST)
    
# Result -> [5000050000, 500000500000, 50000005000000, 5000000050000000] Time : 11.11s

쉽게 생각해서, 스레드를 이용해서 동시성 프로그래밍을 하려면 ThreadPoolExecutor를, 프로세스를 이용하려면 ProcessPoolExecutor를 사용하면 됩니다. 

 

이제 두 번째, wait과 as_completed를 이용한 방법을 살펴봅시다.

먼저 문제를 간단히 하기 위해 앞의 것과 똑같이 1부터 10만, 100만, 1000만, 1억을 누적해서 더하는 일을 한다고 합시다. 그리고 futures.ThreadPoolExecutor처럼 쓰지 않기 위해 다음과 같이 import를 하겠습니다. 

 

import os
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, wait, as_completed

WORK_LIST = [100000, 1000000, 10000000, 100000000]

def sum_generator(n):
  return sum(n for n in range(1, n + 1))

wait과 as_completed를 추가해서 import했습니다. 이제 main 함수를 작성해 보겠습니다.

 

def main():
  worker = min(10, len(WORK_LIST))  # 10개거나 작업의 개수 중 작은 수로 스레드 만들기
  start_tm = time.time()
  
  # futures
  futures_list = []

  with ProcessPoolExecutor() as executor:
    for work in WORK_LIST:
      # future를 반환할 뿐 (미래에 할 일을 반환), 일을 실제로 하지는 않음
      future = executor.submit(sum_generator, work)
      futures_list.append(future)	# 스케줄링
      print('Scheduled for {} : {}'.format(work, future))	# 스케줄링 확인
    
    # wait 결과 출력하되, 7초까지만 기다리고 그 때까지 못 하는 일은 실패로 간주하고 중단
    result = wait(futures_list, timeout=7)

    # 성공
    print('Completed Tasks ' + str(result.done))
    # 실패
    print('Pending ones after waiting for 7 secs : ' + str(result.not_done))
    # 결과값 출력
    print([future.result() for future in result.done])

    # as_completed 결과 출력
    for future in as_completed(futures_list):
      result = future.result()
      done = future.done()
      cancelled = future.cancelled()  # 취소됐는지 확인

      # future 결과 확인
      print('Future Result : {}, Done : {}'.format(result, done))
      print('Future Cancelled : {}'.format(cancelled))
      
  end_tm = time.time() - start_tm
  msg = '\nTime : {:.2f}s'
  print(msg.format(end_tm))
  
  
if __name__ == '__main__':
  main()

앞의 예시에 비해서 좀 길죠? 일단 futures_list라는 빈 리스트를 만들어서 future, 즉 미래에 할 일들을 담아 놓기로 합니다. with문이 시작되고 나서는 executor.submit(작업 함수, 각개 일) 메소드를 실행해 반환된 작업을 future에 넣고, futures_list에 append시킵니다. 일이 순차적으로 스케줄링되겠죠? 이 때 print문을 확인하면 다음과 같습니다.

 

Scheduled for 100000 : <Future at 0x2976b393640 state=running>
Scheduled for 1000000 : <Future at 0x2976b5a0610 state=pending>
Scheduled for 10000000 : <Future at 0x2976b601790 state=pending>
Scheduled for 100000000 : <Future at 0x2976b601c70 state=pending>

Future라는 타입으로 나옵니다. state는 1개 빼고 pending이네요. 

그 다음 for문을 빠져나가서 wait을 하게 되는데, 이 때 매개변수로 future가 담긴 리스트를 받습니다. timeout 옵션을 줘서 몇 초까지 기다릴 것인지 정할 수 있는데, 7초 이내에 완료되지 않는 작업들은 fail이라고 간주하기로 합니다. 이제 result는 done과 not_done 프로퍼티를 갖는 object가 되는데요. Completed Tasks, Pending ones 출력문과 결과값 출력 결과는 다음과 같습니다.

 

Completed Tasks {<Future at 0x2976b5a0610 state=finished returned int>, <Future at 
0x2976b393640 state=finished returned int>, <Future at 0x2976b601790 state=finished returned int>}

Pending ones after waiting for 7 secs : {<Future at 0x2976b601c70 state=running>}  

[500000500000, 5000050000, 50000005000000]

7초가 지나도 끝나지 않은 건 1부터 1억까지 더하는 작업이네요. 먼저 완료된 100만까지 더한 값, 10만까지 더한 값, 1000만까지 더한 값은 result.done 각각의 result()로 값을 따로 빼내 출력했습니다.

이처럼 result에 wait한 결과를 담아서 done, not_done으로 제한 시간 안에 작업을 완료했는지 확인해보고, 완료된 작업만 받아볼 수도 있습니다.

as_completed 결과도 출력 가능합니다. future가 담긴 리스트인 futures_list를 매개변수로 넣은 as_completed 호출 결과로 for문을 돌리는데, 이 때 각 요소가 가진 result()는 작업이 끝났을 때의 결과값과 동일합니다. future.done()은 작업이 완료됐는지(True/False), future.cancelled()는 작업이 취소됐는지(True/False)를 확인해 볼 수 있습니다.

 

Future Result : 50000005000000, Done : True
Future Cancelled : False
Future Result : 5000050000, Done : True
Future Cancelled : False
Future Result : 500000500000, Done : True
Future Cancelled : False
Future Result : 5000000050000000, Done : True
Future Cancelled : False


Time : 9.46s

이렇게 동시에 작업을 빠르게 마칠 수 있습니다. 

만약 이걸 각자 다 따로 하려면 얼마만큼의 시간이 필요할까요? 동시성 프로그래밍이 아니라 1번에 1개의 작업만 하도록 프로그래밍하겠습니다. sum_generator는 이미 있다고 치겠습니다.

 

start_tm = time.time()
for work in WORK_LIST:
  result = sum_generator(work)
  
end_tm = time.time() - start_tm
msg = '\nTime : {:.2f}s'
# 최종 결과 출력 (오 이렇게도 format 가능)
print(msg.format(end_tm))	# Time : 17.54s

대략 2배의 시간이 걸립니다. 만약 선후관계가 중요한 작업이라면 비동시성 프로그래밍을 하거나 await를 걸어주는 등 대기가 필요할 수도 있겠지만, 각자 누적해서 합하는 건 선후 관계도 없고 아무런 연관이 없는 작업입니다. 이런 작업들은 비동기 프로그래밍을 하는 게 시간 아끼는 좋은 방법이겠죠?

 

 

 

 

7-2. asyncIO 사용법 (비동기 I/O 코루틴 작업)

asyncIO는 코루틴을 확장한 문법입니다. 비동기 I/O 코루틴 작업인데요. 앞서 말씀드렸듯 비동기 프로그래밍은 네트워크, 데이터베이스, 웹크롤링 등에 특히 좋습니다. asyncIO를 쓸 때는 사용할 함수를 비동기로 구현해야 의미가 있습니다. 함수에 async 키워드를 붙여봐야 비동기로 구현되지 않았다면 쓸모가 없습니다.

 

asyncIO를 사용하려면 먼저 가상환경을 하나 만들어 asyncio를 설치해주는 것이 좋습니다. 가상환경을 설치해서 쓰는 장점에 대해서는 이전에 설명했었죠? 외부 환경과 꼬이지 않도록 따로 작업실을 만들어 주는 것과 같은 이치입니다.

 

$ python -m venv [directory_name]

$ cd [dir_name]/Scripts
$ activate
$ python -m pip install --upgrade pip
$ pip install asyncio
$ pip install beautifulsoup4
$ pip list

가상환경을 만들고, pip을 업그레이드해준 다음 asyncio와 beautifulsoup4를 설치합니다. 그런 다음 사용하는 에디터에서 파이썬 인터프리터를 해당 가상환경 Lib 내의 python.exe로 설정해 주면 됩니다. 이 가상환경으로 인터프리터를 설정하지 않으면 'No module named bs4' 등의 에러가 날 수 있으니 주의해 주시기 바랍니다.

먼저, asyncio를 사용하고 시간을 재고, 웹사이트를 여는 작업을 하기 위해 여러 가지 모듈을 import해 줍시다. 물론 futures도 import해주고요.

 

import asyncio
import timeit
from urllib.request import urlopen  # Block 함수임
from concurrent.futures import ThreadPoolExecutor
import threading

# 실행 시작 시간
start = timeit.default_timer()

# 서비스 방향이 비슷한 사이트로 실습 권장 (ex) 게시판성 커뮤니티)
urls = [
  'https://daum.net', 'https://naver.com', 
  'https://trello.com', 'https://tistory.com',
  'https://wemakeprice.com/'
]

urls는 요청을 보낼 웹페이지의 주소를 담은 리스트입니다.

asyncio를 사용하려면 def 앞에 async 키워드를 넣으면 됩니다. 그리고 대기해야 하는 작업 앞에 await를 붙이면 됩니다. 자바스크립트의 async-await 문법과 상당히 유사하죠?

 

async def fetch(url, executor):
  # 스레드명 출력
  print('Thread Name : ', threading.current_thread().getName(), 'Start', url)

  # 실행
  result = await loop.run_in_executor(executor, urlopen, url)
  print('Thread Name : ', threading.current_thread().getName(), 'Done', url)

  # 결과 반환
  return result.read()[0:5]

fetch라는 비동기 함수를 만듭니다. 현재 작업 중인 스레드 이름을 출력한 다음 실행합니다. urlopen(url) 작업을 executor가 실행하도록 합니다. 이 때 이 executor는 ThreadPoolExecutor나 ProcessPoolExecutor의 반환값임을 주지하시면 됩니다. await 키워드를 붙임으로써, loop.run_in_executor 작업이 끝나고 다음 작업을 할 것을 보장하게 됩니다. 결과를 기다리지 않으면 result는 None이 될 수도 있죠. ^^ loop는 아래 main 함수를 실행하는 부분에 있으니 아래 코드를 참고하시기 바랍니다. 

다음, main 함수를 보겠습니다.

 

async def main():
  # 스레드 풀 생성
  executor = ThreadPoolExecutor(max_workers=10)

  # future 객체 모아서 gather에서 실행
  futures = [
    # 실행할 함수를 쌍으로 넣어줌. url 하나당 하나의 스레드가 필요함
    asyncio.ensure_future(fetch(url, executor)) for url in urls
  ]

  # 결과 취합 (yield 자리에 await를 넣음)
  result = await asyncio.gather(*futures)
  
  print()
  print('Result : ', result)



if __name__ == '__main__':
  # 루프 초기화
  loop = asyncio.get_event_loop()
  # 작업 완료까지 대기
  loop.run_until_complete(main())
  # 수행 시간 계산
  duration = timeit.default_timer() - start
  # 총 실행 시간
  print('Total Running Time : ', duration)

executor를 만들어 주고, futures에 url 리스트의 각 url들에 asyncio.ensure_future 함수를 실행한 결과를 리스트 컴프리헨션으로 만들어서 넣어줍니다. 이 때, ensure_future 안에 비동기 함수(async def)인 fetch가 들어갑니다.

yield를 해야 할 자리에 await를 넣어 주면 됩니다. asyncio.gather를 사용해서 futures를 언패킹한 것을 result로 받아 줍니다. 

main 함수를 실행하는 자리에서는 일단 asyncio.get_event_loop()로 루프를 받아 main 함수 작업 완료까지 대기합니다. 그럼 main 함수가 실행된 다음에야 그 다음 문장이 실행되는데요. 이 때 코드 맨꼭대기에 계산한 시작 시간과의 차이를 계산합니다. 

요건 수행 결과를 따로 적지는 않겠습니다. ^^ 

이외에도 beautifulsoup4를 약간 사용하는 부분이 있었는데, 따로 적을만큼 내용이 많지 않았기 때문에 적지 않습니다. 만약 추가로 어떻게 사용해야 하는지 궁금하다면, 강의를 참고해 보시는 게 좋겠습니다.

 


 

이로써 섹션 7까지 내용을 모두 마쳤습니다!

처음에는 할만하다 싶다가 나중에 확 난이도가 올라가면서 거듭해서 봐야 겨우 이해가 됐네요.

파이썬을 나름대로 잘 활용한다고 생각했는데 사실은 그게 아니었던 모양입니다. ^^;

비동기 프로그래밍은 우리의 소중한 작업 시간을 아껴주고 작업 능률을 올릴 수 있는 좋은 방법입니다. 잘못 이해하고 쓰면 효과를 별로 보지 못할 수도 있지만요.

자바스크립트에서는 비동기 프로그래밍을 자주 사용했는데 이 기회에 파이썬 웹크롤링을 하거나 헤비한 작업을 할 때는 비동기 프로그래밍이 가능한지 검토하고 프로그래밍해야겠습니다.

 

나머지 자세한 내용은 강의에서 확인해 보시기 바랍니다. ㅎㅎ

 

 

이 포스팅은 '인프런 리프' 활동의 일환으로, <우리를 위한 프로그래밍 : 파이썬 중급> 강의를 무료로 제공받고 수강한 후기를 담고 있습니다.

 

 

 

 

References

 

 

다음 글 보기

 

[인프런 리프 2기] 리프 활동을 마치며

한 달간의 리프 활동을 무사히 마쳤습니다. :) 오늘은 한 달간 리프 활동을 마친 후기를 적어보고자 합니다. 강의 소개를 간단히 하자면, 제가 들은 강의는 우리를 위한 프로그래밍 : 파이썬 중급

dev-dain.tistory.com

 

728x90
반응형
0 Comments
댓글쓰기 폼