かとじゅんの技術日誌

技術の話をするところ

具体的な実装コードからEvent Sourcingを理解する

DDD Community JPのほうでCQRS/Event Sourcingについて少し盛り上がったので、どういう議論をしたかまとめるのと同時に補足も追加しました。ちなみに、Event Sourcingが主題ですが、CQRSも前提として関係します。その想定で読んでいただければと。

発端はこのツイート。

僕が引用したツイートは松岡さんの質問箱に対するリアクションです。その質問箱に寄せられた質問は以下。

ストリームを開いてから閉じるまでのデータが変化する毎にUIで表示したい場合、DDDではどのように設計したら良いでしょうか? DDDのリポジトリは1つのリクエストに対して1つのリクエストを返すイメージがあり、ストリームをどのような形で扱ったら良いのかつかめずにいます。

かなり抽象的な質問なのでいろいろ確認しないとわからないのですが、CQRS/Event Sourcingならどう解決するか考えてみました。こういった説明が抽象的でわかりにくいという声をよく聞くので、具体的なコード*1を交えた説明になっています。 注意事項としては、CQRS/Event Sourcingには実装コストが掛かります*2。それほどのコストを掛ける価値があるシステムなのかよく考えてくださいというのは前提としてありますので、その点は注意して読んでください。

CRUDは最新状態に基づく設計スタイル

ドメイン駆動設計を前提にAPIサーバなどを設計した場合、APIはリクエスト・レスポンスでCRUDすることが多いです。この場合、アプリケーション内部でリポジトリを使ってドメインオブジェクトを取り出すロジックになるのでリポジトリ操作もリクエスト・レスポンスに対応します。そしてCRUDは最新状態に基づく設計スタイルです。

ショッピングカート*3に商品を追加するユースケースで考えると以下のようになります。ビジネスルールを守るのがドメインオブジェクトで重要な役割を持っています。

class AddCartItemUseCase(cartRepository: CartRepository) {

  def execute(cartId: CartId, itemId: ItemId, num: ItemNum): Unit = {
    // 最新の集約(グローバルなエンティティ)をストレージから取得する
    val cart = cartRepository.findById(cartId)
    // ロジック実行: 予算超過ならカートオブジェクトが商品の追加を拒否する!
    val newCart = cart.addItem(itemId, num)
    // 更新された最新状態をストレージに保存する
    cartRepository.store(newCart)
  }

}

カート内の商品を取得するユースケースは以下のようになるでしょう。

class GetCartItemsUseCase(cartRepository: CartRepository) {

  def execute(userAccountId: UserAccountId): Vector[CartItem] = {
     cartRepository.findByUserAccountId(userAccountId).map{ cart => cart.items }
  }

}

質問にあったように、リクエストに対してレスポンスを返すスタイルになっています。いわゆるPULL側のAPIです。REST APIであればこれで何ら問題でしょう。

イベントをPub/Subする

今回の質問は、ストリーム接続したいということです。漠然と「ストリームで」という話ですが、扱うデータは何でしょうか。上記の質問の場合は、ドメインの最新状態をリクエスト・レスポンス型で返しますが、ストリームでは最新状態を返すのでしょうか?ほとんどケースではそうではなく、そのとき起こった出来事であるイベントをクライアントに返すことになります。

例えば、上記のAddCartItemUseCase#executeが実行されると、商品追加イベントが発生し、ストリームに接続するクライアントにそのイベントが通知されるという具合になります。サーバからクライアントへデータがPUSHされる形になり、通常はストリームの接続は永続的になります。

サーバ側のエンドポイントの実装ではイベントのサブスクライバを作ります。サブスクライバでカートイベントを受信したら、ストリームに流すだけです。以下はakka-httpでSSE(Server Sent Event)を行う場合の例です。イベントどのサーバからでも受信できるようにストレージからWrite/Readすることになると思います。

path("events" / LongNumber ) { cartId =>
    get {
      complete {
        cartEventSubscriber.subscribe(cartId)
          .map(event => ServerSentEvent(event))
          .keepAlive(1.second, () => ServerSentEvent.heartbeat)
      }
    }
  }

さて、ドメイン状態が変化したときに、イベントをパブリッシュする実装はどうしたらよいか。ユースケースから上記で説明したストレージへイベントを書き込むことになると思います。

