読者です 読者をやめる 読者になる 読者になる

かとじゅんの技術日誌

技術の話をするところ

スレッドセーフにするために考えること 〜応用編 その2〜

スレッドセーフにするために考えること 〜応用編 その1〜 - じゅんいち☆かとうの技術日誌
では、エンティティのインスタンスを共有せずに、スレッドの振る舞いを関数型のようにするとよいという話をしました。そして、アトミックを必要とするトランザクション操作は、リポジトリの責務にしました。

エンティティのインスタンスを共有しない場合はよいが、どうしても共有しなければならない場合はどうしたらよか という点はまだ解決できていません。それをこのエントリで考えていきましょう。

スレッドセーフに対するエンティティとバリューオブジェクトの違い

まず、エンティティとバリューオブジェクトで共有した時の影響が異なる点について説明しておきます。
前エントリでも説明したようにEmployeeなどのエンティティは、同一インスタンスで状態を変更できる可変オブジェクトです。スレッドセーフにするときに工夫が必要なのは可変オブジェクトです。
そして、Departmentなどのバリューオブジェクトは、同一インスタンスで状態を変更できない不変オブジェクトです。そもそも、スレッドセーフなのは値を変更できない不変オブジェクトです。値を読み出すことしかできず、値を更新できないからです。
なので、Departmentクラスはそのままでスレッドセーフなので、Employeeクラスを考えていきましょう。

並行処理では不変条件を維持することが重要

たとえば、EmployeeRepositoryオブジェクトで同じIDのEmployeeオブジェクトに対して更新操作を行う場合は、スレッドセーフでしょうか?以下のようにEmployeeTaskを修正してみました。

public class ThreadTest {

	static final Department エロ部 = Department.of("エロ部");
	static final Department 宴会部 = Department.of("宴会部");
	static final Department 企画部 = Department.of("企画部");
	static final Department 技術部 = Department.of("技術部");

	@Test
	public void test() {
		EmployeeRepository employeeRepository = new EmployeeRepository();
		employeeRepository.store(Employee.of(1L, "名無し"));

		CountDownLatch startLatch = new CountDownLatch(1);

		List<Thread> threads = new ArrayList<Thread>(4);

		Employee kato = Employee.of(1L, "かとう");
		kato.setPosition(Position.EROGRAMMER);
		kato.getDepartments().add(エロ部);
		kato.getDepartments().add(技術部);
		kato.getDepartments().add(企画部);

		Employee tsumoto = Employee.of(1L, "都元");
		tsumoto.setPosition(Position.PROGRAMMER);
		tsumoto.getDepartments().add(技術部);
		tsumoto.getDepartments().add(企画部);
		tsumoto.getDepartments().add(宴会部);

		threads.add(new Thread(new EmployeeTask(employeeRepository, 1L, kato,
				startLatch)));
		threads.add(new Thread(new EmployeeTask(employeeRepository, 1L,
				tsumoto, startLatch)));

		for (Thread thread : threads) {
			thread.start();
		}
		startLatch.countDown();
		for (Thread thread : threads) {
			try {
				thread.join();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}

		Employee employee = employeeRepository.findById(1L);

		System.out.println(employee);

		if (employee.getName().equals("かとう")) {
			assertThat(employee.getName(), is("かとう"));
			assertThat(employee.getPosition(), is(Position.EROGRAMMER));
			assertThat(employee.getDepartments().get(0), is(エロ部));
			assertThat(employee.getDepartments().get(1), is(技術部));
			assertThat(employee.getDepartments().get(2), is(企画部));

		} else if (employee.getName().equals("都元")) {
			assertThat(employee.getName(), is("都元"));
			assertThat(employee.getPosition(), is(Position.PROGRAMMER));
			assertThat(employee.getDepartments().get(0), is(技術部));
			assertThat(employee.getDepartments().get(1), is(企画部));
			assertThat(employee.getDepartments().get(2), is(宴会部));

		}
	}

	static class EmployeeTask implements Runnable {
		private final Employee employee;
		private final EmployeeRepository employeeRepository;
		private final Long id;
		private final CountDownLatch startLatch;

		EmployeeTask(EmployeeRepository employeeRepository, Long id,
				Employee employee, CountDownLatch startLatch) {
			this.employeeRepository = employeeRepository;
			this.id = id;
			this.employee = employee;
			this.startLatch = startLatch;
		}

		@Override
		public void run() {
			try {
				startLatch.await();
				Employee target = employeeRepository.findById(id);
				target.setName(employee.getName());
				target.setPosition(employee.getPosition());
				target.getDepartments().clear();
				target.getDepartments().addAll(employee.getDepartments());
				employeeRepository.store(target);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}

		}

	}

}

employeeRepository.findById(id)は、リポジトリ内部で管理しているEmployeeオブジェクトへの参照をそのまま返しています。この実装では、複数のスレッドでEmployeeオブジェクトを共有していることと同じです。
リポジトリ内部のEmployeeオブジェクトへの参照をそのまま外部に返すということは、あずかり知らぬところでEmployeeオブジェクトの状態を変更されてしまうリスクがあります。Employeeオブジェクトはリポジトリの管理下のオブジェクトです。このオブジェクトの状態が変わるということはリポジトリの状態も変わってしまうということです。リポジトリが状態を変更するときは、リポジトリのインターフェイスを通して行われるべきです。このようなオブジェクトが変わらない条件を不変条件といいます。

この不変条件を考慮したリポジトリを見てみましょう。
EmployeeRepositoryクラスのfindByIdメソッドでは参照を直接返すのではなく、Employeeのcloneメソッドによって複製を作って返すわけです。storeメソッドも外部の参照をそのままputするとリポジトリの不変条件が維持できません。これもcloneメソッドによって複製を作ることで副作用を起こさないようにしています。

public class EmployeeRepository {

