Apache Kafka(四)消息系统:简单消费者(Java)

同系列文章

源代码下载

KafkaDemo.zip

演示程序界面

源代码
build.gradle

/*
开发环境:
    jdk 8u152
    IntelliJ IDEA 2017.2.5
    Gradle 4.3

Gradle 命令集合
    gradle clean
    gradle build
    gradle run
    gradle uploadArchives

参考资料:
    Gradle Build Language Reference     https://docs.gradle.org/4.3/dsl/
 */
// Build script for the KafkaSimpleConsumer demo: a runnable Java application
// that depends on the Kafka client library.
group 'Com.FirstSolver'
version '1.0.0.1'

apply plugin: 'application' // package the application as zip and tar distributions
apply plugin: 'java'        // use the java plugin

sourceCompatibility = 1.8   // JDK8
targetCompatibility = 1.8   // JDK8
// Compile both main and test sources as UTF-8 (the sources contain Chinese string literals).
[compileJava, compileTestJava]*.options*.encoding = 'UTF-8'

repositories {
    // Aliyun maven mirror
    maven { url 'http://maven.aliyun.com/mvn/repository/' }

    // mavenCentral()
}

// Declare project dependencies
dependencies {
    // Needed to compile the project sources
    compile "org.apache.kafka:kafka-clients:1.0.0"

    // Needed to compile the test sources
    testCompile group: 'junit', name: 'junit', version: '4.12'
}

// Main class used by the application plugin
mainClassName = 'Splash.KafkaSimpleConsumer'

// Name of the produced jar file
jar {
    baseName = 'KafkaSimpleConsumer'
    version = '1.0.0.1'

    manifest {
        // Set the jar's main class
        attributes 'Main-Class': 'Splash.KafkaSimpleConsumer'
    }

    // To bundle the referenced dependency jars into this jar, uncomment:
    // from configurations.compile.collect { zipTree it }
}

// Release publishing location
uploadArchives {
    repositories {
        flatDir {
            dirs 'repos'
        }
    }
}

// Source directory for resource files
sourceSets {
    main {
        resources {
            srcDirs = ["src/main/java"] // resource files live in the same directory as the .java files
            excludes = ["**/*.java"]
        }
    }
}

// Target directory for resource files
processResources {
    into "$buildDir"    // resource files and .class files are packaged into the same directory
}

/*
// 打包授权文件
task copyLicense {
    outputs.file new File("$buildDir/LICENSE")
    doLast {
        copy {
            from "LICENSE"
            into "$buildDir"
        }
    }
}

applicationDistribution.from(copyLicense) {
    into ""
}
*/

KafkaSimpleConsumer.java

package Splash;

import javafx.application.Application;
import javafx.fxml.FXMLLoader;
import javafx.scene.Parent;
import javafx.scene.Scene;
import javafx.scene.image.Image;
import javafx.stage.Stage;
import java.util.Locale;
import javafx.stage.WindowEvent;

/**
 * JavaFX entry point of the Kafka simple-consumer demo.
 *
 * <p>Loads the window layout from {@code KafkaSimpleConsumer.fxml}, decorates the
 * primary stage (title, icon, position) and makes sure the controller is closed
 * when the user closes the window.
 */
public class KafkaSimpleConsumer extends Application {
    /**
     * Launches the JavaFX application.
     *
     * @param args command-line arguments, forwarded to {@link Application#launch(String...)}
     */
    public static void main(String[] args) {
        launch(args);
    }

    /**
     * Builds and shows the main window.
     *
     * @param stage the primary stage supplied by the JavaFX runtime
     * @throws Exception if the FXML layout cannot be loaded
     */
    @Override
    public void start(Stage stage) throws Exception {
        // Use Chinese as the default locale so the UI is shown in Chinese.
        Locale.setDefault(Locale.CHINESE);

        FXMLLoader fxmlLoader = new FXMLLoader(getClass().getResource("KafkaSimpleConsumer.fxml"));
        Parent content = fxmlLoader.load();
        Scene scene = new Scene(content);
        stage.setScene(scene);
        stage.sizeToScene();

        // Window title.
        stage.setTitle("Kafka 简单消费者演示");

        // Window icon, loaded from the resource that sits next to this class.
        Image icon = new Image(getClass().getResourceAsStream("FireEyes.png"));
        stage.getIcons().add(icon);

        // Center the window on the screen.
        stage.centerOnScreen();

        // Release the controller's resources when the window is being closed.
        stage.setOnCloseRequest((WindowEvent closeEvent) -> closeControllerQuietly(fxmlLoader));

        // Show the window.
        stage.show();
    }

