同系列文章:
源代码
FormAdvisory.cs
/* ----------------------------------------------------------
 * File name: FormAdvisory.cs
 *
 * Author: Qin Jianhui
 *
 * QQ: 36748897
 *
 * Blog: http://www.firstsolver.com/wordpress/
 *
 * Development environment:
 *     Visual Studio V2015
 *     .NET Framework 4.5.2
 *
 * Version history:
 *     V1.0 2016-06-14
 *          Monitoring implemented on top of Advisory Messages
 *
 * References:
 *     http://activemq.apache.org/advisory-message.html
------------------------------------------------------------ */
using Apache.NMS;
using Apache.NMS.ActiveMQ;
using Apache.NMS.ActiveMQ.Commands;
using System;
using System.Windows.Forms;

namespace Splash
{
    /// <summary>
    /// Form that subscribes to the ActiveMQ destination advisory topics and lists
    /// every destination reported by a <see cref="DestinationInfo"/> advisory in
    /// one of four text boxes (topic, queue, temporary topic, temporary queue).
    /// </summary>
    public partial class FormAdvisory : Form
    {
        public const string QUEUE_ADVISORY_DESTINATION = "ActiveMQ.Advisory.Queue";
        public const string TOPIC_ADVISORY_DESTINATION = "ActiveMQ.Advisory.Topic";
        public const string TEMPQUEUE_ADVISORY_DESTINATION = "ActiveMQ.Advisory.TempQueue";
        public const string TEMPTOPIC_ADVISORY_DESTINATION = "ActiveMQ.Advisory.TempTopic";

        /// <summary>Comma-separated list of all four advisory destinations above.</summary>
        public const string ALLDEST_ADVISORY_DESTINATION = QUEUE_ADVISORY_DESTINATION + "," +
            TOPIC_ADVISORY_DESTINATION + "," +
            TEMPQUEUE_ADVISORY_DESTINATION + "," +
            TEMPTOPIC_ADVISORY_DESTINATION;

        // True between a successful start and the next stop.
        private bool IsServerRunning = false;

        // Messaging resources owned by the form while the listener is running.
        private IConnection Connection = null;
        private ISession Session = null;
        private IMessageConsumer Consumer = null;

        public FormAdvisory()
        {
            InitializeComponent();
        }

        /// <summary>Clears all four destination text boxes.</summary>
        private void buttonClearAll_Click(object sender, EventArgs e)
        {
            textBoxTopic.Clear();
            textBoxQueue.Clear();
            textBoxTempTopic.Clear();
            textBoxTempQueue.Clear();
        }

        /// <summary>
        /// Toggles the listener: stops it when it is running, otherwise connects to
        /// the broker URL typed in <c>textBoxURL</c> and subscribes to the advisory topics.
        /// </summary>
        private void buttonStart_Click(object sender, EventArgs e)
        {
            if (IsServerRunning)
            {
                StopListener();
                return;
            }

            try
            {
                // Create the connection factory for the configured broker URL.
                IConnectionFactory factory = new ConnectionFactory(textBoxURL.Text);
                Connection = factory.CreateConnection();
                Connection.Start(); // begin message delivery

                Session = Connection.CreateSession();
                Consumer = Session.CreateConsumer(Session.GetTopic(ALLDEST_ADVISORY_DESTINATION));
                Consumer.Listener += OnMessageReceived;
            }
            catch (Exception ex)
            {
                // Any step above may have failed after earlier ones succeeded; release
                // whatever was created so a half-open connection is not leaked and then
                // overwritten by the next start attempt.
                try
                {
                    ReleaseResources();
                }
                catch (Exception)
                {
                    // The start-up failure below is the error worth reporting; a cleanup
                    // failure on an already broken connection adds nothing for the user.
                }

                MessageBox.Show(ex.Message, "错误", MessageBoxButtons.OK, MessageBoxIcon.Error);
                return;
            }

            IsServerRunning = true;
            buttonStart.Text = "Stop Listener";
        }

        /// <summary>
        /// Listener callback for advisory messages. It is not invoked from a UI event,
        /// so the work is marshalled onto the UI thread with <c>BeginInvoke</c>.
        /// </summary>
        /// <param name="message">The received message.</param>
        private void OnMessageReceived(IMessage message)
        {
            // A message can arrive while the form is closing; without a window handle
            // BeginInvoke would throw.
            if (IsDisposed || !IsHandleCreated)
            {
                return;
            }

            try
            {
                this.BeginInvoke(new Action<IMessage>(AppendDestination), message);
            }
            catch (InvalidOperationException)
            {
                // The handle was destroyed between the check and the call (this also
                // covers ObjectDisposedException); there is no UI left to update.
            }
        }

        /// <summary>
        /// Appends the destination carried by a <see cref="DestinationInfo"/> advisory
        /// to the text box matching its kind. Messages that are not
        /// <see cref="ActiveMQMessage"/> instances, or that carry no
        /// <see cref="DestinationInfo"/>, are ignored.
        /// </summary>
        /// <param name="message">The message handed over by <see cref="OnMessageReceived"/>.</param>
        private void AppendDestination(IMessage message)
        {
            if (IsDisposed)
            {
                return;
            }

            ActiveMQMessage amqMsg = message as ActiveMQMessage;
            if (amqMsg == null)
            {
                return;
            }

            DestinationInfo info = amqMsg.DataStructure as DestinationInfo;
            if (info == null || info.Destination == null)
            {
                return;
            }

            TextBoxBase target;
            if (info.Destination.IsTemporary)
            {
                target = info.Destination.IsTopic ? (TextBoxBase)textBoxTempTopic : textBoxTempQueue;
            }
            else
            {
                target = info.Destination.IsTopic ? (TextBoxBase)textBoxTopic : textBoxQueue;
            }

            target.AppendText(info.Destination.ToString() + "\r\n");
        }

        /// <summary>Stops the listener, if it is running, before the form closes.</summary>
        private void FormAdvisory_FormClosing(object sender, FormClosingEventArgs e)
        {
            if (IsServerRunning)
            {
                StopListener();
            }
        }

        /// <summary>
        /// Releases the messaging resources and switches the UI back to the stopped
        /// state. The state is reset even when closing a resource throws, so the form
        /// never stays marked as running with a dead connection.
        /// </summary>
        private void StopListener()
        {
            try
            {
                ReleaseResources();
            }
            finally
            {
                IsServerRunning = false;
                buttonStart.Text = "Start Listener";
            }
        }

        /// <summary>
        /// Closes the consumer, the session and the connection, in that order, and
        /// clears the corresponding fields. The connection is closed even when
        /// closing the consumer or the session throws.
        /// </summary>
        private void ReleaseResources()
        {
            // Detach the fields first so a failure below cannot leave stale references.
            IMessageConsumer consumer = Consumer;
            ISession session = Session;
            IConnection connection = Connection;
            Consumer = null;
            Session = null;
            Connection = null;

            try
            {
                if (consumer != null)
                {
                    consumer.Listener -= OnMessageReceived;
                    consumer.Close();
                }

                if (session != null)
                {
                    session.Close();
                }
            }
            finally
            {
                if (connection != null)
                {
                    try
                    {
                        connection.Stop(); // stop message delivery
                    }
                    finally
                    {
                        connection.Close(); // close the connection
                    }
                }
            }
        }
    }
}