引っ越しました。
akka-streamを使ってmemcachedクライアントを作るには
かなり前になるのですが、akka-streamの習作課題として、Memcachedクライアントを実装したので、概要を解説する記事をまとめます。詳しくはgithubをみてください。
https://github.com/j5ik2o/reactive-memcached
TCPのハンドリング方法
akkaでTCPをハンドリングするには、以下の二つになる。おそらく
akka.actor
とakka.io
のパッケージを使う方式。つまり、アクターでTCP I/Oを実装する方法。akka.stream
のTcp#outgoingConnection
というAPIを使う方法
今回は、akka-streamをベースにしたかったので、2を採用しました。
outgoingConnection
メソッドの戻り値の型は、Flow[ByteString, ByteString, Future[OutgoingConnection]]
です。ByteString
を渡せば、ByteString
が返ってくるらしい。わかりやすいですね。そしてストリームが起動中であれば、コネクションは常時接続された状態になります。
def outgoingConnection(
remoteAddress : InetSocketAddress,
localAddress : Option[InetSocketAddress],
options : Traversable[SocketOption],
halfClose : Boolean,
connectTimeout : Duration,
idleTimeout : Duration) : Flow[ByteString, ByteString, Future[Tcp.OutgoingConnection]]
Memcachedのコネクションを表現するオブジェクトを実装する
では、早速 Memcachedのコネクションを表現するオブジェクトである MemcachedConnection
を実装します。このオブジェクトを生成するとTCP接続され、破棄されると切断されます。そして、考えたインターフェイスは以下です(別にtrait切り出さなくてもよかったですが、説明のために分けました)。send
メソッドにコマンドを指定して呼ぶとレスポンスが返る単純なものです。
ちなみに、send
メソッドの戻り値の型はmonix.eval.Task
です。詳しくは https://monix.io/docs/3x/eval/task.html をご覧ください。
trait MemcachedConnection { def id: UUID // 接続ID def shutdown(): Unit // シャットダウン def peerConfig: Option[PeerConfig] // 接続先設定 def send[C <: CommandRequest](cmd: C): Task[cmd.Response] // コマンドの送信 // ... }
実装はどうなるか。全貌は以下。先ほどのTcp#outgoingConnection
を使います。
private[memcached] class MemcachedConnectionImpl(_peerConfig: PeerConfig, supervisionDecider: Option[Supervision.Decider])( implicit system: ActorSystem ) extends MemcachedConnection { // ... private implicit val mat: ActorMaterializer = ... protected val tcpFlow: Flow[ByteString, ByteString, NotUsed] = RestartFlow.withBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts) { () => Tcp() .outgoingConnection(remoteAddress, localAddress, options, halfClose, connectTimeout, idleTimeout) } protected val connectionFlow: Flow[RequestContext, ResponseContext, NotUsed] = Flow.fromGraph(GraphDSL.create() { implicit b => import GraphDSL.Implicits._ val requestFlow = b.add( Flow[RequestContext] .map { rc => log.debug(s"request = [{}]", rc.commandRequestString) (ByteString.fromString(rc.commandRequest.asString + "\r\n"), rc) } ) val responseFlow = b.add(Flow[(ByteString, RequestContext)].map { case (byteString, requestContext) => log.debug(s"response = [{}]", byteString.utf8String) ResponseContext(byteString, requestContext) }) val unzip = b.add(Unzip[ByteString, RequestContext]()) val zip = b.add(Zip[ByteString, RequestContext]()) requestFlow.out ~> unzip.in unzip.out0 ~> tcpFlow ~> zip.in0 unzip.out1 ~> zip.in1 zip.out ~> responseFlow.in FlowShape(requestFlow.in, responseFlow.out) }) protected val (requestQueue: SourceQueueWithComplete[RequestContext], killSwitch: UniqueKillSwitch) = Source .queue[RequestContext](requestBufferSize, overflowStrategy) .via(connectionFlow) .map { responseContext => log.debug(s"req_id = {}, command = {}: parse", responseContext.commandRequestId, responseContext.commandRequestString) val result = responseContext.parseResponse // レスポンスのパース responseContext.completePromise(result.toTry) // パース結果を返す } .viaMat(KillSwitches.single)(Keep.both) .toMat(Sink.ignore)(Keep.left) .run() override def shutdown(): Unit = killSwitch.shutdown() override def send[C <: CommandRequest](cmd: C): Task[cmd.Response] = Task.deferFutureAction { implicit ec => val promise = Promise[CommandResponse]() requestQueue .offer(RequestContext(cmd, promise, ZonedDateTime.now())) .flatMap { case QueueOfferResult.Enqueued => promise.future.map(_.asInstanceOf[cmd.Response]) case QueueOfferResult.Failure(t) => Future.failed(BufferOfferException("Failed to send request", Some(t))) case QueueOfferResult.Dropped => Future.failed( BufferOfferException( s"Failed to send request, the queue buffer was full." ) ) case QueueOfferResult.QueueClosed => Future.failed(BufferOfferException("Failed to send request, the queue was closed")) } } }
TCPの送受信するにはtcpFlow
メソッドが返すFlow
に必要なByteString
を流して、ByteString
を受け取ればいいですが、リクエストとレスポンスを扱うためにもう少し工夫が必要です。そのためにconnectionFlow
メソッドはtcpFlow
メソッドを内部Flow
として利用します。そして、FlowはRequestContext
とResponseContext
の型を利用します。その実装は以下のようなシンプルなものです。キューであるrequestQueue
はconnectionFlow
にリクエストを送信します。ストリームの後半で返ってきたレスポンスを処理するという流れになっています。キューへのリクエストのエンキューはsend
メソッドが行います。send
メソッドが呼ばれるとRequestContext
を作ってrequestQueue
にエンキュー、完了するとレスポンスを返します。requestQueue
はストリームと繋がっていて、エンキューされたRequestContext
はconnectionFlow
に渡され返ってきたResponseContext
がレスポンス内容をパースし、その結果をPromise経由で返すというものです1。エラーハンドリングについて、エラーを起こしやすいところにRestartFlow.withBackoff
を入れています。
final case class RequestContext(commandRequest: CommandRequest, promise: Promise[CommandResponse], requestAt: ZonedDateTime) { val id: UUID = commandRequest.id val commandRequestString: String = commandRequest.asString } final case class ResponseContext(byteString: ByteString, requestContext: RequestContext, requestsInTx: Seq[CommandRequest] = Seq.empty, responseAt: ZonedDateTime = ZonedDateTime.now) extends ResponseBase { val commandRequest: CommandRequest = requestContext.commandRequest def withRequestsInTx(values: Seq[CommandRequest]): ResponseContext = copy(requestsInTx = values) // レスポンスのパース def parseResponse: Either[ParseException, CommandResponse] = { requestContext.commandRequest match { case scr: CommandRequest => scr.parse(ByteVector(byteString.toByteBuffer)).map(_._1) } } }
コネクションオブジェクトの使い方は簡単です。ほとんどの場合は、このようにコネクションオブジェクトを直接操作せずに、高レベルなAPIを操作したいと思うはずです。そのためのクライアントオブジェクトはあとで述べます。
val connection = MemcachedConnection( PeerConfig(new InetSocketAddress("127.0.0.1", memcachedTestServer.getPort), backoffConfig = BackoffConfig(maxRestarts = 1)), None ) val resultFuture = (for { _ <- connection.send(SetRequest(UUID.randomUUID(), "key1", "1", 10 seconds)) _ <- connection.send(SetRequest(UUID.randomUUID(), "key2", "2", 10 seconds)) gr1 <- connection.send(GetRequest(UUID.randomUUID(), "key1")) gr2 <- connection.send(GetRequest(UUID.randomUUID(), "key2")) } yield (gr1, gr2)).runAsync val result = Await.result(resultFuture, Duration.Inf) // (1, 2)
コマンドの実装
コマンドの一例はこんな感じです。
asString
メソッドは文字通りMemcachedにコマンドを送信するときに利用される文字列です。responseParser
はfastparseで実装されたレスポンスパーサを指定します。parseResponse
メソッドはMemcachedからのレスポンスを表現する構文木をコマンドのレンスポンスに変換するための関数です。これは少し複雑なのであとで説明します。
final class GetRequest private (val id: UUID, val key: String) extends CommandRequest with StringParsersSupport { override type Response = GetResponse override val isMasterOnly: Boolean = false override def asString: String = s"get $key" // レスポンスを解析するためのパーサを指定。レスポンス仕様に合わせて指定する override protected def responseParser: P[Expr] = P(retrievalCommandResponse) // ASTをコマンドレスポンスに変換する override protected def parseResponse: Handler = { case (EndExpr, next) => (GetSucceeded(UUID.randomUUID(), id, None), next) case (ValueExpr(key, flags, length, casUnique, value), next) => (GetSucceeded(UUID.randomUUID(), id, Some(ValueDesc(key, flags, length, casUnique, value))), next) case (ErrorExpr, next) => (GetFailed(UUID.randomUUID(), id, MemcachedIOException(ErrorType.OtherType, None)), next) case (ClientErrorExpr(msg), next) => (GetFailed(UUID.randomUUID(), id, MemcachedIOException(ErrorType.ClientType, Some(msg))), next) case (ServerErrorExpr(msg), next) => (GetFailed(UUID.randomUUID(), id, MemcachedIOException(ErrorType.ServerType, Some(msg))), next) } }
これはGetRequestの上位traitであるCommandRequestです。Memcachedのコマンド送信に必要なのはasString
メソッドだけで、コマンドのレスポンスのパースにはparse
メソッドが利用されます。
trait CommandRequest { type Elem type Repr type P[+T] = core.Parser[T, Elem, Repr] type Response <: CommandResponse type Handler = PartialFunction[(Expr, Int), (Response, Int)] val id: UUID val key: String val isMasterOnly: Boolean def asString: String protected def responseParser: P[Expr] protected def convertToParseSource(s: ByteVector): Repr def parse(text: ByteVector, index: Int = 0): Either[ParseException, (Response, Int)] = { responseParser.parse(convertToParseSource(text), index) match { case f @ Parsed.Failure(_, index, _) => Left(new ParseException(f.msg, index)) case Parsed.Success(value, index) => Right(parseResponse((value, index))) } } protected def parseResponse: Handler }
レスポンス・パーサの実装
レスポンスパーサは、Memcachedの仕様に合わせて以下の定義から適切なものを選んで利用します。詳しくは公式ドキュメントをみてください。標準のパーサーコンビネータと少し違うところがありますが、概ね似たような感じで書けます。Memcachedのレスポンスのルールは単純で左再帰除去などもしなくていいので、パーサーコンビネータのはじめての題材にはよいと思います。
object StringParsers { val digit: P0 = P(CharIn('0' to '9')) val lowerAlpha: P0 = P(CharIn('a' to 'z')) val upperAlpha: P0 = P(CharIn('A' to 'Z')) val alpha: P0 = P(lowerAlpha | upperAlpha) val alphaDigit: P0 = P(alpha | digit) val crlf: P0 = P("\r\n") val error: P[ErrorExpr.type] = P("ERROR" ~ crlf).map(_ => ErrorExpr) val clientError: P[ClientErrorExpr] = P("CLIENT_ERROR" ~ (!crlf ~/ AnyChar).rep(1).! ~ crlf).map(ClientErrorExpr) val serverError: P[ServerErrorExpr] = P("SERVER_ERROR" ~ (!crlf ~/ AnyChar).rep(1).! ~ crlf).map(ServerErrorExpr) val allErrors: P[Expr] = P(error | clientError | serverError) val end: P[EndExpr.type] = P("END" ~ crlf).map(_ => EndExpr) val deleted: P[DeletedExpr.type] = P("DELETED" ~ crlf).map(_ => DeletedExpr) val stored: P[StoredExpr.type] = P("STORED" ~ crlf).map(_ => StoredExpr) val notStored: P[NotStoredExpr.type] = P("NOT_STORED" ~ crlf).map(_ => NotStoredExpr) val exists: P[ExistsExpr.type] = P("EXISTS" ~ crlf).map(_ => ExistsExpr) val notFound: P[NotFoundExpr.type] = P("NOT_FOUND" ~ crlf).map(_ => NotFoundExpr) val touched: P[TouchedExpr.type] = P("TOUCHED" ~ crlf).map(_ => TouchedExpr) val key: P[String] = (!" " ~/ AnyChar).rep(1).! val flags: P[Int] = digit.rep(1).!.map(_.toInt) val bytes: P[Long] = digit.rep(1).!.map(_.toLong) val casUnique: P[Long] = digit.rep(1).!.map(_.toLong) val incOrDecCommandResponse: P[Expr] = P(notFound | ((!crlf ~/ AnyChar).rep.! ~ crlf).map(StringExpr) | allErrors) val storageCommandResponse: P[Expr] = P((stored | notStored | exists | notFound) | allErrors) val value: P[ValueExpr] = P("VALUE" ~ " " ~ key ~ " " ~ flags ~ " " ~ bytes ~ (" " ~ casUnique).? ~ crlf ~ (!crlf ~/ AnyChar).rep.! ~ crlf) .map { case (key, flags, bytes, cas, value) => ValueExpr(key, flags, bytes, cas, value) } val version: P[VersionExpr] = P("VERSION" ~ " " ~ alphaDigit.rep(1).!).map(VersionExpr) val retrievalCommandResponse: P[Expr] = P(end | value | allErrors) val deletionCommandResponse: P[Expr] = P(deleted | notFound | allErrors) val touchCommandResponse: P[Expr] = P(touched | notFound | allErrors) val versionCommandResponse: P[Expr] = P(version | allErrors) }
クライアントの実装
次はクライアント実装。MemcachedClient#send
メソッドはコマンドを受け取り、ReaderTTaskMemcachedConnection[cmd.Response]
を返します。ReaderTTaskMemcachedConnection
はMemcachedのために特化したReaderTで、コネクションを受け取り、レスポンスを返すTask
を返します。Memcachedのための高レベルなAPIはこのsend
メソッドを利用して実装されます。get
,set
などのメソッドの内部でコマンドを作成し、レスポンスを戻り値にマッピングするだけで、使いやすい高レベルAPIになります。また、ReaderT
を返すためfor式での簡潔な記述もできます。
package object memcached { type ReaderTTask[C, A] = ReaderT[Task, C, A] type ReaderTTaskMemcachedConnection[A] = ReaderTTask[MemcachedConnection, A] type ReaderMemcachedConnection[M[_], A] = ReaderT[M, MemcachedConnection, A] }
final class MemcachedClient()(implicit system: ActorSystem) { def send[C <: CommandRequest](cmd: C): ReaderTTaskMemcachedConnection[cmd.Response] = ReaderT(_.send(cmd)) def get(key: String): ReaderTTaskMemcachedConnection[Option[ValueDesc]] = send(GetRequest(UUID.randomUUID(), key)).flatMap { case GetSucceeded(_, _, result) => ReaderTTask.pure(result) case GetFailed(_, _, ex) => ReaderTTask.raiseError(ex) } def set[A: Show](key: String, value: A, expireDuration: Duration = Duration.Inf, flags: Int = 0): ReaderTTaskMemcachedConnection[Int] = send(SetRequest(UUID.randomUUID(), key, value, expireDuration, flags)).flatMap { case SetExisted(_, _) => ReaderTTask.pure(0) case SetNotFounded(_, _) => ReaderTTask.pure(0) case SetNotStored(_, _) => ReaderTTask.pure(0) case SetSucceeded(_, _) => ReaderTTask.pure(1) case SetFailed(_, _, ex) => ReaderTTask.raiseError(ex) } // ... }
- for式での利用例
val client = new MemcachedClient() val resultFuture = (for { _ <- client.set(key, "1") r1 <- client.get(key) _ <- client.set(key, "2") r2 <- client.get(key) } yield (r1, r2)).run(connection).runAsync val result = Await.result(resultFuture, Duration.Inf) // (1, 2)
コネクションプール
コネクションとクライアントを実装できたら、次はコネクションプールも欲しくなります。MemcachedConnection
をプーリングして選択できればいいわけなので、commons-pool
など使えば楽ですね。コネクションをプールするアルゴリズム(HashRingなど)も複数考えられるので、工夫するとなかなか面白いです。
プールの実装 https://github.com/j5ik2o/reactive-memcached/blob/master/core/src/main/scala/com/github/j5ik2o/reactive/memcached/pool/MemcachedConnectionPoolActor.scala https://github.com/j5ik2o/reactive-memcached/blob/master/pool-commons/src/main/scala/com/github/j5ik2o/reactive/memcached/CommonsPool.scala
利用例
implicit val system = ActorSystem() val peerConfig = PeerConfig(remoteAddress = new InetSocketAddress("127.0.0.1", 6379)) val pool = MemcachedConnectionPool.ofSingleRoundRobin(sizePerPeer = 5, peerConfig, RedisConnection(_)) // powered by RoundRobinPool val connection = MemcachedConnection(connectionConfig) val client = MemcachedClient() // ローンパターン形式 val resultFuture1 = pool.withConnectionF{ con => (for{ _ <- client.set("foo", "bar") r <- client.get("foo") } yield r).run(con) }.runAsync // モナド形式 val resultFuture2 = (for { _ <- ConnectionAutoClose(pool)(client.set("foo", "bar").run) r <- ConnectionAutoClose(pool)(client.get("foo").run) } yield r).run().runAsync
まとめ
というわけでこんな風にakka-streamを使えばTCPクライアントも比較的簡単に実装できると思います。参考にしてみてください。
-
バックプレッシャを適切にハンドリングするには、ストリームを常に起動した状態にしておく必要があります、要求ごとにストリームを起動していると効果的ではありません))↩
「エンティティの同一性を表現するためにequalsをオーバーライドすべきか否か」の感想
毎日、ドメイン駆動設計というか、設計の話が投稿されると、楽しくなりますね。
さて、今日の話題は、以下です!
エンティティの同一性を表現するためにequalsをオーバーライドすべきか否か
”稀によくあるサンプル”。多分これ僕が書いた事例ですね。ということで、なぜそうしたか、理由など書いておきたいと思います。
なぜこうしたか
equals
の責務は以下のとおりで、オブジェクトが等しいかどうかを示すものです。エンティティの等価判定の基準に、識別子以外の属性は含めていません。 これにはトレードオフがあります。
https://docs.oracle.com/javase/jp/8/docs/api/java/lang/Object.html Object#equalsの責務は、「このオブジェクトと他のオブジェクトが等しいかどうかを示します。」
以下のように実装したと場合に、(ID以外の)属性が変わったエンティティを特定することが可能です。
scala> case class EmployeeId(value: Int) extends Identifier defined class EmployeeId scala> case class Employee(identifier: EmployeeId, name: String) extends Entity[EmployeeId] defined class Employee scala> val list = Seq(Employee(EmployeeId(1), "yamada taro"), Employee(EmployeeId(2), "yamada hanako")) list: Seq[Employee] = List(Employee(EmployeeId(1),yamada taro), Employee(EmployeeId(2),yamada hanako)) // "yamada hanako"が結婚して名前が変わるイベントが発生、リストの内容も合わせて変更される scala> list.contains(Employee(EmployeeId(2), "yamada hanako")) res0: Boolean = true
Entity#sameIdentityAs
の場合は、 コレクションのメソッドに組み込むことはできないのでそれなりに実装を工夫する必要がありますね。
論理的等価性検査としてのequals
上記のメリットはよいとして、一体equals
メソッドとして何を求めるているのか。DDDの観点があろうがなかろうか、equals
の契約を逸脱する設計をしないようにすべきと考えます。久しぶりに、Effective Javaを開いてみると、equalsは「論理的等価性」検査の責務を持つというような記述はあるものの、「論理的等価性」が具体的に何かは明記されていませんでした。
ちなみにjava.lang.Object
のデフォルト実装は以下となっています。多くの場合はサブクラスで固有のequalsメソッドとしてオーバーライドされますが、されない場合もあります。java.util.Random
は、同じ乱数列を生成するかで等価判定できたはずですが、クライアントがそれを求めてなかったので、デフォルト実装のままとなっているようです。
public boolean equals(Object obj) { return (this == obj); }
契約プログラミングの観点から
契約プログラミングの観点では、java.lan.Object#equals
のJavadocに書かれていること以上のことは求めてはいけません。つまり、一般契約に従っている以上は「オブジェクトが等しいかどうか」の仕様を満たしているはずです。
DDDの文脈では、等価性は(エンティティが持つ)値がすべて同じであればtrueとみなし、同一性は識別子が同じであればtrueとみなすものと解釈できそうです。つまり、エンティティの定義である「同一性によって定義されるオブジェクト」に照らすと equals をオーバーライドすることは適切ではありません。 この点では、本来オーバーライドすべきは eq と言えるかもしれません。
このアイデアは有益ですしこの設計を取ってもよいと思っていますが、java.lan.Object#equals
の契約に照らして考えるに「適切ではありません」とまでは言い切れないと考えています。
結局どうすべきか
まぁ身の蓋もないですが、こうすべきだと思っています。(ここでオーバーライドするという意味は、IDのみの等価判断する実装としてオーバーライドするかしないかという意味です)
まぁ、オーバーライドするかしないかは、トレードオフがあるので、プロジェクトやチームで判断してくださいというスタンスかな、僕は。
— かとじゅん (@j5ik2o) 2018年12月5日
また、equals
やhashCode
のようにもともと抽象度の高いインターフェイスは意図がわかりにくいことがあります。対策としては、ドキュメンテーションコメントに自然言語できちんと設計の意図を記述するべきで、利用者も求められる契約が何かを理解すべきだと思っています。(自壊の念を込めて)
実装を追わなければ equals がオーバーライドされていることが分からないことは、余分な意識コストとなってしまうでしょう。
DbCの観点でいえば、equals, hashCodeはDDDでいう意図の明白なインターフェイスではありませんね。コードだけでは設計の意図がわからない場合は、javadocなりscaladocに自然言語で仕様を記述すべきですね。
— かとじゅん (@j5ik2o) 2018年12月5日
akka-httpにswagger2.xを組み込む方法
akka-http
にswagger2.x
を組み込む方法を以下に示します。
※DIコンテナであるAirframeを使っていますが、もちろん必須ではありません。適宜読み替えてくだだい。
ライブラリの依存関係
swagger-akka-httpを追加します。javax.ws.rs-api
はアノテーションを利用するために追加します。akka-http-cors
は必要に応じて追加してください。
libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-http" % "10.1.5", "com.github.swagger-akka-http" %% "swagger-akka-http" % "2.0.0" "javax.ws.rs" % "javax.ws.rs-api" % "2.0.1" "ch.megard" %% "akka-http-cors" % "0.3.0" // ... )
コントローラの例
コントローラに相当するクラスにアクションを作って、そのメソッドにアノテーションを割り当てます。別にコントローラを定義しなくとも、エンドポイントごとにRoute定義が分かれていて、アノテーションが付与できればよいです。
アノテーションの使い方は、Swagger 2.X Annotationsを読んでください。
package spetstore.interface.api.controller import java.time.ZonedDateTime import akka.http.scaladsl.server.{Directives, Route} import de.heikoseeberger.akkahttpcirce.FailFastCirceSupport._ import io.circe.generic.auto._ import io.swagger.v3.oas.annotations.Operation import io.swagger.v3.oas.annotations.media.{Content, Schema} import io.swagger.v3.oas.annotations.parameters.RequestBody import io.swagger.v3.oas.annotations.responses.ApiResponse import javax.ws.rs._ import monix.eval.Task import monix.execution.Scheduler import org.hashids.Hashids import org.sisioh.baseunits.scala.money.Money import spetstore.domain.model.basic.StatusType import spetstore.domain.model.item._ import spetstore.interface.api.model.{CreateItemRequest, CreateItemResponse, CreateItemResponseBody} import spetstore.interface.generator.jdbc.ItemIdGeneratorOnJDBC import spetstore.interface.repository.ItemRepository import wvlet.airframe._ import scala.concurrent.Future @Path("/items") @Consumes(Array("application/json")) @Produces(Array("application/json")) trait ItemController extends Directives { private val itemRepository: ItemRepository[Task] = bind[ItemRepository[Task]] private val itemIdGeneratorOnJDBC: ItemIdGeneratorOnJDBC = bind[ItemIdGeneratorOnJDBC] private val hashids = bind[Hashids] def route: Route = create private def convertToAggregate(id: ItemId, request: CreateItemRequest): Item = Item( id = id, status = StatusType.Active, name = ItemName(request.name), description = request.description.map(ItemDescription), categories = Categories(request.categories), price = Price(Money.yens(request.price)), createdAt = ZonedDateTime.now(), updatedAt = None ) @POST @Operation( summary = "Create item", description = "Create Item", requestBody = new RequestBody(content = Array(new Content(schema = new Schema(implementation = classOf[CreateItemRequest])))), responses = Array( new ApiResponse(responseCode = "200", description = "Create response", content = Array(new Content(schema = new Schema(implementation = classOf[CreateItemResponse])))), new ApiResponse(responseCode = "500", description = "Internal server error") ) ) def create: Route = path("items") { post { extractActorSystem { implicit system => implicit val scheduler: Scheduler = Scheduler(system.dispatcher) entity(as[CreateItemRequest]) { request => val future: Future[CreateItemResponse] = (for { itemId <- itemIdGeneratorOnJDBC.generateId() _ <- itemRepository.store(convertToAggregate(itemId, request)) } yield CreateItemResponse(Right(CreateItemResponseBody(hashids.encode(itemId.value))))).runAsync onSuccess(future) { result => complete(result) } } } } } // ... }
swagger-ui
swagger-uiのdist
をsrc/main/resource/swagger
としてコピーしてください。
SwaggerHttpService
次にSwaggerHttpServiceの実装を用意します。
package spetstore.interface.api import com.github.swagger.akka.SwaggerHttpService import com.github.swagger.akka.model.Info class SwaggerDocService(hostName: String, port: Int, val apiClasses: Set[Class[_]]) extends SwaggerHttpService { override val host = s"127.0.0.1:$port" //the url of your api, not swagger's json endpoint override val apiDocsPath = "api-docs" //where you want the swagger-json endpoint exposed override val info = Info() //provides license and other description details override val unwantedDefinitions = Seq("Function1", "Function1RequestContextFutureRouteResult") }
routeの設定
akka-httpのRouteは以下を参考にしてください。SwaggerDocServiceとコントローラをrouteに加えます。また、CORSが必要なら、"ch.megard" %% "akka-http-cors" % "0.3.0" を使うとよいと思います。
package spetstore.interface.api import akka.http.scaladsl.model.{ ContentTypes, HttpEntity, HttpResponse } import akka.http.scaladsl.server.{ Directives, Route, StandardRoute } import wvlet.airframe._ import ch.megard.akka.http.cors.scaladsl.CorsDirectives._ import spetstore.interface.api.controller.ItemController trait Routes extends Directives { private lazy val itemController = bind[ItemController] private lazy val swaggerDocService = bind[SwaggerDocService] private def index(): StandardRoute = complete( HttpResponse( entity = HttpEntity( ContentTypes.`text/plain(UTF-8)`, "Wellcome to API" ) ) ) def routes: Route = cors() { pathEndOrSingleSlash { index() } ~ path("swagger") { getFromResource("swagger/index.html") } ~ getFromResourceDirectory("swagger") ~ swaggerDocService.routes ~ itemController.route } }
ブートストラップ
akka-httpの起動部分のコードです。
package spetstore.interface.api import akka.actor.ActorSystem import akka.http.scaladsl.Http import akka.http.scaladsl.Http.ServerBinding import akka.http.scaladsl.settings.ServerSettings import akka.stream.ActorMaterializer import wvlet.airframe._ import scala.concurrent.Future import scala.util.{ Failure, Success } trait ApiServer { implicit val system = bind[ActorSystem] implicit val materializer = ActorMaterializer() implicit val executionContext = system.dispatcher private val routes = bind[Routes].routes def start(host: String, port: Int, settings: ServerSettings): Future[ServerBinding] = { val bindingFuture = Http().bindAndHandle(handler = routes, interface = host, port = port, settings = settings) bindingFuture.onComplete { case Success(binding) => system.log.info(s"Server online at http://${binding.localAddress.getHostName}:${binding.localAddress.getPort}/") case Failure(ex) => system.log.error(ex, "occurred error") } sys.addShutdownHook { bindingFuture .flatMap(_.unbind()) .onComplete { _ => materializer.shutdown() system.terminate() } } bindingFuture } }
アプリケーションのブートストラップ部分です。
package spetstore.api import akka.actor.ActorSystem import akka.http.scaladsl.settings.ServerSettings import monix.eval.Task import org.hashids.Hashids import slick.basic.DatabaseConfig import slick.jdbc.JdbcProfile import spetstore.domain.model.item.ItemId import spetstore.interface.api.controller.ItemController import spetstore.interface.api.{ApiServer, Routes, SwaggerDocService} import spetstore.interface.generator.IdGenerator import spetstore.interface.generator.jdbc.ItemIdGeneratorOnJDBC import spetstore.interface.repository.{ItemRepository, ItemRepositoryBySlick} import wvlet.airframe._ /** * http://127.0.0.1:8080/swagger */ object Main { def main(args: Array[String]): Unit = { val parser = new scopt.OptionParser[AppConfig]("spetstore") { opt[String]('h', "host").action((x, c) => c.copy(host = x)).text("host") opt[Int]('p', "port").action((x, c) => c.copy(port = x)).text("port") } val system = ActorSystem("spetstore") val dbConfig: DatabaseConfig[JdbcProfile] = DatabaseConfig.forConfig[JdbcProfile](path = "spetstore.interface.storage.jdbc", system.settings.config) parser.parse(args, AppConfig()) match { case Some(config) => val design = newDesign .bind[Hashids].toInstance(new Hashids(system.settings.config.getString("spetstore.interface.hashids.salt"))) .bind[ActorSystem].toInstance(system) .bind[JdbcProfile].toInstance(dbConfig.profile) .bind[JdbcProfile#Backend#Database].toInstance(dbConfig.db) .bind[Routes].toSingleton .bind[SwaggerDocService].toInstance( new SwaggerDocService(config.host, config.port, Set(classOf[ItemController])) ) .bind[ApiServer].toSingleton .bind[ItemRepository[Task]].to[ItemRepositoryBySlick] .bind[IdGenerator[ItemId]].to[ItemIdGeneratorOnJDBC] .bind[ItemController].toSingleton design.withSession { session => val system = session.build[ActorSystem] session.build[ApiServer].start(config.host, config.port, settings = ServerSettings(system)) } case None => println(parser.usage) } } }
まとめ
最低限、考慮すべきこととしては、akka-httpのroute dslはエンドポイントごとに分割してswaggerアノテーションを割り当てれるようにしてください。これ以外はswaggerの一般的な使い方と変わりません。
Getter/Setterを避けて役に立つドメインオブジェクトを作る
Clean Architecture 達人に学ぶソフトウェアの構造と設計を読んでます。モデリングに関しては成分薄めですが、よい本だと思います。はい。
Clean Architecture 達人に学ぶソフトウェアの構造と設計
- 作者: Robert C.Martin,角征典,高木正弘
- 出版社/メーカー: KADOKAWA
- 発売日: 2018/07/27
- メディア: 単行本
- この商品を含むブログを見る
本書の大筋から少し逸れるが、「5章 オブジェクト指向プログラミング」の「カプセル化」が面白かったので、これを切り口にモデリングについて考えてみる。
続きを読むDDDリポジトリを楽に実装するライブラリ
DDDのリポジトリを実装するのがダルいので、ライブラリ化したというか前から書いていたけど、Redis, Memcachedなどの実装も追加したので、簡単に説明を書いてみる。 プロジェクトなどで自前で実装する際も、参照実装として参考になると思います。Scalaの例だけど、他の言語でも参考にはなるんじゃないかと勝手に想像してます。
https://github.com/j5ik2o/scala-ddd-base
デフォルトで対応している永続化技術は、以下。
- JDBC
- SkinnyORM
- Slick3
- NOSQL
- Memcached
- Redis
- Guava Cache
- Freeモナド
- こちらは永続化そのものは行いません。上記どれかの実装に委譲することになります。
何が楽になるかというと、上記向けのリポジトリの実装があるので、Daoだけ用意すればリポジトリが実装できるようになります。
中核になるトレイト
中核となる抽象トレイトはこれ。実装はついてないです。
https://github.com/j5ik2o/scala-ddd-base/tree/master/core/src/main/scala/com/github/j5ik2o/dddbase
IOするのはAggregateです。リポジトリによっては実装するメソッドが異なるのでトレイトは細かく分かれています。Mは型コンストラクタです。
trait AggregateSingleReader[M[_]] extends AggregateIO[M] { def resolveById(id: IdType): M[AggregateType] }
SkinnyORM向け実装トレイト
一例としてSkinnyORM向けに実装を提供するトレイトの説明を簡単にします。M
はここでは、ReaderT[Task, DBSession, A]
としています。Task
はmonix.eval.Task
です。この型は実装ごとに変わります。たとえば、Redis向けの実装では、ReaderT[Task, RedisConnection, A]
になります。
実装を提供するトレイトはAggregateIOBaseFeature
を継承しますが、ほとんどのロジックでSkinnyDaoSupport#Dao
を実装したオブジェクトに委譲します。つまり、リポジトリを作る場合は、これらのトレイトをリポジトリのクラスにミックスインして、Daoの実装を提供するだけでよいことになります。
object AggregateIOBaseFeature { type RIO[A] = ReaderT[Task, DBSession, A] } trait AggregateIOBaseFeature extends AggregateIO[RIO] { override type IdType <: AggregateLongId type RecordType <: SkinnyDaoSupport#Record type DaoType <: SkinnyDaoSupport#Dao[RecordType] protected val dao: DaoType } trait AggregateSingleReadFeature extends AggregateSingleReader[RIO] with AggregateBaseReadFeature { override def resolveById(id: IdType): RIO[AggregateType] = for { record <- ReaderT[Task, DBSession, RecordType] { implicit dbSession: DBSession => Task { dao.findBy(byCondition(id)).getOrElse(throw AggregateNotFoundException(id)) } } aggregate <- convertToAggregate(record) } yield aggregate }
実装サンプル
実装する際は、Aggregate*Feature
のトレイトをミックスしてください(UserAccountRepositoryは実装を持たない抽象型です)。あとはDaoの実装の提供だけです。以下の例では、UserAccountComponent
がUserAccountRecord
, UserAccountDao
を提供します。
object UserAccountRepository { type BySlick[A] = Task[A] def bySkinny: UserAccountRepository[BySkinny] = new UserAccountRepositoryBySkinny } class UserAccountRepositoryBySkinny extends UserAccountRepository[BySkinny] with AggregateSingleReadFeature with AggregateSingleWriteFeature with AggregateMultiReadFeature with AggregateMultiWriteFeature with AggregateSingleSoftDeleteFeature with AggregateMultiSoftDeleteFeature with UserAccountComponent { override type RecordType = UserAccountRecord override type DaoType = UserAccountDao.type override protected val dao: UserAccountDao.type = UserAccountDao override protected def convertToAggregate: UserAccountRecord => RIO[UserAccount] = { record => ReaderT { _ => // このメソッド内部でDBを利用したり、非同期タスクを実行する可能性もあるので、RIO形式を取っている Task.pure { UserAccount( id = UserAccountId(record.id), status = Status.withName(record.status), emailAddress = EmailAddress(record.email), password = HashedPassword(record.password), firstName = record.firstName, lastName = record.lastName, createdAt = record.createdAt, updatedAt = record.updatedAt ) } } } override protected def convertToRecord: UserAccount => RIO[UserAccountRecord] = { aggregate => ReaderT { _ => Task.pure { UserAccountRecord( id = aggregate.id.value, status = aggregate.status.entryName, email = aggregate.emailAddress.value, password = aggregate.password.value, firstName = aggregate.firstName, lastName = aggregate.lastName, createdAt = aggregate.createdAt, updatedAt = aggregate.updatedAt ) } } } }
DaoはSkinnyDaoSupport#Daoを実装する形式になります。これはSkinnyCRUDMapperの派生型です。 僕の場合は、ここで紹介した方法でスキーマから自動生成しています。
package skinny { import com.github.j5ik2o.dddbase.skinny.SkinnyDaoSupport import scalikejdbc._ import _root_.skinny.orm._ trait UserAccountComponent extends SkinnyDaoSupport { case class UserAccountRecord( id: Long, status: String, email: String, password: String, firstName: String, lastName: String, createdAt: java.time.ZonedDateTime, updatedAt: Option[java.time.ZonedDateTime] ) extends Record object UserAccountDao extends Dao[UserAccountRecord] { override def useAutoIncrementPrimaryKey: Boolean = false override val tableName: String = "user_account" override protected def toNamedValues(record: UserAccountRecord): Seq[(Symbol, Any)] = Seq( 'status -> record.status, 'email -> record.email, 'password -> record.password, 'first_name -> record.firstName, 'last_name -> record.lastName, 'created_at -> record.createdAt, 'updated_at -> record.updatedAt ) override def defaultAlias: Alias[UserAccountRecord] = createAlias("u") override def extract(rs: WrappedResultSet, s: ResultName[UserAccountRecord]): UserAccountRecord = autoConstruct(rs, s) } } }
利用するときは、以下のような感じでリポジトリが返すReaderT[Task, DBSession, UserAccount]#run
にDBSession
を渡すとTask
が返ってきます。
それをrunAsync
するとFuture[UserAccount]
が取得できます。他の実装でもほとんど同じように使えます。ご参考までに。
val resultFuture: Future[UserAccount] = (for { _ <- repository.store(userAccount) r <- repository.resolveById(userAccount.id) } yield r).run(AutoSession).runAsync
sbtでDAOを自動生成する方法
DDDのリポジトリを実装する際、ほとんどのケースでDAOが必要になります。が、ボイラープレートが多く、自動生成したいところです。というわけで作りました。
どうやって自動化するか
septeni-original/sbt-dao-generator
指定されたスキーマのJDBCメタ情報とテンプレートをマージさせて、ソースコードを出力します。その機能を提供するのがsbt-dao-generator1です。つまり、sbt
からコマンド一発でこういうことができるようになるわけですが、 DBのインスタンスが立ち上がっていて、スキーマ情報が組み込まれた状態でないと使えません。
chatwork/sbt-wix-embedded-mysql
sbt
から組み込みMySQLを起動するプラグインです。MySQL固定です…。
flyway/flyway-sbt
こちらは言わずもがな、有名なsbtプラグイン。組み込みMySQL上にスキーマを自動作成するために使います。
環境構築手順
実際のサンプルコードは、j5ik2o/scala-ddd-baseをみてください。
flyway
を扱うプロジェクトflywayとDAOを自動生成するプロジェクトexampleはわけています。
project/plugins.sbt
プラグインを追加しましょう
addSbtPlugin("com.chatwork" % "sbt-wix-embedded-mysql" % "1.0.9") addSbtPlugin("jp.co.septeni-original" % "sbt-dao-generator" % "1.0.8") addSbtPlugin("io.github.davidmweber" % "flyway-sbt" % "5.0.0")
テンプレートを作りましょう
FTLでDAOのテンプレートを書きます。以下はSkinnyORMのための例です。レコードクラスとDAOクラスです。
case class ${className}Record( <#list primaryKeys as primaryKey> ${primaryKey.propertyName}: ${primaryKey.propertyTypeName}<#if primaryKey_has_next>,</#if></#list><#if primaryKeys?has_content>,</#if> <#list columns as column> <#if column.columnName == "status"> <#assign softDelete=true> </#if> <#if column.nullable> ${column.propertyName}: Option[${column.propertyTypeName}]<#if column_has_next>,</#if> <#else> ${column.propertyName}: ${column.propertyTypeName}<#if column_has_next>,</#if> </#if> </#list> ) extends Record object ${className}Dao extends Dao[${className}Record] { override def useAutoIncrementPrimaryKey: Boolean = false override val tableName: String = "${tableName}" override protected def toNamedValues(record: ${className}Record): Seq[(Symbol, Any)] = Seq( <#list columns as column> '${column.name} -> record.${column.propertyName}<#if column.name?ends_with("id") || column.name?ends_with("Id")>.value</#if><#if column_has_next>,</#if> </#list> ) override def defaultAlias: Alias[UserAccountRecord] = createAlias("${className[0]?lower_case}") override def extract(rs: WrappedResultSet, s: ResultName[${className}Record]): ${className}Record = autoConstruct(rs, s) }
特定のDAOに依存しないので、ほとんどのものに対応できるはず。以下は、Slick用とSkinny用の両方に対応したテンプレート例です。どちらでも好きなORMを使ってください。
https://github.com/j5ik2o/scala-ddd-base/blob/reboot/example/templates/template.ftl
テンプレートの書き方はこちら参照。カラム名をあらかじめプロパティ名としてテンプレートコンテキストに含めているので、簡単に書けるはずです。
build.sbt
- flywayプロジェクト
https://github.com/j5ik2o/scala-ddd-base/blob/master/build.sbt#L114-L136
このプロジェクトではchatwork/sbt-wix-embedded-mysqlとflyway/flyway-sbtを使って自動的にスキーマを作ります。flywayMigrate := (flywayMigrate dependsOn wixMySQLStart).value
としているので、sbt flyway/flywayMigrate
する前に組み込みMySQLが起動します。
- exampleプロジェクト
https://github.com/j5ik2o/scala-ddd-base/blob/master/build.sbt#L138-L188
JDBCの接続先設定は、flyway
プロジェクトと同じ設定を指定してください。
このプロジェクトでは、septeni-original/sbt-dao-generatorを使ってJDBCメタ情報とテンプレートをマージして、DAOクラスのソースコードを生成します。
今回は生成物をGitで管理したかったので、以下のようにして通常のソースコードと同じパスに出力していますが、(sourceManaged in Compile).value
を使ってtarget/src_managed
に出力することも可能です。
outputDirectoryMapper in generator := { case s if s.endsWith("Spec") => (sourceDirectory in Test).value case s => new java.io.File((scalaSource in Compile).value, "/com/github/j5ik2o/dddbase/example/dao") },
outputDirectoryMapper in generator := { className: String => (sourceManaged in Compile).value },
コンパイル時にソースコード生成するには以下のようにしてください。コンパイルと無関係に生成タスクを実行したい場合はsbt generator::generateAll
としてください。
// sourceGenerators in Compile時に出力
sourceGenerators in Compile += (generateAll in generator).value
コンパイルより前に出力したい場合は以下でも動作します。
// コンパイルより前に出力
compile in Compile := ((compile in Compile) dependsOn (generateAll in generator)).value
あとでexample
プロジェクトからflyway
プロジェクトに依存することをお忘れ無く
val example = ...
dependsOn(..., flyway)
生成
コンパイル時に自動的に以下が行われます。
- 組み込みMySQLの起動
- flyway マイグレーション実行
- JDBCメタ情報とテンプレートのマージとソースファイル出力
- コンパイル
実際に生成されたソースコードはこちら。
package com.github.j5ik2o.dddbase.example.dao package slick { import com.github.j5ik2o.dddbase.slick.SlickDaoSupport trait UserAccountComponent extends SlickDaoSupport { import profile.api._ case class UserAccountRecord( id: Long, status: String, email: String, password: String, firstName: String, lastName: String, createdAt: java.time.ZonedDateTime, updatedAt: Option[java.time.ZonedDateTime] ) extends SoftDeletableRecord case class UserAccounts(tag: Tag) extends TableBase[UserAccountRecord](tag, "user_account") with SoftDeletableTableSupport[UserAccountRecord] { // def id = column[Long]("id", O.PrimaryKey) def status = column[String]("status") def email = column[String]("email") def password = column[String]("password") def firstName = column[String]("first_name") def lastName = column[String]("last_name") def createdAt = column[java.time.ZonedDateTime]("created_at") def updatedAt = column[Option[java.time.ZonedDateTime]]("updated_at") override def * = (id, status, email, password, firstName, lastName, createdAt, updatedAt) <> (UserAccountRecord.tupled, UserAccountRecord.unapply) } object UserAccountDao extends TableQuery(UserAccounts) } } package skinny { import com.github.j5ik2o.dddbase.skinny.SkinnyDaoSupport import scalikejdbc._ import _root_.skinny.orm._ trait UserAccountComponent extends SkinnyDaoSupport { case class UserAccountRecord( id: Long, status: String, email: String, password: String, firstName: String, lastName: String, createdAt: java.time.ZonedDateTime, updatedAt: Option[java.time.ZonedDateTime] ) extends Record object UserAccountDao extends Dao[UserAccountRecord] { override def useAutoIncrementPrimaryKey: Boolean = false override val tableName: String = "user_account" override protected def toNamedValues(record: UserAccountRecord): Seq[(Symbol, Any)] = Seq( 'status -> record.status, 'email -> record.email, 'password -> record.password, 'first_name -> record.firstName, 'last_name -> record.lastName, 'created_at -> record.createdAt, 'updated_at -> record.updatedAt ) override def defaultAlias: Alias[UserAccountRecord] = createAlias("u") override def extract(rs: WrappedResultSet, s: ResultName[UserAccountRecord]): UserAccountRecord = autoConstruct(rs, s) } } }
まとめ
このプロジェクト構成は、仕事でも結構がっつり使っていて気に入っています。スキーマ変更が起こっても、DAOは一瞬で自動生成できるので楽になると思います。興味あれば使ってみてください!
-
僕とセプテーニさんとコラボして作りました。↩