亚马逊AWS官方博客

Amazon Redshift UDF实现对比

Mike Kong, 亚马逊云科技专业服务团队

在数据仓库深度应用的场景下,仅靠SQL往往很难完成我们所需要的各种特殊数据分析处理,这时候,我们有必要使用其它一些方法去完成任务,其中一种方法就是使用UDF(User-defined Functions)即用户自定义函数。

Amazon Redshift 是一种快速、完全托管式 PB 级数据仓库服务,它使得通过现有商业智能工具对您的所有数据进行高效分析变得简单而实惠。Amazon Redshift支持UDF功能,客户可以通过Redshift UDF完成一些面向报表或应用的数据转换工作,与数据湖的中后台处理相互结合;客户也可以把原有的UDF较容易地迁移到Redshift UDF,有效降低迁移成本,例如将基于Java开发的Hive UDF迁移到Redshift UDF。

Amazon Redshift包含了三种UDF实现方式,分别是SQL UDF,Scalar Python UDF以及Lambda UDF。这篇博客主要介绍了Amazon Redshift UDF不同实现方式及其效果,并对比各种实现,帮助客户在不同场景下选择合适的UDF实现方式。

一、SQL UDF的实现

SQL UDF是最简单的一种UDF实现,它通过SQL实现。当业务场景比较简单时,我们可以选择SQL UDF以较少的代码量实现相应的功能。

1. 实现方式

(1)创建函数示例

该示例是简单计算(参数1 + 参数2 * 0.5)/ 2,比如(10 + 18 * 0.5) / 2,结果为9.5.

create function sf_get_avg (float, float)
  returns float immutable as $$
  select ($1 + $2 * 0.5) / 2
$$ language sql;

(2)SQL调用验证

select sf_get_avg(10,18)

(3)返回结果

二、Python UDF 的实现

Python UDF 是通过Redshift内置的python实现UDF,通过python UDF你可以实现各种相对复杂的数据分析处理需求。

1. 实现方式

(1)创建函数示例

CREATE OR REPLACE FUNCTION f_get_avg (in_param_1 float, in_param_2 float) RETURNS float IMMUTABLE as $$ 
    def get_avg(in_param_1, in_param_2):
        import logging
    
        logger = logging.getLogger()
        try:
            avg_result = (in_param_1 + in_param_2 * 0.5) / 2
        except Exception as e:
            logger.error(str(e))

        return avg_result
        
    return get_avg(in_param_1, in_param_2)
$$ LANGUAGE plpythonu;

(2)SQL调用验证

select f_get_avg(10,18)

(3)返回结果

2. 环境配置

2.1 安装依赖包

Redshift Python已经包含了常见的数据处理包(如numpy,pandas),如果需要使用其它依赖包(需要适配于版本python 2.7),可以下载对应包并在压缩后上传S3,然后通过CREATE LIBRARY创建自定义包库,Redshift在后台会把需要安装的包会先放在本地路径(如/rdsdbdata/user_lib/),然后解压到内部依赖包路径(/rdsdbbin/opt/redshift/lib/python2.7/site-packages/)。上述中的CREATE LIBRARY有两种方式实现。

Option 1 – 创建自定义包库SQL代码示例如下:

CREATE LIBRARY fuzzywuzzy LANGUAGE plpythonu FROM 's3://your-redshift-bucket/lib/fuzzywuzzy.zip' CREDENTIALS 'aws_access_key_id=xxx;aws_secret_access_key=xxx';

Option 2 – 使用IM Role作为credentials(这里假设你的IAM Role已配置好相应的权限):

CREATE LIBRARY fuzzywuzzy LANGUAGE plpythonu FROM 's3://your-redshift-bucket/lib/fuzzywuzzy.zip' CREDENTIALS 'aws_iam_role=arn:xxxxxxxx:role/your-lambda-service-role';

对于Redshift Python内置依赖包清单及安装自定义包的详细过程,我们可参考附录“已安装依赖包清单及安装包过程” [1]。

2.2 调试

