かとじゅんの技術日誌

技術の話をするところ

スレッドセーフにするために考えること 〜応用編 その3〜

終わったかとみえたスレッドセーフのネタですが、再浮上。
連投ものだと読むのが大変なんですが、よくぞこのエントリまでおいでくださいましたm(__)m

ということこで、Departmentクラスはバリューオブジェクト(不変オブジェクト)でしたが、エンティティ(可変オブジェクト)にしてみたら、どういう苦労をするのだろうか、というMっ気ったぷりな今回のテーマ。

早速、Departmentクラスをエンティティにしてみました。nameは可変できる属性。AbstractEntityはこちら参照

public class Department extends AbstractEntity {

	public static Department of(UUID id, String name) {
		return new Department(id, name);
	}

	private String name;

	private Department(UUID id, String name) {
		super(id);
		Validate.notNull(name);
		this.name = name;
	}

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	@Override
	public String toString() {
		return ToStringBuilder.reflectionToString(this,
				ToStringStyle.MULTI_LINE_STYLE);
	}
}

今回のソースは、リポジトリの構成が変わっています。

  • Repositoryインターフェイスを新設。
  • 汎用的なオンメモリなリポジトリ OnMemoryRepositoryを新設。このリポジトリはメモリ上のリポジトリなんでcloneできます。
  • EmployeeとDepartmentのエンティティに対応するEmployeeRepository, DepartmentRepositoryを新設。こちらはDBやファイルなどの複製しにくい永続化層を想定しているので、cloneできません。

まず、Repositoryインターフェイスから。

public interface Repository<T extends Entity> {

	// エンティティをリストとして取得する
	Set<T> asEntitiesSet();

	// リポジトリが管理しているすべてのエンティティを削除する。
	void deleteAll();

	// 識別子(ID)を指定してエンティティを削除する
	T delete(UUID id);

	// 識別子(ID)を指定してエンティティを検索する。
	T resolve(UUID id);

	// エンティティを保存する。
	void store(T entity);

}

お手軽に使えるオンメモリなリポジトリ内部は単なるMapです。一応スレッドセーフになっているはず。

public class OnMemoryRepository<T extends Entity> implements Repository<T>,
		Cloneable {

	private volatile Map<UUID, T> entities = new ConcurrentHashMap<UUID, T>();

	@Override
	@SuppressWarnings("unchecked")
	public synchronized Set<T> asEntitiesSet() {
		Set<T> result = new HashSet<T>();
		Set<Entry<UUID, T>> entrySet = entities.entrySet();
		for (Entry<UUID, T> entry : entrySet) {
			result.add((T) entry.getValue().clone());
		}
		return result;
	}

	@Override
	public void deleteAll() {
		entities.clear();
	}

	@Override
	@SuppressWarnings("unchecked")
	public synchronized OnMemoryRepository<T> clone() {
		try {
			OnMemoryRepository<T> repository = (OnMemoryRepository<T>) super
					.clone();
			Map<UUID, T> old = repository.entities;
			repository.entities = new ConcurrentHashMap<UUID, T>();
			repository.entities.putAll(old);
			return repository;
		} catch (CloneNotSupportedException e) {
			throw new Error(e);
		}
	}

	@Override
	@SuppressWarnings("unchecked")
	public synchronized T resolve(UUID id) {
		if (entities.containsKey(id) == false) {
			throw new EntityNotFoundException(id);
		}
		return (T) entities.get(id).clone();
	}

	@Override
	@SuppressWarnings("unchecked")
	public void store(T entity) {
		entities.put(entity.getId(), (T) entity.clone());
	}

	@Override
	public T delete(UUID id) {
		return entities.remove(id);
	}
}

次は、Employeeクラスです。Employeeはグローバルな同一性を持つのでアグリゲートの集約ルートです。そして、Employeeクラスは、もう一つのグローバルな同一性を持つ集約ルートであるDepartmentオブジェクトの参照を複数保持します。*1
とりあえず、Departmentオブジェクトを複数保持するのに、上記で紹介したOnMemoryRepositoryクラスを使わずに、ここでは普通にHashMapにしました。

public class Employee extends AbstractEntity {

