-
Kafka Producer 커스터마이징하기 #1Kafka 2024. 5. 11. 04:13반응형
0. Maven Dependency 정의
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
1. Bean 주입
@Configuration public class KafkaProducerConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; // Kafka Broker host (VM argument or properties 값을 사용) @Bean public ProducerFactory<String, String> producerFactory() { Map<String, Object> configProps = new HashMap<>(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, String.class); // String, ByteArray, JSON, Custom... configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, String.class); configProps.put(ProducerConfig.ACKS_CONFIG, "all"); // all, 0 , 1 configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // true로 설정 시 중복 메시지 전송 X // 여기에 커스텀하게 여러 개의 설정을 추가할 수 있다. return new DefaultKafkaProducerFactory<>(configProps); } @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } }
ProducerFactory 에 config 설정 시 원하는 환경에 따라 다양한 설정들을 추가할 수 있다. 설정 가능한 값들이 많기 때문에 필요한 설정이 존재하는지는 확인해야 한다.
https://kafka.apache.org/documentation/#producerconfigs
2. 전송 테스트
@RestController public class ProducerController { @Autowired private KafkaTemplate<String, String> kafkaTemplate; @PostMapping("/send") public String sendMsg(@RequestParam String topic, @RequestParam String message) { kafkaTemplate.send(topic, message); // SendResult<byte[],byte[]> result = kafkaTemplate.send(producerRecord).get(); 처럼 전송 결과 응답 받을 수 있다. return "Message sent to Kafka topic: " + topic; } }
Reference :
https://docs.spring.io/spring-kafka/reference/kafka/sending-messages.html#kafka-template
반응형'Kafka' 카테고리의 다른 글
Kafka Producer 커스터마이징하기 #2 (0) 2024.05.11