Status: FixedWaitingToBePharoed
Owner: stephane.ducasse CC: siguctua Labels: Milestone-1.3 New issue 3434 by stephane.ducasse: SharedQueue Fixes http://code.google.com/p/pharo/issues/detail?id=3434 A new version of Collections was added to project The Inbox: http://source.squeak.org/inbox/Collections-ul.412.mcz ==================== Summary ==================== Name: Collections-ul.412 Author: ul Time: 8 December 2010, 4:46:07.6 am UUID: 0d255954-d008-d647-91fa-59be37678e29 Ancestors: Collections-ul.411 - SharedQueue fixes =============== Diff against Collections-ul.411 =============== Item was changed: ----- Method: SharedQueue>>flushAllSuchThat: (in category 'accessing') ----- flushAllSuchThat: aBlock "Remove from the queue all objects that satisfy aBlock." + + accessProtect critical: [ + | newReadPos | - ^accessProtect critical: [ - | value newReadPos | newReadPos := writePosition. + writePosition - 1 to: readPosition by: -1 do: [ :i | + | value | + value := contentsArray at: i. - writePosition-1 to: readPosition by: -1 do: - [:i | value := contentsArray at: i. contentsArray at: i put: nil. + ((aBlock value: value) and: [ (readSynch waitIfLocked: [ nil ]) notNil ]) ifFalse: [ - (aBlock value: value) ifTrue: [ - "We take an element out of the queue, and therefore, we need to decrement - the readSynch signals" - readSynch wait. - ] ifFalse: [ newReadPos := newReadPos - 1. + contentsArray at: newReadPos put: value ] ]. + readPosition := newReadPos ]! - contentsArray at: newReadPos put: value]]. - readPosition := newReadPos. - value]! Item was changed: ----- Method: SharedQueue>>makeRoomAtEnd (in category 'private') ----- makeRoomAtEnd + | contentsSize newContentsArray | - | contentsSize | contentsSize := writePosition - readPosition. + newContentsArray := contentsSize * 2 > contentsArray size + ifTrue: [ contentsArray class new: contentsArray size * 2 ] - contentsSize * 2 > contentsArray size - ifTrue: [ - "grow" - contentsArray := (contentsArray class new: contentsArray size * 2) - replaceFrom: 1 - to: contentsSize - with: contentsArray - startingAt: readPosition; - yourself ] ifFalse: [ (contentsArray size > 10 and: [ contentsSize * 4 <= contentsArray size ]) + ifTrue: [ contentsArray class new: (contentsSize * 2 max: 10) ] + ifFalse: [ contentsArray ] ]. + newContentsArray + replaceFrom: 1 + to: contentsSize + with: contentsArray + startingAt: readPosition. + contentsArray == newContentsArray + ifFalse: [ contentsArray := newContentsArray ] + ifTrue: [ contentsArray from: contentsSize + 1 to: contentsArray size put: nil ]. - ifTrue: [ - "shrink" - contentsArray := (contentsArray class new: (contentsSize * 2 max: 10)) - replaceFrom: 1 - to: contentsSize - with: contentsArray - startingAt: readPosition; - yourself ] - ifFalse: [ - "just move the elements to the front" - contentsArray - replaceFrom: 1 - to: contentsSize - with: contentsArray - startingAt: readPosition. - contentsArray - from: contentsSize + 1 - to: contentsArray size - put: nil ] ]. readPosition := 1. writePosition := contentsSize + 1! Item was changed: ----- Method: SharedQueue>>nextOrNil (in category 'accessing') ----- nextOrNil "Answer the object that was sent through the receiver first and has not yet been received by anyone. If no object has been sent, answer <nil>." + readSynch waitIfLocked: [ ^nil ]. + ^accessProtect + critical: [ + | value | + readPosition = writePosition + ifTrue: + [self error: 'Error in SharedQueue synchronization'. + value := nil] + ifFalse: + [value := contentsArray at: readPosition. + contentsArray at: readPosition put: nil. + readPosition := readPosition + 1]. + value ]! - ^accessProtect critical: [ - | value | - readPosition >= writePosition ifTrue: [ - value := nil - ] ifFalse: [ - value := contentsArray at: readPosition. - contentsArray at: readPosition put: nil. - readPosition := readPosition + 1 - ]. - readPosition >= writePosition ifTrue: [readSynch initSignals]. - value - ].! Item was changed: ----- Method: SharedQueue>>nextOrNilSuchThat: (in category 'accessing') ----- nextOrNilSuchThat: aBlock "Answer the next object that satisfies aBlock, skipping any intermediate objects. If no object has been sent, answer <nil> and leave me intact. NOTA BENE: aBlock MUST NOT contain a non-local return (^)." ^accessProtect critical: [ | value readPos | value := nil. readPos := readPosition. + [ readPos < writePosition and: [ value isNil ] ] whileTrue: [ - [readPos < writePosition and: [value isNil]] whileTrue: [ value := contentsArray at: readPos. readPos := readPos + 1. + (aBlock value: value) + ifFalse: [ value := nil ] + ifTrue: [ + readSynch waitIfLocked: [ ^nil ]. "We found the value, but someone else booked it." + readPosition to: readPos - 1 do: [ :j | contentsArray at: j put: nil ]. + readPosition := readPos ] ]. + value ]. - (aBlock value: value) ifTrue: [ - readPosition to: readPos - 1 do: [ :j | - contentsArray at: j put: nil. - ]. - readPosition := readPos. - ] ifFalse: [ - value := nil. - ]. - ]. - readPosition >= writePosition ifTrue: [readSynch initSignals]. - value. - ]. "=== q := SharedQueue new. 1 to: 10 do: [ :i | q nextPut: i]. c := OrderedCollection new. [ v := q nextOrNilSuchThat: [ :e | e odd]. v notNil ] whileTrue: [ c add: {v. q size} ]. {c. q} explore ==="! Item was changed: ----- Method: SharedQueue>>peek (in category 'accessing') ----- peek "Answer the object that was sent through the receiver first and has not yet been received by anyone but do not remove it from the receiver. If no object has been sent, return nil" + ^readSynch - ^accessProtect critical: [ + accessProtect critical: [ + readPosition >= writePosition ifFalse: [ + contentsArray at: readPosition ] ] ] + ifLocked: [ nil ]! - | value | - readPosition >= writePosition - ifTrue: [readPosition := 1. - writePosition := 1. - value := nil] - ifFalse: [value := contentsArray at: readPosition]. - value].! |
Comment #1 on issue 3434 by stephane.ducasse: SharedQueue Fixes http://code.google.com/p/pharo/issues/detail?id=3434 it probably depends on other squeak changes |
Updates:
Status: Invalid Comment #2 on issue 3434 by [hidden email]: SharedQueue Fixes http://code.google.com/p/pharo/issues/detail?id=3434 SharedQueue was replaced by SharedQueue2 in Pharo |
Comment #3 on issue 3434 by [hidden email]: SharedQueue Fixes http://code.google.com/p/pharo/issues/detail?id=3434 it reminds me that i must finish with AtomicQueue(s), so we can integrate and use them instead. |
Free forum by Nabble | Edit this page |