    /**
     * Closes the controller created by the given loader, ignoring any failure:
     * the window is going away regardless, so cleanup is best-effort.
     */
    private static void closeControllerQuietly(FXMLLoader fxmlLoader) {
        try {
            KafkaSimpleConsumerController windowController = fxmlLoader.getController();
            windowController.close();
        } catch (Exception ignored) {
            // Deliberately ignored: nothing useful can be done while closing.
        }
    }
}

KafkaSimpleConsumer.fxml

<?xml version="1.0" encoding="UTF-8"?>

<!--
  Layout of the Kafka simple-consumer demo window.
  The root grid has three rows: a settings panel (row 0), a button bar (row 1)
  and a read-only log area (row 2). Elements carrying fx:id are injected into
  Splash.KafkaSimpleConsumerController, which also provides the onAction handlers.
-->

<?import javafx.geometry.Insets?>
<?import javafx.scene.control.Button?>
<?import javafx.scene.control.Label?>
<?import javafx.scene.control.TextArea?>
<?import javafx.scene.control.TextField?>
<?import javafx.scene.layout.AnchorPane?>
<?import javafx.scene.layout.ColumnConstraints?>
<?import javafx.scene.layout.GridPane?>
<?import javafx.scene.layout.RowConstraints?>

<GridPane alignment="center" hgap="10" vgap="10" xmlns="http://javafx.com/javafx/8.0.121" xmlns:fx="http://javafx.com/fxml/1" fx:controller="Splash.KafkaSimpleConsumerController">
    <rowConstraints>
        <RowConstraints />
        <RowConstraints />
        <RowConstraints />
    </rowConstraints>

    <!-- Row 0: bordered settings panel; the Label below is drawn over the border as its caption. -->
    <AnchorPane style="-fx-border-insets: 8 0 0 0; -fx-background-color: #FFFFFF; -fx-border-color: lightgray;" GridPane.hgrow="ALWAYS" GridPane.rowIndex="0">
        <children>
            <Label alignment="TOP_LEFT" layoutX="14.0" style="-fx-padding: 0 5; -fx-background-color: inherit;" text="消息设置" />
            <!-- Two-column form: right-aligned captions in column 0, growing text fields in column 1. -->
            <GridPane AnchorPane.bottomAnchor="1.0" AnchorPane.leftAnchor="1.0" AnchorPane.rightAnchor="1.0" AnchorPane.topAnchor="10.0">
                <columnConstraints>
                    <ColumnConstraints />
                    <ColumnConstraints hgrow="ALWAYS" />
                </columnConstraints>

                <rowConstraints>
                    <RowConstraints />
                    <RowConstraints />
                    <RowConstraints />
                </rowConstraints>

                <children>
                    <!-- Captions for the broker, topic and group id fields (rows 0-2). -->
                    <Label text="消息代理" GridPane.columnIndex="0" GridPane.rowIndex="0" GridPane.halignment="RIGHT">
                        <GridPane.margin>
                            <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" />
                        </GridPane.margin>
                    </Label>

                    <Label text="消息主题" GridPane.columnIndex="0" GridPane.rowIndex="1" GridPane.halignment="RIGHT">
                        <GridPane.margin>
                            <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" />
                        </GridPane.margin>
                    </Label>

                    <Label text="分组标识" GridPane.columnIndex="0" GridPane.rowIndex="2" GridPane.halignment="RIGHT">
                        <GridPane.margin>
                            <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" />
                        </GridPane.margin>
                    </Label>

                    <!-- Broker address field; pre-filled with localhost:9092. -->
                    <TextField fx:id="textFieldBroker" text="localhost:9092" GridPane.columnIndex="1" GridPane.rowIndex="0" GridPane.hgrow="ALWAYS">
                        <GridPane.margin>
                            <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" />
                        </GridPane.margin>
                    </TextField>

                    <!-- Topic field; pre-filled with "test". -->
                    <TextField fx:id="textFieldTopic" text="test" GridPane.columnIndex="1" GridPane.rowIndex="1" GridPane.hgrow="ALWAYS">
                        <GridPane.margin>
                            <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" />
                        </GridPane.margin>
                    </TextField>

                    <!-- Consumer group id field; pre-filled with "simple-java-consumer". -->
                    <TextField fx:id="textFieldGroupId" text="simple-java-consumer" GridPane.columnIndex="1" GridPane.rowIndex="2" GridPane.hgrow="ALWAYS">
                        <GridPane.margin>
                            <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" />
                        </GridPane.margin>
                    </TextField>
                </children>

                <GridPane.margin>
                    <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" />
                </GridPane.margin>
            </GridPane>
        </children>

        <GridPane.margin>
            <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" />
        </GridPane.margin>

        <padding>
            <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" />
        </padding>
    </AnchorPane>

    <!-- Row 1: button bar with the clear button on the left and the listen toggle on the right. -->
    <GridPane GridPane.hgrow="ALWAYS" GridPane.rowIndex="1">
        <columnConstraints>
            <ColumnConstraints />
            <ColumnConstraints hgrow="ALWAYS" />
        </columnConstraints>

        <rowConstraints>
            <RowConstraints />
        </rowConstraints>

        <children>
            <!-- Invokes handleButtonClearAction on the controller. -->
            <Button fx:id="buttonClear" minHeight="40.0" minWidth="80.0" onAction="#handleButtonClearAction" text="清空" GridPane.columnIndex="0" GridPane.halignment="LEFT" GridPane.hgrow="ALWAYS">
                <GridPane.margin>
                    <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" />
                </GridPane.margin>
            </Button>

            <!-- Invokes handleButtonListen on the controller, which also rewrites this button's text. -->
            <Button fx:id="buttonListen" minHeight="40.0" minWidth="80.0" onAction="#handleButtonListen" text="开启侦听" GridPane.columnIndex="1" GridPane.halignment="RIGHT" GridPane.hgrow="ALWAYS">
                <GridPane.margin>
                    <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" />
                </GridPane.margin>
            </Button>
        </children>
    </GridPane>

    <!-- Row 2: read-only, wrapping log area to which the controller appends received records. -->
    <TextArea fx:id="textAreaLog" editable="false" minHeight="400.0" minWidth="600.0" wrapText="true" GridPane.rowIndex="2" GridPane.hgrow="ALWAYS" GridPane.vgrow="ALWAYS">
        <GridPane.margin>
            <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" />
        </GridPane.margin>
    </TextArea>
