๋ณธ๋ฌธ ๋ฐ”๋กœ๊ฐ€๊ธฐ
BackEnd๐ŸŒฑ/Kafka

Spring Boot์—์„œ ์—ฌ๋Ÿฌ Kafka ํด๋Ÿฌ์Šคํ„ฐ ์‚ฌ์šฉํ•˜๊ธฐ

by dkswnkk 2024. 8. 17.

๊ฐœ์š”

ํ”„๋กœ์ ํŠธ์—์„œ ์ผ๋ฐ˜์ ์œผ๋กœ ํ•˜๋‚˜์˜ Kafka ํด๋Ÿฌ์Šคํ„ฐ๋งŒ์„ ์‚ฌ์šฉํ•˜๋Š” ๊ฒฝ์šฐ๊ฐ€ ๋Œ€๋ถ€๋ถ„์ด์ง€๋งŒ, ๊ฒฝ์šฐ์— ๋”ฐ๋ผ ์„œ๋กœ ๋‹ค๋ฅธ ํ™˜๊ฒฝ์˜ Kafka ํด๋Ÿฌ์Šคํ„ฐ์— ๋™์‹œ์— ์—ฐ๊ฒฐํ•ด์•ผ ํ•  ํ•„์š”๊ฐ€ ์ƒ๊ธธ ์ˆ˜ ์žˆ๋‹ค.

๋‚˜๋Š” ์ด๋ฒˆ์— ์ž‘์—…ํ•˜๋ฉด์„œ ๊ธฐ์กด์˜ ์—ฐ๊ฒฐ๋œ Kafka ํด๋Ÿฌ์Šคํ„ฐ ๋ง๊ณ ๋„ ๋˜ ํ•˜๋‚˜์˜ ์ƒˆ๋กœ์šด ํด๋Ÿฌ์Šคํ„ฐ๋ฅผ ์—ฐ๊ฒฐํ•ด์•ผ ํ–ˆ๋Š”๋ฐ, ์ž‘์—…์„ ์ง„ํ–‰ํ•˜๋ฉด์„œ ์–ป์—ˆ๋˜ ์ง€์‹์„ ๊ณต์œ ํ•˜๊ณ ์ž ์ž‘์„ฑํ•˜๊ฒŒ ๋˜์—ˆ๋‹ค.

 

 

์ปจ์Šˆ๋จธ ์„ค์ •

Spring Boot๋Š” ๊ธฐ๋ณธ์ ์œผ๋กœ ConcurrentKafkaListenerContainerFactory๋ฅผ ์ž๋™์œผ๋กœ ๊ตฌ์„ฑํ•˜์—ฌ Kafka ๋ฆฌ์Šค๋„ˆ ์ปจํ…Œ์ด๋„ˆ๋ฅผ ์ƒ์„ฑํ•œ๋‹ค. ๊ทธ๋Ÿฌ๋‚˜ ์—ฌ๋Ÿฌ Kafka ํด๋Ÿฌ์Šคํ„ฐ์— ์—ฐ๊ฒฐํ•˜๋ ค๋ฉด ๊ฐ ํด๋Ÿฌ์Šคํ„ฐ์— ๋Œ€ํ•ด ๋ณ„๋„์˜ ConcurrentKafkaListenerContainerFactory ๋นˆ์„ ์ƒ์„ฑํ•˜๊ณ , ํ•ด๋‹น ํด๋Ÿฌ์Šคํ„ฐ์— ๋งž๋Š” ConsumerFactory ๋นˆ์„ ์„ค์ •ํ•ด์•ผ ํ•œ๋‹ค.

์•„๋ž˜๋Š” ๋‘ ๊ฐœ์˜ kafka ํด๋Ÿฌ์Šคํ„ฐ(Kafka-A์™€ Kafka-B)๋ฅผ ์„ค์ •ํ•˜๋Š” ์˜ˆ์‹œ ์ฝ”๋“œ์ด๋‹ค.

kafka-A:
  bootstrap-server: b-2.kafka-a.amazonaws.com:9092,b-1.kafka-a.amazonaws.com:9092...

kafka-B:
  bootstrap-server: b-1.kafka-b.amazonaws.com:9092,b-2.kafka-b.amazonaws.com:9092...
@Configuration
public class KafkaConsumerConfig {

    @Value("${kafka-A.bootstrap-server}")
    private String kafkaABootstrapServers;

    @Value("${kafka-B.bootstrap-server}")
    private String kafkaBBootstrapServers;

