RabbitMQ是一款广为使用的企业级消息队列系统,采用erlang语言开发,支持多种语言的客户端访问,这也是我选择rabbitmq作为我接触的第一个MQ的原因。

在去年的项目,我使用Rabbit用来缓存生产者产生的消息,另一端采用java客户端来获取消息然后进行保存、过滤等处理,每天的消息量峰值在1000条/s,共3条队列,以消费者和生产者加起来只有不到40个,单机4核4G的Linux平台下,未做HA的情况下稳定运行了一年多,最多时曾缓存了近两亿条数据(由于消费者机器断电导致)。

随着业务不断复杂,采用自己开发的java客户端要处理多个并行的逻辑逐渐出现瓶颈和增加了开发的难度,于是,开始采用storm实时流式计算进行后端数据的过滤、合并、保存等处理。

于是在整个业务流程上,rabbitmq的消费者端即为整个storm topology的spout,我们一方面可以自己开发spout,并且在spout中维护rabbitmq的链接,可以采用spring-rabbit进行访问。另一方面,github上流行着一个storm-rabbitmq的开源程序,本文即是采用此方法来使用该框架来集成storm-rabbitmq.

github地址:https://github.com/ppat/storm-rabbitmq

clone到本地后,使用maven进行build并install到本地repository中。需要注意的是,我们需要pom.xml文件中以下项目:

1.rabbitmq版本

2.storm版本,注意现在的storm为apache下的框架
最终依赖列表如下:

[xml]
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>0.9.6</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.5.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.5</version>
</dependency>
</dependencies>
[/xml]

进入根目录,执行mvn package install,这样就生成了storm-rabbitmq-0.6.2-SNAPSHOT.jar包到本地repository中,我们可以在其他项目通过如下依赖引用:

[xml]
<dependency>
<groupId>io.latent</groupId>
<artifactId>storm-rabbitmq</artifactId>
<version>0.6.2-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
[/xml]

 

开发步骤:

引入storm-rabbitmq-x.jar之后,我们就可以进行开发了。

1.我们首先需要继承backtype.storm.spout.Scheme来反序列化RabbitMQ的消息。如下:

[java]
/**
* 自定义MQ消息的Schema
* @author adam
*
*/
public class MyCustomMessageScheme implements backtype.storm.spout.Scheme {

/**
* 把MQ中读取的消息反序列化
*/
public List<Object> deserialize(byte[] bytes) {
List objs = new ArrayList();

//直接反序列化为string
String str = new String(bytes);

//依次返回UUID,String,Number
objs.add(UUID.randomUUID().toString());
objs.add(str);
String numStr = Math.round(Math.random()*8999+1000)+"";
objs.add(numStr);

return objs;
}

/**
* 定义spout输出的Fileds
*/
public Fields getOutputFields() {
//依次返回UUID,String,Number,需要与上述返回的List列表一一对应
return new Fields("id", "str", "num");
}

}
[/java]

2.在Topology的Builder之前,我们就可以创建该Spout使用上面我们定义的schema,如下:

[java]
Scheme scheme = new MyCustomMessageScheme();
RabbitMQSpout spout = new RabbitMQSpout(scheme);
[/java]

3.创建rabbitmq的连接配置,实际开发过程中,连接参数可以放在resources目录创建一个properties目录,然后动态配置

[java]
ConnectionConfig connectionConfig = new ConnectionConfig("localhost", 5672, "guest", "guest", ConnectionFactory.DEFAULT_VHOST, 10); // host, port, username, password, virtualHost, heartBeat
ConsumerConfig spoutConfig = new ConsumerConfigBuilder().connection(connectionConfig)
.queue("your.rabbitmq.queue")
.prefetch(200)
.requeueOnFail()
.build();
[/java]

这里值得注意的是requeueOnFail()如果打开,如果tuple后期处理失败或未发送ack消息,消息将会重新返回rabbitmq进行排队处理。如果关闭该选项,失败的消息将会从消息队列中移除并且发送RabbitMQ默认的dead letter exchange(需要在RabbitMQ中配置).

 

4.添加spout到TopologyBuilder中,设置MaxSpoutPending的值与RabbitMQ预取(prefetch)的值相同,注意MaxSpoutPending应该总是小于prefetch值。如下:

[java]
TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("my-spout", spout)
.addConfigurations(spoutConfig.asMap())
.setMaxSpoutPending(200);
[/java]

这样就相当于spout这个水龙头就有了数据来源,后期我们可以创建bolt用来保存该值到hbase,mysql,redis等存储中,同时我们可以创建bolt用来进行过滤等操作。

后面我会将rabbitmq作为输入源,然后使用hbase进行保存,采用storm-hbase开源程序。

Storm-RabbitMQ的使用(一)
Tagged on:     

One thought on “Storm-RabbitMQ的使用(一)

发表评论

您的电子邮箱地址不会被公开。 必填项已用*标注

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据