The Inbox: Network-ul.180.mcz

Previous Topic Next Topic
 
classic Classic list List threaded Threaded
6 messages Options
Reply | Threaded
Open this post in threaded view
|

The Inbox: Network-ul.180.mcz

commits-2
Levente Uzonyi uploaded a new version of Network to project The Inbox:
http://source.squeak.org/inbox/Network-ul.180.mcz

==================== Summary ====================

Name: Network-ul.180
Author: ul
Time: 25 July 2016, 8:40:01.001452 pm
UUID: 2f23a55c-fec5-41ac-95bd-6a8c2458be95
Ancestors: Network-nice.179

Socket changes:
- fixed the comment of #isOtherEndConnected and #isThisEndConnected
- do not slice the data (TCP) in the image in #sendData:. Let the VM, the kernel, the hardware deal with that.
- use #isOtherEndConnected when receiving data, and #isThisEndConnected when sending data instead of #isConnected
- move away from #milliseconds:since:, since we have a clock that won't roll over

=============== Diff against Network-nice.179 ===============

Item was changed:
  ----- Method: Socket>>closeAndDestroy: (in category 'connection open/close') -----
  closeAndDestroy: timeoutSeconds
  "First, try to close this connection gracefully. If the close attempt fails or times out, abort the connection. In either case, destroy the socket. Do nothing if the socket has already been destroyed (i.e., if its socketHandle is nil)."
 
+ socketHandle ifNil: [ ^self ].
+ self isThisEndConnected ifTrue: [
+ self close.  "Close this end." ].
+ (self waitForDisconnectionFor: timeoutSeconds) ifFalse: [
+ "The other end has not closed the connect yet, so we will just abort it."
+ self primSocketAbortConnection: socketHandle ].
+ self destroy!
- socketHandle ifNotNil: [
- self isConnected ifTrue: [
- self close.  "close this end"
- (self waitForDisconnectionFor: timeoutSeconds) ifFalse: [
- "The other end didn't close so we just abort the connection"
- self primSocketAbortConnection: socketHandle]].
- self destroy].
- !

Item was changed:
  ----- Method: Socket>>discardReceivedData (in category 'receiving') -----
  discardReceivedData
  "Discard any data received up until now, and return the number of bytes discarded."
 
  | buf totalBytesDiscarded |
  buf := String new: 10000.
  totalBytesDiscarded := 0.
+ [self isOtherEndConnected and: [self dataAvailable]] whileTrue: [
- [self isConnected and: [self dataAvailable]] whileTrue: [
  totalBytesDiscarded :=
  totalBytesDiscarded + (self receiveDataInto: buf)].
  ^ totalBytesDiscarded
  !

Item was changed:
  ----- Method: Socket>>isOtherEndConnected (in category 'queries') -----
  isOtherEndConnected
+ "Return true if this socket is connected, or this end has closed the connection but not the other end, so we can still receive data."
- "Return true if this socket is connected, or this end has closed the connection but not the other end, so we can still send data."
 
  | state |
  socketHandle ifNil: [ ^false ].
  (state := self primSocketConnectionStatus: socketHandle) == Connected ifTrue: [ ^true ].
  ^state == ThisEndClosed
  !

Item was changed:
  ----- Method: Socket>>isThisEndConnected (in category 'queries') -----
  isThisEndConnected
+ "Return true if this socket is connected, other the other end has closed the connection but not this end, so we can still send data."
- "Return true if this socket is connected, other the other end has closed the connection but not this end, so we can still receive data."
 
  | state |
  socketHandle ifNil: [ ^false ].
  (state := self primSocketConnectionStatus: socketHandle) == Connected ifTrue: [ ^true ].
  ^state == OtherEndClosed
  !

Item was changed:
  ----- Method: Socket>>sendData: (in category 'sending') -----
  sendData: aStringOrByteArray
  "Send all of the data in the given array, even if it requires multiple calls to send it all. Return the number of bytes sent."
 
  "An experimental version use on slow lines: Longer timeout and smaller writes to try to avoid spurious timeouts."
 
  | bytesSent bytesToSend count |
  bytesToSend := aStringOrByteArray size.
  bytesSent := 0.
  [bytesSent < bytesToSend] whileTrue: [
  (self waitForSendDoneFor: 60)
  ifFalse: [ConnectionTimedOut signal: 'send data timeout; data not sent'].
  count := self primSocket: socketHandle
  sendData: aStringOrByteArray
  startIndex: bytesSent + 1
+ count: bytesToSend - bytesSent.
- count: (bytesToSend - bytesSent min: DefaultSendBufferSize).
  bytesSent := bytesSent + count].
 
  ^ bytesSent
  !

Item was changed:
  ----- Method: Socket>>waitForConnectionFor:ifTimedOut:ifRefused: (in category 'waiting') -----
  waitForConnectionFor: timeout ifTimedOut: timeoutBlock ifRefused: refusedBlock
  "Wait up until the given deadline for a connection to be established. Return true if it is established by the deadline, false if not."
 
+ | deadline timeLeft status |
+ deadline := Time millisecondClockValue + (timeout * 1000) truncated.
+ (status := self primSocketConnectionStatus: socketHandle) == Connected ifTrue: [^true].
+ [ (status == WaitingForConnection) and: [ (timeLeft := deadline - Time millisecondClockValue) > 0 ] ]
- | startTime msecsDelta msecsEllapsed status |
- startTime := Time millisecondClockValue.
- msecsDelta := (timeout * 1000) truncated.
- status := self primSocketConnectionStatus: socketHandle.
- status = Connected ifTrue: [^true].
- [(status = WaitingForConnection) and: [(msecsEllapsed := Time millisecondsSince: startTime) < msecsDelta]]
  whileTrue: [
+ semaphore waitTimeoutMSecs: timeLeft.
+ status := self primSocketConnectionStatus: socketHandle ].
+ status == Connected ifTrue: [ ^true ].
+ status == WaitingForConnection
+ ifTrue: [ timeoutBlock value ]
+ ifFalse: [ refusedBlock value ].
+ ^false!
- semaphore waitTimeoutMSecs: msecsDelta - msecsEllapsed.
- status := self primSocketConnectionStatus: socketHandle].
- status = Connected
- ifFalse: [
- status = WaitingForConnection
- ifTrue: [timeoutBlock value]
- ifFalse: [refusedBlock value].
- ^false].
- ^ true!

Item was changed:
  ----- Method: Socket>>waitForConnectionUntil: (in category 'waiting') -----
  waitForConnectionUntil: deadline
  "Wait up until the given deadline for a connection to be established. Return true if it is established by the deadline, false if not."
 
+ | status timeLeft |
- | status waitTime |
  [
  (status := self primSocketConnectionStatus: socketHandle) == Connected ifTrue: [ ^true ].
  status == WaitingForConnection ifFalse: [ ^false ].
+ (timeLeft := deadline - Time millisecondClockValue) <= 0 ifTrue: [ ^false ].
+ semaphore waitTimeoutMSecs: timeLeft ] repeat!
- (waitTime := deadline - Time millisecondClockValue) > 0 ifFalse: [ ^false ].
- semaphore waitTimeoutMSecs: waitTime ] repeat!

Item was changed:
  ----- Method: Socket>>waitForDataFor:ifClosed:ifTimedOut: (in category 'waiting') -----
  waitForDataFor: timeout ifClosed: closedBlock ifTimedOut: timedOutBlock
  "Wait for the given nr of seconds for data to arrive."
 
+ | deadline timeLeft |
- | startTime msecsDelta |
  socketHandle ifNil: [ ^closedBlock value ].
+ deadline := Time millisecondClockValue + (timeout * 1000) truncated.
- startTime := Time millisecondClockValue.
- msecsDelta := (timeout * 1000) truncated.
  [
  (self primSocketReceiveDataAvailable: socketHandle) ifTrue: [ ^self ].
+ self isOtherEndConnected ifFalse: [ ^closedBlock value ].
+ (timeLeft := deadline - Time millisecondClockValue) <= 0 ifTrue: [ ^timedOutBlock value ].
- self isConnected ifFalse: [ ^closedBlock value ].
- (Time millisecondsSince: startTime) < msecsDelta ifFalse: [ ^timedOutBlock value ].
  "Providing a maximum for the time for waiting is a workaround for a VM bug which causes sockets waiting for data forever in some rare cases, because the semaphore doesn't get signaled. Remove the ""min: self class maximumReadSemaphoreWaitTimeout"" part when the bug is fixed."
  readSemaphore waitTimeoutMSecs:
+ (timeLeft min: self class maximumReadSemaphoreWaitTimeout) ] repeat!
- (msecsDelta - (Time millisecondsSince: startTime) min: self class maximumReadSemaphoreWaitTimeout) ] repeat!

Item was changed:
  ----- Method: Socket>>waitForDataIfClosed: (in category 'waiting') -----
  waitForDataIfClosed: closedBlock
  "Wait indefinitely for data to arrive.  This method will block until
  data is available or the socket is closed."
 
  socketHandle ifNil: [ ^closedBlock value ].
  [
  (self primSocketReceiveDataAvailable: socketHandle) ifTrue: [ ^self ].
+ self isOtherEndConnected ifFalse: [ ^closedBlock value ].
- self isConnected ifFalse: [ ^closedBlock value ].
  "ul 8/13/2014 21:16
   Providing a maximum for the time for waiting is a workaround for a VM bug which
   causes sockets waiting for data forever in some rare cases, because the semaphore
   doesn't get signaled. Replace the ""waitTimeoutMSecs: self class maximumReadSemaphoreWaitTimeout""
   part with ""wait"" when the bug is fixed."
  readSemaphore waitTimeoutMSecs: self class maximumReadSemaphoreWaitTimeout ] repeat!

Item was changed:
  ----- Method: Socket>>waitForDisconnectionFor: (in category 'waiting') -----
  waitForDisconnectionFor: timeout
  "Wait for the given nr of seconds for the connection to be broken.
  Return true if it is broken by the deadline, false if not.
  The client should know the connection is really going to be closed
  (e.g., because he has called 'close' to send a close request to the other end)
  before calling this method."
 
+ | deadline |
+ deadline := Time millisecondClockValue + (timeout * 1000) truncated.
+ [ self isOtherEndConnected and: [ deadline - Time millisecondClockValue > 0 ] ]
+ whileTrue: [
+ self discardReceivedData.
+ "Providing a maximum for the time for waiting is a workaround for a VM bug which causes sockets waiting for data forever in some rare cases, because the semaphore doesn't get signaled. Remove the ""min: self class maximumReadSemaphoreWaitTimeout"" part when the bug is fixed."
+ readSemaphore waitTimeoutMSecs:
+ (deadline - Time millisecondClockValue min: self class maximumReadSemaphoreWaitTimeout) ].
+ ^self isOtherEndConnected!
- | startTime msecsDelta status |
- startTime := Time millisecondClockValue.
- msecsDelta := (timeout * 1000) truncated.
- status := self primSocketConnectionStatus: socketHandle.
- [((status == Connected) or: [(status == ThisEndClosed)]) and:
- [(Time millisecondsSince: startTime) < msecsDelta]] whileTrue: [
- self discardReceivedData.
- "Providing a maximum for the time for waiting is a workaround for a VM bug which causes sockets waiting for data forever in some rare cases, because the semaphore doesn't get signaled. Remove the ""min: self class maximumReadSemaphoreWaitTimeout"" part when the bug is fixed."
- readSemaphore waitTimeoutMSecs:
- (msecsDelta - (Time millisecondsSince: startTime) min: self class maximumReadSemaphoreWaitTimeout).
- status := self primSocketConnectionStatus: socketHandle].
- ^ status ~= Connected!

Item was changed:
  ----- Method: Socket>>waitForSendDoneFor: (in category 'waiting') -----
  waitForSendDoneFor: timeout
  "Wait up until the given deadline for the current send operation to complete. Return true if it completes by the deadline, false if not."
 
