かとじゅんの技術日誌

技術の話をするところ

CQRS/ESによって集約の境界定義を見直す

peing.net

メッセージングシステムのお題のようです。面白そうなのでちょっと考えてみよう。

問題提起

集約候補が以下の3つ。

  • ユーザー
  • 企業
  • スレッド
    • メッセージ

スレッド集約はメッセージを複数保持するようです。

  • 1000件のメッセージを保持するスレッド集約を更新した際、1000件のアップデートが行われる
  • スレッド集約内部で更新された属性を把握していない場合は、リポジトリでは全メッセージ分の更新となる。これを避けるための仕組みはどう実装するのか?

ということが指摘されている。まぁわかります。これはCQRS/ESなら解決できるよと言ってみる

問題の分析

で、僕ならどう考えて実装に落とすかつらつらまとめてみよう。CQRS/ES前提です。Akkaの成分は少なめでScalaの擬似コードで解説します。コードはコンパイルしてないので…おかしなところあるかも。

問題はスレッド集約がメッセージの集合を保持した場合、更新コストが大きくなりがち。さらに差分更新しようとしても集約の内部実装が複雑になるのではという論点。

まず考えるのは、スレッドとメッセージの関係性に強い整合性は必要か? ということです。Noならメッセージを別の集約として切り出すのがよいとは思います。おそらく問題設定としてはYesになっている気がしますが…。 仮に契約プランによって書き込めるメッセージ数が変わるという場合は、スレッドとメッセージには強い整合性が必要かもしれません。別別の集約にした場合、トランザクションが別になり結果整合性になるため、インスタンス数(レコード数)を厳密に制御することはできませんね。ということで、強い整合性が必要という前提で進めます。

あと、この問題だけではスレッド集約がどのような使われ方をするのが厳密にはわかりません。つまり振る舞いがイメージできません。たとえば、スレッドにメンバーとして参加したアカウントだけがメッセージを閲覧・追加・更新できるというシナリオがあればスレッド集約の構造が明確になります。メンバーの考慮が必要なスレッド集約は以下のようなイメージになるでしょう。

case class Message(id: MessageId, text: MessageText, senderId: AccountId, createdAt: Instant, updatedAt: Instant)

case class Messages(values: List[Message]) {
  def add(message: Message): Messages = ???
  def update(message: Message): Messages = ???
}

case class Members(values: List[AccountId]) {
  def add(accountId: AccountId): Members = ???
  def remove(accountId: AccountId): Members = ???
  def contains(accountId: AccountId): Boolean = ???
}


class Thread(id: ThreadId, members: Members, messages: Messages, createdAt: Instant) {

  def addMember(account: AccountId): Thread = copy(members = members.add(accountId))

  def addMessage(messageId: MessageId, messageText: MessageText, senderId: AccountId): Either[ThreadError, Thread] = { 
    if (members.contains(senderId)) {
      Right(copy(messages = messages.add(Message(messageId, messageText, senderId, Instant.now, Instant.now))))
    } else {
      Left(new AddMessageError)
    }
  }
    
  def updateMessage(messageId: MessageId, messageText: MessageText, sender: AccountId): Either[ThreadError, Thread] = 
    if (members.contains(senderId)) {
      Right(copy(messages = messages.updateMessage(Message(messageId, messageText, senderId, Instant.now, Instant.now))))
    } else {
      Left(new UpdateMessageError)
    }
    
  def getMessages(accountId: Account): Either[ThreadError, Messages] = 
    if (members.contains(accountId)) {
      Right(messages)
    } else {
      Left(new ReadMessagesError)
    }

}

課題に戻りますが、このクラスをリポジトリでCRUDすることを想像してみましょう。thread.messagesが1000件ある場合です。

for {
thread <- threadRepository.findById(threadId)  // 1000件メッセージを持つスレッドを取得
newThread <- thread.addMessage(Message(...)) // 状態変更
_ <- thradRepository.store(newThread) // 1001件メッセージを持つスレッドを更新
} yield ()

