かとじゅんの技術日誌

技術の話をするところ

CQRSはなぜEvent Sourcingになってしまうのか

CQRSはなぜEvent Sourcingになってしまうのか、まとめてみたいと思います。

なぜまとめるか、それはCQRSにとってEvent Sourcingはオプションだと誤解されている方が多いからです。この記事を書いてる本人も最初はそう思っていましたが、実際に開発・運用を経験してみるとCQRSにとってEvent Sourcingはほぼ必須で、認識を改めるべきだと気づきました。なので、原義に基づいたうえで、Event SourcingではないCQRSがなぜよくない設計になるのか解説します。

その前に松岡さんの記事について。

CQRSの領域ではモデルを完全に分ける

松岡さんの記事には”CQRSはモデルを完全に分ける必要はない”と書かれていますが、知識がないと誤解しがちですが文字のまま意味を取るといけません。こちらの言及は、システムのうち、モデルをC/Qに分割するCQRS領域とモデルを分割しない非CQRS領域に分けることができて、CQRSを部分導入できますよいう意味だと思います(下図参照)。モデルを分けなくていいのはあくまで非CQRS領域です。間違っても「CQRSはモデルを分割しなくてもいいんだ!」という解釈をしてはいけません。その解釈はもはやCQRSではありません。

部分的導入について

重要なことですが、CQRSは部分的な導入が可能です。 つまり、「参照用モデルと更新用モデルを完全に分ける必要はない」ということです。 どちらかというと、「必要なところだけ参照に特化したモデルを導入する」といった使い方が適切でしょう。

CQRS実践入門 [ドメイン駆動設計] - little hands' lab

f:id:j5ik2o:20200918095049p:plain

CQRSはモデルだけでなくモジュールも分割する

さらにモデルを分けるだけではなく、そのモデルを内包するトップレベルのモジュール同士を分離というか、隔離しなければなりません。という話はこちらの記事を参照してください。

blog.j5ik2o.me

シンプルなCQRSの概念図は以下です。上述したようにC/Qは隔離されています。データベースのところはRDB前提で書いていますが、実際の運用ではこうしません。想像しやすいように書いてるだけで、厳密には書いてないと思ってください。

f:id:j5ik2o:20200918154402p:plain

Event SourcingではないCQRSを考える

この記事では、リポジトリからリード専用DAOまでの設計を考えていきましょう。最初はEvent Sourcingを考えずに素直に状態に基づくCRUD脳で考えていきます。

具体的なコードのイメージは以下の記事を読むといいです。

blog.j5ik2o.me

対象ドメインはショッピングカート前提で考えます。

コマンドプロセッサ実装

コマンドプロセッサの実装例です。コマンド側のユースケースクラスだと思ってください。 カートオブジェクトとリポジトリの操作をCRUDで考えると以下のように実装になるでしょう。

class AddCartItemCommandProcessor(cartRepository: CartRepository) {

  def execute(cartId: CartId, itemId: ItemId, num: ItemNum): Unit = {
    // 最新の集約(グローバルなエンティティ)をストレージから取得する
    val cart = cartRepository.findById(cartId)
    // ロジック実行: 予算超過ならカートオブジェクトが商品の追加を拒否する!
    val newCart = cart.addItem(itemId, num)
    // 更新された最新状態をストレージに保存する
    cartRepository.store(newCart)
  }

}

まずC側のテーブル(A)とQ側のテーブル(B)は形式が異なるのですが、まずリポジトリによってカートオブジェクトは以下のテーブルに永続化されます。 (カート内のアイテムは別テーブルにマッピングされますが、リポジトリでfindAllByItemIdのような逆引き検索がなければ、子テーブルは不要かもしれませんが、一旦わかりやすさを優先して子テーブルを設けています)

  • C側のテーブル(A)
    • カートテーブル
      • カートID(PK)
      • 顧客アカウントID
      • 上限予算金額
      • 作成日時
    • カートアイテムテーブル
      • カートアイテムID(PK)
      • カートID(FK)
      • 商品ID
      • 数量
      • 作成日時

このように永続化されますが、ビジネスロジックで計算する値もあります。カートの合計金額です。 あと、商品の単価は外部のオブジェクトから提供されるので、カートオブジェクト内部で保持してません。

case class Cart(id: CartId, userAccountId, upperLimitPrice: Price, items: CartItems, createAt: Instant) {
  // 合計金額計算
  def totalPrice(priceResolver: ItemId => Price): Price = {
    items.fold(Price.zero){ (t, item) => t + item.price(priceResolver) }
  }
}

case class CartItem(id: CartItemId, itemId: ItemId, quantity: Quantity, createAt: Instant)  {
  // 価格計算メソッド
  def price(priceResolver: ItemId => Price): Price = priceResolver(itemId) * quantity
}

クエリプロセッサの実装例

クエリ側のユースケースクラス相当の実装です。 クエリ側のリード専用DAOとリードモデルであるDTOは以下のようなイメージになります。

class GetCartQueryProcessor(cartDao: CartDao) {

  def execute(cartId: Long): Vector[CartDto] = {
     cartDao.findByCartId(cartId)
  }

}

case class CartDto (
  id: Long,
  userAccountId: Long,
  userAccountName: String,
  upperLimitPrice: String,
  totalPrice: String
  items: Seq[CartItemDto]
)

case class CartItemDto(
  id: Long,
  itemId: Long,
  itemName: String,
  unitPrice: String
  quantity: Long,
  price: String)

対応するテーブルはDTOと同形で、C側と比べて非正規型になっています。C型と似てるようで似てない。別物です。あとで説明しますが、(★)のところが厄介ですね。

  • Q側のテーブル(B)
    • カートテーブル
      • カートID(PK)
      • 顧客アカウントID
      • 顧客アカウント名(●)
      • 上限予算金額
      • 合計金額(★)
    • カートアイテムテーブル
      • カートアイテムID(PK)
      • 商品ID
      • 商品名(●)
      • 数量
      • 単価(★)
      • 価格(★)

補足:

(●)も面倒ですが、これは他の集約(エンティティ)のデータとして永続化されていて、SQLで解決できる可能性があります。

めちゃくちゃ単純ですが、これでC/Qが独立しています。が、C/Qを連携させなければ更新されたデータを読み込むことができません。上図の(C)の部分です。

コマンド側からクエリ側に変更をどう通知するか

はい。やっと本題です。コマンド側からクエリ側に変更をどう通知するか、(C)部分の実装について以下に挙げてみます。

  • テーブル(A)の更新トリガ時の変更を、SQLを使ってテーブル(B)を書き込む
  • プログラムで、テーブル(A)を読み込み、その結果をテーブル(B)に書き込む

テーブル(A)の更新トリガ時の変更を、SQLを使ってテーブル(B)を書き込む

トリガー例はあえて書きませんが、更新データを受信してテーブル(B)のレコードをまるっと全カラム書くとよいですね。欲を言えば、更新されたレコードだけを検知したいですが、全カラム更新するしかないと思います。まぁリポジトリで集約(エンティティ)を保存する時点で全カラム更新なら、ここで差分更新を頑張っても釣り合わないですね。と、C側で静的に保持されているデータをQ側に書くだけならトリガーでよさそうです。

