一、简介
下图为Strom的运行流程图,在开发Storm流处理程序时,我们需要采用内置或自定义实现spout
(数据源)和bolt
(处理单元),并通过TopologyBuilder
将它们之间进行关联,形成Topology
。
二、IComponent接口
IComponent
接口定义了Topology中所有组件(spout/bolt)的公共方法,自定义的spout或bolt必须直接或间接实现这个接口。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public interface IComponent extends Serializable {
void declareOutputFields(OutputFieldsDeclarer declarer);
Map<String, Object> getComponentConfiguration();
}
|
三、Spout
3.1 ISpout接口
自定义的spout需要实现ISpout
接口,它定义了spout的所有可用方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| public interface ISpout extends Serializable {
void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
void close();
void activate();
void deactivate();
void nextTuple();
void ack(Object msgId);
void fail(Object msgId); }
|
3.2 BaseRichSpout抽象类
通常情况下,我们实现自定义的Spout时不会直接去实现ISpout
接口,而是继承BaseRichSpout
。BaseRichSpout
继承自BaseCompont
,同时实现了IRichSpout
接口。
IRichSpout
接口继承自ISpout
和IComponent
,自身并没有定义任何方法:
1 2 3
| public interface IRichSpout extends ISpout, IComponent {
}
|
BaseComponent
抽象类空实现了IComponent
中getComponentConfiguration
方法:
1 2 3 4 5 6
| public abstract class BaseComponent implements IComponent { @Override public Map<String, Object> getComponentConfiguration() { return null; } }
|
BaseRichSpout
继承自BaseCompont
类并实现了IRichSpout
接口,并且空实现了其中部分方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public abstract class BaseRichSpout extends BaseComponent implements IRichSpout { @Override public void close() {}
@Override public void activate() {}
@Override public void deactivate() {}
@Override public void ack(Object msgId) {}
@Override public void fail(Object msgId) {} }
|
通过这样的设计,我们在继承BaseRichSpout
实现自定义spout时,就只有三个方法必须实现:
- open : 来源于ISpout,可以通过此方法获取用来发送tuples的
SpoutOutputCollector
;
- nextTuple :来源于ISpout,必须在此方法内部发送tuples;
- declareOutputFields :来源于IComponent,声明发送的tuples的名称,这样下一个组件才能知道如何接受。
四、Bolt
bolt接口的设计与spout的类似:
4.1 IBolt 接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
|
public interface IBolt extends Serializable {
void prepare(Map stormConf, TopologyContext context, OutputCollector collector);
void execute(Tuple input);
void cleanup();
|
4.2 BaseRichBolt抽象类
同样的,在实现自定义bolt时,通常是继承BaseRichBolt
抽象类来实现。BaseRichBolt
继承自BaseComponent
抽象类并实现了IRichBolt
接口。
IRichBolt
接口继承自IBolt
和IComponent
,自身并没有定义任何方法:
1 2 3
| public interface IRichBolt extends IBolt, IComponent {
}
|
通过这样的设计,在继承BaseRichBolt
实现自定义bolt时,就只需要实现三个必须的方法:
- prepare: 来源于IBolt,可以通过此方法获取用来发送tuples的
OutputCollector
;
- execute:来源于IBolt,处理tuples和发送处理完成的tuples;
- declareOutputFields :来源于IComponent,声明发送的tuples的名称,这样下一个组件才能知道如何接收。
五、词频统计案例
5.1 案例简介
这里我们使用自定义的DataSourceSpout
产生词频数据,然后使用自定义的SplitBolt
和CountBolt
来进行词频统计。
案例源码下载地址:storm-word-count
5.2 代码实现
1. 项目依赖
1 2 3 4 5
| <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>1.2.2</version> </dependency>
|
2. DataSourceSpout
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| public class DataSourceSpout extends BaseRichSpout {
private List<String> list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive");
private SpoutOutputCollector spoutOutputCollector;
@Override public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { this.spoutOutputCollector = spoutOutputCollector; }
@Override public void nextTuple() { String lineData = productData(); spoutOutputCollector.emit(new Values(lineData)); Utils.sleep(1000); }
@Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("line")); }
private String productData() { Collections.shuffle(list); Random random = new Random(); int endIndex = random.nextInt(list.size()) % (list.size()) + 1; return StringUtils.join(list.toArray(), "\t", 0, endIndex); }
}
|
上面类使用productData
方法来产生模拟数据,产生数据的格式如下:
1 2 3 4 5 6 7 8 9
| Spark HBase Hive Flink Storm Hadoop HBase Spark Flink HBase Storm HBase Hadoop Hive Flink HBase Flink Hive Storm Hive Flink Hadoop HBase Hive Hadoop Spark HBase Storm
|
3. SplitBolt
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| public class SplitBolt extends BaseRichBolt {
private OutputCollector collector;
@Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector=collector; }
@Override public void execute(Tuple input) { String line = input.getStringByField("line"); String[] words = line.split("\t"); for (String word : words) { collector.emit(new Values(word)); } }
@Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } }
|
4. CountBolt
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| public class CountBolt extends BaseRichBolt {
private Map<String, Integer> counts = new HashMap<>();
@Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
}
@Override public void execute(Tuple input) { String word = input.getStringByField("word"); Integer count = counts.get(word); if (count == null) { count = 0; } count++; counts.put(word, count); System.out.print("当前实时统计结果:"); counts.forEach((key, value) -> System.out.print(key + ":" + value + "; ")); System.out.println(); }
@Override public void declareOutputFields(OutputFieldsDeclarer declarer) {
} }
|
5. LocalWordCountApp
通过TopologyBuilder将上面定义好的组件进行串联形成 Topology,并提交到本地集群(LocalCluster)运行。通常在开发中,可先用本地模式进行测试,测试完成后再提交到服务器集群运行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public class LocalWordCountApp {
public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("DataSourceSpout", new DataSourceSpout()); builder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("DataSourceSpout"); builder.setBolt("CountBolt", new CountBolt()).shuffleGrouping("SplitBolt");
LocalCluster cluster = new LocalCluster(); cluster.submitTopology("LocalWordCountApp", new Config(), builder.createTopology()); }
}
|
6. 运行结果
启动WordCountApp
的main方法即可运行,采用本地模式Storm会自动在本地搭建一个集群,所以启动的过程会稍慢一点,启动成功后即可看到输出日志。
六、提交到服务器集群运行
6.1 代码更改
提交到服务器的代码和本地代码略有不同,提交到服务器集群时需要使用StormSubmitter
进行提交。主要代码如下:
为了结构清晰,这里新建ClusterWordCountApp类来演示集群模式的提交。实际开发中可以将两种模式的代码写在同一个类中,通过外部传参来决定启动何种模式。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| public class ClusterWordCountApp {
public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("DataSourceSpout", new DataSourceSpout()); builder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("DataSourceSpout"); builder.setBolt("CountBolt", new CountBolt()).shuffleGrouping("SplitBolt");
try { StormSubmitter.submitTopology("ClusterWordCountApp", new Config(), builder.createTopology()); } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) { e.printStackTrace(); } }
}
|
6.2 打包上传
打包后上传到服务器任意位置,这里我打包后的名称为storm-word-count-1.0.jar
1
| # mvn clean package -Dmaven.test.skip=true
|
6.3 提交Topology
使用以下命令提交Topology到集群:
1 2
| storm jar /usr/appjar/storm-word-count-1.0.jar com.myhhub.wordcount.ClusterWordCountApp
|
出现successfully
则代表提交成功:
6.4 查看Topology与停止Topology(命令行方式)
1 2 3 4 5
| storm list
storm kill ClusterWordCountApp -w 3
|
6.5 查看Topology与停止Topology(界面方式)
使用UI界面同样也可进行停止操作,进入WEB UI界面(8080端口),在Topology Summary
中点击对应Topology 即可进入详情页面进行操作。
七、关于项目打包的扩展说明
mvn package的局限性
在上面的步骤中,我们没有在POM中配置任何插件,就直接使用mvn package
进行项目打包,这对于没有使用外部依赖包的项目是可行的。但如果项目中使用了第三方JAR包,就会出现问题,因为package
打包后的JAR中是不含有依赖包的,如果此时你提交到服务器上运行,就会出现找不到第三方依赖的异常。
这时候可能大家会有疑惑,在我们的项目中不是使用了storm-core
这个依赖吗?其实上面之所以我们能运行成功,是因为在Storm的集群环境中提供了这个JAR包,在安装目录的lib目录下:
为了说明这个问题我在Maven中引入了一个第三方的JAR包,并修改产生数据的方法:
1 2 3 4 5
| <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.8.1</version> </dependency>
|
StringUtils.join()
这个方法在commons.lang3
和storm-core
中都有,原来的代码无需任何更改,只需要在import
时指明使用commons.lang3
。
1 2 3 4 5 6 7 8
| import org.apache.commons.lang3.StringUtils;
private String productData() { Collections.shuffle(list); Random random = new Random(); int endIndex = random.nextInt(list.size()) % (list.size()) + 1; return StringUtils.join(list.toArray(), "\t", 0, endIndex); }
|
此时直接使用mvn clean package
打包运行,就会抛出下图的异常。因此这种直接打包的方式并不适用于实际的开发,因为实际开发中通常都是需要第三方的JAR包。
想把依赖包一并打入最后的JAR中,maven提供了两个插件来实现,分别是maven-assembly-plugin
和maven-shade-plugin
。鉴于本篇文章篇幅已经比较长,且关于Storm打包还有很多需要说明的地方,所以关于Storm的打包方式单独整理至下一篇文章:
Storm三种打包方式对比分析
参考资料
- Running Topologies on a Production Cluster
- Pre-defined Descriptor Files