本文共 3929 字,大约阅读时间需要 13 分钟。
我们当然可以通过SQL的 UDF函数等来完成字符串解析,在streamingpro中也很简单,只要注册下你的UDF函数库即可:
"udf_register": { "desc": "测试", "strategy": "....SparkStreamingRefStrategy", "algorithm": [], "ref": [], "compositor": [ { "name": "...SQLUDFCompositor", "params": [ { "analysis": "streaming.core.compositor.spark.udf.func.MLFunctions" } ] } ] }
这样你就可以在SQL中使用MLfunctions里面所有的udf函数了。然而为此专门提供一个jar包也是略显麻烦。
这个时候如果能直接写脚本解析就好了,最好是能支持各种脚本,比如groovy,javascript,python,scala,java等。任何一个会编程的人都可以实现一个比较复杂的解析逻辑。 核心是ScriptCompositor模块:{ "name": "...ScriptCompositor", "params": [ { "inputTableName": "test", "outputTableName": "test3" }, { "raw": [ "val Array(a,b)=rawLine.split(\"\t\");", "Map(\"a\"->a,\"b\"->b)" ] } ] }如果我想在代码里直接处理所有的列,则如下:
{ "name": "streaming.core.compositor.spark.transformation.ScriptCompositor", "params": [ { "inputTableName": "test2", "outputTableName": "test3", "useDocMap": true }, { "anykey": "val Array(a,b)=doc(\"raw\").toString.split(\"\t\");Map(\"a\"->a,\"b\"->b)" } ]}通过添加useDocMap为true,则你在代码里可以通过doc(doc是个Map[String,Any]) 来获取你想要的任何字段,然后形成一个新的Map。 如果你只要新生成Map里的字段,忽略掉旧的,则设置ignoreOldColumns=true 即可。 你可以把代码放到一个文件里,如下:
{ "name": "....ScriptCompositor", "params": [ { "inputTableName": "test", "outputTableName": "test3" }, { "raw": "file:///tmp/raw_process.scala" } ] }通过inputTableName指定输入的表,outputTableName作为输出结果表。 raw代表inputTableName中你需要解析的字段,然后通过你的scala脚本进行解析。在脚本中 rawLine 是固定的,对应raw字段(其他字段也是一样)的值。脚本只有一个要求,最后的返回结果暂时需要是个Map[String,Any]。 这里,你只是提供了一个map作为返回值,作为一行,然后以outputTableName指定的名字输出,作为下一条SQL的输入,所以StreamingPro需要推测出你的Schema。 数据量大到一定程度,推测Schema的效率就得不到保证,这个时候,你可以通过配置schema来提升性能:
{ "name": "....ScriptCompositor", "params": [ { "inputTableName": "test", "outputTableName": "test3", "schema": "file:///tmp/schema.scala", "useDocMap": true }, { "raw": "file:///tmp/raw_process.scala" } ] }schema.scala的内容大致如下:
Some(StructType(Array(StructField("a", StringType, true),StructField("b", StringType, true))))后续roadmap是:
举个案例,从HDFS读取一个文件,并且映射为只有一个raw字段的表,接着通过ScriptCompositor配置的scala代码解析raw字段,展开成a,b两个字段,然后继续用SQL继续处理,最后输出。
{ "convert_data_parquet": { "desc": "测试", "strategy": "...SparkStreamingStrategy", "algorithm": [], "ref": [], "compositor": [ { "name": "...SQLSourceCompositor", "params": [ { "path": "file:///tmp/hdfsfile", "format": "org.apache.spark.sql.execution.datasources.hdfs", "fieldName": "raw" } ] }, { "name": "...JSONTableCompositor", "params": [ { "tableName": "test" } ] }, { "name": "...ScriptCompositor", "params": [ { "inputTableName": "test", "outputTableName": "test3" }, { "raw": [ "val Array(a,b)=rawLine.split(\"\t\");", "Map(\"a\"->a,\"b\"->b)" ] } ] }, { "name": "...transformation.SQLCompositor", "params": [ { "sql": "select a,b from test3 " } ] }, { "name": "...streaming.core.compositor.spark.output.SQLUnitTestCompositor", "params": [ { } ] } ], "configParams": { } }}体验地址:
转载地址:http://potpx.baihongyu.com/