読者です 読者をやめる 読者になる 読者になる

かとじゅんの技術日誌

技術の話をするところ

スレッドプールの使い方

バッチ処理を効率よく実行するには,マルチスレッド化はかかせません.
ただし,ハードウェアリソースを無視してスレッドを作りすぎるのはあまり賢い方法ではありませんので,ここはスレッドプールを使いましょう.ということで,S2Chronosの内部では,Java5から使えるconcurrentパッケージのスレッドプールを利用しています.
開発者は特別に意識しなくともバッチ処理をスレッドプール上で実行させることが可能となります.
次のようなコードを書くとスレッドプールを明示的に指定できます.threadPoolTypeとthreadPoolSizeプロパティで作成されるスレッドプールが決まります.ThreadPoolTypeは,Java5のconcurrentパッケージに準拠しており,FIXED/CACHED/SINGLE/SCHEDULEDが使えます.

@Task
@NonDelayTrigger
public class BasicTask {
    
    private static Logger log = Logger.getLogger(BasicTask.class);
    
    // スレッドプールタイプを返します.
    public ThreadPoolType getThreadPoolType(){
    	return ThreadPoolType.FIXED;
    }
    
    // スレッドプールのサイズを返します.
    public Integer getThreadPoolSize(){
    	return 10;
    }

    // タスク処理
    public void doExecute() {
        log.info(this.getClass().getSimpleName() + ":doExecute");
    }

}

また,以下のようにThreadPoolを使ってもOKです.

@Task
@NonDelayTrigger
public class BasicTask {
    
    private static Logger log = Logger.getLogger(BasicTask.class);
    
    private TaskThreadPool threadPool = new ThreadPool();
    
    public void initialize(){
    	threadPool.setThreadPoolType(ThreadPoolType.FIXED);
        threadPool.setThreadPoolSize(10);
    }
    
    // スレッドプールを返します.
    public TaskThreadPool getThreadPool(){
    	return threadPool;
    }

    // タスク処理
    public void doExecute() {
        log.info(this.getClass().getSimpleName() + ":doExecute");
    }

}

ただ,この状況だとタスクメソッドは同時に実行されるのは1個だけなのでスレッドプールは使い切りませんw
たとえば,以下だと

@Task
@NonDelayTrigger
public class BasicTask {
    
    private static Logger log = Logger.getLogger(BasicTask.class);
    
    private TaskThreadPool threadPool = new ThreadPool();
    
    public void initialize(){
    	threadPool.setThreadPoolType(ThreadPoolType.FIXED);
        threadPool.setThreadPoolSize(10);
    }
    
    // スレッドプールを返します.
    public TaskThreadPool getThreadPool(){
    	return threadPool;
    }

    // タスク処理
    @CloneTask(20)
    public void doExecute() {
        log.info(this.getClass().getSimpleName() + ":doExecute");
    }

}

CloneTaskアノテーションでdoExecuteを同時に20個並列実行しようとします.スレッドプールは10個ですので,まず最初に10個が実行され空きが発生しだい残りのタスクメソッドが実行されます.また,CloneTaskアノテーション以外にJoinTaskでNoWaitを指定したタスクがある場合も,指定されたスレッドプール内で並行処理を実行します.

このスレッドプールを複数のタスク間で共有したい場合も以下のようにすれば可能です.
TaskThreadPoolをMapで管理するThreadPoolCacheUtilを用意してください.(標準で用意したほうがいいかな...検討しますw)そこで管理されるスレッドプール情報を複数のタスククラスのthreadPoolプロパティで返せば,スレッドプールを共有するタスククラスを実装できます.

@Task
@NonDelayTrigger
public class BasicTask {
    
    private static Logger log = Logger.getLogger(BasicTask.class);

    // スレッドプールを返します.
    public TaskThreadPool  getThreadPool(){
    	return ThreadPoolCacheUtil.get("basicGroup");
    }

    // タスク処理
    @CloneTask(20)
    public void doExecute() {
        log.info(this.getClass().getSimpleName() + ":doExecute");
    }

}