問題は(★)の部分。(★)はC側のデータベース上にはありません。ドメインオブジェクトの振る舞いによって計算される値だからです。 まさかドメインロジックと同じ仕様のSQLを書くとかないですよね…。まぁ振る舞いのない貧血症オブジェクトしかないなら、特に問題はないでしょうね(皮肉) あ、例えトランザクションスクリプトであっても、ロジッククラスの助けなしに計算できない値があるとしたら?同じことですよね?

苦肉の策としては計算した結果もC側のテーブルに書き込むという方法です。カートオブジェクトだと、priceResolver引数が外部のオブジェクトなので、まずこれが永続化時にないとこういったことはできません。リポジトリのI/Fを def store(cart: Cart, priceResolver: ItemId => Price): Unitするなど設計に歪みがでることがあります。つまり、リポジトリ内部でビジネスロジックを起動するということです。リポジトリの責務違反…

そもそも、トリガーという時点でイベントに頼ってるような印象ですが、こういう計算で導出される値の扱いは難しいです。

プログラムで、テーブル(A)を読み込み、その結果をテーブル(B)に書き込む

(★)の問題があるから、一度ドメインオブジェクトに計算させてから、テーブル(B)に書けばよいことになります。これならQ側にもれなくデータを転送できそうです。が、変更のトリガーってどう検知しましょうか? まさか、C側のDBに変更されているかどうかわからないけど、ポーリングします?対象の集約はどうしますか?集約が大量にもあっても、それら全部ポーリングします?全然機能するイメージがないですね…。やっぱり最新状態を手に入れるにしても、更新イベントが必要なんです

補足:

Q側のテーブルをC側のVIEWにすればよいのでは?というアイデアもあると思いますが、(★)の問題はSQLで解決できません。結局ドメインオブジェクトに頼ることになります。

結局 Event Sourcingにたどり着く

上述した2パターンのよいところを組み合わせるとよさそうです。C側のテーブル(A)以外に更新イベントを通知するキューを用意します。以下のようになります。

f:id:j5ik2o:20200918165640p:plain

リポジトリ側でC側のテーブル(A)を更新する以外に、更新イベントを通知するためにキューにもイベントを書き込みます。そのうえで、リードモデル更新プロセスがそのイベントを受け取り、テーブル(A)から集約(エンティティ)を再現し、Q側のテーブル(B)に書き込めばよいです。まぁ、いちいちC側のテーブル(A) を読まずに、変更内容を更新イベントに載せたらどうかという発想もありますね。

一見、これで問題がないようにみえますが、C側のリポジトリは二つのストレージに書き込みを行っています。テーブル(A)はRDB、更新イベントを伝えるキューはRDBは不向きでしょう。AWSであればSQSのようなものを想像するでしょう。となると、同一トランザクションにはなりません。はい。高いコストを払う可能性がある、ダブルコミット問題が生じます。以下の記事でもダブルコミットは避けましょうという話をしました。

blog.j5ik2o.me

ダブルコミットを避けるためにイベントを真のデータソースにします。C側のドメイン状態はイベントから作るようにします。そして、C側のDBはもはやRDBである必要はありません。上記記事にも書きましたが、集約で発生するイベントを追記したり、集約IDごとのイベント列を読み込めれば十分だからです。実際はNoSQL(KVS)を使うことが多いです。AWSのDynamoDBであればDynamoDB Streamsからスケーラブルにイベントを読み込めます。これがEvent Sourcingですね。結局ここにたどり着きます。

f:id:j5ik2o:20200918170500p:plain

CQRSシステムのほとんどがEvent Sourcingを採用しているそうです(Lightbend社調べ)。その理由を分かっていただけたのではないでしょうか。

CQRSはモデルだけでなくモジュールも分割する

掲題についての議論です。

僕の結論はこちら。"モジュール分割せずにモデルを分割するだけ"はCQRSと呼んでいけないのでは?まず原義からちゃんと把握しようというお話。

sipadan2003.blogspot.com

Gregさんの原義によると

Origins

起源

Command and Query Responsibility Segregation uses the same definition of Commands and Queries that Meyer used and maintains the viewpoint that they should be pure. The fundamental difference is that in CQRS objects are split into two objects, one containing the Commands one containing the Queries.

コマンドとクエリの責任の分離は、Meyer氏が使用していたものと同じ定義を使用しており、純粋なものであるべきという視点を維持しています。根本的な違いは、CQRSではオブジェクトが2つのオブジェクトに分割され、1つはコマンドを含むオブジェクト、もう1つはクエリを含むオブジェクトに分割されます。

ここだけ読むと「そうかオブジェクトを分ければいいだけか」となりますが、

The Query Side

クエリ側

After CQRS has been applied there is a natural boundary. Separate paths have been made explicit. It makes a lot of sense now to not use the domain to project DTOs. Instead it is possible to introduce a new way of projecting DTOs

CQRSを適用した後には、自然な境界があります。別々のパスが明示されています。DTOを投影するためにドメインを使用しないことは、今では非常に理にかなっています。その代わりに、DTOを投影する新しい方法を導入することができます。

ドメインを利用するコマンド側とDTOを利用するクエリ側には境界があるとされている。原義のPDFでも大半の部分でこの隔離法を説いている。以下は抜粋。

In the “Stereotypical Architecture” the domain was handling both Commands and Queries, this caused many issues within the domain itself.

ステレオタイプのアーキテクチャでは、ドメインはコマンドとクエリの両方を処理していたため、ドメイン自体に多くの問題が発生しました。

Once the read layer has been separated the domain will only focus on the processing of Commands. These issues also suddenly go away. Domain objects suddenly no longer have a need to expose internal state, repositories have very few if any query methods aside from GetById, and a more behavioral focus can be had on Aggregate boundaries.

読み取り層が分離されると、ドメインはCommandsの処理のみに集中するようになります。 これらの問題も突然なくなります。 ドメインオブジェクトは突然内部状態を公開する必要がなくなり、リポジトリにはGetById以外のクエリメソッドがある場合はほとんどありません。

まとめると、モジュールレベルでCとQが分かれているということです。簡単にCQRSとは何かを説明するなら以下となる。

たぶん、C/Qをモジュールとして隔離しなくてもモデルだけ分ければいい派はいるでしょう。いてもいいですが、僕はそれを原義に照らしてCQRSと言ったらダメでしょう派です。GregさんがSegregationという強い言葉をわざわざ選んだ理由を考えるべき。

原義をこのように理解しているが、このままで終わると原理主義と言われるので、次の記事で背理法的にモジュール分割せずにモデルだけ分ける立場でそれが、なぜ良い設計にならないか書いてみよう。

具体的な実装コードからEvent Sourcingを理解する

DDD Community JPのほうでCQRS/Event Sourcingについて少し盛り上がったので、どういう議論をしたかまとめるのと同時に補足も追加しました。ちなみに、Event Sourcingが主題ですが、CQRSも前提として関係します。その想定で読んでいただければと。

発端はこのツイート。

僕が引用したツイートは松岡さんの質問箱に対するリアクションです。その質問箱に寄せられた質問は以下。

ストリームを開いてから閉じるまでのデータが変化する毎にUIで表示したい場合、DDDではどのように設計したら良いでしょうか? DDDのリポジトリは1つのリクエストに対して1つのリクエストを返すイメージがあり、ストリームをどのような形で扱ったら良いのかつかめずにいます。

