Python实现MaxCompute UDF/UDAF/UDTF

发表于:2017-12-21 09:38  作者:隐林   来源:云栖社区

字体: | 上一篇 | 下一篇 |我要投稿 | 推荐标签: 软件开发 Python

  摘要:参数与返回值类型 参数与返回值通过如下方式指定: @odps.udf.annotate(signature) Python UDF目前支持ODPS SQL数据类型有:bigint, string, double, boolean和datetime。
  MaxCompute 的 UDF 包括:UDF,UDAF 和 UDTF 三种函数,本文将重点介绍如何通过 Python实现这三种函数。
  参数与返回值类型
  参数与返回值通过如下方式指定:
  @odps.udf.annotate(signature)
  Python UDF目前支持ODPS SQL数据类型有:bigint, string, double, boolean和datetime。SQL语句在执行之前,所有函数的参数类型和返回值类型必须确定。因此对于Python这一动态类型语言,需要通过对UDF类加decorator的方式指定函数签名。
  函数签名signature通过字符串指定,语法如下:
  arg_type_list '->' type_list
  arg_type_list: type_list | '*' | ''
  type_list: [type_list ','] type
  type: 'bigint' | 'string' | 'double' | 'boolean' | 'datetime'
  ·  箭头左边表示参数类型,右边表示返回值类型。
  ·  只有UDTF的返回值可以是多列, UDF和UDAF只能返回一列。
  ·  ‘*’代表变长参数,使用变长参数,UDF/UDTF/UDAF可以匹配任意输入参数。
  下面是合法的signature的例子:
  'bigint,double->string'            # 参数为bigint、double,返回值为string
  'bigint,boolean->string,datetime'  # UDTF参数为bigint、boolean,返回值为string,datetime
  '*->string'                        # 变长参数,输入参数任意,返回值为string
  '->double'                         # 参数为空,返回值为double
  Query语义解析阶段会将检查到不符合函数签名的用法,抛出错误禁止执行。执行期,UDF函数的参数会以函数签名指定的类型传给用户。用户的返回值类型也要与函数签名指定的类型一致,否则检查到类型不匹配时也会报错。ODPS SQL数据类型对应Python类型如下:
  注解:
  ·  Datetime类型是以int的形式传给用户代码的,值为epoch utc time起始至今的毫秒数。用户可以通过Python标准库中的datetime模块处理日期时间类型。
  ·  NULL值对应Python里的None。
  此外,odps.udf.int(value[, silent=True])的参数也做了调整。增加了参数 silent 。当 silent 为 True 时,如果 value 无法转为 int ,不会抛出异常,而是返回 None 。
  UDF
  实现Python UDF非常简单,只需要定义一个new-style class,并实现 evaluate 方法。下面是一个例子:
from odps.udf import annotate
@annotate("bigint,bigint->bigint")
class MyPlus(object):
def evaluate(self, arg0, arg1):
if None in (arg0, arg1):
return None
return arg0 + arg1
  注解:Python UDF必须通过annotate指定函数签名。
  UDAF
  ·  class odps.udf.BaseUDAF:继承此类实现Python UDAF。
  ·  BaseUDAF.new_buffer():实现此方法返回聚合函数的中间值的buffer。buffer必须是mutable object(比如list, dict),并且buffer的大小不应该随数据量递增,在极限情况下,buffer marshal过后的大小不应该超过2Mb。
  ·  BaseUDAF.iterate(buffer[, args, ...]):实现此方法将args聚合到中间值buffer中。
  ·  BaseUDAF.merge(buffer, pbuffer):实现此方法将两个中间值buffer聚合到一起,即将pbuffer merge到buffer中。
  ·  BaseUDAF.terminate(buffer):实现此方法将中间值buffer转换为ODPS SQL基本类型。
  下面是一个UDAF求平均值的例子。
