かとじゅんの技術日誌

技術の話をするところ

akka-cluster スプリットブレインリゾルバ OSS実装一覧

Akkaクラスターがネットワーク分断に遭遇した場合に、UnreachableメンバーをDown状態に遷移させるためのリゾルバのOSS実装を以下にまとめる。

ちなみに、このリゾルバがない場合はUnreachableのままだとリーダアクションが取れずにクラスターが機能不全状態なる。かといってAutodownを有効にするとスプリットブレインが発生する可能性がある。これを解決するのがスプリットブレインリゾルバで商用版はLightbend社から提供されている。スプリットブレインリゾルバの仕様はこちら

git repo stars 備考
TanUkkii007/akka-cluster-custom-downing 131 OldestAutoDowning, QuorumLeaderAutoDowning, MajorityLeaderAutoDowningに対応している。OldestAutoDowningには不具合があるようだ。要修正
mbilski/akka-reasonable-downing 85 Static QuorumによるDowningにしか対応していない
arnohaase/simple-akka-downing 16 static-quorum, keep-majority, keep-oldestに対応している
guangwenz/akka-down-resolver 5 Static QuorumによるDowningにしか対応していない

sbtでビルドしたDockerイメージをECRにプッシュする方法

sbt-native-packagerとsbt-ecrを使って、ビルド→ECRログイン→ECRへのプッシュまで行います。

https://github.com/sbt/sbt-native-packager https://github.com/sbilinski/sbt-ecr

ECRを作成する

レジストリを作る。作成後 以下のようなURIが取得できます。

123456789012.dkr.ecr.ap-northeast-1.amazonaws.com/j5ik2o/thread-weaver-api-server

sbtプラグインの設定の設定

project/plugins.sbt

// ...
addSbtPlugin("com.mintbeans" % "sbt-ecr" % "0.14.1")

addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.3.10")
// ...

build.sbtの設定

repositoryNameは、上記で得られたURIのホスト名を除く文字列(j5ik2o/thread-weaver-api-server)を指定すること

import com.amazonaws.regions.{Region, Regions}

val ecrSettings = Seq(
  region           in Ecr := Region.getRegion(Regions.AP_NORTHEAST_1),
  repositoryName   in Ecr := "j5ik2o/thread-weaver-api-server",
  repositoryTags   in Ecr ++= Seq(version.value), // タグを付ける場合は指定する
  localDockerImage in Ecr := "j5ik2o/" + (packageName in Docker).value + ":" + (version in Docker).value,
  push in Ecr := ((push in Ecr) dependsOn (publishLocal in Docker, login in Ecr)).value
)

val `api-server` = (project in file("api-server"))
  .enablePlugins(AshScriptPlugin, JavaAgent, EcrPlugin)
  .settings(baseSettings)
  .settings(ecrSettings)
  .settings(
    name = "thread-weaver-api-server",
    dockerBaseImage := "adoptopenjdk/openjdk8:x86_64-alpine-jdk8u191-b12",
    maintainer in Docker := "Junichi Kato <j5ik2o@gmail.com>",
    dockerUpdateLatest := true,
    dockerUsername := Some("j5ik2o"),
    bashScriptExtraDefines ++= Seq(
      "addJava -Xms${JVM_HEAP_MIN:-1024m}",
      "addJava -Xmx${JVM_HEAP_MAX:-1024m}",
      "addJava -XX:MaxMetaspaceSize=${JVM_META_MAX:-512M}",
      "addJava ${JVM_GC_OPTIONS:--XX:+UseG1GC}",
      "addJava -Dconfig.resource=${CONFIG_RESOURCE:-application.conf}"
    ),
// ...
)

実行

profileを使う場合は以下のようにする

$ AWS_DEFAULT_PROFILE=xxxxx sbt api-server/ecr:push

Microsoft社のDDD, CQRSに関する記事一覧