</GridPane>

KafkaSimpleConsumerController.java

/* ----------------------------------------------------------
 * 文件名称:KafkaSimpleConsumerController.java
 * 作者:秦建辉
 *
 * 微信:splashcn
 *
 * 博客:http://www.firstsolver.com/wordpress/
 *
 * 开发环境:
 *      IntelliJ IDEA 2017.2.5
 *      Gradle 4.3
 *      Apache Kafka 1.0.0
 *
 * 版本历史:
 *      V1.0    2017年11月06日
 *              Kafka Consumer 示例
 *
 * 参考资料:
 *      kafka 1.0.0 API     http://kafka.apache.org/10/javadoc/index.html?org/apache/kafka/
 *      Java API Docs       https://docs.oracle.com/javase/9/docs/api/index.html?overview-summary.html
 *
 * 控制台命令
 *      启动 ZooKeeper 服务器
 *          bin\windows\zookeeper-server-start.bat config\zookeeper.properties
 *      启动 Kafka 服务器
 *          bin\windows\kafka-server-start.bat config\server.properties
 *      发送消息
 *          bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
 *      开启消费者
 *          bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
------------------------------------------------------------ */
package Splash;

import javafx.application.Platform;
import javafx.event.ActionEvent;
import javafx.fxml.FXML;
import javafx.scene.control.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import java.util.concurrent.*;

/**
 * JavaFX controller of the Kafka simple-consumer demo window.
 *
 * <p>Starts and stops a {@link KafkaConsumerRunner} on a background thread pool and
 * shows the records it receives in the log text area. All UI state ({@code running},
 * the listen button, the log) is only touched on the JavaFX application thread;
 * callbacks arriving from the consumer thread are marshalled with
 * {@link Platform#runLater(Runnable)}.
 */
public class KafkaSimpleConsumerController implements AutoCloseable, IKafkaConsumerRecordsHandler {
    /**
     * Upper bound, in seconds, for the consumer thread to wait for a status change to be
     * applied on the JavaFX thread. Per the {@code Platform.runLater} contract, runnables
     * submitted after the JavaFX runtime has shut down are silently dropped, so an
     * unbounded wait could block the (non-daemon) consumer thread forever and keep the
     * JVM alive after the window was closed.
     */
    private static final long STATUS_SYNC_TIMEOUT_SECONDS = 5;

    @FXML
    private TextField textFieldBroker;

    @FXML
    private TextField textFieldTopic;

    @FXML
    private TextField textFieldGroupId;

    @FXML
    private TextArea textAreaLog;

    @FXML
    private Button buttonListen;

