C#:ActiveMQ(七)请求/响应方式之响应者

同系列文章:

程序界面:
ActiveMQ-Responder

源代码
FormReponser.cs

/* ----------------------------------------------------------
 * 文件名称:FormReponser.cs
 * 
 * 作者:秦建辉
 * 
 * QQ:36748897 
 * 
 * 博客:http://www.firstsolver.com/wordpress/
 * 
 * 开发环境:
 *      Visual Studio V2015
 *      .NET Framework 4.5.2
 *  
 *  版本历史:
 *      V1.0    2016年06月03日
 *              基于ActiveMQ实现响应者
 *              
 * 参考资料:
 *      http://activemq.apache.org/how-should-i-implement-request-response-with-jms.html
 ------------------------------------------------------------ */
using Apache.NMS;
using Apache.NMS.ActiveMQ;
using System;
using System.Windows.Forms;

namespace Com.FirstSolver
{
    public partial class FormResponder : Form
    {
        private bool IsServerRunning = false;
        private IConnection Connection = null;
        private ISession Session = null;
        private IMessageProducer Producer = null;
        private IMessageConsumer Consumer = null;

        private const string DESTINATION = "Com.FirstSolver.Hotline";
        private IDestination NMSDestination = new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue(DESTINATION);

        public FormResponder()
        {
            InitializeComponent();
        }

        // 清空
        private void buttonClearAll_Click(object sender, EventArgs e)
        {
            textBoxMessage.Clear();
        }

        // 启动/停止消息侦听
        private void buttonStart_Click(object sender, EventArgs e)
        {
            if (IsServerRunning)
            {
                MQClose();
            }
            else
            {
                try
                {
                    IConnectionFactory Factory = new ConnectionFactory(textBoxURL.Text);
                    Connection = Factory.CreateConnection();

                    Connection.ClientId = "Com.FirstSolver.ActiveMQ.Responder";
                    Connection.Start(); // 开启侦听

                    Session = Connection.CreateSession();

                    // 创建生产者
                    Producer = Session.CreateProducer();

                    // 创建消费者
                    Consumer = Session.CreateConsumer(NMSDestination);
                    Consumer.Listener += OnMessageReceived;
                }
                catch (Exception ex)
                {
                    MessageBox.Show(ex.Message, "异常", MessageBoxButtons.OK, MessageBoxIcon.Error);
                    MQClose();
                    return;
                }

                IsServerRunning = true;
                buttonStart.Text = "Stop Listener";
            }
        }

        private void OnMessageReceived(IMessage message)
        {
            if (message is ITextMessage)
            {
                this.Invoke(new Action<ITextMessage>((msg) => {
                    textBoxMessage.AppendText("请求-消息编号:" + msg.NMSMessageId + "\r\n");
                    textBoxMessage.AppendText("请求-关联编号:" + msg.NMSCorrelationID + "\r\n");
                    textBoxMessage.AppendText("请求-消息内容:" + msg.Text + "\r\n");

                    // 回复消息
                    ITextMessage ReplyMessage = Producer.CreateTextMessage();
                    ReplyMessage.Text = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss");
                    ReplyMessage.NMSCorrelationID = msg.NMSCorrelationID;

                    Producer.Send(msg.NMSReplyTo, ReplyMessage, MsgDeliveryMode.NonPersistent, MsgPriority.Normal, TimeSpan.MinValue);

                    // 显示发送内容
                    textBoxMessage.AppendText("应答-消息编号:" + ReplyMessage.NMSMessageId + "\r\n");
                    textBoxMessage.AppendText("应答-关联编号:" + ReplyMessage.NMSCorrelationID + "\r\n");
                    textBoxMessage.AppendText("应答-消息内容:" + ReplyMessage.Text + "\r\n\r\n");
                }), message as ITextMessage);
            }
        }

        private void FormConsumer_FormClosing(object sender, FormClosingEventArgs e)
        {
            MQClose();
        }

        private void MQClose()
        {
            this.Invoke(new Action(() => {
                if (IsServerRunning)
                {
                    if (Consumer != null) { Consumer.Close(); Consumer = null; }
                    if (Producer != null) { Producer.Close(); Producer = null; }
                    if (Session != null) { Session.Close(); Session = null; }
                    if (Connection != null) { Connection.Stop(); Connection.Close(); Connection = null; }

                    IsServerRunning = false;
                    buttonStart.Text = "Start Listener";
                }
            }));
        }
    }
}

Comments are closed.