Forcing a thread to wait a specified number of times

Schwab,Wilhelm K
Hello all,

I am trying to allow other threads (one in particular) to force a worker
thread to wait a given number of times.  This is a bad design intended
to cope with an even worse communications protocol :(  However, I think
it will suffice to be able to place semaphores in the path of the thread
I am trying to hobble for a time.

In short, I have created a shared queue of additional semaphores.  The
"worker thread" uses #nextNoWait to grab every semaphore available, and
sends #wait to each.  It also sends #wait to a persistent semaphore that
keeps it from running wild.  To "signal the worker thread", I either
remove an extra semaphore (again #nextNoWait) or #signal the persistent
semaphore.

Is there a simpler way to do this?  Do you consider the persistent
semaphore to be extraneous?  I suspect not, but am open to pleasant
surprises.

An alternate design that I am considering is to have what amounts to a
mutex protected integer, with one semaphore that is conditionally
signalled.  The multi-semaphore design is (I think) more likely to find
other uses, and should be easier to debug, as individual semaphores
would be un- or over-signalled and in or not in the queue vs. the
integer value being wrong.  Better ideas will be cheerfully accepted.
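
A minimal workspace sketch of that alternate design (one mutex-protected integer plus a single, conditionally signalled semaphore).  It assumes Dolphin's Mutex and Semaphore; all the variable names (remaining, gate, hold, release) are hypothetical and not taken from the actual project.  The worker re-checks the count in a loop, so a stale signal left on the gate only causes a spurious wake-up.

```smalltalk
"Sketch only: every name here is invented for illustration."
| mutex remaining gate hold release done |
mutex := Mutex new.
remaining := 0.          "how many releases the worker must still wait for"
gate := Semaphore new.   "signalled only when remaining drops to zero"
done := Semaphore new.   "lets this script wait for the worker to finish"

"Ask the worker to wait for n further releases."
hold := [:n | mutex critical: [remaining := remaining + n]].

"Cancel one outstanding hold; signal the gate only on reaching zero."
release := [mutex critical: [
	remaining > 0 ifTrue: [
		remaining := remaining - 1.
		remaining = 0 ifTrue: [gate signal]]]].

hold value: 3.

"The worker blocks for as long as any holds are outstanding."
[[mutex critical: [remaining > 0]] whileTrue: [gate wait].
done signal] fork.

3 timesRepeat: [release value].
done wait.
```

The trade-off is the one described above: a wrong integer is harder to spot in a debugger than a semaphore that is visibly still sitting in a queue.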

Have a good one,

Bill

--
Wilhelm K. Schwab, Ph.D.
[hidden email]



Re: Forcing a thread to wait a specified number of times

Chris Uppal-3
Bill,

> I am trying to allow other threads (one in particular) to force a worker
> thread to wait a given number of times.  [...]
>
> In short, I have created a shared queue of additional semaphores.  [...]

You haven't really described what you want to achieve, only how you have
implemented it, which makes it difficult to comment ;-)

Still, there's something about it that doesn't sound right to me.  The use of
#nextNoWait would worry me, for one thing.  Not that I'm saying it's wrong in
the context of your system, but that sort of method raises an alarm flag with
me saying "have you considered all the race conditions?" (and, speaking
personally, I generally find that I have not...).   Also I don't really see why
the worker thread should be "aware" of all those semaphores; its job is just to
wait until something tells it to go ahead, and it's the "something's" job to
manage that (I'd have thought).  I /suspect/ that you might find it easier if
you created a higher-level IPC primitive rather than trying to work with "raw"
semaphores.

But then, as I said, I don't really know what you are trying to do, so that is
all guesswork.  (But nobody else had replied at all, so...)

    -- chris



Re: Forcing a thread to wait a specified number of times

Schwab,Wilhelm K
Chris,

> You haven't really described what you want to achieve, only how you have
> implemented it, which makes it difficult to comment ;-)

Fair enough.  Consider a request/reply protocol.  It is unclear whether
there is a good way to know how many reply messages will be sent in
response to any given request.  It is very clear that if a message is
lost, there is no way to request a resend.

You can imagine that there is a lot of reverse engineering involved in
this project.  I am not the first to attempt it, nor am I the first to
take note of errors and ominous silences in the specifications.

Given the above, it seems reasonable to have the ability to wait on the
best-guess number of responses or communications failures, and to keep
going.  Some type of watchdog timer is also needed, and I have that on
at least one level.




> Still, there's something about it that doesn't sound right to me.  The use of
> #nextNoWait would worry me, for one thing.  Not that I'm saying it's wrong in
> the context of your system, but that sort of method raises an alarm flag with
> me saying "have you considered all the race conditions?" (and, speaking
> personally, I generally find that I have not...).

That is part of the charm of multi-threaded software =:0  However, in
this case, I am fairly certain that #nextNoWait is ok; SharedQueue
should protect its entrails suitably well to allow multiple threads to
use #nextPut: and #nextNoWait, and if there is nothing in the queue, it
would be inappropriate (under the current design/hack) for the thread to
wait for one to appear.


 > Also I don't really see why
> the worker thread should be "aware" of all those semaphores; its job is just to
> wait until something tells it to go ahead, and it's the "something's" job to
> manage that (I'd have thought).

If only there were a place to hang the "something" :(   Your take on
awareness is interesting.  My goal was to throw the right number of
stumbling blocks in the thread's path, so that it would wait an
appropriate number of times.  I suppose I could do the same thing with a
mutex-protected value/integer; it would be simpler, though the semaphores
seemed more generic as they can be signalled from elsewhere in the system.


 > I /suspect/ that you might find it easier if
> you created a higher-level IPC primitive rather than trying to work with "raw"
> semaphores.

What do you have in mind?


> But then, as I said, I don't really know what you are trying to do, so that is
> all guesswork.  (But nobody else had replied at all, so...)

Thanks for replying.

Have a good one,

Bill


--
Wilhelm K. Schwab, Ph.D.
[hidden email]



Re: Forcing a thread to wait a specified number of times

Schwab,Wilhelm K
In reply to this post by Chris Uppal-3
Chris,

> I /suspect/ that you might find it easier if
> you created a higher-level IPC primitive rather than trying to work with "raw"
> semaphores.

Is the stuff below something like what you had in mind?  Comments, test
cases, better ideas, etc. will be cheerfully accepted.

Have a good one,

Bill


=========================


!TimedEvaluatorHeadlessTests methodsFor!

testCompositeSemaphoreSignalsFirst
        "Do the signals first, so #wait should not block."

                | composite count |

        "
                TimedEvaluatorHeadlessTests new testCompositeSemaphoreSignalsFirst.
        "

        count := 4.
        composite := CompositeSemaphore count:count.

        count timesRepeat:[ composite signal. ].
        composite wait.
! !
!TimedEvaluatorHeadlessTests categoriesFor: #testCompositeSemaphoreSignalsFirst!public! !

!TimedEvaluatorHeadlessTests methodsFor!

testCompositeSemaphore

                | composite wait signal count waiterDone ok |

        "
                TimedEvaluatorHeadlessTests new testCompositeSemaphore.
        "

        count := 4.
        waiterDone := Semaphore new.
        ok := Semaphore new.
        composite := CompositeSemaphore count:count.
        self should:[ composite unsignalledSemaphoresSize = count ].

        "A thread that signals the composite semaphore and watches for the
        (initially, lack of) progress of the waiter/worker thread.
        Wait for the worker to signal completion, then signal the ok semaphore
        to let the method/test case finish."
        signal := [
                count to:2 by:1 negated do:[ :i |
                        self should:[ composite unsignalledSemaphoresSize = i ].
                        composite signal.
                ].

                "The waiter thread should not have been able to complete
                yet"
                self should:[ waiterDone excessSignals = 0. ].
                self should:[ composite unsignalledSemaphoresSize = 1. ].

                "Signal the waiter one last time and then wait for the waiter thread
                to signal us."
                composite signal.
                waiterDone wait.

                "Now the waiter is finished, but the excess signal count should be zero
                again, because we sent #wait."
                self should:[ composite unsignalledSemaphoresSize = 0. ].
                self should:[ waiterDone excessSignals = 0. ].
                ok signal.

        ] forkAt:Processor userBackgroundPriority.

        "A thread that repeatedly waits on the composite semaphore and then
        signals a semaphore to allow the signaller thread to complete."
        wait := [
                count timesRepeat:[
                        composite wait.
                ].
                waiterDone signal.

        ] forkAt:Processor userBackgroundPriority.

        ok wait.
        self should:[ ok excessSignals = 0. ].
! !
!TimedEvaluatorHeadlessTests categoriesFor: #testCompositeSemaphore!public! !




"Filed out from Dolphin Smalltalk XP"!

Object subclass: #CompositeSemaphore
        instanceVariableNames: 'semaphoresToWait semaphoresToSignal mutex'
        classVariableNames: ''
        poolDictionaries: ''
        classInstanceVariableNames: ''!
CompositeSemaphore guid: (GUID fromString:
'{77F55650-1AB0-46F7-9A4F-821342D39184}')!
CompositeSemaphore comment: ''!
!CompositeSemaphore categoriesForClass!Kernel-Objects! !
!CompositeSemaphore methodsFor!

initialize
        "Prepare a new instance for use"

        super initialize.

        semaphoresToWait := SharedQueue new.
        semaphoresToSignal := SharedQueue new.

        "3-05 - concerned about races between #wait and #signal; protect the
pair of shared queues"
        mutex := Mutex new.
!

injectSemaphoreCount:anInteger
        "Allow the outside world to force us to wait a given number of times."

        self injectSemaphores:(
                ( 1 to:anInteger ) collect:[ :each | Semaphore new ]
        ).
!

injectSemaphores:aCollectionOfSemaphores
        "Allow the outside world to force us to wait on the indicated semaphores
        in series.  Add each semaphore to both the semaphoresToWait and
        semaphoresToSignal shared queues; #wait and #signal draw from them,
        respectively."

        mutex critical:[
                aCollectionOfSemaphores do:[ :each |
                        semaphoresToWait nextPut:each.
                        semaphoresToSignal nextPut:each.
                ].
        ].
!

semaphoreToSignal
        ^mutex critical:[
                semaphoresToSignal nextNoWait
        ].
!

semaphoreToWait
        ^mutex critical:[
                semaphoresToWait nextNoWait
        ].
!

signal
        "Signal one semaphore held by the receiver."

                | aSemaphore |

        ( aSemaphore := self semaphoreToSignal ) notNil
                ifTrue:[ aSemaphore signal. ].
!

unsignalledSemaphoresSize
        "For benefit only of unit tests(??), answer the semaphores yet to
        be signalled by the receiver (note that they might be signalled from
        elsewhere in the image).  Oops - SharedQueue does not copy well, so
        settle for the size."

        ^semaphoresToSignal size
!

wait
        "Wait in turn on every semaphore currently queued; return once the
        queue is empty."

                | aSemaphore |

        [ ( aSemaphore := self semaphoreToWait ) notNil ] whileTrue:[
                aSemaphore wait.
        ].
! !
!CompositeSemaphore categoriesFor: #initialize!initializing!private! !
!CompositeSemaphore categoriesFor: #injectSemaphoreCount:!public! !
!CompositeSemaphore categoriesFor: #injectSemaphores:!public! !
!CompositeSemaphore categoriesFor: #semaphoreToSignal!helpers!private! !
!CompositeSemaphore categoriesFor: #semaphoreToWait!helpers!private! !
!CompositeSemaphore categoriesFor: #signal!process synchronisation!public! !
!CompositeSemaphore categoriesFor: #unsignalledSemaphoresSize!accessing!public! !
!CompositeSemaphore categoriesFor: #wait!process synchronisation!public! !

!CompositeSemaphore class methodsFor!

count:anInteger
        "Create a new instance that will force a caller to #wait to wait anInteger
        times."

        ^self new injectSemaphoreCount:anInteger; yourself.
!

new
        "Answer a new instance of the receiver"
        ^self basicNew initialize; yourself.
! !
!CompositeSemaphore class categoriesFor: #count:!public! !
!CompositeSemaphore class categoriesFor: #new!instance creation!public! !



--
Wilhelm K. Schwab, Ph.D.
[hidden email]



Re: Forcing a thread to wait a specified number of times

Chris Uppal-3
Bill,

> > I /suspect/ that you might find it easier if
> > you created a higher-level IPC primitive rather than trying to work
> > with "raw" semaphores.
>
> Is the stuff below something like what you had in mind?  Comments, test
> cases, better ideas, etc. will be cheerfully accepted.

Good try, but there is /no way/ you can fool me into writing test cases for you
;-)

Random thoughts in random order....

Basically this thing is a bag-of-reasons-to-block -- a collection of reasons
why a process should not (yet) be allowed to return from its #wait.  When the
list of reasons becomes empty, the #wait returns.

One thing that would worry me, is that if the bag becomes empty transiently the
effects will depend on timing.  It's not even clear what /should/ happen --
should the waiting process be allowed to proceed or not ?  I'd be tempted to
make the bag into a sort of 'latch' which, once it had become empty, would
throw errors if you try to add anything more to it.  (So you'd have to use a
fresh bag for each 'phase' of the protocol).
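
A rough sketch of such a one-shot latch, in the same file-out style as the rest of the thread.  The class name OneShotBlockingBag and its selectors are hypothetical, and it assumes Dolphin's Mutex, Semaphore and Bag.  Once the bag has emptied, the latch stays open and any further #addReason: raises an error, so the outcome no longer depends on timing.

```smalltalk
Object subclass: #OneShotBlockingBag
	instanceVariableNames: 'mutex reasons gate released'
	classVariableNames: ''
	poolDictionaries: ''
	classInstanceVariableNames: ''!

!OneShotBlockingBag methodsFor!

setReasons: aCollection
	"private -- establish the initial state; an empty collection opens the latch at once"
	mutex := Mutex new.
	gate := Semaphore new.
	reasons := Bag new.
	reasons addAll: aCollection.
	released := reasons isEmpty.
	released ifTrue: [gate signal]!

addReason: anObject
	"Add a reason to block; refused once the latch has opened."
	| accepted |
	accepted := mutex critical: [
		released ifFalse: [reasons add: anObject].
		released not].
	accepted ifFalse: [self error: 'latch already released; use a fresh instance']!

clearReason: anObject
	"Remove one occurrence of anObject; open the latch for good when the bag empties."
	mutex critical: [
		reasons remove: anObject ifAbsent: [].
		(released not and: [reasons isEmpty]) ifTrue: [
			released := true.
			gate signal]]!

wait
	"Block until the latch has opened, then pass the signal on so that
	every other waiter (now or later) also gets through."
	gate wait.
	gate signal! !

!OneShotBlockingBag class methodsFor!

withReasons: aCollection
	^self basicNew setReasons: aCollection; yourself! !
```

Each phase of the protocol would then create its own instance with #withReasons:, rather than re-arming an old one.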

What I "had in mind" isn't all that relevant, since I was only speculating
about what you might be wanting to do.  Still, and for what little it's worth, I
was thinking of something like a "bag-of-blocks" (in the above sense) but
implemented with an API that rather more directly reflects that idea.  I'll add
some code at the end of this, but it's only there in case you are interested.
It specifically /isn't/ well tested -- it was just cobbled together from some
stuff I had kicking around (itself largely untested) as a way of exploring the
idea.  Actually it's not too different in concept from your own code, and even
suffers from a potential weakness if Process>>interrupt is used (which your
implementation doesn't share) that I don't see how to eliminate; I'll start a
new thread on that subject later.

Your description of the underlying problem reminds me (rather more vividly than
I /want/ to be reminded ;-) of a similar problem I had a few years ago.  That
was a telephony system  which sent (or didn't) the most awful, unreliable,
un-designed, mishmash of notifications imaginable.  In that case I went the
opposite route, and instead of trying to block a process until the expected
messages had arrived, I made the processing entirely event-driven.  I don't
know about you, but I find it easier to think about a sequence of events,
each interpreted on its own merits (using some state to keep track of the
changing context), than to try to code a "straight-line" algorithm that will
try to process the same data sequentially -- at least if that sequence is at
all complicated or variable.  But that's not advice, just an observation -- and
an expression of sympathy ;-)
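
To make the event-driven alternative concrete, here is a small workspace sketch.  The event names (#replyReceived, #timedOut), the request ids and the "expected replies" guess are all hypothetical; the point is only that each event is interpreted against explicit state, and nothing blocks.

```smalltalk
"Sketch only: event names and bookkeeping are invented for illustration."
| expected completed expect handle |
expected := Dictionary new.          "request id -> replies still expected"
completed := OrderedCollection new.  "request ids we consider finished"

"Record a best guess at how many replies a request will produce."
expect := [:id :count | expected at: id put: count].

"Interpret one incoming event against the current state."
handle := [:kind :id |
	(expected includesKey: id) ifTrue: [
		kind == #replyReceived ifTrue: [
			| left |
			left := (expected at: id) - 1.
			left > 0
				ifTrue: [expected at: id put: left]
				ifFalse: [
					expected removeKey: id.
					completed add: id]].
		"A watchdog timeout gives up on whatever has not arrived."
		kind == #timedOut ifTrue: [
			expected removeKey: id.
			completed add: id]]].

expect value: 1 value: 2.
handle value: #replyReceived value: 1.
handle value: #replyReceived value: 1.
expect value: 2 value: 3.
handle value: #replyReceived value: 2.
handle value: #timedOut value: 2.
```

Events for an id that is not (or no longer) being tracked are simply ignored, which is one way of coping with late or duplicated notifications.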

    -- chris

===================================
"Filed out from Dolphin Smalltalk XP"!

Object subclass: #CompoundBlockingWait
 instanceVariableNames: 'controlSemaphore sleepSemaphore wakeupSignalsNeeded reasonsToBlock'
 classVariableNames: ''
 poolDictionaries: ''
 classInstanceVariableNames: ''!
CompoundBlockingWait guid: (GUID fromString:
'{8C986E83-16F9-424F-AD39-6D7DF61994BA}')!
CompoundBlockingWait comment: 'One of these is basically a list of reasons why
some Process should not be allowed to continue.

Create one with, say, a list of Symbols.  Add/remove further reasons using
addReason[s]: / clearReason[s]:.

If the list is non-empty, then any Process that calls #wait will be blocked
until the list becomes empty.'!
!CompoundBlockingWait categoriesForClass!Unclassified! !
!CompoundBlockingWait methodsFor!

acquireControlMutex
 "private -- grab the control mutex, sleeping as necessary"

 controlSemaphore wait.
!

addReason: anObject
 "add anObject to the list of 'reasons' why a caller of #wait
 will not be allowed to continue"

 self addReasons: (Array with: anObject).!

addReasons: aCollection
 "add all the objects in aCollection to the list of 'reasons' why a caller of
#wait
 will not be allowed to continue"

 self holdingControlMutexDo: [reasonsToBlock addAll: aCollection].!

awakenSleepers
 "private -- cause any Processes that are sleeping in a #wait on this object to
 wake up and re-check the state of the 'reasons' list.
 NB: callers are not expected to ensure that the list is empty (it's perfectly
 legal to call this when it only /may/ be empty -- or indeed at any other time).
 Nor is there any guarantee that the sleepers will in fact return from #wait
 (unless the list /is/ empty, of course...)"

 self holdingControlMutexDo:
  [wakeupSignalsNeeded timesRepeat: [sleepSemaphore signal].
  wakeupSignalsNeeded := 0].!

clearReason: anObject
 "ensure that anObject is not on the list of 'reasons' why a caller of #wait
 will not be allowed to continue.  If removing the given object will make
 our list of reasons empty, then any #wait-ers will be allowed to continue"

 self clearReasons: (Array with: anObject).
!

clearReasons
 "ensure that we no longer think there is any 'reason' why a caller of #wait
 should not be allowed to continue;
 any Processes blocked in a #wait may be allowed to continue.
 Note that #wait-ers only /may/ be allowed to continue since it is possible
that another 'reason' will be
 added between this call and the #waiting Process(es) actually waking up and
retesting the condition"

 "NB: this implementation is not atomic, hence (if some other Process is
 concurrently adding reasons while this executes) we may never pass
 through a state where the reason list is actually empty.
 There is no point in trying to ensure that that happens, since even if
 we did, there would be no guarantee that any sleeping Processes would
 wake up at the right moment to 'catch' the reason list in its empty state,
 and hence no guarantee that they'd ever return from their calls to our
 #wait"
 self clearReasons: self outstandingReasons.
!

clearReasons: aCollection
 "ensure that none of the objects in the given collection are on the list of
 'reasons' why a caller of #wait
 will not be allowed to continue.  If removing those objects makes our list of
reasons empty, then any
 #wait-ers may be allowed to continue.
 Note that #wait-ers only /may/ be allowed to continue since it is possible
that another 'reason' will be
 added between this call and the #waiting Process(es) actually waking up and
retesting the condition"

 self holdingControlMutexDo: [aCollection do: [:each | reasonsToBlock remove:
each ifAbsent: []]].

 "since we have released the control mutex, by the time we get here the reason
list may be non-empty
 again even if we removed all the elements in the above loop.  If that's so
then the following will be a
 null-op"
 self awakenSleepers.!

holdingControlMutexDo: a0Block
 "private -- answer the result of evaluating a0Block whilst holding the
 control semaphore.
 NB: this is /not/ recursive (we are using a Semaphore, not a Mutex)"

 self acquireControlMutex.
 ^ a0Block ensure: [self releaseControlMutex].!

initialize
 "private -- establish a coherent initial state"

 controlSemaphore := Semaphore forMutualExclusion.
 sleepSemaphore := Semaphore new.
 wakeupSignalsNeeded := 0.
 reasonsToBlock := self makeOutstandingReasonsColllection.

 ^ super initialize.!

makeOutstandingReasonsColllection
 "answer a Collection that will be used to hold our 'set' of outstanding
 reasons for blocking"

 "use an IdentitySet on the assumption that 'reasons' will be Symbols
 and that duplicate occurrences are intended to be conflated.
 This should actually be a policy decision of some kind rather than
 being hardwired"
 ^ IdentitySet new.!

outstandingReasons
 "answer a Collection of the currently outstanding reasons we have
 for not allowing Processes to call #wait without blocking.
 This is atomic w.r.t. #addReasons: and #clearReasons:, but of
 course the answered collection is only guaranteed to /have been/
 correct, it may already be out-of-date by the time the caller sees it"

 ^ self holdingControlMutexDo: [reasonsToBlock copy].!

printOn: aStream
 "append a developer-friendly representation of the receiver to aStream"

 aStream
  display: self class;
  nextPutAll: ' withReasons: ';
  display: self outstandingReasons asArray.
  !

releaseControlMutex
 "private -- release the control mutex; assumes we've already grabbed it"

 controlSemaphore signal.
!

shouldBlockNow
 "private -- answer whether we should block at this instant of time.
 Note that we assume that we are already holding the control semaphore
 when this is called"

 ^ reasonsToBlock notEmpty.!

sleep
 "private -- sleep until somebody signals the sleep semaphore.  Note this must
only be called
 when we already own the control semaphore; we release that semaphore while we
are asleep
 and regain ownership of it before we return"

 wakeupSignalsNeeded := wakeupSignalsNeeded + 1.
 self releaseControlMutex.
 [sleepSemaphore wait] ensure:
  [self acquireControlMutex].!

wait
 "block the calling Process until the list of 'reasons' to block is empty.
 Note the following /NON/-guarantees:
  - if the list becomes empty transiently, then there is no guarantee that the
caller will be released.
  - if the list becomes empty transiently while more than one Process is #waiting, then
there is no guarantee
    that either all or none of them will be released (some may be, others may
not).
 The reason is that while we can guarantee that any #wait-ing Process will wake
up, we cannot guarantee
 that by the time it does so, some other Process will not have added a new
'reason' to the list -- thus causing
 it to go back to sleep"

 self holdingControlMutexDo: [[self shouldBlockNow] whileTrue: [self sleep]].
!

wouldBlock
 "answer whether a call to #wait would block if it had been issued at this
 precise instant.
 This is probably not much use for anything except debugging since the
 condition may change before the caller gets a chance to use the value
 answered"

 ^ self holdingControlMutexDo: [self shouldBlockNow].! !
!CompoundBlockingWait categoriesFor: #acquireControlMutex!helpers!private! !
!CompoundBlockingWait categoriesFor: #addReason:!operations!public! !
!CompoundBlockingWait categoriesFor: #addReasons:!operations!public! !
!CompoundBlockingWait categoriesFor: #awakenSleepers!helpers!operations!private! !
!CompoundBlockingWait categoriesFor: #clearReason:!operations!public! !
!CompoundBlockingWait categoriesFor: #clearReasons!operations!public! !
!CompoundBlockingWait categoriesFor: #clearReasons:!operations!public! !
!CompoundBlockingWait categoriesFor: #holdingControlMutexDo:!helpers!private! !
!CompoundBlockingWait categoriesFor: #initialize!initializing!private! !
!CompoundBlockingWait categoriesFor: #makeOutstandingReasonsColllection!initializing!public! !
!CompoundBlockingWait categoriesFor: #outstandingReasons!accessing!public! !
!CompoundBlockingWait categoriesFor: #printOn:!public! !
!CompoundBlockingWait categoriesFor: #releaseControlMutex!helpers!private! !
!CompoundBlockingWait categoriesFor: #shouldBlockNow!helpers!private!testing! !
!CompoundBlockingWait categoriesFor: #sleep!helpers!private! !
!CompoundBlockingWait categoriesFor: #wait!operations!public! !
!CompoundBlockingWait categoriesFor: #wouldBlock!public!testing! !

!CompoundBlockingWait class methodsFor!

new
 "private -- answer a new instance with default initialisation, and hence one
that
 will allow #wait-ers to proceed without delay.
 Note that using this method directly is dangerous unless you take care
 to ensure that no other Processes are able to 'see' the resulting instance
 during the period between its creation and its being populated with whatever
 'reasons' are relevant.  That's why this method is marked private"

 ^ (self basicNew)
  initialize;
  yourself.!

withReasons: aCollection
 "answer a new instance that is already populated with the given collection
 of 'reasons' for waiting, and which will therefore cause anything that calls
 its #wait to be blocked until some other thread has cleared all the supplied
 reasons (and any others that have been added since)"

 ^ (self new)
  addReasons: aCollection;
  yourself.! !
!CompoundBlockingWait class categoriesFor: #new!instance creation!private! !
!CompoundBlockingWait class categoriesFor: #withReasons:!instance creation!public! !



Re: Forcing a thread to wait a specified number of times

Schwab,Wilhelm K
In reply to this post by Schwab,Wilhelm K
Blair,

I have received some off-line replies, which all seem to dance around
something that I thought was clear from my original post, but now find
was not explicitly stated.  So, here goes: is it legal to initialize a
semaphore with a negative number of excess signals?  Presumably it would
be done in a variant of #initialize called from a class-side
#newTimesWait: or something.  Is that legal?  It would happen before any
processes were asked to #wait on the semaphore.

I haven't actually tried this yet, but a very quick test based on
#primSetSignals: appeared not to melt any wires.

Have a good one,

Bill


--
Wilhelm K. Schwab, Ph.D.
[hidden email]



Re: Forcing a thread to wait a specified number of times

Blair McGlashan-3
"Bill Schwab" <[hidden email]> wrote in message
news:d1qdid$ve0$[hidden email]...
> Blair,
>
> I have received some off-line replies, which all seem to dance around
> something that I thought was clear from my original post, but now find was
> not explicitly stated.  So, here goes: is it legal to initialize a
> semaphore with a negative number of excess signals?  Presumably it would
> be done in a variant of #initialize called from a class-side
> #newTimesWait: or something.  Is that legal?  ...

No, not really. The signal count is not supposed to drop below zero. I think
you may get away with it though.

Regards

Blair



Re: Forcing a thread to wait a specified number of times

Schwab,Wilhelm K
Blair,

>>I have received some off-line replies, which all seem to dance around
>>something that I thought was clear from my original post, but now find was
>>not explicitly stated.  So, here goes: is it legal to initialize a
>>semaphore with a negative number of excess signals?  Presumably it would
>>be done in a variant of #initialize called from a class-side
>>#newTimesWait: or something.  Is that legal?  ...
>
>
> No, not really. The signal count is not supposed to drop below zero. I think
> you may get away with it though.

Hmmm.  A "you might get away with it" from you means a lot more than
"sure, we support that" from most players in the industry, but I think I
might stick with my CompositeSemaphore.  It is clumsy, but works and
uses public methods.  Still, please feel free to suggest alternative
solutions that might be cleaner.

Thanks!

Bill

--
Wilhelm K. Schwab, Ph.D.
[hidden email]