かとじゅんの技術日誌

技術の話をするところ

Error, Defect, Fault, Failureの定義と解釈

ソフトウェア設計を行う場合、必ずエラーや障害などの用語が飛び交うことがあるが認識がずれていることが多いので、以下に簡単にまとめてみた。

ソフトウェアの非正常を表す概念

ソフトウェアのエラーや障害などの非正常を表す概念の定義をいくつかピックアップしてみた。

JIS X 0014:1999の定義

出典: ソフトウェア品質知識体系ガイド(第2版) - SQuBOK Guide V2-(ソフトウェア品質知識体系ガイド(第2版) - SQuBOK Guide V2-)

  1. 誤差・誤り(error)
    計算,観測もしくは測定された値または状態と,真の,指定されたもしくは理論的に正しい値または状態との間の相違。
  2. 障害(fault)
    要求された機能を遂行する機能単位の能力の,縮退または喪失を引き起こす,異常な状態。
  3. 故障(failure)
    要求された機能を遂行する,機能単位の能力がなくなること。

JIS X 0014では、意図しない結果を引き起こす人間の行為は、「間違い、人的過誤(mistake, human error)」として別に定義しており,「誤差・誤り(error)」の中に含めていない。

JIS Z 8115:2000の定義

  1. エラー・誤り(error)
    計算,観察,または測定値若しくは条件と,特定されまたは推論的に正しい若しくは条件との間の不一致。
  2. フォールト(fault)
    ある要求された機能を遂行不可能なアイテムの状態,また,その状態にあるアイテムの部分。アイテムの要求機能遂行能力を失わせたり,要求機能遂行能力に支障を起こさせる原因(設計の状態)。
  3. 故障(failure)
    アイテムが要求機能達成能力を失うこと。

JIS X 8115:2000では、人的な間違いをおかす行為(ヒューマンエラー)を「エラー・誤り(error)」に含める場合があるとしている。

JSTQB ソフトウェアテスト標準用語集の定義

  • 出典

  • エラー(error)
    間違った結果を生み出す人間の行為。

  • 誤り(mistake)
    エラー(error)と同義
  • 欠陥(defect)
    コンポーネント又はシステムに要求された機能が実現できない原因となる、コンポーネント又はシステムに含まれる不備。たとえば、不正なステートメント又はデータ定義。実行中に欠陥に遭遇した場合、コンポーネント又はシステムの故障を引き起こす。
  • バグ(bug)
    欠陥(defect)と同義
  • フォールト(fault)
    欠陥(defect)と同義
  • 故障(failure)
    コンポーネントやシステムが、期待した機能、サービス、結果から逸脱すること。

ソフトウェアの災いを表す用語

出典: オブジェクト指向入門 原則・コンセプト P449

Bertrand Meyerさんの考え方:

  1. エラー(error)とは、ソフトウェアシステムの開発中になされた誤った決定である。
  2. 欠陥(defect)とは、意図した振る舞いからシステムが逸れてしまう原因となるソフトウェアシステムの特性である。
  3. フォルト(fault)とは、何らかの実行中に意図した振る舞いから逸れてしまうソフトウェアシステムのイベントである。

  4. エラーが設計時の視点にみえる

JIS Q 9000:2006の定義

「品質マネジメントシステム-基本及び用語」なのでソフトウェアに限った定義ではないですが、欠陥の定義は以下です。

  1. 欠陥(defect)
    意図された用途または規定された用途に関連する要求事項を満たしていないこと

ざっくり比較

ざっくりの比較表。 人為的な過誤を除いて、システムで扱う非正常は大きく分けて、エラー(Error)、欠陥(Defect)、障害(Fault)、故障(Failure)がある。1

種別 エラー(Error) 欠陥(Defect) 障害(Fault) 故障(Failure)
JIS X 0014:1999 値または状態の不一致(人的過誤を含まない)。 - 機能単位の能力の,縮退または喪失を引き起こす,異常な状態。 要求された機能を遂行する,機能単位の能力がなくなること。
JIS Z 8115:2000 値または条件の不一致(人的過誤を含む)。 - 機能を遂行不可能なアイテムの状態 アイテムが要求機能達成能力を失うこと。
JSTQB 間違った結果を生み出す人間の行為。 機能が実現できない原因となる不備(Bugを含む) 欠陥(defect)と同義 コンポーネントやシステムが、期待した機能、サービス、結果から逸脱すること。
ソフトウェアの災いを表す用語(Bertrand Meyer) ソフトウェアシステムの開発中になされた誤った決定である 意図した振る舞いからシステムが逸れてしまう原因となるソフトウェアシステムの特性である(バグの概念を含む) 何らかの実行中に意図した振る舞いから逸れてしまうソフトウェアシステムのイベントである -
JIS Q 9000:2006 - 意図された用途または規定された用途に関連する要求事項を満たしていないこと - -

まとめ

