搜档网
当前位置:搜档网 › Storm中文学习手册

Storm中文学习手册

Storm中文学习手册
Storm中文学习手册

Getting Started with Storm中文版

韩飞译

Mail : aaron.han.1986@https://www.sodocs.net/doc/018219391.html,

QQ : 362504281

Blog : https://www.sodocs.net/doc/018219391.html,/lonelytrooper

写在前面的话:翻译这本书纯因为个人对storm的兴趣,水平有限,希望大家多多指正,我自己也会不断改进其中的内容。希望这本书能对

storm的入门爱好者起到一定的帮助作用。希望此版本仅用作学

习交流,不要出现挂在论坛赚积分、甚至赚钱的无耻行为。

2013.9.30

中文版版权所有翻版必究

第一章基础 (4)

Storm组件 (5)

Storm属性 (5)

第二章开始 (6)

操作模式 (6)

本地模式 (6)

远程模式 (7)

Hello World Storm (7)

验证Java安装 (8)

创建工程 (8)

建立我们第一个Topology (10)

Spout (10)

Bolts (13)

主类 (17)

实践一下 (19)

结论 (20)

第三章Topologies (20)

流分组 (20)

Shuffle分组 (21)

Fields分组 (21)

All分组 (21)

自定义分组 (22)

Direct分组 (23)

Global分组 (24)

None分组 (24)

LocalCluster versus StormSubmitter (24)

DPRC Topologies (25)

第四章Spouts (26)

可靠消息versus不可靠消息 (27)

获取数据 (28)

直接连接 (28)

队列消息 (31)

DRPC (33)

结论 (33)

第五章Bolts (34)

Bolt生命周期 (34)

Bolt结构 (34)

可靠Bolts versus不可靠Bolts (35)

多流 (36)

多锚定 (36)

通过IBasicBolt来自动Ack (36)

第六章一个真实的示例 (37)

Node.js Web应用 (38)

开始Node.js Web应用 (39)

The Storm Topology (39)

UsersNavigationSpout (41)

GetCategoryBolt (42)

UserHistoryBolt (43)

ProductCategoriesCounterBolt (46)

NewsNotifierBolt (48)

Redis服务器 (48)

生产信息 (49)

用户导航队列 (49)

中间数据 (49)

结果 (49)

测试Topology (50)

测试初始化 (50)

一个测试用例 (52)

扩展性和可用性的说明 (52)

第七章在Storm中使用非JVM语言 (53)

Mutilang协议 (54)

初始握手 (55)

启动循环并读写元组 (56)

第八章事务性Topologies (61)

设计 (61)

事务实战 (62)

The Spout (63)

The Bolts (67)

提交者Bolts (69)

分区的事务性Spouts (71)

不透明事务Topologies (73)

第一章基础

Storm是一套分布式的、可靠的,可容错的用于处理流式数据的系统。处理工作会被委派给不同类型的组件,每个组件负责一项简单的、特定的处理任务。Storm集群的输入流由名为spout的组件负责。Spout将数据传递给名为bolt的组件,后者将以某种方式处理这些数据。例如bolt以某种存储方式持久化这些数据,或者将它们传递给另外的bolt。你可以把一个storm集群想象成一条由bolt组件组成的链,每个bolt对spout暴露出来的数据做某种方式的处理。

为了说明这个概念,这里有一个简单的示例。昨天晚上在我看新闻时,播音员们开始谈论政治家以及他们阵营的各种话题。播音员们一直重复着不同的名字,于是我想知道是否每个名字被提及了相同的次数,或者提到的次数是否有偏重。

把播音员们说的字幕认为成你的数据输入流。你可以让spout来从一个文件(或者套接字,通过HTTP,或者一些其他方法)读取输入。当文本行到达时,spout将它们交给一个bolt,该bolt将文本行流分隔成单词。单词流被传递到另一个bolt,在这个bolt里,每个单词会被与一个预先定义好的政治家名单列表作比较。每作一次比较,第二个bolt会在数据库中增加一次那个名字的计数。当你想查看结果时,你只要查询数据库,该数据库在数据到达时被会实时更新。所有组件的排列(spouts和bolts)及它们的连接被称为一个topology(见图1-1)。

