Implementing a connection pool for Swazoo

Paolo Bonzini-2
Hi all,

I tried to implement a connection pool for Swazoo.  I'll probably
commit this to the GNU Smalltalk repository early next week, or
sooner if I get positive feedback about it.

The code maintains a linked list of connections ordered by starting
time (so it is fair) and closes connections that are not currently
serving a response when a given number of connections (default 15, can
be changed with a message to the SwazooServer singleton) is exceeded.
The pool is shared by all servers (i.e. all interfaces) because that
helps control process-wide resources such as file descriptors, which
is also why its size is controlled by SwazooServer.
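
To illustrate the bookkeeping, here is a minimal stand-alone sketch of
an index-linked list with a free list, written in GNU Smalltalk class
syntax.  SlotList and its selectors are hypothetical names used only
for this illustration; the patch keeps the equivalent state in the
pool, prev and next arrays of HTTPConnectionPool and adds locking and
reaping on top.  Index 0 plays the role of nil.

```smalltalk
"Hypothetical stand-alone sketch, not part of the patch."
Object subclass: SlotList [
    | items prev next first last firstFree |

    SlotList class >> new: size [
        ^self new setSize: size
    ]

    setSize: size [
        items := Array new: size.
        prev := Array new: size withAll: 0.
        "Free list: size -> size - 1 -> ... -> 1 -> 0 (empty)."
        next := (0 to: size - 1) asArray.
        firstFree := size.
        first := last := 0
    ]

    add: anObject [
        "Append anObject and answer its slot, or 0 if the list is full."
        | index |
        index := firstFree.
        index = 0 ifTrue: [^0].
        firstFree := next at: index.
        items at: index put: anObject.
        prev at: index put: last.
        next at: index put: 0.
        last = 0
            ifTrue: [first := index]
            ifFalse: [next at: last put: index].
        last := index.
        ^index
    ]

    removeAt: index [
        "Unlink the slot and push it back onto the free list."
        | itsPrev itsNext |
        itsPrev := prev at: index.
        itsNext := next at: index.
        items at: index put: nil.
        itsNext = 0
            ifTrue: [last := itsPrev]
            ifFalse: [prev at: itsNext put: itsPrev].
        itsPrev = 0
            ifTrue: [first := itsNext]
            ifFalse: [next at: itsPrev put: itsNext].
        next at: index put: firstFree.
        firstFree := index
    ]

    do: aBlock [
        "Visit the items from oldest to newest."
        | i |
        i := first.
        [i = 0] whileFalse: [
            aBlock value: (items at: i).
            i := next at: i]
    ]
]

Eval [
    | list slotB |
    list := SlotList new: 3.
    list add: 'a'.
    slotB := list add: 'b'.
    list add: 'c'.
    list removeAt: slotB.
    "The freed slot is reused, but 'd' still comes last in the order."
    list add: 'd'.
    list do: [:each | Transcript showCr: each]
]
```

Adding and removing are constant-time and allocate nothing, and the
oldest entry is always at the head of the list.  With the patch
applied, the limit itself is changed with something like
"SwazooServer singleton connectionPoolSize: 30" (30 is just an example
value); note that this stops and restarts the running servers.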

The code is quite complicated, especially because I used only plain
Blue Book semaphores, no RecursionLocks or anything like that.  From
some testing with netcat, however, it seems to do its job well and to
be worth it.  For example, a "Keep-Alive: 1" connection will always be
killed before a "Keep-Alive: 300" connection.  Putting it under load
may show problems, though.
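
The reason for that ordering can be shown with a simplified version of
the timeout test.  This is only a sketch: the block and its argument
names are hypothetical, and the real check is keepAliveTimeout in
HTTPConnection, which reads the values from the last request.  The
fallback of 20 seconds for a missing header is the one used in the
patch.

```smalltalk
Eval [
    | hasTimedOut |
    "age: seconds since the last request; keepAlive: the header value
     as a string, or nil when the header is absent."
    hasTimedOut := [:age :keepAlive |
        age >= (keepAlive isNil
            ifTrue: [20]
            ifFalse: [keepAlive asInteger])].

    "Five seconds after the last request:"
    (hasTimedOut value: 5 value: '1') printNl.      "true"
    (hasTimedOut value: 5 value: '300') printNl.    "false"
    (hasTimedOut value: 5 value: nil) printNl       "false"
]
```

A connection that has timed out is closed directly; only when there is
none are the reaper processes woken up.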

In particular, I'm not sure what to do if a connection is still
receiving a request at the time the "reaper" process wants to kill it.
Right now I reset the connection, but I'm not sure whether the browser
will like that.  Since this only matters when there is a large number
of simultaneous incoming connections, *or* when the timing is so
unlucky that the beginning of the request falls within the Keep-Alive
limit but the end does not (e.g. with file POST requests), I'm
inclined to ignore it.

The file is a patch to GNU Smalltalk; however, I release it under the
MIT license since it may be of interest to Kom or other projects.
Swazoo guys, tell me what you think and what the best form is for
contributing this back (a changeset?).

The changes to Messages.st are independent fixes.

Paolo


diff --git a/packages/swazoo-httpd/Core.st b/packages/swazoo-httpd/Core.st
index acd7b16..7a6c2cc 100644
--- a/packages/swazoo-httpd/Core.st
+++ b/packages/swazoo-httpd/Core.st
@@ -591,8 +591,229 @@ CompositeResource subclass: SwazooSite [
 
 
 
+Object subclass: HTTPConnectionPool [
+    | reapers pool prev next first last firstFree mutex reapDone |
+
+    <comment: 'HTTPConnectionPool is a helper class that manages all the
+connections of a SwazooServer.  The number of available connections
+is limited, so the pool is always kept sorted by first request and
+more ancient connections are dropped if there is a request from
+somewhere else.
+
+In addition, this class implements a backoff policy that should help in
+case limited resources prevent acceptance of new connections.
+'>
+
+    HTTPConnectionPool class >> new: size [
+ <category: 'instance creation'>
+ ^self new initialize: size
+    ]
+
+    reaperForIndex: i [
+ <category: 'private-initialize'>
+
+ | block s |
+ s := Semaphore new.
+
+ "We need a separate process because Smalltalk Semaphores
+ do not portably provide a 'try to lock' method.  We store
+ a semaphore in an array and use the semaphore to wake up
+ the process."
+ block := [
+    | sem doIt |
+    [s wait.
+    (sem := reapDone) notNil ifTrue: [
+        (pool at: i) whenNotServingDo: [
+    "Run under mutex to ensure only one connection
+     is reaped."
+    mutex critical: [
+                doIt := reapDone == sem.
+        doIt ifTrue: [reapDone := nil]].
+
+    doIt ifTrue: [
+        (pool at: i) close.
+        sem signal]]]]
+ repeat].
+
+ block forkAt: Processor userInterruptPriority.
+ ^s
+    ]
+
+    initialize: size [
+ <category: 'private-initialize'>
+
+ "Free list."
+ firstFree := size.
+ next := (0 to: size - 1) asArray.
+
+ "Actual connection pool."
+ first := last := 0.
+ pool := Array new: size.
+ prev := Array new: size.
+
+ mutex := Semaphore forMutualExclusion.
+ reapers := (1 to: size) collect: [ :i | self reaperForIndex: i ].
+    ]
+
+    size [
+ <category: 'accessing'>
+ ^pool size
+    ]
+
+    acceptConnectionForServer: aServer [
+ "Ask aServer for an HTTPConnection object, put it into the pool,
+ and return it."
+ <category: 'serving'>
+
+ | connection time |
+ time := 10.
+ [connection := SpExceptionContext
+    for: [aServer acceptConnectionFromSocket]
+    on: SpError
+    do: [:ex |
+ Transcript
+    show: 'Socket accept error: ' , ex errorString;
+    cr.
+ ex return].
+
+ connection isNil] whileTrue: [
+     "Trying immediately would just cause another failure.
+      Try freeing a connection's resources first, otherwise wait
+      for connections to finish in 10, 30, 70, 150, 250 ms."
+     self tryToReapConnection ifFalse: [
+ (Delay forMilliseconds: time) wait.
+ time := (time + 5) * 2 min: 250]].
+
+ ^self addConnection: connection
+    ]
+
+    addConnection: aConnection [
+ <category: 'private'>
+
+ mutex critical: [
+    firstFree = 0 ifTrue: [self reapConnection].
+    aConnection id: firstFree.
+
+    "Add to the list and update the free list."
+    pool at: firstFree put: aConnection.
+    prev at: firstFree put: last.
+    last = 0
+ ifTrue: [ first := last := firstFree ]
+ ifFalse: [ last := next at: last put: firstFree ].
+
+    firstFree := next at: firstFree.
+    next at: last put: 0].
+
+ ^aConnection
+    ]
+
+    removeConnection: aConnection [
+ <category: 'accessing'>
+
+ | index |
+ index := aConnection id.
+ aConnection id: 0.
+ index = 0 ifTrue: [ ^self ].
+ mutex critical: [
+    "Remove from the list..."
+    | itsPrev itsNext |
+    itsPrev := prev at: index.
+    itsNext := next at: index.
+    pool at: index put: nil.
+    itsNext = 0
+ ifTrue: [ last := itsPrev ]
+ ifFalse: [ prev at: itsNext put: itsPrev ].
+    itsPrev = 0
+ ifTrue: [ first := itsNext ]
+ ifFalse: [ next at: itsPrev put: itsNext ].
+
+    "... and put back into the free list."
+    next at: index put: firstFree.
+    firstFree := index].
+    ]
+
+    connections [
+ <category: 'accessing'>
+
+ | oc |
+ oc := OrderedCollection new.
+ mutex critical: [
+    self walk: [:each | oc add: each]].
+ ^oc
+    ]
+
+    walk: aBlock [
+ <category: 'private'>
+
+ "Must be called within mutex."
+ | i |
+ i := first.
+ [i = 0] whileFalse: [
+    aBlock value: (pool at: i).
+    i := next at: i]
+    ]
+
+    tryToReap: i [
+ "Start a process that will wait for connection i to finish
+ serving a response and, if no other connection has been reaped
+ yet, will forcibly close connection i."
+ <category: 'reaping'>
+
+ "Must be called within mutex."
+ (reapers at: i) signal.
+    ]
+
+    reapConnections: block [
+ <category: 'reaping'>
+
+ "Must be called within mutex."
+ | i sem prev |
+
+ "First the easy case, see if some Keep-Alive connection has
+ timed out."
+ self walk: [ :conn |
+    conn keepAliveTimeout ifTrue: [conn close. ^self]].
+
+ "Else wake up the reaper processes.  They will find the first
+ connection that finishes serving a response and forcibly close
+ it."
+ i := first.
+ sem := reapDone := Semaphore new.
+ [reapDone notNil and: [i > 0]] whileTrue: [
+    prev := i.
+    i := next at: i.
+    mutex signal.
+    self tryToReap: prev.
+    mutex wait].
+
+ "Wait for the processes to do their job if called because of
+ excessive loads."
+ block ifTrue: [sem wait].
+ reapDone := nil.
+    ]
+
+    tryToReapConnection [
+ "Look for dormant connections and kill one of them.  Return whether
+ one was found."
+ <category: 'reaping'>
+
+ ^mutex critical: [
+    self reapConnections: false.
+    firstFree > 0]
+    ]
+
+    reapConnection [
+ "Kill one dormant connection, possibly sleeping until one is found."
+ <category: 'reaping'>
+
+ self reapConnections: true
+    ]
+
+]
+
+
 Object subclass: SwazooServer [
-    | sites servers watchdog |
+    | connectionPool sites servers watchdog |
     
     <category: 'Swazoo-Core'>
     <comment: 'SwazooServer is where all begins in Swazoo!
@@ -650,7 +871,7 @@ SwazooServer demoStart  will create and run a demo site on http://localhost:8888
 
     SwazooServer class >> initSingleton [
  <category: 'private'>
- Singleton := super new
+ Singleton := super new initialize
     ]
 
     SwazooServer class >> initialize [
@@ -661,6 +882,11 @@ SwazooServer demoStart  will create and run a demo site on http://localhost:8888
     for: self singleton
     ]
 
+    SwazooServer class >> defaultConnectionPoolSize [
+ <category: 'parameters'>
+ ^15
+    ]
+
     SwazooServer class >> new [
  <category: 'private'>
  ^self shouldNotImplement
@@ -822,6 +1048,27 @@ SwazooServer demoStart  will create and run a demo site on http://localhost:8888
  ^self sites contains: [:each | each port = aNumber]
     ]
 
+    connectionPool [
+ <category: 'accessing'>
+ ^connectionPool
+    ]
+
+    connectionPoolSize [
+ <category: 'accessing'>
+ ^connectionPool size
+    ]
+
+    connectionPoolSize: anInteger [
+ | active |
+ active := self servers isNil
+    ifTrue: [ #() ]
+    ifFalse: [ self servers copy ].
+
+ active do: [ :each | each stop ].
+ connectionPool := HTTPConnectionPool new: anInteger.
+ active do: [ :each | each start ]
+    ]
+
     initServers [
  <category: 'initialize-release'>
  servers := Set new
@@ -834,6 +1081,7 @@ SwazooServer demoStart  will create and run a demo site on http://localhost:8888
 
     initialize [
  <category: 'initialize-release'>
+ self connectionPoolSize: self class defaultConnectionPoolSize.
  self initSites.
  self initServers
     ]
diff --git a/packages/swazoo-httpd/HTTP.st b/packages/swazoo-httpd/HTTP.st
index cc87f1a..50d98ad 100644
--- a/packages/swazoo-httpd/HTTP.st
+++ b/packages/swazoo-httpd/HTTP.st
@@ -30,14 +30,14 @@
 
 
 Object subclass: HTTPConnection [
-    | stream loop server task |
+    | stream loop id server task mutex |
     
     <category: 'Swazoo-HTTP'>
     <comment: nil>
 
     HTTPConnection class >> socket: aSocket [
  <category: 'instance creation'>
- ^self new stream: aSocket stream
+ ^self new initializeStream: aSocket stream
     ]
 
     close [
@@ -53,44 +53,49 @@ Object subclass: HTTPConnection [
  self loop: nil]
     ]
 
-    getAndDispatchMessages [
+    whenNotServingDo: aBlock [
+ mutex critical: aBlock
+    ]
+
+    getAndDispatchRequest [
  "^self
  The HTTPRequest is read from my socket stream.  I then pass this request to my server
  to get a response."
 
  <category: 'serving'>
- self stream anyDataReady
-    ifTrue:
- ["wait for data and if anything read, proceed"
+ self task: (SwazooTask newOn: self).
+ self readRequestFor: self task.
+ mutex critical: [
+    self isOpen ifFalse: [^self].
+    self produceResponseFor: self task].
 
- self task: (SwazooTask newOn: self).
- self readRequestFor: self task.
- self produceResponseFor: self task.
- self task request wantsConnectionClose ifTrue: [self close].
- self task request isHttp10 ifTrue: [self close] "well, we won't complicate here"]
-    ifFalse:
- [self keepAliveTimeout ifTrue: [^self close].
- (Delay forMilliseconds: 100) wait. "to finish sending, if any"
- self close]
+ self task request wantsConnectionClose ifTrue: [self close].
+ self task request isHttp10 ifTrue: [self close]
+    ]
+
+    anyDataReady [
+ "wait for data and if anything read, proceed"
+ | theStream |
+ theStream := self stream.
+ ^self isOpen and: [theStream anyDataReady]
     ]
 
-    interact [
+    interact: multiThreaded [
  "longer description is below method"
 
  <category: 'serving'>
  | interactionBlock |
  interactionBlock :=
- [
- [[[true] whileTrue:
- [self getAndDispatchMessages.
+ [[[[self anyDataReady] whileTrue:
+ [self getAndDispatchRequest.
  Processor yield]]
-    ifCurtailed:
+    ensure:
         [self close]]
     on: Error
     do: [:ex |
  (Delay forMilliseconds: 50) wait. "to finish sending, if any"
  self close]].
- self server isMultiThreading
+ multiThreaded
     ifTrue:
  [self loop: (interactionBlock forkAt: Processor userBackgroundPriority)]
     ifFalse: [interactionBlock value].
@@ -112,9 +117,10 @@ Object subclass: HTTPConnection [
  <category: 'testing'>
  | seconds |
  self task isNil ifTrue: [^false].
+ self task request isNil ifTrue: [^false].
  self task request isKeepAlive ifFalse: [^false].
  seconds := self task request keepAlive notNil
-    ifTrue: [self task request keepAlive asInteger - 10 "to be sure"]
+    ifTrue: [self task request keepAlive asInteger]
     ifFalse: [20]. "probably enough?"
  ^SpTimestamp now asSeconds - self task request timestamp asSeconds
     >= seconds
@@ -205,6 +211,16 @@ Object subclass: HTTPConnection [
  self close]
     ]
 
+    id [
+ <category: 'private'>
+ ^id
+    ]
+
+    id: anInteger [
+ <category: 'private'>
+ id := anInteger
+    ]
+
     server [
  <category: 'private'>
  ^server
@@ -225,8 +241,9 @@ Object subclass: HTTPConnection [
  ^stream
     ]
 
-    stream: aSwazooStream [
+    initializeStream: aSwazooStream [
  <category: 'private'>
+ mutex := Semaphore forMutualExclusion.
  stream := aSwazooStream
     ]
 
@@ -248,7 +265,7 @@ Object subclass: HTTPConnection [
 
 
 Object subclass: HTTPServer [
-    | ip port connections sites socket loop isMultiThreading |
+    | ip port sites socket loop isMultiThreading |
     
     <category: 'Swazoo-HTTP'>
     <comment: nil>
@@ -273,28 +290,18 @@ Object subclass: HTTPServer [
 
     acceptConnection [
  "^self
- I accept the next inbound TCP/IP connection.  The operating system libraries queue these up for me, so I can just handle one at a time.  I create an HTTPConnection instance to actually handle the interaction with the client - if I am in single threaded mode, the connection will completely handle the request before returning control to me, but in multi-threaded mode the connection forks the work into a sepparate thread in this image and control is immediately returned to me (the application programmer must worry about thread safety in this case."
+ I accept the next inbound TCP/IP connection.  The operating system libraries queue these up for me, so I can just handle one at a time.  I create an HTTPConnection instance to actually handle the interaction with the client - if I am in single threaded mode, the connection will completely handle the request before returning control to me, but in multi-threaded mode the connection forks the work into a separate thread in this image and control is immediately returned to me (the application programmer must worry about thread safety in this case."
 
- <category: 'private'>
- | clientConnection |
- clientConnection := SpExceptionContext
-    for: [HTTPConnection socket: self socket accept]
-    on: SpError
-    do:
- [:ex |
- Transcript
-    show: 'Socket accept error: ' , ex errorString;
-    cr.
- ^self].
- self addConnection: clientConnection.
- clientConnection interact.
- ^self
+ (SwazooServer singleton connectionPool acceptConnectionForServer: self)
+    interact: self isMultiThreading
     ]
 
-    addConnection: aConnection [
+    acceptConnectionFromSocket [
  <category: 'private'>
- self connections add: aConnection.
- aConnection server: self
+ | connection |
+ connection := HTTPConnection socket: self socket accept.
+ connection server: self.
+ ^connection
     ]
 
     addSite: aSite [
@@ -310,22 +317,11 @@ Object subclass: HTTPServer [
  ^response isNil ifTrue: [HTTPResponse notFound] ifFalse: [response]
     ]
 
-    connections [
- <category: 'private'>
- connections isNil ifTrue: [self initConnections].
- ^connections
-    ]
-
     hasNoSites [
  <category: 'sites'>
  ^self sites hasNoResources
     ]
 
-    initConnections [
- <category: 'private-initialize'>
- connections := OrderedCollection new
-    ]
-
     initSites [
  <category: 'private-initialize'>
  sites := ServerRootComposite new
@@ -333,7 +329,6 @@ Object subclass: HTTPServer [
 
     initialize [
  <category: 'private-initialize'>
- self initConnections.
  self initSites
     ]
 
@@ -390,7 +385,7 @@ Object subclass: HTTPServer [
 
     removeConnection: aConnection [
  <category: 'private'>
- self connections remove: aConnection ifAbsent: [nil]
+ SwazooServer singleton connectionPool removeConnection: aConnection
     ]
 
     removeSite: aSite [
@@ -426,7 +421,6 @@ Object subclass: HTTPServer [
 
     sites [
  <category: 'private'>
- sites isNil ifTrue: [self initSites].
  ^sites
     ]
 
@@ -463,7 +457,8 @@ Object subclass: HTTPServer [
  <category: 'start/stop'>
  self loop isNil
     ifFalse:
- [self connections copy do: [:each | each close].
+ [SwazooServer singleton connectionPool connections
+    do: [:each | each server == self ifTrue: [each close]].
  self loop terminate.
  self loop: nil.
  self socket close.
diff --git a/packages/swazoo-httpd/Messages.st b/packages/swazoo-httpd/Messages.st
index 25d1d0e..9b5fe25 100644
--- a/packages/swazoo-httpd/Messages.st
+++ b/packages/swazoo-httpd/Messages.st
@@ -343,10 +343,13 @@ HTTPMessage subclass: HTTPRequest [
 
     isKeepAlive [
  <category: 'testing'>
- | header |
+ | header result |
+ result := self isHttp10 not.
  header := self connection.
- header isNil ifTrue: [^false].
- ^'*Keep-Alive*' match: header
+ header isNil ifFalse: [
+    ('*Keep-Alive*' match: header) ifTrue: [ result := true ].
+    ('*Close*' match: header) ifTrue: [ result := false ] ].
+ ^result
     ]
 
     isOptions [
@@ -373,7 +376,7 @@ HTTPMessage subclass: HTTPRequest [
  "how many seconds a connection must be kept alive"
 
  <category: 'accessing-headers'>
- ^(self headers fieldNamed: 'KeepAlive' ifNone: [^nil]) value
+ ^(self headers fieldNamed: 'Keep-Alive' ifNone: [^nil]) value
     ]
 
     methodName [
