消息"/>
使用storm trident消费kafka消息
一、前言
storm通过保证数据至少被处理一次来保证数据的完整性,由于元祖可以重发,对于一些需要数据精确的场景,可以考虑用storm trident实现。
传统的事物型拓扑中存在几种bolt:
1.1 BasicBolt
这是最基本的Bolt,BasicBolt每次只能处理一个tuple,而且必须等前一个tuple成功处理后下一个tuple才能继续处理,显然效率不高。
1.2 BatchBolt
storm的一个优势就是能够批量处理tuple,BatchBolt支持批量处理tuple,每一个batch中的tuple都会调用execute(),处理完成后调用finishBatch方法。
1.3 Committer BatchBolt
标记为Committer的BatchBolt和基本的BasicBolt的区别在于二者调用finishBatch()的时机不同,标记为Committer的BatchBolt在提交阶段就会调用finishBatch()。
二、storm trident的使用
storm目前的版本下载 已经将事物拓扑的实现封装trident,trident目前支持3种不同的事物接口,一种是非事物型的(不介绍,因为基本不用),一种是事务性的TransactionalTridentKafkaSpout,而我们比较常用的是透明型事物OpaqueTridentKafkaSpout(事务型应用最重要的一点是要判断一批消息是新的还是已来过的)。
2.1 TransactionalTridentKafkaSpout
原理是每次在数据库中存了txid,IPartitionedTransactionalSpout的每一个tuple都会绑定在固定的批次(batch)中。
一个批次无论重发多少次,它也只有一个唯一且相同的事务ID,它所包含的内容都是完全一致的,而一个tuple无论被重发多少次只会在同一个批次里。
使用方式下载如下:
- TridentTopology topology = new TridentTopology();
- TridentKafkaConfig tridentKafkaConfig = new TridentKafkaConfig(zkHosts, topic, spoutId);
- tridentKafkaConfig.scheme = new SchemeAsMultiScheme(new ConvertStringScheme());
- /**
- * 支持事物,支持失败重发
- *
- */
- TransactionalTridentKafkaSpout transactionalTridentKafkaSpout = new TransactionalTridentKafkaSpout(
- tridentKafkaConfig);
- topology.newStream("name",transactionalTridentKafkaSpout)
- .shuffle()
- .each(new Fields("msg"), new SpilterFunction(), new Fields("sentence"))
- .groupBy(new Fields("sentence"))
- .aggregate(new Fields("sentence"), new SumWord(),new Fields("sum"))
- .parallelismHint(5)
- .each(new Fields("sum"), new PrintFilter_partition());
- Config config = new Config();
- StormSubmitter.submitTopology("XXX", config,
- topology.build());
但貌似目前TransactionalTridentKafkaSpout有个bug,启动会报:classCastException(非代码问题)
然而我们可以想到的是,IPartitionedTransactionalSpout会有一个问题,假设一批消息在被bolt消费过程中失败了,需要spout重发,此时如果正巧遇到消息发送中间件故障,
例如某一个分区不可读,spout为了保证重发时每一批次包含的tuple一致,它只能等待消息中间件恢复,也就是卡在那里无法再继续发送给bolt消息了,直至消息中间件恢复(因为它必须发送一样的Batch)。
2.2 OpaqueTridentKafkaSpout
IOpaquePartitionedTransactionalSpout不保证每次重发一个批次的消息所包含的tuple完全一致。也就是说某个tuple可能第一次在txid=1的批次中出现,后面有可能在txid=3的批次中出现。这种情况只出现在当某一批次消息消费失败需要重发且恰巧消息中间件故障时。这时,IOpaquePartitionedTransactionalSpout不是等待消息中间件故障恢复,而是先读取可读的partition。例如txid=1的批次在消费过程中失败了,需要重发,恰巧消息中间件的16个分区有1个分区(partition=3)因为故障不可读了。这时候IOpaquePartitionedTransactionalSpout会先读另外的15个分区,完成txid=1这个批次的发送,这时候同样的批次其实包含的tuple已经少了。假设在txid=3时消息中间件的故障恢复了,那之前在txid=1且在分区partition=3的还没有被发送的tuple会被重新发送, 包含在txid=3的批次中,所以其不保证每批次的batch包含的tuple是一样的。
2.2.1 实战
首先搭建下载好zk,kafka,storm的分布式环境,先起zk,然后kafka然后storm.分别启动后效果jps看一下
master机器:
slave1机器:
slave2机器:
hosts里面配置
2.2.1.1 创建topic
2.2.1.2 写storm消费端
main方法下载
Java代码- public static void main(String[] args) throws AlreadyAliveException,
- InvalidTopologyException, AuthorizationException {
- TridentTopology topology = new TridentTopology();
- TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zkHosts, topic);
- kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
- OpaqueTridentKafkaSpout opaqueTridentKafkaSpout = new OpaqueTridentKafkaSpout(
- kafkaConfig);
- topology.newStream("test_kafka2storm_opaqueTrident",
- opaqueTridentKafkaSpout)
- .parallelismHint(3)
- .shuffle()
- .each(new Fields("str"), new SpilterFunction(), new Fields("sentence"))
- .groupBy(new Fields("sentence"))
- .aggregate(new Fields("sentence"), new SumWord(),
- new Fields("sum")).parallelismHint(5)
- .each(new Fields("sum"), new PrintFilter_partition());
- Config config = new Config();
- config.setDebug(false);
- config.setNumWorkers(2);
- StormSubmitter.submitTopology("test_kafka2storm_opaqueTrident_topology", config,
- topology.build());
- }
SpilterFunction下载:
Java代码- import org.apache.storm.trident.operation.BaseFunction;
- import org.apache.storm.trident.operation.TridentCollector;
- import org.apache.storm.trident.tuple.TridentTuple;
- import org.apache.storm.tuple.Values;
- public class SpilterFunction extends BaseFunction {
- /**
- *
- */
- private static final long serialVersionUID = 1L;
- @Override
- public void execute(TridentTuple tuple, TridentCollector collector) {
- String sentens = tuple.getString(0);
- String[] array = sentens.split("\\s+");
- for(int i=0;i<array.length;i++){
- System.out.println("spilter emit:" + array[i]);
- collector.emit(new Values(array[i]));
- }
- }
- }
- import java.util.HashMap;
- import java.util.Map;
- import org.apachemons.collections.MapUtils;
- import org.apache.storm.trident.operation.BaseAggregator;
- import org.apache.storm.trident.operation.TridentCollector;
- import org.apache.storm.trident.operation.TridentOperationContext;
- import org.apache.storm.trident.tuple.TridentTuple;
- import org.apache.storm.tuple.Values;
- public class SumWord extends BaseAggregator<Map<String,Integer>> {
- private static final long serialVersionUID = 1L;
- /**
- * 属于哪个batch
- */
- private Object batchId;
- /**
- * 属于哪个分区
- */
- private int partitionId;
- /**
- * 分区数量
- */
- private int numPartitions;
- /**
- * 用来统计
- */
- private Map<String,Integer> state;
- 下载
- @SuppressWarnings("rawtypes")
- @Override
- public void prepare(Map conf, TridentOperationContext context) {
- state = new HashMap<String,Integer>();
- partitionId = context.getPartitionIndex();
- numPartitions = context.numPartitions();
- }
- @Override
- public Map<String, Integer> init(Object batchId, TridentCollector collector) {
- this.batchId = batchId;
- return state;
- }
- @Override
- public void aggregate(Map<String, Integer> val, TridentTuple tuple,
- TridentCollector collector) {
- System.out.println(tuple+";partitionId="+partitionId+";partitions="+numPartitions
- +",batchId:" + batchId);
- String word = tuple.getString(0);
- val.put(word, MapUtils.getInteger(val, word, 0)+1);
- System.out.println("sumWord:" + val);
- }
- @Override
- public void complete(Map<String, Integer> val, TridentCollector collector) {
- collector.emit(new Values(val));
- }
- }
- import org.apache.storm.trident.operation.BaseFilter;
- import org.apache.storm.trident.tuple.TridentTuple;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- public class PrintFilter_partition extends BaseFilter {
- private static final Logger LOGGER =
- LoggerFactory.getLogger(PrintFilter_partition.class);
- private static final long serialVersionUID = 1L;
- @Override
- public boolean isKeep(TridentTuple tuple) {
- LOGGER.info("打印出来的tuple:" + tuple);
- return true;
- }
- }
更多推荐
使用storm trident消费kafka消息
发布评论