예제 및 테스트 코드는 github 에서 확인 가능합니다.
Kafka MessageListener 에서 max.poll.records 옵션의 동작
이번에는 카프카 컨슈머의 Listener 와 max.poll.records 옵션의 관계
그리고 제가 가지고 있던 오해에 대해 알아보겠습니다.
먼저 카프카 컨슈머의 구현체인 리스너는 크게 다음과 같이 나누어져 있습니다.
MessageListener: Record 를 1개씩 처리한다BatchMessageListener: Record 다수를 한번에 처리한다
그리고 max.poll.records 옵션은 다음과 같습니다.
- 컨슈머가
polling시 최대로 가져갈 수 있는 record 개수 (defualt : 500개)
그렇다면 MessageListener 로 컨슈머를 구현하고 max.poll.records 옵션이 10개라고 가정한다면
컨슈머는 데이터를 어떻게 읽어올까요?
MessageListener 는 record 를 1개씩 단건으로 처리한다고 했는데요, 여러개를 처리할수 있는걸까요?
혹은 max.poll.records 설정이 무시되는걸까요?
이 글은 저 의문에서 시작되어 작성하게 되었습니다.
카프카 컨슈머 예제 코드
한번 예제코드를 통해 실제 컨슈머에서는 어떻게 동작하는지 확인해보겠습니다.
application.yml
spring:
kafka:
consumer:
bootstrap-servers: localhost:29092
group-id: record-test-group
topics:
record-test: record-test
KafkaConsumerConfig.java
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Value("${spring.kafka.consumer.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> config = consumerConfig();
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public Map<String, Object> consumerConfig() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10); // max.poll.records 설정
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return config;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> containerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
RecordConsumer.java
@Component
@Slf4j
public class RecordConsumer {
@KafkaListener(
topics = "${spring.kafka.topics.record-test}",
groupId = "${spring.kafka.consumer.group-id}",
containerFactory = "containerFactory"
)
public void consume(@Payload String message, @Headers MessageHeaders messageHeaders) {
log.info("message: {}", message);
log.info("messageHeaders: {}", messageHeaders);
}
}
max.poll.records 는 10개로 설정하였습니다.
이제 토픽에 메시지 5개 정도를 쌓아두고 컨슈머를 실행하면 데이터를 어떻게 읽어오는지 확인해보겠습니다.

그럼 컨슈머를 실행해보겠습니다. 다음과 같이 max.poll.records 가 10개로 잘 설정된것을 볼 수 있습니다.

그리고 로그를 확인해보면 다음과 같이 단건으로 실행된것을 확인할 수 있습니다.

max.poll.records 설정이 MessageListener 에서는 동작하지 않는것일까요?
그럼 리스너의 페이로드를 List 로 변경해서 받아보면 어떨까요?
이번엔 메시지 3개를 프로듀싱하고 다시 컨슈머를 실행해보겠습니다.


이번에도 역시 동일하게 단건으로 처리하고 있습니다.
그렇다면 max.poll.records 설정은 BatchListener 에서만 유효한걸까요?
이번에는 컨슈머에서 레코드를 polling 하는 과정을 한번 따라가보겠습니다.
Kafka Consumer Polling 동작 원리
컨슈머는 메시지를 가져올때 주기적으로 브로커에게 poll() 메소드를 통해서 데이터를 가져오고 있습니다.
주기적으로 polling 하는곳을 먼저 찾아가 보겠습니다.
1. KafkaMessageListenerContainer.java

KafkaMessageListenerContainer 의 run 메소드를 살펴보면 무한루프를 돌면서 주기적으로
pollAndInvoke() 를 이용해 주기적으로 메시지를 polling 하고 있습니다.

그리고 doPoll 메소드를 통해 ConsumerRecord 로 메시지를 읽어 가져옵니다.
그럼 한번 doPoll 메소드를 살펴보겠습니다.

isBatchListener: 배치 리스너인지 여부를 반환subBatchperPartition: 배치를 파티션별로 분할할지 여부를 반환, default is null
저희는 배치 리스너도 아니고 subBatchperPartition 설정도 별도로 하지 않았기에 두 값 모두 false 여서 else 로 빠지게 됩니다.

