Broadcasting with Announcements

Previous Topic Next Topic
 
classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

Broadcasting with Announcements

Ladislav Lenart
Hello,

we run into the following problem: our customer wants us to implement
"timestamp of last change" feature to our product, but our model is not
just plain structure, quite contrary - one business object is a complex
structure itself. We use Announcement framework for delivering notices
about changes in objects to those who are interested in them. But now
we need some kind of broadcast or multicast support in announcements.

So I would like to hear your suggestions about how to solve this problem
or ideas about how to implement broadcast support to announcements...

Thanks.

BTW while back we had performance problems with announcements. We found
that copying 1000 of our objects took about 1 minute of real time on 3GHz
PC, 4 seconds of which was spent in our computation, the rest took to
collect generated garbage. This operation allocated about 1GB of memory.
The cause of this was COW strategy in announcements. To fix this, we
implemented our own MutexSubscribtionRegistry (see attachment) that does
not copy anything at all but instead uses mutex to control access to the
registry. Suddenly, the same operation takes 2-3s on the same PC (we made
other minor optimizations as well) and allocates about 30MB of memory.

The only drawback of MutexSubscriptionRegistry is that dead-lock occurs
when the same thread that is delivering an announcement wants to change
the registry (someone who processes the announcement wants to do this).
We had this issue in only one place so it is most probably very rare
and we solved it by only forking the change of the registry.

We also modified overrides in AnnouncementSubscription. We made it in the
same fashion as dependents, that is:
        * nil (none),
        * object (one override),
        * OrderedCollection (more overrides).
This change prooved useful too, because most of the time there is none or
one override. See attachment.

Enjoy.

Ladislav Lenart


<?xml version="1.0"?>

<st-source>
<time-stamp>From VisualWorks®, 7.4 of 5. prosinec 2005 on 8. srpen 2006 at 16:36:34</time-stamp>
<!-- Package AnnouncementSubscription-DestinationOverrides speed up(1.2,Ladislav Lenart)* -->


<component-property>
<name>AnnouncementSubscription-DestinationOverrides speed up</name> <type>package</type>
<property>fractalCreator</property> <value>'fractal'</value>
</component-property>

<component-property>
<name>AnnouncementSubscription-DestinationOverrides speed up</name> <type>package</type>
<property>packageName</property> <value>'AnnouncementSubscription-DestinationOverrides speed up'</value>
</component-property>

<component-property>
<name>AnnouncementSubscription-DestinationOverrides speed up</name> <type>package</type>
<property>prerequisiteParcels</property> <value>#(#('System-Announcements' ''))</value>
</component-property>

<component-property>
<name>AnnouncementSubscription-DestinationOverrides speed up</name> <type>package</type>
<property>developmentPrerequisites</property> <value>#(#(#any 'System-Announcements' ''))</value>
</component-property>

<component-property>
<name>AnnouncementSubscription-DestinationOverrides speed up</name> <type>package</type>
<property>comment</property> <value>'AnnouncementSubscription-DestinationOverrides speed up

