Hello all,
I am trying to allow other threads (one in particular) to force a worker thread to wait a given number of times. This is a bad design intended to cope with an even worse communications protocol :( However, I think it will suffice to be able to place semaphores in the path of the thread I am trying to hobble for a time.

In short, I have created a shared queue of additional semaphores. The "worker thread" uses #nextNoWait to grab every semaphore available, and sends #wait to each. It also sends #wait to a persistent semaphore that keeps it from running wild. To "signal the worker thread", I either remove an extra semaphore (again #nextNoWait) or #signal the persistent semaphore.

Is there a simpler way to do this? Do you consider the persistent semaphore to be extraneous? I suspect not, but am open to pleasant surprises.

An alternate design that I am considering is to have what amounts to a mutex-protected integer, with one semaphore that is conditionally signalled. The multi-semaphore design is (I think) more likely to find other uses, and should be easier to debug, as individual semaphores would be un- or over-signalled and in or not in the queue vs. the integer value being wrong.

Better ideas will be cheerfully accepted.

Have a good one,

Bill

--
Wilhelm K. Schwab, Ph.D.
[hidden email]
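For readers weighing the two designs, the "mutex-protected integer with one conditionally signalled semaphore" alternative might look roughly like the sketch below. It is written in Python rather than Dolphin Smalltalk purely so that it can be run standalone; the class name `CountdownGate` and its methods are hypothetical and do not come from the thread. It assumes a single waiter and a single use per instance.

```python
import threading


class CountdownGate:
    """Hypothetical sketch: wait() unblocks after signal() has been sent `count` times."""

    def __init__(self, count):
        self._lock = threading.Lock()        # the "mutex" protecting the integer
        self._remaining = count              # signals still required
        self._gate = threading.Semaphore(0)  # the one conditionally signalled semaphore
        if count <= 0:
            self._gate.release()             # nothing to wait for

    def signal(self):
        with self._lock:
            if self._remaining > 0:
                self._remaining -= 1
                if self._remaining == 0:
                    self._gate.release()     # only the final signal wakes the waiter

    def wait(self, timeout=None):
        """Answer True once every signal has arrived, False if the timeout expires."""
        return self._gate.acquire(timeout=timeout)

    def remaining(self):
        with self._lock:
            return self._remaining
```

The debugging trade-off mentioned above shows up here: the only state to inspect is one integer, so a miscount is visible but its cause is not.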
Bill,
> I am trying to allow other threads (one in particular) to force a worker
> thread to wait a given number of times. [...]
>
> In short, I have created a shared queue of additional semaphores. [...]

You haven't really described what you want to achieve, only how you have implemented it, which makes it difficult to comment ;-)

Still, there's something about it that doesn't sound right to me. The use of #nextNoWait would worry me, for one thing. Not that I'm saying it's wrong in the context of your system, but that sort of method raises an alarm flag with me saying "have you considered all the race conditions?" (and, speaking personally, I generally find that I have not...). Also I don't really see why the worker thread should be "aware" of all those semaphores; its job is just to wait until something tells it to go ahead, and it's the "something's" job to manage that (I'd have thought). I /suspect/ that you might find it easier if you created a higher-level IPC primitive rather than trying to work with "raw" semaphores.

But then, as I said, I don't really know what you are trying to do, so that is all guesswork. (But nobody else had replied at all, so...)

-- chris
Chris,
> You haven't really described what you want to achieve, only how you have
> implemented it, which makes it difficult to comment ;-)

Fair enough. Consider a request/reply protocol. It is unclear whether there is a good way to know how many reply messages will be sent in response to any given request. It is very clear that if a message is lost, there is no way to request a resend. You can imagine that there is a lot of reverse engineering involved in this project. I am not the first to attempt it, nor am I the first to take note of errors and ominous silences in the specifications.

Given the above, it seems reasonable to have the ability to wait on the best-guess number of responses or communications failures, and to keep going. Some type of watchdog timer is also needed, and I have that on at least one level.

> Still, there's something about it that doesn't sound right to me. The use of
> #nextNoWait would worry me, for one thing. Not that I'm saying it's wrong in
> the context of your system, but that sort of method raises an alarm flag with
> me saying "have you considered all the race conditions?" (and, speaking
> personally, I generally find that I have not...).

That is part of the charm of multi-threaded software =:0 However, in this case, I am fairly certain that #nextNoWait is ok; SharedQueue should protect its entrails suitably well to allow multiple threads to use #nextPut: and #nextNoWait, and if there is nothing in the queue, it would be inappropriate (under the current design/hack) for the thread to wait for one to appear.

> Also I don't really see why
> the worker thread should be "aware" of all those semaphores; its job is just to
> wait until something tells it to go ahead, and its the "something's" job to
> manage that (I'd have thought).

If only there were a place to hang the "something" :( Your take on awareness is interesting. My goal was to throw the right number of stumbling blocks in the thread's path, so that it would wait an appropriate number of times. I suppose I could do the same thing with a mutex-protected value/integer; it would be simpler, though the semaphores seemed more generic as they can be signalled from elsewhere in the system.

> I /suspect/ that you might find it easier if
> you created a higher-level IPC primitive rather than trying to work with "raw"
> semaphores.

What do you have in mind?

> But then, as I said, I don't really know what you are trying to do, so that is
> all guesswork. (But nobody else had replied at all, so...)

Thanks for replying.

Have a good one,

Bill

--
Wilhelm K. Schwab, Ph.D.
[hidden email]
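The requirement described in this post (wait for a best-guess number of replies, but keep going if some never arrive) can be illustrated with a small sketch. This is Python, not Dolphin Smalltalk, and the function name `collect_replies`, the `inbox` queue and the single overall deadline are all assumptions made for illustration; the real protocol and watchdog are not shown in the thread.

```python
import queue
import time


def collect_replies(inbox, expected, overall_timeout):
    """Gather up to `expected` replies from `inbox`, giving up when the deadline passes.

    Hypothetical helper: `inbox` is a queue.Queue fed by whatever reads the wire.
    """
    deadline = time.monotonic() + overall_timeout
    replies = []
    while len(replies) < expected:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break                      # watchdog expired: carry on with what we have
        try:
            replies.append(inbox.get(timeout=remaining))
        except queue.Empty:
            break                      # nothing more arrived in time
    return replies
```

A caller would compare `len(replies)` with `expected` to decide whether a reply was lost, which is about all that can be done when the protocol has no resend request.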
In reply to this post by Chris Uppal-3
Chris,
> I /suspect/ that you might find it easier if
> you created a higher-level IPC primitive rather than trying to work with "raw"
> semaphores.

Is the stuff below something like what you had in mind? Comments, test cases, better ideas, etc. will be cheerfully accepted.

Have a good one,

Bill

=========================

!TimedEvaluatorHeadlessTests methodsFor!

testCompositeSemaphoreSignalsFirst
    "Do the signals first, so #wait should not block."

    | composite count |

    "
        TimedEvaluatorHeadlessTests new testCompositeSemaphoreSignalsFirst.
    "

    count := 4.
    composite := CompositeSemaphore count:count.
    count timesRepeat:[ composite signal. ].
    composite wait.
! !
!TimedEvaluatorHeadlessTests categoriesFor: #testCompositeSemaphoreSignalsFirst!public! !

!TimedEvaluatorHeadlessTests methodsFor!

testCompositeSemaphore

    | composite wait signal count waiterDone ok |

    "
        TimedEvaluatorHeadlessTests new testCompositeSemaphore.
    "

    count := 4.
    waiterDone := Semaphore new.
    ok := Semaphore new.
    composite := CompositeSemaphore count:count.
    self should:[ composite unsignalledSemaphoresSize = count ].

    "A thread that signals the composite semaphore and watches for the (initial
    lack of) progress of the waiter/worker thread. Wait for the worker to signal
    completion, then signal the ok semaphore to let the method/test case finish."
    signal := [
        count to:2 by:1 negated do:[ :i |
            self should:[ composite unsignalledSemaphoresSize = i ].
            composite signal.
        ].

        "The waiter thread should not have been able to complete yet"
        self should:[ waiterDone excessSignals = 0. ].
        self should:[ composite unsignalledSemaphoresSize = 1. ].

        "Signal the waiter one last time and then wait for the waiter thread
        to signal us."
        composite signal.
        waiterDone wait.

        "Now the waiter is finished, but the excess signal count should be zero
        again, because we sent #wait."
        self should:[ composite unsignalledSemaphoresSize = 0. ].
        self should:[ waiterDone excessSignals = 0. ].
        ok signal.

    ] forkAt:Processor userBackgroundPriority.

    "A thread that repeatedly waits on the composite semaphore and then signals
    a semaphore to allow the signaller thread to complete."
    wait := [
        count timesRepeat:[ composite wait. ].
        waiterDone signal.
    ] forkAt:Processor userBackgroundPriority.

    ok wait.
    self should:[ ok excessSignals = 0. ].
! !
!TimedEvaluatorHeadlessTests categoriesFor: #testCompositeSemaphore!public! !

"Filed out from Dolphin Smalltalk XP"!

Object subclass: #CompositeSemaphore
    instanceVariableNames: 'semaphoresToWait semaphoresToSignal mutex'
    classVariableNames: ''
    poolDictionaries: ''
    classInstanceVariableNames: ''!
CompositeSemaphore guid: (GUID fromString: '{77F55650-1AB0-46F7-9A4F-821342D39184}')!
CompositeSemaphore comment: ''!
!CompositeSemaphore categoriesForClass!Kernel-Objects! !
!CompositeSemaphore methodsFor!

initialize
    "Prepare a new instance for use"

    super initialize.
    semaphoresToWait := SharedQueue new.
    semaphoresToSignal := SharedQueue new.

    "3-05 - concerned about races between #wait and #signal; protect
    the pair of shared queues"
    mutex := Mutex new.
!

injectSemaphoreCount:anInteger
    "Allow the outside world to force us to wait a given number of times."

    self injectSemaphores:(
        ( 1 to:anInteger ) collect:[ :each | Semaphore new ]
    ).
!

injectSemaphores:aCollectionOfSemaphores
    "Allow the outside world to force us to wait on the indicated semaphores
    in series. Add each semaphore to both the semaphoresToWait and
    semaphoresToSignal shared queues; #wait and #signal draw from them,
    respectively."

    mutex critical:[
        aCollectionOfSemaphores do:[ :each |
            semaphoresToWait nextPut:each.
            semaphoresToSignal nextPut:each.
        ].
    ].
!

semaphoreToSignal
    ^mutex critical:[ semaphoresToSignal nextNoWait ].
!

semaphoreToWait
    ^mutex critical:[ semaphoresToWait nextNoWait ].
!

signal
    "Signal one semaphore held by the receiver."

    | aSemaphore |

    ( aSemaphore := self semaphoreToSignal ) notNil ifTrue:[
        aSemaphore signal.
    ].
!

unsignalledSemaphoresSize
    "For benefit only of unit tests(??), answer the number of semaphores yet
    to be signalled by the receiver (note that they might be signalled from
    elsewhere in the image). Oops - SharedQueue does not copy well, so settle
    for the size."

    ^semaphoresToSignal size
!

wait
    "Wait on each of the available semaphores in turn, returning once the
    queue is empty."

    | aSemaphore |

    [ ( aSemaphore := self semaphoreToWait ) notNil ] whileTrue:[
        aSemaphore wait.
    ].
! !
!CompositeSemaphore categoriesFor: #initialize!initializing!private! !
!CompositeSemaphore categoriesFor: #injectSemaphoreCount:!public! !
!CompositeSemaphore categoriesFor: #injectSemaphores:!public! !
!CompositeSemaphore categoriesFor: #semaphoreToSignal!helpers!private! !
!CompositeSemaphore categoriesFor: #semaphoreToWait!helpers!private! !
!CompositeSemaphore categoriesFor: #signal!process synchronisation!public! !
!CompositeSemaphore categoriesFor: #unsignalledSemaphoresSize!accessing!public! !
!CompositeSemaphore categoriesFor: #wait!process synchronisation!public! !

!CompositeSemaphore class methodsFor!

count:anInteger
    "Create a new instance that will force a caller of #wait to wait
    anInteger times."

    ^self new
        injectSemaphoreCount:anInteger;
        yourself.
!

new
    "Answer a new instance of the receiver"

    ^self basicNew
        initialize;
        yourself.
! !
!CompositeSemaphore class categoriesFor: #count:!public! !
!CompositeSemaphore class categoriesFor: #new!instance creation!public! !

--
Wilhelm K. Schwab, Ph.D.
[hidden email]
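To make the behaviour of the two-queue design easier to experiment with outside a Dolphin image, here is a rough Python model of it. It is a sketch under assumptions, not a faithful port: `CompositeSemaphoreSketch` and its method names are hypothetical, `queue.Queue` stands in for SharedQueue, and `get_nowait` stands in for #nextNoWait.

```python
import queue
import threading


class CompositeSemaphoreSketch:
    """Hypothetical model: each semaphore sits in two queues, one drained by
    signal() and one drained by wait()."""

    def __init__(self, count):
        self._lock = threading.Lock()
        self._to_wait = queue.Queue()
        self._to_signal = queue.Queue()
        with self._lock:
            for _ in range(count):
                sem = threading.Semaphore(0)
                self._to_wait.put(sem)
                self._to_signal.put(sem)

    def _next_or_none(self, q):
        # Counterpart of #nextNoWait: answer None instead of blocking.
        with self._lock:
            try:
                return q.get_nowait()
            except queue.Empty:
                return None

    def signal(self):
        sem = self._next_or_none(self._to_signal)
        if sem is not None:
            sem.release()

    def wait(self):
        # A single call drains every queued semaphore, waiting on each in turn.
        while True:
            sem = self._next_or_none(self._to_wait)
            if sem is None:
                return
            sem.acquire()

    def unsignalled_size(self):
        return self._to_signal.qsize()
```

One thing the model makes visible is that the first `wait()` consumes all of the queued semaphores, so any later `wait()` on the same instance returns immediately unless more have been injected in the meantime.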
Bill,
> > I /suspect/ that you might find it easier if
> > you created a higher-level IPC primitive rather than trying to work
> > with "raw" semaphores.
>
> Is the stuff below something like what you had in mind? Comments, test
> cases, better ideas, etc. will be cheefully accepted.

Good try, but there is /no way/ you can fool me into writing test cases for you ;-)

Random thoughts in random order....

Basically this thing is a bag-of-reasons-to-block -- a collection of reasons why a process should not (yet) be allowed to return from its #wait. When the list of reasons becomes empty, the #wait returns.

One thing that would worry me is that if the bag becomes empty transiently the effects will depend on timing. It's not even clear what /should/ happen -- should the waiting process be allowed to proceed or not ? I'd be tempted to make the bag into a sort of 'latch' which, once it had become empty, would throw errors if you try to add anything more to it. (So you'd have to use a fresh bag for each 'phase' of the protocol).

What I "had in mind" isn't all that relevant, since I was only speculating about what you might be wanting to do. Still, and for what little it's worth, I was thinking of something like a "bag-of-blocks" (in the above sense) but implemented with an API that rather more directly reflects that idea. I'll add some code at the end of this, but it's only there in case you are interested. It specifically /isn't/ well tested -- it was just cobbled together from some stuff I had kicking around (itself largely untested) as a way of exploring the idea. Actually it's not too different in concept from your own code, and even suffers from a potential weakness if Process>>interrupt is used (which your implementation doesn't share) that I don't see how to eliminate; I'll start a new thread on that subject later.

Your description of the underlying problem reminds me (rather more vividly than I /want/ to be reminded ;-) of a similar problem I had a few years ago. That was a telephony system which sent (or didn't) the most awful, unreliable, un-designed, mishmash of notifications imaginable. In that case I went the opposite route, and instead of trying to block a process until the expected messages had arrived, I made the processing entirely event-driven. I don't know about you, but I find it easier to think about a sequence of events, each interpreted on its own merits (using some state to keep track of the changing context), than to try to code a "straight-line" algorithm that will try to process the same data sequentially -- at least if that sequence is at all complicated or variable. But that's not advice, just an observation -- and an expression of sympathy ;-)

-- chris

===================================

"Filed out from Dolphin Smalltalk XP"!

Object subclass: #CompoundBlockingWait
    instanceVariableNames: 'controlSemaphore sleepSemaphore wakeupSignalsNeeded reasonsToBlock'
    classVariableNames: ''
    poolDictionaries: ''
    classInstanceVariableNames: ''!
CompoundBlockingWait guid: (GUID fromString: '{8C986E83-16F9-424F-AD39-6D7DF61994BA}')!
CompoundBlockingWait comment: 'One of these is basically a list of reasons why some Process should not be allowed to continue.

Create one with, say, a list of Symbols. Add/remove further reasons using addReason[s]: / clearReason[s]:. If the list is non-empty, then any Process that calls #wait will be blocked until the list becomes empty.'!
!CompoundBlockingWait categoriesForClass!Unclassified! !
!CompoundBlockingWait methodsFor!

acquireControlMutex
    "private -- grab the control mutex, sleeping as necessary"

    controlSemaphore wait.
!

addReason: anObject
    "add anObject to the list of 'reasons' why a caller of #wait will not be
    allowed to continue"

    self addReasons: (Array with: anObject).!

addReasons: aCollection
    "add all the objects in aCollection to the list of 'reasons' why a caller
    of #wait will not be allowed to continue"

    self holdingControlMutexDo: [reasonsToBlock addAll: aCollection].!

awakenSleepers
    "private -- cause any Processes that are sleeping in a #wait on this object
    to wake up and re-check the state of the 'reasons' list.
    NB: callers are not expected to ensure that the list is empty (it's
    perfectly legal to call this when it only /may/ be empty -- or indeed at
    any other time). Nor is there any guarantee that the sleepers will in fact
    return from #wait (unless the list /is/ empty, of course...)"

    self holdingControlMutexDo:
        [wakeupSignalsNeeded timesRepeat: [sleepSemaphore signal].
        wakeupSignalsNeeded := 0].!

clearReason: anObject
    "ensure that anObject is not on the list of 'reasons' why a caller of #wait
    will not be allowed to continue. If removing the given object will make
    our list of reasons empty, then any #wait-ers will be allowed to continue"

    self clearReasons: (Array with: anObject).
!

clearReasons
    "ensure that we no longer think there is any 'reason' why a caller of #wait
    should not be allowed to continue; any Processes blocked in a #wait may be
    allowed to continue.
    Note that #wait-ers only /may/ be allowed to continue since it is possible
    that another 'reason' will be added between this call and the #waiting
    Process(es) actually waking up and retesting the condition"

    "NB: this implementation is not atomic, hence (if some other Process is
    concurrently adding reasons while this executes) we may never pass through
    a state where the reason list is actually empty. There is no point in
    trying to ensure that that happens, since even if we did, there would be no
    guarantee that any sleeping Processes would wake up at the right moment to
    'catch' the reason list in its empty state, and hence no guarantee that
    they'd ever return from their calls to our #wait"

    self clearReasons: self outstandingReasons.
!

clearReasons: aCollection
    "ensure that none of the objects in the given collection are on the list of
    'reasons' why a caller of #wait will not be allowed to continue. If
    removing those objects makes our list of reasons empty, then any #wait-ers
    may be allowed to continue.
    Note that #wait-ers only /may/ be allowed to continue since it is possible
    that another 'reason' will be added between this call and the #waiting
    Process(es) actually waking up and retesting the condition"

    self holdingControlMutexDo:
        [aCollection do: [:each | reasonsToBlock remove: each ifAbsent: []]].

    "since we have released the control mutex, by the time we get here the
    reason list may be non-empty again even if we removed all the elements in
    the above loop. If that's so then the following will be a null-op"
    self awakenSleepers.!

holdingControlMutexDo: a0Block
    "private -- answer the result of evaluating a0Block whilst holding the
    control semaphore.
    NB: this is /not/ recursive (we are using a Semaphore, not a Mutex)"

    self acquireControlMutex.
    ^ a0Block ensure: [self releaseControlMutex].!

initialize
    "private -- establish a coherent initial state"

    controlSemaphore := Semaphore forMutualExclusion.
    sleepSemaphore := Semaphore new.
    wakeupSignalsNeeded := 0.
    reasonsToBlock := self makeOutstandingReasonsColllection.

    ^ super initialize.!

makeOutstandingReasonsColllection
    "answer a Collection that will be used to hold our 'set' of outstanding
    reasons for blocking"

    "use an IdentitySet on the assumption that 'reasons' will be Symbols and
    that duplicate occurrences are intended to be conflated. This should
    actually be a policy decision of some kind rather than being hardwired"
    ^ IdentitySet new.!

outstandingReasons
    "answer a Collection of the currently outstanding reasons we have for not
    allowing Processes to call #wait without blocking.
    This is atomic w.r.t. #addReasons: and #clearReasons:, but of course the
    answered collection is only guaranteed to /have been/ correct, it may
    already be out-of-date by the time the caller sees it"

    ^ self holdingControlMutexDo: [reasonsToBlock copy].!

printOn: aStream
    "append a developer-friendly representation of the receiver to aStream"

    aStream
        display: self class;
        nextPutAll: ' withReasons: ';
        display: self outstandingReasons asArray.
!

releaseControlMutex
    "private -- release the control mutex; assumes we've already grabbed it"

    controlSemaphore signal.
!

shouldBlockNow
    "private -- answer whether we should block at this instant of time.
    Note that we assume that we are already holding the control semaphore when
    this is called"

    ^ reasonsToBlock notEmpty.!

sleep
    "private -- sleep until somebody signals the sleep semaphore.
    Note this must only be called when we already own the control semaphore;
    we release that semaphore while we are asleep and regain ownership of it
    before we return"

    wakeupSignalsNeeded := wakeupSignalsNeeded + 1.
    self releaseControlMutex.
    [sleepSemaphore wait] ensure: [self acquireControlMutex].!

wait
    "block the calling Process until the list of 'reasons' to block is empty.
    Note the following /NON/-guarantees:
        - if the list becomes empty transiently, then there is no guarantee
          that the caller will be released.
        - if it becomes empty transiently while more than one Process is
          #waiting, then there is no guarantee that either all or none of
          them will be released (some may be, others may not).
    The reason is that while we can guarantee that any #wait-ing Process will
    wake up, we cannot guarantee that by the time it does so, some other
    Process will not have added a new 'reason' to the list -- thus causing it
    to go back to sleep"

    self holdingControlMutexDo: [[self shouldBlockNow] whileTrue: [self sleep]].
!

wouldBlock
    "answer whether a call to #wait would block if it had been issued at this
    precise instant. This is probably not much use for anything except
    debugging since the condition may change before the caller gets a chance
    to use the value answered"

    ^ self holdingControlMutexDo: [self shouldBlockNow].! !
!CompoundBlockingWait categoriesFor: #acquireControlMutex!helpers!private! !
!CompoundBlockingWait categoriesFor: #addReason:!operations!public! !
!CompoundBlockingWait categoriesFor: #addReasons:!operations!public! !
!CompoundBlockingWait categoriesFor: #awakenSleepers!helpers!operations!private! !
!CompoundBlockingWait categoriesFor: #clearReason:!operations!public! !
!CompoundBlockingWait categoriesFor: #clearReasons!operations!public! !
!CompoundBlockingWait categoriesFor: #clearReasons:!operations!public! !
!CompoundBlockingWait categoriesFor: #holdingControlMutexDo:!helpers!private! !
!CompoundBlockingWait categoriesFor: #initialize!initializing!private! !
!CompoundBlockingWait categoriesFor: #makeOutstandingReasonsColllection!initializing!public! !
!CompoundBlockingWait categoriesFor: #outstandingReasons!accessing!public! !
!CompoundBlockingWait categoriesFor: #printOn:!public! !
!CompoundBlockingWait categoriesFor: #releaseControlMutex!helpers!private! !
!CompoundBlockingWait categoriesFor: #shouldBlockNow!helpers!private!testing! !
!CompoundBlockingWait categoriesFor: #sleep!helpers!private! !
!CompoundBlockingWait categoriesFor: #wait!operations!public! !
!CompoundBlockingWait categoriesFor: #wouldBlock!public!testing! !

!CompoundBlockingWait class methodsFor!

new
    "private -- answer a new instance with default initialisation, and hence
    one that will allow #wait-ers to proceed without delay.
    Note that using this method directly is dangerous unless you take care to
    ensure that no other Processes are able to 'see' the resulting instance
    during the period between its creation and its being populated with
    whatever 'reasons' are relevant. That's why this method is marked private"

    ^ (self basicNew)
        initialize;
        yourself.!

withReasons: aCollection
    "answer a new instance that is already populated with the given collection
    of 'reasons' for waiting, and which will therefore cause anything that
    calls its #wait to be blocked until some other thread has cleared all the
    supplied reasons (and any others that have been added since)"

    ^ (self new)
        addReasons: aCollection;
        yourself.! !
!CompoundBlockingWait class categoriesFor: #new!instance creation!private! !
!CompoundBlockingWait class categoriesFor: #withReasons:!instance creation!public! !
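For comparison, the same "bag of reasons to block" idea can be expressed with a condition variable, which bundles the "release the control lock, sleep, re-acquire, re-test" sequence into one primitive. The sketch below is in Python and is only an illustration of the concept: `ReasonsToBlock` and its method names are hypothetical, it is not a translation of CompoundBlockingWait, and it inherits the same non-guarantee about a bag that is only transiently empty.

```python
import threading


class ReasonsToBlock:
    """Hypothetical sketch: wait() returns once the set of reasons is empty."""

    def __init__(self, reasons=()):
        self._cond = threading.Condition()   # lock plus wait/notify in one object
        self._reasons = set(reasons)

    def add_reasons(self, reasons):
        with self._cond:
            self._reasons.update(reasons)

    def clear_reasons(self, reasons):
        with self._cond:
            self._reasons.difference_update(reasons)
            if not self._reasons:
                self._cond.notify_all()      # wake every waiter to re-test

    def would_block(self):
        with self._cond:
            return bool(self._reasons)

    def wait(self, timeout=None):
        """Answer True when the set is empty, False if the timeout expires first."""
        with self._cond:
            return self._cond.wait_for(lambda: not self._reasons, timeout=timeout)
```

A waiter that wakes up and finds a new reason already added simply goes back to sleep, which is the timing dependence discussed in the post above.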
In reply to this post by Schwab,Wilhelm K
Blair,
I have received some off-line replies, which all seem to dance around something that I thought was clear from my original post, but now find was not explicitly stated. So, here goes: is it legal to initialize a semaphore with a negative number of excess signals? Presumably it would be done in a variant of #initialize called from a class-side #newTimesWait: or something. Is that legal? It would happen before any processes were asked to #wait on the semaphore.

I haven't actually tried this yet, but a very quick test based on #primSetSignals: appeared not to melt any wires.

Have a good one,

Bill

--
Wilhelm K. Schwab, Ph.D.
[hidden email]
"Bill Schwab" <[hidden email]> wrote in message
news:d1qdid$ve0$[hidden email]...
> Blair,
>
> I have received some off-line replies, which all seem to dance around
> something that I thought was clear from my original post, but now find was
> not explicitly stated. So, here goes: is it legal to initialize a
> semaphore with a negative number of excess signals? Presumably it would
> be done in a variant of #initialize called from a class-side
> #newTimesWait: or something. Is that legal? ...

No, not really. The signal count is not supposed to drop below zero. I think you may get away with it though.

Regards

Blair
Blair,
>>I have received some off-line replies, which all seem to dance around
>>something that I thought was clear from my original post, but now find was
>>not explicitly stated. So, here goes: is it legal to initialize a
>>semaphore with a negative number of excess signals? Presumably it would
>>be done in a variant of #initialize called from a class-side
>>#newTimesWait: or something. Is that legal? ...
>
> No, not really. The signal count is not supposed to drop below zero. I think
> you may get away with it though.

Hmmm. A "you might get away with it" from you means a lot more than "sure, we support that" from most players in the industry, but I think I might stick with my CompositeSemaphore. It is clumsy, but works and uses public methods. Still, please feel free to suggest alternative solutions that might be cleaner.

Thanks!

Bill

--
Wilhelm K. Schwab, Ph.D.
[hidden email]