`
身心不坚强
  • 浏览: 36116 次
  • 性别: Icon_minigender_2
  • 来自: 西安
社区版块
存档分类
最新评论

使用spring-rabbit插件实现RabbitMQ消息的发送和接收

 
阅读更多

    本文不介绍AMQP和RabbitMQ的基础知识,请参考链接: http://tracywen.iteye.com/blog/2183604 ,介绍的非常详细。

    本文主要通过一个小的demo,来举例说明如何使用spring-rabbit插件来实现RabbitMQ消息的发送和接收,发送端称为生产者,接收端称为消费者。

    1. 给pom.xml文件中添加rabbitmq相关依赖

<properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <spring.version>3.1.1.RELEASE</spring.version>
  <spring.rabbit.version>1.3.5.RELEASE</spring.rabbit.version>
</properties>

<dependencies>
  <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>${spring.version}</version>
  </dependency>
  <dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>${spring.rabbit.version}</version>
  </dependency>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>3.8.1</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>2.5.0</version>
  </dependency>
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.7</version>
  </dependency>
  <dependency>
    <groupId>cglib</groupId>
    <artifactId>cglib</artifactId>
    <version>2.2.2</version>
  </dependency>
</dependencies>
    上述protobuf-java依赖用于序列化和反序列化RabbitMQ的消息。

    2. 生产者的xml配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
	xmlns:rabbit="http://www.springframework.org/schema/rabbit"
	xsi:schemaLocation="
		http://www.springframework.org/schema/beans 
		http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
		http://www.springframework.org/schema/context 
		http://www.springframework.org/schema/context/spring-context-3.1.xsd
		http://www.springframework.org/schema/rabbit 
		http://www.springframework.org/schema/rabbit/spring-rabbit-1.3.xsd">

	<context:property-placeholder location="classpath:rabbitmq.properties" />

	<!-- 使用annotation 自动注册bean,并保证@Required,@Autowired的属性被注入 -->
	<context:component-scan base-package="com.tracy" />

	<!-- 创建rabbit ConnectionFactory -->
	<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" username="${rabbitmq.username}" password="${rabbitmq.password}" />

	<!-- 创建RabbitAdmin,用来管理exchange、queue、bindings -->
	<rabbit:admin id="containerAdmin" connection-factory="connectionFactory" />

	<!-- 指定protobuf为消息队列格式 -->
	<bean id="protoMessageConverter" class="com.tracy.rabbitmq.converter.ProtobufMessageConverter"></bean>

	<!-- 创建发送消息模板auditTemplate -->
	<rabbit:template id="auditTemplate" connection-factory="connectionFactory" exchange="${rabbitmq.exchange}" routing-key="${rabbitmq.routingKey}" message-converter="protoMessageConverter" />

</beans>

    上述配置文件中,<rabbit:template>中的exchange声明将消息发送到名为ui_ex_test的交换器,routing-key指定消息应当路由到名为audit的队列,message-converter指定使用protobuf作为数据的交换格式。

    连接RabbitMQ服务器的相关信息放到了rabbitmq.properties文件中,此文件位于src/main/resources的根目录下,具体内容为:

rabbitmq.host = 10.0.3.123
rabbitmq.username = guest
rabbitmq.password = guest

rabbitmq.exchange = ui_ex_test
rabbitmq.routingKey = audit

     

    3. 生产者和消费者共用的proto格式约定

    有关于protobuf的介绍,请参考本人的上一篇博文,地址http://tracywen.iteye.com/blog/2106402

person_msg.proto文件内容为:

package com.tracy.rabbitmq.proto;

option java_package = "com.tracy.rabbitmq.proto";

option java_outer_classname = "PersonMsgProtos";

message Person {
  // ID(必需)
  required int32 id = 1;
  // 姓名(必需)
  required string name = 2;
  // email(可选)
  optional string email = 3;
  // 朋友(集合)
  repeated string friends = 4;
}

     

    4. 生产者主函数

package com.tracy.server;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import com.tracy.rabbitmq.proto.PersonMsgProtos;

/**
 * 发送消息主函数
 * 
 * @author tracy_cui
 *
 */
public class Sender {
  public static void main(String[] args) throws Exception {
    ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("applicationContext-sender.xml");
    RabbitTemplate template = (RabbitTemplate) context.getBean("auditTemplate");
    // 按照定义的Proto结构,创建一个Person 
    PersonMsgProtos.Person.Builder personBuilder = PersonMsgProtos.Person.newBuilder();
    personBuilder.setId(1);
    personBuilder.setName("tracy");
    personBuilder.setEmail("tracy_cui@xxx.com");
    personBuilder.addFriends("wang");
    personBuilder.addFriends("yang");
    PersonMsgProtos.Person person = personBuilder.build();
    // 将该Java对象发送给rabbit:template绑定的message-converter
    template.convertAndSend(person);
  }
}

     

    5. 消息格式转换插件protobuf messageconverter

    此插件由生产者和消费者公用,createMessage由生产者调用,convertProto2Object由消费者调用

package com.tracy.rabbitmq.converter;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.AbstractMessageConverter;
import org.springframework.amqp.support.converter.MessageConversionException;

import com.google.protobuf.InvalidProtocolBufferException;
import com.tracy.rabbitmq.proto.PersonMsgProtos;
/**
 * 
 * ProtoBuf & object格式转换
 * 
 * @author tracy_cui
 *
 */
public class ProtobufMessageConverter extends AbstractMessageConverter{
  /**
   * object转换为ProtoBuf, 发送消息
  */
  @Override
  public Message createMessage(Object object, MessageProperties messageProperties) {
    System.out.println("发送转换的消息");
    PersonMsgProtos.Person person = (PersonMsgProtos.Person)object;
    byte[] byteArray = person.toByteArray();
    Message message = new Message(byteArray, messageProperties);
    return message;
  }
	
  @Override
  public Object fromMessage(Message message) throws MessageConversionException {
    return null;
  }
	
  /**
   * ProtoBuf转换为object, 接收消息 
  */
  public Object convertProto2Object(Message message) throws InvalidProtocolBufferException{
    byte[] byteArray = message.getBody();
    PersonMsgProtos.Person parsePerson = PersonMsgProtos.Person.parseFrom(byteArray);
    return parsePerson;
  }

}

 

    6. 消费者的xml配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:rabbit="http://www.springframework.org/schema/rabbit"
	xmlns:task="http://www.springframework.org/schema/task"
	xsi:schemaLocation="
		http://www.springframework.org/schema/beans 
		http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
		http://www.springframework.org/schema/context 
		http://www.springframework.org/schema/context/spring-context-3.1.xsd
		http://www.springframework.org/schema/rabbit 
		http://www.springframework.org/schema/rabbit/spring-rabbit-1.3.xsd
		http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.1.xsd" >	
  <context:property-placeholder location="classpath:rabbitmq.properties" />
  <!-- 使用annotation 自动注册bean,并保证@Required,@Autowired的属性被注入 -->
  <context:component-scan base-package="com.tracy" />
  <!-- 创建rabbit ConnectionFactory -->
  <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" username="${rabbitmq.username}" password="${rabbitmq.password}" />
  <!-- 创建RabbitAdmin,用来管理exchange、queue、bindings -->
  <rabbit:admin id="containerAdmin" connection-factory="connectionFactory" />

  <!-- 声明队列 -->
  <rabbit:queue name="audit_queue" durable="false" exclusive="false" auto-delete="false" auto-declare="true"/>

  <!-- 声明direct类型的交换器 -->
  <rabbit:direct-exchange name="${rabbitmq.exchange}" durable="false" auto-delete="false" auto-declare="true">
    <!-- 将交换器与队列、路由key绑定 -->
    <rabbit:bindings>
      <rabbit:binding queue="audit_queue" key="${rabbitmq.routingKey}"></rabbit:binding>
    </rabbit:bindings>
  </rabbit:direct-exchange>

  <!-- 声明两个监听器 -->
  <bean id="auditListenerOne" class="com.tracy.rabbitmq.listener.AuditListenerOne" />
  <bean id="auditListenerTwo" class="com.tracy.rabbitmq.listener.AuditListenerTwo" />
  <!-- 指定protobuf为消息队列格式 -->
  <bean id="protoMessageConverter" class="com.tracy.rabbitmq.converter.ProtobufMessageConverter"></bean>
  
  <!-- 将两个监听器绑定到声明的队列中 -->
  <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" prefetch="1" message-converter="protoMessageConverter">
    <rabbit:listener ref="auditListenerOne" queue-names="audit_queue" />
    <rabbit:listener ref="auditListenerTwo" queue-names="audit_queue" />
  </rabbit:listener-container>
  <!-- 创建spring线程池,多线程对收到的数据进行处理 -->
  <task:executor id="MessageQueue-Executor" pool-size="2-5" queue-capacity="50" rejection-policy="CALLER_RUNS" keep-alive="2000"/>
  <task:annotation-driven executor="MessageQueue-Executor"/>
</beans>

    上述配置文件中,rabbit连接工厂的定义与生产者一致,消费者的配置与生产者的配置区别在于以下几点:

    a. 定义了名称为audit_queue的队列,声明队列的作用是消费exchange中的消息;

    b. 声明交换器,并与队列、路由key绑定,即将exchange收到的消息发送到bindkey=audit的队列中;

    c. 声明了两个监听器,用于监听audit_queue中的消息;

    d. 使用spring线程池对收到的数据进行处理。

 

    7. 消费者的监听器

package com.tracy.rabbitmq.listener;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.beans.factory.annotation.Autowired;

/**
 * 监听rabbitMQ消息
 * 
 * @author tracy_cui
 *
 */
public class AuditListenerOne implements MessageListener{
	
  private static final Logger logger = LoggerFactory.getLogger(AuditListenerOne.class);
	
  @Autowired
  private AuditListenerHandler auditListenerHandler;
	
  public AuditListenerOne() {
    logger.info("[****************] MessageQueue waiting for messages...");
  }
	
  @Override
  public void onMessage(Message message) {
    try {
      auditListenerHandler.handleMessage(message);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
	
}

 

    8. 消费者对收到的数据进行处理

package com.tracy.rabbitmq.listener;

import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import com.tracy.rabbitmq.converter.ProtobufMessageConverter;
import com.tracy.rabbitmq.proto.PersonMsgProtos;
import com.google.protobuf.InvalidProtocolBufferException;

/**
 * Message处理
 * 
 * @author tracy_cui
 *
 */
@Component
public class AuditListenerHandler {
	
  private static final Logger logger = LoggerFactory.getLogger(AuditListenerHandler.class);
	
  @Autowired
  private ProtobufMessageConverter messageConverter;
	
  /**
   * 使用Spring线程池
  */
  @Async
  public void handleMessage(Message message) throws Exception{
    logger.info("[****************] handleMessage thread : " + Thread.currentThread().getName());
    PersonMsgProtos.Person person = this.convertMessage(message);
    if(person  != null){
      System.out.println("id : " + person.getId());
      System.out.println("name : " + person.getName());
      System.out.println("email : " + person.getEmail());
      List<String> friendLists = person.getFriendsList();
      for(String friend : friendLists){
        System.out.println("friend :" + friend);
      }
    }
  }
	
  /**
   * 将ProtoBuf转换为Entity
  */
  private PersonMsgProtos.Person convertMessage(Message message){
    PersonMsgProtos.Person person = null;
    try {
      Object object = messageConverter.convertProto2Object(message);
      if(object instanceof PersonMsgProtos.Person){
        person = (PersonMsgProtos.Person)object;
      }else{
	logger.warn("[****************] object is not a instance of CreativeAuditProtos.ui_audit_t");
      }
    } catch (InvalidProtocolBufferException e) {
      logger.warn("[****************] convert message error, InvalidProtocolBuffer");
      e.printStackTrace();
    }
    return person;
  }
	
}

 

     消费者收到的数据截图:



 

 本项目完整代码已使用git托管,地址:https://coding.net/u/tracywen/p/RabbitMQ/git

 

  • 大小: 22.9 KB
分享到:
评论
2 楼 breadviking 2017-05-03  
breadviking 写道
谢谢 分享

只是有个疑问 为啥你这没有实现 ProtobufMessageConverter的fromMessage 方法
  @Override 
  public Object fromMessage(Message message) throws MessageConversionException { 
    return null; 
  } 
1 楼 breadviking 2017-05-03  
谢谢 分享

相关推荐

Global site tag (gtag.js) - Google Analytics