読者です 読者をやめる 読者になる 読者になる

かとじゅんの技術日誌

技術の話をするところ

[Java][並行処理] スレッドセーフにするために考えること 〜応用編 その1〜

スレッドセーフにするために考えること 〜基礎編〜 - じゅんいち☆かとうの技術日誌
基礎編では、スレッドセーフについての基本的な考え方を書きました。

スレッドセーフにするには、アトミック性を保証することが必要でした。この応用編では、その点を注意しながら、実際にドメイン層の振る舞いを持つドメインモデルでどのようなことを気をつければスレッドセーフになるか考えてみたいと思います。考えながら、書いてるのでちょっとおかしいところがあるかも。ご容赦をw

モデルオブジェクトを複数のスレッドで共有して操作してみる

とりあえず、典型的なサンプルとして従業員と部署。。もうやだ〜このサンプル〜。とか言わないのw
以下の従業員は名前と役職、所属部署(複数可)を保持します。スレッドとか気にせずとりあえず適当にモデリング。toStringはデバッグ用にcommons-langのToStringBuilderを使います。

// 従業員
public class Employee extends AbstractEntity {

	public static Employee of(Long id, String name) {
		return new Employee(id, name);
	}

	private final List<Department> departments = Lists.newArrayList();

	private String name;

	private Position position;

	private Employee(Long id, String name) {
		super(id);
		this.name = name;
	}

	public List<Department> getDepartments() {
		return departments;
	}

	public String getName() {
		return name;
	}

	public Position getPosition() {
		return position;
	}

	public void setName(String name) {
		this.name = name;
	}

	public void setPosition(Position position) {
		this.position = position;
	}

	@Override
	public Employee clone() {
		return (Employee) super.clone();
	}

	@Override
	public String toString() {
		return ToStringBuilder.reflectionToString(this,
				ToStringStyle.MULTI_LINE_STYLE);
	}

}

Employeeは識別されるので、エンティティにします。とりあえず、今はさらっと読み飛ばしてください。DDDのエンティティとかバリューオブジェクトがわからない人はこちらを読んでみてください。

public abstract class AbstractEntity implements Entity {

	private final Long id;

	public AbstractEntity(Long id) {
		Validate.notNull(id);
		this.id = id;
	}

	@Override
	public AbstractEntity clone() {
		try {
			return (AbstractEntity) super.clone();
		} catch (CloneNotSupportedException e) {
			throw new Error();
		}

	}

	@Override
	public boolean equals(Object obj) {
		if (this == obj) {
			return true;
		}
		if (obj == null) {
			return false;
		}
		if (getClass() != obj.getClass()) {
			return false;
		}
		AbstractEntity other = (AbstractEntity) obj;
		return id.equals(other.id);
	}

	@Override
	public Long getId() {
		return id;
	}

	@Override
	public int hashCode() {
		return id.hashCode();
	}
}
// エンティティインターフェイス
public interface Entity extends Cloneable {

	Long getId();

}

次は部署。名前を保持します。バリューオブジェクトです。

// 部署
public class Department {

	public static Department of(String name) {
		return new Department(name);
	}

	private final String name;

	private Department(String name) {
		Validate.notNull(name);
		this.name = name;
	}

	@Override
	public boolean equals(Object obj) {
		if (this == obj) {
			return true;
		}
		if (obj == null) {
			return false;
		}
		if (getClass() != obj.getClass()) {
			return false;
		}
		Department other = (Department) obj;
		return name.equals(other.name);
	}

	public String getName() {
		return name;
	}

	@Override
	public int hashCode() {
		return name.hashCode();
	}

	@Override
	public String toString() {
		return ToStringBuilder.reflectionToString(this,
				ToStringStyle.MULTI_LINE_STYLE);
	}
}

以下は、上記のエンティティ(Employee)をマルチスレッドで操作するサンプルコードです。
testメソッド内のemployeeを2つのスレッドで共有して、各スレッドでエンティティを更新する処理を行います。
EmployeeTask1では、かとう についての一連の属性を設定、EmployeeTask2では、都元 についての一連の属性を設定します。想定では、後に更新したスレッドの勝ちとなるはず。。。さあ、かとうが勝つか、都元が勝つか。。

public class ThreadTest {

	static final Department エロ部 = Department.of("エロ部");
	static final Department 宴会部 = Department.of("宴会部");
	static final Department 企画部 = Department.of("企画部");
	static final Department 技術部 = Department.of("技術部");

