かとじゅんの技術日誌

技術の話をするところ

CQRSはなぜEvent Sourcingになってしまうのか

CQRSはなぜEvent Sourcingになってしまうのか、まとめてみたいと思います。

なぜまとめるか、それはCQRSにとってEvent Sourcingはオプションだと誤解されている方が多いからです。この記事を書いてる本人も最初はそう思っていましたが、実際に開発・運用を経験してみるとCQRSにとってEvent Sourcingはほぼ必須で、認識を改めるべきだと気づきました。なので、原義に基づいたうえで、Event SourcingではないCQRSがなぜよくない設計になるのか解説します。

その前に松岡さんの記事について。

CQRSの領域ではモデルを完全に分ける

松岡さんの記事には”CQRSはモデルを完全に分ける必要はない”と書かれていますが、知識がないと誤解しがちですが文字のまま意味を取るといけません。こちらの言及は、システムのうち、モデルをC/Qに分割するCQRS領域とモデルを分割しない非CQRS領域に分けることができて、CQRSを部分導入できますよいう意味だと思います(下図参照)。モデルを分けなくていいのはあくまで非CQRS領域です。間違っても「CQRSはモデルを分割しなくてもいいんだ!」という解釈をしてはいけません。その解釈はもはやCQRSではありません。

部分的導入について

重要なことですが、CQRSは部分的な導入が可能です。 つまり、「参照用モデルと更新用モデルを完全に分ける必要はない」ということです。 どちらかというと、「必要なところだけ参照に特化したモデルを導入する」といった使い方が適切でしょう。

CQRS実践入門 [ドメイン駆動設計] - little hands' lab

f:id:j5ik2o:20200918095049p:plain

CQRSはモデルだけでなくモジュールも分割する

さらにモデルを分けるだけではなく、そのモデルを内包するトップレベルのモジュール同士を分離というか、隔離しなければなりません。という話はこちらの記事を参照してください。

blog.j5ik2o.me

シンプルなCQRSの概念図は以下です。上述したようにC/Qは隔離されています。データベースのところはRDB前提で書いていますが、実際の運用ではこうしません。想像しやすいように書いてるだけで、厳密には書いてないと思ってください。

f:id:j5ik2o:20200918154402p:plain

Event SourcingではないCQRSを考える

この記事では、リポジトリからリード専用DAOまでの設計を考えていきましょう。最初はEvent Sourcingを考えずに素直に状態に基づくCRUD脳で考えていきます。

具体的なコードのイメージは以下の記事を読むといいです。

blog.j5ik2o.me

対象ドメインはショッピングカート前提で考えます。

コマンドプロセッサ実装

コマンドプロセッサの実装例です。コマンド側のユースケースクラスだと思ってください。 カートオブジェクトとリポジトリの操作をCRUDで考えると以下のように実装になるでしょう。

class AddCartItemCommandProcessor(cartRepository: CartRepository) {

  def execute(cartId: CartId, itemId: ItemId, num: ItemNum): Unit = {
    // 最新の集約(グローバルなエンティティ)をストレージから取得する
    val cart = cartRepository.findById(cartId)
    // ロジック実行: 予算超過ならカートオブジェクトが商品の追加を拒否する!
    val newCart = cart.addItem(itemId, num)
    // 更新された最新状態をストレージに保存する
    cartRepository.store(newCart)
  }

}

まずC側のテーブル(A)とQ側のテーブル(B)は形式が異なるのですが、まずリポジトリによってカートオブジェクトは以下のテーブルに永続化されます。 (カート内のアイテムは別テーブルにマッピングされますが、リポジトリでfindAllByItemIdのような逆引き検索がなければ、子テーブルは不要かもしれませんが、一旦わかりやすさを優先して子テーブルを設けています)

  • C側のテーブル(A)
    • カートテーブル
      • カートID(PK)
      • 顧客アカウントID
      • 上限予算金額
      • 作成日時
    • カートアイテムテーブル
      • カートアイテムID(PK)
      • カートID(FK)
      • 商品ID
      • 数量
      • 作成日時

このように永続化されますが、ビジネスロジックで計算する値もあります。カートの合計金額です。 あと、商品の単価は外部のオブジェクトから提供されるので、カートオブジェクト内部で保持してません。

case class Cart(id: CartId, userAccountId, upperLimitPrice: Price, items: CartItems, createAt: Instant) {
  // 合計金額計算
  def totalPrice(priceResolver: ItemId => Price): Price = {
    items.fold(Price.zero){ (t, item) => t + item.price(priceResolver) }
  }
}

case class CartItem(id: CartItemId, itemId: ItemId, quantity: Quantity, createAt: Instant)  {
  // 価格計算メソッド
  def price(priceResolver: ItemId => Price): Price = priceResolver(itemId) * quantity
}

クエリプロセッサの実装例

クエリ側のユースケースクラス相当の実装です。 クエリ側のリード専用DAOとリードモデルであるDTOは以下のようなイメージになります。

class GetCartQueryProcessor(cartDao: CartDao) {

  def execute(cartId: Long): Vector[CartDto] = {
     cartDao.findByCartId(cartId)
  }

}

case class CartDto (
  id: Long,
  userAccountId: Long,
  userAccountName: String,
  upperLimitPrice: String,
  totalPrice: String
  items: Seq[CartItemDto]
)

case class CartItemDto(
  id: Long,
  itemId: Long,
  itemName: String,
  unitPrice: String
  quantity: Long,
  price: String)

