成熟了"/>
Spark Streaming Kafka开发现在很成熟了
现在spark streaming流处理很成熟了,基于kafka的官方也提供了2种API。一种是Receiver-based Approach,一种是Direct Approach (No Receivers)。第二种方式性能比较高,是以后的趋势,但目前还叫实验版。
第一种构造方法是:
val kafkaStream = KafkaUtils.createStream(streamingContext,
[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
我们一般这么用:
val ths = (1 to numThreads.toInt).map { i =>
KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
}
val kafkaStreams = ssc.union(ths)
启动多个,然后在合并,这是为了让receiver更分散。
第二种是:
val directKafkaStream = KafkaUtils.createDirectStream[
[key class], [value class], [key decoder class], [value decoder class] ](
streamingContext, [map of Kafka parameters], [set of topics to consume])
用法如下:
val kafkaStreams = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)
处理结果呢?一般都怎么放?在storm中一般在bolt中写mysql,hive,kafka,hbase。都有,我感觉hbase比较方便一点。当然回写kafka好像也不错。可以试试。
写Hbase:
kafkaStreams.foreachRDD(rdd => {
rdd.foreachPartition(par => {
val conf = HBaseConfiguration.create()
val conn = HConnectionManager.createConnection(conf)
val table: HTableInterface = conn synchronized {
conn.getTable(“Your Table”)
}
table.setAutoFlushTo(false)
table.setWriteBufferSize(10 * 1024 * 1024L)
val list = new java.util.ArrayListPut
par.foreach(a => {
/ ** 业务逻辑
…
**/
val put = new Put(xxx)
list.add(put)
})
table.put(list)
table.flushCommits()
table.close()
conn.close()
})
})
如果感觉处理不过来,可以试试设置如下参数:
spark.streaming.receiver.maxRate // 第一种方式
spark.streaming.kafka.maxRatePerPartition // 第二种方式
建议:流处理最好不要on yarn(特指动态资源分配方式提交到yarn)
更多推荐
Spark Streaming Kafka开发现在很成熟了
发布评论