Socket activity events

Socket activity events

Schwab,Wilhelm K
Hello all,

The recent thread on events from an HTTP client control and some work I
have been doing on multiple fronts led me to recognize an opportunity
to improve a few of my tools.  The idea(s) would hinge on getting
measures of the amount of data arriving over a socket.
SocketReadStream>>readPage looks like a reasonable place to start; one
could trigger an event with the size of the collection just read.
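
A minimal workspace sketch of that idea, assuming Dolphin's standard event protocol (#when:send:to: and #trigger:with:). The event name #bytesRead: is hypothetical, and a plain Object stands in for the stream that would trigger the event from its read method:

```smalltalk
"Sketch only: #bytesRead: is an illustrative event name, not an existing one."
| source log |
source := Object new.                 "stands in for the socket read stream"
log := OrderedCollection new.

"An observer registers interest in the hypothetical event."
source when: #bytesRead: send: #add: to: log.

"What the read method would do after each read, passing the size just read."
source trigger: #bytesRead: with: 4096.
source trigger: #bytesRead: with: 1500.

log inject: 0 into: [:sum :each | sum + each]   "running total of bytes seen"
```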

Any better ideas or cautions?

Have a good one,

Bill

--
Wilhelm K. Schwab, Ph.D.
[hidden email]


Re: Socket activity events

Chris Uppal-3
Bill,

> The idea(s) would hinge on getting
> measures of the amount of data arriving over a socket.
> SocketReadStream>>readPage looks like a reasonable place to start; one
> could trigger an event with the size of the collection just read.

One approach would be to create a stream wrapper object which generates
progress events.  It would contain a "real" stream, and forward the necessary
methods (#next, #next: #nextAvailable: etc) to the underlying stream and also
monitor how much data had been read, and use that to fire off progress events.

Be warned, unless you are careful you will generate /lots/ of progress events
and slow your application down considerably...
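
One way to keep the event count down is to report only when the position has advanced by some threshold. A small workspace sketch, using an in-memory stream and illustrative names (threshold, nextReport, reports) rather than anything from the posts:

```smalltalk
"Sketch only: 100 reads of 100 elements each, but far fewer reports."
| source threshold nextReport reports |
source := ReadStream on: (ByteArray new: 10000).
threshold := 1024.                    "minimum distance between reports"
nextReport := threshold.
reports := OrderedCollection new.

[source atEnd] whileFalse:
        [source next: 100.
        "Report only once the position has passed the next threshold."
        source position >= nextReport ifTrue:
                [reports add: source position.
                nextReport := source position + threshold]].

reports size
```

With these numbers the loop performs 100 reads but records only 9 positions.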

If anyone's interested I'll post my own version of this idea.

    -- chris


Re: Socket activity events

Schwab,Wilhelm K
Chris,

>>The idea(s) would hinge on getting
>>measures of the amount of data arriving over a socket.
>>SocketReadStream>>readPage looks like a reasonable place to start; one
>>could trigger an event with the size of the collection just read.
>
> One approach would be to create a stream wrapper object which generates
> progress events.  It would contain a "real" stream, and forward the necessary
> methods (#next, #next: #nextAvailable: etc) to the underlying stream and also
> monitor how much data had been read, and use that to fire off progress events.
>
> Be warned, unless you are careful you will generate /lots/ of progress events
> and slow your application down considerably...
>
> If anyone's interested I'll post my own version of this idea.

Sure, if you don't mind.  But please don't work too hard at it just for
my benefit.

Have a good one,

Bill

--
Wilhelm K. Schwab, Ph.D.
[hidden email]


Re: Socket activity events

Chris Uppal-3
Bill,

[me:]
> > If anyone's interested I'll post my own version of this idea.
>
> Sure, if you don't mind.  But please don't work too hard at it just
> for my benefit.

No problem -- it already existed ;-)

Admittedly, I've never used it in anger...

Requires the "Abstract Streams" stuff from my website (under
Miscellania).

BTW, this post contains tabs.  If you are reading in OE you won't see
them, but they are still there: just drag the post to your desktop,
rename it, and edit.

I expect that lines will wrap -- you may have to unwrap them before
filing-in.  Sorry !

    -- chris

==============  ProgressStream.cls ==============
"Filed out from Dolphin Smalltalk XP"!

AbstractReadStream subclass: #ProgressStream
        instanceVariableNames: 'block stream'
        classVariableNames: ''
        poolDictionaries: ''
        classInstanceVariableNames: ''!
ProgressStream guid: (GUID fromString: '{9C900F1C-E8AF-4DBC-82BE-193F7ECEA6F9}')!
ProgressStream comment: 'Copyright © Chris Uppal, 2004.

A readable, positionable, stream that wraps another readable stream and
which provides progress/position notifications as it''s read.

Specifically it contains a <ReadableStream> to which it forwards all
operations; in addition it contains a <monadicValuable> which is
evaluated after every operation that can change the position of the
underlying stream.

The class-side methods include a convenient way of arranging to be
notified at less fine-grained intervals (#on:evaluating:times:) and a
few examples, including two ways to connect these beasts up to a
ProgressDialog.'!
!ProgressStream categoriesForClass!Unclassified! !
!ProgressStream methodsFor!

atEnd
        "one of the root methods for <ReadStream>"

        ^ stream atEnd.!

block
        "answer our block"

        ^ block.
!

block: a1Block
        "private -- set the block we will evaluate every time we realise that our #position
        has changed"

        block := a1Block.!

close
        "one of the root methods for <ReadStream>"

        ^ stream close.!

doWithPosition: a0Block
        "private -- answer the result of evaluating a0Block, ensuring that our
        monitoring block is activated appropriately"

        ^ a0Block ensure: [block value: stream position].!

next
        "one of the root methods for <ReadStream>"

        ^ self doWithPosition: [stream next].!

next: anInteger
        "overridden only for efficiency"

        ^ self doWithPosition: [stream next: anInteger].!

next: anInteger into: aSequenceableCollection startingAt: anIndex
        "one of the root methods for <ReadStream>; optional because implementable in terms
        of #next"

        ^ self doWithPosition: [stream next: anInteger into: aSequenceableCollection startingAt: anIndex].!

nextAvailable: anInteger
        "overridden only for efficiency"

        ^ self doWithPosition: [stream nextAvailable: anInteger].!

position
        "one of the root methods for positionable Streams"

        ^ stream position.
!

position: anInteger
        "one of the root methods for positionable Streams"

        ^ self doWithPosition: [stream position: anInteger].!

size
        "one of the root methods for positionable Streams"

        ^ stream size.!

skipTo: anObject
        "overridden only for efficiency"

        ^ self doWithPosition: [stream skipTo: anObject].
!

stream
        "answer our stream"

        ^ stream.
!

stream: aReadStream
        "private -- set our wrapped stream to that given"

        stream := aReadStream.!

upTo: anObject
        "overridden only for efficiency"

        ^ self doWithPosition: [stream upTo: anObject].!

upToEnd
        "overridden only for efficiency"

        "actually it's dubious whether we should override this, since it suppresses many progress notifications"
        ^ self doWithPosition: [stream upToEnd].! !
!ProgressStream categoriesFor: #atEnd!public!streaming!testing! !
!ProgressStream categoriesFor: #block!accessing!public! !
!ProgressStream categoriesFor: #block:!initializing!private! !
!ProgressStream categoriesFor: #close!operations!public! !
!ProgressStream categoriesFor: #doWithPosition:!helpers!private! !
!ProgressStream categoriesFor: #next!accessing!enumerating!public!streaming! !
!ProgressStream categoriesFor: #next:!accessing!public! !
!ProgressStream categoriesFor: #next:into:startingAt:!accessing!public! !
!ProgressStream categoriesFor: #nextAvailable:!accessing!public! !
!ProgressStream categoriesFor: #position!positioning!public! !
!ProgressStream categoriesFor: #position:!positioning!public! !
!ProgressStream categoriesFor: #size!accessing!public! !
!ProgressStream categoriesFor: #skipTo:!positioning!public! !
!ProgressStream categoriesFor: #stream!accessing!public! !
!ProgressStream categoriesFor: #stream:!initializing!private! !
!ProgressStream categoriesFor: #upTo:!accessing!public! !
!ProgressStream categoriesFor: #upToEnd!accessing!public! !

!ProgressStream class methodsFor!

example1: aFilename
        "simple example where progress is written to the transcript as it advances in 255 steps.

                self example1: (SessionManager current imagePath , '.sml')

                self example1: (FileOpenDialog showModal).
        "

        | stream |

        stream := ProgressStream
                        on: (FileStream read: aFilename)
                        evaluating: [:page | Transcript display: page; cr]
                        times: 255.

        [stream atEnd] whileFalse: [stream nextLine].

        stream close.
!

example2: aFilename
        "simple example using a progress dialog.

                self example2: (SessionManager current imagePath , '.sml').

                self example2: (FileOpenDialog showModal).
        "

        | fileStream |

        fileStream := FileStream read: aFilename.

        ProgressDialog showModalWhile:
                [:it || stream |
                it caption: ('Reading ' , aFilename).
                stream := ProgressStream
                        on: fileStream
                        evaluating: [:page | it value: page]
                        times: 100.
                [stream atEnd] whileFalse: [stream nextLine]].

        "do this outside the 'operation' so it gets closed even if the dialog is cancelled"
        fileStream close.
!

example3: aFilename
        "same as example2, but arranged a little differently.

                self example3: (SessionManager current imagePath , '.sml').

                self example3: (FileOpenDialog showModal).
        "

        | progress stream |

        progress := ProgressDialog create.

        stream := ProgressStream
                        on: (FileStream read: aFilename)
                        evaluating: progress
                        times: 100.

        progress
                operation: [:it | [stream atEnd] whileFalse: [stream nextLine]];
                caption: ('Reading ' , aFilename);
                showModal. "must be modal or we'll close the file too soon!!"

        stream close.
!

fastOn: aReadStream evaluating: a1Block times: anInteger
        "answer a new instance that wraps the given stream, and will evaluate
        a1Block approximately anInteger times, passing in the successive values
        (if they are successive) as the parameter.
        Note that we require that the stream knows its #size"

        | start end total pageStart pageEnd lastPage |

        "slightly optimised logic since this can get called *lots* if someone uses
        #next to read all of a big stream"

        start := aReadStream position.
        end := aReadStream size.
        total := end - start.

        pageStart := 0.
        pageEnd := -1.
        lastPage := -1.
        ^ self on: aReadStream evaluating:
                        [:pos |
                        (pos < pageStart or: [pos > pageEnd]) ifTrue:
                                [| page |
                                page := pos - start * anInteger // total.
                                page = lastPage ifFalse:
                                        [pageStart := page * total // anInteger + start.
                                        pageEnd := page + 1 * total // anInteger + start - 1.
                                        a1Block value: (lastPage := page)]]].!

on: aReadStream evaluating: a1Block
        "answer a new instance that wraps the given stream, and will evaluate
        a1Block after every read or #position, passing the position in the stream as its
        parameter"

        ^ (self basicNew)
                stream: aReadStream;
                block: a1Block;
                yourself.!

on: aReadStream evaluating: a1Block times: anInteger
        "answer a new instance that wraps the given stream, and will evaluate
        a1Block approximately anInteger times, passing in the successive values
        (if they are successive) as the parameter.
        Note that we require that the stream knows its #size"

        | start end total pageStart pageEnd lastPage |

        "slightly optimised logic since this can get called *lots* if someone uses
        #next to read all of a big stream (as my decompressing streams do).
        In fact we can end up spending around half of the total execution time
        in this arithmetic, if we aren't careful.  The unoptimised version of the
        same calculation (if I haven't messed it up) is in simpleOn:evaluating:times:"

        start := aReadStream position.
        end := aReadStream size.
        total := end - start.

        "tempting to say:
        anInteger * 3 < total ifTrue:
                        [^ self  simpleOn: aReadStream evaluating: a1Block times: anInteger].
        but I don't think it's worth the effort"

        pageStart := 0.
        pageEnd := -1.
        lastPage := -1.
        ^ self on: aReadStream evaluating:
                        [:pos |
                        (pos < pageStart or: [pos > pageEnd]) ifTrue:
                                [| page |
                                page := pos - start * anInteger // total.
                                page = lastPage ifFalse:
                                        [pageStart := page * total // anInteger + start.
                                        pageEnd := page + 1 * total // anInteger + start - 1.
                                        a1Block value: (lastPage := page)]]].!

simpleOn: aReadStream evaluating: a1Block times: anInteger
        "answer a new instance that wraps the given stream, and will evaluate
        a1Block approximately anInteger times, passing in the successive values
        (if they are successive) as the parameter"

        "this is the simple, but unoptimised and slow, version of the normal #on:evaluating:times:"

        | last start end |

        start := aReadStream position.
        end := aReadStream size - start.
        last := 0.

        ^ self on: aReadStream evaluating:
                        [:pos || page |
                        page := pos - start * anInteger // end.
                        page = last ifFalse:
                                [a1Block value: (last := page)]].! !
!ProgressStream class categoriesFor: #example1:!examples!public! !
!ProgressStream class categoriesFor: #example2:!examples!public! !
!ProgressStream class categoriesFor: #example3:!examples!public! !
!ProgressStream class categoriesFor: #fastOn:evaluating:times:!instance creation!public! !
!ProgressStream class categoriesFor: #on:evaluating:!instance creation!public! !
!ProgressStream class categoriesFor: #on:evaluating:times:!instance creation!public! !
!ProgressStream class categoriesFor: #simpleOn:evaluating:times:!instance creation!public! !

==============  ==============
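
The page arithmetic in #on:evaluating:times: relies on Smalltalk evaluating binary operators strictly left to right, so "pos - start * anInteger // total" means ((pos - start) * anInteger) // total. A workspace sketch with made-up values (a 1000-element stream read from position 0 in 100 steps; "steps" stands for the anInteger argument) shows the three results for one position:

```smalltalk
"Sketch only: illustrative values, same expressions as the listing above."
| start total steps pos page pageStart pageEnd |
start := 0.
total := 1000.
steps := 100.
pos := 250.

page := pos - start * steps // total.                "(250 * 100) // 1000 = 25"
pageStart := page * total // steps + start.          "25000 // 100 + 0 = 250"
pageEnd := page + 1 * total // steps + start - 1.    "26000 // 100 + 0 - 1 = 259"

Array with: page with: pageStart with: pageEnd
```

So while the position stays within 250..259 the outer range test fails and the arithmetic is skipped entirely, which is where the optimisation comes from.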


Re: Socket activity events

Chris Uppal-3
I wrote:

> Requires the "Abstract Streams" stuff from my website (under
> Miscellania).

Sorry, that should be: 'Abstract Collections', and it's under Miscellan/e/a :-(

    -- chris