Programming > Java

[Thread - Runnable] ThreadPool Executor 쓰레드활용

#multitasking#다중화#쓰레드#Thread#비동기#Async

내용

  1. Thread 를 실행하는 2가지 방법
    1. Runnable I/F를 Thread 생성자에 인자로 넣어 실행
    2. Thread를 상속한 클래스를 만들어 실행
  2. Runnable vs Callable
    1. Callable는 반환값이 없는 Runnable에 결과값을 추가한다.

 

1. Thread 를 실행하는 2가지 방법

  • 쓰레드는 종료되면 재사용을 할수없다;
  • 비동기 처리를 할때 매번 새로운 인스턴스를 생성해 쓰레드를 기동한다.

 

  • 쓰레드 인스턴스를 생성하면, 쓰레드 전용의 메모리영역이 할당된다.(가상머신이 해준다)
  • 1000개의 쓰레드를 실행하면 1000개의 메모리영역이 확보된다.

※ 쓰레드에서 시스템콜이나 리소스를 사용하거나 하면 비용이 많이드는 단점..
   이를 보완한게 쓰레드풀 (Executors 사용)

쓰레드를 실행하는 방법은 Runnable I/F를 구현해 Thread를 생성하는 방법과 Thread를 직접 상속해서 실행하는 방법이 있다.

Runnable 인터페이스로 구현체로 Thread를 생성해 실행

public static void main(String[] args) throws InterruptedException {
              
       Runnable runnable = new Runnable() {
             @Override
             public void run() {
                    System.out.println("Thread : " + Thread.currentThread().getId());
             }
       };
      
       for(int ii=0;ii<10;ii++) {
             Thread.sleep(10);
             runnable.run();
       }
}     

Runnable를 구현한 클래스로 Thread 를 생성해 실행

public class MultiThreadMain {

       public static void main(String[] args) throws InterruptedException {
      
             System.out.println("Starting.......");
             for(int ii=0;ii<100;ii++) {
                    Runnable runnable = new MyMultiRunnable(ii);
                   
                    Thread.sleep(10);
                    new Thread(runnable).start();
             }
       }
}

class MyMultiRunnable implements Runnable {

       int id;
      
       public MyMultiRunnable(int id) {
             this.id = id;
       }

       @Override
       public void run() {
             System.out.println(String.format("Running NO_%05d", id));
       }
}

Thread실행 2가지 방법

※ ① Runnable구현체로 Thread생성해 실행, ② Thread를 상속한 클래스를 실행

public class BasicSample {
    
    public static void main(String[] args) throws InterruptedException {
        
        for (int i = 0; i < 10; i++) {
            new MultiExt().start();
            new Thread(new MultiImp()).start();
        }
    }
}

/**
 * (1) Thread를 상속해서 구현
 *
 * MultiExt obj = new MultiExt();
 * obj.start();
 *
 * @author user
 *
 */
class MultiExt extends Thread {

    @Override
    public void run() {
       
        System.out.println("Thread_" + Thread.currentThread().getId());
    }
   
}

/**
 * (2) Runnable 인터페이스를 구현-실행시 Thread 객체생성이 필요
 *
 * Thread obj = new Thread(new MultiImp());
 * obj.start();
 *
 *
 * @author user
 *
 */
class MultiImp implements Runnable {

    @Override
    public void run() {
        System.out.println("Runnable_" + Thread.currentThread().getId());
    }   
}

 

2. Runnable vs Callable

Callable는 반환값을 갖는다.

① Runnable 은 결과값없이 실행만 한다.

/**
 * Runnable interface 는 Thread 의 인자로 전달되어 구현된다.
 * 실행은 Thread 의 start() 메소드로 실행
 *
 */
new Thread(new Runnable() {
   
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName()+ " call : " + LocalTime.now());
    }
} ).start();

/**
 * 동일한 내용의 Lambda식
 */
new Thread(()->{
    System.out.println(Thread.currentThread().getName()+ " call : " + LocalTime.now());
}).start();

 

② Callable는 결과값을 반환한다.
  FutureTask 는 Runnable 과 Callable를 상속하며 내부에서 run()메소드 내에서 Callable를 사용한다.

/**
 *
 * Callable interface를 FutureTask 의 인자로 전달해 구현한다.
 * FutureTask.get() 을 통해 처리가 완료될때까지 기다렸다 결과값을 얻는다.
 *
 */
FutureTask<String> ft = new FutureTask<String>(new Callable<String>() {
    @Override
    public String call() throws Exception {
        return Thread.currentThread().getName()+ " call : " + LocalTime.now();
    }
});
new Thread(ft).start();
System.out.println(ft.get());    // 결과를 기다렸다 출력
/**

 * 동일한 내용의 Lambda식
 */
FutureTask<String> ft1 = new FutureTask<String>(()->{
    return Thread.currentThread().getName()+ " call : " + LocalTime.now();
});
new Thread(ft1).start();
System.out.println(ft1.get());    // 결과를 기다렸다 출력

③ ExecutorService.submit() 의 인자로 Callable 전달해 결과값 취득

/**
 *
 * Callable interface 를 ExecutorService의 submit()인자로 전달해 구현
 * Future.get() 을 통해 처리가 완료될때까지 기다렸다 결과값을 얻는다.
 *
 * Executors.newSingleThreadExecutor();
 * Executors.newFixedThreadPool(100);
 * Executors.newCachedThreadPool();
 * Executors.newScheduledThreadPool(int);
 * Executors.newWorkStealingPool(int parallelism);
 *
 */
ExecutorService es = Executors.newSingleThreadExecutor();
Future<String> future = es.submit(new Callable<String>() {
    @Override
    public String call() throws Exception {
        return Thread.currentThread().getName()+ " call : " + LocalTime.now();
    }
});
System.out.println(future.get());    // 결과를 기다렸다 출력