かなり抽象的な質問なのでいろいろ確認しないとわからないのですが、CQRS/Event Sourcingならどう解決するか考えてみました。こういった説明が抽象的でわかりにくいという声をよく聞くので、具体的なコード*1を交えた説明になっています。 注意事項としては、CQRS/Event Sourcingには実装コストが掛かります*2。それほどのコストを掛ける価値があるシステムなのかよく考えてくださいというのは前提としてありますので、その点は注意して読んでください。

CRUDは最新状態に基づく設計スタイル

ドメイン駆動設計を前提にAPIサーバなどを設計した場合、APIはリクエスト・レスポンスでCRUDすることが多いです。この場合、アプリケーション内部でリポジトリを使ってドメインオブジェクトを取り出すロジックになるのでリポジトリ操作もリクエスト・レスポンスに対応します。そしてCRUDは最新状態に基づく設計スタイルです。

ショッピングカート*3に商品を追加するユースケースで考えると以下のようになります。ビジネスルールを守るのがドメインオブジェクトで重要な役割を持っています。

class AddCartItemUseCase(cartRepository: CartRepository) {

  def execute(cartId: CartId, itemId: ItemId, num: ItemNum): Unit = {
    // 最新の集約(グローバルなエンティティ)をストレージから取得する
    val cart = cartRepository.findById(cartId)
    // ロジック実行: 予算超過ならカートオブジェクトが商品の追加を拒否する!
    val newCart = cart.addItem(itemId, num)
    // 更新された最新状態をストレージに保存する
    cartRepository.store(newCart)
  }

}

カート内の商品を取得するユースケースは以下のようになるでしょう。

class GetCartItemsUseCase(cartRepository: CartRepository) {

  def execute(userAccountId: UserAccountId): Vector[CartItem] = {
     cartRepository.findByUserAccountId(userAccountId).map{ cart => cart.items }
  }

}

質問にあったように、リクエストに対してレスポンスを返すスタイルになっています。いわゆるPULL側のAPIです。REST APIであればこれで何ら問題でしょう。

イベントをPub/Subする

今回の質問は、ストリーム接続したいということです。漠然と「ストリームで」という話ですが、扱うデータは何でしょうか。上記の質問の場合は、ドメインの最新状態をリクエスト・レスポンス型で返しますが、ストリームでは最新状態を返すのでしょうか?ほとんどケースではそうではなく、そのとき起こった出来事であるイベントをクライアントに返すことになります。

例えば、上記のAddCartItemUseCase#executeが実行されると、商品追加イベントが発生し、ストリームに接続するクライアントにそのイベントが通知されるという具合になります。サーバからクライアントへデータがPUSHされる形になり、通常はストリームの接続は永続的になります。

サーバ側のエンドポイントの実装ではイベントのサブスクライバを作ります。サブスクライバでカートイベントを受信したら、ストリームに流すだけです。以下はakka-httpでSSE(Server Sent Event)を行う場合の例です。イベントどのサーバからでも受信できるようにストレージからWrite/Readすることになると思います。

path("events" / LongNumber ) { cartId =>
    get {
      complete {
        cartEventSubscriber.subscribe(cartId)
          .map(event => ServerSentEvent(event))
          .keepAlive(1.second, () => ServerSentEvent.heartbeat)
      }
    }
  }

さて、ドメイン状態が変化したときに、イベントをパブリッシュする実装はどうしたらよいか。ユースケースから上記で説明したストレージへイベントを書き込むことになると思います。

// CRUD前提のユースケース実装
class AddCartItemUseCase(cartRepository: CartRepository) {

  def execute(cartId: CartId, itemId: ItemId, num: ItemNum): Unit = {
    val cart = cartRepository.findById(cartId)
    val newCart = cart.addItem(itemId, num)
    cartRepository.store(newCart) // (1)
    cartEventService.publish(CartItemAddedEvent(cartId, itemId, num)) // (2)
  }

}

これはいわゆるPub/Subの仕組みです。で、勘違いされている方が多いのですが、CQRS/Event Sourcingとは直接関係ないです。混同しないようにしましょう。上記はイベントを送受信しているだけでCQRS/Event Sourcingではありません。

ところで、上記コードの(1)の状態更新に加え(2)のイベントの書き込みが増えたわけですが、ここで違和感を覚えます。(1)と(2)ってストレージがまず違うので同一トランザクションにできません。つまり2相コミットとかダブルコミットというやつです。不整合が起きた場合、リカバリが面倒なやつです。例えば、(1)がコミットされたあとに、(2)が失敗したらどうなるか。(1)の完了した書き込みを削除するか、(2)の書き込みが成功するまでリトライするかです。複雑なリカバリ処理を自前で実装するハメになります。

ダブルコミットは避ける

なので、ダブルコミットはできる限り避けましょうです。以下は古典ですが読むことをお勧めします。

ameblo.jp

CQRS/Event Sourcingの考案者であるGregさんもダブルコミットを推奨していないようです:(興味あればこちらも参照ください)

The two-phase commit can be expensive but for low latency systems there is a larger problem when dealing with this situation. Generally the queue itself is persistent so the event becomes written on disk twice in the two-phase commit, once to the Event Storage and once to the persistent queue. Given for most systems having dual writes is not that important but if you have low latency requirements it can become quite an expensive operation as it will also force seeks on the disk.

”二相コミットはコストがかかりますが、低レイテンシのシステムでは、この状況を扱う際にはより大きな問題があります。一般的に、キュー自体は永続的なので、イベントは二段階のコミットで二度ディスク上に書き込まれます。ほとんどのシステムでは、二重書き込みを行うことはそれほど重要ではありませんが、もし低レイテンシの要件がある場合には、 ディスク上でのシークを強制的に行うことになるため、非常に高価な操作になる可能性があります。”

The database would insure that the values of sequence number would be unique and incrementing, this can be easily done using an auto-incrementing type. Because the values are unique and incrementing a secondary process can chase the Events table, publishing the events off to their queue. The chasing process would simply have to store the value of the sequence number of the last event it had processed, it could even update this value with a two-phase commit bringing the update and the publish to the queue into the same transaction.

”データベースは、シーケンス番号の値が一意でインクリメントされることを保証しますが、これはAUTO INCREMENT型を使用して簡単に行うことができます。値が一意でインクリメントされているので、セカンダリプロセスはイベントテーブルを追いかけることができ、イベントをキューに公開することができます。追いかけるプロセスは、単に最後に処理したイベントのシーケンス番号の値を保存しておく必要があります。 この値を更新するには、二段階のコミットで更新とキューへの公開を同じトランザクションにすることもできます。”

ではどうしたらよいか。(1)と(2)は同時に二つ処理せずに、(2)のイベントだけを書き込むようにします。(1)の状態は(2)のイベントを基に別のプロセスで作り出します。これがCQRS/Event Sourcingで、すべての状態はイベントから導出できるようにします。やっと本題。

しかし、以下の(0)から(1)の処理は、最新状態に基づいているのでそのままでは使えません。設計を見直す必要があります。

def addCartItem(cartId: CartId, itemId: ItemId, num: ItemNum): Unit = {
  // val cart = cartRepository.findById(cartId) // (0)
  // val newCart = cart.addItem(itemId, num)
  // cartRepository.store(newCart) // (1)
  // イベントの書き込みはこれでよいが、↑の処理をどうしたらよいか 
  cartEventService.publish(ItemAdded(cartId, itemId, num)) // (2)
}

