同系列文章:
源代码
FormReponser.cs
/* ---------------------------------------------------------- * 文件名称:FormReponser.cs * * 作者:秦建辉 * * QQ:36748897 * * 博客:http://www.firstsolver.com/wordpress/ * * 开发环境: * Visual Studio V2015 * .NET Framework 4.5.2 * * 版本历史: * V1.0 2016年06月03日 * 基于ActiveMQ实现响应者 * * 参考资料: * http://activemq.apache.org/how-should-i-implement-request-response-with-jms.html ------------------------------------------------------------ */ using Apache.NMS; using Apache.NMS.ActiveMQ; using System; using System.Windows.Forms; namespace Com.FirstSolver { public partial class FormResponder : Form { private bool IsServerRunning = false; private IConnection Connection = null; private ISession Session = null; private IMessageProducer Producer = null; private IMessageConsumer Consumer = null; private const string DESTINATION = "Com.FirstSolver.Hotline"; private IDestination NMSDestination = new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue(DESTINATION); public FormResponder() { InitializeComponent(); } // 清空 private void buttonClearAll_Click(object sender, EventArgs e) { textBoxMessage.Clear(); } // 启动/停止消息侦听 private void buttonStart_Click(object sender, EventArgs e) { if (IsServerRunning) { MQClose(); } else { try { IConnectionFactory Factory = new ConnectionFactory(textBoxURL.Text); Connection = Factory.CreateConnection(); Connection.ClientId = "Com.FirstSolver.ActiveMQ.Responder"; Connection.Start(); // 开启侦听 Session = Connection.CreateSession(); // 创建生产者 Producer = Session.CreateProducer(); // 创建消费者 Consumer = Session.CreateConsumer(NMSDestination); Consumer.Listener += OnMessageReceived; } catch (Exception ex) { MessageBox.Show(ex.Message, "异常", MessageBoxButtons.OK, MessageBoxIcon.Error); MQClose(); return; } IsServerRunning = true; buttonStart.Text = "Stop Listener"; } } private void OnMessageReceived(IMessage message) { if (message is ITextMessage) { this.Invoke(new Action<ITextMessage>((msg) => { textBoxMessage.AppendText("请求-消息编号:" + msg.NMSMessageId + "\r\n"); textBoxMessage.AppendText("请求-关联编号:" + msg.NMSCorrelationID + "\r\n"); textBoxMessage.AppendText("请求-消息内容:" + msg.Text + "\r\n"); // 回复消息 ITextMessage ReplyMessage = Producer.CreateTextMessage(); ReplyMessage.Text = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"); ReplyMessage.NMSCorrelationID = msg.NMSCorrelationID; Producer.Send(msg.NMSReplyTo, ReplyMessage, MsgDeliveryMode.NonPersistent, MsgPriority.Normal, TimeSpan.MinValue); // 显示发送内容 textBoxMessage.AppendText("应答-消息编号:" + ReplyMessage.NMSMessageId + "\r\n"); textBoxMessage.AppendText("应答-关联编号:" + ReplyMessage.NMSCorrelationID + "\r\n"); textBoxMessage.AppendText("应答-消息内容:" + ReplyMessage.Text + "\r\n\r\n"); }), message as ITextMessage); } } private void FormConsumer_FormClosing(object sender, FormClosingEventArgs e) { MQClose(); } private void MQClose() { this.Invoke(new Action(() => { if (IsServerRunning) { if (Consumer != null) { Consumer.Close(); Consumer = null; } if (Producer != null) { Producer.Close(); Producer = null; } if (Session != null) { Session.Close(); Session = null; } if (Connection != null) { Connection.Stop(); Connection.Close(); Connection = null; } IsServerRunning = false; buttonStart.Text = "Start Listener"; } })); } } }