调试时,我们可通过logging/logger 把日志打印到svl_udf_log里,查看自定义日志信息。查看示例如下:

select * from svl_udf_log order by created desc;

2.3 注意事项

(1)Redshift Python目前使用定制的python 2.7,所有的依赖包也是基于此版本,有些包的function可能在此版本下并不能很好地支持,如遇到此类情况可以考虑使用Lambda UDF或其它形式的代码组合处理。

(2)定制的python 2.7版本与本地部署的环境有差异,部分函数即使在本地部署的python 2.7测试通过,仍需在Redshift Python下测试验证。

(3)涉及到UTF-8编码(如中文处理),最好在python头加入如下标识,避免一些中文处理的异常:

# -*- coding: utf-8 -*-

(4)使用python UDF,有一些限制条件,比如:a. 用户安装的依赖包大小不可超过100 MB;b. 不支持SUPER 和 HLLSKETCH 数据类型等。详细情况请参考“UDF Constraints”[2]

(5)python的数据类型一般没有长度限制,但数据库接收的返回值的数据类型是有长度限制的,过长的返回内容会引起报错。

三、Lambda UDF 的实现

Lambda UDF是通过Redshift外部服务Amazon Lambda实现UDF功能,然后通过CREATE EXTERNAL FUNCTION 关联到对应的Lambda函数。

Lambda UDF在入参以及出参方面,与Python UDF相比有明显的不同,区别如下表所示:

Python UDF Lambda UDF
入参 值(如字符串、数值等) Json格式的字段内容
出参 值(如字符串、数值等) Json格式的字段内容

1. 实现方式

(1)创建函数示例

方式一

我们直接把上述的Python UDF转化成Lambda UDF,代码示例如下:

import json

def get_avg(in_param_1, in_param_2):
    avg_result = 0
    error_msg = ''
    try:
        avg_result = str((in_param_1 + in_param_2 * 0.5) / 2)
    except Exception as e:
        error_msg = str(e)
    return avg_result, error_msg


def lambda_handler(event, context):
    error_msg = ''
    result = list()
    for x in event['arguments']:
        avg_result, error_msg = get_avg(x[0], x[1])
        result.append(avg_result)

    if (error_msg is None) | (error_msg == ''):
        ret_json = json.dumps({"results": result, "success": True})
    else:
        ret_json = json.dumps({"success": False, "error_msg": error_msg})
    return ret_json

方式二

以下代码示例通过pandas进行矢量化数据处理,从而有效地提高处理性能。

import json
import numpy as np
import pandas as pd

def get_avg(list_args):
    avg_result = list()
    error_msg = ''
    try:
        df = pd.DataFrame(list_args, columns = ['in_param_1', 'in_param_2'], dtype=np.float64)
        avg_result = (df['in_param_1'] + df['in_param_2'] * 0.5) / 2
    except Exception as e:
        error_msg = str(e)
    return avg_result.tolist(), error_msg


def lambda_handler(event, context):
    error_msg = ''
avg_result, error_msg = get_avg(event['arguments'])
avg_result = avg_result.astype(str)

    if (error_msg is None) | (error_msg == ''):
        ret_json = json.dumps({"results": avg_result, "success": True})
    else:
        ret_json = json.dumps({"success": False, "error_msg": error_msg})
    return ret_json

(2)关联函数示例

CREATE OR REPLACE EXTERNAL FUNCTION ef_get_avg(NUMERIC, NUMERIC) RETURNS VARCHAR IMMUTABLE
LAMBDA 'ef_get_avg' IAM_ROLE 'arn:xxxxxxxx:role/your-lambda-service-role';

这里假设你的IAM Role已配置好相应的权限

(3)SQL调用验证

select ef_get_avg(10,18)

(4)返回结果

2. 环境配置

2.1 安装依赖包

在亚马逊云科技海外某些区域如us-east-1, ap-northeast-1等,可以直接选用Layers DataWrangler,里面已经包含了很多常用的数据处理依赖包(如pandas, pyarrow, boto3等),具体可参考“Data Wrangler Github”[3]。

