[PATCH] Provide optimized #next: and #nextAvailable: for sockets

Paolo Bonzini-2
This is a first step towards limiting the huge number of copies and
object creations that happen in Sockets.

The next step will be the creation of methods to directly push a stream
to another stream -- such as #nextPutAllOn:, #next:putAllOn: and
#nextAvailable:putAllOn:.

Paolo
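
For readers following along, here is a minimal sketch of the kind of copy loop that the proposed stream-to-stream methods are meant to replace. The selector #exampleCopyTo:chunkSize: is hypothetical and is not part of this patch. The sketch only assumes that #nextAvailable: answers a new collection of at most the requested size, so every chunk is allocated and then copied again by #nextPutAll:. On a non-blocking stream #nextAvailable: might answer an empty collection, which this simple loop does not handle.

```smalltalk
"Hypothetical helper, not part of the patch: copy the rest of the
 receiver to aStream, at most chunkSize elements at a time."
Stream extend [
    exampleCopyTo: aStream chunkSize: chunkSize [
        <category: 'examples'>
        "Each iteration allocates one intermediate collection; this is
         the extra copy that a direct stream-to-stream push would avoid."
        [self atEnd] whileFalse: [
            aStream nextPutAll: (self nextAvailable: chunkSize)]
    ]
]

Eval [
    | in out |
    in := ReadStream on: 'hello, world'.
    out := WriteStream on: String new.
    in exampleCopyTo: out chunkSize: 5.
    out contents printNl
]
```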

diff --git a/ChangeLog b/ChangeLog
index b7cbe6c..fdc74fd 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,11 @@
+2008-08-04  Paolo Bonzini  <[hidden email]>
+
+ * kernel/FileDescr.st: Reimplement #next:, move previous implementation
+ to #nextAvailable:.
+ * kernel/FileStream.st: Modify #next:into: to fail if the given number
+ of bytes cannot be read, implement #nextAvailable:.
+ * kernel/Stream.st: Document #nextAvailable: better.
+
 2008-08-01  Paolo Bonzini  <[hidden email]>
 
  * kernel/Stream.st: Fix #do: and #linesDo: to check for the
diff --git a/NEWS b/NEWS
index 6b3646e..36298c6 100644
--- a/NEWS
+++ b/NEWS
@@ -20,6 +20,11 @@ o   CObjects can be backed with garbage-collected (as opposed to
 
 o   Error backtraces include line numbers and filenames.
 
+o   FileDescriptor and FileStream raise an exception if #next: cannot
+    return the given number of bytes.  They also implement #nextAvailable:
+    which is similar to #nextHunk but returns at most the number of bytes
+    given by the argument.
+
 o   ObjectMemory>>#snapshot and ObjectMemory>>#snapshot: return false in
     the instance of GNU Smalltalk that produced the snapshot, and
     true in the instance of GNU Smalltalk that was restored from the
diff --git a/kernel/FileDescr.st b/kernel/FileDescr.st
index f7ee5e5..012c28a 100644
--- a/kernel/FileDescr.st
+++ b/kernel/FileDescr.st
@@ -691,10 +691,25 @@ do arbitrary processing on the files.'>
  "Return the next 'anInteger' characters from the stream, as a String."
 
  <category: 'overriding inherited methods'>
- | result n |
+ | result read |
+ result := self species new: anInteger.
+ read := 0.
+ [ read = anInteger ] whileFalse: [
+            self atEnd ifTrue: [
+                ^SystemExceptions.NotEnoughElements signalOn: anInteger - read].
+            read := read + (self read: result from: read + 1 to: anInteger).
+        ].
+ ^result
+    ]
+
+    nextAvailable: anInteger [
+        "Return up to anInteger objects in the receiver, stopping if
+         the end of the stream is reached"
+
+        <category: 'accessing-reading'>
+        | result n |
  result := self species new: anInteger.
  n := self read: result.
- n = 0 ifTrue: [atEnd := true].
  ^n < anInteger ifTrue: [result copyFrom: 1 to: n] ifFalse: [result]
     ]
 
diff --git a/kernel/FileStream.st b/kernel/FileStream.st
index cbbbbb3..5da19fe 100644
--- a/kernel/FileStream.st
+++ b/kernel/FileStream.st
@@ -500,6 +500,23 @@ file object, such as /dev/rmt0 on UNIX or MTA0: on VMS).'>
  ^self next: anInteger into: (self species new: anInteger)
     ]
 
+    nextAvailable: anInteger [
+ "Read up to anInteger bytes from the stream and return them.  The
+ answer may be shorter than anInteger, because at most one I/O
+ operation is performed; no exception is raised for a short read."
+
+ <category: 'buffering'>
+ | answer last |
+ writePtr notNil ifTrue: [self flush].
+ ptr > endPtr ifTrue: [self fill].
+
+ "Fetch data from the buffer, without doing more than one I/O operation."
+ last := endPtr min: ptr + anInteger - 1.
+ answer := collection copyFrom: ptr to: last.
+ ptr := ptr + answer size.
+ ^answer
+    ]
+
     atEnd [
  "Answer whether data has come to an end"
 
@@ -620,8 +637,8 @@ file object, such as /dev/rmt0 on UNIX or MTA0: on VMS).'>
 
     next: anInteger into: answer [
  "Private - Read up to anInteger bytes from the stream and store them
- into answer.  Return `answer' itself, possibly truncated if we could not
- read the full amount of data."
+ into answer.  Return `answer' itself, or raise an exception if we
+ could not read the full amount of data."
 
  <category: 'buffering'>
  | read last |
@@ -657,18 +674,11 @@ file object, such as /dev/rmt0 on UNIX or MTA0: on VMS).'>
  "Anything more?  We read it from the file.  We can come here only if the
  buffer cannot be filled completely, or if we want to read really a
  lot of data."
- read = anInteger
-    ifFalse:
- [self atEnd
-    ifFalse:
- [read := read + (self
-    read: answer
-    from: read + 1
-    to: answer size)].
- read = anInteger
-    ifFalse:
- [self atEnd ifTrue: [self pastEnd].
- ^answer copyFrom: 1 to: read]].
+ [ read = anInteger ] whileFalse: [
+            self atEnd ifTrue: [
+                ^SystemExceptions.NotEnoughElements signalOn: anInteger - read].
+            read := read + (self read: answer from: read + 1 to: anInteger)
+        ].
  ^answer
     ]
 
diff --git a/kernel/Stream.st b/kernel/Stream.st
index 47b6fdf..32ca159 100644
--- a/kernel/Stream.st
+++ b/kernel/Stream.st
@@ -72,8 +72,11 @@ provide for writing collections sequentially.'>
     ]
 
     nextAvailable: anInteger [
- "Return up to anInteger objects in the receiver, stopping if
- the end of the stream is reached"
+ "Return up to anInteger objects in the receiver.  Besides stopping if
+ the end of the stream is reached, this may return fewer objects than
+ requested for various reasons.  For example, on files and sockets
+ this operation could be non-blocking, or could do at most one I/O
+ operation."
 
  <category: 'accessing-reading'>
  | stream |
diff --git a/packages/sockets/ChangeLog b/packages/sockets/ChangeLog
index 880edfe..d7863d8 100644
--- a/packages/sockets/ChangeLog
+++ b/packages/sockets/ChangeLog
@@ -1,6 +1,10 @@
+2008-08-04  Paolo Bonzini  <[hidden email]>
+
+ * Sockets.st: Add StreamSocket>>#nextAvailable:.
+
 2008-08-01  Paolo Bonzini  <[hidden email]>
 
- * sysdep.c: Use SOCK_CLOEXEC if available, else use FD_CLOEXEC.
+ * sockets.c: Use SOCK_CLOEXEC if available, else use FD_CLOEXEC.
 
 2008-07-28  Paolo Bonzini  <[hidden email]>
 
diff --git a/packages/sockets/Sockets.st b/packages/sockets/Sockets.st
index 533de6e..02cd87e 100644
--- a/packages/sockets/Sockets.st
+++ b/packages/sockets/Sockets.st
@@ -1163,9 +1163,11 @@ This class adds a read buffer to the basic model of AbstractSocket.'>
  or from the operating system."
 
  <category: 'stream protocol'>
- self canRead ifFalse: [ ^0 ].
+ | lookaheadBytes |
+ lookaheadBytes := lookahead isNil ifTrue: [ 0 ] ifFalse: [ 1 ].
+ self canRead ifFalse: [ ^lookaheadBytes ].
  self readBuffer isEmpty ifTrue: [ self readBuffer fill ].
- ^self readBuffer availableBytes
+ ^lookaheadBytes + self readBuffer availableBytes
     ]
 
     bufferContents [
@@ -1216,6 +1218,30 @@ This class adds a read buffer to the basic model of AbstractSocket.'>
  lookahead := nil.
  ^result
     ]
+
+    nextAvailable: anInteger [
+        "Return up to anInteger objects in the receiver, stopping if
+         the end of the stream is reached"
+
+        <category: 'accessing-reading'>
+ | buffer available stream |
+ self ensureReadable.
+ available := self availableBytes.
+ available >= anInteger ifTrue: [ ^self next: anInteger ].
+
+ "Try filling the first buffer."
+ buffer := self next: available.
+ available := self availableBytes min: anInteger - available.
+ available = 0 ifTrue: [ ^buffer ].
+
+ "Streams have extra costs because of copying, use them only if
+ needed."
+ stream := WriteStream with: buffer.
+ [ (available := self availableBytes min: anInteger - stream size) > 0 ]
+    whileTrue: [ stream nextPutAll: (self next: available) ].
+
+ ^stream contents
+    ]
 
     nextHunk [
  "Answer the next buffers worth of stuff in the Stream represented
diff --git a/packages/sport/ChangeLog b/packages/sport/ChangeLog
index c233c72..3599bbb 100644
--- a/packages/sport/ChangeLog
+++ b/packages/sport/ChangeLog
@@ -1,3 +1,7 @@
+2008-08-04  Paolo Bonzini  <[hidden email]>
+
+ * sport.st: Use StreamSocket>>#nextAvailable:.
+
 2008-07-28  Paolo Bonzini  <[hidden email]>
 
  * sport.st: Fix SpFilename>>#tail and SpSocket>>#readInto:startingAt:for:.
diff --git a/packages/sport/sport.st b/packages/sport/sport.st
index 7ed23de..6ed8633 100644
--- a/packages/sport/sport.st
+++ b/packages/sport/sport.st
@@ -1205,19 +1205,10 @@ Object subclass: SpSocket [
  If the targetNumberOfBytes are not available, I return what I can get."
 
  <category: 'services-io'>
- "FIXME: this needs targetNumberOfBytes"
- ^SpExceptionContext
-    for: [
- | result buf |
- result := ByteArray new: targetNumberOfBytes.
- buf := self underlyingSocket readBuffer.
- buf isEmpty ifTrue: [ buf fill ].
- 1 to: targetNumberOfBytes do: [ :i |
-    buf isEmpty ifTrue: [ ^result copyFrom: 1 to: i - 1 ].
-    result at: i put: buf next asInteger ].
- ^result ]
-    on: Error
-    do: [:ex | SpSocketError raiseSignal: ex]
+ "FIXME: this needs #nextAvailable:into: to avoid a copy in #asByteArray"
+ ^(self underlyingSocket
+    ensureReadable;
+    nextAvailable: targetNumberOfBytes) asByteArray
     ]
 
     readInto: aByteArray startingAt: startIndex for: aNumberOfBytes [
@@ -1226,18 +1217,13 @@ Object subclass: SpSocket [
  number of bytes to be read. We get what its there no matter how much their is!!"
 
  <category: 'services-io'>
- | actuallyRead total |
- total := 0.
- self underlyingSocket ensureReadable.
- [
-    actuallyRead := self underlyingSocket availableBytes
- min: aNumberOfBytes - total.
-    actuallyRead = 0 ifTrue: [ ^total ].
-    aByteArray replaceFrom: startIndex to: startIndex + actuallyRead - 1
-     with: (self underlyingSocket next: actuallyRead)
-     startingAt: total + 1.
-    total := total + actuallyRead.
- ] repeat
+ | buffer |
+ buffer := self underlyingSocket
+    ensureReadable;
+    nextAvailable: aNumberOfBytes.
+ aByteArray replaceFrom: startIndex to: startIndex + buffer size - 1
+     with: buffer startingAt: 1.
+ ^buffer size
     ]
 
     readyForRead [
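
The difference between the two selectors, as described in the NEWS entry, might be exercised roughly as follows once the patch is applied. This is a sketch under assumptions, not a test from the patch: the file name is made up, and the behaviour noted in the comments is what the patch intends rather than verified output.

```smalltalk
Eval [
    | file short |
    "Create a three-byte file (hypothetical name)."
    file := FileStream open: 'three-bytes.txt' mode: FileStream write.
    file nextPutAll: 'abc'; close.

    "#nextAvailable: is expected to answer whatever is there, up to 10 bytes."
    file := FileStream open: 'three-bytes.txt' mode: FileStream read.
    short := file nextAvailable: 10.
    short size printNl.
    file close.

    "#next: is expected to signal NotEnoughElements instead of
     answering a truncated string."
    file := FileStream open: 'three-bytes.txt' mode: FileStream read.
    [file next: 10]
        on: SystemExceptions.NotEnoughElements
        do: [:ex | ex return: nil].
    file close
]
```

Code that relied on the old truncating behaviour of #next: would need either a handler like the one above or a switch to #nextAvailable:.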
