Apache Kafka(三)消息系统:简单生产者(Java)

同系列文章

源代码下载

KafkaDemo.zip

源代码下载

KafkaMessagingJavaDemo.zip

演示程序界面

源代码
build.gradle

/*
开发环境:
    jdk 8u152
    IntelliJ IDEA 2017.2.5
    Gradle 4.3

Gradle 命令集合
    gradle clean
    gradle build
    gradle run
    gradle uploadArchives

参考资料:
    Gradle Build Language Reference     https://docs.gradle.org/4.3/dsl/
 */
// Project coordinates.
group 'Com.FirstSolver'
version '1.0.0.1'

apply plugin: 'application' // packages the app as zip and tar distributions
apply plugin: 'java'        // use the java plugin

sourceCompatibility = 1.8   // JDK 8
targetCompatibility = 1.8   // JDK 8
// Compile both main and test sources as UTF-8 (the sources contain Chinese string literals).
[compileJava, compileTestJava]*.options*.encoding = 'UTF-8'

// Where dependencies are resolved from.
repositories {
    // Aliyun Maven mirror
    maven { url 'http://maven.aliyun.com/mvn/repository/' }

    // Alternative (disabled): resolve from Maven Central instead of the mirror.
    // mavenCentral()
}

// Declare the project's dependencies
dependencies {
    // Required to compile the main sources (Kafka producer client API)
    compile "org.apache.kafka:kafka-clients:1.0.0"

    // Required to compile the test sources
    testCompile group: 'junit', name: 'junit', version: '4.12'
}

// Main class used by the application plugin (e.g. for `gradle run`)
mainClassName = 'Splash.KafkaSimpleProducer'

// Naming of the output jar file
jar {
    baseName = 'KafkaSimpleProducer'
    version = '1.0.0.1'

    manifest {
        // Set the jar's main class
        attributes 'Main-Class': 'Splash.KafkaSimpleProducer'
    }

    // Disabled: bundle the referenced dependency jars into this jar.
    // NOTE(review): if re-enabled, `configuration` probably needs to be `configurations` (plural) — confirm.
    // from configuration.compile.collect { zipTree it}
}

// Release publishing target: a flat local directory named 'repos'
uploadArchives {
    repositories {
        flatDir {
            dirs 'repos'
        }
    }
}

// Source directory for resource files
sourceSets {
    main {
        resources {
            srcDirs = ["src/main/java"] // resource files live in the same directory as the .java files
            excludes = ["**/*.java"]    // ...so the .java files themselves must not be treated as resources
        }
    }
}

// Destination directory for resource files
// NOTE(review): this copies resources into the build directory root rather than the
// plugin's default resources output directory — confirm this is intended.
processResources {
    into "$buildDir"    // author's intent: package resource files alongside the .class files
}

/*
// 打包授权文件
task copyLicense {
    outputs.file new File("$buildDir/LICENSE")
    doLast {
        copy {
            from "LICENSE"
            into "$buildDir"
        }
    }
}

applicationDistribution.from(copyLicense) {
    into ""
}
*/

KafkaSimpleProducer.java

package Splash;

import javafx.application.Application;
import javafx.fxml.FXMLLoader;
import javafx.scene.Parent;
import javafx.scene.Scene;
import javafx.scene.image.Image;
import javafx.stage.Stage;

import java.util.Locale;

/**
 * JavaFX entry point of the Kafka simple-producer demo.
 *
 * <p>Loads the window layout from {@code KafkaSimpleProducer.fxml}, sets the window title and
 * icon, and shows the primary stage centered on the screen. Both the FXML file and the icon are
 * looked up relative to this class.
 */
public class KafkaSimpleProducer extends Application {
    /** FXML layout file, resolved relative to this class. */
    private static final String FXML_RESOURCE = "KafkaSimpleProducer.fxml";

    /** Window icon file, resolved relative to this class. */
    private static final String ICON_RESOURCE = "FireEyes.png";

    /**
     * Builds and shows the main window.
     *
     * @param stage the primary stage supplied by the JavaFX runtime
     * @throws NullPointerException if the FXML layout or the icon is missing from the classpath
     * @throws Exception if the FXML cannot be loaded or the icon stream cannot be closed
     */
    @Override
    public void start(Stage stage) throws Exception {
        Locale.setDefault(Locale.CHINESE); // display the UI in Chinese

        Parent root = FXMLLoader.load(requireResource(getClass().getResource(FXML_RESOURCE), FXML_RESOURCE));
        stage.setScene(new Scene(root));
        stage.sizeToScene();

        // Window title
        stage.setTitle("Kafka Messaging 简单生产者演示");

        // Window icon. The image is constructed inside the try block, and the stream is closed
        // explicitly afterwards instead of being left open.
        try (java.io.InputStream iconStream = getClass().getResourceAsStream(ICON_RESOURCE)) {
            stage.getIcons().add(new Image(requireResource(iconStream, ICON_RESOURCE)));
        }

        // Center on screen, then show
        stage.centerOnScreen();
        stage.show();
    }