CQRS+ESパターンの説明

  • コマンド クエリ責務分離 (CQRS) パターン
    • ステートソーシング(CRUD)の短所としてあげているもの
      • 読み取りと書き込みのデータ表現不一致問題
      • ステートの同時更新による競合問題
      • 読み取りと書き込みが同じモデルによる、データの誤用問題
    • CRUDの単一データモデルより設計と実装が簡単になる。ただし、CRUDはスキャーフォルドメカニズムによって自動生成できない
    • リードモデルはSQLビューもしくは即時プロジェクションを生成するか。
    • 同じ物理ストアに格納する場合は、パフォーマンス、スケーラビリティ、セキュリティを最大化するため、データストアを物理的に分けるのが一般的
  • イベント ソーシング パターン
    • ステートソーシング(CRUD)の短所としてあげているもの
      • 更新時のロックがパフォーマンスと応答性を低下させる
      • 単一データ項目への更新は競合が起きやすい(コラボレータが複数の場合)
      • 監査メカニズムがない限り、履歴が失われる
    • ESのメリット
      • イベントが不変であり、追記保存するだけ。イベントを処理するプロセスは非同期処理で問題が生じない
      • イベントは、データストアを更新しない、シンプルなオブジェクト。
      • イベントはドメインエキスパートの関心事。
      • 同時更新による競合の発生を防ぐことができる(ただし、ドメインオブジェクトが矛盾した状態にならないよう依然として保護が必要)

関連記事

DDD,CQRS関連

マイクロサービス関連

マイクロサービス関連。DDDだと戦略的設計の部分。

Scala.js ウェブフレームワーク Facade 調査(2019年4月版)

主要なウェブフレームワークのFacadeについて調査した(2019/4/17日時点)。

React.js

Reactバージョン Scalaバージョン ライブラリ名 スター数 最終更新日時 タグ形式 ReactRouterサポート Reduxサポート 備考
16.6+ 2.12,2.11 japgolly/scalajs-react 1196 1時間前 Scalatags形式 Scala.js独自実装 サポートなし
16.8.1 2.12 shadaj/slinky 276 4日前 case class なし なし japgolly/scalajs-reactに依存, ReactNative対応
16.8.1+ 2.12 aappddeevv/scalajs-reaction 17 25日前 case class あり あり
15.5.3 2.12 shogowada/scalajs-reactjs 23 2017/5 Scalatags形式 あり あり
0.11.0 2.11 xored/scala-js-react 133 2015/2 scalax なし なし

所感

Vue.js

Vueバージョン Scalaバージョン ライブラリ名 スター数 最終更新日時 Vue Routerサポート Vuexサポート 備考
2.x 2.11,2.12 random-scalor/scala-js-vue 3 2018/2 あり あり
2.x 2.12 massung/scala-js-vue 12 2017/11 なし なし
2.x 2.11 fancellu/scalajs-vue 76 2017/5 なし なし

所感

Angular

  • 最新のAngularを使えるFacadeを見つけることができなかった…。

まとめ

Scala.jsでウェブアプリケーションを作るならscalajs-react一択…。他は自分でがっつりコントリビュートする気がないと、いろいろ大変そうです。 ということで、サンプルプロジェクトを二種類用意したので、興味があれば確認してみてください。

https://github.com/j5ik2o/scalajs-react-webpack4-example https://github.com/j5ik2o/scalajs-vuejs-webpack4-example

akka-streamを使ってmemcachedクライアントを作るには

かなり前になるのですが、akka-streamの習作課題として、Memcachedクライアントを実装したので、概要を解説する記事をまとめます。詳しくはgithubをみてください。

https://github.com/j5ik2o/reactive-memcached

TCPのハンドリング方法

akkaでTCPをハンドリングするには、以下の二つになる。おそらく

  1. akka.actorakka.ioのパッケージを使う方式。つまり、アクターでTCP I/Oを実装する方法。
  2. akka.streamTcp#outgoingConnectionというAPIを使う方法

今回は、akka-streamをベースにしたかったので、2を採用しました。

outgoingConnectionメソッドの戻り値の型は、Flow[ByteString, ByteString, Future[OutgoingConnection]]です。ByteStringを渡せば、ByteStringが返ってくるらしい。わかりやすいですね。そしてストリームが起動中であれば、コネクションは常時接続された状態になります。

def outgoingConnection(
  remoteAddress : InetSocketAddress,
  localAddress : Option[InetSocketAddress],
  options : Traversable[SocketOption], 
  halfClose : Boolean,
  connectTimeout : Duration,
  idleTimeout : Duration) : Flow[ByteString, ByteString, Future[Tcp.OutgoingConnection]]

Memcachedのコネクションを表現するオブジェクトを実装する

