🍜 Python asyncio - 비동기 asyncio.gather / asyncio.create_task 응용편
1️⃣ await asyncio.gather(*tasks)만 사용하는 경우
레스토랑의 요리사들이 각 요리를 준비하고 모든 요리가 다 완성된 후 한꺼번에 손님에게 서빙한다.
- 메뉴: 샐러드, 파스타, 스테이크
import asyncio
import time
async def prepare_dish(dish):
print(f"Starting to prepare {dish}...")
await asyncio.sleep(2) # 준비 시간
print(f"{dish} is ready!")
return f"{dish} is ready!"
async def main():
# 메뉴 주문
orders = ["Salad", "Pasta", "Steak"]
tasks = [prepare_dish(dish) for dish in orders]
# 모든 요리가 완성될 때까지 기다렸다가 서빙
results = await asyncio.gather(*tasks)
print("Serving all dishes at once:")
for result in results:
print(result)
# 실행
asyncio.run(main())
실행 결과 (로그)
Starting to prepare Salad...
Starting to prepare Pasta...
Starting to prepare Steak...
Salad is ready!
Pasta is ready!
Steak is ready!
Serving all dishes at once:
Salad is ready!
Pasta is ready!
Steak is ready!
2️⃣ asyncio.create_task + await asyncio.gather 함께 사용하는 경우
요리사들이 요리를 독립적으로 준비하고 모든 요리가 완성된 후 한꺼번에 손님에게 서빙한다.
async def prepare_dish(dish):
print(f"Starting to prepare {dish}...")
await asyncio.sleep(2) # 준비 시간
print(f"{dish} is ready!")
return f"{dish} is ready!"
async def main():
orders = ["Salad", "Pasta", "Steak"]
tasks = [asyncio.create_task(prepare_dish(dish)) for dish in orders]
# 요리가 끝났을 때 한꺼번에 결과를 확인
results = await asyncio.gather(*tasks)
print("Serving all dishes at once:")
for result in results:
print(result)
asyncio.run(main())
실행 결과(로그)
Starting to prepare Salad...
Starting to prepare Pasta...
Starting to prepare Steak...
Salad is ready!
Pasta is ready!
Steak is ready!
Serving all dishes at once:
Salad is ready!
Pasta is ready!
Steak is ready!
3️⃣ asyncio.create_task만 사용하는 경우
요리가 준비될 때마다 즉시 손님에게 서빙된다. 모든 요리가 끝날 때까지 기다리지 않고, 준비된 요리부터 바로바로 서빙한다.
async def prepare_and_serve_dish(dish):
print(f"Starting to prepare {dish}...")
await asyncio.sleep(2) # 준비 시간
print(f"{dish} is ready! Serving {dish} now.")
return f"{dish} is served!"
async def main():
orders = ["Salad", "Pasta", "Steak"]
tasks = [asyncio.create_task(prepare_and_serve_dish(dish)) for dish in orders]
# 각 요리가 준비될 때마다 서빙됨
for task in tasks:
await task
asyncio.run(main())
실행 결과(로그)
Starting to prepare Salad...
Starting to prepare Pasta...
Starting to prepare Steak...
Salad is ready! Serving Salad now.
Pasta is ready! Serving Pasta now.
Steak is ready! Serving Steak now.
요약
- await asyncio.gather(*tasks): 모든 작업이 끝날 때까지 기다렸다가 한꺼번에 결과를 확인
- asyncio.create_task + await asyncio.gather: 작업들을 개별적으로 시작하여, 모든 작업이 완료되면 한꺼번에 결과를 확인
- asyncio.create_task만 사용: 작업이 끝날 때마다 바로바로 결과를 확인하며 진행
1️⃣ ㆍ2️⃣ 의경우 로그가 같다, 성능도 같을까?
예시함수
async def fetch_and_chunk_file(session, url, output_folder, chunk_size=500000):
try:
async with session.get(url, timeout=7200) as response:
response.raise_for_status()
content_encoding = response.headers.get("Content-Encoding", "")
decompressed_data = await response.read()
if content_encoding == "gzip":
try:
with gzip.GzipFile(fileobj=BytesIO(decompressed_data)) as gz:
decompressed_data = gz.read()
except gzip.BadGzipFile:
logger.warning("Failed to decompress as gzip.")
decoded_data = None
for enc in ["cp949"]:
try:
decoded_data = decompressed_data.decode(enc)
logger.info(f"Successfully decoded with encoding: {enc}")
break
except UnicodeDecodeError:
logger.warning(f"Failed to decode with encoding: {enc}")
if decoded_data is None:
raise UnicodeDecodeError(
"Unable to decode the data with any of the attempted encodings."
)
lines = decoded_data.splitlines()
chunk = []
for i, line in enumerate(lines):
chunk.append(line)
if (i + 1) % chunk_size == 0:
await save_chunk_as_file(chunk, output_folder, i // chunk_size)
chunk = []
if chunk:
await save_chunk_as_file(chunk, output_folder, len(lines) // chunk_size)
except aiohttp.ClientError as e:
logger.error(f"Error fetching file from {url}: {e}")
raise
위함수는 청크 50만 로우마다 디코드된 데이터를 save_chunk_as_file 함수로 보내서 txt로 저장하려는 예시함수다.
이걸 응용해보자
1️⃣ await asyncio.gather(*tasks) 만
tasks = [
fetch_and_chunk_file(session, URLS[name], f"api/ssg/txtToexcel/data/{name}")
for name in url_keys
if name in URLS
]
await asyncio.gather(*tasks)
2️⃣ asyncio.create_task + await asyncio.gather
tasks = [
asyncio.create_task(
fetch_and_chunk_file(session, URLS[name], f"api/ssg/txtToexcel/data/{name}")
)
for name in url_keys
if name in URLS
]
await asyncio.gather(*tasks)
차이점은 asyncio.create_task와 함께 await asyncio.gather 를 사용하는 경우 인지는 이미 파악하셨을것같다.
asyncio.create_task로 독립적으로 실행되므로, 작업 완료 시점에서 바로 메모리가 회수되고 다른 백그라운드 작업을 더 쉽게 시작할 수 있다.
fetch_and_chunk_file 함수가 청크 단위로 데이터를 처리하고, 각 청크마다 파일에 저장하는 구조라면, 청크가 처리될 때마다 메모리가 빠르게 반환될 수 있기 때문에 create_task를 사용하여 백그라운드에서 실행하면 메모리 사용을 최적화하는 데 도움이 되기 때문이다.
전체 작업 완료를 기다리기보다, 작업이 끝나는 순서대로 백그라운드에서 메모리를 회수하게 되므로, 대규모 데이터에서 더 유리할 수 있다.
asyncio.create_task로 작업을 생성하면 각 작업이 독립적으로 실행되고, 실행이 완료될 때 메모리를 해제 할 수 있다. await asyncio.gather(*tasks)는 모든 작업이 완료될 때까지 기다리지만, 개별 작업은 fetch_and_chunk_file에서 chunk_size로 데이터를 한 번에 처리하고 청크를 파일에 저장하면서 메모리가 반환된다.
응용편 끝.