ざっくりまとめると以下のような解釈ができる。

  • エラー(Error)は、入力値と期待値に相違が生じている状態。典型的にはバリエーションエラー。おそらくリカバリする想定内のエラー(テストエディタなどで開くファイルが存在しないなど)も含まれる
  • 欠陥(Defect)は、呼び出し元が要求事項を満たしていない状態(バグを含む)。契約を満たしていない状態ともいえる。本来発生してはいけないもの。基本的には表明(アサーション)することになる
  • 障害(Fault)は、機能が遂行できない異常な状態。呼び出し先のバグもしくは責任所在不明でリカバリ不能。AkkaやErlangの軽量プロセスはError KernelLet it crashという考え方で障害にある程度の耐性を持つことができる 2
  • 故障(Failure)は、機能を達成する能力を失った状態

  1. これ以外にいろいろ定義はあるけどまぁ実用的にはこの程度で十分かな

  2. JISの障害には欠陥が踏まれているようにみえる。FaultはDefectより広い概念

akka-cluster スプリットブレインリゾルバ OSS実装一覧

Akkaクラスターがネットワーク分断に遭遇した場合に、UnreachableメンバーをDown状態に遷移させるためのリゾルバのOSS実装を以下にまとめる。

ちなみに、このリゾルバがない場合はUnreachableのままだとリーダアクションが取れずにクラスターが機能不全状態なる。かといってAutodownを有効にするとスプリットブレインが発生する可能性がある。これを解決するのがスプリットブレインリゾルバで商用版はLightbend社から提供されている。スプリットブレインリゾルバの仕様はこちら

git repo stars 備考
TanUkkii007/akka-cluster-custom-downing 131 OldestAutoDowning, QuorumLeaderAutoDowning, MajorityLeaderAutoDowningに対応している。OldestAutoDowningには不具合があるようだ。要修正
mbilski/akka-reasonable-downing 85 Static QuorumによるDowningにしか対応していない
arnohaase/simple-akka-downing 16 static-quorum, keep-majority, keep-oldestに対応している
guangwenz/akka-down-resolver 5 Static QuorumによるDowningにしか対応していない

sbtでビルドしたDockerイメージをECRにプッシュする方法

sbt-native-packagerとsbt-ecrを使って、ビルド→ECRログイン→ECRへのプッシュまで行います。

https://github.com/sbt/sbt-native-packager https://github.com/sbilinski/sbt-ecr

ECRを作成する

レジストリを作る。作成後 以下のようなURIが取得できます。

123456789012.dkr.ecr.ap-northeast-1.amazonaws.com/j5ik2o/thread-weaver-api-server

sbtプラグインの設定の設定

project/plugins.sbt

// ...
addSbtPlugin("com.mintbeans" % "sbt-ecr" % "0.14.1")

addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.3.10")
// ...

build.sbtの設定

repositoryNameは、上記で得られたURIのホスト名を除く文字列(j5ik2o/thread-weaver-api-server)を指定すること

import com.amazonaws.regions.{Region, Regions}

val ecrSettings = Seq(
  region           in Ecr := Region.getRegion(Regions.AP_NORTHEAST_1),
  repositoryName   in Ecr := "j5ik2o/thread-weaver-api-server",
  repositoryTags   in Ecr ++= Seq(version.value), // タグを付ける場合は指定する
  localDockerImage in Ecr := "j5ik2o/" + (packageName in Docker).value + ":" + (version in Docker).value,
  push in Ecr := ((push in Ecr) dependsOn (publishLocal in Docker, login in Ecr)).value
)

val `api-server` = (project in file("api-server"))
  .enablePlugins(AshScriptPlugin, JavaAgent, EcrPlugin)
  .settings(baseSettings)
  .settings(ecrSettings)
  .settings(
    name = "thread-weaver-api-server",
    dockerBaseImage := "adoptopenjdk/openjdk8:x86_64-alpine-jdk8u191-b12",
    maintainer in Docker := "Junichi Kato <j5ik2o@gmail.com>",
    dockerUpdateLatest := true,
    dockerUsername := Some("j5ik2o"),
    bashScriptExtraDefines ++= Seq(
      "addJava -Xms${JVM_HEAP_MIN:-1024m}",
      "addJava -Xmx${JVM_HEAP_MAX:-1024m}",
      "addJava -XX:MaxMetaspaceSize=${JVM_META_MAX:-512M}",
      "addJava ${JVM_GC_OPTIONS:--XX:+UseG1GC}",
      "addJava -Dconfig.resource=${CONFIG_RESOURCE:-application.conf}"
    ),
// ...
)

実行

profileを使う場合は以下のようにする

$ AWS_DEFAULT_PROFILE=xxxxx sbt api-server/ecr:push

Microsoft社のDDD, CQRSに関する記事一覧

CQRS+ESパターンの説明

  • コマンド クエリ責務分離 (CQRS) パターン
    • ステートソーシング(CRUD)の短所としてあげているもの
      • 読み取りと書き込みのデータ表現不一致問題
      • ステートの同時更新による競合問題
      • 読み取りと書き込みが同じモデルによる、データの誤用問題
    • CRUDの単一データモデルより設計と実装が簡単になる。ただし、CRUDはスキャーフォルドメカニズムによって自動生成できない
    • リードモデルはSQLビューもしくは即時プロジェクションを生成するか。
    • 同じ物理ストアに格納する場合は、パフォーマンス、スケーラビリティ、セキュリティを最大化するため、データストアを物理的に分けるのが一般的
  • イベント ソーシング パターン
    • ステートソーシング(CRUD)の短所としてあげているもの
      • 更新時のロックがパフォーマンスと応答性を低下させる
      • 単一データ項目への更新は競合が起きやすい(コラボレータが複数の場合)
      • 監査メカニズムがない限り、履歴が失われる
    • ESのメリット
      • イベントが不変であり、追記保存するだけ。イベントを処理するプロセスは非同期処理で問題が生じない
      • イベントは、データストアを更新しない、シンプルなオブジェクト。
      • イベントはドメインエキスパートの関心事。
      • 同時更新による競合の発生を防ぐことができる(ただし、ドメインオブジェクトが矛盾した状態にならないよう依然として保護が必要)