では、早速 Memcachedのコネクションを表現するオブジェクトである MemcachedConnection を実装します。このオブジェクトを生成するとTCP接続され、破棄されると切断されます。そして、考えたインターフェイスは以下です(別にtrait切り出さなくてもよかったですが、説明のために分けました)。sendメソッドにコマンドを指定して呼ぶとレスポンスが返る単純なものです。 ちなみに、sendメソッドの戻り値の型はmonix.eval.Taskです。詳しくは https://monix.io/docs/3x/eval/task.html をご覧ください。

trait MemcachedConnection {
  def id: UUID // 接続ID
  def shutdown(): Unit // シャットダウン
  def peerConfig: Option[PeerConfig] // 接続先設定

  def send[C <: CommandRequest](cmd: C): Task[cmd.Response] // コマンドの送信

  // ...
}

実装はどうなるか。全貌は以下。先ほどのTcp#outgoingConnectionを使います。

private[memcached] class MemcachedConnectionImpl(_peerConfig: PeerConfig,
                                                 supervisionDecider: Option[Supervision.Decider])(
    implicit system: ActorSystem
) extends MemcachedConnection {

  // ...

  private implicit val mat: ActorMaterializer = ...

  protected val tcpFlow: Flow[ByteString, ByteString, NotUsed] =
    RestartFlow.withBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts) { () =>
      Tcp()
        .outgoingConnection(remoteAddress, localAddress, options, halfClose, connectTimeout, idleTimeout)
    }

  protected val connectionFlow: Flow[RequestContext, ResponseContext, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._
      val requestFlow = b.add(
        Flow[RequestContext]
          .map { rc =>
            log.debug(s"request = [{}]", rc.commandRequestString)
            (ByteString.fromString(rc.commandRequest.asString + "\r\n"), rc)
          }
      )
      val responseFlow = b.add(Flow[(ByteString, RequestContext)].map {
        case (byteString, requestContext) =>
          log.debug(s"response = [{}]", byteString.utf8String)
          ResponseContext(byteString, requestContext)
      })
      val unzip = b.add(Unzip[ByteString, RequestContext]())
      val zip   = b.add(Zip[ByteString, RequestContext]())
      requestFlow.out ~> unzip.in
      unzip.out0 ~> tcpFlow ~> zip.in0
      unzip.out1 ~> zip.in1
      zip.out ~> responseFlow.in
      FlowShape(requestFlow.in, responseFlow.out)
    })

  protected val (requestQueue: SourceQueueWithComplete[RequestContext], killSwitch: UniqueKillSwitch) = Source
    .queue[RequestContext](requestBufferSize, overflowStrategy)
    .via(connectionFlow)
    .map { responseContext =>
      log.debug(s"req_id = {}, command = {}: parse",
                responseContext.commandRequestId,
                responseContext.commandRequestString)
      val result = responseContext.parseResponse // レスポンスのパース
      responseContext.completePromise(result.toTry) // パース結果を返す
    }
    .viaMat(KillSwitches.single)(Keep.both)
    .toMat(Sink.ignore)(Keep.left)
    .run()

  override def shutdown(): Unit = killSwitch.shutdown()

  override def send[C <: CommandRequest](cmd: C): Task[cmd.Response] = Task.deferFutureAction { implicit ec =>
    val promise = Promise[CommandResponse]()
    requestQueue
      .offer(RequestContext(cmd, promise, ZonedDateTime.now()))
      .flatMap {
        case QueueOfferResult.Enqueued =>
          promise.future.map(_.asInstanceOf[cmd.Response])
        case QueueOfferResult.Failure(t) =>
          Future.failed(BufferOfferException("Failed to send request", Some(t)))
        case QueueOfferResult.Dropped =>
          Future.failed(
            BufferOfferException(
              s"Failed to send request, the queue buffer was full."
            )
          )
        case QueueOfferResult.QueueClosed =>
          Future.failed(BufferOfferException("Failed to send request, the queue was closed"))
      }
  }

}

