Monday, October 31, 2016

Microservice Framework - Part 1

Mesos/Marathon, Docker, Weave and Flocker on Vagrant 


Github:     https://github.com/reza-rahim/microservice

Youtube:  https://youtu.be/-AMdZjGXMCo


Idempotence (/ˌaɪdᵻmˈpoʊtəns/ eye-dəm-poh-təns)[1] is the property of certain operations in mathematics and computer science, that can be applied multiple times without changing the result beyond the initial application. -- Wikipedia
All the build and deployment scripts of the framework are idempotent. That means a script only changes the system as needed to reach the desired state; if the desired state is already present, the script won't change the system. As a result, we can run any script at any time without causing unwanted side effects.
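As a simple illustration of the idea (the directory and the config line below are made-up examples, not taken from the repository), an idempotent shell step checks the current state before acting:

   # mkdir -p is idempotent: it creates the directory only if it is missing,
   # and running it a second time changes nothing
   mkdir -p /opt/myapp/data

   # append a line only when it is absent; a plain ">>" without the check would not be idempotent
   grep -qxF 'export APP_ENV=dev' ~/.bashrc || echo 'export APP_ENV=dev' >> ~/.bashrc

The framework is built from the following components: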
  1. Docker: Docker is a tool that allows developers, sys-admins etc. to easily deploy their applications in a sandbox (called containers) to run on the host operating system i.e. Linux.
  2. Mesos: Apache Mesos is a centralised fault-tolerant cluster manager. It’s designed for distributed computing environments to provide resource isolation and management across a cluster of slave nodes.
    A Mesos cluster is made up of four major components:
    1. ZooKeepers
    2. Mesos masters
    3. Mesos slaves
    4. Frameworks (such as Marathon )
  3. Marathon: Mesos only provides the basic “kernel” layer of your cluster. Marathon framework is the equivalent of the Linux upstart or init daemons, designed for long-running applications.
    The Marathon framework is responsible for scheduling Docker containers and keeping them running on the Mesos cluster.
  4. Weave: Weave Net creates a virtual network that connects Docker containers across multiple hosts and enables their automatic discovery. With Weave Net, portable microservices-based applications consisting of multiple containers can run anywhere: on one host, multiple hosts or even across cloud providers and data centers.
  5. Flocker: Flocker is an open-source container data volume manager for Dockerized applications. It helps move an external persistent volume along with its container. For example, if a Docker container moves from one host to another, Flocker re-mounts the existing volume to the newly provisioned container, so stateful containers -- like databases -- can be moved with ease.



There are four Vagrant machines:
  1. 10.0.15.10 (mgmt): mgmt acts as the cluster management node. It is used for running the various Ansible build and deploy scripts, such as building the cluster, building Docker images or deploying Marathon jobs.
  2. 10.0.15.11 (node1): node1 holds ZooKeeper, the Mesos master and Marathon, and serves as a Mesos slave as well. It also runs a special Docker container called registry, which serves as the local Docker registry. All application Docker containers get pushed into the local registry before being deployed on the cluster.
  3. 10.0.15.12 (node2): node2 serves only as a Mesos slave. It runs application containers.
  4. 10.0.15.13 (node3): node3 serves only as a Mesos slave. It runs application containers.



For example, let's look at the nodeapp build process:
  • ~/ansible/mesos-app/nodeapp/Dockerfile contains the logic to build the nodeapp Docker container.
  • ~/ansible/mesos-app/nodeapp/ansible/roles/deploy/files/application.json.j2 contains the template for the Marathon job. For information on how to define a Marathon job, please refer to mesosphere.github.io.
  • ~/ansible/mesos-app/nodeapp/deploy_with_docker_build.sh builds a new Docker image, pushes the image to the local Docker registry and deploys the Marathon job (see the sketch after this list).
  • ~/ansible/mesos-app/nodeapp/deploy_without_docker_build.sh only deploys the Marathon job. It's helpful when we are only changing deployment parameters such as the number of instances or the amount of memory for the container.
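Conceptually, the build-and-deploy flow that these scripts automate boils down to something like the following sketch (the image name, rendered JSON file name and Marathon app id are illustrative, not taken from the repository):

   # build the nodeapp image and tag it for the local registry running on node1
   docker build -t 10.0.15.11:5000/application/app/nodeapp ~/ansible/mesos-app/nodeapp

   # push the image to the local registry so every Mesos slave can pull it
   docker push 10.0.15.11:5000/application/app/nodeapp

   # submit the Marathon job through the Marathon REST API, using the JSON rendered
   # from application.json.j2 (POST /v2/apps creates the app; PUT /v2/apps/<id> updates an existing one)
   curl -X POST -H "Content-Type: application/json" -d @application.json http://10.0.15.11:8080/v2/apps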


Weave interconnects Docker containers across multiple hosts by creating a software-defined network. For example, the nodeapp container can be accessed from the nginx container by using the nodeapp.weave.local DNS name.
Flocker is configured to use the ZFS file system. In a cloud environment, the system should use a network persistent volume such as EBS or Ceph. When we move the MongoDB Docker container from node2 to node3, Flocker moves all the data from node2 using the ZFS replication feature.

Logging

All Docker containers are configured to use the rsyslog driver. Each container's log goes to
/var/log/docker//docker.log.
Each local rsyslog is set up to forward the local Docker logs to a central rsyslog server for log aggregation. For this application, node1 is set up to be the central log server. On node1 the centralized logs can be found at:
/var/log/remote/
For a production system, logs could be aggregated using the ELK stack.
Mesos- and Marathon-related errors can be found in the Mesos UI.
mesos-master, mesos-slave and marathon run as systemd services. journalctl can be used to view their logs:
journalctl -u mesos-master
journalctl -u mesos-slave
journalctl -u marathon
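To follow a service's log in real time, add the -f flag, for example:
journalctl -u marathon -f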

Debugging

Log in to node1 from mgmt:
   ssh node1
   sudo su
   docker ps
CONTAINER ID        IMAGE                                          COMMAND                  CREATED             STATUS              PORTS                           NAMES
fa942f9ca616       10.0.15.11:5000/application/app/nginx:1.10.0   "/w/w nginx -g 'daemo"   29 minutes ago      Up 29 minutes       443/tcp, 0.0.0.0:9080->80/tcp   mesos-f5127b14-55b2-491b-af9f-004e2d644890-S2.8b044821-4fff-49be-934c-cc5331d3b7a5
e8e2bd1f14bf        registry:2                                     "/entrypoint.sh /etc/"   About an hour ago   Up About an hour                                    mesos-f5127b14-55b2-491b-af9f-004e2d644890-S2.20cdaa1c-f0e7-470a-a9d9-69b2da047900
0281332620aa        weaveworks/weaveexec:1.7.2                     "/home/weave/weavepro"   About an hour ago   Up About an hour                                    weaveproxy
a259f83ddd41        weaveworks/weave:1.7.2                         "/home/weave/weaver -"   About an hour ago   Up About an hour                                    weave
fa942f9ca616 is the container ID of the nginx container. Now let's log in to the nginx Docker container:
docker exec -it fa942f9ca616 bash
Your prompt should change to:
root@nginx:/#
Now that we are at the container prompt, issue the following command:
ip a
15: ethwe@if16:  mtu 1410 qdisc noqueue state UP group default
    link/ether ea:f7:30:4c:a2:93 brd ff:ff:ff:ff:ff:ff link-netnsid 0
    inet 10.2.0.1/16 scope global ethwe
       valid_lft forever preferred_lft forever
    inet6 fe80::e8f7:30ff:fe4c:a293/64 scope link
       valid_lft forever preferred_lft forever
10.2.0.1 is the IP address of the nginx container, assigned by Weave Net.
Now ping the nodeapp container.
for i in $(seq 1 5); do ping -c 2 nodeapp; done
--- nodeapp.weave.local ping statistics ---
2 packets transmitted, 2 received, 0% packet loss, time 999ms
rtt min/avg/max/mdev = 0.472/0.489/0.506/0.017 ms
PING nodeapp.weave.local (10.2.192.0) 56(84) bytes of data.
64 bytes from nodeapp.weave.local (10.2.192.0): icmp_seq=1 ttl=64 time=0.384 ms
64 bytes from nodeapp.weave.local (10.2.192.0): icmp_seq=2 ttl=64 time=0.536 ms

--- nodeapp.weave.local ping statistics ---
2 packets transmitted, 2 received, 0% packet loss, time 999ms
rtt min/avg/max/mdev = 0.384/0.460/0.536/0.076 ms
PING nodeapp.weave.local (10.2.128.1) 56(84) bytes of data.
64 bytes from nodeapp.weave.local (10.2.128.1): icmp_seq=1 ttl=64 time=0.472 ms
64 bytes from nodeapp.weave.local (10.2.128.1): icmp_seq=2 ttl=64 time=0.543 ms

Ping returns multiple IPs (10.2.192.0, 10.2.128.1) because there are multiple instances of the nodeapp container and Weave DNS load-balances between them.
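If dig is available inside the container (it is not necessarily installed in the nginx image), the multiple A records served by Weave DNS can also be seen directly:
dig +short nodeapp.weave.local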
Let's look at the nginx configuration file:
cat /etc/nginx/conf.d/default.conf

server {
    listen 80;

    server_name localhost;

    location / {
        proxy_pass http://nodeapp:3000;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection 'upgrade';
        proxy_set_header Host $host;
        proxy_cache_bypass $http_upgrade;
    }
}
Let's curl the Node.js application running on port 3000 in the nodeapp container.
curl http://nodeapp:3000

Flocker

Log back out to the mgmt node.
Use flockerctl to see all the volumes managed by Flocker:
vagrant@mgmt:~$ flockerctl list
/usr/local/bin/flockerctl: 3: read: Illegal option -d
DATASET                                SIZE    METADATA                             STATUS         SERVER
62748416-3d6a-49f2-8583-52ea07d60700   1.00G   maximum_size=1073741824,name=   attached ✅   03e9e291 (10.0.15.12)


Use Vagrant 1.8.6 or later.

Instructions:

  1. Clone the git repository
    git clone https://github.com/reza-rahim/microservice
  2. Change the dir
    cd microservice
  3. Bring Vagrant machines up
    vagrant up
  4. Log in to mgmt vagrant box
    vagrant ssh mgmt
  5. Build the mesos/marathon cluster
    ./mesos_build_cluster.sh
    1. Mesos UI: http://10.0.15.11:5050/
    2. Marathon UI: http://10.0.15.11:8080/
  6. Deploy the nginx, Node.js and MongoDB application
    source /etc/bash.bashrc
    
    ./mesos_deploy_app.sh
    
    1. Application UI: http://10.0.15.11:9080/
    At this point, you can move to the Overview of the Framework. After understanding the system, you can come back and perform the next three steps.
  7. Scale up the Node.js app from 2 instances to 3 instances
    ./mesos_deploy_scaleup_app.sh
  8. Move the MongoDB from 10.0.15.12 to 10.0.15.13; the data movement is done by Flocker with the ZFS file system.
    ./mesos_move_db.sh
  9. Start Weave Scope
    ./mesos_weave_scope.sh
    1. Weave Scope UI: http://10.0.15.10:4040

Saturday, January 11, 2014

Linear Regression vs Logistic Regression

Linear Regression is used to establish a relationship between dependent and independent variables, which is useful in estimating the resulting value of the dependent variable when the independent variable changes. For example -

Using a Linear Regression, the relationship between Rain (R) and Umbrella Sales (U) is found to be - U = 2R + 5000

This equation says that there is a base demand for 5,000 umbrellas, and every additional 1mm of rain adds 2 umbrellas to the estimated demand (so at 1mm of rain the estimate is 5,002). So, using simple regression, you can estimate the value of your dependent variable.

Logistic Regression, on the other hand, is used to ascertain the probability of an event, and this event is captured in binary format, i.e. 0 or 1.

Example - I want to ascertain if a customer will buy my product or not. For this, I would run a Logistic Regression on the (relevant) data and my dependent variable would be a binary variable (1=Yes; 0=No).

In terms of graphical representation, Linear Regression gives a straight line as output once the values are plotted on the graph, whereas Logistic Regression gives an S-shaped curve.
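For reference, the two model forms can be written side by side (generic coefficients $\beta_0$ and $\beta_1$, not values taken from the umbrella example):

Linear regression:   $y = \beta_0 + \beta_1 x$
Logistic regression: $P(y = 1) = \frac{1}{1 + e^{-(\beta_0 + \beta_1 x)}}$

The exponential term in the denominator is what bends the output into the S-shaped curve and keeps it between 0 and 1.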

Sunday, August 25, 2013

Aggregation over Data Partition with Apache PIG with nested Foreach

While working on a Big Data project for a major US retailer, we had to slice and dice the sales transaction table very intensely. Some of the built-in Apache PIG features -- such as aggregation over a data partition with nested foreach -- made our job easier by helping us avoid writing custom code.
Here is a very simplified version of a retailer's transaction table.

Transaction Id    Product Id    Transaction Type    Quantity
1000              1             S                   10
1001              1             R                   3
1002              2             S                   6

Typically, sales transactions are written in journal-entry fashion. Two common types of transactions are Sales and Returns. For example, Transaction Ids 1000 and 1002 are sales transactions, where the Transaction Type is "S". Transaction Id 1001 is a return transaction, where the Transaction Type is "R".
Suppose a business report needs to show the Total Sale, Total Return and Net Sale (Total Sale - Total Return) for each Product Id. The report should look like the following table.

Product ID    Total Sale    Total Return    Net Sale
1             10            3               7
2             6             0               6


This report requires pivoting and aggregation over the Product ID partition of the entire dataset. In SQL, the PARTITION BY and OVER syntax would achieve the goal. In Apache PIG, the same can be done with the curly bracket (nested foreach) syntax.

Here is the PIG script with nested foreach:



/* Load the input csv file */
transaction = LOAD 'tran.csv' USING PigStorage(',') AS (transaction_id:chararray, productId:int, transctionType:chararray,quantity:int);

/* group by the Product Id */
groupByProductId = GROUP  transaction  BY ( productId );

/* Aggregation over a partition with curly brackets. Notice that the filter and aggregation
   functions don't operate on the entire dataset; they only work on the bag of rows
   grouped under the current Product Id */
report = FOREACH groupByProductId {
    /* get all the Sale transactions for the current Product Id */
    sale = FILTER transaction BY transctionType == 'S';
    /* get all the Return transactions for the current Product Id */
    return = FILTER transaction BY transctionType == 'R';
    GENERATE group AS productId
        /* get the total sale for the current Product Id */
        ,SUM(sale.quantity) AS totalSale
        /* get the total return for the current Product Id; if there is no return record found, put zero */
        ,(COUNT(return) > 0 ? SUM(return.quantity) : 0) AS totalReturn
        /* get the net sale for the current Product Id */
        ,SUM(sale.quantity) - (COUNT(return) > 0 ? SUM(return.quantity) : 0) AS netSale;
}

/* Store the report in a csv file */
STORE report INTO 'report.csv' USING PigStorage(',');
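To try the script locally (assuming it is saved as report.pig next to tran.csv), Pig's local mode is enough:
pig -x local report.pig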

Wednesday, June 26, 2013

Json to Avro
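
A small utility that encodes a JSON record into Avro binary using a given Avro schema, and then decodes the bytes back into a GenericRecord to verify the round trip.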



import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;

import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.InputStream;
import java.io.ByteArrayInputStream;


public class JasonToAvro {

/**
* @param args
* @throws Exception 
*/
public static void main(String[] args) throws Exception {

String json = "{\"username\":\"miguno\",\"tweet\":\"Rock: Nerf paper, scissors is fine.\",\"timestamp\": 1366150681 }";
String schemastr ="{ \"type\" : \"record\", \"name\" : \"twitter_schema\", \"namespace\" : \"com.miguno.avro\", \"fields\" : [ { \"name\" : \"username\", \"type\" : \"string\", \"doc\"  : \"Name of the user account on Twitter.com\" }, { \"name\" : \"tweet\", \"type\" : \"string\", \"doc\"  : \"The content of the user's Twitter message\" }, { \"name\" : \"timestamp\", \"type\" : \"long\", \"doc\"  : \"Unix epoch time in seconds\" } ], \"doc:\" : \"A basic schema for storing Twitter messages\" }";

byte[] avroByteArray = fromJasonToAvro(json,schemastr);

Schema schema = Schema.parse(schemastr);
DatumReader<GenericRecord> reader1 = new GenericDatumReader<GenericRecord>(schema);

Decoder decoder1 = DecoderFactory.get().binaryDecoder(avroByteArray, null);
GenericRecord result = reader1.read(null, decoder1);

System.out.println(result.get("username").toString());
System.out.println(result.get("tweet").toString());
System.out.println(result.get("timestamp"));

}

/**
* @param json
* @param schemastr
* @throws Exception 
*/
static byte[] fromJasonToAvro(String json, String schemastr) throws Exception {

InputStream input = new ByteArrayInputStream(json.getBytes());
DataInputStream din = new DataInputStream(input);

Schema schema = Schema.parse(schemastr);

Decoder decoder = DecoderFactory.get().jsonDecoder(schema, din);

DatumReader<Object> reader = new GenericDatumReader<Object>(schema);
Object datum = reader.read(null, decoder);


GenericDatumWriter<Object>  w = new GenericDatumWriter<Object>(schema);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();

Encoder e = EncoderFactory.get().binaryEncoder(outputStream, null);

w.write(datum, e);
e.flush();

return outputStream.toByteArray();
}
}

Sunday, May 16, 2010

Deep Copy of java object graph

The following method uses serialization to make deep copies, avoiding extensive manual editing or extending of classes. You need to make sure that all classes in the object's graph are serializable.

import java.io.*;

public class ObjectCloner {
 // so that nobody can accidentally create an ObjectCloner object
 private ObjectCloner() {
 }

 // returns a deep copy of an object
 static public Object deepCopy(Object oldObj) throws Exception {
  ObjectOutputStream oos = null;
  ObjectInputStream ois = null;
  try {
   ByteArrayOutputStream bos = new ByteArrayOutputStream(); // A
   oos = new ObjectOutputStream(bos); // B
   // serialize and pass the object
   oos.writeObject(oldObj); // C
   oos.flush(); // D
   ByteArrayInputStream bin = new ByteArrayInputStream(bos
     .toByteArray()); // E
   ois = new ObjectInputStream(bin); // F
   // return the new object
   return ois.readObject(); // G
  } catch (Exception e) {
   System.out.println("Exception in ObjectCloner = " + e);
   throw (e);
  } finally {
   // close the streams only if they were actually created
   if (oos != null) oos.close();
   if (ois != null) ois.close();
  }
 }

}

Array Copy with Java 6

@Test
 public void genericArrayCopyOf() {
    Number[] source = { new Double(5.0), new Double(10.0) };
    Number[] target = Arrays.copyOf(source, source.length);
    assertEquals(source, target);
 }
 
 @Test
 public void copyOfWithRange() {
    String[] source = { "0", "1", "2", "3", "4" };
    String[] target = Arrays.copyOfRange(source, 2, 4);
    assertEquals(new String[] { "2", "3" }, target);
 }

 @Test
 public void genericArrayCopyOfWithNewType() {
    Number[] source = { new Double(5.0), new Double(10.0) };
    Double[] target = Arrays.copyOf(source, source.length, Double[].class);
    assertEquals(source, target);
 }

Tuesday, May 11, 2010

Parallel class hierarchies with Java Generics


import java.util.ArrayList;
import java.util.Collection;

/*
 * Super class for Habitat hierarchy 
 */
public abstract class Habitat <A extends Animal> {
 
 /*
  * A generic collection that can hold Animal 
  * or any subclass of animal 
  */
 Collection<A> collection = new ArrayList<A>();
 
 /*
  * add an Inhabitant to the collection.
  * should be overridden by subclass
  */
  public abstract  void addInhabitant( A animal);
}
/*
 * The Aquarium class inherits the collection from the
 * Habitat superclass, but limits the collection
 * to the Fish type.
 */
public class Aquarium extends Habitat <Fish> 
{
 
 /*
  * (non-Javadoc)
  * @see Habitat#addInhabitant(Animal)
  */
 @Override
 public void addInhabitant( Fish fish) { 
  collection.add(fish);
     System.out.println(Aquarium.class); 
   } 
}

/*
 * Super class for Animal hierarchy
 */
public abstract class Animal {

}
public class Fish extends Animal {

}
public class Test {

 /**
  * @param args
  */
 public static void main(String[] args) {
  Animal animal = new Fish();
  Fish fish = new Fish();
  //new Aquarium().addInhabitant(animal); // would cause a compile error: an Animal is not necessarily a Fish
  new Aquarium().addInhabitant(fish);
 }

}