I have set up a Pyramid server with one method that sends tasks through Celery (in a distributed fashion):
celery_app = Celery('mycelery', broker='', backend='')
@view_config(route_name='run', request_method='POST')
def run_task(request):
req = request.json_body
task = celery_app.signature(
req['mytaskname'],
kwargs={'data': req['mykey']},
queue='jobs'
).delay()
if __name__ == '__main__':
with Configurator() as config:
config.add_route('run', '/v2/run')
config.scan()
app = config.make_wsgi_app()
server = make_server(API_HOST, int(API_PORT), app)
info(f'server at {API_HOST}:{API_PORT}')
try:
server.serve_forever()
except KeyboardInterrupt:
pass
Note how the task name is included in the request body, which implies that the user need to known beforehand which are the available task names. Now, I just need to set up the worker on a different machine and point to the same Celery broker instance:
celery_app = Celery('mycelery', broker='', backend='')
@celery_app.task(name='example_task_name', bind=True)
def my_task(self, data):
pass
I want to be able to set up (and kill) workers at any time on different machines, but the server should list me all available task names which I can use in my application with a brief description of them. I thought about using the decorator worker_ready.connect to send a signal to the API, but I am not able to successfully integrate Celery with Pyramid:
@worker_ready.connect
def register_worker(sender, **k):
celery_app.signature(
'join',
kwargs={'task_name': 'example_task_name', 'task_description': 'example task'},
queue='events'
).delay()
Any ideas?