ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

RabbitMQ: Java code example

2022-05-09 11:34:23  阅读:176  来源: 互联网

标签:code Java String public IOException io import example channel


 

pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<!-- Build descriptor for the plain-Java RabbitMQ examples (amqp-client + slf4j-simple). -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>io.veer</groupId>
  <artifactId>rabbitmq</artifactId>
  <version>1.0-SNAPSHOT</version>


  <properties>
    <java.version>17</java.version>
    <maven.compiler.source>17</maven.compiler.source>
    <maven.compiler.target>17</maven.compiler.target>
    <!-- The sources contain non-ASCII comments; pin the encoding so the build does not depend on the platform default. -->
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <!-- JUnit is a test-time dependency: "test" scope keeps it off the main compile classpath. -->
    <dependency>
      <groupId>org.junit.jupiter</groupId>
      <artifactId>junit-jupiter-api</artifactId>
      <version>5.8.2</version>
      <scope>test</scope>
    </dependency>
    <!-- RabbitMQ Java client used by all plain-Java examples. -->
    <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>5.14.2</version>
    </dependency>
    <!-- Simple SLF4J binding. -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-simple</artifactId>
      <version>1.7.36</version>
    </dependency>
  </dependencies>

</project>

 

Provider:

package io.veer;


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Minimal publisher example: declares the durable queue {@code veneer} and publishes five
 * persistent text messages to it through the default exchange.
 */
public class TestProvider{
  /**
   * Connects to the broker, declares the queue, publishes the messages and releases the
   * channel and connection.
   *
   * @param args unused
   * @throws TimeoutException if connecting or closing the channel times out
   * @throws IOException      if any broker operation fails
   */
  public static void main(String[] args) throws TimeoutException, IOException{
    // Configure the connection factory.
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.8.105");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");
    connectionFactory.setVirtualHost("venal");

    // try-with-resources closes the channel first and then the connection, even when
    // declaring the queue or publishing fails part-way through.
    try(Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel()){

      /*
       * Declare the queue on this channel. If the queue already exists, the arguments passed
       * here must match the existing queue exactly.
       * param1: queue name; the queue is created when it does not exist
       * param2: durable - whether the queue survives a broker restart. Declaring an existing
       *         durable queue with false is an error. Durability only covers the queue itself;
       *         to keep the messages as well, publish with MessageProperties.PERSISTENT_TEXT_PLAIN
       * param3: exclusive - whether the queue is exclusive to this connection
       * param4: autoDelete - whether the queue is deleted once consumption has finished; it
       *         only takes effect after the consumer has finished and disconnected
       * param5: extra arguments
       * Producer and consumer must declare the queue with identical arguments.
       */
      channel.queueDeclare("veneer", true, false, false, null);

      /*
       * basicPublish arguments:
       * param1: exchange name; "" selects the default exchange
       * param2: queue name (used as routing key)
       * param3: extra message properties
       * param4: message body as raw bytes
       */
      for(int i = 0; i < 5; i++){
        // Publishing to a queue that does not exist silently drops the message.
        // The channel is bound to one virtual host; messages can only be published to that virtual host.
        // Encode explicitly as UTF-8 (the consumer decodes with UTF-8) instead of relying on the platform default charset.
        channel.basicPublish("", "veneer", MessageProperties.PERSISTENT_TEXT_PLAIN, "venal rabbitmq".getBytes(java.nio.charset.StandardCharsets.UTF_8));
      }
    }
  }
}

 

Consumer:

package io.veer;


import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * Minimal consumer example: declares the queue {@code ruzz}, then consumes from the queue
 * {@code veneer} with automatic acknowledgement and prints every message it receives.
 */
public class TestConsumer{
  /**
   * Connects to the broker and registers the consumer. The channel and connection are left
   * open on purpose so that deliveries keep arriving.
   *
   * @param args unused
   * @throws IOException      if any broker operation fails
   * @throws TimeoutException if connecting times out
   */
  public static void main(String[] args) throws IOException, TimeoutException{
    // Configure the connection factory.
    // NOTE(review): this host differs from 192.168.8.105 used by TestProvider and RabbitMQUtil - confirm it is intended.
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.8.107");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");
    connectionFactory.setVirtualHost("venal");

    // Open the connection.
    Connection connection = connectionFactory.newConnection();

