かとじゅんの技術日誌

技術の話をするところ

Rustでブロッキングキューを実装する

Rustでブロッキングキューを実装した話。これはRustのカレンダー | Advent Calendar 2023 - Qiitaの22日目の記事です。

ブロッキングキューはご存じだろうか。(えっ…スレッドはブロックしたくない…と思った人は最後まで読むとよいかも)

Javaにはあります。

docs.oracle.com

要素の取得時にキューが空でなくなるまで待機したり、要素の格納時にキュー内に空きが生じるまで待機する操作を追加でサポートしたりするQueueです。

これはRustの標準にはない。今回はブロッキングキューを実装してみる。

「そういえば、ブロッキングキューが欲しい!」と思ったときに、ぜひこのブログ記事を思い出してほしい。

まず作るな・既存のコードを調査しろ

標準にはないが、クレートとして実装がある。

JimFawcett/RustBlockingQueuejulianbuettner/rust-blockinqueueをみるとMutextとCondvarを使うと実装できるということが分かる。TeaEntityLab/fpRustではmpscを使っている模様。

MutexCondvarがあれば実装できることがわかる。詳しいことは以下参照

この本にもMutex,Condvarの使い方の紹介がある。

BlockingQueueのインターフェイス

JavaのAPIを参考にインターフェイスは以下のようにした。

queue-rs/src/queue.rs at f4de43386432b0d9873af07e7cbba41e6133a75f · j5ik2o/queue-rs · GitHub

pub trait BlockingQueueBehavior<E: Element>: QueueBehavior<E> + Send {
  /// Inserts the specified element into this queue. If necessary, waits until space is available.<br/>
  /// 指定された要素をこのキューに挿入します。必要に応じて、空きが生じるまで待機します。
  ///
  /// # Arguments / 引数
  /// - `element` - The element to be inserted. / 挿入する要素。
  ///
  /// # Return Value / 戻り値
  /// - `Ok(())` - If the element is inserted successfully. / 要素が正常に挿入された場合。
  /// - `Err(QueueError::OfferError(element))` - If the element cannot be inserted. / 要素を挿入できなかった場合。
  /// - `Err(QueueError::InterruptedError)` - If the operation is interrupted. / 操作が中断された場合。
  fn put(&mut self, element: E) -> Result<()>;

  /// Inserts the specified element into this queue. If necessary, waits until space is available. You can specify the waiting time.<br/>
  /// 指定された要素をこのキューに挿入します。必要に応じて、空きが生じるまで待機します。待機時間を指定できます。
  ///
  /// # Arguments / 引数
  /// - `element` - The element to be inserted. / 挿入する要素。
  ///
  /// # Return Value / 戻り値
  /// - `Ok(())` - If the element is inserted successfully. / 要素が正常に挿入された場合。
  /// - `Err(QueueError::OfferError(element))` - If the element cannot be inserted. / 要素を挿入できなかった場合。
  /// - `Err(QueueError::InterruptedError)` - If the operation is interrupted. / 操作が中断された場合。
  /// - `Err(QueueError::TimeoutError)` - If the operation times out. / 操作がタイムアウトした場合。
  fn put_timeout(&mut self, element: E, timeout: Duration) -> Result<()>;

  /// Retrieve the head of this queue and delete it. If necessary, wait until an element becomes available.<br/>
  /// このキューの先頭を取得して削除します。必要に応じて、要素が利用可能になるまで待機します。
  ///
  /// # Return Value / 戻り値
  /// - `Ok(Some(element))` - If the element is retrieved successfully. / 要素が正常に取得された場合。
  /// - `Ok(None)` - If the queue is empty. / キューが空の場合。
  /// - `Err(QueueError::InterruptedError)` - If the operation is interrupted. / 操作が中断された場合。
  fn take(&mut self) -> Result<Option<E>>;

