Rustでブロッキングキューを実装した話。これはRustのカレンダー | Advent Calendar 2023 - Qiitaの22日目の記事です。
ブロッキングキューはご存じだろうか。(えっ…スレッドはブロックしたくない…と思った人は最後まで読むとよいかも)
Javaにはあります。
要素の取得時にキューが空でなくなるまで待機したり、要素の格納時にキュー内に空きが生じるまで待機する操作を追加でサポートしたりするQueueです。
これはRustの標準にはない。今回はブロッキングキューを実装してみる。
「そういえば、ブロッキングキューが欲しい!」と思ったときに、ぜひこのブログ記事を思い出してほしい。
まず作るな・既存のコードを調査しろ
標準にはないが、クレートとして実装がある。
- https://github.com/JimFawcett/RustBlockingQueue
- https://github.com/TeaEntityLab/fpRust
- https://github.com/julianbuettner/rust-blockinqueue
JimFawcett/RustBlockingQueue
とjulianbuettner/rust-blockinqueue
をみるとMutextとCondvarを使うと実装できるということが分かる。TeaEntityLab/fpRust
ではmpscを使っている模様。
Mutex
とCondvar
があれば実装できることがわかる。詳しいことは以下参照
この本にもMutex
,Condvar
の使い方の紹介がある。
BlockingQueueのインターフェイス
JavaのAPIを参考にインターフェイスは以下のようにした。
queue-rs/src/queue.rs at f4de43386432b0d9873af07e7cbba41e6133a75f · j5ik2o/queue-rs · GitHub
pub trait BlockingQueueBehavior<E: Element>: QueueBehavior<E> + Send { /// Inserts the specified element into this queue. If necessary, waits until space is available.<br/> /// 指定された要素をこのキューに挿入します。必要に応じて、空きが生じるまで待機します。 /// /// # Arguments / 引数 /// - `element` - The element to be inserted. / 挿入する要素。 /// /// # Return Value / 戻り値 /// - `Ok(())` - If the element is inserted successfully. / 要素が正常に挿入された場合。 /// - `Err(QueueError::OfferError(element))` - If the element cannot be inserted. / 要素を挿入できなかった場合。 /// - `Err(QueueError::InterruptedError)` - If the operation is interrupted. / 操作が中断された場合。 fn put(&mut self, element: E) -> Result<()>; /// Inserts the specified element into this queue. If necessary, waits until space is available. You can specify the waiting time.<br/> /// 指定された要素をこのキューに挿入します。必要に応じて、空きが生じるまで待機します。待機時間を指定できます。 /// /// # Arguments / 引数 /// - `element` - The element to be inserted. / 挿入する要素。 /// /// # Return Value / 戻り値 /// - `Ok(())` - If the element is inserted successfully. / 要素が正常に挿入された場合。 /// - `Err(QueueError::OfferError(element))` - If the element cannot be inserted. / 要素を挿入できなかった場合。 /// - `Err(QueueError::InterruptedError)` - If the operation is interrupted. / 操作が中断された場合。 /// - `Err(QueueError::TimeoutError)` - If the operation times out. / 操作がタイムアウトした場合。 fn put_timeout(&mut self, element: E, timeout: Duration) -> Result<()>; /// Retrieve the head of this queue and delete it. If necessary, wait until an element becomes available.<br/> /// このキューの先頭を取得して削除します。必要に応じて、要素が利用可能になるまで待機します。 /// /// # Return Value / 戻り値 /// - `Ok(Some(element))` - If the element is retrieved successfully. / 要素が正常に取得された場合。 /// - `Ok(None)` - If the queue is empty. / キューが空の場合。 /// - `Err(QueueError::InterruptedError)` - If the operation is interrupted. / 操作が中断された場合。 fn take(&mut self) -> Result<Option<E>>; /// Retrieve the head of this queue and delete it. If necessary, wait until an element becomes available. You can specify the waiting time.<br/> /// このキューの先頭を取得して削除します。必要に応じて、要素が利用可能になるまで待機します。待機時間を指定できます。 /// /// # Arguments / 引数 /// - `timeout` - The maximum time to wait. / 待機する最大時間。 /// /// # Return Value / 戻り値 /// - `Ok(Some(element))` - If the element is retrieved successfully. / 要素が正常に取得された場合。 /// - `Ok(None)` - If the queue is empty. / キューが空の場合。 /// - `Err(QueueError::InterruptedError)` - If the operation is interrupted. / 操作が中断された場合。 /// - `Err(QueueError::TimeoutError)` - If the operation times out. / 操作がタイムアウトした場合。 fn take_timeout(&mut self, timeout: Duration) -> Result<Option<E>>; /// Returns the number of elements that can be inserted into this queue without blocking.<br/> /// ブロックせずにこのキューに挿入できる要素数を返します。 /// /// # Return Value / 戻り値 /// - `QueueSize::Limitless` - If the queue has no capacity limit. / キューに容量制限がない場合。 /// - `QueueSize::Limited(num)` - If the queue has a capacity limit. / キューに容量制限がある場合。 fn remaining_capacity(&self) -> QueueSize; /// Interrupts the operation of this queue.<br/> /// このキューの操作を中断します。 fn interrupt(&mut self); /// Returns whether the operation of this queue has been interrupted.<br/> /// このキューの操作が中断されたかどうかを返します。 /// /// # Return Value / 戻り値 /// - `true` - If the operation is interrupted. / 操作が中断された場合。 /// - `false` - If the operation is not interrupted. / 操作が中断されていない場合。 fn is_interrupted(&self) -> bool; }
データ構造
今回のブロッキングキューとは別に前提となるtraitがQueueBehaviorです。ブロッキングしないキューの責務をQueueBehavior
traitとして定義しています。型パラメータQはこのトレイトを実装していなければなりません。Q以外にConvarの2個がタプルでまとめてArcにラップされています。Cloneが実装されているので、clone
してもArcなので同じデータを参照する。p
は要素の型を保持するためのもの。is_interrupted
は操作を中断させるためのフラグとなる。
#[derive(Debug, Clone)] pub struct BlockingQueue<E, Q: QueueBehavior<E>> { underlying: Arc<(Mutex<Q>, Condvar, Condvar)>, p: PhantomData<E>, is_interrupted: Arc<AtomicBool>, }
putメソッド
put
は「指定された要素をこのキューに挿入します。必要に応じて、空きが生じるまで待機します。」という機能。空きがない場合はメソッド内でスレッドをブロックするので制御が戻ってこなくなる。
putのテスト
put
のテストを書いてみた。キャパシティ一杯になるとブロックする。
#[test] #[serial] fn test_blocking_put() { init_logger(); let mut q = create_blocking_queue(QueueType::VecDeque, QUEUE_SIZE); let mut q_cloned = q.clone(); let please_interrupt = Arc::new(CountDownLatch::new(1)); let please_interrupt_cloned = please_interrupt.clone(); let handler1 = thread::spawn(move || { // キューにデータをputする for i in 0..QUEUE_SIZE.to_usize() { let _ = q_cloned.put(i as i32).unwrap(); } assert_eq!(q_cloned.len(), QUEUE_SIZE); // 割り込みを要求する q_cloned.interrupt(); // QUEUE_SIZEを超えたputなのでブロックするはずだが // すでにinterruptされているので割り込みエラーが発生する。 match q_cloned.put(99) { Ok(_) => { panic!("put: finish: 99, should not be here"); } Err(err) => { log::debug!("put: finish: 99, error = {:?}", err); let queue_error = err.downcast_ref::<QueueError<i32>>().unwrap(); assert_eq!(queue_error, &QueueError::InterruptedError); } } // 割り込みが検出されると割り込み状態がリセットされることを確認する assert!(!q_cloned.is_interrupted()); please_interrupt_cloned.count_down(); // 先にputするのでブロックが発生するが // 非同期にinterruptされるので割り込みエラー match q_cloned.put(99) { Ok(_) => { panic!("put: finish: 99, should not be here"); } Err(err) => { log::debug!("put: finish: 99, error = {:?}", err); let queue_error = err.downcast_ref::<QueueError<i32>>().unwrap(); assert_eq!(queue_error, &QueueError::InterruptedError); } } assert!(!q_cloned.is_interrupted()); }); // カウントが0になるまでブロックする // つまり最初のputのテストが終わるまで待機する please_interrupt.wait(); assert!(!handler1.is_finished()); // 非同期に割り込みを行う q.interrupt(); handler1.join().unwrap(); }
putの実装
仕様は「指定された要素をこのキューに挿入します。必要に応じて、空きが生じるまで待機します。」。 実装は以下。
空きがある場合は2)はスルーされて5)だけ実行される。空きがないつまりフルの場合は、2)の内部に入り、4)がnot_full
のwaitを呼び出すとスレッドをブロックする。
take
メソッド内で要素を取得するときにnot_full.notify_once
が実行されると、not_full
のwait
が解除される(待機しているスレッドが複数ある場合そのうち一つだけが解除される)。
impl<E: Element + 'static, Q: QueueBehavior<E>> BlockingQueueBehavior<E> for BlockingQueue<E, Q> { // ... fn put(&mut self, element: E) -> Result<()> { // 1) underlyingから必要な要素を取得する // queue_vec_mutexは内部キュー、not_fullのConvar, not_emptyのConvar let (queue_vec_mutex, not_full, not_empty) = &*self.underlying; let mut queue_vec_mutex_guard = queue_vec_mutex.lock().unwrap(); // 2) 内部キューがフルのときにはループする while queue_vec_mutex_guard.is_full() { // 3) 割り込みのチェック if self.check_and_update_interrupted() { log::debug!("put: return by interrupted"); return Err(QueueError::<E>::InterruptedError.into()); } log::debug!("put: blocking start..."); // 4) 空きが生じるまで待機する queue_vec_mutex_guard = not_full.wait(queue_vec_mutex_guard).unwrap(); log::debug!("put: blocking end..."); } // 5) 内部のキューにofferを使って挿入する let result = queue_vec_mutex_guard.offer(element); // 6) not_emptyを使って通知する not_empty.notify_one(); result } // ... }
takeメソッド
take
は「このキューの先頭を取得して削除します。必要に応じて、要素が利用可能になるまで待機します。」という機能。要素がない場合はメソッド内でスレッドをブロックするので制御が戻ってこなくなる。
takeのテスト
#[test] #[serial] pub fn test_blocking_take() { init_logger(); let mut q = populated_queue(QueueType::VecDeque, QUEUE_SIZE); let mut q_cloned = q.clone(); let please_interrupt = Arc::new(CountDownLatch::new(1)); let please_interrupt_cloned = please_interrupt.clone(); let handler1 = thread::spawn(move || { // キューにデータをputする for _ in 0..QUEUE_SIZE.to_usize() { let _ = q_cloned.take().unwrap(); } // 割り込みを要求する q_cloned.interrupt(); // QUEUE_SIZEを超えたtakeなのでブロックするはずだが // すでにinterruptされているので割り込みエラーが発生する。 match q_cloned.take() { Ok(_) => { panic!("take: finish: should not be here"); } Err(err) => { log::debug!("take: finish: error = {:?}", err); let queue_error = err.downcast_ref::<QueueError<i32>>().unwrap(); assert_eq!(queue_error, &QueueError::InterruptedError); } } assert!(!q_cloned.is_interrupted()); please_interrupt_cloned.count_down(); // 先にtakeするのでブロックが発生するが // 非同期にinterruptされるので割り込みエラー match q_cloned.take() { Ok(_) => { panic!("take: finish: should not be here"); } Err(err) => { log::debug!("take: finish: error = {:?}", err); let queue_error = err.downcast_ref::<QueueError<i32>>().unwrap(); assert_eq!(queue_error, &QueueError::InterruptedError); } } assert!(!q_cloned.is_interrupted()); }); // カウントが0になるまでブロックする // つまり最初のtakeのテストが終わるまで待機する please_interrupt.wait(); assert!(!handler1.is_finished()); // 非同期に割り込みを行う q.interrupt(); handler1.join().unwrap(); }
takeの実装
仕様は「このキューの先頭を取得して削除します。必要に応じて、要素が利用可能になるまで待機します」。 実装は以下。
要素が一つでもある場合は2)はスルーされて5)だけ実行される。要素がないつまり空の場合は、2)の内部に入り、4)がnot_empty
のwaitを呼び出すとスレッドをブロックする。put
メソッド内で要素を取得するときにnot_empty.notify_once
が実行されると、not_empty
のwait
が解除される(待機しているスレッドが複数ある場合そのうち一つだけが解除される)。
fn take(&mut self) -> Result<Option<E>> { // 1) underlyingから必要な要素を取得する let (queue_vec_mutex, not_full, not_empty) = &*self.underlying; let mut queue_vec_mutex_guard = queue_vec_mutex.lock().unwrap(); // 2) 内部キューが空のときにはループする while queue_vec_mutex_guard.is_empty() { // 3) 割り込みのチェック if self.check_and_update_interrupted() { log::debug!("take: return by interrupted"); return Err(QueueError::<E>::InterruptedError.into()); } log::debug!("take: blocking start..."); // 4) 要素が利用可能になるまで待機する queue_vec_mutex_guard = not_empty.wait(queue_vec_mutex_guard).unwrap(); log::debug!("take: blocking end..."); } // 5) 内部キューのpollを使って要素を取得する let result = queue_vec_mutex_guard.poll(); // 6) not_fullを使って通知 not_full.notify_one(); result }
putとtakeの相互作用
put
メソッドとtake
メソッドは以下の点で相互に作用する。
- ブロッキングと通知:
put
メソッドはキューが満杯の場合にブロックし、take
メソッドはキューが空の場合にブロックする。これにより、プロデューサー(要素を追加するスレッド)とコンシューマー(要素を取り出すスレッド)は、キューの状態に応じて適切に待機または動作する - スレッド間の協調: 一方のメソッドがブロックされているスレッドを解放するために、もう一方のメソッドは条件変数を使って通知を行う。例えば、
put
メソッドは新しい要素を追加した後、take
メソッドで待機中のスレッドにキューに新しい要素が利用可能であることを通知する
whileをなぜ使うのか
一見不要そうなwhile
文は、実はブロッキングキューのような同期処理においては非常に重要だ。これは、条件変数(Condvar)を使用する際の一般的なパターンとなる。while
文を使用する理由は以下の通り:
- スプリアス・ウェイクアップ(Spurious wakeup)の防止
- スレッドは何らかの理由で誤って(スプリアス・ウェイクアップと呼ばれる)目覚めることがある。
while
文を使用することで、スレッドが再び目覚めたときに、待機条件が依然として満たされているかを再確認する
- スレッドは何らかの理由で誤って(スプリアス・ウェイクアップと呼ばれる)目覚めることがある。
- 競合状態の回避
- 複数のスレッドが同時にキューにアクセスし、条件変数によって待機が解除される場合、一つのスレッドが条件を満たして操作を行うと、他のスレッドにとってはその条件がもはや真でなくなる可能性がある。
while
文を用いることで、各スレッドは操作を行う前に条件を再チェックする
- 複数のスレッドが同時にキューにアクセスし、条件変数によって待機が解除される場合、一つのスレッドが条件を満たして操作を行うと、他のスレッドにとってはその条件がもはや真でなくなる可能性がある。
- 明確なロジック
- 条件のチェックをループ内で行うことで、コードのロジックがより明確になり、他の開発者がコードを理解しやすくなる
たとえば、put
メソッドの実装において、while
文はキューがフルである限り、スレッドをブロックし続ける。キューに空きができたときだけ、ループを抜けて要素の挿入を試みる。同様に、take
メソッドではキューが空である限りスレッドをブロックし、要素が利用可能になったときのみループを抜けて要素の取得を行う。したがって、while
文はこれらの場合において安全性と正確性を高めるために非常に重要となる。
中断機構
ブロッキングキューでは、特定の状況で操作を中断する機能が重要だ。中断機構はput
およびtake
メソッドと密接に連携して動作する。ここでは、その中断機構の実装について詳しく見ていく。
check_and_update_interrupted
put
およびtake
メソッドで呼び出されるcheck_and_update_interrupted
メソッドは、中断要求があったかどうかを判定し、その状態を更新します。もしis_interrupted
フラグがtrue
であれば、メソッドはtrue
を返し、同時に(アトミック操作により)そのフラグをfalse
に更新する。これにより、中断要求が一度だけ認識され、処理されるようになる。
fn check_and_update_interrupted(&self) -> bool { match self .is_interrupted .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed) { Ok(_) => true, Err(_) => false, } }
interrupt
interrupt
メソッドでは、is_interrupted
フラグをtrue
に設定する。これにより、put
やtake
メソッドが中断される。しかし、スレッドがブロックされている場合、このフラグの設定だけでは中断は完了しません。そのため、not_empty
とnot_full
のCondvar
を用いてnotify_all
メソッドを呼び出し、すべてのスレッドのブロックを解除し、中断を実現する。
fn interrupt(&mut self) { log::debug!("interrupt: start..."); self.is_interrupted.store(true, Ordering::Relaxed); let (_, not_full, not_empty) = &*self.underlying; not_empty.notify_all(); // すべてのスレッドのブロックを解除 not_full.notify_all(); // すべてのスレッドのブロックを解除 log::debug!("interrupt: end..."); }
is_interrupted
is_interrupted
メソッドは、外部からの中断要求があったかどうかを確認するものだ。これにより、他のメソッドが中断状態を認識し、適切に反応できるようになる。
fn is_interrupted(&self) -> bool { self.is_interrupted.load(Ordering::Relaxed) }
tokio版の実装
std
ライブラリのMutex
やCondvar
などは、リソースが利用可能になるまでスレッドを物理的にブロックする。この方法は同期処理で一般的に使用され、スレッドがリソースを待っている間は他の作業を行うことができない。これはCPUリソースを消費せず、単純な同期処理には適しているが、スケーラビリティや非同期処理の観点では効率的ではない。tokioを使った非同期処理の中でこの実装を利用するのは推奨されないので、要注意。
一方で、tokioのMutex
やCondvar
は非同期の待ち受けを実現する。これはスレッドを物理的にブロックするのではなく、リソースが利用可能になるまでの操作を中断し、その間に他のタスクを実行する。この方法は非同期プログラミングで一般的で、特にI/Oバウンドの処理やスケーラブルなシステムに適している。await
キーワードを使用することで、非同期タスクの実行を「一時停止」し、リソースが利用可能になると再開する。つまるところ、std
の待ち受けはスレッドをブロックする同期的なアプローチであり、tokioの待ち受けはスレッドをブロックせずに非同期処理を行うアプローチとなる。適切な選択は、アプリケーションの要件と性能の要求によって異なる。
ということで、tokio版の実装も作った。この実装はアクターシステムを実装するときに使えそう。
あと、tokio版の実装はブロッキングしないのに、BlockingQueue
という名前になっている…。AsyncBlockingQueue
だろうか?それともNonBlockingQueue
だろうか…。命名が難しい。
queue-rs/src/queue/tokio.rs at f4de43386432b0d9873af07e7cbba41e6133a75f · j5ik2o/queue-rs · GitHub
まとめ
Rustでブロッキングキューを実装する過程を詳細に掘り下げた。JavaのBlockingQueue
を参考にしながらも、Rust独自の機能であるMutex
、Condvar
、およびアトミック操作を駆使して、効率的かつスレッドセーフなキューを作成できた。この実装は、Rustの同期プリミティブを活用し並行処理をモデリングする一つの方法を示した。また、JavaのBlockingQueue
のコンセプトをRustに適用する試みとして、異なるプログラミング言語間のアプローチを探求する良い機会となった。