Kafka Topics 命名

在这篇文章中,了解如何轻松地对 Apache Kafka 主题实施命名约定。

每天‬分享‬最新‬软件‬开发‬,Devops,敏捷‬,测试‬以及‬项目‬管理‬最新‬,最热门‬的‬文章‬,每天‬花‬3分钟‬学习‬何乐而不为‬,希望‬大家‬点赞‬,加‬关注‬,你的‬支持‬是我‬最大‬的‬动力‬。



在Kafka集群中创建Topics很容易,并且kafka-topics.sh在官方 API 文档中都有很好的记录。

 bin/kafka-topics.sh --help


当您尝试执行定义Topics命名的标准方法时,就会出现复杂性。有很多方法可以根据您的需要确定正确的约定,但是在创建Topics约定时强制执行此类Topics约定在此 5 步博客中进行了说明。

没有正确的约定:它始终取决于您的业务需求。

对于我的示例,我希望定义一个遵循语义的主题约定:

.


它很容易上手,并且可以很容易地扩展,你会在跟随的过程中观察到。

从官方文档中,如果您希望定义自定义topic策略创建,则必须定义属性:


create.topic.policy.class.name = mypackage.className


className应该实现接口:

org.apache.kafka.server.policy.CreateTopicPolicy

第 1 步:构建项目

有了这两个构建块,让我们定义一个 Maven 项目:

Kafka Topics 命名

第 2 步:定义依赖关系

让我们定义一个包“me.samarthya”,并在“pom.xml”中添加Kafka 客户端的依赖项。

XML

 
   org.apache.kafka
   kafka-clients
   3.2.0
   compile


第 3 步:实施

让我们定义主类TopicPolicy

爪哇

public class TopicPolicy implements CreateTopicPolicy {
    private final Logger logger = Logger.getLogger(TopicPolicy.class.toString());

    private final static String TopicPattern = "\w+\.{1}\w+";

    @Override
    public void validate(RequestMetadata requestMetadata) throws PolicyViolationException {
        StringBuilder bd = new StringBuilder().append(" Topic Name=").append(requestMetadata.topic());
        logger.info(bd.toString());
        if ( requestMetadata.topic().isEmpty() || !Pattern.matches(TopicPattern, requestMetadata.topic())) {
            throw new PolicyViolationException("Topic name " + requestMetadata.topic() + " should match the pattern " + TopicPattern);
        }
    }

    @Override
    public void close() throws Exception {
        logger.info(" Close & release.");
    }

    @Override
    public void configure(Map configs) {
        if (configs != null) {
            for( String k: configs.keySet()) {
                logger.info(configs.get(k).toString());
            }
        }
    }
}


定义类后,要观察的主要是TopicPattern 已定义为格式的 将与名称匹配。如果没有找到,PolicyViolationException将抛出 a。

第 4 步:对集群中的每个代理重复此操作

jar. 它必须放在 Kafka(类路径)的“lib”文件夹下。


-rw-r--r--. 1 vagrant vagrant     3881 Jul 12 06:28 topic-policy-1.0-SNAPSHOT.jar


此外,在“server.properties”中,您可以定义两个属性:

属性文件

create.topic.policy.class.name=me.samarthya.TopicPolicy
auto.create.topics.enable=false


重新启动您的集群。

第 5 步:测试您的“主题”

让我们回到 Kafka 二进制文件夹(本地机器),topic再次发出创建命令。

 bin/kafka-topics.sh --bootstrap-server mybroker.test:9092  --topic invalid_topic --create


如果jar已成功加载,您应该会看到如下报告的错误:


WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Error while executing topic command : Topic name invalid_topic should match the pattern \w+\.{1}\w+
[2022-07-13 09:49:21,805] ERROR org.apache.kafka.common.errors.PolicyViolationException: Topic name invalid_topic should match the pattern \w+\.{1}\w+
 (kafka.admin.TopicCommand$)


您现在可以根据自己的方便修改模式并重新部署jar以检查新的自定义主题策略。

例子


bin/kafka-topics.sh --bootstrap-server mybroker.test.test:9092  --topic invalid.valid --create


WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.


笔记

由于自动创建主题已被禁用,如果您尝试通过创建无效主题producer,它将无法正常工作(见下文)。


 bin/kafka-console-producer.sh --bootstrap-server mybroker.test:9092 --topic test


这将导致以下错误:

[2022-07-13 09:54:21,196] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 4 : {test=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)


对于现有主题invalid.valid,它应该按如下方式工作:

bin/kafka-console-producer.sh --bootstrap-server mybrokers.test:9092 --topic invalid.valid
发表评论
留言与评论(共有 0 条评论) “”
   
验证码:

相关文章

推荐文章