TCPの送受信するにはtcpFlowメソッドが返すFlowに必要なByteStringを流して、ByteStringを受け取ればいいですが、リクエストとレスポンスを扱うためにもう少し工夫が必要です。そのためにconnectionFlowメソッドはtcpFlowメソッドを内部Flowとして利用します。そして、FlowはRequestContextResponseContextの型を利用します。その実装は以下のようなシンプルなものです。キューであるrequestQueueconnectionFlowにリクエストを送信します。ストリームの後半で返ってきたレスポンスを処理するという流れになっています。キューへのリクエストのエンキューはsendメソッドが行います。sendメソッドが呼ばれるとRequestContextを作ってrequestQueueにエンキュー、完了するとレスポンスを返します。requestQueueはストリームと繋がっていて、エンキューされたRequestContextconnectionFlowに渡され返ってきたResponseContextがレスポンス内容をパースし、その結果をPromise経由で返すというものです1。エラーハンドリングについて、エラーを起こしやすいところにRestartFlow.withBackoffを入れています。

final case class RequestContext(commandRequest: CommandRequest,
                                promise: Promise[CommandResponse],
                                requestAt: ZonedDateTime) {
  val id: UUID                     = commandRequest.id
  val commandRequestString: String = commandRequest.asString
}

final case class ResponseContext(byteString: ByteString,
                                 requestContext: RequestContext,
                                 requestsInTx: Seq[CommandRequest] = Seq.empty,
                                 responseAt: ZonedDateTime = ZonedDateTime.now)
    extends ResponseBase {

  val commandRequest: CommandRequest = requestContext.commandRequest

  def withRequestsInTx(values: Seq[CommandRequest]): ResponseContext = copy(requestsInTx = values)
  // レスポンスのパース
  def parseResponse: Either[ParseException, CommandResponse] = {
    requestContext.commandRequest match {
      case scr: CommandRequest =>
        scr.parse(ByteVector(byteString.toByteBuffer)).map(_._1)
    }
  }

}

コネクションオブジェクトの使い方は簡単です。ほとんどの場合は、このようにコネクションオブジェクトを直接操作せずに、高レベルなAPIを操作したいと思うはずです。そのためのクライアントオブジェクトはあとで述べます。

val connection = MemcachedConnection(
  PeerConfig(new InetSocketAddress("127.0.0.1", memcachedTestServer.getPort),
  backoffConfig = BackoffConfig(maxRestarts = 1)),
  None
)

val resultFuture = (for {
  _   <- connection.send(SetRequest(UUID.randomUUID(), "key1", "1", 10 seconds))
  _   <- connection.send(SetRequest(UUID.randomUUID(), "key2", "2", 10 seconds))
  gr1 <- connection.send(GetRequest(UUID.randomUUID(), "key1"))
  gr2 <- connection.send(GetRequest(UUID.randomUUID(), "key2"))
} yield (gr1, gr2)).runAsync
val result = Await.result(resultFuture, Duration.Inf) // (1, 2)

コマンドの実装

コマンドの一例はこんな感じです。 asStringメソッドは文字通りMemcachedにコマンドを送信するときに利用される文字列です。responseParserはfastparseで実装されたレスポンスパーサを指定します。parseResponseメソッドはMemcachedからのレスポンスを表現する構文木をコマンドのレンスポンスに変換するための関数です。これは少し複雑なのであとで説明します。

final class GetRequest private (val id: UUID, val key: String) extends CommandRequest with StringParsersSupport {

  override type Response = GetResponse
  override val isMasterOnly: Boolean = false

  override def asString: String = s"get $key"

  // レスポンスを解析するためのパーサを指定。レスポンス仕様に合わせて指定する
  override protected def responseParser: P[Expr] = P(retrievalCommandResponse)

  // ASTをコマンドレスポンスに変換する
  override protected def parseResponse: Handler = {
    case (EndExpr, next) =>
      (GetSucceeded(UUID.randomUUID(), id, None), next)
    case (ValueExpr(key, flags, length, casUnique, value), next) =>
      (GetSucceeded(UUID.randomUUID(), id, Some(ValueDesc(key, flags, length, casUnique, value))), next)
    case (ErrorExpr, next) =>
      (GetFailed(UUID.randomUUID(), id, MemcachedIOException(ErrorType.OtherType, None)), next)
    case (ClientErrorExpr(msg), next) =>
      (GetFailed(UUID.randomUUID(), id, MemcachedIOException(ErrorType.ClientType, Some(msg))), next)
    case (ServerErrorExpr(msg), next) =>
      (GetFailed(UUID.randomUUID(), id, MemcachedIOException(ErrorType.ServerType, Some(msg))), next)
  }

}