    // Open a channel on the connection.
    Channel channel = connection.createChannel();

    /*
     * If the channel declares a queue that already exists, the arguments must match the
     * existing queue exactly.
     * The queue declared here and the queue consumed below do not have to be the same.
     * Declaring a queue that does not exist creates it.
     */
    channel.queueDeclare("ruzz", false, false, false, null);

    /*
     * param1: name of the queue to consume; if it does not exist a 404 error is raised
     *         (no queue 'veneer' in vhost 'venal')
     * param2: autoAck
     * param3: callback that receives the deliveries
     */
    channel.basicConsume("veneer", true, new DefaultConsumer(channel){
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{
        try{
          Thread.sleep(500);  // throttle: wait half a second before handling each delivery
        }catch(InterruptedException e){
          // Restore the interrupt flag so code further up the stack can still observe the interruption.
          Thread.currentThread().interrupt();
          throw new RuntimeException(e);
        }
        System.out.println("\033[37;7m" + String.format("Consumer  Message: %s, Thread: %s", new String(body, StandardCharsets.UTF_8), Thread.currentThread()) + "\033[0m");
      }
    });

    /*
     * Behaviour observed by the author when closing resources at this point:
     * - neither closed: the program keeps running and consuming
     * - only channel.close(): consumes once, then blocks without receiving further messages
     * - only connection.close(): consumes nothing, the program exits
     * - both closed: consumes nothing, the program exits
     * Both are therefore intentionally left open here.
     */

    // Deliveries are handled on a separate thread, so this line is printed first.
    System.out.println("\033[37;7m" + Thread.currentThread() + "\033[0m");
  }

}

 

RabbitMQ工具类:

package io.veer.util;


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Shared helper that owns one pre-configured {@link ConnectionFactory} and offers methods to
 * open a connection and to release a connection together with its channel.
 */
public class RabbitMQUtil{
  private static final ConnectionFactory connectionFactory;

  static{
    // Configure the factory once; every connection handed out uses these settings.
    connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.8.105");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("venal");
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");
  }

  /**
   * Opens a new connection using the shared factory.
   *
   * @return the new connection, or {@code null} when it could not be established (the stack
   *         trace is printed); callers must check for {@code null}
   */
  public static Connection getConnection(){
    try{
      return connectionFactory.newConnection();
    }catch(RuntimeException | IOException | TimeoutException e){
      e.printStackTrace();
    }
    return null;
  }

  /**
   * Closes the channel and then the connection. The connection is closed even when closing
   * the channel fails, and {@code null} or already-closed arguments are skipped.
   *
   * @param connection connection to close; may be {@code null}
   * @param channel    channel to close; may be {@code null}
   * @throws RuntimeException wrapping the I/O or timeout failure raised while closing; when
   *                          both closes fail, the connection failure is the one reported
   */
  public static void closeConnectionAndChannel(Connection connection, Channel channel){
    try{
      if(channel != null && channel.isOpen()){
        channel.close();
      }
    }catch(IOException | TimeoutException e){
      throw new RuntimeException(e);
    }finally{
      // Always attempt to close the connection, otherwise a failed channel close would leak it.
      closeConnection(connection);
    }
  }

  /** Closes the connection if it is non-null and still open; closing it also closes its channels. */
  private static void closeConnection(Connection connection){
    if(connection == null || !connection.isOpen()){
      return;
    }
    try{
      connection.close();
    }catch(IOException e){
      throw new RuntimeException(e);
    }
  }
}

 

Work Queue (工作队列, 多个消费者竞争消费同一个queue):

Provider

package io.veer.workerqueue;


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import io.veer.util.RabbitMQUtil;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Objects;

/**
 * Work-queue publisher: declares the durable queue {@code worker} and publishes 100 persistent
 * messages to it through the default exchange.
 */
public class Provider{
  /**
   * Publishes the messages and releases the connection and channel afterwards.
   *
   * @param args unused
   * @throws IOException if a broker operation fails
   */
  public static void main(String[] args) throws IOException{
    Connection connection = Objects.requireNonNull(RabbitMQUtil.getConnection());
    Channel channel = connection.createChannel();
    try{
      channel.queueDeclare("worker", true, false, false, null);

      for(int b = 0; b < 100; b++){
        channel.basicPublish("", "worker", MessageProperties.PERSISTENT_TEXT_PLAIN, (b + " worker queue").getBytes(StandardCharsets.UTF_8));
      }
    }finally{
      // Run the cleanup even when declaring or publishing fails, so the failure does not skip it.
      RabbitMQUtil.closeConnectionAndChannel(connection, channel);
    }
  }
}

