ThreadPoolExecutor ๋์์ ๊ดํ ์คํด
๊ฐ์
Java์์ ์ค๋ ๋ํ์ ํจ๊ณผ์ ์ผ๋ก ๊ด๋ฆฌํ ์ ์๊ฒ ํด์ฃผ๋ ThreadPoolExecutor, ๊ทธ๋ฆฌ๊ณ ์ด๋ฅผ ๋์ฑ ์ฝ๊ฒ ์ฌ์ฉํ ์ ์๋๋ก Spring์์ ์ ๊ณตํ๋ ThreadPoolTaskExecutor์ ๋์ ๋ฐฉ์์ ๊ดํด ์ ๊ฐ ๊ทธ๋์ ์๋ชป ์ดํดํ๊ณ ์์๋ ๋ ๊ฐ์ง์ ์คํด๋ฅผ ์ ๋ฆฌํ๊ณ ์ ๊ธ์ ์์ฑํ๊ฒ ๋์์ต๋๋ค.
๋ด์ฉ์ ๋ค์๊ณผ ๊ฐ์ต๋๋ค.
- ๋ณ๋์ ์ค์ ์ ํ์ง ์์ผ๋ฉด ์๋ฒ ์คํ ์์ ์ง์ ํ ์ค๋ ๋ ํ ํฌ๊ธฐ๋งํผ ์ค๋ ๋๊ฐ ์๋์ผ๋ก ์์ฑ๋์ง ์๋๋ค.
- corePoolSize์ maxPoolSize๋ ์๋ก ์๊ด์๋ค.
- ํ์ ํฌ๊ธฐ๋ฅผ ์ง์ ํด์ฃผ์ง ์์ผ๋ฉด maxPoolSize์ ์ฌ์ค์ ์๋ฏธ ์๋ ์ค์ ์ด๋ค.
- corePoolSizeํฌ๊ธฐ ์ด์์ ์์ฒญ์ด ๋ค์ด์จ๋ค๊ณ ํด์ maxPoolSize๋งํผ ์ค๋ ๋๋ฅผ ์์ฑํ์ง ์๋๋ค.
https://github.com/dkswnkk/ThreadPoolExecutorTest
์ฌ์ ์ ์ค๋ ๋ํ์ ํฌ๊ธฐ๋งํผ ์ค๋ ๋๊ฐ ์๋์ผ๋ก ์์ฑ๋์ง ์๋๋ค.
๋จผ์ ์ฒซ ๋ฒ์งธ ์คํด์ ๋ํด ์ดํด๋ด ์๋ค. ์๋์ ๊ฐ์ ์ค์ ์ ๊ฐ์ง ์ค๋ ๋ ํ์ด ์๋ค๊ณ ๊ฐ์ ํ๊ฒ ์ต๋๋ค.
@Configuration
@EnableAsync
public class ThreadPoolConfig {
@Bean(name = "threadPoolTaskExecutor")
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5); // ๊ธฐ๋ณธ ์ค๋ ๋ ์
executor.setMaxPoolSize(20); // ์ต๋ ์ค๋ ๋ ์(default: Integer.MAX)
executor.setQueueCapacity(11); // ํ ์ฉ๋
executor.setKeepAliveSeconds(60); // ์ ํด ์ค๋ ๋ ์ข
๋ฃ ์๊ฐ(default: 60)
executor.setThreadNamePrefix("Executor-"); // ์ค๋ ๋ ์ด๋ฆ ์ ๋์ฌ
executor.initialize(); // ์ด๊ธฐํ
return executor;
}
}
์์ ๊ฐ์ด ์ค์ ํ์ ๊ฒฝ์ฐ ์๋ฒ ์์(์ ํ๋ฆฌ์ผ์ด์ ์คํ) ์ ์ค๋ ๋ ํ์ corePoolSize์ธ 5๋งํผ ์๋์ผ๋ก ์ค๋ ๋๊ฐ ์์ฑ๋ ๊ฒ์ผ๋ก ๊ธฐ๋ํ์ต๋๋ค. ๊ทธ๋ฌ๋ ์ค์ ๋ก๋ ์ค๋ ๋ ์์ ์ด ์์ฒญ๋ ๋ ํ์ํ ๋งํผ๋ง ์ค๋ ๋๊ฐ ์ค๋ ๋ํ์ ์์ฑ๋๊ธฐ ์์ํฉ๋๋ค.
์ค๋ ๋ ํ์ ๋์์ ํ์ธํ๊ธฐ ์ํ ์ฝ๋๋ ๋ค์๊ณผ ๊ฐ์ต๋๋ค.
@Slf4j
@SpringBootApplication
public class ThreadPoolTest {
public static void main(String[] args) throws InterruptedException {
ConfigurableApplicationContext context = SpringApplication.run(ThreadPoolTest.class, args);
Run run = context.getBean(Run.class);
for (int i = 0; i < 10; i++) {
run.checkPrestartedThreads();
Thread.sleep(1000);
}
}
}
@Slf4j
@Service
@DependsOn("threadPoolTaskExecutor")
public class Run {
@Autowired
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
@Async(value = "threadPoolTaskExecutor")
public void checkPrestartedThreads() {
// ์ค๋ ๋ ํ์ ํ์ฌ ์ค๋ ๋ ์๋ฅผ ํ์ธ
int poolSize = threadPoolTaskExecutor.getPoolSize();
log.info("์ค๋ ๋ ํ์ ํ์ฌ ์ค๋ ๋ ์: {}", poolSize);
}
}
์ ์ฝ๋์ ์ํ ๊ฒฐ๊ณผ๋ ๋ค์๊ณผ ๊ฐ์ต๋๋ค.
์ ๋ณด๋ฉด ์ฒ์๋ถํฐ ์ค๋ ๋ ํ์ ์ค๋ ๋ ์๊ฐ corePoolSize์ธ 5๋ก ์ธํ ๋๋ ๊ฒ์ด ์๋๋ผ ํด๋น ์์ ์ด ์์ฒญ๋ ๋ ์ค๋ ๋ ํ์ ์ค๋ ๋๋ฅผ ์์ฑํ๋ฉฐ, ํ ๋ฒ์ corePoolSize๋งํผ์ ์๋ฅผ ์์ฑํ์ง ์๊ณ ํ์ํ ๋งํผ ํ๋์ฉ ์์ฑํ๋ ๊ฒ์ ํ์ธํ ์ ์์ต๋๋ค. ๊ทธ๋ฆฌ๊ณ ์ต์ข ์ ์ผ๋ก corePoolSize๋งํผ์ ์ค๋ ๋๊ฐ ์์ฑ๋์์ ๋ ๋ ์ด์ ์์ฑํ์ง ์๋ ๊ฒ์ ๋ณผ ์ ์์ต๋๋ค.
์ฆ, ์ ํ๋ฆฌ์ผ์ด์ ์คํ ์ ์ค๋ ๋ ํ์ corePoolSize๋งํผ ์ค๋ ๋๊ฐ ์๋์ผ๋ก ์์ฑ๋์ง ์์ผ๋ฉฐ, ์์ ์์ฒญ์ด ๋ฐ์ํ ๋ ์ค๋ ๋๊ฐ ์์ฑ๋ฉ๋๋ค. ๋ํ ํ ๋ฒ์ corePoolSize๋งํผ ์์ฑ๋์ง ์๊ณ ์์ฒญ์ ๋ฐ๋ผ ์ ์ง์ ์ผ๋ก ์์ฑ๋๋ฉฐ, ์ค๋ ๋ ํ์ด corePolSize์ ๋๋ฌํ๋ฉด ์ถ๊ฐ์ ์ธ ์ค๋ ๋๋ ์์ฑ๋์ง ์์ต๋๋ค. ๋ฐ๋ผ์ ์๋ฒ ์์ ํ, ์ฒซ ์์ฒญ์ ๊ฒฝ์ฐ์ cold start๊ฐ ์ผ์ด๋ ์ํ๋ ์ฑ๋ฅ์ด ๋์ค์ง ์์ ์๋ ์์ต๋๋ค.
๋ง์ฝ ์ ํ๋ฆฌ์ผ์ด์ ์คํ ์ ์ค๋ ๋ ํ์ corePoolSize ๊ฐ๋งํผ ์ฆ์ ์ฑ์ฐ๊ณ ์ถ๋ค๋ฉด, ์๋์ ๊ฐ์ด ThreadPoolTaskExecutor์ setPrestartAllCoreThreads๋ฅผ true๋ก ์ค์ ํด์ผ ํฉ๋๋ค. ์ด๋ฅผ ํตํด ์ค๋ ๋ ํ์ด ์ด๊ธฐํ๋ ๋(์๋ฒ ์์ ์) corePoolSize์ ์ค์ ๋ ์๋งํผ ์ค๋ ๋๋ฅผ ๋ฏธ๋ฆฌ ์์ฑํ๋๋ก ์ง์ํ ์ ์์ต๋๋ค.
setPrestartAllCoreThreads์ default ๊ฐ์ false์ด๋ฉฐ, Spring์์ ์ ๊ณตํ๋ ThreadPoolTaskExecutor์๋ corePoolSize์ ์ง์ ํ ์๋งํผ ํ ๋ฒ์ ์ค๋ ๋ ํ์ ์ฑ์ฐ๋ setPrestartAllCoreThreads๋ง ์กด์ฌํ์ง๋ง Java์์ ์ ๊ณตํ๋ ThreadTaskExecutor์๋ setPrestartAllCoreThreads ๋ฟ๋ง ์๋๋ผ prestartCoreThread๋ ์กด์ฌํ์ฌ preStartCoreThread๋ฅผ ํธ์ถํ๋ ํ์๋งํผ ์ฌ์ ์ ์ค๋ ๋๋ฅผ ์์ฑํ ์ ์์ต๋๋ค.
๋ฌผ๋ก ์ด ๊ฒฝ์ฐ์๋ corePoolSize๋ฅผ ์ด๊ณผํ๋ ์ค๋ ๋๋ ์์ฑํ์ง ์์ต๋๋ค.
corePoolSizeํฌ๊ธฐ์ maxPoolSizeํฌ๊ธฐ๋ ์๋ก ์๊ด์๋ค.
์ค๋ ๋ ํ์ ๋์ ํ๋ก์ฐ๋ ๋ค์๊ณผ ๊ฐ์ต๋๋ค.
์ค๋ ๋ ํ์ corePoolSize์ ์ ํด ์ค๋ ๋๋ฅผ ๊ด๋ฆฌํ๋ allowCoreThreadTimeOut ์ค์ ์ ๋ณ๋๋ก ํ์ง ์์์ ์, corePoolSize์ ์ง์ ๋ ์ ์ดํ์ ์์ฑ๋ ์ค๋ ๋๋ง ๊ธฐ๋ณธ์ ์ผ๋ก ์ ์งํฉ๋๋ค. ๊ทธ๋ฆฌ๊ณ corePoolSize๋ฅผ ์ด๊ณผํ๋ ์์ฒญ์ด ๋ค์ด์ค๋๋ผ๋ ๊ณง๋ฐ๋ก maxPoolSize๋งํผ ์ค๋ ๋๋ฅผ ๋๋ฆฌ์ง ์์ต๋๋ค. maxPoolSize๋งํผ์ ์ค๋ ๋ ์ฆ๊ฐ๋ setQueueCapacity๋ก ์ง์ ๋ ์์ ํ์ ํฌ๊ธฐ๊ฐ ๊ฐ๋ ์ฐผ์ ๋์๋ง ๋ฐ์ํฉ๋๋ค. ์ฆ, ์์ ํ๊ฐ ๊ฝ ์ฐจ์ผ์ง๋ง ์ค๋ ๋ ํ์ ์ค๋ ๋ ์๋ฅผ maxPoolSize๊น์ง ์ฆ๊ฐ์ํต๋๋ค.
๊ธฐ๋ณธ๊ฐ์ผ๋ก queueCapacity์ ํฌ๊ธฐ๋ Integer.MAX_VALUE๋ก ์ค์ ๋์ด ์๊ธฐ ๋๋ฌธ์ ๋ณ๋๋ก queueCapacity๋ฅผ ์ง์ ํ์ง ์์ผ๋ฉด ์ฌ์ค์ ํ์ ํฌ๊ธฐ๊ฐ ๊ฝ ์ฐฐ ์ผ์ด ์์ด ์ค๋ ๋ ํ์ ํฌ๊ธฐ๊ฐ maxPoolSize๊น์ง ์ฆ๊ฐํ๋ ์ํฉ์ด ๋ฐ์ํ์ง ์์ต๋๋ค.
์ค์ ๋ก ์๋์ ๊ฐ์ด queue์ ํฌ๊ธฐ๋ฅผ default๋ก ์ค์ ํ ๊ฒฝ์ฐ, maxPoolSize๋ฅผ 20์ผ๋ก ์ง์ ํ์์๋ ๋ถ๊ตฌํ๊ณ , ์ค๋ ๋ ํ์ ํฌ๊ธฐ๊ฐ 5๋ก ๊ณ์ํด์ ์ ์ง๋๋ ๊ฒ์ ํ์ธํ ์ ์์ต๋๋ค.
@Configuration
@EnableAsync
public class ThreadPoolConfig {
@Bean(name = "threadPoolTaskExecutor")
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(20);
executor.setThreadNamePrefix("Executor-");
executor.initialize();
return executor;
}
}
@Slf4j
@Service
@DependsOn("threadPoolTaskExecutor")
public class Run {
@Autowired
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
@Async(value = "threadPoolTaskExecutor")
public void start() throws InterruptedException {
// ์ค๋ ๋ ํ์ ํ์ฌ ์ค๋ ๋ ์
int poolSize = threadPoolTaskExecutor.getPoolSize();
// ํ์ฌ ํ์ฑ ์ค๋ ๋ ์
int activeCount = threadPoolTaskExecutor.getActiveCount();
// ์์
ํ์ ๋จ์ ์ฉ๋
int remainingCapacity = threadPoolTaskExecutor.getThreadPoolExecutor().getQueue().remainingCapacity();
// ํ์ฌ ์์
ํ์ ๋๊ธฐ ์ค์ธ ์์
์
int queueSize = threadPoolTaskExecutor.getThreadPoolExecutor().getQueue().size();
log.info("ํ์ฌ ์ค๋ ๋ ์ด๋ฆ: {}, ํ์ฌ ์ค๋ ๋ ํ์ ์ค๋ ๋ ์: {} ํ์ฌ ํ์ฑ ์ค๋ ๋ ์: {}, ํ์ฌ ์์
ํ์ ๋๊ธฐ ์ค์ธ ์์
์: {}, ์์
ํ์ ๋จ์ ํฌ๊ธฐ: {}",
Thread.currentThread().getName(), poolSize, activeCount, queueSize, remainingCapacity);
Thread.sleep(10000);
}
}
@Slf4j
@SpringBootApplication
public class ThreadPoolTest {
public static void main(String[] args) throws InterruptedException {
ConfigurableApplicationContext context = SpringApplication.run(ThreadPoolTest.class, args);
Run run = context.getBean(Run.class);
for (int i = 0; i < 100; i++) {
run.start();
}
}
}
๋ฐ๋ผ์ ๋ง์ฝ ํ์ ํฌ๊ธฐ๋ฅผ ๋ณ๋๋ก ์ค์ ํ์ง ์์ ๊ฒฝ์ฐ maxPoolSize ์ค์ ์ ์ฌ์ค์ ์๋ฏธ๊ฐ ์๋ ๊ฐ์ด๊ธฐ์, coorPoolSize๋ฅผ ์ ์ ํ๊ฒ ์ค์ ํ๋ ๊ฒ์ด ์ค์ํฉ๋๋ค.