かとじゅんの技術日誌

技術の話をするところ

Scala.js ウェブフレームワーク Facade 調査(2019年4月版)

主要なウェブフレームワークのFacadeについて調査した(2019/4/17日時点)。

React.js

Reactバージョン Scalaバージョン ライブラリ名 スター数 最終更新日時 タグ形式 ReactRouterサポート Reduxサポート 備考
16.6+ 2.12,2.11 japgolly/scalajs-react 1196 1時間前 Scalatags形式 Scala.js独自実装 サポートなし
16.8.1 2.12 shadaj/slinky 276 4日前 case class なし なし japgolly/scalajs-reactに依存, ReactNative対応
16.8.1+ 2.12 aappddeevv/scalajs-reaction 17 25日前 case class あり あり
15.5.3 2.12 shogowada/scalajs-reactjs 23 2017/5 Scalatags形式 あり あり
0.11.0 2.11 xored/scala-js-react 133 2015/2 scalax なし なし

所感

Vue.js

Vueバージョン Scalaバージョン ライブラリ名 スター数 最終更新日時 Vue Routerサポート Vuexサポート 備考
2.x 2.11,2.12 random-scalor/scala-js-vue 3 2018/2 あり あり
2.x 2.12 massung/scala-js-vue 12 2017/11 なし なし
2.x 2.11 fancellu/scalajs-vue 76 2017/5 なし なし

所感

Angular

  • 最新のAngularを使えるFacadeを見つけることができなかった…。

まとめ

Scala.jsでウェブアプリケーションを作るならscalajs-react一択…。他は自分でがっつりコントリビュートする気がないと、いろいろ大変そうです。 ということで、サンプルプロジェクトを二種類用意したので、興味があれば確認してみてください。

https://github.com/j5ik2o/scalajs-react-webpack4-example https://github.com/j5ik2o/scalajs-vuejs-webpack4-example

ID生成方法についてあれこれ

ID生成について聞かれることが多いので、独自の観点でまとめてみます。タイトルは適当です…。 DBはMySQL(InnoDB)を想定しています。あしからず。

ID生成を知りたいなら

ID生成に関しては以下の記事がよく纏まっているので参考にしてみてください。値形式など詳しく書かれています。

ID生成方法

以下のID生成方法は、お手軽に採用しやすいもの順で列挙します。

DB採番/連番型

AUTO_INCREMENT

DBのAUTO_INCREMENTで採番する方法。

Pros

  • 数値型で扱える
    • 普通は64ビットの整数型を採用することが多い
  • 単調増加する連番なので、ソート可能でかつインデックスの空間効率がよい
  • 単調増加するので、キャパシティを予測しやすい
    • 64ビットあればあまり気にすることもないと思うが…

Cons

  • DBとのネットワークI/O、DBでのディスクI/O分のレイテンシーを想定する必要がある、まぁ当然ですが…
  • ID生成がDB(Master)1台に依存するので、SPoFになりやすい
    • DB依存のアプリケーションならこのSPoFは想定内ではあると思います
  • 保存前にエンティティのIDを確定できず、IDに値があるかないかという判定ロジックが煩雑になりがち(初期値が未定問題)
    • AUTO_INCREMENTのため(書き込み側の都合)に User#idがOption[Long] を採用した場合は、読み込み側では必ず値があるので邪魔になります。書き込み用と読み込み用のモデルをいちいち用意するもの大袈裟です。
case class User(id: Option[Long], name: String, ...)

object UserDao {
  def findById(id: Long): Option[User] = ???
}

def getUserName(id: Long): Option[(Long, String)] = {
  UserDao.findById(id).map { result =>
    (result.id, result.name) // これは期待する戻り値と異なる型なのでコンパイルエラー。読み込み側に書き込み側の都合が影響する…。
  }
}
  • 連番(/users/:idなど)はディレクトリスキャンやレコード数予測に使われる可能性がある。
    • これは対策する方法があります。後述します。

事前採番方式

AUTO_INCREMENTの初期値が未定問題だけを 事前採番 で改善する方式(その他のConsは改善されません)。一般的に事前採番であればシーケンスを使うと思います。PostgreSQLなら使えますね。MySQLでは以下のように採番テーブルを用意しLAST_INSERT_IDを使うと採番可能です(ストレージエンジンはMyISAM)。

CREATE TABLE `user_id_seq`(id bigint unsigned NOT NULL) ENGINE=MyISAM;
UPDATE user_id_seq SET id = LAST_INSERT_ID(id+1)
SELECT LAST_INSERT_ID() AS id

こちらの記事も参考してください。

Pros

  • 初期値未定問題を回避できる
  • 数値型で扱える
  • 単調増加する連番なので、ソート可能でかつインデックスの空間効率がよい
  • 単調増加するので、キャパシティを予測しやすい