Consumer

package io.veer.workerqueue;


import com.rabbitmq.client.*;
import io.veer.util.RabbitMQUtil;

import java.io.IOException;
import java.util.Objects;

/**
 * Work-queue consumer that acknowledges each message manually and then pauses for one second.
 */
public class Consumer1{
  /**
   * Registers the consumer on the queue {@code worker}; the connection stays open so that
   * deliveries keep arriving.
   *
   * @param args unused
   * @throws IOException if a broker operation fails
   */
  public static void main(String[] args) throws IOException{

    Connection connection = RabbitMQUtil.getConnection();
    Channel channel = Objects.requireNonNull(connection).createChannel();
    channel.queueDeclare("worker", true, false, false, null);
    // Accept only one unacknowledged message at a time; otherwise this consumer would take
    // every message and empty the queue on the server.
    channel.basicQos(1);
    // Auto-ack is off: the next message is not delivered until the current one is acknowledged.
    channel.basicConsume("worker", false, new DefaultConsumer(channel){
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{
        try{
          // The producer encodes with UTF-8, so decode with UTF-8 rather than the platform default.
          System.out.println("\033[37;7m" + new String(body, java.nio.charset.StandardCharsets.UTF_8) + "\033[0m");
          channel.basicAck(envelope.getDeliveryTag(), false);  // false: acknowledge this single delivery only
          Thread.sleep(1000);
        }catch(InterruptedException e){
          // Restore the interrupt flag before reporting the failure.
          Thread.currentThread().interrupt();
          throw new RuntimeException(e);
        }catch(IOException e){
          throw new RuntimeException(e);
        }
      }
    });
  }

}
package io.veer.workerqueue;


import com.rabbitmq.client.*;
import io.veer.util.RabbitMQUtil;

import java.io.IOException;
import java.util.Objects;

/**
 * Work-queue consumer that acknowledges each message manually and then pauses for half a second.
 */
public class Consumer2{
  /**
   * Registers the consumer on the queue {@code worker}; the connection stays open so that
   * deliveries keep arriving.
   *
   * @param args unused
   * @throws IOException if a broker operation fails
   */
  public static void main(String[] args) throws IOException{
    Connection connection = RabbitMQUtil.getConnection();
    Channel channel = Objects.requireNonNull(connection).createChannel();
    channel.queueDeclare("worker", true, false, false, null);
    // Accept only one unacknowledged message at a time.
    channel.basicQos(1);
    channel.basicConsume("worker", false, new DefaultConsumer(channel){
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{
        try{
          // The producer encodes with UTF-8, so decode with UTF-8 rather than the platform default.
          System.out.println("\033[37;7m" + new String(body, java.nio.charset.StandardCharsets.UTF_8) + "\033[0m");
          channel.basicAck(envelope.getDeliveryTag(), false);
          Thread.sleep(500);
        }catch(InterruptedException e){
          // Restore the interrupt flag before reporting the failure.
          Thread.currentThread().interrupt();
          throw new RuntimeException(e);
        }catch(IOException e){
          throw new RuntimeException(e);
        }
      }
    });
  }
}

 

Exchange Fanout 没有routing key:

provider:

package io.veer.fanout;


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import io.veer.util.RabbitMQUtil;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Objects;

/**
 * Fanout publisher: declares the fanout exchange {@code log_fanout} and publishes one message to it.
 */
public class Provider{
  /**
   * Publishes the message and releases the connection and channel afterwards.
   *
   * @param args unused
   * @throws IOException if a broker operation fails
   */
  public static void main(String[] args) throws IOException{
    Connection connection = Objects.requireNonNull(RabbitMQUtil.getConnection());

    Channel channel = connection.createChannel();

    try{
      /*
       * Declare the exchange on this channel.
       * param1: exchange name
       * param2: exchange type
       */
      channel.exchangeDeclare("log_fanout", "fanout");

      /*
       * Publish the message.
       * param1: exchange name
       * param2: routing key; it has no meaning for a fanout exchange
       */
      channel.basicPublish("log_fanout", "", null, "exchange: logs, type: fanout".getBytes(StandardCharsets.UTF_8));
    }finally{
      // Run the cleanup even when declaring or publishing fails, so the failure does not skip it.
      RabbitMQUtil.closeConnectionAndChannel(connection, channel);
    }
  }
}

