Apache Kafka(六)流式计算:输出单元(C#)

同系列文章

源代码下载

KafkaDemo.zip

演示程序界面

源代码
KafkaStreamsConsumerWindow.xaml

<Window x:Class="Splash.KafkaStreamsConsumerWindow"
        xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
        xmlns:d="http://schemas.microsoft.com/expression/blend/2008"
        xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
        xmlns:mc="http://schemas.openxmlformats.org/markup-compatibility/2006"
        mc:Ignorable="d"
        Title="KafkaStreams 流式计算结果显示" Width="480" Height="640" Icon="bulb.ico" Closing="Window_Closing" Loaded="Window_Loaded" WindowStartupLocation="CenterScreen">
    <Grid>
        <Grid.RowDefinitions>
            <RowDefinition Height="Auto"/>
            <RowDefinition Height="48"/>
            <RowDefinition Height="*"/>
        </Grid.RowDefinitions>

        <GroupBox Grid.Row="0" Margin="4">
            <Grid Margin="4">
                <Grid.RowDefinitions>
                    <RowDefinition Height="Auto"/>
                    <RowDefinition Height="Auto"/>
                    <RowDefinition Height="Auto"/>
                </Grid.RowDefinitions>

                <Grid.ColumnDefinitions>
                    <ColumnDefinition Width="Auto"/>
                    <ColumnDefinition/>
                </Grid.ColumnDefinitions>

                <Label Grid.Row="0" Grid.Column="0" Margin="4" Content="Borker"/>
                <Label Grid.Row="1" Grid.Column="0" Margin="4" Content="Topic"/>
                <Label Grid.Row="2" Grid.Column="0" Margin="4" Content="GroupId"/>

                <TextBox Grid.Row="0" Grid.Column="1" Margin="4" VerticalContentAlignment="Center" Name="textBoxBroker" Text="localhost:9092"/>
                <TextBox Grid.Row="1" Grid.Column="1" Margin="4" VerticalContentAlignment="Center" Name="textBoxTopic" Text="streams-wordcount-output"/>
                <TextBox Grid.Row="2" Grid.Column="1" Margin="4" VerticalContentAlignment="Center" Name="textBoxGroupId" Text="streams-wordcount-csharp-consumer"/>
            </Grid>
        </GroupBox>

        <Grid Grid.Row="1" Margin="4">
            <Grid.ColumnDefinitions>
                <ColumnDefinition/>
                <ColumnDefinition/>
            </Grid.ColumnDefinitions>

            <Button Grid.Column="1" Margin="4" Content="开启侦听" HorizontalAlignment="Right" Width="64" Name="buttonListen" Click="ButtonListen_Click"/>
        </Grid>

        <DataGrid Grid.Row="2" Margin="4" Name="dataGridWordStatistics" GridLinesVisibility="All" ItemsSource="{Binding}" AutoGenerateColumns="False" CanUserResizeRows="False" HeadersVisibility="Column" SelectionMode="Single" VerticalScrollBarVisibility="Auto">
            <DataGrid.ColumnHeaderStyle>
                <Style TargetType="DataGridColumnHeader">
                    <Setter Property="HorizontalAlignment" Value="Stretch"/>
                    <Setter Property="HorizontalContentAlignment" Value="Center" />
                </Style>
            </DataGrid.ColumnHeaderStyle>

            <DataGrid.Columns>
                <DataGridTextColumn Header="单词" Width="*" Binding="{Binding Word}" CanUserResize="False" IsReadOnly="True" CanUserReorder="False" CanUserSort="False">
                    <DataGridTextColumn.ElementStyle>
                        <Style TargetType="TextBlock">
                            <Setter Property="HorizontalAlignment" Value="Left" />
                        </Style>
                    </DataGridTextColumn.ElementStyle>
                </DataGridTextColumn>

                <DataGridTextColumn Header="次数" MinWidth="200" Width="Auto" Binding="{Binding Count}" CanUserResize="False" IsReadOnly="True" CanUserReorder="False" CanUserSort="False">
                </DataGridTextColumn>
            </DataGrid.Columns>
        </DataGrid>     
    </Grid>
</Window>

KafkaStreamsConsumerWindow.xaml.cs

/* ----------------------------------------------------------
 * 文件名称:KafkaStreamsConsumerWindow.xaml.cs
 * 作者:秦建辉
 * 
 * 微信:splashcn
 * 
 * 博客:http://www.firstsolver.com/wordpress/
 * 
 * 开发环境:
 *      Visual Studio V2017
 *      .NET Framework 4.5.2
 *      librdkafka.redist 0.11.1
 *      Confluent.Kafka 0.11.1
 *      
 * 版本历史:
 *      V1.0    2017年11月14日
 *              KafkaStreams 流式计算结果演示
 *              
 * 控制台命令
 *      启动 ZooKeeper 服务器
 *          bin\windows\zookeeper-server-start.bat config\zookeeper.properties
 *      启动 Kafka 服务器
 *          bin\windows\kafka-server-start.bat config\server.properties
 *      发送消息
 *          bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
 *      开启消费者
 *          bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
------------------------------------------------------------ */
using Confluent.Kafka;
using Confluent.Kafka.Serialization;
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using System.Text;
using System.Threading;
using System.Windows;

namespace Splash
{
    public partial class KafkaStreamsConsumerWindow : Window
    {
        private bool IsListening = false;
        private volatile bool ShouldStop = false;
        private volatile ObservableCollection<TWordInformation> WordStatistics = new ObservableCollection<TWordInformation>();

        public KafkaStreamsConsumerWindow()
        {
            InitializeComponent();
        }

        private void Window_Loaded(object sender, RoutedEventArgs e)
        {
            dataGridWordStatistics.ItemsSource = WordStatistics;
        }

        private void Window_Closing(object sender, System.ComponentModel.CancelEventArgs e)
        {
            if (IsListening)
            {
                ShouldStop = true;
            }
        }

        private void ButtonListen_Click(object sender, RoutedEventArgs e)
        {
            buttonListen.IsEnabled = false;
            if (IsListening)
            {
                ShouldStop = true;
            }
            else
            {
                ShouldStop = false;

                string Broker = textBoxBroker.Text.Trim();
                if (string.IsNullOrEmpty(Broker))
                {
                    MessageBox.Show("消息代理不能为空!", "错误", MessageBoxButton.OK, MessageBoxImage.Error);
                    return;
                }

                string Topic = textBoxTopic.Text.Trim();
                if (string.IsNullOrEmpty(Topic))
                {
                    MessageBox.Show("主题不能为空!", "错误", MessageBoxButton.OK, MessageBoxImage.Error);
                    return;
                }

                string GroupId = textBoxGroupId.Text.Trim();
                if (string.IsNullOrEmpty(GroupId))
                {
                    MessageBox.Show("分组ID不能为空!", "错误", MessageBoxButton.OK, MessageBoxImage.Error);
                    return;
                }

                ThreadPool.QueueUserWorkItem(StartListenServer, new TConsumerSetting() { Broker = Broker, Topic = Topic, GroupId = GroupId });
            }
        }

        private void StartListenServer(object state)
        {
            TConsumerSetting Setting = state as TConsumerSetting;
            try
            {
                var config = new Dictionary<string, object>
                {
                    { "group.id", Setting.GroupId },
                    { "bootstrap.servers", Setting.Broker },
                    { "auto.offset.reset", "earliest" }
                };

                using (var consumer = new Consumer<string, long>(config, new StringDeserializer(Encoding.UTF8), new LongDeserializer()))
                {
                    consumer.Subscribe(Setting.Topic);

                    SetStatus(true);
                    while (true)
                    {
                        if (ShouldStop) break;
                        if (consumer.Consume(out Message<string, long> msg, TimeSpan.FromSeconds(1)))
                        {
                            string key = msg.Key;
                            long value = msg.Value;
                            this.Dispatcher.Invoke(() => {
                                try
                                {
                                    TWordInformation Source = Enumerable.First<TWordInformation>(WordStatistics, s => key.Equals(s.Word));
                                    WordStatistics.Remove(Source);
                                    WordStatistics.Add(new TWordInformation(key, value));
                                }
                                catch (InvalidOperationException)
                                {   // 新单词
                                    WordStatistics.Add(new TWordInformation(key, value));
                                }
                            });
                        }
                    }
                }
            }
            catch (Exception exception)
            {
                string ErrorMessage = exception.Message;
                if (string.IsNullOrEmpty(ErrorMessage)) ErrorMessage = exception.InnerException.Message;
                MessageBox.Show(ErrorMessage, "错误", MessageBoxButton.OK, MessageBoxImage.Error);
            }
            finally
            {
                SetStatus(false);
            }
        }

        public void SetStatus(bool isListening)
        {
            this.Dispatcher.Invoke(new Action<bool>((status) =>
            {
                if (status)
                {
                    this.buttonListen.Content = "停止侦听";
                }
                else
                {
                    this.buttonListen.Content = "开启侦听";
                }
                this.IsListening = status;
                this.buttonListen.IsEnabled = true;
            }), isListening);
        }
    }

    public class TConsumerSetting
    {
        public string Broker;
        public string Topic;
        public string GroupId;
    }

    public class TWordInformation
    {
        /// <summary>
        /// 单词
        /// </summary>
        public string Word { get; set; }

        /// <summary>
        /// 次数
        /// </summary>
        public long Count { get; set; }

        // 构造函数
        public TWordInformation(string word, long count)
        {
            Word = word;
            Count = count;
        }
    }
}

Comments are closed.