This patch adds #nextHunkPutAllOn:, which is simple to implement and
provides a first example of stream-to-stream communication.  The Sockets
microbenchmarks are faster by up to 5x when they use it.

Next on the list, #next:putAllOn: and #nextAvailable:putAllOn:.  These
will speed up Swazoo too.  Actually, the file-serving loop in Swazoo is
like this:

    [[rs atEnd] whileFalse: [aStream nextPutAll: (rs nextAvailable: 2000)]]
        ensure: [rs close]

It would be much faster in GNU Smalltalk to do just "aStream nextPutAll: rs".
After today's changes, it wouldn't create a single object. :-P

Janko, can you check if it is portable and, if so, whether it is also fast
in other dialects?

Paolo

implement #nextHunkPutAllOn:

2008-08-05 Paolo Bonzini <[hidden email]>

        * kernel/FileDescr.st: Move setting atEnd to true into #pastEnd.
        Add #nextHunkPutAllOn:.  Remove duplicate #copyFrom:to: in #nextHunk.
        * kernel/FileStream.st: Add #nextHunkPutAllOn:.
        * kernel/Stream.st: Extract pieces of #nextPutAllOn: into
        #nextHunkPutAllOn:.

2008-08-05 Paolo Bonzini <[hidden email]>

packages/sockets:
2008-08-05 Paolo Bonzini <[hidden email]>

        * Buffers.st: Add #nextHunk and #nextHunkPutAllOn:.
        * Sockets.st: Remove the lookahead instance variable.  Delegate more
        stuff to the readBuffer, including #nextHunk.  Implement
        #nextHunkPutAllOn:.
        * Tests.st: Modify test to use #nextHunkPutAllOn: for a 3-4x speed
        increase. :-P :-P :-P

2008-08-05 Paolo Bonzini <[hidden email]>

packages/zlib:
2008-08-05 Paolo Bonzini <[hidden email]>

        * ZLibReadStream.st: Add #nextHunkPutAllOn:.
        * zlibtests.st: Test it.

diff --git a/NEWS b/NEWS
index 6dfc10d..3bbcce2 100644
--- a/NEWS
+++ b/NEWS
@@ -32,12 +32,6 @@ o FileDescriptor and FileStream raise an exception if #next: cannot

 o FileDescriptor is now a subclass of Stream.

-o ObjectMemory>>#snapshot and ObjectMemory>>#snapshot: return false in
-  the instance of GNU Smalltalk that produced the snapshot, and
-  true in the instance of GNU Smalltalk that was restored from the
-  snapshot.  Note that this does not apply to CallinProcesses, since
-  those are stopped in saved images (will this be true in 3.1?).
-
 o If possible, the installation is made relocatable.  To this end,
   the following conditions should be satisfied: 1) the exec-prefix
   and prefix should be identical; 2) the installation should reside
@@ -74,9 +68,18 @@ o It is possible to create C call-outs that are not attached
   override the #link method (the existing CFunctionDescriptor class
   is now implemented on top of this).

+o A new method #nextHunkPutAllOn: makes it possible to copy from stream
+  to stream while minimizing the number of allocated objects.
+
 o ObjectDumper now accepts normal String streams.  The class ByteStream
   has been removed.

+o ObjectMemory>>#snapshot and ObjectMemory>>#snapshot: return false in
+  the instance of GNU Smalltalk that produced the snapshot, and
+  true in the instance of GNU Smalltalk that was restored from the
+  snapshot.  Note that this does not apply to CallinProcesses, since
+  those are stopped in saved images (will this be true in 3.1?).
+
 o The VFS subsystem was rewritten.  Virtual filesystems are now
   accessible via special methods on File (such as File>>#zip, for
   example "(File name: 'abc.zip') zip") and not anymore with