これはGetRequestの上位traitであるCommandRequestです。Memcachedのコマンド送信に必要なのはasStringメソッドだけで、コマンドのレスポンスのパースにはparseメソッドが利用されます。

trait CommandRequest {
  type Elem
  type Repr
  type P[+T] = core.Parser[T, Elem, Repr]

  type Response <: CommandResponse

  type Handler = PartialFunction[(Expr, Int), (Response, Int)]

  val id: UUID
  val key: String
  val isMasterOnly: Boolean

  def asString: String

  protected def responseParser: P[Expr]

  protected def convertToParseSource(s: ByteVector): Repr

  def parse(text: ByteVector, index: Int = 0): Either[ParseException, (Response, Int)] = {
    responseParser.parse(convertToParseSource(text), index) match {
      case f @ Parsed.Failure(_, index, _) =>
        Left(new ParseException(f.msg, index))
      case Parsed.Success(value, index) => Right(parseResponse((value, index)))
    }
  }

  protected def parseResponse: Handler

}

レスポンス・パーサの実装

レスポンスパーサは、Memcachedの仕様に合わせて以下の定義から適切なものを選んで利用します。詳しくは公式ドキュメントをみてください。標準のパーサーコンビネータと少し違うところがありますが、概ね似たような感じで書けます。Memcachedのレスポンスのルールは単純で左再帰除去などもしなくていいので、パーサーコンビネータのはじめての題材にはよいと思います。

object StringParsers {

  val digit: P0      = P(CharIn('0' to '9'))
  val lowerAlpha: P0 = P(CharIn('a' to 'z'))
  val upperAlpha: P0 = P(CharIn('A' to 'Z'))
  val alpha: P0      = P(lowerAlpha | upperAlpha)
  val alphaDigit: P0 = P(alpha | digit)
  val crlf: P0       = P("\r\n")

  val error: P[ErrorExpr.type]        = P("ERROR" ~ crlf).map(_ => ErrorExpr)
  val clientError: P[ClientErrorExpr] = P("CLIENT_ERROR" ~ (!crlf ~/ AnyChar).rep(1).! ~ crlf).map(ClientErrorExpr)
  val serverError: P[ServerErrorExpr] = P("SERVER_ERROR" ~ (!crlf ~/ AnyChar).rep(1).! ~ crlf).map(ServerErrorExpr)
  val allErrors: P[Expr]              = P(error | clientError | serverError)

  val end: P[EndExpr.type]             = P("END" ~ crlf).map(_ => EndExpr)
  val deleted: P[DeletedExpr.type]     = P("DELETED" ~ crlf).map(_ => DeletedExpr)
  val stored: P[StoredExpr.type]       = P("STORED" ~ crlf).map(_ => StoredExpr)
  val notStored: P[NotStoredExpr.type] = P("NOT_STORED" ~ crlf).map(_ => NotStoredExpr)
  val exists: P[ExistsExpr.type]       = P("EXISTS" ~ crlf).map(_ => ExistsExpr)
  val notFound: P[NotFoundExpr.type]   = P("NOT_FOUND" ~ crlf).map(_ => NotFoundExpr)
  val touched: P[TouchedExpr.type]     = P("TOUCHED" ~ crlf).map(_ => TouchedExpr)

  val key: P[String]     = (!" " ~/ AnyChar).rep(1).!
  val flags: P[Int]      = digit.rep(1).!.map(_.toInt)
  val bytes: P[Long]     = digit.rep(1).!.map(_.toLong)
  val casUnique: P[Long] = digit.rep(1).!.map(_.toLong)

  val incOrDecCommandResponse: P[Expr] = P(notFound | ((!crlf ~/ AnyChar).rep.! ~ crlf).map(StringExpr) | allErrors)
  val storageCommandResponse: P[Expr]  = P((stored | notStored | exists | notFound) | allErrors)

  val value: P[ValueExpr] =
    P("VALUE" ~ " " ~ key ~ " " ~ flags ~ " " ~ bytes ~ (" " ~ casUnique).? ~ crlf ~ (!crlf ~/ AnyChar).rep.! ~ crlf)
      .map {
        case (key, flags, bytes, cas, value) =>
          ValueExpr(key, flags, bytes, cas, value)
      }

