[Spring, Kafka] Spring Kafka Consumer 사용시 오프셋 관리
2021. 12. 30. 19:43ㆍ개발/Java, Spring
Spring Kafka를 이용하여 대용량 데이터를 consuming 하고있는 어플리케이션을 제작하고 있었는데 어플리케이션 오류 등 여러 이유로 consumer가 죽거나 할때 재시작을 해줘야 하는 경우가 있었다.
재시작시 consumer offset이 유지될 경우 엄청나게 밀린 LAG을 처리하기에 기능이 LAG이 해소되는동안 정상동작 못하는 이슈가 있어서 해결이 필요했다.
Spring Kafka 에 ConsumerSeekAware 인터페이스가 존재하는데, 다음과 같은 기능들이 정의되어있다.
https://docs.spring.io/spring-kafka/api/org/springframework/kafka/listener/ConsumerSeekAware.html
내 케이스의 경우 Spring Application이 시작되며 KafkaConsumer가 파티션에 assign될때 offset을 최후방으로 지정하는것이 필요하였고, 실제로 아래 코드와 같이 작성하였을 경우 각 파티션의 마지막 offset으로 지정되어, Kafka Lag이 바로 해소된 것이 확인 가능하였다.
@Component
public class KafkaConsumer implements ConsumerSeekAware {
...
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekAware.ConsumerSeekCallback callback) {
assignments.forEach((t, o) -> callback.seekToEnd(t.topic(), t.partition()));
}
}
'개발 > Java, Spring' 카테고리의 다른 글
[Java] 자바 8 Streams와 ConcurrentModificationException (0) | 2022.01.03 |
---|