Bolt拆得好Storm跑得好 增加并行处理速度 Storm & kafka处理实时日志实战topology经验谈

巧妙拆分bolt提升Storm集群吞吐量
巧妙拆分bolt提升Storm集群吞吐量

项目背景介绍:

我们通过日志系统跟踪每个接口的运行状态,收发时间,平均速度,成功率,平均响应时长。日志被收集到了kafka,日志实时处理采用storm框架。一个请求有一条服务接收日志,一条服务器响应日志。需要通过日志实时处理,统计出每个接口的每分钟的指标,存到ES。

昨天的Topology图画成这个样子,如果有看过昨天的文章的话,应该还有印象。

Storm & kafka 日志统计代码流程转成多个Bolt

昨天写文章的时候,就思考要不要将CountBolt拆成MergeLogBolt加CountBolt,原来在一个CountBolt中即要做日志合并,又要做统计。今天整理代码的时候反复的思考,考虑到并行度的、代理管理维护、协作开发,多个维度,同时也考虑代码逻辑清晰度,果断拆开了。所以今天的图变成了:

巧妙拆分bolt提升Storm集群吞吐量
巧妙拆分bolt提升Storm集群吞吐量

如果细心看的话,每一个bolt都可以调整并行度了,可以依据每个bolt的处理速度配置合理科学的并行度。比如,要是EsBolt慢,可以增加EsBolt并行度,如果EsBolt并不是瓶颈所在,一个EsBolt也可以的话,可以配置为并行度为1。同时还可以考虑调整bolt之间的流控制,综合日志特点再调整并行度配置,从而提升集群处理的吞吐量。

如何享受宜信星火金服宜心理财实现财富增值。:
扫码二维码
通过宜信星火金服活动链接 http://www.ixinghuo.com/qcode.php?yixinqcode
通过宜信星火金服理财师店铺链接:https://xinghuo.yixin.com/yiidea
通过宜信星火金服理财师移动端邀请页面 https://xinghuo.yixin.com/mobile/activityPage/shareShop/yiidea
5.通过宜信星火金服理财师店铺宜心理财团队短链接:
http://yixin.hk
http://yixin.ceo
http://yue.ma
通过宜信星火金服宜心理财团队网站页面
http://www.yixinlicai.com.cn
通过 宜信.公司 | 宜信.网络 | 宜信.net

目前我们拆分的Bolt细化到了PareBolt,MergeLogBolt,CountBolt,MergeCountBolt,EsBolt,划分原则如下:

1、单一职责,每个Bolt仅做一件事情。比如MergeBolt,CountBolt,原来是划到一个CountBolt,进行代码梳理的时候发现,MergeBolt需要维护一个组装log的HashMap,并要定时进行清理一定时间窗口未组装成功的孤立日志,组装成功的要及时从HashMap 中删除。而CountBolt维护统计数据是按timeKey,域名、业务名来组织 HashMap,时间窗口的处理逻辑完全不一样。对于MergeLog来讲,组一个删除一个,再定时清理孤立日志就好。而CountBolt则不一样。感觉夹杂在一起,给同事介绍起来也费劲,代码组织也不够清晰。拆成两个bolt之后,感觉代码逻辑更加清晰了。也更好的利用Storm提供的实时计算能力,可依据计算情况,为每个bolt设置相应的并行度,达到更高的吞吐量。简单的来讲,就是将一个大的逻辑,拆成一个一个小的逻辑,用tuple串起来,每个bolt都有明确的输入tuple,明确的输出tuple,也好进行代码逻辑问题定位。

2、协作原则,相对于在一整串代码逻辑中去写逻辑,别人读懂代码会更加费劲,往往是懵menbelity,而划成一个一个bolt之后,一个bolt的逻辑更加简单好懂。每个Bolt拿到tuple只做一件明确的事情,可以更好的并行开发。大家只要约定好输入的tuple,产出的tuple。比如,我们将Es操作划成一个Bolt,落地数据的小伙伴只要专注于Es操作,做合并的小伙伴只要专注于合并。哈哈,是不是很神奇。

收尾,这是一天下来的工作心得,对bolt的划分经验也是经过不断进行代码整理思考的结果。开始是在一个bolt里完成业务所需的所有逻辑,之后才拆成一个一个bolt来对待,并不是一上来就拆成一个一个bolt,第一天的时候比较担心日志无法成对的问题,所以都不敢拆,实验完GroupByField之后才敢拆。这个是有一个过程的。首先要知道业务要干什么,打算怎么实现,拆的话会引起什么问题,怎么解决,经过同事的指点与摸索,算是有了些心得,至少现在拆分出的几个bolt,基本上感觉是OK,已经是拆到一个合理的地步了。当然我也是刚玩Storm,如有好的bolt拆分心得,也欢迎指点,欢迎分享分享。