関連記事

DDD,CQRS関連

マイクロサービス関連

マイクロサービス関連。DDDだと戦略的設計の部分。

Scala.js ウェブフレームワーク Facade 調査(2019年4月版)

主要なウェブフレームワークのFacadeについて調査した(2019/4/17日時点)。

React.js

Reactバージョン Scalaバージョン ライブラリ名 スター数 最終更新日時 タグ形式 ReactRouterサポート Reduxサポート 備考
16.6+ 2.12,2.11 japgolly/scalajs-react 1196 1時間前 Scalatags形式 Scala.js独自実装 サポートなし
16.8.1 2.12 shadaj/slinky 276 4日前 case class なし なし japgolly/scalajs-reactに依存, ReactNative対応
16.8.1+ 2.12 aappddeevv/scalajs-reaction 17 25日前 case class あり あり
15.5.3 2.12 shogowada/scalajs-reactjs 23 2017/5 Scalatags形式 あり あり
0.11.0 2.11 xored/scala-js-react 133 2015/2 scalax なし なし

所感

Vue.js

Vueバージョン Scalaバージョン ライブラリ名 スター数 最終更新日時 Vue Routerサポート Vuexサポート 備考
2.x 2.11,2.12 random-scalor/scala-js-vue 3 2018/2 あり あり
2.x 2.12 massung/scala-js-vue 12 2017/11 なし なし
2.x 2.11 fancellu/scalajs-vue 76 2017/5 なし なし

所感

Angular

  • 最新のAngularを使えるFacadeを見つけることができなかった…。

まとめ

Scala.jsでウェブアプリケーションを作るならscalajs-react一択…。他は自分でがっつりコントリビュートする気がないと、いろいろ大変そうです。 ということで、サンプルプロジェクトを二種類用意したので、興味があれば確認してみてください。

https://github.com/j5ik2o/scalajs-react-webpack4-example https://github.com/j5ik2o/scalajs-vuejs-webpack4-example

ID生成方法についてあれこれ

ID生成について聞かれることが多いので、独自の観点でまとめてみます。タイトルは適当です…。 DBはMySQL(InnoDB)を想定しています。あしからず。

ID生成を知りたいなら

ID生成に関しては以下の記事がよく纏まっているので参考にしてみてください。値形式など詳しく書かれています。

ID生成方法

以下のID生成方法は、お手軽に採用しやすいもの順で列挙します。

DB採番/連番型

AUTO_INCREMENT

DBのAUTO_INCREMENTで採番する方法。

Pros

  • 数値型で扱える
    • 普通は64ビットの整数型を採用することが多い
  • 単調増加する連番なので、ソート可能でかつインデックスの空間効率がよい
  • 単調増加するので、キャパシティを予測しやすい
    • 64ビットあればあまり気にすることもないと思うが…

Cons

  • DBとのネットワークI/O、DBでのディスクI/O分のレイテンシーを想定する必要がある、まぁ当然ですが…
  • ID生成がDB(Master)1台に依存するので、SPoFになりやすい
    • DB依存のアプリケーションならこのSPoFは想定内ではあると思います
  • 保存前にエンティティのIDを確定できず、IDに値があるかないかという判定ロジックが煩雑になりがち(初期値が未定問題)
    • AUTO_INCREMENTのため(書き込み側の都合)に User#idがOption[Long] を採用した場合は、読み込み側では必ず値があるので邪魔になります。書き込み用と読み込み用のモデルをいちいち用意するもの大袈裟です。
case class User(id: Option[Long], name: String, ...)

object UserDao {
  def findById(id: Long): Option[User] = ???
}

def getUserName(id: Long): Option[(Long, String)] = {
  UserDao.findById(id).map { result =>
    (result.id, result.name) // これは期待する戻り値と異なる型なのでコンパイルエラー。読み込み側に書き込み側の都合が影響する…。
  }
}
  • 連番(/users/:idなど)はディレクトリスキャンやレコード数予測に使われる可能性がある。
    • これは対策する方法があります。後述します。

事前採番方式

AUTO_INCREMENTの初期値が未定問題だけを 事前採番 で改善する方式(その他のConsは改善されません)。一般的に事前採番であればシーケンスを使うと思います。PostgreSQLなら使えますね。MySQLでは以下のように採番テーブルを用意しLAST_INSERT_IDを使うと採番可能です(ストレージエンジンはMyISAM)。

CREATE TABLE `user_id_seq`(id bigint unsigned NOT NULL) ENGINE=MyISAM;
UPDATE user_id_seq SET id = LAST_INSERT_ID(id+1)
SELECT LAST_INSERT_ID() AS id

こちらの記事も参考してください。

