每天分享最新软件开发,Devops,敏捷,测试以及项目管理最新,最热门的文章,每天花3分钟学习何乐而不为,希望大家点赞,加关注,你的支持是我最大的动力。
在Kafka集群中创建Topics很容易,并且kafka-topics.sh在官方 API 文档中都有很好的记录。
bin/kafka-topics.sh --help
当您尝试执行定义Topics命名的标准方法时,就会出现复杂性。有很多方法可以根据您的需要确定正确的约定,但是在创建Topics约定时强制执行此类Topics约定在此 5 步博客中进行了说明。
没有正确的约定:它始终取决于您的业务需求。
对于我的示例,我希望定义一个遵循语义的主题约定:
.
它很容易上手,并且可以很容易地扩展,你会在跟随的过程中观察到。
从官方文档中,如果您希望定义自定义topic策略创建,则必须定义属性:
create.topic.policy.class.name = mypackage.className
className应该实现接口:
org.apache.kafka.server.policy.CreateTopicPolicy
有了这两个构建块,让我们定义一个 Maven 项目:
让我们定义一个包“me.samarthya”,并在“pom.xml”中添加Kafka 客户端的依赖项。
XML
org.apache.kafka
kafka-clients
3.2.0
compile
让我们定义主类TopicPolicy:
爪哇
public class TopicPolicy implements CreateTopicPolicy {
private final Logger logger = Logger.getLogger(TopicPolicy.class.toString());
private final static String TopicPattern = "\w+\.{1}\w+";
@Override
public void validate(RequestMetadata requestMetadata) throws PolicyViolationException {
StringBuilder bd = new StringBuilder().append(" Topic Name=").append(requestMetadata.topic());
logger.info(bd.toString());
if ( requestMetadata.topic().isEmpty() || !Pattern.matches(TopicPattern, requestMetadata.topic())) {
throw new PolicyViolationException("Topic name " + requestMetadata.topic() + " should match the pattern " + TopicPattern);
}
}
@Override
public void close() throws Exception {
logger.info(" Close & release.");
}
@Override
public void configure(Map configs) {
if (configs != null) {
for( String k: configs.keySet()) {
logger.info(configs.get(k).toString());
}
}
}
}
定义类后,要观察的主要是TopicPattern 已定义为格式的 将与名称匹配。如果没有找到,PolicyViolationException将抛出 a。
将jar. 它必须放在 Kafka(类路径)的“lib”文件夹下。
-rw-r--r--. 1 vagrant vagrant 3881 Jul 12 06:28 topic-policy-1.0-SNAPSHOT.jar
此外,在“server.properties”中,您可以定义两个属性:
属性文件
create.topic.policy.class.name=me.samarthya.TopicPolicy
auto.create.topics.enable=false
重新启动您的集群。
让我们回到 Kafka 二进制文件夹(本地机器),topic再次发出创建命令。
bin/kafka-topics.sh --bootstrap-server mybroker.test:9092 --topic invalid_topic --create
如果jar已成功加载,您应该会看到如下报告的错误:
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Error while executing topic command : Topic name invalid_topic should match the pattern \w+\.{1}\w+
[2022-07-13 09:49:21,805] ERROR org.apache.kafka.common.errors.PolicyViolationException: Topic name invalid_topic should match the pattern \w+\.{1}\w+
(kafka.admin.TopicCommand$)
您现在可以根据自己的方便修改模式并重新部署jar以检查新的自定义主题策略。
bin/kafka-topics.sh --bootstrap-server mybroker.test.test:9092 --topic invalid.valid --create
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
由于自动创建主题已被禁用,如果您尝试通过创建无效主题producer,它将无法正常工作(见下文)。
bin/kafka-console-producer.sh --bootstrap-server mybroker.test:9092 --topic test
这将导致以下错误:
[2022-07-13 09:54:21,196] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 4 : {test=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)
对于现有主题invalid.valid,它应该按如下方式工作:
bin/kafka-console-producer.sh --bootstrap-server mybrokers.test:9092 --topic invalid.valid
留言与评论(共有 0 条评论) “” |