Pasting a link to a Pig source-code analysis here. I have worked with Pig for a long time without taking notes, so this contains no details; I may expand on it later:
http://blackproof.iteye.com/blog/1769219

Hadoop Pig Getting-Started Summary
Pig introduction
Pig data types
Pig Latin syntax
Pig UDFs
Pig derived projects
Recommended book: Programming Pig
Recommended site: http://pig.apache.org/docs/r0.10.0/basic.html
Pig introduction
Pig is a layer built on top of Hadoop, similar to Hive. Compared with Hive (which is SQL-like and declarative), Pig is a procedural language: like a stored procedure, it transforms data step by step.
Pig data types
Implicit type precedence (a value of a type on the right can be promoted to a type on its left):
double > float > long > int > bytearray
tuple | bag | map | chararray > bytearray
double, float, long, int, chararray, and bytearray are Pig's scalar (primitive) types.
A tuple is like an array, except that its fields may have different types, e.g. ('dirkzhang','dallas',41).
A bag is a collection of tuples, e.g. {('dirk',41),('kedde',2),('terre',31)}; group operations produce bags.
A map is a hash table whose keys are chararrays and whose values can be any type, e.g. ['name'#'dirk','age'#36,'num'#41].
null means more than missing data: it means the value is unknown.
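The complex types can be declared directly in a load schema; a minimal sketch, assuming a hypothetical input file 'people':
people = load 'people' as (name:chararray, age:int,
    address:tuple(city:chararray, zip:chararray),
    pets:bag{t:(pet:chararray)},
    props:map[]);
describe people;  -- echoes the declared schema back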
Pig Latin syntax
1:load
LOAD 'data' [USING function] [AS schema];
For example:
load = LOAD 'sql://{SELECT MONTH_ID,DAY_ID,PROV_ID FROM zb_d_bidwmb05009_010}' USING com.xxxx.dataplatform.bbdp.geniuspig.VerticaLoader('oracle','192.168.6.5','dev','1522','vbap','vbap','1') AS (MONTH_ID:chararray,DAY_ID:chararray,PROV_ID:chararray);
Table = load 'url' as (id, name, ...); -- besides the equals sign, there must be a space between the alias and load, otherwise an error occurs; the url must be quoted, and only with single quotes.
2:filter
alias = FILTER alias BY expression;
Table = filter Table1 by A; -- A can be a condition such as id > 10, not name matches '', is not null, etc.; conditions can be combined with and/or.
For example:
filter = filter load20 by ( MONTH_ID == '1210' and DAY_ID == '18' and PROV_ID == '010' );
3:group
alias = GROUP alias { ALL | BY expression} [, alias ALL | BY expression …] [USING 'collected' | 'merge'] [PARTITION BY partitioner] [PARALLEL n];
Pig's group does more than group the data: it also reshapes the schema into the form group_column:bag.
Table3 = group Table2 by id; or Table3 = group Table2 by (id,name); -- the parentheses are required when grouping by multiple fields.
GROUP ... ALL can be used to gather all records into a single group.
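A minimal sketch of how group reshapes the schema, assuming a hypothetical relation users(name, age):
users = load 'users' as (name:chararray, age:int);
byage = group users by age;
describe byage;
-- byage: {group: int,users: {(name: chararray,age: int)}}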
4:foreach
alias = FOREACH alias GENERATE expression [AS schema] [expression [AS schema] ...];
alias = FOREACH nested_alias {
    alias = {nested_op | nested_exp}; [alias = {nested_op | nested_exp}; ...]
    GENERATE expression [AS schema] [expression [AS schema] ...];
};
foreach is normally used together with generate.
Table = foreach Table generate id, name; -- note that wrapping the fields in parentheses, as in generate (id, name), produces a single tuple field rather than two separate columns.
avg = foreach Table generate group, AVG(age); -- MAX, MIN, etc. work the same way.
When cleaning data, project away unneeded columns with foreach ... generate as early as possible, to reduce the data passed between stages. The nested form is sketched below.
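A minimal sketch of the nested foreach form, reusing the hypothetical byage relation from the group sketch above; it counts the distinct names in each age group:
cnt = foreach byage {
    names = users.name;      -- project one column out of the bag
    uniq = distinct names;   -- nested operator, applied per group
    generate group, COUNT(uniq);
};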
5:join
Inner join Syntax
alias = JOIN alias BY {expression|'('expression [, expression …]')'} (, alias BY {expression|'('expression [, expression …]')'} …) [USING 'replicated' | 'skewed' | 'merge' | 'merge-sparse'] [PARTITION BY partitioner] [PARALLEL n];
Outer join Syntax
alias = JOIN left-alias BY left-alias-column [LEFT|RIGHT|FULL] [OUTER], right-alias BY right-alias-column [USING 'replicated' | 'skewed' | 'merge'] [PARTITION BY partitioner] [PARALLEL n];
join/left join / right join
daily = load 'A' as (id,name, sex);
divs = load 'B' as (id,name, sex);
join
jnd = join daily by (id, name), divs by (id, name);
left join
jnd = join daily by (id, name) left outer, divs by (id, name);
Multiple relations can also be joined at once, but only with inner join:
A = load 'input1' as (x, y);
B = load 'input2' as (u, v);
C = load 'input3' as (e, f);
alpha = join A by x, B by u, C by e;
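The USING clause in the syntax above selects a join strategy. A common case is the fragment-replicate join, which avoids the reduce phase when one input fits in memory; a sketch reusing daily and divs:
jnd = join daily by (id, name), divs by (id, name) using 'replicated';
-- every relation after the first is loaded into memory, so the small input must be listed last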
6: union
alias = UNION [ONSCHEMA] alias, alias [, alias …];
union is like SQL's UNION, but unlike SQL, Pig's union can be applied to two relations with different schemas: if the two schemas are identical, the result has the same schema; if one relation's schema can be coerced into the other's by implicit casts, the result has the coerced schema; otherwise, the result has no schema.
A = load 'input1' as (x:int, y:float);
B = load 'input2' as (x:int, y:float);
C = union A, B;
describe C;
C: {x: int,y: float}
A = load 'input1' as (x:double, y:float);
B = load 'input2' as (x:int, y:double);
C = union A, B;
describe C;
C: {x: double,y: double}
A = load 'input1' as (x:int, y:float);
B = load 'input2' as (x:int, y:chararray);
C = union A, B;
describe C;
Schema for C unknown.
Note: the last form of union above cannot be executed in Pig 1.0.
To union two relations whose column names differ, use the onschema keyword:
A = load 'input1' as (w: chararray, x:int, y:float);
B = load 'input2' as (x:int, y:double, z:chararray);
C = union onschema A, B;
describe C;
C: {w: chararray,x: int,y: double,z: chararray}
Note that after join and union the field aliases change (see the alias section below).
7:Dump
DUMP alias;
Displays the relation's contents on screen.
8:Order by
alias = ORDER alias BY { * [ASC|DESC] | field_alias [ASC|DESC] [, field_alias [ASC|DESC] …] } [PARALLEL n];
A = order Table by id desc;
9:distinct
A = distinct alias;
10:limit
A = limit alias 10;
11:sample
SAMPLE alias size;
Randomly samples the given fraction (between 0 and 1) of the data.
some = sample divs 0.1;
12:cross
alias = CROSS alias, alias [, alias …] [PARTITION BY partitioner] [PARALLEL n];
Forms the Cartesian product of two or more relations, pairing every record of each relation with every record of the others.
--cross.pig
daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray,date:chararray, open:float, high:float, low:float,
close:float, volume:int, adj_close:float);
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,date:chararray, dividends:float);
tonsodata = cross daily, divs parallel 10;
13:split
Syntax
SPLIT alias INTO alias IF expression, alias IF expression [, alias IF expression …] [, alias OTHERWISE];
A = LOAD 'data' AS (f1:int,f2:int,f3:int);
DUMP A;
(1,2,3)
(4,5,6)
(7,8,9)
SPLIT A INTO X IF f1<7, Y IF f2==5, Z IF (f3<6 OR f3>6);
DUMP X;
(1,2,3)
(4,5,6)
DUMP Y;
(4,5,6)
DUMP Z;
(1,2,3)
(7,8,9)
14:store
STORE alias INTO 'directory' [USING function];
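A minimal sketch, assuming a hypothetical relation result and output directory:
store result into 'output/result' using PigStorage(',');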
How Pig maintains aliases:
1. join
e.g. e = join d by name, b by name;
g = foreach e generate $0 as one:chararray, $1 as two:int, $2 as three:chararray, $3 as four:int;
The resulting schemas are:
e: {d::name: chararray,d::position: int,b::name: chararray,b::age: int}
g: {one: chararray,two: int,three: chararray,four: int}
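Instead of positional references, the colliding fields can be referenced with the :: disambiguator; a sketch reusing e above:
h = foreach e generate d::name, b::age;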
2. group
B = GROUP A BY age;
---------------------------------------------------------------------
| B | group: int | A: bag({name: chararray,age: int,gpa: float})    |
---------------------------------------------------------------------
|   | 18         | {(John, 18, 4.0), (Joe, 18, 3.8)}                |
|   | 20         | {(Bill, 20, 3.9)}                                |
---------------------------------------------------------------------
As tuples, a row dumps as: (18,{(John,18,4.0F),(Joe,18,3.8F)})
Pig UDFs
Pig supports embedding user defined functions. A simple eval UDF extends EvalFunc and is typically used in filter and foreach:
import java.io.IOException;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;

public class MyUDF extends EvalFunc<String> {
    @Override
    public String exec(Tuple input) throws IOException {
        // a null or empty input tuple yields a null result
        if (input == null || input.size() == 0)
            return null;
        try {
            String val = (String) input.get(0);
            return new StringBuffer(val).append(" pig").toString();
        } catch (Exception e) {
            throw new IOException(e.getMessage());
        }
    }
}
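To call the UDF from a script, register the jar it is packaged in; a sketch assuming a hypothetical jar myudfs.jar with MyUDF in the default package:
register myudfs.jar;
B = foreach A generate MyUDF(name);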
Pig also supports UDFs for loading and storing:
a loader UDF extends LoadFunc;
a storer UDF extends StoreFunc.
This is similar to writing an InputFormat and OutputFormat in Hadoop;
the Vertica connector, for instance, is a database-backed implementation of both.
Here is a simple loader example:
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;

import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.pig.LoadFunc;
import org.apache.pig.ResourceSchema;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;

public class MyLoader extends LoadFunc {
    protected RecordReader recordReader = null;
    private Connection conn;
    private PreparedStatement ps;
    private ResultSet rs;
    private final String jdbcURL;
    private final String user;
    private final String pwd;
    private final String querySql;
    private final TupleFactory tupleFactory = TupleFactory.getInstance();

    public MyLoader(String driver, String jdbcURL, String user, String pwd, String querySql) {
        try {
            Class.forName(driver);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("JDBC driver not found: " + driver, e);
        }
        this.jdbcURL = jdbcURL;
        this.user = user;
        this.pwd = pwd;
        this.querySql = querySql;
    }

    @Override
    public InputFormat getInputFormat() throws IOException {
        // Pig still expects an InputFormat to drive the job,
        // even though the rows actually come from JDBC below
        return new PigTextInputFormat();
    }

    @Override
    public Tuple getNext() throws IOException {
        // the important part: how each row read becomes a Pig tuple
        try {
            if (rs == null) {
                // lazily open the JDBC connection and run the query
                conn = DriverManager.getConnection(jdbcURL, user, pwd);
                ps = conn.prepareStatement(querySql);
                rs = ps.executeQuery();
            }
            if (!rs.next()) {
                return null; // no more rows: signals end of input to Pig
            }
            ResultSetMetaData rsmd = rs.getMetaData();
            Tuple tuple = tupleFactory.newTuple(rsmd.getColumnCount());
            for (int i = 0; i < rsmd.getColumnCount(); i++) {
                // raw JDBC objects; a real loader would map them to Pig types
                tuple.set(i, rs.getObject(i + 1)); // JDBC columns are 1-based
            }
            return tuple;
        } catch (Exception e) {
            throw new IOException(e);
        }
    }

    @Override
    public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
        // called before reading starts; keep the reader in case it is needed
        this.recordReader = reader;
    }

    @Override
    public void setLocation(String location, Job job) throws IOException {
        // the data source is the database, so the location string is unused here
    }

    // note: for Pig to pick this up, the class should also implement LoadMetadata
    public ResourceSchema getSchema(String location, Job job) throws IOException {
        Schema schema = new Schema();
        // TODO: read the real column list from the database table instead
        schema.add(new FieldSchema("name", DataType.CHARARRAY));
        schema.add(new FieldSchema("position", DataType.INTEGER));
        return new ResourceSchema(schema);
    }
}
getNext defines how the data read by the reader is turned into tuples;
getSchema can pin down the schema of the data being read;
setLocation handles the input data source;
prepareToRead runs before any data is read, so setup and bookkeeping can be done there.
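A sketch of invoking such a loader from a script; the jar name, driver class, connection string, and query are all hypothetical:
register myloader.jar;
rows = load 'unused' using MyLoader('oracle.jdbc.OracleDriver',
    'jdbc:oracle:thin:@192.168.6.5:1522:vbap', 'dev', 'pwd',
    'SELECT name, position FROM some_table');
-- the location string 'unused' is ignored because setLocation does nothing here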
Pig derived projects
1. Penny
1) What Penny is
Penny is a Pig contrib project: a debugging and monitoring tool for Pig. Its API also supports custom Penny monitors and coordinators to implement different functions.
2) Penny's overall architecture
Penny inserts monitor agents into Pig's job operations, mainly to watch changes in the Pig data flow; the monitors can call the coordinator to carry out various functions.
3) Penny's overall class diagram
ParsePigScript builds a new plan (newPlan) from the user's monitors, and ToolsPigServer executes that new plan against the original script. While the new plan runs, whenever data watched by a monitor changes, the monitor fires, runs the user-defined logic, and can also pass the data-flow change back to the coordinator for processing. (The class diagram appears as an image in the original post.)
4) Using Penny
Using Penny requires defining two classes: one extending the monitor base class MonitorAgent, the other extending the coordinator base class Coordinator. With those in place, PennyServer and ParsePigScript (per the class diagram) can be used for monitoring and debugging.
5) The Penny source can be found in Pig's contrib directory.
2. Vertica
Vertica provides Pig loader and storer UDFs.
The attachment contains the Vertica connector (from GitHub) along with its usage documentation.
Here is a post about the Vertica connector:
http://blackproof.iteye.com/blog/1791995

Recommended book
Programming Pig

Recommended site
http://pig.apache.org/docs/r0.10.0/basic.html (official documentation)
There is also the PigPen development tool; I am not yet fluent with it, so I won't cover it here. If you are interested, search for it and try it out.
In my own work Pig is used mainly for data ETL, which suits it well. Whether to choose Pig, Hive, or a non-Hadoop stack such as Redis for a given job is a question that still needs continued exploration.