    /**
     * Returns {@code resource} unchanged, or fails with a message naming the missing resource.
     * Keeps the {@link NullPointerException} type the original lookup failure produced, but makes
     * the cause obvious instead of surfacing deep inside the JavaFX loader.
     */
    private static <T> T requireResource(T resource, String name) {
        if (resource == null) {
            throw new NullPointerException("Classpath resource not found: " + name);
        }
        return resource;
    }

    /**
     * Launches the JavaFX application.
     *
     * @param args command-line arguments, forwarded to {@link Application#launch(String...)}
     */
    public static void main(String[] args) {
        launch(args);
    }
}

KafkaSimpleProducer.fxml

<?xml version="1.0" encoding="UTF-8"?>

<?import javafx.geometry.Insets?>
<?import javafx.scene.control.Button?>
<?import javafx.scene.control.Label?>
<?import javafx.scene.control.TextArea?>
<?import javafx.scene.control.TextField?>
<?import javafx.scene.layout.AnchorPane?>
<?import javafx.scene.layout.ColumnConstraints?>
<?import javafx.scene.layout.GridPane?>
<?import javafx.scene.layout.RowConstraints?>

<GridPane alignment="center" hgap="10" vgap="10" xmlns="http://javafx.com/javafx/8.0.121" xmlns:fx="http://javafx.com/fxml/1" fx:controller="Splash.KafkaSimpleProducerController">
    <rowConstraints>
        <RowConstraints />
        <RowConstraints />
        <RowConstraints />
        <RowConstraints />
    </rowConstraints>

    <!-- Row 0: bordered "消息设置" (message settings) panel holding the broker, topic, key and value inputs. -->
    <AnchorPane style="-fx-border-insets: 8 0 0 0; -fx-background-color: #FFFFFF; -fx-border-color: lightgray;" GridPane.hgrow="ALWAYS" GridPane.rowIndex="0">
        <children>
            <!-- Panel caption, drawn over the top border line. -->
            <Label alignment="TOP_LEFT" layoutX="14.0" style="-fx-padding: 0 5; -fx-background-color: inherit;" text="消息设置" />
            <!-- Two-column form: captions in column 0, inputs in column 1 (column 1 takes the extra width). -->
            <GridPane AnchorPane.bottomAnchor="1.0" AnchorPane.leftAnchor="1.0" AnchorPane.rightAnchor="1.0" AnchorPane.topAnchor="10.0">
                <columnConstraints>
                    <ColumnConstraints />
                    <ColumnConstraints hgrow="ALWAYS" />
                </columnConstraints>

                <rowConstraints>
                    <RowConstraints />
                    <RowConstraints />
                    <RowConstraints />
                    <RowConstraints />
                </rowConstraints>

                <children>
                    <!-- Column 0: right-aligned captions (broker, topic, message key, message value). -->
                    <Label text="消息代理" GridPane.columnIndex="0" GridPane.rowIndex="0" GridPane.halignment="RIGHT">
                        <GridPane.margin>
                            <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" />
                        </GridPane.margin>
                    </Label>

                    <Label text="消息主题" GridPane.columnIndex="0" GridPane.rowIndex="1" GridPane.halignment="RIGHT">
                        <GridPane.margin>
                            <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" />
                        </GridPane.margin>
                    </Label>

                    <Label text="消息键名" GridPane.columnIndex="0" GridPane.rowIndex="2" GridPane.halignment="RIGHT">
                        <GridPane.margin>
                            <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" />
                        </GridPane.margin>
                    </Label>

                    <Label text="消息键值" GridPane.columnIndex="0" GridPane.rowIndex="3" GridPane.halignment="RIGHT" GridPane.valignment="TOP">
                        <GridPane.margin>
                            <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" />
                        </GridPane.margin>
                    </Label>

                    <!-- Column 1: input controls; each fx:id matches an @FXML field in the controller. -->
                    <TextField fx:id="textFieldBroker" text="localhost:9092" GridPane.columnIndex="1" GridPane.rowIndex="0" GridPane.hgrow="ALWAYS">
                        <GridPane.margin>
                            <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" />
                        </GridPane.margin>
                    </TextField>

                    <TextField fx:id="textFieldTopic" text="test" GridPane.columnIndex="1" GridPane.rowIndex="1" GridPane.hgrow="ALWAYS">
                        <GridPane.margin>
                            <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" />
                        </GridPane.margin>
                    </TextField>

                    <TextField fx:id="textFieldMessageKey" GridPane.columnIndex="1" GridPane.rowIndex="2" GridPane.hgrow="ALWAYS">
                        <GridPane.margin>
                            <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" />
                        </GridPane.margin>
                    </TextField>

                    <TextArea fx:id="textAreaMessageValue" GridPane.columnIndex="1" GridPane.rowIndex="3" GridPane.hgrow="ALWAYS" GridPane.vgrow="ALWAYS">
                        <GridPane.margin>
                            <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" />
                        </GridPane.margin>
                    </TextArea>
                </children>

                <GridPane.margin>
                    <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" />
                </GridPane.margin>
            </GridPane>
        </children>

        <GridPane.margin>
            <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" />
        </GridPane.margin>

        <padding>
            <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" />
        </padding>
    </AnchorPane>

    <!-- Row 1: action bar with the "清空" (clear) button on the left and the "提交" (submit) button on the right. -->
    <GridPane GridPane.hgrow="ALWAYS" GridPane.rowIndex="1">
        <columnConstraints>
            <ColumnConstraints />
            <ColumnConstraints hgrow="ALWAYS" />
        </columnConstraints>

        <rowConstraints>
            <RowConstraints />
        </rowConstraints>

        <children>
            <!-- Invokes handleButtonClearAction on the controller. -->
            <Button fx:id="buttonClear" minHeight="40.0" minWidth="80.0" onAction="#handleButtonClearAction" text="清空" GridPane.columnIndex="0" GridPane.halignment="LEFT" GridPane.hgrow="ALWAYS">
                <GridPane.margin>
                    <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" />
                </GridPane.margin>
            </Button>

            <!-- Invokes handleButtonSubmitAction on the controller. -->
            <Button fx:id="buttonSubmit" minHeight="40.0" minWidth="80.0" onAction="#handleButtonSubmitAction" text="提交" GridPane.columnIndex="1" GridPane.halignment="RIGHT" GridPane.hgrow="ALWAYS">
                <GridPane.margin>
                    <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" />
                </GridPane.margin>
            </Button>
        </children>
    </GridPane>

    <!-- Row 2: read-only, word-wrapped log area; injected into the controller as textAreaLog. -->
    <TextArea fx:id="textAreaLog" editable="false" minHeight="400.0" minWidth="600.0" wrapText="true" GridPane.rowIndex="2" GridPane.hgrow="ALWAYS" GridPane.vgrow="ALWAYS">
        <GridPane.margin>
            <Insets bottom="4.0" left="4.0" right="4.0" top="4.0" />
        </GridPane.margin>
    </TextArea>
