Issue 3434 in pharo: SharedQueue Fixes


pharo
Status: FixedWaitingToBePharoed
Owner: stephane.ducasse
CC: siguctua
Labels: Milestone-1.3

New issue 3434 by stephane.ducasse: SharedQueue Fixes
http://code.google.com/p/pharo/issues/detail?id=3434

A new version of Collections was added to project The Inbox:
http://source.squeak.org/inbox/Collections-ul.412.mcz

==================== Summary ====================

Name: Collections-ul.412
Author: ul
Time: 8 December 2010, 4:46:07.6 am
UUID: 0d255954-d008-d647-91fa-59be37678e29
Ancestors: Collections-ul.411

- SharedQueue fixes

=============== Diff against Collections-ul.411 ===============

Item was changed:
  ----- Method: SharedQueue>>flushAllSuchThat: (in category 'accessing') -----
  flushAllSuchThat: aBlock
        "Remove from the queue all objects that satisfy aBlock."
+
+       accessProtect critical: [
+               | newReadPos |
-       ^accessProtect critical: [
-               | value newReadPos |
                newReadPos := writePosition.
+               writePosition - 1 to: readPosition by: -1 do: [ :i |
+                       | value |
+                       value := contentsArray at: i.
-               writePosition-1 to: readPosition by: -1 do:
-                       [:i | value := contentsArray at: i.
                        contentsArray at: i put: nil.
+                       ((aBlock value: value) and: [ (readSynch waitIfLocked: [ nil ]) notNil ]) ifFalse: [
-                       (aBlock value: value) ifTrue: [
-                               "We take an element out of the queue, and therefore, we need to decrement
-                               the readSynch signals"
-                               readSynch wait.
-                       ] ifFalse: [
                                newReadPos := newReadPos - 1.
+                               contentsArray at: newReadPos put: value ] ].
+               readPosition := newReadPos ]!
-                               contentsArray at: newReadPos put: value]].
-               readPosition := newReadPos.
-               value]!

Item was changed:
  ----- Method: SharedQueue>>makeRoomAtEnd (in category 'private') -----
  makeRoomAtEnd

+       | contentsSize newContentsArray |
-       | contentsSize |
        contentsSize := writePosition - readPosition.
+       newContentsArray := contentsSize * 2 > contentsArray size
+               ifTrue: [ contentsArray class new: contentsArray size * 2 ]
-       contentsSize * 2 > contentsArray size
-               ifTrue: [
-                       "grow"
-                       contentsArray := (contentsArray class new: contentsArray size * 2)
-                               replaceFrom: 1
-                               to: contentsSize
-                               with: contentsArray
-                               startingAt: readPosition;
-                               yourself ]
                ifFalse: [
                        (contentsArray size > 10 and: [ contentsSize * 4 <= contentsArray size ])
+                               ifTrue: [ contentsArray class new: (contentsSize * 2 max: 10) ]
+                               ifFalse: [ contentsArray ] ].
+       newContentsArray
+               replaceFrom: 1
+               to: contentsSize
+               with: contentsArray
+               startingAt: readPosition.
+       contentsArray == newContentsArray
+               ifFalse: [ contentsArray := newContentsArray ]
+               ifTrue: [ contentsArray from: contentsSize + 1 to: contentsArray size put: nil ].
-                               ifTrue: [
-                                       "shrink"
-                                       contentsArray := (contentsArray class new: (contentsSize * 2 max: 10))
-                                               replaceFrom: 1
-                                               to: contentsSize
-                                               with: contentsArray
-                                               startingAt: readPosition;
-                                               yourself ]
-                               ifFalse: [
-                                       "just move the elements to the front"
-                                       contentsArray
-                                               replaceFrom: 1
-                                               to: contentsSize
-                                               with: contentsArray
-                                               startingAt: readPosition.
-                                       contentsArray
-                                               from: contentsSize + 1
-                                               to: contentsArray size
-                                               put: nil ] ].
        readPosition := 1.
        writePosition := contentsSize + 1!

Item was changed:
  ----- Method: SharedQueue>>nextOrNil (in category 'accessing') -----
  nextOrNil
        "Answer the object that was sent through the receiver first and has not
        yet been received by anyone. If no object has been sent, answer <nil>."

+       readSynch waitIfLocked: [ ^nil ].
+       ^accessProtect
+               critical: [
+                       | value |
+                       readPosition = writePosition
+                                       ifTrue:
+                                               [self error: 'Error in SharedQueue synchronization'.
+                                                value := nil]
+                                       ifFalse:
+                                               [value := contentsArray at: readPosition.
+                                                contentsArray at: readPosition put: nil.
+                                                readPosition := readPosition + 1].
+                       value ]!
-       ^accessProtect critical: [
-               | value |
-               readPosition >= writePosition ifTrue: [
-                       value := nil
-               ] ifFalse: [
-                       value := contentsArray at: readPosition.
-                       contentsArray at: readPosition put: nil.
-                       readPosition := readPosition + 1
-               ].
-               readPosition >= writePosition ifTrue: [readSynch initSignals].
-               value
-       ].!

Item was changed:
  ----- Method: SharedQueue>>nextOrNilSuchThat: (in category 'accessing') -----
  nextOrNilSuchThat: aBlock
        "Answer the next object that satisfies aBlock, skipping any intermediate objects.
        If no object has been sent, answer <nil> and leave me intact.
        NOTA BENE:  aBlock MUST NOT contain a non-local return (^)."

        ^accessProtect critical: [
                | value readPos |
                value := nil.
                readPos := readPosition.
+               [ readPos < writePosition and: [ value isNil ] ] whileTrue: [
-               [readPos < writePosition and: [value isNil]] whileTrue: [
                        value := contentsArray at: readPos.
                        readPos := readPos + 1.
+                       (aBlock value: value)
+                               ifFalse: [ value := nil ]
+                               ifTrue: [
+                                       readSynch waitIfLocked: [ ^nil ]. "We found the value, but someone else booked it."
+                                       readPosition to: readPos - 1 do: [ :j | contentsArray at: j put: nil ].
+                                       readPosition := readPos ] ].
+               value ].
-                       (aBlock value: value) ifTrue: [
-                               readPosition to: readPos - 1 do: [ :j |
-                                       contentsArray at: j put: nil.
-                               ].
-                               readPosition := readPos.
-                       ] ifFalse: [
-                               value := nil.
-                       ].
-               ].
-               readPosition >= writePosition ifTrue: [readSynch initSignals].
-               value.
-       ].
  "===
  q := SharedQueue new.
  1 to: 10 do: [ :i | q nextPut: i].
  c := OrderedCollection new.
  [
        v := q nextOrNilSuchThat: [ :e | e odd].
        v notNil
  ] whileTrue: [
        c add: {v. q size}
  ].
  {c. q} explore
  ==="!

Item was changed:
  ----- Method: SharedQueue>>peek (in category 'accessing') -----
  peek
        "Answer the object that was sent through the receiver first and has not
        yet been received by anyone but do not remove it from the receiver. If
        no object has been sent, return nil"

+       ^readSynch
-       ^accessProtect
                critical: [
+                       accessProtect critical: [
+                               readPosition >= writePosition ifFalse: [
+                                       contentsArray at: readPosition ] ] ]
+               ifLocked: [ nil ]!
-                       | value |
-                       readPosition >= writePosition
-                                       ifTrue: [readPosition := 1.
-                                                       writePosition := 1.
-                                                       value := nil]
-                                       ifFalse: [value := contentsArray at: readPosition].
-                       value].!
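
The diff above appears to keep one invariant throughout: every element removed from contentsArray is paired with one consumed readSynch signal (taken without blocking via waitIfLocked:). The workspace sketch below is only an illustration of the behaviour that invariant should give, assuming a Squeak image with Collections-ul.412 loaded. The variable names (q, first, second, third) are made up for the example, and the expected values in the comments are read off the diff, not taken from a test run.

```smalltalk
| q first second third |
q := SharedQueue new.
1 to: 4 do: [ :i | q nextPut: i ].

"Drop the odd elements. According to the diff, each removal also
 consumes one readSynch signal, so two signals should remain."
q flushAllSuchThat: [ :e | e odd ].

first := q nextOrNil.    "expected: 2"
second := q nextOrNil.   "expected: 4"
third := q nextOrNil.    "expected: nil, without blocking and without an error"
```

If the signal count and the element count drifted apart, the third nextOrNil would be the call to show it: it would either hit the 'Error in SharedQueue synchronization' branch or answer a stale element.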



Re: Issue 3434 in pharo: SharedQueue Fixes

pharo

Comment #1 on issue 3434 by stephane.ducasse: SharedQueue Fixes
http://code.google.com/p/pharo/issues/detail?id=3434

It probably depends on other Squeak changes.



Re: Issue 3434 in pharo: SharedQueue Fixes

pharo
Updates:
        Status: Invalid

Comment #2 on issue 3434 by [hidden email]: SharedQueue Fixes
http://code.google.com/p/pharo/issues/detail?id=3434

SharedQueue was replaced by SharedQueue2 in Pharo.



Re: Issue 3434 in pharo: SharedQueue Fixes

pharo

Comment #3 on issue 3434 by [hidden email]: SharedQueue Fixes
http://code.google.com/p/pharo/issues/detail?id=3434

This reminds me that I must finish the AtomicQueue(s),
so that we can integrate them and use them instead.