發布訂閱和點對點的代碼基本相同,只是修改一下目的地,myQueue改為myTopic,以及在創建目的地的時候,將createQueue改為createTopic。
1、在com.bjpowernode.activemq.send包下編寫一個消息發布者TopicPublisher發送消息
package com.bjpowernode.activemq.send;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class TopicPublisher {
public static final String BROKER_URL = "tcp://192.168.235.128:61616";
//相當于一個數據庫
public static final String DESTINATION = "myTopic";
public static void main(String[] args) {
sendMessage();
}
public static void sendMessage(){
//1 .創建一個連接工廠
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
Connection connection = null;
Session session = null;
MessageProducer messageProducer = null;
try {
//2. 獲取一個連接
connection = connectionFactory.createConnection();
//3. 創建一個Session 第一個參數:是否是事務消息 第二個參數:消息確認機制(自動確認還是手動確認)
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
//4. 有了session之后,就可以創建消息,目的地,生產者和消費者
Message message = session.createTextMessage("Hello ActiveMQ");
//目的地
Destination destination = session.createTopic(DESTINATION);
//生產者
messageProducer = session.createProducer(destination);
//發消息 沒有返回值,是非阻塞的
messageProducer.send(message);
} catch (JMSException e) {
e.printStackTrace();
}finally{
try {
if(messageProducer != null){
messageProducer.close();
}
if(session != null){
session.close();
}
if(connection != null){
connection.close();
}
}catch (JMSException e) {
e.printStackTrace();
}
}
}
}
2、在com.bjpowernode.activemq.receive包下編寫一個消息訂閱者TopicSubcriber接收消息
package com.bjpowernode.activemq.receive;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class TopicSubcriber {
public static final String BROKER_URL = "tcp://192.168.235.128:61616";
//相當于一個數據庫(其實是一個隊列)
public static final String DESTINATION = "myTopic";
public static void main(String[] args) {
receiveMessage();
}
public static void receiveMessage(){
//1 .創建一個連接工廠
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
Connection connection = null;
Session session = null;
MessageConsumer messageConsumer = null;
try {
//2. 獲取一個連接
connection = connectionFactory.createConnection();
//接收消息,需要將連接啟動一下,才可以接收到消息
connection.start();
//3. 創建一個Session 第一個參數:是否是事務消息 第二個參數:消息確認機制(自動確認還是手動確認)
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
//4. 有了session之后,就可以創建消息,目的地,生產者和消費者
//目的地
Destination destination = session.createTopic(DESTINATION);
//消費者
messageConsumer = session.createConsumer(destination);
//循環接收消息
while (true){
//接收消息 有返回值,是阻塞的
Message message = messageConsumer.receive();
//判斷消息類型
if(message instanceof TextMessage){
String text = ((TextMessage) message).getText();
System.out.println(text);
}
}
} catch (JMSException e) {
e.printStackTrace();
}finally{
try {
if(messageConsumer != null){
messageConsumer.close();
}
if(session != null){
session.close();
}
if(connection != null){
connection.close();
}
}catch (JMSException e) {
e.printStackTrace();
}
}
}
}
3、消息訂閱者先運行,然后再運行消息發布者