対応するテーブルはDTOと同形で、C側と比べて非正規型になっています。C型と似てるようで似てない。別物です。あとで説明しますが、(★)のところが厄介ですね。

  • Q側のテーブル(B)
    • カートテーブル
      • カートID(PK)
      • 顧客アカウントID
      • 顧客アカウント名(●)
      • 上限予算金額
      • 合計金額(★)
    • カートアイテムテーブル
      • カートアイテムID(PK)
      • 商品ID
      • 商品名(●)
      • 数量
      • 単価(★)
      • 価格(★)

補足:

(●)も面倒ですが、これは他の集約(エンティティ)のデータとして永続化されていて、SQLで解決できる可能性があります。

めちゃくちゃ単純ですが、これでC/Qが独立しています。が、C/Qを連携させなければ更新されたデータを読み込むことができません。上図の(C)の部分です。

コマンド側からクエリ側に変更をどう通知するか

はい。やっと本題です。コマンド側からクエリ側に変更をどう通知するか、(C)部分の実装について以下に挙げてみます。

  • テーブル(A)の更新トリガ時の変更を、SQLを使ってテーブル(B)を書き込む
  • プログラムで、テーブル(A)を読み込み、その結果をテーブル(B)に書き込む

テーブル(A)の更新トリガ時の変更を、SQLを使ってテーブル(B)を書き込む

トリガー例はあえて書きませんが、更新データを受信してテーブル(B)のレコードをまるっと全カラム書くとよいですね。欲を言えば、更新されたレコードだけを検知したいですが、全カラム更新するしかないと思います。まぁリポジトリで集約(エンティティ)を保存する時点で全カラム更新なら、ここで差分更新を頑張っても釣り合わないですね。と、C側で静的に保持されているデータをQ側に書くだけならトリガーでよさそうです。

問題は(★)の部分。(★)はC側のデータベース上にはありません。ドメインオブジェクトの振る舞いによって計算される値だからです。 まさかドメインロジックと同じ仕様のSQLを書くとかないですよね…。まぁ振る舞いのない貧血症オブジェクトしかないなら、特に問題はないでしょうね(皮肉) あ、例えトランザクションスクリプトであっても、ロジッククラスの助けなしに計算できない値があるとしたら?同じことですよね?

苦肉の策としては計算した結果もC側のテーブルに書き込むという方法です。カートオブジェクトだと、priceResolver引数が外部のオブジェクトなので、まずこれが永続化時にないとこういったことはできません。リポジトリのI/Fを def store(cart: Cart, priceResolver: ItemId => Price): Unitするなど設計に歪みがでることがあります。つまり、リポジトリ内部でビジネスロジックを起動するということです。リポジトリの責務違反…

そもそも、トリガーという時点でイベントに頼ってるような印象ですが、こういう計算で導出される値の扱いは難しいです。

プログラムで、テーブル(A)を読み込み、その結果をテーブル(B)に書き込む

(★)の問題があるから、一度ドメインオブジェクトに計算させてから、テーブル(B)に書けばよいことになります。これならQ側にもれなくデータを転送できそうです。が、変更のトリガーってどう検知しましょうか? まさか、C側のDBに変更されているかどうかわからないけど、ポーリングします?対象の集約はどうしますか?集約が大量にもあっても、それら全部ポーリングします?全然機能するイメージがないですね…。やっぱり最新状態を手に入れるにしても、更新イベントが必要なんです

補足:

Q側のテーブルをC側のVIEWにすればよいのでは?というアイデアもあると思いますが、(★)の問題はSQLで解決できません。結局ドメインオブジェクトに頼ることになります。

結局 Event Sourcingにたどり着く

上述した2パターンのよいところを組み合わせるとよさそうです。C側のテーブル(A)以外に更新イベントを通知するキューを用意します。以下のようになります。

f:id:j5ik2o:20200918165640p:plain

リポジトリ側でC側のテーブル(A)を更新する以外に、更新イベントを通知するためにキューにもイベントを書き込みます。そのうえで、リードモデル更新プロセスがそのイベントを受け取り、テーブル(A)から集約(エンティティ)を再現し、Q側のテーブル(B)に書き込めばよいです。まぁ、いちいちC側のテーブル(A) を読まずに、変更内容を更新イベントに載せたらどうかという発想もありますね。

一見、これで問題がないようにみえますが、C側のリポジトリは二つのストレージに書き込みを行っています。テーブル(A)はRDB、更新イベントを伝えるキューはRDBは不向きでしょう。AWSであればSQSのようなものを想像するでしょう。となると、同一トランザクションにはなりません。はい。高いコストを払う可能性がある、ダブルコミット問題が生じます。以下の記事でもダブルコミットは避けましょうという話をしました。

blog.j5ik2o.me

ダブルコミットを避けるためにイベントを真のデータソースにします。C側のドメイン状態はイベントから作るようにします。そして、C側のDBはもはやRDBである必要はありません。上記記事にも書きましたが、集約で発生するイベントを追記したり、集約IDごとのイベント列を読み込めれば十分だからです。実際はNoSQL(KVS)を使うことが多いです。AWSのDynamoDBであればDynamoDB Streamsからスケーラブルにイベントを読み込めます。これがEvent Sourcingですね。結局ここにたどり着きます。

f:id:j5ik2o:20200918170500p:plain

CQRSシステムのほとんどがEvent Sourcingを採用しているそうです(Lightbend社調べ)。その理由を分かっていただけたのではないでしょうか。