그리고 pollConsumer() 를 따라가보니 consumer 의 poll 메소드를 호출하네요. 계속 따라가 보겠습니다.
2. KafkaConsumer.java

해당 poll() 메소드에서는 지정한 타임아웃 시간만큼 루프를 돌면서 pollForFetches(timer) 메소드를 호출합니다.

- sendFetches(): Fetch 요청을 Queue 에 넣습니다. 이후 RequestFuture 타입을 반환하고 onSuccess 콜백 메서드에서 fetch 가 완료된 레코드들을 completedFetches 큐에 넣습니다.
- client.poll():
ConsumerNetworkClient객체에서 fetch 요청을 실제 네트워크를 태워서 보냅니다. - fetcher.collectFetch(): completedFetches 큐에 레코드가 있으면 반환합니다.
이때,max.poll.records의 개수만큼 반환합니다.
pollForFetches 메소드에서는 다음과 같은 동작을 수행합니다.
이때, 실제 네트워크 요청을 하는 ConsumerNetworkClient 객체는 데이터를 bytes 형태로 가져와
keyDeserializer / valueDeseializer 를 통해 역직렬화 후 completedFetches 에 데이터를 넣습니다.
그럼 이제 fetcher.collectFetch() 내부를 들여다보겠습니다.
3. AbstractFetch.collectFetch

collectFetch() 메소드를 간략하게 가져왔습니다.
recordsRemaining 는 max.poll.records 의 설정 값이고 그 아래를 보면 recordsRemaining > 0 의 경우 계속 루프를 돌며 레코드를 가져오는 것을 볼 수 있습니다.
Fetch<K, V nextFetch = fetchRecords(recordsRemaining) 에서는 recordsRemaining 수 만큼 레코드를 가져오고 있습니다.
CompletedFetch.fetchRecords

다음을 보시면 maxRecords 만큼 for loop 를 수행하면서 records 를 넣고 offset 을 기록합니다.
만약 max.poll.records 값이 5이고 컨슘해야할 레코드들이 10개라면 5개만 가져오게 되는것이죠.
이후 recordsRemaining -= nextFetch.numRecords(); 는 가져온 레코드 수 만큼 recordsRemaining 를 차감하고 있습니다.
그래서 만약 recordsRemaining 이 5개이고 fetchRecords(recordsRemaining) 를 통해 5개의 레코드를 가져왔다면 차감되어 recordsRemaining 는 0이 되고 while 조건은 종료되게 됩니다.
반대로 fetchRecords(recordsRemaining) 으로 가져온 레코드의 개수가 recordsRemaining 보다 적다면
그 다음 읽어온 records 가 없기 때문에 아래 코드와 같이 루프가 종료되게 됩니다.
CompletedFetch<K, V> records = completedFetches.peek();
if (records == null) break;
결론
컨슈머의 polling 과정을 따라가 보며 max.poll.records 설정은
MessageListener 가 한번에 처리할 수 있는 레코드의 개수가 아니라
컨슈머가 브로커에게 메시지를 가져오기 위해 polling 요청으로 가져올 수 있는 레코드의 최대 개수라는것을 알았습니다.
컨슈머는 poll 요청으로 max.poll.records 개수만큼 레코드를 한번에 가져온 후
내부적으로 리스너에게 단건 혹은 배치로 전달하게 됩니다.
문서에도 polling 시 가져오는 최대 개수라고 명시되어있지만 저는 그것을 그대로 리스너에게 전달하거나,
혹은 max.poll.records 설정이 동작하지 않나? 라는 잘못된 오해를 하고있었는데요.
이번 기회를 통해 컨슈머의 내부 동작과정을 얕게나마 살펴본것이 많은 도움이 된것 같습니다.
감사합니다.
Reference
'Kafka' 카테고리의 다른 글
| Kafka 와 Redis 의 Pub/Sub 비교 (0) | 2024.04.29 |
|---|