+ | deadline timeleft |
+ deadline := Time millisecondClockValue + (timeout * 1000) truncated.
- | startTime msecsDelta msecsEllapsed |
- startTime := Time millisecondClockValue.
- msecsDelta := (timeout * 1000) truncated.
  [
  (self primSocketSendDone: socketHandle) ifTrue: [ ^true ].
+ self isThisEndConnected ifFalse: [ ^false ].
+ (timeleft := deadline - Time millisecondClockValue) <= 0 ifTrue: [ ^false ].
+ writeSemaphore waitTimeoutMSecs: timeleft ] repeat!
- self isConnected ifFalse: [ ^false ].
- (msecsEllapsed := Time millisecondsSince: startTime) < msecsDelta ifFalse: [ ^false ].
- writeSemaphore waitTimeoutMSecs: msecsDelta - msecsEllapsed ] repeat!


Reply | Threaded
Open this post in threaded view
|

Re: The Inbox: Network-ul.180.mcz

Levente Uzonyi
Hi All,

This is something to test on all platforms before push. I've used it on
64-bit linux without problems so far.

Levente

On Mon, 25 Jul 2016, [hidden email] wrote:

> Levente Uzonyi uploaded a new version of Network to project The Inbox:
> http://source.squeak.org/inbox/Network-ul.180.mcz
>
> ==================== Summary ====================
>
> Name: Network-ul.180
> Author: ul
> Time: 25 July 2016, 8:40:01.001452 pm
> UUID: 2f23a55c-fec5-41ac-95bd-6a8c2458be95
> Ancestors: Network-nice.179
>
> Socket changes:
> - fixed the comment of #isOtherEndConnected and #isThisEndConnected
> - do not slice the data (TCP) in the image in #sendData:. Let the VM, the kernel, the hardware deal with that.
> - use #isOtherEndConnected when receiving data, and #isThisEndConnected when sending data instead of #isConnected
> - move away from #milliseconds:since:, since we have a clock that won't roll over
>
> =============== Diff against Network-nice.179 ===============
>
> Item was changed:
>  ----- Method: Socket>>closeAndDestroy: (in category 'connection open/close') -----
>  closeAndDestroy: timeoutSeconds
>   "First, try to close this connection gracefully. If the close attempt fails or times out, abort the connection. In either case, destroy the socket. Do nothing if the socket has already been destroyed (i.e., if its socketHandle is nil)."
>
> + socketHandle ifNil: [ ^self ].
> + self isThisEndConnected ifTrue: [
> + self close.  "Close this end." ].
> + (self waitForDisconnectionFor: timeoutSeconds) ifFalse: [
> + "The other end has not closed the connect yet, so we will just abort it."
> + self primSocketAbortConnection: socketHandle ].
> + self destroy!
> - socketHandle ifNotNil: [
> - self isConnected ifTrue: [
> - self close.  "close this end"
> - (self waitForDisconnectionFor: timeoutSeconds) ifFalse: [
> - "The other end didn't close so we just abort the connection"
> - self primSocketAbortConnection: socketHandle]].
> - self destroy].
> - !
>
> Item was changed:
>  ----- Method: Socket>>discardReceivedData (in category 'receiving') -----
>  discardReceivedData
>   "Discard any data received up until now, and return the number of bytes discarded."
>
>   | buf totalBytesDiscarded |
>   buf := String new: 10000.
>   totalBytesDiscarded := 0.
> + [self isOtherEndConnected and: [self dataAvailable]] whileTrue: [
> - [self isConnected and: [self dataAvailable]] whileTrue: [
>   totalBytesDiscarded :=
>   totalBytesDiscarded + (self receiveDataInto: buf)].
>   ^ totalBytesDiscarded
>  !
>
> Item was changed:
>  ----- Method: Socket>>isOtherEndConnected (in category 'queries') -----
>  isOtherEndConnected
> + "Return true if this socket is connected, or this end has closed the connection but not the other end, so we can still receive data."
> - "Return true if this socket is connected, or this end has closed the connection but not the other end, so we can still send data."
>
>   | state |
>   socketHandle ifNil: [ ^false ].
>   (state := self primSocketConnectionStatus: socketHandle) == Connected ifTrue: [ ^true ].
>   ^state == ThisEndClosed
>  !
>
> Item was changed:
>  ----- Method: Socket>>isThisEndConnected (in category 'queries') -----
>  isThisEndConnected
> + "Return true if this socket is connected, other the other end has closed the connection but not this end, so we can still send data."
> - "Return true if this socket is connected, other the other end has closed the connection but not this end, so we can still receive data."
>
>   | state |
>   socketHandle ifNil: [ ^false ].
>   (state := self primSocketConnectionStatus: socketHandle) == Connected ifTrue: [ ^true ].
>   ^state == OtherEndClosed
>  !
>
> Item was changed:
>  ----- Method: Socket>>sendData: (in category 'sending') -----
>  sendData: aStringOrByteArray
>   "Send all of the data in the given array, even if it requires multiple calls to send it all. Return the number of bytes sent."
>
>   "An experimental version use on slow lines: Longer timeout and smaller writes to try to avoid spurious timeouts."
>
>   | bytesSent bytesToSend count |
>   bytesToSend := aStringOrByteArray size.
>   bytesSent := 0.
>   [bytesSent < bytesToSend] whileTrue: [
>   (self waitForSendDoneFor: 60)
>   ifFalse: [ConnectionTimedOut signal: 'send data timeout; data not sent'].
>   count := self primSocket: socketHandle
>   sendData: aStringOrByteArray
>   startIndex: bytesSent + 1
> + count: bytesToSend - bytesSent.
> - count: (bytesToSend - bytesSent min: DefaultSendBufferSize).
>   bytesSent := bytesSent + count].
>
>   ^ bytesSent
>  !
>
> Item was changed:
>  ----- Method: Socket>>waitForConnectionFor:ifTimedOut:ifRefused: (in category 'waiting') -----
>  waitForConnectionFor: timeout ifTimedOut: timeoutBlock ifRefused: refusedBlock
>   "Wait up until the given deadline for a connection to be established. Return true if it is established by the deadline, false if not."
>
> + | deadline timeLeft status |
> + deadline := Time millisecondClockValue + (timeout * 1000) truncated.
> + (status := self primSocketConnectionStatus: socketHandle) == Connected ifTrue: [^true].
> + [ (status == WaitingForConnection) and: [ (timeLeft := deadline - Time millisecondClockValue) > 0 ] ]
> - | startTime msecsDelta msecsEllapsed status |
> - startTime := Time millisecondClockValue.
> - msecsDelta := (timeout * 1000) truncated.
> - status := self primSocketConnectionStatus: socketHandle.
> - status = Connected ifTrue: [^true].
> - [(status = WaitingForConnection) and: [(msecsEllapsed := Time millisecondsSince: startTime) < msecsDelta]]
>   whileTrue: [
> + semaphore waitTimeoutMSecs: timeLeft.
> + status := self primSocketConnectionStatus: socketHandle ].
> + status == Connected ifTrue: [ ^true ].
> + status == WaitingForConnection
> + ifTrue: [ timeoutBlock value ]
> + ifFalse: [ refusedBlock value ].
> + ^false!
> - semaphore waitTimeoutMSecs: msecsDelta - msecsEllapsed.
> - status := self primSocketConnectionStatus: socketHandle].
> - status = Connected
> - ifFalse: [
> - status = WaitingForConnection
> - ifTrue: [timeoutBlock value]
> - ifFalse: [refusedBlock value].
> - ^false].
> - ^ true!
>
> Item was changed:
>  ----- Method: Socket>>waitForConnectionUntil: (in category 'waiting') -----
>  waitForConnectionUntil: deadline
>   "Wait up until the given deadline for a connection to be established. Return true if it is established by the deadline, false if not."
>
> + | status timeLeft |
> - | status waitTime |
>   [
>   (status := self primSocketConnectionStatus: socketHandle) == Connected ifTrue: [ ^true ].
>   status == WaitingForConnection ifFalse: [ ^false ].
> + (timeLeft := deadline - Time millisecondClockValue) <= 0 ifTrue: [ ^false ].
> + semaphore waitTimeoutMSecs: timeLeft ] repeat!
> - (waitTime := deadline - Time millisecondClockValue) > 0 ifFalse: [ ^false ].
> - semaphore waitTimeoutMSecs: waitTime ] repeat!
>
> Item was changed:
>  ----- Method: Socket>>waitForDataFor:ifClosed:ifTimedOut: (in category 'waiting') -----
>  waitForDataFor: timeout ifClosed: closedBlock ifTimedOut: timedOutBlock
>   "Wait for the given nr of seconds for data to arrive."
>
> + | deadline timeLeft |
> - | startTime msecsDelta |
>   socketHandle ifNil: [ ^closedBlock value ].
> + deadline := Time millisecondClockValue + (timeout * 1000) truncated.
> - startTime := Time millisecondClockValue.
> - msecsDelta := (timeout * 1000) truncated.
>   [
>   (self primSocketReceiveDataAvailable: socketHandle) ifTrue: [ ^self ].
> + self isOtherEndConnected ifFalse: [ ^closedBlock value ].
> + (timeLeft := deadline - Time millisecondClockValue) <= 0 ifTrue: [ ^timedOutBlock value ].
> - self isConnected ifFalse: [ ^closedBlock value ].
> - (Time millisecondsSince: startTime) < msecsDelta ifFalse: [ ^timedOutBlock value ].
>   "Providing a maximum for the time for waiting is a workaround for a VM bug which causes sockets waiting for data forever in some rare cases, because the semaphore doesn't get signaled. Remove the ""min: self class maximumReadSemaphoreWaitTimeout"" part when the bug is fixed."
>   readSemaphore waitTimeoutMSecs:
> + (timeLeft min: self class maximumReadSemaphoreWaitTimeout) ] repeat!
> - (msecsDelta - (Time millisecondsSince: startTime) min: self class maximumReadSemaphoreWaitTimeout) ] repeat!
>
> Item was changed:
>  ----- Method: Socket>>waitForDataIfClosed: (in category 'waiting') -----
>  waitForDataIfClosed: closedBlock
>   "Wait indefinitely for data to arrive.  This method will block until
>   data is available or the socket is closed."
>
>   socketHandle ifNil: [ ^closedBlock value ].
>   [
>   (self primSocketReceiveDataAvailable: socketHandle) ifTrue: [ ^self ].
> + self isOtherEndConnected ifFalse: [ ^closedBlock value ].
> - self isConnected ifFalse: [ ^closedBlock value ].
>   "ul 8/13/2014 21:16
>    Providing a maximum for the time for waiting is a workaround for a VM bug which
>    causes sockets waiting for data forever in some rare cases, because the semaphore
>    doesn't get signaled. Replace the ""waitTimeoutMSecs: self class maximumReadSemaphoreWaitTimeout""
>    part with ""wait"" when the bug is fixed."
>   readSemaphore waitTimeoutMSecs: self class maximumReadSemaphoreWaitTimeout ] repeat!
>
> Item was changed:
>  ----- Method: Socket>>waitForDisconnectionFor: (in category 'waiting') -----
>  waitForDisconnectionFor: timeout
>   "Wait for the given nr of seconds for the connection to be broken.
>   Return true if it is broken by the deadline, false if not.
>   The client should know the connection is really going to be closed
>   (e.g., because he has called 'close' to send a close request to the other end)
>   before calling this method."
>
> + | deadline |
> + deadline := Time millisecondClockValue + (timeout * 1000) truncated.
> + [ self isOtherEndConnected and: [ deadline - Time millisecondClockValue > 0 ] ]
> + whileTrue: [
> + self discardReceivedData.
> + "Providing a maximum for the time for waiting is a workaround for a VM bug which causes sockets waiting for data forever in some rare cases, because the semaphore doesn't get signaled. Remove the ""min: self class maximumReadSemaphoreWaitTimeout"" part when the bug is fixed."
> + readSemaphore waitTimeoutMSecs:
> + (deadline - Time millisecondClockValue min: self class maximumReadSemaphoreWaitTimeout) ].
> + ^self isOtherEndConnected!
> - | startTime msecsDelta status |
> - startTime := Time millisecondClockValue.
> - msecsDelta := (timeout * 1000) truncated.
> - status := self primSocketConnectionStatus: socketHandle.
> - [((status == Connected) or: [(status == ThisEndClosed)]) and:
> - [(Time millisecondsSince: startTime) < msecsDelta]] whileTrue: [
> - self discardReceivedData.
> - "Providing a maximum for the time for waiting is a workaround for a VM bug which causes sockets waiting for data forever in some rare cases, because the semaphore doesn't get signaled. Remove the ""min: self class maximumReadSemaphoreWaitTimeout"" part when the bug is fixed."
> - readSemaphore waitTimeoutMSecs:
> - (msecsDelta - (Time millisecondsSince: startTime) min: self class maximumReadSemaphoreWaitTimeout).
> - status := self primSocketConnectionStatus: socketHandle].
> - ^ status ~= Connected!
>
> Item was changed:
>  ----- Method: Socket>>waitForSendDoneFor: (in category 'waiting') -----
>  waitForSendDoneFor: timeout
>   "Wait up until the given deadline for the current send operation to complete. Return true if it completes by the deadline, false if not."
>
> + | deadline timeleft |
> + deadline := Time millisecondClockValue + (timeout * 1000) truncated.
> - | startTime msecsDelta msecsEllapsed |
> - startTime := Time millisecondClockValue.
> - msecsDelta := (timeout * 1000) truncated.
>   [
>   (self primSocketSendDone: socketHandle) ifTrue: [ ^true ].
> + self isThisEndConnected ifFalse: [ ^false ].
> + (timeleft := deadline - Time millisecondClockValue) <= 0 ifTrue: [ ^false ].
> + writeSemaphore waitTimeoutMSecs: timeleft ] repeat!
> - self isConnected ifFalse: [ ^false ].
> - (msecsEllapsed := Time millisecondsSince: startTime) < msecsDelta ifFalse: [ ^false ].
> - writeSemaphore waitTimeoutMSecs: msecsDelta - msecsEllapsed ] repeat!
>
>
>

