Consumer
Properties prop = new Properties();
prop.put("bootstrap.servers", "localhost:9092");
prop.put("group.id", "group1");
prop.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);
consumer.subscribe(Collections.singleton("simple")); // 토픽 구동
while(조건){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record: records) {
Systme.out.println(record.value() + ":" + record.topic() + ":" + record.partition() + ":" + record.offset());
}
}
consumer.close();
커밋과 오프셋
커밋된 오프셋이 없는 경우
처음 접근이거나 커밋한 오프셋이 없는 경우
auto.offset.reset 설정 사용
컨슈머 설정
조회에 영향을 주는 주요 설정
fetch.min.bytes: 조회시 브로커가 전송할 최소 데이터 크기
fetch.max.wait.ms: 데이터가 최소 크기가 될 때까지 기다릴 시간
max.partition.fetch.bytes: 파티션 당 서버가 리턴할 수 있는 최대 크기
자동 커밋/수동 커밋
수동 커밋 : 동기/비동기 커밋
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record: records) {
... 처리
}
try {
consumer.commitSync();
} catch(Exception ex) {
// 커밋 실패시 에러 발생
}
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record: records) {
... 처리
}
consumer.commitAsync(); // commitAsync(OffsetCommitCallback callback)
재처리와 순서
세션 타임아웃, 하트비트, 최대 poll 간격
종료 처리
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);
consumer.subscribe(Collections.singleton("simple"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); // wakeup() 호출시 익셉션 발생
... records 처리
try {
consumer.commitAsync();
} catch (Exception e) {
e.printStackTrace();
}
}
} catch (Exception ex) {
...
} finally {
consumer.close();
}
주의: 쓰레드 안전하지 않음
Refs