同系列文章
源代码下载
源代码
ConsumerWindow.xaml
<Window x:Class="Splash.ConsumerWindow"
        xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
        xmlns:d="http://schemas.microsoft.com/expression/blend/2008"
        xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
        xmlns:mc="http://schemas.openxmlformats.org/markup-compatibility/2006"
        mc:Ignorable="d"
        Title="Kafka Consumer"
        Width="480"
        Icon="bulb.ico"
        Closing="Window_Closing"
        ResizeMode="NoResize"
        SizeToContent="Height"
        WindowStartupLocation="CenterScreen">
    <!-- Three stacked rows: connection settings, action buttons, message log. -->
    <Grid>
        <Grid.RowDefinitions>
            <RowDefinition Height="Auto"/>
            <RowDefinition Height="48"/>
            <RowDefinition Height="Auto"/>
        </Grid.RowDefinitions>

        <!-- Row 0: connection settings; the code-behind reads these when the listen button is clicked. -->
        <GroupBox Grid.Row="0" Margin="4">
            <Grid Margin="4">
                <Grid.RowDefinitions>
                    <RowDefinition Height="Auto"/>
                    <RowDefinition Height="Auto"/>
                    <RowDefinition Height="Auto"/>
                </Grid.RowDefinitions>
                <Grid.ColumnDefinitions>
                    <ColumnDefinition Width="Auto"/>
                    <ColumnDefinition/>
                </Grid.ColumnDefinitions>

                <Label Grid.Row="0" Grid.Column="0" Margin="4" Content="Broker"/>
                <Label Grid.Row="1" Grid.Column="0" Margin="4" Content="Topic"/>
                <Label Grid.Row="2" Grid.Column="0" Margin="4" Content="GroupId"/>

                <TextBox Grid.Row="0" Grid.Column="1" Margin="4" VerticalContentAlignment="Center" Name="textBoxBroker" Text="localhost:9092"/>
                <TextBox Grid.Row="1" Grid.Column="1" Margin="4" VerticalContentAlignment="Center" Name="textBoxTopic" Text="test"/>
                <TextBox Grid.Row="2" Grid.Column="1" Margin="4" VerticalContentAlignment="Center" Name="textBoxGroupId" Text="simple-csharp-consumer"/>
            </Grid>
        </GroupBox>

        <!-- Row 1: clear-log button on the left, start/stop listening toggle on the right. -->
        <Grid Grid.Row="1" Margin="4">
            <Grid.ColumnDefinitions>
                <ColumnDefinition/>
                <ColumnDefinition/>
            </Grid.ColumnDefinitions>
            <Button Grid.Column="0" Margin="4" Content="清空" HorizontalAlignment="Left" Width="64" Name="buttonClear" Click="ButtonClear_Click"/>
            <Button Grid.Column="1" Margin="4" Content="开启侦听" HorizontalAlignment="Right" Width="64" Name="buttonListen" Click="ButtonListen_Click"/>
        </Grid>

        <!-- Row 2: read-only log; the code-behind appends one paragraph per received message. -->
        <RichTextBox Grid.Row="2" Margin="4" VerticalScrollBarVisibility="Visible" IsReadOnly="True" Height="300" Name="richTextBoxLog"/>
    </Grid>
</Window>
ConsumerWindow.xaml.cs
/* ----------------------------------------------------------
 * File name: ConsumerWindow.xaml.cs
 * Author: 秦建辉
 *
 * WeChat: splashcn
 *
 * Blog: http://www.firstsolver.com/wordpress/
 *
 * Development environment:
 *     Visual Studio V2017
 *     .NET Framework 4.5.2
 *     librdkafka.redist 0.11.1
 *     Confluent.Kafka 0.11.1
 *
 * Version history:
 *     V1.0 2017-10-23
 *          Kafka Consumer sample
 *
 * Console commands
 *     Start the ZooKeeper server
 *         bin\windows\zookeeper-server-start.bat config\zookeeper.properties
 *     Start the Kafka server
 *         bin\windows\kafka-server-start.bat config\server.properties
 *     Send messages
 *         bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
 *     Start a consumer
 *         bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
------------------------------------------------------------ */
using Confluent.Kafka;
using Confluent.Kafka.Serialization;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Windows;
using System.Windows.Documents;
using System.Windows.Media;

namespace Splash
{
    /// <summary>
    /// Window that consumes messages from a Kafka topic on a thread-pool thread and
    /// appends every received message to the on-screen log.
    /// </summary>
    public partial class ConsumerWindow : Window
    {
        // True while the worker is consuming. Only written by SetStatus, which runs on the dispatcher.
        private bool IsListening = false;

        // Stop request for the worker loop. Volatile because the UI thread writes it
        // and the thread-pool worker polls it.
        private volatile bool ShouldStop = false;

        /// <summary>Creates the window and loads the XAML-defined controls.</summary>
        public ConsumerWindow()
        {
            InitializeComponent();
        }

        /// <summary>Asks a running worker to stop when the window is closing.</summary>
        private void Window_Closing(object sender, System.ComponentModel.CancelEventArgs e)
        {
            if (IsListening)
            {
                ShouldStop = true;
            }
        }

        /// <summary>Removes every paragraph from the message log.</summary>
        private void ButtonClear_Click(object sender, RoutedEventArgs e)
        {
            richTextBoxLog.Document.Blocks.Clear();
        }

