基本信息
源码名称:storm+kafka应用案例
源码大小:28.62M
文件格式:.rar
开发语言:Java
更新时间:2020-12-21
友情提示:(无需注册或充值,赞助后即可获取资源下载链接)
嘿,亲!知识可是无价之宝呢,但咱这精心整理的资料也耗费了不少心血呀。小小地破费一下,绝对物超所值哦!如有下载和支付问题,请联系我们QQ(微信同号):813200300
本次赞助数额为: 2 元×
微信扫码支付:2 元
×
请留下您的邮箱,我们将在2小时内将文件发到您的邮箱
源码介绍
package cn.itcast.storm.topology; import storm.kafka.BrokerHosts; import storm.kafka.KafkaSpout; import storm.kafka.SpoutConfig; import storm.kafka.ZkHosts; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.spout.SchemeAsMultiScheme; import backtype.storm.topology.TopologyBuilder; import backtype.storm.tuple.Fields; import cn.itcast.storm.bolt.WordSpliter; import cn.itcast.storm.bolt.WriterBolt; import cn.itcast.storm.spout.MessageScheme; public class KafkaTopo { public static void main(String[] args) throws Exception { String zkRoot = "/kafka-storm"; String spoutId = "KafkaSpout"; BrokerHosts brokerHosts = new ZkHosts("weekend01:2181,weekend02:2181,weekend03:2181"); SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "replicationgirls", zkRoot, spoutId); spoutConfig.forceFromStart = true; spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme()); TopologyBuilder builder = new TopologyBuilder(); //设置一个spout用来从kaflka消息队列中读取数据并发送给下一级的bolt组件,此处用的spout组件并非自定义的,而是storm中已经开发好的KafkaSpout builder.setSpout("KafkaSpout", new KafkaSpout(spoutConfig)); builder.setBolt("word-spilter", new WordSpliter()).shuffleGrouping(spoutId); builder.setBolt("writer", new WriterBolt(), 4).fieldsGrouping("word-spilter", new Fields("word")); Config conf = new Config(); conf.setNumWorkers(4); conf.setNumAckers(0); conf.setDebug(false); //LocalCluster用来将topology提交到本地模拟器运行,方便开发调试 LocalCluster cluster = new LocalCluster(); cluster.submitTopology("wordcount", conf, builder.createTopology()); //提交topology到storm集群中运行 // StormSubmitter.submitTopology("sufei-topo", conf, builder.createTopology()); } }