巧妙拆分bolt提升Storm集群吞吐量
巧妙拆分bolt提升Storm集群吞吐量

Storm & kafka 日志统计代码流程转成多个Bolt

所谓一图击破迷团。我们使用kafkaSpout接上kafka消息队列消息,使用ParseBolt将kafka消息中的message拿出来,得到我们统计日志要使用的日志节点信息traceId(跟踪Id)、日志时间、日志类型等信息,ParseBolt仅干这件事,把拿到的日志节点信息emit出去!抵达下一个CoutBolt,采用了GroupByField进行数据流控制,将同traceId的 tuple送到一个Bolt处理。CountBolt处理会进行日志合并统计,这个CountBolt有点点肥大,目前还没有再拆,统计逻辑在这个CountBolt里面。达到一定周期后,会将自身的统计数据emit出去,抵达MergeBolt,MergeBolt只做合并,将CountBolt节点的数据进行合并,达到一定周期后,将合并好的数据emit出去,抵达EsBolt,EsBolt将结果写入ElasticSearch。

QA1:为什么我们要用TraceID进行流控制?

因为一条日志只记录接口收到的时间或响应时间,需合成一对,得到处理时长。

QA2:GroupByFeild怎么感觉理解起来很怪异?

是的,如果新接触Storm,会听到一堆新词汇,Spout,Bolt,Topology,minbus,supervisor,worker,task等等。感觉用路由来理解这里的GroupByFeild按字段分组数据流,可能会比较好理解,至少我是这么觉得的。

QA3:Bolt到底拆分成几个好?

其实,刚开始的时候,我是打算一个KafkaSpout,一个Bolt,先把业务逻辑整出来,再来拆。不管拆成多少个Bolt,完成业务所需要的逻辑步骤一个都不能少,不管怎么拆。一方面刚玩Storm,也得找感觉。写完整理逻辑后,再针对性感受一下拆开处理会有哪些问题,能不能拆?拆完之后有什么好处,拆成多个Bolt后,数据怎么合并,为什么要拆。反复推敲之后,得到大神指点,也考虑后续集群运行的计算资源使用率,得提高吞吐量是更好。将EsBolt单拿出来,主要考虑合并数据逻辑与Es操作逻辑本身就两个人写,拆开也好开发,逻辑也清晰。

QA4:还可以怎么拆?

CountBolt逻辑有两大逻辑,一是日志合二为一,二是请求计数,时长计数。日志合二为一,将收发时间组成对,得到单个请求时长,这个逻辑与计数可以相互独立出来。即CountBolt拆成ReformBolt+CountBolt,Reform仅对日志进行合二为一,CountBolt仅统计。想想是不是这样子?

QA5:并行度怎么调优?

这个暂时没有太多的经验,初步来看,EsBolt可能着重考虑一下,应该要做到批量插入。网上有小伙伴已经踩进一条一条插入到hbase超时的坑。Es看起来也类似。更详细的并行度分享,等我后续实战后再分享。

 

如何享受宜信星火金服宜心理财实现财富增值。:

  1. 扫码二维码
  2. 通过宜信星火金服活动链接 http://www.ixinghuo.com/qcode.php?yixinqcode
  3. 通过宜信星火金服理财师店铺链接:https://xinghuo.yixin.com/yiidea
  4. 通过宜信星火金服理财师移动端邀请页面 https://xinghuo.yixin.com/mobile/activityPage/shareShop/yiidea
    5.通过宜信星火金服理财师店铺宜心理财团队短链接:

http://yixin.hk

http://yixin.ceo

http://yue.ma

  1. 通过宜信星火金服宜心理财团队网站页面

http://www.yixinlicai.com.cn

  1. 通过  宜信.公司 | 宜信.网络 | 宜信.net

Storm让人开心的是,这种Bolt串起来的方式,很有趣,感觉非常适合做可视化编程控制,不仅仅是用在Storm上,可以将程序按这种思想拆起一个一个Bolt小球,通过可视化的界面来配置逻辑流程,相当于可视化编程或者可视化生成Storm Topology,想想都非常有意思。