Consumer

package io.veer.fanout;


import com.rabbitmq.client.*;
import io.veer.util.RabbitMQUtil;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Objects;

/**
 * Fanout consumer demo: starts five threads, each of which opens its own connection, binds a
 * temporary queue to the exchange {@code log_fanout} and prints what it receives.
 */
public class Consumer{
  /**
   * Starts the five consumer threads.
   *
   * @param args unused
   * @throws IOException declared for compatibility; failures inside the threads surface as
   *                     {@link RuntimeException}
   */
  public static void main(String[] args) throws IOException{
    int started = 0;
    while(started < 5){
      new Thread(Consumer::consumeOrFail).start();
      started++;
    }
  }

  /** Thread body: runs one consumer and converts the checked I/O failure to an unchecked one. */
  private static void consumeOrFail(){
    try{
      new Consumer().consume();
    }catch(IOException e){
      throw new RuntimeException(e);
    }
  }

  /**
   * Opens a connection, binds a temporary queue to the fanout exchange and registers an
   * auto-acknowledging consumer that prints each message together with the handling thread.
   *
   * @throws IOException if a broker operation fails
   */
  public void consume() throws IOException{
    final String exchange = "log_fanout";

    Connection connection = RabbitMQUtil.getConnection();
    Objects.requireNonNull(connection);
    Channel channel = connection.createChannel();

    // Declare the exchange on this channel.
    channel.exchangeDeclare(exchange, "fanout");

    // Temporary queue whose name is returned by the declare call; bound with an empty routing key.
    String temporaryQueue = channel.queueDeclare().getQueue();
    channel.queueBind(temporaryQueue, exchange, "");

    DefaultConsumer printer = new DefaultConsumer(channel){
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{
        String text = new String(body, StandardCharsets.UTF_8);
        String line = Thread.currentThread() + " " + text;
        System.out.println("\033[37;7m" + line + "\033[0m");
      }
    };
    channel.basicConsume(temporaryQueue, true, printer);
  }
}

 

Exchange Direct 通过routing key将消息路由到绑定了该key的queue:

provider:

package io.veer.direct;


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import io.veer.util.RabbitMQUtil;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Objects;

/**
 * Direct-exchange publisher: declares the exchange {@code log_direct} and publishes one
 * message with the routing key {@code error}.
 */
public class Provider{
  /**
   * Publishes the message and releases the connection and channel afterwards.
   *
   * @param args unused
   * @throws IOException if a broker operation fails
   */
  public static void main(String[] args) throws IOException{
    Connection connection = Objects.requireNonNull(RabbitMQUtil.getConnection());
    Channel channel = connection.createChannel();
    try{
      channel.exchangeDeclare("log_direct", "direct");

      // Only queues bound with this routing key receive the message.
      String routingKey = "error";
      String body = String.format("routingKey: %s, %s", routingKey, Thread.currentThread().getName());
      channel.basicPublish("log_direct", routingKey, null, body.getBytes(StandardCharsets.UTF_8));
    }finally{
      // Run the cleanup even when declaring or publishing fails, so the failure does not skip it.
      RabbitMQUtil.closeConnectionAndChannel(connection, channel);
    }
  }
}

consumer

package io.veer.direct;


import com.rabbitmq.client.*;
import io.veer.util.RabbitMQUtil;

import java.io.IOException;
import java.util.Objects;

/**
 * Direct-exchange consumer whose temporary queue is bound to {@code log_direct} with the
 * routing key {@code info} only.
 */