Cons

  • AUTO_INCREMENTと比べるとI/O回数が増えてしまう
  • DBとのネットワークI/O、DBでのディスクI/O分のレイテンシーを想定する必要がある。
  • ID生成がDB(Master)1台に依存するので、SPoFになりやすい
  • 連番(/users/:idなど)はディレクトリスキャンやレコード数予測に使われる可能性がある。これは対策する方法が(ry

連番型 + Hashids

ID生成とは直接関係ないですが、連番型で生成したIDをHashidsを使うとハッシュ値として隠蔽できます。IDとハッシュ値を変換テーブルを用意することなく相互に変換可能です。クエリのID条件にはデコードした数値が使えます。

val hashids = new Hashids("this is my salt")
val id = hashids.encode(1) // 1 -> gB0NV05e
val number = hashids.decode(id) // gB0NV05e -> 1

Pros/Consは、連番を隠蔽できる以外は上述と同じなので省略。

非DB採番/文字列型

UUID

仕様としてはv1~v5までありますが、よく使われるのはv1かv4あたり。

UUID version1の生成アルゴリズムをみると分かりますが、まずタイムスタンプは下位と上位が逆転しているのと、ランダム成分が含まれる場合があるので、ソートは不能と考えてよいです。UUID v4の形式はそもそもランダム成分なのでソート不能です。

Pros

  • ディスクやネットワークを使わずに、各アプリケーション内でID生成できるので、スケールしやすい&SPoFがない

Cons

  • 数値型(64bit Longなど)で扱えない(128bitを文字列型として扱う)
  • 乱数成分の影響で、ソート不能でかつインデックスの空間効率が悪い
  • MySQL(InnoDB)で100万件以上扱う場合は、INSERT時間のペナルティが大きくなります(以下に詳しく述べます)
プライマリキーにUUIDを採用した場合のINSERT時間のペナルティ

とりあえず、プライマリキーにUUIDを指定した場合の実験結果を示す、以下の記事をみましょう。1

最初の二つのグラフをみると、INSERT時間が増えていくことがわかります。レコード数が多くなるほどその影響は顕著になります。INSERT時間が遅くなる直接の原因は分からないですが…。

以下の記事も参考にあると思います。

過去に「性能劣化は2割程度」というどこかの記事を読んだことがありますが、2割ってかなり大きいです…。それなりの規模のデータを扱う場合は要注意ですね。

また、タイムシリーズでデータが読み書きされる場合、直近のデータに対するインデックスはインデックス空間上の近い位置(リーフ)に格納されていてほしいですが、UUIDだとソートできないのでインデックスの位置がフラグメントするというか、分散します。こういうケースだと、おそらくSELECTも非効率になるので要注意です。

ULID

上記のUUIDの欠点をカバーするのが、ULIDです。ULIDには乱数成分が含まれますが、先頭にタイムスタンプ成分があるのでソート可能です。詳しくは https://github.com/ulid/spec を参照してください。様々な言語向けの実装がありますが、ロジック自体は難しくないので対応していない言語でも移植は比較的容易だと思います。

Pros

  • ディスクやネットワークを使わずに、各アプリケーション内でID生成できるので、スケールしやすい&SPoFがない
  • UUIDと違って、先頭48bitを使ってソート可能(1ミリ秒以内に280を超えない限り)
  • MySQL(InnoDB)での空間効率は、UUIDよりよいはず、たぶん。(1ミリ秒以内に280を超えない限り)

Cons

  • 数値型で扱えない(文字列型として扱う)

非DB採番/数値型

DBを利用せずに64ビットLong型のIDを生成する方法があります。 ここからは少し話しがデカくなります。

こういう目的で利用されるのが分散IDワーカーです。僕の職場でも開発・運用されています。

もともとの実装は、twitter/snowflakeです。もうアーカイブされてから久しいですが。

IDの形式は以下です。

  • 41bit タイムスタンプ
  • 10bit マシンID (データセンタID + ワーカーID)
  • 12bit シーケンス
    • 1ミリ秒以内にタイムスタンプが重複する場合にカウントアップされる

Pros

  • 64bit整数値
  • 連続性がある(ソート可能およびインデックスの空間効率がよい)
  • SPoFがない(DBなどに頼らない)

Cons

  • IDワーカーのID管理が面倒(重複するとIDの重複に繋がる)
IDワーカーのID管理

snowflakeの実装はアプリケーション内で利用することができますが、ワーカーIDが5ビットなので32インスタンスまでしか作れません。つまり、インプロセスでID生成器を使った場合は、そのプロセスは32インスタンスまでしかスケールさせられないという制限を持つことになります…。2 まぁ、あまり好ましいとは言えないですね。

そもそもオートスケーリングな環境下でどのホストにどのワーカーIDを割り当てるか? そのインスタンスが突然死した場合ワーカーIDを再利用させるか、など様々な分散環境でのリソース管理の問題がでてきます。こういった課題をsnowflakeではzookeeperを使って解決しています(ウチもzookeeperで管理してます)

zookeeperを使う方法以外には、 JVM限定の話になりますが、akka-cluster を使ってワーカーIDを管理する方法もあります。どちらもハードルは高いですが…。実装例は以下です。

詳しくは以下の資料を参考にしてください。

送信するコマンドメッセージにワーカーIDを含めると、対応するワーカーIDのIDワーカーアクターにメッセージが届くという仕組みです。このアクターは起動していなければ自動的に起動し、 akka-cluster-sharding によってノード上に自動的に分散されます。

https://github.com/TanUkkii007/reactive-snowflake/blob/master/reactive-snowflake-cluster/src/main/scala/org/tanukkii/reactive/snowflake/ShardedIdRouter.scala#L31-L37

TanUkkii007(たぬっきーパイセン)の実装で面白いところ

他にも工夫がされている点があります。1ミリ秒あたり212まで採番できるが、twitterの実装ではシーケンスが桁溢れすると次のタイムスタンプまでブロックする。しかし、reactive-snowflakeでは、採番できなかったものとし自分自身のアクターにリトライメッセージを送信し再度採番するようになっている。つまりブロッキングしないようになっている。

まとめ

  • DB採番/数値型
    • ID生成がDBに依存する
    • ソート可能な数値型IDを生成できる
    • 性能を問われる大量のリクエストには向いていない
      • ID生成時にネットワークやディスクのI/Oが生じる
      • ID生成器=DB(Master)1台なのでボトルネックが生じやすい
  • 非DB採番/文字列型
    • ID生成がDBに依存しない
    • 文字列型IDをアプリケーション内で分散して生成できる
    • UUIDはソート不能だが、ULIDはソート可能なIDを生成できる
  • 非DB採番/数値型(分散IDワーカー)
    • ID生成がDBに依存しない
    • ソート可能な数値型IDを特定のノードで分散して生成できる
      • ID生成器を外部ノード上に配置した場合は、アプリケーションからはネットワークI/Oは生じる
    • IDワーカーIDを管理する仕組みが必要

以下のような条件に合致するなら、ULIDで十分でしょう。

  • IDは数値型でなくてよい
  • ソート可能
  • 分散してID生成可能(SPoFがない)
  • 大掛かりな仕組みが必要ない

  1. UUIDのバージョンは不明ですが、乱数成分が原因ではないかと思います。

  2. データセンターIDをうまく使えばもっと多いインスタンスを立ち上げれますが、いずれにしても上限はあまり考えたくない…。

akka-streamを使ってmemcachedクライアントを作るには

かなり前になるのですが、akka-streamの習作課題として、Memcachedクライアントを実装したので、概要を解説する記事をまとめます。詳しくはgithubをみてください。

https://github.com/j5ik2o/reactive-memcached

TCPのハンドリング方法

akkaでTCPをハンドリングするには、以下の二つになる。おそらく

  1. akka.actorakka.ioのパッケージを使う方式。つまり、アクターでTCP I/Oを実装する方法。
  2. akka.streamTcp#outgoingConnectionというAPIを使う方法

今回は、akka-streamをベースにしたかったので、2を採用しました。

outgoingConnectionメソッドの戻り値の型は、Flow[ByteString, ByteString, Future[OutgoingConnection]]です。ByteStringを渡せば、ByteStringが返ってくるらしい。わかりやすいですね。そしてストリームが起動中であれば、コネクションは常時接続された状態になります。

def outgoingConnection(
  remoteAddress : InetSocketAddress,
  localAddress : Option[InetSocketAddress],
  options : Traversable[SocketOption], 
  halfClose : Boolean,
  connectTimeout : Duration,
  idleTimeout : Duration) : Flow[ByteString, ByteString, Future[Tcp.OutgoingConnection]]

Memcachedのコネクションを表現するオブジェクトを実装する

では、早速 Memcachedのコネクションを表現するオブジェクトである MemcachedConnection を実装します。このオブジェクトを生成するとTCP接続され、破棄されると切断されます。そして、考えたインターフェイスは以下です(別にtrait切り出さなくてもよかったですが、説明のために分けました)。sendメソッドにコマンドを指定して呼ぶとレスポンスが返る単純なものです。 ちなみに、sendメソッドの戻り値の型はmonix.eval.Taskです。詳しくは https://monix.io/docs/3x/eval/task.html をご覧ください。

trait MemcachedConnection {
  def id: UUID // 接続ID
  def shutdown(): Unit // シャットダウン
  def peerConfig: Option[PeerConfig] // 接続先設定

  def send[C <: CommandRequest](cmd: C): Task[cmd.Response] // コマンドの送信

  // ...
}

実装はどうなるか。全貌は以下。先ほどのTcp#outgoingConnectionを使います。

private[memcached] class MemcachedConnectionImpl(_peerConfig: PeerConfig,
                                                 supervisionDecider: Option[Supervision.Decider])(
    implicit system: ActorSystem
) extends MemcachedConnection {

  // ...

  private implicit val mat: ActorMaterializer = ...

  protected val tcpFlow: Flow[ByteString, ByteString, NotUsed] =
    RestartFlow.withBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts) { () =>
      Tcp()
        .outgoingConnection(remoteAddress, localAddress, options, halfClose, connectTimeout, idleTimeout)
    }

  protected val connectionFlow: Flow[RequestContext, ResponseContext, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._
      val requestFlow = b.add(
        Flow[RequestContext]
          .map { rc =>
            log.debug(s"request = [{}]", rc.commandRequestString)
            (ByteString.fromString(rc.commandRequest.asString + "\r\n"), rc)
          }
      )
      val responseFlow = b.add(Flow[(ByteString, RequestContext)].map {
        case (byteString, requestContext) =>
          log.debug(s"response = [{}]", byteString.utf8String)
          ResponseContext(byteString, requestContext)
      })
      val unzip = b.add(Unzip[ByteString, RequestContext]())
      val zip   = b.add(Zip[ByteString, RequestContext]())
      requestFlow.out ~> unzip.in
      unzip.out0 ~> tcpFlow ~> zip.in0
      unzip.out1 ~> zip.in1
      zip.out ~> responseFlow.in
      FlowShape(requestFlow.in, responseFlow.out)
    })

  protected val (requestQueue: SourceQueueWithComplete[RequestContext], killSwitch: UniqueKillSwitch) = Source
    .queue[RequestContext](requestBufferSize, overflowStrategy)
    .via(connectionFlow)
    .map { responseContext =>
      log.debug(s"req_id = {}, command = {}: parse",
                responseContext.commandRequestId,
                responseContext.commandRequestString)
      val result = responseContext.parseResponse // レスポンスのパース
      responseContext.completePromise(result.toTry) // パース結果を返す
    }
    .viaMat(KillSwitches.single)(Keep.both)
    .toMat(Sink.ignore)(Keep.left)
    .run()

  override def shutdown(): Unit = killSwitch.shutdown()

  override def send[C <: CommandRequest](cmd: C): Task[cmd.Response] = Task.deferFutureAction { implicit ec =>
    val promise = Promise[CommandResponse]()
    requestQueue
      .offer(RequestContext(cmd, promise, ZonedDateTime.now()))
      .flatMap {
        case QueueOfferResult.Enqueued =>
          promise.future.map(_.asInstanceOf[cmd.Response])
        case QueueOfferResult.Failure(t) =>
          Future.failed(BufferOfferException("Failed to send request", Some(t)))
        case QueueOfferResult.Dropped =>
          Future.failed(
            BufferOfferException(
              s"Failed to send request, the queue buffer was full."
            )
          )
        case QueueOfferResult.QueueClosed =>
          Future.failed(BufferOfferException("Failed to send request, the queue was closed"))
      }
  }

}

