I have set up a Pyramid server with one method that sends tasks through Celery (in a distributed fashion):

from logging import info
from wsgiref.simple_server import make_server

from celery import Celery
from pyramid.config import Configurator
from pyramid.response import Response
from pyramid.view import view_config

celery_app = Celery('mycelery', broker='', backend='')

@view_config(route_name='run', request_method='POST')
def run_task(request):
    req = request.json_body
    task = celery_app.signature(
        req['mytaskname'],
        kwargs={'data': req['mykey']},
        queue='jobs'
    ).delay()
    return Response(json_body={'task_id': task.id})

if __name__ == '__main__':
    with Configurator() as config:
        config.add_route('run', '/v2/run')
        config.scan()

        app = config.make_wsgi_app()

    server = make_server(API_HOST, int(API_PORT), app)
    info(f'server at {API_HOST}:{API_PORT}')

    try:
        server.serve_forever()
    except KeyboardInterrupt:
        pass
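For reference, a client would call this endpoint with a JSON body like the one below; the task name and the value under `mykey` are placeholders, matching the keys that `run_task` reads:

```python
import json

# Hypothetical request body for POST /v2/run; 'example_task_name' and the
# list under 'mykey' are placeholders for a real task name and its input.
body = {'mytaskname': 'example_task_name', 'mykey': [1, 2, 3]}
payload = json.dumps(body)
print(payload)
```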

Note how the task name is included in the request body, which implies that the user needs to know beforehand which task names are available. Now I just need to set up the worker on a different machine and point it at the same Celery broker instance:

from celery import Celery

celery_app = Celery('mycelery', broker='', backend='')

@celery_app.task(name='example_task_name', bind=True)
def my_task(self, data):
    pass

I want to be able to set up (and kill) workers at any time on different machines, but the server should be able to list all available task names, with a brief description of each, so I can use them in my application. I thought about using the worker_ready.connect signal to notify the API when a worker comes online, but I have not been able to successfully integrate Celery with Pyramid:

from celery.signals import worker_ready

@worker_ready.connect
def register_worker(sender, **k):
    celery_app.signature(
        'join',
        kwargs={'task_name': 'example_task_name', 'task_description': 'example task'},
        queue='events'
    ).delay()
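On the receiving side, the idea is that a 'join' task on the API host records each announcement in some registry the Pyramid views can read. A minimal sketch of that registry, with the Celery wiring stripped out (the function names, the registry, and the notion of a `/v2/tasks` listing view are all hypothetical):

```python
# Hypothetical in-memory registry on the API host. In the real setup,
# 'join' would be registered as a Celery task consuming the 'events' queue.
TASK_REGISTRY = {}

def join(task_name, task_description):
    """Record a task announced by a worker via the 'join' signature."""
    TASK_REGISTRY[task_name] = task_description

def list_tasks():
    """Payload a hypothetical GET /v2/tasks view could return."""
    return [{'name': n, 'description': d}
            for n, d in sorted(TASK_REGISTRY.items())]
```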

Any ideas?

gelonida
benhid