// CRUD前提のユースケース実装
class AddCartItemUseCase(cartRepository: CartRepository) {

  def execute(cartId: CartId, itemId: ItemId, num: ItemNum): Unit = {
    val cart = cartRepository.findById(cartId)
    val newCart = cart.addItem(itemId, num)
    cartRepository.store(newCart) // (1)
    cartEventService.publish(CartItemAddedEvent(cartId, itemId, num)) // (2)
  }

}

これはいわゆるPub/Subの仕組みです。で、勘違いされている方が多いのですが、CQRS/Event Sourcingとは直接関係ないです。混同しないようにしましょう。上記はイベントを送受信しているだけでCQRS/Event Sourcingではありません。

ところで、上記コードの(1)の状態更新に加え(2)のイベントの書き込みが増えたわけですが、ここで違和感を覚えます。(1)と(2)ってストレージがまず違うので同一トランザクションにできません。つまり2相コミットとかダブルコミットというやつです。不整合が起きた場合、リカバリが面倒なやつです。例えば、(1)がコミットされたあとに、(2)が失敗したらどうなるか。(1)の完了した書き込みを削除するか、(2)の書き込みが成功するまでリトライするかです。複雑なリカバリ処理を自前で実装するハメになります。

ダブルコミットは避ける

なので、ダブルコミットはできる限り避けましょうです。以下は古典ですが読むことをお勧めします。

ameblo.jp

CQRS/Event Sourcingの考案者であるGregさんもダブルコミットを推奨していないようです:(興味あればこちらも参照ください)

The two-phase commit can be expensive but for low latency systems there is a larger problem when dealing with this situation. Generally the queue itself is persistent so the event becomes written on disk twice in the two-phase commit, once to the Event Storage and once to the persistent queue. Given for most systems having dual writes is not that important but if you have low latency requirements it can become quite an expensive operation as it will also force seeks on the disk.

”二相コミットはコストがかかりますが、低レイテンシのシステムでは、この状況を扱う際にはより大きな問題があります。一般的に、キュー自体は永続的なので、イベントは二段階のコミットで二度ディスク上に書き込まれます。ほとんどのシステムでは、二重書き込みを行うことはそれほど重要ではありませんが、もし低レイテンシの要件がある場合には、 ディスク上でのシークを強制的に行うことになるため、非常に高価な操作になる可能性があります。”

The database would insure that the values of sequence number would be unique and incrementing, this can be easily done using an auto-incrementing type. Because the values are unique and incrementing a secondary process can chase the Events table, publishing the events off to their queue. The chasing process would simply have to store the value of the sequence number of the last event it had processed, it could even update this value with a two-phase commit bringing the update and the publish to the queue into the same transaction.

”データベースは、シーケンス番号の値が一意でインクリメントされることを保証しますが、これはAUTO INCREMENT型を使用して簡単に行うことができます。値が一意でインクリメントされているので、セカンダリプロセスはイベントテーブルを追いかけることができ、イベントをキューに公開することができます。追いかけるプロセスは、単に最後に処理したイベントのシーケンス番号の値を保存しておく必要があります。 この値を更新するには、二段階のコミットで更新とキューへの公開を同じトランザクションにすることもできます。”

ではどうしたらよいか。(1)と(2)は同時に二つ処理せずに、(2)のイベントだけを書き込むようにします。(1)の状態は(2)のイベントを基に別のプロセスで作り出します。これがCQRS/Event Sourcingで、すべての状態はイベントから導出できるようにします。やっと本題。

しかし、以下の(0)から(1)の処理は、最新状態に基づいているのでそのままでは使えません。設計を見直す必要があります。

def addCartItem(cartId: CartId, itemId: ItemId, num: ItemNum): Unit = {
  // val cart = cartRepository.findById(cartId) // (0)
  // val newCart = cart.addItem(itemId, num)
  // cartRepository.store(newCart) // (1)
  // イベントの書き込みはこれでよいが、↑の処理をどうしたらよいか 
  cartEventService.publish(ItemAdded(cartId, itemId, num)) // (2)
}

Event Sourcingでプログラミングモデルがどう変わるか

