Flink 入门案例:Flink 流处理案例

| 分类 大数据之spark  大数据之flink  | 标签 Spark  大数据  数据处理  Flink  Scala  Maven  Kafka  流处理  数据分析  Sink  算子 

首先引入对应的Maven 依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.12.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.12.1</version>
</dependency>
<!-- 需要flink-clients,否则Eclipse下启动报错 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.12.1</version>
</dependency>

直接看一个消费Kafka 消息,简单转换后输出的程序案例

package com.xum.flinks


import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer


object FlinkStreamingExample 
{
  def main(args: Array[String]) 
  {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    
    // 开启checkpoint,时间间隔为毫秒
    env.enableCheckpointing(5000L);
    
    // 选择状态后端
    // env.setStateBackend((StateBackend) new FsStateBackend("file:///E://checkpoint"));
    // env.setStateBackend((StateBackend) new FsStateBackend("hdfs://kms-1:8020/checkpoint"));
    
    // Kafka 集群信息
    val properties: Properties = new Properties
    properties.setProperty("bootstrap.servers", "99.13.223.58:9092");
    properties.setProperty("group.id", "TestGroupId");
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    // 指定kafka的消费者从哪里开始消费数据
    // 共有三种方式,
    // #earliest
    // 当各分区下有已提交的offset时,从提交的offset开始消费;
    // 无提交的offset时,从头开始消费
    // #latest
    // 当各分区下有已提交的offset时,从提交的offset开始消费;
    // 无提交的offset时,消费新产生的该分区下的数据
    // #none
    // topic各分区都存在已提交的offset时,
    // 从offset后开始消费;
    // 只要有一个分区不存在已提交的offset,则抛出异常
    properties.put("auto.offset.reset", "latest");
    
    // 创建消费者
    val consumer: FlinkKafkaConsumer[String] = new FlinkKafkaConsumer("TEST_TOPIC", new SimpleStringSchema(), properties)
    
    // 设置checkpoint后在提交offset,即oncheckpoint模式。该值默认为true,
    consumer.setCommitOffsetsOnCheckpoints(true)
    
    // 隐式转换解决报错
    // could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[String]
    // not enough arguments for method addSource: (implicit evidence$9: org.apache.flink.api.common.typeinfo.TypeInformation[String])org.apache.flink.streaming.api.scala.DataStream[String]. Unspecified value parameter evidence$9.
    import org.apache.flink.api.scala._
    val source: DataStream[String] = env.addSource(consumer);
    
    // map 转换(解密)
    val textStream = source.map(record => {
                                            val result = "转换后的字符串: " + record 
                                            result
                                          }
                               )
    
    // 输出示例
    textStream.print()
    
    // Flink没有Spark 中的foreach算子
    textStream.addSink(str => {
      println(str)
    })
    
    // 执行
    env.execute("Flink Kafka Example")
  }

}



如果本篇文章对您有所帮助,您可以通过微信(左)或支付宝(右)对作者进行打赏!


上一篇     下一篇