Spring Boot์์ ์ฌ๋ฌ Kafka ํด๋ฌ์คํฐ ์ฌ์ฉํ๊ธฐ
๊ฐ์
ํ๋ก์ ํธ์์ ์ผ๋ฐ์ ์ผ๋ก ํ๋์ Kafka ํด๋ฌ์คํฐ๋ง์ ์ฌ์ฉํ๋ ๊ฒฝ์ฐ๊ฐ ๋๋ถ๋ถ์ด์ง๋ง, ๊ฒฝ์ฐ์ ๋ฐ๋ผ ์๋ก ๋ค๋ฅธ ํ๊ฒฝ์ Kafka ํด๋ฌ์คํฐ์ ๋์์ ์ฐ๊ฒฐํด์ผ ํ ํ์๊ฐ ์๊ธธ ์ ์๋ค.
๋๋ ์ด๋ฒ์ ์์ ํ๋ฉด์ ๊ธฐ์กด์ ์ฐ๊ฒฐ๋ Kafka ํด๋ฌ์คํฐ ๋ง๊ณ ๋ ๋ ํ๋์ ์๋ก์ด ํด๋ฌ์คํฐ๋ฅผ ์ฐ๊ฒฐํด์ผ ํ๋๋ฐ, ์์ ์ ์งํํ๋ฉด์ ์ป์๋ ์ง์์ ๊ณต์ ํ๊ณ ์ ์์ฑํ๊ฒ ๋์๋ค.
์ปจ์๋จธ ์ค์
Spring Boot๋ ๊ธฐ๋ณธ์ ์ผ๋ก ConcurrentKafkaListenerContainerFactory๋ฅผ ์๋์ผ๋ก ๊ตฌ์ฑํ์ฌ Kafka ๋ฆฌ์ค๋ ์ปจํ ์ด๋๋ฅผ ์์ฑํ๋ค. ๊ทธ๋ฌ๋ ์ฌ๋ฌ Kafka ํด๋ฌ์คํฐ์ ์ฐ๊ฒฐํ๋ ค๋ฉด ๊ฐ ํด๋ฌ์คํฐ์ ๋ํด ๋ณ๋์ ConcurrentKafkaListenerContainerFactory ๋น์ ์์ฑํ๊ณ , ํด๋น ํด๋ฌ์คํฐ์ ๋ง๋ ConsumerFactory ๋น์ ์ค์ ํด์ผ ํ๋ค.
์๋๋ ๋ ๊ฐ์ kafka ํด๋ฌ์คํฐ(Kafka-A์ Kafka-B)๋ฅผ ์ค์ ํ๋ ์์ ์ฝ๋์ด๋ค.
kafka-A:
bootstrap-server: b-2.kafka-a.amazonaws.com:9092,b-1.kafka-a.amazonaws.com:9092...
kafka-B:
bootstrap-server: b-1.kafka-b.amazonaws.com:9092,b-2.kafka-b.amazonaws.com:9092...
@Configuration
public class KafkaConsumerConfig {
@Value("${kafka-A.bootstrap-server}")
private String kafkaABootstrapServers;
@Value("${kafka-B.bootstrap-server}")
private String kafkaBBootstrapServers;
@Bean
public ConsumerFactory<String, String> kafkaAConsumerFactory() {
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaABootstrapServers);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "application.name");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(properties);
}
@Bean
public ConsumerFactory<String, String> kafkaBConsumerFactory() {
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBBootstrapServers);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "application.name");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(properties);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaAListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(kafkaAConsumerFactory());
return factory;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaBListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(kafkaBConsumerFactory());
return factory;
}
}
์ด์ @KafkaListener ์ ๋ ธํ ์ด์ ์ ์ฌ์ฉํด containerFactory ์์ฑ์ ํน์ ํด๋ฌ์คํฐ์ ๋น์ ๋ช ์ํจ์ผ๋ก์จ, ํด๋น ํด๋ฌ์คํฐ์ ์ฐ๊ฒฐํ ์ ์๋ค.
@Component
public class KafkaConsumerService {
@KafkaListener(topics = "topicA", containerFactory = "kafkaAListenerContainerFactory")
public void consumeFromKafkaA(String message) {
System.out.println("Received from Kafka-A: " + message);
}
@KafkaListener(topics = "topicB", containerFactory = "kafkaBListenerContainerFactory")
public void consumeFromKafkaB(String message) {
System.out.println("Received from Kafka-B: " + message);
}
}
๋น๋ง ์ถ๊ฐํด ์ฃผ๋ฉด ๋์ด์ ์๊ฐ๋ณด๋ค ๋๊ฒ ๊ฐ๋จํ๋ค. ๋ค๋ง ๊ธฐ์กด์๋ @KafkaListener์ containerFactory ์์ฑ์ ๋ช ์ํ์ง ์๊ณ ์ฌ์ฉํ์๋๋ฐ, ์ด ๊ฒฝ์ฐ ๊ธฐ๋ณธ์ ์ผ๋ก ์ด๋ค ๋น์ด ์ฌ์ฉ๋๊ธธ๋ ์๋๋๋์ง ๊ถ๊ธํ๋ค.
containerFactory ์์ฑ ๋ฏธ์ง์ ์ ๋์ ๋ฐฉ์
Spring Boot๋ ๊ธฐ๋ณธ์ ์ผ๋ก kafkaListenerContainerFactory๋ผ๋ ์ด๋ฆ์ ConcurrentKafkaListenerContainerFactory ๋น์ ์ฌ์ฉํ์ฌ Kafka ๋ฆฌ์ค๋ ์ปจํ ์ด๋๋ฅผ ์์ฑํ๋ค. ๋ฐ๋ผ์ @KafkaListener ์ ๋ ธํ ์ด์ ์์ containerFactory๋ฅผ ๋ช ์ํ์ง ์์ผ๋ฉด ์๋์ ๊ฐ์ด ๋์ํ๋ค.
- ๊ธฐ๋ณธ ๋น ์ฌ์ฉ: kafkaListenerContainerFactory๋ผ๋ ์ด๋ฆ์ ๋น์ด ์ ์๋์ด ์๋ค๋ฉด, @KafkaListener์์ containerFactory๋ฅผ ๋ช ์ํ์ง ์์๋ ์ด ๋น์ด ์ฌ์ฉ๋๋ค.
- ๋ช ์์ ๋น ์ฌ์ฉ: ๋ง์ฝ newKafkaListenerContainerFactory๋ผ๋ ์ด๋ฆ์ ๋น์ ๋ณ๋๋ก ์ ์ํ๊ณ , @KafkaListener ์ ๋ ธํ ์ด์ ์์ containerFactory = "newKafkaListenerContainerFactory"๋ก ๋ช ์ํ๋ค๋ฉด ํด๋น ๋น์ด ์ฌ์ฉ๋๋ค.
- ๋น ๋ฏธ์ ์ ์ ์์ธ ๋ฐ์: kafkaAListenerContainerFactory์ kafkaBListenerContainerFactory ๊ฐ์ ์ด๋ฆ์ ๋น์ ๋ฑ๋กํ ์ํ์์ @KafkaListener์์ containerFactory๋ฅผ ๋ช ์ํ์ง ์์ผ๋ฉด, ๊ธฐ๋ณธ์ ์ผ๋ก ์ฌ์ฉ๋ kafkaListenerContainerFactory ๋น์ ์ฐพ์ ์ ์๋ค๋ NoSuchBeanDefinitionException์ด ๋ฐ์ํ๋ค.
๊ณต์๋ฌธ์์์ ๊ธฐ๋ณธ ๋น ๋ค์์ด kafkaListenerContainerFactory์์ ํ์ธํ ์ ์๋ค.
ํ๋ก๋์ ์ค์
ํ๋ก๋์ ์ค์ ๋ ์ปจ์๋จธ ์ค์ ๊ณผ ๋ง์ฐฌ๊ฐ์ง๋ก ๊ฐ ํด๋ฌ์คํฐ์ ๋ํด ๋ณ๋์ ProducerFactory ๋น์ ์์ฑํ๊ณ , ํด๋น ํด๋ฌ์คํฐ์ ๋ง๋ KafkaTemplate์ ์ค์ ํด์ผ ํ๋ค.
@Configuration
public class KafkaProducerConfig {
@Value("${kafka-A.bootstrap-server}")
private String kafkaABootstrapServers;
@Value("${kafka-B.bootstrap-server}")
private String kafkaBBootstrapServers;
@Bean
public ProducerFactory<String, String> kafkaAProducerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaABootstrapServers);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public ProducerFactory<String, String> kafkaBProducerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBBootstrapServers);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaATemplate() {
return new KafkaTemplate<>(kafkaAProducerFactory());
}
@Bean
public KafkaTemplate<String, String> kafkaBTemplate() {
return new KafkaTemplate<>(kafkaBProducerFactory());
}
}
์ฌ์ฉ์ ํ ๋๋ ๋ค์๊ณผ ๊ฐ์ด ์ฌ์ฉํ๋ฉด ๋๋ค(๊ฐ๋จํ๊ฒ ์ค๋ช ํ๊ธฐ ์ํด ํ๋ ์ฃผ์ ์ ์์ผฐ๋ค).
@Component
public class KafkaProducerService {
@Autowired
private KafkaTemplate<String, String> kafkaATemplate;
@Autowired
private KafkaTemplate<String, String> kafkaBTemplate;
public void sendMessageToKafkaA(String topic, String message) {
kafkaATemplate.send(topic, message);
System.out.println("Sent to Kafka-A: " + message);
}
public void sendMessageToKafkaB(String topic, String message) {
kafkaBTemplate.send(topic, message);
System.out.println("Sent to Kafka-B: " + message);
}
}
์ด ์ค์ ์ ํตํด kafka-A์ kafka-B ํด๋ฌ์คํฐ์ ๊ฐ๊ฐ ๋ฉ์์ง๋ฅผ ์ ์กํ ์ ์์ผ๋ฉฐ, ์ํฉ์ ๋ฐ๋ผ ์ ์ ํ KafkaTemplate์ ์ ํํ์ฌ ์ฌ์ฉํ ์ ์๋ค.
๋ง์ฝ KafkaProducerService์์ ์ค์ ๋์ง ์์ kafkaCTemplate์ ์ฃผ์ ๋ฐ์ผ๋ ค ํ๋ค๋ฉด, KafkaProducerConfig์ ํด๋น ๋น์ด ์ ์๋์ด ์์ง ์๊ธฐ ๋๋ฌธ์ NoSuchBeanDefinitionException์ด ๋ฐ์ํ๊ฒ ๋๋ค. ๋ง์ฝ ํน์ ๋น์ด ์ ์๋์ด ์์ง ์์ ๊ฒฝ์ฐ, ๊ธฐ๋ณธ์ผ๋ก ์ฌ์ฉํ KafkaTemplate์ ์ง์ ํ๊ณ ์ถ๋ค๋ฉด ์๋์ ๊ฐ์ด @Primary ์ ๋ ธํ ์ด์ ์ ์ฌ์ฉํ์ฌ ์ง์ ํ ์ ์๋ค.
@Bean
@Primary
public KafkaTemplate<String, String> defaultKafkaTemplate() {
return new KafkaTemplate<>(kafkaAProducerFactory());
}
์ด ๊ฒฝ์ฐ kafkaTemplate์ด ๊ธฐ๋ณธ์ ์ผ๋ก ์ฌ์ฉ๋๋ฉฐ, ๋ค๋ฅธ KafkaTemplate์ด ๋ช ์์ ์ผ๋ก ์ฃผ์ ๋์ง ์์ ๊ฒฝ์ฐ ๊ธฐ๋ณธ ๋น์ผ๋ก ์ ํ๋๋ค. ์ฆ ๋ค์๊ณผ ๊ฐ์ด ์ผ์นํ์ง ์๋ ๋น ๋ค์์ KafkaTemplate์ ์ฃผ์ ๋ฐ๋๋ผ๋ @Primary๋ก ์ง์ ๋ defaultKafkaTemplate์ด ์ฃผ์ ๋๋ค.
@Service
public class KafkaProducerService {
/**
* ์ด ํ๋์ ์ด๋ฆ์ด ์ค์ ๋ก ์กด์ฌํ๋ ๋น ์ด๋ฆ๊ณผ ์ผ์นํ์ง ์๋๋ผ๋,
* @Primary๋ก ์ง์ ๋ KafkaTemplate์ด ์ฃผ์
๋๋ค.
*/
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
System.out.println("Sent to Kafka-A: " + message);
}
}
๋ค๋ง ์ด๋ฌํ ๊ฒฝ์ฐ์๋ ์๋ชป๋ KafkaTemplate์ด ์ฃผ์ ๋ ๊ฐ๋ฅ์ฑ์ด ์์ผ๋ฏ๋ก, ์ค์๋ฅผ ๋ฐฉ์งํ๊ธฐ ์ํด ๋ง์ปค ์ธํฐํ์ด์ค๋ฅผ ๊ตฌํํ ํด๋์ค๋ฅผ ์ฌ์ฉํ๋ ๋ฑ ํด๋จผ ์๋ฌ๋ฅผ ์ค์ผ ์ ์๋ ๋ฐฉ๋ฒ์ ๊ณ ๋ คํด ๋ณด๋ ๊ฒ๋ ์ข์ ๊ฒ ๊ฐ๋ค.