예제 및 테스트 코드는 github 에서 확인 가능합니다.
Kafka MessageListener 에서 max.poll.records 옵션의 동작
이번에는 카프카 컨슈머의 Listener
와 max.poll.records
옵션의 관계
그리고 제가 가지고 있던 오해에 대해 알아보겠습니다.
먼저 카프카 컨슈머의 구현체인 리스너는 크게 다음과 같이 나누어져 있습니다.
MessageListener
: Record 를 1개씩 처리한다BatchMessageListener
: Record 다수를 한번에 처리한다
그리고 max.poll.records
옵션은 다음과 같습니다.
- 컨슈머가
polling
시 최대로 가져갈 수 있는 record 개수 (defualt : 500개)
그렇다면 MessageListener
로 컨슈머를 구현하고 max.poll.records
옵션이 10개라고 가정한다면
컨슈머는 데이터를 어떻게 읽어올까요?
MessageListener
는 record 를 1개씩 단건으로 처리한다고 했는데요, 여러개를 처리할수 있는걸까요?
혹은 max.poll.records
설정이 무시되는걸까요?
이 글은 저 의문에서 시작되어 작성하게 되었습니다.
카프카 컨슈머 예제 코드
한번 예제코드를 통해 실제 컨슈머에서는 어떻게 동작하는지 확인해보겠습니다.
application.yml
spring:
kafka:
consumer:
bootstrap-servers: localhost:29092
group-id: record-test-group
topics:
record-test: record-test
KafkaConsumerConfig.java
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Value("${spring.kafka.consumer.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> config = consumerConfig();
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public Map<String, Object> consumerConfig() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10); // max.poll.records 설정
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return config;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> containerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
RecordConsumer.java
@Component
@Slf4j
public class RecordConsumer {
@KafkaListener(
topics = "${spring.kafka.topics.record-test}",
groupId = "${spring.kafka.consumer.group-id}",
containerFactory = "containerFactory"
)
public void consume(@Payload String message, @Headers MessageHeaders messageHeaders) {
log.info("message: {}", message);
log.info("messageHeaders: {}", messageHeaders);
}
}
max.poll.records
는 10개로 설정하였습니다.
이제 토픽에 메시지 5개 정도를 쌓아두고 컨슈머를 실행하면 데이터를 어떻게 읽어오는지 확인해보겠습니다.
그럼 컨슈머를 실행해보겠습니다. 다음과 같이 max.poll.records
가 10개로 잘 설정된것을 볼 수 있습니다.
그리고 로그를 확인해보면 다음과 같이 단건으로 실행된것을 확인할 수 있습니다.
max.poll.records
설정이 MessageListener
에서는 동작하지 않는것일까요?
그럼 리스너의 페이로드를 List
로 변경해서 받아보면 어떨까요?
이번엔 메시지 3개를 프로듀싱하고 다시 컨슈머를 실행해보겠습니다.
이번에도 역시 동일하게 단건으로 처리하고 있습니다.
그렇다면 max.poll.records
설정은 BatchListener
에서만 유효한걸까요?
이번에는 컨슈머에서 레코드를 polling
하는 과정을 한번 따라가보겠습니다.
Kafka Consumer Polling 동작 원리
컨슈머는 메시지를 가져올때 주기적으로 브로커에게 poll()
메소드를 통해서 데이터를 가져오고 있습니다.
주기적으로 polling
하는곳을 먼저 찾아가 보겠습니다.
1. KafkaMessageListenerContainer.java
KafkaMessageListenerContainer
의 run 메소드를 살펴보면 무한루프를 돌면서 주기적으로
pollAndInvoke()
를 이용해 주기적으로 메시지를 polling
하고 있습니다.
그리고 doPoll
메소드를 통해 ConsumerRecord
로 메시지를 읽어 가져옵니다.
그럼 한번 doPoll
메소드를 살펴보겠습니다.
isBatchListener
: 배치 리스너인지 여부를 반환subBatchperPartition
: 배치를 파티션별로 분할할지 여부를 반환, default is null
저희는 배치 리스너도 아니고 subBatchperPartition
설정도 별도로 하지 않았기에 두 값 모두 false 여서 else 로 빠지게 됩니다.
그리고 pollConsumer()
를 따라가보니 consumer 의 poll 메소드를 호출하네요. 계속 따라가 보겠습니다.
2. KafkaConsumer.java
해당 poll()
메소드에서는 지정한 타임아웃 시간만큼 루프를 돌면서 pollForFetches(timer)
메소드를 호출합니다.
- sendFetches(): Fetch 요청을 Queue 에 넣습니다. 이후 RequestFuture 타입을 반환하고 onSuccess 콜백 메서드에서 fetch 가 완료된 레코드들을 completedFetches 큐에 넣습니다.
- client.poll():
ConsumerNetworkClient
객체에서 fetch 요청을 실제 네트워크를 태워서 보냅니다. - fetcher.collectFetch(): completedFetches 큐에 레코드가 있으면 반환합니다.
이때,max.poll.records
의 개수만큼 반환합니다.
pollForFetches
메소드에서는 다음과 같은 동작을 수행합니다.
이때, 실제 네트워크 요청을 하는 ConsumerNetworkClient
객체는 데이터를 bytes
형태로 가져와
keyDeserializer / valueDeseializer 를 통해 역직렬화 후 completedFetches 에 데이터를 넣습니다.
그럼 이제 fetcher.collectFetch()
내부를 들여다보겠습니다.
3. AbstractFetch.collectFetch
collectFetch()
메소드를 간략하게 가져왔습니다.
recordsRemaining 는 max.poll.records
의 설정 값이고 그 아래를 보면 recordsRemaining > 0
의 경우 계속 루프를 돌며 레코드를 가져오는 것을 볼 수 있습니다.
Fetch<K, V nextFetch = fetchRecords(recordsRemaining)
에서는 recordsRemaining 수 만큼 레코드를 가져오고 있습니다.
CompletedFetch.fetchRecords
다음을 보시면 maxRecords
만큼 for loop 를 수행하면서 records 를 넣고 offset 을 기록합니다.
만약 max.poll.records
값이 5이고 컨슘해야할 레코드들이 10개라면 5개만 가져오게 되는것이죠.
이후 recordsRemaining -= nextFetch.numRecords();
는 가져온 레코드 수 만큼 recordsRemaining 를 차감하고 있습니다.
그래서 만약 recordsRemaining 이 5개이고 fetchRecords(recordsRemaining)
를 통해 5개의 레코드를 가져왔다면 차감되어 recordsRemaining 는 0이 되고 while 조건은 종료되게 됩니다.
반대로 fetchRecords(recordsRemaining)
으로 가져온 레코드의 개수가 recordsRemaining 보다 적다면
그 다음 읽어온 records 가 없기 때문에 아래 코드와 같이 루프가 종료되게 됩니다.
CompletedFetch<K, V> records = completedFetches.peek();
if (records == null) break;
결론
컨슈머의 polling 과정을 따라가 보며 max.poll.records
설정은
MessageListener
가 한번에 처리할 수 있는 레코드의 개수가 아니라
컨슈머가 브로커에게 메시지를 가져오기 위해 polling
요청으로 가져올 수 있는 레코드의 최대 개수라는것을 알았습니다.
컨슈머는 poll 요청으로 max.poll.records
개수만큼 레코드를 한번에 가져온 후
내부적으로 리스너에게 단건 혹은 배치로 전달하게 됩니다.
문서에도 polling 시 가져오는 최대 개수라고 명시되어있지만 저는 그것을 그대로 리스너에게 전달하거나,
혹은 max.poll.records
설정이 동작하지 않나? 라는 잘못된 오해를 하고있었는데요.
이번 기회를 통해 컨슈머의 내부 동작과정을 얕게나마 살펴본것이 많은 도움이 된것 같습니다.
감사합니다.
Reference
'Kafka' 카테고리의 다른 글
Kafka 와 Redis 의 Pub/Sub 비교 (0) | 2024.04.29 |
---|