C#:ActiveMQ(五)发布/订阅方式之订阅者

同系列文章:

程序界面:
ActiveMQ-Subscriber

源代码
FormSubscriber.cs

/* ----------------------------------------------------------
 * 文件名称:FormSubscriber.cs
 * 
 * 作者:秦建辉
 * 
 * QQ:36748897 
 * 
 * 博客:http://www.firstsolver.com/wordpress/
 * 
 * 开发环境:
 *      Visual Studio V2015
 *      .NET Framework 4.5.2
 *  
 *  版本历史:
 *      V1.0    2016年06月01日
 *              基于ActiveMQ实现发布/订阅消息传递域上的订阅者
 *                  
 * 消息接收流程:
 *      1.创建连接使用的工厂类IConnectionFactory
 *      2.使用管理对象IConnectionFactory建立连接IConnection,并启动
 *      3.使用连接IConnection建立会话ISession
 *      4.使用会话ISession和管理对象IDestination创建消息接收者IMessageConsumer
 *      5.实现消息接收者IMessageConsumer的Listener事件处理方法
 ------------------------------------------------------------ */
using Apache.NMS;
using Apache.NMS.ActiveMQ;
using System;
using System.Windows.Forms;

namespace Com.FirstSolver
{
    /// <summary>
    /// Form that subscribes to an ActiveMQ topic through a durable consumer and
    /// appends the text of every received <see cref="ITextMessage"/> to a text box.
    /// </summary>
    /// <remarks>
    /// The connection, session and consumer are created when the listener is started
    /// and released together by <see cref="StopListener"/>, which is used by the
    /// stop button, by the failed-start path and by the form-closing handler.
    /// </remarks>
    public partial class FormSubscriber : Form
    {
        // Name of the topic the durable consumer subscribes to.
        private const string DESTINATION = "Com.FirstSolver";

        // True only while connection, session and consumer are all up.
        private bool IsServerRunning = false;
        private IConnection Connection = null;
        private ISession Session = null;
        private IMessageConsumer Consumer = null;

        /// <summary>
        /// Creates the form and its designer-generated controls.
        /// </summary>
        public FormSubscriber()
        {
            InitializeComponent();
        }

        /// <summary>
        /// Clears the received-message text box.
        /// </summary>
        private void buttonClearAll_Click(object sender, EventArgs e)
        {
            textBoxMessage.Clear();
        }

        /// <summary>
        /// Toggles the listener: stops it when it is running, otherwise connects to
        /// the broker URL typed in <c>textBoxURL</c> and starts a durable consumer.
        /// </summary>
        /// <remarks>
        /// On a failed start every partially created NMS object is released before
        /// the error is shown, so a later attempt starts from a clean state.
        /// </remarks>
        private void buttonStart_Click(object sender, EventArgs e)
        {
            if (IsServerRunning)
            {
                try
                {
                    StopListener();
                }
                finally
                {
                    // StopListener always resets the state, even when a Close() call
                    // throws, so the caption must follow it in every case.
                    buttonStart.Text = "Start Listener";
                }

                return;
            }

            try
            {
                // Create the connection factory for the configured broker.
                IConnectionFactory Factory = new ConnectionFactory(textBoxURL.Text);
                Connection = Factory.CreateConnection();

                Connection.ClientId = "Com.FirstSolver.ActiveMQ.Listener";
                Connection.Start(); // begin message delivery

                Session = Connection.CreateSession();
                Consumer = Session.CreateDurableConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic(DESTINATION), "Customer", "filter='demo'", false);
                Consumer.Listener += OnMessageReceived;
            }
            catch (Exception ex)
            {
                // Any step above may have succeeded before the failure; release what
                // exists so the connection is not leaked or silently overwritten by
                // the next start attempt.
                try
                {
                    StopListener();
                }
                catch (Exception cleanupError)
                {
                    // Do not let a cleanup failure hide the original start error.
                    System.Diagnostics.Trace.TraceWarning("Cleanup after failed start also failed: {0}", cleanupError);
                }

                MessageBox.Show(ex.Message, "错误", MessageBoxButtons.OK, MessageBoxIcon.Error);
                return;
            }

            IsServerRunning = true;
            buttonStart.Text = "Stop Listener";
        }

        /// <summary>
        /// Listener callback for the consumer. Forwards the text of text messages to
        /// the UI thread; every other message type is ignored.
        /// </summary>
        /// <param name="message">The message delivered by the consumer.</param>
        private void OnMessageReceived(IMessage message)
        {
            ITextMessage textMessage = message as ITextMessage;
            if (textMessage == null)
            {
                return;
            }

            string text = textMessage.Text;

            // BeginInvoke requires a live window handle; a message can still arrive
            // while the form is shutting down.
            if (IsDisposed || !IsHandleCreated)
            {
                return;
            }

            try
            {
                this.BeginInvoke(new Action<string>((msg) => { textBoxMessage.AppendText(msg + "\r\n"); }), text);
            }
            catch (InvalidOperationException)
            {
                // The handle was destroyed between the check above and the call
                // (ObjectDisposedException derives from InvalidOperationException).
                // The form is going away, so the message is intentionally dropped.
            }
        }

        /// <summary>
        /// Releases the NMS resources when the form is closing.
        /// </summary>
        private void FormSubscriber_FormClosing(object sender, FormClosingEventArgs e)
        {
            // Safe to call when nothing is running: every resource is null-checked.
            StopListener();
        }

        /// <summary>
        /// Closes consumer, session and connection (in that order) and resets the
        /// listener state. Safe to call when some or all of them were never created.
        /// </summary>
        /// <remarks>
        /// The fields are cleared before anything is closed and the close calls are
        /// chained with <c>finally</c>, so a failure in one of them neither skips the
        /// remaining resources nor leaves stale references behind. The exception
        /// itself still propagates to the caller.
        /// </remarks>
        private void StopListener()
        {
            IMessageConsumer consumer = Consumer;
            ISession session = Session;
            IConnection connection = Connection;

            Consumer = null;
            Session = null;
            Connection = null;
            IsServerRunning = false;

            try
            {
                if (consumer != null)
                {
                    consumer.Close();
                }
            }
            finally
            {
                try
                {
                    if (session != null)
                    {
                        session.Close();
                    }
                }
                finally
                {
                    if (connection != null)
                    {
                        try
                        {
                            connection.Stop();  // stop message delivery
                        }
                        finally
                        {
                            connection.Close(); // close the connection
                        }
                    }
                }
            }
        }
    }
}

Comments are closed.