TCPの送受信するにはtcpFlowメソッドが返すFlowに必要なByteStringを流して、ByteStringを受け取ればいいですが、リクエストとレスポンスを扱うためにもう少し工夫が必要です。そのためにconnectionFlowメソッドはtcpFlowメソッドを内部Flowとして利用します。そして、FlowはRequestContextResponseContextの型を利用します。その実装は以下のようなシンプルなものです。キューであるrequestQueueconnectionFlowにリクエストを送信します。ストリームの後半で返ってきたレスポンスを処理するという流れになっています。キューへのリクエストのエンキューはsendメソッドが行います。sendメソッドが呼ばれるとRequestContextを作ってrequestQueueにエンキュー、完了するとレスポンスを返します。requestQueueはストリームと繋がっていて、エンキューされたRequestContextconnectionFlowに渡され返ってきたResponseContextがレスポンス内容をパースし、その結果をPromise経由で返すというものです1。エラーハンドリングについて、エラーを起こしやすいところにRestartFlow.withBackoffを入れています。

final case class RequestContext(commandRequest: CommandRequest,
                                promise: Promise[CommandResponse],
                                requestAt: ZonedDateTime) {
  val id: UUID                     = commandRequest.id
  val commandRequestString: String = commandRequest.asString
}

final case class ResponseContext(byteString: ByteString,
                                 requestContext: RequestContext,
                                 requestsInTx: Seq[CommandRequest] = Seq.empty,
                                 responseAt: ZonedDateTime = ZonedDateTime.now)
    extends ResponseBase {

  val commandRequest: CommandRequest = requestContext.commandRequest

  def withRequestsInTx(values: Seq[CommandRequest]): ResponseContext = copy(requestsInTx = values)
  // レスポンスのパース
  def parseResponse: Either[ParseException, CommandResponse] = {
    requestContext.commandRequest match {
      case scr: CommandRequest =>
        scr.parse(ByteVector(byteString.toByteBuffer)).map(_._1)
    }
  }

}

