这个是监听器
publicstaticvoidmain(String[]args)throwsJMSException{
ConnectionFactoryfactory=newActiveMQConnectionFactory("manage","123456","tcp://localhost:61616");
Connectionconnection=factory.createConnection();
connection.start();
finalSessionsession=connection.createSession(false/*支持事务*/,Session.AUTO_ACKNOWLEDGE);
Destinationqueue=AdvisorySupport.getProducerAdvisoryTopic(session.createTopic(">"));
MessageConsumerconsumer=session.createConsumer(queue);
consumer.setMessageListener(newMessageListener(){
publicvoidonMessage(Messagemessage){
System.out.println(message);
}
});
}
activemq.xml也配置了
"advisoryForConsumed="true">
随便一个订阅端
publicstaticvoidmain(String[]args)throwsJMSException,IOException{
//创建连接工厂
ConnectionFactoryconnectionFactory=newActiveMQConnectionFactory("root","123456","tcp://127.0.0.1:61616");
//创建连接
Connectionconnection=connectionFactory.createConnection();
//启动
connection.start();
//创建session对象false表示是否自动提交;
Sessionsession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//创建队列des的子类!
Topictopic=session.createTopic("test");
//创建消费者
MessageConsumermessageConsumer=session.createConsumer(topic);
messageConsumer.setMessageListener(newMessageListener(){
publicvoidonMessage(Messagemessage){
TextMessagetextMessage=(TextMessage)message;
try{
System.out.println("PUB:"+textMessage.getText());
}catch(JMSExceptione){
e.printStackTrace();
}
}
});
System.in.read();
//关闭资源
messageConsumer.close();
session.close();
connection.close();
}
这个订阅端建立时候监听器无法监听到SUB的连接信息
分 -->
|