マルチスレッド

ふと思い立って SqueakSmalltalk にあるマルチスレッド*1機能を使って遊んでいたときのこと。

stream
stream _ String new writeStream. ($1 to: $3) do: [: each | stream nextPut: each]. ($a to: $c) do: [: each | stream nextPut: each]. ^ stream contents
=> '123abc'

     ↓ (処理をブロック化してみる。直ちに評価するので実質は上と同じ。)

stream
stream _ String new writeStream. [($1 to: $3) do: [: each | stream nextPut: each]] value. [($a to: $c) do: [: each | stream nextPut: each]] value. ^ stream contents
=> '123abc'

     ↓ (ブロック化した処理を今度はフォークしてみる。結果待ち合わせのためセマフォを用意。)

stream forGoingOn1 forGoingOn2
stream _ String new writeStream. forGoingOn1 _ Semaphore new. forGoingOn2 _ Semaphore new. [($1 to: $3) do: [: each | stream nextPut: each]. forGoingOn1 signal] fork. [($a to: $c) do: [: each | stream nextPut: each]. forGoingOn2 signal] fork. forGoingOn1 wait. forGoingOn2 wait. ^ stream contents
=> '123abc'

     ↓ (フォークに際して、一方の処理の優先順位を変えてみる。)

stream forGoingOn1 forGoingOn2 defaultPriority
stream _ String new writeStream. forGoingOn1 _ Semaphore new. forGoingOn2 _ Semaphore new. defaultPriority _ Processor userSchedulingPriority. [($1 to: $3) do: [: each | stream nextPut: each]. forGoingOn1 signal] forkAt: defaultPriority. [($a to: $c) do: [: each | stream nextPut: each]. forGoingOn2 signal] forkAt: defaultPriority + 1. forGoingOn1 wait. forGoingOn2 wait. ^ stream contents
=> 'abc123'

ふむ。で、優先度(プライオリティ)を戻してから、イテレータ内で都度イールドすれば、互い違いに処理が進むようにできる。

stream forGoingOn1 forGoingOn2
stream _ String new writeStream. forGoingOn1 _ Semaphore new. forGoingOn2 _ Semaphore new. [($1 to: $3) do: [: each | stream nextPut: each. Processor yield]. forGoingOn1 signal] fork. [($a to: $c) do: [: each | stream nextPut: each. Processor yield]. forGoingOn2 signal] fork. forGoingOn1 wait. forGoingOn2 wait. ^ stream contents
=> '1a2b3c'

…と、ここまではよし。ちなみに、スレッドのプライオリティが異なると、

stream forGoingOn1 forGoingOn2 defaultPriority
stream _ String new writeStream. forGoingOn1 _ Semaphore new. forGoingOn2 _ Semaphore new. defaultPriority _ Processor userSchedulingPriority. [($1 to: $3) do: [: each | stream nextPut: each. Processor yield]. forGoingOn1 signal] forkAt: defaultPriority. [($a to: $c) do: [: each | stream nextPut: each. Processor yield]. forGoingOn2 signal] forkAt: defaultPriority + 1. forGoingOn1 wait. forGoingOn2 wait. ^ stream contents
=> 'abc123'

イールドもむなしく、互い違いにはならずに片方が優先的に実行されてしまう。これもよし。でも、ストリームへの書き込みが交互になるよう強制する専用のセマフォを用意すれば、きっと…

stream forGoingOn1 forGoingOn2 defaultPriority putSemaphore
stream _ String new writeStream. forGoingOn1 _ Semaphore new. forGoingOn2 _ Semaphore new. putSemaphore _ Semaphore new. defaultPriority _ Processor userSchedulingPriority. [($1 to: $3) do: [: each | stream nextPut: each. putSemaphore signal]. forGoingOn1 signal] forkAt: defaultPriority. [($a to: $c) do: [: each | putSemaphore wait. stream nextPut: each]. forGoingOn2 signal] forkAt: defaultPriority + 1. forGoingOn1 wait. forGoingOn2 wait. ^ stream contents
=> '112233'

ととっ… はぁ? 交互っぽい処理がなされているのは分かるけど、'112233' って…… いろいろといじくり回しても原因が分からず、もしやと思って各スレッド内で使用されるブロックのブロック変数名を定番の each から、それぞれ別のものに変えてみたら…