コネクションオブジェクトの使い方は簡単です。ほとんどの場合は、このようにコネクションオブジェクトを直接操作せずに、高レベルなAPIを操作したいと思うはずです。そのためのクライアントオブジェクトはあとで述べます。

val connection = MemcachedConnection(
  PeerConfig(new InetSocketAddress("127.0.0.1", memcachedTestServer.getPort),
  backoffConfig = BackoffConfig(maxRestarts = 1)),
  None
)

val resultFuture = (for {
  _   <- connection.send(SetRequest(UUID.randomUUID(), "key1", "1", 10 seconds))
  _   <- connection.send(SetRequest(UUID.randomUUID(), "key2", "2", 10 seconds))
  gr1 <- connection.send(GetRequest(UUID.randomUUID(), "key1"))
  gr2 <- connection.send(GetRequest(UUID.randomUUID(), "key2"))
} yield (gr1, gr2)).runAsync
val result = Await.result(resultFuture, Duration.Inf) // (1, 2)

コマンドの実装

コマンドの一例はこんな感じです。 asStringメソッドは文字通りMemcachedにコマンドを送信するときに利用される文字列です。responseParserはfastparseで実装されたレスポンスパーサを指定します。parseResponseメソッドはMemcachedからのレスポンスを表現する構文木をコマンドのレンスポンスに変換するための関数です。これは少し複雑なのであとで説明します。

final class GetRequest private (val id: UUID, val key: String) extends CommandRequest with StringParsersSupport {

  override type Response = GetResponse
  override val isMasterOnly: Boolean = false

  override def asString: String = s"get $key"

  // レスポンスを解析するためのパーサを指定。レスポンス仕様に合わせて指定する
  override protected def responseParser: P[Expr] = P(retrievalCommandResponse)

  // ASTをコマンドレスポンスに変換する
  override protected def parseResponse: Handler = {
    case (EndExpr, next) =>
      (GetSucceeded(UUID.randomUUID(), id, None), next)
    case (ValueExpr(key, flags, length, casUnique, value), next) =>
      (GetSucceeded(UUID.randomUUID(), id, Some(ValueDesc(key, flags, length, casUnique, value))), next)
    case (ErrorExpr, next) =>
      (GetFailed(UUID.randomUUID(), id, MemcachedIOException(ErrorType.OtherType, None)), next)
    case (ClientErrorExpr(msg), next) =>
      (GetFailed(UUID.randomUUID(), id, MemcachedIOException(ErrorType.ClientType, Some(msg))), next)
    case (ServerErrorExpr(msg), next) =>
      (GetFailed(UUID.randomUUID(), id, MemcachedIOException(ErrorType.ServerType, Some(msg))), next)
  }

}

これはGetRequestの上位traitであるCommandRequestです。Memcachedのコマンド送信に必要なのはasStringメソッドだけで、コマンドのレスポンスのパースにはparseメソッドが利用されます。

trait CommandRequest {
  type Elem
  type Repr
  type P[+T] = core.Parser[T, Elem, Repr]

  type Response <: CommandResponse

  type Handler = PartialFunction[(Expr, Int), (Response, Int)]

  val id: UUID
  val key: String
  val isMasterOnly: Boolean

  def asString: String

  protected def responseParser: P[Expr]

  protected def convertToParseSource(s: ByteVector): Repr

  def parse(text: ByteVector, index: Int = 0): Either[ParseException, (Response, Int)] = {
    responseParser.parse(convertToParseSource(text), index) match {
      case f @ Parsed.Failure(_, index, _) =>
        Left(new ParseException(f.msg, index))
      case Parsed.Success(value, index) => Right(parseResponse((value, index)))
    }
  }

  protected def parseResponse: Handler

}

レスポンス・パーサの実装

レスポンスパーサは、Memcachedの仕様に合わせて以下の定義から適切なものを選んで利用します。詳しくは公式ドキュメントをみてください。標準のパーサーコンビネータと少し違うところがありますが、概ね似たような感じで書けます。Memcachedのレスポンスのルールは単純で左再帰除去などもしなくていいので、パーサーコンビネータのはじめての題材にはよいと思います。

object StringParsers {

  val digit: P0      = P(CharIn('0' to '9'))
  val lowerAlpha: P0 = P(CharIn('a' to 'z'))
  val upperAlpha: P0 = P(CharIn('A' to 'Z'))
  val alpha: P0      = P(lowerAlpha | upperAlpha)
  val alphaDigit: P0 = P(alpha | digit)
  val crlf: P0       = P("\r\n")

  val error: P[ErrorExpr.type]        = P("ERROR" ~ crlf).map(_ => ErrorExpr)
  val clientError: P[ClientErrorExpr] = P("CLIENT_ERROR" ~ (!crlf ~/ AnyChar).rep(1).! ~ crlf).map(ClientErrorExpr)
  val serverError: P[ServerErrorExpr] = P("SERVER_ERROR" ~ (!crlf ~/ AnyChar).rep(1).! ~ crlf).map(ServerErrorExpr)
  val allErrors: P[Expr]              = P(error | clientError | serverError)

  val end: P[EndExpr.type]             = P("END" ~ crlf).map(_ => EndExpr)
  val deleted: P[DeletedExpr.type]     = P("DELETED" ~ crlf).map(_ => DeletedExpr)
  val stored: P[StoredExpr.type]       = P("STORED" ~ crlf).map(_ => StoredExpr)
  val notStored: P[NotStoredExpr.type] = P("NOT_STORED" ~ crlf).map(_ => NotStoredExpr)
  val exists: P[ExistsExpr.type]       = P("EXISTS" ~ crlf).map(_ => ExistsExpr)
  val notFound: P[NotFoundExpr.type]   = P("NOT_FOUND" ~ crlf).map(_ => NotFoundExpr)
  val touched: P[TouchedExpr.type]     = P("TOUCHED" ~ crlf).map(_ => TouchedExpr)

  val key: P[String]     = (!" " ~/ AnyChar).rep(1).!
  val flags: P[Int]      = digit.rep(1).!.map(_.toInt)
  val bytes: P[Long]     = digit.rep(1).!.map(_.toLong)
  val casUnique: P[Long] = digit.rep(1).!.map(_.toLong)

  val incOrDecCommandResponse: P[Expr] = P(notFound | ((!crlf ~/ AnyChar).rep.! ~ crlf).map(StringExpr) | allErrors)
  val storageCommandResponse: P[Expr]  = P((stored | notStored | exists | notFound) | allErrors)