前述しましたが、CQRS/Event Sourcingは、簡単にいうとイベントから状態を導出するアーキテクチャです。絵で説明したほうがわかりやすいので結論となる図は以下。どーん。「え、複雑だなぁ」と思いますよね。単純なCRUDよりはそりゃ難しくなりますよ…。その代わり耐障害性やスケーラビリティが向上するのです…。一つのアプリケーションのように見えますが、コマンド側とクエリ側は分離してデプロイすることが可能です。

f:id:j5ik2o:20200916090416p:plain

イベントからドメインオブジェクトをリプレイする

ジャーナルDBと呼ばれるデータベースに、カートオブジェクトが発行したカートのイベント(以下、カートイベント)が永続化されます。このイベントを使ってコマンド側にドメインオブジェクトを、クエリ側にリードモデルを構築します。(リードモデルの形式は何でもOKです。リードDBはジャーナルDBと物理的に分けるかどうかはここでは問いません。まず概念的に理解したほうがいいでしょう)。つまり状態といえるものが2種類あるわけです。リードモデルは後述しますが、まずはドメインオブジェクトから。

以下のように、永続された複数のイベントがあれば、最新のカートオブジェクトを作る(リプレイ)ことが可能です。(1)の部分でイベントを読み込み、(2)で最新のカートオブジェクトを作り出します。

def addCartItem(cartId: CartId, itemId: ItemId, num: ItemNum): Unit = {
  val allEvents = cartEventService.getAllEventsById(cartId) // (1)
  val cart = replayFunction(allEvents) // (2)
  val itemAdded = cart.addItem(itemId, num)
  cartEventService.publish(itemAdded)
}

スナップショットでリプレイ時間を短縮する

前述のコードみてこれはひどい設計だと思ったのではないでしょうか。わかります。僕も最初そう思いました…。容易に想像がつきますが、永続化されたカートイベントの件数に比例してリプレイのパフォーマンスが悪化します。なので、対策として、永続化するイベントN件ごとにカートオブジェクトの状態をスナップショットDBに保存しておき、カートオブジェクトをリプレイする際に、最新スナップショット+それ以降に発生した差分イベントを読み込んで、高速にリプレイします。つまり、N件以上イベントがあっても、最新のスナップショットと最大N - 1件分の読み込みで済ませることができます。とはいえ、Nを小さくすると読み込む差分イベントが少なくなる一方でイベント書き込み時のスナップショットを更新する頻度も高くなります。諸刃の剣ではありますが、バランスをうまくとる必要があります。Akka(akka-persistence)でもサポートされている機能でS3やDynamoDBをスナップショットDBとして使うことができます。

以上のことを踏まえると以下のようなイメージになります。前提としてイベントやスナップショットには連番(seqNr)が振ってあるものとします。まず、最新のスナップショットとそのスナップショットのseqNr以降の差分イベントを取得します。それらを使って最新のカートオブジェクトをリプレイします。そしてオブジェクトオブジェクトの振る舞いを実行します。新しいカートオブジェクト、イベントを得ます。イベントはエンキューされますが、seqNrが1000件で割り切れるときに新しいカートオブジェクトを最新のスナップショットとして保存します。

def addCartItem(cartId: CartId, itemId: ItemId, num: ItemNum): Unit = {
  val snapshot = loadSnapshot(cartId)
  val events = cartEventService.getEventsByIdWithSeqNr(cartId, snapshot.seqNr + 1)
  val cart = replayFunction(snapshot, events)
  val (newCart, itemAdded) = cart.addItem(itemId, num)
  seqNr += 1
  if (seqNr % 1000 == 0 ) {
    saveSnapshot(newCart, seqNr)
  }
  cartEventService.publish(itemAdded, seqNr)
}

こうすることでリプレイ時間を短縮化できますが、CRUDのときと比較すると多くのデータを読み込んでいることがわかりますね…。もう少し効率的にならないかについては後ほど説明します。

追記:

ドメインオブジェクトはいちいちイベントからリプレイせずに、クエリ側のリードモデルをユースケースでうまく使えないの?と思うかもしれませんが、以下の問題があり難しいです。

  1. 非正規化されたリードモデルは正規化されたドメインモデルではないので、代替はそもそも難しい。代替したとしたらDDDではなくなりそう
  2. C/Qがモジュールとして分離できなくなる。CとQが分離されていないならもはやCQRSではない?違う呼び方がよさそうです。
  3. リードDBへの書き込み時間分、最新状態を取得する時間が遅延する。つまり常に過去のデータをみていることになってしまいます。これは許容できる場合がありそう、要件によりますね