stream forGoingOn1 forGoingOn2 defaultPriority putSemaphore
stream _ String new writeStream. forGoingOn1 _ Semaphore new. forGoingOn2 _ Semaphore new. putSemaphore _ Semaphore new. defaultPriority _ Processor userSchedulingPriority. [($1 to: $3) do: [: num | stream nextPut: num. putSemaphore signal]. forGoingOn1 signal] forkAt: defaultPriority. [($a to: $c) do: [: chr | putSemaphore wait. stream nextPut: chr]. forGoingOn2 signal] forkAt: defaultPriority + 1. forGoingOn1 wait. forGoingOn2 wait. ^ stream contents
=> '1a2b3c'

とようやく思い通りに動いて、ほっ。

SqueakSmalltalk ではブロックがクロージャになっていないこととなにか関係あるのでしょうかねぇ…。ともあれ、想定にない動きに久々に出くわして、ちょっとだけ肝を冷やしましたとさ。


追記
念のため、昔の VI4(ブロッククロージャなどを実装した 3.2 からの派生バージョン。今は入手できない)で確認してみたところ、同名のブロック変数を使っていても期待通り動作するようです。ClosureCompiler は、インストールがうまくいかなかったり、インストールできているように見えても肝心のブロックがクロージャとして機能しなかったりで、それを動作させること自体うまくできませんでした。


追記2:
ClosureCompiler でブロックがクロージャにならなかったのは、ClosureCompiler の機能が #compileBlocksAsClosures オプションで on/off できることを知らなかったのが原因でした。このオプションを on にすることで、each のままでも正常な動作を確認できました。off にすると(つまり、クロージャでないと)異常な結果に戻るので、違いがはっきりしておもしろいです。

*1:Smalltalk システム的には、つまり Smalltalk を OS として見ればマルチ“プロセス”。いや、メモリ空間は共有しているからあくまでスレッドか…。いずれにせよ専門家に言わせれば“なんちゃって”でしょうが…(^_^;)。

#forkAndWait

フォークさせた処理の完了を待つためのセマフォの名前に、semaphore1、semaphore2(追記: forGoingOn に変更済み)では味気ないので、他に何かいいのがないかなぁ…と #signal の senders of it (alt-/cmd-n) を眺めていたら、セレンディプタスに BlockContext >> #forkAndWait なるものを発見。

BlockContext >> forkAndWait
   | semaphore |
   semaphore _ Semaphore new.
   [self ensure: [semaphore signal]] fork.
   semaphore wait

いちいち書いていたものを自動的にやってくれているのですね。あと、なるほど、処理完了待ちセマフォへのシグナルは #ensure: でするのがスジなのですよね…などと学んだりしつつ。 で、これを使うと、

stream
stream _ String new writeStream. [($1 to: $3) do: [: num | stream nextPut: num. Processor yield]] fork. [($a to: $c) do: [: chr | stream nextPut: chr. Processor yield]] forkAndWait. ^ stream contents
=> '1a2b3c'

とか、

stream putSemaphore
stream _ String new writeStream. putSemaphore _ Semaphore new. [($a to: $c) do: [: chr | putSemaphore wait. stream nextPut: chr]] forkAt: Processor userSchedulingPriority + 1. [($1 to: $3) do: [: num | stream nextPut: num. putSemaphore signal]] forkAndWait. ^ stream contents
=> '1a2b3c'

などと処理終了待ちセマフォの存在を明示的にしなくてよいぶん、(うまい具合に書き換えられれば)ほぼ同じことを比較的すっきり書くことができるので、テストスクリプト向きとしては使えそう。 でも、肝心の semaphore1、semaphore2 に代わるそれらしい名前は、見つからずじまいで orz 。


むむ。semaphore for going on で、semaForGoingOn とか forGoingOn なんてどうでしょうかね。ちょっと上を書き換えてみましょう。多少なりとも意味が通じやすくなりますでしょうか。


あと、完了待ちセマフォはスレッドごとに用意するのと、ひとつを使い回すのとでどちらが分かりやすいでしょうかねぇ…。

stream forGoingOn
stream _ String new writeStream. forGoingOn _ Semaphore new. [($1 to: $3) do: [: num | stream nextPut: num. Processor yield]. forGoingOn signal] fork. [($a to: $c) do: [: chr | stream nextPut: chr. Processor yield]. forGoingOn signal] fork. forGoingOn wait; wait. ^ stream contents
=> '1a2b3c'