devoong2
devoong2
devoong2
전체 방문자
오늘
어제

인기 글

최근 글

  • Category (35)
    • Java (4)
    • Spring (13)
    • JPA (4)
    • DesignPattern (1)
    • 동시성 (Concurrency) (4)
    • 회고 (1)
    • Redis (1)
    • Network (3)
    • Kafka (2)
    • Spring Batch (2)

최근 댓글

반응형
hELLO · Designed By 정상우.
devoong2

devoong2

[Spring Batch] KafkaItemReader 사용방법
Spring Batch

[Spring Batch] KafkaItemReader 사용방법

2024. 4. 23. 23:47
반응형

예제 및 테스트 코드는 github 에서 확인 가능합니다.

- Spring Batch KafkaItemReader 란?
  - KafkaItemReader 예시
    - pollTimeout
    - partitionOffsets
  - KafkaItemReader 주의사항
    - SpringBatch 버전에 따른 offset 관리 이슈
    - KafkaItemReader 실행환경에 따른 offset 최신화

 

Spring Batch KafkaItemReader 란?

안녕하세요. 이번에는 Spring Batch 의 ItemReader 중 하나인 KafkaItemReader 에 대해 알아보겠습니다.
KafkaItemReader 은 Spring Batch 에서 제공하는 ItemReader<T> 를 구현하고 있으며
KafkaConsumer 를 이용해 카프카의 토픽, 파티션에 있는 데이터를 읽어들일 수 ItemReader 중 하나입니다.

토픽에 있는 데이터를 일괄로 처리가 필요한 경우에 KafkaItemReader 를 사용하기 용이합니다.

KafkaItemReader 예시

매일 오전 카프카의 DeadLetterTopic 에 있는 데이터를 조회하고 가공해야 하는 작업이 있다고 가정하겠습니다.
DeadLetterTopic 의 데이터를 읽어들여서 로그로 출력하는 간단한 예제를 만들어보겠습니다.

 

version

- Java 17
- SpringBoot 3.2.4
- SpringBatch 5.1.1

 

build.gradle

implementation 'org.springframework.boot:spring-boot-starter-batch'
implementation 'org.springframework.kafka:spring-kafka'

 

application.yml

spring:
  datasource:
    url: jdbc:mysql://localhost:3306/batch
    username: root
    password: 1234
  kafka:
    consumer:
      bootstrap-servers: localhost:29092
    topics:
      dead-letter: dead-letter

 

DeadLetterJobConfig.java

@Configuration
public class DeadLetterJobConfig {
  public static final String JOB_NAME = "deadLetterJob";

  private final Step deadLetterStep;

  public DeadLetterJobConfig(Step deadLetterStep) {
    this.deadLetterStep = deadLetterStep;
  }

  @Bean
  public Job deadLetterJob(final JobRepository jobRepository) {
    return new JobBuilder(JOB_NAME, jobRepository)
      .start(deadLetterStep)
      .build();
  }
}

 

DeadLetterStepConfig.java

@Configuration
public class DeadLetterStepConfig {
  public static final String STEP_NAME = "deadLetterStep";

  private final DeadLetterItemReader deadLetterItemReader;
  private final DeadLetterItemWriter deadLetterItemWriter;

  public DeadLetterStepConfig(DeadLetterItemReader deadLetterItemReader,
                              DeadLetterItemWriter deadLetterItemWriter) {
    this.deadLetterItemReader = deadLetterItemReader;
    this.deadLetterItemWriter = deadLetterItemWriter;
  }

  @Bean
  public Step deadLetterStep(final JobRepository jobRepository,
                             final PlatformTransactionManager platformTransactionManager) {
    return new StepBuilder(STEP_NAME, jobRepository)
      .<String, String>chunk(5, platformTransactionManager)
      .reader(deadLetterItemReader.deadLetterKafkaItemReader())
      .writer(deadLetterItemWriter)
      .build();
  }
}

 

DeadLetterItemReader.java