  /// Retrieve the head of this queue and delete it. If necessary, wait until an element becomes available. You can specify the waiting time.<br/>
  /// このキューの先頭を取得して削除します。必要に応じて、要素が利用可能になるまで待機します。待機時間を指定できます。
  ///
  /// # Arguments / 引数
  /// - `timeout` - The maximum time to wait. / 待機する最大時間。
  ///
  /// # Return Value / 戻り値
  /// - `Ok(Some(element))` - If the element is retrieved successfully. / 要素が正常に取得された場合。
  /// - `Ok(None)` - If the queue is empty. / キューが空の場合。
  /// - `Err(QueueError::InterruptedError)` - If the operation is interrupted. / 操作が中断された場合。
  /// - `Err(QueueError::TimeoutError)` - If the operation times out. / 操作がタイムアウトした場合。
  fn take_timeout(&mut self, timeout: Duration) -> Result<Option<E>>;

  /// Returns the number of elements that can be inserted into this queue without blocking.<br/>
  /// ブロックせずにこのキューに挿入できる要素数を返します。
  ///
  /// # Return Value / 戻り値
  /// - `QueueSize::Limitless` - If the queue has no capacity limit. / キューに容量制限がない場合。
  /// - `QueueSize::Limited(num)` - If the queue has a capacity limit. / キューに容量制限がある場合。
  fn remaining_capacity(&self) -> QueueSize;

  /// Interrupts the operation of this queue.<br/>
  /// このキューの操作を中断します。
  fn interrupt(&mut self);

  /// Returns whether the operation of this queue has been interrupted.<br/>
  /// このキューの操作が中断されたかどうかを返します。
  ///
  /// # Return Value / 戻り値
  /// - `true` - If the operation is interrupted. / 操作が中断された場合。
  /// - `false` - If the operation is not interrupted. / 操作が中断されていない場合。
  fn is_interrupted(&self) -> bool;
}

データ構造

今回のブロッキングキューとは別に前提となるtraitがQueueBehaviorです。ブロッキングしないキューの責務をQueueBehavior traitとして定義しています。型パラメータQはこのトレイトを実装していなければなりません。Q以外にConvarの2個がタプルでまとめてArcにラップされています。Cloneが実装されているので、cloneしてもArcなので同じデータを参照する。pは要素の型を保持するためのもの。is_interruptedは操作を中断させるためのフラグとなる。

#[derive(Debug, Clone)]
pub struct BlockingQueue<E, Q: QueueBehavior<E>> {
  underlying: Arc<(Mutex<Q>, Condvar, Condvar)>,
  p: PhantomData<E>,
  is_interrupted: Arc<AtomicBool>,
}

putメソッド

putは「指定された要素をこのキューに挿入します。必要に応じて、空きが生じるまで待機します。」という機能。空きがない場合はメソッド内でスレッドをブロックするので制御が戻ってこなくなる。

putのテスト

putのテストを書いてみた。キャパシティ一杯になるとブロックする。

  #[test]
  #[serial]
  fn test_blocking_put() {
    init_logger();

    let mut q = create_blocking_queue(QueueType::VecDeque, QUEUE_SIZE);
    let mut q_cloned = q.clone();

    let please_interrupt = Arc::new(CountDownLatch::new(1));
    let please_interrupt_cloned = please_interrupt.clone();

    let handler1 = thread::spawn(move || {
      // キューにデータをputする
      for i in 0..QUEUE_SIZE.to_usize() {
        let _ = q_cloned.put(i as i32).unwrap();
      }
      assert_eq!(q_cloned.len(), QUEUE_SIZE);

      // 割り込みを要求する
      q_cloned.interrupt();
      // QUEUE_SIZEを超えたputなのでブロックするはずだが
      // すでにinterruptされているので割り込みエラーが発生する。
      match q_cloned.put(99) {
        Ok(_) => {
          panic!("put: finish: 99, should not be here");
        }
        Err(err) => {
          log::debug!("put: finish: 99, error = {:?}", err);
          let queue_error = err.downcast_ref::<QueueError<i32>>().unwrap();
          assert_eq!(queue_error, &QueueError::InterruptedError); 
        }
      }
      // 割り込みが検出されると割り込み状態がリセットされることを確認する
      assert!(!q_cloned.is_interrupted()); 

      please_interrupt_cloned.count_down();
      // 先にputするのでブロックが発生するが
      // 非同期にinterruptされるので割り込みエラー
      match q_cloned.put(99) {
        Ok(_) => {
          panic!("put: finish: 99, should not be here");
        }
        Err(err) => {
          log::debug!("put: finish: 99, error = {:?}", err);
          let queue_error = err.downcast_ref::<QueueError<i32>>().unwrap();
          assert_eq!(queue_error, &QueueError::InterruptedError);
        }
      }
      assert!(!q_cloned.is_interrupted());
    });

    // カウントが0になるまでブロックする
    // つまり最初のputのテストが終わるまで待機する
    please_interrupt.wait();
    assert!(!handler1.is_finished());
    // 非同期に割り込みを行う
    q.interrupt();
    handler1.join().unwrap();
  }

