1500字范文,内容丰富有趣,写作好帮手!
1500字范文 > Spark SQL自定义函数

Spark SQL自定义函数

时间:2023-10-13 15:47:37

相关推荐

Spark SQL自定义函数

文章目录

自定义函数分类自定义UDF自定义UDAF[了解]

自定义函数分类

类似于hive当中的自定义函数, spark同样可以使用自定义函数来实现新的功能。

spark中的自定义函数有如下3类

1.UDF(User-Defined-Function)

输入一行,输出一行2.UDAF(User-Defined Aggregation Funcation)

输入多行,输出一行3.UDTF(User-Defined Table-Generating Functions)

输入一行,输出多行

自定义UDF

需求

有udf.txt数据格式如下:

Hello abc study small

通过自定义UDF函数将每一行数据转换成大写

select value,smallToBig(value) from t_word

代码演示

import org.apache.spark.SparkContextimport org.apache.spark.sql.{Dataset, SparkSession}object UDFDemo {def main(args: Array[String]): Unit = {//1.创建SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()//2创建scval sc: SparkContext = spark.sparkContext//sc.setLogLevel("WARN")//3.读取文件val fileDS: Dataset[String] = spark.read.textFile("D:\\data\\udf.txt")fileDS.show()/*+----------+|value|+----------+|helloworld|| abc||study|| smallWORD|+----------+*//*将每一行数据转换成大写select value,smallToBig(value) from t_word*///注册一个函数名称为smallToBig,功能是传入一个String,返回一个大写的Stringspark.udf.register("smallToBig",(str:String) => {根据业务逻辑对数据进行处理str.toUpperCase()//str.length*10/2/2.toDouble})//创建表fileDS.createOrReplaceTempView("t_word")//使用我们自己定义的函数spark.sql("select value,smallToBig(value) from t_word").show()/*+----------+---------------------+|value|UDF:smallToBig(value)|+----------+---------------------+|helloworld| HELLOWORLD|| abc| ABC||study|STUDY|| smallWORD| SMALLWORD|+----------+---------------------+*/sc.stop()spark.stop()}}

自定义UDAF[了解]

需求

udaf.json数据内容如下

{"name":"Michael","salary":3000}{"name":"Andy","salary":4500}{"name":"Justin","salary":3500}{"name":"Berta","salary":4000}

求取平均工资

继承UserDefinedAggregateFunction方法重写说明

inputSchema:输入数据的类型bufferSchema:产生中间结果的数据类型dataType:最终返回的结果类型deterministic:确保一致性,一般用trueinitialize:指定初始值update:每有一条数据参与运算就更新一下中间结果(update相当于在每一个分区中的运算)merge:全局聚合(将每个分区的结果进行聚合)evaluate:计算最终的结果

代码演示

import org.apache.spark.SparkContextimport org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}import org.apache.spark.sql.types._import org.apache.spark.sql.{DataFrame, Row, SparkSession}object UDAFDemo {class MyUDAF extends UserDefinedAggregateFunction{//1.输入的数据类型的schemaoverride def inputSchema: StructType = {StructType(List(StructField("input",LongType)))//两种都一样//StructType(StructField("input",LongType)::Nil) }//2.缓冲区数据类型schema,就是转换之后的数据的schemaoverride def bufferSchema: StructType = {//设置缓存中间结果数据//sum : 每次的临时的和 total: 临时的总次数 StructType(List(StructField("sum",LongType),StructField("total",LongType)))}//3.最终的返回值的数据类型override def dataType: DataType = {DoubleType}//4.确定是否相同的输入会有相同的输出override def deterministic: Boolean = {true}//5.初始化内部数据结构 --> 设置X 个变量 每个变量进行初始化数据override def initialize(buffer: MutableAggregationBuffer): Unit = {//算法总和/总数// buffer(0)用于记录临时的数据和buffer(0) = 0L//buffer(1)用于记录临时的数据条数buffer(1) = 0L}/*//分区1{"name":"Michael","salary":3000}{"name":"Andy","salary":4500}//分区2{"name":"Justin","salary":3500}{"name":"Berta","salary":4000}*///List(1,2,3,4,5,6).reduce((a,b)=>a+b)/*1 a=1 b=22 a=3 b=33 a=6 b=44 a=10 b=556*///6.更新数据内部结构,区内计算 --> RDD 中含有多个分区 update是计算一个分区内的数据override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {//buffer.getLong(0) 上次缓存的数据//input.getLong(0) 最新输入的数据//(临时输入的总金额)所有的金额相加buffer(0) = buffer.getLong(0) + input.getLong(0)//(临时输入的总数量)一共有多少条数据buffer(1) = buffer.getLong(1) + 1}//7.来自不同分区的数据进行合并,全局合并 --> 上面的update有几个rdd分区就会有几个最终结果override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {//累加第一个分区金额总和 与第二个分区金额总=最终的中金额buffer1(0) =buffer1.getLong(0) + buffer2.getLong(0)//累加第一个分区次数 与第二个分区次数=最终的总次数buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)}//8.计算输出数据平均值override def evaluate(buffer: Row): Any = {buffer.getLong(0).toDouble / buffer.getLong(1)}}/**main方法入口*/def main(args: Array[String]): Unit = {//1.获取sparkSessionval spark: SparkSession = SparkSession.builder().appName("SparkSQL").master("local[*]").getOrCreate()// val sc: SparkContext = spark.sparkContext//sc.setLogLevel("WARN")//2.读取文件val employeeDF: DataFrame = spark.read.json("D:\\data\\udaf.json")//3.创建临时表employeeDF.createOrReplaceTempView("t_employee")//4.注册UDAF函数spark.udf.register("myavg",new MyUDAF)//5.使用自定义UDAF函数 -->查看薪水spark.sql("select myavg(salary) from t_employee").show()//6.使用内置的avg函数 -->查看平均工资spark.sql("select avg(salary) from t_employee").show()//关闭spark.stop()}}

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。