public class Consumer1{
  /**
   * Registers the auto-acknowledging consumer; the connection stays open so that deliveries
   * keep arriving.
   *
   * @param args unused
   * @throws IOException if a broker operation fails
   */
  public static void main(String[] args) throws IOException{
    Connection connection = RabbitMQUtil.getConnection();
    Channel channel = Objects.requireNonNull(connection).createChannel();

    String exchangeName = "log_direct";

    channel.exchangeDeclare(exchangeName, "direct");

    // Temporary queue; its name is returned by the declare call.
    String queue = channel.queueDeclare().getQueue();
    channel.queueBind(queue, exchangeName, "info");

    channel.basicConsume(queue, true, new DefaultConsumer(channel){
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{
        // The producer encodes with UTF-8, so decode with UTF-8 rather than the platform default.
        System.out.println("\033[37;7m" + String.format("Consumer1 %s", new String(body, java.nio.charset.StandardCharsets.UTF_8)) + "\033[0m");
      }
    });
  }
}
package io.veer.direct;


import com.rabbitmq.client.*;
import io.veer.util.RabbitMQUtil;

import java.io.IOException;
import java.util.Objects;

/**
 * Direct-exchange consumer whose temporary queue is bound to {@code log_direct} with both
 * routing keys {@code info} and {@code error}.
 */
public class Consumer2{
  /**
   * Registers the auto-acknowledging consumer; the connection stays open so that deliveries
   * keep arriving.
   *
   * @param args unused
   * @throws IOException if a broker operation fails
   */
  public static void main(String[] args) throws IOException{
    Connection connection = RabbitMQUtil.getConnection();
    Channel channel = Objects.requireNonNull(connection).createChannel();

    String exchangeName = "log_direct";
    channel.exchangeDeclare(exchangeName, "direct");

    // Temporary queue; its name is returned by the declare call.
    String queue = channel.queueDeclare().getQueue();

    // One queue may be bound with several routing keys.
    channel.queueBind(queue, exchangeName, "info");
    channel.queueBind(queue, exchangeName, "error");

    channel.basicConsume(queue, true, new DefaultConsumer(channel){
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{
        // The producer encodes with UTF-8, so decode with UTF-8 rather than the platform default.
        System.out.println("\033[37;7m" + String.format("Consumer2 %s", new String(body, java.nio.charset.StandardCharsets.UTF_8)) + "\033[0m");
      }
    });
  }
}

 

Exchange Topic routing key使用通配符 * #:

provider

package io.veer.topic;


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import io.veer.util.RabbitMQUtil;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Objects;

/**
 * Topic-exchange publisher: declares the exchange {@code topic} and publishes one message
 * with the routing key {@code user.insert.ruzz}.
 */
public class Provider{
  /**
   * Publishes the message and releases the connection and channel afterwards.
   *
   * @param args unused
   * @throws IOException if a broker operation fails
   */
  public static void main(String[] args) throws IOException{
    Connection connection = Objects.requireNonNull(RabbitMQUtil.getConnection());
    Channel channel = connection.createChannel();

    try{
      String exchangeName = "topic";
      channel.exchangeDeclare(exchangeName, "topic");

      // Three dot-separated words: per RabbitMQ topic matching, "*" stands for exactly one word
      // and "#" for zero or more, so a "user.#" binding matches this key while "user.*" does not.
      String routingKey = "user.insert.ruzz";

      channel.basicPublish(exchangeName, routingKey, null, String.format("Provider routingKey: %s", routingKey).getBytes(StandardCharsets.UTF_8));
    }finally{
      // Run the cleanup even when declaring or publishing fails, so the failure does not skip it.
      RabbitMQUtil.closeConnectionAndChannel(connection, channel);
    }
  }
}

Consumer

package io.veer.topic;


import com.rabbitmq.client.*;
import io.veer.util.RabbitMQUtil;

import java.io.IOException;
import java.util.Objects;

/**
 * Topic-exchange consumer whose temporary queue is bound to the exchange {@code topic} with
 * the pattern {@code user.*}.
 */
public class Consumer1{
  /**
   * Registers the auto-acknowledging consumer; the connection stays open so that deliveries
   * keep arriving.
   *
   * @param args unused
   * @throws IOException if a broker operation fails
   */
  public static void main(String[] args) throws IOException{
    Connection connection = RabbitMQUtil.getConnection();
    Channel channel = Objects.requireNonNull(connection).createChannel();

    String exchangeName = "topic";
    channel.exchangeDeclare(exchangeName, "topic");

    // Temporary queue; its name is returned by the declare call.
    String queue = channel.queueDeclare().getQueue();
    channel.queueBind(queue, exchangeName, "user.*");
    channel.basicConsume(queue, true, new DefaultConsumer(channel){
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{
        // The producer encodes with UTF-8, so decode with UTF-8 rather than the platform default.
        System.out.println("\033[37;7m" + new String(body, java.nio.charset.StandardCharsets.UTF_8) + "\033[0m");
      }
    });
  }
}
package io.veer.topic;