Pros

  • 初期値未定問題を回避できる
  • 数値型で扱える
  • 単調増加する連番なので、ソート可能でかつインデックスの空間効率がよい
  • 単調増加するので、キャパシティを予測しやすい

Cons

  • AUTO_INCREMENTと比べるとI/O回数が増えてしまう
  • DBとのネットワークI/O、DBでのディスクI/O分のレイテンシーを想定する必要がある。
  • ID生成がDB(Master)1台に依存するので、SPoFになりやすい
  • 連番(/users/:idなど)はディレクトリスキャンやレコード数予測に使われる可能性がある。これは対策する方法が(ry

連番型 + Hashids

ID生成とは直接関係ないですが、連番型で生成したIDをHashidsを使うとハッシュ値として隠蔽できます。IDとハッシュ値を変換テーブルを用意することなく相互に変換可能です。クエリのID条件にはデコードした数値が使えます。

val hashids = new Hashids("this is my salt")
val id = hashids.encode(1) // 1 -> gB0NV05e
val number = hashids.decode(id) // gB0NV05e -> 1

Pros/Consは、連番を隠蔽できる以外は上述と同じなので省略。

非DB採番/文字列型

UUID

仕様としてはv1~v5までありますが、よく使われるのはv1かv4あたり。

UUID version1の生成アルゴリズムをみると分かりますが、まずタイムスタンプは下位と上位が逆転しているのと、ランダム成分が含まれる場合があるので、ソートは不能と考えてよいです。UUID v4の形式はそもそもランダム成分なのでソート不能です。

Pros

  • ディスクやネットワークを使わずに、各アプリケーション内でID生成できるので、スケールしやすい&SPoFがない

Cons

  • 数値型(64bit Longなど)で扱えない(128bitを文字列型として扱う)
  • 乱数成分の影響で、ソート不能でかつインデックスの空間効率が悪い
  • MySQL(InnoDB)で100万件以上扱う場合は、INSERT時間のペナルティが大きくなります(以下に詳しく述べます)
プライマリキーにUUIDを採用した場合のINSERT時間のペナルティ

とりあえず、プライマリキーにUUIDを指定した場合の実験結果を示す、以下の記事をみましょう。1

最初の二つのグラフをみると、INSERT時間が増えていくことがわかります。レコード数が多くなるほどその影響は顕著になります。INSERT時間が遅くなる直接の原因は分からないですが…。

以下の記事も参考にあると思います。

過去に「性能劣化は2割程度」というどこかの記事を読んだことがありますが、2割ってかなり大きいです…。それなりの規模のデータを扱う場合は要注意ですね。

また、タイムシリーズでデータが読み書きされる場合、直近のデータに対するインデックスはインデックス空間上の近い位置(リーフ)に格納されていてほしいですが、UUIDだとソートできないのでインデックスの位置がフラグメントするというか、分散します。こういうケースだと、おそらくSELECTも非効率になるので要注意です。

ULID

上記のUUIDの欠点をカバーするのが、ULIDです。ULIDには乱数成分が含まれますが、先頭にタイムスタンプ成分があるのでソート可能です。詳しくは https://github.com/ulid/spec を参照してください。様々な言語向けの実装がありますが、ロジック自体は難しくないので対応していない言語でも移植は比較的容易だと思います。

Pros

  • ディスクやネットワークを使わずに、各アプリケーション内でID生成できるので、スケールしやすい&SPoFがない
  • UUIDと違って、先頭48bitを使ってソート可能(1ミリ秒以内に280を超えない限り)
  • MySQL(InnoDB)での空間効率は、UUIDよりよいはず、たぶん。(1ミリ秒以内に280を超えない限り)

Cons

  • 数値型で扱えない(文字列型として扱う)

非DB採番/数値型

DBを利用せずに64ビットLong型のIDを生成する方法があります。 ここからは少し話しがデカくなります。

こういう目的で利用されるのが分散IDワーカーです。僕の職場でも開発・運用されています。

もともとの実装は、twitter/snowflakeです。もうアーカイブされてから久しいですが。

IDの形式は以下です。

  • 41bit タイムスタンプ
  • 10bit マシンID (データセンタID + ワーカーID)
  • 12bit シーケンス
    • 1ミリ秒以内にタイムスタンプが重複する場合にカウントアップされる

Pros

  • 64bit整数値
  • 連続性がある(ソート可能およびインデックスの空間効率がよい)
  • SPoFがない(DBなどに頼らない)

Cons

  • IDワーカーのID管理が面倒(重複するとIDの重複に繋がる)
IDワーカーのID管理

snowflakeの実装はアプリケーション内で利用することができますが、ワーカーIDが5ビットなので32インスタンスまでしか作れません。つまり、インプロセスでID生成器を使った場合は、そのプロセスは32インスタンスまでしかスケールさせられないという制限を持つことになります…。2 まぁ、あまり好ましいとは言えないですね。

そもそもオートスケーリングな環境下でどのホストにどのワーカーIDを割り当てるか? そのインスタンスが突然死した場合ワーカーIDを再利用させるか、など様々な分散環境でのリソース管理の問題がでてきます。こういった課題をsnowflakeではzookeeperを使って解決しています(ウチもzookeeperで管理してます)

zookeeperを使う方法以外には、 JVM限定の話になりますが、akka-cluster を使ってワーカーIDを管理する方法もあります。どちらもハードルは高いですが…。実装例は以下です。

詳しくは以下の資料を参考にしてください。

送信するコマンドメッセージにワーカーIDを含めると、対応するワーカーIDのIDワーカーアクターにメッセージが届くという仕組みです。このアクターは起動していなければ自動的に起動し、 akka-cluster-sharding によってノード上に自動的に分散されます。

https://github.com/TanUkkii007/reactive-snowflake/blob/master/reactive-snowflake-cluster/src/main/scala/org/tanukkii/reactive/snowflake/ShardedIdRouter.scala#L31-L37

TanUkkii007(たぬっきーパイセン)の実装で面白いところ

他にも工夫がされている点があります。1ミリ秒あたり212まで採番できるが、twitterの実装ではシーケンスが桁溢れすると次のタイムスタンプまでブロックする。しかし、reactive-snowflakeでは、採番できなかったものとし自分自身のアクターにリトライメッセージを送信し再度採番するようになっている。つまりブロッキングしないようになっている。

まとめ

  • DB採番/数値型
    • ID生成がDBに依存する
    • ソート可能な数値型IDを生成できる
    • 性能を問われる大量のリクエストには向いていない
      • ID生成時にネットワークやディスクのI/Oが生じる
      • ID生成器=DB(Master)1台なのでボトルネックが生じやすい
  • 非DB採番/文字列型
    • ID生成がDBに依存しない
    • 文字列型IDをアプリケーション内で分散して生成できる
    • UUIDはソート不能だが、ULIDはソート可能なIDを生成できる
  • 非DB採番/数値型(分散IDワーカー)
    • ID生成がDBに依存しない
    • ソート可能な数値型IDを特定のノードで分散して生成できる
      • ID生成器を外部ノード上に配置した場合は、アプリケーションからはネットワークI/Oは生じる
    • IDワーカーIDを管理する仕組みが必要

以下のような条件に合致するなら、ULIDで十分でしょう。

  • IDは数値型でなくてよい
  • ソート可能
  • 分散してID生成可能(SPoFがない)
  • 大掛かりな仕組みが必要ない

  1. UUIDのバージョンは不明ですが、乱数成分が原因ではないかと思います。

  2. データセンターIDをうまく使えばもっと多いインスタンスを立ち上げれますが、いずれにしても上限はあまり考えたくない…。

akka-streamを使ってmemcachedクライアントを作るには

かなり前になるのですが、akka-streamの習作課題として、Memcachedクライアントを実装したので、概要を解説する記事をまとめます。詳しくはgithubをみてください。

https://github.com/j5ik2o/reactive-memcached

TCPのハンドリング方法

akkaでTCPをハンドリングするには、以下の二つになる。おそらく

  1. akka.actorakka.ioのパッケージを使う方式。つまり、アクターでTCP I/Oを実装する方法。
  2. akka.streamTcp#outgoingConnectionというAPIを使う方法

今回は、akka-streamをベースにしたかったので、2を採用しました。

outgoingConnectionメソッドの戻り値の型は、Flow[ByteString, ByteString, Future[OutgoingConnection]]です。ByteStringを渡せば、ByteStringが返ってくるらしい。わかりやすいですね。そしてストリームが起動中であれば、コネクションは常時接続された状態になります。

def outgoingConnection(
  remoteAddress : InetSocketAddress,
  localAddress : Option[InetSocketAddress],
  options : Traversable[SocketOption], 
  halfClose : Boolean,
  connectTimeout : Duration,
  idleTimeout : Duration) : Flow[ByteString, ByteString, Future[Tcp.OutgoingConnection]]

Memcachedのコネクションを表現するオブジェクトを実装する

では、早速 Memcachedのコネクションを表現するオブジェクトである MemcachedConnection を実装します。このオブジェクトを生成するとTCP接続され、破棄されると切断されます。そして、考えたインターフェイスは以下です(別にtrait切り出さなくてもよかったですが、説明のために分けました)。sendメソッドにコマンドを指定して呼ぶとレスポンスが返る単純なものです。 ちなみに、sendメソッドの戻り値の型はmonix.eval.Taskです。詳しくは https://monix.io/docs/3x/eval/task.html をご覧ください。

trait MemcachedConnection {
  def id: UUID // 接続ID
  def shutdown(): Unit // シャットダウン
  def peerConfig: Option[PeerConfig] // 接続先設定

  def send[C <: CommandRequest](cmd: C): Task[cmd.Response] // コマンドの送信

  // ...
}

実装はどうなるか。全貌は以下。先ほどのTcp#outgoingConnectionを使います。

private[memcached] class MemcachedConnectionImpl(_peerConfig: PeerConfig,
                                                 supervisionDecider: Option[Supervision.Decider])(
    implicit system: ActorSystem
) extends MemcachedConnection {

  // ...

  private implicit val mat: ActorMaterializer = ...

  protected val tcpFlow: Flow[ByteString, ByteString, NotUsed] =
    RestartFlow.withBackoff(minBackoff, maxBackoff, randomFactor, maxRestarts) { () =>
      Tcp()
        .outgoingConnection(remoteAddress, localAddress, options, halfClose, connectTimeout, idleTimeout)
    }

  protected val connectionFlow: Flow[RequestContext, ResponseContext, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._
      val requestFlow = b.add(
        Flow[RequestContext]
          .map { rc =>
            log.debug(s"request = [{}]", rc.commandRequestString)
            (ByteString.fromString(rc.commandRequest.asString + "\r\n"), rc)
          }
      )
      val responseFlow = b.add(Flow[(ByteString, RequestContext)].map {
        case (byteString, requestContext) =>
          log.debug(s"response = [{}]", byteString.utf8String)
          ResponseContext(byteString, requestContext)
      })
      val unzip = b.add(Unzip[ByteString, RequestContext]())
      val zip   = b.add(Zip[ByteString, RequestContext]())
      requestFlow.out ~> unzip.in
      unzip.out0 ~> tcpFlow ~> zip.in0
      unzip.out1 ~> zip.in1
      zip.out ~> responseFlow.in
      FlowShape(requestFlow.in, responseFlow.out)
    })

  protected val (requestQueue: SourceQueueWithComplete[RequestContext], killSwitch: UniqueKillSwitch) = Source
    .queue[RequestContext](requestBufferSize, overflowStrategy)
    .via(connectionFlow)
    .map { responseContext =>
      log.debug(s"req_id = {}, command = {}: parse",
                responseContext.commandRequestId,
                responseContext.commandRequestString)
      val result = responseContext.parseResponse // レスポンスのパース
      responseContext.completePromise(result.toTry) // パース結果を返す
    }
    .viaMat(KillSwitches.single)(Keep.both)
    .toMat(Sink.ignore)(Keep.left)
    .run()

  override def shutdown(): Unit = killSwitch.shutdown()

  override def send[C <: CommandRequest](cmd: C): Task[cmd.Response] = Task.deferFutureAction { implicit ec =>
    val promise = Promise[CommandResponse]()
    requestQueue
      .offer(RequestContext(cmd, promise, ZonedDateTime.now()))
      .flatMap {
        case QueueOfferResult.Enqueued =>
          promise.future.map(_.asInstanceOf[cmd.Response])
        case QueueOfferResult.Failure(t) =>
          Future.failed(BufferOfferException("Failed to send request", Some(t)))
        case QueueOfferResult.Dropped =>
          Future.failed(
            BufferOfferException(
              s"Failed to send request, the queue buffer was full."
            )
          )
        case QueueOfferResult.QueueClosed =>
          Future.failed(BufferOfferException("Failed to send request, the queue was closed"))
      }
  }

}

