Java MQ使用指南:从基础到实践
MQ概述与核心概念
消息队列(Message Queue,简称MQ)是一种应用间的通信方式,通过异步消息传递实现系统解耦、流量削峰和最终一致性,在Java生态中,主流MQ产品包括RabbitMQ、RocketMQ、Kafka及ActiveMQ等,它们各自适用于不同场景:RabbitMQ擅长高并发小消息,RocketMQ支持事务消息,Kafka适合大数据流处理,ActiveMQ则注重通用性。

使用MQ的核心价值在于:
- 解耦:生产者与消费者无需直接通信,降低系统依赖。
- 异步:非阻塞式消息处理,提升系统吞吐量。
- 削峰:突发流量暂存于队列,避免系统过载。
- 可靠:持久化机制确保消息不丢失。
主流MQ产品对比与选型
| 特性 | RabbitMQ | RocketMQ | Kafka | ActiveMQ |
|---|---|---|---|---|
| 协议支持 | AMQP, MQTT, STOMP | 自定义协议 | 自定义协议 | OpenWire, STOMP |
| 吞吐量 | 中(万级/秒) | 高(十万级/秒) | 极高(百万级/秒) | 中(万级/秒) |
| 消息延迟 | 毫秒级 | 毫秒级 | 毫秒级至秒级 | 毫秒级 |
| 事务消息 | 支持 | 原生支持 | 不支持 | 支持 |
| 适用场景 | 电商、金融 | 分布式事务 | 日志收集、流处理 | 企业应用集成 |
选型建议:
- 高并发、强事务场景优先选RocketMQ;
- 大数据流处理选Kafka;
- 需要灵活协议支持选RabbitMQ。
RabbitMQ的Java实践
RabbitMQ基于AMQP协议,通过Exchange(交换器)和Queue(队列)路由消息,以下是关键步骤:

环境准备
引入Maven依赖:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.14.2</version>
</dependency>
生产者发送消息
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class RabbitMQProducer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明队列(持久化、非独占、自动删除)
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
String message = "Hello RabbitMQ!";
// 发送消息(持久化)
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
}
}
消费者接收消息
import com.rabbitmq.client.*;
public class RabbitMQConsumer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明队列(确保存在)
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 消费消息(自动确认)
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
高级特性
- 消息持久化:设置
deliveryMode=2,结合队列持久化。 - 消息确认:消费者手动确认(
channel.basicAck)避免消息丢失。 - 发布订阅:通过
Exchange(如fanout、topic)实现多播路由。
RocketMQ的Java实践
RocketMQ是阿里巴巴开源的分布式消息中间件,支持事务消息和顺序消息。
依赖引入
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>5.1.0</version>
</dependency>
发送同步消息
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class RocketMQProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
producer.shutdown();
}
}
消费消息
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class RocketMQConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("收到消息:" + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
核心特性
- 事务消息:通过
TransactionMQProducer实现本地事务与消息发送的原子性。 - 顺序消费:设置
MessageListenerOrderly保证消息有序性。 - 消息过滤:通过SQL92或Tag表达式过滤消息。
MQ使用最佳实践
- 消息可靠性:
- 生产者:开启消息持久化,使用同步发送或事务消息。
- 消费者:关闭自动确认,改为手动确认,结合重试机制。
- 性能优化:
- 批量发送消息减少网络开销。
- 合理设置队列分区数(如Kafka的
partition)提升并行度。
- 异常处理:
- 捕获
MQClientException等异常,实现重试或降级逻辑。 - 监控消息堆积(如RabbitMQ的
messages_ready),及时扩容。
- 捕获
- 监控与运维:
- 使用Prometheus+Grafana监控MQ指标(如消息延迟、吞吐量)。
- 定期清理过期消息,避免磁盘占用过高。
Java中使用MQ需根据业务场景选型,掌握核心API(如生产者/消费者创建、消息发送/接收),并注重消息可靠性、性能及异常处理,通过合理设计消息路由策略(如RabbitMQ的Exchange类型、RocketMQ的Tag过滤),可充分发挥MQ在分布式系统中的价值,构建高可用、高性能的架构。




