  val value: P[ValueExpr] =
    P("VALUE" ~ " " ~ key ~ " " ~ flags ~ " " ~ bytes ~ (" " ~ casUnique).? ~ crlf ~ (!crlf ~/ AnyChar).rep.! ~ crlf)
      .map {
        case (key, flags, bytes, cas, value) =>
          ValueExpr(key, flags, bytes, cas, value)
      }

  val version: P[VersionExpr] = P("VERSION" ~ " " ~ alphaDigit.rep(1).!).map(VersionExpr)

  val retrievalCommandResponse: P[Expr] = P(end | value | allErrors)

  val deletionCommandResponse: P[Expr] = P(deleted | notFound | allErrors)

  val touchCommandResponse: P[Expr] = P(touched | notFound | allErrors)

  val versionCommandResponse: P[Expr] = P(version | allErrors)
}

クライアントの実装

次はクライアント実装。MemcachedClient#sendメソッドはコマンドを受け取り、ReaderTTaskMemcachedConnection[cmd.Response]を返します。ReaderTTaskMemcachedConnectionはMemcachedのために特化したReaderTで、コネクションを受け取り、レスポンスを返すTaskを返します。Memcachedのための高レベルなAPIはこのsendメソッドを利用して実装されます。get,setなどのメソッドの内部でコマンドを作成し、レスポンスを戻り値にマッピングするだけで、使いやすい高レベルAPIになります。また、ReaderTを返すためfor式での簡潔な記述もできます。

package object memcached {

  type ReaderTTask[C, A]                  = ReaderT[Task, C, A]
  type ReaderTTaskMemcachedConnection[A]  = ReaderTTask[MemcachedConnection, A]
  type ReaderMemcachedConnection[M[_], A] = ReaderT[M, MemcachedConnection, A]

}
final class MemcachedClient()(implicit system: ActorSystem) {

  def send[C <: CommandRequest](cmd: C): ReaderTTaskMemcachedConnection[cmd.Response] = ReaderT(_.send(cmd))

  def get(key: String): ReaderTTaskMemcachedConnection[Option[ValueDesc]] =
    send(GetRequest(UUID.randomUUID(), key)).flatMap {
      case GetSucceeded(_, _, result) => ReaderTTask.pure(result)
      case GetFailed(_, _, ex)        => ReaderTTask.raiseError(ex)
    }

  def set[A: Show](key: String,
                   value: A,
                   expireDuration: Duration = Duration.Inf,
                   flags: Int = 0): ReaderTTaskMemcachedConnection[Int] =
    send(SetRequest(UUID.randomUUID(), key, value, expireDuration, flags)).flatMap {
      case SetExisted(_, _)    => ReaderTTask.pure(0)
      case SetNotFounded(_, _) => ReaderTTask.pure(0)
      case SetNotStored(_, _)  => ReaderTTask.pure(0)
      case SetSucceeded(_, _)  => ReaderTTask.pure(1)
      case SetFailed(_, _, ex) => ReaderTTask.raiseError(ex)
    }

   // ...

}
  • for式での利用例
val client = new MemcachedClient()
val resultFuture = (for {
  _  <- client.set(key, "1")
  r1 <- client.get(key)
  _  <- client.set(key, "2")
  r2 <- client.get(key)
} yield (r1, r2)).run(connection).runAsync
val result = Await.result(resultFuture, Duration.Inf) // (1, 2)

コネクションプール

コネクションとクライアントを実装できたら、次はコネクションプールも欲しくなります。MemcachedConnectionをプーリングして選択できればいいわけなので、commons-poolなど使えば楽ですね。コネクションをプールするアルゴリズム(HashRingなど)も複数考えられるので、工夫するとなかなか面白いです。

implicit val system = ActorSystem()

val peerConfig = PeerConfig(remoteAddress = new InetSocketAddress("127.0.0.1", 6379))
val pool = MemcachedConnectionPool.ofSingleRoundRobin(sizePerPeer = 5, peerConfig, RedisConnection(_)) // powered by RoundRobinPool
val connection = MemcachedConnection(connectionConfig)
val client = MemcachedClient()

// ローンパターン形式
val resultFuture1 = pool.withConnectionF{ con =>
  (for{
    _ <- client.set("foo", "bar")
    r <- client.get("foo")
  } yield r).run(con) 
}.runAsync

// モナド形式
val resultFuture2 = (for {
  _ <- ConnectionAutoClose(pool)(client.set("foo", "bar").run)
  r <- ConnectionAutoClose(pool)(client.get("foo").run)
} yield r).run().runAsync

まとめ

というわけでこんな風にakka-streamを使えばTCPクライアントも比較的簡単に実装できると思います。参考にしてみてください。


  1. バックプレッシャを適切にハンドリングするには、ストリームを常に起動した状態にしておく必要があります、要求ごとにストリームを起動していると効果的ではありません))

「エンティティの同一性を表現するためにequalsをオーバーライドすべきか否か」の感想

毎日、ドメイン駆動設計というか、設計の話が投稿されると、楽しくなりますね。

さて、今日の話題は、以下です!

エンティティの同一性を表現するためにequalsをオーバーライドすべきか否か

”稀によくあるサンプル”。多分これ僕が書いた事例ですね。ということで、なぜそうしたか、理由など書いておきたいと思います。

なぜこうしたか

equalsの責務は以下のとおりで、オブジェクトが等しいかどうかを示すものです。エンティティの等価判定の基準に、識別子以外の属性は含めていません。 これにはトレードオフがあります。

https://docs.oracle.com/javase/jp/8/docs/api/java/lang/Object.html Object#equalsの責務は、「このオブジェクトと他のオブジェクトが等しいかどうかを示します。」

以下のように実装したと場合に、(ID以外の)属性が変わったエンティティを特定することが可能です。

scala> case class EmployeeId(value: Int) extends Identifier
defined class EmployeeId

scala> case class Employee(identifier: EmployeeId, name: String) extends Entity[EmployeeId]
defined class Employee

scala> val list = Seq(Employee(EmployeeId(1), "yamada taro"), Employee(EmployeeId(2), "yamada hanako"))
list: Seq[Employee] = List(Employee(EmployeeId(1),yamada taro), Employee(EmployeeId(2),yamada hanako))

// "yamada hanako"が結婚して名前が変わるイベントが発生、リストの内容も合わせて変更される

scala> list.contains(Employee(EmployeeId(2), "yamada hanako"))
res0: Boolean = true

Entity#sameIdentityAsの場合は、 コレクションのメソッドに組み込むことはできないのでそれなりに実装を工夫する必要がありますね。

論理的等価性検査としてのequals