    @Bean
    public ConsumerFactory<String, String> kafkaAConsumerFactory() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaABootstrapServers);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "application.name");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        
        return new DefaultKafkaConsumerFactory<>(properties);
    }

    @Bean
    public ConsumerFactory<String, String> kafkaBConsumerFactory() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBBootstrapServers);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "application.name");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        
        return new DefaultKafkaConsumerFactory<>(properties);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaAListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaAConsumerFactory());
        
        return factory;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaBListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaBConsumerFactory());
        
        return factory;
    }
}

์ด์ œ @KafkaListener ์• ๋…ธํ…Œ์ด์…˜์„ ์‚ฌ์šฉํ•ด containerFactory ์†์„ฑ์— ํŠน์ • ํด๋Ÿฌ์Šคํ„ฐ์˜ ๋นˆ์„ ๋ช…์‹œํ•จ์œผ๋กœ์จ, ํ•ด๋‹น ํด๋Ÿฌ์Šคํ„ฐ์— ์—ฐ๊ฒฐํ•  ์ˆ˜ ์žˆ๋‹ค.

@Component
public class KafkaConsumerService {

    @KafkaListener(topics = "topicA", containerFactory = "kafkaAListenerContainerFactory")
    public void consumeFromKafkaA(String message) {
        System.out.println("Received from Kafka-A: " + message);
    }

    @KafkaListener(topics = "topicB", containerFactory = "kafkaBListenerContainerFactory")
    public void consumeFromKafkaB(String message) {
        System.out.println("Received from Kafka-B: " + message);
    }
}

๋นˆ๋งŒ ์ถ”๊ฐ€ํ•ด ์ฃผ๋ฉด ๋˜์–ด์„œ ์ƒ๊ฐ๋ณด๋‹ค ๋˜๊ฒŒ ๊ฐ„๋‹จํ•˜๋‹ค. ๋‹ค๋งŒ ๊ธฐ์กด์—๋Š” @KafkaListener์— containerFactory ์†์„ฑ์„ ๋ช…์‹œํ•˜์ง€ ์•Š๊ณ  ์‚ฌ์šฉํ–ˆ์—ˆ๋Š”๋ฐ, ์ด ๊ฒฝ์šฐ ๊ธฐ๋ณธ์ ์œผ๋กœ ์–ด๋–ค ๋นˆ์ด ์‚ฌ์šฉ๋˜๊ธธ๋ž˜ ์ž‘๋™๋˜๋Š”์ง€ ๊ถ๊ธˆํ–ˆ๋‹ค.

 

containerFactory ์†์„ฑ ๋ฏธ์ง€์ • ์‹œ ๋™์ž‘ ๋ฐฉ์‹

Spring Boot๋Š” ๊ธฐ๋ณธ์ ์œผ๋กœ kafkaListenerContainerFactory๋ผ๋Š” ์ด๋ฆ„์˜ ConcurrentKafkaListenerContainerFactory ๋นˆ์„ ์‚ฌ์šฉํ•˜์—ฌ Kafka ๋ฆฌ์Šค๋„ˆ ์ปจํ…Œ์ด๋„ˆ๋ฅผ ์ƒ์„ฑํ•œ๋‹ค. ๋”ฐ๋ผ์„œ @KafkaListener ์• ๋…ธํ…Œ์ด์…˜์—์„œ containerFactory๋ฅผ ๋ช…์‹œํ•˜์ง€ ์•Š์œผ๋ฉด ์•„๋ž˜์™€ ๊ฐ™์ด ๋™์ž‘ํ•œ๋‹ค.

  • ๊ธฐ๋ณธ ๋นˆ ์‚ฌ์šฉ: kafkaListenerContainerFactory๋ผ๋Š” ์ด๋ฆ„์˜ ๋นˆ์ด ์ •์˜๋˜์–ด ์žˆ๋‹ค๋ฉด, @KafkaListener์—์„œ containerFactory๋ฅผ ๋ช…์‹œํ•˜์ง€ ์•Š์•„๋„ ์ด ๋นˆ์ด ์‚ฌ์šฉ๋œ๋‹ค.
  • ๋ช…์‹œ์  ๋นˆ ์‚ฌ์šฉ: ๋งŒ์•ฝ newKafkaListenerContainerFactory๋ผ๋Š” ์ด๋ฆ„์˜ ๋นˆ์„ ๋ณ„๋„๋กœ ์ •์˜ํ•˜๊ณ , @KafkaListener ์• ๋…ธํ…Œ์ด์…˜์—์„œ containerFactory = "newKafkaListenerContainerFactory"๋กœ ๋ช…์‹œํ–ˆ๋‹ค๋ฉด ํ•ด๋‹น ๋นˆ์ด ์‚ฌ์šฉ๋œ๋‹ค.
  • ๋นˆ ๋ฏธ์ •์˜ ์‹œ ์˜ˆ์™ธ ๋ฐœ์ƒ: kafkaAListenerContainerFactory์™€ kafkaBListenerContainerFactory ๊ฐ™์€ ์ด๋ฆ„์˜ ๋นˆ์„ ๋“ฑ๋กํ•œ ์ƒํƒœ์—์„œ @KafkaListener์—์„œ containerFactory๋ฅผ ๋ช…์‹œํ•˜์ง€ ์•Š์œผ๋ฉด, ๊ธฐ๋ณธ์ ์œผ๋กœ ์‚ฌ์šฉ๋  kafkaListenerContainerFactory ๋นˆ์„ ์ฐพ์„ ์ˆ˜ ์—†๋‹ค๋Š” NoSuchBeanDefinitionException์ด ๋ฐœ์ƒํ•œ๋‹ค.

