#multitasking#다중화#쓰레드#Thread#비동기#Async
내용
※ 쓰레드에서 시스템콜이나 리소스를 사용하거나 하면 비용이 많이드는 단점..
이를 보완한게 쓰레드풀 (Executors 사용)
쓰레드를 실행하는 방법은 Runnable I/F를 구현해 Thread를 생성하는 방법과 Thread를 직접 상속해서 실행하는 방법이 있다.
public static void main(String[] args) throws InterruptedException {
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println("Thread : " + Thread.currentThread().getId());
}
};
for(int ii=0;ii<10;ii++) {
Thread.sleep(10);
runnable.run();
}
}
public class MultiThreadMain {
public static void main(String[] args) throws InterruptedException {
System.out.println("Starting.......");
for(int ii=0;ii<100;ii++) {
Runnable runnable = new MyMultiRunnable(ii);
Thread.sleep(10);
new Thread(runnable).start();
}
}
}
class MyMultiRunnable implements Runnable {
int id;
public MyMultiRunnable(int id) {
this.id = id;
}
@Override
public void run() {
System.out.println(String.format("Running NO_%05d", id));
}
}
※ ① Runnable구현체로 Thread생성해 실행, ② Thread를 상속한 클래스를 실행
public class BasicSample {
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 10; i++) {
new MultiExt().start();
new Thread(new MultiImp()).start();
}
}
}
/**
* (1) Thread를 상속해서 구현
*
* MultiExt obj = new MultiExt();
* obj.start();
*
* @author user
*
*/
class MultiExt extends Thread {
@Override
public void run() {
System.out.println("Thread_" + Thread.currentThread().getId());
}
}
/**
* (2) Runnable 인터페이스를 구현-실행시 Thread 객체생성이 필요
*
* Thread obj = new Thread(new MultiImp());
* obj.start();
*
*
* @author user
*
*/
class MultiImp implements Runnable {
@Override
public void run() {
System.out.println("Runnable_" + Thread.currentThread().getId());
}
}
Callable는 반환값을 갖는다.
① Runnable 은 결과값없이 실행만 한다.
/**
* Runnable interface 는 Thread 의 인자로 전달되어 구현된다.
* 실행은 Thread 의 start() 메소드로 실행
*
*/
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+ " call : " + LocalTime.now());
}
} ).start();
/**
* 동일한 내용의 Lambda식
*/
new Thread(()->{
System.out.println(Thread.currentThread().getName()+ " call : " + LocalTime.now());
}).start();
② Callable는 결과값을 반환한다.
FutureTask 는 Runnable 과 Callable를 상속하며 내부에서 run()메소드 내에서 Callable를 사용한다.
/**
*
* Callable interface를 FutureTask 의 인자로 전달해 구현한다.
* FutureTask.get() 을 통해 처리가 완료될때까지 기다렸다 결과값을 얻는다.
*
*/
FutureTask<String> ft = new FutureTask<String>(new Callable<String>() {
@Override
public String call() throws Exception {
return Thread.currentThread().getName()+ " call : " + LocalTime.now();
}
});
new Thread(ft).start();
System.out.println(ft.get()); // 결과를 기다렸다 출력
/**
* 동일한 내용의 Lambda식
*/
FutureTask<String> ft1 = new FutureTask<String>(()->{
return Thread.currentThread().getName()+ " call : " + LocalTime.now();
});
new Thread(ft1).start();
System.out.println(ft1.get()); // 결과를 기다렸다 출력
③ ExecutorService.submit() 의 인자로 Callable 전달해 결과값 취득
/**
*
* Callable interface 를 ExecutorService의 submit()인자로 전달해 구현
* Future.get() 을 통해 처리가 완료될때까지 기다렸다 결과값을 얻는다.
*
* Executors.newSingleThreadExecutor();
* Executors.newFixedThreadPool(100);
* Executors.newCachedThreadPool();
* Executors.newScheduledThreadPool(int);
* Executors.newWorkStealingPool(int parallelism);
*
*/
ExecutorService es = Executors.newSingleThreadExecutor();
Future<String> future = es.submit(new Callable<String>() {
@Override
public String call() throws Exception {
return Thread.currentThread().getName()+ " call : " + LocalTime.now();
}
});
System.out.println(future.get()); // 결과를 기다렸다 출력
/**
* 동일한 내용의 Lambda식
*/
Future<String> future1 = es.submit(()->{
return Thread.currentThread().getName()+ " call : " + LocalTime.now();
});
System.out.println(future1.get()); // 결과를 기다렸다 출력
생성 | 내용 |
---|---|
Executors.newSingleThreadExecutor() | Thread를 한개만 생성해 재사용한다. |
Executors.newFixedThreadPool(num) | Thread를 num 개 갯수 만큼 생성해 재사용한다. |
Executors.newCachedThreadPool() | 필요에따라 Thread를 생성해 재사용한다. (60초) |
※ shutdown 을 하지 않으면, 모근 Task가 끝나도 신규 Task를 대기중인 상태의 대기쓰레드가 남아있는 상태가 지속된다. 결국 JVM을 종료할 수 없는 상태가 되기도 한다.
※ Thread Pool을 생성했으면 반드시 shutdown 하는 것을 잊지 말것
메소드 | 내용 |
---|---|
shutdown | Thread Pool은 신규Task를 더이상 받지 않는다. shutdown() 이후에 처리를 넘겨받아도 예외가 발생한다. 실행중인 Task는 끝날때까지 처리를 계속한다. 모든 Task가 종료되면 자연히 종료된다. |
shutdownNow | 강제종료를 시작한다. 실행중인 Task에게는 interrupt하고, 실행대기중인 Task는 리터값을 설정 |
awaitTermination | 모든 Task가 종료 또는 지정시간이 경과할때까지 처리를 Block(정지)한다. 그래도 처리중인 Task가 종료되지 않을경우에는 shutdownNow()를 call 하는것을 고려할 것 |
public class PoolExecutorsMain {
public static void main(String[] args) throws Exception {
싱글쓰레드();
고정쓰레드풀();
필요따라쓰레드생성();
}
public static void 필요따라쓰레드생성() throws InterruptedException {
// ====================================
// 필요에따라 쓰레드를 신규작성하면서
// 일정기간은 쓰레드를 재사용하는 방법
//
// 60초간 사용안한 쓰레드는 폐기하고,
// 60초 이전 쓰레드는 재사용한다.
// ====================================
ExecutorService exec = Executors.newCachedThreadPool();
try {
for (int ii = 0; ii < 10; ii++) {
//Thread.sleep(300);
exec.submit(new MyExecutorRunnable(ii));
}
} finally {
exec.shutdown();
exec.awaitTermination(1, TimeUnit.MINUTES);
}
}
public static void 고정쓰레드풀() throws InterruptedException {
// ====================================
// 5개의 쓰레드 인스턴스를 Pool해 놓은것
// 5개를 돌아가며 재사용한다.
// 지정한 인스턴스가 병렬처리를 한다.
// ====================================
ExecutorService exec = Executors.newFixedThreadPool(5);
try {
for (int ii = 0; ii < 10; ii++) {
Thread.sleep(300);
exec.submit(new MyExecutorRunnable(ii));
}
} finally {
exec.shutdown();
exec.awaitTermination(1, TimeUnit.MINUTES);
}
}
public static void 싱글쓰레드() throws InterruptedException {
// ====================================
// Thread를 1개만 생성해 5회 태스크를 실행한다.
// 순차적으로 실행한다.
// ====================================
ExecutorService exec = Executors.newSingleThreadExecutor();
try {
for (int ii = 0; ii < 5; ii++) {
Thread.sleep(500);
exec.submit(new MyExecutorRunnable(ii));
}
} finally {
exec.shutdown();
exec.awaitTermination(1, TimeUnit.MINUTES);
}
}
}
class MyExecutorRunnable implements Runnable {
int id;
public MyExecutorRunnable(int id) {
this.id = id;
}
@Override
public void run() {
// Thread ID 출력
System.out.println(String.format("ID_%05d :", id) + Thread.currentThread().getId());
}
}
메인쓰레드에서 Sub쓰레드의 결과값을 취득하기 위해서는 Callable를 사용해 Future인터페이스를 통해 취득한다.
public class PoolExecutorsMain {
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newFixedThreadPool(3);
List<Future<String>> list = new ArrayList<Future<String>>();
for(int ii=0;ii<10;ii++) {
Future<String> future = exec.submit(new MyCallable(ii));
// 결과값을 받기위해 list에 담아둔다.
list.add(future);
}
// 여기서 Thread는 종료
exec.shutdown();
for(Future<String> future: list) {
String id = future.get();
//---------------------------
// 아래와 같이 지정한 시간만큼 실행결과를 기다리는 처리를 설정가능
// 지정된 시간을 넘어서면 TimeoutException 이 발생한다.
//---------------------------
// future.get(timeout, unit)
// future.get(1, TimeUnit.SECONDS); < 1초간
System.out.println("RESULT : " + id);
}
}
}
class MyCallable implements Callable<String> {
int id;
public MyCallable(int id) {
this.id = id;
}
@Override
public String call() throws Exception {
// 테스트에서는 Thread를 반환한다.
System.out.println(String.format("MyCallable : %03d / ", id) + Thread.currentThread().getId());
return String.format("%03d / ", id) + Thread.currentThread().getId();
}
}