同系列文章
源代码下载
源代码
build.gradle
/*
 Gradle build script for the KafkaStreamsWordCount sample application.

 Development environment:
     jdk 8u152
     IntelliJ IDEA 2017.2.5
     Gradle 4.3

 Gradle commands:
     gradle clean
     gradle build
     gradle run
     gradle uploadArchives

 References:
     Gradle Build Language Reference
     https://docs.gradle.org/4.3/dsl/
 */
group 'Com.FirstSolver'
version '1.0.0.1'

apply plugin: 'application' // package the distribution as zip and tar
apply plugin: 'java'        // use the java plugin

sourceCompatibility = 1.8   // JDK 8
targetCompatibility = 1.8   // JDK 8

// Compile both main and test sources as UTF-8.
[compileJava, compileTestJava]*.options*.encoding = 'UTF-8'

repositories {
    // Aliyun maven mirror
    maven { url 'http://maven.aliyun.com/mvn/repository/' }
    // mavenCentral()
}

// Project dependencies
dependencies {
    // Required to compile the project sources
    compile "org.apache.kafka:kafka-clients:1.0.0"
    compile "org.apache.kafka:kafka-streams:1.0.0"

    // Required to compile the test sources
    testCompile group: 'junit', name: 'junit', version: '4.12'
}

// Main class used by the application plugin
mainClassName = 'Splash.KafkaStreamsWordCount'

// Name and manifest of the produced jar
jar {
    baseName = 'KafkaStreamsWordCount'
    version = '1.0.0.1'
    manifest {
        // Main class recorded in the jar manifest
        attributes 'Main-Class': 'Splash.KafkaStreamsWordCount'
    }

    // Bundle the referenced libraries into the jar (disabled)
    // from configuration.compile.collect { zipTree it}
}

// Where released archives are published
uploadArchives {
    repositories {
        flatDir {
            dirs 'repos'
        }
    }
}

// Source directory of resource files
sourceSets {
    main {
        resources {
            srcDirs = ["src/main/java"] // resource files live next to the .java files
            excludes = ["**/*.java"]
        }
    }
}

// Target directory of resource files
processResources {
    into "$buildDir" // resource files and .class files are packaged into the same directory
}