Reply | Threaded
Open this post in threaded view
|

Re: The Inbox: Network-ul.180.mcz

Chris Muller-3
Hi Levente, Magma's test cases can't seem to get to the end with
this..  I haven't investigated yet..

On Mon, Jul 25, 2016 at 2:40 PM, Levente Uzonyi <[hidden email]> wrote:

> Hi All,
>
> This is something to test on all platforms before push. I've used it on
> 64-bit linux without problems so far.
>
> Levente
>
>
> On Mon, 25 Jul 2016, [hidden email] wrote:
>
>> Levente Uzonyi uploaded a new version of Network to project The Inbox:
>> http://source.squeak.org/inbox/Network-ul.180.mcz
>>
>> ==================== Summary ====================
>>
>> Name: Network-ul.180
>> Author: ul
>> Time: 25 July 2016, 8:40:01.001452 pm
>> UUID: 2f23a55c-fec5-41ac-95bd-6a8c2458be95
>> Ancestors: Network-nice.179
>>
>> Socket changes:
>> - fixed the comment of #isOtherEndConnected and #isThisEndConnected
>> - do not slice the data (TCP) in the image in #sendData:. Let the VM, the
>> kernel, the hardware deal with that.
>> - use #isOtherEndConnected when receiving data, and #isThisEndConnected
>> when sending data instead of #isConnected
>> - move away from #milliseconds:since:, since we have a clock that won't
>> roll over
>>
>> =============== Diff against Network-nice.179 ===============
>>
>> Item was changed:
>>  ----- Method: Socket>>closeAndDestroy: (in category 'connection
>> open/close') -----
>>  closeAndDestroy: timeoutSeconds
>>         "First, try to close this connection gracefully. If the close
>> attempt fails or times out, abort the connection. In either case, destroy
>> the socket. Do nothing if the socket has already been destroyed (i.e., if
>> its socketHandle is nil)."
>>
>> +       socketHandle ifNil: [ ^self ].
>> +       self isThisEndConnected ifTrue: [
>> +               self close.  "Close this end." ].
>> +       (self waitForDisconnectionFor: timeoutSeconds) ifFalse: [
>> +               "The other end has not closed the connect yet, so we will
>> just abort it."
>> +               self primSocketAbortConnection: socketHandle ].
>> +       self destroy!
>> -       socketHandle ifNotNil: [
>> -                       self isConnected ifTrue: [
>> -                               self close.  "close this end"
>> -                               (self waitForDisconnectionFor:
>> timeoutSeconds) ifFalse: [
>> -                                               "The other end didn't
>> close so we just abort the connection"
>> -                                               self
>> primSocketAbortConnection: socketHandle]].
>> -                       self destroy].
>> - !
>>
>> Item was changed:
>>  ----- Method: Socket>>discardReceivedData (in category 'receiving') -----
>>  discardReceivedData
>>         "Discard any data received up until now, and return the number of
>> bytes discarded."
>>
>>         | buf totalBytesDiscarded |
>>         buf := String new: 10000.
>>         totalBytesDiscarded := 0.
>> +       [self isOtherEndConnected and: [self dataAvailable]] whileTrue: [
>> -       [self isConnected and: [self dataAvailable]] whileTrue: [
>>                 totalBytesDiscarded :=
>>                         totalBytesDiscarded + (self receiveDataInto:
>> buf)].
>>         ^ totalBytesDiscarded
>>  !
>>
>> Item was changed:
>>  ----- Method: Socket>>isOtherEndConnected (in category 'queries') -----
>>  isOtherEndConnected
>> +       "Return true if this socket is connected, or this end has closed
>> the connection but not the other end, so we can still receive data."
>> -       "Return true if this socket is connected, or this end has closed
>> the connection but not the other end, so we can still send data."
>>
>>         | state |
>>         socketHandle ifNil: [ ^false ].
>>         (state := self primSocketConnectionStatus: socketHandle) ==
>> Connected ifTrue: [ ^true ].
>>         ^state == ThisEndClosed
>>  !
>>
>> Item was changed:
>>  ----- Method: Socket>>isThisEndConnected (in category 'queries') -----
>>  isThisEndConnected
>> +       "Return true if this socket is connected, other the other end has
>> closed the connection but not this end, so we can still send data."
>> -       "Return true if this socket is connected, other the other end has
>> closed the connection but not this end, so we can still receive data."
>>
>>         | state |
>>         socketHandle ifNil: [ ^false ].
>>         (state := self primSocketConnectionStatus: socketHandle) ==
>> Connected ifTrue: [ ^true ].
>>         ^state == OtherEndClosed
>>  !
>>
>> Item was changed:
>>  ----- Method: Socket>>sendData: (in category 'sending') -----
>>  sendData: aStringOrByteArray
>>         "Send all of the data in the given array, even if it requires
>> multiple calls to send it all. Return the number of bytes sent."
>>
>>         "An experimental version use on slow lines: Longer timeout and
>> smaller writes to try to avoid spurious timeouts."
>>
>>         | bytesSent bytesToSend count |
>>         bytesToSend := aStringOrByteArray size.
>>         bytesSent := 0.
>>         [bytesSent < bytesToSend] whileTrue: [
>>                 (self waitForSendDoneFor: 60)
>>                         ifFalse: [ConnectionTimedOut signal: 'send data
>> timeout; data not sent'].
>>                 count := self primSocket: socketHandle
>>                         sendData: aStringOrByteArray
>>                         startIndex: bytesSent + 1
>> +                       count: bytesToSend - bytesSent.
>> -                       count: (bytesToSend - bytesSent min:
>> DefaultSendBufferSize).
>>                 bytesSent := bytesSent + count].
>>
>>         ^ bytesSent
>>  !
>>
>> Item was changed:
>>  ----- Method: Socket>>waitForConnectionFor:ifTimedOut:ifRefused: (in
>> category 'waiting') -----
>>  waitForConnectionFor: timeout ifTimedOut: timeoutBlock ifRefused:
>> refusedBlock
>>         "Wait up until the given deadline for a connection to be
>> established. Return true if it is established by the deadline, false if
>> not."
>>
>> +       | deadline timeLeft status |
>> +       deadline := Time millisecondClockValue + (timeout * 1000)
>> truncated.
>> +       (status := self primSocketConnectionStatus: socketHandle) ==
>> Connected ifTrue: [^true].
>> +       [ (status == WaitingForConnection) and: [ (timeLeft := deadline -
>> Time millisecondClockValue) > 0 ] ]
>> -       | startTime msecsDelta msecsEllapsed status |
>> -       startTime := Time millisecondClockValue.
>> -       msecsDelta := (timeout * 1000) truncated.
>> -       status := self primSocketConnectionStatus: socketHandle.
>> -       status = Connected ifTrue: [^true].
>> -       [(status = WaitingForConnection) and: [(msecsEllapsed := Time
>> millisecondsSince: startTime) < msecsDelta]]
>>                 whileTrue: [
>> +                       semaphore waitTimeoutMSecs: timeLeft.
>> +                       status := self primSocketConnectionStatus:
>> socketHandle ].
>> +       status == Connected ifTrue: [ ^true ].
>> +       status == WaitingForConnection
>> +               ifTrue: [ timeoutBlock value ]
>> +               ifFalse: [ refusedBlock value ].
>> +       ^false!
>> -                       semaphore waitTimeoutMSecs: msecsDelta -
>> msecsEllapsed.
>> -                       status := self primSocketConnectionStatus:
>> socketHandle].
>> -       status = Connected
>> -               ifFalse: [
>> -                       status = WaitingForConnection
>> -                               ifTrue: [timeoutBlock value]
>> -                               ifFalse: [refusedBlock value].
>> -                       ^false].
>> -       ^ true!
>>
>> Item was changed:
>>  ----- Method: Socket>>waitForConnectionUntil: (in category 'waiting')
>> -----
>>  waitForConnectionUntil: deadline
>>         "Wait up until the given deadline for a connection to be
>> established. Return true if it is established by the deadline, false if
>> not."
>>
>> +       | status timeLeft |
>> -       | status waitTime |
>>         [
>>                 (status := self primSocketConnectionStatus: socketHandle)
>> == Connected ifTrue: [ ^true ].
>>                 status == WaitingForConnection ifFalse: [ ^false ].
>> +               (timeLeft := deadline - Time millisecondClockValue) <= 0
>> ifTrue: [ ^false ].
>> +               semaphore waitTimeoutMSecs: timeLeft ] repeat!
>> -               (waitTime := deadline - Time millisecondClockValue) > 0
>> ifFalse: [ ^false ].
>> -               semaphore waitTimeoutMSecs: waitTime ] repeat!
>>
>> Item was changed:
>>  ----- Method: Socket>>waitForDataFor:ifClosed:ifTimedOut: (in category
>> 'waiting') -----
>>  waitForDataFor: timeout ifClosed: closedBlock ifTimedOut: timedOutBlock
>>         "Wait for the given nr of seconds for data to arrive."
>>
>> +       | deadline timeLeft |
>> -       | startTime msecsDelta |
>>         socketHandle ifNil: [ ^closedBlock value ].
>> +       deadline := Time millisecondClockValue + (timeout * 1000)
>> truncated.
>> -       startTime := Time millisecondClockValue.
>> -       msecsDelta := (timeout * 1000) truncated.
>>         [
>>                 (self primSocketReceiveDataAvailable: socketHandle)
>> ifTrue: [ ^self ].
>> +               self isOtherEndConnected ifFalse: [ ^closedBlock value ].
>> +               (timeLeft := deadline - Time millisecondClockValue) <= 0
>> ifTrue: [ ^timedOutBlock value ].
>> -               self isConnected ifFalse: [ ^closedBlock value ].
>> -               (Time millisecondsSince: startTime) < msecsDelta ifFalse:
>> [ ^timedOutBlock value ].
>>                 "Providing a maximum for the time for waiting is a
>> workaround for a VM bug which causes sockets waiting for data forever in
>> some rare cases, because the semaphore doesn't get signaled. Remove the
>> ""min: self class maximumReadSemaphoreWaitTimeout"" part when the bug is
>> fixed."
>>                 readSemaphore waitTimeoutMSecs:
>> +                       (timeLeft min: self class
>> maximumReadSemaphoreWaitTimeout) ] repeat!
>> -                       (msecsDelta - (Time millisecondsSince: startTime)
>> min: self class maximumReadSemaphoreWaitTimeout) ] repeat!
>>
>> Item was changed:
>>  ----- Method: Socket>>waitForDataIfClosed: (in category 'waiting') -----
>>  waitForDataIfClosed: closedBlock
>>         "Wait indefinitely for data to arrive.  This method will block
>> until
>>         data is available or the socket is closed."
>>
>>         socketHandle ifNil: [ ^closedBlock value ].
>>         [
>>                 (self primSocketReceiveDataAvailable: socketHandle)
>> ifTrue: [ ^self ].
>> +                self isOtherEndConnected ifFalse: [ ^closedBlock value ].
>> -                self isConnected ifFalse: [ ^closedBlock value ].
>>                  "ul 8/13/2014 21:16
>>                   Providing a maximum for the time for waiting is a
>> workaround for a VM bug which
>>                   causes sockets waiting for data forever in some rare
>> cases, because the semaphore
>>                   doesn't get signaled. Replace the ""waitTimeoutMSecs:
>> self class maximumReadSemaphoreWaitTimeout""
>>                   part with ""wait"" when the bug is fixed."
>>                  readSemaphore waitTimeoutMSecs: self class
>> maximumReadSemaphoreWaitTimeout ] repeat!
>>
>> Item was changed:
>>  ----- Method: Socket>>waitForDisconnectionFor: (in category 'waiting')
>> -----
>>  waitForDisconnectionFor: timeout
>>         "Wait for the given nr of seconds for the connection to be broken.
>>         Return true if it is broken by the deadline, false if not.
>>         The client should know the connection is really going to be closed
>>         (e.g., because he has called 'close' to send a close request to
>> the other end)
>>         before calling this method."
>>
>> +       | deadline |
>> +       deadline := Time millisecondClockValue + (timeout * 1000)
>> truncated.
>> +       [ self isOtherEndConnected and: [ deadline - Time
>> millisecondClockValue > 0 ] ]
>> +               whileTrue: [
>> +                       self discardReceivedData.
>> +                       "Providing a maximum for the time for waiting is a
>> workaround for a VM bug which causes sockets waiting for data forever in
>> some rare cases, because the semaphore doesn't get signaled. Remove the
>> ""min: self class maximumReadSemaphoreWaitTimeout"" part when the bug is
>> fixed."
>> +                       readSemaphore waitTimeoutMSecs:
>> +                               (deadline - Time millisecondClockValue
>> min: self class maximumReadSemaphoreWaitTimeout) ].
>> +       ^self isOtherEndConnected!
>> -       | startTime msecsDelta status |
>> -       startTime := Time millisecondClockValue.
>> -       msecsDelta := (timeout * 1000) truncated.
>> -       status := self primSocketConnectionStatus: socketHandle.
>> -       [((status == Connected) or: [(status == ThisEndClosed)]) and:
>> -        [(Time millisecondsSince: startTime) < msecsDelta]] whileTrue: [
>> -               self discardReceivedData.
>> -               "Providing a maximum for the time for waiting is a
>> workaround for a VM bug which causes sockets waiting for data forever in
>> some rare cases, because the semaphore doesn't get signaled. Remove the
>> ""min: self class maximumReadSemaphoreWaitTimeout"" part when the bug is
>> fixed."
>> -               readSemaphore waitTimeoutMSecs:
>> -                       (msecsDelta - (Time millisecondsSince: startTime)
>> min: self class maximumReadSemaphoreWaitTimeout).
>> -               status := self primSocketConnectionStatus: socketHandle].
>> -       ^ status ~= Connected!
>>
>> Item was changed:
>>  ----- Method: Socket>>waitForSendDoneFor: (in category 'waiting') -----
>>  waitForSendDoneFor: timeout
>>         "Wait up until the given deadline for the current send operation
>> to complete. Return true if it completes by the deadline, false if not."
>>
>> +       | deadline timeleft |
>> +       deadline := Time millisecondClockValue + (timeout * 1000)
>> truncated.
>> -       | startTime msecsDelta msecsEllapsed |
>> -       startTime := Time millisecondClockValue.
>> -       msecsDelta := (timeout * 1000) truncated.
>>         [
>>                 (self primSocketSendDone: socketHandle) ifTrue: [ ^true ].
>> +               self isThisEndConnected ifFalse: [ ^false ].
>> +               (timeleft := deadline - Time millisecondClockValue) <= 0
>> ifTrue: [ ^false ].
>> +               writeSemaphore waitTimeoutMSecs: timeleft ] repeat!
>> -               self isConnected ifFalse: [ ^false ].
>> -               (msecsEllapsed := Time millisecondsSince: startTime) <
>> msecsDelta ifFalse: [ ^false ].
>> -               writeSemaphore waitTimeoutMSecs: msecsDelta -
>> msecsEllapsed ] repeat!
>>
>>
>>
>