  val version: P[VersionExpr] = P("VERSION" ~ " " ~ alphaDigit.rep(1).!).map(VersionExpr)

  val retrievalCommandResponse: P[Expr] = P(end | value | allErrors)

  val deletionCommandResponse: P[Expr] = P(deleted | notFound | allErrors)

  val touchCommandResponse: P[Expr] = P(touched | notFound | allErrors)

  val versionCommandResponse: P[Expr] = P(version | allErrors)
}

クライアントの実装

次はクライアント実装。MemcachedClient#sendメソッドはコマンドを受け取り、ReaderTTaskMemcachedConnection[cmd.Response]を返します。ReaderTTaskMemcachedConnectionはMemcachedのために特化したReaderTで、コネクションを受け取り、レスポンスを返すTaskを返します。Memcachedのための高レベルなAPIはこのsendメソッドを利用して実装されます。get,setなどのメソッドの内部でコマンドを作成し、レスポンスを戻り値にマッピングするだけで、使いやすい高レベルAPIになります。また、ReaderTを返すためfor式での簡潔な記述もできます。

package object memcached {

  type ReaderTTask[C, A]                  = ReaderT[Task, C, A]
  type ReaderTTaskMemcachedConnection[A]  = ReaderTTask[MemcachedConnection, A]
  type ReaderMemcachedConnection[M[_], A] = ReaderT[M, MemcachedConnection, A]

}
final class MemcachedClient()(implicit system: ActorSystem) {

  def send[C <: CommandRequest](cmd: C): ReaderTTaskMemcachedConnection[cmd.Response] = ReaderT(_.send(cmd))

  def get(key: String): ReaderTTaskMemcachedConnection[Option[ValueDesc]] =
    send(GetRequest(UUID.randomUUID(), key)).flatMap {
      case GetSucceeded(_, _, result) => ReaderTTask.pure(result)
      case GetFailed(_, _, ex)        => ReaderTTask.raiseError(ex)
    }

  def set[A: Show](key: String,
                   value: A,
                   expireDuration: Duration = Duration.Inf,
                   flags: Int = 0): ReaderTTaskMemcachedConnection[Int] =
    send(SetRequest(UUID.randomUUID(), key, value, expireDuration, flags)).flatMap {
      case SetExisted(_, _)    => ReaderTTask.pure(0)
      case SetNotFounded(_, _) => ReaderTTask.pure(0)
      case SetNotStored(_, _)  => ReaderTTask.pure(0)
      case SetSucceeded(_, _)  => ReaderTTask.pure(1)
      case SetFailed(_, _, ex) => ReaderTTask.raiseError(ex)
    }

   // ...

}
  • for式での利用例
val client = new MemcachedClient()
val resultFuture = (for {
  _  <- client.set(key, "1")
  r1 <- client.get(key)
  _  <- client.set(key, "2")
  r2 <- client.get(key)
} yield (r1, r2)).run(connection).runAsync
val result = Await.result(resultFuture, Duration.Inf) // (1, 2)

コネクションプール

コネクションとクライアントを実装できたら、次はコネクションプールも欲しくなります。MemcachedConnectionをプーリングして選択できればいいわけなので、commons-poolなど使えば楽ですね。コネクションをプールするアルゴリズム(HashRingなど)も複数考えられるので、工夫するとなかなか面白いです。

implicit val system = ActorSystem()

val peerConfig = PeerConfig(remoteAddress = new InetSocketAddress("127.0.0.1", 6379))
val pool = MemcachedConnectionPool.ofSingleRoundRobin(sizePerPeer = 5, peerConfig, RedisConnection(_)) // powered by RoundRobinPool
val connection = MemcachedConnection(connectionConfig)
val client = MemcachedClient()

// ローンパターン形式
val resultFuture1 = pool.withConnectionF{ con =>
  (for{
    _ <- client.set("foo", "bar")
    r <- client.get("foo")
  } yield r).run(con) 
}.runAsync

// モナド形式
val resultFuture2 = (for {
  _ <- ConnectionAutoClose(pool)(client.set("foo", "bar").run)
  r <- ConnectionAutoClose(pool)(client.get("foo").run)
} yield r).run().runAsync

まとめ