今天的实战分享就到这里啦,每天都会分享工作中实战的点点滴滴,并不能保证自己当前的做法就是百分之百正确或合理,敬请指教。

Storm入门经典文章:Word Count & kafka to Storm 本地模式运行storm的demo 单机模式跑起来

storm hello world & kafka storm

storm hello world & kafka storm

Storm是一个实时计算框架,有开源的大神为我们搭好了平台,按照大神的玩法,Storm的作业是topology,而topolgy是由spout,blot组成,spout是取数据,blot是处理数据,一个topology由一个spout加多个blot组成。将topology丢到storm上就能跑起来,可以在本地模式下跑,也可以在集群模式下跑。

Storm的使用,可以查看小伙伴田海龙的经典Blog示例。一个经典的wordcount示例。这个示范了取数据到处理数据的过程。实际使用Storm的时候, 通常是Storm+kafka,按照我们公司的日志系统的情况来看,是这么搭配。

玩Storm能增值不?

当然可以,提升自身身价。同时,也可以考虑星火理财专业手段直接增值 。新技术能提升个人的技术身价,不过,玩Storm,玩到什么程度又是一个境界了。这个要看如何平衡了。一般日志成型后,可能也不会怎么动了,一直在不断的接触新东西,而storm的源代码读过否?

storm与财富增值

storm与财富增值

如何享受宜信星火金服宜心理财实现财富增值。:

  1. 扫码二维码
  2. 通过宜信星火金服活动链接 http://www.ixinghuo.com/qcode.php?yixinqcode
  3. 通过宜信星火金服理财师店铺链接:https://xinghuo.yixin.com/yiidea
  4. 通过宜信星火金服理财师移动端邀请页面 https://xinghuo.yixin.com/mobile/activityPage/shareShop/yiidea
    5.通过宜信星火金服理财师店铺宜心理财团队短链接:

http://yixin.hk

http://yixin.ceo

http://yue.ma

  1. 通过宜信星火金服宜心理财团队网站页面

http://www.yixinlicai.com.cn

  1. 通过  宜信.公司 | 宜信.网络 | 宜信.net

Storm难吗?

如果说使用storm,相对说讲,还是比较easy。比如用storm写一个word count,或者用storm读取kafka的消息。一般来讲,拿来用是比较简单。如果要去深入看storm,就另说了。

Storm如何集成Kafka?

其实有的时候找来找去,很多答案就在github.com上面,Storm的官方源码中,有kafka的集成示范代码,如果看不明白的话,其实也没有很大的关系。因为我们不还可以找国内小伙伴分享的一些经典。

我的经验分享,也是我们结合网上教程实践一些心得。我们知道topology是由spout与bolt组成的,那么跟kafka集成,必然要一个kafkaSpout,这个有现成的,网上的代码只需要实现一个Scheme,对kafka的输出进行转化,把字节流转成字符串。简单的说,kafkaspout别人都写好了。只需要写一个bytebuffer to string的方法。

小坑之一: String 转UTF8

网上的代码报错,找了一段bytebuffer to string替换掉

小坑之二:zookeeper

到底是跟kafka共用一个,还是独立一套zookeeper给storm用。这个问题纠结了一下,因为zookeeper要是每人上集群都配置一套的话,会有好多zookeeper,不过,老司机校友也是这么推荐的,那就听老司机的。单独给storm部署一的大套。插曲:同时听我要zookeeper的同事吓一跳,因为TA觉得是给kafka用的,不想给我这个Storm用。而我们今天只是想把storm与kafka跑通来,并不想再去部署一个zookeeper。哈哈,最好还是蹭的kafka的zookeeper!

关注公众号回复【kafka】可以获得我实战整理的storm连kafka源代码包包哦

有的人写Blog,帖很多代码,都不带写些文字,也有的人喜欢写很多文字,看不到代码。我喜欢全二为一,尽量把自己的一些想法加在里面。还记录一下坑啊什么的,可以查看田海龙的一个入门级demo  http://www.tianhailong.com/?p=1358