putの実装

仕様は「指定された要素をこのキューに挿入します。必要に応じて、空きが生じるまで待機します。」。 実装は以下。

queue-rs/src/queue/blocking_queue.rs at f4de43386432b0d9873af07e7cbba41e6133a75f · j5ik2o/queue-rs · GitHub

空きがある場合は2)はスルーされて5)だけ実行される。空きがないつまりフルの場合は、2)の内部に入り、4)がnot_fullのwaitを呼び出すとスレッドをブロックする。 takeメソッド内で要素を取得するときにnot_full.notify_onceが実行されると、not_fullwaitが解除される(待機しているスレッドが複数ある場合そのうち一つだけが解除される)。

impl<E: Element + 'static, Q: QueueBehavior<E>> BlockingQueueBehavior<E> for BlockingQueue<E, Q> {
// ...
  fn put(&mut self, element: E) -> Result<()> {
    // 1) underlyingから必要な要素を取得する
    // queue_vec_mutexは内部キュー、not_fullのConvar, not_emptyのConvar
    let (queue_vec_mutex, not_full, not_empty) = &*self.underlying;
    let mut queue_vec_mutex_guard = queue_vec_mutex.lock().unwrap();
    // 2) 内部キューがフルのときにはループする
    while queue_vec_mutex_guard.is_full() {
      // 3) 割り込みのチェック
      if self.check_and_update_interrupted() {
        log::debug!("put: return by interrupted");
        return Err(QueueError::<E>::InterruptedError.into());
      }
      log::debug!("put: blocking start...");
      // 4) 空きが生じるまで待機する
      queue_vec_mutex_guard = not_full.wait(queue_vec_mutex_guard).unwrap();
      log::debug!("put: blocking end...");
    }
    // 5) 内部のキューにofferを使って挿入する
    let result = queue_vec_mutex_guard.offer(element);
    // 6) not_emptyを使って通知する
    not_empty.notify_one();
    result
  }
// ...
}

takeメソッド

takeは「このキューの先頭を取得して削除します。必要に応じて、要素が利用可能になるまで待機します。」という機能。要素がない場合はメソッド内でスレッドをブロックするので制御が戻ってこなくなる。

takeのテスト

  #[test]
  #[serial]
  pub fn test_blocking_take() {
    init_logger();

    let mut q = populated_queue(QueueType::VecDeque, QUEUE_SIZE);
    let mut q_cloned = q.clone();

    let please_interrupt = Arc::new(CountDownLatch::new(1));
    let please_interrupt_cloned = please_interrupt.clone();

    let handler1 = thread::spawn(move || {
      // キューにデータをputする
      for _ in 0..QUEUE_SIZE.to_usize() {
        let _ = q_cloned.take().unwrap();
      }

      // 割り込みを要求する
      q_cloned.interrupt();
      // QUEUE_SIZEを超えたtakeなのでブロックするはずだが
      // すでにinterruptされているので割り込みエラーが発生する。
      match q_cloned.take() {
        Ok(_) => {
          panic!("take: finish: should not be here");
        }
        Err(err) => {
          log::debug!("take: finish: error = {:?}", err);
          let queue_error = err.downcast_ref::<QueueError<i32>>().unwrap();
          assert_eq!(queue_error, &QueueError::InterruptedError);
        }
      }
      assert!(!q_cloned.is_interrupted());

      please_interrupt_cloned.count_down();
      // 先にtakeするのでブロックが発生するが
      // 非同期にinterruptされるので割り込みエラー
      match q_cloned.take() {
        Ok(_) => {
          panic!("take: finish: should not be here");
        }
        Err(err) => {
          log::debug!("take: finish: error = {:?}", err);
          let queue_error = err.downcast_ref::<QueueError<i32>>().unwrap();
          assert_eq!(queue_error, &QueueError::InterruptedError);
        }
      }
      assert!(!q_cloned.is_interrupted());
    });

    // カウントが0になるまでブロックする
    // つまり最初のtakeのテストが終わるまで待機する
    please_interrupt.wait();
    assert!(!handler1.is_finished());
    // 非同期に割り込みを行う
    q.interrupt();
    handler1.join().unwrap();
  }