</GridPane>

KafkaSimpleProducerController.java

/* ----------------------------------------------------------
 * 文件名称:KafkaSimpleProducerController.java
 * 作者:秦建辉
 *
 * 微信:splashcn
 *
 * 博客:http://www.firstsolver.com/wordpress/
 *
 * 开发环境:
 *      IntelliJ IDEA 2017.2.5
 *      Gradle 4.3
 *      Apache Kafka 1.0.0
 *      JDK 8u152
 *
 * 版本历史:
 *      V1.0    2017年11月03日
 *              Kafka Producer 示例
 *
 * 参考资料:
 *      kafka 1.0.0 API     http://kafka.apache.org/10/javadoc/index.html?org/apache/kafka/
 *      Java API Docs       https://docs.oracle.com/javase/9/docs/api/index.html?overview-summary.html
 *
 * 控制台命令
 *      启动 ZooKeeper 服务器
 *          bin\windows\zookeeper-server-start.bat config\zookeeper.properties
 *      启动 Kafka 服务器
 *          bin\windows\kafka-server-start.bat config\server.properties
 *      发送消息
 *          bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
 *      开启消费者
 *          bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
------------------------------------------------------------ */
package Splash;

import javafx.application.Platform;
import javafx.event.ActionEvent;
import javafx.fxml.FXML;
import javafx.scene.control.*;
import org.apache.kafka.clients.producer.*;

import java.util.Properties;

/**
 * FXML controller for the Kafka simple-producer demo window.
 *
 * <p>Reads broker, topic, key and value from the form, sends one record with a short-lived
 * {@link KafkaProducer}, and reports the resulting topic/partition/offset in the log area or
 * shows an error dialog.
 *
 * <p>Threading: the Kafka send callback may run on a thread other than the JavaFX Application
 * Thread, so every UI update made from it is routed through {@link #runOnFxThread(Runnable)}.
 */
