[Kafka] ์ปจ์๋จธ ์คํ์ ์๋์ผ๋ก ์ปค๋ฐํ๊ธฐ
๊ฐ์
๋ฉ์์ง ์์ค์ ๋ฐฉ์งํ๊ธฐ ์ํด ๋ฉ์์ง ์ฒ๋ฆฌ๊ฐ ๋ฌธ์ ์์ด ์๋ฃ๋์์ ๊ฒฝ์ฐ์๋ง commit์ ์ํํ๋๋ก, ์ปจ์๋จธ์ offset commit์ ์๋์ผ๋ก ์ค์ ํ๋ ๊ฒฝ์ฐ๊ฐ ์ฌ๋ฌ ์กด์ฌํฉ๋๋ค. ํ์ง๋ง ๋จ์ํ auto.offset.commit๋ง false๋ก ์ง์ ํ๋ ๊ฒฝ์ฐ์๋ ์ํ๋ ๋ฐฉํฅ์ผ๋ก ๋์ํ์ง ์์ ์๋ ์์ต๋๋ค.
์ด๋ฒ ๊ฒ์๊ธ์์๋ ์๋ ์ปค๋ฐ์ ๋์ ๊ณผ์ ๊ณผ ์ฃผ์์ , ๊ทธ๋ฆฌ๊ณ ์ปค๋ฐ์ ์๋์ผ๋ก ์ ์ดํ๊ธฐ ์ํ auto.offset.commit ์ค์ ๊ณผ ack-mode ์ค์ ์ ๋ํด ์์๋ณด๊ฒ ์ต๋๋ค.
auto.offset.commit
Kafka์ ์ปจ์๋จธ๋ ์ฝ์ ๋ฉ์์ง์ ์์น๋ฅผ ์ถ์ ํ๊ธฐ ์ํด offset์ commit ํ๋ ์ญํ ์ ๋ด๋นํฉ๋๋ค. ์ด offset์ Kafka์ ๋ด๋ถ ํ ํฝ์ธ __consumer_offsets์ ์ ์ฅ๋๋ฉฐ, ์ปจ์๋จธ๋ commit์ด ๋ฐ์ํ ๋๋ง๋ค ์ด offset ๊ฐ์ ๊ฐฑ์ ํฉ๋๋ค. ์ดํ ์ปจ์๋จธ๋ ์ด offset ๊ฐ์ ์ฐธ์กฐํ์ฌ ๋ค์์ ์ฒ๋ฆฌํ ๋ ์ฝ๋๋ฅผ ์ฝ์ด์ต๋๋ค.
์ปจ์๋จธ๊ฐ offset์ commit ํ๋ ๋ฐฉ์์๋ ์๋ ์ปค๋ฐ๊ณผ ์๋ ์ปค๋ฐ์ด ์์ต๋๋ค. auto commit์ auto.offset.commit ์ค์ ์ด true์ผ ๋ ํ์ฑํ๋๋ฉฐ, ๊ธฐ๋ณธ์ ์ผ๋ก ์ด ๊ฐ์ true๋ก ์ค์ ๋์ด ์์ต๋๋ค.
๋ง์ฝ ์๋ ์ปค๋ฐ์ด ํ์ฑํ๋์ด ์์ ๊ฒฝ์ฐ, poll() ๋ฉ์๋๊ฐ ์คํ๋์์ ๋ ์ํ ์๊ฐ์ด auto.commit.interval.ms ์ค์ ๊ฐ์ ์ด๊ณผํ๋ ๊ฒฝ์ฐ ์๋์ผ๋ก offset์ด commit ๋ฉ๋๋ค.
๋ฐ๋ผ์ ์ปจ์๋จธ๋ poll์ ์์ฒญํ ๋๋ง๋ค auto.commit.interval.ms์ ์ ์ํ ์ปค๋ฐ ํ ์๊ฐ์ธ์ง ์๋์ง ์ฒดํฌํ๊ฒ ๋๊ณ , poll ์์ฒญ์ผ๋ก ๊ฐ์ ธ์จ ๋ง์ง๋ง ์คํ์ ์ ์ปค๋ฐํฉ๋๋ค.
์ค์ ๋ก Kafka์ ์์ค ์ฝ๋๋ฅผ ์ดํด๋ณด๋ฉด, poll() ๋ฉ์๋ ๋ด๋ถ์์ maybeAutoCommitOffsetsAsync() ๋ฉ์๋๊ฐ ํธ์ถ๋์ด ๋น๋๊ธฐ์ ์ผ๋ก auto commit์ด ์ผ์ด๋๋ ๊ฒ์ ํ์ธํ ์ ์์ต๋๋ค. - ConsumerCoordinator.java #L1214
public void maybeAutoCommitOffsetsAsync(long now) {
if (autoCommitEnabled) {
nextAutoCommitTimer.update(now);
if (nextAutoCommitTimer.isExpired()) {
nextAutoCommitTimer.reset(autoCommitIntervalMs);
doAutoCommitOffsetsAsync();
}
}
}
์ ๋ฉ์๋์ ๋์ ๊ณผ์ ์ ๋ค์๊ณผ ๊ฐ์ต๋๋ค.
- ์๋ ์ปค๋ฐ ํ์ฑํ ์ฌ๋ถ ํ์ธ: autoCommitEnabled ๊ฐ์ ํตํด ์๋ ์ปค๋ฐ ๊ธฐ๋ฅ์ด ํ์ฑํ๋์ด ์๋์ง ํ์ธํฉ๋๋ค.
- ํ์ด๋จธ ์ ๋ฐ์ดํธ (nextAutoCommitTimer.update(now)): ํ์ฌ ์๊ฐ(now)์ ๊ธฐ์ค์ผ๋ก ๋ค์ ์๋ ์ปค๋ฐ์ด ์์ ๋ ํ์ด๋จธ๋ฅผ ์ ๋ฐ์ดํธํฉ๋๋ค. update ๋ฉ์๋๋ ํ์ฌ ์๊ฐ๊ณผ ๋ง์ง๋ง ์คํ์ ์ปค๋ฐ ์๊ฐ ์ฌ์ด์ ๊ฒฝ๊ณผ ์๊ฐ์ ๊ณ์ฐํ์ฌ, ๋ค์ ์๋ ์ปค๋ฐ ์คํ๊น์ง ๋จ์ ์๊ฐ์ ์กฐ์ ํฉ๋๋ค. ์ด ๊ณผ์ ์ ์๋ ์ปค๋ฐ ์ฃผ๊ธฐ๋ฅผ ๊ด๋ฆฌํ๊ธฐ ์ํ ๊ฒ์ผ๋ก, ์ค์ ์ปค๋ฐ ์์ ์ด ์์๋๊ธฐ ์ ์ ํ์ฌ ์๊ฐ๊ณผ ์ค์ ๋ ์ปค๋ฐ ๊ฐ๊ฒฉ์ ๊ธฐ๋ฐ์ผ๋ก ํ์ด๋จธ ์ํ๋ฅผ ์ต์ ํํฉ๋๋ค.
- ํ์ด๋จธ ๋ง๋ฃ ์ฌ๋ถ ํ์ธ (nextAutoCommitTimer.isExpired()): ์ ๋ฐ์ดํธ๋ ํ์ด๋จธ๋ฅผ ๊ฒ์ฌํ์ฌ ์ค์ ๋ ์๋ ์ปค๋ฐ ๊ฐ๊ฒฉ์ด ๋ง๋ฃ๋์๋์ง ํ์ธํฉ๋๋ค. ๋ง์ฝ ํ์ด๋จธ๊ฐ ๋ง๋ฃ๋์๋ค๋ฉด (์ฆ, auto.commit.interval.ms์ ์ค์ ๋ ์๊ฐ์ด ๊ฒฝ๊ณผํ๋ค๋ฉด), ๋ค์ ๋จ๊ณ๋ก ์งํํฉ๋๋ค.
- ํ์ด๋จธ ๋ฆฌ์ (nextAutoCommitTimer.reset(autoCommitIntervalMs)): ํ์ด๋จธ๋ฅผ ๋ฆฌ์ ํ์ฌ ๋ค์ ์๋ ์ปค๋ฐ ์์ ์ ํ์ฌ ์๊ฐ์ผ๋ก๋ถํฐ ์ค์ ๋ ๊ฐ๊ฒฉ(autoCommitIntervalMs) ์ดํ๋ก ์กฐ์ ํฉ๋๋ค. ์ด๋ ์๋ ์ปค๋ฐ์ด ์คํ๋ ํ ๋ค์ ์๋ ์ปค๋ฐ ์ฃผ๊ธฐ๋ฅผ ์์ํ๊ธฐ ์ํ ์ค๋น ๋จ๊ณ์ ๋๋ค.
- ๋น๋๊ธฐ ์คํ์ ์ปค๋ฐ ์คํ (doAutoCommitOffsetsAsync()): ์ค์ ๋ก ์คํ์ ์ปค๋ฐ์ ๋น๋๊ธฐ์ ์ผ๋ก ์ํํ๋ ๋ฉ์๋๋ฅผ ํธ์ถํฉ๋๋ค. ์ด ๋จ๊ณ์์ ์ปจ์๋จธ๊ฐ ์ต๊ทผ์ ์ฒ๋ฆฌํ ๋ฉ์์ง์ ์คํ์ ์ด Kafka์ ์ปค๋ฐ๋ฉ๋๋ค.
์๋ ์ปค๋ฐ ์ค์ ์ ์ฃผ์์
์๋ ์ปค๋ฐ ์ค์ ์ ํธ๋ฆฌํ์ง๋ง ์ปจ์๋จธ ๊ทธ๋ฃน์ ๋ฆฌ๋ฐธ๋ฐ์ฑ์ด ๋ฐ์ํ ๋ ๋ฉ์์ง์ ์ค๋ณต ์ฒ๋ฆฌ๋ ์ ์ค ๋ฌธ์ ๊ฐ ๋ฐ์ํ ์ ์์ต๋๋ค.
1. ๋ฉ์์ง ์ ์ค
์๋ ์ปค๋ฐ ์ค์ ์ด 1์ด๋ก ์ง์ ๋ ๊ฒฝ์ฐ๋ฅผ ์์ํด ๋ณด๊ฒ ์ต๋๋ค. ์ปจ์๋จธ๋ ์ ์์ ์ผ๋ก poll() ๋ฉ์๋๋ฅผ ํตํด ๋ฉ์์ง๋ฅผ ๊ฐ์ ธ์ค๊ณ ํด๋น ๋ฉ์์ง๋ฅผ ์ฒ๋ฆฌํฉ๋๋ค.
๊ทธ๋ฌ๋ ๋ฉ์์ง ์ฒ๋ฆฌ ๊ณผ์ ์ด 1์ด๋ฅผ ์ด๊ณผํ๋ ์๊ฐ์ด ๊ฑธ๋ฆด ๊ฒฝ์ฐ ๋ฌธ์ ๊ฐ ๋ฐ์ํฉ๋๋ค. ์๋ฅผ ๋ค์ด ๋ฉ์์ง๋ฅผ DB์ ์ ์ฅํ๊ฑฐ๋ ๋ค๋ฅธ ์๋น์ค์ ์ ๋ฌํ๋ ๊ณผ์ ์์ ๋คํธ์ํฌ ์ง์ฐ์ด๋ ์ฅ์ ๋ฑ ์ด๋ค ์ด์ ๋ก๋ ์ฒ๋ฆฌ ์๊ฐ์ด 1์ด๋ฅผ ์ด๊ณผํ์๋ค๊ณ ๊ฐ์ ํด ๋ด ์๋ค. ์ด๋ฌํ ์ํฉ์์ ๋ฉ์์ง ์ฒ๋ฆฌ๊ฐ ์๋ฃ๋๊ธฐ ์ ์ ์คํ์ ์ปค๋ฐ์ด ์ผ์ด๋๊ณ , ๊ทธ ํ ๋ฉ์์ง ์ฒ๋ฆฌ์์ ์๋ฌ๊ฐ ๋ฐ์ํ๋ ๊ฒฝ์ฐ ํด๋น ๋ฉ์์ง๋ ๋ค์ ์ฒ๋ฆฌ๋ ์ ์๊ฒ ๋ฉ๋๋ค. ์ฆ ๋ฉ์์ง ์ ์ค์ด ์ผ์ด๋๊ฒ ๋ฉ๋๋ค.
2. ๋ฉ์์ง ์ค๋ณต
๋ค์๊ณผ ๊ฐ์ ์ค์ ์ ๊ฒฝ์ฐ, ๋ฉ์์ง์ ์ค๋ณต ์ฒ๋ฆฌ๊ฐ ๋ฐ์ํ ์ ์์ต๋๋ค.
- enable.auto.commit = true
- auto.commit.interval.ms = 5000ms
- max.poll.records = 100
์ปจ์๋จธ๋ poll() ๋ฉ์๋๋ฅผ ํธ์ถํ์ฌ ์ต๋ 100๊ฐ์ ๋ ์ฝ๋๋ฅผ ๊ฐ์ ธ์ต๋๋ค. ๊ทธ ํ ์ด ์ค 30๊ฐ์ ๋ ์ฝ๋๊น์ง ์ฒ๋ฆฌํ ์ํ์์ ์ปค๋ฐ์ด ์ด๋ฃจ์ด์ง๊ธฐ ์ ์ ์ ํ๋ฆฌ์ผ์ด์ ์ด ์ข ๋ฃ๋๊ฑฐ๋ ๋ฆฌ๋ฐธ๋ฐ์ฑ์ด ๋ฐ์ํ๋ฉด ์ปจ์๋จธ๋ค์ด ์ฌํ ๋น๋ฉ๋๋ค. ์ดํ ์ปจ์๋จธ๋ ๋ง์ง๋ง์ผ๋ก ์ปค๋ฐ๋ offset๋ถํฐ ๋ค์ ๋ฐ์ดํฐ๋ฅผ polling ํ๊ฒ ๋๋๋ฐ, ์ด๋ฏธ ์ฑ๊ณต์ ์ผ๋ก ์ฒ๋ฆฌํ๋ 30๊ฐ์ ๋ ์ฝ๋๋ฅผ ๋ค์ ์ค๋ณต ์ฒ๋ฆฌํ๊ฒ ๋ฉ๋๋ค.
๋ฉ์์ง ์ ์ค์ ๊ฒฝ์ฐ Dead Letter Queue(DLT)๋ฅผ ๊ตฌ์ฑํ์ฌ ์ฒ๋ฆฌํ์ง ๋ชปํ ๋ฉ์์ง๋ฅผ ๋ณ๋๋ก ๊ด๋ฆฌํ๊ณ ์ฌ์ฒ๋ฆฌํ๋ ๋ฐฉ์์ผ๋ก ํด๊ฒฐํ ์ ์๊ณ , ๋ฉ์์ง ์ค๋ณต์ ๊ฒฝ์ฐ๋ ๋น์ฆ๋์ค ๋ก์ง ๋ด๋ถ์์ ๋ฉฑ๋ฑ์ฑ์ ๋ณด์ฅํ๋ ๋ก์ง์ ํตํด ํด๊ฒฐํ ์ ์์ต๋๋ค.
ํ์ง๋ง ๋ฉ์์ง ์ฒ๋ฆฌ๋ฅผ ํ์ธํ ํ์ ์๋์ผ๋ก ์ปค๋ฐ ์์ ์ ์ ์ ํ๊ฒ ์กฐ์ ํ๋ค๋ฉด ์ฌ์ ์ ์ด๋ฌํ ๋ฌธ์ ๋ฅผ ์กฐ๊ธ์ด๋๋ง ๋ฐฉ์งํ ์ ์์ต๋๋ค.
์๋ ์ปค๋ฐ ์ค์
Kafka ์ปจ์๋จธ์ ์ปค๋ฐ์ ์๋์ผ๋ก ์ค์ ํ๊ธฐ ์ํด์๋ auto.offset.commit ์ค์ ๋ฟ๋ง ์๋๋ผ ack-mode ์ค์ ๊น์ง ์ด๋ฃจ์ด์ ธ์ผ ํฉ๋๋ค.
๋จผ์ Kafka ์ปจ์๋จธ ์ค์ ์์ ENABLE_AUTO_COMMIT_CONFIG๋ฅผ false๋ก ์ง์ ํ์ฌ ์๋ ์ปค๋ฐ ๊ธฐ๋ฅ์ ๋นํ์ฑํํ ์ ์์ต๋๋ค. enable.auto.commit ๊ธฐ๋ณธ๊ฐ์ true์ด๊ธฐ ๋๋ฌธ์, false๋ก ๋ช
์์ ์ผ๋ก ์ค์ ํด์ผ ์๋ ์ปค๋ฐ ๊ธฐ๋ฅ์ด ๋นํ์ฑํ๋ฉ๋๋ค.
private Map<String, Object> consumerConfigs() {
Map<String, Object> configs = new HashMap<>();
// ...
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
return configs;
}
๋ค์์ผ๋ก @KafkaListener์ ๊ฐ์ ๋ฆฌ์ค๋ ์ปจํ ์ด๋๋ฅผ ์ด์ฉํด ๋ฉ์์ง๋ฅผ ์๋นํ๋ค๊ณ ๊ฐ์ ํ๊ฒ ์ต๋๋ค.
@KafkaListener(topics = "test", groupId = "test")
public void listen(String message) {
System.out.println("Received Message in group 'test': " + message);
}
์์ ๊ฐ์ด ์ค์ ํ๋ฉด ๋ณ๋์ ์ปค๋ฐ์ ์ํํ๋ ์ฝ๋๋ฅผ ์์ฑํ์ง ์์์ผ๋ฏ๋ก, ๋ฉ์์ง๋ ์๋น๋์ง๋ง offset commit์ ๋ฐ์ํ์ง ์์ ๊ฒ์ด๋ผ๊ณ ๊ธฐ๋ํ ์ ์์ต๋๋ค. ๊ทธ๋ฌ๋ ์ค์ ๋ก๋ enable.auto.commit์ false๋ก ์ง์ ํ์์๋ ๋ถ๊ตฌํ๊ณ ์ปค๋ฐ์ด ์ ์์ ์ผ๋ก ์ด๋ฃจ์ด์ง๋ฉฐ Lag๊ฐ ์์ด์ง ์์ต๋๋ค.
๊ทธ ์ด์ ๋ enable.auto.commit=false ์ค์ ์ ์ํด Kafka ํด๋ผ์ด์ธํธ์ ์๋ ์ปค๋ฐ์ ๋นํ์ฑ ๋์ง๋ง, Spring์ ๊ธฐ๋ณธ์ ์ผ๋ก ์คํ์
์ปค๋ฐ์ ํ์ฑํ ์ํ๋ก ๋๊ณ ์๊ธฐ ๋๋ฌธ์
๋๋ค. ๋๋ฌธ์ ์ ์ค์ ์์๋ ์๋ ์ปค๋ฐ ๋์ฒ๋ผ auto.commit.interval.ms์ ์ํฅ์ ๋ฐ์ง๋ ์์ง๋ง ๋ฆฌ์ค๋ ๋ฉ์๋๊ฐ ์๋ฃ๋๋ฉด ์๋์ผ๋ก ์ปค๋ฐ์ด ์ผ์ด๋๊ฒ ๋ฉ๋๋ค.
๋ฐ๋ผ์ ๋ฆฌ์ค๋ ์ปจํ
์ด๋์ Ackmode๊น์ง ๋ณ๋๋ก ์์ ํด์ผ๋ง ์ง์ ํ ์๋ฏธ์ ์๋ ์ปค๋ฐ์ด ์ด๋ฃจ์ด์ง ์ ์์ต๋๋ค. AckMode๋ ์ค์ ์ ์ํ ์ฌ๋ฌ ๋ฐฉ๋ฒ์ด ์์ง๋ง ์๋์ฒ๋ผ ContainerFactory๋ฅผ ํตํด ๊ฐ๋จํ๊ฒ ์ค์ ํ ์ ์์ต๋๋ค.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> testKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> testFactory = new ConcurrentKafkaListenerContainerFactory<>();
testFactory.getContainerProperties().setAckMode(AckMode.MANUAL);
testFactory.setConsumerFactory(testConsumerFactory());
return testFactory;
}
AcksMode | ์ค๋ช |
RECORD | ๋ ์ฝ๋ ๋จ์๋ก ํ๋ก์ธ์ฑ ์ดํ ์ปค๋ฐํ๋ค. |
BATCH | poll() ๋ฉ์๋๋ก ํธ์ถ๋ ๋ ์ฝ๋๊ฐ ๋ชจ๋ ์ฒ๋ฆฌ๋ ์ดํ ์ปค๋ฐํ๋ค. |
TIME | ํน์ ์๊ฐ ์ดํ์ ์ปค๋ฐํ๋ค. ์ด ์ต์ ์ ์ฌ์ฉํ ๊ฒฝ์ฐ์๋ ์๊ฐ ๊ฐ๊ฒฉ์ ์ ์ธํ๋ `AckTime`์ต์ ์ ์ค์ ํด์ผ ํ๋ค. |
COUNT | ํน์ ๊ฐ์๋งํผ ๋ ์ฝ๋๊ฐ ์ฒ๋ฆฌ๋ ์ดํ์ ์ปค๋ฐํ๋ค. ์ด ์ต์ ์ ์ฌ์ฉํ ๊ฒฝ์ฐ์๋ ๋ ์ฝ๋ ๊ฐ์๋ฅผ ์ ์ธํ๋ `AckCount`์ต์ ์ ์ค์ ํด์ผ ํ๋ค. |
COUNT_TIME | TIME, COUNT ์ต์ ์ค ๋ง๋ ์กฐ๊ฑด์ด ํ๋๋ผ๋ ๋์ฌ ๊ฒฝ์ฐ ์ปค๋ฐํ๋ค. |
MANUAL | Acknowledgement.acknowledge() ๋ฉ์๋๊ฐ ํธ์ถ๋๋ฉด ๋ค์๋ฒ poll()๋ ์ปค๋ฐํ๋ค. ๋งค๋ฒ acknowledge() ๋ฉ์๋๋ฅผ ํธ์ถํ๋ฉด BATCH ์ต์ ๊ณผ ๋์ผํ๊ฒ ๋์ํ๋ค. ์ด ์ต์ ์ ์ฌ์ฉํ ๊ฒฝ์ฐ AcknowledgingMessageListener ๋๋ BatchAcknowledgingMessageListener๋ฅผ ๋ฆฌ์ค๋๋ก ์ฌ์ฉํด์ผ ํ๋ค. |
MANUAL_IMMEDIATE | Acknowledgement.acknowledge() ๋ฉ์๋๋ฅผ ํธ์ถํ ์ฆ์ ์ปค๋ฐํ๋ค. ์ด ์ต์ ์ ์ฌ์ฉํ ๊ฒฝ์ฐ AcknowledgingMessageListener ๋๋ BatchAcknowledgingMessageListener๋ฅผ ๋ฆฌ์ค๋๋ก ์ฌ์ฉํด์ผ ํ๋ค. |
Ackmode์ default๋ BATCH์ด๊ธฐ ๋๋ฌธ์, enable.auto.commit=false๋ก ์ง์ ํ๋๋ผ๋ Listener ๋ฉ์๋๊ฐ ์ข ๋ฃ๋ ๋ ์คํ๋ง์์ ์๋์ผ๋ก ์ปค๋ฐ์ด ์ํํ์ฌ ์ปค๋ฐ์ด ์ผ์ด๋๊ฒ ๋ฉ๋๋ค.
๋ฐ๋ผ์ MANUAL ํน์ MANUAL_IMMEDIATE๋ก ์ง์ ํด์ผ์ง๋ง ์๋ ๊ฐ์ ํ๋ ๊ฒ์ฒ๋ผ ๋ฆฌ์ค๋์์ ํ ํฝ์ ์๋นํ์ง๋ง offset commit์ ์ผ์ด๋์ง ์๊ณ , Lag๊ฐ ์์ด๊ฒ ๋ฉ๋๋ค. ๊ทธ๋ฆฌ๊ณ ๋ค์๊ณผ ๊ฐ์ด Acknowledgement.acknowledge()๋ฅผ ํธ์ถํ์ฌ ์ปค๋ฐ ์์ ์ ์ง์ ์ ์ผ๋ก ์กฐ์ ํ ์ ์์ต๋๋ค.
@KafkaListener(topics = "test", groupId = "test")
public void listen(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment) {
System.out.println("Received Message in group 'test': " + record.value());
// ๋ฉ์์ง ์ฒ๋ฆฌ ๋ก์ง
// ...
// ์ฒ๋ฆฌ๊ฐ ์๋ฃ๋ ํ์ Acknowledgment๋ฅผ ์ฌ์ฉํ์ฌ ์ปค๋ฐ
acknowledgment.acknowledge();
}