上記のメリットはよいとして、一体equalsメソッドとして何を求めるているのか。DDDの観点があろうがなかろうか、equalsの契約を逸脱する設計をしないようにすべきと考えます。久しぶりに、Effective Javaを開いてみると、equalsは「論理的等価性」検査の責務を持つというような記述はあるものの、「論理的等価性」が具体的に何かは明記されていませんでした。

ちなみにjava.lang.Objectのデフォルト実装は以下となっています。多くの場合はサブクラスで固有のequalsメソッドとしてオーバーライドされますが、されない場合もあります。java.util.Randomは、同じ乱数列を生成するかで等価判定できたはずですが、クライアントがそれを求めてなかったので、デフォルト実装のままとなっているようです。

public boolean equals(Object obj) {
  return (this == obj);
}

契約プログラミングの観点から

契約プログラミングの観点では、java.lan.Object#equalsのJavadocに書かれていること以上のことは求めてはいけません。つまり、一般契約に従っている以上は「オブジェクトが等しいかどうか」の仕様を満たしているはずです。

DDDの文脈では、等価性は(エンティティが持つ)値がすべて同じであればtrueとみなし、同一性は識別子が同じであればtrueとみなすものと解釈できそうです。つまり、エンティティの定義である「同一性によって定義されるオブジェクト」に照らすと equals をオーバーライドすることは適切ではありません。 この点では、本来オーバーライドすべきは eq と言えるかもしれません。

このアイデアは有益ですしこの設計を取ってもよいと思っていますが、java.lan.Object#equalsの契約に照らして考えるに「適切ではありません」とまでは言い切れないと考えています。

結局どうすべきか

まぁ身の蓋もないですが、こうすべきだと思っています。(ここでオーバーライドするという意味は、IDのみの等価判断する実装としてオーバーライドするかしないかという意味です)

また、equalshashCodeのようにもともと抽象度の高いインターフェイスは意図がわかりにくいことがあります。対策としては、ドキュメンテーションコメントに自然言語できちんと設計の意図を記述するべきで、利用者も求められる契約が何かを理解すべきだと思っています。(自壊の念を込めて)

実装を追わなければ equals がオーバーライドされていることが分からないことは、余分な意識コストとなってしまうでしょう。

akka-httpにswagger2.xを組み込む方法

akka-httpswagger2.xを組み込む方法を以下に示します。

※DIコンテナであるAirframeを使っていますが、もちろん必須ではありません。適宜読み替えてくだだい。

ライブラリの依存関係

swagger-akka-httpを追加します。javax.ws.rs-apiはアノテーションを利用するために追加します。akka-http-corsは必要に応じて追加してください。

libraryDependencies ++= Seq(
"com.typesafe.akka"            %% "akka-http"         % "10.1.5",
"com.github.swagger-akka-http" %% "swagger-akka-http" % "2.0.0"
"javax.ws.rs"                  % "javax.ws.rs-api"    % "2.0.1"
"ch.megard"                    %% "akka-http-cors"    % "0.3.0"
// ...
)

コントローラの例

コントローラに相当するクラスにアクションを作って、そのメソッドにアノテーションを割り当てます。別にコントローラを定義しなくとも、エンドポイントごとにRoute定義が分かれていて、アノテーションが付与できればよいです。

アノテーションの使い方は、Swagger 2.X Annotationsを読んでください。

package spetstore.interface.api.controller

import java.time.ZonedDateTime

import akka.http.scaladsl.server.{Directives, Route}
import de.heikoseeberger.akkahttpcirce.FailFastCirceSupport._
import io.circe.generic.auto._
import io.swagger.v3.oas.annotations.Operation
import io.swagger.v3.oas.annotations.media.{Content, Schema}
import io.swagger.v3.oas.annotations.parameters.RequestBody
import io.swagger.v3.oas.annotations.responses.ApiResponse
import javax.ws.rs._
import monix.eval.Task
import monix.execution.Scheduler
import org.hashids.Hashids
import org.sisioh.baseunits.scala.money.Money
import spetstore.domain.model.basic.StatusType
import spetstore.domain.model.item._
import spetstore.interface.api.model.{CreateItemRequest, CreateItemResponse, CreateItemResponseBody}
import spetstore.interface.generator.jdbc.ItemIdGeneratorOnJDBC
import spetstore.interface.repository.ItemRepository
import wvlet.airframe._

import scala.concurrent.Future

@Path("/items")
@Consumes(Array("application/json"))
@Produces(Array("application/json"))
trait ItemController extends Directives {

  private val itemRepository: ItemRepository[Task]         = bind[ItemRepository[Task]]

  private val itemIdGeneratorOnJDBC: ItemIdGeneratorOnJDBC = bind[ItemIdGeneratorOnJDBC]

  private val hashids = bind[Hashids]

  def route: Route = create

  private def convertToAggregate(id: ItemId, request: CreateItemRequest): Item = Item(
    id = id,
    status = StatusType.Active,
    name = ItemName(request.name),
    description = request.description.map(ItemDescription),
    categories = Categories(request.categories),
    price = Price(Money.yens(request.price)),
    createdAt = ZonedDateTime.now(),
    updatedAt = None
  )

  @POST
  @Operation(
    summary = "Create item",
    description = "Create Item",
    requestBody =
      new RequestBody(content = Array(new Content(schema = new Schema(implementation = classOf[CreateItemRequest])))),
    responses = Array(
      new ApiResponse(responseCode = "200",
                      description = "Create response",
                      content = Array(new Content(schema = new Schema(implementation = classOf[CreateItemResponse])))),
      new ApiResponse(responseCode = "500", description = "Internal server error")
    )
  )
  def create: Route = path("items") {
    post {
      extractActorSystem { implicit system =>
        implicit val scheduler: Scheduler = Scheduler(system.dispatcher)
        entity(as[CreateItemRequest]) { request =>
          val future: Future[CreateItemResponse] = (for {
            itemId <- itemIdGeneratorOnJDBC.generateId()
            _      <- itemRepository.store(convertToAggregate(itemId, request))
          } yield CreateItemResponse(Right(CreateItemResponseBody(hashids.encode(itemId.value))))).runAsync
          onSuccess(future) { result =>
            complete(result)
          }
        }
      }
    }
  }

  // ...

}

swagger-ui

swagger-uidistsrc/main/resource/swaggerとしてコピーしてください。

SwaggerHttpService

次にSwaggerHttpServiceの実装を用意します。

package spetstore.interface.api