	public static Employee of(UUID id, String name) {
		return new Employee(id, name);
	}

	private Map<UUID, Department> departments = new HashMap<UUID, Department>();

	private String name;

	private Position position;

	private Employee(UUID id, String name) {
		super(id);
		this.name = name;
	}

	public void addDepartment(Department department) {
		departments.put(department.getId(), department);
	}

	public void clearDepartments() {
		departments.clear();
	}

	@Override
	public Employee clone() {
		Employee result = (Employee) super.clone();
		result.departments = new HashMap<UUID, Department>();
		return result;
	}

	public Set<Department> getDepartments() {
		return new HashSet<Department>(departments.values());
	}

	public String getName() {
		return name;
	}

	public Position getPosition() {
		return position;
	}

	public void setName(String name) {
		this.name = name;
	}

	public void setPosition(Position position) {
		this.position = position;
	}

	@Override
	public String toString() {
		return ToStringBuilder.reflectionToString(this,
				ToStringStyle.MULTI_LINE_STYLE);
	}

}

Departmentクラスは可変オブジェクトですが、EmployeeクラスはDepartmentオブジェクトを集約していない関係です。実装としてはHashMapのインスタンス自体は複製しますが、要素は複製しません。それは、グローバルな集約ルートなので手を出さない感じ。

上記のOnMemoryRepositoryクラスでDepartmentオブジェクトへの参照を持った場合は、Entity#cloneで複製を作って返すので結局EmployeeがDepartmentの不変条件を維持することになってしまうからです。これは、集約ルートが集約ルートの参照を保持する場合は気を付けないといけないところではないかと。OnMemoryRepositoryクラスでやるなら、Entity#cloneしない機能が必要かもしれません。

public class Employee extends AbstractEntity {

	public static Employee of(UUID id, String name) {
		return new Employee(id, name);
	}

	// 集約関係にならないように気をつけるべき。falseで生成するとEntity#cloneしないモードになるとか。
	private OnMemoryRepository<Department> departmentRepository = new OnMemoryRepository<Department>(false);

	// 以下、省略。
}

ということで、EmployeeとDepartmentのリポジトリをこんな感じに作ってみました。このような実装がよいかまだ分かってませんが一例として。
面倒なんで内部でOnMemoryRepositoryを持っていますが、本当はDBやファイルを扱うAPIに永続化処理を委譲する感じになると思います。そこは適宜読み替えてください。

public class DepartmentRepository implements Repository<Department> {

	private final OnMemoryRepository<Department> internalRepository = new OnMemoryRepository<Department>();

	@Override
	public Set<Department> asEntitiesSet() {
		return internalRepository.asEntitiesSet();
	}

	@Override
	public void deleteAll() {
		internalRepository.deleteAll();
	}

	@Override
	public Department resolve(UUID id) {
		return internalRepository.resolve(id);
	}

	@Override
	public void store(Department entity) {
		internalRepository.store(entity);
	}

	@Override
	public Department delete(UUID id) {
		return internalRepository.delete(id);
	}

}

EmployeeRepositoryクラスは前回のエントリと同様にDepartmentクラス(集約ルート)に対応するDepartmentRepositoryオブジェクトの参照を内部で保持し、Departmentクラスの永続化処理を委譲します。

public class EmployeeRepository implements Repository<Employee> {

	private final Map<UUID, Set<UUID>> departmentIds = new ConcurrentHashMap<UUID, Set<UUID>>(); // Departmentとの関連を示すSetのMap
	private final OnMemoryRepository<Employee> internalRepository = new OnMemoryRepository<Employee>();
	private final DepartmentRepository departmentRepository;

	public EmployeeRepository(DepartmentRepository departmentRepository) {
		Validate.notNull(departmentRepository);
		this.departmentRepository = departmentRepository;
	}

	@Override
	public Employee resolve(UUID id) {
		Employee entity = internalRepository.resolve(id);
		loadDepartments(entity);
		return entity;
	}