	private final Map<Long, Employee> employees = new HashMap<Long, Employee>();

	public Employee findById(Long id) {
		Validate.notNull(id);
		if (employees.containsKey(id) == false) {
			throw new EntityNotFoundException(id);
		}
		return employees.get(id).clone();
	}

	public synchronized void store(Employee employee) {
		Validate.notNull(employee);
		employees.put(employee.getId(), employee.clone());
	}
}

Employeeはsetterを保持して属性を変更できるオブジェクトです。このようなオブジェクトを可変オブジェクトと呼びますが、可変オブジェクトは状態を更新する際に、複製を作ってから更新を行うと副作用を回避できます。これによって元のオブジェクトの状態を維持し、不変条件を維持できるわけです。

public class Employee extends AbstractEntity {

	public static Employee of(Long id, String name) {
		return new Employee(id, name);
	}

	private List<Department> departments = new ArrayList<Department>();

	private String name;

	private Position position;

	private Employee(Long id, String name) {
		super(id);
		this.name = name;
	}

	@Override
	public Employee clone() {
		Employee result = (Employee) super.clone();
		// cloneしたresultのdepartmentsは、このEmployeeオブジェクトと同じコレクションへの参照なので、ここでも複製を作る
		result.departments = getDepartments();
		return result;
	}

	public List<Department> getDepartments() {
		return departments;
	}

	public String getName() {
		return name;
	}

	public Position getPosition() {
		return position;
	}

	public void setName(String name) {
		this.name = name;
	}

	public void setPosition(Position position) {
		this.position = position;
	}

	@Override
	public String toString() {
		return ToStringBuilder.reflectionToString(this,
				ToStringStyle.MULTI_LINE_STYLE);
	}

}

不変条件を維持したリポジトリは、内部で管理しているEmployeeオブジェクトはスレッドによって直接操作できません。更新したければアトミックな更新動作を保証するstoreメソッドを経由するしかないのです。並行処理では不変条件を維持することが重要だということがわかります。

clone戦略は並行処理と相性がいい

Employeeの複製を返す以下の処理は、二つのスレッドで別々なインスタンスを返しますが、1LのEmployeeを返しています。

Employee target = employeeRepository.findById(id);

同じ内容の値を保持していますが、インスタンスが異なるわけです。各スレッドが複製のインスタンスを扱えば、自ずとスレッドセーフです。
さらに、スレッドが保有しているEmployeeオブジェクトはstoreされるときは、そのインスタンスではなく複製が保存されるため、これもまた副作用がなく、スレッドセーフになります。

employeeRepository.store(target);

ということで、エンティティは同一インスタンスの共有ではなく、不変条件を維持し、同値で異なるインスタンスを複製で扱えばスレッドセーフになることがわかりました。

エンティティの不変条件をチェックしよう

エンティティの不変条件を維持すれば、複数のスレッドでも共有できことがわかったので、改めてEmployeeクラスを確認してみます。
属性にはStringのnameがありますが、Stringは状態が変更できない不変オブジェクトです。不変オブジェクトはnewからgcされるまで状態を変更することができないので、あらゆるロジックで制限なく共有が可能です。なので、Stringは放置です。同様に列挙型のPositionも不変オブジェクトなので放置。
問題は、ArrayListであるdepartments。ArrayListは、同一インスタンスで要素の追加や削除ができる可変オブジェクトです。副作用を回避するには、複製を扱うことになるので、以下のようなコードになります。
要素であるDepartmentは不変オブジェクトであるため、要素はそのままにArrayListを複製して返しています。

public class Employee extends AbstractEntity {
	// ...

