Issue 3349 in pharo: Some network fixes


pharo
Status: FixedWaitingToBePharoed
Owner: stephane.ducasse
Labels: Milestone-1.3 Type-Squeak

New issue 3349 by stephane.ducasse: Some network fixes
http://code.google.com/p/pharo/issues/detail?id=3349

Levente Uzonyi uploaded a new version of Network to project The Trunk:
http://source.squeak.org/trunk/Network-ul.99.mcz

==================== Summary ====================

Name: Network-ul.99
Author: ul
Time: 25 November 2010, 11:25:58.723 pm
UUID: 26c053e2-d198-0144-a3c9-c8181481e3da
Ancestors: Network-ar.98

- fixed clock rollover issues (http://bugs.squeak.org/view.php?id=7343, http://bugs.squeak.org/view.php?id=6857)
- replaced #waitForConnectionUntil: sends with #waitForConnectionFor: sends in SocksSocket
- simplified a few methods (some are from Pharo)
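
For readers who have not hit the rollover bugs referenced above, here is a minimal workspace sketch of why comparing the clock against an absolute deadline can fail, and why comparing elapsed time does not. It uses plain integer arithmetic instead of the real clock. The wrap point (16r3FFFFFFF) and the hand-written compensation are assumptions made for illustration; the diff itself delegates this to Time millisecondsSince:.

```smalltalk
"Illustrative sketch only: simulated clock values, no real sockets or timers."
| wrap start timeoutMsecs now deadline delta elapsed |
wrap := 16r3FFFFFFF.      "ASSUMPTION: value at which the millisecond clock wraps to 0"
start := wrap - 500.      "the wait begins 500 ms before the wrap"
timeoutMsecs := 2000.     "the caller asked for a 2 second timeout"
now := 2500.              "simulated clock reading about 3 seconds later, after the wrap"

"Old style: an absolute deadline. Here it lies beyond the wrap point,
 so the wrapped clock value never reaches it."
deadline := start + timeoutMsecs.
Transcript show: 'old check still waiting: ', (now < deadline) printString; cr.

"New style: compare elapsed time, compensating for a single wrap by hand."
delta := now - start.
elapsed := delta < 0 ifTrue: [wrap + delta] ifFalse: [delta].
Transcript show: 'elapsed msecs: ', elapsed printString; cr.
Transcript show: 'new check still waiting: ', (elapsed < timeoutMsecs) printString; cr.
```

Evaluated in a workspace, the old check answers true (the loop would keep waiting although the timeout has passed), while the elapsed-time check answers false.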

=============== Diff against Network-ar.98 ===============

Item was changed:
  ----- Method: ConnectionQueue>>listenLoop (in category 'private') -----
  listenLoop
        "Private!! This loop is run in a separate process. It will establish up to maxQueueLength connections on the given port."
        "Details: When out of sockets or queue is full, retry more frequently, since a socket may become available, space may open in the queue, or a previously queued connection may be aborted by the client, making it available for a fresh connection."
        "Note: If the machine is disconnected from the network while the server is running, the currently waiting socket will go from 'isWaitingForConnection' to 'unconnected', and attempts to create new sockets will fail. When this happens, delete the broken socket and keep trying to create a socket in case the network connection is re-established. Connecting and disconnecting was tested under PPP on Mac system 8.1. It is not known if this will work on other platforms."


        | newConnection |

        socket := Socket newTCP.
        "We'll accept four simultaneous connections"
        socket listenOn: portNumber backlogSize: 4.
        "If the listener is not valid then we cannot use the
        BSD style accept() mechanism."
        socket isValid ifFalse: [^self oldStyleListenLoop].
        [true] whileTrue: [
                socket isValid ifFalse: [
                        "socket has stopped listening for some reason"
                        socket destroy.
                        (Delay forMilliseconds: 10) wait.
                        ^self listenLoop ].
+               newConnection := socket
+                       waitForAcceptFor: 10
+                       ifTimedOut: [ nil ].
-               newConnection := [socket waitForAcceptFor: 10]
-                       on: ConnectionTimedOut
-                       do: [nil].
                (newConnection notNil and: [newConnection isConnected]) ifTrue: [
                        accessSema critical: [connections addLast: newConnection.].
                        newConnection := nil.
                        self changed].
                self pruneStaleConnections]. !

Item was changed:
  ----- Method: ConnectionQueue>>oldStyleListenLoop (in category 'private') -----
  oldStyleListenLoop
        "Private!! This loop is run in a separate process. It will establish up to maxQueueLength connections on the given port."
        "Details: When out of sockets or queue is full, retry more frequently, since a socket may become available, space may open in the queue, or a previously queued connection may be aborted by the client, making it available for a fresh connection."
        "Note: If the machine is disconnected from the network while the server is running, the currently waiting socket will go from 'isWaitingForConnection' to 'unconnected', and attempts to create new sockets will fail. When this happens, delete the broken socket and keep trying to create a socket in case the network connection is re-established. Connecting and disconnecting was tested under PPP on Mac system 8.1. It is not known if this will work on other platforms."

        [true] whileTrue: [
                ((socket == nil) and: [connections size < maxQueueLength]) ifTrue: [
                        "try to create a new socket for listening"
                        socket := Socket createIfFail: [nil]].

                socket == nil
                        ifTrue: [(Delay forMilliseconds: 100) wait]
                        ifFalse: [
                                socket isUnconnected ifTrue: [socket listenOn: portNumber].
+                               socket
+                                       waitForConnectionFor: 10
+                                       ifTimedOut: [
-                               [socket waitForConnectionFor: 10]
-                                       on: ConnectionTimedOut
-                                       do: [:ex |
                                                socket isConnected
                                                        ifTrue: [  "connection established"
                                                                accessSema critical: [connections addLast: socket].
                                                                socket := nil]
                                                        ifFalse: [
                                                                socket isWaitingForConnection
                                                                        ifFalse: [socket destroy. socket := nil]]]].  "broken socket; start over"
                self pruneStaleConnections].
  !

Item was changed:
  ----- Method: Socket class>>deadlineSecs: (in category 'utilities') -----
  deadlineSecs: secs
        "Return a deadline time the given number of seconds from now."

+       self deprecated: 'Using this method may result in clock rollover related bug. Don''t use it.'.
        ^ Time millisecondClockValue + (secs * 1000) truncated
  !

Item was changed:
  ----- Method: Socket class>>ping: (in category 'utilities') -----
  ping: hostName
        "Ping the given host. Useful for checking network connectivity. The host must be running a TCP echo server."
        "Socket ping: 'squeak.cs.uiuc.edu'"

        | tcpPort sock serverAddr startTime echoTime |
        tcpPort := 7.  "7 = echo port, 13 = time port, 19 = character generator port"

        serverAddr := NetNameResolver addressForName: hostName timeout: 10.
+       serverAddr ifNil: [ ^self inform: 'Could not find an address for ', hostName ].
-       serverAddr = nil ifTrue: [
-               ^ self inform: 'Could not find an address for ', hostName].

        sock := Socket new.
        sock connectNonBlockingTo: serverAddr port: tcpPort.
        [sock waitForConnectionFor: 10]
                on: ConnectionTimedOut
                do: [:ex |
                        (self confirm: 'Continue to wait for connection to ', hostName, '?')
                                ifTrue: [ex retry]
                                ifFalse: [
                                        sock destroy.
                                        ^ self]].

        sock sendData: 'echo!!'.
        startTime := Time millisecondClockValue.
        [sock waitForDataFor: 15]
                on: ConnectionTimedOut
                do: [:ex | (self confirm: 'Packet sent but no echo yet; keep waiting?')
                        ifTrue: [ex retry]].
        echoTime := Time millisecondClockValue - startTime.

        sock destroy.
        self inform: hostName, ' responded in ', echoTime printString, ' milliseconds'.
  !

Item was changed:
  ----- Method: Socket class>>pingPorts:on:timeOutSecs: (in category 'utilities') -----
  pingPorts: portList on: hostName timeOutSecs: timeOutSecs
        "Attempt to connect to each of the given sockets on the given host. Wait at most timeOutSecs for the connections to be established. Answer an array of strings indicating the available ports."
-       "Socket pingPorts: #(7 13 19 21 23 25 80 110 119) on: 'squeak.cs.uiuc.edu' timeOutSecs: 15"

+       "Socket pingPorts: #(7 13 19 21 23 25 80 110 119) on: 'squeak.org' timeOutSecs: 15"
-       | serverAddr sockets deadline done result unconnectedCount connectedCount waitingCount |
-       serverAddr := NetNameResolver addressForName: hostName timeout: 10.
-       serverAddr = nil ifTrue: [
-               self inform: 'Could not find an address for ', hostName.
-               ^ #()].

+       | serverAddr sockets startTime timeoutMsecs done result unconnectedCount connectedCount waitingCount |
+       serverAddr := NetNameResolver addressForName: hostName timeout: 10.
+       serverAddr ifNil: [
+                       self inform: 'Could not find an address for ' , hostName.
+                       ^ #() ].
+       sockets := portList
+               collect: [ :portNum |
+                       | sock |
+                       sock := Socket new.
+                       [ sock connectTo: serverAddr port: portNum ]
+                               on: ConnectionTimedOut
+                               do: [ ].
+                       sock ].
+       startTime := Time millisecondClockValue.
+       timeoutMsecs := (1000 * timeOutSecs) truncated.
-       sockets := portList collect: [:portNum | | sock |
-               sock := Socket new.
-               sock connectTo: serverAddr port: portNum].
-
-       deadline := self deadlineSecs: timeOutSecs.
        done := false.
+       [ done ]
+               whileFalse: [
+                       unconnectedCount := 0.
+                       connectedCount := 0.
+                       waitingCount := 0.
+                       sockets
+                               do: [ :s |
+                                       s isUnconnectedOrInvalid
+                                               ifTrue: [ unconnectedCount := unconnectedCount + 1 ]
+                                               ifFalse: [
+                                                       s isConnected
+                                                               ifTrue: [ connectedCount := connectedCount + 1 ].
+                                                       s isWaitingForConnection
+                                                               ifTrue: [ waitingCount := waitingCount + 1 ] ] ].
+                       waitingCount = 0
+                               ifTrue: [ done := true ].
+                       connectedCount = sockets size
+                               ifTrue: [ done := true ].
+                       (Time millisecondsSince: startTime) >= timeoutMsecs
+                               ifTrue: [ done := true ] ].
+       result := (sockets select: [ :s | s isConnected ]) collect: [ :s | self nameForWellKnownTCPPort: s remotePort ].
+       sockets do: [ :s | s destroy ].
+       ^ result!
-       [done] whileFalse: [
-               unconnectedCount := 0.
-               connectedCount := 0.
-               waitingCount := 0.
-               sockets do: [:s |
-                       s isUnconnectedOrInvalid
-                               ifTrue: [unconnectedCount := unconnectedCount + 1]
-                               ifFalse: [
-                                       s isConnected ifTrue: [connectedCount := connectedCount + 1].
-                                       s isWaitingForConnection ifTrue: [waitingCount := waitingCount + 1]]].
-               waitingCount = 0 ifTrue: [done := true].
-               connectedCount = sockets size ifTrue: [done := true].
-               Time millisecondClockValue > deadline ifTrue: [done := true]].
-
-       result := (sockets select: [:s | s isConnected])
-               collect: [:s | self nameForWellKnownTCPPort: s remotePort].
-       sockets do: [:s | s destroy].
-       ^ result
- !

Item was changed:
  ----- Method: Socket>>acceptFrom: (in category 'initialize-destroy') -----
  acceptFrom: aSocket
        "Initialize a new socket handle from an accept call"
        | semaIndex readSemaIndex writeSemaIndex |

        primitiveOnlySupportsOneSemaphore := false.
        semaphore := Semaphore new.
        readSemaphore := Semaphore new.
        writeSemaphore := Semaphore new.
        semaIndex := Smalltalk registerExternalObject: semaphore.
        readSemaIndex := Smalltalk registerExternalObject: readSemaphore.
        writeSemaIndex := Smalltalk registerExternalObject: writeSemaphore.
        socketHandle := self primAcceptFrom: aSocket socketHandle
                                                receiveBufferSize: 8000
                                                sendBufSize: 8000
                                                semaIndex: semaIndex
                                                readSemaIndex: readSemaIndex
                                                writeSemaIndex: writeSemaIndex.
+       socketHandle
+               ifNotNil: [ self register ]
+               ifNil: [  "socket creation failed"
+                       Smalltalk unregisterExternalObject: semaphore.
+                       Smalltalk unregisterExternalObject: readSemaphore.
+                       Smalltalk unregisterExternalObject: writeSemaphore.
+                       readSemaphore := writeSemaphore := semaphore := nil ]
-       socketHandle = nil ifTrue: [  "socket creation failed"
-               Smalltalk unregisterExternalObject: semaphore.
-               Smalltalk unregisterExternalObject: readSemaphore.
-               Smalltalk unregisterExternalObject: writeSemaphore.
-               readSemaphore := writeSemaphore := semaphore := nil
-       ] ifFalse:[self register].
  !

Item was changed:
  ----- Method: Socket>>destroy (in category 'initialize-destroy') -----
  destroy
        "Destroy this socket. Its connection, if any, is aborted and its resources are freed. Do nothing if the socket has already been destroyed (i.e., if its socketHandle is nil)."

+       socketHandle ifNotNil: [
+               self isValid ifTrue: [ self primSocketDestroy: socketHandle ].
-       socketHandle = nil ifFalse:
-               [self isValid ifTrue: [self primSocketDestroy: socketHandle].
                Smalltalk unregisterExternalObject: semaphore.
                Smalltalk unregisterExternalObject: readSemaphore.
                Smalltalk unregisterExternalObject: writeSemaphore.
                socketHandle := nil.
                readSemaphore := writeSemaphore := semaphore := nil.
+               self unregister ]!
-               self unregister].
- !

Item was changed:
  ----- Method: Socket>>initialize: (in category 'initialize-destroy') -----
  initialize: socketType
        "Initialize a new socket handle. If socket creation fails, socketHandle will be set to nil."
        | semaIndex readSemaIndex writeSemaIndex |

        primitiveOnlySupportsOneSemaphore := false.
        semaphore := Semaphore new.
        readSemaphore := Semaphore new.
        writeSemaphore := Semaphore new.
        semaIndex := Smalltalk registerExternalObject: semaphore.
        readSemaIndex := Smalltalk registerExternalObject: readSemaphore.
        writeSemaIndex := Smalltalk registerExternalObject: writeSemaphore.
        socketHandle :=
                self primSocketCreateNetwork: 0
                        type: socketType
                        receiveBufferSize: 8000
                        sendBufSize: 8000
                        semaIndex: semaIndex
                        readSemaIndex: readSemaIndex
                        writeSemaIndex: writeSemaIndex.

+       socketHandle
+               ifNotNil: [ self register ]
+               ifNil: [  "socket creation failed"
+                       Smalltalk unregisterExternalObject: semaphore.
+                       Smalltalk unregisterExternalObject: readSemaphore.
+                       Smalltalk unregisterExternalObject: writeSemaphore.
+                       readSemaphore := writeSemaphore := semaphore := nil ]
-       socketHandle = nil ifTrue: [  "socket creation failed"
-               Smalltalk unregisterExternalObject: semaphore.
-               Smalltalk unregisterExternalObject: readSemaphore.
-               Smalltalk unregisterExternalObject: writeSemaphore.
-               readSemaphore := writeSemaphore := semaphore := nil
-       ] ifFalse:[self register].
  !








Re: Issue 3349 in pharo: Some network fixes

pharo

Comment #1 on issue 3349 by stephane.ducasse: Some network fixes
http://code.google.com/p/pharo/issues/detail?id=3349

Item was changed:
  ----- Method: Socket>>localAddress (in category 'accessing') -----
  localAddress
+
+       self isWaitingForConnection ifFalse: [
+               self
+                       waitForConnectionFor: Socket standardTimeout
+                       ifTimedOut: [ ^ByteArray new: 4 ] ].
+       ^self primSocketLocalAddress: socketHandle!
-       self isWaitingForConnection
-               ifFalse: [[self waitForConnectionFor: Socket standardTimeout]
-                               on: ConnectionTimedOut
-                               do: [:ex | ^ ByteArray new: 4]].
-       ^ self primSocketLocalAddress: socketHandle!

Item was changed:
  ----- Method: Socket>>localPort (in category 'accessing') -----
  localPort
+
+       self isWaitingForConnection ifFalse: [
+               self
+                       waitForConnectionFor: Socket standardTimeout
+                       ifTimedOut: [ ^0] ].
-       self isWaitingForConnection
-               ifFalse: [[self waitForConnectionFor: Socket standardTimeout]
-                               on: ConnectionTimedOut
-                               do: [:ex | ^ 0]].
        ^ self primSocketLocalPort: socketHandle!

Item was changed:
  ----- Method: Socket>>waitForAcceptFor: (in category 'waiting') -----
  waitForAcceptFor: timeout
        "Wait and accept an incoming connection. Return nil if it fails"
+       self waitForConnectionFor: timeout ifTimedOut: [^ nil].
+       ^ self isConnected
-       [self waitForConnectionFor: timeout] on: ConnectionTimedOut do: [:ex | ^nil].
-       ^self isConnected
                ifTrue:[self accept]
                !

Item was changed:
  ----- Method: Socket>>waitForConnectionFor:ifTimedOut: (in category 'waiting') -----
  waitForConnectionFor: timeout ifTimedOut: timeoutBlock
        "Wait up until the given deadline for a connection to be established. Return true if it is established by the deadline, false if not."

+       | startTime msecsDelta msecsEllapsed status |
+       startTime := Time millisecondClockValue.
+       msecsDelta := (timeout * 1000) truncated.
-       | status deadline |
-       deadline := Socket deadlineSecs: timeout.
        status := self primSocketConnectionStatus: socketHandle.
+       [(status = WaitingForConnection) and: [(msecsEllapsed := Time millisecondsSince: startTime) < msecsDelta]]
-       [(status = WaitingForConnection) and: [Time millisecondClockValue < deadline]]
                whileTrue: [
+                       semaphore waitTimeoutMSecs: msecsDelta - msecsEllapsed.
-                       semaphore waitTimeoutMSecs: (deadline - Time millisecondClockValue).
                        status := self primSocketConnectionStatus: socketHandle].

+       status = Connected ifFalse: [^timeoutBlock value].
+       ^ true!
-       status = Connected ifFalse: [^timeoutBlock value]
- !

Item was changed:
  ----- Method: Socket>>waitForDataFor:ifClosed:ifTimedOut: (in category 'waiting') -----
  waitForDataFor: timeout ifClosed: closedBlock ifTimedOut: timedOutBlock
        "Wait for the given nr of seconds for data to arrive."
+
+       | startTime msecsDelta |
+       startTime := Time millisecondClockValue.
+       msecsDelta := (timeout * 1000) truncated.
+       [(Time millisecondsSince: startTime) < msecsDelta] whileTrue: [
+               (self primSocketReceiveDataAvailable: socketHandle)
+                       ifTrue: [^self].
+               self isConnected
+                       ifFalse: [^closedBlock value].
+               self readSemaphore waitTimeoutMSecs:
+                       (msecsDelta - (Time millisecondsSince: startTime) max: 0).
+       ].

-       | deadline |
-       deadline := Socket deadlineSecs: timeout.
-
-       [Time millisecondClockValue < deadline]
-               whileTrue: [
-                       (self primSocketReceiveDataAvailable: socketHandle)
-                               ifTrue: [^self].
-                       self isConnected
-                               ifFalse: [^closedBlock value].
-                       self readSemaphore waitTimeoutMSecs: (deadline - Time millisecondClockValue)].
-
        (self primSocketReceiveDataAvailable: socketHandle)
                ifFalse: [
                        self isConnected
                                ifTrue: [^timedOutBlock value]
+                               ifFalse: [^closedBlock value]].!
-                               ifFalse: [^closedBlock value]]!

Item was changed:
  ----- Method: Socket>>waitForDisconnectionFor: (in category 'waiting') -----
  waitForDisconnectionFor: timeout
        "Wait for the given nr of seconds for the connection to be broken.
        Return true if it is broken by the deadline, false if not.
        The client should know the connection is really going to be closed
        (e.g., because he has called 'close' to send a close request to the other end)
        before calling this method."

+       | startTime msecsDelta status |
+       startTime := Time millisecondClockValue.
+       msecsDelta := (timeout * 1000) truncated.
-       | status deadline |
        status := self primSocketConnectionStatus: socketHandle.
-       deadline := Socket deadlineSecs: timeout.
        [((status == Connected) or: [(status == ThisEndClosed)]) and:
+        [(Time millisecondsSince: startTime) < msecsDelta]] whileTrue: [
-        [Time millisecondClockValue < deadline]] whileTrue: [
                self discardReceivedData.
+               self readSemaphore waitTimeoutMSecs:
+                       (msecsDelta - (Time millisecondsSince: startTime) max: 0).
-               self readSemaphore waitTimeoutMSecs: (deadline - Time millisecondClockValue).
                status := self primSocketConnectionStatus: socketHandle].
+       ^ status ~= Connected!
-
-       ^ status ~= Connected
- !
Item was changed:
  ----- Method: Socket>>waitForSendDoneFor: (in category 'waiting') -----
  waitForSendDoneFor: timeout
        "Wait up until the given deadline for the current send operation to complete. Return true if it completes by the deadline, false if not."

+       | startTime msecsDelta msecsEllapsed sendDone |
+       startTime := Time millisecondClockValue.
+       msecsDelta := (timeout * 1000) truncated.
+       [(sendDone := self primSocketSendDone: socketHandle) not and: [ self isConnected
-       | sendDone deadline |
-       deadline := Socket deadlineSecs: timeout.
-       [self isConnected & (sendDone := self primSocketSendDone: socketHandle) not
                        "Connection end and final data can happen fast, so test in this order"
+               and: [(msecsEllapsed := Time millisecondsSince: startTime) < msecsDelta]]] whileTrue: [
+                       self writeSemaphore waitTimeoutMSecs: msecsDelta - msecsEllapsed].
-               and: [Time millisecondClockValue < deadline]] whileTrue: [
-                       self writeSemaphore waitTimeoutMSecs: (deadline - Time millisecondClockValue)].

        ^ sendDone!

Item was changed:
  ----- Method: SocksSocket>>connectTo:port: (in category 'connection open/close') -----
  connectTo: hostAddress port: port
        self initializeNetwork.
        self shouldUseSocks
                ifFalse: [^super connectTo: hostAddress port: port].
        super connectTo: socksIP port: socksPort.
+       self waitForConnectionFor: Socket standardTimeout.
-       self waitForConnectionUntil: Socket standardDeadline.
        dstIP := hostAddress.
        dstPort := port.
        vers = 4
                ifTrue: [self connectSocks4]
                ifFalse: [self connectSocks5]
        !

Item was changed:
  ----- Method: SocksSocket>>connectToHostNamed:port: (in category 'connection open/close') -----
  connectToHostNamed: hostName port: port
        super connectTo: socksIP port: socksPort.
+       self waitForConnectionFor: Socket standardTimeout.
-       self waitForConnectionUntil: Socket standardDeadline.
        dstName := hostName.
        dstPort := port.
        vers = 4
                ifTrue: [self connectSocks4]
                ifFalse: [self connectSocks5]
        !



Re: Issue 3349 in pharo: Some network fixes

pharo
Updates:
        Labels: -Milestone-1.3

Comment #2 on issue 3349 by [hidden email]: Some network fixes
http://code.google.com/p/pharo/issues/detail?id=3349

(No comment was entered for this change.)