takeの実装

仕様は「このキューの先頭を取得して削除します。必要に応じて、要素が利用可能になるまで待機します」。 実装は以下。

queue-rs/src/queue/blocking_queue.rs at f4de43386432b0d9873af07e7cbba41e6133a75f · j5ik2o/queue-rs · GitHub

要素が一つでもある場合は2)はスルーされて5)だけ実行される。要素がないつまり空の場合は、2)の内部に入り、4)がnot_emptyのwaitを呼び出すとスレッドをブロックする。putメソッド内で要素を取得するときにnot_empty.notify_onceが実行されると、not_emptywaitが解除される(待機しているスレッドが複数ある場合そのうち一つだけが解除される)。

  fn take(&mut self) -> Result<Option<E>> {
    // 1) underlyingから必要な要素を取得する
    let (queue_vec_mutex, not_full, not_empty) = &*self.underlying;
    let mut queue_vec_mutex_guard = queue_vec_mutex.lock().unwrap();
    // 2) 内部キューが空のときにはループする
    while queue_vec_mutex_guard.is_empty() {
      // 3) 割り込みのチェック
      if self.check_and_update_interrupted() {
        log::debug!("take: return by interrupted");
        return Err(QueueError::<E>::InterruptedError.into());
      }
      log::debug!("take: blocking start...");
      // 4) 要素が利用可能になるまで待機する
      queue_vec_mutex_guard = not_empty.wait(queue_vec_mutex_guard).unwrap();
      log::debug!("take: blocking end...");
    }
    // 5) 内部キューのpollを使って要素を取得する
    let result = queue_vec_mutex_guard.poll();
    // 6) not_fullを使って通知
    not_full.notify_one();
    result
  }

putとtakeの相互作用

putメソッドとtakeメソッドは以下の点で相互に作用する。

  • ブロッキングと通知: putメソッドはキューが満杯の場合にブロックし、takeメソッドはキューが空の場合にブロックする。これにより、プロデューサー(要素を追加するスレッド)とコンシューマー(要素を取り出すスレッド)は、キューの状態に応じて適切に待機または動作する
  • スレッド間の協調: 一方のメソッドがブロックされているスレッドを解放するために、もう一方のメソッドは条件変数を使って通知を行う。例えば、putメソッドは新しい要素を追加した後、takeメソッドで待機中のスレッドにキューに新しい要素が利用可能であることを通知する

whileをなぜ使うのか

一見不要そうなwhile文は、実はブロッキングキューのような同期処理においては非常に重要だ。これは、条件変数(Condvar)を使用する際の一般的なパターンとなる。while文を使用する理由は以下の通り:

  • スプリアス・ウェイクアップ(Spurious wakeup)の防止
    • スレッドは何らかの理由で誤って(スプリアス・ウェイクアップと呼ばれる)目覚めることがある。while文を使用することで、スレッドが再び目覚めたときに、待機条件が依然として満たされているかを再確認する
  • 競合状態の回避
    • 複数のスレッドが同時にキューにアクセスし、条件変数によって待機が解除される場合、一つのスレッドが条件を満たして操作を行うと、他のスレッドにとってはその条件がもはや真でなくなる可能性がある。while文を用いることで、各スレッドは操作を行う前に条件を再チェックする
  • 明確なロジック
    • 条件のチェックをループ内で行うことで、コードのロジックがより明確になり、他の開発者がコードを理解しやすくなる

たとえば、putメソッドの実装において、while文はキューがフルである限り、スレッドをブロックし続ける。キューに空きができたときだけ、ループを抜けて要素の挿入を試みる。同様に、takeメソッドではキューが空である限りスレッドをブロックし、要素が利用可能になったときのみループを抜けて要素の取得を行う。したがって、while文はこれらの場合において安全性と正確性を高めるために非常に重要となる。

中断機構

ブロッキングキューでは、特定の状況で操作を中断する機能が重要だ。中断機構はputおよびtakeメソッドと密接に連携して動作する。ここでは、その中断機構の実装について詳しく見ていく。