图1-1. 一个简单的topology

现在想象你可以简单地定义整个集群中每个bolt和spout的并行度,这样你就可以无限地扩展你的topology。很神奇,不是吗?尽管这只是一个简单的例子,你已经可以看到storm有多强大。

storm的典型用例有哪些呢?

流处理

正如前述示例中演示的,与其他的流处理系统不同,使用storm不需要中间队列。

持续计算

向客户端持续发送数据,这样它们可以实时更新并显示结果,例如站点度量。

分布式远程过程调用

简单地并行化cpu密集型操作。

Storm组件

在storm集群中,结点被一个持续运行的主结点管理。

Storm集群中有两种结点:主结点和工作结点。主结点运行一个叫做Nimbus的守护进程,它负责在集群内分发代码,为每个工作结点指派任务和监控失败的任务。工作结点运行一个叫做Supervisor的守护进程,它执行topology的一部分。Storm中的topology运行在不同机器的许多工作结点上。

因为storm保存所有集群状态在zookeeper或者本地磁盘上,因此这些守护进程是无状态的,并且可以在对系统健康无影响的情况下失败或者重启(见图1-2)。

图1-2 storm集群的组件

在底层,storm使用了zeromq(Omq,zeromq),一种高级的,可嵌入的、提供极好特性的网络库,这使得storm成为可能。我们列举一下zeromq的一些特性:

﹒充当并行框架的套接字库

﹒比TCP更快速,支持集群产品和超级计算

﹒在进程内、IPC、TCP和多播之间传递数据

﹒异步IO支持可扩展的多核消息传递应用

﹒通过展开、发布订阅、管道、请求应答连接N-to-N

Storm只使用push/pull套接字。

Storm属性

在所有的这些设计理念及决策中,有一些非常好的属性使得storm与众不同。

编程简单

如果你尝试过从头开始构建实时处理系统,你会明白它有多痛苦。通过storm,复杂性被引人注目的减少了。

支持多种编程语言

使用基于JVM的开发语言很容易,但是storm支持任何语言只要你使用或者实现一个小的中间库。

容错

Storm关注worker数量的下降并在必要时刻对任务进行重分配。

可扩展

对于扩展,你所有要做的只是为集群增加更多的机器。Storm会为新机器分配任务当它们可用的时候。

可靠

所有的消息都确保至少处理一次。如果有错误,消息可能会被处理不止一次,但是你永远不会丢失数据。

快速

速度曾是驱动storm设计的关键因素。

事务

对于几乎任何计算,你可以获取确切的一次消息语义。

第二章开始

在本章中,我们会建一个storm工程和我们的第一个storm topology。

下述假设你安装了至少1.6版本的Java运行时环境(JRE)。我们推荐使用oracle提供的JRE,可以在这里找到https://www.sodocs.net/doc/018219391.html,/downloads/。

操作模式

在我们开始之前,理解storm的操作模式很重要。有两种方式运行storm。

本地模式

在本地模式中,storm topologies运行在本地机器一个单独的JVM中。由于是最简单的查看所有的topology组件一起工作的模式,这种方式被用来开发,测试和调试。在这种模式下,我们可以调整参数,这使得我们可以看到我们的topology在不同的storm配置环境下是怎么运行的。为了以本地模式运行topologies,我们需要下载storm的开发依赖包,其中包含了我们开发和测试topology所需的所有东西。

当我们建立自己的第一个storm工程的时候我们很快就可以看到是怎么回事了。

在本地模式运行topology与在storm集群中运行它是类似的。然而,确保所有的组件线程安全是重要的,因为当它们被部署到远程模式中时,它们可能运行在不同的JVM 中或者在不同的物理机器上,这样的话,它们之间没有直接的交流或者内存共享。

