개발/Java, Spring(2)
-
[Java] 자바 8 Streams와 ConcurrentModificationException
dataMap.forEach((k, v) -> { func(v); dataMap.remove(k); }); Spring Application을 만드는데 동시성 보장이 필요해 ConcurrentHashMap을 사용하였는데, Streams를 사용한 다음 로직에서 ConcurrentModificationException이 발생했다. 발생 원인을 확인해보니 Collection.forEach(Stream도 collection)의 로직 동안은 데이터가 immutable 해야한다. 따라서 수정을 감지한 즉시 ConcurrentModificationException을 던지며 프로그램을 멈춘다. 즉 ConcurrentHashMap과 상관없이, forEach와 연관되어있는 문제였다. for(Map.Entry e : dat..
2022.01.03 -
[Spring, Kafka] Spring Kafka Consumer 사용시 오프셋 관리
Spring Kafka를 이용하여 대용량 데이터를 consuming 하고있는 어플리케이션을 제작하고 있었는데 어플리케이션 오류 등 여러 이유로 consumer가 죽거나 할때 재시작을 해줘야 하는 경우가 있었다. 재시작시 consumer offset이 유지될 경우 엄청나게 밀린 LAG을 처리하기에 기능이 LAG이 해소되는동안 정상동작 못하는 이슈가 있어서 해결이 필요했다. Spring Kafka 에 ConsumerSeekAware 인터페이스가 존재하는데, 다음과 같은 기능들이 정의되어있다. https://docs.spring.io/spring-kafka/api/org/springframework/kafka/listener/ConsumerSeekAware.html ConsumerSeekAware (Sprin..
2021.12.30