Reply | Threaded
Open this post in threaded view
|

Re: The Inbox: Network-ul.180.mcz

Levente Uzonyi
Hi Chris,

I decide to move this to the Trunk because the feature freeze is here.
This should also help getting more feedback. :)

Levente

On Mon, 25 Jul 2016, Chris Muller wrote:

> Hi Levente, Magma's test cases can't seem to get to the end with
> this..  I haven't investigated yet..
>
> On Mon, Jul 25, 2016 at 2:40 PM, Levente Uzonyi <[hidden email]> wrote:
>> Hi All,
>>
>> This is something to test on all platforms before push. I've used it on
>> 64-bit linux without problems so far.
>>
>> Levente
>>
>>
>> On Mon, 25 Jul 2016, [hidden email] wrote:
>>
>>> Levente Uzonyi uploaded a new version of Network to project The Inbox:
>>> http://source.squeak.org/inbox/Network-ul.180.mcz
>>>
>>> ==================== Summary ====================
>>>
>>> Name: Network-ul.180
>>> Author: ul
>>> Time: 25 July 2016, 8:40:01.001452 pm
>>> UUID: 2f23a55c-fec5-41ac-95bd-6a8c2458be95
>>> Ancestors: Network-nice.179
>>>
>>> Socket changes:
>>> - fixed the comment of #isOtherEndConnected and #isThisEndConnected
>>> - do not slice the data (TCP) in the image in #sendData:. Let the VM, the
>>> kernel, the hardware deal with that.
>>> - use #isOtherEndConnected when receiving data, and #isThisEndConnected
>>> when sending data instead of #isConnected
>>> - move away from #milliseconds:since:, since we have a clock that won't
>>> roll over
>>>
>>> =============== Diff against Network-nice.179 ===============
>>>
>>> Item was changed:
>>>  ----- Method: Socket>>closeAndDestroy: (in category 'connection
>>> open/close') -----
>>>  closeAndDestroy: timeoutSeconds
>>>         "First, try to close this connection gracefully. If the close
>>> attempt fails or times out, abort the connection. In either case, destroy
>>> the socket. Do nothing if the socket has already been destroyed (i.e., if
>>> its socketHandle is nil)."
>>>
>>> +       socketHandle ifNil: [ ^self ].
>>> +       self isThisEndConnected ifTrue: [
>>> +               self close.  "Close this end." ].
>>> +       (self waitForDisconnectionFor: timeoutSeconds) ifFalse: [
>>> +               "The other end has not closed the connect yet, so we will
>>> just abort it."
>>> +               self primSocketAbortConnection: socketHandle ].
>>> +       self destroy!
>>> -       socketHandle ifNotNil: [
>>> -                       self isConnected ifTrue: [
>>> -                               self close.  "close this end"
>>> -                               (self waitForDisconnectionFor:
>>> timeoutSeconds) ifFalse: [
>>> -                                               "The other end didn't
>>> close so we just abort the connection"
>>> -                                               self
>>> primSocketAbortConnection: socketHandle]].
>>> -                       self destroy].
>>> - !
>>>
>>> Item was changed:
>>>  ----- Method: Socket>>discardReceivedData (in category 'receiving') -----
>>>  discardReceivedData
>>>         "Discard any data received up until now, and return the number of
>>> bytes discarded."
>>>
>>>         | buf totalBytesDiscarded |
>>>         buf := String new: 10000.
>>>         totalBytesDiscarded := 0.
>>> +       [self isOtherEndConnected and: [self dataAvailable]] whileTrue: [
>>> -       [self isConnected and: [self dataAvailable]] whileTrue: [
>>>                 totalBytesDiscarded :=
>>>                         totalBytesDiscarded + (self receiveDataInto:
>>> buf)].
>>>         ^ totalBytesDiscarded
>>>  !
>>>
>>> Item was changed:
>>>  ----- Method: Socket>>isOtherEndConnected (in category 'queries') -----
>>>  isOtherEndConnected
>>> +       "Return true if this socket is connected, or this end has closed
>>> the connection but not the other end, so we can still receive data."
>>> -       "Return true if this socket is connected, or this end has closed
>>> the connection but not the other end, so we can still send data."
>>>
>>>         | state |
>>>         socketHandle ifNil: [ ^false ].
>>>         (state := self primSocketConnectionStatus: socketHandle) ==
>>> Connected ifTrue: [ ^true ].
>>>         ^state == ThisEndClosed
>>>  !
>>>
>>> Item was changed:
>>>  ----- Method: Socket>>isThisEndConnected (in category 'queries') -----
>>>  isThisEndConnected
>>> +       "Return true if this socket is connected, other the other end has
>>> closed the connection but not this end, so we can still send data."
>>> -       "Return true if this socket is connected, other the other end has
>>> closed the connection but not this end, so we can still receive data."
>>>
>>>         | state |
>>>         socketHandle ifNil: [ ^false ].
>>>         (state := self primSocketConnectionStatus: socketHandle) ==
>>> Connected ifTrue: [ ^true ].
>>>         ^state == OtherEndClosed
>>>  !
>>>
>>> Item was changed:
>>>  ----- Method: Socket>>sendData: (in category 'sending') -----
>>>  sendData: aStringOrByteArray
>>>         "Send all of the data in the given array, even if it requires
>>> multiple calls to send it all. Return the number of bytes sent."
>>>
>>>         "An experimental version use on slow lines: Longer timeout and
>>> smaller writes to try to avoid spurious timeouts."
>>>
>>>         | bytesSent bytesToSend count |
>>>         bytesToSend := aStringOrByteArray size.
>>>         bytesSent := 0.
>>>         [bytesSent < bytesToSend] whileTrue: [
>>>                 (self waitForSendDoneFor: 60)
>>>                         ifFalse: [ConnectionTimedOut signal: 'send data
>>> timeout; data not sent'].
>>>                 count := self primSocket: socketHandle
>>>                         sendData: aStringOrByteArray
>>>                         startIndex: bytesSent + 1
>>> +                       count: bytesToSend - bytesSent.
>>> -                       count: (bytesToSend - bytesSent min:
>>> DefaultSendBufferSize).
>>>                 bytesSent := bytesSent + count].
>>>
>>>         ^ bytesSent
>>>  !
>>>
>>> Item was changed:
>>>  ----- Method: Socket>>waitForConnectionFor:ifTimedOut:ifRefused: (in
>>> category 'waiting') -----
>>>  waitForConnectionFor: timeout ifTimedOut: timeoutBlock ifRefused:
>>> refusedBlock
>>>         "Wait up until the given deadline for a connection to be
>>> established. Return true if it is established by the deadline, false if
>>> not."
>>>
>>> +       | deadline timeLeft status |
>>> +       deadline := Time millisecondClockValue + (timeout * 1000)
>>> truncated.
>>> +       (status := self primSocketConnectionStatus: socketHandle) ==
>>> Connected ifTrue: [^true].
>>> +       [ (status == WaitingForConnection) and: [ (timeLeft := deadline -
>>> Time millisecondClockValue) > 0 ] ]
>>> -       | startTime msecsDelta msecsEllapsed status |
>>> -       startTime := Time millisecondClockValue.
>>> -       msecsDelta := (timeout * 1000) truncated.
>>> -       status := self primSocketConnectionStatus: socketHandle.
>>> -       status = Connected ifTrue: [^true].
>>> -       [(status = WaitingForConnection) and: [(msecsEllapsed := Time
>>> millisecondsSince: startTime) < msecsDelta]]
>>>                 whileTrue: [
>>> +                       semaphore waitTimeoutMSecs: timeLeft.
>>> +                       status := self primSocketConnectionStatus:
>>> socketHandle ].
>>> +       status == Connected ifTrue: [ ^true ].
>>> +       status == WaitingForConnection
>>> +               ifTrue: [ timeoutBlock value ]
>>> +               ifFalse: [ refusedBlock value ].
>>> +       ^false!
>>> -                       semaphore waitTimeoutMSecs: msecsDelta -
>>> msecsEllapsed.
>>> -                       status := self primSocketConnectionStatus:
>>> socketHandle].
>>> -       status = Connected
>>> -               ifFalse: [
>>> -                       status = WaitingForConnection
>>> -                               ifTrue: [timeoutBlock value]
>>> -                               ifFalse: [refusedBlock value].
>>> -                       ^false].
>>> -       ^ true!
>>>
>>> Item was changed:
>>>  ----- Method: Socket>>waitForConnectionUntil: (in category 'waiting')
>>> -----
>>>  waitForConnectionUntil: deadline
>>>         "Wait up until the given deadline for a connection to be
>>> established. Return true if it is established by the deadline, false if
>>> not."
>>>
>>> +       | status timeLeft |
>>> -       | status waitTime |
>>>         [
>>>                 (status := self primSocketConnectionStatus: socketHandle)
>>> == Connected ifTrue: [ ^true ].
>>>                 status == WaitingForConnection ifFalse: [ ^false ].
>>> +               (timeLeft := deadline - Time millisecondClockValue) <= 0
>>> ifTrue: [ ^false ].
>>> +               semaphore waitTimeoutMSecs: timeLeft ] repeat!
>>> -               (waitTime := deadline - Time millisecondClockValue) > 0
>>> ifFalse: [ ^false ].
>>> -               semaphore waitTimeoutMSecs: waitTime ] repeat!
>>>
>>> Item was changed:
>>>  ----- Method: Socket>>waitForDataFor:ifClosed:ifTimedOut: (in category
>>> 'waiting') -----
>>>  waitForDataFor: timeout ifClosed: closedBlock ifTimedOut: timedOutBlock
>>>         "Wait for the given nr of seconds for data to arrive."
>>>
>>> +       | deadline timeLeft |
>>> -       | startTime msecsDelta |
>>>         socketHandle ifNil: [ ^closedBlock value ].
>>> +       deadline := Time millisecondClockValue + (timeout * 1000)
>>> truncated.
>>> -       startTime := Time millisecondClockValue.
>>> -       msecsDelta := (timeout * 1000) truncated.
>>>         [
>>>                 (self primSocketReceiveDataAvailable: socketHandle)
>>> ifTrue: [ ^self ].
>>> +               self isOtherEndConnected ifFalse: [ ^closedBlock value ].
>>> +               (timeLeft := deadline - Time millisecondClockValue) <= 0
>>> ifTrue: [ ^timedOutBlock value ].
>>> -               self isConnected ifFalse: [ ^closedBlock value ].
>>> -               (Time millisecondsSince: startTime) < msecsDelta ifFalse:
>>> [ ^timedOutBlock value ].
>>>                 "Providing a maximum for the time for waiting is a
>>> workaround for a VM bug which causes sockets waiting for data forever in
>>> some rare cases, because the semaphore doesn't get signaled. Remove the
>>> ""min: self class maximumReadSemaphoreWaitTimeout"" part when the bug is
>>> fixed."
>>>                 readSemaphore waitTimeoutMSecs:
>>> +                       (timeLeft min: self class
>>> maximumReadSemaphoreWaitTimeout) ] repeat!
>>> -                       (msecsDelta - (Time millisecondsSince: startTime)
>>> min: self class maximumReadSemaphoreWaitTimeout) ] repeat!
>>>
>>> Item was changed:
>>>  ----- Method: Socket>>waitForDataIfClosed: (in category 'waiting') -----
>>>  waitForDataIfClosed: closedBlock
>>>         "Wait indefinitely for data to arrive.  This method will block
>>> until
>>>         data is available or the socket is closed."
>>>
>>>         socketHandle ifNil: [ ^closedBlock value ].
>>>         [
>>>                 (self primSocketReceiveDataAvailable: socketHandle)
>>> ifTrue: [ ^self ].
>>> +                self isOtherEndConnected ifFalse: [ ^closedBlock value ].
>>> -                self isConnected ifFalse: [ ^closedBlock value ].
>>>                  "ul 8/13/2014 21:16
>>>                   Providing a maximum for the time for waiting is a
>>> workaround for a VM bug which
>>>                   causes sockets waiting for data forever in some rare
>>> cases, because the semaphore
>>>                   doesn't get signaled. Replace the ""waitTimeoutMSecs:
>>> self class maximumReadSemaphoreWaitTimeout""
>>>                   part with ""wait"" when the bug is fixed."
>>>                  readSemaphore waitTimeoutMSecs: self class
>>> maximumReadSemaphoreWaitTimeout ] repeat!
>>>
>>> Item was changed:
>>>  ----- Method: Socket>>waitForDisconnectionFor: (in category 'waiting')
>>> -----
>>>  waitForDisconnectionFor: timeout
>>>         "Wait for the given nr of seconds for the connection to be broken.
>>>         Return true if it is broken by the deadline, false if not.
>>>         The client should know the connection is really going to be closed
>>>         (e.g., because he has called 'close' to send a close request to
>>> the other end)
>>>         before calling this method."
>>>
>>> +       | deadline |
>>> +       deadline := Time millisecondClockValue + (timeout * 1000)
>>> truncated.
>>> +       [ self isOtherEndConnected and: [ deadline - Time
>>> millisecondClockValue > 0 ] ]
>>> +               whileTrue: [
>>> +                       self discardReceivedData.
>>> +                       "Providing a maximum for the time for waiting is a
>>> workaround for a VM bug which causes sockets waiting for data forever in
>>> some rare cases, because the semaphore doesn't get signaled. Remove the
>>> ""min: self class maximumReadSemaphoreWaitTimeout"" part when the bug is
>>> fixed."
>>> +                       readSemaphore waitTimeoutMSecs:
>>> +                               (deadline - Time millisecondClockValue
>>> min: self class maximumReadSemaphoreWaitTimeout) ].
>>> +       ^self isOtherEndConnected!
>>> -       | startTime msecsDelta status |
>>> -       startTime := Time millisecondClockValue.
>>> -       msecsDelta := (timeout * 1000) truncated.
>>> -       status := self primSocketConnectionStatus: socketHandle.
>>> -       [((status == Connected) or: [(status == ThisEndClosed)]) and:
>>> -        [(Time millisecondsSince: startTime) < msecsDelta]] whileTrue: [
>>> -               self discardReceivedData.
>>> -               "Providing a maximum for the time for waiting is a
>>> workaround for a VM bug which causes sockets waiting for data forever in
>>> some rare cases, because the semaphore doesn't get signaled. Remove the
>>> ""min: self class maximumReadSemaphoreWaitTimeout"" part when the bug is
>>> fixed."
>>> -               readSemaphore waitTimeoutMSecs:
>>> -                       (msecsDelta - (Time millisecondsSince: startTime)
>>> min: self class maximumReadSemaphoreWaitTimeout).
>>> -               status := self primSocketConnectionStatus: socketHandle].
>>> -       ^ status ~= Connected!
>>>
>>> Item was changed:
>>>  ----- Method: Socket>>waitForSendDoneFor: (in category 'waiting') -----
>>>  waitForSendDoneFor: timeout
>>>         "Wait up until the given deadline for the current send operation
>>> to complete. Return true if it completes by the deadline, false if not."
>>>
>>> +       | deadline timeleft |
>>> +       deadline := Time millisecondClockValue + (timeout * 1000)
>>> truncated.
>>> -       | startTime msecsDelta msecsEllapsed |
>>> -       startTime := Time millisecondClockValue.
>>> -       msecsDelta := (timeout * 1000) truncated.
>>>         [
>>>                 (self primSocketSendDone: socketHandle) ifTrue: [ ^true ].
>>> +               self isThisEndConnected ifFalse: [ ^false ].
>>> +               (timeleft := deadline - Time millisecondClockValue) <= 0
>>> ifTrue: [ ^false ].
>>> +               writeSemaphore waitTimeoutMSecs: timeleft ] repeat!
>>> -               self isConnected ifFalse: [ ^false ].
>>> -               (msecsEllapsed := Time millisecondsSince: startTime) <
>>> msecsDelta ifFalse: [ ^false ].
>>> -               writeSemaphore waitTimeoutMSecs: msecsDelta -
>>> msecsEllapsed ] repeat!
>>>
>>>
>>>
>>
>
>