check_and_update_interrupted

putおよびtakeメソッドで呼び出されるcheck_and_update_interruptedメソッドは、中断要求があったかどうかを判定し、その状態を更新します。もしis_interruptedフラグがtrueであれば、メソッドはtrueを返し、同時に(アトミック操作により)そのフラグをfalseに更新する。これにより、中断要求が一度だけ認識され、処理されるようになる。

queue-rs/src/queue/blocking_queue.rs at f4de43386432b0d9873af07e7cbba41e6133a75f · j5ik2o/queue-rs · GitHub

  fn check_and_update_interrupted(&self) -> bool {
    match self
      .is_interrupted
      .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed)
    {
      Ok(_) => true,
      Err(_) => false,
    }
  }

interrupt

interruptメソッドでは、is_interruptedフラグをtrueに設定する。これにより、puttakeメソッドが中断される。しかし、スレッドがブロックされている場合、このフラグの設定だけでは中断は完了しません。そのため、not_emptynot_fullCondvarを用いてnotify_allメソッドを呼び出し、すべてのスレッドのブロックを解除し、中断を実現する。

queue-rs/src/queue/blocking_queue.rs at f4de43386432b0d9873af07e7cbba41e6133a75f · j5ik2o/queue-rs · GitHub

  fn interrupt(&mut self) {
    log::debug!("interrupt: start...");
    self.is_interrupted.store(true, Ordering::Relaxed);
    let (_, not_full, not_empty) = &*self.underlying;
    not_empty.notify_all(); // すべてのスレッドのブロックを解除
    not_full.notify_all(); // すべてのスレッドのブロックを解除
    log::debug!("interrupt: end...");
  }

is_interrupted

is_interruptedメソッドは、外部からの中断要求があったかどうかを確認するものだ。これにより、他のメソッドが中断状態を認識し、適切に反応できるようになる。

queue-rs/src/queue/blocking_queue.rs at f4de43386432b0d9873af07e7cbba41e6133a75f · j5ik2o/queue-rs · GitHub

  fn is_interrupted(&self) -> bool {
    self.is_interrupted.load(Ordering::Relaxed)
  }

tokio版の実装

stdライブラリのMutexCondvarなどは、リソースが利用可能になるまでスレッドを物理的にブロックする。この方法は同期処理で一般的に使用され、スレッドがリソースを待っている間は他の作業を行うことができない。これはCPUリソースを消費せず、単純な同期処理には適しているが、スケーラビリティや非同期処理の観点では効率的ではない。tokioを使った非同期処理の中でこの実装を利用するのは推奨されないので、要注意。

一方で、tokioのMutexCondvarは非同期の待ち受けを実現する。これはスレッドを物理的にブロックするのではなく、リソースが利用可能になるまでの操作を中断し、その間に他のタスクを実行する。この方法は非同期プログラミングで一般的で、特にI/Oバウンドの処理やスケーラブルなシステムに適している。awaitキーワードを使用することで、非同期タスクの実行を「一時停止」し、リソースが利用可能になると再開する。つまるところ、stdの待ち受けはスレッドをブロックする同期的なアプローチであり、tokioの待ち受けはスレッドをブロックせずに非同期処理を行うアプローチとなる。適切な選択は、アプリケーションの要件と性能の要求によって異なる。

ということで、tokio版の実装も作った。この実装はアクターシステムを実装するときに使えそう。

あと、tokio版の実装はブロッキングしないのに、BlockingQueueという名前になっている…。AsyncBlockingQueueだろうか?それともNonBlockingQueueだろうか…。命名が難しい。

queue-rs/src/queue/tokio.rs at f4de43386432b0d9873af07e7cbba41e6133a75f · j5ik2o/queue-rs · GitHub

まとめ

Rustでブロッキングキューを実装する過程を詳細に掘り下げた。JavaのBlockingQueueを参考にしながらも、Rust独自の機能であるMutexCondvar、およびアトミック操作を駆使して、効率的かつスレッドセーフなキューを作成できた。この実装は、Rustの同期プリミティブを活用し並行処理をモデリングする一つの方法を示した。また、JavaのBlockingQueueのコンセプトをRustに適用する試みとして、異なるプログラミング言語間のアプローチを探求する良い機会となった。