移動しました。
akka-cluster スプリットブレインリゾルバ OSS実装一覧
Akkaクラスターがネットワーク分断に遭遇した場合に、UnreachableメンバーをDown状態に遷移させるためのリゾルバのOSS実装を以下にまとめる。
ちなみに、このリゾルバがない場合はUnreachableのままだとリーダアクションが取れずにクラスターが機能不全状態なる。かといってAutodownを有効にするとスプリットブレインが発生する可能性がある。これを解決するのがスプリットブレインリゾルバで商用版はLightbend社から提供されている。スプリットブレインリゾルバの仕様はこちら
git repo | stars | 備考 |
---|---|---|
TanUkkii007/akka-cluster-custom-downing | 131 | OldestAutoDowning, QuorumLeaderAutoDowning, MajorityLeaderAutoDowningに対応している。OldestAutoDowningには不具合があるようだ。要修正 |
mbilski/akka-reasonable-downing | 85 | Static QuorumによるDowningにしか対応していない |
arnohaase/simple-akka-downing | 16 | static-quorum, keep-majority, keep-oldestに対応している |
guangwenz/akka-down-resolver | 5 | Static QuorumによるDowningにしか対応していない |
sbtでビルドしたDockerイメージをECRにプッシュする方法
sbt-native-packagerとsbt-ecrを使って、ビルド→ECRログイン→ECRへのプッシュまで行います。
https://github.com/sbt/sbt-native-packager https://github.com/sbilinski/sbt-ecr
ECRを作成する
レジストリを作る。作成後 以下のようなURIが取得できます。
123456789012.dkr.ecr.ap-northeast-1.amazonaws.com/j5ik2o/thread-weaver-api-server
sbtプラグインの設定の設定
project/plugins.sbt
// ... addSbtPlugin("com.mintbeans" % "sbt-ecr" % "0.14.1") addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.3.10") // ...
build.sbtの設定
repositoryName
は、上記で得られたURIのホスト名を除く文字列(j5ik2o/thread-weaver-api-server
)を指定すること
import com.amazonaws.regions.{Region, Regions} val ecrSettings = Seq( region in Ecr := Region.getRegion(Regions.AP_NORTHEAST_1), repositoryName in Ecr := "j5ik2o/thread-weaver-api-server", repositoryTags in Ecr ++= Seq(version.value), // タグを付ける場合は指定する localDockerImage in Ecr := "j5ik2o/" + (packageName in Docker).value + ":" + (version in Docker).value, push in Ecr := ((push in Ecr) dependsOn (publishLocal in Docker, login in Ecr)).value ) val `api-server` = (project in file("api-server")) .enablePlugins(AshScriptPlugin, JavaAgent, EcrPlugin) .settings(baseSettings) .settings(ecrSettings) .settings( name = "thread-weaver-api-server", dockerBaseImage := "adoptopenjdk/openjdk8:x86_64-alpine-jdk8u191-b12", maintainer in Docker := "Junichi Kato <j5ik2o@gmail.com>", dockerUpdateLatest := true, dockerUsername := Some("j5ik2o"), bashScriptExtraDefines ++= Seq( "addJava -Xms${JVM_HEAP_MIN:-1024m}", "addJava -Xmx${JVM_HEAP_MAX:-1024m}", "addJava -XX:MaxMetaspaceSize=${JVM_META_MAX:-512M}", "addJava ${JVM_GC_OPTIONS:--XX:+UseG1GC}", "addJava -Dconfig.resource=${CONFIG_RESOURCE:-application.conf}" ), // ... )
実行
profileを使う場合は以下のようにする
$ AWS_DEFAULT_PROFILE=xxxxx sbt api-server/ecr:push
Microsoft社のDDD, CQRSに関する記事一覧
CQRS+ESパターンの説明
- コマンド クエリ責務分離 (CQRS) パターン
- ステートソーシング(CRUD)の短所としてあげているもの
- 読み取りと書き込みのデータ表現不一致問題
- ステートの同時更新による競合問題
- 読み取りと書き込みが同じモデルによる、データの誤用問題
- CRUDの単一データモデルより設計と実装が簡単になる。ただし、CRUDはスキャーフォルドメカニズムによって自動生成できない
- リードモデルはSQLビューもしくは即時プロジェクションを生成するか。
- 同じ物理ストアに格納する場合は、パフォーマンス、スケーラビリティ、セキュリティを最大化するため、データストアを物理的に分けるのが一般的
- ステートソーシング(CRUD)の短所としてあげているもの
- イベント ソーシング パターン
- ステートソーシング(CRUD)の短所としてあげているもの
- 更新時のロックがパフォーマンスと応答性を低下させる
- 単一データ項目への更新は競合が起きやすい(コラボレータが複数の場合)
- 監査メカニズムがない限り、履歴が失われる
- ESのメリット
- イベントが不変であり、追記保存するだけ。イベントを処理するプロセスは非同期処理で問題が生じない
- イベントは、データストアを更新しない、シンプルなオブジェクト。
- イベントはドメインエキスパートの関心事。
- 同時更新による競合の発生を防ぐことができる(ただし、ドメインオブジェクトが矛盾した状態にならないよう依然として保護が必要)
- ステートソーシング(CRUD)の短所としてあげているもの
関連記事
DDD,CQRS関連
マイクロサービス関連
マイクロサービス関連。DDDだと戦略的設計の部分。
Scala.js ウェブフレームワーク Facade 調査(2019年4月版)
主要なウェブフレームワークのFacadeについて調査した(2019/4/17日時点)。
React.js
Reactバージョン | Scalaバージョン | ライブラリ名 | スター数 | 最終更新日時 | タグ形式 | ReactRouterサポート | Reduxサポート | 備考 |
---|---|---|---|---|---|---|---|---|
16.6+ | 2.12,2.11 | japgolly/scalajs-react | 1196 | 1時間前 | Scalatags形式 | Scala.js独自実装 | サポートなし | |
16.8.1 | 2.12 | shadaj/slinky | 276 | 4日前 | case class | なし | なし | japgolly/scalajs-reactに依存, ReactNative対応 |
16.8.1+ | 2.12 | aappddeevv/scalajs-reaction | 17 | 25日前 | case class | あり | あり | |
15.5.3 | 2.12 | shogowada/scalajs-reactjs | 23 | 2017/5 | Scalatags形式 | あり | あり | |
0.11.0 | 2.11 | xored/scala-js-react | 133 | 2015/2 | scalax | なし | なし |
- Reactコンポーネント関連
所感
- aappddeevv/scalajs-reactionもreact-router,redux対応しているので魅力的だが、スターと更新頻度的にjapgolly/scalajs-reactが安心
- japgolly/scalajs-reactのルーターは独自実装だが使えれば問題ない。Redux部分もScala.jsで関数型プログラミングすればなくても問題なさそう
Vue.js
Vueバージョン | Scalaバージョン | ライブラリ名 | スター数 | 最終更新日時 | Vue Routerサポート | Vuexサポート | 備考 |
---|---|---|---|---|---|---|---|
2.x | 2.11,2.12 | random-scalor/scala-js-vue | 3 | 2018/2 | あり | あり | |
2.x | 2.12 | massung/scala-js-vue | 12 | 2017/11 | なし | なし | |
2.x | 2.11 | fancellu/scalajs-vue | 76 | 2017/5 | なし | なし |
所感
- とはいえどれも古くなってきている…。
- まともに使えそうなものはrandom-scalor/scala-js-vue
Angular
- 最新のAngularを使えるFacadeを見つけることができなかった…。
まとめ
Scala.jsでウェブアプリケーションを作るならscalajs-react
一択…。他は自分でがっつりコントリビュートする気がないと、いろいろ大変そうです。
ということで、サンプルプロジェクトを二種類用意したので、興味があれば確認してみてください。
https://github.com/j5ik2o/scalajs-react-webpack4-example https://github.com/j5ik2o/scalajs-vuejs-webpack4-example
ID生成方法についてあれこれ
引っ越しました。
akka-streamを使ってmemcachedクライアントを作るには
かなり前になるのですが、akka-streamの習作課題として、Memcachedクライアントを実装したので、概要を解説する記事をまとめます。詳しくはgithubをみてください。
https://github.com/j5ik2o/reactive-memcached
TCPのハンドリング方法
akkaでTCPをハンドリングするには、以下の二つになる。おそらく
akka.actor
とakka.io
のパッケージを使う方式。つまり、アクターでTCP I/Oを実装する方法。akka.stream
のTcp#outgoingConnection
というAPIを使う方法
今回は、akka-streamをベースにしたかったので、2を採用しました。
outgoingConnection
メソッドの戻り値の型は、Flow[ByteString, ByteString, Future[OutgoingConnection]]
です。ByteString
を渡せば、ByteString
が返ってくるらしい。わかりやすいですね。そしてストリームが起動中であれば、コネクションは常時接続された状態になります。
def outgoingConnection(
remoteAddress : InetSocketAddress,
localAddress : Option[InetSocketAddress],
options : Traversable[SocketOption],
halfClose : Boolean,
connectTimeout : Duration,
idleTimeout : Duration) : Flow[ByteString, ByteString, Future[Tcp.OutgoingConnection]]
Memcachedのコネクションを表現するオブジェクトを実装する
では、早速 Memcachedのコネクションを表現するオブジェクトである MemcachedConnection
を実装します。このオブジェクトを生成するとTCP接続され、破棄されると切断されます。そして、考えたインターフェイスは以下です(別にtrait切り出さなくてもよかったですが、説明のために分けました)。send
メソッドにコマンドを指定して呼ぶとレスポンスが返る単純なものです。
ちなみに、send
メソッドの戻り値の型はmonix.eval.Task
です。詳しくは https://monix.io/docs/3x/eval/task.html をご覧ください。
trait MemcachedConnection { def id: UUID // 接続ID def shutdown(): Unit // シャットダウン def peerConfig: Option[PeerConfig] // 接続先設定 def send[C <: CommandRequest](cmd: C): Task[cmd.Response] // コマンドの送信 // ... }
実装はどうなるか。全貌は以下。先ほどのTcp#outgoingConnection
を使います。
private[memcached] class MemcachedConnectionImpl(_peerConfig: PeerConfig, supervisionDecider: Option[Supervision.Decider])( implicit system: ActorSystem ) extends MemcachedConnection { // ... private implicit val mat: ActorMaterializer = ... protected val tcpFlow: Flow[ByteString, ByteString, NotUsed] = RestartFlow.withBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts) { () => Tcp() .outgoingConnection(remoteAddress, localAddress, options, halfClose, connectTimeout, idleTimeout) } protected val connectionFlow: Flow[RequestContext, ResponseContext, NotUsed] = Flow.fromGraph(GraphDSL.create() { implicit b => import GraphDSL.Implicits._ val requestFlow = b.add( Flow[RequestContext] .map { rc => log.debug(s"request = [{}]", rc.commandRequestString) (ByteString.fromString(rc.commandRequest.asString + "\r\n"), rc) } ) val responseFlow = b.add(Flow[(ByteString, RequestContext)].map { case (byteString, requestContext) => log.debug(s"response = [{}]", byteString.utf8String) ResponseContext(byteString, requestContext) }) val unzip = b.add(Unzip[ByteString, RequestContext]()) val zip = b.add(Zip[ByteString, RequestContext]()) requestFlow.out ~> unzip.in unzip.out0 ~> tcpFlow ~> zip.in0 unzip.out1 ~> zip.in1 zip.out ~> responseFlow.in FlowShape(requestFlow.in, responseFlow.out) }) protected val (requestQueue: SourceQueueWithComplete[RequestContext], killSwitch: UniqueKillSwitch) = Source .queue[RequestContext](requestBufferSize, overflowStrategy) .via(connectionFlow) .map { responseContext => log.debug(s"req_id = {}, command = {}: parse", responseContext.commandRequestId, responseContext.commandRequestString) val result = responseContext.parseResponse // レスポンスのパース responseContext.completePromise(result.toTry) // パース結果を返す } .viaMat(KillSwitches.single)(Keep.both) .toMat(Sink.ignore)(Keep.left) .run() override def shutdown(): Unit = killSwitch.shutdown() override def send[C <: CommandRequest](cmd: C): Task[cmd.Response] = Task.deferFutureAction { implicit ec => val promise = Promise[CommandResponse]() requestQueue .offer(RequestContext(cmd, promise, ZonedDateTime.now())) .flatMap { case QueueOfferResult.Enqueued => promise.future.map(_.asInstanceOf[cmd.Response]) case QueueOfferResult.Failure(t) => Future.failed(BufferOfferException("Failed to send request", Some(t))) case QueueOfferResult.Dropped => Future.failed( BufferOfferException( s"Failed to send request, the queue buffer was full." ) ) case QueueOfferResult.QueueClosed => Future.failed(BufferOfferException("Failed to send request, the queue was closed")) } } }
TCPの送受信するにはtcpFlow
メソッドが返すFlow
に必要なByteString
を流して、ByteString
を受け取ればいいですが、リクエストとレスポンスを扱うためにもう少し工夫が必要です。そのためにconnectionFlow
メソッドはtcpFlow
メソッドを内部Flow
として利用します。そして、FlowはRequestContext
とResponseContext
の型を利用します。その実装は以下のようなシンプルなものです。キューであるrequestQueue
はconnectionFlow
にリクエストを送信します。ストリームの後半で返ってきたレスポンスを処理するという流れになっています。キューへのリクエストのエンキューはsend
メソッドが行います。send
メソッドが呼ばれるとRequestContext
を作ってrequestQueue
にエンキュー、完了するとレスポンスを返します。requestQueue
はストリームと繋がっていて、エンキューされたRequestContext
はconnectionFlow
に渡され返ってきたResponseContext
がレスポンス内容をパースし、その結果をPromise経由で返すというものです1。エラーハンドリングについて、エラーを起こしやすいところにRestartFlow.withBackoff
を入れています。
final case class RequestContext(commandRequest: CommandRequest, promise: Promise[CommandResponse], requestAt: ZonedDateTime) { val id: UUID = commandRequest.id val commandRequestString: String = commandRequest.asString } final case class ResponseContext(byteString: ByteString, requestContext: RequestContext, requestsInTx: Seq[CommandRequest] = Seq.empty, responseAt: ZonedDateTime = ZonedDateTime.now) extends ResponseBase { val commandRequest: CommandRequest = requestContext.commandRequest def withRequestsInTx(values: Seq[CommandRequest]): ResponseContext = copy(requestsInTx = values) // レスポンスのパース def parseResponse: Either[ParseException, CommandResponse] = { requestContext.commandRequest match { case scr: CommandRequest => scr.parse(ByteVector(byteString.toByteBuffer)).map(_._1) } } }
コネクションオブジェクトの使い方は簡単です。ほとんどの場合は、このようにコネクションオブジェクトを直接操作せずに、高レベルなAPIを操作したいと思うはずです。そのためのクライアントオブジェクトはあとで述べます。
val connection = MemcachedConnection( PeerConfig(new InetSocketAddress("127.0.0.1", memcachedTestServer.getPort), backoffConfig = BackoffConfig(maxRestarts = 1)), None ) val resultFuture = (for { _ <- connection.send(SetRequest(UUID.randomUUID(), "key1", "1", 10 seconds)) _ <- connection.send(SetRequest(UUID.randomUUID(), "key2", "2", 10 seconds)) gr1 <- connection.send(GetRequest(UUID.randomUUID(), "key1")) gr2 <- connection.send(GetRequest(UUID.randomUUID(), "key2")) } yield (gr1, gr2)).runAsync val result = Await.result(resultFuture, Duration.Inf) // (1, 2)
コマンドの実装
コマンドの一例はこんな感じです。
asString
メソッドは文字通りMemcachedにコマンドを送信するときに利用される文字列です。responseParser
はfastparseで実装されたレスポンスパーサを指定します。parseResponse
メソッドはMemcachedからのレスポンスを表現する構文木をコマンドのレンスポンスに変換するための関数です。これは少し複雑なのであとで説明します。
final class GetRequest private (val id: UUID, val key: String) extends CommandRequest with StringParsersSupport { override type Response = GetResponse override val isMasterOnly: Boolean = false override def asString: String = s"get $key" // レスポンスを解析するためのパーサを指定。レスポンス仕様に合わせて指定する override protected def responseParser: P[Expr] = P(retrievalCommandResponse) // ASTをコマンドレスポンスに変換する override protected def parseResponse: Handler = { case (EndExpr, next) => (GetSucceeded(UUID.randomUUID(), id, None), next) case (ValueExpr(key, flags, length, casUnique, value), next) => (GetSucceeded(UUID.randomUUID(), id, Some(ValueDesc(key, flags, length, casUnique, value))), next) case (ErrorExpr, next) => (GetFailed(UUID.randomUUID(), id, MemcachedIOException(ErrorType.OtherType, None)), next) case (ClientErrorExpr(msg), next) => (GetFailed(UUID.randomUUID(), id, MemcachedIOException(ErrorType.ClientType, Some(msg))), next) case (ServerErrorExpr(msg), next) => (GetFailed(UUID.randomUUID(), id, MemcachedIOException(ErrorType.ServerType, Some(msg))), next) } }
これはGetRequestの上位traitであるCommandRequestです。Memcachedのコマンド送信に必要なのはasString
メソッドだけで、コマンドのレスポンスのパースにはparse
メソッドが利用されます。
trait CommandRequest { type Elem type Repr type P[+T] = core.Parser[T, Elem, Repr] type Response <: CommandResponse type Handler = PartialFunction[(Expr, Int), (Response, Int)] val id: UUID val key: String val isMasterOnly: Boolean def asString: String protected def responseParser: P[Expr] protected def convertToParseSource(s: ByteVector): Repr def parse(text: ByteVector, index: Int = 0): Either[ParseException, (Response, Int)] = { responseParser.parse(convertToParseSource(text), index) match { case f @ Parsed.Failure(_, index, _) => Left(new ParseException(f.msg, index)) case Parsed.Success(value, index) => Right(parseResponse((value, index))) } } protected def parseResponse: Handler }
レスポンス・パーサの実装
レスポンスパーサは、Memcachedの仕様に合わせて以下の定義から適切なものを選んで利用します。詳しくは公式ドキュメントをみてください。標準のパーサーコンビネータと少し違うところがありますが、概ね似たような感じで書けます。Memcachedのレスポンスのルールは単純で左再帰除去などもしなくていいので、パーサーコンビネータのはじめての題材にはよいと思います。
object StringParsers { val digit: P0 = P(CharIn('0' to '9')) val lowerAlpha: P0 = P(CharIn('a' to 'z')) val upperAlpha: P0 = P(CharIn('A' to 'Z')) val alpha: P0 = P(lowerAlpha | upperAlpha) val alphaDigit: P0 = P(alpha | digit) val crlf: P0 = P("\r\n") val error: P[ErrorExpr.type] = P("ERROR" ~ crlf).map(_ => ErrorExpr) val clientError: P[ClientErrorExpr] = P("CLIENT_ERROR" ~ (!crlf ~/ AnyChar).rep(1).! ~ crlf).map(ClientErrorExpr) val serverError: P[ServerErrorExpr] = P("SERVER_ERROR" ~ (!crlf ~/ AnyChar).rep(1).! ~ crlf).map(ServerErrorExpr) val allErrors: P[Expr] = P(error | clientError | serverError) val end: P[EndExpr.type] = P("END" ~ crlf).map(_ => EndExpr) val deleted: P[DeletedExpr.type] = P("DELETED" ~ crlf).map(_ => DeletedExpr) val stored: P[StoredExpr.type] = P("STORED" ~ crlf).map(_ => StoredExpr) val notStored: P[NotStoredExpr.type] = P("NOT_STORED" ~ crlf).map(_ => NotStoredExpr) val exists: P[ExistsExpr.type] = P("EXISTS" ~ crlf).map(_ => ExistsExpr) val notFound: P[NotFoundExpr.type] = P("NOT_FOUND" ~ crlf).map(_ => NotFoundExpr) val touched: P[TouchedExpr.type] = P("TOUCHED" ~ crlf).map(_ => TouchedExpr) val key: P[String] = (!" " ~/ AnyChar).rep(1).! val flags: P[Int] = digit.rep(1).!.map(_.toInt) val bytes: P[Long] = digit.rep(1).!.map(_.toLong) val casUnique: P[Long] = digit.rep(1).!.map(_.toLong) val incOrDecCommandResponse: P[Expr] = P(notFound | ((!crlf ~/ AnyChar).rep.! ~ crlf).map(StringExpr) | allErrors) val storageCommandResponse: P[Expr] = P((stored | notStored | exists | notFound) | allErrors) val value: P[ValueExpr] = P("VALUE" ~ " " ~ key ~ " " ~ flags ~ " " ~ bytes ~ (" " ~ casUnique).? ~ crlf ~ (!crlf ~/ AnyChar).rep.! ~ crlf) .map { case (key, flags, bytes, cas, value) => ValueExpr(key, flags, bytes, cas, value) } val version: P[VersionExpr] = P("VERSION" ~ " " ~ alphaDigit.rep(1).!).map(VersionExpr) val retrievalCommandResponse: P[Expr] = P(end | value | allErrors) val deletionCommandResponse: P[Expr] = P(deleted | notFound | allErrors) val touchCommandResponse: P[Expr] = P(touched | notFound | allErrors) val versionCommandResponse: P[Expr] = P(version | allErrors) }
クライアントの実装
次はクライアント実装。MemcachedClient#send
メソッドはコマンドを受け取り、ReaderTTaskMemcachedConnection[cmd.Response]
を返します。ReaderTTaskMemcachedConnection
はMemcachedのために特化したReaderTで、コネクションを受け取り、レスポンスを返すTask
を返します。Memcachedのための高レベルなAPIはこのsend
メソッドを利用して実装されます。get
,set
などのメソッドの内部でコマンドを作成し、レスポンスを戻り値にマッピングするだけで、使いやすい高レベルAPIになります。また、ReaderT
を返すためfor式での簡潔な記述もできます。
package object memcached { type ReaderTTask[C, A] = ReaderT[Task, C, A] type ReaderTTaskMemcachedConnection[A] = ReaderTTask[MemcachedConnection, A] type ReaderMemcachedConnection[M[_], A] = ReaderT[M, MemcachedConnection, A] }
final class MemcachedClient()(implicit system: ActorSystem) { def send[C <: CommandRequest](cmd: C): ReaderTTaskMemcachedConnection[cmd.Response] = ReaderT(_.send(cmd)) def get(key: String): ReaderTTaskMemcachedConnection[Option[ValueDesc]] = send(GetRequest(UUID.randomUUID(), key)).flatMap { case GetSucceeded(_, _, result) => ReaderTTask.pure(result) case GetFailed(_, _, ex) => ReaderTTask.raiseError(ex) } def set[A: Show](key: String, value: A, expireDuration: Duration = Duration.Inf, flags: Int = 0): ReaderTTaskMemcachedConnection[Int] = send(SetRequest(UUID.randomUUID(), key, value, expireDuration, flags)).flatMap { case SetExisted(_, _) => ReaderTTask.pure(0) case SetNotFounded(_, _) => ReaderTTask.pure(0) case SetNotStored(_, _) => ReaderTTask.pure(0) case SetSucceeded(_, _) => ReaderTTask.pure(1) case SetFailed(_, _, ex) => ReaderTTask.raiseError(ex) } // ... }
- for式での利用例
val client = new MemcachedClient() val resultFuture = (for { _ <- client.set(key, "1") r1 <- client.get(key) _ <- client.set(key, "2") r2 <- client.get(key) } yield (r1, r2)).run(connection).runAsync val result = Await.result(resultFuture, Duration.Inf) // (1, 2)
コネクションプール
コネクションとクライアントを実装できたら、次はコネクションプールも欲しくなります。MemcachedConnection
をプーリングして選択できればいいわけなので、commons-pool
など使えば楽ですね。コネクションをプールするアルゴリズム(HashRingなど)も複数考えられるので、工夫するとなかなか面白いです。
プールの実装 https://github.com/j5ik2o/reactive-memcached/blob/master/core/src/main/scala/com/github/j5ik2o/reactive/memcached/pool/MemcachedConnectionPoolActor.scala https://github.com/j5ik2o/reactive-memcached/blob/master/pool-commons/src/main/scala/com/github/j5ik2o/reactive/memcached/CommonsPool.scala
利用例
implicit val system = ActorSystem() val peerConfig = PeerConfig(remoteAddress = new InetSocketAddress("127.0.0.1", 6379)) val pool = MemcachedConnectionPool.ofSingleRoundRobin(sizePerPeer = 5, peerConfig, RedisConnection(_)) // powered by RoundRobinPool val connection = MemcachedConnection(connectionConfig) val client = MemcachedClient() // ローンパターン形式 val resultFuture1 = pool.withConnectionF{ con => (for{ _ <- client.set("foo", "bar") r <- client.get("foo") } yield r).run(con) }.runAsync // モナド形式 val resultFuture2 = (for { _ <- ConnectionAutoClose(pool)(client.set("foo", "bar").run) r <- ConnectionAutoClose(pool)(client.get("foo").run) } yield r).run().runAsync
まとめ
というわけでこんな風にakka-streamを使えばTCPクライアントも比較的簡単に実装できると思います。参考にしてみてください。
-
バックプレッシャを適切にハンドリングするには、ストリームを常に起動した状態にしておく必要があります、要求ごとにストリームを起動していると効果的ではありません))↩