spark-sftp 插件使用说明

| 分类 大数据之spark  | 标签 Spark  大数据  SFTP  数据处理  Maven  Spark插件  HDFS  文件  SQL  SparkSQL  DataFrame  RDD 

首先要引入Maven 依赖

<dependency>
  <groupId>com.springml</groupId>
  <artifactId>spark-sftp_2.11</artifactId>
  <version>1.1.3</version>
</dependency>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.12</artifactId>
  <version>3.1.1</version>
</dependency>

如果有版本冲突,目前我的解决方案是把spark-sftp_2.11 的代码拷贝到项目中,然后可以任意修改代码满足项目需求。比如这里我就把spark-sftp_2.11 的代码拷贝到项目的com.xum.example.plugins.sftp 包路径下

SFTP 中测试文件的内容是(流水号|日期|时间|状态|金额|描述|预留字段)

seq0001|20220408|101010|S|10.00||
seq0002|20220408|101011|F|11.00||

下面案例具体展示spark-sftp、SparkSQL、RDD、DataFrame 的用法

object SparkSftpExample 
{
  // 主流程
  def main(args: Array[String]) 
  {
    // 初始化Spark 环境
    val sparkSession = SparkSession.builder().appName("SparkSftpExample").getOrCreate()
    
    // 按照txt 格式将SFTP 上的文件转换成DataFrame
    val datDF = sparkSession.read.
                    format("com.xum.example.plugins.sftp").                 // spark-sftp 插件对应的包路径
                    option("host", "99.14.143.24").                         // sftp 的地址
                    option("port", "22").                                   // sftp 的端口
                    option("username", "xumenger").                         // sftp 的用户名
                    option("password", "testPasswd").                       // sftp 的登录密码
                    option("fileType", "txt").                              // 文件的格式
                    option("hdfsTempLocation", "/user/xum/temp/").          // sftp 文件下载临时存储到HDFS 的哪个路径,这个目录要提前创建
                    load("/home/test/xum/TradeData.txt")                    // 下载sftp 服务器上的哪个文件
                    
    // DataFrame 转换成RDD
    val rowRdd = datDF.rdd
    
    // RDD[ROW] 转换成RDD[String]
    val strRdd: RDD[String] = rowRdd.map(_.mkString(""))
    
    // 按照<\\|>分割每行数据
    val arrayRdd: RDD[Array[String]] = strRdd.map(str => str.split("\\|"))
    
    // 键值
    val keys = Array("TrxSeq", "TrxDat", "TrxTim", "TrxSts", "TrxAmt", "TrxDsc", "Spc120")
    
    // 按照键值转换成Map。下面的JSONObject 将Map 转成json,要求是mutable.Map
    val mapRdd: RDD[Map[String,String]] = arrayRdd.map(arr => {
      var map: Map[String, String] = Map()
      for (i <- 0 until keys.length) {
        if (i >= arr.length)
          // 考虑.txt 数据文件最后几个字段为空,在解析的时候被移除的情况
          map += (keys(i) -> "")
        else 
          map += (keys(i) -> arr(i))
      }
      map
    })
    
    // key、value转换成JSON格式
    val jsonRdd: RDD[String] = mapRdd.map(map => {
      val json = new JSONObject(map)
      val jsonString = json.toString()
      jsonString
    })
    
    // json RDD 转换成DataFrame
    val tblDF = sparkSession.read.json(jsonRdd)
    
    // 创建临时视图
    tblDF.createOrReplaceTempView("TRADE_TABLE")
    
    // 执行SQL
    val retDF = sparkSession.sql("select TrxSeq, TrxDat, TrxTim, TrxSts, TrxAmt from TRADE_TABLE")
    
    // 保存执行结果
    val retRowRdd = retDF.rdd
    retRowRdd.foreach(println(_))
  }
  
}

输出结果为

[seq0001,20220408,101010,S,10.00]
[seq0002,20220408,101011,F,11.00]



如果本篇文章对您有所帮助,您可以通过微信(左)或支付宝(右)对作者进行打赏!


上一篇     下一篇