Sunday, August 25, 2013

Aggregation over a Data Partition in Apache Pig with Nested FOREACH

While working on a Big Data project for a major US retailer, we had to slice and dice the sales transaction table very intensely. Some of the built-in Apache Pig features -- such as aggregation over a data partition with a nested FOREACH -- made our job easier by helping us avoid writing custom code.
Here is a very simplified version of a retailer's transaction table.

Transaction Id | Product Id | Transaction Type | Quantity
1000           | 1          | S                | 10
1001           | 1          | R                | 3
1002           | 2          | S                | 6

Typically, sales transactions are written in journal-entry fashion. The two common transaction types are Sale and Return. For example, Transaction IDs 1000 and 1002 are sale transactions, where the Transaction Type is "S". Transaction ID 1001 is a return transaction, where the Transaction Type is "R".
Suppose a business report needs to show the Total Sale, Total Return, and Net Sale (Total Sale - Total Return) for each Product ID. The report should look like the following table.

Product ID | Total Sale | Total Return | Net Sale
1          | 10         | 3            | 7
2          | 6          | 0            | 6


This report requires pivoting and aggregation over the Product ID partitions of the dataset. In SQL, the PARTITION BY and OVER syntax would achieve this goal. In Apache Pig, the same can be done with the nested FOREACH (curly-bracket) syntax.

Here is the Pig script with a nested FOREACH:



/* Load the input CSV file */
transaction = LOAD 'tran.csv' USING PigStorage(',') AS (transaction_id:chararray, productId:int, transactionType:chararray, quantity:int);

/* Group by the Product ID */
groupByProductId = GROUP transaction BY (productId);

/* Aggregation over a partition with curly brackets. Notice that the filter and aggregation
   functions don't operate on the entire dataset. They only work on the partition of the
   dataset that belongs to the current Product ID. */
report = FOREACH groupByProductId {
    /* get all the Sale transactions for the current Product ID */
    sale = FILTER transaction BY transactionType == 'S';
    /* get all the Return transactions for the current Product ID */
    return = FILTER transaction BY transactionType == 'R';
    GENERATE group AS productId
        /* get the total sale for the current Product ID */
        , SUM(sale.quantity) AS totalSale
        /* get the total return for the current Product ID; if there is no return record, put zero */
        , (COUNT(return) > 0 ? SUM(return.quantity) : 0) AS totalReturn
        /* get the net sale for the current Product ID */
        , SUM(sale.quantity) - (COUNT(return) > 0 ? SUM(return.quantity) : 0) AS netSale;
};

/* Store the report in a CSV file */
STORE report INTO 'report.csv' USING PigStorage(',');
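To sanity-check the script, the sample transactions above can be saved as tran.csv:

1000,1,S,10
1001,1,R,3
1002,2,S,6

and the script run in Pig's local mode, for example as pig -x local report.pig (report.pig being an assumed file name for the script above). After the GROUP, each record of groupByProductId pairs a Product ID with the bag of that product's transactions, so the nested FILTER and SUM statements only ever see one product's rows. The output stored under report.csv should then contain one row per Product ID with totalSale, totalReturn and netSale:

1,10,3,7
2,6,0,6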

Wednesday, June 26, 2013

JSON to Avro

The Java class below converts a JSON string to Avro binary: it parses the JSON with Avro's JSON decoder, re-encodes the resulting record with a binary encoder, then decodes the bytes again and prints the fields to verify the round trip.



import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.InputStream;


public class JsonToAvro {

    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {

        String json = "{\"username\":\"miguno\",\"tweet\":\"Rock: Nerf paper, scissors is fine.\",\"timestamp\": 1366150681 }";
        String schemastr = "{ \"type\" : \"record\", \"name\" : \"twitter_schema\", \"namespace\" : \"com.miguno.avro\", \"fields\" : [ { \"name\" : \"username\", \"type\" : \"string\", \"doc\" : \"Name of the user account on Twitter.com\" }, { \"name\" : \"tweet\", \"type\" : \"string\", \"doc\" : \"The content of the user's Twitter message\" }, { \"name\" : \"timestamp\", \"type\" : \"long\", \"doc\" : \"Unix epoch time in seconds\" } ], \"doc\" : \"A basic schema for storing Twitter messages\" }";

        // convert the JSON string to Avro binary
        byte[] avroByteArray = fromJsonToAvro(json, schemastr);

        // decode the Avro binary back into a generic record to verify the round trip
        Schema schema = new Schema.Parser().parse(schemastr);
        DatumReader<GenericRecord> reader1 = new GenericDatumReader<GenericRecord>(schema);

        Decoder decoder1 = DecoderFactory.get().binaryDecoder(avroByteArray, null);
        GenericRecord result = reader1.read(null, decoder1);

        System.out.println(result.get("username").toString());
        System.out.println(result.get("tweet").toString());
        System.out.println(result.get("timestamp"));
    }

    /**
     * Converts a JSON document to Avro binary using the given Avro schema.
     *
     * @param json      the JSON document to convert
     * @param schemastr the Avro schema as a JSON string
     * @return the Avro binary encoding of the JSON document
     * @throws Exception
     */
    static byte[] fromJsonToAvro(String json, String schemastr) throws Exception {

        InputStream input = new ByteArrayInputStream(json.getBytes());
        DataInputStream din = new DataInputStream(input);

        Schema schema = new Schema.Parser().parse(schemastr);

        // read the JSON input into a generic Avro datum using the JSON decoder
        Decoder decoder = DecoderFactory.get().jsonDecoder(schema, din);
        DatumReader<Object> reader = new GenericDatumReader<Object>(schema);
        Object datum = reader.read(null, decoder);

        // re-encode the datum with the binary encoder
        GenericDatumWriter<Object> w = new GenericDatumWriter<Object>(schema);
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        Encoder e = EncoderFactory.get().binaryEncoder(outputStream, null);

        w.write(datum, e);
        e.flush();

        return outputStream.toByteArray();
    }
}
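
Run with the Avro jar (and its Jackson dependency) on the classpath, the program should print the three decoded fields of the sample tweet, something like:

miguno
Rock: Nerf paper, scissors is fine.
1366150681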