というわけでこんな風にakka-streamを使えばTCPクライアントも比較的簡単に実装できると思います。参考にしてみてください。


  1. バックプレッシャを適切にハンドリングするには、ストリームを常に起動した状態にしておく必要があります、要求ごとにストリームを起動していると効果的ではありません))

「エンティティの同一性を表現するためにequalsをオーバーライドすべきか否か」の感想

毎日、ドメイン駆動設計というか、設計の話が投稿されると、楽しくなりますね。

さて、今日の話題は、以下です!

エンティティの同一性を表現するためにequalsをオーバーライドすべきか否か

”稀によくあるサンプル”。多分これ僕が書いた事例ですね。ということで、なぜそうしたか、理由など書いておきたいと思います。

なぜこうしたか

equalsの責務は以下のとおりで、オブジェクトが等しいかどうかを示すものです。エンティティの等価判定の基準に、識別子以外の属性は含めていません。 これにはトレードオフがあります。

https://docs.oracle.com/javase/jp/8/docs/api/java/lang/Object.html Object#equalsの責務は、「このオブジェクトと他のオブジェクトが等しいかどうかを示します。」

以下のように実装したと場合に、(ID以外の)属性が変わったエンティティを特定することが可能です。

scala> case class EmployeeId(value: Int) extends Identifier
defined class EmployeeId

scala> case class Employee(identifier: EmployeeId, name: String) extends Entity[EmployeeId]
defined class Employee

scala> val list = Seq(Employee(EmployeeId(1), "yamada taro"), Employee(EmployeeId(2), "yamada hanako"))
list: Seq[Employee] = List(Employee(EmployeeId(1),yamada taro), Employee(EmployeeId(2),yamada hanako))

// "yamada hanako"が結婚して名前が変わるイベントが発生、リストの内容も合わせて変更される

scala> list.contains(Employee(EmployeeId(2), "yamada hanako"))
res0: Boolean = true

Entity#sameIdentityAsの場合は、 コレクションのメソッドに組み込むことはできないのでそれなりに実装を工夫する必要がありますね。

論理的等価性検査としてのequals

上記のメリットはよいとして、一体equalsメソッドとして何を求めるているのか。DDDの観点があろうがなかろうか、equalsの契約を逸脱する設計をしないようにすべきと考えます。久しぶりに、Effective Javaを開いてみると、equalsは「論理的等価性」検査の責務を持つというような記述はあるものの、「論理的等価性」が具体的に何かは明記されていませんでした。

ちなみにjava.lang.Objectのデフォルト実装は以下となっています。多くの場合はサブクラスで固有のequalsメソッドとしてオーバーライドされますが、されない場合もあります。java.util.Randomは、同じ乱数列を生成するかで等価判定できたはずですが、クライアントがそれを求めてなかったので、デフォルト実装のままとなっているようです。

public boolean equals(Object obj) {
  return (this == obj);
}

契約プログラミングの観点から

契約プログラミングの観点では、java.lan.Object#equalsのJavadocに書かれていること以上のことは求めてはいけません。つまり、一般契約に従っている以上は「オブジェクトが等しいかどうか」の仕様を満たしているはずです。

DDDの文脈では、等価性は(エンティティが持つ)値がすべて同じであればtrueとみなし、同一性は識別子が同じであればtrueとみなすものと解釈できそうです。つまり、エンティティの定義である「同一性によって定義されるオブジェクト」に照らすと equals をオーバーライドすることは適切ではありません。 この点では、本来オーバーライドすべきは eq と言えるかもしれません。

このアイデアは有益ですしこの設計を取ってもよいと思っていますが、java.lan.Object#equalsの契約に照らして考えるに「適切ではありません」とまでは言い切れないと考えています。

結局どうすべきか

まぁ身の蓋もないですが、こうすべきだと思っています。(ここでオーバーライドするという意味は、IDのみの等価判断する実装としてオーバーライドするかしないかという意味です)

また、equalshashCodeのようにもともと抽象度の高いインターフェイスは意図がわかりにくいことがあります。対策としては、ドキュメンテーションコメントに自然言語できちんと設計の意図を記述するべきで、利用者も求められる契約が何かを理解すべきだと思っています。(自壊の念を込めて)

実装を追わなければ equals がオーバーライドされていることが分からないことは、余分な意識コストとなってしまうでしょう。