Reply | Threaded
Open this post in threaded view
|

Re: The Inbox: Network-ul.180.mcz

Chris Muller-3
Hi Levente, I've narrowed Magma's problem with these changes down to
one line in Socket>>#waitForSendDoneFor:.  If I revert the call to
#isThisEndConnected back to #isConnected, Magma's test suite works
fine.

waitForSendDoneFor: timeout
    "Wait up until the given deadline for the current send operation
to complete. Return true if it completes by the deadline, false if
not."

    | deadline timeleft |
    deadline := Time millisecondClockValue + (timeout * 1000) truncated.
    [
        (self primSocketSendDone: socketHandle) ifTrue: [ ^true ].
        self isThisEndConnected ifFalse: [ ^false ].  "<---- want to
go back to #isConnected for this line"
        (timeleft := deadline - Time millisecondClockValue) <= 0
ifTrue: [ ^false ].
        writeSemaphore waitTimeoutMSecs: timeleft ] repeat

It affects Magma's High-Availability test cases, which involve
terminating the server while its busy serving clients.  For each
release of Squeak, I put out a new release of Magma that works OOTB.
I would very much like to be able to do that this time too, without
confusion of a changeset / modified system for Magma users.  Would you
be willing to revert this one line to buy us time to explore this
change together under greater scrutiny for the duration of the next
release of Squeak, instead of this release?

Best,
  Chris

On Sun, Jul 31, 2016 at 8:39 PM, Levente Uzonyi <[hidden email]> wrote:

> Hi Chris,
>
> I decide to move this to the Trunk because the feature freeze is here. This
> should also help getting more feedback. :)
>
> Levente
>
>
> On Mon, 25 Jul 2016, Chris Muller wrote:
>
>> Hi Levente, Magma's test cases can't seem to get to the end with
>> this..  I haven't investigated yet..
>>
>> On Mon, Jul 25, 2016 at 2:40 PM, Levente Uzonyi <[hidden email]>
>> wrote:
>>>
>>> Hi All,
>>>
>>> This is something to test on all platforms before push. I've used it on
>>> 64-bit linux without problems so far.
>>>
>>> Levente
>>>
>>>
>>> On Mon, 25 Jul 2016, [hidden email] wrote:
>>>
>>>> Levente Uzonyi uploaded a new version of Network to project The Inbox:
>>>> http://source.squeak.org/inbox/Network-ul.180.mcz
>>>>
>>>> ==================== Summary ====================
>>>>
>>>> Name: Network-ul.180
>>>> Author: ul
>>>> Time: 25 July 2016, 8:40:01.001452 pm
>>>> UUID: 2f23a55c-fec5-41ac-95bd-6a8c2458be95
>>>> Ancestors: Network-nice.179
>>>>
>>>> Socket changes:
>>>> - fixed the comment of #isOtherEndConnected and #isThisEndConnected
>>>> - do not slice the data (TCP) in the image in #sendData:. Let the VM,
>>>> the
>>>> kernel, the hardware deal with that.
>>>> - use #isOtherEndConnected when receiving data, and #isThisEndConnected
>>>> when sending data instead of #isConnected
>>>> - move away from #milliseconds:since:, since we have a clock that won't
>>>> roll over
>>>>
>>>> =============== Diff against Network-nice.179 ===============
>>>>
>>>> Item was changed:
>>>>  ----- Method: Socket>>closeAndDestroy: (in category 'connection
>>>> open/close') -----
>>>>  closeAndDestroy: timeoutSeconds
>>>>         "First, try to close this connection gracefully. If the close
>>>> attempt fails or times out, abort the connection. In either case,
>>>> destroy
>>>> the socket. Do nothing if the socket has already been destroyed (i.e.,
>>>> if
>>>> its socketHandle is nil)."
>>>>
>>>> +       socketHandle ifNil: [ ^self ].
>>>> +       self isThisEndConnected ifTrue: [
>>>> +               self close.  "Close this end." ].
>>>> +       (self waitForDisconnectionFor: timeoutSeconds) ifFalse: [
>>>> +               "The other end has not closed the connect yet, so we
>>>> will
>>>> just abort it."
>>>> +               self primSocketAbortConnection: socketHandle ].
>>>> +       self destroy!
>>>> -       socketHandle ifNotNil: [
>>>> -                       self isConnected ifTrue: [
>>>> -                               self close.  "close this end"
>>>> -                               (self waitForDisconnectionFor:
>>>> timeoutSeconds) ifFalse: [
>>>> -                                               "The other end didn't
>>>> close so we just abort the connection"
>>>> -                                               self
>>>> primSocketAbortConnection: socketHandle]].
>>>> -                       self destroy].
>>>> - !
>>>>
>>>> Item was changed:
>>>>  ----- Method: Socket>>discardReceivedData (in category 'receiving')
>>>> -----
>>>>  discardReceivedData
>>>>         "Discard any data received up until now, and return the number
>>>> of
>>>> bytes discarded."
>>>>
>>>>         | buf totalBytesDiscarded |
>>>>         buf := String new: 10000.
>>>>         totalBytesDiscarded := 0.
>>>> +       [self isOtherEndConnected and: [self dataAvailable]] whileTrue:
>>>> [
>>>> -       [self isConnected and: [self dataAvailable]] whileTrue: [
>>>>                 totalBytesDiscarded :=
>>>>                         totalBytesDiscarded + (self receiveDataInto:
>>>> buf)].
>>>>         ^ totalBytesDiscarded
>>>>  !
>>>>
>>>> Item was changed:
>>>>  ----- Method: Socket>>isOtherEndConnected (in category 'queries') -----
>>>>  isOtherEndConnected
>>>> +       "Return true if this socket is connected, or this end has closed
>>>> the connection but not the other end, so we can still receive data."
>>>> -       "Return true if this socket is connected, or this end has closed
>>>> the connection but not the other end, so we can still send data."
>>>>
>>>>         | state |
>>>>         socketHandle ifNil: [ ^false ].
>>>>         (state := self primSocketConnectionStatus: socketHandle) ==
>>>> Connected ifTrue: [ ^true ].
>>>>         ^state == ThisEndClosed
>>>>  !
>>>>
>>>> Item was changed:
>>>>  ----- Method: Socket>>isThisEndConnected (in category 'queries') -----
>>>>  isThisEndConnected
>>>> +       "Return true if this socket is connected, other the other end
>>>> has
>>>> closed the connection but not this end, so we can still send data."
>>>> -       "Return true if this socket is connected, other the other end
>>>> has
>>>> closed the connection but not this end, so we can still receive data."
>>>>
>>>>         | state |
>>>>         socketHandle ifNil: [ ^false ].
>>>>         (state := self primSocketConnectionStatus: socketHandle) ==
>>>> Connected ifTrue: [ ^true ].
>>>>         ^state == OtherEndClosed
>>>>  !
>>>>
>>>> Item was changed:
>>>>  ----- Method: Socket>>sendData: (in category 'sending') -----
>>>>  sendData: aStringOrByteArray
>>>>         "Send all of the data in the given array, even if it requires
>>>> multiple calls to send it all. Return the number of bytes sent."
>>>>
>>>>         "An experimental version use on slow lines: Longer timeout and
>>>> smaller writes to try to avoid spurious timeouts."
>>>>
>>>>         | bytesSent bytesToSend count |
>>>>         bytesToSend := aStringOrByteArray size.
>>>>         bytesSent := 0.
>>>>         [bytesSent < bytesToSend] whileTrue: [
>>>>                 (self waitForSendDoneFor: 60)
>>>>                         ifFalse: [ConnectionTimedOut signal: 'send data
>>>> timeout; data not sent'].
>>>>                 count := self primSocket: socketHandle
>>>>                         sendData: aStringOrByteArray
>>>>                         startIndex: bytesSent + 1
>>>> +                       count: bytesToSend - bytesSent.
>>>> -                       count: (bytesToSend - bytesSent min:
>>>> DefaultSendBufferSize).
>>>>                 bytesSent := bytesSent + count].
>>>>
>>>>         ^ bytesSent
>>>>  !
>>>>
>>>> Item was changed:
>>>>  ----- Method: Socket>>waitForConnectionFor:ifTimedOut:ifRefused: (in
>>>> category 'waiting') -----
>>>>  waitForConnectionFor: timeout ifTimedOut: timeoutBlock ifRefused:
>>>> refusedBlock
>>>>         "Wait up until the given deadline for a connection to be
>>>> established. Return true if it is established by the deadline, false if
>>>> not."
>>>>
>>>> +       | deadline timeLeft status |
>>>> +       deadline := Time millisecondClockValue + (timeout * 1000)
>>>> truncated.
>>>> +       (status := self primSocketConnectionStatus: socketHandle) ==
>>>> Connected ifTrue: [^true].
>>>> +       [ (status == WaitingForConnection) and: [ (timeLeft := deadline
>>>> -
>>>> Time millisecondClockValue) > 0 ] ]
>>>> -       | startTime msecsDelta msecsEllapsed status |
>>>> -       startTime := Time millisecondClockValue.
>>>> -       msecsDelta := (timeout * 1000) truncated.
>>>> -       status := self primSocketConnectionStatus: socketHandle.
>>>> -       status = Connected ifTrue: [^true].
>>>> -       [(status = WaitingForConnection) and: [(msecsEllapsed := Time
>>>> millisecondsSince: startTime) < msecsDelta]]
>>>>                 whileTrue: [
>>>> +                       semaphore waitTimeoutMSecs: timeLeft.
>>>> +                       status := self primSocketConnectionStatus:
>>>> socketHandle ].
>>>> +       status == Connected ifTrue: [ ^true ].
>>>> +       status == WaitingForConnection
>>>> +               ifTrue: [ timeoutBlock value ]
>>>> +               ifFalse: [ refusedBlock value ].
>>>> +       ^false!
>>>> -                       semaphore waitTimeoutMSecs: msecsDelta -
>>>> msecsEllapsed.
>>>> -                       status := self primSocketConnectionStatus:
>>>> socketHandle].
>>>> -       status = Connected
>>>> -               ifFalse: [
>>>> -                       status = WaitingForConnection
>>>> -                               ifTrue: [timeoutBlock value]
>>>> -                               ifFalse: [refusedBlock value].
>>>> -                       ^false].
>>>> -       ^ true!
>>>>
>>>> Item was changed:
>>>>  ----- Method: Socket>>waitForConnectionUntil: (in category 'waiting')
>>>> -----
>>>>  waitForConnectionUntil: deadline
>>>>         "Wait up until the given deadline for a connection to be
>>>> established. Return true if it is established by the deadline, false if
>>>> not."
>>>>
>>>> +       | status timeLeft |
>>>> -       | status waitTime |
>>>>         [
>>>>                 (status := self primSocketConnectionStatus:
>>>> socketHandle)
>>>> == Connected ifTrue: [ ^true ].
>>>>                 status == WaitingForConnection ifFalse: [ ^false ].
>>>> +               (timeLeft := deadline - Time millisecondClockValue) <= 0
>>>> ifTrue: [ ^false ].
>>>> +               semaphore waitTimeoutMSecs: timeLeft ] repeat!
>>>> -               (waitTime := deadline - Time millisecondClockValue) > 0
>>>> ifFalse: [ ^false ].
>>>> -               semaphore waitTimeoutMSecs: waitTime ] repeat!
>>>>
>>>> Item was changed:
>>>>  ----- Method: Socket>>waitForDataFor:ifClosed:ifTimedOut: (in category
>>>> 'waiting') -----
>>>>  waitForDataFor: timeout ifClosed: closedBlock ifTimedOut: timedOutBlock
>>>>         "Wait for the given nr of seconds for data to arrive."
>>>>
>>>> +       | deadline timeLeft |
>>>> -       | startTime msecsDelta |
>>>>         socketHandle ifNil: [ ^closedBlock value ].
>>>> +       deadline := Time millisecondClockValue + (timeout * 1000)
>>>> truncated.
>>>> -       startTime := Time millisecondClockValue.
>>>> -       msecsDelta := (timeout * 1000) truncated.
>>>>         [
>>>>                 (self primSocketReceiveDataAvailable: socketHandle)
>>>> ifTrue: [ ^self ].
>>>> +               self isOtherEndConnected ifFalse: [ ^closedBlock value
>>>> ].
>>>> +               (timeLeft := deadline - Time millisecondClockValue) <= 0
>>>> ifTrue: [ ^timedOutBlock value ].
>>>> -               self isConnected ifFalse: [ ^closedBlock value ].
>>>> -               (Time millisecondsSince: startTime) < msecsDelta
>>>> ifFalse:
>>>> [ ^timedOutBlock value ].
>>>>                 "Providing a maximum for the time for waiting is a
>>>> workaround for a VM bug which causes sockets waiting for data forever in
>>>> some rare cases, because the semaphore doesn't get signaled. Remove the
>>>> ""min: self class maximumReadSemaphoreWaitTimeout"" part when the bug is
>>>> fixed."
>>>>                 readSemaphore waitTimeoutMSecs:
>>>> +                       (timeLeft min: self class
>>>> maximumReadSemaphoreWaitTimeout) ] repeat!
>>>> -                       (msecsDelta - (Time millisecondsSince:
>>>> startTime)
>>>> min: self class maximumReadSemaphoreWaitTimeout) ] repeat!
>>>>
>>>> Item was changed:
>>>>  ----- Method: Socket>>waitForDataIfClosed: (in category 'waiting')
>>>> -----
>>>>  waitForDataIfClosed: closedBlock
>>>>         "Wait indefinitely for data to arrive.  This method will block
>>>> until
>>>>         data is available or the socket is closed."
>>>>
>>>>         socketHandle ifNil: [ ^closedBlock value ].
>>>>         [
>>>>                 (self primSocketReceiveDataAvailable: socketHandle)
>>>> ifTrue: [ ^self ].
>>>> +                self isOtherEndConnected ifFalse: [ ^closedBlock value
>>>> ].
>>>> -                self isConnected ifFalse: [ ^closedBlock value ].
>>>>                  "ul 8/13/2014 21:16
>>>>                   Providing a maximum for the time for waiting is a
>>>> workaround for a VM bug which
>>>>                   causes sockets waiting for data forever in some rare
>>>> cases, because the semaphore
>>>>                   doesn't get signaled. Replace the ""waitTimeoutMSecs:
>>>> self class maximumReadSemaphoreWaitTimeout""
>>>>                   part with ""wait"" when the bug is fixed."
>>>>                  readSemaphore waitTimeoutMSecs: self class
>>>> maximumReadSemaphoreWaitTimeout ] repeat!
>>>>
>>>> Item was changed:
>>>>  ----- Method: Socket>>waitForDisconnectionFor: (in category 'waiting')
>>>> -----
>>>>  waitForDisconnectionFor: timeout
>>>>         "Wait for the given nr of seconds for the connection to be
>>>> broken.
>>>>         Return true if it is broken by the deadline, false if not.
>>>>         The client should know the connection is really going to be
>>>> closed
>>>>         (e.g., because he has called 'close' to send a close request to
>>>> the other end)
>>>>         before calling this method."
>>>>
>>>> +       | deadline |
>>>> +       deadline := Time millisecondClockValue + (timeout * 1000)
>>>> truncated.
>>>> +       [ self isOtherEndConnected and: [ deadline - Time
>>>> millisecondClockValue > 0 ] ]
>>>> +               whileTrue: [
>>>> +                       self discardReceivedData.
>>>> +                       "Providing a maximum for the time for waiting is
>>>> a
>>>> workaround for a VM bug which causes sockets waiting for data forever in
>>>> some rare cases, because the semaphore doesn't get signaled. Remove the
>>>> ""min: self class maximumReadSemaphoreWaitTimeout"" part when the bug is
>>>> fixed."
>>>> +                       readSemaphore waitTimeoutMSecs:
>>>> +                               (deadline - Time millisecondClockValue
>>>> min: self class maximumReadSemaphoreWaitTimeout) ].
>>>> +       ^self isOtherEndConnected!
>>>> -       | startTime msecsDelta status |
>>>> -       startTime := Time millisecondClockValue.
>>>> -       msecsDelta := (timeout * 1000) truncated.
>>>> -       status := self primSocketConnectionStatus: socketHandle.
>>>> -       [((status == Connected) or: [(status == ThisEndClosed)]) and:
>>>> -        [(Time millisecondsSince: startTime) < msecsDelta]] whileTrue:
>>>> [
>>>> -               self discardReceivedData.
>>>> -               "Providing a maximum for the time for waiting is a
>>>> workaround for a VM bug which causes sockets waiting for data forever in
>>>> some rare cases, because the semaphore doesn't get signaled. Remove the
>>>> ""min: self class maximumReadSemaphoreWaitTimeout"" part when the bug is
>>>> fixed."
>>>> -               readSemaphore waitTimeoutMSecs:
>>>> -                       (msecsDelta - (Time millisecondsSince:
>>>> startTime)
>>>> min: self class maximumReadSemaphoreWaitTimeout).
>>>> -               status := self primSocketConnectionStatus:
>>>> socketHandle].
>>>> -       ^ status ~= Connected!
>>>>
>>>> Item was changed:
>>>>  ----- Method: Socket>>waitForSendDoneFor: (in category 'waiting') -----
>>>>  waitForSendDoneFor: timeout
>>>>         "Wait up until the given deadline for the current send operation
>>>> to complete. Return true if it completes by the deadline, false if not."
>>>>
>>>> +       | deadline timeleft |
>>>> +       deadline := Time millisecondClockValue + (timeout * 1000)
>>>> truncated.
>>>> -       | startTime msecsDelta msecsEllapsed |
>>>> -       startTime := Time millisecondClockValue.
>>>> -       msecsDelta := (timeout * 1000) truncated.
>>>>         [
>>>>                 (self primSocketSendDone: socketHandle) ifTrue: [ ^true
>>>> ].
>>>> +               self isThisEndConnected ifFalse: [ ^false ].
>>>> +               (timeleft := deadline - Time millisecondClockValue) <= 0
>>>> ifTrue: [ ^false ].
>>>> +               writeSemaphore waitTimeoutMSecs: timeleft ] repeat!
>>>> -               self isConnected ifFalse: [ ^false ].
>>>> -               (msecsEllapsed := Time millisecondsSince: startTime) <
>>>> msecsDelta ifFalse: [ ^false ].
>>>> -               writeSemaphore waitTimeoutMSecs: msecsDelta -
>>>> msecsEllapsed ] repeat!
>>>>
>>>>
>>>>
>>>
>>
>>
>