	private final List<Department> departments = new ArrayList<Department>();
	
	// ...

	public List<Department> getDepartments() {
		return new ArrayList<Department>(departments);
	}

	// ...
}

Departmentにsetterがある可変オブジェクトの場合は、要素を更新できてしまうので、以下のようにDepartmentの複数を複製したArrayListに追加して返す必要があります。

	public List<Department> getDepartments() {
		List<Department> result = new ArrayList<Department>();
		for(Department department : departments) {
			result.add(department.clone()); // 要素が可変オブジェクトならcloneする
		}
		return result;
	}

この改変によって、EmployeeTaskのrunメソッドの以下の処理は意味がなくなります。複製に追加しても意味がない訳です。

target.getDepartments().clear();
target.getDepartments().addAll(employee.getDepartments());

Employeeの不変条件に不整合を起こさないで、departmentsを更新するためのメソッドが必要なので、addDepartmentメソッドとclearDepartmentsメソッドを追加しました。

public class Employee extends AbstractEntity {
	// ...

	private final List<Department> departments = new ArrayList<Department>();

	// ...

	public void addDepartment(Department department) {
		departments.add(department);
	}

	public void clearDepartments() {
		departments.clear();
	}

	// ...

	public List<Department> getDepartments() {
		return new ArrayList<Department>(departments);
	}

	// ...
}

モデルの不変条件を考慮したスレッドのコードは以下のようになります。

@Override
public void run() {
	try {
		startLatch.await();
		Employee target = employeeRepository.findById(id);
		target.setName(employee.getName());
		target.setPosition(employee.getPosition());
		target.clearDepartments();
		for (Department department : employee.getDepartments()) {
			target.addDepartment(department);
		}
		employeeRepository.store(target);
	} catch (InterruptedException e) {
		e.printStackTrace();
	}
}

departmentsへの直接性が失われたことがデメリットですが、副作用のリスクとのトレードオフを行うことになります。

これで、他のスレッドに同じ値のエンティティを共有する場合は、cloneメソッドで複製を作って渡せば問題がないことがわかりました。
cloneは、浅いコピー(シャローコピー)です。name, position, departmentsの属性は同じ参照になります。nameやpositionは不変オブジェクトなのでそのままで問題がなく、departmentsは上記の例のとおり、「getDepartmentsメソッドが呼ばれた時にArrayListの複製を作るので問題が起きません。」というのは間違いということで、部下からツッコミを(汁、、、で、EmployeeのインスタンスとcloneしたEmployeeインスタンスで、同じコレクションへの参照を共有してしまうので、Employeeのclone時にも複製をつくらなければなりません。コレクションに類する可変オブジェクトはこの対策が必要ですね。ややこしいけど重要!詳しくはコメント欄を参照。

複製を渡すということは、その時のスナップショットです。本当に変更したい場合はアトミックな更新動作を保証するEmployeeRepositoryに反映するしかありません。更新の通知を受け取りたい場合はEmployeeRepositoryにObserverパターンを実装すると良いでしょう。

Employoee other = employee.clone();

あわせて読みたい
スレッドセーフにするために考えること 〜基礎編〜 - じゅんいち☆かとうの技術日誌
スレッドセーフにするために考えること 〜応用編 その1〜 - じゅんいち☆かとうの技術日誌