TCPの送受信するにはtcpFlowメソッドが返すFlowに必要なByteStringを流して、ByteStringを受け取ればいいですが、リクエストとレスポンスを扱うためにもう少し工夫が必要です。そのためにconnectionFlowメソッドはtcpFlowメソッドを内部Flowとして利用します。そして、FlowはRequestContextResponseContextの型を利用します。その実装は以下のようなシンプルなものです。キューであるrequestQueueconnectionFlowにリクエストを送信します。ストリームの後半で返ってきたレスポンスを処理するという流れになっています。キューへのリクエストのエンキューはsendメソッドが行います。sendメソッドが呼ばれるとRequestContextを作ってrequestQueueにエンキュー、完了するとレスポンスを返します。requestQueueはストリームと繋がっていて、エンキューされたRequestContextconnectionFlowに渡され返ってきたResponseContextがレスポンス内容をパースし、その結果をPromise経由で返すというものです1。エラーハンドリングについて、エラーを起こしやすいところにRestartFlow.withBackoffを入れています。

final case class RequestContext(commandRequest: CommandRequest,
                                promise: Promise[CommandResponse],
                                requestAt: ZonedDateTime) {
  val id: UUID                     = commandRequest.id
  val commandRequestString: String = commandRequest.asString
}

final case class ResponseContext(byteString: ByteString,
                                 requestContext: RequestContext,
                                 requestsInTx: Seq[CommandRequest] = Seq.empty,
                                 responseAt: ZonedDateTime = ZonedDateTime.now)
    extends ResponseBase {

  val commandRequest: CommandRequest = requestContext.commandRequest

  def withRequestsInTx(values: Seq[CommandRequest]): ResponseContext = copy(requestsInTx = values)
  // レスポンスのパース
  def parseResponse: Either[ParseException, CommandResponse] = {
    requestContext.commandRequest match {
      case scr: CommandRequest =>
        scr.parse(ByteVector(byteString.toByteBuffer)).map(_._1)
    }
  }

}

