Pig tutorial 4 – inner join, outer join, replicated join, skewed join

Inner JOIN

Use the JOIN operator to perform an inner equijoin of two or more relations based on common field values. Unless LEFT, RIGHT, or FULL is specified, JOIN performs an inner join. Inner joins ignore null keys, so it makes sense to filter them out before the join. Note that the JOIN and COGROUP operators perform similar functions: JOIN creates a flat set of output records, while COGROUP creates a nested set of output records.
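As a minimal sketch of filtering null keys before the join, the following assumes the service.txt and metric.txt files and the field names used later in this tutorial. The aliases A_keyed and B_keyed are illustrative names only.

```pig
-- Load both inputs with a schema so the key can be referenced by name.
A = LOAD 'service.txt' USING PigStorage(',') AS (service_id:chararray, neid:chararray, portid:chararray);
B = LOAD 'metric.txt' USING PigStorage(',') AS (service_id:chararray, neid:chararray, portid:chararray);

-- Drop rows whose join key is null; the inner join would ignore them anyway.
A_keyed = FILTER A BY service_id IS NOT NULL;
B_keyed = FILTER B BY service_id IS NOT NULL;

-- Join only the rows that carry a usable key.
C = JOIN A_keyed BY service_id, B_keyed BY service_id;
```

The result should be the same as joining A and B directly. The filter only removes rows that could never match.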

alias = JOIN alias BY {expression|'('expression [, expression …]')'} (, alias BY {expression|'('expression [, expression …]')'} …) [USING 'replicated' | 'skewed' | 'merge'] [PARALLEL n];
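To illustrate the parenthesised expression list and the PARALLEL clause from the syntax above, here is a small sketch of a join on a two-part key. The relations X and Y, their file names, and their fields are hypothetical and not part of this tutorial's data.

```pig
-- Hypothetical inputs that share a composite key (k1, k2).
X = LOAD 'left.txt' USING PigStorage(',') AS (k1:chararray, k2:chararray, v:chararray);
Y = LOAD 'right.txt' USING PigStorage(',') AS (k1:chararray, k2:chararray, w:chararray);

-- Rows match only when both key parts are equal.
-- PARALLEL 4 requests four reducers for this join; the number is arbitrary here.
Z = JOIN X BY (k1, k2), Y BY (k1, k2) PARALLEL 4;
```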

Let's join the service data with the metric data.

Input Service Data

1,NDATEST,/shelf=0/slot/port=1
2,NDATEST,/shelf=0/slot/port=2
3,NDATEST,/shelf=0/slot/port=3
4,NDATEST,/shelf=0/slot/port=4
4,NDATEST,/shelf=0/slot/port=5
6,NDATEST,/shelf=0/slot/port=6

Input Metric Data

1,100,600
2,200,700
3,300,800

Let's run an inner join on the above datasets.


A = LOAD 'service.txt' USING PigStorage(',') AS (service_id:chararray, neid:chararray, portid:chararray);

-- metric.txt is given the same field names as service.txt here; only service_id is used as the join key
B = LOAD 'metric.txt' USING PigStorage(',') AS (service_id:chararray, neid:chararray, portid:chararray);

C = JOIN A BY service_id, B BY service_id;

dump C;

(1,NDATEST,/shelf=0/slot/port=1,1,100,600)
(2,NDATEST,/shelf=0/slot/port=2,2,200,700)
(3,NDATEST,/shelf=0/slot/port=3,3,300,800)
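For contrast with the flat JOIN output above, this sketch runs COGROUP on the same relations A and B. The alias G is an illustrative name.

```pig
-- Group both relations by the same key instead of joining them.
G = COGROUP A BY service_id, B BY service_id;

-- Each output tuple holds the group key, a bag of matching A tuples,
-- and a bag of matching B tuples.
DESCRIBE G;
DUMP G;
```

With the data above, the tuple for key 4 should carry two service rows in its A bag and an empty B bag, whereas the inner join dropped key 4 entirely.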

Outer Join

Use the OUTER JOIN operator to perform left, right, or full outer joins. The Pig Latin syntax closely adheres to the SQL standard. The keyword OUTER is optional: LEFT, RIGHT, and FULL imply left outer, right outer, and full outer joins respectively when OUTER is omitted. Outer joins have two constraints:

1. Outer joins will only work provided the relations which need to produce nulls (in the case of non-matching keys) have schemas.

2. Outer joins will only work for two-way joins; to perform a multi-way outer join, you will need to perform multiple two-way outer join statements.

alias = JOIN left-alias BY left-alias-column [LEFT|RIGHT|FULL] [OUTER], right-alias BY right-alias-column [USING 'replicated' | 'skewed'] [PARALLEL n];

Let's join the service data with the metric data, starting with a left outer join.

Input Service Data

1,NDATEST,/shelf=0/slot/port=1
2,NDATEST,/shelf=0/slot/port=2
3,NDATEST,/shelf=0/slot/port=3
4,NDATEST,/shelf=0/slot/port=4
4,NDATEST,/shelf=0/slot/port=5
6,NDATEST,/shelf=0/slot/port=6

Input Metric Data

1,100,600
2,200,700
3,300,800
11,334,656


A = LOAD 'service.txt' USING PigStorage(',') AS (service_id:chararray, neid:chararray, portid:chararray);

B = LOAD 'metric.txt' USING PigStorage(',') AS (service_id:chararray, neid:chararray, portid:chararray);

C = JOIN A BY service_id LEFT OUTER, B BY service_id;

Using replicated join


D = JOIN A BY service_id LEFT OUTER, B BY service_id USING 'replicated';

Using skewed join


D = JOIN A BY service_id LEFT OUTER, B BY service_id USING 'skewed';
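The USING clause selects a join strategy and should not change which rows are returned. The sketch below shows both strategies on a plain inner join of A and B. The aliases R and S and the PARALLEL value are illustrative, and the comments summarise the commonly documented behaviour rather than anything measured here.

```pig
-- Replicated join: the relation listed last (B) is loaded into memory,
-- so it must be small enough to fit. The large relation goes first.
R = JOIN A BY service_id, B BY service_id USING 'replicated';

-- Skewed join: intended for inputs where a few key values account for
-- most of the rows. It works on two relations at a time.
S = JOIN A BY service_id, B BY service_id USING 'skewed' PARALLEL 4;
```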

Output of the left outer join will be


(1,NDATEST,/shelf=0/slot/port=1,1,100,600)
(2,NDATEST,/shelf=0/slot/port=2,2,200,700)
(3,NDATEST,/shelf=0/slot/port=3,3,300,800)
(4,NDATEST,/shelf=0/slot/port=5,,,)
(4,NDATEST,/shelf=0/slot/port=4,,,)
(6,NDATEST,/shelf=0/slot/port=6,,,)
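A common use of a left outer join is to find rows with no match on the other side. As a sketch, reusing relation C from the left outer join above (the aliases unmatched and services_without_metrics are illustrative):

```pig
-- After the join, fields are disambiguated as A::... and B::...
-- B::service_id is null for every A row that found no match in B.
unmatched = FILTER C BY B::service_id IS NULL;

-- Keep only the service columns.
services_without_metrics = FOREACH unmatched GENERATE A::service_id, A::neid, A::portid;

DUMP services_without_metrics;
```

With the data above, this should return the two rows for service 4 and the row for service 6.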

Right outer join

C = JOIN A BY service_id RIGHT OUTER, B BY service_id;

Output will be

(1,NDATEST,/shelf=0/slot/port=1,1,100,600)
(2,NDATEST,/shelf=0/slot/port=2,2,200,700)
(3,NDATEST,/shelf=0/slot/port=3,3,300,800)
(,,,11,334,656)

Full Outer Join

C = JOIN A BY service_id FULL, B BY service_id;

Output will be

(1,NDATEST,/shelf=0/slot/port=1,1,100,600)
(2,NDATEST,/shelf=0/slot/port=2,2,200,700)
(3,NDATEST,/shelf=0/slot/port=3,3,300,800)
(4,NDATEST,/shelf=0/slot/port=5,,,)
(4,NDATEST,/shelf=0/slot/port=4,,,)
(6,NDATEST,/shelf=0/slot/port=6,,,)
(,,,11,334,656)

As noted in the constraints above, outer joins only work when the relations that need to produce nulls (in the case of non-matching keys) have schemas.

Let's take an example.


A = LOAD 'service.txt' USING PigStorage(',');

B = LOAD 'metric.txt' USING PigStorage(',') AS (service_id:chararray, neid:chararray, portid:chararray);

C = JOIN A BY $0 LEFT OUTER, B BY $0;

Here, rows from relation B may have null values if there is no match. So to do a left outer join, defining a schema for relation B is mandatory, or else Pig will throw an exception.
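For contrast, here is a sketch of the failing case, with the schema on the wrong side. It reuses the same files. The exact error text depends on the Pig version and is not reproduced here.

```pig
-- A has a schema, but A is not the side that gets padded with nulls.
A = LOAD 'service.txt' USING PigStorage(',') AS (service_id:chararray, neid:chararray, portid:chararray);

-- No AS clause: B has no schema, so Pig does not know how many
-- null fields to emit for an unmatched row.
B = LOAD 'metric.txt' USING PigStorage(',');

-- Expected to be rejected, because B is the null-producing side of a LEFT OUTER join.
C = JOIN A BY service_id LEFT OUTER, B BY $0;
```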

Joining three relations

Input Service Data

1,NDATEST,/shelf=0/slot/port=1
2,NDATEST,/shelf=0/slot/port=2
3,NDATEST,/shelf=0/slot/port=3
4,NDATEST,/shelf=0/slot/port=4
4,NDATEST,/shelf=0/slot/port=5
6,NDATEST,/shelf=0/slot/port=6

Input Metric Data

1,100,600
2,200,700
3,300,800
11,334,656

Input Perf Data

1,2345454.44,232323.343

Inner Join

A = LOAD 'service.txt' USING PigStorage(',') AS (service_id:chararray, neid:chararray, portid:chararray);

B = LOAD 'metric.txt' USING PigStorage(',') AS (service_id:chararray, neid:chararray, portid:chararray);

C = LOAD 'perf.txt' USING PigStorage(',') AS (service_id:chararray, cpu:float, memory:float);

D = JOIN A BY service_id, B BY service_id, C BY service_id;

dump D;

(1,NDATEST,/shelf=0/slot/port=1,1,100,600,1,2345454.5,232323.34)
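The joined tuple repeats service_id once per input relation. As a sketch, a FOREACH with the relation-qualified names can trim it down to the columns of interest. The alias E and the choice of columns are illustrative.

```pig
-- service_id exists in A, B and C, so it must be qualified with the relation name.
-- cpu and memory come only from C; qualifying them as well keeps the statement explicit.
E = FOREACH D GENERATE A::service_id AS service_id, A::portid AS portid, C::cpu AS cpu, C::memory AS memory;

DUMP E;
```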

Outer Join

Outer joins are limited to two relations, so we have to achieve this using two statements: first join relation A with B into AB, and then join AB with C. If you think about it, it is not natural to do it within a single statement because of the intermediate state AB.


AB = JOIN A BY service_id LEFT OUTER, B BY service_id;

describe AB;

AB: {A::service_id: chararray,A::neid: chararray,A::portid: chararray,B::service_id: chararray,B::neid: chararray,B::portid: chararray}

ABC = JOIN AB BY A::service_id LEFT OUTER, C BY service_id;

dump ABC;

(1,NDATEST,/shelf=0/slot/port=1,1,100,600,1,2345454.5,232323.34)
(2,NDATEST,/shelf=0/slot/port=2,2,200,700,,,)
(3,NDATEST,/shelf=0/slot/port=3,3,300,800,,,)
(4,NDATEST,/shelf=0/slot/port=4,,,,,,)
(4,NDATEST,/shelf=0/slot/port=5,,,,,,)
(6,NDATEST,/shelf=0/slot/port=6,,,,,,)