๊ณต์‹๋ฌธ์„œ์—์„œ ๊ธฐ๋ณธ ๋นˆ ๋„ค์ž„์ด kafkaListenerContainerFactory์ž„์„ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค.

https://docs.spring.io/spring-kafka/reference/kafka/receiving-messages/listener-annotation.html

 

 

ํ”„๋กœ๋“€์„œ ์„ค์ •

ํ”„๋กœ๋“€์„œ ์„ค์ •๋„ ์ปจ์Šˆ๋จธ ์„ค์ •๊ณผ ๋งˆ์ฐฌ๊ฐ€์ง€๋กœ ๊ฐ ํด๋Ÿฌ์Šคํ„ฐ์— ๋Œ€ํ•ด ๋ณ„๋„์˜ ProducerFactory ๋นˆ์„ ์ƒ์„ฑํ•˜๊ณ , ํ•ด๋‹น ํด๋Ÿฌ์Šคํ„ฐ์— ๋งž๋Š” KafkaTemplate์„ ์„ค์ •ํ•ด์•ผ ํ•œ๋‹ค.

@Configuration
public class KafkaProducerConfig {

    @Value("${kafka-A.bootstrap-server}")
    private String kafkaABootstrapServers;

    @Value("${kafka-B.bootstrap-server}")
    private String kafkaBBootstrapServers;

    @Bean
    public ProducerFactory<String, String> kafkaAProducerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaABootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public ProducerFactory<String, String> kafkaBProducerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBBootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaATemplate() {
        return new KafkaTemplate<>(kafkaAProducerFactory());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaBTemplate() {
        return new KafkaTemplate<>(kafkaBProducerFactory());
    }
}

์‚ฌ์šฉ์„ ํ•  ๋•Œ๋Š” ๋‹ค์Œ๊ณผ ๊ฐ™์ด ์‚ฌ์šฉํ•˜๋ฉด ๋œ๋‹ค(๊ฐ„๋‹จํ•˜๊ฒŒ ์„ค๋ช…ํ•˜๊ธฐ ์œ„ํ•ด ํ•„๋“œ ์ฃผ์ž…์„ ์‹œ์ผฐ๋‹ค).

@Component
public class KafkaProducerService {

    @Autowired
    private KafkaTemplate<String, String> kafkaATemplate;

    @Autowired
    private KafkaTemplate<String, String> kafkaBTemplate;

    public void sendMessageToKafkaA(String topic, String message) {
        kafkaATemplate.send(topic, message);
        System.out.println("Sent to Kafka-A: " + message);
    }

