IT/django

django-celery-beat으로 periodic tasks 동적으로 제어하기

seyeonHello 2021. 1. 21. 14:51

celery?

task queue를 만들어 필요한 일을 등록 후 백그라운드에서 처리가 가능합니다.

celery는 비동기 처리 방식을 사용하기 때문에, 사용자가 해당 작업을 기다리지 않고 다른 작업을 진행할 수 있어 사용자의 속도 측면에서 유리합니다.

→ 또한 주기적으로 반복적인 일을 수행해야하는 경우, 스케줄러 celery beat를 이용합니다. 

 

시작하기

$ pip3 install django-celery-beat

 

settings.py

INSTALLED_APPS = [
    'django_celery_beat',
]


'''
LANGUAGE_CODE = 'en-us'
TIME_ZONE = 'UTC'
USE_I18N = True
USE_L10N = True
USE_TZ = True
'''

 

$ python3 manage.py migrate

 

celery.py

from __future__ import absolute_import, unicode_literals
import sys
from kombu.utils import encoding
sys.modules['celery.utils.encoding'] = encoding

import os, django
from celery import Celery
from datetime import timedelta
from django.conf import settings
from celery.schedules import crontab

os.environ.setdefault('DJANGO_SETTINGS_MODULE', '프로젝트이름.settings')

app = Celery('프로젝트이름', include=['프로젝트이름.tasks'])
app.config_from_object('django.conf:settings', namespace='CELERY')

app.conf.update(
    CELERY_TASK_SERIALIZER='json',
    CELERY_ACCEPT_CONTENT=['json'],
    CELERY_RESULT_SERIALIZER='json',
    CELERY_TIMEZONE='Asia/Seoul',
    CELERY_ENABLE_UTC=False,
    CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler',
)

django.setup()


app.autodiscover_tasks()
if __name__ == '__main__':
    app.start()

 

tasks.py

from __future__ import absolute_import, unicode_literals
from .celery import app

from django.conf import settings

import django
django.setup()

from celery import shared_task

@shared_task
def test_task():
    print("hello world")

 

한 앱의 view.py

from django_celery_beat.models import PeriodicTask, IntervalSchedule #model를 import한다.


def start_task(request):
    schedule, created = IntervalSchedule.objects.get_or_create(every=10,period=IntervalSchedule.SECONDS,)
    if PeriodicTask.objects.filter(name='test_task').exists(): #'test_task'가 등록되어 있으면,
    	p_test=PeriodicTask.objects.get(name='test_task')
        p_test.enabled=True #실행시킨다.
        p_test.interval=schedule
        p_test.save()
    else: #'test_task'가 등록되어 있지 않으면, 새로 생성한다
        PeriodicTask.objects.create(
        interval=schedule,  #앞서 정의한 schedule       
        name='test_task',          
        task='bracken.tasks.test_task',
        )

 

→ 마찬가지로 task를 중지시키고 싶다면, p_test.enabled=False로 변경하면 됩니다. 저같은 경우, REST 프레임워크를 통해 enable 상태, 주기등을 변경하였습니다. 

 

admin페이지에서도 확인할 수 있습니다.

맨 첫번째 행 test_task: every 10 seconds에 들어가 수정 또한 가능합니다.

 

마지막으로 실행해보겠습니다.

$ celery -A 프로젝트이름 worker --beat --loglevel=info

 

 


참고 사이트

https://www.cnblogs.com/-wenli/p/13746509.html

 

Django 使用django-celery-beat实现动态添加周期性任务 - -零 - 博客园

前期准备 1.beat插件安装 pip3 install django-celery-beat 2.注册APP INSTALLED_APPS = [ .... 'django_celery_b

www.cnblogs.com