スレッドのどのフィールドが更新されたかわからない場合は、スレッドのタイトル、説明、メッセージの本文などをすべての情報をSQLなどを使って更新するのでしょうか? これはあまりに非効率ではないか…。それを回避するにはThread集約内部にどのフィールドをDB上で更新すべきか把握する仕組みが必要なのではないかという話ですね。 あと、そもそもfindByIdの時点で、本文も含む1000件のメッセージをDBから取得するのは効率がよいとはいえませんね。

CQRS/ES流の設計

上記の問題をCQRS/ESで改善できるか考えてみよう。

コマンド側

集約はCQRSのC(コマンド)側に所属しますが、役割は副作用を起こすことを目的にします。 たとえば、メンバーの追加・削除、メッセージを追加・更新などです。メンバーの確認はメッセージの追加・更新に必要なので含めます。これら以外はクエリ側の責務とします。

class Thread(id: ThreadId, memberIds: MemberIds, messages: MessageIds, createdAt: Instant) {

  def addMember(account: AccountId): Thread = copy(members = memberIds.add(accountId))

  def addMessage(messageId: MessageId, messageText: MessageText, senderId: AccountId): Either[ThreadError, Thread] =
    if (memberIds.contains(senderId)) {
      Right(copy(messages = messageIds.add(mesageId)) // 本文は保存しない
    } else {
      Left(new AddMessageError)
    }
    
  def updateMessage(messageId: MessageId, messageText: MessageText, sender: AccountId): Either[ThreadError, Thread] = 
    if (memberIds.contains(senderId)) {
      Right(this) // スレッドではメッセージIDしか管理しないのでメッセージ本文を更新できない
    } else {
      Left(new UpdateMessageError)
    }
  

}

本文を持たないので大分軽量化できたのではないでしょうか?(ただ、内部にID集合は保持するので無限に保持するわけには行かないとは思います。上限値は必要だと思います) さて、クエリ側にどうやって連携するかですね。メッセージの本文も読めないと意味がないですからね。

ということで、擬似コードですが、以下のように副作用が起きたときにイベントをDBに追記します。このイベントは別のプロセスがコンシュームしてリードモデルを作る元ネタになります。

sealed trait ThreadEvent
case class MemeberAdded(threadId: ThreadId, accountId: AccountId, occurredAt: Instant) extends ThreadEvent
case class MessageAdded(threadId: ThreadId, messageId: MessageId, messageText: MesssageText, senderId: AccountId, occurredAt: Instant) extends ThreadEvent
case class MessageUpdated(threadId: ThreadId, messageId: MessageId, messageText: MesssageText, senderId: AccountId, occurredAt: Instant) extends ThreadEvent

class Thread(id: ThreadId, memberIds: MemberIds, messages: MessageIds, createdAt: Instant) {

  def addMember(accountId: AccountId): Thread = {
     persistEvent(MemberAdded(id, accountId, Instant.now))
     copy(members = memberIds.add(accountId))
   }

  def addMessage(messageId: MessageId, messageText: MessageText, senderId: AccountId): Either[ThreadError, Thread] =
    if (memberIds.contains(senderId)) {
      persistEvent(MessageAdded(id, messageId, messageText, senderId, Instant.now))
      Right(copy(messages = messageIds.add(mesageId))
    } else {
      Left(new AddMessageError)
    }
    
  def updateMessage(messageId: MessageId, messageText: MessageText, senderId: AccountId): Either[ThreadError, Thread] = 
    if (memberIds.contains(senderId)) {
      persistEvent(MessageUpdated(id, messageId, messageText, senderId, Instant.now))
      Right(this)
    } else {
      Left(new UpdateMessageError)
    }
  

}

書き込みはこれでよいとして、集約をDBから再生する際はどうなるのか。DBに保存してあるイベント列を順番に空のThreadに適用するだけです。 これも疑似コードで説明。この考慮はあくまでコマンドを実行するための前提を整えるためのものです。

class Thread(id: ThreadId, events: Seq[ThreadEvent]) {

  private var memberIds: MemberIds = MemberIds.empty
  private var messages: MessageIds = MessageIds.empty