コネクションオブジェクトの使い方は簡単です。ほとんどの場合は、このようにコネクションオブジェクトを直接操作せずに、高レベルなAPIを操作したいと思うはずです。そのためのクライアントオブジェクトはあとで述べます。

val connection = MemcachedConnection(
  PeerConfig(new InetSocketAddress("127.0.0.1", memcachedTestServer.getPort),
  backoffConfig = BackoffConfig(maxRestarts = 1)),
  None
)

val resultFuture = (for {
  _   <- connection.send(SetRequest(UUID.randomUUID(), "key1", "1", 10 seconds))
  _   <- connection.send(SetRequest(UUID.randomUUID(), "key2", "2", 10 seconds))
  gr1 <- connection.send(GetRequest(UUID.randomUUID(), "key1"))
  gr2 <- connection.send(GetRequest(UUID.randomUUID(), "key2"))
} yield (gr1, gr2)).runAsync
val result = Await.result(resultFuture, Duration.Inf) // (1, 2)

コマンドの実装

コマンドの一例はこんな感じです。 asStringメソッドは文字通りMemcachedにコマンドを送信するときに利用される文字列です。responseParserはfastparseで実装されたレスポンスパーサを指定します。parseResponseメソッドはMemcachedからのレスポンスを表現する構文木をコマンドのレンスポンスに変換するための関数です。これは少し複雑なのであとで説明します。

