예제 및 테스트 코드는 github 에서 확인 가능합니다.
- Spring Batch KafkaItemReader 란?
- KafkaItemReader 예시
- pollTimeout
- partitionOffsets
- KafkaItemReader 주의사항
- SpringBatch 버전에 따른 offset 관리 이슈
- KafkaItemReader 실행환경에 따른 offset 최신화
Spring Batch KafkaItemReader 란?
안녕하세요. 이번에는 Spring Batch 의 ItemReader 중 하나인 KafkaItemReader
에 대해 알아보겠습니다.KafkaItemReader
은 Spring Batch
에서 제공하는 ItemReader<T>
를 구현하고 있으며KafkaConsumer
를 이용해 카프카의 토픽, 파티션에 있는 데이터를 읽어들일 수 ItemReader 중 하나입니다.
토픽에 있는 데이터를 일괄로 처리가 필요한 경우에 KafkaItemReader
를 사용하기 용이합니다.
KafkaItemReader 예시
매일 오전 카프카의 DeadLetterTopic
에 있는 데이터를 조회하고 가공해야 하는 작업이 있다고 가정하겠습니다.DeadLetterTopic
의 데이터를 읽어들여서 로그로 출력하는 간단한 예제를 만들어보겠습니다.
version
- Java 17
- SpringBoot 3.2.4
- SpringBatch 5.1.1
build.gradle
implementation 'org.springframework.boot:spring-boot-starter-batch'
implementation 'org.springframework.kafka:spring-kafka'
application.yml
spring:
datasource:
url: jdbc:mysql://localhost:3306/batch
username: root
password: 1234
kafka:
consumer:
bootstrap-servers: localhost:29092
topics:
dead-letter: dead-letter
DeadLetterJobConfig.java
@Configuration
public class DeadLetterJobConfig {
public static final String JOB_NAME = "deadLetterJob";
private final Step deadLetterStep;
public DeadLetterJobConfig(Step deadLetterStep) {
this.deadLetterStep = deadLetterStep;
}
@Bean
public Job deadLetterJob(final JobRepository jobRepository) {
return new JobBuilder(JOB_NAME, jobRepository)
.start(deadLetterStep)
.build();
}
}
DeadLetterStepConfig.java
@Configuration
public class DeadLetterStepConfig {
public static final String STEP_NAME = "deadLetterStep";
private final DeadLetterItemReader deadLetterItemReader;
private final DeadLetterItemWriter deadLetterItemWriter;
public DeadLetterStepConfig(DeadLetterItemReader deadLetterItemReader,
DeadLetterItemWriter deadLetterItemWriter) {
this.deadLetterItemReader = deadLetterItemReader;
this.deadLetterItemWriter = deadLetterItemWriter;
}
@Bean
public Step deadLetterStep(final JobRepository jobRepository,
final PlatformTransactionManager platformTransactionManager) {
return new StepBuilder(STEP_NAME, jobRepository)
.<String, String>chunk(5, platformTransactionManager)
.reader(deadLetterItemReader.deadLetterKafkaItemReader())
.writer(deadLetterItemWriter)
.build();
}
}
DeadLetterItemReader.java
@Component
public class DeadLetterItemReader {
private final KafkaProperties kafkaProperties;
private final SslBundles sslBundles;
public DeadLetterItemReader(KafkaProperties kafkaProperties, SslBundles sslBundles) {
this.kafkaProperties = kafkaProperties;
this.sslBundles = sslBundles;
}
@Value("${spring.kafka.topics.dead-letter}")
private String topic;
public KafkaItemReader<String, String> deadLetterKafkaItemReader() {
Properties props = new Properties();
props.putAll(kafkaProperties.buildConsumerProperties(sslBundles));
props.put("group.id", "dlt-consumer-group");
return new KafkaItemReaderBuilder<String, String>()
.name("deadLetterKafkaItemReader")
.topic(topic)
.partitions(0)
.consumerProperties(props)
.pollTimeout(Duration.ofSeconds(5L))
.partitionOffsets(new HashMap<>())
.saveState(true)
.build();
}
}
DeadLetterItemWriter.java
@Slf4j
@Component
public class DeadLetterItemWriter implements ItemWriter<String> {
@Override
public void write(Chunk<? extends String> items) {
for (String item : items) {
log.info("item: {}", item);
}
log.info("-----------------------------------");
}
}
전체적인 Job 의 구성은 다음과 같습니다.
여기서 이번의 주제인 KafkaItemReader
에 대해 조금 더 자세히 살펴보겠습니다.
DeadLetterReader.java
@Component
public class DeadLetterItemReader {
private final KafkaProperties kafkaProperties;
private final SslBundles sslBundles;
public DeadLetterItemReader(KafkaProperties kafkaProperties, SslBundles sslBundles) {
this.kafkaProperties = kafkaProperties;
this.sslBundles = sslBundles;
}
@Value("${spring.kafka.topics.dead-letter}")
private String topic;
public KafkaItemReader<String, String> deadLetterKafkaItemReader() {
Properties props = new Properties();
props.putAll(kafkaProperties.buildConsumerProperties(sslBundles)); // (1)
props.put("group.id", "dlt-consumer-group");
return new KafkaItemReaderBuilder<String, String>()
.name("deadLetterKafkaItemReader") // (2)
.topic(topic) // (3)
.partitions(0) // (4)
.consumerProperties(props) // (5)
.pollTimeout(Duration.ofSeconds(30L)) // (6)
.partitionOffsets(new HashMap<>()) // (7)
.saveState(true) // (8)
.build();
}
}
- (1)
props.putAll(kafkaProperties.buildConsumerProperties(sslBundles));
: KafkaItemReader 내의 KafkaConsumer 에게 할당할 Consumer 속성을 정의합니다 - (2)
name("deadLetterKafkaItemReader")
: Reader 의 인스턴스 이름을 정의합니다. - (3)
topic(topic)
: 읽어들일 토픽을 지정합니다. - (4)
partitions(0)
: 토픽의 파티션을 지정합니다. 매개변수는 가변인자로 되어있기에 여러 토픽을 지정할 수 있습니다.
ex)partitions(0, 1, 2)
- (5)
consumerProperties(props)
: 컨슈머의 속성을 정의합니다. - (6)
pollTimeout(Duration.ofSeconds(30L))
: 컨슈머의 polling 에 대한 타임아웃입니다. 기본값은 30s 입니다. - (7)
partitionOffsets(new HashMap<>())
: 파티션의 오프셋에 대한 설정입니다. 기본값은 오프셋을 0부터 읽어들입니다.
빈 해시맵을 전달하면 컨슈머가 카프에 저장된 offset Id 부터 읽어들입니다. - (8)
saveState(false)
: Reader 의 상태를 저장하는 옵션입니다.
Reader 가 실패한 지점을 알 수 있습니다. 예제에서는 false 로 사용하겠습니다.
KafkaItemReader
에는 다음과 같은 설정들이 존재합니다. 여기서 두 설정에 조금 더 알아보겠습니다.
pollTimeout()
pollTimeout()
은 컨슈머의 polling 에 대한 타임아웃 설정이고, 기본값은 30s 입니다.
이 상태로 read() 를 하게 되면 컨슈머는 30s 동안은 계속해서 데이터를 polling 하게 됩니다.
그렇기에 읽어들일 데이터에 대해 fetch.min.bytes
, fetch.max.wait.ms
, max.poll.records
와 pollTimeout
과 같은 컨슈머 조회에 영향을 주는 옵션들에 대해 고민이 필요합니다.
fetch.min.bytes
(기본값: 1bytes) : 한번에 가져올 수 있는 최소 사이즈로, 만약 가져오는 데이터가 지정한 사이즈보다 데이터가 누적될 때 까지 기다림fetch.max.wait.ms
(기본값: 500ms) : fetch.min.bytes에 설정된 데이터보다 데이터 양이 적은 경우 요청에 응답을 기다리는 최대시간max.poll.records
(기본값: 500개) : polling 시 최대로 가져올 수 있는 record 개수
partitionOffsets()
partitionOffsets
은 파티션의 오프셋에 대한 설정입니다.
문서에서는 다음과 같이 이야기하고있습니다.
이 매핑은 리더에게 각 파티션에서 읽기를 시작할 오프셋을 알려줍니다. 이는 선택 사항이며, 기본값은 각 파티션의 오프셋 0부터 시작됩니다. 빈 맵을 전달하면 판독기가 소비자 그룹 ID에 대해 Kafka에 저장된 오프셋에서 시작됩니다.
그렇다면 KafkaItemReader 내부를 한번 살펴보겠습니다.
open()
메소드에서 partitionOffsets 설정이 null 인 경우에는 정말 각 파티션들의 offset 을 0으로 설정하는 것을 볼 수 있습니다.
즉, partitionOffsets 설정을 따로 하지 않는다면 해당 Reader 는 무조건 offset 을 처음부터 읽게 됩니다.
그렇게 되면 매번 배치 실행시 중복 컨슈밍 문제, 데이터가 많아짐에 따른 성능문제를 야기할 수 있습니다.
그래서 offset 관리는 꼭 필요한 설정인데요.
문서에서 안내하는것처럼 빈 맵을 전달하면 if (this.partitionOffsets == null)
조건에 해당하지 않기에 컨슈머의 groupId 에 저장된 offset 정보를 기반으로 데이터를 읽을 수 있습니다.
++ 위 조건만 피하면 되기에 빈 맵을 전달하는 대신 인자를 대체할 수 있는 HashTable 을 전달해도 동일하게 동작합니다.
e.g) partitionOffsets(new HashTable<>())
KafkaItemReader 주의사항
Spring Batch 버전에 따른 offset 관리 이슈
SpringBatch 4.3.0
이전의 버전에는 KafkaItemReader
의 partitionOffsets
옵션은 존재했지만
해당 옵션을 설정하는 메소드를 지원하지 않았습니다.
그렇게되면 오프셋을 항상 0부터 읽기때문에 오프셋에 대한 관리를 할 수 없게되는 상황이 존재합니다.
그렇다고 저 오프셋관리를 위해 SpringBatch
버전을 올리기에는 어떤 리스크가 존재할지 몰라 위험할 수 있습니다.
그런 경우 상위 버전의 KafkaItemReader
의 코드를 가져와서 만들어 사용하는 방법을 고민해 볼 수 있습니다.
CustomKafkaItemReader.java
public class CustomKafkaItemReader<K, V> extends AbstractItemStreamItemReader<V> {
private static final String TOPIC_PARTITION_OFFSETS = "topic.partition.offsets";
private static final long DEFAULT_POLL_TIMEOUT = 30L;
private List<TopicPartition> topicPartitions;
private Map<TopicPartition, Long> partitionOffsets; // 상위 버전는 존재
private KafkaConsumer<K, V> kafkaConsumer;
private Properties consumerProperties;
private Iterator<ConsumerRecord<K, V>> consumerRecords;
private Duration pollTimeout = Duration.ofSeconds(DEFAULT_POLL_TIMEOUT);
private boolean saveState = true;
...
}
KafkaItemReader 실행환경에 따른 offset 최신화
배치 Job 을 여러번 실행해도 오프셋이 갱신되지 않는 이슈입니다.
Job 을 어떻게 실행시키느냐에 따라 발생할 수 있는 문제인데요.
Job 을 실행할때마다 KafkaItemReader 의 Bean 이 새로 로드된다면 문제가 없습니다.
다만 어플리케이션에 실행된 상태에서 이미 Bean 이 띄워진 Reader 에 대해서는 Job을 여러번 실행하게 되어도 처음 실행 시점의 컨슈머의 current offset 을 계속 읽게됩니다.
컨트롤러를 이용해 jobLauncher 로 배치를 실행하는 경우를 예로 들 수 있습니다.
좀 더 자세하게 예시를 통해 알아보겠습니다.
처음 배치 실행 시 KafkaItemReader 컨슈머의 current offset 은 11입니다.
현재 offset 은 15까지 데이터가 존재하니 KafkaItemReader 는 11부터 15까지의 데이터를 모두 가져오게 됩니다.
그리고 해당 컨슈머의 current offset 은 아래와 같이 16으로 변경됩니다.
이후 Job 을 다시 실행하게된다면 offset 을 어디서부터 읽어올까요?
current offset 은 16번이니 16번부터 읽어올것으로 생각되지만 실제로는 처음 Bean 이 로드될 때 시점의 11번 offset 부터 읽게됩니다.
왜 그럴까요?
카프카 컨슈머의 current offset 은 Kafka Broker 내의 Coordinator 에서 관리됩니다.
즉, 카프카 서버에서 관리된다고 볼 수 있습니다.
그리고 KafkaItemReader
의 컨슈머의 경우에는 처음 Bean 이 로드될때 카프카 서버로부터current offset
을 가져오게 됩니다.
이후 Bean 이 새로 로드되지 않는 이상은 처음 읽어왔던 current offset 이 갱신되지 않는것입니다.
해결방안
해결방안은 Job 실행시마다 Bean 을 새로 로드시켜주면 current offset
을 실행시마다 갱신할 수 있습니다.
바로 SpringBatch 의 LateBinding 을 이용하는건데요 @JobScope, @StepScope
를 이용하여 해당 Job 에 대해 LateBinding 으로 처리하는 것입니다.
@Bean
@JobScope // LateBinding
public Step deadLetterStep(final JobRepository jobRepository,
final PlatformTransactionManager platformTransactionManager) {
return new StepBuilder(STEP_NAME, jobRepository)
.<String, String>chunk(5, platformTransactionManager)
.reader(deadLetterItemReader.deadLetterKafkaItemReader())
.writer(deadLetterItemWriter)
.build();
}
지금까지 읽어주셔서 감사합니다.
reference
'Spring Batch' 카테고리의 다른 글
[Spring Batch] BulkInsert 로 성능 개선하기 (feat. TypeSafe) (0) | 2024.12.23 |
---|