一、版本说明 Spark针对Kafka的不同版本,提供了两套整合方案:spark-streaming-kafka-0-8
和spark-streaming-kafka-0-10
,其主要区别如下:
spark-streaming-kafka-0-8
spark-streaming-kafka-0-10
Kafka版本
0.8.2.1 or higher
0.10.0 or higher
AP状态
Deprecated 从Spark 2.3.0版本开始,Kafka 0.8支持已被弃用
Stable(稳定版)
语言支持
Scala, Java, Python
Scala, Java
Receiver DStream
Yes
No
Direct DStream
Yes
Yes
SSL / TLS Support
No
Yes
Offset Commit API(偏移量提交)
No
Yes
Dynamic Topic Subscription (动态主题订阅)
No
Yes
本文使用的Kafka版本为kafka_2.12-2.2.0
,故采用第二种方式进行整合。
二、项目依赖 项目采用Maven进行构建,主要依赖如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 <properties > <scala.version > 2.12</scala.version > </properties > <dependencies > <dependency > <groupId > org.apache.spark</groupId > <artifactId > spark-streaming_$ {scala.version} </artifactId > <version > $ {spark.version} </version > </dependency > <dependency > <groupId > org.apache.spark</groupId > <artifactId > spark-streaming-kafka-0-10_$ {scala.version} </artifactId > <version > 2.4.3</version > </dependency > </dependencies >
完整源码见本仓库:spark-streaming-kafka
三、整合Kafka 通过调用KafkaUtils
对象的createDirectStream
方法来创建输入流,完整代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.SparkConfimport org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent import org.apache.spark.streaming.kafka010._ import org.apache.spark.streaming.{Seconds, StreamingContext} /** * spark streaming 整合 kafka */ object KafkaDirectStream { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("KafkaDirectStream" ).setMaster("local[2]" ) val streamingContext = new StreamingContext(sparkConf, Seconds(5 )) val kafkaParams = Map [String, Object]( "bootstrap.servers" -> "hadoop001:9092" , "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "spark-streaming-group" , "auto.offset.reset" -> "latest" , "enable.auto.commit" -> (true: java.lang.Boolean) ) val topics = Array("spark-streaming-topic" ) val stream = KafkaUtils.createDirectStream[String, String]( streamingContext, PreferConsistent, Subscribe[String, String](topics, kafkaParams) ) stream.map(record => (record.key, record.value)).print() streamingContext.start() streamingContext.awaitTermination() }}
3.1 ConsumerRecord 这里获得的输入流中每一个Record实际上是ConsumerRecord<K, V>
的实例,其包含了Record的所有可用信息,源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public class ConsumerRecord <K , V > { public static final long NO_TIMESTAMP = RecordBatch.NO_TIMESTAMP; public static final int NULL_SIZE = -1 ; public static final int NULL_CHECKSUM = -1 ; private final String topic; private final int partition; private final long offset; private final long timestamp; private final TimestampType timestampType; private final int serializedKeySize; private final int serializedValueSize; private final Headers headers; private final K key; private final V value; ..... }
3.2 生产者属性 在示例代码中kafkaParams
封装了Kafka消费者的属性,这些属性和Spark Streaming无关,是Kafka原生API中就有定义的。其中服务器地址、键序列化器和值序列化器是必选的,其他配置是可选的。其余可选的配置项如下:
1. fetch.min.byte 消费者从服务器获取记录的最小字节数。如果可用的数据量小于设置值,broker会等待有足够的可用数据时才会把它返回给消费者。
2. fetch.max.wait.ms broker返回给消费者数据的等待时间。
3. max.partition.fetch.bytes 分区返回给消费者的最大字节数。
4. session.timeout.ms 消费者在被认为死亡之前可以与服务器断开连接的时间。
5. auto.offset.reset 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
latest(默认值) :在偏移量无效的情况下,消费者将从其启动之后生成的最新的记录开始读取数据;
earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录。
6. enable.auto.commit 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false。
7. client.id 客户端id,服务器用来识别消息的来源。
8. max.poll.records 单次调用poll()
方法能够返回的记录数量。
9. receive.buffer.bytes 和 send.buffer.byte 这两个参数分别指定TCP socket 接收和发送数据包缓冲区的大小,-1代表使用操作系统的默认值。
3.3 位置策略 Spark Streaming中提供了如下三种位置策略,用于指定Kafka主题分区与Spark执行程序Executors之间的分配关系:
1 2 3 4 5 6 7 @Experimental def PreferFixed(hostMap : collection.Map[TopicPartition, String]): LocationStrategy = new PreferFixed(new ju.HashMap[TopicPartition, String](hostMap.asJava)) @Experimental def PreferFixed(hostMap : ju.Map[TopicPartition, String]): LocationStrategy = new PreferFixed(hostMap)
3.4 订阅方式 Spark Streaming提供了两种主题订阅方式,分别为Subscribe
和SubscribePattern
。后者可以使用正则匹配订阅主题的名称。其构造器分别如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 /** * @param 需要订阅的主题的集合 * @param Kafka消费者参数 * @param offsets(可选): 在初始启动时开始的偏移量。如果没有,则将使用保存的偏移量或auto.offset.reset属性的值 */ def Subscribe[K , V]( topics: ju.Collection[jl.String ], kafkaParams: ju.Map [String , Object], offsets: ju.Map [TopicPartition, jl.Long]): ConsumerStrategy[K , V] = { ... } /** * @param 需要订阅的正则 * @param Kafka消费者参数 * @param offsets(可选): 在初始启动时开始的偏移量。如果没有,则将使用保存的偏移量或auto.offset.reset属性的值 */ def SubscribePattern[K , V]( pattern: ju.regex.Pattern , kafkaParams: collection.Map [String , Object], offsets: collection.Map [TopicPartition, Long]): ConsumerStrategy[K , V] = { ... }
在示例代码中,我们实际上并没有指定第三个参数offsets
,所以程序默认采用的是配置的auto.offset.reset
属性的值latest,即在偏移量无效的情况下,消费者将从其启动之后生成的最新的记录开始读取数据。
3.5 提交偏移量 在示例代码中,我们将enable.auto.commit
设置为true,代表自动提交。在某些情况下,你可能需要更高的可靠性,如在业务完全处理完成后再提交偏移量,这时候可以使用手动提交。想要进行手动提交,需要调用Kafka原生的API :
commitSync
: 用于异步提交;
commitAsync
:用于同步提交。
具体提交方式可以参见:Kafka消费者详解
四、启动测试 4.1 创建主题 1. 启动Kakfa Kafka的运行依赖于zookeeper,需要预先启动,可以启动Kafka内置的zookeeper,也可以启动自己安装的:
1 2 3 4 5 bin/zkServer.sh start bin /zookeeper-server -start.sh config/zookeeper.properties
启动单节点kafka用于测试:
1 # bin/kafka-server-start.sh config/server.properties
2. 创建topic 1 2 3 4 5 6 7 8 9 bin/ kafka-topics. sh --create \ --bootstrap-server hadoop001:9092 \ --replication-factor 1 \ --partitions 1 \ --topic spark-streaming- topic # 查看所有主题 bin/ kafka-topics. sh --list --bootstrap-server hadoop001:9092
3. 创建生产者 这里创建一个Kafka生产者,用于发送测试数据:
1 bin/kafka-console-producer.sh --broker-list hadoop001:9092 --topic spark-streaming-topic
4.2 本地模式测试 这里我直接使用本地模式启动Spark Streaming程序。启动后使用生产者发送数据,从控制台查看结果。
从控制台输出中可以看到数据流已经被成功接收,由于采用kafka-console-producer.sh
发送的数据默认是没有key的,所以key值为null。同时从输出中也可以看到在程序中指定的groupId
和程序自动分配的clientId
。
参考资料
https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html l