final class GetRequest private (val id: UUID, val key: String) extends CommandRequest with StringParsersSupport {

  override type Response = GetResponse
  override val isMasterOnly: Boolean = false

  override def asString: String = s"get $key"

  // レスポンスを解析するためのパーサを指定。レスポンス仕様に合わせて指定する
  override protected def responseParser: P[Expr] = P(retrievalCommandResponse)

  // ASTをコマンドレスポンスに変換する
  override protected def parseResponse: Handler = {
    case (EndExpr, next) =>
      (GetSucceeded(UUID.randomUUID(), id, None), next)
    case (ValueExpr(key, flags, length, casUnique, value), next) =>
      (GetSucceeded(UUID.randomUUID(), id, Some(ValueDesc(key, flags, length, casUnique, value))), next)
    case (ErrorExpr, next) =>
      (GetFailed(UUID.randomUUID(), id, MemcachedIOException(ErrorType.OtherType, None)), next)
    case (ClientErrorExpr(msg), next) =>
      (GetFailed(UUID.randomUUID(), id, MemcachedIOException(ErrorType.ClientType, Some(msg))), next)
    case (ServerErrorExpr(msg), next) =>
      (GetFailed(UUID.randomUUID(), id, MemcachedIOException(ErrorType.ServerType, Some(msg))), next)
  }

}

これはGetRequestの上位traitであるCommandRequestです。Memcachedのコマンド送信に必要なのはasStringメソッドだけで、コマンドのレスポンスのパースにはparseメソッドが利用されます。

trait CommandRequest {
  type Elem
  type Repr
  type P[+T] = core.Parser[T, Elem, Repr]

  type Response <: CommandResponse

  type Handler = PartialFunction[(Expr, Int), (Response, Int)]

  val id: UUID
  val key: String
  val isMasterOnly: Boolean

  def asString: String

  protected def responseParser: P[Expr]

  protected def convertToParseSource(s: ByteVector): Repr

  def parse(text: ByteVector, index: Int = 0): Either[ParseException, (Response, Int)] = {
    responseParser.parse(convertToParseSource(text), index) match {
      case f @ Parsed.Failure(_, index, _) =>
        Left(new ParseException(f.msg, index))
      case Parsed.Success(value, index) => Right(parseResponse((value, index)))
    }
  }

  protected def parseResponse: Handler

}

レスポンス・パーサの実装

レスポンスパーサは、Memcachedの仕様に合わせて以下の定義から適切なものを選んで利用します。詳しくは公式ドキュメントをみてください。標準のパーサーコンビネータと少し違うところがありますが、概ね似たような感じで書けます。Memcachedのレスポンスのルールは単純で左再帰除去などもしなくていいので、パーサーコンビネータのはじめての題材にはよいと思います。

object StringParsers {

  val digit: P0      = P(CharIn('0' to '9'))
  val lowerAlpha: P0 = P(CharIn('a' to 'z'))
  val upperAlpha: P0 = P(CharIn('A' to 'Z'))
  val alpha: P0      = P(lowerAlpha | upperAlpha)
  val alphaDigit: P0 = P(alpha | digit)
  val crlf: P0       = P("\r\n")

  val error: P[ErrorExpr.type]        = P("ERROR" ~ crlf).map(_ => ErrorExpr)
  val clientError: P[ClientErrorExpr] = P("CLIENT_ERROR" ~ (!crlf ~/ AnyChar).rep(1).! ~ crlf).map(ClientErrorExpr)
  val serverError: P[ServerErrorExpr] = P("SERVER_ERROR" ~ (!crlf ~/ AnyChar).rep(1).! ~ crlf).map(ServerErrorExpr)
  val allErrors: P[Expr]              = P(error | clientError | serverError)

  val end: P[EndExpr.type]             = P("END" ~ crlf).map(_ => EndExpr)
  val deleted: P[DeletedExpr.type]     = P("DELETED" ~ crlf).map(_ => DeletedExpr)
  val stored: P[StoredExpr.type]       = P("STORED" ~ crlf).map(_ => StoredExpr)
  val notStored: P[NotStoredExpr.type] = P("NOT_STORED" ~ crlf).map(_ => NotStoredExpr)
  val exists: P[ExistsExpr.type]       = P("EXISTS" ~ crlf).map(_ => ExistsExpr)
  val notFound: P[NotFoundExpr.type]   = P("NOT_FOUND" ~ crlf).map(_ => NotFoundExpr)
  val touched: P[TouchedExpr.type]     = P("TOUCHED" ~ crlf).map(_ => TouchedExpr)

  val key: P[String]     = (!" " ~/ AnyChar).rep(1).!
  val flags: P[Int]      = digit.rep(1).!.map(_.toInt)
  val bytes: P[Long]     = digit.rep(1).!.map(_.toLong)
  val casUnique: P[Long] = digit.rep(1).!.map(_.toLong)

