Apache Kafka(五)流式计算:计算单元(Java)

同系列文章

源代码下载

KafkaDemo.zip

演示程序界面

源代码
build.gradle

/*
开发环境:
    jdk 8u152
    IntelliJ IDEA 2017.2.5
    Gradle 4.3

Gradle 命令集合
    gradle clean
    gradle build
    gradle run
    gradle uploadArchives

参考资料:
    Gradle Build Language Reference     https://docs.gradle.org/4.3/dsl/
 */
group 'Com.FirstSolver'
version '1.0.0.1'

apply plugin: 'application' // 打包成 zip 和 tar
apply plugin: 'java'        // 使用 java 插件

sourceCompatibility = 1.8   // JDK8
targetCompatibility = 1.8   // JDK8
[compileJava, compileTestJava]*.options*.encoding = 'UTF-8'

repositories {
    // 阿里云 maven 镜像
    maven { url 'http://maven.aliyun.com/mvn/repository/' }

    // mavenCentral()
}

// 声明项目依赖
dependencies {
    // 编译项目源代码时依赖
    compile "org.apache.kafka:kafka-clients:1.0.0"
    compile "org.apache.kafka:kafka-streams:1.0.0"

    // 编译测试代码时依赖
    testCompile group: 'junit', name: 'junit', version: '4.12'
}

// 指定 application 主类
mainClassName = 'Splash.KafkaStreamsWordCount'

// 输出 jar 文件名定义
jar {
    baseName = 'KafkaStreamsWordCount'
    version = '1.0.0.1'

    manifest {
        // 设置 jar 主类
        attributes 'Main-Class': 'Splash.KafkaStreamsWordCount'
    }

    // 将引用的包打入 jar 包
    // from configuration.compile.collect { zipTree it}
}

// 版本发布地址
uploadArchives {
    repositories {
        flatDir {
            dirs 'repos'
        }
    }
}

// 指定资源文件源目录
sourceSets {
    main {
        resources {
            srcDirs = ["src/main/java"] // 资源文件和 .java 文件放置在同一个目录下
            excludes = ["**/*.java"]
        }
    }
}

// 指定资源文件目标目录
processResources {
    into "$buildDir"    // 资源文件和 .class 文件打包到同一个目录下
}

/*
// 打包授权文件
task copyLicense {
    outputs.file new File("$buildDir/LICENSE")
    doLast {
        copy {
            from "LICENSE"
            into "$buildDir"
        }
    }
}

applicationDistribution.from(copyLicense) {
    into ""
}
*/

KafkaStreamsWordCount.java

package Splash;

import javafx.application.Application;
import javafx.fxml.FXMLLoader;
import javafx.scene.Parent;
import javafx.scene.Scene;
import javafx.scene.image.Image;
import javafx.stage.Stage;
import java.util.Locale;
import javafx.stage.WindowEvent;

public class KafkaStreamsWordCount extends Application {
    @Override
    public void start(Stage stage) throws Exception{
        Locale.setDefault(Locale.CHINESE); // 显示中文

        FXMLLoader loader = new FXMLLoader(getClass().getResource("KafkaStreamsWordCount.fxml"));
        Parent root = loader.load();
        stage.setScene(new Scene(root));
        stage.sizeToScene();

        // 设置窗体标题
        stage.setTitle("Kafka Streams 基于流式计算的单词频率统计");

        // 设置窗体图标
        stage.getIcons().add(new Image(getClass().getResourceAsStream("FireEyes.png")));

        // 设置到屏幕中心
        stage.centerOnScreen();

        // 设置窗口关闭处理函数
        stage.setOnCloseRequest((WindowEvent e) -> {
            try {
                KafkaStreamsWordCountController controller = loader.getController();
                controller.close();
            } catch (Exception exception) {
                // 忽视异常
            }
        });
        
        // 显示窗体
        stage.show();
    }

    public static void main(String[] args) {
        launch(args);
    }
}