本章的所有示例,我们都以本地模式运行。

远程模式

在远程模式中,我们提交topology到storm集群,该集群由许多进程组成,通常运行在不同的机器上。远程模式不显示调试信息,这也是它被认为是生产模式的原因。然而,在一台单独的开发机器上建立storm集群是可能的,并且它被认为是在部署至生产前的一个好方法,这可以确保在生产环境中运行topology时没有任何问题。

你在第六章中可以了解到更多关于远程模式的内容,我会在附录B里展示怎样安装一个集群。Hello World Storm

在这个工程中,我们会建立一个简单的topology来为单词计数。我们可以把这个工程

认为是storm topologies的“hello world”。然而,它是一个非常强大的topology,因为

它只需要做一些小的改动便可以扩展到几乎无限规模并且,我们甚至可以用它来做一

个统计系统。例如,我们可以修改这个项目来找出Twitter上的话题趋势。

为了建立这个topology,我们将使用一个spout来负责读取单词,第一个bolt来标准化单词,第二个bolt来为单词计数,正如我们在图2-1中看到的那样。

图2-1 开始topology

你可以在https://https://www.sodocs.net/doc/018219391.html,/storm-book/examples-ch02-getting_started/zipball/master

下载示例源代码的ZIP文件。

如果你使用git(一个分布式的校正控制及源代码管理工具),你可以在你想

要下载的源代码的目录中运行git clone git@https://www.sodocs.net/doc/018219391.html,:storm-book/examplesch02-

getting_started.git。

验证Java安装

安装环境的第一步是验证你正在运行的java的版本。打开一个终端窗口,运行命令java –version。我们可以看到如下类似的信息:

java -version

java version "1.6.0_26"

Java(TM) SE Runtime Environment (build 1.6.0_26-b03)

Java HotSpot(TM) Server VM (build 20.1-b02, mixed mode)

