HTTP Gateway: Invoking Celery Tasks from Java (Non-Python Application) – Part #3

An HTTP gateway is well suited to invoking Celery tasks and polling their status from non-Python languages such as Java. To use it, we first have to set up a Django application that handles all incoming requests for task invocation and status polling. As a first step, create a Python virtual environment named env (this is not mandatory, but I prefer it because it does not affect the Python packages already installed on the machine).

Step 1:
Activate the virtual environment and install the required libraries, such as Django and django-celery (djcelery).

Step 2:
Copy the Django app for the HTTP gateway into the activated virtual environment, env. (I have uploaded the code for this app to GitHub: https://github.com/nishijayaraj/celery-HTTPgateway)

This app defines all of its tasks in the tasks.py file. Now run the app from the activated virtual environment using the following command:
python manage.py runserver

With this, our Django app starts at localhost:8000, to which we can send REST API calls. You may get some "module not found" errors at this point. If you do, install the missing packages and libraries in the activated virtual environment and then run the app again.

The structure of the entire app is shown in the image below:
[Image: screenshot of the app's directory structure, taken 2016-07-14]

Now we need workers to execute the tasks. Open a terminal and run the following command from the root directory of our application (env/celery-HTTPgateway):

celery worker -A tasks --loglevel=INFO

The worker then lists all registered tasks. We should see two tasks:
1. tasks.hello_world
2. UploadTask

Now let’s see how we can invoke a Celery task and poll its status using REST APIs. These REST APIs can be called from any programming language using its native HTTP libraries.

For the sake of simplicity, I am using the Linux curl command here to simulate REST API calls.
(The trailing / at the end of the URL is mandatory for these commands to work.)

Open a terminal and run the following command:

curl -X GET http://localhost:8000/apply/tasks.hello_world/

You should then get JSON data as the response:

{"ok": "true", "task_id": "0fc2150e-b321-4cc6-aaef-b1ce9b30e7fe"}

The response contains the id of the invoked task, which can be used to track its status.
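A client only has to build this URL and read task_id out of the reply. Below is a minimal sketch in Python, chosen for brevity; a Java client would do the same with its own HTTP and JSON libraries. The names GATEWAY, apply_url, parse_apply_response and invoke are illustrative choices of mine and are not part of the gateway app.

```python
import json
from urllib.request import urlopen

GATEWAY = "http://localhost:8000"  # assumed address of the Django gateway


def apply_url(task_name, base=GATEWAY):
    """Build the invocation URL; the trailing slash is required."""
    return "{}/apply/{}/".format(base, task_name)


def parse_apply_response(body):
    """Return the task id from the gateway's JSON reply, or raise."""
    data = json.loads(body)
    # The sample reply carries "ok" as the string "true"; a JSON boolean
    # is accepted too, in case a different gateway version sends one.
    if data.get("ok") not in ("true", True):
        raise RuntimeError("gateway rejected the task: {!r}".format(data))
    return data["task_id"]


def invoke(task_name):
    """Send the GET request (needs a running gateway) and return the task id."""
    with urlopen(apply_url(task_name)) as resp:
        return parse_apply_response(resp.read().decode("utf-8"))
```

Calling invoke("tasks.hello_world") against a running gateway should return the same kind of id as the curl command above.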

Status Polling:

curl -X GET http://localhost:8000/0fc2150e-b321-4cc6-aaef-b1ce9b30e7fe/status/

Response:
{"task": {"status": "SUCCESS", "result": "Hello world………", "id": "0fc2150e-b321-4cc6-aaef-b1ce9b30e7fe"}}
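The status reply nests everything under a "task" key, so a client has to unwrap it before looking at the state. Here is a small sketch of that step in Python; status_url, parse_status and fetch_status are hypothetical helper names, and the base URL is assumed to be the local development server used above.

```python
import json
from urllib.request import urlopen

GATEWAY = "http://localhost:8000"  # assumed address of the Django gateway


def status_url(task_id, base=GATEWAY):
    """Build the status URL for a task id; the trailing slash is required."""
    return "{}/{}/status/".format(base, task_id)


def parse_status(body):
    """Return (status, result) from the gateway's status reply."""
    task = json.loads(body)["task"]
    return task["status"], task["result"]


def fetch_status(task_id):
    """Query the gateway (must be running) for the current status of a task."""
    with urlopen(status_url(task_id)) as resp:
        return parse_status(resp.read().decode("utf-8"))
```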

These APIs also report custom states, something the Celery Flower APIs fail to do most of the time. A task named UploadTask is included in tasks.py to showcase this feature. To see it, we first invoke the task and then track its status.

Task invocation:

curl -X GET http://localhost:8000/apply/tasks.UploadTask/
Response:
{
    "ok": "true",
    "task_id": "cc51e093-372f-42c1-8344-c1def70c544a"
}

The status of the above task can be checked as follows:

curl -X GET http://localhost:8000/cc51e093-372f-42c1-8344-c1def70c544a/status/
Response:
{
    "task": {
        "status": "PROGRESS",
        "result": {
            "progress": 0
        },
        "id": "cc51e093-372f-42c1-8344-c1def70c544a"
    }
}
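Since a custom state such as PROGRESS means the task is still running, a client typically keeps polling until one of Celery's terminal states (SUCCESS, FAILURE or REVOKED) shows up. The sketch below shows one way to write that loop in Python. The function wait_for_task and its parameters are my own illustration, and fetch_status stands for any hypothetical callable that returns the parsed "task" object for a given id.

```python
import time

# Celery's built-in terminal states; anything else (PENDING, STARTED, or a
# custom state such as PROGRESS) means the task has not finished yet.
READY_STATES = {"SUCCESS", "FAILURE", "REVOKED"}


def wait_for_task(fetch_status, task_id, interval=1.0, max_polls=60, on_update=None):
    """Poll until the task reaches a terminal state and return its last status.

    fetch_status: callable taking a task id and returning a dict with at
                  least a "status" key (the "task" object of the reply).
    on_update:    optional callback invoked with every status seen, e.g. to
                  display result["progress"] while the state is PROGRESS.
    """
    for _ in range(max_polls):
        task = fetch_status(task_id)
        if on_update is not None:
            on_update(task)
        if task["status"] in READY_STATES:
            return task
        time.sleep(interval)
    raise TimeoutError(
        "task {} did not finish after {} polls".format(task_id, max_polls)
    )
```

Capping the number of polls keeps a client from waiting forever if a worker dies while the task is still in a non-terminal state.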

References :
http://sofc.developer-works.com/article/25718503/How+to+use+Celery+in+Java+web+Application
https://github.com/ask/celery/tree/master/examples/celery_http_gateway


Invoking Celery Tasks from Java Application – Part #2

In the previous post we saw how to invoke Celery tasks from a Java application, but that approach was based on sending messages to a RabbitMQ queue using the RabbitMQ client libraries. In this post, let’s get familiar with a more convenient way: using REST APIs.

For this, we need to install a Celery monitoring tool called Flower. Not every version of Flower serves our purpose. What worked for me was the development version, which can be installed with the following command:
pip install https://github.com/mher/flower/zipball/master#egg=flower

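Let me assume that we have a tasks.py with a task named add. Note that the task prints the sum instead of returning it, which is why its result shows up as 'None' in the API responses later in this post. The broker is left at Celery's default here.

from celery import Celery

app = Celery('tasks')

@app.task
def add(x, y):
    print(x + y)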

Now run the worker:
celery -A tasks worker --loglevel=info

Starting Flower
Finally, it is time to start Flower so that we can access and control both tasks and workers using the Flower REST APIs. To do that, run the following command:

celery flower -A appname (here: celery flower -A tasks)

Take care to specify the project name (here, tasks) in the above command when starting Flower, because the APIs will not work properly otherwise.

Flower can now be viewed at http://localhost:5555 (or using the respective hostname). It has different tabs that show the status of tasks, workers and so on. Basically, what we are going to do is use the APIs that Flower itself uses for these features directly in our application.

To simulate REST API calls, I am using the curl command throughout this post, as I come from a Linux background. These APIs can be called from any programming language.

1. Invoking a celery task

curl -X POST -d '{"args":[1,2]}' http://localhost:5555/api/task/async-apply/tasks.add

This triggers the Celery task add with the arguments 1 and 2 and generates output similar to the following:

{
    "task-id": "81775ebb-7d88-4e91-b580-b3a2d79fe668",
    "state": "PENDING"
}

So this API returns the task id of the generated task, which can be used to track it whenever we want.
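The request is a plain POST whose body is a JSON object with an "args" list, which makes it easy to reproduce from code. Below is a rough sketch in Python using only the standard library. FLOWER, async_apply_request and submit are hypothetical names, and the Content-Type header is my own addition (the curl command above does not send it).

```python
import json
from urllib.request import Request, urlopen

FLOWER = "http://localhost:5555"  # assumed address of the Flower server


def async_apply_request(task_name, args=(), kwargs=None, base=FLOWER):
    """Build (but do not send) the POST request that queues a task."""
    payload = {"args": list(args)}
    if kwargs:
        payload["kwargs"] = kwargs
    return Request(
        "{}/api/task/async-apply/{}".format(base, task_name),
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},  # not sent by the curl example
        method="POST",
    )


def submit(task_name, *args):
    """Send the request (Flower must be running) and return the task id."""
    with urlopen(async_apply_request(task_name, args)) as resp:
        reply = json.loads(resp.read().decode("utf-8"))
    return reply["task-id"]
```

With Flower and a worker running, submit("tasks.add", 1, 2) corresponds to the curl call shown above.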

2. Retrieving information regarding a specific task using its id

curl -X GET http://localhost:5555/api/task/info/81775ebb-7d88-4e91-b580-b3a2d79fe668

Output:
{
    "task-id": "81775ebb-7d88-4e91-b580-b3a2d79fe668",
    "result": "'None'",
    "clock": 371,
    "routing_key": null,
    "retries": 0,
    "failed": false,
    "state": "SUCCESS",
    "kwargs": "{}",
    "sent": false,
    "expires": null,
    "exchange": null,
    "started": 1466248131.745754,
    "timestamp": 1466248131.837694,
    "args": "[1, 2]",
    "worker": "celery@space-Vostro-3800",
    "revoked": false,
    "received": 1466248131.744577,
    "exception": null,
    "name": "tasks.add",
    "succeeded": 1466248131.837694,
    "traceback": null,
    "eta": null,
    "retried": false,
    "runtime": 0.09263942600227892
}
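Note that args, kwargs and result come back as strings rather than as JSON values, and the timestamps are Unix epoch seconds. A client may want to convert them before use. The following Python sketch shows one possible way; summarize and its helpers are hypothetical names, and falling back to the raw string for anything that cannot be parsed is an assumption on my part.

```python
import ast
from datetime import datetime, timezone


def _decode(text):
    """Turn a repr-style string such as "[1, 2]" back into a Python value."""
    try:
        return ast.literal_eval(text)
    except (ValueError, SyntaxError, TypeError):
        return text  # leave anything unparseable as it was


def _when(timestamp):
    """Convert epoch seconds to an aware UTC datetime (None stays None)."""
    if timestamp is None:
        return None
    return datetime.fromtimestamp(timestamp, tz=timezone.utc)


def summarize(info):
    """Pick out and convert the fields a caller usually cares about."""
    return {
        "name": info["name"],
        "state": info["state"],
        "args": _decode(info["args"]),
        "kwargs": _decode(info["kwargs"]),
        "started_at": _when(info.get("started")),
        "runtime": info.get("runtime"),
    }
```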

3. Listing all the tasks sent to workers

curl -X GET http://localhost:5555/api/tasks

Output:
{
    "81775ebb-7d88-4e91-b580-b3a2d79fe668": {
        "received": 1466248131.744577,
        "revoked": false,
        "name": "tasks.add",
        "succeeded": 1466248131.837694,
        "clock": 371,
        "started": 1466248131.745754,
        "timestamp": 1466248131.837694,
        "args": "[1, 2]",
        "retries": 0,
        "failed": false,
        "state": "SUCCESS",
        "result": "'None'",
        "retried": false,
        "kwargs": "{}",
        "runtime": 0.09263942600227892,
        "sent": false,
        "uuid": "81775ebb-7d88-4e91-b580-b3a2d79fe668"
    },
    "50c589e1-b613-496f-af1e-c94c04b163dc": {
        "received": 1466248086.289584,
        "revoked": false,
        "name": "tasks.add",
        "succeeded": 1466248086.339701,
        "clock": 313,
        "started": 1466248086.291148,
        "timestamp": 1466248086.339701,
        "args": "[4, 3]",
        "retries": 0,
        "failed": false,
        "state": "SUCCESS",
        "result": "'None'",
        "retried": false,
        "kwargs": "{}",
        "runtime": 0.049509562999446644,
        "sent": false,
        "uuid": "50c589e1-b613-496f-af1e-c94c04b163dc"
    }
}
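The reply is a single JSON object keyed by task id, so picking out the tasks of interest is a matter of walking its values. As a small illustration, the Python sketch below filters an already parsed reply by state on the client side and orders the matches by the time they were received. The helper name tasks_in_state is hypothetical.

```python
def tasks_in_state(tasks, state):
    """Return the task records in the given state, oldest first.

    tasks is the parsed JSON object returned by /api/tasks, i.e. a dict
    that maps each task id to its record.
    """
    matching = [t for t in tasks.values() if t.get("state") == state]
    # "received" may be missing or null for tasks Flower has not seen arrive.
    return sorted(matching, key=lambda t: t.get("received") or 0.0)
```

For the sample output above, tasks_in_state(reply, "SUCCESS") would list the task with args "[4, 3]" before the one with args "[1, 2]", because it was received earlier.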

4. Terminating a task
curl -X POST -d 'terminate=True' http://localhost:5555/api/task/revoke/81775ebb-7d88-4e91-b580-b3a2d79fe668
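Unlike the async-apply call, this request sends its parameter as an ordinary form field rather than as JSON. A minimal Python sketch of building the same request is shown below. FLOWER and revoke_request are hypothetical names, and sending an empty body when terminate is not wanted is an assumption of mine.

```python
from urllib.parse import urlencode
from urllib.request import Request

FLOWER = "http://localhost:5555"  # assumed address of the Flower server


def revoke_request(task_id, terminate=True, base=FLOWER):
    """Build (but do not send) the POST request that revokes a task."""
    # Form-encoded body, matching curl's -d 'terminate=True'.
    body = urlencode({"terminate": "True"}) if terminate else ""
    return Request(
        "{}/api/task/revoke/{}".format(base, task_id),
        data=body.encode("ascii"),
        method="POST",
    )
```

Passing the returned object to urllib.request.urlopen sends it to a running Flower instance.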

References :
https://pypi.python.org/pypi/flower
http://flower.readthedocs.io/en/latest/api.html

http://nbviewer.jupyter.org/github/mher/flower/blob/master/docs/api.ipynb