Event Sourcingでプログラミングモデルがどう変わるか

前述しましたが、CQRS/Event Sourcingは、簡単にいうとイベントから状態を導出するアーキテクチャです。絵で説明したほうがわかりやすいので結論となる図は以下。どーん。「え、複雑だなぁ」と思いますよね。単純なCRUDよりはそりゃ難しくなりますよ…。その代わり耐障害性やスケーラビリティが向上するのです…。一つのアプリケーションのように見えますが、コマンド側とクエリ側は分離してデプロイすることが可能です。

f:id:j5ik2o:20200916090416p:plain

イベントからドメインオブジェクトをリプレイする

ジャーナルDBと呼ばれるデータベースに、カートオブジェクトが発行したカートのイベント(以下、カートイベント)が永続化されます。このイベントを使ってコマンド側にドメインオブジェクトを、クエリ側にリードモデルを構築します。(リードモデルの形式は何でもOKです。リードDBはジャーナルDBと物理的に分けるかどうかはここでは問いません。まず概念的に理解したほうがいいでしょう)。つまり状態といえるものが2種類あるわけです。リードモデルは後述しますが、まずはドメインオブジェクトから。

以下のように、永続された複数のイベントがあれば、最新のカートオブジェクトを作る(リプレイ)ことが可能です。(1)の部分でイベントを読み込み、(2)で最新のカートオブジェクトを作り出します。

def addCartItem(cartId: CartId, itemId: ItemId, num: ItemNum): Unit = {
  val allEvents = cartEventService.getAllEventsById(cartId) // (1)
  val cart = replayFunction(allEvents) // (2)
  val itemAdded = cart.addItem(itemId, num)
  cartEventService.publish(itemAdded)
}

スナップショットでリプレイ時間を短縮する

前述のコードみてこれはひどい設計だと思ったのではないでしょうか。わかります。僕も最初そう思いました…。容易に想像がつきますが、永続化されたカートイベントの件数に比例してリプレイのパフォーマンスが悪化します。なので、対策として、永続化するイベントN件ごとにカートオブジェクトの状態をスナップショットDBに保存しておき、カートオブジェクトをリプレイする際に、最新スナップショット+それ以降に発生した差分イベントを読み込んで、高速にリプレイします。つまり、N件以上イベントがあっても、最新のスナップショットと最大N - 1件分の読み込みで済ませることができます。とはいえ、Nを小さくすると読み込む差分イベントが少なくなる一方でイベント書き込み時のスナップショットを更新する頻度も高くなります。諸刃の剣ではありますが、バランスをうまくとる必要があります。Akka(akka-persistence)でもサポートされている機能でS3やDynamoDBをスナップショットDBとして使うことができます。

以上のことを踏まえると以下のようなイメージになります。前提としてイベントやスナップショットには連番(seqNr)が振ってあるものとします。まず、最新のスナップショットとそのスナップショットのseqNr以降の差分イベントを取得します。それらを使って最新のカートオブジェクトをリプレイします。そしてオブジェクトオブジェクトの振る舞いを実行します。新しいカートオブジェクト、イベントを得ます。イベントはエンキューされますが、seqNrが1000件で割り切れるときに新しいカートオブジェクトを最新のスナップショットとして保存します。

def addCartItem(cartId: CartId, itemId: ItemId, num: ItemNum): Unit = {
  val snapshot = loadSnapshot(cartId)
  val events = cartEventService.getEventsByIdWithSeqNr(cartId, snapshot.seqNr + 1)
  val cart = replayFunction(snapshot, events)
  val (newCart, itemAdded) = cart.addItem(itemId, num)
  seqNr += 1
  if (seqNr % 1000 == 0 ) {
    saveSnapshot(newCart, seqNr)
  }
  cartEventService.publish(itemAdded, seqNr)
}

こうすることでリプレイ時間を短縮化できますが、CRUDのときと比較すると多くのデータを読み込んでいることがわかりますね…。もう少し効率的にならないかについては後ほど説明します。

追記:

ドメインオブジェクトはいちいちイベントからリプレイせずに、クエリ側のリードモデルをユースケースでうまく使えないの?と思うかもしれませんが、以下の問題があり難しいです。

  1. 非正規化されたリードモデルは正規化されたドメインモデルではないので、代替はそもそも難しい。代替したとしたらDDDではなくなりそう
  2. C/Qがモジュールとして分離できなくなる。CとQが分離されていないならもはやCQRSではない?違う呼び方がよさそうです。
  3. リードDBへの書き込み時間分、最新状態を取得する時間が遅延する。つまり常に過去のデータをみていることになってしまいます。これは許容できる場合がありそう、要件によりますね

リードモデルの構築

コマンド側のドメインモデルの概要はつかめたと思いますが、クエリ側のリードモデルを考えてみましょう。

このアーキテクチャパターンによって、イベントが唯一信頼できるデータソースとなり、そのイベントを基にクエリ側のリードモデルを構築します。イベントは不変という特徴を持ちます。なので、いつでも正しいリードモデルを作り出せます。極端な話、リードモデルの設計をミスってもイベントから作り直せます。もちろん、データ更新コストはかかりますが…。

コマンドとクエリで要件が非対称

そもそもなぜコマンドとクエリでモデルを分離するのか。理由は以下の表を参照してください。つまるところ要件が違うからです。人間が閲覧するようなシステム(SoEは特に顕著)だとまずクエリ側は非正規化データを求めます。

-コマンドクエリ
一貫性/可用性トランザクション整合性を使い一貫性を重視する結果整合を使い可用性を重視する
データ構造トランザクション処理を行い正規化されたデータを保存することが好まれる(集約単位など)非正規化したデータ形式を取得することが好まれる(クライント都合のレスポンスなど)
スケーラビリティ全体のリクエスト比率とごく少数のトランザクション処理しかしない。必ずしもスケーラビリティは重要ではない全体のかなりのリクエスト比率を占める処理を行うため、クエリ側はスケーラビリティが重要

例えば、以下のような集約(グローバルなエンティティ)がある場合でも

  • Employee { id, name, deptId }
  • Department { id, name }

画面は以下のように集約を二つ結合したようなデータを求めます。deptIdだけではどの部署か分からないから名前も必要なのです。*4

  • Employee { id, name, deptId, deptName }

ドメインオブジェクトの構造にインパクトを与えるのは、このような正規と非正規の構造的な非対称性です。考えてみるとわかりますが、トランザクション処理と検索・レポートを両立するモデルの実現も理解も難しいのです。

さて、コマンドとクエリを分離しないと実装上でどんな問題が起きるか気になるところですが、以下のようなものがあります。

  • リポジトリのクエリメソッドが複雑になる
  • N+1クエリが発生しやすい
  • ドメインオブジェクトからDTOへの変換が非効率

簡単に以下に説明します。

クエリ要件をリポジトリで満たそうとしてメソッドが複雑になる

集約(グローバルなエンティティ)は識別のためにIDを持つため、IDから集約本体を引き当てることができます。これはある意味正引きです。

val employee = employeeRepository.findById(employeeId)

しかし、集約の他の属性(名前や他のIDなど)を使って集約(単体もしくは複数)を引き当てたい場合があります。セカンダリインデックスを使うような逆引きですね。

