同系列文章
源代码下载
源代码
build.gradle
/* 开发环境: jdk 8u152 IntelliJ IDEA 2017.2.5 Gradle 4.3 Gradle 命令集合 gradle clean gradle build gradle run gradle uploadArchives 参考资料: Gradle Build Language Reference https://docs.gradle.org/4.3/dsl/ */ group 'Com.FirstSolver' version '1.0.0.1' apply plugin: 'application' // 打包成 zip 和 tar apply plugin: 'java' // 使用 java 插件 sourceCompatibility = 1.8 // JDK8 targetCompatibility = 1.8 // JDK8 [compileJava, compileTestJava]*.options*.encoding = 'UTF-8' repositories { // 阿里云 maven 镜像 maven { url 'http://maven.aliyun.com/mvn/repository/' } // mavenCentral() } // 声明项目依赖 dependencies { // 编译项目源代码时依赖 compile "org.apache.kafka:kafka-clients:1.0.0" // 编译测试代码时依赖 testCompile group: 'junit', name: 'junit', version: '4.12' } // 指定 application 主类 mainClassName = 'Splash.KafkaSimpleConsumer' // 输出 jar 文件名定义 jar { baseName = 'KafkaSimpleConsumer' version = '1.0.0.1' manifest { // 设置 jar 主类 attributes 'Main-Class': 'Splash.KafkaSimpleConsumer' } // 将引用的包打入 jar 包 // from configuration.compile.collect { zipTree it} } // 版本发布地址 uploadArchives { repositories { flatDir { dirs 'repos' } } } // 指定资源文件源目录 sourceSets { main { resources { srcDirs = ["src/main/java"] // 资源文件和 .java 文件放置在同一个目录下 excludes = ["**/*.java"] } } } // 指定资源文件目标目录 processResources { into "$buildDir" // 资源文件和 .class 文件打包到同一个目录下 } /* // 打包授权文件 task copyLicense { outputs.file new File("$buildDir/LICENSE") doLast { copy { from "LICENSE" into "$buildDir" } } } applicationDistribution.from(copyLicense) { into "" } */
KafkaSimpleConsumer.java
package Splash; import javafx.application.Application; import javafx.fxml.FXMLLoader; import javafx.scene.Parent; import javafx.scene.Scene; import javafx.scene.image.Image; import javafx.stage.Stage; import java.util.Locale; import javafx.stage.WindowEvent; public class KafkaSimpleConsumer extends Application { @Override public void start(Stage stage) throws Exception{ Locale.setDefault(Locale.CHINESE); // 显示中文 FXMLLoader loader = new FXMLLoader(getClass().getResource("KafkaSimpleConsumer.fxml")); Parent root = loader.load(); stage.setScene(new Scene(root)); stage.sizeToScene(); // 设置窗体标题 stage.setTitle("Kafka 简单消费者演示"); // 设置窗体图标 stage.getIcons().add(new Image(getClass().getResourceAsStream("FireEyes.png"))); // 设置到屏幕中心 stage.centerOnScreen(); // 设置窗口关闭处理函数 stage.setOnCloseRequest((WindowEvent e) -> { try { KafkaSimpleConsumerController controller = loader.getController(); controller.close(); } catch (Exception e1) { // 忽视异常 } }); // 显示窗体 stage.show(); } public static void main(String[] args) { launch(args); } }
KafkaSimpleConsumer.fxml
<?xml version="1.0" encoding="UTF-8"?> <?import javafx.geometry.Insets?> <?import javafx.scene.control.Button?> <?import javafx.scene.control.Label?> <?import javafx.scene.control.TextArea?> <?import javafx.scene.control.TextField?> <?import javafx.scene.layout.AnchorPane?> <?import javafx.scene.layout.ColumnConstraints?> <?import javafx.scene.layout.GridPane?> <?import javafx.scene.layout.RowConstraints?> <GridPane alignment="center" hgap="10" vgap="10" xmlns="http://javafx.com/javafx/8.0.121" xmlns:fx="http://javafx.com/fxml/1" fx:controller="Splash.KafkaSimpleConsumerController"> <rowConstraints> <RowConstraints /> <RowConstraints /> <RowConstraints /> </rowConstraints> <AnchorPane style="-fx-border-insets: 8 0 0 0; -fx-background-color: #FFFFFF; -fx-border-color: lightgray;" GridPane.hgrow="ALWAYS" GridPane.rowIndex="0"> <children> <Label alignment="TOP_LEFT" layoutX="14.0" style="-fx-padding: 0 5; -fx-background-color: inherit;" text="消息设置" /> <GridPane AnchorPane.bottomAnchor="1.0" AnchorPane.leftAnchor="1.0" AnchorPane.rightAnchor="1.0" AnchorPane.topAnchor="10.0"> <columnConstraints> <ColumnConstraints /> <ColumnConstraints hgrow="ALWAYS" /> </columnConstraints> <rowConstraints> <RowConstraints /> <RowConstraints /> <RowConstraints /> </rowConstraints> <children> <Label text="消息代理" GridPane.columnIndex="0" GridPane.rowIndex="0" GridPane.halignment="RIGHT"> <GridPane.margin> <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" /> </GridPane.margin> </Label> <Label text="消息主题" GridPane.columnIndex="0" GridPane.rowIndex="1" GridPane.halignment="RIGHT"> <GridPane.margin> <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" /> </GridPane.margin> </Label> <Label text="分组标识" GridPane.columnIndex="0" GridPane.rowIndex="2" GridPane.halignment="RIGHT"> <GridPane.margin> <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" /> </GridPane.margin> </Label> <TextField fx:id="textFieldBroker" text="localhost:9092" GridPane.columnIndex="1" GridPane.rowIndex="0" GridPane.hgrow="ALWAYS"> <GridPane.margin> <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" /> </GridPane.margin> </TextField> <TextField fx:id="textFieldTopic" text="test" GridPane.columnIndex="1" GridPane.rowIndex="1" GridPane.hgrow="ALWAYS"> <GridPane.margin> <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" /> </GridPane.margin> </TextField> <TextField fx:id="textFieldGroupId" text="simple-java-consumer" GridPane.columnIndex="1" GridPane.rowIndex="2" GridPane.hgrow="ALWAYS"> <GridPane.margin> <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" /> </GridPane.margin> </TextField> </children> <GridPane.margin> <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" /> </GridPane.margin> </GridPane> </children> <GridPane.margin> <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" /> </GridPane.margin> <padding> <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" /> </padding> </AnchorPane> <GridPane GridPane.hgrow="ALWAYS" GridPane.rowIndex="1"> <columnConstraints> <ColumnConstraints /> <ColumnConstraints hgrow="ALWAYS" /> </columnConstraints> <rowConstraints> <RowConstraints /> </rowConstraints> <children> <Button fx:id="buttonClear" minHeight="40.0" minWidth="80.0" onAction="#handleButtonClearAction" text="清空" GridPane.columnIndex="0" GridPane.halignment="LEFT" GridPane.hgrow="ALWAYS"> <GridPane.margin> <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" /> </GridPane.margin> </Button> <Button fx:id="buttonListen" minHeight="40.0" minWidth="80.0" onAction="#handleButtonListen" text="开启侦听" GridPane.columnIndex="1" GridPane.halignment="RIGHT" GridPane.hgrow="ALWAYS"> <GridPane.margin> <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" /> </GridPane.margin> </Button> </children> </GridPane> <TextArea fx:id="textAreaLog" editable="false" minHeight="400.0" minWidth="600.0" wrapText="true" GridPane.rowIndex="2" GridPane.hgrow="ALWAYS" GridPane.vgrow="ALWAYS"> <GridPane.margin> <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" /> </GridPane.margin> </TextArea> </GridPane>
KafkaSimpleConsumerController.java
/* ---------------------------------------------------------- * 文件名称:KafkaSimpleConsumerController.java * 作者:秦建辉 * * 微信:splashcn * * 博客:http://www.firstsolver.com/wordpress/ * * 开发环境: * IntelliJ IDEA 2017.2.5 * Gradle 4.3 * Apache Kafka 1.0.0 * * 版本历史: * V1.0 2017年11月06日 * Kafka Consumer 示例 * * 参考资料: * kafka 1.0.0 API http://kafka.apache.org/10/javadoc/index.html?org/apache/kafka/ * Java API Docs https://docs.oracle.com/javase/9/docs/api/index.html?overview-summary.html * * 控制台命令 * 启动 ZooKeeper 服务器 * bin\windows\zookeeper-server-start.bat config\zookeeper.properties * 启动 Kafka 服务器 * bin\windows\kafka-server-start.bat config\server.properties * 发送消息 * bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test * 开启消费者 * bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning ------------------------------------------------------------ */ package Splash; import javafx.application.Platform; import javafx.event.ActionEvent; import javafx.fxml.FXML; import javafx.scene.control.*; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import java.util.concurrent.*; public class KafkaSimpleConsumerController implements AutoCloseable, IKafkaConsumerRecordsHandler { @FXML private TextField textFieldBroker; @FXML private TextField textFieldTopic; @FXML private TextField textFieldGroupId; @FXML private TextArea textAreaLog; @FXML private Button buttonListen; // 线程池 private final ExecutorService pool = Executors.newCachedThreadPool(); // 消费者线程是否正在运行 private Boolean IsRunning = false; // 消费者线程 private KafkaConsumerRunner Runner = null; @Override public void close() throws Exception { if (IsRunning) closeRunner(); // 释放线程池 pool.shutdown(); } private void closeRunner() throws InterruptedException { if (Runner != null) { Runner.shutdown(); Runner = null; } } // 清空 @FXML public void handleButtonClearAction(ActionEvent actionEvent) { textAreaLog.clear(); } // 提交 @FXML public void handleButtonListen(ActionEvent actionEvent) throws InterruptedException { buttonListen.setDisable(true); if (IsRunning) { closeRunner(); } else { // 获取消息代理 String Broker = textFieldBroker.getText(); if (IsNullOrEmpty(Broker)) { ShowErrorMessage("消息代理不能为空!"); return; } // 获取消息主题 String Topic = textFieldTopic.getText(); if (IsNullOrEmpty(Topic)) { ShowErrorMessage("消息主题不能为空!"); return; } // 获取分组标识 String GroupId = textFieldGroupId.getText(); if (IsNullOrEmpty(GroupId)) { ShowErrorMessage("分组标识不能为空!"); return; } Runner = new KafkaConsumerRunner(this, Broker, Topic, GroupId); pool.execute(Runner); } } @Override public void OnConsumerRecordsArrived(ConsumerRecords<String, String> records) { Platform.runLater(() -> { for (ConsumerRecord<String, String> record : records) { StringBuilder sb = new StringBuilder(); sb.append("Topic: ").append(record.topic()).append("\r\n"); sb.append("Partition: ").append(record.partition()).append("\r\n"); sb.append("Offset: ").append(record.offset()).append("\r\n"); sb.append("Message.Key: ").append(record.key()).append("\r\n"); sb.append("Message.Value: ").append(record.value()).append("\r\n\r\n"); // 显示消息 textAreaLog.appendText(sb.toString()); } }); } // 设置状态 @Override public void OnChangeStatus(Boolean isRunning) { final CountDownLatch doneLatch = new CountDownLatch(1); Platform.runLater(() -> { if (isRunning) { buttonListen.setText("停止侦听"); } else { buttonListen.setText("开启侦听"); } IsRunning = isRunning; buttonListen.setDisable(false); doneLatch.countDown(); }); try { doneLatch.await(); // 等待线程同步 } catch (InterruptedException e) { // 忽视异常 } } public static void ShowErrorMessage(String message) { Platform.runLater(() -> { Alert alert = new Alert(Alert.AlertType.ERROR, message, ButtonType.OK); alert.setTitle("KafkaSimpleConsumer"); alert.setHeaderText("错误"); alert.showAndWait(); }); } // 判断字符串是否为空 public static boolean IsNullOrEmpty(String value) { return value == null || value.isEmpty(); } }
IKafkaConsumerRecordsHandler.java
package Splash; import org.apache.kafka.clients.consumer.ConsumerRecords; public interface IKafkaConsumerRecordsHandler { // 处理消费者获取到的消息集合 public void OnConsumerRecordsArrived(ConsumerRecords<String, String> records); // 消费者处理器状态改变 public void OnChangeStatus(Boolean isRunning); }
KafkaConsumerRunner.java
package Splash; import java.util.Arrays; import java.util.Properties; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.errors.WakeupException; public class KafkaConsumerRunner implements Runnable { private final AtomicBoolean closed = new AtomicBoolean(false); private final KafkaConsumer<String, String> consumer; private final IKafkaConsumerRecordsHandler handler; // 构造函数 public KafkaConsumerRunner(IKafkaConsumerRecordsHandler handler, String broker, String topic, String groupId) { this.handler = handler; // 配置消费者属性 Properties props = new Properties(); props.put("bootstrap.servers", broker); // 消息代理 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("auto.offset.reset", "earliest"); props.put("group.id", groupId); // 分组标识 // 订阅主题,多个主题以空格间隔 consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList(topic.split(" "))); // 消息主题 } @Override public void run() { handler.OnChangeStatus(true); try { while (!closed.get()) { ConsumerRecords<String, String> records = consumer.poll(100); handler.OnConsumerRecordsArrived(records); } } catch (WakeupException e) { if (!closed.get()) throw e; } finally { consumer.close(); handler.OnChangeStatus(false); } } public void shutdown() { closed.set(true); consumer.wakeup(); } }