同系列文章:
源代码
FormSubscriber.cs
/* ---------------------------------------------------------- * 文件名称:FormSubscriber.cs * * 作者:秦建辉 * * QQ:36748897 * * 博客:http://www.firstsolver.com/wordpress/ * * 开发环境: * Visual Studio V2015 * .NET Framework 4.5.2 * * 版本历史: * V1.0 2016年06月01日 * 基于ActiveMQ实现发布/订阅消息传递域上的订阅者 * * 消息接收流程: * 1.创建连接使用的工厂类IConnectionFactory * 2.使用管理对象IConnectionFactory建立连接IConnection,并启动 * 3.使用连接IConnection建立会话ISession * 4.使用会话ISession和管理对象IDestination创建消息接收者IMessageConsumer * 5.实现消息接收者IMessageConsumer的Listener事件处理方法 ------------------------------------------------------------ */ using Apache.NMS; using Apache.NMS.ActiveMQ; using System; using System.Windows.Forms; namespace Com.FirstSolver { public partial class FormSubscriber : Form { private const string DESTINATION = "Com.FirstSolver"; private bool IsServerRunning = false; private IConnection Connection = null; private ISession Session = null; private IMessageConsumer Consumer = null; public FormSubscriber() { InitializeComponent(); } // 清空 private void buttonClearAll_Click(object sender, EventArgs e) { textBoxMessage.Clear(); } // 启动/停止消息侦听 private void buttonStart_Click(object sender, EventArgs e) { if (IsServerRunning) { if (Consumer != null) { Consumer.Close(); Consumer = null; } if (Session != null) { Session.Close(); Session = null; } if (Connection != null) { Connection.Stop(); // 停止侦听 Connection.Close(); // 关闭连接 Connection = null; } IsServerRunning = false; buttonStart.Text = "Start Listener"; } else { try { // 创建公共消息连接工厂 IConnectionFactory Factory = new ConnectionFactory(textBoxURL.Text); Connection = Factory.CreateConnection(); Connection.ClientId = "Com.FirstSolver.ActiveMQ.Listener"; Connection.Start(); // 开启侦听 Session = Connection.CreateSession(); Consumer = Session.CreateDurableConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic(DESTINATION), "Customer", "filter='demo'", false); Consumer.Listener += OnMessageReceived; } catch (Exception ex) { MessageBox.Show(ex.Message, "错误", MessageBoxButtons.OK, MessageBoxIcon.Error); return; } IsServerRunning = true; buttonStart.Text = "Stop Listener"; } } private void OnMessageReceived(IMessage message) { if (message is ITextMessage) { this.BeginInvoke(new Action<string>((msg) => { textBoxMessage.AppendText(msg + "\r\n"); }), ((ITextMessage)message).Text); } } private void FormSubscriber_FormClosing(object sender, FormClosingEventArgs e) { if (IsServerRunning) { if (Consumer != null) { Consumer.Close(); Consumer = null; } if (Session != null) { Session.Close(); Session = null; } if (Connection != null) { Connection.Stop(); // 停止侦听 Connection.Close(); // 关闭连接 Connection = null; } IsServerRunning = false; } } } }