如果没有,验证下你的java安装。(见https://www.sodocs.net/doc/018219391.html,/download/.)

创建工程

为开始这个工程,先建立一个用来存放应用的文件夹(就像对任何的java应用一样)。该文件夹包含工程的源代码。

接着我们需要下载storm的依赖包:一个我们将添加到应用类路径的jar包的集合。你可以用两种方式中的一种做这件事:

﹒下载依赖包,解压,添加到类路径。

﹒使用Apache Maven

maven是一套软件工程管理的工具。它可以被用来管理软件开发周期中的多个方面,从依赖到发布构建过程。在本书中我们会广泛的使用它。为验证是否已安装了maven,运行命令mvn。如果没有,可以从https://www.sodocs.net/doc/018219391.html,/download.html下载。尽管使用storm没有必要成为一个maven专家,但是知道maven是怎样工作的基础知识是有帮助的。你可以找到更多信息在Apache Maven的网站(https://www.sodocs.net/doc/018219391.html,/)。

为了定义工程的结构,我们需要建立一个pom.xml(工程对象模型)文件,该文件描述依赖,包,源码等。我们将使用依赖包及nathanmarz建立的maven库(https://https://www.sodocs.net/doc/018219391.html,/nathan marz/)。这些依赖可以在这里找到https://https://www.sodocs.net/doc/018219391.html,/nathanmarz/storm/wiki/Maven。

storm的maven依赖包引用了在本地模式运行storm所需的所有库函数。

使用这些依赖包,我们可以写一个包含运行topology基本的必要组件的pom.xml文件:

xmlns:xsi="https://www.sodocs.net/doc/018219391.html,/2001/XMLSchema-instance"

xsi:schemaLocation="https://www.sodocs.net/doc/018219391.html,/POM/4.0.0

https://www.sodocs.net/doc/018219391.html,/xsd/maven-4.0.0.xsd">

4.0.0

storm.book

Getting-Started

0.0.1-SNAPSHOT

org.apache.maven.plugins

maven-compiler-plugin

2.3.2

1.6

1.6

1.6

https://www.sodocs.net/doc/018219391.html,

https://www.sodocs.net/doc/018219391.html,/repo

storm

storm

0.6.0

前几行指定了工程的名字和版本。然后我们添加了一个编译器插件,该插件告诉maven我们的代码应该用Java1.6编译。接下来我们定义库(maven支持同一工程的多个库)。Clojars是storm 依赖包所在的库。Maven会自动下载本地模式运行storm所需的所有子依赖包。

应用有如下的结构,典型的maven java工程:

Java下的文件夹包含我们的源代码并且我们会将我们的单词文件放到resources文件夹中来处理。

mkdir –p 建立所有所需的父目录。

建立我们第一个Topology

为建立我们第一个topology,我们要创建运行单词计数的所有的类。或许示例的一些部分在目前阶段不是很清晰,我们将在后边的章节中解释它们。

Spout

WordReader spout是实现了IRichSpout接口的类。我们在第四章会看到更多的细节。WordReader负责读文件并且将每行提供给一个bolt。

一个spout发射一个定义的域的列表。这个架构允许你有多种bolt读取相同的spout流,然后这些bolt定义域供其他的bolt消费等等。

示例2-1包含了这个类的完整代码(我们在示例后分析代码的每个部分)。

Example 2-1. src/main/java/spouts/WordReader.java

package spouts;

import java.io.BufferedReader;

import java.io.FileNotFoundException;

import java.io.FileReader;

import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;

import backtype.storm.task.TopologyContext;

import backtype.storm.topology.IRichSpout;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Values;

public class WordReader implements IRichSpout {

private SpoutOutputCollector collector;

private FileReader fileReader;

private boolean completed = false;

private TopologyContext context;

public boolean isDistributed() {return false;}

public void ack(Object msgId) {

System.out.println("OK:"+msgId);

}

public void close() {}

public void fail(Object msgId) {

System.out.println("FAIL:"+msgId);

}

/**

* The only thing that the methods will do It is emit each

* file line

*/

public void nextTuple() {

/**

* The nextuple it is called forever, so if we have been readed the file

* we will wait and then return

*/

if(completed){

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

//Do nothing

}

return;

}

String str;

//Open the reader

BufferedReader reader = new BufferedReader(fileReader);

try{

//Read all lines

while((str = reader.readLine()) != null){

/**

* By each line emmit a new value with the line as a their

*/

this.collector.emit(new Values(str),str);

}

}catch(Exception e){

throw new RuntimeException("Error reading tuple",e);

}finally{

completed = true;

}

}

/**

* We will create the file and get the collector object

*/

public void open(Map conf, TopologyContext context,

SpoutOutputCollector collector) {

try {

this.context = context;

this.fileReader = new FileReader(conf.get("wordsFile").toString());

} catch (FileNotFoundException e) {

throw new RuntimeException("Error reading file

["+conf.get("wordFile")+"]");

}

this.collector = collector;

}

/**

* Declare the output field "word"

*/

public void declareOutputFields(OutputFieldsDeclarer declarer) {

declarer.declare(new Fields("line"));

}

}

在任何spout中都调用的第一个方法是void open(Map conf, TopologyContext

context, SpoutOutputCollector collector)。方法的参数是TopologyContext,它包含了所有的topology数据;conf对象,它在topology定义的时候被创建;SpoutOutputCollector,它使得我们可以发射将被bolt处理的数据。下面的代码是open方法的实现:

public void open(Map conf, TopologyContext context,

SpoutOutputCollector collector) {

try {

this.context = context;

this.fileReader = new FileReader(conf.get("wordsFile").toString());

} catch (FileNotFoundException e) {

throw new RuntimeException("Error reading file ["+conf.get("wordFile")+"]");

}

this.collector = collector;

}

在这个方法中,我们也创建了reader,它负责读文件。接着我们需要实现public void nextTuple(),在这个方法里我们发射将被bolt处理的值。在我们的例子中,这个方法读文件并且每行发射一个值。

public void nextTuple() {

if(completed){

try {

Thread.sleep(1);

} catch (InterruptedException e) {

//Do nothing

}

return;

}

String str;

BufferedReader reader = new BufferedReader(fileReader);

try{

while((str = reader.readLine()) != null){

this.collector.emit(new Values(str));

}

}catch(Exception e){

throw new RuntimeException("Error reading tuple",e);

}finally{

completed = true;

}

}

Values是ArrayList的一个实现,其中把list的元素传到了构造方法中。

nextTuple()方法在相同的循环中被周期性的调用,正如ack()和fail()方法。当没有工作要做时,必须释放对线程的控制这样其他的方法有机会被调用。所以nextTuple方法的第一行是检查处理是否完成了。如果已经完成,在返回前它会休眠至少一毫秒来降低处理器的负载。如果有工作要做,那么文件的每一行被读取为一个值并且发射。

元组(Tuple)是一个值的命名列表,它可以是任何类型的java对象(只要这个对象是可序列化的)。Storm在缺省的情况下可以序列化常用的类型例如strings,byte arrays,ArrayList,HashMap和HashSet。

Bolts

现在我们有了一个spout来读取文件并且每一行发射一个元组。我们需要建立两个bolt来处理元组(见图2-1)。这些bolts实现了backtype.storm.topology.IRichBolt接口。

Bolt最重要的方法是void execute(Tuple input),每收到一个元组调用一次。对于每个收到的元组,bolt会发射出一些元组。

bolt或者spout可以发射如所需一样多的元组。当nextTuple或execute方法被调用时,它们可能发射0个,1个或多个元组。你将在第五章了解到更多。

第一个bolt,WordNormalizer,负责获取行并且标准化行。它会将行分隔成单词,将单词转化成小写并且trim单词。

首先我们要声明bolt的输出参数:

public void declareOutputFields(OutputFieldsDeclarer declarer) {

declarer.declare(new Fields("word"));

}

这里我们声明bolt将输出一个域,名为word。

接着我们实现public void execute(Tuple input)方法,在这里输入的元组将被处理:public void execute(Tuple input) {

String sentence = input.getString(0);

String[] words = sentence.split(" ");

for(String word : words){

word = word.trim();

if(!word.isEmpty()){

word = word.toLowerCase();

//Emit the word

collector.emit(new Values(word));

}

}

// Acknowledge the tuple

collector.ack(input);

}

第一行读取元组的值。值可以通过位置或者名字读取。值被处理然后使用collector对象发射。在每个元组被处理完成后,collector的ack()方法被调用来表明处理被成功的完成。如果该元组不能被处理,应该调用collector的fail()方法。

示例2-2包含这个类的完整代码。

Example 2-2. src/main/java/bolts/WordNormalizer.java

package bolts;

import java.util.ArrayList;

import java.util.List;

import java.util.Map;

import backtype.storm.task.OutputCollector;

import backtype.storm.task.TopologyContext;

import backtype.storm.topology.IRichBolt;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Tuple;

import backtype.storm.tuple.Values;

public class WordNormalizer implements IRichBolt {

private OutputCollector collector;

public void cleanup() {}

/**

* The bolt will receive the line from the

* words file and process it to Normalize this line

*

* The normalize will be put the words in lower case

* and split the line to get all words in this

*/

public void execute(Tuple input) {

String sentence = input.getString(0);

String[] words = sentence.split(" ");

for(String word : words){

word = word.trim();

if(!word.isEmpty()){

word = word.toLowerCase();

//Emit the word

List a = new ArrayList();

a.add(input);

collector.emit(a,new Values(word));

}

}

// Acknowledge the tuple

collector.ack(input);

}

public void prepare(Map stormConf, TopologyContext context,

OutputCollector collector) {

this.collector = collector;

}

/**

* The bolt will only emit the field "word"

*/

public void declareOutputFields(OutputFieldsDeclarer declarer) {

declarer.declare(new Fields("word"));

}

}

在这个类中,我们看到了在一个单独的execute方法中发射多个元组的例子。如果方法收到句子This is the Storm book,在一个单独的execute方法中,它将发射五个元组。

下一个bolt,WordCounter,负责为单词计数。当topology结束的时候(cleanup()方法被调用时),我们会显示每个单词的计数。

这是一个bolt不发射任何东西的示例。在这个例子中,数据被加到一个map中,

但在实际中,bolt会将数据存到数据库中。

package bolts;

import java.util.HashMap;

import java.util.Map;

import backtype.storm.task.OutputCollector;

import backtype.storm.task.TopologyContext;

import backtype.storm.topology.IRichBolt;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.tuple.Tuple;

public class WordCounter implements IRichBolt {

Integer id;

String name;

Map counters;

private OutputCollector collector;

/**

* At the end of the spout (when the cluster is shutdown

* We will show the word counters

*/

@Override

public void cleanup() {

System.out.println("-- Word Counter ["+name+"-"+id+"] --");

for(Map.Entry entry : counters.entrySet()){ System.out.println(entry.getKey()+": "+entry.getValue());

}

}

/**

* On each word We will count

*/

@Override

public void execute(Tuple input) {

String str = input.getString(0);

/**

* If the word dosn't exist in the map we will create

* this, if not We will add 1

*/

if(!counters.containsKey(str)){

counters.put(str, 1);

}else{

Integer c = counters.get(str) + 1;

counters.put(str, c);

}

//Set the tuple as Acknowledge

collector.ack(input);

}

/**

* On create

*/

@Override

public void prepare(Map stormConf, TopologyContext context,

OutputCollector collector) {

this.counters = new HashMap();

this.collector = collector;

https://www.sodocs.net/doc/018219391.html, = context.getThisComponentId();

this.id = context.getThisTaskId();

}

@Override

public void declareOutputFields(OutputFieldsDeclarer declarer) {}

}

execute方法是用map来收集和计数单词。当topology结束的时候,cleanup()方法被调用并且打印出计数的map。(这只是一个示例,通常来讲你应该在cleanup()方法里关闭有效的连接和其他资源当topology关闭的时候。)

主类

在主类中,你会建立topology和LocalCluster对象,它使你可以在本地测试和调试topology。与Config对象结合,LocalCluster允许你尝试不同的集群配置。例如,当一个全局或者类变量被不慎使用时,在使用不同数量的workers配置来测试你的topology时你会找到这个错误。(在第三章你将看到更多关于config对象。)

所有的topology结点应该可以在进程间没有数据共享的情形下独立运行(例如,没有全局或类变量),因为topology在实际的集群中运行时,这些进程可能运行在不同的机器上。

你将使用TopologyBuilder创建topology,topology告诉storm结点是怎么安排的并且它们之间怎样交换数据。

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("word-reader",new WordReader());

builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("wordreader");

builder.setBolt("word-counter", new WordCounter()).shuffleGrouping("wordnormalizer"); spout和bolts通过shuffleGroupings连接起来。这种分组(grouping)告诉storm在源结点和目标结点之间以随机分布的方式发送消息。

接着,建立包含topology配置的Config对象,该配置在运行时会被与集群的配置合并并且通过prepare方法发送到所有结点。

Config conf = new Config();

conf.put("wordsFile", args[0]);

conf.setDebug(true);

对将要被spout读取的文件名设置属性wordsFile,因为你在开发过程中所以debug属性为

true。当debug为true时,storm打印结点间交换的所有消息,debug数据对于理解topology 怎样运行是有用的。

正如前边提到的,你将使用LocalCluster来运行topology。在生产环境中,topology持续的运行,但对于这个例子你可以只运行topology几秒以便你可以查看结果。

LocalCluster cluster = new LocalCluster();

cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());

Thread.sleep(2000);

cluster.shutdown();

创建和运行topology使用createTopology和submitTopology,睡眠两秒(topology运行在不同的线程中),然后通过关闭集群来停止topology。

看示例2-3来把它放到一起。

Example 2-3. src/main/java/TopologyMain.java

import spouts.WordReader;

import backtype.storm.Config;

import backtype.storm.LocalCluster;

import backtype.storm.topology.TopologyBuilder;

import backtype.storm.tuple.Fields;

import bolts.WordCounter;

import bolts.WordNormalizer;

public class TopologyMain {

public static void main(String[] args) throws InterruptedException {

//Topology definition

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("word-reader",new WordReader());

builder.setBolt("word-normalizer", new WordNormalizer())

.shuffleGrouping("word-reader");

builder.setBolt("word-counter", new WordCounter(),2)

.fieldsGrouping("word-normalizer", new Fields("word"));

//Configuration

Config conf = new Config();

conf.put("wordsFile", args[0]);

conf.setDebug(false);

//Topology run

conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);

LocalCluster cluster = new LocalCluster();

cluster.submitTopology("Getting-Started-Toplogie", conf,

builder.createTopology());

Thread.sleep(1000);

cluster.shutdown();

}

}