	@Test
	public void test() {
		CountDownLatch startLatch = new CountDownLatch(1);

		Employee employee = Employee.of(1L, "名無し");
		List<Thread> threads = new ArrayList<Thread>(4);

		threads.add(new Thread(new EmployeeTask1(employee, startLatch)));
		threads.add(new Thread(new EmployeeTask2(employee, startLatch)));

		for (Thread thread : threads) {
			thread.start();
		}
		startLatch.countDown();
		for (Thread thread : threads) {
			try {
				thread.join();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}

		System.out.println(employee);

		if (employee.getName().equals("かとう")) {
			assertThat(employee.getName(), is("かとう"));
			assertThat(employee.getPosition(), is(Position.EROGRAMMER));
			assertThat(employee.getDepartments().get(0), is(エロ部));
			assertThat(employee.getDepartments().get(1), is(技術部));
			assertThat(employee.getDepartments().get(2), is(企画部));

		} else if (employee.getName().equals("都元")) {
			assertThat(employee.getName(), is("都元"));
			assertThat(employee.getPosition(), is(Position.PROGRAMMER));
			assertThat(employee.getDepartments().get(0), is(技術部));
			assertThat(employee.getDepartments().get(1), is(企画部));
			assertThat(employee.getDepartments().get(2), is(宴会部));

		}
	}

	static class EmployeeTask1 extends Task {

		EmployeeTask1(Employee employee, CountDownLatch startLatch) {
			super(employee, startLatch);
		}

		@Override
		public void run() {
			try {
				startLatch.await();
				employee.setName("かとう");
				if (employee.getDepartments().isEmpty()) {
					employee.getDepartments().add(エロ部);
					employee.getDepartments().add(技術部);
					employee.getDepartments().add(企画部);
				} else {
					employee.getDepartments().set(0, エロ部);
					employee.getDepartments().set(1, 技術部);
					employee.getDepartments().set(2, 企画部);
				}
				employee.setPosition(Position.EROGRAMMER);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}

	static class EmployeeTask2 extends Task {

		EmployeeTask2(Employee employee, CountDownLatch startLatch) {
			super(employee, startLatch);
		}

		@Override
		public void run() {
			try {
				startLatch.await();
				employee.setName("都元");
				if (employee.getDepartments().isEmpty()) {
					employee.getDepartments().add(技術部);
					employee.getDepartments().add(企画部);
					employee.getDepartments().add(宴会部);
				} else {
					employee.getDepartments().set(0, 技術部);
					employee.getDepartments().set(1, 企画部);
					employee.getDepartments().set(2, 宴会部);
				}
				employee.setPosition(Position.PROGRAMMER);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}

	static abstract class Task implements Runnable {

		protected final Employee employee;
		protected final CountDownLatch startLatch;

		Task(Employee employee, CountDownLatch startLatch) {
			this.employee = employee;
			this.startLatch = startLatch;
		}

	}

}

結果は以下です。assertThatが通りません。都元が技術部2つ、企画部2つ、エロ部に所属し、EROGRAMMERです。大変なことになりました。

test.Employee@4e99353f[
  departments=[test.Department@23394894[
  name=技術部
], test.Department@54bb7759[
  name=企画部
], test.Department@5f989f84[
  name=エロ部
], test.Department@23394894[
  name=技術部
], test.Department@54bb7759[
  name=企画部
]]
  name=都元
  position=EROGRAMMER
  id=1
]

基礎編の知識があれば自明なのですが、EmployeeTask1#runとEmployeeTask2#runは、共有しているemployeeオブジェクトに対して、お互いに割り込んで更新操作を行っているのが原因です。

スレッドセーフにしてみる

それでは、スレッドセーフにしてみますか。
スレッドセーフにするためにあらゆるメソッドに、固有ロックを適用したくなるかもしれません。

	public synchronized String getName() {
		return name;
	}

	public synchronized void setName(String name) {
		this.name = name;
	}

単一の操作でアトミック性を保証する場合はこれで十分だと思いますが、この対策を施しても、今回のプログラムでは実行結果は変わりません。ロックの対象は一連の更新操作です。この一連の操作をトランザクションと呼ぶ場合があります。
これを解決するのに、一番簡単な方法はクライアントサイドロックです。

@Override
public void run() {
	try {
		startLatch.await();
		synchronized (employee) { // employeeでロックする
			employee.setName("かとう");
			if (employee.getDepartments().isEmpty()) {
				employee.getDepartments().add(エロ部);
				employee.getDepartments().add(技術部);
				employee.getDepartments().add(企画部);
			} else {
				employee.getDepartments().set(0, エロ部);
				employee.getDepartments().set(1, 技術部);
				employee.getDepartments().set(2, 企画部);
			}
			employee.setPosition(Position.EROGRAMMER);
		}
	} catch (InterruptedException e) {
		e.printStackTrace();
	}
}
@Override
public void run() {
	try {
		startLatch.await();
		synchronized (employee) { // employeeでロックする
			employee.setName("都元");
			if (employee.getDepartments().isEmpty()) {
				employee.getDepartments().add(技術部);
				employee.getDepartments().add(企画部);
				employee.getDepartments().add(宴会部);
			} else {
				employee.getDepartments().set(0, 技術部);
				employee.getDepartments().set(1, 企画部);
				employee.getDepartments().set(2, 宴会部);
			}
			employee.setPosition(Position.PROGRAMMER);
		}
	} catch (InterruptedException e) {
		e.printStackTrace();
	}
}

この方法はお手軽ではありますが、Employeeの責務としてスレッドセーフは実装されていません。では、スレッドセーフを責務として実装するにはどうすればいいでしょうか。
クラスのインターフェイスを歪めることになりますが、以下のようにアトミック性を保証するメソッドを作ることで可能です。

public synchronized void apply(String name, List<Department> departments) {
	this.name = name;
	this.departments.addAll(departments);
}

根本的には上記と変わりませんが、更新対象のデータをひとまとめにしたUnitOfWorkを使うことでアトミック性を保証する方法もあるでしょう。

public synchronized void apply(UnitOfWork unitOfWork){
	this.name = unitOfWork.getName();
	this.departments.addAll(unitOfWork.getDepartments());
}

他にもいろいろ考えられますが、いずれにも一連の操作(トランザクション)に対してのロック、アンロック操作ができなければならないでしょう。
うーん、そもそも、トランザクションをモデルの責務にすることは適切なのかという疑問が浮上してきます。なんか違う気がしてならないのです。典型的なRDBMSを使うウェブアプリケーションでも、トランザクションを操作するのは永続化層です。テーブルをモデル化したいわゆる”エンティティ”ではありません。エンティティにトランザクション管理させるのは責務として荷が重い感じがします。

エンティティのインスタンスを共有しない戦略

上記から言えることは、エンティティを共有してトランザクションを管理することは適切だろうかということです。同じエンティティをスレッド間で共有しなければ、当然スレッドセーフですし、競り合い状態になりません。共有しないで済むならそれに越したことはないのです。

話が逸れますが、最近、ScalaHaskell, Erlangなどの関数型言語が人気です。並行処理に適しているとか。
その関数型言語の”関数”という概念は、数学的な関数が由来しています。
以下の関数で示すと

y = f(x)

yはxに依存します。xが決まればyが決まるということです。
fがxを2倍にする関数なら以下のようにyが決まります。

x = 2 の時は、y = 2 * 2 = 4
x = 3 の時は、y = 3 * 2 = 6
x = 4 の時は、y = 4 * 2 = 8

このように関数と引数が決まれば、戻り値も決まる性質のことを参照透過性といいます。
逆に、以下のように、引数以外のaなどの状態に依存する場合は、aによる副作用が発生し関数の戻り値は一定ではありません。関数型言語では、副作用のないプログラミングモデルを採用しています。

a = 0 もしくは 1
x = 2 の時は、y = 2 * 2 + a = 4 もしくは 5
x = 3 の時は、y = 3 * 2 + a = 6 もしくは 7 
x = 4 の時は、y = 4 * 2 + a = 8 もしくは 9

この特性が、並行処理時にも効力を発揮します。関数が引数の他にリソースを共有しないため、そもそもスレッドセーフなのです。
この原理を借りれば、今回のサンプルもスレッドセーフになるはずです。以下のようにパラメータが決まれば、処理結果が決まるようにできないでしょうか。Threadクラスが戻り値を返さないので、本当の意味では参照透過性はありませんが、処理結果はリポジトリに反映することで代用します。*1

処理結果A = スレッド(パラメータA)
処理結果B = スレッド(パラメータB)
処理結果C = スレッド(パラメータC)

今回のモデルで考えるなら、こんなイメージ。

かとう更新データの適用 = スレッド(かとうの更新データ)
都元更新データの適用 = スレッド(都元の更新データ)

この方針に従ってコードを書きなおしたのが以下です。
Employeeオブジェクトは各スレッドで必要なインスタンスを作成し、EmployeeRepositoryオブジェクトは1つだけ作成し、スレッドに渡します。スレッドでは単純にEmployeeRepositoryクラスのstoreメソッドで更新操作を実行するだけです。しかし、storeメソッドは固有ロックでアトミックに動作するので、前述したような不整合は起こりません。また、Employee自体をUnitOfWorkとして、トランザクション管理の責務が実装されていることがわかります。

public class ThreadTest {

	static final Department エロ部 = Department.of("エロ部");
	static final Department 宴会部 = Department.of("宴会部");
	static final Department 企画部 = Department.of("企画部");
	static final Department 技術部 = Department.of("技術部");

	@Test
	public void test() {
		EmployeeRepository employeeRepository = new EmployeeRepository();
		CountDownLatch startLatch = new CountDownLatch(1);

		Employee kato = Employee.of(1L, "かとう");
		kato.setPosition(Position.EROGRAMMER);
		kato.getDepartments().add(エロ部);
		kato.getDepartments().add(技術部);
		kato.getDepartments().add(企画部);

		Employee tsumoto = Employee.of(1L, "都元");
		tsumoto.setPosition(Position.PROGRAMMER);
		tsumoto.getDepartments().add(技術部);
		tsumoto.getDepartments().add(企画部);
		tsumoto.getDepartments().add(宴会部);

		List<Thread> threads = new ArrayList<Thread>(4);

		threads.add(new Thread(new EmployeeTask(employeeRepository, kato,
				startLatch)));
		threads.add(new Thread(new EmployeeTask(employeeRepository, tsumoto,
				startLatch)));

		for (Thread thread : threads) {
			thread.start();
		}
		startLatch.countDown();
		for (Thread thread : threads) {
			try {
				thread.join();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}

		Employee employee = employeeRepository.findById(1L);

		System.out.println(employee);

		if (employee.getName().equals("かとう")) {
			assertThat(employee.getName(), is("かとう"));
			assertThat(employee.getPosition(), is(Position.EROGRAMMER));
			assertThat(employee.getDepartments().get(0), is(エロ部));
			assertThat(employee.getDepartments().get(1), is(技術部));
			assertThat(employee.getDepartments().get(2), is(企画部));

		} else if (employee.getName().equals("都元")) {
			assertThat(employee.getName(), is("都元"));
			assertThat(employee.getPosition(), is(Position.PROGRAMMER));
			assertThat(employee.getDepartments().get(0), is(技術部));
			assertThat(employee.getDepartments().get(1), is(企画部));
			assertThat(employee.getDepartments().get(2), is(宴会部));

		}
	}

	static class EmployeeTask extends Task {
		private final EmployeeRepository employeeRepository;

		EmployeeTask(EmployeeRepository employeeRepository, Employee employee,
				CountDownLatch startLatch) {
			super(employee, startLatch);
			this.employeeRepository = employeeRepository;
		}

		@Override
		public void run() {
			try {
				startLatch.await();
				employeeRepository.store(employee);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}

		}

	}

	static abstract class Task implements Runnable {

		protected final Employee employee;
		protected final CountDownLatch startLatch;

		Task(Employee employee, CountDownLatch startLatch) {
			this.employee = employee;
			this.startLatch = startLatch;
		}

	}

}
public class EmployeeRepository {

	private final Map<Long, Employee> employees = new HashMap<Long, Employee>();

	public Employee findById(Long id) { // 今回はシングルスレッドで操作するので同期化対象外
		return employees.get(id);
	}

	public synchronized void store(Employee employee) {
		employees.put(employee.getId(), employee);
	}
}

このリポジトリは、以下のようにリファクタリングしてもよいでしょう。

public class EmployeeRepository {

	private final Map<Long, Employee> employees = new ConcurrentHashMap<Long, Employee>();

	public Employee findById(Long id) { // ConcurrentHashMapになったので同期化される
		return employees.get(id);
	}

	public void store(Employee employee) {
		employees.put(employee.getId(), employee);
	}
}

結果も期待通りです。

test.Employee@2aca0115[
  departments=[test.Department@16aa37a6[
  name=技術部
], test.Department@2c76e369[
  name=企画部
], test.Department@314c194d[
  name=宴会部
]]
  name=都元
  position=PROGRAMMER
  id=1
]

複数のスレッドで扱うエンティティのインスタンスが、別々であればそもそもスレッド間で干渉しないし、リポジトリでアトミックに更新動作をサポートすればよいこともわかりました。
が、複数のスレッドで同じエンティティのインスタンスを扱わないといけない場合、例えばid=1のEmployeeオブジェクトを複数のスレッドで扱う場合は、どのような設計戦略を取るのがよいのでしょうか?まぁ、最初の問い戻ってるわけですが、、、
上記のコードでは、id=1のEmployeeオブジェクトへの参照が、複数のスレッドに渡せてしまうので、この問題が解決されていません。

長くなったので、次のエントリでその辺りを考えましょう。

あわせて読みたい
スレッドセーフにするために考えること 〜基礎編〜 - じゅんいち☆かとうの技術日誌
スレッドセーフにするために考えること 〜応用編 その2〜 - じゅんいち☆かとうの技術日誌

*1:cocurrentパッケージにあるCallableクラスを使えば非同期処理で値を返すことができます。