其它没有内置DataWrangler的区域(如中国区),如果也想使用此依赖包,可以自行创建对应依赖包并在压缩后加载到custom layer,详细步骤可见“Creating and sharing Lambda Layers” [4]。

2.2 调试

自定义调试或报错信息,可以放入返回结果的error_msg中查看返回错误信息。

在Lambda调试时,可通过Configure test event进行单元测试,示例如下:

2.3 注意事项

(1)Lambda python runtime基于python 3,默认整个处理都是基于UTF-8,无需如Python UDF使用 # -*- coding: UTF-8 -*-。

(2)输入参数从Redshift传递到Lambda时,使用Json 格式的event,从Lambda返回Redshift亦如此,详细的Json格式可参考“Creating a Lambda UDF” [5]。

(3)以下语句可查看已创建的UDF清单:

select * from pg_proc_info where prokind='f' and proname like 'ef%'; -- ef

在这里代表以ef开头的lambda UDF(即External Function)

四、三种UDF的比较

以上三种Redshift UDF实现,各有其优劣势及其适合的场景,在这里对它们进行简单的对比:

SQL UDF Python UDF Lambda UDF
优势

* 最为方便构建

* 仅需熟悉SQL

* 可关联访问其它Redshift表

* 方便构建

* 较为灵活,无需Redshift外其它服务

* 支持更多的开发语言(如:Python, Java, Go, Node.js, C#/PowerShell, Ruby等)

* 对于python实现,基于更高的python 3版本

* 输入是整个字段而不是一个值,可使用更高效的处理方式

* 可方便访问其它亚马逊云科技服务(如基于boto3依赖包)

* 不同region的并发约束有所不同,但一般可达到500以上并发

劣势 * 限于SQL不能很好应对过于复杂的数据转换

* 由于Redshift python版本限制,有些函数不能支持(如boto3,decode等)

* 用户安装的依赖包大小不能超过100 MB
* 并行运行的UDF数不能超过Redshift集群节点数的1/4(如15个节点只能3个UDF同时运行)
* 不支持Redshift的SUPER 和 HLLSKETCH数据类型

* 需要考虑Redshift外的Lambda服务成本
适合场景

* 简单的场景(如:数值计算、字符串简单处理等)

* 需要访问其它表(如:多表关联)

* SQL难以处理的场景(如复杂运算、特定类型或hash转换、半结构化解析处理等) * 复杂场景(如需要引入较多的依赖包、需使用boto3依赖包访问亚马逊云科技服务、特定编码转换、需要较高的并发运行等)

结论

通过本文,你可以了解三种Redshift UDF的不同实现方法及其适合的场景。上述的示例代码可以帮助客户快速实现Redshift UDF交付,支持各种复杂数据分析场景。

如你有其它的问题、建议或经验,欢迎留言给我们。

参考

[1]已安装依赖包清单及安装包过程 – https://docs.thinkwithwp.com/redshift/latest/dg/udf-python-language-support.html

[2] UDF Constraints of scalar Python UDF – https://docs.thinkwithwp.com/redshift/latest/dg/udf-constraints.html

[3] Data Wrangler Github – https://github.com/awslabs/aws-data-wrangler

[4] Creating and sharing Lambda Layers – https://docs.thinkwithwp.com/lambda/latest/dg/configuration-layers.html

[5] Creating a Lambda UDF – https://docs.thinkwithwp.com/redshift/latest/dg/udf-creating-a-lambda-sql-udf.html

本篇作者

孔庆强

亚马逊云科技专业服务团队的大数据顾问,十多年从事数据湖仓及数据分析,为客户提供数据建模、数据治理及整体专业数据解决方案。在个人爱好方面,喜爱健身、音乐、旅行。

赵鑫

亚马逊云科技专业服务团队数据架构师,专注于生命科学、自动驾驶领域的数据架构与数据分析

李烨炜

亚马逊云科技专业服务团队大数据咨询顾问。专注于企业级客户云上数据架构与数据平台设计等相关咨询服务。