First, add the corresponding Maven dependencies:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.12.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.12.1</version>
</dependency>
<!-- flink-clients is required; otherwise the job fails to start under Eclipse -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.12.1</version>
</dependency>
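If you build with sbt instead of Maven, the equivalent dependencies would look roughly like this (a sketch; the %% operator appends the Scala version suffix such as _2.11 automatically):

// build.sbt -- a minimal sketch, assuming scalaVersion := "2.11.12"
libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-streaming-scala"  % "1.12.1",
  "org.apache.flink" %% "flink-connector-kafka"  % "1.12.1",
  "org.apache.flink" %% "flink-clients"          % "1.12.1"
)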
Now let's look at a program example that consumes Kafka messages, applies a simple transformation, and prints the result:
package com.xum.flinks
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
object FlinkStreamingExample {

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Enable checkpointing; the interval is given in milliseconds
    env.enableCheckpointing(5000L)
    // Choose a state backend (optional)
    // env.setStateBackend(new FsStateBackend("file:///E://checkpoint"))
    // env.setStateBackend(new FsStateBackend("hdfs://kms-1:8020/checkpoint"))
    // Kafka cluster connection info
    val properties: Properties = new Properties
    properties.setProperty("bootstrap.servers", "99.13.223.58:9092")
    properties.setProperty("group.id", "TestGroupId")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    // Specify where the Kafka consumer starts consuming data.
    // There are three options:
    // #earliest
    //   if a partition has a committed offset, consume from that offset;
    //   if not, consume from the beginning of the partition
    // #latest
    //   if a partition has a committed offset, consume from that offset;
    //   if not, consume only data newly produced to that partition
    // #none
    //   if every partition of the topic has a committed offset,
    //   consume from those offsets;
    //   if any partition lacks a committed offset, throw an exception
    properties.put("auto.offset.reset", "latest")
    // Create the consumer
    val consumer: FlinkKafkaConsumer[String] =
      new FlinkKafkaConsumer[String]("TEST_TOPIC", new SimpleStringSchema(), properties)
    // Commit offsets back to Kafka only after a checkpoint completes
    // (the on-checkpoint mode). This defaults to true.
    consumer.setCommitOffsetsOnCheckpoints(true)
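    // Alternatively (a sketch of Flink's own start-position API, which
    // takes precedence over auto.offset.reset when called):
    // consumer.setStartFromEarliest()     // ignore committed offsets, read from the beginning
    // consumer.setStartFromLatest()       // ignore committed offsets, read only new records
    // consumer.setStartFromGroupOffsets() // the default: resume from the group's committed offsets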
    // Import the implicit TypeInformation instances; without this import
    // the compiler reports errors such as:
    // could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[String]
    // not enough arguments for method addSource: (implicit evidence$9: org.apache.flink.api.common.typeinfo.TypeInformation[String])org.apache.flink.streaming.api.scala.DataStream[String]. Unspecified value parameter evidence$9.
    import org.apache.flink.api.scala._
    val source: DataStream[String] = env.addSource(consumer)
    // map transformation (e.g., decrypting or decoding each record)
    val textStream = source.map(record => {
      val result = "transformed string: " + record
      result
    })
    // Print the stream as a simple demonstration
    textStream.print()
    // Flink has no direct equivalent of Spark's foreach action; use a sink instead
    textStream.addSink(str => {
      println(str)
    })
    // Execute the job
    env.execute("Flink Kafka Example")
  }
}
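In practice you would usually write the transformed stream back to Kafka rather than printing it. Below is a minimal sketch of such a sink, assuming an output topic named TEST_TOPIC_OUT on the same cluster (both the topic name and the reuse of the consumer's Properties are assumptions for illustration; a producer really only needs bootstrap.servers):

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

// A sketch: serialize each String with SimpleStringSchema and write it
// to the (hypothetical) output topic TEST_TOPIC_OUT
val producer: FlinkKafkaProducer[String] =
  new FlinkKafkaProducer[String]("TEST_TOPIC_OUT", new SimpleStringSchema(), properties)
textStream.addSink(producer)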