java实现kafka消息发送和接收

今天这个实现是和集群对应的。使用的是新版的API。属性如果想定制自己的,需要到官方网址上面去查看一下对应的值。

推介大家多去看看官方的介绍和demo。网上有些翻译过来的例子并不完善,最好是知己知彼,才能百战不殆

maven:

org.apache.kafka

kafka-clients

0.11.0.0

org.apache.kafka

kafka-streams

0.11.0.0

生产者Producer:

package com.roncoo.example.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerDemo {

private final KafkaProducer producer;

public final static String TOPIC = "test5";

private ProducerDemo() {

Properties props = new Properties();

props.put("bootstrap.servers", "xxx:9092,1xxx:9092,xxx:9092");//xxx服务器ip

props.put("acks", "all");//所有follower都响应了才认为消息提交成功,即"committed"

props.put("retries", 0);//retries = MAX 无限重试,直到你意识到出现了问题:)

props.put("batch.size", 16384);//producer将试图批处理消息记录,以减少请求次数.默认的批量处理消息字节数

//batch.size当批量的数据大小达到设定值后,就会立即发送,不顾下面的linger.ms

props.put("linger.ms", 1);//延迟1ms发送,这项设置将通过增加小的延迟来完成--即,不是立即发送一条记录,producer将会等待给定的延迟时间以允许其他消息记录发送,这些消息记录可以批量处理

props.put("buffer.memory", 33554432);//producer可以用来缓存数据的内存大小。

props.put("key.serializer",

"org.apache.kafka.common.serialization.IntegerSerializer");

props.put("value.serializer",

"org.apache.kafka.common.serialization.StringSerializer");

producer = new KafkaProducer(props);

}

public void produce() {

int messageNo = 1;

final int COUNT = 5;

while(messageNo < COUNT) {

String key = String.valueOf(messageNo);

String data = String.format("hello KafkaProducer message %s from hubo 06291018 ", key);

try {

producer.send(new ProducerRecord(TOPIC, data));

} catch (Exception e) {

e.printStackTrace();

}

messageNo++;

}

producer.close();

}

public static void main(String[] args) {

new ProducerDemo().produce();

}

}

消费者Consumer:

package com.roncoo.example.kafka;

import java.util.Arrays;

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.apache.kafka.clients.consumer.ConsumerRecords;

import org.apache.kafka.clients.consumer.KafkaConsumer;

public class UserKafkaConsumer extends Thread {

public static void main(String[] args){

Properties properties = new Properties();

properties.put("bootstrap.servers", "xxx:9092,xxx:9092,xxx:9092");//xxx是服务器集群的ip

properties.put("group.id", "jd-group");

properties.put("enable.auto.commit", "true");

properties.put("auto.commit.interval.ms", "1000");

properties.put("auto.offset.reset", "latest");

properties.put("session.timeout.ms", "30000");

properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer kafkaConsumer = new KafkaConsumer<>(properties);

kafkaConsumer.subscribe(Arrays.asList("test5"));

while (true) {

ConsumerRecords records = kafkaConsumer.poll(100);

for (ConsumerRecord record : records) {

System.out.println("-----------------");

System.out.printf("offset = %d, value = %s", record.offset(), record.value());

System.out.println();

}

}

}

}

消息   java   kafka
发表评论
留言与评论(共有 0 条评论) “”
   
验证码:

相关文章

推荐文章