import com.github.swagger.akka.SwaggerHttpService
import com.github.swagger.akka.model.Info

class SwaggerDocService(hostName: String, port: Int, val apiClasses: Set[Class[_]]) extends SwaggerHttpService {
  override val host                = s"127.0.0.1:$port" //the url of your api, not swagger's json endpoint
  override val apiDocsPath         = "api-docs" //where you want the swagger-json endpoint exposed
  override val info                = Info() //provides license and other description details
  override val unwantedDefinitions = Seq("Function1", "Function1RequestContextFutureRouteResult")
}

routeの設定

akka-httpのRouteは以下を参考にしてください。SwaggerDocServiceとコントローラをrouteに加えます。また、CORSが必要なら、"ch.megard" %% "akka-http-cors" % "0.3.0" を使うとよいと思います。

package spetstore.interface.api

import akka.http.scaladsl.model.{ ContentTypes, HttpEntity, HttpResponse }
import akka.http.scaladsl.server.{ Directives, Route, StandardRoute }
import wvlet.airframe._
import ch.megard.akka.http.cors.scaladsl.CorsDirectives._
import spetstore.interface.api.controller.ItemController

trait Routes extends Directives {

  private lazy val itemController    = bind[ItemController]
  private lazy val swaggerDocService = bind[SwaggerDocService]

  private def index(): StandardRoute = complete(
    HttpResponse(
      entity = HttpEntity(
        ContentTypes.`text/plain(UTF-8)`,
        "Wellcome to API"
      )
    )
  )

  def routes: Route = cors() {
    pathEndOrSingleSlash {
      index()
    } ~ path("swagger") {
      getFromResource("swagger/index.html")
    } ~ getFromResourceDirectory("swagger") ~
    swaggerDocService.routes ~ itemController.route
  }

}

ブートストラップ

akka-httpの起動部分のコードです。

package spetstore.interface.api

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.Http.ServerBinding
import akka.http.scaladsl.settings.ServerSettings
import akka.stream.ActorMaterializer
import wvlet.airframe._

import scala.concurrent.Future
import scala.util.{ Failure, Success }

trait ApiServer {

  implicit val system           = bind[ActorSystem]
  implicit val materializer     = ActorMaterializer()
  implicit val executionContext = system.dispatcher

  private val routes = bind[Routes].routes

  def start(host: String, port: Int, settings: ServerSettings): Future[ServerBinding] = {
    val bindingFuture = Http().bindAndHandle(handler = routes, interface = host, port = port, settings = settings)
    bindingFuture.onComplete {
      case Success(binding) =>
        system.log.info(s"Server online at http://${binding.localAddress.getHostName}:${binding.localAddress.getPort}/")
      case Failure(ex) =>
        system.log.error(ex, "occurred error")
    }
    sys.addShutdownHook {
      bindingFuture
        .flatMap(_.unbind())
        .onComplete { _ =>
          materializer.shutdown()
          system.terminate()
        }
    }
    bindingFuture
  }

}

アプリケーションのブートストラップ部分です。

package spetstore.api

import akka.actor.ActorSystem
import akka.http.scaladsl.settings.ServerSettings
import monix.eval.Task
import org.hashids.Hashids
import slick.basic.DatabaseConfig
import slick.jdbc.JdbcProfile
import spetstore.domain.model.item.ItemId
import spetstore.interface.api.controller.ItemController
import spetstore.interface.api.{ApiServer, Routes, SwaggerDocService}
import spetstore.interface.generator.IdGenerator
import spetstore.interface.generator.jdbc.ItemIdGeneratorOnJDBC
import spetstore.interface.repository.{ItemRepository, ItemRepositoryBySlick}
import wvlet.airframe._

/**
  * http://127.0.0.1:8080/swagger
  */
object Main {

  def main(args: Array[String]): Unit = {
    val parser = new scopt.OptionParser[AppConfig]("spetstore") {
      opt[String]('h', "host").action((x, c) => c.copy(host = x)).text("host")
      opt[Int]('p', "port").action((x, c) => c.copy(port = x)).text("port")
    }
    val system = ActorSystem("spetstore")
    val dbConfig: DatabaseConfig[JdbcProfile] =
      DatabaseConfig.forConfig[JdbcProfile](path = "spetstore.interface.storage.jdbc", system.settings.config)

    parser.parse(args, AppConfig()) match {
      case Some(config) =>
        val design = newDesign
          .bind[Hashids].toInstance(new Hashids(system.settings.config.getString("spetstore.interface.hashids.salt")))
          .bind[ActorSystem].toInstance(system)
          .bind[JdbcProfile].toInstance(dbConfig.profile)
          .bind[JdbcProfile#Backend#Database].toInstance(dbConfig.db)
          .bind[Routes].toSingleton
          .bind[SwaggerDocService].toInstance(
          new SwaggerDocService(config.host, config.port, Set(classOf[ItemController]))
        )
          .bind[ApiServer].toSingleton
          .bind[ItemRepository[Task]].to[ItemRepositoryBySlick]
          .bind[IdGenerator[ItemId]].to[ItemIdGeneratorOnJDBC]
          .bind[ItemController].toSingleton
        design.withSession { session =>
          val system = session.build[ActorSystem]
          session.build[ApiServer].start(config.host, config.port, settings = ServerSettings(system))
        }
      case None =>
        println(parser.usage)
    }
  }
}

まとめ

最低限、考慮すべきこととしては、akka-httpのroute dslはエンドポイントごとに分割してswaggerアノテーションを割り当てれるようにしてください。これ以外はswaggerの一般的な使い方と変わりません。

Getter/Setterを避けて役に立つドメインオブジェクトを作る

Clean Architecture 達人に学ぶソフトウェアの構造と設計を読んでます。モデリングに関しては成分薄めですが、よい本だと思います。はい。

Clean Architecture 達人に学ぶソフトウェアの構造と設計

Clean Architecture 達人に学ぶソフトウェアの構造と設計

本書の大筋から少し逸れるが、「5章 オブジェクト指向プログラミング」の「カプセル化」が面白かったので、これを切り口にモデリングについて考えてみる。

続きを読む

DDDリポジトリを楽に実装するライブラリ

DDDのリポジトリを実装するのがダルいので、ライブラリ化したというか前から書いていたけど、Redis, Memcachedなどの実装も追加したので、簡単に説明を書いてみる。 プロジェクトなどで自前で実装する際も、参照実装として参考になると思います。Scalaの例だけど、他の言語でも参考にはなるんじゃないかと勝手に想像してます。

https://github.com/j5ik2o/scala-ddd-base

デフォルトで対応している永続化技術は、以下。

