かとじゅんの技術日誌

技術の話をするところ

CQRS/ESによって集約の境界定義を見直す

peing.net

メッセージングシステムのお題のようです。面白そうなのでちょっと考えてみよう。

問題提起

集約候補が以下の3つ。

  • ユーザー
  • 企業
  • スレッド
    • メッセージ

スレッド集約はメッセージを複数保持するようです。

  • 1000件のメッセージを保持するスレッド集約を更新した際、1000件のアップデートが行われる
  • スレッド集約内部で更新された属性を把握していない場合は、リポジトリでは全メッセージ分の更新となる。これを避けるための仕組みはどう実装するのか?

ということが指摘されている。まぁわかります。これはCQRS/ESなら解決できるよと言ってみる

問題の分析

で、僕ならどう考えて実装に落とすかつらつらまとめてみよう。CQRS/ES前提です。Akkaの成分は少なめでScalaの擬似コードで解説します。コードはコンパイルしてないので…おかしなところあるかも。

問題はスレッド集約がメッセージの集合を保持した場合、更新コストが大きくなりがち。さらに差分更新しようとしても集約の内部実装が複雑になるのではという論点。

まず考えるのは、スレッドとメッセージの関係性に強い整合性は必要か? ということです。Noならメッセージを別の集約として切り出すのがよいとは思います。おそらく問題設定としてはYesになっている気がしますが…。 仮に契約プランによって書き込めるメッセージ数が変わるという場合は、スレッドとメッセージには強い整合性が必要かもしれません。別別の集約にした場合、トランザクションが別になり結果整合性になるため、インスタンス数(レコード数)を厳密に制御することはできませんね。ということで、強い整合性が必要という前提で進めます。

あと、この問題だけではスレッド集約がどのような使われ方をするのが厳密にはわかりません。つまり振る舞いがイメージできません。たとえば、スレッドにメンバーとして参加したアカウントだけがメッセージを閲覧・追加・更新できるというシナリオがあればスレッド集約の構造が明確になります。メンバーの考慮が必要なスレッド集約は以下のようなイメージになるでしょう。

case class Message(id: MessageId, text: MessageText, senderId: AccountId, createdAt: Instant, updatedAt: Instant)

case class Messages(values: List[Message]) {
  def add(message: Message): Messages = ???
  def update(message: Message): Messages = ???
}

case class Members(values: List[AccountId]) {
  def add(accountId: AccountId): Members = ???
  def remove(accountId: AccountId): Members = ???
  def contains(accountId: AccountId): Boolean = ???
}


class Thread(id: ThreadId, members: Members, messages: Messages, createdAt: Instant) {

  def addMember(account: AccountId): Thread = copy(members = members.add(accountId))

  def addMessage(messageId: MessageId, messageText: MessageText, senderId: AccountId): Either[ThreadError, Thread] = { 
    if (members.contains(senderId)) {
      Right(copy(messages = messages.add(Message(messageId, messageText, senderId, Instant.now, Instant.now))))
    } else {
      Left(new AddMessageError)
    }
  }
    
  def updateMessage(messageId: MessageId, messageText: MessageText, sender: AccountId): Either[ThreadError, Thread] = 
    if (members.contains(senderId)) {
      Right(copy(messages = messages.updateMessage(Message(messageId, messageText, senderId, Instant.now, Instant.now))))
    } else {
      Left(new UpdateMessageError)
    }
    
  def getMessages(accountId: Account): Either[ThreadError, Messages] = 
    if (members.contains(accountId)) {
      Right(messages)
    } else {
      Left(new ReadMessagesError)
    }

}

課題に戻りますが、このクラスをリポジトリでCRUDすることを想像してみましょう。thread.messagesが1000件ある場合です。

for {
thread <- threadRepository.findById(threadId)  // 1000件メッセージを持つスレッドを取得
newThread <- thread.addMessage(Message(...)) // 状態変更
_ <- thradRepository.store(newThread) // 1001件メッセージを持つスレッドを更新
} yield ()

スレッドのどのフィールドが更新されたかわからない場合は、スレッドのタイトル、説明、メッセージの本文などをすべての情報をSQLなどを使って更新するのでしょうか? これはあまりに非効率ではないか…。それを回避するにはThread集約内部にどのフィールドをDB上で更新すべきか把握する仕組みが必要なのではないかという話ですね。 あと、そもそもfindByIdの時点で、本文も含む1000件のメッセージをDBから取得するのは効率がよいとはいえませんね。

CQRS/ES流の設計

上記の問題をCQRS/ESで改善できるか考えてみよう。

コマンド側