        /// <summary>
        /// Toggles listening: requests a stop when a worker is running, otherwise validates
        /// the three settings and queues a new worker.
        /// </summary>
        private void ButtonListen_Click(object sender, RoutedEventArgs e)
        {
            if (IsListening)
            {
                // Keep the button disabled until the worker confirms the stop through SetStatus(false).
                buttonListen.IsEnabled = false;
                ShouldStop = true;
                return;
            }

            // Validate BEFORE disabling the button: a rejected input must leave the button
            // usable, because no worker will run to re-enable it.
            if (!TryGetRequiredText(textBoxBroker.Text, "消息代理不能为空!", out string broker))
            {
                return;
            }

            if (!TryGetRequiredText(textBoxTopic.Text, "主题不能为空!", out string topic))
            {
                return;
            }

            if (!TryGetRequiredText(textBoxGroupId.Text, "分组ID不能为空!", out string groupId))
            {
                return;
            }

            // From here on the worker is guaranteed to run, and its finally block
            // re-enables the button via SetStatus(false).
            buttonListen.IsEnabled = false;
            ShouldStop = false;

            ThreadPool.QueueUserWorkItem(
                StartListenServer,
                new ConsumerSetting { Broker = broker, Topic = topic, GroupId = groupId });
        }

        /// <summary>
        /// Trims <paramref name="rawText"/> and reports whether anything is left; shows an
        /// error box with <paramref name="emptyMessage"/> when the trimmed text is empty.
        /// </summary>
        /// <param name="rawText">Text as typed by the user.</param>
        /// <param name="emptyMessage">Message shown when the trimmed text is empty.</param>
        /// <param name="value">The trimmed text.</param>
        /// <returns><c>true</c> when the trimmed text is non-empty.</returns>
        private static bool TryGetRequiredText(string rawText, string emptyMessage, out string value)
        {
            value = rawText.Trim();
            if (!string.IsNullOrEmpty(value))
            {
                return true;
            }

            MessageBox.Show(emptyMessage, "错误", MessageBoxButton.OK, MessageBoxImage.Error);
            return false;
        }

        /// <summary>
        /// Worker entry point: creates a consumer, polls it until a stop is requested and logs
        /// each message. Any failure is reported in an error box.
        /// </summary>
        /// <param name="state">The <see cref="ConsumerSetting"/> describing what to consume.</param>
        private void StartListenServer(object state)
        {
            try
            {
                // Cast inside the try so a wrong or null state is reported like any other
                // failure instead of escaping on a thread-pool thread.
                var setting = (ConsumerSetting)state;

                var config = new Dictionary<string, object>
                {
                    { "group.id", setting.GroupId },
                    { "bootstrap.servers", setting.Broker }
                };

                using (var consumer = new Consumer<Null, string>(config, null, new StringDeserializer(Encoding.UTF8)))
                {
                    // Explicit assignment with hard-coded arguments 0, 0
                    // (presumably partition 0, starting at offset 0).
                    consumer.Assign(new List<TopicPartitionOffset> { new TopicPartitionOffset(setting.Topic, 0, 0) });

                    SetStatus(true);

                    // The one-second timeout bounds how long a stop request can go unnoticed.
                    while (!ShouldStop)
                    {
                        if (consumer.Consume(out Message<Null, string> msg, TimeSpan.FromSeconds(1)))
                        {
                            AppendTextToLog($"Topic: {msg.Topic}\r\nPartition: {msg.Partition}\r\nOffset: {msg.Offset}\r\nMessage: {msg.Value}");
                        }
                    }
                }
            }
            catch (Exception exception)
            {
                // InnerException may be null, so fall back to the exception type name rather
                // than throwing a NullReferenceException from inside the handler.
                string errorMessage = exception.Message;
                if (string.IsNullOrEmpty(errorMessage))
                {
                    errorMessage = exception.InnerException?.Message ?? exception.GetType().Name;
                }

                MessageBox.Show(errorMessage, "错误", MessageBoxButton.OK, MessageBoxImage.Error);
            }
            finally
            {
                SetStatus(false);
            }
        }

        /// <summary>
        /// Updates the listen button caption, the listening flag and re-enables the button.
        /// Runs synchronously on the window's dispatcher.
        /// </summary>
        /// <param name="isListening"><c>true</c> when the worker has started consuming.</param>
        public void SetStatus(bool isListening)
        {
            this.Dispatcher.Invoke(new Action<bool>((status) =>
            {
                this.buttonListen.Content = status ? "停止侦听" : "开启侦听";
                this.IsListening = status;
                this.buttonListen.IsEnabled = true;
            }), isListening);
        }

        /// <summary>
        /// Queues a new light-gray paragraph containing <paramref name="content"/> for the log,
        /// without blocking the calling thread.
        /// </summary>
        /// <param name="content">Text to append.</param>
        private void AppendTextToLog(string content)
        {
            this.Dispatcher.BeginInvoke(new Action<string>((message) =>
            {
                Run run = new Run(message) { Background = new SolidColorBrush(Colors.LightGray) };
                richTextBoxLog.Document.Blocks.Add(new Paragraph(run));
            }), content);
        }
    }

    /// <summary>Connection settings handed from the UI thread to the consumer worker.</summary>
    public class ConsumerSetting
    {
        // Value for the "bootstrap.servers" configuration entry.
        public string Broker;

        // Topic passed to the consumer's partition assignment.
        public string Topic;

        // Value for the "group.id" configuration entry.
        public string GroupId;
    }
}