かとじゅんの技術日誌

技術の話をするところ

S2Chronosでバッチ処理をつくってみよう

1.0.0をリリースした記念にS2Chronosの使い方を簡単に紹介していく記事を書いていきたいと思います.

セットアップ

セットアップ手順は
http://s2chronos.sandbox.seasar.org/ja/install.html
をご覧ください.

サンプルのダウンロードは
http://s2chronos.sandbox.seasar.org/ja/download.html
からお願いします.
コンソール版のサンプルはs2chronos-example-1.0.0.zip
Teeda版のサンプルはs2chronos-teeda-example-1.0.0.zip
SAStruts版のサンプルはs2chronos-sastruts-example-1.0.0.zip
です.
コンソールアプリではスケジューラの起動命令を記述する必要があります.三行で書けますw

	public static void main(String[] args) {
		SingletonS2ContainerFactory.init();
		SingletonS2Container.getComponent(Scheduler.class).process();
		SingletonS2ContainerFactory.destroy();
	}

アプリケーションサーバS2Chronosを内包しウェブアプリとバッチ処理を同居させることができます.ただし,S2ContainerServletより後に起動するようにload-on-startupを調整してください.

    <servlet>
        <servlet-name>chronosServlet</servlet-name>
        <servlet-class>org.seasar.chronos.extension.servlet.S2ChronosServlet</servlet-class>
        <load-on-startup>3</load-on-startup>
    </servlet>

すでに持っているDIできる資産を好きなだけバッチ処理にDIしてください.

バッチ処理はタスククラスを書くことから

ルートパッケージ名+taskに,クラスサフィックスがTaskで終わるクラス名で以下のようなタスククラスを定義しましょう.すると,アプリケーションを起動したときにS2Chronosがタスククラスを見つけて,スケジューラに自動的にスケジュール登録します.このサンプルは,1分ごとにdoExecuteメソッドを呼び出し,ログを出力します.

@Task
@CronTrigger(expression = "0 */1 * * * ?")
public class SampleTask {
    
    private static Logger log = Logger.getLogger(SampleTask .class);
    
    // タスク処理
    public void doExecute() {
        log.info(this.getClass().getSimpleName() + ":doExecute");
    }

}

ちなみに,@Taskがついていない,トリガーが指定されていないタスクはスケジューリングされません.(Taskアノテーションがついていても,@Task(autoSchedule=false)の場合はスケジューリングされません)

実行時間はトリガーで指定する

トリガーはタスクを起動する時間を指定する役割を持っています.標準でいくつかのトリガーが用意されていますが,やはり最も利用するのがクローントリガーです.上記の例ではCROND形式の起動条件を指定して1分ごとにタスクが起動するようにしています.

単純なバッチ処理であれば覚えることはこれだけ.次回はスケジューラのスレッドの話をさせてもらいます.

初期化と破棄処理について

スレッドプール周りの話に行く前に,初期化と破棄処理のお話をしておかないとまずいw.
initializeとdestroyという名のメソッドを定義してください.そうするとS2Chronosのスケジューラによって登録された後に,スケジューラがタスクを順番に検索します.その検索時に最初に一度だけinitializeメソッドが実行されます.
そして,破棄メソッドですが,タスククラスのdoExecuteメソッドが終了したら,スケジューラ内部でのタスクの状態管理が実行済みに遷移し,その後一定間隔でスケジューラのキューから削除されます.その削除前に一度だけdestroyメソッドが呼ばれます.(GC的な破棄という意味ではないので誤解なきよう)ただし,destroyメソッドはタスククラスがisReScheduleTaskをtrueで返すか,タスククラスが返すトリガーがisReScheduleTaskをtrueで返す場合は,destroyメソッドは呼ばれません.isReScheduleTaskがtrueの場合は永遠に終わりの来ないタスクという意味合いになります.注意してください.標準では@CronTrigger, CCronTriggerはisReScheduleTaskをtrueで返しますので,destroyメソッドを定義していても呼ばれません.
このあたりも,Teedaをマネて作っているので規約ベースなんですが,今後はT2のようにアノテーションで好きな名前を指定できるようにしたいと思いますw

@Task
@NonDelayTrigger
public class SampleTask {

	private static Logger log = Logger.getLogger(SampleTask .class);

	public void initialize(){
		log.info(this.getClass().getSimpleName() + ":initialize");
	}

	public void doExecute(){
		log.info(this.getClass().getSimpleName() + ":doExecute");
	}

	public void destroy(){
		log.info(this.getClass().getSimpleName() + ":destroy");
	}
}

開始処理と終了処理について

退屈な説明ですが,基本的なことなんでご容赦をw
初期化処理,破棄処理に加えてタスクを実行前と実行後に呼ばれるメソッドを定義できます.startとendというメソッド名で定義してください.下記の例では,呼び出し順序が,initialize→ start → doExecute → end → destroyとなります.
isReScheduleTaskがtrueを返すタスクは,タスクが実行されるごとにstartとendが実行されます.