以下のようなコードを書いたことがありませんか? 僕は散々書いてきました。

val employees = employeeRepository
  .findByDeptIdsWithEmpNamePatterns(deptIds, empNamePatterns)

ドメインの振る舞いを起こすためというより、クライアントにクエリのレスポンスを返すためによく使っていました。これは本当にドメインの責務でしょうか。ドメインは振る舞いを起こすことが責務と捉えると、ドメインの振る舞いが伴わないこのようなリポジトリメソッドは関心が分離できていないのかもしれません。前述した GetCartItemsUseCaseクラスの実装も本当にリポジトリの責務でいいか考える必要がありそうです。本当に集約をそのまま返すならば問題ないかもしれません。次のN+1クエリを含むような問題に発展する可能性がある場合は要注意です。

リポジトリでレスポンスを組み立てるとN+1クエリが発生しやすい

APIのレスポンスでは非正規化データを返すことがほとんどです。例えば、ホテルの予約情報一覧を作るために、予約集約に関連するホテル集約、顧客集約を解決しなければならないことがあります。ここでもドメインの振る舞いは起きません。クエリだからです。これは本当にリポジトリがやるべき仕事でしょうか。僕は過去にDBAに申し訳ないと思いながらこういうN+1が大量に発生するコードを書いていました…。そもそもレスポンスとして表形式を期待するならSQLとRDBにやらせるべきではないでしょうか。

reservationRepository.findByIds(ids).map { reservation => 
  val hotel = hotelRepository.findById(reservation.hotelId)
  val customer = customerRepository.findById(reservation.customerId)
  new ReservationDto(reservation, hotel.name, customer.name)
}

ドメインオブジェクトからDTOへの変換が非効率

APIのレスポンスでは集約の一部だけを求めることがあります。以下は恣意的な例ですが、UIに併せて顧客名一覧を返す処理です。リポジトリで得た集約の大部分を捨てて名前だけのリストを作ります。クエリ要件を満たすために大部分のリード結果が捨てられることになります。

val customerNames = customerRepository.findByIds(ids).map { customer =>
  customer.name
}

もちろん、概念モデルの粒度が大きいとI/Oコストも比例して大きくなるので、まず先に概念の大きさに目を向けるべきですが、コマンド側のドメインでクエリを頑張りすぎるとこういうことになります。

イベントからリードモデルを作る

RDBにリードモデルを構築するなら、以下のように永続化されたイベントを受信して、SQLを実行するだけです(実際にはこのようなストリーム処理は単発ではなく後に発生するワークロードに備えて常時起動させる必要があります)。これはこれで簡単ですが、cartIdを指定して読み込むので、スケールしにくいです。具体的なIDを指定しないので、ある程度の並列度を維持し永続化されたイベントを全順序に読み込む必要があります。また、こういった動作をするコンシューマは障害発生時のリバランスが必要になります。僕のお勧めはKafkaを使うことです。これはこれで一本ブログ記事が書けるぐらいの知識量なので、別の機会に触れます。

cartEventSubscriber.subscribe(cartId).runWith(Sink.foreach{
  case e: CartCreated => insertCartTable(e)
  case e: CartItemAdded => insertCartItemTable(e)
  case e: CartItemNumUpdated => updateCartItemTable(e)
  case e: CartItemRemoved => deleteCartItemTable(e)
  case e: CartRemoved => deleteCartTable(e)
})

まとめ。そして課題はまだある

ということでだいたいの雰囲気はつかめたのではないかと思います。とはいえ実装するうえではまだ課題があります。

たとえば、addCartItem内部の処理をもう少し効率化できないかの課題については、まさに分散システムの問題に直面します。

考えられる対策としては、カートオブジェクトをユースケースのスコープから外に出して、ワークロードがある間だけ起動させてキャッシュさせておく方法があります。リクエストごとに集約をリプレイしなくなるのでオーハーヘッドが軽減します。が、これは自前で実装するのは大変です。たとえば、ウェブサーバがスケールアウトして、ロードバランサから同じカートID向けの異なるリクエストが、複数サーバに飛んだこと想定してみてください。同一カートオブジェクトが別々のサーバでリプレイされ、異なるコマンドが受理されることで、同一IDなのに別々の状態が作り出されてしまいます。ある意味スプリットブレインな状態になります。

f:id:j5ik2o:20200916113429p:plain

下図はイメージなので全然正確じゃないという前提ですが、前述のようなスプリットブレインにならないようにするには、集約(グローバルなエンティティ)を以下のようにシャーディングして、コマンドをルーティングするとよいわけです。つまり、同一IDの集約はクラスタ全体で1個しかないように配置すればよいでしょう…。クラスタ上で脳みそが一つしかないのでスプリットしようがないという話です。と、ここまで考えてめちゃくちゃ大変だと想像できたと思います。さらにサーバが故障した場合に別サーバに集約をテイクオーバさせるなど自前で実装したくない…。なので、AkkaやErlangなど分散システムのフレームワークなしでこういうことは辞めましょう…。AkkaではActorという軽量プロセスをクラスタ上に分散させることができます。こういう基盤なしに無茶は辞めよう…。*5

f:id:j5ik2o:20200916165624p:plain

興味があれば以下参照。

akka.io

正直この分野は沼感がありますが、参考になれば幸いです。

*1:とはいっても概念を説明するための疑似コードだと思ってください

*2:CQRS/Event Sourcingそのものというより、分散システムに起因する難しさですが…

*3:データベースに永続化される前提のカートと思ってください

*4:システムがユースケースのアクターならこういう考慮はいらないと思いますが、SoEだとどうしてもこういう要求は発生します

*5:分散システムに関係する話は途端に難しくなりがち。わかりやすく書いたつもりですが、十分に伝わらないかもしれません…ご容赦を…

外部キー制約は何も考えずに適用するとよくない

このブログが話題になってますね。制約を付けること自体はよいことだけど、無目的に適用すると害も生じると思います。 無目的という言い方はおかしいな…。外部キー制約をどのように使えばいいのか、逆にどんなときに使うとまずいのかを考えてみたいと思います。

tech.tabechoku.com

例えば、これ。外部キー制約はできるだけ付けるとか、何も考えずに付けるとよくないと思います。

外部キー制約は、可能な限りつけるようにしています。 DBが別れている場合、外部キーはもちろん貼れないのですが、そうでない場合はとにかく何も考えず貼っています。

データベース設計の際に気をつけていること - 食べチョク開発者ブログ

テーブル設計をシミュレーションする

いいたいことの結論はこれ。以上終了なのですが、もう少しわかりやすく書いてみよう。

テーブル設計

以下のテーブルがあるとします。商品の売上を管理するテーブルです。とりあえず、何も考えずに外部キー制約をすべてに適用しています。(論理削除ではなく一旦物理削除で考えましょう。論理削除はこういった境界分析の邪魔になりますので)

  • 売上テーブル
    • 売上ID(PK)
    • 売上日時
  • 売上詳細テーブル
    • 売上詳細ID(PK)
    • 売上ID(FK)
    • 商品ID(FK)
    • 数量
  • 商品テーブル
    • 商品ID(PK)
    • 商品名
    • 単価

f:id:j5ik2o:20200616103927p:plain