@Component
public class DeadLetterItemReader {
  private final KafkaProperties kafkaProperties;
  private final SslBundles sslBundles;

  public DeadLetterItemReader(KafkaProperties kafkaProperties, SslBundles sslBundles) {
    this.kafkaProperties = kafkaProperties;
    this.sslBundles = sslBundles;
  }

  @Value("${spring.kafka.topics.dead-letter}")
  private String topic;


  public KafkaItemReader<String, String> deadLetterKafkaItemReader() {
    Properties props = new Properties();
    props.putAll(kafkaProperties.buildConsumerProperties(sslBundles));
    props.put("group.id", "dlt-consumer-group");

    return new KafkaItemReaderBuilder<String, String>()
      .name("deadLetterKafkaItemReader")
      .topic(topic)
      .partitions(0)
      .consumerProperties(props)
      .pollTimeout(Duration.ofSeconds(5L))
      .partitionOffsets(new HashMap<>())
      .saveState(true)
      .build();
  }
}

 

DeadLetterItemWriter.java

@Slf4j
@Component
public class DeadLetterItemWriter implements ItemWriter<String> {

  @Override
  public void write(Chunk<? extends String> items) {
    for (String item : items) {
      log.info("item: {}", item);
    }

    log.info("-----------------------------------");
  }
}

전체적인 Job 의 구성은 다음과 같습니다.
여기서 이번의 주제인 KafkaItemReader 에 대해 조금 더 자세히 살펴보겠습니다.

 

 

DeadLetterReader.java

@Component
public class DeadLetterItemReader {
  private final KafkaProperties kafkaProperties;
  private final SslBundles sslBundles;

  public DeadLetterItemReader(KafkaProperties kafkaProperties, SslBundles sslBundles) {
    this.kafkaProperties = kafkaProperties;
    this.sslBundles = sslBundles;
  }

  @Value("${spring.kafka.topics.dead-letter}")
  private String topic;


  public KafkaItemReader<String, String> deadLetterKafkaItemReader() {
    Properties props = new Properties(); 
    props.putAll(kafkaProperties.buildConsumerProperties(sslBundles)); // (1)
    props.put("group.id", "dlt-consumer-group");

    return new KafkaItemReaderBuilder<String, String>()
      .name("deadLetterKafkaItemReader") // (2)
      .topic(topic) // (3)
      .partitions(0) // (4)
      .consumerProperties(props) // (5)
      .pollTimeout(Duration.ofSeconds(30L)) // (6)
      .partitionOffsets(new HashMap<>()) // (7)
      .saveState(true) // (8)
      .build();
  }
}
  • (1) props.putAll(kafkaProperties.buildConsumerProperties(sslBundles)); : KafkaItemReader 내의 KafkaConsumer 에게 할당할 Consumer 속성을 정의합니다
  • (2) name("deadLetterKafkaItemReader") : Reader 의 인스턴스 이름을 정의합니다.
  • (3) topic(topic) : 읽어들일 토픽을 지정합니다.
  • (4) partitions(0) : 토픽의 파티션을 지정합니다. 매개변수는 가변인자로 되어있기에 여러 토픽을 지정할 수 있습니다.
    ex) partitions(0, 1, 2)
  • (5) consumerProperties(props) : 컨슈머의 속성을 정의합니다.
  • (6) pollTimeout(Duration.ofSeconds(30L)) : 컨슈머의 polling 에 대한 타임아웃입니다. 기본값은 30s 입니다.
  • (7) partitionOffsets(new HashMap<>()) : 파티션의 오프셋에 대한 설정입니다. 기본값은 오프셋을 0부터 읽어들입니다.
    빈 해시맵을 전달하면 컨슈머가 카프에 저장된 offset Id 부터 읽어들입니다.
  • (8) saveState(false) : Reader 의 상태를 저장하는 옵션입니다.
    Reader 가 실패한 지점을 알 수 있습니다. 예제에서는 false 로 사용하겠습니다.