リードモデルの構築

コマンド側のドメインモデルの概要はつかめたと思いますが、クエリ側のリードモデルを考えてみましょう。

このアーキテクチャパターンによって、イベントが唯一信頼できるデータソースとなり、そのイベントを基にクエリ側のリードモデルを構築します。イベントは不変という特徴を持ちます。なので、いつでも正しいリードモデルを作り出せます。極端な話、リードモデルの設計をミスってもイベントから作り直せます。もちろん、データ更新コストはかかりますが…。

コマンドとクエリで要件が非対称

そもそもなぜコマンドとクエリでモデルを分離するのか。理由は以下の表を参照してください。つまるところ要件が違うからです。人間が閲覧するようなシステム(SoEは特に顕著)だとまずクエリ側は非正規化データを求めます。

-コマンドクエリ
一貫性/可用性トランザクション整合性を使い一貫性を重視する結果整合を使い可用性を重視する
データ構造トランザクション処理を行い正規化されたデータを保存することが好まれる(集約単位など)非正規化したデータ形式を取得することが好まれる(クライント都合のレスポンスなど)
スケーラビリティ全体のリクエスト比率とごく少数のトランザクション処理しかしない。必ずしもスケーラビリティは重要ではない全体のかなりのリクエスト比率を占める処理を行うため、クエリ側はスケーラビリティが重要

例えば、以下のような集約(グローバルなエンティティ)がある場合でも

  • Employee { id, name, deptId }
  • Department { id, name }

画面は以下のように集約を二つ結合したようなデータを求めます。deptIdだけではどの部署か分からないから名前も必要なのです。*4

  • Employee { id, name, deptId, deptName }

ドメインオブジェクトの構造にインパクトを与えるのは、このような正規と非正規の構造的な非対称性です。考えてみるとわかりますが、トランザクション処理と検索・レポートを両立するモデルの実現も理解も難しいのです。

さて、コマンドとクエリを分離しないと実装上でどんな問題が起きるか気になるところですが、以下のようなものがあります。

  • リポジトリのクエリメソッドが複雑になる
  • N+1クエリが発生しやすい
  • ドメインオブジェクトからDTOへの変換が非効率

簡単に以下に説明します。

クエリ要件をリポジトリで満たそうとしてメソッドが複雑になる

集約(グローバルなエンティティ)は識別のためにIDを持つため、IDから集約本体を引き当てることができます。これはある意味正引きです。

val employee = employeeRepository.findById(employeeId)

しかし、集約の他の属性(名前や他のIDなど)を使って集約(単体もしくは複数)を引き当てたい場合があります。セカンダリインデックスを使うような逆引きですね。

以下のようなコードを書いたことがありませんか? 僕は散々書いてきました。

val employees = employeeRepository
  .findByDeptIdsWithEmpNamePatterns(deptIds, empNamePatterns)

ドメインの振る舞いを起こすためというより、クライアントにクエリのレスポンスを返すためによく使っていました。これは本当にドメインの責務でしょうか。ドメインは振る舞いを起こすことが責務と捉えると、ドメインの振る舞いが伴わないこのようなリポジトリメソッドは関心が分離できていないのかもしれません。前述した GetCartItemsUseCaseクラスの実装も本当にリポジトリの責務でいいか考える必要がありそうです。本当に集約をそのまま返すならば問題ないかもしれません。次のN+1クエリを含むような問題に発展する可能性がある場合は要注意です。

リポジトリでレスポンスを組み立てるとN+1クエリが発生しやすい

APIのレスポンスでは非正規化データを返すことがほとんどです。例えば、ホテルの予約情報一覧を作るために、予約集約に関連するホテル集約、顧客集約を解決しなければならないことがあります。ここでもドメインの振る舞いは起きません。クエリだからです。これは本当にリポジトリがやるべき仕事でしょうか。僕は過去にDBAに申し訳ないと思いながらこういうN+1が大量に発生するコードを書いていました…。そもそもレスポンスとして表形式を期待するならSQLとRDBにやらせるべきではないでしょうか。

