Java – Batch Upload to Database

Usually I/O operations are costly. When I tried to upload my huge CSV file of 1.8 lakh (180,000) records into a MySQL table one record at a time, it took more than 30 minutes, which was obviously not an acceptable result. So I had to resort to batch uploading. Writing the data in batches of 1000 records brought the whole thing down to 30-40 seconds.

Let's see the code in detail.

The important thing to remember is that we need to turn off "auto commit" mode. If this mode is enabled, every record pushed to the database gets written into the table immediately, nullifying the effect of batch uploading. With auto-commit disabled, we instead write the accumulated records into the table ourselves, by calling commit() on the connection once enough records have been added to the batch.

In the following example, records are read from a CSV file named input.csv and their first three fields are written into a table called "batch" in the test database. MySQL was my DB of choice.
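For reference, the code below assumes a table shaped roughly like this (the column types here are my assumption; adjust them to your data):

CREATE TABLE batch (
    userID   VARCHAR(64),
    username VARCHAR(128),
    address  VARCHAR(255)
);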

First, auto-commit mode is turned off by calling setAutoCommit(false) on the DB connection object. Each record is then read and added to the batch using addBatch().

Once we have 1000 records in the batch, i.e. when the count variable becomes a multiple of 1000, we write them to the DB by calling executeBatch(). Since auto-commit mode is disabled, we also have to call commit() on the connection object to get this data actually written into the DB.

GitHub link here

BatchUpload.java

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class BatchUpload {

    public static void main(String[] args) throws IOException, SQLException {

        String line;
        String delimiter = ",";
        int count = 1;

        DBConnector.createConnection();
        Connection dbConn = DBConnector.getDBConnection();
        PreparedStatement ps = DBConnector.getPSInstance();

        // Turn off auto-commit so records are not written one by one
        dbConn.setAutoCommit(false);

        String inputFile = "input.csv";
        BufferedReader br = new BufferedReader(new FileReader(inputFile));
        while ((line = br.readLine()) != null) {
            String[] entities = line.split(delimiter);
            try {
                ps.setString(1, entities[0]);
                ps.setString(2, entities[1]);
                ps.setString(3, entities[2]);

                ps.addBatch();

                // Flush every 1000 records and commit them to the table
                if (count % 1000 == 0) {
                    ps.executeBatch();
                    dbConn.commit();
                    System.out.println(count + " records inserted into the batch table");
                }
                count++;
            } catch (SQLException e) {
                System.out.println(e.getMessage());
            }
        }
        br.close();

        // Write the remaining records into the DB
        ps.executeBatch();
        dbConn.commit();
        dbConn.close();
    }
}

DBConnector.java 

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class DBConnector {

    private static final String DB_DRIVER = "com.mysql.jdbc.Driver";
    private static final String DB_CONNECTION = "jdbc:mysql://localhost:3306/test";
    private static final String DB_USER = "root";
    private static final String DB_PASSWORD = "root";

    private static Connection conn;
    private static PreparedStatement ps;

    public static void createConnection() {

        try {
            Class.forName(DB_DRIVER);
        } catch (ClassNotFoundException cnf) {
            System.out.println("Driver could not be loaded: " + cnf);
        }

        try {
            conn = DriverManager.getConnection(DB_CONNECTION, DB_USER, DB_PASSWORD);
            String query = "INSERT INTO batch"
                    + "(userID, username, address) VALUES"
                    + "(?,?,?)";
            ps = conn.prepareStatement(query);
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    public static Connection getDBConnection() {
        return conn;
    }

    public static PreparedStatement getPSInstance() {
        return ps;
    }
}


Error processing package nginx

During the installation of nginx using the apt repository, i.e. when we run the command apt-get install nginx, we may come across error messages like the following:

Job for nginx.service failed. See 'systemctl status nginx.service' and 'journalctl -xn' for details.
invoke-rc.d: initscript nginx, action "start" failed.
dpkg: error processing package nginx-full (--configure):
subprocess installed post-installation script returned error exit status 1
dpkg: dependency problems prevent configuration of nginx:
nginx depends on nginx-full (>= 1.6.2-5+deb8u4) | nginx-light (>= 1.6.2-5+deb8u4) | nginx-extras (>= 1.6.2-5+deb8u4); however:
Package nginx-full is not configured yet.
Package nginx-light is not installed.
Package nginx-extras is not installed.
nginx depends on nginx-full (<< 1.6.2-5+deb8u4.1~) | nginx-light (<< 1.6.2-5+deb8u4.1~) | nginx-extras (<< 1.6.2-5+deb8u4.1~); however:
Package nginx-full is not configured yet.
Package nginx-light is not installed.
Package nginx-extras is not installed.

dpkg: error processing package nginx (--configure):
dependency problems - leaving unconfigured
Errors were encountered while processing:
nginx-full
nginx
E: Sub-process /usr/bin/dpkg returned an error code (1)

Fix

Stopping the Apache service (or whichever web server is currently running) before trying to install nginx solves this issue: nginx fails to start during package configuration because port 80 is already in use. Once nginx is installed, we can start the Apache service again.
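To confirm that another process (usually Apache) is actually holding port 80 before installing, something like the following can be run:

sudo ss -ltnp | grep ':80'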

Hence the following steps should resolve the issue:

1. sudo systemctl stop apache2.service
2. sudo apt-get install nginx
3. sudo systemctl start apache2.service

Celery in Production – Supervisor

In this tutorial, we are going to see how Celery is set up in a production environment, where both the workers and other processes, such as the monitoring tool flower, have to run continuously. During development, both the worker and flower processes used to get stopped somehow, forcing me to restart them every now and then. A solution for this, as suggested on the official Celery site, is to make use of a tool like Supervisor.

In production you will want to run the worker in the background as a daemon, and since the Celery worker can sometimes stop on its own, it should be restarted automatically. Tools like supervisord are provided for exactly these tasks.

Installing Supervisor

First we need to set up a Python virtual environment. Run the following command to create one for our demo project:
virtualenv env

Now move into the env folder and activate the virtual environment:
source bin/activate

(We also need Celery installed in this virtual environment via pip, along with a running RabbitMQ broker.)
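A minimal sketch of that step, assuming the RabbitMQ server itself is installed separately as a system package:

pip install celery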

Now install supervisor using the following command :

pip install supervisor

This provides a helper command named echo_supervisord_conf, which prints a sample configuration.

Now run the following command to generate the config file :
echo_supervisord_conf > supervisord.conf

This generates a config file, supervisord.conf, which holds all the keys for our magic.
Now move this file to the folder where we have written the Celery code. In my case I have a folder named project inside the env folder (which contains files such as tasks.py).

Now cd to the project folder.

Now open the file we have just copied and add the following lines:

[program:tasks]
command=celery worker -A tasks --loglevel=INFO
stdout_logfile=celeryd.log
stderr_logfile=celeryd.log
autostart=true
autorestart=true
startsecs=10
stopwaitsecs=600

[program:flower]
command=celery flower -A tasks
stdout_logfile=flower.log
stderr_logfile=flower.log
autostart=true
autorestart=true
startsecs=10
stopwaitsecs=600

Since we need to run both the worker and flower processes, they are added as two separate programs, as written above. We can also put them in a group so that they are started and stopped together; a sketch of such a group is shown below. Most of the fields here are self-explanatory, but if you would like a clearer picture, refer to the supervisord documentation.
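A minimal sketch of such a group, using the two program names defined above (the group name celeryapps is my own choice):

[group:celeryapps]
programs=tasks,flower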

Now, starting the daemons:
Go to the project directory (the folder where we copied the config file), open a terminal, and run the following command:
supervisord

This starts both flower and the Celery worker as daemons.
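To check that both processes are actually running, the bundled supervisorctl tool can be queried (assuming it picks up the same supervisord.conf):

supervisorctl status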

Stopping Supervisord

If we want to stop supervisord, just run the following command:

killall supervisord

Ref :
https://micropyramid.com/blog/celery-with-supervisor/
http://jamie.curle.io/posts/bottle-and-supervisord/
https://serversforhackers.com/monitoring-processes-with-supervisord

HTTP Gateway : Invoking Celery Tasks from Java (Non Python Application) – Part #3

An HTTP gateway is the ideal way to invoke Celery tasks and poll their status from non-Python languages like Java. For this, we first have to set up a Django application that handles all the incoming requests for task invocation and status polling. As a first step, we create a Python virtual environment named env (not mandatory, but I prefer this as it won't affect the Python packages installed on the machine).

Step 1:
Activate the virtual environment and install all the required libraries, such as django and django-celery.
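Assuming the usual PyPI package names, that amounts to something like:

pip install django django-celery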

Step 2:
Copy the Django app for the HTTP gateway into this activated virtual environment, env. (I have uploaded the code for this app to GitHub: https://github.com/nishijayaraj/celery-HTTPgateway)

This app keeps all its tasks in the tasks.py file. Now run the app from the activated virtual environment using the following command:
python manage.py runserver

With this, our Django app starts at localhost:8000, to which we can send REST API calls. You are likely to get some "module not found" errors at this point; if you do, just install the missing packages and libraries in the activated virtual environment and run the app again.

The structure of the entire app is shown in the screenshot below:
[Screenshot from 2016-07-14: project structure of the celery-HTTPgateway app]

Now we need workers to execute the tasks. So open a terminal and run the following command from the root directory of our application (env/celery-HTTPgateway):

celery worker -A tasks --loglevel=INFO

It then lists all the registered tasks there. We should be able to see two tasks:
1. tasks.hello_world
2. UploadTask

Now let's see how we can invoke a Celery task and poll its status using REST APIs. These REST APIs can be called from any programming language using the appropriate native libraries.

For the sake of simplicity, I am using the Linux curl command here to simulate the REST API calls.
(The trailing / at the end of the URL is mandatory for these commands to work.)

Open a terminal and run the following command,

curl -X GET http://localhost:8000/apply/tasks.hello_world/

Then you would get JSON data as the response:

{"ok": "true", "task_id": "0fc2150e-b321-4cc6-aaef-b1ce9b30e7fe"}

The response contains the id of the invoked task, which can be used to track its status.

Status Polling:

curl -X GET http://localhost:8000/0fc2150e-b321-4cc6-aaef-b1ce9b30e7fe/status/

Response:
{"task": {"status": "SUCCESS", "result": "Hello world………", "id": "0fc2150e-b321-4cc6-aaef-b1ce9b30e7fe"}}
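Since the whole point of this series is calling these endpoints from Java, here is a minimal sketch of the same invoke-and-poll cycle using only the standard library. The host, port, and task name mirror the curl examples above; the crude string handling for the task id stands in for a proper JSON parser.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class GatewayClient {

    // Issues a GET request and returns the raw response body
    static String get(String urlString) throws Exception {
        HttpURLConnection conn = (HttpURLConnection) new URL(urlString).openConnection();
        conn.setRequestMethod("GET");
        BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()));
        StringBuilder response = new StringBuilder();
        String line;
        while ((line = in.readLine()) != null) {
            response.append(line);
        }
        in.close();
        return response.toString();
    }

    public static void main(String[] args) throws Exception {
        // Invoke the task; the JSON response carries the task id
        String invoked = get("http://localhost:8000/apply/tasks.hello_world/");
        System.out.println(invoked);

        // Crude extraction of the task id, assuming the response shape shown
        // above; a real application would use a JSON library instead
        String taskId = invoked.split("\"task_id\": \"")[1].split("\"")[0];

        // Poll the status endpoint for that task
        System.out.println(get("http://localhost:8000/" + taskId + "/status/"));
    }
}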

These APIs can also expose custom task states, something at which the Celery flower APIs fail most of the time. A task named UploadTask has been written in the tasks.py file to showcase this feature. First we invoke the task, and then we track its status.

Task invocation :

curl -X GET http://localhost:8000/apply/tasks.UploadTask/
Response:
{
"ok": "true",
"task_id": "cc51e093-372f-42c1-8344-c1def70c544a"
}

The status of the above task can then be checked:

curl -X GET http://localhost:8000/cc51e093-372f-42c1-8344-c1def70c544a/status/
Response:
{
"task": {
"status": "PROGRESS",
"result": {
"progress": 0
},
"id": "cc51e093-372f-42c1-8344-c1def70c544a"
}
}

References :
http://sofc.developer-works.com/article/25718503/How+to+use+Celery+in+Java+web+Application
https://github.com/ask/celery/tree/master/examples/celery_http_gateway

Invoking Celery Tasks from Java Application – Part #2

In the previous post we saw how to invoke a Celery task from a Java application, but that approach was based on sending a message to a RabbitMQ queue using the RabbitMQ client libraries. In this post, let's get familiar with a more convenient way: using REST APIs.

For this, we need to install the Celery monitoring tool called flower. Not every version of flower serves our purpose; what worked for me is the development version (the install command is written below).
pip install https://github.com/mher/flower/zipball/master#egg=flower

So let me assume that we have a tasks.py with a task named add:

@app.task
def add(x, y):
    print x+y

Now run the worker:
celery -A tasks worker --loglevel=info

Starting flower
Finally it is time to start flower, so that we can access and control both tasks and workers through flower's REST APIs. For that we run the following command:

celery flower -A appname (in our case: celery flower -A tasks)

Care should be taken to specify the project name (here, tasks) in the above command when starting flower, because the APIs will not work properly otherwise.

Flower can now be viewed at http://localhost:5555 (or the respective hostname). It has different tabs showing the status of tasks, workers, and so on. Basically, what we are going to do is use the APIs flower itself uses for these features directly in our application.

To simulate the REST API calls, I use the curl command throughout this post, as I come from a Linux background. These APIs can be integrated from any programming language.

1. Invoking a celery task

curl -X POST -d '{"args":[1,2]}' http://localhost:5555/api/task/async-apply/tasks.add

This triggers the Celery task add with parameters 1 and 2 and generates output similar to the following:

{
"task-id": "81775ebb-7d88-4e91-b580-b3a2d79fe668",
"state": "PENDING"
}

So this API returns the task id of the generated task, which can be used for tracking it whenever we want.
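For completeness, here is a minimal Java sketch of the same call, again using only the standard library. The URL and JSON body mirror the curl example above; in a real application the body would be built with a JSON library rather than by hand.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;

public class FlowerClient {

    public static void main(String[] args) throws Exception {
        URL url = new URL("http://localhost:5555/api/task/async-apply/tasks.add");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        conn.setRequestProperty("Content-Type", "application/json");

        // Same payload as the curl example: positional args for tasks.add
        OutputStream os = conn.getOutputStream();
        os.write("{\"args\":[1,2]}".getBytes("UTF-8"));
        os.close();

        // Print the JSON response containing the task-id and state
        BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()));
        String line;
        while ((line = in.readLine()) != null) {
            System.out.println(line);
        }
        in.close();
    }
}

The info, list, and revoke endpoints shown below follow the same pattern, with the appropriate HTTP method and URL.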

2. Retrieving information regarding a specific task using its id

curl -X GET http://localhost:5555/api/task/info/81775ebb-7d88-4e91-b580-b3a2d79fe668

Output:
{
"task-id": "81775ebb-7d88-4e91-b580-b3a2d79fe668",
"result": "'None'",
"clock": 371,
"routing_key": null,
"retries": 0,
"failed": false,
"state": "SUCCESS",
"kwargs": "{}",
"sent": false,
"expires": null,
"exchange": null,
"started": 1466248131.745754,
"timestamp": 1466248131.837694,
"args": "[1, 2]",
"worker": "celery@space-Vostro-3800",
"revoked": false,
"received": 1466248131.744577,
"exception": null,
"name": "tasks.add",
"succeeded": 1466248131.837694,
"traceback": null,
"eta": null,
"retried": false,
"runtime": 0.09263942600227892
}

3. Listing all the tasks sent to workers

curl -X GET http://localhost:5555/api/tasks

Output:
{
"81775ebb-7d88-4e91-b580-b3a2d79fe668": {
"received": 1466248131.744577,
"revoked": false,
"name": "tasks.add",
"succeeded": 1466248131.837694,
"clock": 371,
"started": 1466248131.745754,
"timestamp": 1466248131.837694,
"args": "[1, 2]",
"retries": 0,
"failed": false,
"state": "SUCCESS",
"result": "'None'",
"retried": false,
"kwargs": "{}",
"runtime": 0.09263942600227892,
"sent": false,
"uuid": "81775ebb-7d88-4e91-b580-b3a2d79fe668"
},
"50c589e1-b613-496f-af1e-c94c04b163dc": {
"received": 1466248086.289584,
"revoked": false,
"name": "tasks.add",
"succeeded": 1466248086.339701,
"clock": 313,
"started": 1466248086.291148,
"timestamp": 1466248086.339701,
"args": "[4, 3]",
"retries": 0,
"failed": false,
"state": "SUCCESS",
"result": "'None'",
"retried": false,
"kwargs": "{}",
"runtime": 0.049509562999446644,
"sent": false,
"uuid": "50c589e1-b613-496f-af1e-c94c04b163dc"
}
}

4. Terminating a task
curl -X POST -d 'terminate=True' http://localhost:5555/api/task/revoke/81775ebb-7d88-4e91-b580-b3a2d79fe668

References :
https://pypi.python.org/pypi/flower
http://flower.readthedocs.io/en/latest/api.html

http://nbviewer.jupyter.org/github/mher/flower/blob/master/docs/api.ipynb


Reading Java property file in Python

Accessing a Java property file from Python code is an easy task. For this we need to install a Python module called pyjavaproperties. (There are many other ways to do this; I prefer this module.)

For installing this, please run the following command :

sudo pip install http://pypi.python.org/packages/source/p/pyjavaproperties/pyjavaproperties-0.6.tar.gz

How to use it 

Say we have a property file named config.properties, which is as follows:

config.properties
user=Crunchify
company1=Google
company2=eBay
company3=Yahoo
Now open a Python IDE and add the following lines:

from pyjavaproperties import Properties
p = Properties()
p.load(open('config.properties'))
p.list()          # prints all the properties and their values
print p['user']   # prints Crunchify
Ref:
https://pypi.python.org/pypi/pyjavaproperties
https://www.versioneye.com/python/pyjavaproperties/0.6

Uploading and Extracting data from mysql as csv

Extract data as csv from mysql table 

From the terminal, run the following command (you may need to re-type the quotes if you copy this command, as they may be pasted in a different format):

mysql -u root -pspace123 -e "SELECT * from employee" mydb > asd.csv

The above command exports data from the table employee of the database mydb into a CSV file called asd.csv. By default the delimiter in this file is a tab, which can be overridden. Also note that there is no space after -p: unlike with -u, the password is expected to be written together with the -p option, otherwise an error will be thrown.

asd.csv
empno   ename
1       ram
1000    Jeena

The header row can be omitted by adding the -N option to the previous command:

mysql -N -u root -pspace123 -e "SELECT * from employee" mydb > asd.csv

asd.csv
1       ram
1000    Jeena

Upload csv into mysql table

For this, the only requirement is that we have a table in the database with the same name and structure as the CSV file we are going to upload. In my case I have an employee.csv file and a table with the same name, employee, and the same structure.

employee.csv
1;ram
1000;Jeena

Now type the following command in the terminal:

mysqlimport -u root -pspace123 --fields-terminated-by=';' --local mydb employee.csv

This uploads the CSV file into the table employee, and the result can be viewed from the mysql console:

mysql> select * from employee;
[Screenshot from 2016-06-04: contents of the employee table]

Upload csv file into Remote Table

mysqlimport -h remote-host-name --port 3306 -u username -ppassword --fields-terminated-by=';' --local remote-db-name filename.csv-on-local-machine

Example:

mysqlimport -h example.com --port 3306 -u testUser -ptestUser123 --fields-terminated-by=';' --local demo test.csv