首先要引入Maven 依赖
<dependency>
<groupId>com.springml</groupId>
<artifactId>spark-sftp_2.11</artifactId>
<version>1.1.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.1.1</version>
</dependency>
如果有版本冲突,目前我的解决方案是把spark-sftp_2.11 的代码拷贝到项目中,然后可以任意修改代码满足项目需求。比如这里我就把spark-sftp_2.11 的代码拷贝到项目的
com.xum.example.plugins.sftp包路径下
SFTP 中测试文件的内容是(流水号|日期|时间|状态|金额|描述|预留字段)
seq0001|20220408|101010|S|10.00||
seq0002|20220408|101011|F|11.00||
下面案例具体展示spark-sftp、SparkSQL、RDD、DataFrame 的用法
object SparkSftpExample
{
// 主流程
def main(args: Array[String])
{
// 初始化Spark 环境
val sparkSession = SparkSession.builder().appName("SparkSftpExample").getOrCreate()
// 按照txt 格式将SFTP 上的文件转换成DataFrame
val datDF = sparkSession.read.
format("com.xum.example.plugins.sftp"). // spark-sftp 插件对应的包路径
option("host", "99.14.143.24"). // sftp 的地址
option("port", "22"). // sftp 的端口
option("username", "xumenger"). // sftp 的用户名
option("password", "testPasswd"). // sftp 的登录密码
option("fileType", "txt"). // 文件的格式
option("hdfsTempLocation", "/user/xum/temp/"). // sftp 文件下载临时存储到HDFS 的哪个路径,这个目录要提前创建
load("/home/test/xum/TradeData.txt") // 下载sftp 服务器上的哪个文件
// DataFrame 转换成RDD
val rowRdd = datDF.rdd
// RDD[ROW] 转换成RDD[String]
val strRdd: RDD[String] = rowRdd.map(_.mkString(""))
// 按照<\\|>分割每行数据
val arrayRdd: RDD[Array[String]] = strRdd.map(str => str.split("\\|"))
// 键值
val keys = Array("TrxSeq", "TrxDat", "TrxTim", "TrxSts", "TrxAmt", "TrxDsc", "Spc120")
// 按照键值转换成Map。下面的JSONObject 将Map 转成json,要求是mutable.Map
val mapRdd: RDD[Map[String,String]] = arrayRdd.map(arr => {
var map: Map[String, String] = Map()
for (i <- 0 until keys.length) {
if (i >= arr.length)
// 考虑.txt 数据文件最后几个字段为空,在解析的时候被移除的情况
map += (keys(i) -> "")
else
map += (keys(i) -> arr(i))
}
map
})
// key、value转换成JSON格式
val jsonRdd: RDD[String] = mapRdd.map(map => {
val json = new JSONObject(map)
val jsonString = json.toString()
jsonString
})
// json RDD 转换成DataFrame
val tblDF = sparkSession.read.json(jsonRdd)
// 创建临时视图
tblDF.createOrReplaceTempView("TRADE_TABLE")
// 执行SQL
val retDF = sparkSession.sql("select TrxSeq, TrxDat, TrxTim, TrxSts, TrxAmt from TRADE_TABLE")
// 保存执行结果
val retRowRdd = retDF.rdd
retRowRdd.foreach(println(_))
}
}
输出结果为
[seq0001,20220408,101010,S,10.00]
[seq0002,20220408,101011,F,11.00]