Pig Tutorial 3 – FLATTEN, GROUP, COGROUP, CROSS, DISTINCT, FILTER, FOREACH, LIMIT, LOAD, ORDER, SAMPLE, SPLIT, STORE, STREAM and UNION Operators

Flatten Operator

The FLATTEN operator looks like a UDF syntactically, but it is actually an operator that changes the structure of tuples and bags in a way that a UDF cannot. FLATTEN un-nests tuples as well as bags. The idea is the same for both, but the operation and the result differ for each type of structure.

For tuples, FLATTEN substitutes the fields of a tuple in place of the tuple. For example, consider a relation that has a tuple of the form (a, (b, c)). The expression GENERATE $0, FLATTEN($1) will cause that tuple to become (a, b, c).

Let's take an example.

Input data

Here the first and second fields are separated by a comma, and the second and third fields by |^

12345,NDATEST|^/shelf=0/slot/port=27
12367,NDATEST|^/shelf=0/slot/port=13
54545,NDATEST|^/shelf=0/slot/port=17
50936,NDATEST|^/shelf=0/slot/port=18


A = LOAD '/user/home/blogdata' using PigStorage(',') AS (service_id:chararray , neid_portid:chararray );

B = foreach A generate service_id,STRSPLIT(neid_portid,'\\|\\^');

dump B;

The above code prints the output as below, but what we need is a flattened output like (12345,NDATEST,/shelf=0/slot/port=27)


(12345,(NDATEST,/shelf=0/slot/port=27))
(12367,(NDATEST,/shelf=0/slot/port=13))
(54545,(NDATEST,/shelf=0/slot/port=17))
(50936,(NDATEST,/shelf=0/slot/port=18))

To achieve this, we can use the FLATTEN operator as below

B = foreach A generate service_id,FLATTEN(STRSPLIT(neid_portid,'\\|\\^'));

Now the output is as expected


(12345,NDATEST,/shelf=0/slot/port=27)
(12367,NDATEST,/shelf=0/slot/port=13)
(54545,NDATEST,/shelf=0/slot/port=17)
(50936,NDATEST,/shelf=0/slot/port=18)

Relational Operators
GROUP

Groups the data in one or more relations. GROUP is the same as COGROUP. For readability, programmers usually use GROUP when only one relation is involved and COGROUP when multiple relations are involved.

alias = GROUP alias { ALL | BY expression} [, alias ALL | BY expression …] [USING 'collected'] [PARALLEL n];

collected – Allows for more efficient computation of a group if the loader guarantees that the data for the same key is continuous and is given to a single map. As of this release, only the Zebra loader makes this guarantee. The efficiency is achieved by performing the group operation in the map rather than the reduce phase (see Zebra and Pig). This feature cannot be used with the COGROUP operator.

PARALLEL – Increases the parallelism of a job by specifying the number of reduce tasks, n. The default value for n is 1 (one reduce task). PARALLEL only affects the number of reduce tasks; map parallelism is determined by the input file, one map for each HDFS block. If you don't specify PARALLEL, you still get the same map parallelism but only one reduce task.
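For example, a grouping job over a large dataset could request ten reduce tasks as below (the number 10 is only illustrative; tune it to your cluster and data size):

B = GROUP A BY service_id PARALLEL 10;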

Input Data

1,NDATEST,/shelf=0/slot/port=1
2,NDATEST,/shelf=0/slot/port=2
3,NDATEST,/shelf=0/slot/port=3
4,NDATEST,/shelf=0/slot/port=4
4,NDATEST,/shelf=0/slot/port=5
6,NDATEST,/shelf=0/slot/port=6

Below is an example of using GROUP.


A = LOAD 'service.txt' using PigStorage(',') AS (service_id:chararray , neid:chararray,portid:chararray );

B = group A all;

The result will be as below


(all,{(6,NDATEST,/shelf=0/slot/port=6),(4,NDATEST,/shelf=0/slot/port=5),(4,NDATEST,/shelf=0/slot/port=4),(3,NDATEST,/shelf=0/slot/port=3),(2,NDATEST,/shelf=0/slot/port=2),(1,NDATEST,/shelf=0/slot/port=1)})

Let's flatten the result

C = foreach B generate $0,flatten($1);

The result will be as below

(all,6,NDATEST,/shelf=0/slot/port=6)
(all,4,NDATEST,/shelf=0/slot/port=5)
(all,4,NDATEST,/shelf=0/slot/port=4)
(all,3,NDATEST,/shelf=0/slot/port=3)
(all,2,NDATEST,/shelf=0/slot/port=2)
(all,1,NDATEST,/shelf=0/slot/port=1)
Grouping using multiple fields

B = group A by (service_id,neid);


Result will be as below

((1,NDATEST),{(1,NDATEST,/shelf=0/slot/port=1)})
((2,NDATEST),{(2,NDATEST,/shelf=0/slot/port=2)})
((3,NDATEST),{(3,NDATEST,/shelf=0/slot/port=3)})
((4,NDATEST),{(4,NDATEST,/shelf=0/slot/port=5),(4,NDATEST,/shelf=0/slot/port=4)})
((6,NDATEST),{(6,NDATEST,/shelf=0/slot/port=6)})

Flattening it will give the below result

C = foreach B generate flatten($1);

(1,NDATEST,/shelf=0/slot/port=1)
(2,NDATEST,/shelf=0/slot/port=2)
(3,NDATEST,/shelf=0/slot/port=3)
(4,NDATEST,/shelf=0/slot/port=5)
(4,NDATEST,/shelf=0/slot/port=4)
(6,NDATEST,/shelf=0/slot/port=6)

COGROUP

COGROUP is the same as GROUP. For readability, programmers usually use GROUP when only one relation is involved and COGROUP when multiple relations are involved.
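As a sketch, reusing the service and metric datasets shown in the CROSS section below, COGROUP groups both relations by the same key and emits, per key, one bag of matching tuples from each relation:

A = LOAD 'service.txt' using PigStorage(',') AS (service_id:chararray , neid:chararray, portid:chararray );

B = LOAD 'metric.txt' using PigStorage(',') AS (service_id:chararray , metric1:chararray, metric2:chararray );

C = COGROUP A BY service_id, B BY service_id;

dump C;

Each tuple of C has the group key followed by one bag of tuples from A and one from B, along the lines of (1,{(1,NDATEST,/shelf=0/slot/port=1)},{(1,100,600)}); a key that appears in only one relation gets an empty bag for the other.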

CROSS

Computes the cross product of two or more relations.

alias = CROSS alias, alias [, alias …] [PARALLEL n];

Use the CROSS operator to compute the cross product (Cartesian product) of two or more relations. CROSS is an expensive operation and should be used sparingly.

Input Service Data

1,NDATEST,/shelf=0/slot/port=1
2,NDATEST,/shelf=0/slot/port=2
3,NDATEST,/shelf=0/slot/port=3
4,NDATEST,/shelf=0/slot/port=4
4,NDATEST,/shelf=0/slot/port=5
6,NDATEST,/shelf=0/slot/port=6

Input Metric Data

1,100,600
2,200,700
3,300,800

Let's run CROSS on the above datasets.


A = LOAD 'service.txt' using PigStorage(',') AS (service_id:chararray , neid:chararray,portid:chararray );

B = LOAD 'metric.txt' using PigStorage(',') AS (service_id:chararray , metric1:chararray, metric2:chararray );

C = cross A,B;

Output will be

(6,NDATEST,/shelf=0/slot/port=6,3,300,800)
(6,NDATEST,/shelf=0/slot/port=6,2,200,700)
(6,NDATEST,/shelf=0/slot/port=6,1,100,600)
(4,NDATEST,/shelf=0/slot/port=5,3,300,800)
(4,NDATEST,/shelf=0/slot/port=5,2,200,700)
(4,NDATEST,/shelf=0/slot/port=5,1,100,600)
(4,NDATEST,/shelf=0/slot/port=4,3,300,800)
(4,NDATEST,/shelf=0/slot/port=4,2,200,700)
(4,NDATEST,/shelf=0/slot/port=4,1,100,600)
(3,NDATEST,/shelf=0/slot/port=3,3,300,800)
(3,NDATEST,/shelf=0/slot/port=3,2,200,700)
(3,NDATEST,/shelf=0/slot/port=3,1,100,600)
(2,NDATEST,/shelf=0/slot/port=2,3,300,800)
(2,NDATEST,/shelf=0/slot/port=2,2,200,700)
(2,NDATEST,/shelf=0/slot/port=2,1,100,600)
(1,NDATEST,/shelf=0/slot/port=1,3,300,800)
(1,NDATEST,/shelf=0/slot/port=1,2,200,700)
(1,NDATEST,/shelf=0/slot/port=1,1,100,600)

DISTINCT

Use the DISTINCT operator to remove duplicate tuples in a relation. DISTINCT does not preserve the original order of the contents (to eliminate duplicates, Pig must first sort the data). You cannot use DISTINCT on a subset of fields. To do this, use FOREACH … GENERATE to select the fields, and then use DISTINCT.

alias = DISTINCT alias [PARALLEL n];

Input Data

1,NDATEST,/shelf=0/slot/port=1
2,NDATEST,/shelf=0/slot/port=2
3,NDATEST,/shelf=0/slot/port=3
4,NDATEST,/shelf=0/slot/port=4
4,NDATEST,/shelf=0/slot/port=5
6,NDATEST,/shelf=0/slot/port=6
3,NDATEST,/shelf=0/slot/port=3


A = LOAD 'service-2.txt' using PigStorage(',') AS (service_id:chararray , neid:chararray,portid:chararray );

B = DISTINCT A;

dump B;

output will be

(1,NDATEST,/shelf=0/slot/port=1)
(2,NDATEST,/shelf=0/slot/port=2)
(3,NDATEST,/shelf=0/slot/port=3)
(4,NDATEST,/shelf=0/slot/port=4)
(4,NDATEST,/shelf=0/slot/port=5)
(6,NDATEST,/shelf=0/slot/port=6)

Distinct on subset of fields

Input data

6,NDATEST,/shelf=0/slot/port=6
1,NDATEST,/shelf=0/slot/port=1
2,NDATEST,/shelf=0/slot/port=2
3,NDATEST,/shelf=0/slot/port=3
4,NDATEST2,/shelf=0/slot/port=4
3,NDATEST,/shelf=0/slot/port=3
4,NDATEST2,/shelf=0/slot/port=5
3,NDATEST,/shelf=0/slot/port=3

We will use the TOP function to achieve this: TOP(topN, column, relation). As inputs, we pass the number of tuples we want, the index of the column whose values are being compared, and the relation. The function returns a bag containing the required tuples.


D = GROUP A BY ($1);

(NDATEST,{(3,NDATEST,/shelf=0/slot/port=3),(3,NDATEST,/shelf=0/slot/port=3),(3,NDATEST,/shelf=0/slot/port=3),(2,NDATEST,/shelf=0/slot/port=2),(1,NDATEST,/shelf=0/slot/port=1),(6,NDATEST,/shelf=0/slot/port=6)})
(NDATEST2,{(4,NDATEST2,/shelf=0/slot/port=5),(4,NDATEST2,/shelf=0/slot/port=4)})

distinct_row = FOREACH D {
    result = TOP(1, 2, $1);
    GENERATE flatten(result);
}

output

(6,NDATEST,/shelf=0/slot/port=6)
(4,NDATEST2,/shelf=0/slot/port=5)

FILTER

Selects tuples from a relation based on some condition. Use the FILTER operator to work with tuples or rows of data; if you want to work with columns of data, use the FOREACH…GENERATE operation instead. FILTER is commonly used to select the data that you want or, conversely, to filter out the data you don't want.

alias = FILTER alias BY expression;

Input data

1,NDATEST,/shelf=0/slot/port=1
2,NDATEST,/shelf=0/slot/port=2
3,NDATEST,/shelf=0/slot/port=3
4,NDATEST,/shelf=0/slot/port=4
4,NDATEST,/shelf=0/slot/port=5
6,NDATEST,/shelf=0/slot/port=6


A = LOAD 'service.txt' using PigStorage(',') AS (service_id:int , neid:chararray,portid:chararray );

B = FILTER A BY service_id == 6;

output

(6,NDATEST,/shelf=0/slot/port=6)

FOREACH

Generates data transformations based on columns of data.

alias = FOREACH { gen_blk | nested_gen_blk } [AS schema];

alias = The name of the relation (outer bag).

gen_blk = FOREACH … GENERATE used with a relation (outer bag)

Example

C = foreach B generate $0,flatten($1);

nested_gen_blk = FOREACH … GENERATE used with an inner bag.

Example


distinct_row = FOREACH D {
result=TOP(1,2,$1);
GENERATE flatten(result);

}

AS schema

A = LOAD 'service.txt' using PigStorage(',') ;

C = foreach A generate $0 AS service_id:int,$1 AS neid:chararray,$2 AS portid:chararray;

describe C;

C: {service_id: int,neid: chararray,portid: chararray}

LIMIT

Use the LIMIT operator to limit the number of output tuples. If the specified number of output tuples is equal to or exceeds the number of tuples in the relation, the output will include all tuples in the relation. There is no guarantee which tuples will be returned, and the tuples that are returned can change from one run to the next. A particular set of tuples can be requested using the ORDER operator followed by LIMIT. The LIMIT operator allows Pig to avoid processing all tuples in a relation, so in most cases a query that uses LIMIT will run more efficiently than an identical query that does not. It is always a good idea to use LIMIT if you can.

alias = LIMIT alias n;

Input Service Data

1,NDATEST,/shelf=0/slot/port=1
2,NDATEST,/shelf=0/slot/port=2
3,NDATEST,/shelf=0/slot/port=3
4,NDATEST,/shelf=0/slot/port=4
4,NDATEST,/shelf=0/slot/port=5
6,NDATEST,/shelf=0/slot/port=6


A = LOAD 'service.txt' using PigStorage(',') AS (service_id:chararray , neid:chararray,portid:chararray ) ;

B = LIMIT A 2;

dump B;

output will be

(1,NDATEST,/shelf=0/slot/port=1)
(2,NDATEST,/shelf=0/slot/port=2)
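To get a deterministic set of tuples, order the relation first and then limit it. A sketch that keeps the two rows with the highest service_id:

C = ORDER A BY service_id DESC;

D = LIMIT C 2;

dump D;

This returns (6,NDATEST,/shelf=0/slot/port=6) plus one of the two service_id 4 rows; the tie between the two rows sharing service_id 4 can break either way.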

LOAD Operator

Use the LOAD operator to load data from the file system.

A = LOAD 'service.txt' using PigStorage(',') AS (service_id:chararray , neid:chararray,portid:chararray );

Note that, if no schema is specified, the fields are not named and all fields default to type bytearray.

describe A;

A: {service_id: chararray,neid: chararray,portid: chararray}

ORDER

Sorts a relation based on one or more fields. In Pig, relations are unordered. If you order relation A to produce relation X (X = ORDER A BY * DESC), relations A and X still contain the same data. If you retrieve the contents of relation X (DUMP X;), they are guaranteed to be in the order you specified. However, if you further process relation X (Y = FILTER X BY $0 > 1;), there is no guarantee that the contents will be processed in the order you originally specified (descending).

alias = ORDER alias BY { * [ASC|DESC] | field_alias [ASC|DESC] [, field_alias [ASC|DESC] …] } [PARALLEL n];


A = LOAD 'service.txt' using PigStorage(',') AS (service_id:chararray , neid:chararray,portid:chararray ) ;

B = ORDER A BY service_id DESC;

dump B;

(6,NDATEST,/shelf=0/slot/port=6)
(4,NDATEST,/shelf=0/slot/port=5)
(4,NDATEST,/shelf=0/slot/port=4)
(3,NDATEST,/shelf=0/slot/port=3)
(2,NDATEST,/shelf=0/slot/port=2)
(1,NDATEST,/shelf=0/slot/port=1)

SAMPLE

Use the SAMPLE operator to select a random data sample with the stated sample size. SAMPLE is a probabilistic operator; there is no guarantee that the exact same number of tuples will be returned for a particular sample size each time the operator is used.

Input Service Data

1,NDATEST,/shelf=0/slot/port=1
2,NDATEST,/shelf=0/slot/port=2
3,NDATEST,/shelf=0/slot/port=3
4,NDATEST,/shelf=0/slot/port=4
4,NDATEST,/shelf=0/slot/port=5
6,NDATEST,/shelf=0/slot/port=6


A = LOAD 'service.txt' using PigStorage(',') AS (service_id:chararray , neid:chararray,portid:chararray ) ;

B = SAMPLE A 0.5;

dump B;

(1,NDATEST,/shelf=0/slot/port=1)
(2,NDATEST,/shelf=0/slot/port=2)

SPLIT

Partitions a relation into two or more relations. Use the SPLIT operator to partition the contents of a relation into two or more relations based on some expression. Depending on the conditions stated in the expression, a tuple may be assigned to more than one relation, or a tuple may not be assigned to any relation.

Input Service Data

1,NDATEST,/shelf=0/slot/port=1
2,NDATEST,/shelf=0/slot/port=2
3,NDATEST,/shelf=0/slot/port=3
4,NDATEST,/shelf=0/slot/port=4
4,NDATEST,/shelf=0/slot/port=5
6,NDATEST,/shelf=0/slot/port=6


A = LOAD 'service.txt' using PigStorage(',') AS (service_id:int , neid:chararray,portid:chararray ) ;

SPLIT A INTO X IF service_id<3, Y IF service_id == 4, Z IF (service_id<7 AND service_id>4);

dump X;

(1,NDATEST,/shelf=0/slot/port=1)
(2,NDATEST,/shelf=0/slot/port=2)

dump Y;

(4,NDATEST,/shelf=0/slot/port=4)
(4,NDATEST,/shelf=0/slot/port=5)

dump Z;

(6,NDATEST,/shelf=0/slot/port=6)

STORE

Stores or saves results to the file system. Use the STORE operator to run (execute) Pig Latin statements and save results to the file system.

In the below example data is stored using PigStorage and the comma is used as the field delimiter.

STORE A INTO 'myoutput' USING PigStorage(',');

In the below example, data is stored using HCatStorer to store the data in a Hive partition, and the partition value is passed in the constructor.

store A_valid_data into '${service_table_name}' USING org.apache.hive.hcatalog.pig.HCatStorer('date=${date}');

STREAM

Use the STREAM operator to send data through an external script or program. Multiple stream operators can appear in the same Pig script. The stream operators can be adjacent to each other or have other operations in between.

A = LOAD 'data';

B = STREAM A THROUGH 'stream.pl -n 5';
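Streaming commands are often declared with DEFINE, which also lets you ship the script to the cluster nodes so it is available where the tasks run. A sketch, assuming stream.pl lives at a hypothetical local path:

DEFINE mycmd `stream.pl -n 5` SHIP('/local/path/stream.pl');

B = STREAM A THROUGH mycmd;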

UNION

Computes the union of two or more relations. Use the UNION operator to merge the contents of two or more relations. The UNION operator does not preserve the order of tuples; both the input and output relations are interpreted as unordered bags of tuples. It does not ensure that all tuples adhere to the same schema or that they have the same number of fields; in a typical scenario, however, this should be the case. It is therefore the user's responsibility to either ensure that the tuples in the input relations have the same schema or to be able to process varying tuples in the output relation. UNION also does not eliminate duplicate tuples.

Input Service Data

1,NDATEST,/shelf=0/slot/port=1
2,NDATEST,/shelf=0/slot/port=2
3,NDATEST,/shelf=0/slot/port=3
4,NDATEST,/shelf=0/slot/port=4
4,NDATEST,/shelf=0/slot/port=5
6,NDATEST,/shelf=0/slot/port=6

Input Metric Data

1,100,600
2,200,700
3,300,800
11,334,656


A = LOAD 'service.txt' using PigStorage(',') AS (service_id:chararray , neid:chararray,portid:chararray ) ;

B = LOAD 'metric.txt' using PigStorage(',') AS (service_id:chararray , metric1:chararray, metric2:chararray );

X = UNION A, B;

output will be

(1,NDATEST,/shelf=0/slot/port=1)
(2,NDATEST,/shelf=0/slot/port=2)
(3,NDATEST,/shelf=0/slot/port=3)
(4,NDATEST,/shelf=0/slot/port=4)
(4,NDATEST,/shelf=0/slot/port=5)
(6,NDATEST,/shelf=0/slot/port=6)
(1,100,600)
(2,200,700)
(3,300,800)
(11,334,656)