diff --git a/kernel/FileDescr.st b/kernel/FileDescr.st
index b6576a3..340b6e6 100644
--- a/kernel/FileDescr.st
+++ b/kernel/FileDescr.st
@@ -346,7 +346,7 @@ do arbitrary processing on the files.'>
             data := data at: 1].
         ^result > 0
             ifTrue: [data]
-            ifFalse: [atEnd := true. self pastEnd]
+            ifFalse: [self pastEnd]
     ]

     peekFor: anObject [
@@ -371,9 +371,7 @@ do arbitrary processing on the files.'>
         result := self read: data from: 1 to: 1.
         ^result > 0
             ifTrue: [peek := data at: 1]
-            ifFalse:
-                [atEnd := true.
-                self pastEnd]
+            ifFalse: [self pastEnd]
     ]

     nextByte [
@@ -817,6 +815,16 @@ do arbitrary processing on the files.'>
         ^true
     ]

+    nextHunkPutAllOn: aStream [
+        "Copy the next buffer's worth of stuff from the receiver to aStream."
+
+        <category: 'low-level access'>
+        | count coll |
+        count := self read: (coll := self species new: 1024).
+        count = 0 ifTrue: [^self pastEnd].
+        aStream next: count putAll: coll startingAt: 1
+    ]
+
     nextHunk [
         "Answer the next buffers worth of stuff in the Stream represented
          by the receiver.  Do at most one actual input operation."
@@ -825,11 +833,16 @@ do arbitrary processing on the files.'>
         | count answer |
         count := self read: (answer := self species new: 1024).
         count < answer size ifTrue: [answer := answer copyFrom: 1 to: count].
-        count = 0
-            ifTrue:
-                [atEnd := true.
-                ^self pastEnd].
-        ^answer copyFrom: 1 to: count
+        count = 0 ifTrue: [^self pastEnd].
+        ^answer
+    ]
+
+    pastEnd [
+        "The end of the stream has been reached.  Signal a Notification."
+
+        <category: 'polymorphism'>
+        atEnd := true.
+        super pastEnd
     ]

     read: byteArray [
diff --git a/kernel/FileStream.st b/kernel/FileStream.st
index 82ac553..1156e16 100644
--- a/kernel/FileStream.st
+++ b/kernel/FileStream.st
@@ -611,6 +611,19 @@ file object, such as /dev/rmt0 on UNIX or MTA0: on VMS).'>
         self flush
     ]

