파이썬 비동기 처리

파이썬 비동기 처리

Published
August 25, 2024
Last updated
Last updated September 7, 2024
Tistory
Category
💡
이 포스트는 현재 작성중입니다

비동기&동기 그리고 블로킹&논블로킹

JS에서의 비동기 처리

파이썬에서의 비동기 처리

async, await, Coroutine, asyncio

병렬 실행 asyncio.gather()

 

requests와 aiohttp

Django에서의 비동기 처리

asgiref.sync_to_async()

Queryset의 a~ 동작

Celery

 

실습: 사진 다운로드 비동기 처리

class Command(BaseCommand): help = "google_place_id를 이용해 모든 음식점의 사진을 업데이트 합니다" def handle(self, *args, **options): restaurants = Restaurant.objects.all() for restaurant in restaurants: photos = fetch_gmap_photos(restaurant.google_place_id) self.process_photos(restaurant, photos) def process_photos(self, restaurant, photos): for photo_data in photos: try: photo_uri = fetch_gmap_photo_url(photo_data.get("name")) image_name, image_file = self.fetch_photo_file(photo_uri) memo = "" self.create_review_photo(restaurant, memo, image_name, image_file) self.stdout.write( self.style.SUCCESS( f"사진을 추가했습니다({restaurant.__str__()}): [{photo_data.get('widthPx')}x{photo_data.get('heightPx')}]{memo}, {photo_uri}" ) ) except Exception as e: self.stdout.write( self.style.ERROR( f"사진을 추가하는 중 오류가 발생했습니다({restaurant.__str__()})" ) ) self.stdout.write(self.style.ERROR(str(e))) self.stdout.write( self.style.SUCCESS( f"사진{len(photos)}개를 추가했습니다({restaurant.__str__()})" ) ) def fetch_photo_file(self, photo_url) -> Tuple[str, ContentFile]: response = requests.get(photo_url) if response.status_code != 200: raise Exception(response.json()) content_disposition = response.headers.get("Content-Disposition") image_name = content_disposition.split("filename=")[-1].strip('"') return (image_name, ContentFile(response.content)) def create_review_photo(restaurant, memo, image_name, image_file) -> None: review_photo = ReviewPhoto.objects.create( restaurant=restaurant, review=None, memo=memo, ) review_photo.photo_file.save(image_name, image_file, save=True) def fetch_gmap_photo_url(photo_id: str) -> str: """ Places API의 장소 사진 (신규)를 사용하여 사진의 url을 검색합니다. """ if not photo_id: raise ValueError("photo_id is empty") MAX_WIDTH = 1920 # 최대 가능 너비 4800 url = f"https://places.googleapis.com/v1/{photo_id}/media" response = requests.get( url, params={ "key": GOOGLE_MAPS_API_KEY, "maxWidthPx": MAX_WIDTH, "skipHttpRedirect": True, }, ) if response.status_code != 200: raise Exception(response.json()) photo_uri = response.json().get("photoUri", "") return photo_uri

1. fetch_gmap_photo_url()을 async 함수로 변경

def fetch_gmap_photo_url(photo_id: str) -> str: if not photo_id: raise ValueError("photo_id is empty") MAX_WIDTH = 1920 # 최대 가능 너비 4800 url = f"https://places.googleapis.com/v1/{photo_id}/media" response = requests.get( url, params={ "key": GOOGLE_MAPS_API_KEY, "maxWidthPx": MAX_WIDTH, "skipHttpRedirect": True, }, ) if response.status_code != 200: raise Exception(response.json()) photo_uri = response.json().get("photoUri", "") return photo_uri
위 함수에서 비동기 처리가 필요한 부분은 requests.get() 부분이다. 파이썬에서 requests를 비동기로 대체하려면 aiohttp라는 라이브러리를 사용할 수 있다.
async def fetch_gmap_photo_url(photo_id: str) -> str: if not photo_id: raise ValueError("photo_id is empty") MAX_WIDTH = 1920 # 최대 가능 너비 4800 url = f"https://places.googleapis.com/v1/{photo_id}/media" response = requests.get( url, params={ "key": GOOGLE_MAPS_API_KEY, "maxWidthPx": MAX_WIDTH, "skipHttpRedirect": True, }, ) if response.status_code != 200: raise Exception(response.json()) photo_uri = response.json().get("photoUri", "") return photo_uri
그리고 async fetch_gmap_photo_url() 비동기 함수를 호출하기 위해 asyncio.run()을 사용해준다.
def process_photos(self, restaurant, photos): ... photo_uri = asyncio.run(fetch_gmap_photo_url(photo_data.get("name")))
 

photo_uri들을 일괄적으로 가져오기

이렇게만 변경한다면 전체 절차는 어짜피 동기적으로 흘러가기 때문에 성능상 이점은 없다. 최종적인 코드는 모두 비동기로 묶어 process_photos 자체를 비동기적으로 실행하겠지만, 그 전에 비동기 동시 처리시의 속도 향상을 비교해보기 위해 코드를 간단하게 수정하여 테스트 해보았다.
async def fetch_all_photo_urls(self, photos): tasks = [fetch_gmap_photo_url(photo_data.get("name")) for photo_data in photos] return await asyncio.gather(*tasks) # photo_uri들을 동기적으로 일괄 가져오기 print("동기적으로 가져오기 시작 -----") photo_urls = [ fetch_gmap_photo_url_sync(photo_data.get("name")) for photo_data in photos ] print("동기적으로 가져오기 끝 -----") # photo_uri들을 비동기적으로 일괄 가져오기 print("비동기적으로 가져오기 시작 -----") photo_urls = asyncio.run(self.fetch_all_photo_urls(photos)) print("비동기적으로 가져오기 끝 -----")
 
결과는 좌측 영상에서 확인할 수 있다.
네트워크 작업에서 latency는 CPU 연산보다는 통신 작업에서 대부분 발생하기 때문에 기다리는 시간을 동시에 진행하게 만듦으로서 속도가 매우 빨라지는 것을 확인할 수 있다.
특히 사진과 같이 무거운것을 가져오는 것도 아니고, 단순히 url string만을 가져오는 요청도 큰 차이가 나는 것을 볼 수 있다. 그렇기에 이후 진행할 사진 다운로드는 이것보다 더 큰 차이가 나기에, 네트워크 요청에서의 비동기 처리의 중요성을 알 수 있다.