public class KafkaSimpleProducerController {
    @FXML
    private TextField textFieldBroker;

    @FXML
    private TextField textFieldTopic;

    @FXML
    private TextField textFieldMessageKey;

    @FXML
    private TextArea textAreaMessageValue;

    @FXML
    private TextArea textAreaLog;

    @FXML
    private Button buttonSubmit;

    /**
     * Pre-fills the key and value inputs with sample content.
     *
     * <p>Called by the FXML loader after field injection; per the JavaFX documentation the
     * {@code Initializable} interface has been superseded by this convention.
     */
    public void initialize() {
        textFieldMessageKey.setText("毛泽东");
        textAreaMessageValue.setText("《卜算子·咏梅》\r\n风雨送春归,\r\n飞雪迎春到。\r\n已是悬崖百丈冰,\r\n犹有花枝俏。\r\n\r\n俏也不争春,\r\n只把春来报。\r\n待到山花烂漫时,\r\n她在丛中笑。");
    }

    /**
     * Clears the log area.
     *
     * @param actionEvent the button event (unused)
     */
    @FXML
    public void handleButtonClearAction(ActionEvent actionEvent) {
        textAreaLog.clear();
    }

    /**
     * Validates the form and sends one record to Kafka.
     *
     * <p>Broker, topic and value must be non-empty; the key is passed through as entered. The
     * producer is closed before this handler returns (try-with-resources).
     * NOTE(review): closing the producer waits for the pending send, so the UI is expected to
     * be unresponsive for that time — consider moving the send to a background task.
     *
     * @param actionEvent the button event (unused)
     */
    @FXML
    public void handleButtonSubmitAction(ActionEvent actionEvent) {
        try {
            // Broker address list
            String broker = textFieldBroker.getText();
            if (IsNullOrEmpty(broker)) {
                ShowErrorMessage("消息代理不能为空!");
                return;
            }

            // Topic
            String topic = textFieldTopic.getText();
            if (IsNullOrEmpty(topic)) {
                ShowErrorMessage("消息主题不能为空!");
                return;
            }

            // Record key (not validated)
            String messageKey = textFieldMessageKey.getText();

            // Record value
            String messageValue = textAreaMessageValue.getText();
            if (IsNullOrEmpty(messageValue)) {
                ShowErrorMessage("消息键值不能为空!");
                return;
            }

            // Producer configuration
            Properties props = new Properties();
            props.put("bootstrap.servers", broker);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("acks", "all");

            try (Producer<String, String> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<String, String>(topic, messageKey, messageValue), (metadata, exception) -> {
                    // This callback is not guaranteed to run on the JavaFX Application Thread,
                    // so it must not touch controls directly.
                    if (exception != null) { // failure
                        ShowErrorMessage(describe(exception));
                        return;
                    }

                    // Success: build the text here, publish it on the FX thread.
                    String entry = "主题: " + metadata.topic()
                            + "\r\n分区: " + metadata.partition()
                            + "\r\n偏移: " + metadata.offset()
                            + "\r\n\r\n";
                    runOnFxThread(() -> textAreaLog.appendText(entry));
                });
            }
        }
        catch (Exception exception) {
            ShowErrorMessage(describe(exception));
        }
    }

    /**
     * Shows a modal error dialog with the given message.
     *
     * <p>Safe to call from any thread: when invoked off the JavaFX Application Thread the dialog
     * is scheduled onto it instead of being shown directly.
     *
     * @param message the text to display
     */
    public void ShowErrorMessage(String message) {
        runOnFxThread(() -> {
            Alert alert = new Alert(Alert.AlertType.ERROR, message, ButtonType.OK);
            alert.setTitle("KafkaSimpleProducer");
            alert.setHeaderText("错误");
            alert.showAndWait();
        });
    }

    /**
     * Tells whether a string is {@code null} or has zero length.
     *
     * @param value the string to test
     * @return {@code true} if {@code value} is {@code null} or empty
     */
    public static boolean IsNullOrEmpty(String value) {
        return value == null || value.isEmpty();
    }

    /** Runs {@code action} immediately when already on the FX thread, otherwise queues it there. */
    private static void runOnFxThread(Runnable action) {
        if (Platform.isFxApplicationThread()) {
            action.run();
        } else {
            Platform.runLater(action);
        }
    }

    /** Returns the exception's message, falling back to {@code toString()} when it has none. */
    private static String describe(Throwable error) {
        String message = error.getMessage();
        return IsNullOrEmpty(message) ? error.toString() : message;
    }
}

Comments are closed.