  val incOrDecCommandResponse: P[Expr] = P(notFound | ((!crlf ~/ AnyChar).rep.! ~ crlf).map(StringExpr) | allErrors)
  val storageCommandResponse: P[Expr]  = P((stored | notStored | exists | notFound) | allErrors)

  val value: P[ValueExpr] =
    P("VALUE" ~ " " ~ key ~ " " ~ flags ~ " " ~ bytes ~ (" " ~ casUnique).? ~ crlf ~ (!crlf ~/ AnyChar).rep.! ~ crlf)
      .map {
        case (key, flags, bytes, cas, value) =>
          ValueExpr(key, flags, bytes, cas, value)
      }

  val version: P[VersionExpr] = P("VERSION" ~ " " ~ alphaDigit.rep(1).!).map(VersionExpr)

  val retrievalCommandResponse: P[Expr] = P(end | value | allErrors)

  val deletionCommandResponse: P[Expr] = P(deleted | notFound | allErrors)

  val touchCommandResponse: P[Expr] = P(touched | notFound | allErrors)

  val versionCommandResponse: P[Expr] = P(version | allErrors)
}

クライアントの実装

次はクライアント実装。MemcachedClient#sendメソッドはコマンドを受け取り、ReaderTTaskMemcachedConnection[cmd.Response]を返します。ReaderTTaskMemcachedConnectionはMemcachedのために特化したReaderTで、コネクションを受け取り、レスポンスを返すTaskを返します。Memcachedのための高レベルなAPIはこのsendメソッドを利用して実装されます。get,setなどのメソッドの内部でコマンドを作成し、レスポンスを戻り値にマッピングするだけで、使いやすい高レベルAPIになります。また、ReaderTを返すためfor式での簡潔な記述もできます。

package object memcached {

  type ReaderTTask[C, A]                  = ReaderT[Task, C, A]
  type ReaderTTaskMemcachedConnection[A]  = ReaderTTask[MemcachedConnection, A]
  type ReaderMemcachedConnection[M[_], A] = ReaderT[M, MemcachedConnection, A]

}
final class MemcachedClient()(implicit system: ActorSystem) {

  def send[C <: CommandRequest](cmd: C): ReaderTTaskMemcachedConnection[cmd.Response] = ReaderT(_.send(cmd))

  def get(key: String): ReaderTTaskMemcachedConnection[Option[ValueDesc]] =
    send(GetRequest(UUID.randomUUID(), key)).flatMap {
      case GetSucceeded(_, _, result) => ReaderTTask.pure(result)
      case GetFailed(_, _, ex)        => ReaderTTask.raiseError(ex)
    }

  def set[A: Show](key: String,
                   value: A,
                   expireDuration: Duration = Duration.Inf,
                   flags: Int = 0): ReaderTTaskMemcachedConnection[Int] =
    send(SetRequest(UUID.randomUUID(), key, value, expireDuration, flags)).flatMap {
      case SetExisted(_, _)    => ReaderTTask.pure(0)
      case SetNotFounded(_, _) => ReaderTTask.pure(0)
      case SetNotStored(_, _)  => ReaderTTask.pure(0)
      case SetSucceeded(_, _)  => ReaderTTask.pure(1)
      case SetFailed(_, _, ex) => ReaderTTask.raiseError(ex)
    }

   // ...

}
  • for式での利用例
val client = new MemcachedClient()
val resultFuture = (for {
  _  <- client.set(key, "1")
  r1 <- client.get(key)
  _  <- client.set(key, "2")
  r2 <- client.get(key)
} yield (r1, r2)).run(connection).runAsync
val result = Await.result(resultFuture, Duration.Inf) // (1, 2)

コネクションプール

コネクションとクライアントを実装できたら、次はコネクションプールも欲しくなります。MemcachedConnectionをプーリングして選択できればいいわけなので、commons-poolなど使えば楽ですね。コネクションをプールするアルゴリズム(HashRingなど)も複数考えられるので、工夫するとなかなか面白いです。

implicit val system = ActorSystem()

val peerConfig = PeerConfig(remoteAddress = new InetSocketAddress("127.0.0.1", 6379))
val pool = MemcachedConnectionPool.ofSingleRoundRobin(sizePerPeer = 5, peerConfig, RedisConnection(_)) // powered by RoundRobinPool
val connection = MemcachedConnection(connectionConfig)
val client = MemcachedClient()

// ローンパターン形式
val resultFuture1 = pool.withConnectionF{ con =>
  (for{
    _ <- client.set("foo", "bar")
    r <- client.get("foo")
  } yield r).run(con) 
}.runAsync

// モナド形式
val resultFuture2 = (for {
  _ <- ConnectionAutoClose(pool)(client.set("foo", "bar").run)
  r <- ConnectionAutoClose(pool)(client.get("foo").run)
} yield r).run().runAsync

まとめ

というわけでこんな風にakka-streamを使えばTCPクライアントも比較的簡単に実装できると思います。参考にしてみてください。


  1. バックプレッシャを適切にハンドリングするには、ストリームを常に起動した状態にしておく必要があります、要求ごとにストリームを起動していると効果的ではありません))