SpringBoot(二十二)集成RabbitMQ---MQ实战练习训练_玖富


玖富娱乐是一家为代理招商,直属主管信息发布为主的资讯网站,同时也兼顾玖富娱乐代理注册登录地址。

RabbitMQ是一个在AMQP基础上完成的,可复用的企业音讯体系。他遵照Mozilla Public License开源协定。RabbitMQ是盛行的开源音讯行列体系,用erlang言语开辟。RabbitMQ是AMQP(高等音讯行列协定)的规范完成。

音讯中间件的事情历程可以或许用生产者消耗者模子来透露表现.即,生产者赓续的向音讯行列发送信息,而消耗者从音讯行列中消耗信息.

如果你还没有装置rabbitmq的,可以或许看看这篇《centos装置MQ》

不说了不说了,来一张图直接了当的看看MQ事情的详细历程:

残局一张图 故事端赖编.从上图可看出,关于音讯行列来讲,生产者,音讯行列,消耗者是最重要的三个观点,生产者发音讯到音讯行列中去,消耗者监听指定的音讯行列,而且当音讯行列收到音讯以后,吸收音讯行列传来的音讯,而且赋予响应的处置惩罚.音讯行列常用于分布式体系之间相互信息的通报.

v基础观点

关于RabbitMQ来讲,除这三个基础模块之外,还增加了一个模块,即交流机(Exchange).它使得生产者和音讯行列之间产生了断绝,生产者将音讯发送给交流机,而交流机则根据调理战略把响应的音讯转发给对应的音讯行列.那末RabitMQ的事情流程以下所示:

关于rabbitmq几个基础名词的引见:

Broker: 简朴来讲就是音讯行列服务器实体。 Exchange: 音讯交流机,它指定音讯按甚么划定规矩,路由到哪一个行列。 Queue: 音讯行列载体,每一个音讯都会被投入到一个或多个行列。 Binding: 绑定,它的作用就是把exchange和queue依照路由划定规矩绑定起来。 Routing Key: 路由关键字,exchange根据这个关键字举行音讯投递。 vhost: 虚拟主机,一个broker里可以或许开设多个vhost,用作分歧用户的权限星散。 producer: 音讯生产者,就是投递音讯的顺序。 consumer: 音讯消耗者,就是接收音讯的顺序。 channel: 音讯通道,在客户端的每一个衔接里,可竖立多个channel,每一个channel代表一个会话义务。

交流机的重要作用是吸收响应的音讯而且绑定到指定的行列.交流机有四种范例,分别为Direct,topic,headers,Fanout:

Direct: 处置惩罚路由键。须要将一个行列绑定到交流机上,要求该音讯与一个特定的路由键完全婚配。这是一个完全的婚配。如果一个行列绑定到该交流机上要求路由键 “demo”,则只要被标记为“demo”的音讯才被转发,不会转发demo.ooo,也不会转发test.123,只会转发demo。 Topic: 转发信息重如果根据通配符,将路由键和某情势举行婚配。此时行列须要绑定要一个情势上。标记“#”婚配一个或多个词,标记“*”婚配不多不少一个词。因而“audit.#”可以或许婚配到“audit.irs.corporate”,然则“audit.*” 只会婚配到“audit.irs”。 Headers: 根据一个划定规矩举行婚配,在音讯行列和交流机绑定的时刻会指定一组键值对划定规矩,而发送音讯的时刻也会指定一组键值对划定规矩,当两组键值对划定规矩相婚配的时刻,音讯会被发送到婚配的音讯行列中. Fanout: 路由播送的情势,将会把音讯发给绑定它的悉数行列,即使设置了key,也会被疏忽.

v实战练习训练

♛ 2.1 建立MQ

注:如果现有工程引入MQ,则增加Maven援用。

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

这里我们连续之前springboot系列博文中的例子hellospringboot,在已有项目中增加mq的Maven援用。

-玖富娱乐是一家为代理招商,直属主管信息发布为主的资讯网站,同时也兼顾玖富娱乐代理注册登录地址。- ♛ 2.2 application.properties

在application.properties文件傍边引入RabbitMQ基础的设置装备摆设信息

# ----- MQ -------- #
spring.rabbitmq.host=192.168.11.108
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
♛ 2.3 增加实体类MyModel
package com.demo.mq.model;

import java.io.Serializable;
import java.util.UUID;

/**
 * Created by toutou on 2019/1/1.
 */
public class MyModel implements Serializable {
    private static final long serialVersionUID = 1L;
    private UUID id;
    private String info;

    public UUID getId() {
        return id;
    }

    public void setId(UUID id) {
        this.id = id;
    }

    public String getInfo() {
        return info;
    }

    public void setInfo(String info) {
        this.info = info;
    }
}
♛ 2.4 增加RabbitConfig
package com.demo.mq.common;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;

/**
 * Created by toutou on 2019/1/1.
 */
@Configuration
public class RabbitConfig {

    @Value("${spring.rabbitmq.host}")
    private String host;

    @Value("${spring.rabbitmq.port}")
    private int port;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;


    public static final String EXCHANGE_A = "my-mq-exchange_A";
    public static final String EXCHANGE_B = "my-mq-exchange_B";