	// EmployeeにDepartmentを関連付ける
	private synchronized void loadDepartments(Employee entity) {
		Set<UUID> set = departmentIds.get(entity.getId());
		if (set != null) {
			entity.clearDepartments(); // あると問題なので既存のものは無視する
			for (UUID departmentId : set) {
				Department department = departmentRepository
						.resolve(departmentId);
				entity.addDepartment(department);
			}
		}
	}

	@Override
	public synchronized void store(Employee entity) {
		internalRepository.store(entity);
		Set<UUID> set = new HashSet<UUID>(); // DepartmentをDepartmentRepositoryに委譲し、関連を保存。
		for (Department d : entity.getDepartments()) {
			departmentRepository.store(d);
			set.add(d.getId());
		}
		departmentIds.put(entity.getId(), set);
	}

	@Override
	public Set<Employee> asEntitiesSet() {
		Set<Employee> result = internalRepository.asEntitiesSet();
		for (Employee e : result) { // Departmentの参照を関連付ける処理
			loadDepartments(e);
		}
		return result;
	}

	@Override
	public synchronized void deleteAll() {
		internalRepository.deleteAll();
		departmentIds.clear(); // Departmentとの関連をクリア
	}

	@Override
	public synchronized Employee delete(UUID id) {
		Employee result = internalRepository.delete(id);
		departmentIds.remove(id); // Departmentとの関連を削除
		return result;
	}

}

こちらがスレッドのコードですが、、Departmentが並行で更新されないので、さすがにこれだとスレッドセーフですね。

public class ThreadTest {

	static final Department エロ部 = Department.of(UUID.randomUUID(), "エロ部");
	static final Department 宴会部 = Department.of(UUID.randomUUID(), "宴会部");
	static final Department 企画部 = Department.of(UUID.randomUUID(), "企画部");
	static final Department 技術部 = Department.of(UUID.randomUUID(), "技術部");

