[PATCH] More stream-to-stream protocol

Paolo Bonzini
This patch adds #nextHunkPutAllOn:, which is simple to implement and
provides a first example of stream-to-stream communication.  The Sockets
microbenchmarks are up to 5x faster when they use it.
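A minimal sketch of the idea, in GNU Smalltalk 3.x class syntax.  HunkReader and FastHunkReader are hypothetical classes made up for this illustration; they are not kernel classes.  The base class defines #nextHunkPutAllOn: on top of #nextHunk, the way the patch does in Stream.st.  The subclass overrides it to hand its reusable buffer straight to the destination with #next:putAll:startingAt:, roughly the way FileStream and the sockets buffer do, so no intermediate collection is created per hunk.

```smalltalk
"Hypothetical classes for illustration only; they mimic the shape of the
 patch but are not part of the GNU Smalltalk kernel."
Object subclass: HunkReader [
    | data pos buffer |

    HunkReader class >> on: aString [
        ^self new setData: aString
    ]

    setData: aString [
        data := aString.
        pos := 1.
        "Deliberately tiny, so that several hunks are needed."
        buffer := String new: 4
    ]

    atEnd [
        ^pos > data size
    ]

    fill [
        "Refill the reusable buffer; answer how many characters it holds now."
        | count |
        count := buffer size min: data size - pos + 1.
        buffer replaceFrom: 1 to: count with: data startingAt: pos.
        pos := pos + count.
        ^count
    ]

    nextHunk [
        "Answer a fresh copy of the buffered data: one new String per hunk."
        | count |
        count := self fill.
        ^buffer copyFrom: 1 to: count
    ]

    nextHunkPutAllOn: aStream [
        "Generic fallback, shaped like the patch's Stream>>#nextHunkPutAllOn:."
        | coll |
        coll := self nextHunk.
        aStream next: coll size putAll: coll startingAt: 1
    ]

    nextPutAllOn: aStream [
        "Copy whatever is left to aStream, one hunk at a time."
        [self atEnd] whileFalse: [self nextHunkPutAllOn: aStream]
    ]
]

HunkReader subclass: FastHunkReader [
    nextHunkPutAllOn: aStream [
        "Override: pass the reusable buffer itself, with no intermediate copy."
        | count |
        count := self fill.
        aStream next: count putAll: buffer startingAt: 1
    ]
]

| text slow fast |
text := 'stream-to-stream copy'.
slow := WriteStream on: String new.
fast := WriteStream on: String new.
(HunkReader on: text) nextPutAllOn: slow.
(FastHunkReader on: text) nextPutAllOn: fast.
Transcript showCr: slow contents; showCr: fast contents.
```

Both destinations end up with the same contents.  The difference is that the FastHunkReader path never materializes a per-hunk String, because the destination copies directly out of the source's buffer.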

Next on the list are #next:putAllOn: and #nextAvailable:putAllOn:, which
will speed up Swazoo too.

Actually, the file-serving loop in Swazoo is like this:

[[[rs atEnd] whileFalse: [aStream nextPutAll: (rs nextAvailable: 2000)]]
      ensure: [rs close]]

It would be much faster in GNU Smalltalk to just do "aStream nextPutAll:
rs".  After today's changes, it wouldn't create a single object. :-P
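To make the allocation argument concrete, the sketch below runs both loop shapes against a hypothetical CountingSource class.  This class was invented for this example and is not part of Swazoo or GNU Smalltalk.  Its #nextAvailable: answers a new String on every call, as the Swazoo loop expects, and counts those Strings.  Its #nextHunkPutAllOn: writes straight from its backing String.  The second loop has the same shape as the patch's Stream>>#nextPutAllOn:.  In both cases the destination WriteStream may still grow its own buffer; only the per-chunk Strings are being counted.

```smalltalk
"CountingSource is hypothetical: it only exists to count the chunk
 Strings that a Swazoo-style loop allocates."
Object subclass: CountingSource [
    | data pos chunks closed |

    CountingSource class >> on: aString [
        ^self new setData: aString
    ]

    setData: aString [
        data := aString.
        pos := 1.
        chunks := 0.
        closed := false
    ]

    atEnd [ ^pos > data size ]
    close [ closed := true ]
    isClosed [ ^closed ]
    chunks [ ^chunks ]

    nextAvailable: anInteger [
        "Swazoo style: answer a freshly allocated String on every call."
        | count result |
        count := anInteger min: data size - pos + 1.
        result := data copyFrom: pos to: pos + count - 1.
        pos := pos + count.
        chunks := chunks + 1.
        ^result
    ]

    nextHunkPutAllOn: aStream [
        "Stream-to-stream style: write directly from the backing String."
        | count |
        count := 2000 min: data size - pos + 1.
        aStream next: count putAll: data startingAt: pos.
        pos := pos + count
    ]
]

| payload rs1 rs2 out1 out2 |
payload := String new: 5000 withAll: $x.

"The Swazoo loop: one new String per chunk of at most 2000 characters."
rs1 := CountingSource on: payload.
out1 := WriteStream on: String new.
[[rs1 atEnd] whileFalse: [out1 nextPutAll: (rs1 nextAvailable: 2000)]]
    ensure: [rs1 close].

"The stream-to-stream loop: no chunk Strings at all."
rs2 := CountingSource on: payload.
out2 := WriteStream on: String new.
[[rs2 atEnd] whileFalse: [rs2 nextHunkPutAllOn: out2]]
    ensure: [rs2 close].

Transcript
    showCr: 'Swazoo loop chunks: ', rs1 chunks printString;
    showCr: 'hunk loop chunks: ', rs2 chunks printString.
```

With a 5000-character payload, the first loop allocates 3 chunk Strings (2000 + 2000 + 1000) and the second allocates none, while both destinations receive the same 5000 characters.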

Janko, can you check whether "aStream nextPutAll: rs" is portable and,
if so, whether it is also fast in other dialects?

Paolo

implement #nextHunkPutAllOn:

2008-08-05  Paolo Bonzini  <[hidden email]>

        * kernel/FileDescr.st: Move setting atEnd to true into #pastEnd.
        Add #nextHunkPutAllOn:.  Remove duplicate #copyFrom:to: in #nextHunk.
        * kernel/FileStream.st: Add #nextHunkPutAllOn:.
        * kernel/Stream.st: Extract pieces of #nextPutAllOn: into
        #nextHunkPutAllOn:.

packages/sockets:
2008-08-05  Paolo Bonzini  <[hidden email]>

        * Buffers.st: Add #nextHunk and #nextHunkPutAllOn:.
        * Sockets.st: Remove the lookahead instance variable.  Delegate
        more stuff to the readBuffer, including #nextHunk.  Implement
        #nextHunkPutAllOn:.
        * Tests.st: Modify test to use #nextHunkPutAllOn: for a 3-4x
        speed increase. :-P :-P :-P

packages/zlib:
2008-08-05  Paolo Bonzini  <[hidden email]>

        * ZLibReadStream.st: Add #nextHunkPutAllOn:.
        * zlibtests.st: Test it.

diff --git a/NEWS b/NEWS
index 6dfc10d..3bbcce2 100644
--- a/NEWS
+++ b/NEWS
@@ -32,12 +32,6 @@ o   FileDescriptor and FileStream raise an exception if #next: cannot
 
 o   FileDescriptor is now a subclass of Stream.
 
-o   ObjectMemory>>#snapshot and ObjectMemory>>#snapshot: return false in
-    the instance of GNU Smalltalk that produced the snapshot, and
-    true in the instance of GNU Smalltalk that was restored from the
-    snapshot.  Note that this does not apply to CallinProcesses, since
-    those are stopped in saved images (will this be true in 3.1?).
-
 o   If possible, the installation is made relocatable.  To this end,
     the following conditions should be satisfied: 1) the exec-prefix
     and prefix should be identical; 2) the installation should reside
@@ -74,9 +68,18 @@ o   It is possible to create C call-outs that are not attached
     override the #link method (the existing CFunctionDescriptor class
     is now implemented on top of this).
 
+o   A new method #nextHunkPutAllOn: allows copying from stream to stream
+    while minimizing the number of allocated objects.
+
 o   ObjectDumper now accepts normal String streams.  The class ByteStream
     has been removed.
 
+o   ObjectMemory>>#snapshot and ObjectMemory>>#snapshot: return false in
+    the instance of GNU Smalltalk that produced the snapshot, and
+    true in the instance of GNU Smalltalk that was restored from the
+    snapshot.  Note that this does not apply to CallinProcesses, since
+    those are stopped in saved images (will this be true in 3.1?).
+
 o   The VFS subsystem was rewritten.  Virtual filesystems are now
     accessible via special methods on File (such as File>>#zip,
     for example "(File name: 'abc.zip') zip") and not anymore with
diff --git a/kernel/FileDescr.st b/kernel/FileDescr.st
index b6576a3..340b6e6 100644
--- a/kernel/FileDescr.st
+++ b/kernel/FileDescr.st
@@ -346,7 +346,7 @@ do arbitrary processing on the files.'>
  data := data at: 1].
  ^result > 0
     ifTrue: [data]
-    ifFalse: [atEnd := true.  self pastEnd]
+    ifFalse: [self pastEnd]
     ]
 
     peekFor: anObject [
@@ -371,9 +371,7 @@ do arbitrary processing on the files.'>
  result := self read: data from: 1 to: 1.
  ^result > 0
     ifTrue: [peek := data at: 1]
-    ifFalse:
- [atEnd := true.
- self pastEnd]
+    ifFalse: [self pastEnd]
     ]
 
     nextByte [
@@ -817,6 +815,16 @@ do arbitrary processing on the files.'>
  ^true
     ]
 
+    nextHunkPutAllOn: aStream [
+ "Copy the next buffers worth of stuff from the receiver to aStream."
+
+ <category: 'low-level access'>
+ | count coll |
+ count := self read: (coll := self species new: 1024).
+ count = 0 ifTrue: [^self pastEnd].
+ aStream next: count putAll: coll startingAt: 1
+    ]
+
     nextHunk [
  "Answer the next buffers worth of stuff in the Stream represented
  by the receiver.  Do at most one actual input operation."
@@ -825,11 +833,16 @@ do arbitrary processing on the files.'>
  | count answer |
  count := self read: (answer := self species new: 1024).
  count < answer size ifTrue: [answer := answer copyFrom: 1 to: count].
- count = 0
-    ifTrue:
- [atEnd := true.
- ^self pastEnd].
- ^answer copyFrom: 1 to: count
+ count = 0 ifTrue: [^self pastEnd].
+ ^answer
+    ]
+
+    pastEnd [
+        "The end of the stream has been reached.  Signal a Notification."
+
+        <category: 'polymorphism'>
+ atEnd := true.
+ super pastEnd
     ]
 
     read: byteArray [
diff --git a/kernel/FileStream.st b/kernel/FileStream.st
index 82ac553..1156e16 100644
--- a/kernel/FileStream.st
+++ b/kernel/FileStream.st
@@ -611,6 +611,19 @@ file object, such as /dev/rmt0 on UNIX or MTA0: on VMS).'>
  self flush
     ]
 
+    nextHunkPutAllOn: aStream [
+ "Copy the next buffer's worth of stuff from the receiver to
+ aStream.  For n consecutive calls to this method, we do
+ n - 1 or n actual input operations."
+
+ <category: 'buffering'>
+ writePtr notNil ifTrue: [self flush].
+ (ptr > endPtr or: [endPtr < collection size]) ifTrue: [self fill].
+ ptr > endPtr ifTrue: [^self pastEnd].
+ aStream next: endPtr - ptr + 1 putAll: collection startingAt: ptr.
+ ptr := endPtr + 1.
+    ]
+
     nextHunk [
  "Answer the next buffers worth of stuff in the Stream represented
  by the receiver.  For n consecutive calls to this method, we do
diff --git a/kernel/Stream.st b/kernel/Stream.st
index 7658610..77facd7 100644
--- a/kernel/Stream.st
+++ b/kernel/Stream.st
@@ -232,13 +232,8 @@ provide for writing collections sequentially.'>
     nextPutAllOn: aStream [
         "Write all the objects in the receiver to aStream"
 
- | coll |
  [self atEnd] whileFalse:
- [coll := self nextHunk.
- aStream
-    next: coll size
-    putAll: coll
-    startingAt: 1].
+ [self nextHunkPutAllOn: aStream].
     ]
 
     next: anInteger put: anObject [
@@ -507,6 +502,23 @@ provide for writing collections sequentially.'>
  repeat
     ]
 
+    nextHunkPutAllOn: aStream [
+ "Copy to aStream a more-or-less arbitrary amount of data.  When used
+ on files, this does at most one I/O operation.  For other kinds of
+ stream, the definition may vary.  This method is used by the VM
+ when loading data from a Smalltalk stream, and by various kinds
+ of Stream decorators supplied with GNU Smalltalk (including
+ zlib streams).  Subclasses that implement nextHunk can implement
+ this method to avoid useless work."
+
+ | coll |
+ coll := self nextHunk.
+ aStream
+    next: coll size
+    putAll: coll
+    startingAt: 1
+    ]
+
     nextHunk [
  "Answer a more-or-less arbitrary amount of data.  When used on files, this
  does at most one I/O operation.  For other kinds of stream, the definition
diff --git a/packages/sockets/Buffers.st b/packages/sockets/Buffers.st
index ebdd47b..b822b96 100644
--- a/packages/sockets/Buffers.st
+++ b/packages/sockets/Buffers.st
@@ -120,6 +120,28 @@ evaluates an user defined block to try to get some more data.'>
  ^contents
     ]
 
+    nextHunkPutAllOn: aStream [
+ "Copy a buffer's worth of data from the receiver to aStream, doing
+ at most one call to the fill block."
+
+ <category: 'buffer handling'>
+ self atEnd ifTrue: [^super pastEnd].
+ aStream next: endPtr - ptr + 1 putAll: self collection startingAt: ptr.
+ endPtr := ptr - 1. "Empty the buffer"
+    ]
+
+    nextHunk [
+ "Answer a buffer's worth of data, doing at most one call
+ to the fill block."
+
+ <category: 'buffer handling'>
+ | contents |
+ self atEnd ifTrue: [^super pastEnd].
+ contents := self collection copyFrom: ptr to: endPtr.
+ endPtr := ptr - 1. "Empty the buffer"
+ ^contents
+    ]
+
     availableBytes [
         "Answer how many bytes are available in the buffer."
 
diff --git a/packages/sockets/Sockets.st b/packages/sockets/Sockets.st
index 02cd87e..b2c11f2 100644
--- a/packages/sockets/Sockets.st
+++ b/packages/sockets/Sockets.st
@@ -1013,7 +1013,7 @@ this class simply redirect their calls to an implementation class.'>
 
 
 AbstractSocket subclass: StreamSocket [
-    | lookahead peerDead readBuffer outOfBand |
+    | peerDead readBuffer outOfBand |
     
     <category: 'Sockets-Streams'>
     <comment: '
@@ -1163,25 +1163,17 @@ This class adds a read buffer to the basic model of AbstractSocket.'>
  or from the operating system."
 
  <category: 'stream protocol'>
- | lookaheadBytes |
- lookaheadBytes := lookahead isNil ifTrue: [ 0 ] ifFalse: [ 1 ].
- self canRead ifFalse: [ ^lookaheadBytes ].
+ self canRead ifFalse: [ ^0 ].
  self readBuffer isEmpty ifTrue: [ self readBuffer fill ].
- ^lookaheadBytes + self readBuffer availableBytes
+ ^self readBuffer availableBytes
     ]
 
     bufferContents [
  "Answer the current contents of the read buffer"
 
  <category: 'stream protocol'>
- | result |
  readBuffer isNil ifTrue: [^self pastEnd].
- result := self readBuffer bufferContents.
- lookahead isNil
-    ifFalse:
- [result := lookahead asString , result.
- lookahead := nil].
- ^result
+ ^self readBuffer bufferContents
     ]
 
     close [
@@ -1212,11 +1204,8 @@ This class adds a read buffer to the basic model of AbstractSocket.'>
  Smalltalk Processes."
 
  <category: 'stream protocol'>
- | result |
- lookahead isNil ifTrue: [^self readBuffer next].
- result := lookahead.
- lookahead := nil.
- ^result
+ readBuffer isNil ifTrue: [^self pastEnd].
+ ^self readBuffer next
     ]
 
     nextAvailable: anInteger [
@@ -1225,6 +1214,8 @@ This class adds a read buffer to the basic model of AbstractSocket.'>
 
         <category: 'accessing-reading'>
  | buffer available stream |
+ readBuffer isNil ifTrue: [ ^self pastEnd ].
+
  self ensureReadable.
  available := self availableBytes.
  available >= anInteger ifTrue: [ ^self next: anInteger ].
@@ -1238,11 +1229,22 @@ This class adds a read buffer to the basic model of AbstractSocket.'>
  needed."
  stream := WriteStream with: buffer.
  [ (available := self availableBytes min: anInteger - stream size) > 0 ]
-    whileTrue: [ stream nextPutAll: (self next: available) ].
+    whileTrue: [ stream nextPutAll: (self readBuffer next: available) ].
 
  ^stream contents
     ]
 
+    nextHunkPutAllOn: aStream [
+ "Copy the next buffers worth of stuff from the receiver to aStream.
+ Do at most one actual input operation."
+
+ <category: 'stream protocol'>
+ "Ensure that the buffer is full"
+
+ readBuffer isNil ifTrue: [ ^self pastEnd ].
+ self readBuffer nextHunkPutAllOn: aStream
+    ]
+
     nextHunk [
  "Answer the next buffers worth of stuff in the Stream represented
  by the receiver.  Do at most one actual input operation."
@@ -1250,8 +1252,8 @@ This class adds a read buffer to the basic model of AbstractSocket.'>
  "Ensure that the buffer is full"
 
  <category: 'stream protocol'>
- self peek.
- ^self bufferContents
+ readBuffer isNil ifTrue: [ ^self pastEnd ].
+ ^self readBuffer nextHunk
     ]
 
     next: count [
@@ -1260,10 +1262,8 @@ This class adds a read buffer to the basic model of AbstractSocket.'>
 
  <category: 'stream protocol'>
  | result |
- lookahead isNil ifTrue: [^self readBuffer next: count].
- result := (String with: lookahead) , (self readBuffer next: count - 1).
- lookahead := nil.
- ^result
+ readBuffer isNil ifTrue: [ ^self pastEnd ].
+ ^self readBuffer next: count
     ]
 
     peek [
@@ -1272,12 +1272,9 @@ This class adds a read buffer to the basic model of AbstractSocket.'>
  Smalltalk Processes."
 
  <category: 'stream protocol'>
- lookahead isNil
-    ifTrue:
- [self readBuffer isNil ifTrue: [^nil].
- self readBuffer atEnd ifTrue: [^nil].
- lookahead := self readBuffer next].
- ^lookahead
+ self readBuffer isNil ifTrue: [^nil].
+ self readBuffer atEnd ifTrue: [^nil].
+ ^self readBuffer peek
     ]
 
     peekFor: anObject [
@@ -1286,16 +1283,9 @@ This class adds a read buffer to the basic model of AbstractSocket.'>
  control to other Smalltalk Processes."
 
  <category: 'stream protocol'>
- lookahead isNil
-    ifTrue:
- [self readBuffer isNil ifTrue: [^false].
- self readBuffer atEnd ifTrue: [^false].
- lookahead := self readBuffer next].
- ^lookahead = anObject
-    ifTrue:
- [lookahead := nil.
- true]
-    ifFalse: [false]
+ self readBuffer isNil ifTrue: [^false].
+ self readBuffer atEnd ifTrue: [^false].
+ ^self readBuffer peekFor: anObject
     ]
 
     readBufferSize: size [
diff --git a/packages/sockets/Tests.st b/packages/sockets/Tests.st
index 29101d1..d11f9c5 100644
--- a/packages/sockets/Tests.st
+++ b/packages/sockets/Tests.st
@@ -1,3 +1,14 @@
+Stream subclass: DummyStream [
+    <category: 'Sockets-Tests'>
+
+    | n |
+    DummyStream class >> new [ ^super new initialize ]
+    initialize [ n := 0 ]
+    nextPut: anObject [ n := n + 1 ]
+    next: anInteger putAll: aCollection startingAt: pos [ n := n + anInteger ]
+    size [ ^n ]
+]
+
 Socket class extend [
 
     microTest [
@@ -65,7 +76,8 @@ Socket class extend [
  output buffer sizes, and the address class (family) to use."
 
  <category: 'tests'>
- | queue server client bytesToSend sendBuf bytesSent bytesReceived t extraBytes timeout process |
+ | queue server client bytesToSend sendBuf bytesSent bytesReceived
+  t extraBytes timeout process recvBuf |
  Transcript
     cr;
     show: 'starting loopback test';
@@ -94,13 +106,15 @@ Socket class extend [
     cr.
  bytesToSend := 5000000.
  sendBuf := String new: 4000 withAll: $x.
+ recvBuf := DummyStream new.
  bytesSent := bytesReceived := 0.
  t := Time millisecondsToRun:
  [
  [server nextPutAll: sendBuf.
  bytesSent := bytesSent + sendBuf size.
  [client canRead] whileTrue:
- [bytesReceived := bytesReceived + client nextHunk size].
+ [client nextHunkPutAllOn: recvBuf.
+ bytesReceived := recvBuf size].
  bytesSent >= bytesToSend and: [bytesReceived = bytesSent]]
  whileFalse].
  Transcript
@@ -173,11 +187,14 @@ Socket class extend [
  sema signal]
  fork.
  consumer :=
- [queueReady wait.
+ [| recvBuf |
+ recvBuf := DummyStream new.
+ queueReady wait.
  client := Socket remote: queue localAddress port: (self testPortFor: addressClass).
 
  [[client canRead] whileTrue:
- [bytesReceived := bytesReceived + client nextHunk size].
+ [client nextHunkPutAllOn: recvBuf.
+ bytesReceived := recvBuf size].
  bytesSent >= bytesToSend and: [bytesReceived = bytesSent]]
  whileFalse: [Processor yield].
  sema signal]
diff --git a/packages/zlib/ZLibReadStream.st b/packages/zlib/ZLibReadStream.st
index 5e03ff5..17fbea5 100644
--- a/packages/zlib/ZLibReadStream.st
+++ b/packages/zlib/ZLibReadStream.st
@@ -70,6 +70,19 @@ used for communication with zlib.'>
  ^result
     ]
 
+    nextHunkPutAllOn: aStream [
+ "Copy the next buffers worth of stuff from the receiver to
+ aStream.  Do at most one actual compression/decompression
+ operation."
+
+ <category: 'streaming'>
+ | result |
+ self atEnd ifTrue: [^self pastEnd].
+ aStream next: endPtr - ptr putAll: outBytes startingAt: ptr + 1.
+ ptr := endPtr.
+ ^result
+    ]
+
     nextHunk [
  "Answer the next buffers worth of stuff in the Stream represented
  by the receiver.  Do at most one actual compression/decompression
diff --git a/packages/zlib/zlibtests.st b/packages/zlib/zlibtests.st
index a6bb592..5309368 100644
--- a/packages/zlib/zlibtests.st
+++ b/packages/zlib/zlibtests.st
@@ -166,6 +166,17 @@ TestCase subclass: ZlibStreamTest [
  self assertFooVector: data
     ]
 
+    testNextHunkPutAllOn [
+ "Test accessing data with nextHunkPutAllOn."
+
+ <category: 'testing'>
+ | stream data |
+ stream := InflateStream on: self doDeflate readStream.
+ data := String new writeStream.
+ [stream atEnd] whileFalse: [stream nextHunkPutAllOn: data].
+ self assertFooVector: data contents
+    ]
+
     testRandomAccess [
  "Test random access to deflated data."
 

_______________________________________________
help-smalltalk mailing list
[hidden email]
http://lists.gnu.org/mailman/listinfo/help-smalltalk