大數據IMF傳奇行動絕密課程第72課:Spark SQL UDF和UDAF解密與實戰


第72課:Spark SQL UDF和UDAF解密與實戰

/**
* scala代碼
*/

package com.tom.spark.sql

import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
* UDF:User Defined Function, 用戶自定義的函數,函數的輸入是一條具體的數據記錄,實現上講就是普通的scala函數;
* UDAF:User Defined Aggregation Function, 用戶自定義的聚合函數,函數本身作用於數據集合,能夠在聚合操作的基礎上進行自定義操作;
* 實質上講,例如說UDF會被Spark SQL中的catalyst封裝成為expression,最終會通過eval方法來計算輸入的輸入Row,此處的Row和DataFrame
* 中的Row沒有任何關系
*/

object SparkSQLUDFUDAF {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[4]").setAppName("SparkSQLUDFUDAF")
val sc = new SparkContext(conf)

val sqlContext = new SQLContext(sc)

//模擬實際使用的數據
val bigData = Array("Spark", "Spark", "Hadoop", "spark", "Hadoop", "spark", "Hadoop", "Hadoop", "spark", "spark")

/**
* 基於提供的數據創建DataFrame
*/

val bigDataRdd = sc.parallelize(bigData)
val bigDataRDDRow = bigDataRdd.map(item => {Row(item)})

val structType = StructType(Array(
new StructField("word", StringType, true)
))
val bigDataDF = sqlContext.createDataFrame(bigDataRDDRow, structType)

bigDataDF.registerTempTable("bigDataTable") //注冊成為臨時表

/**
* 通過SQLContext注冊UDF,在Scala 2.10.x版本UDF函數最多可以接收22個輸入參數
*/

sqlContext.udf.register("computeLength", (input: String) => input.length)

//直接在sql中使用udf,就像使用SQL自帶的內部函數一樣
sqlContext.sql("select word, computeLength(word) as length from bigDataTable").show

sqlContext.udf.register("wordcount", new MyUDAF)

sqlContext.sql("select word, wordcount(word) as count,computeLength(word) as length " +
"from bigDataTable group by word").show

// while(true){}

}
}

/**
* 按照模板實現UDAF
*/

class MyUDAF extends UserDefinedAggregateFunction {
/**
* 該方法指定具體輸入數據的類型
* @return
*/

override def inputSchema: StructType = StructType(Array(StructField("input", StringType, true)))

/**
* 在進行聚合操作的時候所要處理的數據的結果的類型
* @return
*/

override def bufferSchema: StructType = StructType(Array(StructField("count", IntegerType, true)))

/**
* 指定UDAF函數計算后返回的結果類型
* @return
*/

override def dataType: DataType = IntegerType

/**
* 確保一致性,一般都用true
* @return
*/

override def deterministic: Boolean = true

/**
* 在Aggregate之前每組數據的初始化結果
* @param buffer
*/

override def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0) = 0 }

/**
* 在進行聚合的時候,每當有新的值進來,對分組后的聚合如何進行計算
* 本地的聚合操作,相當於Hadoop MapReduce模型中的Combiner
* @param buffer
* @param input
*/

override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
buffer(0) = buffer.getAs[Int](0) + 1
}

/**
* 最后在分布式節點進行Local Reduce完成后需要進行全局級別的Merge操作
* @param buffer1
* @param buffer2
*/

override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getAs[Int](0) + buffer2.getAs[Int](0)
}

/**
* 返回UDAF最后的計算結果
* @param buffer
* @return
*/

override def evaluate(buffer: Row): Any = buffer.getAs[Int](0)
}

注意!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系我们删除。



 
粤ICP备14056181号  © 2014-2021 ITdaan.com