实践一下

你将要运行你的第一个topology ! 如果你已经建立了文件src/main/resources/

words.txt并且其中每行有一个单词,你可以用这个命令运行topology:

mvn exec:java -Dexec.mainClass="TopologyMain" -Dexec.args="src/main/resources/

words.txt"

例如,你可以用下边的word.txt文件:

Storm

test

are

great

is

an

Storm

simple

application

but

very

powerful

really

Storm

is

great

在日志中,你应该看到类似如下的输出:

is: 2

application: 1

but: 1

great: 1

test: 1

simple: 1

Storm: 3

really: 1

are: 1

great: 1

an: 1

powerful: 1

very: 1

在这个示例中,你只使用了每个结点的一个单例。但如果你有一个非常大的日志文件呢?你可以轻易的改变系统中结点的数量来并行工作。在这个例子中,你可以建立两个WordCounter的实例:

builder.setBolt("word-counter", new WordCounter(),2)

.shuffleGrouping("word-normalizer");

如果你重新运行程序,你将看到:

-- Word Counter [word-counter-2] --

application: 1

is: 1

great: 1

are: 1

powerful: 1

Storm: 3

-- Word Counter [word-counter-3] --

really: 1

is: 1

but: 1

great: 1

test: 1

simple: 1

an: 1

