(Gzip)Streams


(Gzip)Streams

Reinout Heeck-2
Hi All,

I am trying to compress the data going over a socket connection. When I
found the GzipRead/WriteStream
I thought it would be easy, but alas things are not as they seem.
 
First of all, GZipWriteStream behaves strangely when sent #flush: it finishes the stream (writes the gzip trailer) and nothing more can be done with it. The underlying gzip DLL (zlib), however, can flush the data compressed so far, using a different flush parameter (a sync flush rather than a finish), and then continue compressing.
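
For illustration, here is a usage-level sketch of the two behaviors we would like to be able to distinguish. The creation protocol (GZipWriteStream on:) is an assumption for the sake of the example; only the #flush behavior described above is what we actually observe.

| pair gzip |
pair := SocketAccessor openPair.
gzip := GZipWriteStream on: pair first asExternalConnection writeStream binary.
    "wrapping a binary socket write stream; the creation protocol is assumed"
gzip nextPutAll: #[1 2 3 4 5].
gzip flush.
    "today: finishes the stream and writes the gzip trailer"
    "wanted: a zlib-style sync flush that pushes out the compressed bytes so far
     and lets us keep writing"
gzip nextPutAll: #[6 7 8 9 0].    "no longer possible once #flush has finished the stream"
gzip close.    "only here should the trailer be written"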

Another issue is that GZipReadStream immediately tries to read the gzip header when it is created, which is fine for files but not for sockets. It should read the header only when the first data is requested, not earlier, because the calling application will block on creation. In our case the application blocks during initialization and never gets the chance to set up the communication in the reverse direction, which is what triggers the peer to send the header we are waiting for.
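
To illustrate the lazy-header idea, here is a rough sketch. These are not the actual GZipReadStream/InflateStream methods; the selectors readHeaderIfNeeded, parseHeader, decodeNextByte and the headerRead variable are made up only to show the shape of the change.

readHeaderIfNeeded
    "hypothetical: parse the gzip header on the first read instead of at creation time"
    headerRead ifTrue: [^self].
    self parseHeader.    "whatever the instance creation method currently does"
    headerRead := true

next
    "only touch the underlying socket stream when data is actually requested"
    self readHeaderIfNeeded.
    ^self decodeNextByte    "placeholder for the existing inflate machinery"
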
I have implemented the correct flush behavior on GZipWriteStream but couldn't really test it because of problems with external streams. Here the problem is that GZipReadStream tries to fill its buffer (InflateStream>>fillInputBuffer) using #next:into:startingAt:, expecting an IncompleteNextCountError when less data is available than the requested amount. The trouble is that sockets won't raise this error (file streams do), so we block waiting for data that is not (yet) available, even though there may already be enough data to decode the next bit (which is exactly the case the gzip code handles by catching IncompleteNextCountError).
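
One direction a fix could take is sketched roughly below. This is not the real fillInputBuffer source; source, inputBuffer and inputLimit are stand-ins for whatever the class really calls them. The idea is simply to take whatever the underlying stream can deliver, via #nextAvailable:, instead of demanding a full buffer and relying on the exception:

fillInputBuffer
    "hypothetical variant: read whatever is available instead of insisting on a full buffer"
    | bytes |
    bytes := source nextAvailable: inputBuffer size.
    bytes isEmpty ifTrue: [^self].    "nothing left; end-of-stream handling is only sketched here"
    inputBuffer replaceFrom: 1 to: bytes size with: bytes startingAt: 1.
    inputLimit := bytes size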

There is a difference in behavior between external streams on files and external streams on sockets.

First, on files:

"create the file"
| f ws frs |
f := 'testfile' asFilename.
ws := f writeStream binary.
ws nextPutAll: #[1 2 3 4 5 6 7 8 9 0].
ws close.

frs := f readStream binary.
frs next: 20 into: (ByteArray new: 20) startingAt: 1. "=> raises an
IncompleteNextCountError "
frs close.

frs := f readStream binary.
frs nextAvailable: 20.  "=> returns 10 bytes"
frs close

Now on sockets:
| skts c1 c2 w1 rs2 |
skts := SocketAccessor openPair.
c1 := skts first asExternalConnection.
c2 := skts last asExternalConnection.

w1 := c1 writeStream binary.
rs2 := c2 readStream binary.
w1 nextPutAll: #[ 1 2 3 4 5 6 7 8 9 0].
w1 flush.
rs2 nextAvailable: 20. "=> blocks until data becomes available or the
connection is closed (it then returns the bytes that were available)"
"If we use next:into:startingAt: instead of nextAvailable: ..."
rs2 next: 20 into: (ByteArray new: 20) startingAt: 1. "=> blocks until
data becomes available or the connection is closed (it then raises an
IncompleteNextCountError)"
c1 close.
c2 close

If there are no bytes available at all, I can live with the fact that the socket code blocks, but when some data is available I expect to receive it; blocking then is not OK.

Cham & Reinout

As far as I know there is no easy way to read all the available data from an ExternalStream connected to a socket.


Re: [Bulk] (Gzip)Streams

kobetic
Cham Püschel wrote:

> Now on sockets:
> | skts c1 c2 w1 rs2 |
> skts := SocketAccessor openPair.
> c1 := skts first asExternalConnection.
> c2 := skts last asExternalConnection.
>
> w1 := c1 writeStream binary.
> rs2 := c2 readStream binary.
> w1 nextPutAll: #[ 1 2 3 4 5 6 7 8 9 0].
> w1 flush.
> rs2 nextAvailable: 20. "=> blocks until data becomes available or the
> connection is closed (it then returns the bytes that were available)
> if we use next:into:startingAt: instead of nextAvailable: "
> rs2 next: 20 into: (ByteArray new: 20) startingAt: 1. "=> blocks until
> data becomes available or connection is closed (it then raises an
> IncompleteNextCountError)"
> c1 close.
> c2 close
>
> If there are no bytes available I can live with the fact that the socket
> code blocks but in the other case I expect to receive the available
> data, blocking is not OK.

I believe that #atEnd and sockets just don't play well together. Until the other side actually closes the socket, the read call has no chance to infer that this is the end of the stream. More bytes can still come until the connection is properly shut down. If you rewrite your example as:

> | skts c1 c2 w1 rs2 |
> skts := SocketAccessor openPair.
> c1 := skts first asExternalConnection.
> c2 := skts last asExternalConnection.
>
> w1 := c1 writeStream binary.
> rs2 := c2 readStream binary.
> w1 nextPutAll: #[ 1 2 3 4 5 6 7 8 9 0].
> w1 flush.

  w1 close. "<<<< this is added>>>"

> rs2 nextAvailable: 20. "=> blocks until data becomes available or the
> connection is closed (it then returns the bytes that were available)
> if we use next:into:startingAt: instead of nextAvailable: "
> rs2 next: 20 into: (ByteArray new: 20) startingAt: 1. "=> blocks until
> data becomes available or connection is closed (it then raises an
> IncompleteNextCountError)"
> c1 close.
> c2 close

Then the reading does what you want. I doubt that the plain socket streams can do much better than that. It's possible that the zipping streams could be written to be more friendly to socket streams, i.e. to avoid relying on #atEnd-like behavior. I have yet to look at those.

However, generally, as soon as there are sockets involved it's always better to provide some other indication of the actual length of the data transferred. Obviously, if you need to zip your data, it's probably big enough that you don't want to compress it all up front just so that you can prefix it with a total byte size. In these cases the common pattern is to "chunk" the zipped bytes, i.e. split them into chunks of a size that fits into memory conveniently and prefix each chunk with its size. Look at the recent version of the HTTP code in the public repository; we're doing exactly that there with stacked ChunkedStream and GZipStream.
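
For what it's worth, a minimal sketch of that chunking pattern over plain binary streams (the 4-byte big-endian length prefix and the variable names zippedBytes, socketWriteStream and socketReadStream are illustrative choices, not what ChunkedStream actually does):

"writer side: prefix each block of zipped bytes with its length"
| chunk size |
chunk := zippedBytes.    "a ByteArray of compressed data"
size := chunk size.
socketWriteStream
    nextPut: ((size bitShift: -24) bitAnd: 255);
    nextPut: ((size bitShift: -16) bitAnd: 255);
    nextPut: ((size bitShift: -8) bitAnd: 255);
    nextPut: (size bitAnd: 255);
    nextPutAll: chunk;
    flush.

"reader side: read the 4-byte length first, then exactly that many bytes"
| size chunk |
size := (socketReadStream next bitShift: 24)
    + (socketReadStream next bitShift: 16)
    + (socketReadStream next bitShift: 8)
    + socketReadStream next.
chunk := socketReadStream next: size.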

HTH,

Martin
