
I am working on a project in which I need to insert data into a Cassandra database, and I am using the Pelops client for that.

I have multithreaded code that inserts into Cassandra through the Pelops client, and I am using an ExecutorService to run it.

In my program, each thread works on its own range of IDs, for example:

Thread1 will work on 1 to 20
Thread2 will work on 21 to 40
...
...
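
As a rough illustration of that split, the sketch below computes the inclusive ID range for each worker from a start value and a per-thread count. `RangeSketch` and `rangeFor` are names made up for this example, and the per-thread count of 20 is chosen only to match the ranges listed above (the code posted below uses 100 per thread).

```java
public class RangeSketch {

    // Returns {first, last} (both inclusive) for the worker at the given
    // zero-based index. Hypothetical helper, not part of the posted code.
    static int[] rangeFor(int workerIndex, int startRange, int noOfTasks) {
        int first = startRange + workerIndex * noOfTasks;
        return new int[] { first, first + noOfTasks - 1 };
    }

    public static void main(String[] args) {
        // Print the ranges for the first three workers.
        for (int i = 0; i < 3; i++) {
            int[] r = rangeFor(i, 1, 20);
            System.out.println("Thread" + (i + 1) + ": " + r[0] + " to " + r[1]);
        }
    }
}
```

With a start of 1 and 20 IDs per thread this gives 1 to 20, 21 to 40 and 41 to 60 for the first three workers.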

Below is the code I am using to insert into the Cassandra database:

private static int noOfThreads = 5;
private static int noOfTasks = 100;
private static int startRange = 1;

    public static void main(String[] args) {

        LOG.info("Loading data in Cassandra database..!!");

        ExecutorService service = Executors.newFixedThreadPool(noOfThreads);

        try {
            // queue some tasks
            for (int i = 0, nextId = startRange; i < noOfThreads; i++, nextId += noOfTasks) {

                service.submit(new CassandraTask(nextId, noOfTasks));
            }

            service.shutdown();

            service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
        } catch (InterruptedException e) {
            LOG.warn("Threw a Interrupted Exception in" + CNAME + ".PelopsLnPClient: boss told me to stop...Not my fault!!");
        } catch (Exception e) {
            LOG.error("Threw a Exception in" + CNAME + e);
        } 
    }

Below is the CassandraTask class, which implements the Runnable interface:

class CassandraTask implements Runnable {

    private final int id;
    private final int noOfTasks;

    private final String nodes = "localhost";
    private final String thrift_connection_pool = "Test Cluster";
    private final String keyspace = "my_keyspace";
    private final String column_family = "PROFILE_USER";

        public CassandraTask(int nextId, int noOfTasks) {
            this.id = nextId;
            this.noOfTasks = noOfTasks;

        }


        public void run() {

            try {

                cassandraConnection();
                Mutator mutator = Pelops.createMutator(thrift_connection_pool);

                for (int userId = id; userId < id + noOfTasks; userId++) {

                    mutator.writeColumns(column_family, String.valueOf(userId),
                            mutator.newColumnList(
                                    mutator.newColumn("unt", "{\"lv\":[{\"v\":{\"regSiteId\":null,\"userState\":null,\"userId\":" + userId + "},\"cn\":1}],\"lmd\":20130206211109}"), 
                                    mutator.newColumn("rtising", "{\"lv\":[{\"v\":{\"thirdPartyAdsOnhostdomain\":null,\"hostdomainAdsOnThirdParty\":null,\"userId\":" + userId + "},\"cn\":2}],\"lmd\":20130206211109}"), 
                                    mutator.newColumn("selling_price_main_cats", "{\"lv\":[{\"v\":{\"regSiteId\":null,\"userState\":null,\"userId\":" + userId + "},\"cn\":1}],\"lmd\":20130206211109}"), 
                                    mutator.newColumn("and_keyword_rules", "{\"lv\":[{\"v\":{\"regSiteId\":null,\"userState\":null,\"userId\":" + userId + "},\"cn\":1}],\"lmd\":20130206211109}"), 
                                    mutator.newColumn("categories_purchased", "{\"lv\":[{\"v\":{\"regSiteId\":null,\"userState\":null,\"userId\":" + userId + "},\"cn\":1}],\"lmd\":20130206211109}"), 
                                    mutator.newColumn("omer_service", "{\"lv\":[{\"v\":{\"regSiteId\":null,\"userState\":null,\"userId\":" + userId + "},\"cn\":1}],\"lmd\":20130206211109}"), 
                                    mutator.newColumn("graphic", "{\"lv\":[{\"v\":{\"regSiteId\":null,\"userState\":null,\"userId\":" + userId + "},\"cn\":1}],\"lmd\":20130206211109}"), 
                                    mutator.newColumn("rite_searches", "{\"lv\":[{\"v\":{\"regSiteId\":null,\"userState\":null,\"userId\":" + userId + "},\"cn\":1}],\"lmd\":20130206211109}")
                                    ));
                }

                mutator.execute(ConsistencyLevel.ONE);

            } catch (Exception e) {
                System.err.println("Threw a Exception in " + e);
            } finally {
                Pelops.shutdown();
            }
        }

        /**
         * Making a Cassandra Connection by adding nodes
         *
         */
        private void cassandraConnection() {

            Cluster cluster = new Cluster(nodes, 9160);
            Pelops.addPool(thrift_connection_pool, cluster, keyspace);

        }
    }

Whenever I run the above program, I always get the exception below:

Threw a Exception in java.lang.RuntimeException: exception while registering MBean, com.scale7.cassandra.pelops.pool:type=PooledNode-my_keyspace-localhost

Can anyone tell me what I am doing wrong here? I suspect I am making some minor mistake. Strangely, if I run the program slowly, by stepping through it with a breakpoint set, I don't get this exception.

I am working with Cassandra 1.2.3.

Any help will be appreciated.

AKIWEB
arsenal
  • Instead of `System.err.println("Threw a Exception in " + e);`, can you use e.printStackTrace() and post the output in your question so we can get more details about the problem? – Lyuben Todorov Apr 09 '13 at 09:30

1 Answer


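Which client version are you using? As far as I can see, each thread creates its own pool to Cassandra (with the same name!) and each one shuts down the Pelops client. A JMX MBean name can only be registered once, so several threads registering the same pooled node at nearly the same time would likely explain the exception.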

Move the pool creation into the main class: create just one pool, access it from the threads, and do not call Pelops.shutdown() until the last thread has finished its execute call.
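
A minimal sketch of that lifecycle follows. Pelops is not on the classpath here, so `PoolRegistry` is a hypothetical stand-in and not a Pelops class; its three methods only mark where `Pelops.addPool(...)`, the mutator writes, and `Pelops.shutdown()` from the question would go. All class and method names in the sketch are assumptions made for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SharedPoolSketch {

    // Hypothetical stand-in for the Pelops pool; it only counts calls.
    static final class PoolRegistry {
        private final AtomicInteger created = new AtomicInteger();
        private final AtomicInteger writes = new AtomicInteger();
        private final AtomicInteger shutdowns = new AtomicInteger();

        void addPool(String name) { created.incrementAndGet(); }        // Pelops.addPool(...) would go here
        void write(String name, int userId) { writes.incrementAndGet(); } // mutator writes would go here
        void shutdown() { shutdowns.incrementAndGet(); }                // Pelops.shutdown() would go here

        int created() { return created.get(); }
        int writes() { return writes.get(); }
        int shutdowns() { return shutdowns.get(); }
    }

    static final String POOL = "Test Cluster";

    static PoolRegistry run(int noOfThreads, int noOfTasks, int startRange)
            throws InterruptedException {
        PoolRegistry registry = new PoolRegistry();
        registry.addPool(POOL); // once, before any task is submitted

        ExecutorService service = Executors.newFixedThreadPool(noOfThreads);
        try {
            for (int i = 0, nextId = startRange; i < noOfThreads; i++, nextId += noOfTasks) {
                final int first = nextId;
                // Each task only uses the shared pool; it never creates or closes it.
                service.submit(() -> {
                    for (int userId = first; userId < first + noOfTasks; userId++) {
                        registry.write(POOL, userId);
                    }
                });
            }
            service.shutdown();
            service.awaitTermination(1, TimeUnit.MINUTES);
        } finally {
            registry.shutdown(); // once, after the workers are done
        }
        return registry;
    }

    public static void main(String[] args) throws InterruptedException {
        PoolRegistry r = run(5, 100, 1);
        System.out.println("pools=" + r.created() + " writes=" + r.writes()
                + " shutdowns=" + r.shutdowns());
    }
}
```

The point of the design is that pool creation and shutdown each happen exactly once on the main thread, while the worker tasks only borrow the shared pool by name.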

Carlo

Carlo Bertuccini
  • Thanks, Carlo, for the suggestion. It's working fine after that. I just wanted to make sure the way I did it is right, as I may be missing some minor detail. Can you take a look and let me know if my updated code looks right in the way I create the Cassandra connection and cluster? Thanks for the help. – arsenal Apr 09 '13 at 19:51
  • As far as I can see everything is OK: the way you connect to Cassandra is fine. You could add some other configuration, but only if you need it, for instance `String nodes = "localhost"; int port = 9_160; boolean nodeDiscovery = true; Config casconf = new Config(port, true, 0); Cluster cluster= new Cluster(nodes, casconf, nodeDiscovery); Pelops.addPool(thrift_connection_pool, cluster, keyspace);` Ciao, Carlo – Carlo Bertuccini Apr 09 '13 at 21:22
  • As for `mutator.newColumn("lmd", String.valueOf(new Date().getTime()))`, I want to remind you that you already have a timestamp for each column, so you may not need to write it! Also remember that with Pelops you can write a long with `mutator.newColumn("lmd", Bytes.fromLong(System.currentTimeMillis()))` – Carlo Bertuccini Apr 09 '13 at 21:28
  • Yeah, that makes sense to me. Thanks a lot, Carlo, for the suggestion. Regarding the timestamp, where do I have the timestamp for each column? I am not sure about this. – arsenal Apr 09 '13 at 21:30
  • I have one more question related to Cassandra database with `Pelops client`. I have opened a new [question](http://stackoverflow.com/questions/15912501/efficient-way-to-connect-to-cassandra-database-using-pelops-client) for that. In that, I am trying to make a Singleton class for connecting to Cassandra database. Can you take a look and help me out there? – arsenal Apr 09 '13 at 21:32