KafkaItemReader 에는 다음과 같은 설정들이 존재합니다. 여기서 두 설정에 조금 더 알아보겠습니다.

 

 

pollTimeout()

pollTimeout() 은 컨슈머의 polling 에 대한 타임아웃 설정이고, 기본값은 30s 입니다.
이 상태로 read() 를 하게 되면 컨슈머는 30s 동안은 계속해서 데이터를 polling 하게 됩니다.

그렇기에 읽어들일 데이터에 대해 fetch.min.bytes, fetch.max.wait.ms, max.poll.records 와 pollTimeout 과 같은 컨슈머 조회에 영향을 주는 옵션들에 대해 고민이 필요합니다.

  • fetch.min.bytes(기본값: 1bytes) : 한번에 가져올 수 있는 최소 사이즈로, 만약 가져오는 데이터가 지정한 사이즈보다 데이터가 누적될 때 까지 기다림
  • fetch.max.wait.ms(기본값: 500ms) : fetch.min.bytes에 설정된 데이터보다 데이터 양이 적은 경우 요청에 응답을 기다리는 최대시간
  • max.poll.records(기본값: 500개) : polling 시 최대로 가져올 수 있는 record 개수

 

partitionOffsets()

partitionOffsets 은 파티션의 오프셋에 대한 설정입니다.

https://docs.spring.io/spring-batch/docs/current/api/org/springframework/batch/item/kafka/KafkaItemReader.html#setPartitionOffsets(java.util.Map)

 

문서에서는 다음과 같이 이야기하고있습니다.

이 매핑은 리더에게 각 파티션에서 읽기를 시작할 오프셋을 알려줍니다. 이는 선택 사항이며, 기본값은 각 파티션의 오프셋 0부터 시작됩니다. 빈 맵을 전달하면 판독기가 소비자 그룹 ID에 대해 Kafka에 저장된 오프셋에서 시작됩니다.


그렇다면 KafkaItemReader 내부를 한번 살펴보겠습니다.

 

open() 메소드에서 partitionOffsets 설정이 null 인 경우에는 정말 각 파티션들의 offset 을 0으로 설정하는 것을 볼 수 있습니다.

즉, partitionOffsets 설정을 따로 하지 않는다면 해당 Reader 는 무조건 offset 을 처음부터 읽게 됩니다.

그렇게 되면 매번 배치 실행시 중복 컨슈밍 문제, 데이터가 많아짐에 따른 성능문제를 야기할 수 있습니다.


그래서 offset 관리는 꼭 필요한 설정인데요.

문서에서 안내하는것처럼 빈 맵을 전달하면 if (this.partitionOffsets == null)
조건에 해당하지 않기에 컨슈머의 groupId 에 저장된 offset 정보를 기반으로 데이터를 읽을 수 있습니다.

 

++ 위 조건만 피하면 되기에 빈 맵을 전달하는 대신 인자를 대체할 수 있는 HashTable 을 전달해도 동일하게 동작합니다.

e.g) partitionOffsets(new HashTable<>())

 

KafkaItemReader 주의사항

Spring Batch 버전에 따른 offset 관리 이슈

SpringBatch 4.3.0 이전의 버전에는 KafkaItemReader 의 partitionOffsets 옵션은 존재했지만
해당 옵션을 설정하는 메소드를 지원하지 않았습니다.

그렇게되면 오프셋을 항상 0부터 읽기때문에 오프셋에 대한 관리를 할 수 없게되는 상황이 존재합니다.

그렇다고 저 오프셋관리를 위해 SpringBatch 버전을 올리기에는 어떤 리스크가 존재할지 몰라 위험할 수 있습니다.
그런 경우 상위 버전의 KafkaItemReader 의 코드를 가져와서 만들어 사용하는 방법을 고민해 볼 수 있습니다.

 

CustomKafkaItemReader.java

public class CustomKafkaItemReader<K, V> extends AbstractItemStreamItemReader<V> {