想定ユースケース

  • 商品情報を登録・更新・削除する
  • 売上を登録・更新・削除する
    • 売上には売れた商品、数量、金額の売上詳細の概念が含まれる

強い整合性と弱い整合性

ユースケースから考えて、整合性の境界を考えます。整合性には強いものと弱いものがあります。強い整合性は作成・更新・削除するときに一緒に行います。これは「トランザクション整合性」と呼ばれることがあります。弱い整合性は「結果整合性」と呼ばれることがあります。では、上記のテーブルで考えていきます。上記の図の点線はこの整合性の境界範囲を示しています。仮に全部が一つの境界内にあると考えシミュレーションします。

売上・売上詳細のトランザクション境界と外部キー制約

売上を登録するときに、一緒に個々の売上詳細を登録・更新・削除(物理削除)します。トランザクション整合性の境界(トランザクション境界)はどこからどこまでがよいでしょうか。売上には売上詳細の概念が含まれるとしているので、売上と売上詳細は同じ境界のほうが都合がよさそうです。つまり、売上と売上詳細が更新されるときは別個ではなく、不可分な一塊として扱われます。例えば、売上が先に作られて、売上詳細が後から作られることはありません。売上詳細だけが先に削除されることもなく、一緒に削除されます。

さて、売上と売上詳細は不可分な一塊です。ある売上詳細Aが存在するとき、参照する売上Bは必ず存在します。つまり売上詳細Aの売上ID(FK)は存在する売上BのIDを参照します。売上Bが存在しないとき売上詳細Aも存在しません。この外部キー制約は機能しそうですね。データを保護できそうです。

売上・売上詳細と商品は同じトランザクション境界か?

ここからが問題です。商品ID(FK)はどうなんでしょうか。ここに外部キー制約があっても本当に大丈夫でしょうか?ユースケースを見るかぎり、商品と売上(売上詳細を含む)は別々に作成・更新・削除されます。存在する売上詳細Aが削除されるとき、参照する商品Aはまだ削除されません。売上詳細Aの商品ID(FK)から商品AのIDに外部キー制約があると、売上詳細Aは削除できません。また、 ←(この表現は間違いでした。参照している側は削除できますね) 。外部キー制約の関係上、売上詳細が存在することで、商品単体での削除ができません。ユースケースを満たさなくなるので、同じトランザクション境界にできません。(え、普通は商品は削除しないだろうがという方、分かります。とりあえず最後まで読んでほしい)

なので、以下のように境界が別個になるはずですが、外部キー制約があるほうが問題になります。

f:id:j5ik2o:20200616104018p:plain

整合性境界は物理削除で考えたほうがラク

「論理削除なら整合性の境界とか考える必要ないのでは?」という想定質問があるのですが、そんなことはないです。論理削除でも更新する境界がどこからどこまでかを考える必要があります。売上を消したつもりが商品も消されたら問題ですから。削除フラグが連動する範囲が整合性の境界になります。しかし、これを頭の中で整理して考えることが難しい。なので、分析時は物理削除で考えたほうが圧倒的にラクです。

売上・売上詳細と商品は結果整合性を使う

ということで、この場合どう考えるとよいか。トランザクション境界が異なるということはライフサイクルの境界が異なるわけです。売上詳細から参照する商品IDは存在するかもしれないし、すでに削除されているかもしれません。こういった弱い整合性を結果整合性といいます。ここには外部キー制約は適用できません。

また、アプリケーションロジックで以下の売上合計金額を算出する場合、売上詳細からみて商品が結果整合性では、再計算できなくなってしまいます。

  • 売上合計金額=すべての売上詳細金額の合計
  • 売上詳細金額=商品IDの単価×数量

これに対処するには、売上・売上詳細の境界内に再計算するための材料を保持する必要があります。もしくは計算結果を保持する方法もありそうですね。前者だとしたら、更新時にそのときの商品IDの単価をコピーする必要があります。(場合によっては商品名が更新されたり削除されるかもしれないので、商品名のコピーも必要になるかもしれません)

  • 売上テーブル
    • 売上ID(PK)
    • 売上日時
  • 売上詳細テーブル
    • 売上詳細ID(PK)
    • 売上ID(FK)
    • 商品ID(※)
    • 単価(※)
    • 数量
  • 商品テーブル
    • 商品ID(PK)
    • 商品名
    • 単価

f:id:j5ik2o:20200616104057p:plain

これは集約という考え方

このような考え方は、ドメイン駆動設計の集約 を学ぶとわかるようになるはずです。今回はテーブル設計の視点から説明してみましたが、ドメインモデルが整合性の境界を持つと考えるとわかりやすくなるかも。DDDの観点で簡単に言えば、強い整合性境界である集約の内側では外部キー制約に意味があり、集約の外には外部キー制約は不要です。興味がある方はぜひ学んでみたらいいと思います。

エリック・エヴァンスのドメイン駆動設計

エリック・エヴァンスのドメイン駆動設計

  • 作者:Eric Evans
  • 発売日: 2013/11/20
  • メディア: Kindle版
実践ドメイン駆動設計

実践ドメイン駆動設計

ということで、整合性の境界を無視して、外部キー制約を適用することはできません、ということで。

併せて読みたい:

kbigwheel.hateblo.jp

追記:

ブコメで例が悪いと指摘があったので、少し再考してみた。わかりにくいことは認める。言いたいことは結果整合性を求める用途では外部キー制約は使えない。トランザクション整合性のある境界内では使えるという主張だった。

で、商品=今使える商品という意味で捉えてほしい。商品が廃盤になるとやはり物理削除して、過去に使えてた商品テーブルに移動するかもしれない。実際は、こんな難しいことをせずに、商品は削除せずに「廃盤状態」に遷移できるようにする方法もある。が、何が正しいかは要件による。ここでは仕様の善し悪しを議論したいわけではなく、仮に商品を消すことがある場合、売上詳細から商品ID(FK)は強い整合性を望むので使えないという意図だった。つまるところ、ライフサイクルの境界が異なるのだから、商品IDから商品に到達できるかはそのとき次第なので、以下のような考慮が必要かもしれない、ということ。

とはいえ、もっとよい事例はないかと考えた。Slackのようなチャットサービスで、あるアカウントが投稿したメッセージは他のアカウントでも読める。しかし、ある投稿者のアカウントIDが退会したケースを考えてみよう。忘れられる権利に対応するために、アカウントは物理削除しなければならないとする(物理削除好きやな。まぁ例示のための仮定です)。しかし、退会アカウントのメッセージはタイムラインで、”退会済みアカウント”が投稿したメッセージとして、他のアカウントから閲覧できるものとする。この場合でも、メッセージとアカウントはライフサイクル、整合性の境界が独立している。メッセージからみてアカウントはあるかもしれないしないかもしれない。弱い整合性。この場合は、メッセージのアカウントIDからアカウントのアカウントIDに外部キー制約は適用できない。

  • アカウント
    • アカウントID(PK)
    • メールアドレスなどの個人情報など
  • メッセージ
    • メッセージID(PK)
    • スレッドID
    • メッセージ内容
    • アカウントID(必ずしも存在するとは言えないので、外部キー制約は適用できない)
    • 作成日時
    • 更新日時

CQRS/ESによって集約の境界定義を見直す

peing.net

メッセージングシステムのお題のようです。面白そうなのでちょっと考えてみよう。