  • JDBC
    • SkinnyORM
    • Slick3
  • NOSQL
    • Memcached
    • Redis
    • Guava Cache
  • Freeモナド
    • こちらは永続化そのものは行いません。上記どれかの実装に委譲することになります。

何が楽になるかというと、上記向けのリポジトリの実装があるので、Daoだけ用意すればリポジトリが実装できるようになります。

中核になるトレイト

中核となる抽象トレイトはこれ。実装はついてないです。

https://github.com/j5ik2o/scala-ddd-base/tree/master/core/src/main/scala/com/github/j5ik2o/dddbase

IOするのはAggregateです。リポジトリによっては実装するメソッドが異なるのでトレイトは細かく分かれています。Mは型コンストラクタです。

trait AggregateSingleReader[M[_]] extends AggregateIO[M] {
  def resolveById(id: IdType): M[AggregateType]
}

SkinnyORM向け実装トレイト

一例としてSkinnyORM向けに実装を提供するトレイトの説明を簡単にします。Mはここでは、ReaderT[Task, DBSession, A]としています。Taskmonix.eval.Taskです。この型は実装ごとに変わります。たとえば、Redis向けの実装では、ReaderT[Task, RedisConnection, A]になります。

実装を提供するトレイトはAggregateIOBaseFeatureを継承しますが、ほとんどのロジックでSkinnyDaoSupport#Daoを実装したオブジェクトに委譲します。つまり、リポジトリを作る場合は、これらのトレイトをリポジトリのクラスにミックスインして、Daoの実装を提供するだけでよいことになります。

object AggregateIOBaseFeature {
  type RIO[A] = ReaderT[Task, DBSession, A]
}

trait AggregateIOBaseFeature extends AggregateIO[RIO] {
  override type IdType <: AggregateLongId
  type RecordType <: SkinnyDaoSupport#Record
  type DaoType <: SkinnyDaoSupport#Dao[RecordType]

  protected val dao: DaoType
}

trait AggregateSingleReadFeature extends AggregateSingleReader[RIO] with AggregateBaseReadFeature {

  override def resolveById(id: IdType): RIO[AggregateType] =
    for {
      record <- ReaderT[Task, DBSession, RecordType] { implicit dbSession: DBSession =>
        Task {
          dao.findBy(byCondition(id)).getOrElse(throw AggregateNotFoundException(id))
        }
      }
      aggregate <- convertToAggregate(record)
    } yield aggregate

}

実装サンプル

実装する際は、Aggregate*Featureのトレイトをミックスしてください(UserAccountRepositoryは実装を持たない抽象型です)。あとはDaoの実装の提供だけです。以下の例では、UserAccountComponentUserAccountRecord, UserAccountDaoを提供します。

object UserAccountRepository {
  type BySlick[A] = Task[A]
  def bySkinny: UserAccountRepository[BySkinny] = new UserAccountRepositoryBySkinny
}

class UserAccountRepositoryBySkinny
    extends UserAccountRepository[BySkinny]
    with AggregateSingleReadFeature
    with AggregateSingleWriteFeature
    with AggregateMultiReadFeature
    with AggregateMultiWriteFeature
    with AggregateSingleSoftDeleteFeature
    with AggregateMultiSoftDeleteFeature
    with UserAccountComponent {

  override type RecordType = UserAccountRecord
  override type DaoType    = UserAccountDao.type
  override protected val dao: UserAccountDao.type = UserAccountDao

  override protected def convertToAggregate: UserAccountRecord => RIO[UserAccount] = { record =>
    ReaderT { _ => // このメソッド内部でDBを利用したり、非同期タスクを実行する可能性もあるので、RIO形式を取っている
      Task.pure {
        UserAccount(
          id = UserAccountId(record.id),
          status = Status.withName(record.status),
          emailAddress = EmailAddress(record.email),
          password = HashedPassword(record.password),
          firstName = record.firstName,
          lastName = record.lastName,
          createdAt = record.createdAt,
          updatedAt = record.updatedAt
        )
      }
    }
  }

  override protected def convertToRecord: UserAccount => RIO[UserAccountRecord] = { aggregate =>
    ReaderT { _ =>
      Task.pure {
        UserAccountRecord(
          id = aggregate.id.value,
          status = aggregate.status.entryName,
          email = aggregate.emailAddress.value,
          password = aggregate.password.value,
          firstName = aggregate.firstName,
          lastName = aggregate.lastName,
          createdAt = aggregate.createdAt,
          updatedAt = aggregate.updatedAt
        )
      }
    }
  }

}

DaoはSkinnyDaoSupport#Daoを実装する形式になります。これはSkinnyCRUDMapperの派生型です。 僕の場合は、ここで紹介した方法でスキーマから自動生成しています。

package skinny {

  import com.github.j5ik2o.dddbase.skinny.SkinnyDaoSupport
  import scalikejdbc._
  import _root_.skinny.orm._

  trait UserAccountComponent extends SkinnyDaoSupport {

    case class UserAccountRecord(
        id: Long,
        status: String,
        email: String,
        password: String,
        firstName: String,
        lastName: String,
        createdAt: java.time.ZonedDateTime,
        updatedAt: Option[java.time.ZonedDateTime]
    ) extends Record

    object UserAccountDao extends Dao[UserAccountRecord] {

      override def useAutoIncrementPrimaryKey: Boolean = false

      override val tableName: String = "user_account"

      override protected def toNamedValues(record: UserAccountRecord): Seq[(Symbol, Any)] = Seq(
        'status     -> record.status,
        'email      -> record.email,
        'password   -> record.password,
        'first_name -> record.firstName,
        'last_name  -> record.lastName,
        'created_at -> record.createdAt,
        'updated_at -> record.updatedAt
      )

      override def defaultAlias: Alias[UserAccountRecord] = createAlias("u")

      override def extract(rs: WrappedResultSet, s: ResultName[UserAccountRecord]): UserAccountRecord =
        autoConstruct(rs, s)

    }

  }

}

利用するときは、以下のような感じでリポジトリが返すReaderT[Task, DBSession, UserAccount]#runDBSessionを渡すとTaskが返ってきます。 それをrunAsyncするとFuture[UserAccount]が取得できます。他の実装でもほとんど同じように使えます。ご参考までに。

val resultFuture: Future[UserAccount] = (for {
  _ <- repository.store(userAccount)
  r <- repository.resolveById(userAccount.id)
} yield r).run(AutoSession).runAsync