Hi all,
I tried to implement a connection pool for Swazoo. I'll probably commit this to the GNU Smalltalk repository early next week or earlier if I get positive feedback about it. The code maintains a linked list of connections ordered by starting time (so it is fair) and eliminates connections that are not currently responding when a given number of connections (default 15, can be changed with a message to SwazooServer singleton) is exceeded. The pool is shared by all servers (i.e. all interfaces) because it helps controlling process-wide resources such as file descriptors, and that is also why its size is controlled by SwazooServer. The code is quite complicated especially because I just used plain Blue Book semaphores, no RecursionLocks or anything like that. From some testing using netcat however it seems to do its job well and to be worth it. For example a "Keep-Alive: 1" connection will always be killed before a "Keep-Alive: 300" connection. Putting it under load may show problems though. In particular, I'm not sure what to do if the server is receiving a response at the time the "reaper" process wants to kill it. Right now I reset the connection, but I'm not sure whether the browser would like it. Since this affects only cases when you have a large number of incoming simultaneous connections, *or* when the timing is so bad that the beginning of the request is within the Keep-Alive limit but the ending is not (e.g. with file POST requests), I'm inclined to ignore it. The file is a patch to GNU Smalltalk, however I release it as MIT license since it may be of interest to Kom or anything else. Swazoo guys, tell me what you think and what is the best shape to contribute this back (changeset?). The changes to Messages.st are independent fixes. Paolo diff --git a/packages/swazoo-httpd/Core.st b/packages/swazoo-httpd/Core.st index acd7b16..7a6c2cc 100644 --- a/packages/swazoo-httpd/Core.st +++ b/packages/swazoo-httpd/Core.st @@ -591,8 +591,229 @@ CompositeResource subclass: SwazooSite [ +Object subclass: HTTPConnectionPool [ + | reapers pool prev next first last firstFree mutex reapDone | + + <comment: 'HTTPConnectionPool is a helper class that manages all the +connections of a SwazooServer. The number of available connections +is limited, so the pool is always kept sorted by first request and +more ancient connections are dropped if there is a request from +somewhere else. + +In addition, this class implements a backoff policy that should help in +case limited resources prevent acceptance of new connections. +'> + + HTTPConnectionPool class >> new: size [ + <category: 'instance creation'> + ^self new initialize: size + ] + + reaperForIndex: i [ + <category: 'private-initialize'> + + | block s | + s := Semaphore new. + + "We need a separate process because Smalltalk Semaphores + do not portably provide a 'try to lock' method. We store + a semaphore in an array and use the semaphore to wake up + the process." + block := [ + | sem doIt | + [s wait. + (sem := reapDone) notNil ifTrue: [ + (pool at: i) whenNotServingDo: [ + "Run under mutex to ensure only one connection + is reapDone." + mutex critical: [ + doIt := reapDone == sem. + doIt ifTrue: [reapDone := nil]]. + + doIt ifTrue: [ + (pool at: i) close. + sem signal]]]] + repeat]. + + block forkAt: Processor userInterruptPriority. + ^s + ] + + initialize: size [ + <category: 'private-initialize'> + + "Free list." + firstFree := size. + next := (0 to: size - 1) asArray. + + "Actual connection pool." + first := last := 0. + pool := Array new: size. + prev := Array new: size. + + mutex := Semaphore forMutualExclusion. + reapers := (1 to: size) collect: [ :i | self reaperForIndex: i ]. + ] + + size [ + <category: 'accessing'> + ^pool size + ] + + acceptConnectionForServer: aServer [ + "Ask aServer for an HTTPConnection object, put it into the pool, + and return it." + <category: 'serving'> + + | connection time | + time := 10. + [connection := SpExceptionContext + for: [aServer acceptConnectionFromSocket] + on: SpError + do: [:ex | + Transcript + show: 'Socket accept error: ' , ex errorString; + cr. + ex return]. + + connection isNil] whileTrue: [ + "Trying immediately would just cause another failure. + Try freeing a connection's resources first, otherwise wait + for connections to finish in 10, 30, 70, 150, 250 ms." + self tryToReapConnection ifFalse: [ + (Delay forMilliseconds: time) wait. + time := (time + 5) * 2 min: 250]]. + + ^self addConnection: connection + ] + + addConnection: aConnection [ + <category: 'private'> + + mutex critical: [ + firstFree = 0 ifTrue: [self reapConnection]. + aConnection id: firstFree. + + "Add to the list and update the free list." + pool at: firstFree put: aConnection. + prev at: firstFree put: last. + last = 0 + ifTrue: [ first := last := firstFree ] + ifFalse: [ last := next at: last put: firstFree ]. + + firstFree := next at: firstFree. + next at: last put: 0]. + + ^aConnection + ] + + removeConnection: aConnection [ + <category: 'accessing'> + + | index | + index := aConnection id. + aConnection id: 0. + index = 0 ifTrue: [ ^self ]. + mutex critical: [ + "Remove from the list..." + | itsPrev itsNext | + itsPrev := prev at: index. + itsNext := next at: index. + pool at: index put: nil. + itsNext = 0 + ifTrue: [ last := itsPrev ] + ifFalse: [ prev at: itsNext put: itsPrev ]. + itsPrev = 0 + ifTrue: [ first := itsNext ] + ifFalse: [ next at: itsPrev put: itsNext ]. + + "... and put back into the free list." + next at: index put: firstFree. + firstFree := index]. + ] + + connections [ + <category: 'accessing'> + + | oc | + oc := OrderedCollection new. + mutex critical: [ + self walk: [:each | oc add: each]]. + ^oc + ] + + walk: aBlock [ + <category: 'private'> + + "Must be called within mutex." + | i | + i := first. + [i = 0] whileFalse: [ + aBlock value: (pool at: i). + i := next at: i] + ] + + tryToReap: i [ + "Start a process that will wait for connection i to finish + serving a response and, if no other connection has been reaped + yet, will forcibly close connection i." + <category: 'reaping'> + + "Must be called within mutex." + (reapers at: i) signal. + ] + + reapConnections: block [ + <category: 'reaping'> + + "Must be called within mutex." + | i sem prev | + + "First the easy case, see if some Keep-Alive connection has + timed out." + self walk: [ :conn | + conn keepAliveTimeout ifTrue: [conn close. ^self]]. + + "Else wake up the reaper processes. They will find the first + connection that finishes serving a response and forcibly close + it." + i := first. + sem := reapDone := Semaphore new. + [reapDone notNil and: [i > 0]] whileTrue: [ + prev := i. + i := next at: i. + mutex signal. + self tryToReap: prev. + mutex wait]. + + "Wait for the processes to do their job if called because of + excessive loads." + block ifTrue: [sem wait]. + reapDone := nil. + ] + + tryToReapConnection [ + "Look for dormient connections and kill one of them. Return whether + one was found." + <category: 'reaping'> + + ^mutex critical: [ + self reapConnections: false. + firstFree > 0] + ] + + reapConnection [ + "Kill one dormient connection, possibly sleeping until one is found." + <category: 'reaping'> + + self reapConnections: true + ] + +] + + Object subclass: SwazooServer [ - | sites servers watchdog | + | connectionPool sites servers watchdog | <category: 'Swazoo-Core'> <comment: 'SwazooServer is where all begins in Swazoo! @@ -650,7 +871,7 @@ SwazooServer demoStart will create and run a demo site on http://localhost:8888 SwazooServer class >> initSingleton [ <category: 'private'> - Singleton := super new + Singleton := super new initialize ] SwazooServer class >> initialize [ @@ -661,6 +882,11 @@ SwazooServer demoStart will create and run a demo site on http://localhost:8888 for: self singleton ] + SwazooServer class >> defaultConnectionPoolSize [ + <category: 'parameters'> + ^15 + ] + SwazooServer class >> new [ <category: 'private'> ^self shouldNotImplement @@ -822,6 +1048,27 @@ SwazooServer demoStart will create and run a demo site on http://localhost:8888 ^self sites contains: [:each | each port = aNumber] ] + connectionPool [ + <category: 'accessing'> + ^connectionPool + ] + + connectionPoolSize [ + <category: 'accessing'> + ^connectionPool size + ] + + connectionPoolSize: anInteger [ + | active | + active := self servers isNil + ifTrue: [ #() ] + ifFalse: [ self servers copy ]. + + active do: [ :each | each stop ]. + connectionPool := HTTPConnectionPool new: anInteger. + active do: [ :each | each start ] + ] + initServers [ <category: 'initialize-release'> servers := Set new @@ -834,6 +1081,7 @@ SwazooServer demoStart will create and run a demo site on http://localhost:8888 initialize [ <category: 'initialize-release'> + self connectionPoolSize: self class defaultConnectionPoolSize. self initSites. self initServers ] diff --git a/packages/swazoo-httpd/HTTP.st b/packages/swazoo-httpd/HTTP.st index cc87f1a..50d98ad 100644 --- a/packages/swazoo-httpd/HTTP.st +++ b/packages/swazoo-httpd/HTTP.st @@ -30,14 +30,14 @@ Object subclass: HTTPConnection [ - | stream loop server task | + | stream loop id server task mutex | <category: 'Swazoo-HTTP'> <comment: nil> HTTPConnection class >> socket: aSocket [ <category: 'instance creation'> - ^self new stream: aSocket stream + ^self new initializeStream: aSocket stream ] close [ @@ -53,44 +53,49 @@ Object subclass: HTTPConnection [ self loop: nil] ] - getAndDispatchMessages [ + whenNotServingDo: aBlock [ + mutex critical: aBlock + ] + + getAndDispatchRequest [ "^self The HTTPRequest is read from my socket stream. I then pass this request to my server to get a response." <category: 'serving'> - self stream anyDataReady - ifTrue: - ["wait for data and if anything read, proceed" + self task: (SwazooTask newOn: self). + self readRequestFor: self task. + mutex critical: [ + self isOpen ifFalse: [^self]. + self produceResponseFor: self task]. - self task: (SwazooTask newOn: self). - self readRequestFor: self task. - self produceResponseFor: self task. - self task request wantsConnectionClose ifTrue: [self close]. - self task request isHttp10 ifTrue: [self close] "well, we won't complicate here"] - ifFalse: - [self keepAliveTimeout ifTrue: [^self close]. - (Delay forMilliseconds: 100) wait. "to finish sending, if any" - self close] + self task request wantsConnectionClose ifTrue: [self close]. + self task request isHttp10 ifTrue: [self close] + ] + + anyDataReady [ + "wait for data and if anything read, proceed" + | theStream | + theStream := self stream. + ^self isOpen and: [theStream anyDataReady] ] - interact [ + interact: multiThreaded [ "longer description is below method" <category: 'serving'> | interactionBlock | interactionBlock := - [ - [[[true] whileTrue: - [self getAndDispatchMessages. + [[[[self anyDataReady] whileTrue: + [self getAndDispatchRequest. Processor yield]] - ifCurtailed: + ensure: [self close]] on: Error do: [:ex | (Delay forMilliseconds: 50) wait. "to finish sending, if any" self close]]. - self server isMultiThreading + multiThreaded ifTrue: [self loop: (interactionBlock forkAt: Processor userBackgroundPriority)] ifFalse: [interactionBlock value]. @@ -112,9 +117,10 @@ Object subclass: HTTPConnection [ <category: 'testing'> | seconds | self task isNil ifTrue: [^false]. + self task request isNil ifTrue: [^false]. self task request isKeepAlive ifFalse: [^false]. seconds := self task request keepAlive notNil - ifTrue: [self task request keepAlive asInteger - 10 "to be sure"] + ifTrue: [self task request keepAlive asInteger] ifFalse: [20]. "probably enough?" ^SpTimestamp now asSeconds - self task request timestamp asSeconds >= seconds @@ -205,6 +211,16 @@ Object subclass: HTTPConnection [ self close] ] + id [ + <category: 'private'> + ^id + ] + + id: anInteger [ + <category: 'private'> + id := anInteger + ] + server [ <category: 'private'> ^server @@ -225,8 +241,9 @@ Object subclass: HTTPConnection [ ^stream ] - stream: aSwazooStream [ + initializeStream: aSwazooStream [ <category: 'private'> + mutex := Semaphore forMutualExclusion. stream := aSwazooStream ] @@ -248,7 +265,7 @@ Object subclass: HTTPConnection [ Object subclass: HTTPServer [ - | ip port connections sites socket loop isMultiThreading | + | ip port sites socket loop isMultiThreading | <category: 'Swazoo-HTTP'> <comment: nil> @@ -273,28 +290,18 @@ Object subclass: HTTPServer [ acceptConnection [ "^self - I accept the next inbound TCP/IP connection. The operating system libraries queue these up for me, so I can just handle one at a time. I create an HTTPConnection instance to actually handle the interaction with the client - if I am in single threaded mode, the connection will completely handle the request before returning control to me, but in multi-threaded mode the connection forks the work into a sepparate thread in this image and control is immediately returned to me (the application programmer must worry about thread safety in this case." + I accept the next inbound TCP/IP connection. The operating system libraries queue these up for me, so I can just handle one at a time. I create an HTTPConnection instance to actually handle the interaction with the client - if I am in single threaded mode, the connection will completely handle the request before returning control to me, but in multi-threaded mode the connection forks the work into a separate thread in this image and control is immediately returned to me (the application programmer must worry about thread safety in this case." - <category: 'private'> - | clientConnection | - clientConnection := SpExceptionContext - for: [HTTPConnection socket: self socket accept] - on: SpError - do: - [:ex | - Transcript - show: 'Socket accept error: ' , ex errorString; - cr. - ^self]. - self addConnection: clientConnection. - clientConnection interact. - ^self + (SwazooServer singleton connectionPool acceptConnectionForServer: self) + interact: self isMultiThreading ] - addConnection: aConnection [ + acceptConnectionFromSocket [ <category: 'private'> - self connections add: aConnection. - aConnection server: self + | connection | + connection := HTTPConnection socket: self socket accept. + connection server: self. + ^connection ] addSite: aSite [ @@ -310,22 +317,11 @@ Object subclass: HTTPServer [ ^response isNil ifTrue: [HTTPResponse notFound] ifFalse: [response] ] - connections [ - <category: 'private'> - connections isNil ifTrue: [self initConnections]. - ^connections - ] - hasNoSites [ <category: 'sites'> ^self sites hasNoResources ] - initConnections [ - <category: 'private-initialize'> - connections := OrderedCollection new - ] - initSites [ <category: 'private-initialize'> sites := ServerRootComposite new @@ -333,7 +329,6 @@ Object subclass: HTTPServer [ initialize [ <category: 'private-initialize'> - self initConnections. self initSites ] @@ -390,7 +385,7 @@ Object subclass: HTTPServer [ removeConnection: aConnection [ <category: 'private'> - self connections remove: aConnection ifAbsent: [nil] + SwazooServer singleton connectionPool removeConnection: aConnection ] removeSite: aSite [ @@ -426,7 +421,6 @@ Object subclass: HTTPServer [ sites [ <category: 'private'> - sites isNil ifTrue: [self initSites]. ^sites ] @@ -463,7 +457,8 @@ Object subclass: HTTPServer [ <category: 'start/stop'> self loop isNil ifFalse: - [self connections copy do: [:each | each close]. + [SwazooServer singleton connectionPool connections + do: [:each | each server == self ifTrue: [each close]]. self loop terminate. self loop: nil. self socket close. diff --git a/packages/swazoo-httpd/Messages.st b/packages/swazoo-httpd/Messages.st index 25d1d0e..9b5fe25 100644 --- a/packages/swazoo-httpd/Messages.st +++ b/packages/swazoo-httpd/Messages.st @@ -343,10 +343,13 @@ HTTPMessage subclass: HTTPRequest [ isKeepAlive [ <category: 'testing'> - | header | + | header result | + result := self isHttp10 not. header := self connection. - header isNil ifTrue: [^false]. - ^'*Keep-Alive*' match: header + header isNil ifFalse: [ + ('*Keep-Alive*' match: header) ifTrue: [ result := true ]. + ('*Close*' match: header) ifTrue: [ result := false ] ]. + ^result ] isOptions [ @@ -373,7 +376,7 @@ HTTPMessage subclass: HTTPRequest [ "how many seconds a connection must be kept alive" <category: 'accessing-headers'> - ^(self headers fieldNamed: 'KeepAlive' ifNone: [^nil]) value + ^(self headers fieldNamed: 'Keep-Alive' ifNone: [^nil]) value ] methodName [ _______________________________________________ help-smalltalk mailing list [hidden email] http://lists.gnu.org/mailman/listinfo/help-smalltalk |
Free forum by Nabble | Edit this page |