+    nextHunkPutAllOn: aStream [
+        "Copy the next buffer's worth of stuff from the receiver to
+         aStream.  For n consecutive calls to this method, we do
+         n - 1 or n actual input operations."
+
+        <category: 'buffering'>
+        writePtr notNil ifTrue: [self flush].
+        (ptr > endPtr or: [endPtr < collection size]) ifTrue: [self fill].
+        ptr > endPtr ifTrue: [^self pastEnd].
+        aStream next: endPtr - ptr + 1 putAll: collection startingAt: ptr.
+        ptr := endPtr + 1.
+    ]
+
     nextHunk [
         "Answer the next buffers worth of stuff in the Stream represented
          by the receiver.  For n consecutive calls to this method, we do
diff --git a/kernel/Stream.st b/kernel/Stream.st
index 7658610..77facd7 100644
--- a/kernel/Stream.st
+++ b/kernel/Stream.st
@@ -232,13 +232,8 @@ provide for writing collections sequentially.'>
     nextPutAllOn: aStream [
         "Write all the objects in the receiver to aStream"

-        | coll |
         [self atEnd] whileFalse:
-                [coll := self nextHunk.
-                aStream
-                    next: coll size
-                    putAll: coll
-                    startingAt: 1].
+                [self nextHunkPutAllOn: aStream].
     ]

     next: anInteger put: anObject [
@@ -507,6 +502,23 @@ provide for writing collections sequentially.'>
         repeat
     ]

+    nextHunkPutAllOn: aStream [
+        "Copy to aStream a more-or-less arbitrary amount of data.  When used
+         on files, this does at most one I/O operation.  For other kinds of
+         stream, the definition may vary.  This method is used by the VM
+         when loading data from a Smalltalk stream, and by various kinds
+         of Stream decorators supplied with GNU Smalltalk (including
+         zlib streams).  Subclasses that implement nextHunk can implement
+         this method to avoid useless work."
+
+        | coll |
+        coll := self nextHunk.
+        aStream
+            next: coll size
+            putAll: coll
+            startingAt: 1
+    ]
+
     nextHunk [
         "Answer a more-or-less arbitrary amount of data.  When used on files,
          this does at most one I/O operation.  For other kinds of stream, the definition
diff --git a/packages/sockets/Buffers.st b/packages/sockets/Buffers.st
index ebdd47b..b822b96 100644
--- a/packages/sockets/Buffers.st
+++ b/packages/sockets/Buffers.st
@@ -120,6 +120,28 @@ evaluates an user defined block to try to get some more data.'>
         ^contents
     ]

+    nextHunkPutAllOn: aStream [
+        "Copy a buffer's worth of data from the receiver to aStream, doing
+         at most one call to the fill block."
+
+        <category: 'buffer handling'>
+        self atEnd ifTrue: [^super pastEnd].
+        aStream next: endPtr - ptr + 1 putAll: self collection startingAt: ptr.
+        endPtr := ptr - 1.    "Empty the buffer"
+    ]
+
+    nextHunk [
+        "Answer a buffer's worth of data, doing at most one call
+         to the fill block."
+
+        <category: 'buffer handling'>
+        | contents |
+        self atEnd ifTrue: [^super pastEnd].
+        contents := self collection copyFrom: ptr to: endPtr.
+        endPtr := ptr - 1.    "Empty the buffer"
+        ^contents
+    ]
+
     availableBytes [
         "Answer how many bytes are available in the buffer."

diff --git a/packages/sockets/Sockets.st b/packages/sockets/Sockets.st
index 02cd87e..b2c11f2 100644
--- a/packages/sockets/Sockets.st
+++ b/packages/sockets/Sockets.st
@@ -1013,7 +1013,7 @@ this class simply redirect their calls to an implementation class.'>


 AbstractSocket subclass: StreamSocket [
-    | lookahead peerDead readBuffer outOfBand |
+    | peerDead readBuffer outOfBand |

     <category: 'Sockets-Streams'>
     <comment: '
@@ -1163,25 +1163,17 @@ This class adds a read buffer to the basic model of AbstractSocket.'>
          or from the operating system."

         <category: 'stream protocol'>
-        | lookaheadBytes |
-        lookaheadBytes := lookahead isNil ifTrue: [ 0 ] ifFalse: [ 1 ].
-        self canRead ifFalse: [ ^lookaheadBytes ].
+        self canRead ifFalse: [ ^0 ].
         self readBuffer isEmpty ifTrue: [ self readBuffer fill ].
-        ^lookaheadBytes + self readBuffer availableBytes
+        ^self readBuffer availableBytes
     ]

     bufferContents [
         "Answer the current contents of the read buffer"

         <category: 'stream protocol'>
-        | result |
         readBuffer isNil ifTrue: [^self pastEnd].
-        result := self readBuffer bufferContents.
-        lookahead isNil
-            ifFalse:
-                [result := lookahead asString , result.
-                lookahead := nil].
-        ^result
+        ^self readBuffer bufferContents
     ]

     close [
@@ -1212,11 +1204,8 @@ This class adds a read buffer to the basic model of AbstractSocket.'>
          Smalltalk Processes."

         <category: 'stream protocol'>
-        | result |
-        lookahead isNil ifTrue: [^self readBuffer next].
-        result := lookahead.
-        lookahead := nil.
-        ^result
+        readBuffer isNil ifTrue: [^self pastEnd].
+        ^self readBuffer next
     ]

     nextAvailable: anInteger [
@@ -1225,6 +1214,8 @@ This class adds a read buffer to the basic model of AbstractSocket.'>

         <category: 'accessing-reading'>
         | buffer available stream |
+        readBuffer isNil ifTrue: [ ^self pastEnd ].
+
         self ensureReadable.
         available := self availableBytes.
         available >= anInteger ifTrue: [ ^self next: anInteger ].
@@ -1238,11 +1229,22 @@ This class adds a read buffer to the basic model of AbstractSocket.'>
             needed."
         stream := WriteStream with: buffer.
         [ (available := self availableBytes min: anInteger - stream size) > 0 ]
-            whileTrue: [ stream nextPutAll: (self next: available) ].
+            whileTrue: [ stream nextPutAll: (self readBuffer next: available) ].

         ^stream contents
     ]

+    nextHunkPutAllOn: aStream [
+        "Copy the next buffer's worth of stuff from the receiver to aStream.
+         Do at most one actual input operation."
+
+        <category: 'stream protocol'>
+        "Ensure that the buffer is full"
+
+        readBuffer isNil ifTrue: [ ^self pastEnd ].
+        self readBuffer nextHunkPutAllOn: aStream
+    ]
+
     nextHunk [
         "Answer the next buffers worth of stuff in the Stream represented
          by the receiver.  Do at most one actual input operation."
@@ -1250,8 +1252,8 @@ This class adds a read buffer to the basic model of AbstractSocket.'>
         "Ensure that the buffer is full"

         <category: 'stream protocol'>
-        self peek.
-        ^self bufferContents
+        readBuffer isNil ifTrue: [ ^self pastEnd ].
+        ^self readBuffer nextHunk
     ]

     next: count [
@@ -1260,10 +1262,8 @@ This class adds a read buffer to the basic model of AbstractSocket.'>

         <category: 'stream protocol'>
         | result |
-        lookahead isNil ifTrue: [^self readBuffer next: count].
-        result := (String with: lookahead) , (self readBuffer next: count - 1).
-        lookahead := nil.
-        ^result
+        readBuffer isNil ifTrue: [ ^self pastEnd ].
+        ^self readBuffer next: count
     ]

     peek [
@@ -1272,12 +1272,9 @@ This class adds a read buffer to the basic model of AbstractSocket.'>
          Smalltalk Processes."
         <category: 'stream protocol'>
-        lookahead isNil
-            ifTrue:
-                [self readBuffer isNil ifTrue: [^nil].
-                self readBuffer atEnd ifTrue: [^nil].
-                lookahead := self readBuffer next].
-        ^lookahead
+        self readBuffer isNil ifTrue: [^nil].
+        self readBuffer atEnd ifTrue: [^nil].
+        ^self readBuffer peek
     ]

     peekFor: anObject [
@@ -1286,16 +1283,9 @@ This class adds a read buffer to the basic model of AbstractSocket.'>
          control to other Smalltalk Processes."

         <category: 'stream protocol'>
-        lookahead isNil
-            ifTrue:
-                [self readBuffer isNil ifTrue: [^false].
-                self readBuffer atEnd ifTrue: [^false].
-                lookahead := self readBuffer next].
-        ^lookahead = anObject
-            ifTrue:
-                [lookahead := nil.
-                true]
-            ifFalse: [false]
+        self readBuffer isNil ifTrue: [^false].
+        self readBuffer atEnd ifTrue: [^false].
+        ^self readBuffer peekFor: anObject
     ]

     readBufferSize: size [
diff --git a/packages/sockets/Tests.st b/packages/sockets/Tests.st
index 29101d1..d11f9c5 100644
--- a/packages/sockets/Tests.st
+++ b/packages/sockets/Tests.st
@@ -1,3 +1,14 @@
+Stream subclass: DummyStream [
+    <category: 'Sockets-Tests'>
+
+    | n |
+    DummyStream class >> new [ ^super new initialize ]
+    initialize [ n := 0 ]
+    nextPut: anObject [ n := n + 1 ]
+    next: anInteger putAll: aCollection startingAt: pos [ n := n + anInteger ]
+    size [ ^n ]
+]
+
 Socket class extend [

     microTest [
@@ -65,7 +76,8 @@ Socket class extend [
          output buffer sizes, and the address class (family) to use."

         <category: 'tests'>
-        | queue server client bytesToSend sendBuf bytesSent bytesReceived t extraBytes timeout process |
+        | queue server client bytesToSend sendBuf bytesSent bytesReceived
+          t extraBytes timeout process recvBuf |
         Transcript
             cr;
             show: 'starting loopback test';
@@ -94,13 +106,15 @@ Socket class extend [
             cr.
         bytesToSend := 5000000.
         sendBuf := String new: 4000 withAll: $x.
+        recvBuf := DummyStream new.
         bytesSent := bytesReceived := 0.
         t := Time millisecondsToRun:
                 [
                 [server nextPutAll: sendBuf.
                 bytesSent := bytesSent + sendBuf size.
                 [client canRead] whileTrue:
-                    [bytesReceived := bytesReceived + client nextHunk size].
+                    [client nextHunkPutAllOn: recvBuf.
+                    bytesReceived := recvBuf size].
                 bytesSent >= bytesToSend and: [bytesReceived = bytesSent]]
                     whileFalse].
         Transcript
@@ -173,11 +187,14 @@ Socket class extend [
                 sema signal] fork.

         consumer :=
-                [queueReady wait.
+                [| recvBuf |
+                recvBuf := DummyStream new.
+                queueReady wait.
                 client := Socket remote: queue localAddress
                             port: (self testPortFor: addressClass).
                 [[client canRead] whileTrue:
-                    [bytesReceived := bytesReceived + client nextHunk size].
+                    [client nextHunkPutAllOn: recvBuf.
+                    bytesReceived := recvBuf size].
                 bytesSent >= bytesToSend and: [bytesReceived = bytesSent]]
                     whileFalse: [Processor yield].
                 sema signal]
diff --git a/packages/zlib/ZLibReadStream.st b/packages/zlib/ZLibReadStream.st
index 5e03ff5..17fbea5 100644
--- a/packages/zlib/ZLibReadStream.st
+++ b/packages/zlib/ZLibReadStream.st
@@ -70,6 +70,19 @@ used for communication with zlib.'>
         ^result
     ]

+    nextHunkPutAllOn: aStream [
+        "Copy the next buffer's worth of stuff from the receiver to
+         aStream.  Do at most one actual compression/decompression
+         operation."
+
+        <category: 'streaming'>
+        | result |
+        self atEnd ifTrue: [^self pastEnd].
+        aStream next: endPtr - ptr putAll: outBytes startingAt: ptr + 1.
+        ptr := endPtr.
+        ^result
+    ]
+
     nextHunk [
         "Answer the next buffers worth of stuff in the Stream represented
          by the receiver.  Do at most one actual compression/decompression
diff --git a/packages/zlib/zlibtests.st b/packages/zlib/zlibtests.st
index a6bb592..5309368 100644
--- a/packages/zlib/zlibtests.st
+++ b/packages/zlib/zlibtests.st
@@ -166,6 +166,17 @@ TestCase subclass: ZlibStreamTest [
         self assertFooVector: data
     ]

+    testNextHunkPutAllOn [
+        "Test accessing data with nextHunkPutAllOn."
+
+        <category: 'testing'>
+        | stream data |
+        stream := InflateStream on: self doDeflate readStream.
+        data := String new writeStream.
+        [stream atEnd] whileFalse: [stream nextHunkPutAllOn: data].
+        self assertFooVector: data contents
+    ]
+
     testRandomAccess [
         "Test random access to deflated data."

_______________________________________________
help-smalltalk mailing list
[hidden email]
http://lists.gnu.org/mailman/listinfo/help-smalltalk