問題提起

集約候補が以下の3つ。

  • ユーザー
  • 企業
  • スレッド
    • メッセージ

スレッド集約はメッセージを複数保持するようです。

  • 1000件のメッセージを保持するスレッド集約を更新した際、1000件のアップデートが行われる
  • スレッド集約内部で更新された属性を把握していない場合は、リポジトリでは全メッセージ分の更新となる。これを避けるための仕組みはどう実装するのか?

ということが指摘されている。まぁわかります。これはCQRS/ESなら解決できるよと言ってみる

問題の分析

で、僕ならどう考えて実装に落とすかつらつらまとめてみよう。CQRS/ES前提です。Akkaの成分は少なめでScalaの擬似コードで解説します。コードはコンパイルしてないので…おかしなところあるかも。

問題はスレッド集約がメッセージの集合を保持した場合、更新コストが大きくなりがち。さらに差分更新しようとしても集約の内部実装が複雑になるのではという論点。

まず考えるのは、スレッドとメッセージの関係性に強い整合性は必要か? ということです。Noならメッセージを別の集約として切り出すのがよいとは思います。おそらく問題設定としてはYesになっている気がしますが…。 仮に契約プランによって書き込めるメッセージ数が変わるという場合は、スレッドとメッセージには強い整合性が必要かもしれません。別別の集約にした場合、トランザクションが別になり結果整合性になるため、インスタンス数(レコード数)を厳密に制御することはできませんね。ということで、強い整合性が必要という前提で進めます。

あと、この問題だけではスレッド集約がどのような使われ方をするのが厳密にはわかりません。つまり振る舞いがイメージできません。たとえば、スレッドにメンバーとして参加したアカウントだけがメッセージを閲覧・追加・更新できるというシナリオがあればスレッド集約の構造が明確になります。メンバーの考慮が必要なスレッド集約は以下のようなイメージになるでしょう。

case class Message(id: MessageId, text: MessageText, senderId: AccountId, createdAt: Instant, updatedAt: Instant)

case class Messages(values: List[Message]) {
  def add(message: Message): Messages = ???
  def update(message: Message): Messages = ???
}

case class Members(values: List[AccountId]) {
  def add(accountId: AccountId): Members = ???
  def remove(accountId: AccountId): Members = ???
  def contains(accountId: AccountId): Boolean = ???
}


class Thread(id: ThreadId, members: Members, messages: Messages, createdAt: Instant) {

  def addMember(account: AccountId): Thread = copy(members = members.add(accountId))

  def addMessage(messageId: MessageId, messageText: MessageText, senderId: AccountId): Either[ThreadError, Thread] = { 
    if (members.contains(senderId)) {
      Right(copy(messages = messages.add(Message(messageId, messageText, senderId, Instant.now, Instant.now))))
    } else {
      Left(new AddMessageError)
    }
  }
    
  def updateMessage(messageId: MessageId, messageText: MessageText, sender: AccountId): Either[ThreadError, Thread] = 
    if (members.contains(senderId)) {
      Right(copy(messages = messages.updateMessage(Message(messageId, messageText, senderId, Instant.now, Instant.now))))
    } else {
      Left(new UpdateMessageError)
    }
    
  def getMessages(accountId: Account): Either[ThreadError, Messages] = 
    if (members.contains(accountId)) {
      Right(messages)
    } else {
      Left(new ReadMessagesError)
    }

}

課題に戻りますが、このクラスをリポジトリでCRUDすることを想像してみましょう。thread.messagesが1000件ある場合です。

for {
thread <- threadRepository.findById(threadId)  // 1000件メッセージを持つスレッドを取得
newThread <- thread.addMessage(Message(...)) // 状態変更
_ <- thradRepository.store(newThread) // 1001件メッセージを持つスレッドを更新
} yield ()

スレッドのどのフィールドが更新されたかわからない場合は、スレッドのタイトル、説明、メッセージの本文などをすべての情報をSQLなどを使って更新するのでしょうか? これはあまりに非効率ではないか…。それを回避するにはThread集約内部にどのフィールドをDB上で更新すべきか把握する仕組みが必要なのではないかという話ですね。 あと、そもそもfindByIdの時点で、本文も含む1000件のメッセージをDBから取得するのは効率がよいとはいえませんね。

CQRS/ES流の設計

上記の問題をCQRS/ESで改善できるか考えてみよう。

コマンド側

集約はCQRSのC(コマンド)側に所属しますが、役割は副作用を起こすことを目的にします。 たとえば、メンバーの追加・削除、メッセージを追加・更新などです。メンバーの確認はメッセージの追加・更新に必要なので含めます。これら以外はクエリ側の責務とします。

class Thread(id: ThreadId, memberIds: MemberIds, messages: MessageIds, createdAt: Instant) {

  def addMember(account: AccountId): Thread = copy(members = memberIds.add(accountId))

  def addMessage(messageId: MessageId, messageText: MessageText, senderId: AccountId): Either[ThreadError, Thread] =
    if (memberIds.contains(senderId)) {
      Right(copy(messages = messageIds.add(mesageId)) // 本文は保存しない
    } else {
      Left(new AddMessageError)
    }
    
  def updateMessage(messageId: MessageId, messageText: MessageText, sender: AccountId): Either[ThreadError, Thread] = 
    if (memberIds.contains(senderId)) {
      Right(this) // スレッドではメッセージIDしか管理しないのでメッセージ本文を更新できない
    } else {
      Left(new UpdateMessageError)
    }
  

}

本文を持たないので大分軽量化できたのではないでしょうか?(ただ、内部にID集合は保持するので無限に保持するわけには行かないとは思います。上限値は必要だと思います) さて、クエリ側にどうやって連携するかですね。メッセージの本文も読めないと意味がないですからね。

ということで、擬似コードですが、以下のように副作用が起きたときにイベントをDBに追記します。このイベントは別のプロセスがコンシュームしてリードモデルを作る元ネタになります。

sealed trait ThreadEvent
case class MemeberAdded(threadId: ThreadId, accountId: AccountId, occurredAt: Instant) extends ThreadEvent
case class MessageAdded(threadId: ThreadId, messageId: MessageId, messageText: MesssageText, senderId: AccountId, occurredAt: Instant) extends ThreadEvent
case class MessageUpdated(threadId: ThreadId, messageId: MessageId, messageText: MesssageText, senderId: AccountId, occurredAt: Instant) extends ThreadEvent

class Thread(id: ThreadId, memberIds: MemberIds, messages: MessageIds, createdAt: Instant) {

  def addMember(accountId: AccountId): Thread = {
     persistEvent(MemberAdded(id, accountId, Instant.now))
     copy(members = memberIds.add(accountId))
   }

  def addMessage(messageId: MessageId, messageText: MessageText, senderId: AccountId): Either[ThreadError, Thread] =
    if (memberIds.contains(senderId)) {
      persistEvent(MessageAdded(id, messageId, messageText, senderId, Instant.now))
      Right(copy(messages = messageIds.add(mesageId))
    } else {
      Left(new AddMessageError)
    }
    
  def updateMessage(messageId: MessageId, messageText: MessageText, senderId: AccountId): Either[ThreadError, Thread] = 
    if (memberIds.contains(senderId)) {
      persistEvent(MessageUpdated(id, messageId, messageText, senderId, Instant.now))
      Right(this)
    } else {
      Left(new UpdateMessageError)
    }
  

}

書き込みはこれでよいとして、集約をDBから再生する際はどうなるのか。DBに保存してあるイベント列を順番に空のThreadに適用するだけです。 これも疑似コードで説明。この考慮はあくまでコマンドを実行するための前提を整えるためのものです。

class Thread(id: ThreadId, events: Seq[ThreadEvent]) {