    // Thread pool that runs the consumer loop.
    private final ExecutorService pool = Executors.newCachedThreadPool();

    // Whether the consumer thread has reported that it is running (JavaFX thread only).
    private boolean running = false;

    // The currently started consumer, or null when none has been started.
    private KafkaConsumerRunner runner = null;

    /**
     * Stops the consumer, if one was started, and shuts the thread pool down.
     *
     * <p>The runner is stopped even when it has not yet reported itself as running,
     * so a consumer started just before the window closes cannot be left behind.
     */
    @Override
    public void close() throws Exception {
        try {
            closeRunner();
        } finally {
            // Release the thread pool.
            pool.shutdown();
        }
    }

    // Asks the current runner (if any) to stop and forgets it.
    private void closeRunner() {
        if (runner != null) {
            runner.shutdown();
            runner = null;
        }
    }

    /** Clears the log area. */
    @FXML
    public void handleButtonClearAction(ActionEvent actionEvent) {
        textAreaLog.clear();
    }

    /**
     * Toggles listening: stops the consumer when it is running, otherwise validates
     * the form and starts a new consumer.
     *
     * <p>The button is disabled while the change is in progress and re-enabled by
     * {@link #OnChangeStatus(Boolean)}. When nothing is started (invalid input or the
     * consumer cannot be created) the button is re-enabled here, because no status
     * callback will ever arrive.
     */
    @FXML
    public void handleButtonListen(ActionEvent actionEvent) throws InterruptedException {
        buttonListen.setDisable(true);
        if (running) {
            closeRunner();
            return;
        }

        // Broker address.
        String broker = textFieldBroker.getText();
        if (IsNullOrEmpty(broker)) {
            abortListen("消息代理不能为空!");
            return;
        }

        // Topic(s).
        String topic = textFieldTopic.getText();
        if (IsNullOrEmpty(topic)) {
            abortListen("消息主题不能为空!");
            return;
        }

        // Consumer group id.
        String groupId = textFieldGroupId.getText();
        if (IsNullOrEmpty(groupId)) {
            abortListen("分组标识不能为空!");
            return;
        }

        try {
            runner = new KafkaConsumerRunner(this, broker, topic, groupId);
            pool.execute(runner);
        } catch (RuntimeException e) {
            // Creating or scheduling the consumer failed (e.g. invalid configuration).
            runner = null;
            abortListen("启动侦听失败:" + e);
        }
    }

    // Re-enables the listen button and reports why listening was not started.
    private void abortListen(String message) {
        buttonListen.setDisable(false);
        ShowErrorMessage(message);
    }

    /**
     * Appends the given records to the log area on the JavaFX thread.
     * Empty batches are ignored so that idle polling does not flood the JavaFX
     * event queue with no-op runnables.
     */
    @Override
    public void OnConsumerRecordsArrived(ConsumerRecords<String, String> records) {
        if (records == null || records.isEmpty()) {
            return;
        }
        Platform.runLater(() -> {
            // Format the whole batch first and append once.
            StringBuilder sb = new StringBuilder();
            for (ConsumerRecord<String, String> record : records) {
                sb.append("Topic: ").append(record.topic()).append("\r\n");
                sb.append("Partition: ").append(record.partition()).append("\r\n");
                sb.append("Offset: ").append(record.offset()).append("\r\n");
                sb.append("Message.Key: ").append(record.key()).append("\r\n");
                sb.append("Message.Value: ").append(record.value()).append("\r\n\r\n");
            }

            // Show the messages.
            textAreaLog.appendText(sb.toString());
        });
    }