import com.rabbitmq.client.*;
import io.veer.util.RabbitMQUtil;

import java.io.IOException;
import java.util.Objects;

/**
 * Topic-exchange consumer whose temporary queue is bound to the exchange {@code topic} with
 * the pattern {@code user.#}.
 */
public class Consumer2{
  /**
   * Registers the auto-acknowledging consumer; the connection stays open so that deliveries
   * keep arriving.
   *
   * @param args unused
   * @throws IOException if a broker operation fails
   */
  public static void main(String[] args) throws IOException{
    Connection connection = RabbitMQUtil.getConnection();
    Channel channel = Objects.requireNonNull(connection).createChannel();

    String exchangeName = "topic";
    channel.exchangeDeclare(exchangeName, "topic");

    // Temporary queue; its name is returned by the declare call.
    String queue = channel.queueDeclare().getQueue();

    channel.queueBind(queue, exchangeName, "user.#");

    channel.basicConsume(queue, true, new DefaultConsumer(channel){
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{
        // The producer encodes with UTF-8, so decode with UTF-8 rather than the platform default.
        System.out.println("\033[37;7m" + new String(body, java.nio.charset.StandardCharsets.UTF_8) + "\033[0m");
      }
    });
  }
}

 

 

Springboot:

依赖

<!-- Spring AMQP starter. No <version> is given here; presumably it is managed by the Spring Boot parent/BOM - confirm. -->
<dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置

# Spring Boot settings: application name and RabbitMQ connection parameters.
spring:
  application:
    name: rabbitmq
  rabbitmq:
    host: 192.168.8.105
    port: 5672
    # NOTE(review): the plain-Java examples use vhost "venal" with the guest account; confirm that
    # the "veil" vhost and user below exist on the broker.
    virtual-host: veil
    username: veil
    password: veil

 

测试类:

package io.veer.rabbit;


import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;

/**
 * Publishing side of the Spring Boot examples: each test sends messages through the injected
 * {@link RabbitTemplate} to one of the queues or exchanges the listener classes consume from.
 */
@SpringBootTest(classes = RabbitApplication.class)
public class TestRabbitMQ{
  @Resource
  private RabbitTemplate rabbitTemplate;

  /** Sends one message with the routing key {@code tun}. */
  @Test
  public void tun(){
    rabbitTemplate.convertAndSend("tun", "tun");
  }

  /** Sends ten numbered messages with the routing key {@code worker}. */
  @Test
  public void worker(){
    int sequence = 0;
    while(sequence < 10){
      String payload = "worker " + sequence;
      rabbitTemplate.convertAndSend("worker", payload);
      sequence++;
    }
  }

  /** Sends one message to the exchange {@code fanout} with an empty routing key. */
  @Test
  public void fanout(){
    final String exchange = "fanout";
    final String routingKey = "";
    rabbitTemplate.convertAndSend(exchange, routingKey, "fanout");
  }

  /** Sends one message to the exchange {@code direct} with the routing key {@code error}. */
  @Test
  public void direct(){
    final String exchange = "direct";
    final String routingKey = "error";
    // NOTE(review): the payload reads "redict" - possibly a typo for "direct"; kept as is.
    rabbitTemplate.convertAndSend(exchange, routingKey, "redict");
  }

  /** Sends one message to the exchange {@code topic} with the routing key {@code user.save.insert}. */
  @Test
  public void topic(){
    final String exchange = "topic";
    final String routingKey = "user.save.insert";
    rabbitTemplate.convertAndSend(exchange, routingKey, "topic");
  }
}

 

consumer类:

package io.veer.rabbit.consumer;


import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Listener for the queue {@code tun}, declared non-durable and auto-delete; prints every
 * received message in reverse video.
 */
@Component
@RabbitListener(queuesToDeclare = @Queue(value = "tun", durable = "false", autoDelete = "true"))
public class Tun{
  private static final String HIGHLIGHT_ON = "\033[37;7m";
  private static final String HIGHLIGHT_OFF = "\033[0m";