  private var memberIds: MemberIds = MemberIds.empty
  private var messages: MessageIds = MessageIds.empty

  // イベントから最新状態を復元
  events.foreach{
    case MemberAdded(_, accountId, _) => members = memberIds.add(accountId)
    case MessageAdded(_, messageId, messageText, senderId, createdAt) => messages = messages.add(Message(messageId, messageText, senderId, createdAt)
    case MessageUpdated(_, messageId, messageText, senderId, updatedAt) => messages = messages.update(Message(messageId, messageText, senderId, updatedAt)
  }


  def addMember(accountId: AccountId): Thread = {
     persistEvent(MemberAdded(id, accountId, Instant.now))
     copy(members = memberIds.add(accountId))
   }

  def addMessage(messageId: MessageId, messageText: MessageText, senderId: AccountId): Either[ThreadError, Thread] =
    if (memberIds.contains(senderId)) {
      persistEvent(MessageAdded(id, messageId, messageText, senderId, Instant.now))
      Right(copy(messages = messageIds.add(mesageId))
    } else {
      Left(new AddMessageError)
    }
    
  def updateMessage(messageId: MessageId, messageText: MessageText, senderId: AccountId): Either[ThreadError, Thread] = 
    if (memberIds.contains(senderId)) {
      persistEvent(MessageUpdated(id, messageId, messageText, senderId, Instant.now))
      Right(this)
    } else {
      Left(new UpdateMessageError)
    }
  

}

これでドメインを扱うコマンド側では、差分を表現するイベントをジャーナルとして追記保存すればいいことになります。また最新の集約の状態はコマンドを受け付けるために必要最低限のものしか持ちません。クエリをしないので、これで問題は起きません。当初想定していたモデルよりかなり小さくなります。コマンド側の集約はリプレイ後はルールをチェックしてパスしたらイベントを追記保存するだけなんです。

AkkaはEvent Sourcingを標準サポート

ここでは示してませんが、イベントの列が巨大だとリプレイに時間が掛かります。例えばイベントN件に1回スナップショットを保存しておき、リプレイ時に最新スナップショット+差分のイベントでリプレイを高速化することもできます。とはいえ、リクエストのたびにイベント列を読み込み・適用するのは効率が悪そうです。Akkaのアプローチでは軽量プロセスであるActorとしてスレッド集約を実装します。上記のクラスがActorとして実装されます(集約アクター)。メソッドコールがメッセージパッシングに変わります。すでにイベント列がある集約アクターはAkkaがストレージからイベントを読み込み、メッセージとして適用してくれます。また一定時間メッセージを受け付けていない集約アクターはランタイムから消えるように設定できます。まぁ、必要な機能が揃っているので確実に楽できます。

doc.akka.io

クエリ側の設計

クエリ側のリードモデルはどうやって作るのか? 仮に上記のイベントがDynamoDBに書き込まれるとして、パーティションキーはThreadId, ソートキーはイベント番号(akkaでは自動採番してくれます)、イベント本体はJSONなどで格納されるイメージとします。これをDynamoDB Streamsなどでコンシュームします。コンシューム部分はLambdaでもKCLでもよいです。以下を参考にしてみてください。

docs.aws.amazon.com docs.aws.amazon.com

この方法でイベントが時系列順に手に入ります。リードモデルを作るにはこのイベントを順番にリードDBに反映します。 仮にRDBにTHREADテーブル、MEMBERテーブル、MESSAGEテーブルがある場合は以下のような処理になります。 consumeEventsByThreadIdFromDDBStreamsはDynamoDB Streamsからレコードを読み込んでイベントを返す関数です。

consumeEventsByThreadIdFromDDBStreams.foreach{ 
  case ev: ThreadCreated => insertThread(ev) // 実際にはスレッドが作られたときのイベントも永続化する必要がある
  case ev: MemberAdded => insertMember(ev)
  case ev: MessageAdded => insertMessage(ev)
  case ev: MessageUpdated => updateMessage(ev)
}

最終的にリードモデルはレスポンスの形式に併せて定義するとよいでしょう。

case class ThreadDto(id: Long, ...)
case class MemberDto(id: Long, accountId: Long, createdAt: Int)
case class MessageDto(id: Long, threadId: Long, text: String, senderId: Long, createdAt: Instant, updatedAt: Instant)

リードモデルはDAO内部でSQLを使って、部分集合を取り出したりできます。リードモデルはお好きなように…。

val messages: Seq[MessageDto] = MessageDao.findAllByThreadIdWithOffsetLimit(threadId, 0, 100)

結局どうなるのか

最終的な疑似コードイメージです。 1000件のメッセージを保持するスレッドといってもIDしか持ちません。本文を持つより簡単にリプレイし一つのコマンドを付けて一つのイベントを永続化するだけです。1001件のメッセージは永続化しません。間違いなくスケールするのはこっちですね。

val thread = Thread(ThreadId(1L)) // 永続化されているイベント列があれば渡される。なければイベント列は空で作られる
thread.addMessage(...) // 1つのコマンドで1つのイベントが追記されるだけ。1001件も更新されません。

補足

上記の考え方がわかったとしても、CQRS/ESは自前でやるのは大変です。 たとえば、以下の処理が複数のサーバで起こった場合、ロックがなくても大丈夫でしょうか?ダメそうですねw そう書き込むためのトランザクション管理が必要なのです…。

val thread = Thread(ThreadId(1L))
thread.addMessage(...) 

AkkaではThread部分がActorになりますが、複数のサーバで起動しても、ユニークになるように分散できます。つまり分散システム上で同じIDのActorがたかだか一つになるように配置してくれるので、トランザクションの問題は起きません。 こういったことを考えると、CQRS/ESをサポートしたフレームワークやライブラリを使うほうが圧倒的に楽です。Akka お勧めです。

akka.io

akka-cluster スプリットブレインリゾルバ OSS実装一覧

Akkaクラスターがネットワーク分断に遭遇した場合に、UnreachableメンバーをDown状態に遷移させるためのリゾルバのOSS実装を以下にまとめる。

ちなみに、このリゾルバがない場合はUnreachableのままだとリーダアクションが取れずにクラスターが機能不全状態なる。かといってAutodownを有効にするとスプリットブレインが発生する可能性がある。これを解決するのがスプリットブレインリゾルバで商用版はLightbend社から提供されている。スプリットブレインリゾルバの仕様はこちら

git repo stars 備考
TanUkkii007/akka-cluster-custom-downing 131 OldestAutoDowning, QuorumLeaderAutoDowning, MajorityLeaderAutoDowningに対応している。OldestAutoDowningには不具合があるようだ。要修正
mbilski/akka-reasonable-downing 85 Static QuorumによるDowningにしか対応していない
arnohaase/simple-akka-downing 16 static-quorum, keep-majority, keep-oldestに対応している
guangwenz/akka-down-resolver 5 Static QuorumによるDowningにしか対応していない