@Task
@NonDelayTrigger
public class SampleTask {

	private static Logger log = Logger.getLogger(SampleTask .class);

	public void initialize(){
		log.info(this.getClass().getSimpleName() + ":initialize");
	}

	public void start(){
		log.info(this.getClass().getSimpleName() + ":start");
	}

	public void doExecute(){
		log.info(this.getClass().getSimpleName() + ":doExecute");
	}

	public void end(){
		log.info(this.getClass().getSimpleName() + ":end");
	}

	public void destroy(){
		log.info(this.getClass().getSimpleName() + ":destroy");
	}
}

タスクとタスクグループの関係

詳しくは知りませんが,JP1にはジョブとジョブグループが存在します.
ジョブ,つまりバッチ処理の最小単位ですが,ジョブグループはジョブを複数内包することができます.これをそのままクラス表現にすると,コンポジットパターンでJobGroupはJobを複数保持できるクラスということになるかと思います.
このモデルは意図的にS2Chronosでは採用していません.非常に見通しが悪くなるためです.デザイン的には優れていてもソースコードの見通しが悪くなるものはなるべく採用しないというスタンスです.
では,S2Chronosではジョブグループに相当するタスクグループはどう表現するかってことですが,,,

まず,タスククラスでは,複数のタスクメソッドを管理できます.doExecute以外のdoから始まるメソッドを複数定義できます.以下を参照してください.

@Task
@NonDelayTrigger
public class SampleTask {

	private static Logger log = Logger.getLogger(SampleTask .class);

	public void initialize(){
		log.info(this.getClass().getSimpleName() + ":initialize");
	}

	@NextTask("taskA")
	public void start(){
		log.info(this.getClass().getSimpleName() + ":start");
	}

	@NextTask("taskB")
	public void doTaskA(){
		log.info(this.getClass().getSimpleName() + ":doTaskA");
	}

	public void doTaskB(){
		log.info(this.getClass().getSimpleName() + ":doTaskB");
	}

	public void end(){
		log.info(this.getClass().getSimpleName() + ":end");
	}

	public void destroy(){
		log.info(this.getClass().getSimpleName() + ":destroy");
	}
}

この場合,doから始まるタスクメソッドが複数あるのでどこから始めればよいかわかりません.なので,startメソッドにNextTaskアノテーションを使って指定する必要があります.doを省いたメソッド名をキャメル形式で指定してください.また,doTaskAの後に実行するタスクメソッドもdoTaskA自体にNextTaskアノテーションで定義してください.
この方法では,コンパイル時に遷移先のタスクを決定しますが,以下のようにすれば実行時にも指定できます.

@Task
@NonDelayTrigger
public class SampleTask {

	private static Logger log = Logger.getLogger(SampleTask .class);

	public void initialize(){
		log.info(this.getClass().getSimpleName() + ":initialize");
	}

	@NextTask("taskA")
	public void start(){
		log.info(this.getClass().getSimpleName() + ":start");
	}


	public String doTaskA(){
		log.info(this.getClass().getSimpleName() + ":doTaskA");
		return "taskB";
	}

	public void doTaskB(){
		log.info(this.getClass().getSimpleName() + ":doTaskB");
	}

	public void end(){
		log.info(this.getClass().getSimpleName() + ":end");
	}

	public void destroy(){
		log.info(this.getClass().getSimpleName() + ":destroy");
	}
}

戻り値で次に遷移するタスクメソッドを指定すれば実行に遷移先を決定できます.
各タスクメソッドはdoTaskAが完了すれば,doTaskBへと同期的に呼び出されて実行されます.通常の関数呼び出しと変わりません.このタスクメソッドを非同期に呼び出したい場合は以下のようにします.

@Task
@NonDelayTrigger
public class SampleTask {

	private static Logger log = Logger.getLogger(SampleTask .class);

	public void initialize(){
		log.info(this.getClass().getSimpleName() + ":initialize");
	}

	@NextTask("taskA")
	public void start(){
		log.info(this.getClass().getSimpleName() + ":start");
	}

	@NextTask("taskA")
	@JoinTask(JoinType.NoWait)
	public void doTaskA(){
		log.info(this.getClass().getSimpleName() + ":doTaskA");
	}

	public void doTaskB(){
		log.info(this.getClass().getSimpleName() + ":doTaskB");
	}

	public void end(){
		log.info(this.getClass().getSimpleName() + ":end");
	}

	public void destroy(){
		log.info(this.getClass().getSimpleName() + ":destroy");
	}
}

