一、UDF的分類
UDF類型 |
描述 |
UDF(User Defined Scalar Function) |
用戶自定義標量值函數。其輸入與輸出是一對一的關係,即讀入一行數據,輸出一個值。 |
UDTF(User Defined Table Valued Function) |
自定義表值函數。用於解決調用一次函數輸出多行數據的需求。UDTF是唯一能夠返回多個字段的自定義函數。UDTF不等於UDT(User Defined Type)。 |
UDAF(User Defined Aggregation Function) |
自定義聚合函數。其輸入與輸出是多對一的關係,即將多條輸入記錄聚合成一個輸出值。UDAF可以與SQL中的GROUP BY語句聯用。具體語法請參見 。 |
二、UDF參數解析
MaxCompute數據類型與Java數據類型的對應關係如下。
注意點:
- 此處ARRAY類型對應的Java類型是List,而不是數組。
- VARCHAR,BINART,STRUCT一些數據類型是ODPS獨有的
- Java中對應的數據類型以及返回值數據類型是對象,數據類型首字母需大寫。
MaxCompute Type |
Java Type |
TINYINT |
java.lang.Byte |
SMALLINT |
java.lang.Short |
INT |
java.lang.Integer |
BIGINT |
java.lang.Long |
FLOAT |
java.lang.Float |
DOUBLE |
java.lang.Double |
DECIMAL |
java.math.BigDecimal |
BOOLEAN |
java.lang.Boolean |
STRING |
java.lang.String |
VARCHAR |
com.aliyun.odps.data.Varchar |
BINARY |
com.aliyun.odps.data.Binary |
DATETIME |
java.util.Date |
TIMESTAMP |
java.sql.Timestamp |
ARRAY |
java.util.List |
MAP |
java.util.Map |
STRUCT |
com.aliyun.odps.data.Struct |
MaxCompute 2.0版本支持定義Java UDF時,使用Writable類型作為參數和返回值。MaxCompute數據類型和Java Writable類型的映射關係如下。
MaxCompute Type |
Java Writable Type |
TINYINT |
ByteWritable |
SMALLINT |
ShortWritable |
INT |
IntWritable |
BIGINT |
LongWritable |
FLOAT |
FloatWritable |
DOUBLE |
DoubleWritable |
DECIMAL |
BigDecimalWritable |
BOOLEAN |
BooleanWritable |
STRING |
Text |
VARCHAR |
VarcharWritable |
BINARY |
BytesWritable |
DATETIME |
DatetimeWritable |
TIMESTAMP |
TimestampWritable |
INTERVAL_YEAR_MONTH |
IntervalYearMonthWritable |
INTERVAL_DAY_TIME |
IntervalDayTimeWritable |
ARRAY |
N/A |
MAP |
N/A |
STRUCT |
N/A |
MaxCompute SQL Type |
Python 2 Type |
BIGINT |
INT |
STRING |
STR |
DOUBLE |
FLOAT |
BOOLEAN |
BOOL |
DATETIME |
INT |
FLOAT |
FLOAT |
CHAR |
STR |
VARCHAR |
STR |
BINARY |
BYTEARRAY |
DATE |
INT |
DECIMAL |
DECIMAL.DECIMAL |
ARRAY |
LIST |
MAP |
DICT |
STRUCT |
COLLECTIONS.NAMEDTUPLE |
MaxCompute SQL Type |
Python 3 Type |
BIGINT |
INT |
STRING |
UNICODE |
DOUBLE |
FLOAT |
BOOLEAN |
BOOL |
DATETIME |
DATETIME.DATETIME |
FLOAT |
FLOAT |
CHAR |
UNICODE |
VARCHAR |
UNICODE |
BINARY |
BYTES |
DATE |
DATETIME.DATE |
DECIMAL |
DECIMAL.DECIMAL |
ARRAY |
LIST |
MAP |
DICT |
STRUCT |
COLLECTIONS.NAMEDTUPLE |
三、UDF的使用方式
UDF、UDTF、UDAT可進行參考文檔
https://help.aliyun.com/document_detail/27867.html?spm=a2c4g.11186623.6.762.463d7468xnFPHb
JAVA UDF
UDF的高級使用:
3.1UDF中的變長參數
java語言:
package com.mrtest.cn; import com.aliyun.odps.udf.UDF; import com.aliyun.odps.udf.annotation.Resolve; import java.util.ArrayList; import java.util.List; "*->array"}) ({public class TestUDF extends UDF { public List evaluate(String ... s) { List list = new ArrayList(); for (String name : s) { list.add(name); } return list; } }
Python語言:
from odps.udf import annotate "*->bigint") (class ParamFunc(object): def evaluate(self, *nums): sum = 0 for num in nums: sum=num+sum return sum
3.2UDF的重載
注意事項:對於List與List是不能解析對應的方法的,這種屬於類型擦除
package com.aliyun.odps.examples.udf; import com.aliyun.odps.udf.UDF; public class UDFExample extends UDF { public String evaluate(String a) { return "s2s:" + a; } public String evaluate(String a, String b) { return "ss2s:" + a + "," + b; } public String evaluate(String a, String b, String c) { return "sss2s:" + a + "," + b + "," + c; } }
3.3UDF訪問對應文件和表
java語言:
package com.aliyun.odps.examples.udf; import com.aliyun.odps.udf.ExecutionContext; import com.aliyun.odps.udf.UDF; import com.aliyun.odps.udf.UDFException; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.util.Iterator; public class UDFResource extends UDF { ExecutionContext ctx; long fileResourceLineCount; long tableResource1RecordCount; long tableResource2RecordCount; public void setup(ExecutionContext ctx) throws UDFException { this.ctx = ctx; try { InputStream in = ctx.readResourceFileAsStream("file_resource.txt"); BufferedReader br = new BufferedReader(new InputStreamReader(in)); String line; fileResourceLineCount = 0; while ((line = br.readLine()) != null) { fileResourceLineCount++; } br.close(); Iterator iterator = ctx.readResourceTable("table_resource1").iterator(); tableResource1RecordCount = 0; while (iterator.hasNext()) { tableResource1RecordCount++; iterator.next(); } iterator = ctx.readResourceTable("table_resource2").iterator(); tableResource2RecordCount = 0; while (iterator.hasNext()) { tableResource2RecordCount++; iterator.next(); } } catch (IOException e) { throw new UDFException(e); } } /** * project: example_project table: wc_in2 partitions: p2=1,p1=2 columns: colc,colb */ public String evaluate(String a, String b) { return "ss2s:" + a + "," + b + "|fileResourceLineCount=" + fileResourceLineCount + "|tableResource1RecordCount=" + tableResource1RecordCount + "|tableResource2RecordCount=" + tableResource2RecordCount; } }
python語言:
#coding: utf-8 from odps.udf import annotate from odps.distcache import get_cache_file 'double -> double') (class Compute(object): def __init__(self): import json #獲取對應文本文件 cache_file = get_cache_file('file.txt') dataMat = [] for line in cache_file : curLine = line.strip().split(',') #處理邏輯 cache_file.close() #獲取對應的表文件 records = list(get_cache_table('table_resource1')) for record in records: self.my_dict[record[0]] = [record[1]] #處理邏輯 def evaluate(self, input): #處理邏輯
3.4UDF訪問外部網絡(VPC、外部網絡、專有網絡)
https://help.aliyun.com/document_detail/187866.html
3.5UDF使用第三方包
https://help.aliyun.com/document_detail/189752.html
#coding: utf-8 # explode.py from odps.udf import annotate from odps.distcache import get_cache_archive import datetime def include_package_path(res_name): import os, sys archive_files = get_cache_archive(res_name) dir_names = sorted ([os.path.dirname(os.path.normpath(f.name)) for f in archive_files if '.dist_info' not in f.name], key=lambda v: len(v)) sys.path.append(os.path.dirname(dir_names[0])) "string->boolean") (class is_workday_udf(object): def __init__(self): include_package_path('chinese-calendar-master.zip') def evaluate(self, date_str): # try: import chinese_calendar date_strs = date_str.split("-") year_num = int(date_strs[0]) month_num = int(date_strs[1]) day_num = int(date_strs[2]) date_num = datetime.date(year=year_num, month=month_num, day=day_num) result = chinese_calendar.is_workday(date_num) return result # except: # return True
函數的註冊
執行的select的的操作
set odsp.pypy.enabled=false; set odps.isolation.session.enable=true; select my_json('{"info":"11","desc":"a|b","filename":"4b-2a-3c-4d-5b"}') as a;
3.6使用嵌入式開發UDF
java語言:
CREATE TEMPORARY FUNCTION foo AS 'com.mypackage.Reverse' USING #CODE ('lang'='JAVA') package com.mypackage; import com.aliyun.odps.udf.UDF; public class Reverse extends UDF { public String evaluate(String input) { if (input == null) return null; StringBuilder ret = new StringBuilder(); for (int i = input.toCharArray().length - 1; i >= 0; i--) { ret.append(input.toCharArray()[i]); } return ret.toString(); } } #END CODE;
SELECT foo('abdc');
- 嵌入式代碼塊可以置於USING後或腳本末尾,置於USING後的代碼塊作用域僅為CREATE TEMPORARY FUNCTION語句。
- CREATE TEMPORARY FUNCTION創建的函數為臨時函數,僅在本次執行生效,不會存入MaxCompute的Meta系統。
python語言:
CREATE TEMPORARY FUNCTION foo AS 'embedded.UDFTest' USING #CODE ('lang'='PYTHON', 'filename'='embedded') from odps.udf import annotate "bigint->bigint") (class UDFTest(object): def evaluate(self, a): return a * a #END CODE;
SELECT foo(4);
- Python代碼的縮進需要符合Python語言規範。
- 由於註冊Python UDF時AS後的類名需要包含Python源碼的文件名,您可以通過’filename’=’embedded’指定一個虛擬文件名。
3.7使用SQL語言定義函數
create sql function my_sum(@a BIGINT, @b BIGINT, @c BIGINT) returns @my_sum BIGINT as begin @temp := @a + @b; @my_sum := @temp + @c; end;
本文使用的方案,在其中使用的時候要注意幾點:
1.如果在使用企業版DataWorks的API測試時,出現權限錯誤,需要提工單開通DataWorks的API功能。
2.在使用時要注意計算的類型以及計算方式,將本文的實踐代碼略作修改,另外統計時也要關注各個字段的單位,以及數據類型方便最後的統計計算。
3.要仔細查看對應的文檔介紹,熟悉其使用方法以及對應的參數設置。
歡迎加入“MaxCompute開發者社區2群”,點擊鏈接申請加入或掃描二維碼