KafkaStreamsWordCount.fxml

<?xml version="1.0" encoding="UTF-8"?>

<?import javafx.geometry.Insets?>
<?import javafx.scene.control.Button?>
<?import javafx.scene.control.Label?>
<?import javafx.scene.control.TextArea?>
<?import javafx.scene.control.TextField?>
<?import javafx.scene.layout.AnchorPane?>
<?import javafx.scene.layout.ColumnConstraints?>
<?import javafx.scene.layout.GridPane?>
<?import javafx.scene.layout.RowConstraints?>

<GridPane alignment="center" hgap="10" vgap="10" xmlns="http://javafx.com/javafx/8.0.121" xmlns:fx="http://javafx.com/fxml/1" fx:controller="Splash.KafkaStreamsWordCountController">
    <rowConstraints>
        <RowConstraints />
        <RowConstraints />
        <RowConstraints />
    </rowConstraints>

    <AnchorPane style="-fx-border-insets: 8 0 0 0; -fx-background-color: #FFFFFF; -fx-border-color: lightgray;" GridPane.hgrow="ALWAYS" GridPane.rowIndex="0">
        <children>
            <Label alignment="TOP_LEFT" layoutX="14.0" style="-fx-padding: 0 5; -fx-background-color: inherit;" text="消息设置" />
            <GridPane AnchorPane.bottomAnchor="1.0" AnchorPane.leftAnchor="1.0" AnchorPane.rightAnchor="1.0" AnchorPane.topAnchor="10.0">
                <columnConstraints>
                    <ColumnConstraints />
                    <ColumnConstraints hgrow="ALWAYS" />
                </columnConstraints>

                <rowConstraints>
                    <RowConstraints />
                    <RowConstraints />
                    <RowConstraints />
                    <RowConstraints />
                </rowConstraints>

                <children>
                    <Label text="消息代理" GridPane.columnIndex="0" GridPane.rowIndex="0" GridPane.halignment="RIGHT">
                        <GridPane.margin>
                            <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" />
                        </GridPane.margin>
                    </Label>

                    <Label text="应用标识" GridPane.columnIndex="0" GridPane.rowIndex="1" GridPane.halignment="RIGHT">
                        <GridPane.margin>
                            <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" />
                        </GridPane.margin>
                    </Label>

                    <Label text="输入主题" GridPane.columnIndex="0" GridPane.rowIndex="2" GridPane.halignment="RIGHT">
                        <GridPane.margin>
                            <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" />
                        </GridPane.margin>
                    </Label>

                    <Label text="输出主题" GridPane.columnIndex="0" GridPane.rowIndex="3" GridPane.halignment="RIGHT">
                        <GridPane.margin>
                            <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" />
                        </GridPane.margin>
                    </Label>

                    <TextField fx:id="textFieldBroker" text="localhost:9092" GridPane.columnIndex="1" GridPane.rowIndex="0" GridPane.hgrow="ALWAYS">
                        <GridPane.margin>
                            <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" />
                        </GridPane.margin>
                    </TextField>

                    <TextField fx:id="textFieldApplicationId" text="streams-wordcount" GridPane.columnIndex="1" GridPane.rowIndex="1" GridPane.hgrow="ALWAYS">
                        <GridPane.margin>
                            <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" />
                        </GridPane.margin>
                    </TextField>

                    <TextField fx:id="textFieldInputTopic" text="streams-plaintext-input" GridPane.columnIndex="1" GridPane.rowIndex="2" GridPane.hgrow="ALWAYS">
                        <GridPane.margin>
                            <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" />
                        </GridPane.margin>
                    </TextField>

                    <TextField fx:id="textFieldOutputTopic" text="streams-wordcount-output" GridPane.columnIndex="1" GridPane.rowIndex="3" GridPane.hgrow="ALWAYS">
                        <GridPane.margin>
                            <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" />
                        </GridPane.margin>
                    </TextField>
                </children>

                <GridPane.margin>
                    <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" />
                </GridPane.margin>
            </GridPane>
        </children>

        <GridPane.margin>
            <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" />
        </GridPane.margin>

        <padding>
            <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" />
        </padding>
    </AnchorPane>

    <GridPane GridPane.hgrow="ALWAYS" GridPane.rowIndex="1">
        <columnConstraints>
            <ColumnConstraints />
            <ColumnConstraints hgrow="ALWAYS" />
        </columnConstraints>

        <rowConstraints>
            <RowConstraints />
        </rowConstraints>

        <children>
            <Button fx:id="buttonClear" minHeight="40.0" minWidth="80.0" onAction="#handleButtonClearAction" text="清空" GridPane.columnIndex="0" GridPane.halignment="LEFT" GridPane.hgrow="ALWAYS">
                <GridPane.margin>
                    <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" />
                </GridPane.margin>
            </Button>

            <Button fx:id="buttonExecute" minHeight="40.0" minWidth="80.0" onAction="#handleButtonExecute" text="开启计算" GridPane.columnIndex="1" GridPane.halignment="RIGHT" GridPane.hgrow="ALWAYS">
                <GridPane.margin>
                    <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" />
                </GridPane.margin>
            </Button>
        </children>
    </GridPane>

    <TextArea fx:id="textAreaLog" editable="false" minHeight="400.0" minWidth="600.0" wrapText="true" GridPane.rowIndex="2" GridPane.hgrow="ALWAYS" GridPane.vgrow="ALWAYS">
        <GridPane.margin>
            <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" />
        </GridPane.margin>
    </TextArea>
