Dear Sven,
I started to use the StampClient and intend to use it to produce data but for heartbeat and other parts I need to read from the socket as well. I wonder about the best strategy to deal with it. The naive approach. [ | event sendFrame | event := sharedQueue next. sendFrame := self newSendFrameTo: queueName sendFrame text: event convertToText. stampClient write: sendFrame. ] fork. But now the StampClient enforces a non-zero hearbeat.. so I could write something like this [ | event sendFrame | event := sharedQueue nextWaitFor: stampClient timeout * 3. event isNil ifTrue: [stampClient writeHeartBeat] ifFalse: [self convertAndSendEvent: event]. ] fork. But now I face the issue (but maybe I had it before as well) that the server will itself send an empty frame as its heartbeat function and I need to read it. So I could write... [ event := sharedQueue... "try to read all pending events? How often to repeat it read everything??" stampClient readNextFrame. ... ] fork Or to make it more involved? And create a reader and writer? procConsume := [ [stampClient runWith: [:msg | " do nothing " ]] ifCurtailed: [connectionClosed...handling]. ] fork. procProduce := [ [ | event sendFrame | event := sharedQueue next. sendFrame := self newSendFrameTo: queueName sendFrame text: event convertToText. stampClient write: sendFrame. ] ensure: [procConsume... do what exactly? ] fork. So the last options seems to be the best. But how to deal with with re-connects? How to not have have "procConsume" write the heartbeat data in the middle of the produced event? After all How did you solve that? Is the problem space clear enough? holger |
Hi Holger,
Thanks for using Stamp, your feedback and off-list improvements. This is really important. Here are some general answers to the questions you asked. First: I have not yet used a producer only scenario, in a real system. Your questions are very valid, basically you/we have to figure out how to best do this. As you are probably aware (and contrary to the fact that the protocol looks a lot like HTTP), the protocol is asynchronous, with independent reading and writing and interleaved messages. This is harder to deal with. My idea with the current implementation is that I want to separate the pure protocol (reading/writing frames from/on a medium) from the handling logic and especially from the multiprocessing logic. I want the main code base to be free of locks and extra threads - a pure single threaded event handling loop should remain possible. What I have currently running in production (reliably over days/weeks) is what can be seen in StampClient>>#runWith: The idea is that you keep on reading in a loop with a short timeout. This loop handles the heartbeat and maintains an inbox. It is still free of multithreading code or locks - these need to be added by users of the API in the block. I would guess that your scenario needs a similar mechanism. However, you need a regular opportunity to send something out. Thinking out loud, what about something like StampClient>>#runWith: receiveBlock do: sendBlock where receiveBlock is like it is now, and sendBlock is called regularly, basically when the loop goes through another cycle, to give you the opportunity to send something, being sure to have exclusive access. In the sendBlock you could query your sharedQueue that is being filled by another process, properly MP safe. The invocation of #runWith:do: should of course be forked. Does that make sense ? Sven > On 18 May 2016, at 16:10, Holger Freyther <[hidden email]> wrote: > > Dear Sven, > > I started to use the StampClient and intend to use it to produce data but for heartbeat and other parts I need to read from the socket as well. I wonder about the best strategy to deal with it. > > The naive approach. > > > [ > | event sendFrame | > event := sharedQueue next. > sendFrame := self newSendFrameTo: queueName > sendFrame text: event convertToText. > stampClient write: sendFrame. > ] fork. > > > But now the StampClient enforces a non-zero hearbeat.. so I could write something like this > > [ > | event sendFrame | > event := sharedQueue nextWaitFor: stampClient timeout * 3. > event isNil > ifTrue: [stampClient writeHeartBeat] > ifFalse: [self convertAndSendEvent: event]. > ] fork. > > > But now I face the issue (but maybe I had it before as well) that the server will itself send an empty frame as its heartbeat function and I need to read it. So I could write... > > [ > event := sharedQueue... > "try to read all pending events? How often to repeat it read everything??" > stampClient readNextFrame. > ... > ] fork > > > > Or to make it more involved? And create a reader and writer? > > procConsume := [ > [stampClient runWith: [:msg | " do nothing " ]] ifCurtailed: [connectionClosed...handling]. > ] fork. > > procProduce := [ > [ > | event sendFrame | > event := sharedQueue next. > sendFrame := self newSendFrameTo: queueName > sendFrame text: event convertToText. > stampClient write: sendFrame. > ] ensure: [procConsume... do what exactly? > ] fork. > > So the last options seems to be the best. But how to deal with with re-connects? How to not have have "procConsume" write the heartbeat data in the middle of the produced event? After all > > How did you solve that? Is the problem space clear enough? > > > holger > > |
> On 19 May 2016, at 10:40, Sven Van Caekenberghe <[hidden email]> wrote: > > Hi Holger, Dear Sven, > However, you need a regular opportunity to send something out. Thinking out loud, what about something like > > StampClient>>#runWith: receiveBlock do: sendBlock > > where receiveBlock is like it is now, and sendBlock is called regularly, basically when the loop goes through another cycle, to give you the opportunity to send something, being sure to have exclusive access. > > In the sendBlock you could query your sharedQueue that is being filled by another process, properly MP safe. > > The invocation of #runWith:do: should of course be forked. > > Does that make sense ? It makes sense for my unacknowledged SEND but I see several issues for a general scheme: a.) If write / receive ratio is not equal and I block in the send then I will not receive quickly enough. And if we block on receive (with the *TimedOut) we will not write enough. This is one general architecture issue I seem to circle around[1]. I should not have block one or the other. b.) Integration with ACKed sends (putting a receipt, reading a receipt-id). Is there a generic way to handle it? E.g. I would keep an event in the SharedQueue until it has been acked (and detect timeouts or such). Last but not least. How do you handle the ConnectionClosed and do the re-connect? It seems that >>#runWith: will exit iff ConnectionClosed signal has been raised. Will you respawn the process? Will you create another StampClient and re-execute? sorry, more these are more questions than answers. I have a local client that is similar to >>#runWith:do: (but calls receive from within the send routine). kind regards holger [1] With POSIX/C there is select(), in Windows WaitForMultipleObjects and maybe with erlang the selective receive. Now it is not very object orientated but in pseudo erlang syntax. receive FrameReady -> self handleFrame: arg; FrameToSend -> self writeEvent: arg; Disconnect -> self reconnect. ... after self timeout -> self checkRecvHeartbeatOrSendIt end I could emulate it by spawning multiple processes on "receive", creating a queue, having a semaphore.. but I don't know if I want to limit it to sockets... |
+1.
Testing the RabbitMQ client (very cool, BTW) some time ago (which is structured the same way with a single event loop), I ran into the same kind of issues Holger describes; for example, bursts of incoming data would cause a heartbeat to not be sent in time, and the server would drop connections.* At least at the time, I couldn't see a way to solve those issues in a reliable manner without resorting to using multiple threads** (but never got around to implementing that) Cheers, Henry * There also seemed to be drops related to what the client considered activity that should reset the heartbeat timeout, and what the server seemed to think, but even after (I think) fixing those, I could cause drops by flooding the client with traffic ** Yes, I know, You can configure RabbitMQ to not use heartbeats, but where's the fun in that?? |
Free forum by Nabble | Edit this page |