Properties prop = new Properties();
prop.put("bootstrap.servers", "localhost:9092");
prop.put("group.id", "group1");
prop.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);
consumer.subscribe(Collections.singleton("simple")); // 토픽 구동
while(조건){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record: records) {
Systme.out.println(record.value() + ":" + record.topic() + ":" + record.partition() + ":" + record.offset());
}
}
consumer.close();
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record: records) {
... 처리
}
try {
consumer.commitSync();
} catch(Exception ex) {
// 커밋 실패시 에러 발생
}
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record: records) {
... 처리
}
consumer.commitAsync(); // commitAsync(OffsetCommitCallback callback)
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);
consumer.subscribe(Collections.singleton("simple"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); // wakeup() 호출시 익셉션 발생
... records 처리
try {
consumer.commitAsync();
} catch (Exception e) {
e.printStackTrace();
}
}
} catch (Exception ex) {
...
} finally {
consumer.close();
}