very: 1

牛逼啊!改变并行度这么容易(在实际中,当然,每个实例运行在独立的机器中)。但看起来似乎有个问题:单词is和great在每个WordCounter实例中各被计算了一次。为什么呢?当你使用shuffleGrouping的时候,你告诉storm以随机分布的方式将每条消息发送至你的bolt 实例。在这个例子中,总是把相同的单词送到相同的WordCounter是更理想的。为了实现这个,你可以将shuffleGrouping("wordnormalizer")换成fieldsGrouping("word-normalizer",new Fields("word"))。尝试一下并且重新运行程序来验证结果。你会在后续的章节看到更多关于分组和消息流的内容。

结论

我们已经讨论了storm本地操作模式和远程操作模式的不同,以及用storm开发的强大和简便。你也学到了更多关于storm的基本概念,这些概念我们将在接下来的章节深入解释。

第三章Topologies

在本章中,你将看到怎样在一个storm topology的不同组件之间传递元组,以及怎样在一个运行的storm集群上部署topology。

流分组

在设计一个topology的时候,你需要做的最重要的事情是定义数据在组件之间怎样交换(流怎样被bolts消费)。流分组指定了每个bolt消费哪些流和这些流被怎样消费。

一个结点可以发射不止一条数据流。流分组允许我们选择接收哪些流。

相关主题