集約はCQRSのC(コマンド)側に所属しますが、役割は副作用を起こすことを目的にします。 たとえば、メンバーの追加・削除、メッセージを追加・更新などです。メンバーの確認はメッセージの追加・更新に必要なので含めます。これら以外はクエリ側の責務とします。

class Thread(id: ThreadId, memberIds: MemberIds, messages: MessageIds, createdAt: Instant) {

  def addMember(account: AccountId): Thread = copy(members = memberIds.add(accountId))

  def addMessage(messageId: MessageId, messageText: MessageText, senderId: AccountId): Either[ThreadError, Thread] =
    if (memberIds.contains(senderId)) {
      Right(copy(messages = messageIds.add(mesageId)) // 本文は保存しない
    } else {
      Left(new AddMessageError)
    }
    
  def updateMessage(messageId: MessageId, messageText: MessageText, sender: AccountId): Either[ThreadError, Thread] = 
    if (memberIds.contains(senderId)) {
      Right(this) // スレッドではメッセージIDしか管理しないのでメッセージ本文を更新できない
    } else {
      Left(new UpdateMessageError)
    }
  

}

本文を持たないので大分軽量化できたのではないでしょうか?(ただ、内部にID集合は保持するので無限に保持するわけには行かないとは思います。上限値は必要だと思います) さて、クエリ側にどうやって連携するかですね。メッセージの本文も読めないと意味がないですからね。

ということで、擬似コードですが、以下のように副作用が起きたときにイベントをDBに追記します。このイベントは別のプロセスがコンシュームしてリードモデルを作る元ネタになります。

sealed trait ThreadEvent
case class MemeberAdded(threadId: ThreadId, accountId: AccountId, occurredAt: Instant) extends ThreadEvent
case class MessageAdded(threadId: ThreadId, messageId: MessageId, messageText: MesssageText, senderId: AccountId, occurredAt: Instant) extends ThreadEvent
case class MessageUpdated(threadId: ThreadId, messageId: MessageId, messageText: MesssageText, senderId: AccountId, occurredAt: Instant) extends ThreadEvent

class Thread(id: ThreadId, memberIds: MemberIds, messages: MessageIds, createdAt: Instant) {

  def addMember(accountId: AccountId): Thread = {
     persistEvent(MemberAdded(id, accountId, Instant.now))
     copy(members = memberIds.add(accountId))
   }

  def addMessage(messageId: MessageId, messageText: MessageText, senderId: AccountId): Either[ThreadError, Thread] =
    if (memberIds.contains(senderId)) {
      persistEvent(MessageAdded(id, messageId, messageText, senderId, Instant.now))
      Right(copy(messages = messageIds.add(mesageId))
    } else {
      Left(new AddMessageError)
    }
    
  def updateMessage(messageId: MessageId, messageText: MessageText, senderId: AccountId): Either[ThreadError, Thread] = 
    if (memberIds.contains(senderId)) {
      persistEvent(MessageUpdated(id, messageId, messageText, senderId, Instant.now))
      Right(this)
    } else {
      Left(new UpdateMessageError)
    }
  

}

書き込みはこれでよいとして、集約をDBから再生する際はどうなるのか。DBに保存してあるイベント列を順番に空のThreadに適用するだけです。 これも疑似コードで説明。この考慮はあくまでコマンドを実行するための前提を整えるためのものです。

class Thread(id: ThreadId, events: Seq[ThreadEvent]) {

  private var memberIds: MemberIds = MemberIds.empty
  private var messages: MessageIds = MessageIds.empty