	@Test
	public void test() {
		DepartmentRepository departmentRepository = new DepartmentRepository();
		EmployeeRepository employeeRepository = new EmployeeRepository(
				departmentRepository);

		departmentRepository.store(エロ部);
		departmentRepository.store(宴会部);
		departmentRepository.store(企画部);
		departmentRepository.store(技術部);
		UUID id = UUID.randomUUID();
		employeeRepository.store(Employee.of(id, "名無し"));

		CountDownLatch startLatch = new CountDownLatch(1);

		List<Thread> threads = new ArrayList<Thread>(4);

		Employee kato = Employee.of(id, "かとう");
		kato.setPosition(Position.EROGRAMMER);
		kato.addDepartment(エロ部);
		kato.addDepartment(技術部);
		kato.addDepartment(企画部);

		Employee tsumoto = Employee.of(id, "都元");
		tsumoto.setPosition(Position.PROGRAMMER);
		tsumoto.addDepartment(技術部);
		tsumoto.addDepartment(企画部);
		tsumoto.addDepartment(宴会部);

		threads.add(new Thread(new EmployeeTask(employeeRepository, id, kato,
				startLatch)));
		threads.add(new Thread(new EmployeeTask(employeeRepository, id,
				tsumoto, startLatch)));

		for (Thread thread : threads) {
			thread.start();
		}
		startLatch.countDown();
		for (Thread thread : threads) {
			try {
				thread.join();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}

		Employee employee = employeeRepository.resolve(id);

		System.out.println(employee);

		if (employee.getName().equals("かとう")) {
			assertThat(employee.getName(), is("かとう"));
			assertThat(employee.getPosition(), is(Position.EROGRAMMER));
			assertThat(employee.getDepartments().contains(技術部), is(true));
			assertThat(employee.getDepartments().contains(企画部), is(true));
			assertThat(employee.getDepartments().contains(エロ部), is(true));

		} else if (employee.getName().equals("都元")) {
			assertThat(employee.getName(), is("都元"));
			assertThat(employee.getPosition(), is(Position.PROGRAMMER));
			assertThat(employee.getDepartments().contains(技術部), is(true));
			assertThat(employee.getDepartments().contains(企画部), is(true));
			assertThat(employee.getDepartments().contains(宴会部), is(true));

		}
	}

	static class EmployeeTask implements Runnable {
		private final Employee employee;
		private final Repository<Employee> employeeRepository;
		private final UUID id;
		private final CountDownLatch startLatch;

		EmployeeTask(Repository<Employee> employeeRepository, UUID id,
				Employee employee, CountDownLatch startLatch) {
			this.employeeRepository = employeeRepository;
			this.id = id;
			this.employee = employee;
			this.startLatch = startLatch;
		}

		@Override
		public void run() {
			try {
				startLatch.await();
				Employee target = employeeRepository.resolve(id);
				target.setName(employee.getName());
				target.setPosition(employee.getPosition());
				target.clearDepartments();
				for (Department department : employee.getDepartments()) {
					target.addDepartment(department);
				}
				employeeRepository.store(target);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}

		}

	}

}

なので、runメソッドの処理の一部を以下のように修正。

// ...
target.clearDepartments();
for (Department department : employee.getDepartments()) {
	department.setName(employee.getName()); // 部署名に従業員名を入れてみる
	target.addDepartment(department);
}
employeeRepository.store(target);
// ...

大変です、都元が紛れ込んでいます。スレッドセーフなら部署名がすべて”かとう”になるはず。

test.Employee@6cb8[
  departments={925fc99f-fcba-4848-8b1c-fd95bec75bde=test.Department@f01a1e[
  name=都元
  id=925fc99f-fcba-4848-8b1c-fd95bec75bde
], 11284be6-1f95-47c7-a122-efae12f98e91=test.Department@14d6a05e[
  name=都元
  id=11284be6-1f95-47c7-a122-efae12f98e91
], c19526dc-a66b-43b9-ac4c-c2427f4d3a03=test.Department@16ba8602[
  name=かとう
  id=c19526dc-a66b-43b9-ac4c-c2427f4d3a03
]}
  name=かとう
  position=EROGRAMMER
  id=a4dbce05-0c74-4cc2-bb72-4ed050235adb
]

案の定、スレッドセーフにならなかったですね。。
for文内のdepartmentはグローバルな同一性を持つエンティティ(集約ルート)です。だから、複数のスレッド間で共有していることになるので、当然と言えば当然。
このぐらいであれば、クライアントサイドロックでもよいかも。

// ...
target.clearDepartments();
for (Department department : employee.getDepartments()) {
	synchronized(department) {
		department.setName(employee.getName()); // 部署名に従業員名を入れてみる
	}
	target.addDepartment(department);
}
employeeRepository.store(target);
// ...

スレッドのコードを変更せずに対策するとすれば、以下のようにスレッド単位でDpeartmentオブジェクトの複製を作ることでスレッドセーフにできますね。cloneでインスタンスを共有する単位を制御する感じですね。

Employee kato = Employee.of(id, "かとう");
kato.setPosition(Position.EROGRAMMER);
kato.addDepartment(エロ部.clone()); 
kato.addDepartment(技術部.clone());
kato.addDepartment(企画部.clone());

Employee tsumoto = Employee.of(id, "都元");
tsumoto.setPosition(Position.PROGRAMMER);
tsumoto.addDepartment(技術部.clone());
tsumoto.addDepartment(企画部.clone());
tsumoto.addDepartment(宴会部.clone());

とりあえず、こんな感じなりました。

スレッドのネタはまだまだあるので、、ペース落としてぼちぼちやります。

追記:
複製のclone戦略は不変性を確保するのにちょうどよいのですが、実行環境によってはコストが高くつく場合があるので、何でもかんでも複製というわけにはいかないだろうと思っています。特に性能とか、メモリが厳しい場合は。そういう環境では、なるべく、集約ルートに集約ルートの参照を持たせないで、バリューオブジェクトかローカルエンティティを集約したほうがよいですね。

追記:
OnMemoryRepositoryクラスのentitiesは、finalでもvolatileでもない変数として宣言していました。複数のスレッドからstoreメソッドとdeleteメソッドを呼び出した場合、entitiesが見えない場合があるので、volatile修飾子を追加しました。volatileを付加しない場合は、これらのメソッドにもsynchronizedをかける必要があります。

private volatile Map<UUID, T> entities = new ConcurrentHashMap<UUID, T>();

*1:getDepartments()は前回はListだったのですが、順番の制御が面倒なんでSetにしてしまいました。