1. Integration Overview
Storm's official Kafka integration comes in two flavors, one for Kafka 0.8.x and one for Kafka 0.10.x and later, each with its own documentation (see the references at the end of this article).
The Kafka version installed on my server is 2.2.0 (released Mar 22, 2019), so the integration below follows the official 0.10.x+ documentation; it does not apply to Kafka 0.8.x.
2. Writing Data to Kafka
2.1 Project Structure
2.2 Main Dependencies
```xml
<properties>
    <storm.version>1.2.2</storm.version>
    <kafka.version>2.2.0</kafka.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>${storm.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-kafka-client</artifactId>
        <version>${storm.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>${kafka.version}</version>
    </dependency>
</dependencies>
```
2.3 DataSourceSpout
```java
import org.apache.commons.lang3.StringUtils;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.*;

/**
 * Spout that emits one line of randomly chosen, tab-separated words every second.
 */
public class DataSourceSpout extends BaseRichSpout {

    private List<String> list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive");

    private SpoutOutputCollector spoutOutputCollector;

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.spoutOutputCollector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        // emit one line of mock data per second
        String lineData = productData();
        spoutOutputCollector.emit(new Values(lineData));
        Utils.sleep(1000);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("line"));
    }

    /**
     * Produces mock data: a random-length, tab-separated selection of the word list.
     */
    private String productData() {
        Collections.shuffle(list);
        Random random = new Random();
        int endIndex = random.nextInt(list.size()) % (list.size()) + 1;
        return StringUtils.join(list.toArray(), "\t", 0, endIndex);
    }
}
```
The generated mock data looks like this:
```
Spark	HBase
Hive	Flink	Storm	Hadoop	HBase	Spark
Flink
HBase	Storm
HBase	Hadoop	Hive	Flink
HBase	Flink	Hive	Storm
Hive	Flink	Hadoop
HBase	Hive
Hadoop	Spark	HBase	Storm
```
2.4 WritingToKafkaApp
```java
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.kafka.bolt.KafkaBolt;
import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
import org.apache.storm.topology.TopologyBuilder;

import java.util.Properties;

/**
 * Writes the data generated by DataSourceSpout to Kafka.
 */
public class WritingToKafkaApp {

    private static final String BOOTSTRAP_SERVERS = "hadoop001:9092";
    private static final String TOPIC_NAME = "storm-topic";

    public static void main(String[] args) {

        TopologyBuilder builder = new TopologyBuilder();

        // Kafka producer properties used by the KafkaBolt
        Properties props = new Properties();
        props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        props.put("acks", "1");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaBolt bolt = new KafkaBolt<String, String>()
                .withProducerProperties(props)
                .withTopicSelector(new DefaultTopicSelector(TOPIC_NAME))
                // the default mapper looks for tuple fields named "key" and "message",
                // so point the message at the spout's "line" field (the key stays null)
                .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper<>("key", "line"));

        builder.setSpout("sourceSpout", new DataSourceSpout(), 1);
        builder.setBolt("kafkaBolt", bolt, 1).shuffleGrouping("sourceSpout");

        // submit to the cluster when "cluster" is passed as the first argument,
        // otherwise run in local mode
        if (args.length > 0 && args[0].equals("cluster")) {
            try {
                StormSubmitter.submitTopology("ClusterWritingToKafkaApp", new Config(), builder.createTopology());
            } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                e.printStackTrace();
            }
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("LocalWritingToKafkaApp", new Config(), builder.createTopology());
        }
    }
}
```
2.5 Test Preparation
Kafka must be running before the test can be carried out.

1. Start Kafka

Kafka depends on ZooKeeper, so ZooKeeper has to be started first; you can start either the ZooKeeper instance bundled with Kafka or a separately installed one:
```shell
# start a standalone ZooKeeper installation
bin/zkServer.sh start

# or start the ZooKeeper bundled with Kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
```
Start a single-node Kafka instance for testing:
```shell
# start a single-node Kafka broker
bin/kafka-server-start.sh config/server.properties
```
2. Create a topic
```shell
# create the topic used for testing
bin/kafka-topics.sh --create --bootstrap-server hadoop001:9092 --replication-factor 1 --partitions 1 --topic storm-topic

# list all topics
bin/kafka-topics.sh --list --bootstrap-server hadoop001:9092
```
3. Start a consumer

Start a consumer to observe the data being written to the topic; Kafka's bundled console consumer can be used, as shown below.
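A typical invocation of the built-in console consumer, pointed at the broker and topic used in this article, looks like this:

```shell
# consume storm-topic from the beginning and print each record to the console
bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic storm-topic --from-beginning
```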
2.6 Testing
The topology can be run directly in local mode, or packaged and submitted to a server cluster. The source code provided in this repository uses the maven-shade-plugin for packaging by default; the packaging command is as follows:
```shell
mvn clean package -D maven.test.skip=true
```
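For the cluster branch of the code above, the shaded jar is submitted with Storm's `storm jar` command. A sketch, in which the jar name and the main class's package are placeholders for whatever your build actually produces:

```shell
# submit the packaged topology to the cluster (jar name and package are placeholders)
storm jar storm-kafka-integration-1.0.jar com.example.storm.kafka.WritingToKafkaApp cluster
```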
After the topology starts, the data being written shows up in the consumer console.
3. Reading Data from Kafka
3.1 Project Structure
3.2 ReadingFromKafkaApp
```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
import org.apache.storm.topology.TopologyBuilder;

/**
 * Reads data from Kafka and prints it to the console.
 */
public class ReadingFromKafkaApp {

    private static final String BOOTSTRAP_SERVERS = "hadoop001:9092";
    private static final String TOPIC_NAME = "storm-topic";

    public static void main(String[] args) {

        final TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(BOOTSTRAP_SERVERS, TOPIC_NAME)), 1);
        builder.setBolt("bolt", new LogConsoleBolt()).shuffleGrouping("kafka_spout");

        // submit to the cluster when "cluster" is passed as the first argument,
        // otherwise run in local mode
        if (args.length > 0 && args[0].equals("cluster")) {
            try {
                StormSubmitter.submitTopology("ClusterReadingFromKafkaApp", new Config(), builder.createTopology());
            } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                e.printStackTrace();
            }
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("LocalReadingFromKafkaApp", new Config(), builder.createTopology());
        }
    }

    private static KafkaSpoutConfig<String, String> getKafkaSpoutConfig(String bootstrapServers, String topic) {
        return KafkaSpoutConfig.builder(bootstrapServers, topic)
                // the consumer group id
                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
                // retry strategy for failed tuples
                .setRetry(getRetryService())
                // how often offsets are committed back to Kafka
                .setOffsetCommitPeriodMs(10_000)
                .build();
    }

    // exponential back-off retry: 500 µs initial delay, 2 ms delay period,
    // unlimited retries, 10 s maximum delay
    private static KafkaSpoutRetryService getRetryService() {
        return new KafkaSpoutRetryExponentialBackoff(TimeInterval.microSeconds(500),
                TimeInterval.milliSeconds(2), Integer.MAX_VALUE, TimeInterval.seconds(10));
    }
}
```
3.3 LogConsoleBolt
```java
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

import java.util.Map;

/**
 * Prints the data read from Kafka to the console.
 */
public class LogConsoleBolt extends BaseRichBolt {

    private OutputCollector collector;

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple input) {
        try {
            String value = input.getStringByField("value");
            System.out.println("received from kafka : " + value);
            // acknowledge the tuple so the spout can commit its offset
            collector.ack(input);
        } catch (Exception e) {
            e.printStackTrace();
            // mark the tuple as failed so it will be retried
            collector.fail(input);
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // terminal bolt: nothing is emitted downstream
    }
}
```
Here the data written by Kafka is read from the tuple's value field. During development, you can implement the RecordTranslator interface to define how a Kafka Record maps to the output stream; the translator is supplied when building the KafkaSpoutConfig, either through its constructor or via the setRecordTranslator() method, and is ultimately passed to the KafkaSpout.

By default the built-in DefaultRecordTranslator is used. Its source is shown below; FIELDS defines all of the fields available in the tuple: topic, partition, offset, key, and value.
```java
public class DefaultRecordTranslator<K, V> implements RecordTranslator<K, V> {

    private static final long serialVersionUID = -5782462870112305750L;

    public static final Fields FIELDS = new Fields("topic", "partition", "offset", "key", "value");

    @Override
    public List<Object> apply(ConsumerRecord<K, V> record) {
        return new Values(record.topic(),
                record.partition(),
                record.offset(),
                record.key(),
                record.value());
    }

    @Override
    public Fields getFieldsFor(String stream) {
        return FIELDS;
    }

    @Override
    public List<String> streams() {
        return DEFAULT_STREAM;
    }
}
```
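As a sketch of the customization described above, a translation can also be supplied as a lambda together with the fields it emits when building the KafkaSpoutConfig. The class name and the emitted field names ("value", "topic") below are illustrative choices, not part of the original example:

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class CustomTranslatorConfig {

    // Builds a spout config whose tuples carry only the record value and its topic.
    static KafkaSpoutConfig<String, String> build() {
        return KafkaSpoutConfig.builder("hadoop001:9092", "storm-topic")
                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
                // map each ConsumerRecord to a tuple with the declared fields
                .setRecordTranslator(record -> new Values(record.value(), record.topic()),
                        new Fields("value", "topic"))
                .build();
    }
}
```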
3.4 Running the Test
Start a producer to send test data. The command is as follows:
```shell
# start a console producer writing to storm-topic
bin/kafka-console-producer.sh --broker-list hadoop001:9092 --topic storm-topic
```
The locally running topology receives the data sent from Kafka and prints it to the console.
Full example source code: storm-kafka-integration
References
- Storm Kafka Integration (0.10.x+)