</GridPane>

KafkaStreamsWordCountController.java

/* ----------------------------------------------------------
 * 文件名称:KafkaStreamsWordCountController.java
 * 作者:秦建辉
 *
 * 微信:splashcn
 *
 * 博客:http://www.firstsolver.com/wordpress/
 *
 * 开发环境:
 *      IntelliJ IDEA 2017.2.5
 *      Gradle 4.3
 *      Apache Kafka 1.0.0
 *
 * 版本历史:
 *      V1.0    2017年11月13日
 *              KafkaStreams 流式计算示例
 *
 * 参考资料:
 *      kafka 1.0.0 API     http://kafka.apache.org/10/javadoc/index.html?org/apache/kafka/
 *      Java API Docs       https://docs.oracle.com/javase/9/docs/api/index.html?overview-summary.html
 *      ManualResetEvent    http://www.firstsolver.com/wordpress/?p=3359
 *
 * 控制台命令
 *      启动 ZooKeeper 服务器
 *          bin\windows\zookeeper-server-start.bat config\zookeeper.properties
 *      启动 Kafka 服务器
 *          bin\windows\kafka-server-start.bat config\server.properties
 *      发送消息
 *          bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
 *      开启消费者
 *          bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
------------------------------------------------------------ */
package Splash;

import javafx.application.Platform;
import javafx.event.ActionEvent;
import javafx.fxml.FXML;
import javafx.scene.control.*;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.*;

public class KafkaStreamsWordCountController implements AutoCloseable {
    @FXML
    private TextField textFieldBroker;

    @FXML
    private TextField textFieldApplicationId;

    @FXML
    private TextField textFieldInputTopic;

    @FXML
    private TextField textFieldOutputTopic;

    @FXML
    private TextArea textAreaLog;

    @FXML
    private Button buttonExecute;

    // 线程池
    private final ExecutorService pool = Executors.newCachedThreadPool();

    // 停止 KafkaStreams 事件
    private final ManualResetEvent stopStreamsArrivedEvent = new ManualResetEvent(false);

    // KafkaStreams 是否正在运行
    private volatile Boolean IsRunning = false;

    @Override
    public void close() throws Exception {
        if (IsRunning) closeStreams();

        // 释放线程池
        pool.shutdown();
    }

