셀러리의 대기열에 있는 작업 목록 검색
대기열에서 아직 처리되지 않은 작업 목록을 검색하려면 어떻게 해야 합니까?
편집: 대기열에 있는 작업 목록을 보려면 다른 답변을 참조하십시오.
당신은 여기를 봐야 합니다: 셀러리 가이드 - 근로자 검사
기본적으로 다음과 같습니다.
my_app = Celery(...)
# Inspect all nodes.
i = my_app.control.inspect()
# Show the items that have an ETA or are scheduled for later processing
i.scheduled()
# Show tasks that are currently active.
i.active()
# Show tasks that have been claimed by workers
i.reserved()
원하는 항목에 따라
Celery+Django를 사용하여 가상 환경에서 터미널에서 직접 명령을 사용하거나 셀러리의 전체 경로를 사용하여 작업을 검사하는 가장 간단한 방법은 다음과 같습니다.
문서: http://docs.celeryproject.org/en/latest/userguide/workers.html?highlight=revoke#inspecting-workers
$ celery inspect reserved
$ celery inspect active
$ celery inspect registered
$ celery inspect scheduled
또한 셀러리+래빗을 사용하는 경우에도 마찬가지입니다.MQ 다음 명령을 사용하여 대기열 목록을 검사할 수 있습니다.
더 많은 정보: https://linux.die.net/man/1/rabbitmqctl
$ sudo rabbitmqctl list_queues
만약 당신이 rabbitMQ를 사용하고 있다면, 터미널에서 이것을 사용하세요:
sudo rabbitmqctl list_queues
대기 중인 작업 수가 포함된 대기열 목록을 인쇄합니다.예:
Listing queues ...
0b27d8c59fba4974893ec22d478a7093 0
0e0a2da9828a48bc86fe993b210d984f 0
10@torob2.celery.pidbox 0
11926b79e30a4f0a9d95df61b6f402f7 0
15c036ad25884b82839495fb29bd6395 1
celerey_mail_worker@torob2.celery.pidbox 0
celery 166
celeryev.795ec5bb-a919-46a8-80c6-5d91d2fcf2aa 0
celeryev.faa4da32-a225-4f6c-be3b-d8814856d1b6 0
오른쪽 열의 숫자는 대기열에 있는 작업 수입니다.위의 셀러리 큐에는 166개의 보류 중인 작업이 있습니다.
우선 순위가 지정된 작업을 사용하지 않는 경우 Redis를 사용하는 경우 이 작업은 매우 간단합니다.작업 개수를 가져오는 방법
redis-cli -h HOST -p PORT -n DATABASE_NUMBER llen QUEUE_NAME
그러나 우선순위가 지정된 작업은 redis에서 다른 키를 사용하므로 전체 그림이 약간 더 복잡합니다.전체 그림은 작업의 모든 우선순위에 대해 redis를 쿼리해야 한다는 것입니다.파이썬(및 플라워 프로젝트)에서 이것은 다음과 같습니다.
PRIORITY_SEP = '\x06\x16'
DEFAULT_PRIORITY_STEPS = [0, 3, 6, 9]
def make_queue_name_for_pri(queue, pri):
"""Make a queue name for redis
Celery uses PRIORITY_SEP to separate different priorities of tasks into
different queues in Redis. Each queue-priority combination becomes a key in
redis with names like:
- batch1\x06\x163 <-- P3 queue named batch1
There's more information about this in Github, but it doesn't look like it
will change any time soon:
- https://github.com/celery/kombu/issues/422
In that ticket the code below, from the Flower project, is referenced:
- https://github.com/mher/flower/blob/master/flower/utils/broker.py#L135
:param queue: The name of the queue to make a name for.
:param pri: The priority to make a name with.
:return: A name for the queue-priority pair.
"""
if pri not in DEFAULT_PRIORITY_STEPS:
raise ValueError('Priority not in priority steps')
return '{0}{1}{2}'.format(*((queue, PRIORITY_SEP, pri) if pri else
(queue, '', '')))
def get_queue_length(queue_name='celery'):
"""Get the number of tasks in a celery queue.
:param queue_name: The name of the queue you want to inspect.
:return: the number of items in the queue.
"""
priority_names = [make_queue_name_for_pri(queue_name, pri) for pri in
DEFAULT_PRIORITY_STEPS]
r = redis.StrictRedis(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
db=settings.REDIS_DATABASES['CELERY'],
)
return sum([r.llen(x) for x in priority_names])
실제 태스크를 가져오려면 다음과 같은 방법을 사용할 수 있습니다.
redis-cli -h HOST -p PORT -n DATABASE_NUMBER lrange QUEUE_NAME 0 -1
여기서 반환된 목록을 역직렬화해야 합니다.저의 경우 다음과 같은 작업을 수행할 수 있었습니다.
r = redis.StrictRedis(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
db=settings.REDIS_DATABASES['CELERY'],
)
l = r.lrange('celery', 0, -1)
pickle.loads(base64.decodestring(json.loads(l[0])['body']))
역직렬화에는 잠시 시간이 걸릴 수 있으며 다양한 우선 순위로 작업하려면 위의 명령을 조정해야 합니다.
백엔드에서 작업을 검색하려면 다음을 사용합니다.
from amqplib import client_0_8 as amqp
conn = amqp.Connection(host="localhost:5672 ", userid="guest",
password="guest", virtual_host="/", insist=False)
chan = conn.channel()
name, jobs, consumers = chan.queue_declare(queue="queue_name", passive=True)
Json 직렬화를 통한 Redis용 복사 붙여넣기 솔루션:
def get_celery_queue_items(queue_name):
import base64
import json
# Get a configured instance of a celery app:
from yourproject.celery import app as celery_app
with celery_app.pool.acquire(block=True) as conn:
tasks = conn.default_channel.client.lrange(queue_name, 0, -1)
decoded_tasks = []
for task in tasks:
j = json.loads(task)
body = json.loads(base64.b64decode(j['body']))
decoded_tasks.append(body)
return decoded_tasks
그것은 장고와 함께 작동합니다. 바꾸는.yourproject.celery
.
이것은 제 애플리케이션에서 효과가 있었습니다.
def get_queued_jobs(queue_name):
connection = <CELERY_APP_INSTANCE>.connection()
try:
channel = connection.channel()
name, jobs, consumers = channel.queue_declare(queue=queue_name, passive=True)
active_jobs = []
def dump_message(message):
active_jobs.append(message.properties['application_headers']['task'])
channel.basic_consume(queue=queue_name, callback=dump_message)
for job in range(jobs):
connection.drain_events()
return active_jobs
finally:
connection.close()
active_jobs
대기열의 작업에 해당하는 문자열 목록이 됩니다.
CELERY_APP_INSTANCE를 자신의 것과 교환하는 것을 잊지 마십시오.
@ashish가 여기서 그의 답변으로 저를 올바른 방향으로 안내해 주셔서 감사합니다: https://stackoverflow.com/a/19465670/9843399
셀러리 검사 모듈은 작업자의 관점에서만 작업을 인식하는 것으로 보입니다.대기열에 있는 메시지(아직 작업자가 풀링하지 않음)를 보려면 rabbitmq http api와 인터페이스하여 대기열에서 모든 종류의 정보를 검색할 수 있는 pyrabbit을 사용하는 것이 좋습니다.
예를 들어 셀러리를 사용하여 대기열 길이 검색(RabbitMQ, 장고)
대기 중인 작업을 가져올 수 있는 유일한 방법은 시작한 작업 목록을 유지하고 작업이 시작되면 목록에서 자동으로 제거되도록 하는 것입니다.
rabbitmqctl 및 list_message를 사용하면 대기 중인 작업 수에 대한 개요를 얻을 수 있지만 작업 자체는 얻을 수 없습니다. http://www.rabbitmq.com/man/rabbitmqctl.1.man.html
처리 중인 태스크가 포함되어 있지만 아직 완료되지 않은 경우 태스크 목록을 유지하고 태스크 상태를 확인할 수 있습니다.
from tasks import add
result = add.delay(4, 4)
result.ready() # True if finished
또는 CELERY_RESULT_BACKEND를 사용하여 결과를 저장하고 어떤 태스크가 없는지 확인할 수 있습니다.
제가 알기로는 셀러리는 대기열에서 대기 중인 작업을 검사하기 위한 API를 제공하지 않습니다.브로커별로 다릅니다.에는 "Redis"에서 합니다.celery
(기본값) 대기열은 다음과 같이 단순합니다.
- 중개인에게 연결
- 에 항목
celery
) list(예: LRANGE 명령)
이러한 작업은 사용 가능한 작업자가 선택하기를 기다리는 작업입니다.클러스터에서 일부 작업이 실행 중일 수 있습니다. 이러한 작업은 이미 선택되었기 때문에 이 목록에 없습니다.
특정 대기열에서 작업을 검색하는 프로세스는 브로커별로 다릅니다.
제가 내린 결론은 대기열에 있는 작업의 수를 사용하는 것입니다.rabbitmqctl
여기서 여러 차례 제안된 바와 같이선택한 사용자에게 명령 실행을 허용하려면sudo
저는 여기에 있는 지침을 따랐습니다(명령 전에 sudo를 입력해도 상관없기 때문에 프로필 부분을 편집하지 않았습니다).
는 또한의 Jamesc의 Jamesc를 잡았습니다.grep
그리고.cut
스니펫을 사용하여 하위 프로세스 호출로 마무리했습니다.
from subprocess import Popen, PIPE
p1 = Popen(["sudo", "rabbitmqctl", "list_queues", "-p", "[name of your virtula host"], stdout=PIPE)
p2 = Popen(["grep", "-e", "^celery\s"], stdin=p1.stdout, stdout=PIPE)
p3 = Popen(["cut", "-f2"], stdin=p2.stdout, stdout=PIPE)
p1.stdout.close()
p2.stdout.close()
print("number of jobs on queue: %i" % int(p3.communicate()[0]))
처음 될 때 재시도를 한 작업 코 제 를 재 트 한 거 다 리 확 음 인 하 수 있 다 니 습 문 해 할 제 결 를 여 실 처 음 행 드 때 될 사 를 도 작 어 소 한 하 경 시 는업 우 이 ▁the▁around ▁letting ▁if ▁by ▁problem ▁you ▁work ▁the ▁the ▁you ▁of ▁can ▁trigger 작 ▁a ▁then▁ainspect().reserved()
다시 시도하면 작업이 결과 백엔드에 등록되고 셀러리에서 이를 확인할 수 있습니다.작업이 수락해야 합니다.self
또는context
재시도 횟수에 액세스할 수 있도록 첫 번째 매개 변수로 사용합니다.
@task(bind=True)
def mytask(self):
if self.request.retries == 0:
raise self.retry(exc=MyTrivialError(), countdown=1)
...
이 솔루션은 브로커에 구애받지 않습니다. 즉, Rabbit를 사용하는지 여부에 대해 걱정할 필요가 없습니다.MQ 또는 Red는 태스크를 저장합니다.
편집: 테스트한 결과 이것은 부분적인 해결책에 불과하다는 것을 알게 되었습니다.예약된 크기는 작업자의 프리페치 설정으로 제한됩니다.
from celery.task.control import inspect
def key_in_list(k, l):
return bool([True for i in l if k in i.values()])
def check_task(task_id):
task_value_dict = inspect().active().values()
for task_list in task_value_dict:
if self.key_in_list(task_id, task_list):
return True
return False
와 함께subprocess.run
:
import subprocess
import re
active_process_txt = subprocess.run(['celery', '-A', 'my_proj', 'inspect', 'active'],
stdout=subprocess.PIPE).stdout.decode('utf-8')
return len(re.findall(r'worker_pid', active_process_txt))
변경에 주의하십시오.my_proj
와 함께your_proj
꽃 라이브러리를 사용할 수 있는 대기열의 태스크 수를 가져오려면 다음과 같이 단순화된 예를 제시합니다.
from flower.utils.broker import Broker
from django.conf import settings
def get_queue_length(queue):
broker = Broker(settings.CELERY_BROKER_URL)
queues_result = broker.queues([queue])
return queues_result.result()[0]['messages']
언급URL : https://stackoverflow.com/questions/5544629/retrieve-list-of-tasks-in-a-queue-in-celery
'programing' 카테고리의 다른 글
.gitconfig를 사용하여 diff 도구 구성 (0) | 2023.07.19 |
---|---|
C에서 "계속하려면 아무 키나 누르기" 기능 (0) | 2023.07.19 |
iOS 10.0 런타임 충돌에서 NSCameraUsageDescription? (0) | 2023.07.19 |
다른 최적화 수준이 기능적으로 다른 코드로 이어질 수 있습니까? (0) | 2023.07.19 |
LOCAL, BASE 또는 REMOTE 중 최종적으로 사용될 Git 파일의 버전은 무엇입니까? (0) | 2023.07.14 |