[Spring, Kafka] Spring Kafka Consumer 사용시 오프셋 관리

2021. 12. 30. 19:43개발/Java, Spring

Spring Kafka를 이용하여 대용량 데이터를 consuming 하고있는 어플리케이션을 제작하고 있었는데 어플리케이션 오류 등 여러 이유로 consumer가 죽거나 할때 재시작을 해줘야 하는 경우가 있었다.

 

재시작시 consumer offset이 유지될 경우 엄청나게 밀린 LAG을 처리하기에 기능이 LAG이 해소되는동안 정상동작 못하는 이슈가 있어서 해결이 필요했다.

 

Spring Kafka 에 ConsumerSeekAware 인터페이스가 존재하는데, 다음과 같은 기능들이 정의되어있다.

ConsumerSeekAware에 등록된 메소드들

https://docs.spring.io/spring-kafka/api/org/springframework/kafka/listener/ConsumerSeekAware.html

 

ConsumerSeekAware (Spring for Apache Kafka 2.8.1 API)

If the container is configured to emit idle container events, this method is called when the container idle event is emitted - allowing a seek operation.

docs.spring.io

 

내 케이스의 경우 Spring Application이 시작되며 KafkaConsumer가 파티션에 assign될때 offset을 최후방으로 지정하는것이 필요하였고, 실제로 아래 코드와 같이 작성하였을 경우 각 파티션의 마지막 offset으로 지정되어, Kafka Lag이 바로 해소된 것이 확인 가능하였다.

@Component
public class KafkaConsumer implements ConsumerSeekAware {
	...
	@Override
	public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekAware.ConsumerSeekCallback callback) {
		assignments.forEach((t, o) -> callback.seekToEnd(t.topic(), t.partition()));
	}
}

'개발 > Java, Spring' 카테고리의 다른 글

[Java] 자바 8 Streams와 ConcurrentModificationException  (0) 2022.01.03