WordCounter.java:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package com.youku.demo.bolts;
import java.util.HashMap;
import java.util.Map;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
public class WordCounter extends BaseBasicBolt {
    Integer id;
    String name;
    Map<String, Integer> counters;
    /**
     * At the end of the spout (when the cluster is shutdown
     * We will show the word counters
     */
    @Override
    public void cleanup() {
        System.out.println("-- Word Counter ["+name+"-"+id+"] --");
        for(Map.Entry<String, Integer> entry : counters.entrySet()){
            System.out.println(entry.getKey()+": "+entry.getValue());
        }
    }
    /**
     * On create
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.counters = new HashMap<String, Integer>();
        this.name = context.getThisComponentId();
        this.id = context.getThisTaskId();
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {}
    public void execute(Tuple input, BasicOutputCollector collector) {
        String str = input.getString(0);
        /**
         * If the word dosn't exist in the map we will create
         * this, if not We will add 1
         */
        if(!counters.containsKey(str)){
            counters.put(str, 1);
        }else{
            Integer c = counters.get(str) + 1;
            counters.put(str, c);
        }
    }
}

WordNormalizer.java:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.youku.demo.bolts;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class WordNormalizer extends BaseBasicBolt {
    public void cleanup() {}
    /**
     * The bolt will receive the line from the
     * words file and process it to Normalize this line
     *
     * The normalize will be put the words in lower case
     * and split the line to get all words in this
     */
    public void execute(Tuple input, BasicOutputCollector collector) {
        String sentence = input.getString(0);
        String[] words = sentence.split(" ");
        for(String word : words){
            word = word.trim();
            if(!word.isEmpty()){
                word = word.toLowerCase();
                collector.emit(new Values(word));
            }
        }
    }
    /**
     * The bolt will only emit the field "word"
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

WordReader.java:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package com.youku.demo.spouts;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
public class WordReader extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private FileReader fileReader;
    private boolean completed = false;
    public void ack(Object msgId) {
        System.out.println("OK:"+msgId);
    }
    public void close() {}
    public void fail(Object msgId) {
        System.out.println("FAIL:"+msgId);
    }
    /**
     * The only thing that the methods will do It is emit each
     * file line
     */
    public void nextTuple() {
        /**
         * The nextuple it is called forever, so if we have been readed the file
         * we will wait and then return
         */
        if(completed){
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                //Do nothing
            }
            return;
        }
        String str;
        //Open the reader
        BufferedReader reader = new BufferedReader(fileReader);
        try{
            //Read all lines
            while((str = reader.readLine()) != null){
                /**
                 * By each line emmit a new value with the line as a their
                 */
                this.collector.emit(new Values(str),str);
            }
        }catch(Exception e){
            throw new RuntimeException("Error reading tuple",e);
        }finally{
            completed = true;
        }
    }
    /**
     * We will create the file and get the collector object
     */
    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        try {
            this.fileReader = new FileReader(conf.get("wordsFile").toString());
        } catch (FileNotFoundException e) {
            throw new RuntimeException("Error reading file ["+conf.get("wordFile")+"]");
        }
        this.collector = collector;
    }
    /**
     * Declare the output field "word"
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
}

TopologyMain.java:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.youku.demo;
import com.youku.demo.bolts.WordCounter;
import com.youku.demo.bolts.WordNormalizer;
import com.youku.demo.spouts.WordReader;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
public class TopologyMain {
    public static void main(String[] args) throws InterruptedException {
        //Topology definition
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word-reader",new WordReader());
        builder.setBolt("word-normalizer", new WordNormalizer())
            .shuffleGrouping("word-reader");
        builder.setBolt("word-counter", new WordCounter(),1)
            .fieldsGrouping("word-normalizer", new Fields("word"));
        //Configuration
        Config conf = new Config();
        conf.put("wordsFile", args[0]);
        conf.setDebug(true);
        //Topology run
        conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());
        Thread.sleep(2000);
        cluster.shutdown();
    }
}

pom.xml:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.youku.demo</groupId>
    <artifactId>demo-storm</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>demo-storm</name>
    <url>http://maven.apache.org</url>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                    <compilerVersion>1.6</compilerVersion>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <repositories>
        <!-- Repository where we can found the storm dependencies -->
        <repository>
            <id>clojars.org</id>
            <url>http://clojars.org/repo</url>
        </repository>
    </repositories>
    <dependencies>
        <!-- Storm Dependency -->
        <dependency>
            <groupId>storm</groupId>
            <artifactId>storm</artifactId>
            <version>0.8.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>

words.txt:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
storm
test
are
great
is
an
storm
simple
application
but
very
powerfull
really
StOrm
is
great

运行的时候需要配置参数:src/main/resources/words.txt

因为入门的wordcount单独展示的,会给人感觉没有太大的实用价值。将kafka与storm的放一起,可以更好的理解怎么与kafka集成。

关注公众号回复【kafka】可以获得我实战整理的storm连kafka源代码包包哦。