reservationRepository.findByIds(ids).map { reservation => 
  val hotel = hotelRepository.findById(reservation.hotelId)
  val customer = customerRepository.findById(reservation.customerId)
  new ReservationDto(reservation, hotel.name, customer.name)
}

ドメインオブジェクトからDTOへの変換が非効率

APIのレスポンスでは集約の一部だけを求めることがあります。以下は恣意的な例ですが、UIに併せて顧客名一覧を返す処理です。リポジトリで得た集約の大部分を捨てて名前だけのリストを作ります。クエリ要件を満たすために大部分のリード結果が捨てられることになります。

val customerNames = customerRepository.findByIds(ids).map { customer =>
  customer.name
}

もちろん、概念モデルの粒度が大きいとI/Oコストも比例して大きくなるので、まず先に概念の大きさに目を向けるべきですが、コマンド側のドメインでクエリを頑張りすぎるとこういうことになります。

イベントからリードモデルを作る

RDBにリードモデルを構築するなら、以下のように永続化されたイベントを受信して、SQLを実行するだけです(実際にはこのようなストリーム処理は単発ではなく後に発生するワークロードに備えて常時起動させる必要があります)。これはこれで簡単ですが、cartIdを指定して読み込むので、スケールしにくいです。具体的なIDを指定しないので、ある程度の並列度を維持し永続化されたイベントを全順序に読み込む必要があります。また、こういった動作をするコンシューマは障害発生時のリバランスが必要になります。僕のお勧めはKafkaを使うことです。これはこれで一本ブログ記事が書けるぐらいの知識量なので、別の機会に触れます。

cartEventSubscriber.subscribe(cartId).runWith(Sink.foreach{
  case e: CartCreated => insertCartTable(e)
  case e: CartItemAdded => insertCartItemTable(e)
  case e: CartItemNumUpdated => updateCartItemTable(e)
  case e: CartItemRemoved => deleteCartItemTable(e)
  case e: CartRemoved => deleteCartTable(e)
})

まとめ。そして課題はまだある

ということでだいたいの雰囲気はつかめたのではないかと思います。とはいえ実装するうえではまだ課題があります。

たとえば、addCartItem内部の処理をもう少し効率化できないかの課題については、まさに分散システムの問題に直面します。

考えられる対策としては、カートオブジェクトをユースケースのスコープから外に出して、ワークロードがある間だけ起動させてキャッシュさせておく方法があります。リクエストごとに集約をリプレイしなくなるのでオーハーヘッドが軽減します。が、これは自前で実装するのは大変です。たとえば、ウェブサーバがスケールアウトして、ロードバランサから同じカートID向けの異なるリクエストが、複数サーバに飛んだこと想定してみてください。同一カートオブジェクトが別々のサーバでリプレイされ、異なるコマンドが受理されることで、同一IDなのに別々の状態が作り出されてしまいます。ある意味スプリットブレインな状態になります。

f:id:j5ik2o:20200916113429p:plain

下図はイメージなので全然正確じゃないという前提ですが、前述のようなスプリットブレインにならないようにするには、集約(グローバルなエンティティ)を以下のようにシャーディングして、コマンドをルーティングするとよいわけです。つまり、同一IDの集約はクラスタ全体で1個しかないように配置すればよいでしょう…。クラスタ上で脳みそが一つしかないのでスプリットしようがないという話です。と、ここまで考えてめちゃくちゃ大変だと想像できたと思います。さらにサーバが故障した場合に別サーバに集約をテイクオーバさせるなど自前で実装したくない…。なので、AkkaやErlangなど分散システムのフレームワークなしでこういうことは辞めましょう…。AkkaではActorという軽量プロセスをクラスタ上に分散させることができます。こういう基盤なしに無茶は辞めよう…。*5

f:id:j5ik2o:20200916165624p:plain

興味があれば以下参照。

akka.io

正直この分野は沼感がありますが、参考になれば幸いです。

*1:とはいっても概念を説明するための疑似コードだと思ってください

*2:CQRS/Event Sourcingそのものというより、分散システムに起因する難しさですが…

*3:データベースに永続化される前提のカートと思ってください

*4:システムがユースケースのアクターならこういう考慮はいらないと思いますが、SoEだとどうしてもこういう要求は発生します

*5:分散システムに関係する話は途端に難しくなりがち。わかりやすく書いたつもりですが、十分に伝わらないかもしれません…ご容赦を…