  /**
   * Prints the received message.
   *
   * @param message message payload
   */
  @RabbitHandler
  public void tun(String message){
    String highlighted = HIGHLIGHT_ON + message + HIGHLIGHT_OFF;
    System.out.println(highlighted);
  }
}
package io.veer.rabbit.consumer;


import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Two listeners on the same queue {@code worker}; each prints the messages it receives with
 * its own prefix.
 */
@Component
public class Worker{
  private static final String HIGHLIGHT_ON = "\033[37;7m";
  private static final String HIGHLIGHT_OFF = "\033[0m";

  /**
   * First listener. The annotation sits directly on the method, which replaces the
   * class-level listener plus {@code @RabbitHandler} combination.
   *
   * @param message message payload
   */
  @RabbitListener(queuesToDeclare = @Queue("worker"))
  public void worker1(String message){
    print("worker1: ", message);
  }

  /**
   * Second listener on the same queue.
   *
   * @param message message payload
   */
  @RabbitListener(queuesToDeclare = @Queue("worker"))
  public void worker2(String message){
    print("worker2: ", message);
  }

  /** Prints the prefix and message in reverse video. */
  private static void print(String prefix, String message){
    System.out.println(HIGHLIGHT_ON + prefix + message + HIGHLIGHT_OFF);
  }
}
package io.veer.rabbit.consumer;


import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Two listeners, each with its own queue bound to the fanout exchange {@code fanout}; both
 * print the messages they receive.
 */
@Component
public class Fanout{
  private static final String HIGHLIGHT_ON = "\033[37;7m";
  private static final String HIGHLIGHT_OFF = "\033[0m";

  /**
   * First listener on the fanout exchange.
   *
   * @param message message payload
   */
  @RabbitListener(bindings = @QueueBinding(value = @Queue, exchange = @Exchange(value = "fanout", type = "fanout")))
  public void fanout1(String message){
    print(message);
  }

  /**
   * Second listener on the fanout exchange.
   *
   * @param message message payload
   */
  @RabbitListener(bindings = @QueueBinding(value = @Queue, exchange = @Exchange(value = "fanout", type = "fanout")))
  public void fanout2(String message){
    print(message);
  }

  /** Prints the message in reverse video. */
  private static void print(String message){
    System.out.println(HIGHLIGHT_ON + message + HIGHLIGHT_OFF);
  }
}
package io.veer.rabbit.consumer;


import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Direct{
  @RabbitListener(bindings = {@QueueBinding(value = @Queue, exchange = @Exchange(value = "direct", type = "direct"), key = {"info"})})
  public void direct1(String message){
    System.out.println("\033[37;7m" + message + "\033[0m");
  }

  @RabbitListener(bindings = {@QueueBinding(value = @Queue, exchange = @Exchange(value = "direct", type = "direct"), key = {"info", "error"})})
  public void direct2(String message){
    System.out.println("\033[37;7m" + message + "\033[0m");
  }
}
package io.veer.rabbit.consumer;


import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Two listeners on the topic exchange {@code topic}: the first is bound with the pattern
 * {@code user.*}, the second with {@code user.#}.
 */
@Component
public class Topic{
  private static final String HIGHLIGHT_ON = "\033[37;7m";
  private static final String HIGHLIGHT_OFF = "\033[0m";

  /**
   * Listener bound with the pattern {@code user.*}.
   *
   * @param msg message payload
   */
  @RabbitListener(bindings = @QueueBinding(value = @Queue, exchange = @Exchange(value = "topic", type = "topic"), key = "user.*"))
  public void topic1(String msg){
    print(msg);
  }

  /**
   * Listener bound with the pattern {@code user.#}.
   *
   * @param msg message payload
   */
  @RabbitListener(bindings = @QueueBinding(value = @Queue, exchange = @Exchange(value = "topic", type = "topic"), key = "user.#"))
  public void topic2(String msg){
    print(msg);
  }

  /** Prints the message in reverse video. */
  private static void print(String msg){
    System.out.println(HIGHLIGHT_ON + msg + HIGHLIGHT_OFF);
  }
}

 

标签:code,Java,String,public,IOException,io,import,example,channel
来源: https://www.cnblogs.com/dissipate/p/16248492.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有