このようにJoinTaskアノテーションでNoWaitを指定するとdoTaskAの終了を待たずに次のdoTaskBに遷移します.ちなみに,先ほどの戻り値にStringを使って次のタスクメソッドに遷移する場合は同期呼び出し,つまりJoinTaskアノテーションでWaitが指定されたのと同じ扱いになります.
同期でも非同期でもすべてのタスクメソッドが終了するとendメソッドが呼ばれます.

このように,ひとつのタスククラスで複数のバッチ処理をメソッド単位で定義できます.1つのバッチ処理でも,複数のバッチ処理でもタスククラスで柔軟に定義できるようにしています.S2Chronosではこれをタスクグループと定義しています.また,タスククラスには複数のタスクグループを定義できます.これは別の機会に紹介します.
同期呼び出しについては通常の関数呼び出しと変わらないわけなので,NextTaskアノテーションを使わずとも記述できますが,タスクメソッドの遷移はコードを書かないというポリシーで設計されています.

スレッドプールの使い方

バッチ処理を効率よく実行するには,マルチスレッド化はかかせません.
ただし,ハードウェアリソースを無視してスレッドを作りすぎるのはあまり賢い方法ではありませんので,ここはスレッドプールを使いましょう.ということで,S2Chronosの内部では,Java5から使えるconcurrentパッケージのスレッドプールを利用しています.
開発者は特別に意識しなくともバッチ処理をスレッドプール上で実行させることが可能となります.
次のようなコードを書くとスレッドプールを明示的に指定できます.threadPoolTypeとthreadPoolSizeプロパティで作成されるスレッドプールが決まります.ThreadPoolTypeは,Java5のconcurrentパッケージに準拠しており,FIXED/CACHED/SINGLE/SCHEDULEDが使えます.

@Task
@NonDelayTrigger
public class BasicTask {
    
    private static Logger log = Logger.getLogger(BasicTask.class);
    
    // スレッドプールタイプを返します.
    public ThreadPoolType getThreadPoolType(){
    	return ThreadPoolType.FIXED;
    }
    
    // スレッドプールのサイズを返します.
    public Integer getThreadPoolSize(){
    	return 10;
    }

    // タスク処理
    public void doExecute() {
        log.info(this.getClass().getSimpleName() + ":doExecute");
    }

}

また,以下のようにThreadPoolを使ってもOKです.

@Task
@NonDelayTrigger
public class BasicTask {
    
    private static Logger log = Logger.getLogger(BasicTask.class);
    
    private TaskThreadPool threadPool = new ThreadPool();
    
    public void initialize(){
    	threadPool.setThreadPoolType(ThreadPoolType.FIXED);
        threadPool.setThreadPoolSize(10);
    }
    
    // スレッドプールを返します.
    public TaskThreadPool getThreadPool(){
    	return threadPool;
    }

    // タスク処理
    public void doExecute() {
        log.info(this.getClass().getSimpleName() + ":doExecute");
    }

}

ただ,この状況だとタスクメソッドは同時に実行されるのは1個だけなのでスレッドプールは使い切りませんw
たとえば,以下だと

@Task
@NonDelayTrigger
public class BasicTask {
    
    private static Logger log = Logger.getLogger(BasicTask.class);
    
    private TaskThreadPool threadPool = new ThreadPool();
    
    public void initialize(){
    	threadPool.setThreadPoolType(ThreadPoolType.FIXED);
        threadPool.setThreadPoolSize(10);
    }
    
    // スレッドプールを返します.
    public TaskThreadPool getThreadPool(){
    	return threadPool;
    }

    // タスク処理
    @CloneTask(20)
    public void doExecute() {
        log.info(this.getClass().getSimpleName() + ":doExecute");
    }

}

CloneTaskアノテーションでdoExecuteを同時に20個並列実行しようとします.スレッドプールは10個ですので,まず最初に10個が実行され空きが発生しだい残りのタスクメソッドが実行されます.また,CloneTaskアノテーション以外にJoinTaskでNoWaitを指定したタスクがある場合も,指定されたスレッドプール内で並行処理を実行します.

このスレッドプールを複数のタスク間で共有したい場合も以下のようにすれば可能です.
TaskThreadPoolをMapで管理するThreadPoolCacheUtilを用意してください.(標準で用意したほうがいいかな...検討しますw)そこで管理されるスレッドプール情報を複数のタスククラスのthreadPoolプロパティで返せば,スレッドプールを共有するタスククラスを実装できます.

@Task
@NonDelayTrigger
public class BasicTask {
    
    private static Logger log = Logger.getLogger(BasicTask.class);

    // スレッドプールを返します.
    public TaskThreadPool  getThreadPool(){
    	return ThreadPoolCacheUtil.get("basicGroup");
    }

    // タスク処理
    @CloneTask(20)
    public void doExecute() {
        log.info(this.getClass().getSimpleName() + ":doExecute");
    }

}