    public static final String QUEUE_A = "QUEUE_A";
    public static final String QUEUE_B = "QUEUE_B";

    public static final String ROUTINGKEY_A = "spring-boot-routingKey_A";
    public static final String ROUTINGKEY_B = "spring-boot-routingKey_B";

    @Bean
    public CachingConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPublisherConfirms(true);
        return connectionFactory;
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        return template;
    }

    /**
     * 针对消耗者设置装备摆设
     * 1. 设置交流机范例
     * 2. 将行列绑定到交流机
     FanoutExchange: 将音讯分发到一切的绑定行列,无routingkey的观点
     HeadersExchange :经由过程增加属性key-value婚配
     DirectExchange:依照routingkey分发到指定行列
     TopicExchange:多关键字婚配
     */
    @Bean
    public DirectExchange defaultExchange() {
        return new DirectExchange(EXCHANGE_A);
    }

    /**
     * 猎取行列A
     * @return
     */
    @Bean
    public Queue queueA() {
        return new Queue(QUEUE_A, true); //行列耐久
    }

    /**
     * 猎取行列B
     * @return
     */
    @Bean
    public Queue queueB() {
        return new Queue(QUEUE_B, true); //行列耐久
    }

    /**
     * 把交流机,行列,经由过程路由关键字举行绑定
     * @return
     */
    @Bean
    public Binding binding() {

        return BindingBuilder.bind(queueA()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A);
    }

    /**
     * 一个交流机可以或许绑定多个音讯行列,也就是音讯经由过程一个交流机,可以或许分发到分歧的行列傍边去。
     * @return
     */
    @Bean
    public Binding bindingB(){
        return BindingBuilder.bind(queueB()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_B);
    }

}
♛ 2.5 增加音讯的生产者MyProducer
package com.demo.mq.producer;

import com.demo.mq.common.RabbitConfig;
import com.demo.mq.model.MyModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Created by toutou on 2019/1/1.
 */
@Component
public class MyProducer implements RabbitTemplate.ConfirmCallback {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    //因为rabbitTemplate的scope属性设置为ConfigurableBeanFactory.SCOPE_PROTOTYPE,以是不克不及自动注入
    private RabbitTemplate rabbitTemplate;

    /**
     * 组织要领注入rabbitTemplate
     */
    @Autowired
    public MyProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setConfirmCallback(this); //rabbitTemplate如果为单例的话,那回调就是末了设置的内容
    }

    public void sendMsg(MyModel model) {
        //把音讯放入ROUTINGKEY_A对应的行列傍边去,对应的是行列A
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_A, RabbitConfig.ROUTINGKEY_A, model);
    }

    /**
     * 回调
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        logger.info(" 回调id:"   correlationData);
        if (ack) {
            logger.info("音讯胜利消耗");
        } else {
            logger.info("音讯消耗失利:"   cause);
        }
    }
}
♛ 2.6 增加音讯的消耗者MyReceiver
package com.demo.mq.receiver;

import com.demo.mq.common.RabbitConfig;
import com.demo.mq.model.MyModel;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Created by toutou on 2019/1/1.
 */
@Component
@RabbitListener(queues = RabbitConfig.QUEUE_A)
public class MyReceiver {
    @RabbitHandler
    public void process(MyModel model) {
        System.out.println("吸收处置惩罚行列A傍边的音讯: "   model.getInfo());
    }
}
♛ 2.7 增加MyMQController
package com.demo.controller;

import com.demo.mq.model.MyModel;
import com.demo.mq.producer.MyProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.UUID;

/**
 * Created by toutou on 2019/1/1.
 */
@RestController
@Slf4j
public class MyMQController {
    @Autowired
    MyProducer myProducers;

    @GetMapping("/mq/producer")
    public String myProducer(String content){
        MyModel model = new MyModel();
        model.setId(UUID.randomUUID());
        model.setInfo(content);
        myProducers.sendMsg(model);
        return "已发送:"   content;
    }
}
♛ 2.8 项目团体目次

 

♛ 2.9 调试

2.9.1 在页面中要求http://localhost:8081/mq/producer?content=hello rabbitmq

2.9.2 检察http://ip:15672/#/queues的转变

关于RabbitMQ Management有疑问的,可以或许看上篇博文。《浅谈RabbitMQ Management》

 

2.9.3 检察消耗者日记纪录

如许一个完全的rabbitmq实例就有了。

v源码地点

https://github.com/toutouge/javademo/tree/master/hellospringboot


作  者:请叫我头头哥
出  处:http://www.cnblogs.com/toutou/
关于作者:专注于基础平台的项目开辟。若有题目或发起,请多多见教!
版权声明:本文版权归作者和博客园共有,迎接转载,但未经作者赞同必需保存此段声明,且在文章页面显着地位给出原文链接。
特此声明:一切批评和私信都会在第一时间复兴。也迎接园子的大大们斧正毛病,共同进步。或许直接私信
支援博主:如果您以为文章对您有资助,可以或许点击文章右下角引荐一下。您的勉励是作者对峙原创和延续写作的最大动力!

-玖富娱乐是一家为代理招商,直属主管信息发布为主的资讯网站,同时也兼顾玖富娱乐代理注册登录地址。