  // イベントから最新状態を復元
  events.foreach{
    case MemberAdded(_, accountId, _) => members = memberIds.add(accountId)
    case MessageAdded(_, messageId, messageText, senderId, createdAt) => messages = messages.add(Message(messageId, messageText, senderId, createdAt)
    case MessageUpdated(_, messageId, messageText, senderId, updatedAt) => messages = messages.update(Message(messageId, messageText, senderId, updatedAt)
  }


  def addMember(accountId: AccountId): Thread = {
     persistEvent(MemberAdded(id, accountId, Instant.now))
     copy(members = memberIds.add(accountId))
   }

  def addMessage(messageId: MessageId, messageText: MessageText, senderId: AccountId): Either[ThreadError, Thread] =
    if (memberIds.contains(senderId)) {
      persistEvent(MessageAdded(id, messageId, messageText, senderId, Instant.now))
      Right(copy(messages = messageIds.add(mesageId))
    } else {
      Left(new AddMessageError)
    }
    
  def updateMessage(messageId: MessageId, messageText: MessageText, senderId: AccountId): Either[ThreadError, Thread] = 
    if (memberIds.contains(senderId)) {
      persistEvent(MessageUpdated(id, messageId, messageText, senderId, Instant.now))
      Right(this)
    } else {
      Left(new UpdateMessageError)
    }
  

}

これでドメインを扱うコマンド側では、差分を表現するイベントをジャーナルとして追記保存すればいいことになります。また最新の集約の状態はコマンドを受け付けるために必要最低限のものしか持ちません。クエリをしないので、これで問題は起きません。当初想定していたモデルよりかなり小さくなります。コマンド側の集約はリプレイ後はルールをチェックしてパスしたらイベントを追記保存するだけなんです。

AkkaはEvent Sourcingを標準サポート

ここでは示してませんが、イベントの列が巨大だとリプレイに時間が掛かります。例えばイベントN件に1回スナップショットを保存しておき、リプレイ時に最新スナップショット+差分のイベントでリプレイを高速化することもできます。とはいえ、リクエストのたびにイベント列を読み込み・適用するのは効率が悪そうです。Akkaのアプローチでは軽量プロセスであるActorとしてスレッド集約を実装します。上記のクラスがActorとして実装されます(集約アクター)。メソッドコールがメッセージパッシングに変わります。すでにイベント列がある集約アクターはAkkaがストレージからイベントを読み込み、メッセージとして適用してくれます。また一定時間メッセージを受け付けていない集約アクターはランタイムから消えるように設定できます。まぁ、必要な機能が揃っているので確実に楽できます。

doc.akka.io

クエリ側の設計

クエリ側のリードモデルはどうやって作るのか? 仮に上記のイベントがDynamoDBに書き込まれるとして、パーティションキーはThreadId, ソートキーはイベント番号(akkaでは自動採番してくれます)、イベント本体はJSONなどで格納されるイメージとします。これをDynamoDB Streamsなどでコンシュームします。コンシューム部分はLambdaでもKCLでもよいです。以下を参考にしてみてください。

docs.aws.amazon.com docs.aws.amazon.com

この方法でイベントが時系列順に手に入ります。リードモデルを作るにはこのイベントを順番にリードDBに反映します。 仮にRDBにTHREADテーブル、MEMBERテーブル、MESSAGEテーブルがある場合は以下のような処理になります。 consumeEventsByThreadIdFromDDBStreamsはDynamoDB Streamsからレコードを読み込んでイベントを返す関数です。

consumeEventsByThreadIdFromDDBStreams.foreach{ 
  case ev: ThreadCreated => insertThread(ev) // 実際にはスレッドが作られたときのイベントも永続化する必要がある
  case ev: MemberAdded => insertMember(ev)
  case ev: MessageAdded => insertMessage(ev)
  case ev: MessageUpdated => updateMessage(ev)
}

最終的にリードモデルはレスポンスの形式に併せて定義するとよいでしょう。

case class ThreadDto(id: Long, ...)
case class MemberDto(id: Long, accountId: Long, createdAt: Int)
case class MessageDto(id: Long, threadId: Long, text: String, senderId: Long, createdAt: Instant, updatedAt: Instant)

リードモデルはDAO内部でSQLを使って、部分集合を取り出したりできます。リードモデルはお好きなように…。

val messages: Seq[MessageDto] = MessageDao.findAllByThreadIdWithOffsetLimit(threadId, 0, 100)

結局どうなるのか

最終的な疑似コードイメージです。 1000件のメッセージを保持するスレッドといってもIDしか持ちません。本文を持つより簡単にリプレイし一つのコマンドを付けて一つのイベントを永続化するだけです。1001件のメッセージは永続化しません。間違いなくスケールするのはこっちですね。

val thread = Thread(ThreadId(1L)) // 永続化されているイベント列があれば渡される。なければイベント列は空で作られる
thread.addMessage(...) // 1つのコマンドで1つのイベントが追記されるだけ。1001件も更新されません。

補足

上記の考え方がわかったとしても、CQRS/ESは自前でやるのは大変です。 たとえば、以下の処理が複数のサーバで起こった場合、ロックがなくても大丈夫でしょうか?ダメそうですねw そう書き込むためのトランザクション管理が必要なのです…。

val thread = Thread(ThreadId(1L))
thread.addMessage(...) 

AkkaではThread部分がActorになりますが、複数のサーバで起動しても、ユニークになるように分散できます。つまり分散システム上で同じIDのActorがたかだか一つになるように配置してくれるので、トランザクションの問題は起きません。 こういったことを考えると、CQRS/ESをサポートしたフレームワークやライブラリを使うほうが圧倒的に楽です。Akka お勧めです。

akka.io