    private void closeStreams() throws InterruptedException {
        stopStreamsArrivedEvent.Set();
    }

    // 清空
    @FXML
    public void handleButtonClearAction(ActionEvent actionEvent) {
        textAreaLog.clear();
    }

    // 提交
    @FXML
    public void handleButtonExecute(ActionEvent actionEvent) throws InterruptedException {
        buttonExecute.setDisable(true);
        if (IsRunning) {
            closeStreams();
        } else {
            // 获取消息代理
            String Broker = textFieldBroker.getText();
            if (IsNullOrEmpty(Broker)) {
                ShowErrorMessage("消息代理不能为空!");
                return;
            }

            // 获取应用标识
            String ApplicationId = textFieldApplicationId.getText();
            if (IsNullOrEmpty(ApplicationId)) {
                ShowErrorMessage("应用标识不能为空!");
                return;
            }

            // 获取输入主题
            String InputTopic = textFieldInputTopic.getText();
            if (IsNullOrEmpty(InputTopic)) {
                ShowErrorMessage("输入主题不能为空!");
                return;
            }

            // 获取输出主题
            String OutputTopic = textFieldOutputTopic.getText();
            if (IsNullOrEmpty(OutputTopic)) {
                ShowErrorMessage("输出主题不能为空!");
                return;
            }

            pool.execute(()->{
                KafkaStreams streams = null;
                try {
                    // 配置 Kafka Streams 属性
                    Properties props = new Properties();
                    props.put(StreamsConfig.APPLICATION_ID_CONFIG, ApplicationId);
                    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, Broker);
                    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
                    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
                    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
                    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

                    StreamsBuilder builder = new StreamsBuilder();
                    KStream<String, String> source = builder.stream(InputTopic);
                    KTable<String, Long> counts = source
                            .flatMapValues(new ValueMapper<String, Iterable<String>>() {
                                @Override
                                public Iterable<String> apply(String value) {
                                    OnDataEcho(value); // 回显正在处理的数据
                                    return Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
                                }
                            })
                            .groupBy(new KeyValueMapper<String, String, String>() {
                                @Override
                                public String apply(String key, String value) {
                                    return value;
                                }
                            })
                            .count();

                    counts.toStream().to(OutputTopic, Produced.with(Serdes.String(), Serdes.Long()));
                    streams = new KafkaStreams(builder.build(), props);

                    stopStreamsArrivedEvent.Reset();
                    OnChangeStatus(true);
                    streams.start();
                    stopStreamsArrivedEvent.WaitOne();
                }
                catch(Exception exception){
                    this.ShowErrorMessage(exception.getMessage());

                }
                finally {
                    if (streams != null) streams.close();
                    OnChangeStatus(false);
                }
            });
        }
    }

    // 回显正在处理的消息
    public void OnDataEcho(String record) {
        Platform.runLater(() -> {
            // 显示消息
            textAreaLog.appendText(record + "\r\n");
        });
    }

    // 设置状态
    public void OnChangeStatus(Boolean isRunning) {
        final CountDownLatch latch = new CountDownLatch(1);
        Platform.runLater(() -> {
            if (isRunning) {
                buttonExecute.setText("停止计算");
            }
            else {
                buttonExecute.setText("开启计算");
            }
            IsRunning = isRunning;
            buttonExecute.setDisable(false);
            latch.countDown();
        });
        try {
            latch.await();  // 等待线程同步
        } catch (InterruptedException e) {
            // 忽视异常
        }
    }

    public static void ShowErrorMessage(String message) {
        Platform.runLater(() -> {
            Alert alert = new Alert(Alert.AlertType.ERROR, message, ButtonType.OK);
            alert.setTitle("KafkaStreamsWordCount");
            alert.setHeaderText("错误");
            alert.showAndWait();
        });
    }

    // 判断字符串是否为空
    public static boolean IsNullOrEmpty(String value) {
        return value == null || value.isEmpty();
    }
}

Comments are closed.