為了方便用戶使用 MapReduce及UDF的Java SDK進行開發工作,MaxCompute 提供了Eclipse開發插件。該插件能夠模擬MapReduce及UDF的運行過程,為用戶提供本地調試手段,並提供了簡單的模板生成功能。
一、安裝
與MapReduce提供的本地運行模式不同,Eclipse插件不能夠與ODPS同步數據。用戶使用的數據需要手動拷貝到Eclipse插件的warehouse目錄下。
下載Eclipse插件後,將軟件包解壓,會看到如下jar內容:
odps-eclipse-plugin-bundle-X.X.X.jar
將插件放置在Eclipse安裝目錄的plugins子目錄下。打開Eclipse,點擊右上角的打開透視圖(Open Perspective)。
點擊後出現透視圖列表。
擇ODPS,隨後點擊OK鍵。同樣在右上角會出現ODPS圖標,表示插件生效。
1、 創建ODPS工程
創建ODPS工程有兩種方式。
方式一:
在左上角選擇文件(File) -> 新建(New)->Project->ODPS->ODPS Project,創建工程(示例中使用ODPS作為工程名)。
創建ODPS工程後會出現如下對話框。輸入Project name,選擇ODPS客戶端路徑(客戶端需要提前下載),並確認(點擊Finish)。
創建好工程後,在左側包資源管理器(Package Explorer)中可以看到如下目錄結構。
方式二:
直接點擊左上角的"新建"。
彈出對話框後,選擇"ODPS Project",點擊"下一步"。
後續操作同方式一。
2、MapReduce開發插件介紹
(1). 快速運行WordCount示例
選擇ODPS項目中的WordCount示例。
右鍵"WordCount.java",依次點擊"Run As","ODPS MapReduce"。
彈出對話框後,選擇"example_project",點擊確認。
運行成功後,會出現以下結果提示。
(2). 運行自定義MapReduce程序
右鍵選擇src目錄,選擇新建(New) -> Mapper。
選擇Mapper後出現下面的對話框。輸入Mapper類的名字,並確認。
會看到在左側包資源管理器(Package Explorer)中,src目錄下生成文件UserMapper.java。該文件的內容即是一個Mapper類的模板。
package odps;
import java.io.IOException;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.mapred.MapperBase;
public class UserMapper extends MapperBase {
@Override
public void setup(TaskContext context) throws IOException {
}
@Override
public void map(long recordNum, Record record, TaskContext context)
throws IOException {
}
@Override
public void cleanup(TaskContext context) throws IOException {
}
}
模板中,將package名稱默認配置為"odps",用戶可以根據自己的需求進行修改。編寫模板內容。
package odps;
import java.io.IOException;
import com.aliyun.odps.counter.Counter;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.mapred.MapperBase;
public class UserMapper extends MapperBase {
Record word;
Record one;
Counter gCnt;
@Override
public void setup(TaskContext context) throws IOException {
word = context.createMapOutputKeyRecord();
one = context.createMapOutputValueRecord();
one.set(new Object[] { 1L });
gCnt = context.getCounter("MyCounters", "global_counts");
}
@Override
public void map(long recordNum, Record record, TaskContext context)
throws IOException {
for (int i = 0; i < record.getColumnCount(); i++) {
String[] words = record.get(i).toString().split("\\s+");
for (String w : words) {
word.set(new Object[] { w });
Counter cnt = context.getCounter("MyCounters", "map_outputs");
cnt.increment(1);
gCnt.increment(1);
context.write(word, one);
}
}
}
@Override
public void cleanup(TaskContext context) throws IOException {
}
}
同理,右鍵選擇src目錄,選擇新建(New)->Reduce。
輸入Reduce類的名字(本示例使用UserReduce)。同樣在包資源管理器(Package Explorer)中,src目錄下生成文件UserReduce.java。該文件的內容即是一個Reduce類的模板。
package odps;
import java.io.IOException;
import java.util.Iterator;
import com.aliyun.odps.counter.Counter;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.mapred.ReducerBase;
public class UserReduce extends ReducerBase {
private Record result;
Counter gCnt;
@Override
public void setup(TaskContext context) throws IOException {
result = context.createOutputRecord();
gCnt = context.getCounter("MyCounters", "global_counts");
}
@Override
public void reduce(Record key, Iterator<Record> values, TaskContext context)
throws IOException {
long count = 0;
while (values.hasNext()) {
Record val = values.next();
count += (Long) val.get(0);
}
result.set(0, key.get(0));
result.set(1, count);
Counter cnt = context.getCounter("MyCounters", "reduce_outputs");
cnt.increment(1);
gCnt.increment(1);
context.write(result);
}
@Override
public void cleanup(TaskContext context) throws IOException {
}
}
創建main函數,右鍵選擇src目錄,選擇新建(New) -> MapReduce Driver。填寫Driver Name(示例中是UserDriver), Mapper及Recduce類(示例中是UserMapper及UserReduce),並確認。同樣會在src目錄下看到MyDriver.java文件。
編輯driver內容。
package odps;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.examples.mr.WordCount.SumCombiner;
import com.aliyun.odps.examples.mr.WordCount.SumReducer;
import com.aliyun.odps.examples.mr.WordCount.TokenizerMapper;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.RunningJob;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;
public class UserDriver {
public static void main(String[] args) throws OdpsException {
JobConf job = new JobConf();
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(SumCombiner.class);
job.setReducerClass(SumReducer.class);
job.setMapOutputKeySchema(SchemaUtils.fromString("word:string"));
job.setMapOutputValueSchema(SchemaUtils.fromString("count:bigint"));
InputUtils.addTable(
TableInfo.builder().tableName("wc_in1").cols(new String[] { "col2", "col3" }).build(), job);
InputUtils.addTable(TableInfo.builder().tableName("wc_in2").partSpec("p1=2/p2=1").build(), job);
OutputUtils.addTable(TableInfo.builder().tableName("wc_out").build(), job);
RunningJob rj = JobClient.runJob(job);
rj.waitForCompletion();
}
}
運行MapReduce程序,選中UserDriver.java,右鍵選擇Run As -> ODPS MapReduce,點擊確認。出現如下對話框。
選擇ODPS Project為example_project,點擊Finish按鈕開始本地運行MapReduce程序。
有如上輸出信息,說明本地運行成功。運行的輸出結果在warehouse目錄下。
wc_out即是輸出目錄,R_000000即是結果文件。通過本地調試,確定輸出結果正確後,可以通過Eclipse導出(Export)功能將MapReduce打包。打包後將jar包上傳到ODPS中。
本地調試通過後,用戶可以通過Eclipse的Export功能將代碼打成jar包,供後續分佈式環境使用。在本示例中,我們將程序包命名為mr-examples.jar。選擇src目錄,點擊Export。
選擇導出模式為Jar File。
僅需要導出src目錄下package(com.aliyun.odps.mapred.open.example),Jar File名稱指定為"mr-examples.jar"。
確認後,導出成功。
如果用戶想在本地模擬新建Project,可以在warehouse下面,創建一個新的子目錄(與example_project平級的目錄)。
|____my_project (項目空間目錄)
|____ <__tables__>
| |__table_name1(非分區表)
| | |____ data(文件)
| | |
| | |____ <__schema__> (文件)
| |
| |__table_name2(分區表)
| |_____partition_name=partition_value(分區目錄)
| | |____ data(文件)
| |
| |____ <__schema__> (文件)
|
|____ <__resources__>
|
|___table_resource_name (表資源)
| |____<__ref__>
|
|___ file_resource_name(文件資源)
schema文件示例:
非分區表:
project=project_name
table=table_name columns=col1:BIGINT,col2:DOUBLE,col3:BOOLEAN,col4:DATETIME,col5:STRING
分區表:
project=project_name
table=table_name columns=col1:BIGINT,col2:DOUBLE,col3:BOOLEAN,col4:DATETIME,col5:STRING partitions=col1:BIGINT,col2:DOUBLE,col3:BOOLEAN,col4:DATETIME,col5:STRING
data文件示例:
1,1.1,true,2015-06-04 11:22:42 896,hello world
N,N,N,N,N
8.4 UDF開發插件介紹
(1). Local Debug UDF程序
在本章節我們將介紹如何使用Eclipse插件開發並在本地運行UDF。UDAF和UDTF的編寫執行過程與UDF類似,均可參考UDF的示例介紹完成。ODPS Eclipse插件提供兩種運行UDF的方式,菜單欄和右鍵單擊快速運行方式。
菜單欄運行
從菜單欄選擇Run-->Run Configurations...彈出如下對話框。
用戶可以新建一個Run Configuration,選擇運行的UDF類及類型、選擇ODPS Project、填寫輸入表信息。
上述配置中,"Table"表示UDF的輸入表,"Partitions"表示讀取某個分區下的數據,分區由逗號分隔,"Columns"表示列,將依次作為UDF函數的參數被傳入,列名由逗號分隔。
點擊"Run"運行,運行結果將顯示在控制檯中。
右鍵單擊快速運行
選中一個udf.java文件(比如:UDFExample.java)並單擊鼠標右鍵,選擇"Run As" -> "Run UDF|UDAF|UDTF"。
填入配置信息。
上述配置中,"Table"表示UDF的輸入表,"Partitions"表示讀取某個分區下的數據,分區由逗號分隔,"Columns"表示列,將依次作為UDF函數的參數被傳入,列名由逗號分隔。
點擊"Finish"後,運行UDF,獲得輸出結果。
(2). 運行用戶自定義UDF程序
右擊一個工程並選擇"New-->UDF"(或者選擇菜單欄File-->New-->UDF)。
填寫UDF類名然後點擊"Finish"。在對應的src目錄下生成與UDF類名同名的Java文件,編輯該java文件內容。
package odps;
import com.aliyun.odps.udf.UDF;
public class UserUDF extends UDF {
/**
* project: example_project
* table: wc_in1
* columns: col1,col2
*
*/
public String evaluate(String a, String b) {
return "ss2s:" + a + "," + b;
}
}
右擊該java文件(如UserUDF.java),選擇"Run As",再選擇"ODPS UDF|UDTF|UDAF"。
配置如下對話框。
點擊"finish",得出結果。
ss2s:A1,A2
ss2s:A1,A2
ss2s:A1,A2
ss2s:A1,A2