/*
// Package the license file
task copyLicense {
    outputs.file new File("$buildDir/LICENSE")
    doLast {
        copy {
            from "LICENSE"
            into "$buildDir"
        }
    }
}

applicationDistribution.from(copyLicense) {
    into ""
}
*/
KafkaStreamsWordCount.java
package Splash; import javafx.application.Application; import javafx.fxml.FXMLLoader; import javafx.scene.Parent; import javafx.scene.Scene; import javafx.scene.image.Image; import javafx.stage.Stage; import java.util.Locale; import javafx.stage.WindowEvent; public class KafkaStreamsWordCount extends Application { @Override public void start(Stage stage) throws Exception{ Locale.setDefault(Locale.CHINESE); // 显示中文 FXMLLoader loader = new FXMLLoader(getClass().getResource("KafkaStreamsWordCount.fxml")); Parent root = loader.load(); stage.setScene(new Scene(root)); stage.sizeToScene(); // 设置窗体标题 stage.setTitle("Kafka Streams 基于流式计算的单词频率统计"); // 设置窗体图标 stage.getIcons().add(new Image(getClass().getResourceAsStream("FireEyes.png"))); // 设置到屏幕中心 stage.centerOnScreen(); // 设置窗口关闭处理函数 stage.setOnCloseRequest((WindowEvent e) -> { try { KafkaStreamsWordCountController controller = loader.getController(); controller.close(); } catch (Exception exception) { // 忽视异常 } }); // 显示窗体 stage.show(); } public static void main(String[] args) { launch(args); } }
KafkaStreamsWordCount.fxml
<?xml version="1.0" encoding="UTF-8"?>

<?import javafx.geometry.Insets?>
<?import javafx.scene.control.Button?>
<?import javafx.scene.control.Label?>
<?import javafx.scene.control.TextArea?>
<?import javafx.scene.control.TextField?>
<?import javafx.scene.layout.AnchorPane?>
<?import javafx.scene.layout.ColumnConstraints?>
<?import javafx.scene.layout.GridPane?>
<?import javafx.scene.layout.RowConstraints?>

<!--
  Main view of the word-count sample. The root grid has three rows:
  row 0 - bordered settings panel (broker, application id, input topic, output topic),
  row 1 - button bar (clear / execute),
  row 2 - read-only log area.
  Event handlers and fx:id fields are bound to Splash.KafkaStreamsWordCountController.
-->
<GridPane alignment="center" hgap="10" vgap="10" xmlns="http://javafx.com/javafx/8.0.121" xmlns:fx="http://javafx.com/fxml/1" fx:controller="Splash.KafkaStreamsWordCountController">
    <rowConstraints>
        <RowConstraints />
        <RowConstraints />
        <RowConstraints />
    </rowConstraints>

    <!-- Row 0: settings panel drawn as a bordered box with a caption label on the border -->
    <AnchorPane style="-fx-border-insets: 8 0 0 0; -fx-background-color: #FFFFFF; -fx-border-color: lightgray;" GridPane.hgrow="ALWAYS" GridPane.rowIndex="0">
        <children>
            <!-- Caption of the settings panel -->
            <Label alignment="TOP_LEFT" layoutX="14.0" style="-fx-padding: 0 5; -fx-background-color: inherit;" text="消息设置" />
            <!-- Two-column form: right-aligned labels in column 0, growing text fields in column 1 -->
            <GridPane AnchorPane.bottomAnchor="1.0" AnchorPane.leftAnchor="1.0" AnchorPane.rightAnchor="1.0" AnchorPane.topAnchor="10.0">
                <columnConstraints>
                    <ColumnConstraints />
                    <ColumnConstraints hgrow="ALWAYS" />
                </columnConstraints>
                <rowConstraints>
                    <RowConstraints />
                    <RowConstraints />
                    <RowConstraints />
                    <RowConstraints />
                </rowConstraints>
                <children>
                    <!-- Field labels (rows 0-3) -->
                    <Label text="消息代理" GridPane.columnIndex="0" GridPane.rowIndex="0" GridPane.halignment="RIGHT">
                        <GridPane.margin>
                            <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" />
                        </GridPane.margin>
                    </Label>
                    <Label text="应用标识" GridPane.columnIndex="0" GridPane.rowIndex="1" GridPane.halignment="RIGHT">
                        <GridPane.margin>
                            <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" />
                        </GridPane.margin>
                    </Label>
                    <Label text="输入主题" GridPane.columnIndex="0" GridPane.rowIndex="2" GridPane.halignment="RIGHT">
                        <GridPane.margin>
                            <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" />
                        </GridPane.margin>
                    </Label>
                    <Label text="输出主题" GridPane.columnIndex="0" GridPane.rowIndex="3" GridPane.halignment="RIGHT">
                        <GridPane.margin>
                            <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" />
                        </GridPane.margin>
                    </Label>
                    <!-- Input fields with their default values (rows 0-3) -->
                    <TextField fx:id="textFieldBroker" text="localhost:9092" GridPane.columnIndex="1" GridPane.rowIndex="0" GridPane.hgrow="ALWAYS">
                        <GridPane.margin>
                            <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" />
                        </GridPane.margin>
                    </TextField>
                    <TextField fx:id="textFieldApplicationId" text="streams-wordcount" GridPane.columnIndex="1" GridPane.rowIndex="1" GridPane.hgrow="ALWAYS">
                        <GridPane.margin>
                            <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" />
                        </GridPane.margin>
                    </TextField>
                    <TextField fx:id="textFieldInputTopic" text="streams-plaintext-input" GridPane.columnIndex="1" GridPane.rowIndex="2" GridPane.hgrow="ALWAYS">
                        <GridPane.margin>
                            <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" />
                        </GridPane.margin>
                    </TextField>
                    <TextField fx:id="textFieldOutputTopic" text="streams-wordcount-output" GridPane.columnIndex="1" GridPane.rowIndex="3" GridPane.hgrow="ALWAYS">
                        <GridPane.margin>
                            <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" />
                        </GridPane.margin>
                    </TextField>
                </children>
                <GridPane.margin>
                    <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" />
                </GridPane.margin>
            </GridPane>
        </children>
        <GridPane.margin>
            <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" />
        </GridPane.margin>
        <padding>
            <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" />
        </padding>
    </AnchorPane>

    <!-- Row 1: button bar, clear button on the left and execute button on the right -->
    <GridPane GridPane.hgrow="ALWAYS" GridPane.rowIndex="1">
        <columnConstraints>
            <ColumnConstraints />
            <ColumnConstraints hgrow="ALWAYS" />
        </columnConstraints>
        <rowConstraints>
            <RowConstraints />
        </rowConstraints>
        <children>
            <!-- Invokes handleButtonClearAction on the controller -->
            <Button fx:id="buttonClear" minHeight="40.0" minWidth="80.0" onAction="#handleButtonClearAction" text="清空" GridPane.columnIndex="0" GridPane.halignment="LEFT" GridPane.hgrow="ALWAYS">
                <GridPane.margin>
                    <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" />
                </GridPane.margin>
            </Button>
            <!-- Invokes handleButtonExecute on the controller -->
            <Button fx:id="buttonExecute" minHeight="40.0" minWidth="80.0" onAction="#handleButtonExecute" text="开启计算" GridPane.columnIndex="1" GridPane.halignment="RIGHT" GridPane.hgrow="ALWAYS">
                <GridPane.margin>
                    <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" />
                </GridPane.margin>
            </Button>
        </children>
    </GridPane>

    <!-- Row 2: read-only log area that grows with the window -->
    <TextArea fx:id="textAreaLog" editable="false" minHeight="400.0" minWidth="600.0" wrapText="true" GridPane.rowIndex="2" GridPane.hgrow="ALWAYS" GridPane.vgrow="ALWAYS">
        <GridPane.margin>
            <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" />
        </GridPane.margin>
    </TextArea>
</GridPane>
KafkaStreamsWordCountController.java
/* ---------------------------------------------------------- * 文件名称:KafkaStreamsWordCountController.java * 作者:秦建辉 * * 微信:splashcn * * 博客:http://www.firstsolver.com/wordpress/ * * 开发环境: * IntelliJ IDEA 2017.2.5 * Gradle 4.3 * Apache Kafka 1.0.0 * * 版本历史: * V1.0 2017年11月13日 * KafkaStreams 流式计算示例 * * 参考资料: * kafka 1.0.0 API http://kafka.apache.org/10/javadoc/index.html?org/apache/kafka/ * Java API Docs https://docs.oracle.com/javase/9/docs/api/index.html?overview-summary.html * ManualResetEvent http://www.firstsolver.com/wordpress/?p=3359 * * 控制台命令 * 启动 ZooKeeper 服务器 * bin\windows\zookeeper-server-start.bat config\zookeeper.properties * 启动 Kafka 服务器 * bin\windows\kafka-server-start.bat config\server.properties * 发送消息 * bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test * 开启消费者 * bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning ------------------------------------------------------------ */ package Splash; import javafx.application.Platform; import javafx.event.ActionEvent; import javafx.fxml.FXML; import javafx.scene.control.*; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.*; import java.util.Arrays; import java.util.Locale; import java.util.Properties; import java.util.concurrent.*; public class KafkaStreamsWordCountController implements AutoCloseable { @FXML private TextField textFieldBroker; @FXML private TextField textFieldApplicationId; @FXML private TextField textFieldInputTopic; @FXML private TextField textFieldOutputTopic; @FXML private TextArea textAreaLog; @FXML private Button buttonExecute; // 线程池 private final ExecutorService pool = Executors.newCachedThreadPool(); // 停止 KafkaStreams 事件 private final ManualResetEvent 
stopStreamsArrivedEvent = new ManualResetEvent(false); // KafkaStreams 是否正在运行 private volatile Boolean IsRunning = false; @Override public void close() throws Exception { if (IsRunning) closeStreams(); // 释放线程池 pool.shutdown(); } private void closeStreams() throws InterruptedException { stopStreamsArrivedEvent.Set(); } // 清空 @FXML public void handleButtonClearAction(ActionEvent actionEvent) { textAreaLog.clear(); } // 提交 @FXML public void handleButtonExecute(ActionEvent actionEvent) throws InterruptedException { buttonExecute.setDisable(true); if (IsRunning) { closeStreams(); } else { // 获取消息代理 String Broker = textFieldBroker.getText(); if (IsNullOrEmpty(Broker)) { ShowErrorMessage("消息代理不能为空!"); return; } // 获取应用标识 String ApplicationId = textFieldApplicationId.getText(); if (IsNullOrEmpty(ApplicationId)) { ShowErrorMessage("应用标识不能为空!"); return; } // 获取输入主题 String InputTopic = textFieldInputTopic.getText(); if (IsNullOrEmpty(InputTopic)) { ShowErrorMessage("输入主题不能为空!"); return; } // 获取输出主题 String OutputTopic = textFieldOutputTopic.getText(); if (IsNullOrEmpty(OutputTopic)) { ShowErrorMessage("输出主题不能为空!"); return; } pool.execute(()->{ KafkaStreams streams = null; try { // 配置 Kafka Streams 属性 Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, ApplicationId); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, Broker); props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); StreamsBuilder builder = new StreamsBuilder(); KStream<String, String> source = builder.stream(InputTopic); KTable<String, Long> counts = source .flatMapValues(new ValueMapper<String, Iterable<String>>() { @Override public Iterable<String> apply(String value) { OnDataEcho(value); // 回显正在处理的数据 return 
Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")); } }) .groupBy(new KeyValueMapper<String, String, String>() { @Override public String apply(String key, String value) { return value; } }) .count(); counts.toStream().to(OutputTopic, Produced.with(Serdes.String(), Serdes.Long())); streams = new KafkaStreams(builder.build(), props); stopStreamsArrivedEvent.Reset(); OnChangeStatus(true); streams.start(); stopStreamsArrivedEvent.WaitOne(); } catch(Exception exception){ this.ShowErrorMessage(exception.getMessage()); } finally { if (streams != null) streams.close(); OnChangeStatus(false); } }); } } // 回显正在处理的消息 public void OnDataEcho(String record) { Platform.runLater(() -> { // 显示消息 textAreaLog.appendText(record + "\r\n"); }); } // 设置状态 public void OnChangeStatus(Boolean isRunning) { final CountDownLatch latch = new CountDownLatch(1); Platform.runLater(() -> { if (isRunning) { buttonExecute.setText("停止计算"); } else { buttonExecute.setText("开启计算"); } IsRunning = isRunning; buttonExecute.setDisable(false); latch.countDown(); }); try { latch.await(); // 等待线程同步 } catch (InterruptedException e) { // 忽视异常 } } public static void ShowErrorMessage(String message) { Platform.runLater(() -> { Alert alert = new Alert(Alert.AlertType.ERROR, message, ButtonType.OK); alert.setTitle("KafkaStreamsWordCount"); alert.setHeaderText("错误"); alert.showAndWait(); }); } // 判断字符串是否为空 public static boolean IsNullOrEmpty(String value) { return value == null || value.isEmpty(); } }