  // イベントから最新状態を復元
  events.foreach{
    case MemberAdded(_, accountId, _) => members = memberIds.add(accountId)
    case MessageAdded(_, messageId, messageText, senderId, createdAt) => messages = messages.add(Message(messageId, messageText, senderId, createdAt)
    case MessageUpdated(_, messageId, messageText, senderId, updatedAt) => messages = messages.update(Message(messageId, messageText, senderId, updatedAt)
  }


  def addMember(accountId: AccountId): Thread = {
     persistEvent(MemberAdded(id, accountId, Instant.now))
     copy(members = memberIds.add(accountId))
   }

  def addMessage(messageId: MessageId, messageText: MessageText, senderId: AccountId): Either[ThreadError, Thread] =
    if (memberIds.contains(senderId)) {
      persistEvent(MessageAdded(id, messageId, messageText, senderId, Instant.now))
      Right(copy(messages = messageIds.add(mesageId))
    } else {
      Left(new AddMessageError)
    }
    
  def updateMessage(messageId: MessageId, messageText: MessageText, senderId: AccountId): Either[ThreadError, Thread] = 
    if (memberIds.contains(senderId)) {
      persistEvent(MessageUpdated(id, messageId, messageText, senderId, Instant.now))
      Right(this)
    } else {
      Left(new UpdateMessageError)
    }
  

}

これでドメインを扱うコマンド側では、差分を表現するイベントをジャーナルとして追記保存すればいいことになります。また最新の集約の状態はコマンドを受け付けるために必要最低限のものしか持ちません。クエリをしないので、これで問題は起きません。当初想定していたモデルよりかなり小さくなります。コマンド側の集約はリプレイ後はルールをチェックしてパスしたらイベントを追記保存するだけなんです。

AkkaはEvent Sourcingを標準サポート

ここでは示してませんが、イベントの列が巨大だとリプレイに時間が掛かります。例えばイベントN件に1回スナップショットを保存しておき、リプレイ時に最新スナップショット+差分のイベントでリプレイを高速化することもできます。とはいえ、リクエストのたびにイベント列を読み込み・適用するのは効率が悪そうです。Akkaのアプローチでは軽量プロセスであるActorとしてスレッド集約を実装します。上記のクラスがActorとして実装されます(集約アクター)。メソッドコールがメッセージパッシングに変わります。すでにイベント列がある集約アクターはAkkaがストレージからイベントを読み込み、メッセージとして適用してくれます。また一定時間メッセージを受け付けていない集約アクターはランタイムから消えるように設定できます。まぁ、必要な機能が揃っているので確実に楽できます。

doc.akka.io

クエリ側の設計

クエリ側のリードモデルはどうやって作るのか? 仮に上記のイベントがDynamoDBに書き込まれるとして、パーティションキーはThreadId, ソートキーはイベント番号(akkaでは自動採番してくれます)、イベント本体はJSONなどで格納されるイメージとします。これをDynamoDB Streamsなどでコンシュームします。コンシューム部分はLambdaでもKCLでもよいです。以下を参考にしてみてください。

docs.aws.amazon.com docs.aws.amazon.com

この方法でイベントが時系列順に手に入ります。リードモデルを作るにはこのイベントを順番にリードDBに反映します。 仮にRDBにTHREADテーブル、MEMBERテーブル、MESSAGEテーブルがある場合は以下のような処理になります。 consumeEventsByThreadIdFromDDBStreamsはDynamoDB Streamsからレコードを読み込んでイベントを返す関数です。

consumeEventsByThreadIdFromDDBStreams.foreach{ 
  case ev: ThreadCreated => insertThread(ev) // 実際にはスレッドが作られたときのイベントも永続化する必要がある
  case ev: MemberAdded => insertMember(ev)
  case ev: MessageAdded => insertMessage(ev)
  case ev: MessageUpdated => updateMessage(ev)
}

最終的にリードモデルはレスポンスの形式に併せて定義するとよいでしょう。

case class ThreadDto(id: Long, ...)
case class MemberDto(id: Long, accountId: Long, createdAt: Int)
case class MessageDto(id: Long, threadId: Long, text: String, senderId: Long, createdAt: Instant, updatedAt: Instant)

リードモデルはDAO内部でSQLを使って、部分集合を取り出したりできます。リードモデルはお好きなように…。

val messages: Seq[MessageDto] = MessageDao.findAllByThreadIdWithOffsetLimit(threadId, 0, 100)

結局どうなるのか

最終的な疑似コードイメージです。 1000件のメッセージを保持するスレッドといってもIDしか持ちません。本文を持つより簡単にリプレイし一つのコマンドを付けて一つのイベントを永続化するだけです。1001件のメッセージは永続化しません。間違いなくスケールするのはこっちですね。

val thread = Thread(ThreadId(1L)) // 永続化されているイベント列があれば渡される。なければイベント列は空で作られる
thread.addMessage(...) // 1つのコマンドで1つのイベントが追記されるだけ。1001件も更新されません。

補足

上記の考え方がわかったとしても、CQRS/ESは自前でやるのは大変です。 たとえば、以下の処理が複数のサーバで起こった場合、ロックがなくても大丈夫でしょうか?ダメそうですねw そう書き込むためのトランザクション管理が必要なのです…。

val thread = Thread(ThreadId(1L))
thread.addMessage(...) 

AkkaではThread部分がActorになりますが、複数のサーバで起動しても、ユニークになるように分散できます。つまり分散システム上で同じIDのActorがたかだか一つになるように配置してくれるので、トランザクションの問題は起きません。 こういったことを考えると、CQRS/ESをサポートしたフレームワークやライブラリを使うほうが圧倒的に楽です。Akka お勧めです。

akka.io

akka-cluster スプリットブレインリゾルバ OSS実装一覧

Akkaクラスターがネットワーク分断に遭遇した場合に、UnreachableメンバーをDown状態に遷移させるためのリゾルバのOSS実装を以下にまとめる。

ちなみに、このリゾルバがない場合はUnreachableのままだとリーダアクションが取れずにクラスターが機能不全状態なる。かといってAutodownを有効にするとスプリットブレインが発生する可能性がある。これを解決するのがスプリットブレインリゾルバで商用版はLightbend社から提供されている。スプリットブレインリゾルバの仕様はこちら

git repo stars 備考
TanUkkii007/akka-cluster-custom-downing 131 OldestAutoDowning, QuorumLeaderAutoDowning, MajorityLeaderAutoDowningに対応している。OldestAutoDowningには不具合があるようだ。要修正
mbilski/akka-reasonable-downing 85 Static QuorumによるDowningにしか対応していない
arnohaase/simple-akka-downing 16 static-quorum, keep-majority, keep-oldestに対応している
guangwenz/akka-down-resolver 5 Static QuorumによるDowningにしか対応していない

sbtでビルドしたDockerイメージをECRにプッシュする方法

sbt-native-packagerとsbt-ecrを使って、ビルド→ECRログイン→ECRへのプッシュまで行います。

https://github.com/sbt/sbt-native-packager https://github.com/sbilinski/sbt-ecr

ECRを作成する

レジストリを作る。作成後 以下のようなURIが取得できます。

123456789012.dkr.ecr.ap-northeast-1.amazonaws.com/j5ik2o/thread-weaver-api-server

sbtプラグインの設定の設定

project/plugins.sbt

// ...
addSbtPlugin("com.mintbeans" % "sbt-ecr" % "0.14.1")

addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.3.10")
// ...

build.sbtの設定

repositoryNameは、上記で得られたURIのホスト名を除く文字列(j5ik2o/thread-weaver-api-server)を指定すること

import com.amazonaws.regions.{Region, Regions}

val ecrSettings = Seq(
  region           in Ecr := Region.getRegion(Regions.AP_NORTHEAST_1),
  repositoryName   in Ecr := "j5ik2o/thread-weaver-api-server",
  repositoryTags   in Ecr ++= Seq(version.value), // タグを付ける場合は指定する
  localDockerImage in Ecr := "j5ik2o/" + (packageName in Docker).value + ":" + (version in Docker).value,
  push in Ecr := ((push in Ecr) dependsOn (publishLocal in Docker, login in Ecr)).value
)

val `api-server` = (project in file("api-server"))
  .enablePlugins(AshScriptPlugin, JavaAgent, EcrPlugin)
  .settings(baseSettings)
  .settings(ecrSettings)
  .settings(
    name = "thread-weaver-api-server",
    dockerBaseImage := "adoptopenjdk/openjdk8:x86_64-alpine-jdk8u191-b12",
    maintainer in Docker := "Junichi Kato <j5ik2o@gmail.com>",
    dockerUpdateLatest := true,
    dockerUsername := Some("j5ik2o"),
    bashScriptExtraDefines ++= Seq(
      "addJava -Xms${JVM_HEAP_MIN:-1024m}",
      "addJava -Xmx${JVM_HEAP_MAX:-1024m}",
      "addJava -XX:MaxMetaspaceSize=${JVM_META_MAX:-512M}",
      "addJava ${JVM_GC_OPTIONS:--XX:+UseG1GC}",
      "addJava -Dconfig.resource=${CONFIG_RESOURCE:-application.conf}"
    ),
// ...
)

実行

profileを使う場合は以下のようにする

$ AWS_DEFAULT_PROFILE=xxxxx sbt api-server/ecr:push

Microsoft社のDDD, CQRSに関する記事一覧

CQRS+ESパターンの説明

  • コマンド クエリ責務分離 (CQRS) パターン
    • ステートソーシング(CRUD)の短所としてあげているもの
      • 読み取りと書き込みのデータ表現不一致問題
      • ステートの同時更新による競合問題
      • 読み取りと書き込みが同じモデルによる、データの誤用問題
    • CRUDの単一データモデルより設計と実装が簡単になる。ただし、CRUDはスキャーフォルドメカニズムによって自動生成できない
    • リードモデルはSQLビューもしくは即時プロジェクションを生成するか。
    • 同じ物理ストアに格納する場合は、パフォーマンス、スケーラビリティ、セキュリティを最大化するため、データストアを物理的に分けるのが一般的
  • イベント ソーシング パターン
    • ステートソーシング(CRUD)の短所としてあげているもの
      • 更新時のロックがパフォーマンスと応答性を低下させる
      • 単一データ項目への更新は競合が起きやすい(コラボレータが複数の場合)
      • 監査メカニズムがない限り、履歴が失われる
    • ESのメリット
      • イベントが不変であり、追記保存するだけ。イベントを処理するプロセスは非同期処理で問題が生じない
      • イベントは、データストアを更新しない、シンプルなオブジェクト。
      • イベントはドメインエキスパートの関心事。
      • 同時更新による競合の発生を防ぐことができる(ただし、ドメインオブジェクトが矛盾した状態にならないよう依然として保護が必要)

関連記事

DDD,CQRS関連

マイクロサービス関連

マイクロサービス関連。DDDだと戦略的設計の部分。

Scala.js ウェブフレームワーク Facade 調査(2019年4月版)

主要なウェブフレームワークのFacadeについて調査した(2019/4/17日時点)。

React.js

Reactバージョン Scalaバージョン ライブラリ名 スター数 最終更新日時 タグ形式 ReactRouterサポート Reduxサポート 備考
16.6+ 2.12,2.11 japgolly/scalajs-react 1196 1時間前 Scalatags形式 Scala.js独自実装 サポートなし
16.8.1 2.12 shadaj/slinky 276 4日前 case class なし なし japgolly/scalajs-reactに依存, ReactNative対応
16.8.1+ 2.12 aappddeevv/scalajs-reaction 17 25日前 case class あり あり
15.5.3 2.12 shogowada/scalajs-reactjs 23 2017/5 Scalatags形式 あり あり
0.11.0 2.11 xored/scala-js-react 133 2015/2 scalax なし なし

所感

Vue.js

Vueバージョン Scalaバージョン ライブラリ名 スター数 最終更新日時 Vue Routerサポート Vuexサポート 備考
2.x 2.11,2.12 random-scalor/scala-js-vue 3 2018/2 あり あり
2.x 2.12 massung/scala-js-vue 12 2017/11 なし なし
2.x 2.11 fancellu/scalajs-vue 76 2017/5 なし なし

所感

Angular

  • 最新のAngularを使えるFacadeを見つけることができなかった…。

まとめ

Scala.jsでウェブアプリケーションを作るならscalajs-react一択…。他は自分でがっつりコントリビュートする気がないと、いろいろ大変そうです。 ということで、サンプルプロジェクトを二種類用意したので、興味があれば確認してみてください。

https://github.com/j5ik2o/scalajs-react-webpack4-example https://github.com/j5ik2o/scalajs-vuejs-webpack4-example