-
Kafka idleBetweenPolls를 활용한 의도적 메시지 처리 지연 구현(Spring)Kafka 2025. 1. 14. 00:01반응형
https://www.flaticon.com/free-icons/duration Duration icons created by Hilmy Abiyyu A.
0. 케이스
Kafka가 메시지를 처리하는 속도는 메시징 시스템 중에서 가장 빠른 편이다.
더보기
KafkaPulsar RabbitMQ
(Mirrored)Peak Throughput
(MB/s)605
MB/s305
MB/s38
MB/sp99 Latency
(ms)5 ms
(200 MB/s load)25 ms
(200 MB/s load)1 ms*
(reduced 30 MB/s load)https://www.confluent.io/ko-kr/blog/kafka-fastest-messaging-system/
Benchmarking RabbitMQ vs Kafka vs Pulsar Performance
A complete benchmark of RabbitMQ, Kafka, and Pulsar to determine performance, throughput, and latency at scale. View the comparison results!
www.confluent.io
다만 이러한 메시징 시스템이 빠르게 이벤트를 처리한다고 하더라도, 뒤쪽의 DB나 API의 처리량이 따라오지 못하면 문제가 발생하게 된다. DB가 처리량을 버티지 못하여 병목현상이 발생하는 현상 등이 그 예시이다. 기본적으로 시스템 자원을 늘려준다면 원활한 서비스가 가능하겠지만 현실적인 문제로 그렇지 못한 경우가 있다. 이러한 경우 어느정도 처리 속도의 지연이 있더라도 안정적으로 서비스를 유지하는 것이 일차적인 목표이기 때문에 의도적으로 메시지 처리를 지연시켜야 한다.
1. idleBetweenPolls
org.springframework.kafka.listener.KafkaMessageListenerContainer
pollAndInvoke() { doProcessCommits(); fixTxOffsetsIfNeeded(); idleBetweenPollIfNecessary(); ... }
KafkaMessageListenerContainer는 while문을 돌며 pollAndInvoke() 를 호출하여 레코드를 소비한다. 여기서 해당 메소드를 보면 idleBetweenPollIfNecessary()를 호출한다.
private void idleBetweenPollIfNecessary() { long idleBetweenPolls = this.containerProperties.getIdleBetweenPolls(); Collection<TopicPartition> assigned = getAssignedPartitions(); if (idleBetweenPolls > 0 && assigned != null && !assigned.isEmpty()) { idleBetweenPolls = Math.min(idleBetweenPolls, this.maxPollInterval - (System.currentTimeMillis() - this.lastPoll) - 5000); // NOSONAR - less by five seconds to avoid race condition with rebalance if (idleBetweenPolls > 0) { try { ListenerUtils.stoppableSleep(KafkaMessageListenerContainer.this, idleBetweenPolls); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); throw new IllegalStateException("Consumer Thread [" + this + "] has been interrupted", ex); } } } }
위의 메소드에서는 idleBetweenPolls 값이 0보다 크다면 그 값만큼 Thread.sleep() 한다. 즉 idleBetweenPolls 속성을 부여하면 다음 poll 요청이 이뤄지기 전까지 interval을 부여할 수 있다.
간단하게 나타내면 이렇다.
2. 단순히 값을 설정하면 되는지?
이제 idleBetweenPolls 속성이 어떻게 처리를 지연시키는지를 알았다. 그렇다면 값만 부여한다고 의도한 효과를 낼 수 있을까? 대답은 반은 맞고 반은 틀리다.(...) 정확하게는 더 알아야 하는 것들이 있다.
2.1. max.poll.records
Kafka Consumer는 효율적인 처리를 위해 한 번의 Poll 요청시 다건의 레코드를 한 요청으로 가져온다. 이 때 몇 건의 레코드를 한 요청에 가져올지에 대한 설정이 max.poll.records 속성이다.
즉 idleBetweenPolls 속성으로 poll() 간의 interval을 부여한다고 하더라도, max.poll.records 만큼의 레코드는 기존처럼 처리되고, 그 다음번의 poll()이 지연되어 처리 될 것이다. 각각의 레코드가 텀을 두고 처리되게 하려면 max.poll.records 속성도 적절하게 설정하여야 한다. (한건마다 간격을 부여하려면 1로 설정)
2.2. maxPollInterval
idleBetweenPolls = Math.min(idleBetweenPolls, this.maxPollInterval - (System.currentTimeMillis() - this.lastPoll) - 5000); // NOSONAR - less by five seconds to avoid race condition with
위의 코드는 Thread.sleep() 할 시간을 구하는 코드인데, Math.min의 인자 중에서 첫 번째 인자가 설정에서 입력한 값이고, 두 번째 인자는 시스템에서 maxPollInterval에 따라 계산하는데, rebalncing을 피하기 위해 maxPollInterval과 idleBetweenPolls 의 차이가 5초보다 적은 경우 해당 속성이 적용되지 않는다. 즉 idleBetweenPolls을 큰 값으로 적용할 경우 maxPollInterval 수치도 같이 변경해주어야 한다는 것이다.
2.3. 요약
- idleBetweenPolls 값으로 poll 요청에 간격을 조절할 수 있음
- 특정 건수마다 간격을 부여하고 싶다면 max.poll.records 옵션도 같이 변경해주어야 한다.
- 부여하고 싶은 간격이 25초를 넘는 경우 maxPollInterval 수치도 같이 설정해주어야 적용된다.
3. 적용방법
org.springframework.kafka.listener.ContainerProperties ContainerProperties cp = new ContainerProperties("test"); cp.setIdleBetweenPolls(10000L); // The sleep interval in milliseconds used in the main loop between Consumer.poll(Duration) calls.
* idleBetweenPolls 값은 spring-kafka를 사용하는 Java Configuration 에서만 설정 가능하다.
Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000); // 5분 props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1); // 1건 DefaultKafkaConsumerFactory factory = new DefaultKafkaConsumerFactory<>(props);
4.마치며
사실 위와 같은 처리는 일시적인 것이고, Kafka의 성능도 완전히 활용을 못하기 때문에 하면서도 현타(?)가 왔다.
(자원좀 늘려주세요...)덕분에 내부 코드를 조금이라도 더 들여다 보게 된 것을 긍정적으로 생각하려고 한다.반응형'Kafka' 카테고리의 다른 글
Kafka Producer 커스터마이징하기 #2 (0) 2024.05.11 Kafka Producer 커스터마이징하기 #1 (0) 2024.05.11