
A Concise Guide to Using UDFs

1. UDF Categories

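UDF types and their descriptions:

  • UDF (User Defined Scalar Function): a user-defined scalar function. Input and output are one-to-one: it reads one row and returns one value.
  • UDTF (User Defined Table Valued Function): a user-defined table-valued function, used when a single call must output multiple rows. The UDTF is the only kind of user-defined function that can return multiple fields. Note that a UDTF is not a UDT (User Defined Type).
  • UDAF (User Defined Aggregation Function): a user-defined aggregate function. Input and output are many-to-one: it aggregates multiple input records into a single output value. A UDAF can be used together with GROUP BY in SQL. For the syntax, see Aggregate Functions in the MaxCompute documentation.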
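The three categories differ only in how many rows go in and how many results come out. The plain-Python sketch below illustrates the three shapes. It does not use the MaxCompute UDF API, and the function names are hypothetical.

```python
from typing import Iterable, Iterator, Tuple


def scalar_upper(value: str) -> str:
    """UDF shape: one input row -> one output value."""
    return value.upper()


def table_split(csv_line: str) -> Iterator[Tuple[int, str]]:
    """UDTF shape: one input row -> several output rows, each with two fields."""
    for position, item in enumerate(csv_line.split(",")):
        yield position, item


def aggregate_sum(values: Iterable[int]) -> int:
    """UDAF shape: many input rows -> one output value."""
    total = 0
    for value in values:
        total += value
    return total


if __name__ == "__main__":
    print(scalar_upper("abc"))          # ABC
    print(list(table_split("a,b,c")))   # [(0, 'a'), (1, 'b'), (2, 'c')]
    print(aggregate_sum([1, 2, 3, 4]))  # 10
```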

2. UDF Parameter Types

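The mapping between MaxCompute data types and Java data types is shown below.

Notes:

  • ARRAY maps to java.util.List, not to a Java array.
  • Some types, such as VARCHAR, BINARY, and STRUCT, are specific to ODPS (MaxCompute) and map to classes in the com.aliyun.odps.data package.
  • Java parameter and return types must be object (boxed) types, so their names start with a capital letter, for example Long rather than long.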

MaxCompute Type      Java Type
TINYINT              java.lang.Byte
SMALLINT             java.lang.Short
INT                  java.lang.Integer
BIGINT               java.lang.Long
FLOAT                java.lang.Float
DOUBLE               java.lang.Double
DECIMAL              java.math.BigDecimal
BOOLEAN              java.lang.Boolean
STRING               java.lang.String
VARCHAR              com.aliyun.odps.data.Varchar
BINARY               com.aliyun.odps.data.Binary
DATETIME             java.util.Date
TIMESTAMP            java.sql.Timestamp
ARRAY                java.util.List
MAP                  java.util.Map
STRUCT               com.aliyun.odps.data.Struct

MaxCompute 2.0 also allows Writable types as parameter and return types when you define a Java UDF. The mapping between MaxCompute data types and Java Writable types is as follows.

MaxCompute Type        Java Writable Type
TINYINT                ByteWritable
SMALLINT               ShortWritable
INT                    IntWritable
BIGINT                 LongWritable
FLOAT                  FloatWritable
DOUBLE                 DoubleWritable
DECIMAL                BigDecimalWritable
BOOLEAN                BooleanWritable
STRING                 Text
VARCHAR                VarcharWritable
BINARY                 BytesWritable
DATETIME               DatetimeWritable
TIMESTAMP              TimestampWritable
INTERVAL_YEAR_MONTH    IntervalYearMonthWritable
INTERVAL_DAY_TIME      IntervalDayTimeWritable
ARRAY                  N/A
MAP                    N/A
STRUCT                 N/A

Mapping between MaxCompute SQL types and Python 2 types:

MaxCompute SQL Type    Python 2 Type
BIGINT                 int
STRING                 str
DOUBLE                 float
BOOLEAN                bool
DATETIME               int
FLOAT                  float
CHAR                   str
VARCHAR                str
BINARY                 bytearray
DATE                   int
DECIMAL                decimal.Decimal
ARRAY                  list
MAP                    dict
STRUCT                 collections.namedtuple
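In the Python 2 runtime, a DATETIME value arrives as an int rather than a datetime object. The sketch below converts between the two. It assumes the int counts milliseconds since the Unix epoch (UTC), which you should confirm against the official type-mapping documentation. Both helper names are hypothetical, and the code runs unchanged on Python 2 and 3.

```python
import datetime

# Assumption: a DATETIME argument is an int of milliseconds since 1970-01-01 00:00:00 UTC.
EPOCH = datetime.datetime(1970, 1, 1)


def ms_to_datetime(ms):
    """Convert epoch milliseconds into a naive UTC datetime (hypothetical helper)."""
    if ms is None:  # SQL NULL reaches a Python UDF as None
        return None
    return EPOCH + datetime.timedelta(milliseconds=ms)


def datetime_to_ms(dt):
    """Inverse conversion, e.g. for returning a DATETIME from a Python 2 UDF."""
    if dt is None:
        return None
    delta = dt - EPOCH
    return (delta.days * 86400 + delta.seconds) * 1000 + delta.microseconds // 1000
```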

Mapping between MaxCompute SQL types and Python 3 types:

MaxCompute SQL Type    Python 3 Type
BIGINT                 int
STRING                 unicode (str)
DOUBLE                 float
BOOLEAN                bool
DATETIME               datetime.datetime
FLOAT                  float
CHAR                   unicode (str)
VARCHAR                unicode (str)
BINARY                 bytes
DATE                   datetime.date
DECIMAL                decimal.Decimal
ARRAY                  list
MAP                    dict
STRUCT                 collections.namedtuple
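Because a STRUCT argument arrives as a collections.namedtuple in Python 3, you can read its fields by name as well as by position. The minimal sketch below assumes a hypothetical struct<name:string,age:bigint> column. The Person type only imitates the value the runtime would pass in, and in MaxCompute the class would also carry an @annotate signature.

```python
from collections import namedtuple

# Hypothetical stand-in for a struct<name:string,age:bigint> value.
Person = namedtuple("Person", ["name", "age"])


class DescribePerson(object):
    """Sketch of a UDF body with signature struct<name:string,age:bigint> -> string."""

    def evaluate(self, person):
        if person is None:  # propagate SQL NULL
            return None
        # Fields are available by name (person.name) or by index (person[0]).
        return "{} is {} years old".format(person.name, person.age)
```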

3. How to Use UDFs

For details on UDFs, UDTFs, and UDAFs, see the Java UDF documentation:

https://help.aliyun.com/document_detail/27867.html?spm=a2c4g.11186623.6.762.463d7468xnFPHb

Advanced UDF usage:

3.1 Variable-length arguments in UDFs

Java:

package com.mrtest.cn;

import com.aliyun.odps.udf.UDF;
import com.aliyun.odps.udf.annotation.Resolve;
import java.util.ArrayList;
import java.util.List;

// Signature: any number of arguments in, one array out.
@Resolve({"*->array"})
public class TestUDF extends UDF {
    // Varargs: accepts any number of STRING arguments and returns them as an ARRAY.
    public List<String> evaluate(String... s) {
        List<String> list = new ArrayList<>();
        for (String name : s) {
            list.add(name);
        }
        return list;
    }
}

Python:

from odps.udf import annotate

@annotate("*->bigint")
class ParamFunc(object):
    def evaluate(self, *nums):
        # *nums collects every argument passed to the function; add them up.
        total = 0
        for num in nums:
            total = num + total
        return total
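odps.udf is only available inside the MaxCompute runtime. To try a variadic evaluate locally, you can fall back to a do-nothing decorator when the import fails. This sketch works under that assumption. The fallback annotate is a hypothetical stand-in, and skipping NULL (None) arguments is a design choice added here, not part of the original example.

```python
try:
    from odps.udf import annotate  # provided by the MaxCompute Python runtime
except ImportError:
    # Hypothetical local stand-in: returns the class unchanged and ignores the signature.
    def annotate(signature):
        def wrap(cls):
            return cls
        return wrap


@annotate("*->bigint")
class ParamFunc(object):
    def evaluate(self, *nums):
        total = 0
        for num in nums:
            if num is not None:  # skip SQL NULLs instead of failing on None + int
                total += num
        return total
```

With this version, ParamFunc().evaluate(1, 2, 3) returns 6 and ParamFunc().evaluate(5, None) returns 5.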

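3.2 UDF overloading

Note: MaxCompute cannot tell apart evaluate overloads whose parameters differ only in their generic type arguments (for example, two List parameters with different element types). Java erases generic types at compile time, so such methods look identical.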

package com.aliyun.odps.examples.udf;
import com.aliyun.odps.udf.UDF;
public class UDFExample extends UDF {
  public String evaluate(String a) {
    return "s2s:" + a;
  }
  public String evaluate(String a, String b) {
    return "ss2s:" + a + "," + b;
  }
  public String evaluate(String a, String b, String c) {
    return "sss2s:" + a + "," + b + "," + c;
  }
}
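Python classes cannot overload methods, so the three-overload pattern in the Java example has no direct Python equivalent. The plain-Python sketch below gets comparable behavior by dispatching on the number of arguments inside a single variadic evaluate. The class name is hypothetical, and the arguments are assumed to be non-NULL strings. In MaxCompute, the class would be declared with a variadic signature such as @annotate("*->string").

```python
class UDFExamplePy(object):
    """Plain-Python sketch mirroring the three Java evaluate overloads."""

    # Output prefix used by the Java example for 1, 2 and 3 arguments.
    _PREFIXES = {1: "s2s:", 2: "ss2s:", 3: "sss2s:"}

    def evaluate(self, *args):
        prefix = self._PREFIXES.get(len(args))
        if prefix is None:
            raise ValueError("expected 1 to 3 arguments, got %d" % len(args))
        return prefix + ",".join(args)
```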

3.3 Accessing resource files and tables from a UDF

Java:

package com.aliyun.odps.examples.udf;

import com.aliyun.odps.udf.ExecutionContext;
import com.aliyun.odps.udf.UDF;
import com.aliyun.odps.udf.UDFException;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

public class UDFResource extends UDF {
  ExecutionContext ctx;
  long fileResourceLineCount;
  long tableResource1RecordCount;
  long tableResource2RecordCount;

  // setup() runs once per UDF instance before evaluate(), so resources are read only once.
  @Override
  public void setup(ExecutionContext ctx) throws UDFException {
    this.ctx = ctx;
    try {
      // Count the lines of the file resource; try-with-resources closes the reader.
      try (BufferedReader br = new BufferedReader(new InputStreamReader(
          ctx.readResourceFileAsStream("file_resource.txt"), StandardCharsets.UTF_8))) {
        fileResourceLineCount = 0;
        while (br.readLine() != null) {
          fileResourceLineCount++;
        }
      }
      // Count the records of each table resource.
      tableResource1RecordCount = countRecords(ctx.readResourceTable("table_resource1"));
      tableResource2RecordCount = countRecords(ctx.readResourceTable("table_resource2"));
    } catch (IOException e) {
      throw new UDFException(e);
    }
  }

  // Iterates over a table resource and returns its number of records.
  private static long countRecords(Iterable<Object[]> records) {
    long count = 0;
    for (Object[] ignored : records) {
      count++;
    }
    return count;
  }

  /**
   * project: example_project table: wc_in2 partitions: p2=1,p1=2 columns: colc,colb
   */
  public String evaluate(String a, String b) {
    return "ss2s:" + a + "," + b + "|fileResourceLineCount=" + fileResourceLineCount
        + "|tableResource1RecordCount=" + tableResource1RecordCount + "|tableResource2RecordCount="
        + tableResource2RecordCount;
  }
}

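Python:

# coding: utf-8
from odps.udf import annotate
from odps.distcache import get_cache_file, get_cache_table


@annotate('double -> double')
class Compute(object):
    def __init__(self):
        # Read the file resource once, when the UDF instance is created.
        self.data_mat = []
        cache_file = get_cache_file('file.txt')
        for line in cache_file:
            cur_line = line.strip().split(',')
            # Processing logic: here each line is simply kept as a list of fields.
            self.data_mat.append(cur_line)
        cache_file.close()

        # Read the table resource; each record can be indexed by column position.
        self.my_dict = {}
        for record in get_cache_table('table_resource1'):
            self.my_dict[record[0]] = [record[1]]
            # Processing logic

    def evaluate(self, input):
        # Processing logic: this placeholder returns the input unchanged.
        return input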
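The processing logic in the Python resource example is left open. One possible step is to turn comma-separated lines from a file resource into a lookup dict, as sketched below. get_cache_file returns a file-like object that yields lines, so the same logic can be tested locally with io.StringIO. The function name and the "first column is the key" layout are assumptions.

```python
import io


def build_lookup(lines):
    """Map the first column of each comma-separated line to its remaining columns.

    Hypothetical helper: inside the UDF, pass it the object returned by get_cache_file('file.txt').
    """
    lookup = {}
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        fields = line.split(",")
        lookup[fields[0]] = fields[1:]
    return lookup


if __name__ == "__main__":
    fake_resource = io.StringIO(u"a,1.5\nb,2.0\n\n")
    print(build_lookup(fake_resource))  # {'a': ['1.5'], 'b': ['2.0']}
```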

3.4 Accessing external networks from a UDF (VPC, external networks, and private networks)

https://help.aliyun.com/document_detail/187866.html

3.5 Using third-party packages in a UDF

https://help.aliyun.com/document_detail/189752.html

# coding: utf-8
# explode.py
import datetime

from odps.distcache import get_cache_archive
from odps.udf import annotate


def include_package_path(res_name):
    """Append the extracted archive resource to sys.path so its packages can be imported."""
    import os
    import sys
    archive_files = get_cache_archive(res_name)
    # Directories that contain files, excluding wheel metadata (*.dist-info), shortest first.
    dir_names = sorted([os.path.dirname(os.path.normpath(f.name)) for f in archive_files
                        if '.dist-info' not in f.name], key=lambda v: len(v))
    # The parent of the shallowest directory is treated as the import root.
    sys.path.append(os.path.dirname(dir_names[0]))


@annotate("string->boolean")
class is_workday_udf(object):
    def __init__(self):
        # Make the package inside the archive resource importable.
        include_package_path('chinese-calendar-master.zip')

    def evaluate(self, date_str):
        # Imported here because the package path is only added in __init__.
        import chinese_calendar
        if date_str is None:
            return None
        # date_str is expected in yyyy-mm-dd format.
        year_num, month_num, day_num = (int(part) for part in date_str.split("-"))
        date_num = datetime.date(year=year_num, month=month_num, day=day_num)
        return chinese_calendar.is_workday(date_num)
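The path rule in include_package_path can be checked locally on a list of file paths, without the MaxCompute runtime. The sketch below applies the same rule: it drops *.dist-info entries, takes the shortest directory that contains a file, and returns that directory's parent. The rule only works if the shallowest files in the archive sit inside the package directory itself. If the archive also has a file one level higher, such as a setup.py next to the package folder, the wrong directory is returned, so check the archive layout before uploading it. The function name and sample paths are hypothetical.

```python
import posixpath


def pick_package_root(file_names):
    """Return the directory the shortest-directory rule would append to sys.path."""
    dir_names = sorted(
        (posixpath.dirname(posixpath.normpath(name)) for name in file_names
         if ".dist-info" not in name),
        key=len,
    )
    return posixpath.dirname(dir_names[0])


if __name__ == "__main__":
    good = ["/tmp/res/chinese_calendar/__init__.py",
            "/tmp/res/chinese_calendar/constants.py",
            "/tmp/res/chinese_calendar-1.0.dist-info/METADATA"]
    print(pick_package_root(good))  # /tmp/res (the import root, as intended)
    bad = ["/tmp/res/setup.py", "/tmp/res/chinese_calendar/__init__.py"]
    print(pick_package_root(bad))   # /tmp (one level too high)
```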

Registering the function:

[Figure: function registration]

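Running a query

The two SET statements turn off PyPy (odps.pypy.enabled) and turn on session isolation (odps.isolation.session.enable) for the session. The sample query calls a function named my_json rather than the is_workday_udf class defined above; substitute the name you registered.

set odps.pypy.enabled=false;
set odps.isolation.session.enable=true;
select my_json('{"info":"11","desc":"a|b","filename":"4b-2a-3c-4d-5b"}') as a;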

3.6 Developing UDFs with embedded code

Java:

CREATE TEMPORARY FUNCTION foo AS 'com.mypackage.Reverse' USING
#CODE ('lang'='JAVA')
package com.mypackage;
import com.aliyun.odps.udf.UDF;
public class Reverse extends UDF {
  public String evaluate(String input) {
    if (input == null) return null;
    // Reverse the characters; StringBuilder.reverse() also keeps surrogate pairs intact.
    return new StringBuilder(input).reverse().toString();
  }
}
#END CODE;

SELECT foo('abdc');

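  • An embedded code block can go either after USING or at the end of the script. A block placed after USING is scoped to that CREATE TEMPORARY FUNCTION statement only.
  • Functions created with CREATE TEMPORARY FUNCTION are temporary. They take effect only for the current run and are not stored in the MaxCompute Meta system.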
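For comparison, the sketch below implements the same string-reversal logic as a plain Python class. In MaxCompute it would carry an @annotate("string->string") signature, which is omitted here so the class runs without the runtime. The class name is hypothetical. Slicing with a step of -1 reverses the string by Unicode code point.

```python
class Reverse(object):
    """Sketch of a Python UDF body equivalent to the Java Reverse example."""

    def evaluate(self, value):
        if value is None:  # propagate SQL NULL, like the Java version
            return None
        # A step of -1 walks the string from the last code point to the first.
        return value[::-1]
```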

Python:

CREATE TEMPORARY FUNCTION foo AS 'embedded.UDFTest' USING
#CODE ('lang'='PYTHON', 'filename'='embedded')
from odps.udf import annotate
@annotate("bigint->bigint")
class UDFTest(object):
  def evaluate(self, a):
    return a * a
#END CODE;

SELECT foo(4);

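  • The indentation of the Python code must follow Python's rules.
  • When you register a Python UDF, the class name after AS must include the name of the Python source file. You can set a virtual file name with 'filename'='embedded'.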

3.7 Defining functions in SQL

create sql function my_sum(@a BIGINT, @b BIGINT, @c BIGINT) returns @my_sum BIGINT
as begin 
    @temp := @a + @b;
    @my_sum := @temp + @c;
end;
create sql function my_func(@s STRING)
AS if(@s rlike '"git_(m|a)"', 1, 0);
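The plain-Python equivalent below makes the semantics of the two SQL functions explicit. The pattern in my_func contains literal double quotes, so it matches git_m or git_a only when they are quoted, as inside a JSON string. The sketch makes three assumptions, all worth checking against the MaxCompute documentation: RLIKE behaves like a regex search (a match anywhere in the string), a NULL input makes the IF condition false and yields 0, and NULL arithmetic yields NULL.

```python
import re

# Same pattern as the SQL: a double quote, git_, then m or a, then a double quote.
GIT_PATTERN = re.compile(r'"git_(m|a)"')


def my_sum(a, b, c):
    """Python counterpart of my_sum: adds three BIGINTs via a temporary."""
    if a is None or b is None or c is None:
        return None  # assumption: NULL + anything is NULL
    temp = a + b
    return temp + c


def my_func(s):
    """Python counterpart of my_func: 1 if the pattern occurs anywhere in s, else 0."""
    if s is None:
        return 0  # assumption: IF(NULL, 1, 0) yields 0
    return 1 if GIT_PATTERN.search(s) else 0
```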

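When applying the approaches in this article, keep the following in mind:

1. If a permission error occurs when you test the API in DataWorks Enterprise Edition, submit a ticket to have the DataWorks API feature enabled.

2. Pay attention to the data types and the calculation method, and adapt the sample code in this article accordingly. When computing statistics, also check the unit and data type of each field so that the final calculations are correct.

3. Read the relevant documentation carefully to become familiar with the usage and the parameter settings.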