    /**
     * Applies a running/stopped status change to the UI and waits (bounded) until the
     * JavaFX thread has processed it.
     *
     * @param isRunning {@code true} when the consumer loop has started, {@code false}
     *                  when it has ended
     */
    @Override
    public void OnChangeStatus(Boolean isRunning) {
        if (Platform.isFxApplicationThread()) {
            // Already on the UI thread: waiting on the latch here would deadlock.
            applyStatus(isRunning);
            return;
        }

        final CountDownLatch doneLatch = new CountDownLatch(1);
        Platform.runLater(() -> {
            try {
                applyStatus(isRunning);
            } finally {
                doneLatch.countDown();
            }
        });
        try {
            // Wait for the UI to catch up, but never forever (see STATUS_SYNC_TIMEOUT_SECONDS).
            doneLatch.await(STATUS_SYNC_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }

    // Updates the button caption and the running flag, then unlocks the button.
    private void applyStatus(boolean isRunning) {
        if (isRunning) {
            buttonListen.setText("停止侦听");
        } else {
            buttonListen.setText("开启侦听");
        }
        running = isRunning;
        buttonListen.setDisable(false);
    }

    /**
     * Shows a modal error dialog with the given message on the JavaFX thread.
     *
     * @param message the text to display
     */
    public static void ShowErrorMessage(String message) {
        Platform.runLater(() -> {
            Alert alert = new Alert(Alert.AlertType.ERROR, message, ButtonType.OK);
            alert.setTitle("KafkaSimpleConsumer");
            alert.setHeaderText("错误");
            alert.showAndWait();
        });
    }

    /**
     * Tells whether a string is {@code null} or has zero length.
     *
     * @param value the string to test
     * @return {@code true} if {@code value} is {@code null} or empty
     */
    public static boolean IsNullOrEmpty(String value) {
        return value == null || value.isEmpty();
    }
}

IKafkaConsumerRecordsHandler.java

package Splash;

import org.apache.kafka.clients.consumer.ConsumerRecords;

/**
 * Callbacks through which a Kafka consumer reports fetched records and
 * changes of its running state.
 */
public interface IKafkaConsumerRecordsHandler {
    /**
     * Handles a batch of records obtained by the consumer.
     *
     * @param records the records that were fetched
     */
    void OnConsumerRecordsArrived(ConsumerRecords<String, String> records);

    /**
     * Signals that the consumer's running state has changed.
     *
     * @param isRunning the new running state
     */
    void OnChangeStatus(Boolean isRunning);
}

KafkaConsumerRunner.java

package Splash;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

/**
 * Runnable that polls a {@link KafkaConsumer} in a loop and forwards the received
 * records to an {@link IKafkaConsumerRecordsHandler} until {@link #shutdown()} is called.
 *
 * <p>The handler is told {@code OnChangeStatus(true)} when the loop starts and
 * {@code OnChangeStatus(false)} when it ends, whatever the reason for ending.
 */
public class KafkaConsumerRunner implements Runnable {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final KafkaConsumer<String, String> consumer;
    private final IKafkaConsumerRecordsHandler handler;

    /**
     * Creates the consumer and subscribes it to the given topics.
     *
     * @param handler receiver of records and status changes; must not be {@code null}
     * @param broker  value for {@code bootstrap.servers}
     * @param topic   one or more topic names separated by whitespace
     * @param groupId value for {@code group.id}
     * @throws NullPointerException     if {@code handler} is {@code null}
     * @throws IllegalArgumentException if {@code topic} contains no topic name
     */
    public KafkaConsumerRunner(IKafkaConsumerRecordsHandler handler, String broker, String topic, String groupId) {
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        // Validate before creating the consumer so that nothing has to be cleaned up.
        String topicList = (topic == null) ? "" : topic.trim();
        if (topicList.isEmpty()) {
            throw new IllegalArgumentException("topic must contain at least one topic name");
        }
        this.handler = handler;

        // Consumer configuration.
        Properties props = new Properties();
        props.put("bootstrap.servers", broker); // message broker
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "earliest");
        props.put("group.id", groupId); // consumer group id

        KafkaConsumer<String, String> created = new KafkaConsumer<>(props);
        try {
            // Split on runs of whitespace so that repeated separators do not
            // produce empty topic names.
            created.subscribe(Arrays.asList(topicList.split("\\s+")));
        } catch (RuntimeException e) {
            // Do not leak the consumer when subscribing fails.
            try {
                created.close();
            } catch (RuntimeException closeError) {
                e.addSuppressed(closeError);
            }
            throw e;
        }
        consumer = created;
    }

    /**
     * Polls for records until {@link #shutdown()} is requested, then closes the consumer.
     * A {@link WakeupException} is treated as the normal stop signal when shutdown
     * was requested and is rethrown otherwise.
     */
    @Override
    public void run() {
        try {
            handler.OnChangeStatus(true);
            while (!closed.get()) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                // Skip empty poll results; there is nothing for the handler to process.
                if (!records.isEmpty()) {
                    handler.OnConsumerRecordsArrived(records);
                }
            }
        } catch (WakeupException e) {
            if (!closed.get()) {
                throw e;
            }
        } finally {
            try {
                consumer.close();
            } finally {
                // Always report the stop, even if closing the consumer failed.
                handler.OnChangeStatus(false);
            }
        }
    }

    /**
     * Requests the polling loop to stop. May be called from another thread:
     * it sets the stop flag and wakes the consumer up from a blocking poll.
     */
    public void shutdown() {
        closed.set(true);
        consumer.wakeup();
    }
}

Comments are closed.