- Configuration: server.properties
- Connecting to the Kafka cluster
- Producer configuration
- Producer: sending messages
- Consumer configuration
- Consumer: receiving messages
- Committing offsets
- Spring Boot integration
- Kafka transactions
Configuration: server.properties
# Unique id of this broker within the cluster
broker.id=0
# Listener the broker binds to; clients connect here
listeners=PLAINTEXT://192.168.65.60:9092
# Directory where log segment files are stored
log.dir=/usr/local/data/kafka-logs
# ZooKeeper connection string (pre-KRaft deployments)
zookeeper.connect=192.168.65.60:2181
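With the broker running on these settings, a topic can be created from Java; a minimal sketch using the AdminClient (the topic name and partition count are illustrative; replication factor 1 matches the single broker configured above):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

Properties adminProps = new Properties();
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.65.60:9092");
try (AdminClient admin = AdminClient.create(adminProps)) {
    // 2 partitions, replication factor 1 (only one broker in this config)
    admin.createTopics(Collections.singleton(new NewTopic("my-topic", 2, (short) 1)))
         .all().get(); // block until the controller finishes
}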
Connecting to the Kafka cluster
Properties props = new Properties();
// Initial broker list used to bootstrap the connection to the cluster
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094");
// Producer and consumer are constructed the same way, each from its own props
Producer<String, String> producer = new KafkaProducer<String, String>(props);
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
Producer configuration
// acks=1: the leader acknowledges once the record is written to its own log
props.put(ProducerConfig.ACKS_CONFIG, "1");
// Retry a failed send up to 3 times, waiting 300 ms between attempts
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);
// 32 MB client-side send buffer
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
// Batch records per partition up to 16 KB, lingering at most 10 ms before sending
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
// Serialize String keys and values
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
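One caveat: with retries enabled and acks=1, a send whose acknowledgment is lost can be retried and written twice. If duplicates matter, a common fix (a sketch, not part of the original notes) is the idempotent producer, which requires acks=all:

// Broker de-duplicates retried sends; requires acks=all
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put(ProducerConfig.ACKS_CONFIG, "all");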
Producer: sending messages
// Either target an explicit partition (here: partition 0)...
var producerRecord = new ProducerRecord<String, String>(TOPIC_NAME, 0, key_json, value_json);
// ...or omit the partition and let the partitioner hash the key
var producerRecord = new ProducerRecord<String, String>(TOPIC_NAME, key_json, value_json);
// Synchronous send: block until the broker acknowledges
RecordMetadata metadata = producer.send(producerRecord).get();
// Asynchronous send: the callback fires once the send succeeds or fails
producer.send(producerRecord, new Callback() {
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        // exception == null means success; metadata holds topic/partition/offset
    }
});
producer.close();
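Putting the pieces above together, a minimal end-to-end producer might look like the sketch below (the topic name "my-topic" and the message are placeholders):

import java.util.Properties;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

public class SimpleProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.65.60:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // Synchronous send; the metadata tells us where the record landed
            RecordMetadata meta = producer.send(
                    new ProducerRecord<>("my-topic", "key", "hello kafka")).get();
            System.out.printf("sent to %s-%d@%d%n", meta.topic(), meta.partition(), meta.offset());
        }
    }
}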
Consumer configuration
// Consumers with the same group.id share the topic's partitions between them
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
// Auto-commit offsets every second (set to "false" for the manual commits below)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
// Where to start when the group has no committed offset: earliest or latest
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Heartbeat every second; the broker evicts the consumer after 10 s without one
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);
// At most 500 records per poll; polls more than 30 s apart trigger a rebalance
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
// Deserialize String keys and values
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
Consumer: receiving messages
- Ways to receive from a topic: subscribe; assign specific partitions; rewind (to the beginning, to a given offset, or by timestamp); poll a batch of records
// Option 1: subscribe to the topic and let the group protocol assign partitions
consumer.subscribe(Arrays.asList(TOPIC_NAME));
// Option 2: manually assign a specific partition (no group rebalancing)
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
// Rewind an assigned partition to the beginning
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
// Or rewind to a specific offset (here: offset 10)
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);
// Or find, per partition, the first offset at or after a timestamp (one hour ago)
List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME);
long fetchDataTime = new Date().getTime() - 1000 * 60 * 60;
Map<TopicPartition, Long> map = new HashMap<>();
for (PartitionInfo par : topicPartitions) {
    map.put(new TopicPartition(TOPIC_NAME, par.partition()), fetchDataTime);
}
Map<TopicPartition, OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map);
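// offsetsForTimes only looks the offsets up; seek to them to actually rewind
// (a null entry means the partition has no record at or after the timestamp;
// this step assumes the partitions are currently assigned to this consumer)
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : parMap.entrySet()) {
    if (entry.getValue() != null) {
        consumer.seek(entry.getKey(), entry.getValue().offset());
    }
}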
// Poll a batch of records, waiting at most one second for data
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
Committing offsets
// Synchronous commit: blocks until the broker confirms the offsets
consumer.commitSync();
// Asynchronous commit: returns immediately, result delivered to the callback
consumer.commitAsync(new OffsetCommitCallback() {
    @Override
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception ex) {
        // ex == null means the commit succeeded
    }
});
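Both commit styles normally live inside a poll loop; a minimal sketch, assuming the props above with enable.auto.commit set to "false":

consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("partition=%d offset=%d value=%s%n",
                record.partition(), record.offset(), record.value());
    }
    if (!records.isEmpty()) {
        consumer.commitSync(); // commit only after the whole batch is processed
    }
}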
Spring Boot integration
- Spring Boot configuration in application.yml
spring:
  kafka:
    bootstrap-servers: 192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094
    producer:
      retries: 3
      batch-size: 16384
      buffer-memory: 33554432
      acks: 1
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: default-group
      enable-auto-commit: false
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # manual_immediate: commit as soon as Acknowledgment.acknowledge() is called
      ack-mode: manual_immediate
ack-mode
- RECORD: commit after each record has been processed by the listener (ListenerConsumer)
- BATCH: commit after each batch returned by poll() has been processed by the listener
- TIME: commit after a poll() batch has been processed and more than TIME has passed since the last commit
- COUNT: commit after a poll() batch has been processed and at least COUNT records have been processed
- COUNT_TIME: commit when either the TIME or the COUNT condition is met
- MANUAL: after a poll() batch has been processed, commit once the listener has called Acknowledgment.acknowledge()
- MANUAL_IMMEDIATE: commit immediately when Acknowledgment.acknowledge() is called; the most common choice (commits one message at a time)
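The same ack-mode can also be set in code on the listener container factory; a minimal sketch for Spring Kafka 2.x+ (the bean and parameter names are the conventional defaults, not from the original notes):

import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        ConsumerFactory<String, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // Equivalent to ack-mode: manual_immediate in application.yml
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    return factory;
}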
Producer & consumer
// Inject the template configured from application.yml
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

// Send to partition 0 of TOPIC_NAME with key "key"
kafkaTemplate.send(TOPIC_NAME, 0, "key", "this is a msg");
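KafkaTemplate.send() is asynchronous; a sketch of checking the outcome, assuming Spring Kafka 3.x where send() returns a CompletableFuture (2.x returns a ListenableFuture with addCallback instead):

kafkaTemplate.send(TOPIC_NAME, 0, "key", "this is a msg")
        .whenComplete((result, ex) -> {
            if (ex != null) {
                // send failed: log, retry or alert
            } else {
                // result.getRecordMetadata() carries topic, partition and offset
            }
        });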
// Listener for one consumer group; acknowledge each record manually (MANUAL_IMMEDIATE)
@KafkaListener(topics = "my-replicated-topic", groupId = "zhugeGroup")
public void listenZhugeGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
    String value = record.value();
    ack.acknowledge();
}
// A second group on the same topic: each group receives its own copy of every message
@KafkaListener(topics = "my-replicated-topic", groupId = "tulingGroup")

// Pin a listener to explicit partitions/offsets and run 6 concurrent consumers
@KafkaListener(groupId = "testGroup", topicPartitions = {
        @TopicPartition(topic = "topic1", partitions = {"0", "1"}),
        @TopicPartition(topic = "topic2", partitions = "0",
                partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
}, concurrency = "6")
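Listeners can also receive a whole poll() batch at once; a sketch assuming batch mode is enabled (spring.kafka.listener.type: batch, or setBatchListener(true) on the factory) and a hypothetical group name batchGroup:

@KafkaListener(topics = "my-replicated-topic", groupId = "batchGroup")
public void listenBatch(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
    for (ConsumerRecord<String, String> record : records) {
        // process each record in the batch
    }
    ack.acknowledge(); // one acknowledgment covers the whole batch
}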
Kafka transactions
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// A stable transactional.id enables exactly-once semantics and fences zombie producers
props.put("transactional.id", "my-transactional-id");
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
// Register the transactional.id with the coordinator; must be called once, first
producer.initTransactions();
try {
    producer.beginTransaction();
    producer.send(producerRecord); // all sends between begin and commit are atomic
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // Fatal errors: the producer cannot recover, so close it
    producer.close();
} catch (KafkaException e) {
    // Recoverable error: abort so consumers never see the partial transaction
    producer.abortTransaction();
}
producer.close();
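The classic use of transactions is a consume-transform-produce loop, where the consumed offsets are committed inside the producer's transaction; a sketch with an assumed "output-topic" (downstream consumers should set isolation.level=read_committed; sendOffsetsToTransaction with ConsumerGroupMetadata needs kafka-clients 2.5+, older versions take the group id string):

producer.beginTransaction();
try {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    for (ConsumerRecord<String, String> record : records) {
        // Transform and forward each record
        producer.send(new ProducerRecord<>("output-topic", record.key(), record.value().toUpperCase()));
        // The offset to commit is the next offset to read, hence +1
        offsets.put(new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1));
    }
    // Offsets commit atomically with the produced records
    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
    producer.commitTransaction();
} catch (KafkaException e) {
    producer.abortTransaction();
}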