
오랜 기간 동안 프로젝트를 진행하며 시간에 쫓기다보니, 구현한 기능들에 대해서 정리할 기회가 잘 없었던 것 같습니다. 그래서 최근에 리팩토링 중인 비동기 작업 처리 기능에 대해 정리해보고자 합니다.
Celery란?
- Celery는 분산 메세지 전달을 기반으로 하는 파이썬 오픈소스 비동기 작업 큐(task queue) 시스템으로, 분산 처리를 지원하며 작업을 백그라운드에서 처리하는 데 사용합니다.
- Celery의 핵심 키워드: Producer(작업 요청), Broker(작업 전달), Worker(작업 실행), Backend(결과 저장)

왜 도입했는가?
- 장기 실행되는 데이터 처리 파이프라인을 비동기 작업으로 실행하고, 시스템 리소스를 관리하기 위해 사용
- 프로젝트에서 워크플로우 기반 작업 실행, 작업 상태 관리에 활용

웹에서 데이터 분석 워크플로우를 정의하여 작업을 실행하는 것이 프로젝트의 주요 요구 사항이었습니다. 여러 유저들이 동시에 긴 실행 시간을 요구하는 작업을 처리하고 관리할 수 있도록 구현해야했기 때문에 비동기 작업 처리가 가능한 Celery를 도입하여 구현했습니다.
Celery 적용 방식 및 주요 코드 소개
Celery 설정
긴 실행 시간을 요구하는 작업들을 비동기로 처리해야하는 프로젝트 요구사항에 맞게 celery의 주요 설정들을 정의했습니다.
주요 설정은 다음과 같습니다.
- task_track_started=True : 해당 설정을 활성화하면 "PENDING"이 아닌 "STARTED"로 작업 상태가 기록되기 때문에, 작업이 언제 시작되는지 확인할 수 있습니다.
- task_acks_late=False : False일 경우, 작업이 시작되자마자 큐에서 제거됩니다. 즉, 작업이 중간에 실패해도 다시 실행되지 않습니다(작업이 실패하면 사용자에게 실패 원인 로그를 보여주기 위해 설정했습니다).
- broker_transport_options(visibility_timeout, confirm_publish, confirm_timeout) : 메세지 브로커(RabbitMQ)에서 작업이 사라지지 않도록 메세지 처리 및 확인 시간을 정의하고, 전송 여부에 대해 확인하도록 하는 설정했습니다.
- 이외에 broker가 붙은 여러 설정들은 긴 작업 시간에도 RabbitMQ와 Celery의 연결을 유지하여 rabbitmq time out 오류 방지를 위해 설정했는데, 이에 대해 완벽히 해결하진 못해서 조금 더 고민이 필요한 부분 같습니다.
from celery import current_app as current_celery_app
from app.common.config import settings
def create_celery():
celery_app = current_celery_app
celery_app.config_from_object(settings, namespace='CELERY')
celery_app.conf.update(task_track_started=True)
celery_app.conf.update(task_acks_late=False)
celery_app.conf.update(task_serializer='json')
celery_app.conf.update(result_serializer='json')
celery_app.conf.update(accept_content=['json'])
celery_app.conf.update(result_expires=200)
celery_app.conf.update(result_persistent=True)
celery_app.conf.update(worker_send_task_events=True)
# 긴 작업을 위한 타임아웃 설정 추가
celery_app.conf.update(broker_transport_options={
'visibility_timeout': 400000,
'confirm_publish': True,
'confirm_timeout': 60.0
})
# Task 실행 시간 제한
celery_app.conf.update(task_time_limit=86400)
celery_app.conf.update(task_soft_time_limit=86200)
# 연결 안정성 강화
celery_app.conf.update(
broker_heartbeat=0,
broker_connection_timeout=60,
broker_connection_retry=True,
broker_connection_max_retries=10,
broker_connection_retry_delay=1
)
return celery_app
프로젝트의 백엔드가 FastAPI로 구성되어 있기 때문에 Celery를 이와 통합하여 동시에 실행되도록 코드를 작성했습니다.
실행 컨테이너(Docker)는 나뉘어져 있지만 FastAPI 어플리케이션이 실행되면 Celery 인스턴스가 생성되도록 의도했습니다.
from fastapi import FastAPI
from app.common.config import settings
from app.common.utils.celery_utils import create_celery
app = FastAPI(
title=settings.PROJECT_NAME
)
app.celery_app = create_celery()
celery의 메세지 브로커인 RabbitMQ와의 연결과 작업 결과를 저장하는 백엔드인 PostgreSQL과의 연결을 설정하고, 작업 전용 큐를 생성하여 어플리케이션에서 실행하는 작업은 'workflow_task'로 라우팅 되도록 설정했습니다.
from kombu import Queue
from os import environ
from pydantic import BaseSettings
class Settings(BaseSettings):
CELERY_BROKER_URL: str = environ.get("CELERY_BROKER_URL", "amqp://guest:guest@rabbitmq:5672//")
CELERY_RESULT_BACKEND: str = environ.get("CELERY_RESULT_BACKEND", "redis://localhost:6379/0")
CELERY_TASK_QUEUES: list = (
Queue("celery"),
Queue("workflow_task"),
)
CELERY_TASK_ROUTES = (route_task,)
settings = Settings()
Celery 워커를 실행하면서 주기적으로 시스템 리소스를 모니터링하여 일정 기준 이상 사용되면 큐를 중지, 재개하도록 했습니다.
해당 코드는 시스템 리소스를 관리할만한 마땅한 방안이 떠오르지 않아 임시로 적용했었습니다... 적절한 개선 방안이 있을 시 조언해주시면 감사하겠습니다..!
import psutil
import GPUtil
import time
from celery import current_app as celery_app
CPU_USAGE_LIMIT = 85
MEMORY_USAGE_LIMIT = 90
GPU_USAGE_LIMIT = 90
def check_system_usage():
while True:
cpu_usage = psutil.cpu_percent(interval=1)
memory_usage = psutil.virtual_memory().percent
gpus = GPUtil.getGPUs()
gpu_usage = max((gpu.load * 100 for gpu in gpus), default=0)
if cpu_usage >= CPU_USAGE_LIMIT or memory_usage >= MEMORY_USAGE_LIMIT or gpu_usage >= GPU_USAGE_LIMIT:
print("⚠️ 리소스 초과")
celery_app.control.cancel_consumer("workflow_task")
else:
print("✅ 정상 상태")
celery_app.control.add_consumer("workflow_task")
time.sleep(5)
celery 컨테이너를 실행하는 Dockerfile에 정의되어 있는 실행 명령어입니다.
현재는 단일 워커만 실행하도록 설정했지만 추후 Task를 처리할 큐를 확장함에 따라 Worker도 확장할 계획입니다.
CMD ["python", "-m", "celery", "-A", "app.main.celery", "worker", "-n", "worker@%h", "--loglevel=info", "-E", "-Q", "workflow_task"]
Celery 작업 실행
FastAPI 어플리케이션에서 Celery Task가 실행되는 코드입니다.
해당 코드를 통해 API는 즉시 응답을 반환하지만 비동기적(apply_async)으로 백그라운드에서 Celery Task가 실행됩니다.
from fastapi import APIRouter
router = APIRouter()
@router.post("/compile")
def compileWorkflow():
...
# Celery 작업 실행
process_task = process_data_task.apply_async(
args=[current_user.username, user_snakefile_path, plugin_dependency_path, target_list],
kwargs={'user_id': current_user.id, 'workflow_id': workflow.id},
ignore_result=False
)
...
커스텀 Task 클래스를 정의하여 Task의 시점(before_start, after_return) 혹은 상태(on_success, on_failure, on_revoke)에 따라 Task 정보를 데이터베이스에 저장하도록 했습니다.
추가로 MyRequest를 통해 Task 실패 시, 예외 처리를 강화하여 확실한 실패 처리를 기록하도록 의도했습니다
class MyRequest(Request):
"""Custom request to detect RuntimeError and ensure task failure is logged correctly."""
def on_failure(self, exc_info, send_failed_event=True, return_ok=False):
super().on_failure(exc_info, send_failed_event=send_failed_event, return_ok=return_ok)
exception = exc_info.exception
if isinstance(exception, RuntimeError):
logger.warning(f"RuntimeError detected in task {self.task.name}: {exception}")
else:
logger.warning(f"Failure detected in task {self.task.name}: {exception}")
class MyTask(Task):
Request = MyRequest # Custom Request class 적용
def before_start(self, task_id, args, kwargs):
start_time = datetime.now()
print(f'Task {task_id} started at {start_time}')
user_id = kwargs.get('user_id')
workflow_id = kwargs.get('workflow_id')
start_task(user_id, task_id, workflow_id, start_time)
def on_success(self, retval, task_id: str, args, kwargs):
end_time = datetime.now()
print(f'Task {task_id} completed at {end_time}, return value: {retval}')
user_id = kwargs.get('user_id')
end_task(user_id, task_id, end_time, status='SUCCESS')
def on_failure(self, exc, task_id: str, args, kwargs, einfo):
"""Ensure the failure is logged and state is correctly updated."""
logger.error(f"Task {task_id} failed due to {exc}")
end_time = datetime.now()
print(f'Task {task_id} failed at {end_time}, error: {exc}')
user_id = kwargs.get('user_id')
end_task(user_id, task_id, end_time, status='FAILURE')
def on_revoke(self, task_id: str, kwargs, terminated, signum, expired):
end_time = datetime.now()
print(f'Task {task_id} revoked at {end_time}')
user_id = kwargs.get('user_id')
end_task(user_id, task_id, end_time, status='REVOKED')
def after_return(self, status, retval, task_id, args, kwargs, einfo):
print('----------------------------------------')
print(f'Task {task_id} returned with status {status}, return value: {retval}')
print('----------------------------------------')
Celery Task를 @shared_task 데코레이터를 사용해 정의했습니다.
주요 설정은 아래와 같습니다.
- bind=True : Task 인스턴스(self)를 바인딩하여 self.~~ 형식의 메서드 사용 가능(예를 들어 self.update_state()로 직접 Task 상태를 코드 실행 중 바꿀 수 있습니다).
- base=MyTask: MyTask 클래스를 상속하여 커스텀 Task 이벤트를 발생.
- name="workflow_task:process_data_task": 특정 큐(앞서 정의한 workflow_task 큐)에서 실행될 수 있도록 네임스페이스 지정.
@shared_task(bind=True, base=MyTask, name="workflow_task:process_data_task")
def process_data_task(self, username: str, snakefile_path: str, plugin_dependency_path: str,
targets: list, user_id: int, workflow_id: int):
...
result = snakemakeProcess(targets, snakefile_path)
if result["returncode"] != 0:
error_message = f"Snakemake process failed with error:\n{result.get('stderr', 'No error message')}"
self.update_state(state="FAILURE", meta={"error": error_message})
raise RuntimeError(error_message)
else:
return {"status": "Success", "message": "Processing complete"}
앞으로의 개선 방향
현재 프로젝트에서 구현한 Celery 기반의 작업 처리 시스템은 긴 실행 시간을 가진 작업을 비동기 방식으로 수행할 수 있도록 설계했지만, 몇 가지 보완점을 정리해봤습니다.
1. RabbitMQ Timeout 오류 방지 설정 부족
- 장기 실행 작업이 많아질 경우 RabbitMQ와 Celery 간 연결이 끊어질 가능성이 있음.
- 메세지 브로커 관련 설정 조정을 통해 해결 가능한지 검토해봐야 함.
2. 시스템 리소스 관리 최적화 부족
- 현재 CPU, 메모리, GPU 사용량 모니터링이 가능하지만, 리소스 초과 시 효율적으로 워커를 제어하는 기능이 부족함.
- 동적 작업 할당 및 워커 자동 확장 기능을 고려해볼 수 있음.
3. 작업 대기열 관리 기능 부족
- 모든 작업이 동일한 큐에서 실행되므로 우선순위 기반 태스크 처리 방식이 필요함.
- 작업 유형에 따라 다른 큐를 사용하는 전략을 도입을 고려해볼 수 있음.
프로젝트 코드와 같이 기능들을 정리하다보니 Celery에 대해서 개념 정리가 되면서도 부족한 점들이 잘 보이네요..! 다음 글에서는 보완점을 개선한 결과를 다뤄보겠습니다.
감사합니다.
'백엔드' 카테고리의 다른 글
| Docker 기반 플러그인 아키텍처 설계 (0) | 2026.03.29 |
|---|---|
| Celery 비동기 시스템 개선기: 리소스 기반 동시성 제어 (0) | 2026.03.28 |
| Celery 비동기 시스템 개선기: Timeout 오류 해결 (0) | 2026.03.28 |
| Alembic을 이용한 DB 테이블 생성 레거시 개선하기 (0) | 2026.03.25 |
| Docker로 GPU 환경 구성하기 (0) | 2025.03.30 |