Sunday, August 25, 2013

Aggregation over Data Partition with Apache PIG with nested Foreach

While working on a Big Data project for a major US retailer, we had to slice and dice the sales transaction table very intensely. Some of the build-in Apache PIG features -- such as aggregation over data partition with nested foreach --made our job easier by helping us to avoid writing custom code.
Here is a very simplified version of transaction table of a retailer.

Transaction IdProduct IdTransaction TypeQuantity
10001S10
10011R3
10022S6

Typically, sales transaction are written in journal entry fashion. Two common types of transactions are Sales and Return. For example, Transaction ID 1000 and 1002 are sales transaction where Transaction Type is “S”. Transaction Id 1001 is return transaction where transaction type is “R”.
Suppose a business report needs to show the Total Sale, Total Return and Net Sale (Total Sale – Total Return) by each Product Id. Example report should look like following table.

Product IDTotal SaleTotal ReturnNet Sale
11037
2606


This report requires pivoting and aggregation over Product ID partition of the entire dataset. In SQL, PARTITION BY and OVER syntax would achieve the goal. In Apache PIG, the same can be done with curly bracket syntax.

Here is the PIG script with nested foreach



/* Load the input csv file */
transaction = LOAD 'tran.csv' USING PigStorage(',') AS (transaction_id:chararray, productId:int, transctionType:chararray,quantity:int);

/* group by the Product Id */
groupByProductId = GROUP  transaction  BY ( productId );

/* Aggregation over Partition with curly bracket. Notice that filter and aggregation 
   functions don’t operate on the entire dataset. They only work on the partitioned 
   dataset by the current Product Id */
   report = FOREACH groupByProductId  {
 /* get all the Sale transactions for the current Product ID */ 
   sale = FILTER transaction BY transctionType == 'S';
 /* get all the Return transactions for the current Product ID */ 
 return = FILTER transaction BY transctionType == 'R';
 GENERATE  group AS productId
  /* get the total sale by current Product Id */
   ,SUM(sale.quantity) AS totalSale
                /* get the total sale by current Product Id. If there is no return record found, put zero */
                 ,(COUNT(return) > 0 ? SUM(return.quantity) : 0 )AS totalReturn 
                /* get the net sale for the current Product Id */ 
                 ,SUM(sale.quantity) - (COUNT(return) > 0 ? SUM(return.quantity) : 0) AS netSale;
         }
/* Store the report in csv file */
store report into ‘report.csv’  USING PigStorage(',');