1500字范文,内容丰富有趣,写作好帮手!
1500字范文 > Hive 之 用户自定义函数 UDF UDAF UDTF

Hive 之 用户自定义函数 UDF UDAF UDTF

时间:2021-04-30 14:28:42

相关推荐

Hive 之 用户自定义函数 UDF UDAF UDTF

一 什么是UDF

UDF是UserDefined Function 用户自定义函数的缩写。Hive中除了原生提供的一些函数之外,如果还不能满足我们当前需求,我们可以自定义函数。

除了UDF 之外,我们还可以定义聚合函数UDAF 和 Table-Generating函数

二 如何创建UDF函数

2.1编写JAVA类,需要继承UDF类或者GenericUDF

一般需要返回简单数据类型的,继承UDF就可以,然后实现evaluate方法;如果类型稍微复杂的可以使用GenericUDF,然后实现initializegetDisplayString evaluate方法。

publicclassUDFStripDoubleQuotesextendsUDF{

private staticfinalStringDOUBLE_QUOTES="\"";

private staticfinalStringBLANK_SYMBOL="";

publicText evaluate (Text text)throwsUDFArgumentException{

if(null==text ||BLANK_SYMBOL.equals(text)) {

throw newUDFArgumentException("The function STRIP_DOUBLE_QUOTES(s) takes exactly 1arguments.");

}

Stringtemp = text.toString().trim();

if(temp.startsWith(DOUBLE_QUOTES) ||temp.endsWith(DOUBLE_QUOTES)) {

temp = temp.replace(DOUBLE_QUOTES,BLANK_SYMBOL);

}

return newText(temp);

}

}

2.2编译这个java类并打成jar包

2.3在hive中添加jar包

hive(hadoop)> add jar /opt/data/UDFStripDoubleQuotes.jar;

Added/opt/data/UDFStripDoubleQuotes.jar to class path

Addedresource: /opt/data/UDFStripDoubleQuotes.jar

2.4创建临时函数和永久函数

2.4.1创建临时函数

语法:CREATETEMPORARY FUCNTION strip_double_quotes

AS' com.hive.udf.UDFStripDoubleQuotes';

2.4.2创建永久函数

CREATEFUNCTION [db_name.]function_name AS class_name

[USINGJAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri'] ];

说白了其实就是把jar放到HDFS上,然后指定这个函数是哪一个数据库的,然后跟一个URL,这一个url就是你jar包所放的那个HDFS目录

举个例子:

CREATEFUNCTION hadoop09.lu_str AS 'com.hive.udf.LowerAnd

UpperUDF'USING JAR 'hdfs:/var/hive/udf/lowerOrUpper.jar';

2.5测试

准备测试数据:/opt/data/quotes.txt

"10" "ACCOUNTING" "NEW YORK"

"20" "RESEARCH" "DALLAS"

"30" "SALES" 'CHICAGO'

"40" "OPERATIONS" 'BOSTON'

Hive这边:

CREATETABLE t_dept LIKE dept;

LOADDATA LOCAL INPATH '/opt/data/quotes.txt' INTO TABLE t_dept;

SELECTstrip_double_quotes(dname) name, strip_double_quotes(loc) loc FROM t_dept;

运行结果:

name loc

ACCOUNTING NEW YORK

RESEARCH DALLAS

SALES 'CHICAGO'

OPERATIONS 'BOSTON'

三 如何创建UDAF函数

继承AbstractGenericUDAFAverageEvaluator,并且继承Generic

UDAFEvaluator。GenericUDAFEvaluator就是根据job不同的阶段执行不同的方法。Hive通过GenericUDAFEvaluator.Model来确定job的执行阶段。

那有哪些阶段呢?

PARTIAL1:从原始数据到部分聚合,会调用iterate,terminatePartial方法 -->map的输入 到 输出

PARTIAL2:从部分数据聚合和部分数据聚合,会调用merge和terminatePartial--> map的输出 到reduce输入

FINAL: 从部分数据聚合到全部数据聚合,调用merge和 terminate方法 -->Reduce输入到输出

COMPLETE: 从原始数据到全部数据聚合,会调用iterate和 terminate方法;没有reduce阶段,只有map阶段

有几个注意点:

如果要聚合的数据量比较大,我们需要注意内存是否够,很容易出现内存溢出的问题;

尽可能重用对象,尽量避免new对象,尽量减轻JVM垃圾回收的过程。

publicclassUDAFAddextendsAbstractGenericUDAFResolver{

static finalLogLOG= LogFactory.getLog(UDAFAdd.class.getName());

@Override

publicGenericUDAFEvaluator getEvaluator(TypeInfo[]arguments)throwsSemanticException {

//check arguments length

if(arguments.length != 1) {

throw newUDFArgumentException("Exactly one argument is expected.");

}

//check if arguments data type is primitive

if(arguments[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {

throw newUDFArgumentException("Argument is not expected.");

}

switch(((PrimitiveTypeInfo)arguments[0]).getPrimitiveCategory()) {

caseBYTE:

caseSHORT:

caseINT:

caseLONG:

return newUDAFAddLong();

caseFLOAT:

caseDOUBLE:

return newUDAFAddDouble();

default:

throw newUDFArgumentException("Only numeric or string type argumentsare accepted but "

+arguments[0].getTypeName() + " is passed.");

}

}

public staticclassUDAFAddDoubleextendsGenericUDAFEvaluator{

privatePrimitiveObjectInspectorinputOI;

privateDoubleWritable result;

//invoke the INIT method on the each stage

@Override

publicObjectInspector init(Modemode, ObjectInspector[] arguments)throwsHiveException {

super.init(mode,arguments);

//INIT double value

result =newDoubleWritable(0);

inputOI =(DoubleObjectInspector)arguments[0];

returnPrimitiveObjectInspectorFactory.writableDoubleObjectInspector;

}

/**

* it is used for storing aggregation resultduring the process of aggregation

*@authornickyzhang

*

*/

static classAddDoubleAggextendsAbstractAggregationBuffer{

booleanempty;

doublesum;

@Override

public intestimate() {

returnJavaDataModel.PRIMITIVES1+ JavaDataModel.PRIMITIVES2;

}

}

/**Get a new aggregation object*/

@Override

publicAggregationBuffergetNewAggregationBuffer()throwsHiveException {

AddDoubleAggaddDoubleAgg =newAddDoubleAgg();

reset(addDoubleAgg);

returnaddDoubleAgg;

}

/** Reset the aggregation. This is useful if we want toreuse the same aggregation. */

@Override

public voidreset(AggregationBufferagg)throwsHiveException {

AddDoubleAggaddDoubleAgg = (AddDoubleAgg)agg;

addDoubleAgg.empty = Boolean.TRUE;

addDoubleAgg.sum = 0;

}

/** Iterate through original data.*/

@Override

public voiditerate(AggregationBufferagg, Object[] arguments)throwsHiveException {

if(arguments.length != 1) {

throw newUDFArgumentException("Just one argument expected!");

}

this.merge(agg,arguments);

}

/** Get partial aggregation result.*/

@Override

publicObjectterminatePartial(AggregationBufferagg)throwsHiveException {

returnterminate(agg);

}

/**Combiner or Reduce merge themapper*/

@Override

public voidmerge(AggregationBufferagg, Object partial)throwsHiveException {

if(partial ==null) {

return;

}

AddDoubleAggaddDoubleAgg = (AddDoubleAgg)agg;

addDoubleAgg.empty =false;

addDoubleAgg.sum += PrimitiveObjectInspectorUtils.getDouble(partial,inputOI);

}

/** Get final aggregation result */

@Override

publicObjectterminate(AggregationBufferagg)throwsHiveException {

AddDoubleAggaddDoubleAgg = (AddDoubleAgg)agg;

if(addDoubleAgg.empty) {

return null;

}

result.set(addDoubleAgg.sum);

returnresult;

}

}

public staticclassUDAFAddLongextendsGenericUDAFEvaluator{

privatePrimitiveObjectInspectorinputOI;

privateLongWritableresult;

//invoke the INIT method on the each stage

@Override

publicObjectInspector init(Modemode, ObjectInspector[] arguments)throwsHiveException {

super.init(mode,arguments);

//INIT double value

result =newLongWritable(0);

inputOI =(LongObjectInspector)arguments[0];

returnPrimitiveObjectInspectorFactory.writableLongObjectInspector;

}

/**

* it is used for storing aggregation resultduring the process of aggregation

*@authornickyzhang

*

*/

static classAddLongAggextendsAbstractAggregationBuffer{

booleanempty;

longsum;

@Override

public intestimate() {

returnJavaDataModel.PRIMITIVES1+ JavaDataModel.PRIMITIVES2;

}

}

/**Get a new aggregation object*/

@Override

publicAggregationBuffergetNewAggregationBuffer()throwsHiveException {

AddLongAggaddLongAgg =newAddLongAgg();

reset(addLongAgg);

returnaddLongAgg;

}

/** Reset the aggregation. This is useful if we want toreuse the same aggregation. */

@Override

public voidreset(AggregationBufferagg)throwsHiveException {

AddLongAggaddLongAgg = (AddLongAgg)agg;

addLongAgg.empty = Boolean.TRUE;

addLongAgg.sum = 0;

}

/** Iterate through original data.*/

@Override

public voiditerate(AggregationBufferagg, Object[] arguments)throwsHiveException {

if(arguments.length != 1) {

throw newUDFArgumentException("Just one argument expected!");

}

this.merge(agg,arguments);

}

/** Get partial aggregation result.*/

@Override

publicObjectterminatePartial(AggregationBufferagg)throwsHiveException {

returnterminate(agg);

}

/**Combiner or Reduce merge themapper*/

@Override

public voidmerge(AggregationBufferagg, Object partial)throwsHiveException{

if(partial ==null) {

return;

}

AddLongAggaddLongAgg = (AddLongAgg)agg;

addLongAgg.empty =false;

addLongAgg.sum += PrimitiveObjectInspectorUtils.getDouble(partial,inputOI);

}

/** Get final aggregation result */

@Override

publicObjectterminate(AggregationBufferagg)throwsHiveException {

AddLongAggaddLongAgg = (AddLongAgg)agg;

if(addLongAgg.empty) {

return null;

}

result.set(addLongAgg.sum);

returnresult;

}

}

}

四 如何创建UDTF函数

一般用于解析工作,比如说解析url,然后获取url信息,需要继承GenericUDTF.

publicclassUDTFEmailextendsGenericUDTF{

@Override

publicStructObjectInspector initialize(StructObjectInspectorinspector)throwsUDFArgumentException {

if(inspector ==null) {

throw newUDFArgumentException("arguments is null");

}

Listargs = inspector.getAllStructFieldRefs();

if(CollectionUtils.isEmpty(args) ||args.size()!= 1){

throw newUDFArgumentException("UDF tables only one argument");

}

List<String>fields =newArrayList<String>();

fields.add("name");

fields.add("email");

List<ObjectInspector>fieldIOList =newArrayList<ObjectInspector>();

fieldIOList.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

fieldIOList.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

returnObjectInspectorFactory.getStandardStructObjectInspector(fields,fieldIOList);

}

@Override

public voidprocess(Object[]args)throwsHiveException {

if(ArrayUtils.isEmpty(args) ||args.length != 1) {

return;

}

Stringname = args[0].toString();

Stringemail = name +"@";

super.forward(newString[] {name,email});

}

@Override

public voidclose()throwsHiveException {

super.forward(newString[] {"complete","finish"});

}

}

五UDF UDAF UDTF 区别

UDF:一进一出

UDAF:多进一出,一般聚合用

UDTF:一进多出

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。