    public void sendMessageToKafkaB(String topic, String message) {
        kafkaBTemplate.send(topic, message);
        System.out.println("Sent to Kafka-B: " + message);
    }
}

์ด ์„ค์ •์„ ํ†ตํ•ด kafka-A์™€ kafka-B ํด๋Ÿฌ์Šคํ„ฐ์— ๊ฐ๊ฐ ๋ฉ”์‹œ์ง€๋ฅผ ์ „์†กํ•  ์ˆ˜ ์žˆ์œผ๋ฉฐ, ์ƒํ™ฉ์— ๋”ฐ๋ผ ์ ์ ˆํ•œ KafkaTemplate์„ ์„ ํƒํ•˜์—ฌ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋‹ค.

๋งŒ์•ฝ KafkaProducerService์—์„œ ์„ค์ •๋˜์ง€ ์•Š์€ kafkaCTemplate์„ ์ฃผ์ž…๋ฐ›์œผ๋ ค ํ•œ๋‹ค๋ฉด, KafkaProducerConfig์— ํ•ด๋‹น ๋นˆ์ด ์ •์˜๋˜์–ด ์žˆ์ง€ ์•Š๊ธฐ ๋•Œ๋ฌธ์— NoSuchBeanDefinitionException์ด ๋ฐœ์ƒํ•˜๊ฒŒ ๋œ๋‹ค. ๋งŒ์•ฝ ํŠน์ • ๋นˆ์ด ์ •์˜๋˜์–ด ์žˆ์ง€ ์•Š์€ ๊ฒฝ์šฐ, ๊ธฐ๋ณธ์œผ๋กœ ์‚ฌ์šฉํ•  KafkaTemplate์„ ์ง€์ •ํ•˜๊ณ  ์‹ถ๋‹ค๋ฉด ์•„๋ž˜์™€ ๊ฐ™์ด @Primary ์• ๋…ธํ…Œ์ด์…˜์„ ์‚ฌ์šฉํ•˜์—ฌ ์ง€์ •ํ•  ์ˆ˜ ์žˆ๋‹ค.

@Bean
@Primary
public KafkaTemplate<String, String> defaultKafkaTemplate() {
    return new KafkaTemplate<>(kafkaAProducerFactory());
}

์ด ๊ฒฝ์šฐ kafkaTemplate์ด ๊ธฐ๋ณธ์ ์œผ๋กœ ์‚ฌ์šฉ๋˜๋ฉฐ, ๋‹ค๋ฅธ KafkaTemplate์ด ๋ช…์‹œ์ ์œผ๋กœ ์ฃผ์ž…๋˜์ง€ ์•Š์€ ๊ฒฝ์šฐ ๊ธฐ๋ณธ ๋นˆ์œผ๋กœ ์„ ํƒ๋œ๋‹ค. ์ฆ‰ ๋‹ค์Œ๊ณผ ๊ฐ™์ด ์ผ์น˜ํ•˜์ง€ ์•Š๋Š” ๋นˆ ๋„ค์ž„์˜ KafkaTemplate์„ ์ฃผ์ž…๋ฐ›๋”๋ผ๋„ @Primary๋กœ ์ง€์ •๋œ defaultKafkaTemplate์ด ์ฃผ์ž…๋œ๋‹ค.

@Service
public class KafkaProducerService {

    /**
     * ์ด ํ•„๋“œ์˜ ์ด๋ฆ„์ด ์‹ค์ œ๋กœ ์กด์žฌํ•˜๋Š” ๋นˆ ์ด๋ฆ„๊ณผ ์ผ์น˜ํ•˜์ง€ ์•Š๋”๋ผ๋„,
     * @Primary๋กœ ์ง€์ •๋œ KafkaTemplate์ด ์ฃผ์ž…๋œ๋‹ค.
     */
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
        System.out.println("Sent to Kafka-A: " + message);
    }
}

๋‹ค๋งŒ ์ด๋Ÿฌํ•œ ๊ฒฝ์šฐ์—๋Š” ์ž˜๋ชป๋œ KafkaTemplate์ด ์ฃผ์ž…๋  ๊ฐ€๋Šฅ์„ฑ์ด ์žˆ์œผ๋ฏ€๋กœ, ์‹ค์ˆ˜๋ฅผ ๋ฐฉ์ง€ํ•˜๊ธฐ ์œ„ํ•ด ๋งˆ์ปค ์ธํ„ฐํŽ˜์ด์Šค๋ฅผ ๊ตฌํ˜„ํ•œ ํด๋ž˜์Šค๋ฅผ ์‚ฌ์šฉํ•˜๋Š” ๋“ฑ ํœด๋จผ ์—๋Ÿฌ๋ฅผ ์ค„์ผ ์ˆ˜ ์žˆ๋Š” ๋ฐฉ๋ฒ•์„ ๊ณ ๋ คํ•ด ๋ณด๋Š” ๊ฒƒ๋„ ์ข‹์„ ๊ฒƒ ๊ฐ™๋‹ค.

๋Œ“๊ธ€