/**
 * 동일한 내용의 Lambda식
 */
Future<String> future1 = es.submit(()->{
    return Thread.currentThread().getName()+ " call : " + LocalTime.now();
});
System.out.println(future1.get());    // 결과를 기다렸다 출력
 

 

ExecutorService 를 실행해 쓰레드 재사용

1) 종류
생성 내용
Executors.newSingleThreadExecutor() Thread를 한개만 생성해 재사용한다.
Executors.newFixedThreadPool(num) Thread를 num 개 갯수 만큼 생성해 재사용한다.
Executors.newCachedThreadPool() 필요에따라 Thread를 생성해 재사용한다. (60초)
2) 정지하기위한 메소드 설명

※ shutdown 을 하지 않으면, 모근 Task가 끝나도 신규 Task를 대기중인 상태의 대기쓰레드가 남아있는 상태가 지속된다. 결국 JVM을 종료할 수 없는 상태가 되기도 한다.
※ Thread Pool을 생성했으면 반드시 shutdown 하는 것을 잊지 말것

메소드 내용
shutdown Thread Pool은 신규Task를 더이상 받지 않는다.
shutdown() 이후에 처리를 넘겨받아도 예외가 발생한다.
실행중인 Task는 끝날때까지 처리를 계속한다.
모든 Task가 종료되면 자연히 종료된다.
shutdownNow 강제종료를 시작한다.
실행중인 Task에게는 interrupt하고, 실행대기중인 Task는 리터값을 설정 
awaitTermination 모든 Task가 종료 또는 지정시간이 경과할때까지 처리를 Block(정지)한다.
 그래도 처리중인 Task가 종료되지 않을경우에는 shutdownNow()를 call 하는것을 고려할 것

 

public class PoolExecutorsMain {

       public static void main(String[] args) throws Exception {

             싱글쓰레드();

             고정쓰레드풀();

             필요따라쓰레드생성();
       }

       public static void 필요따라쓰레드생성() throws InterruptedException {
             // ====================================
             // 필요에따라 쓰레드를 신규작성하면서
             // 일정기간은 쓰레드를 재사용하는 방법
             //
             // 60초간 사용안한 쓰레드는 폐기하고,
             // 60초 이전 쓰레드는 재사용한다.
             // ====================================
             ExecutorService exec = Executors.newCachedThreadPool();
             try {
                    for (int ii = 0; ii < 10; ii++) {
                           //Thread.sleep(300);
                           exec.submit(new MyExecutorRunnable(ii));
                    }
             } finally {
                    exec.shutdown();
                    exec.awaitTermination(1, TimeUnit.MINUTES);
             }
       }

       public static void 고정쓰레드풀() throws InterruptedException {
             // ====================================
             // 5개의 쓰레드 인스턴스를 Pool해 놓은것
             // 5개를 돌아가며 재사용한다.
             // 지정한 인스턴스가 병렬처리를 한다.
             // ====================================
             ExecutorService exec = Executors.newFixedThreadPool(5);
             try {
                    for (int ii = 0; ii < 10; ii++) {
                           Thread.sleep(300);
                           exec.submit(new MyExecutorRunnable(ii));
                    }
             } finally {
                    exec.shutdown();
                    exec.awaitTermination(1, TimeUnit.MINUTES);
             }
       }

       public static void 싱글쓰레드() throws InterruptedException {
             // ====================================
             // Thread를 1개만 생성해 5회 태스크를 실행한다.
             // 순차적으로 실행한다.
             // ====================================
             ExecutorService exec = Executors.newSingleThreadExecutor();
             try {
                    for (int ii = 0; ii < 5; ii++) {
                           Thread.sleep(500);
                           exec.submit(new MyExecutorRunnable(ii));
                    }
             } finally {
                    exec.shutdown();
                    exec.awaitTermination(1, TimeUnit.MINUTES);
             }
       }

}

class MyExecutorRunnable implements Runnable {

       int id;

       public MyExecutorRunnable(int id) {
             this.id = id;
       }

       @Override
       public void run() {
             // Thread ID 출력
             System.out.println(String.format("ID_%05d :", id) + Thread.currentThread().getId());

       }
}

 

결과값을 취득하기 위한 쓰레드 풀

메인쓰레드에서 Sub쓰레드의 결과값을 취득하기 위해서는 Callable를 사용해 Future인터페이스를 통해 취득한다.

public class PoolExecutorsMain {

       public static void main(String[] args) throws Exception {

             ExecutorService exec = Executors.newFixedThreadPool(3);
            
             List<Future<String>> list = new ArrayList<Future<String>>();
             for(int ii=0;ii<10;ii++) {
                    Future<String> future = exec.submit(new MyCallable(ii));
                   
                    // 결과값을 받기위해 list에 담아둔다.
                    list.add(future);
             }

             // 여기서 Thread는 종료
             exec.shutdown();   

             for(Future<String> future: list) {
                    String id = future.get();
                   
                    //---------------------------
                    // 아래와 같이 지정한 시간만큼 실행결과를 기다리는 처리를 설정가능
                    //    지정된 시간을 넘어서면 TimeoutException 이 발생한다.
                    //---------------------------
                    // future.get(timeout, unit)
                    // future.get(1, TimeUnit.SECONDS); < 1초간
                    System.out.println("RESULT : " + id);
             }
       }
}

class MyCallable implements Callable<String> {

       int id;
       public MyCallable(int id) {
             this.id = id;
       }

       @Override
       public String call() throws Exception {
             // 테스트에서는 Thread를 반환한다.
             System.out.println(String.format("MyCallable : %03d / ", id) + Thread.currentThread().getId());
             return String.format("%03d / ", id) + Thread.currentThread().getId();
       }
}