C#:ActiveMQ(九)基于Advisory Message实现监控

同系列文章:

程序界面:
ActiveMQ-Advisory

源代码
FormAdvisory.cs

/* ----------------------------------------------------------
 * 文件名称:FormAdvisory.cs
 * 
 * 作者:秦建辉
 * 
 * QQ:36748897 
 * 
 * 博客:http://www.firstsolver.com/wordpress/
 * 
 * 开发环境:
 *      Visual Studio V2015
 *      .NET Framework 4.5.2
 *  
 *  版本历史:
 *      V1.0    2016年06月14日
 *              基于Advisory Message实现监控
 *              
 * 参考资料:
 *      http://activemq.apache.org/advisory-message.html
 ------------------------------------------------------------ */
using Apache.NMS;
using Apache.NMS.ActiveMQ;
using Apache.NMS.ActiveMQ.Commands;
using System;
using System.Windows.Forms;

namespace Splash
{
    /// <summary>
    /// Form that subscribes to the ActiveMQ destination advisory topics and lists the
    /// queues, topics, temporary queues and temporary topics announced on them.
    /// </summary>
    public partial class FormAdvisory : Form
    {
        // Names of the advisory topics this form listens to.
        public const string QUEUE_ADVISORY_DESTINATION = "ActiveMQ.Advisory.Queue";
        public const string TOPIC_ADVISORY_DESTINATION = "ActiveMQ.Advisory.Topic";
        public const string TEMPQUEUE_ADVISORY_DESTINATION = "ActiveMQ.Advisory.TempQueue";
        public const string TEMPTOPIC_ADVISORY_DESTINATION = "ActiveMQ.Advisory.TempTopic";

        // Comma-separated list of the four advisory topics above; it is passed to
        // Session.GetTopic as a single name so that one consumer receives all of them.
        public const string ALLDEST_ADVISORY_DESTINATION = QUEUE_ADVISORY_DESTINATION + "," + TOPIC_ADVISORY_DESTINATION + "," + TEMPQUEUE_ADVISORY_DESTINATION + "," + TEMPTOPIC_ADVISORY_DESTINATION;

        // True while the advisory consumer is set up and listening.
        private bool IsServerRunning = false;
        private IConnection Connection = null;
        private ISession Session = null;
        private IMessageConsumer Consumer = null;

        /// <summary>
        /// Creates the form and its designer-generated controls.
        /// </summary>
        public FormAdvisory()
        {
            InitializeComponent();
        }

        /// <summary>
        /// Clears all four destination list text boxes.
        /// </summary>
        private void buttonClearAll_Click(object sender, EventArgs e)
        {
            textBoxTopic.Clear();
            textBoxQueue.Clear();
            textBoxTempTopic.Clear();
            textBoxTempQueue.Clear();
        }

        /// <summary>
        /// Toggles the advisory listener: stops it when it is running, otherwise
        /// connects to the broker URL in <c>textBoxURL</c> and starts listening.
        /// Any failure while starting is shown in a message box.
        /// </summary>
        private void buttonStart_Click(object sender, EventArgs e)
        {
            if (IsServerRunning)
            {
                StopListener();
                return;
            }

            try
            {
                // Create the connection factory for the configured broker URL.
                IConnectionFactory Factory = new ConnectionFactory(textBoxURL.Text);
                Connection = Factory.CreateConnection();
                Connection.Start(); // start message delivery
                Session = Connection.CreateSession();
                Consumer = Session.CreateConsumer(Session.GetTopic(ALLDEST_ADVISORY_DESTINATION));
                Consumer.Listener += OnMessageReceived;
            }
            catch (Exception ex)
            {
                // A failure part-way through would otherwise leave an open connection
                // behind that nothing ever closes. Release whatever was created; a
                // secondary cleanup error is ignored because the original failure is
                // the one reported to the user.
                try
                {
                    StopListener();
                }
                catch (Exception)
                {
                }

                MessageBox.Show(ex.Message, "错误", MessageBoxButtons.OK, MessageBoxIcon.Error);
                return;
            }

            IsServerRunning = true;
            buttonStart.Text = "Stop Listener";
        }

        /// <summary>
        /// Closes the consumer, session and connection (whichever exist) and resets
        /// the form to the stopped state.
        /// </summary>
        /// <remarks>
        /// The fields are cleared before anything is closed, so the form state stays
        /// consistent even if a close call throws. The connection is closed in a
        /// <c>finally</c> block so it is released even when closing the consumer or
        /// session fails. Exceptions from the close calls still propagate to the caller.
        /// </remarks>
        private void StopListener()
        {
            IMessageConsumer consumer = Consumer;
            ISession session = Session;
            IConnection connection = Connection;

            Consumer = null;
            Session = null;
            Connection = null;
            IsServerRunning = false;
            buttonStart.Text = "Start Listener";

            try
            {
                if (consumer != null)
                {
                    consumer.Close();
                }

                if (session != null)
                {
                    session.Close();
                }
            }
            finally
            {
                if (connection != null)
                {
                    try
                    {
                        connection.Stop();  // stop message delivery
                    }
                    finally
                    {
                        connection.Close(); // close the connection
                    }
                }
            }
        }

        /// <summary>
        /// Consumer listener callback. Forwards the message to the UI thread, where
        /// <see cref="AppendDestination"/> updates the text boxes.
        /// </summary>
        /// <param name="message">The advisory message delivered by the consumer.</param>
        private void OnMessageReceived(IMessage message)
        {
            // BeginInvoke requires a live window handle; skip messages that arrive
            // before the handle exists or after the form has been disposed.
            if (IsDisposed || !IsHandleCreated)
            {
                return;
            }

            try
            {
                BeginInvoke(new Action<IMessage>(AppendDestination), message);
            }
            catch (InvalidOperationException)
            {
                // The form was closed between the check above and the call
                // (ObjectDisposedException derives from InvalidOperationException).
                // There is nothing left to update, so the message is dropped.
            }
        }

        /// <summary>
        /// Appends the destination carried by an advisory message to the matching
        /// text box. Messages that are not <see cref="ActiveMQMessage"/> instances or
        /// that do not carry a <see cref="DestinationInfo"/> are ignored.
        /// </summary>
        /// <param name="msg">The message received from the advisory consumer.</param>
        private void AppendDestination(IMessage msg)
        {
            if (IsDisposed)
            {
                return;
            }

            ActiveMQMessage amqMsg = msg as ActiveMQMessage;
            if (amqMsg == null)
            {
                return;
            }

            DestinationInfo info = amqMsg.DataStructure as DestinationInfo;
            if (info == null || info.Destination == null)
            {
                return;
            }

            // Pick the text box from the destination's temporary/topic flags.
            TextBox target;
            if (info.Destination.IsTemporary)
            {
                target = info.Destination.IsTopic ? textBoxTempTopic : textBoxTempQueue;
            }
            else
            {
                target = info.Destination.IsTopic ? textBoxTopic : textBoxQueue;
            }

            target.AppendText(info.Destination.ToString() + "\r\n");
        }

        /// <summary>
        /// Stops the listener, if it is running, when the form is closing.
        /// </summary>
        private void FormAdvisory_FormClosing(object sender, FormClosingEventArgs e)
        {
            if (IsServerRunning)
            {
                StopListener();
            }
        }
    }
}

Comments are closed.