SharedQueue doesn't scale


SharedQueue doesn't scale

Igor Stasenko
Hello,

Just out of interest, I tried to compare the speed of my FIFOQueue
implementation against SharedQueue,
using Levente's stack benchmarks:

"The linked Stack implementation"
(1 to: 5) collect: [ :run |
       | stack |
       Smalltalk garbageCollect.
       stack := FIFOQueue new.
       {
               [ 1 to: 1000000 do: [ :each | stack nextPut: each ] ] timeToRun.
               [ 1 to: 1000000 do: [ :each | stack next ] ] timeToRun } ]

 #(#(291 69) #(170 65) #(168 66) #(168 65) #(168 65))

Then I changed FIFOQueue to SharedQueue and ran it again...
waited a minute... waited a bit more... went out for a smoke... and after
returning, it was still running.
I interrupted it and inspected the queue size: it was slightly above
300,000 items.

Of course, SharedQueue is usually not used in scenarios where you need
to push such a large number of items.
So, it's just a warning.

Btw, here is another comparison (Stack vs thread-safe LIFO queue):

(1 to: 5) collect: [ :run |
       | stack |
       Smalltalk garbageCollect.
       stack := Stack new.
       {
               [ 1 to: 1000000 do: [ :each | stack push: each ] ] timeToRun.
               [ 1 to: 1000000 do: [ :each | stack pop ] ] timeToRun } ]

Stack:
   #(#(166 94) #(160 90) #(162 91) #(162 92) #(160 92))

LIFOQueue:
 #(#(172 250) #(174 248) #(172 250) #(174 252) #(172 250))

Yes, it is slower (mainly for reading). But that is the price of being thread-safe :)


--
Best regards,
Igor Stasenko AKA sig.


Re: SharedQueue doesn't scale

Stéphane Ducasse
Igor, sorry, we were busy running around :)
So could you summarize:
        with pros and cons
        with a potential path for the next actions
        Maybe we should schedule something for 1.3

Stef


On Oct 16, 2010, at 4:19 AM, Igor Stasenko wrote:

> snip



Re: SharedQueue doesn't scale

Igor Stasenko
On 17 October 2010 12:01, Stéphane Ducasse <[hidden email]> wrote:
> Igor, sorry, we were busy running around :)
> So could you summarize:
>        with pros and cons
>        with a potential path for the next actions
>        Maybe we should schedule something for 1.3
>

The two implementations I made in the Atomic-Collections package run
without using any kind of locking (i.e. semaphores).
However, we should distinguish between lock-free and wait-free algorithms.

Inserting items into a queue (both FIFO and LIFO) uses no wait-loops and
can be considered an O(1) operation,
if we neglect the cost of allocating a new queue item.

But to extract items, both FIFO and LIFO queues use wait-loops if the
queue is in the middle of an extraction by
another process.
This means that if there is only one process extracting items
from the queue,
extraction will always be wait-free. But if you have more than one,
you should expect wait-loops to be used.
There is, however, a way to give a quick answer without entering a
wait-loop when attempting
to extract a single item from the queue, but the answer won't be deterministic:
 - #nextIfNone: will return nil if there are either no items in the queue
or the queue is currently in the middle of an extraction by
another process.

I think there should be two distinct protocols: methods for
accessing the queue in a wait-free way,
which do not always guarantee deterministic answers, and methods which
use wait-loops.
Currently, in order to match the SharedQueue protocol,
there is a mix of wait-free and wait-loop methods.

With separate protocols, applications using these queues could have a
guarantee that any access to the queue
is wait-free, if they can accept non-deterministic answers,
or use the wait-loop methods to always get deterministic answers.
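
For illustration, here is a minimal workspace sketch of the two access
styles, using the FIFOQueue protocol as it appears in the benchmarks in
this thread (the pairing of selectors to styles is my reading of the above):

| queue |
queue := FIFOQueue new.
queue nextPut: 1.

"wait-loop style: may spin if another process is mid-extraction"
Transcript show: queue next printString; cr.

"wait-free style: answers at once, but non-deterministically - the
alternative also fires while another process holds the extraction"
Transcript show: (queue nextIfNone: [ #noItem ]) printString; cr.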

--
Best regards,
Igor Stasenko AKA sig.


Re: SharedQueue doesn't scale

Igor Stasenko
On 17 October 2010 13:42, Igor Stasenko <[hidden email]> wrote:

> snip

Oops, a correction to the above: #nextIfNone: will evaluate a block if there are no items in the queue;
it is #nextOrNil which answers nil if there are no items in the queue.
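
A quick sketch of the corrected distinction, on an empty queue:

| queue |
queue := FIFOQueue new.
queue nextOrNil.                "answers nil"
queue nextIfNone: [ #empty ].   "evaluates the block, answers #empty"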




--
Best regards,
Igor Stasenko AKA sig.


Re: [Pharo-project] SharedQueue doesn't scale

Levente Uzonyi-2
On Sat, 16 Oct 2010, Igor Stasenko wrote:

> snip

SharedQueue's code for "growing" (#makeRoomAtEnd) is crap IMHO. Because of
that, it takes O(n) time to add or remove an element to or from the queue.
SharedQueue2 takes a much better approach, because it doesn't try to
reimplement a dynamic array, but uses an OrderedCollection instead.
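
To illustrate why that is the better approach: OrderedCollection already
provides amortized O(1) addLast: and O(1) removeFirst, so a queue that
delegates to it avoids the O(n) growing code. A quick workspace check,
in the style of the benchmarks in this thread:

| oc |
oc := OrderedCollection new.
{
	[ 1 to: 1000000 do: [ :each | oc addLast: each ] ] timeToRun.
	[ 1 to: 1000000 do: [ :each | oc removeFirst ] ] timeToRun }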

Btw, calling a LIFO data structure a queue is strange, because
a queue is always FIFO. Please call it a stack.


Levente



SharedQueue does scale (was: Re: [Pharo-project] SharedQueue doesn't scale)

Levente Uzonyi-2
On Sun, 17 Oct 2010, Levente Uzonyi wrote:

snip

> SharedQueue's code for "growing" (#makeRoomAtEnd) is crap IMHO. Because of
> that, it takes O(n) time to add or remove an element to or from the queue.
> SharedQueue2 takes a much better approach, because it doesn't try to
> reimplement a dynamic array, but uses an OrderedCollection instead.

I uploaded a new version of that method to the Trunk. I don't think
it's really useful, because I'm pretty sure we will get rid of both
SharedQueue and SharedQueue2 in the near future.
Anyway I did some benchmarks using Cog, and SharedQueue became
surprisingly good, though it's still not even close to Igor's new
FIFOQueue or AtomicSharedQueue.

Benchmark #1: Make a large queue

{ SharedQueue. SharedQueue2. FIFOQueue. AtomicSharedQueue } collect: [ :queueClass |
	| queue |
	queue := queueClass new.
	queueClass -> (
		(1 to: 5) collect: [ :run |
			Smalltalk garbageCollect.
			{
				[ 1 to: 1000000 do: [ :each | queue nextPut: each ] ] timeToRun.
				[ 1 to: 1000000 do: [ :each | queue next ] ] timeToRun } ]) ].

SharedQueue->#(#(1028 1615) #(945 1557) #(976 2322) #(492 459) #(489 476)).
SharedQueue2->#(#(1976 2284) #(1318 8298) #(982 692) #(1005 675) #(1002 665)).
FIFOQueue->#(#(180 67) #(184 67) #(182 69) #(181 66) #(177 67)).
AtomicSharedQueue->#(#(208 66) #(207 67) #(209 66) #(213 68) #(209 65)).

This benchmark is similar to what Igor used, but it doesn't create a
new queue between runs. It simply adds 1,000,000 elements then removes
them, which is a pretty unrealistic scenario for a shared queue. The
effect of GC is pretty high on this benchmark; that's probably
responsible for the high spikes.

Benchmark #2: Sequential throughput test

{ SharedQueue. SharedQueue2. FIFOQueue. AtomicSharedQueue } collect: [ :queueClass |
	| queue |
	queue := queueClass new.
	queueClass -> (
		(1 to: 5) collect: [ :run |
			| results |
			Smalltalk garbageCollect.
			results := #(0 0).
			1 to: 1000 do: [ :round |
				results := results + {
					[ 1 to: 1000 do: [ :each | queue nextPut: each ] ] timeToRun.
					[ 1 to: 1000 do: [ :each | queue next ] ] timeToRun } ].
			results ]) ].

SharedQueue->#(#(464 452) #(472 436) #(466 437) #(463 449) #(462 452)).
SharedQueue2->#(#(949 692) #(980 663) #(984 670) #(992 670) #(958 677)).
FIFOQueue->#(#(125 67) #(263 62) #(250 76) #(262 63) #(247 81)).
AtomicSharedQueue->#(#(154 70) #(264 77) #(273 62) #(275 63) #(265 71)).

This is similar to benchmark #1, but instead of adding and removing
1,000,000 elements at once, the work is chunked into 1,000 equal parts.
It's more realistic than benchmark #1. It's interesting that both
FIFOQueue and AtomicSharedQueue performed better in the previous
benchmark, unlike the other two queues, which are better here.

Benchmark #3: Concurrent throughput test

{ SharedQueue. SharedQueue2. FIFOQueue. AtomicSharedQueue } collect: [ :queueClass |
	| queue semaphore |
	queue := queueClass new.
	semaphore := Semaphore new.
	queueClass -> (
		(1 to: 5) collect: [ :run |
			| producers consumers |
			Smalltalk garbageCollect.
			producers := (1 to: 100) collect: [ :each |
				[ 1 to: 10000 do: [ :index | queue nextPut: each ] ] newProcess ].
			consumers := (1 to: 100) collect: [ :each |
				[
					1 to: 10000 do: [ :index | queue next ].
					semaphore signal ] newProcess ].
			[
				consumers do: [ :each | each priority: 39; resume ].
				producers do: [ :each | each priority: 39; resume ].
				100 timesRepeat: [ semaphore wait ] ] timeToRun ]) ].

SharedQueue->#(3143 2977 3034 2949 3021).
SharedQueue2->#(4280 4384 4179 4160 4181).
FIFOQueue->#(245 311 252 254 255).
AtomicSharedQueue->#(277 274 277 280 274)

This benchmark is the real concurrent stress test: 100 processes are
each adding 10,000 elements to the queue while another 100 are reading
from it. It clearly shows that Igor's queues are an order of magnitude
faster. Also, the 200 concurrent processes cause much less slowdown for
these queues compared to their sequential tests.

So, even though SharedQueue is now faster than SharedQueue2, both will
have to go IMHO. :)


Levente


snip



Re: SharedQueue does scale (was: Re: [Pharo-project] SharedQueue doesn't scale)

Igor Stasenko
On 18 October 2010 07:00, Levente Uzonyi <[hidden email]> wrote:

> snip

Thanks, Levente, for the feedback.
Please feel free to shape my classes into more complete form (such as
proper naming),
to make them ready for inclusion in both the Squeak and Pharo cores.
I propose the following names:
AtomicQueue (base class) -> AtomicCollection
FIFOQueue -> AtomicQueue
LIFOQueue -> AtomicStack
If you, or anyone else, have better suggestions, speak now :)

In any case, I am open to discussing further details and possible
caveats of the new classes with anyone interested in using them.

P.S. As a side note, I can now explain (to myself in the first place) why
I intuitively chose to use atomic swap
instead of CAS:
because it fits better with Smalltalk language semantics.

A swap operation in Smalltalk is implemented as two assignments:
x := y. y := z.
An assignment is a basic operation, which has nothing to do with the
late-bound nature of the language.
Unless we are going to introduce meta-object protocol(s), which could
turn a simple assignment
into message sends under the hood, it will remain a basic,
early-bound operation.
And even if we do, it is highly unlikely that even then we would throw
away the old,
simple assignment, which identifies the assignment source & target at
compile time.

In contrast, a CAS operation, if written in Smalltalk, looks like:

(a == b) ifTrue: [ a := c ]

so it involves two message sends (#== and #ifTrue:), and from a strict,
pure language perspective,
this uses late-bound semantics (message sends);
and as with any message send, the result and behavior cannot be
predicted at compile time,
so it is wrong to assume that such a statement could be an
atomic operation.

Unless, of course, we introduce new language syntax which
denotes a CAS operation explicitly.
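
To make the contrast concrete, here is a minimal workspace sketch of
both idioms (all variable names are hypothetical):

| head newNode expected old |
head := #a. newNode := #b. expected := #a.

"the swap: two plain assignments - early-bound, no message sends"
old := head.
head := newNode.	"old now holds the previous value"

"a CAS spelled in Smalltalk unavoidably goes through message sends
(#== and #ifTrue:), so the language gives no ground to treat it as
one atomic step"
(head == expected) ifTrue: [ head := newNode ].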



--
Best regards,
Igor Stasenko AKA sig.


Re: SharedQueue does scale

Göran Krampe
Hi guys!

I am in China and haven't read this thread etc, but regarding improved
SharedQueue - have you looked at SharedStreams:

http://map.squeak.org/packagebyname/sharedstreams

...the SharedBufferStream in there was much faster for lots of objects
or characters, since it relied on primitives. In fact, a looooong time
ago Stephen Pair thought it should have replaced SharedQueue.

regards, Göran


Re: SharedQueue does scale

Levente Uzonyi-2
On Mon, 18 Oct 2010, Göran Krampe wrote:

> Hi guys!
>
> I am in China and haven't read this thread etc, but regarding improved
> SharedQueue - have you looked at SharedStreams:
>
> http://map.squeak.org/packagebyname/sharedstreams
>
> ...the SharedBufferStream in there was much faster for lots of objects or
> characters, since it relied on primitives. In fact, a looooong time ago
> Stephen Pair thought it should have replaced SharedQueue.
I just checked it. It works like SharedQueue, but it seems to be designed
to hold Characters (#nextLine, #nextLineCr, etc). It also works with
arrays, so it can serve as a replacement for SharedQueue. Its design is
similar to SharedQueue's. It uses a Semaphore for synchronization. I
don't see where it relies on primitives, but relying on them nowadays is
not future-proof: the performance of pure Smalltalk code is on par with
or better than some primitives with CogVM.

Here are the results for the three benchmarks I did with an Array as
buffer:

SharedBufferStream->#(#(1052 1309) #(521 533) #(530 543) #(532 537) #(530 530)).

SharedBufferStream->#(#(525 537) #(556 518) #(490 576) #(530 542) #(547 532)).

SharedBufferStream->#(4253 1990 3099 2089 2928).

For the first two benchmarks the results are a bit worse than the "new"
SharedQueue's; for the third it's sometimes better, sometimes worse.
Anyway, it's still an order of magnitude slower than Igor's atomic
collections.

Btw, SharedBufferStream >> #makeRoomAtEndFor: seems to have at least 2
bugs; after fixing one of them the third benchmark gives a bit smoother
results:
SharedBufferStream->#(3010 2894 2034 3028 3014)


Levente



Re: [Pharo-project] [squeak-dev] SharedQueue does scale (was: Re: SharedQueue doesn't scale)

Levente Uzonyi-2
On Mon, 18 Oct 2010, Igor Stasenko wrote:

> snip
>
> I propose the following names:
> AtomicQueue (base class) -> AtomicCollection
> FIFOQueue -> AtomicQueue
> LIFOQueue -> AtomicStack
> If you, or anyone else, have better suggestions, speak now :)



I think these should be the names:

FIFOQueue -> SharedQueue
LIFOQueue -> SharedStack

I don't know a really good name for AtomicQueue, maybe SharedList,
SharedCollection or SharedListStub.


Levente





Re: [Pharo-project] [squeak-dev] SharedQueue does scale (was: Re: SharedQueue doesn't scale)

Igor Stasenko
On 19 October 2010 02:06, Levente Uzonyi <[hidden email]> wrote:

> snip
>
> I think these should be the names:
>
> FIFOQueue -> SharedQueue

This name is already used by the Kernel.
So, unless my class fully replaces it, I see no way I could
use this name in a separate package.

Also, I must stress that FIFOQueue only attempts to
closely resemble SharedQueue's behavior.
However, there are situations (related to how scheduling works and
the use of #yield) where using it could lead to deadlocks.
In this regard, AtomicSharedQueue (a subclass of FIFOQueue) is a much
better analogue of the current SharedQueue.

As I noted in another mail, we might also provide a
separate wait-free interface. Then we can guarantee
that if you use only the wait-free interface, a queue can never be the
cause of a deadlock.

> snip



--
Best regards,
Igor Stasenko AKA sig.


Re: [Pharo-project] [squeak-dev] SharedQueue does scale (was: Re: SharedQueue doesn't scale)

Levente Uzonyi-2
On Tue, 19 Oct 2010, Igor Stasenko wrote:

> On 19 October 2010 02:06, Levente Uzonyi <[hidden email]> wrote:
>> snip
>>
>> FIFOQueue -> SharedQueue
>
> This name is already used by the Kernel.
> So, unless my class fully replaces it, I see no way I could
> use this name in a separate package.

Yes, it would be good to replace the implementation IMHO. The API seems to
be complete to me (except for copying ;)).

>
> Also, I must stress that FIFOQueue only attempts to
> closely resemble SharedQueue's behavior.
> However, there are situations (related to how scheduling works and
> the use of #yield) where using it could lead to deadlocks.
> In this regard, AtomicSharedQueue (a subclass of FIFOQueue) is a much
> better analogue of the current SharedQueue.

If you mean the case: "if process A tries to read from an empty queue,
later process B tries to do the same, then process A is guaranteed to
read before process B", then that shouldn't be a problem. It would require
an external synchronization step to make use of this feature with the
current implementation. I doubt that anyone wrote such code ever.
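
For the record, the case in question as a workspace sketch (my
illustration, not from the original mail): two readers started in order
are served in order.

| queue |
queue := SharedQueue new.
[ Transcript show: 'A got: ', queue next printString; cr ] fork.
[ Transcript show: 'B got: ', queue next printString; cr ] fork.
queue nextPut: 1.
queue nextPut: 2.
"A is guaranteed to get 1 before B gets 2 - but exploiting this ordering
would require external synchronization, as noted above"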

>
> As I noted in another mail, we might also provide a
> separate wait-free interface. Then we can guarantee
> that if you use only the wait-free interface, a queue can never be the
> cause of a deadlock.

That's great, and it can be a future addition even if we push the current
implementation to the Trunk.


Levente



Re: [Pharo-project] [squeak-dev] SharedQueue does scale (was: Re: SharedQueue doesn't scale)

Igor Stasenko
On 19 October 2010 03:42, Levente Uzonyi <[hidden email]> wrote:

> On Tue, 19 Oct 2010, Igor Stasenko wrote:
>
>> snip
>
> Yes, it would be good to replace the implementation IMHO. The API seems to
> be complete to me (except for copying ;)).
>
You mean this:

copy
        ^ self errorDontCopy

errorDontCopy
        "copying a structure, involved in concurrent operations is a bad idea"
        ^ self error: 'Copying not available'

:)

See how Squeak's EventSensor does the right thing to make a 'copy':

EventSensor>>flushAllButDandDEvents
        | newQueue oldQueue  |
       
        newQueue := SharedQueue new.
        self eventQueue ifNil:
                [eventQueue := newQueue.
                ^self].
        oldQueue := self eventQueue.
        [oldQueue size > 0] whileTrue:
                [| item type |
                item := oldQueue next.
                type := item at: 1.
                type = EventTypeDragDropFiles ifTrue: [ newQueue nextPut: item]].
        eventQueue := newQueue.

Well, you might be right that #copy could be implemented as:

copy
	| copy |
	copy := self class new.
	[ copy nextPut: (self nextIfNone: [ ^ copy ]) ] repeat

if it makes any sense to anyone...

It's hard to imagine a situation where one may need to copy an existing
queue, because one can simply keep using it.


>> snip
>
> If you mean the case: "if process A tries to read from an empty queue, later
> process B tries to do the same, then process A is guaranteed to read before
> process B", then that shouldn't be a problem. It would require an external
> synchronization step to make use of this feature with the current
> implementation. I doubt that anyone wrote such code ever.
>
No, the problem is not that. The problem relates to scheduling and
how the #yield primitive works.

Here is the VM's primitiveYield:

primitiveYield
	"primitively do the equivalent of Process>yield"
	| activeProc priority processLists processList |
	activeProc := self fetchPointer: ActiveProcessIndex
				ofObject: self schedulerPointer.
	priority := self quickFetchInteger: PriorityIndex ofObject: activeProc.
	processLists := self fetchPointer: ProcessListsIndex ofObject: self schedulerPointer.
	processList := self fetchPointer: priority - 1 ofObject: processLists.

	(self isEmptyList: processList) ifFalse: [
		self addLastLink: activeProc toList: processList.
		self transferTo: self wakeHighestPriority]

Note #wakeHighestPriority.

So a fetcher (which uses #yield in a spin loop) with a priority higher
than the pusher process will loop infinitely,
blocking the pusher and all lower-priority processes from advancing.

To avoid this problem, one should make sure that the process pushing
new items to the queue
has either a higher or the same priority as any fetching process(es)
using the same queue.
Or use wait-free access to the queue (avoid #next, use #nextOrNil instead).
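
A minimal sketch of such a livelock (my illustration; the priorities are
arbitrary). Don't evaluate this in a working image - locking it up is
the point:

| queue fetcher pusher |
queue := FIFOQueue new.
fetcher := [ [ queue nextOrNil isNil ] whileTrue: [ Processor yield ] ] newProcess.
pusher := [ queue nextPut: #item ] newProcess.
fetcher priority: 60.
pusher priority: 50.
fetcher resume.	"preempts us and spins forever: primitiveYield only looks
	at the priority-60 run queue, so nothing below 60 ever advances"
pusher resume.	"never reached"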

That's why in the subclass AtomicSharedQueue, I use a semaphore to
work around this issue.

And potentially it would be good some day to have a way to tell the scheduler:
please stop the current process and see if you can run anything with a lower priority.
(Another reason to move scheduling to the language side, so we are free to
modify it in any way we like ;)

>>
>> As I noted in another mail, we might also provide a
>> separate wait-free interface. Then we can guarantee
>> that if you use only the wait-free interface, a queue can never be the
>> cause of a deadlock.
>
> That's great, and it can be a future addition even if we push the current
> implementation to the Trunk.
>

Yes. I think that for most cases in a concurrent environment, wait-free
access is the preferable way to work with queues.





--
Best regards,
Igor Stasenko AKA sig.


Re: SharedQueue does scale

Göran Krampe
Hi all!

On 10/18/2010 09:38 PM, Levente Uzonyi wrote:

> On Mon, 18 Oct 2010, Göran Krampe wrote:
>
>> snip
>
> I just checked it. It works like SharedQueue, but it seems to be
> designed to hold Characters (#nextLine, #nextLineCr, etc). It also works

Yes, it was - originally. But then I think we did a few tiny tweaks to
make it work with any objects. IIRC I also think we found some buglets
in SharedQueue when writing SharedStreams.

Sidenote: The SharedBidirectionalStream is a kinda fun class - it uses
two SharedBufferStreams internally, "cross connected", so that it can act
as an in-image approximation of a SocketStream. We built this for a
multiplexer/demultiplexer originally, so that multiple Squeak processes
could talk to a remote computer and share a single Socket using small
"packets".

> with arrays, so it can serve as a replacement for SharedQueue. Its design
> is similar to SharedQueue's. It uses a Semaphore for synchronization. I
> don't see where it relies on primitives, but relying on them nowadays is
> not future-proof: the performance of pure Smalltalk code is on par with
> or better than some primitives with CogVM.
>

> Here are the results for the three benchmarks I did with an Array as
> buffer:
>
> SharedBufferStream->#(#(1052 1309) #(521 533) #(530 543) #(532 537)
> #(530 530)).
>
> SharedBufferStream->#(#(525 537) #(556 518) #(490 576) #(530 542) #(547
> 532)).
>
> SharedBufferStream->#(4253 1990 3099 2089 2928).
>
> For the first two benchmarks the results are a bit worse than the "new"
> SharedQueue's; for the third it's sometimes better, sometimes worse.
> Anyway, it's still an order of magnitude slower than Igor's atomic
> collections.

Great! ;)

> Btw, SharedBufferStream >> #makeRoomAtEndFor: seems to have at least 2
> bugs; after fixing one of them the third benchmark gives a bit smoother
> results:
> SharedBufferStream->#(3010 2894 2034 3028 3014)
>
>
> Levente

Super thanks, Levente, for taking a look! Then we can put that package to
rest, I presume. Again, thank you very much.

regards, Göran


Re: [Pharo-project] [squeak-dev] SharedQueue does scale (was: Re: SharedQueue doesn't scale)

Levente Uzonyi-2
On Tue, 19 Oct 2010, Igor Stasenko wrote:

> snip
>
> It's hard to imagine a situation where one may need to copy an existing
> queue, because one can simply keep using it.

I didn't say that I find copying useful, but the API is different.

> snip
>
> That's why in the subclass AtomicSharedQueue, I use a semaphore to
> work around this issue.

Okay. So AtomicSharedQueue is the class which can be used to replace
SharedQueue. So the names could be:

LIFOQueue -> AtomicStack
FIFOQueue -> AtomicQueue
AtomicSharedQueue -> SharedQueue

>
> And potentially it would be good some day to have a way to tell the
> scheduler: please stop the current process and see if you can run
> anything with a lower priority.

That would break the current scheduling policy.


Levente



Re: [Pharo-project] [squeak-dev] SharedQueue does scale (was: Re: SharedQueue doesn't scale)

Igor Stasenko
On 19 October 2010 06:29, Levente Uzonyi <[hidden email]> wrote:

> On Tue, 19 Oct 2010, Igor Stasenko wrote:
>
>> snip
>
> I didn't say that I find copying useful, but the API is different.
>
Why? #copy is implemented... it just behaves differently :)

>> snip
>
> Okay. So AtomicSharedQueue is the class which can be used to replace
> SharedQueue. So the names could be:
>
> LIFOQueue -> AtomicStack
> FIFOQueue -> AtomicQueue
> AtomicSharedQueue -> SharedQueue
>

Right, and I think I'll move all non-wait-free methods (like #next)
into SharedQueue,
so AtomicQueue will not contain #next in its protocol, and will be
a purely wait-free implementation.
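
A hedged sketch of where the selectors would land after that split (the
placement is my reading of the proposal; #waitForNewItems is a
hypothetical helper):

AtomicQueue >> nextOrNil
	"wait-free: answers nil when the queue is empty or another process
	is in the middle of an extraction"

SharedQueue >> next
	"wait-loop: blocks until an item is available"
	| item |
	[ (item := self nextOrNil) isNil ] whileTrue: [ self waitForNewItems ].
	^ item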

>> And potentially it would be good some day to have a way to tell the
>> scheduler: please stop the current process and see if you can run
>> anything with a lower priority.
>
> That would break the current scheduling policy.
>

I prefer an 'alter' word as in 'alternative' :)


--
Best regards,
Igor Stasenko AKA sig.


Re: SharedQueue does scale

Stephen Pair
The thing that always bothered me about SharedQueue (aside from some bugs that may be long gone) was that it didn't really behave quite like a stream (and I wanted to be able to use it with code written for streams)... I took Göran's SharedStreams a bit further to try to be fully compatible with code that expected read or write streams (and at least one version I did would work efficiently for characters and bytes but still support full objects). It also tried to firm up the concepts of end of stream vs. buffer under-run (with both blocking and non-blocking protocols when there is an under-run). I believe a version would also support buffer over-runs on insertion if you wanted to cap the size of the buffer (with a similar choice about whether to block or not when hitting an over-run on insert).

Unifying this with streams in the base image would be a good thing, as would firming up the concepts of end of stream, under-run, and over-run, as well as separate protocols for blocking vs. non-blocking APIs (for over-run and under-run) in the entire stream framework. The protocols for reading and writing should also be clearly delineated (and perhaps separate objects should always be used for reading and writing, with the object that acts as the collection or buffer factored out and its protocol well articulated).

- Stephen