Reply | Threaded
Open this post in threaded view
|

Re: The Inbox: Network-ul.180.mcz

Levente Uzonyi
Hi Chris,

Thanks for tracking this down. I'll write some tests to see if the VM
actually supports half-closed connections properly when I have time.
If not, I'll revert this change before the release.

Levente

On Tue, 2 Aug 2016, Chris Muller wrote:

> Hi Levente, I've narrowed Magma's problem with these changes down to
> one line in Socket>>#waitForSendDoneFor:.  If I revert the call to
> #isThisEndConnected back to #isConnected, Magma's test suite works
> fine.
>
> waitForSendDoneFor: timeout
>    "Wait up until the given deadline for the current send operation
> to complete. Return true if it completes by the deadline, false if
> not."
>
>    | deadline timeleft |
>    deadline := Time millisecondClockValue + (timeout * 1000) truncated.
>    [
>        (self primSocketSendDone: socketHandle) ifTrue: [ ^true ].
>        self isThisEndConnected ifFalse: [ ^false ].  "<---- want to
> go back to #isConnected for this line"
>        (timeleft := deadline - Time millisecondClockValue) <= 0
> ifTrue: [ ^false ].
>        writeSemaphore waitTimeoutMSecs: timeleft ] repeat
>
> It affects Magma's High-Availability test cases, which involve
> terminating the server while its busy serving clients.  For each
> release of Squeak, I put out a new release of Magma that works OOTB.
> I would very much like to be able to do that this time too, without
> confusion of a changeset / modified system for Magma users.  Would you
> be willing to revert this one line to buy us time to explore this
> change together under greater scrutiny for the duration of the next
> release of Squeak, instead of this release?
>
> Best,
>  Chris
>
> On Sun, Jul 31, 2016 at 8:39 PM, Levente Uzonyi <[hidden email]> wrote:
>> Hi Chris,
>>
>> I decide to move this to the Trunk because the feature freeze is here. This
>> should also help getting more feedback. :)
>>
>> Levente
>>
>>
>> On Mon, 25 Jul 2016, Chris Muller wrote:
>>
>>> Hi Levente, Magma's test cases can't seem to get to the end with
>>> this..  I haven't investigated yet..
>>>
>>> On Mon, Jul 25, 2016 at 2:40 PM, Levente Uzonyi <[hidden email]>
>>> wrote:
>>>>
>>>> Hi All,
>>>>
>>>> This is something to test on all platforms before push. I've used it on
>>>> 64-bit linux without problems so far.
>>>>
>>>> Levente
>>>>
>>>>
>>>> On Mon, 25 Jul 2016, [hidden email] wrote:
>>>>
>>>>> Levente Uzonyi uploaded a new version of Network to project The Inbox:
>>>>> http://source.squeak.org/inbox/Network-ul.180.mcz
>>>>>
>>>>> ==================== Summary ====================
>>>>>
>>>>> Name: Network-ul.180
>>>>> Author: ul
>>>>> Time: 25 July 2016, 8:40:01.001452 pm
>>>>> UUID: 2f23a55c-fec5-41ac-95bd-6a8c2458be95
>>>>> Ancestors: Network-nice.179
>>>>>
>>>>> Socket changes:
>>>>> - fixed the comment of #isOtherEndConnected and #isThisEndConnected
>>>>> - do not slice the data (TCP) in the image in #sendData:. Let the VM,
>>>>> the
>>>>> kernel, the hardware deal with that.
>>>>> - use #isOtherEndConnected when receiving data, and #isThisEndConnected
>>>>> when sending data instead of #isConnected
>>>>> - move away from #milliseconds:since:, since we have a clock that won't
>>>>> roll over
>>>>>
>>>>> =============== Diff against Network-nice.179 ===============
>>>>>
>>>>> Item was changed:
>>>>>  ----- Method: Socket>>closeAndDestroy: (in category 'connection
>>>>> open/close') -----
>>>>>  closeAndDestroy: timeoutSeconds
>>>>>         "First, try to close this connection gracefully. If the close
>>>>> attempt fails or times out, abort the connection. In either case,
>>>>> destroy
>>>>> the socket. Do nothing if the socket has already been destroyed (i.e.,
>>>>> if
>>>>> its socketHandle is nil)."
>>>>>
>>>>> +       socketHandle ifNil: [ ^self ].
>>>>> +       self isThisEndConnected ifTrue: [
>>>>> +               self close.  "Close this end." ].
>>>>> +       (self waitForDisconnectionFor: timeoutSeconds) ifFalse: [
>>>>> +               "The other end has not closed the connect yet, so we
>>>>> will
>>>>> just abort it."
>>>>> +               self primSocketAbortConnection: socketHandle ].
>>>>> +       self destroy!
>>>>> -       socketHandle ifNotNil: [
>>>>> -                       self isConnected ifTrue: [
>>>>> -                               self close.  "close this end"
>>>>> -                               (self waitForDisconnectionFor:
>>>>> timeoutSeconds) ifFalse: [
>>>>> -                                               "The other end didn't
>>>>> close so we just abort the connection"
>>>>> -                                               self
>>>>> primSocketAbortConnection: socketHandle]].
>>>>> -                       self destroy].
>>>>> - !
>>>>>
>>>>> Item was changed:
>>>>>  ----- Method: Socket>>discardReceivedData (in category 'receiving')
>>>>> -----
>>>>>  discardReceivedData
>>>>>         "Discard any data received up until now, and return the number
>>>>> of
>>>>> bytes discarded."
>>>>>
>>>>>         | buf totalBytesDiscarded |
>>>>>         buf := String new: 10000.
>>>>>         totalBytesDiscarded := 0.
>>>>> +       [self isOtherEndConnected and: [self dataAvailable]] whileTrue:
>>>>> [
>>>>> -       [self isConnected and: [self dataAvailable]] whileTrue: [
>>>>>                 totalBytesDiscarded :=
>>>>>                         totalBytesDiscarded + (self receiveDataInto:
>>>>> buf)].
>>>>>         ^ totalBytesDiscarded
>>>>>  !
>>>>>
>>>>> Item was changed:
>>>>>  ----- Method: Socket>>isOtherEndConnected (in category 'queries') -----
>>>>>  isOtherEndConnected
>>>>> +       "Return true if this socket is connected, or this end has closed
>>>>> the connection but not the other end, so we can still receive data."
>>>>> -       "Return true if this socket is connected, or this end has closed
>>>>> the connection but not the other end, so we can still send data."
>>>>>
>>>>>         | state |
>>>>>         socketHandle ifNil: [ ^false ].
>>>>>         (state := self primSocketConnectionStatus: socketHandle) ==
>>>>> Connected ifTrue: [ ^true ].
>>>>>         ^state == ThisEndClosed
>>>>>  !
>>>>>
>>>>> Item was changed:
>>>>>  ----- Method: Socket>>isThisEndConnected (in category 'queries') -----
>>>>>  isThisEndConnected
>>>>> +       "Return true if this socket is connected, other the other end
>>>>> has
>>>>> closed the connection but not this end, so we can still send data."
>>>>> -       "Return true if this socket is connected, other the other end
>>>>> has
>>>>> closed the connection but not this end, so we can still receive data."
>>>>>
>>>>>         | state |
>>>>>         socketHandle ifNil: [ ^false ].
>>>>>         (state := self primSocketConnectionStatus: socketHandle) ==
>>>>> Connected ifTrue: [ ^true ].
>>>>>         ^state == OtherEndClosed
>>>>>  !
>>>>>
>>>>> Item was changed:
>>>>>  ----- Method: Socket>>sendData: (in category 'sending') -----
>>>>>  sendData: aStringOrByteArray
>>>>>         "Send all of the data in the given array, even if it requires
>>>>> multiple calls to send it all. Return the number of bytes sent."
>>>>>
>>>>>         "An experimental version use on slow lines: Longer timeout and
>>>>> smaller writes to try to avoid spurious timeouts."
>>>>>
>>>>>         | bytesSent bytesToSend count |
>>>>>         bytesToSend := aStringOrByteArray size.
>>>>>         bytesSent := 0.
>>>>>         [bytesSent < bytesToSend] whileTrue: [
>>>>>                 (self waitForSendDoneFor: 60)
>>>>>                         ifFalse: [ConnectionTimedOut signal: 'send data
>>>>> timeout; data not sent'].
>>>>>                 count := self primSocket: socketHandle
>>>>>                         sendData: aStringOrByteArray
>>>>>                         startIndex: bytesSent + 1
>>>>> +                       count: bytesToSend - bytesSent.
>>>>> -                       count: (bytesToSend - bytesSent min:
>>>>> DefaultSendBufferSize).
>>>>>                 bytesSent := bytesSent + count].
>>>>>
>>>>>         ^ bytesSent
>>>>>  !
>>>>>
>>>>> Item was changed:
>>>>>  ----- Method: Socket>>waitForConnectionFor:ifTimedOut:ifRefused: (in
>>>>> category 'waiting') -----
>>>>>  waitForConnectionFor: timeout ifTimedOut: timeoutBlock ifRefused:
>>>>> refusedBlock
>>>>>         "Wait up until the given deadline for a connection to be
>>>>> established. Return true if it is established by the deadline, false if
>>>>> not."
>>>>>
>>>>> +       | deadline timeLeft status |
>>>>> +       deadline := Time millisecondClockValue + (timeout * 1000)
>>>>> truncated.
>>>>> +       (status := self primSocketConnectionStatus: socketHandle) ==
>>>>> Connected ifTrue: [^true].
>>>>> +       [ (status == WaitingForConnection) and: [ (timeLeft := deadline
>>>>> -
>>>>> Time millisecondClockValue) > 0 ] ]
>>>>> -       | startTime msecsDelta msecsEllapsed status |
>>>>> -       startTime := Time millisecondClockValue.
>>>>> -       msecsDelta := (timeout * 1000) truncated.
>>>>> -       status := self primSocketConnectionStatus: socketHandle.
>>>>> -       status = Connected ifTrue: [^true].
>>>>> -       [(status = WaitingForConnection) and: [(msecsEllapsed := Time
>>>>> millisecondsSince: startTime) < msecsDelta]]
>>>>>                 whileTrue: [
>>>>> +                       semaphore waitTimeoutMSecs: timeLeft.
>>>>> +                       status := self primSocketConnectionStatus:
>>>>> socketHandle ].
>>>>> +       status == Connected ifTrue: [ ^true ].
>>>>> +       status == WaitingForConnection
>>>>> +               ifTrue: [ timeoutBlock value ]
>>>>> +               ifFalse: [ refusedBlock value ].
>>>>> +       ^false!
>>>>> -                       semaphore waitTimeoutMSecs: msecsDelta -
>>>>> msecsEllapsed.
>>>>> -                       status := self primSocketConnectionStatus:
>>>>> socketHandle].
>>>>> -       status = Connected
>>>>> -               ifFalse: [
>>>>> -                       status = WaitingForConnection
>>>>> -                               ifTrue: [timeoutBlock value]
>>>>> -                               ifFalse: [refusedBlock value].
>>>>> -                       ^false].
>>>>> -       ^ true!
>>>>>
>>>>> Item was changed:
>>>>>  ----- Method: Socket>>waitForConnectionUntil: (in category 'waiting')
>>>>> -----
>>>>>  waitForConnectionUntil: deadline
>>>>>         "Wait up until the given deadline for a connection to be
>>>>> established. Return true if it is established by the deadline, false if
>>>>> not."
>>>>>
>>>>> +       | status timeLeft |
>>>>> -       | status waitTime |
>>>>>         [
>>>>>                 (status := self primSocketConnectionStatus:
>>>>> socketHandle)
>>>>> == Connected ifTrue: [ ^true ].
>>>>>                 status == WaitingForConnection ifFalse: [ ^false ].
>>>>> +               (timeLeft := deadline - Time millisecondClockValue) <= 0
>>>>> ifTrue: [ ^false ].
>>>>> +               semaphore waitTimeoutMSecs: timeLeft ] repeat!
>>>>> -               (waitTime := deadline - Time millisecondClockValue) > 0
>>>>> ifFalse: [ ^false ].
>>>>> -               semaphore waitTimeoutMSecs: waitTime ] repeat!
>>>>>
>>>>> Item was changed:
>>>>>  ----- Method: Socket>>waitForDataFor:ifClosed:ifTimedOut: (in category
>>>>> 'waiting') -----
>>>>>  waitForDataFor: timeout ifClosed: closedBlock ifTimedOut: timedOutBlock
>>>>>         "Wait for the given nr of seconds for data to arrive."
>>>>>
>>>>> +       | deadline timeLeft |
>>>>> -       | startTime msecsDelta |
>>>>>         socketHandle ifNil: [ ^closedBlock value ].
>>>>> +       deadline := Time millisecondClockValue + (timeout * 1000)
>>>>> truncated.
>>>>> -       startTime := Time millisecondClockValue.
>>>>> -       msecsDelta := (timeout * 1000) truncated.
>>>>>         [
>>>>>                 (self primSocketReceiveDataAvailable: socketHandle)
>>>>> ifTrue: [ ^self ].
>>>>> +               self isOtherEndConnected ifFalse: [ ^closedBlock value
>>>>> ].
>>>>> +               (timeLeft := deadline - Time millisecondClockValue) <= 0
>>>>> ifTrue: [ ^timedOutBlock value ].
>>>>> -               self isConnected ifFalse: [ ^closedBlock value ].
>>>>> -               (Time millisecondsSince: startTime) < msecsDelta
>>>>> ifFalse:
>>>>> [ ^timedOutBlock value ].
>>>>>                 "Providing a maximum for the time for waiting is a
>>>>> workaround for a VM bug which causes sockets waiting for data forever in
>>>>> some rare cases, because the semaphore doesn't get signaled. Remove the
>>>>> ""min: self class maximumReadSemaphoreWaitTimeout"" part when the bug is
>>>>> fixed."
>>>>>                 readSemaphore waitTimeoutMSecs:
>>>>> +                       (timeLeft min: self class
>>>>> maximumReadSemaphoreWaitTimeout) ] repeat!
>>>>> -                       (msecsDelta - (Time millisecondsSince:
>>>>> startTime)
>>>>> min: self class maximumReadSemaphoreWaitTimeout) ] repeat!
>>>>>
>>>>> Item was changed:
>>>>>  ----- Method: Socket>>waitForDataIfClosed: (in category 'waiting')
>>>>> -----
>>>>>  waitForDataIfClosed: closedBlock
>>>>>         "Wait indefinitely for data to arrive.  This method will block
>>>>> until
>>>>>         data is available or the socket is closed."
>>>>>
>>>>>         socketHandle ifNil: [ ^closedBlock value ].
>>>>>         [
>>>>>                 (self primSocketReceiveDataAvailable: socketHandle)
>>>>> ifTrue: [ ^self ].
>>>>> +                self isOtherEndConnected ifFalse: [ ^closedBlock value
>>>>> ].
>>>>> -                self isConnected ifFalse: [ ^closedBlock value ].
>>>>>                  "ul 8/13/2014 21:16
>>>>>                   Providing a maximum for the time for waiting is a
>>>>> workaround for a VM bug which
>>>>>                   causes sockets waiting for data forever in some rare
>>>>> cases, because the semaphore
>>>>>                   doesn't get signaled. Replace the ""waitTimeoutMSecs:
>>>>> self class maximumReadSemaphoreWaitTimeout""
>>>>>                   part with ""wait"" when the bug is fixed."
>>>>>                  readSemaphore waitTimeoutMSecs: self class
>>>>> maximumReadSemaphoreWaitTimeout ] repeat!
>>>>>
>>>>> Item was changed:
>>>>>  ----- Method: Socket>>waitForDisconnectionFor: (in category 'waiting')
>>>>> -----
>>>>>  waitForDisconnectionFor: timeout
>>>>>         "Wait for the given nr of seconds for the connection to be
>>>>> broken.
>>>>>         Return true if it is broken by the deadline, false if not.
>>>>>         The client should know the connection is really going to be
>>>>> closed
>>>>>         (e.g., because he has called 'close' to send a close request to
>>>>> the other end)
>>>>>         before calling this method."
>>>>>
>>>>> +       | deadline |
>>>>> +       deadline := Time millisecondClockValue + (timeout * 1000)
>>>>> truncated.
>>>>> +       [ self isOtherEndConnected and: [ deadline - Time
>>>>> millisecondClockValue > 0 ] ]
>>>>> +               whileTrue: [
>>>>> +                       self discardReceivedData.
>>>>> +                       "Providing a maximum for the time for waiting is
>>>>> a
>>>>> workaround for a VM bug which causes sockets waiting for data forever in
>>>>> some rare cases, because the semaphore doesn't get signaled. Remove the
>>>>> ""min: self class maximumReadSemaphoreWaitTimeout"" part when the bug is
>>>>> fixed."
>>>>> +                       readSemaphore waitTimeoutMSecs:
>>>>> +                               (deadline - Time millisecondClockValue
>>>>> min: self class maximumReadSemaphoreWaitTimeout) ].
>>>>> +       ^self isOtherEndConnected!
>>>>> -       | startTime msecsDelta status |
>>>>> -       startTime := Time millisecondClockValue.
>>>>> -       msecsDelta := (timeout * 1000) truncated.
>>>>> -       status := self primSocketConnectionStatus: socketHandle.
>>>>> -       [((status == Connected) or: [(status == ThisEndClosed)]) and:
>>>>> -        [(Time millisecondsSince: startTime) < msecsDelta]] whileTrue:
>>>>> [
>>>>> -               self discardReceivedData.
>>>>> -               "Providing a maximum for the time for waiting is a
>>>>> workaround for a VM bug which causes sockets waiting for data forever in
>>>>> some rare cases, because the semaphore doesn't get signaled. Remove the
>>>>> ""min: self class maximumReadSemaphoreWaitTimeout"" part when the bug is
>>>>> fixed."
>>>>> -               readSemaphore waitTimeoutMSecs:
>>>>> -                       (msecsDelta - (Time millisecondsSince:
>>>>> startTime)
>>>>> min: self class maximumReadSemaphoreWaitTimeout).
>>>>> -               status := self primSocketConnectionStatus:
>>>>> socketHandle].
>>>>> -       ^ status ~= Connected!
>>>>>
>>>>> Item was changed:
>>>>>  ----- Method: Socket>>waitForSendDoneFor: (in category 'waiting') -----
>>>>>  waitForSendDoneFor: timeout
>>>>>         "Wait up until the given deadline for the current send operation
>>>>> to complete. Return true if it completes by the deadline, false if not."
>>>>>
>>>>> +       | deadline timeleft |
>>>>> +       deadline := Time millisecondClockValue + (timeout * 1000)
>>>>> truncated.
>>>>> -       | startTime msecsDelta msecsEllapsed |
>>>>> -       startTime := Time millisecondClockValue.
>>>>> -       msecsDelta := (timeout * 1000) truncated.
>>>>>         [
>>>>>                 (self primSocketSendDone: socketHandle) ifTrue: [ ^true
>>>>> ].
>>>>> +               self isThisEndConnected ifFalse: [ ^false ].
>>>>> +               (timeleft := deadline - Time millisecondClockValue) <= 0
>>>>> ifTrue: [ ^false ].
>>>>> +               writeSemaphore waitTimeoutMSecs: timeleft ] repeat!
>>>>> -               self isConnected ifFalse: [ ^false ].
>>>>> -               (msecsEllapsed := Time millisecondsSince: startTime) <
>>>>> msecsDelta ifFalse: [ ^false ].
>>>>> -               writeSemaphore waitTimeoutMSecs: msecsDelta -
>>>>> msecsEllapsed ] repeat!
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>>
>
>