This speeds up destinationOverrides (things like #intercepWith:while: or #suspendWhile:ifAnyMissed:) of
AnnouncementSubscription (AS). This changes destinationOverrides so they states like dependents:
        * nil (none),
        * AnnouncementDeliveryDestination (one) a
        * OrderedCollection of: AnnouncementDeliveryDestination (more than one).

By default, AS toggles between nil and collection states, but this has worse performance for the most
common case (none &lt;-&gt; one) and generates unnecessary garbage.

Collaborators: System-Announcements
'</value>
</component-property>


<methods>
<class-id>Core.AnnouncementSubscription</class-id> <category>private</category>

<body package="AnnouncementSubscription-DestinationOverrides speed up" selector="overridesDo:">overridesDo: oneArgBlock

        destinationOverrides ifNil: [^self].
        destinationOverrides isSequenceable
                ifTrue: [destinationOverrides do: oneArgBlock]
                ifFalse: [oneArgBlock value: destinationOverrides].</body>
</methods>

<methods>
<class-id>Core.AnnouncementSubscription</class-id> <category>accessing</category>

<body package="AnnouncementSubscription-DestinationOverrides speed up" selector="process:from:">process: anAnnouncement from: broadcasterObject
        "The workhorse of announcement delivery. This message is sent by the registry or
        an announcement collection to deliver an announcement as appropriate."

        destinationOverrides isNil
                ifTrue: [destination deliver: anAnnouncement from: broadcasterObject for: self]
                ifFalse: [self overridesDo: [:each | each deliver: anAnnouncement from: broadcasterObject for: self]].</body>

<body package="AnnouncementSubscription-DestinationOverrides speed up" selector="removeDestinationOverride:">removeDestinationOverride: aDestination

        destinationOverrides ifNil: [^self].
        destinationOverrides = aDestination ifTrue: [^destinationOverrides := nil].
        destinationOverrides isSequenceable ifFalse: [^self].
        destinationOverrides remove: aDestination ifAbsent: [^self].
        destinationOverrides size = 1 ifTrue: [destinationOverrides := destinationOverrides first].</body>

<body package="AnnouncementSubscription-DestinationOverrides speed up" selector="addDestinationOverride:">addDestinationOverride: aDestination

        destinationOverrides ifNil: [^destinationOverrides := aDestination].
        destinationOverrides isSequenceable
                ifTrue: [destinationOverrides add: aDestination]
                ifFalse: [destinationOverrides := OrderedCollection with: destinationOverrides with: aDestination].</body>
</methods>



</st-source>

<?xml version="1.0"?>

<st-source>
<time-stamp>From VisualWorks®, 7.4 of 5. prosinec 2005 on 8. srpen 2006 at 16:36:47</time-stamp>
<!-- Package MutexSubscriptionRegistry(1.0,Ladislav Lenart)* -->


<component-property>
<name>MutexSubscriptionRegistry</name> <type>package</type>
<property>fractalCreator</property> <value>'fractal'</value>
</component-property>

<component-property>
<name>MutexSubscriptionRegistry</name> <type>package</type>
<property>packageName</property> <value>'MutexSubscriptionRegistry'</value>
</component-property>

<component-property>
<name>MutexSubscriptionRegistry</name> <type>package</type>
<property>prerequisiteParcels</property> <value>#(#('System-Announcements' ''))</value>
</component-property>

<component-property>
<name>MutexSubscriptionRegistry</name> <type>package</type>
<property>developmentPrerequisites</property> <value>#(#(#any 'System-Announcements' ''))</value>
</component-property>

<component-property>
<name>MutexSubscriptionRegistry</name> <type>package</type>
<property>comment</property> <value>'MutexSubscriptionRegistry

I am a one class package with MutexSubscriptionRegistry, special kind
of SubscriptionRegistry that uses mutex instead of copy-on-write strategy.

Collaborators: System-Announcements.
'</value>
</component-property>

<class>
<name>MutexSubscriptionRegistry</name>
<environment>Core</environment>
<super>Core.SubscriptionRegistry</super>
<private>false</private>
<indexed-type>none</indexed-type>
<inst-vars>mutex </inst-vars>
<class-inst-vars></class-inst-vars>
<imports></imports>
<category>System-Announcements</category>
<attributes>
<package>MutexSubscriptionRegistry</package>
</attributes>
</class>

<comment>
<class-id>Core.MutexSubscriptionRegistry</class-id>
<body>MutexSubscriptionRegistry

I am SubscriptionRegistry that uses mutex instead of copy-on-write strategy.

Mutex is needed to ensure thread-safe behavior of the registry.
RecursionLock can not be used instead because we want to
prevent one concrete thread from modifying the registry while
delivering an announcement.

Instance variables:
        mutex &lt;Semaphore&gt;
</body>
</comment>


<methods>
<class-id>Core.MutexSubscriptionRegistry class</class-id> <category>instance creation</category>

<body package="MutexSubscriptionRegistry" selector="new">new

        ^self basicNew initialize.</body>
</methods>


<methods>
<class-id>Core.MutexSubscriptionRegistry</class-id> <category>testing</category>

<body package="MutexSubscriptionRegistry" selector="isEmpty">isEmpty

        ^mutex critical: [super isEmpty].</body>

<body package="MutexSubscriptionRegistry" selector="notEmpty">notEmpty

        ^mutex critical: [super notEmpty].</body>
</methods>

<methods>
<class-id>Core.MutexSubscriptionRegistry</class-id> <category>privileged</category>

<body package="MutexSubscriptionRegistry" selector="deliver:from:">deliver: announcement from: anObject
        "Deliver the announcement in critical section."

        mutex critical: [super deliver: announcement from: anObject].</body>
</methods>

<methods>
<class-id>Core.MutexSubscriptionRegistry</class-id> <category>initialize-release</category>

<body package="MutexSubscriptionRegistry" selector="initialize">initialize

        super initialize.
       
        mutex := Semaphore forMutualExclusion.</body>
</methods>

<methods>
<class-id>Core.MutexSubscriptionRegistry</class-id> <category>adding</category>

<body package="MutexSubscriptionRegistry" selector="addSubscription:">addSubscription: anAnnouncementSubscription
        "Add a subscription in critical section."

        mutex critical: [
                | subscriptions |
                subscriptions :=
                        classesAndSubscriptions
                                at: anAnnouncementSubscription announcementClass
                                ifAbsentPut: [OrderedCollection new].
                subscriptions add: anAnnouncementSubscription.
        ].</body>
</methods>

<methods>
<class-id>Core.MutexSubscriptionRegistry</class-id> <category>selecting</category>

<body package="MutexSubscriptionRegistry" selector="subscriptionsMatching:">subscriptionsMatching: anAnnouncement

        ^mutex critical: [super subscriptionsMatching: anAnnouncement].</body>

<body package="MutexSubscriptionRegistry" selector="subscriptionsFor:">subscriptionsFor: announcementClassOrSet

        ^mutex critical: [super subscriptionsFor: announcementClassOrSet].</body>

<body package="MutexSubscriptionRegistry" selector="subscriptionsOf:for:">subscriptionsOf: anObject for: announcementClassOrSet

        ^mutex critical: [super subscriptionsOf: anObject for: announcementClassOrSet].</body>

<body package="MutexSubscriptionRegistry" selector="subscriptionsOf:">subscriptionsOf: anObject

        ^mutex critical: [super subscriptionsOf: anObject].</body>

<body package="MutexSubscriptionRegistry" selector="allSubscriptions">allSubscriptions

        ^mutex critical: [super allSubscriptions].</body>
</methods>

<methods>
<class-id>Core.MutexSubscriptionRegistry</class-id> <category>removing</category>

<body package="MutexSubscriptionRegistry" selector="removeSubscription:">removeSubscription: anAnnouncementSubscription
        "Remove subscription in critical section."

        mutex critical: [
                | announcementClass subscriptions |
                announcementClass := anAnnouncementSubscription announcementClass.
                subscriptions := classesAndSubscriptions at: announcementClass ifAbsent: [^self].
                subscriptions remove: anAnnouncementSubscription ifAbsent: [^self].
                anAnnouncementSubscription deactivate.
                subscriptions isEmpty ifTrue: [classesAndSubscriptions removeKey: announcementClass].
        ].</body>
</methods>



</st-source>