#coding:utf-8
from odps.udf import annotate
from odps.udf import BaseUDAF
@annotate('double->double')
class Average(BaseUDAF):
def new_buffer(self):
return [0, 0]
def iterate(self, buffer, number):
if number is not None:
buffer[0] += number
buffer[1] += 1
def merge(self, buffer, pbuffer):
buffer[0] += pbuffer[0]
buffer[1] += pbuffer[1]
def terminate(self, buffer):
if buffer[1] == 0:
return 0.0
return buffer[0] / buffer[1]
  UDTF
  ·  class odps.udf.BaseUDTF:Python UDTF的基类,用户继承此类,并实现 process , close 等方法。
  ·  BaseUDTF.init():初始化方法,继承类如果实现这个方法,则必须在一开始调用基类的初始化方法 super(BaseUDTF, self).init() 。init 方法在整个UDTF生命周期中只会被调用一次,即在处理第一条记录之前。当UDTF需要保存内部状态时,可以在这个方法中初始化所有状态。
  ·  BaseUDTF.process([args, ...]):这个方法由ODPS SQL框架调用,SQL中每一条记录都会对应调用一次 process , process 的参数为SQL语句中指定的UDTF输入参数。
  ·  BaseUDTF.forward([args, ...]):UDTF的输出方法,此方法由用户代码调用。每调用一次 forward ,就会输出一条记录。 forward 的参数为SQL语句中指定的UDTF的输出参数。
  ·  BaseUDTF.close():UDTF的结束方法,此方法由ODPS SQL框架调用,并且只会被调用一次,即在处理完最后一条记录之后。
  下面是一个UDTF的例子。
  #coding:utf-8
  # explode.py
  from odps.udf import annotate
  from odps.udf import BaseUDTF
  @annotate('string -> string')
  class Explode(BaseUDTF):
  """将string按逗号分隔输出成多条记录
  """
  def process(self, arg):
  props = arg.split(',')
  for p in props:
  self.forward(p)
  注解:Python UDTF也可以不加annotate指定参数类型和返回值类型。这样,函数在SQL中使用时可以匹配任意输入参数,但返回值类型无法推导,所有输出参数都将认为是string类型。因此在调用 forward 时,就必须将所有输出值转成 str 类型。
  引用资源
  Python UDF可以通过 odps.distcache 模块引用资源文件,目前支持引用文件资源和表资源。
  ·  odps.distcache.get_cache_file(resource_name):
  o    返回指定名字的资源内容。 resource_name 为 str 类型,对应当前Project中已存在的资源名。如果资源名非法或者没有相应的资源,会抛出异常。
  o    返回值为 file-like object ,在使用完这个object后,调用者有义务调用 close 方法释放打开的资源文件。
  下面是使用 get_cache_file 的例子:
from odps.udf import annotate
from odps.distcache import get_cache_file
@annotate('bigint->string')
class DistCacheExample(object):
def __init__(self):
cache_file = get_cache_file('test_distcache.txt')
kv = {}
for line in cache_file:
line = line.strip()
if not line:
continue
k, v = line.split()
kv[int(k)] = v
cache_file.close()
self.kv = kv
def evaluate(self, arg):
return self.kv.get(arg)
  ·  odps.distcache.get_cache_table(resource_name)
  o    返回指定资源表的内容。 resource_name 为 str 类型,对应当前Project中已存在的资源表名。如果资源名非法或者没有相应的资源,会抛出异常。
  o    返回值为 generator 类型,调用者通过遍历获取表的内容,每次遍历得到的是以 tuple 形式存在的表中的一条记录。
  下面是使用 get_cache_table 的例子:
from odps.udf import annotate
from odps.distcache import get_cache_table
@annotate('->string')
class DistCacheTableExample(object):
def __init__(self):
self.records = list(get_cache_table('udf_test'))
self.counter = 0
self.ln = len(self.records)
def evaluate(self):
if self.counter > self.ln - 1:
return None
ret = self.records[self.counter]
self.counter += 1
return str(ret)

上文内容不用于商业目的,如涉及知识产权问题,请权利人联系博为峰小编(021-64471599-8017),我们将立即处理。

评 论

论坛新帖

顶部 底部


建议使用IE 6.0以上浏览器,800×600以上分辨率,法律顾问:上海信义律师事务所 项棋律师
版权所有 上海博为峰软件技术股份有限公司 Copyright©51testing.com 2003-2021, 沪ICP备05003035号
投诉及意见反馈:webmaster@51testing.com; 业务联系:service@51testing.com 021-64471599-8017

沪公网安备 31010102002173号

51Testing官方微信

51Testing官方微博

扫一扫 测试知识全知道