  private static final String TOPIC_PARTITION_OFFSETS = "topic.partition.offsets";
  private static final long DEFAULT_POLL_TIMEOUT = 30L;
  private List<TopicPartition> topicPartitions;
  private Map<TopicPartition, Long> partitionOffsets; // 상위 버전는 존재
  private KafkaConsumer<K, V> kafkaConsumer;
  private Properties consumerProperties;
  private Iterator<ConsumerRecord<K, V>> consumerRecords;
  private Duration pollTimeout = Duration.ofSeconds(DEFAULT_POLL_TIMEOUT);
  private boolean saveState = true;

  ...
}

 

 

KafkaItemReader 실행환경에 따른 offset 최신화

배치 Job 을 여러번 실행해도 오프셋이 갱신되지 않는 이슈입니다.
Job 을 어떻게 실행시키느냐에 따라 발생할 수 있는 문제인데요.

 

Job 을 실행할때마다 KafkaItemReader 의 Bean 이 새로 로드된다면 문제가 없습니다.
다만 어플리케이션에 실행된 상태에서 이미 Bean 이 띄워진 Reader 에 대해서는 Job을 여러번 실행하게 되어도 처음 실행 시점의 컨슈머의 current offset 을 계속 읽게됩니다.

 

컨트롤러를 이용해 jobLauncher 로 배치를 실행하는 경우를 예로 들 수 있습니다.

좀 더 자세하게 예시를 통해 알아보겠습니다.

 

처음 배치 실행 시 KafkaItemReader 컨슈머의 current offset 은 11입니다.
현재 offset 은 15까지 데이터가 존재하니 KafkaItemReader 는 11부터 15까지의 데이터를 모두 가져오게 됩니다.

 

 

그리고 해당 컨슈머의 current offset 은 아래와 같이 16으로 변경됩니다.

이후 Job 을 다시 실행하게된다면 offset 을 어디서부터 읽어올까요?
current offset 은 16번이니 16번부터 읽어올것으로 생각되지만 실제로는 처음 Bean 이 로드될 때 시점의 11번 offset 부터 읽게됩니다.

 

왜 그럴까요?

 

카프카 컨슈머의 current offset 은 Kafka Broker 내의 Coordinator 에서 관리됩니다.
즉, 카프카 서버에서 관리된다고 볼 수 있습니다.

그리고 KafkaItemReader 의 컨슈머의 경우에는 처음 Bean 이 로드될때 카프카 서버로부터
current offset 을 가져오게 됩니다.
이후 Bean 이 새로 로드되지 않는 이상은 처음 읽어왔던 current offset 이 갱신되지 않는것입니다.

해결방안

해결방안은 Job 실행시마다 Bean 을 새로 로드시켜주면 current offset 을 실행시마다 갱신할 수 있습니다.
바로 SpringBatch 의 LateBinding 을 이용하는건데요 @JobScope, @StepScope 를 이용하여 해당 Job 에 대해 LateBinding 으로 처리하는 것입니다.

@Bean
@JobScope // LateBinding
public Step deadLetterStep(final JobRepository jobRepository,
                            final PlatformTransactionManager platformTransactionManager) {
  return new StepBuilder(STEP_NAME, jobRepository)
    .<String, String>chunk(5, platformTransactionManager)
    .reader(deadLetterItemReader.deadLetterKafkaItemReader())
    .writer(deadLetterItemWriter)
    .build();
}

 

 

지금까지 읽어주셔서 감사합니다.

reference

  • https://docs.spring.io/spring-batch/docs/current/api/org/springframework/batch/item/kafka/KafkaItemReader.html
반응형

'Spring Batch' 카테고리의 다른 글

[Spring Batch] BulkInsert 로 성능 개선하기 (feat. TypeSafe)  (0) 2024.12.23
    'Spring Batch' 카테고리의 다른 글
    • [Spring Batch] BulkInsert 로 성능 개선하기 (feat. TypeSafe)
    devoong2
    devoong2
    github 주소: https://github.com/limwoobin

    티스토리툴바