View Javadoc

1   /***
2    *  Copyright 2003-2008 Luck Consulting Pty Ltd
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   */
16  
17  package net.sf.ehcache.distribution;
18  
19  import net.sf.ehcache.CacheException;
20  import net.sf.ehcache.Ehcache;
21  import net.sf.ehcache.Element;
22  import net.sf.ehcache.Status;
23  
24  
25  
26  import java.io.Serializable;
27  import java.rmi.UnmarshalException;
28  import java.util.ArrayList;
29  import java.util.LinkedList;
30  import java.util.List;
31  import java.util.logging.Logger;
32  import java.util.logging.Level;
33  
34  /***
35   * Listens to {@link net.sf.ehcache.CacheManager} and {@link net.sf.ehcache.Cache} events and propagates those to
36   * {@link CachePeer} peers of the Cache asynchronously.
37   * <p/>
38   * Updates are guaranteed to be replicated in the order in which they are received.
39   * <p/>
40   * While much faster in operation than {@link RMISynchronousCacheReplicator}, it does suffer from a number
41   * of problems. Elements, which may be being spooled to DiskStore may stay around in memory because references
42   * are being held to them from {@link EventMessage}s which are queued up. The replication thread runs once
43   * per second, limiting the build up. However a lot of elements can be put into a cache in that time. We do not want
44   * to get an {@link OutOfMemoryError} using distribution in circumstances when it would not happen if we were
45   * just using the DiskStore.
46   * <p/>
47   * Accordingly, the Element values in {@link EventMessage}s are held by {@link java.lang.ref.SoftReference} in the queue,
48   * so that they can be discarded if required by the GC to avoid an {@link OutOfMemoryError}. A log message
49   * will be issued on each flush of the queue if there were any forced discards. One problem with GC collection
50   * of SoftReferences is that the VM (JDK1.5 anyway) will do that rather than grow the heap size to the maximum.
51   * The workaround is to either set minimum heap size to the maximum heap size to force heap allocation at start
52   * up, or put up with a few lost messages while the heap grows.
53   *
54   * @author Greg Luck
55   * @version $Id: RMIAsynchronousCacheReplicator.java 744 2008-08-16 20:10:49Z gregluck $
56   */
57  public class RMIAsynchronousCacheReplicator extends RMISynchronousCacheReplicator {
58  
59  
60      private static final Logger LOG = Logger.getLogger(RMIAsynchronousCacheReplicator.class.getName());
61  
62      /***
63       * A thread which handles replication, so that replication can take place asynchronously and not hold up the cache. Started by the constructor.
64       */
65      protected Thread replicationThread = new ReplicationThread();
66  
67      /***
68       * The amount of time, in milliseconds, the replication thread sleeps after it detects the replicationQueue is empty
69       * before checking again.
70       */
71      protected int asynchronousReplicationInterval;
72  
73      /***
74       * A queue of updates. Additions and flushes synchronize on this list.
75       */
76      protected final List replicationQueue = new LinkedList();
77  
78      /***
79       * Constructor for internal and subclass use
80       *
81       * @param replicatePuts
82       * @param replicateUpdates
83       * @param replicateUpdatesViaCopy
84       * @param replicateRemovals
85       * @param asynchronousReplicationInterval
86       *
87       */
88      public RMIAsynchronousCacheReplicator(
89              boolean replicatePuts,
90              boolean replicateUpdates,
91              boolean replicateUpdatesViaCopy,
92              boolean replicateRemovals,
93              int asynchronousReplicationInterval) {
94          super(replicatePuts,
95                  replicateUpdates,
96                  replicateUpdatesViaCopy,
97                  replicateRemovals);
98          this.asynchronousReplicationInterval = asynchronousReplicationInterval;
99          status = Status.STATUS_ALIVE;
100         replicationThread.start();
101     }
102 
103     /***
104      * Main loop of the replication thread.
105      * <p/>
106      * Polls the replicationQueue, sleeping {@link #asynchronousReplicationInterval} ms while it is empty, flushes it when it is not, and exits on interrupt or shutdown.
107      */
108     private void replicationThreadMain() {
109         while (true) {
110             // Wait for elements in the replicationQueue
111             while (alive() && replicationQueue != null && replicationQueue.size() == 0) {
112                 try {
113                     Thread.sleep(asynchronousReplicationInterval);
114                 } catch (InterruptedException e) {
115                     LOG.fine("Spool Thread interrupted.");
116                     return;
117                 }
118             }
119             if (notAlive()) {
120                 return;
121             }
122             try {
123                 if (replicationQueue.size() != 0) {
124                     flushReplicationQueue();
125                 }
126             } catch (Throwable e) {
127                 LOG.log(Level.SEVERE, "Exception on flushing of replication queue: " + e.getMessage()
128                         + ". Continuing...", e);
129             }
130         }
131     }
132 
133 
134     /***
135      * {@inheritDoc}
136      * <p/>
137      * This implementation queues the put notification for in-order replication to peers.
138      *
139      * @param cache   the cache emitting the notification
140      * @param element the element which was just put into the cache.
141      */
142     public final void notifyElementPut(final Ehcache cache, final Element element) throws CacheException {
143         if (notAlive()) {
144             return;
145         }
146 
147         if (!replicatePuts) {
148             return;
149         }
150 
151         if (!element.isSerializable()) {
152             if (LOG.isLoggable(Level.WARNING)) {
153                 LOG.warning("Object with key " + element.getObjectKey() + " is not Serializable and cannot be replicated");
154             }
155             return;
156         }
157         addToReplicationQueue(new CacheEventMessage(EventMessage.PUT, cache, element, null));
158     }
159 
160     /***
161      * Called immediately after an element has been put into the cache and the element already
162      * existed in the cache. This is thus an update.
163      * <p/>
164      * The {@link net.sf.ehcache.Cache#put(net.sf.ehcache.Element)} method
165      * will block until this method returns.
166      * <p/>
167      * Implementers may wish to have access to the Element's fields, including value, so the element is provided.
168      * Implementers should be careful not to modify the element. The effect of any modifications is undefined.
169      *
170      * @param cache   the cache emitting the notification
171      * @param element the element which was just put into the cache.
172      */
173     public final void notifyElementUpdated(final Ehcache cache, final Element element) throws CacheException {
174         if (notAlive()) {
175             return;
176         }
177         if (!replicateUpdates) {
178             return;
179         }
180 
181         if (replicateUpdatesViaCopy) {
182             if (!element.isSerializable()) {
183                 if (LOG.isLoggable(Level.WARNING)) {
184                     LOG.warning("Object with key " + element.getObjectKey() + " is not Serializable and cannot be updated via copy");
185                 }
186                 return;
187             }
188             addToReplicationQueue(new CacheEventMessage(EventMessage.PUT, cache, element, null));
189         } else {
190             if (!element.isKeySerializable()) {
191                 if (LOG.isLoggable(Level.WARNING)) {
192                     LOG.warning("Key " + element.getObjectKey() + " is not Serializable and cannot be replicated.");
193                 }
194                 return;
195             }
196             addToReplicationQueue(new CacheEventMessage(EventMessage.REMOVE, cache, null, element.getKey()));
197         }
198     }
199 
200     /***
201      * Called immediately after an attempt to remove an element. The remove method will block until
202      * this method returns.
203      * <p/>
204      * This notification is received regardless of whether the cache had an element matching
205      * the removal key or not. If an element was removed, the element is passed to this method,
206      * otherwise a synthetic element, with only the key set is passed in.
207      * <p/>
208      *
209      * @param cache   the cache emitting the notification
210      * @param element the element just deleted, or a synthetic element with just the key set if
211      *                no element was removed.
212      */
213     public final void notifyElementRemoved(final Ehcache cache, final Element element) throws CacheException {
214         if (notAlive()) {
215             return;
216         }
217 
218         if (!replicateRemovals) {
219             return;
220         }
221 
222         if (!element.isKeySerializable()) {
223             if (LOG.isLoggable(Level.WARNING)) {
224                 LOG.warning("Key " + element.getObjectKey() + " is not Serializable and cannot be replicated.");
225             }
226             return;
227         }
228         addToReplicationQueue(new CacheEventMessage(EventMessage.REMOVE, cache, null, element.getKey()));
229     }
230 
231 
232     /***
233      * Called during {@link net.sf.ehcache.Ehcache#removeAll()} to indicate that all
234      * elements have been removed from the cache in a bulk operation. The usual
235      * {@link #notifyElementRemoved(net.sf.ehcache.Ehcache,net.sf.ehcache.Element)}
236      * is not called.
237      * <p/>
238      * This notification exists because clearing a cache is a special case. It is often
239      * not practical to serially process notifications where potentially millions of elements
240      * have been bulk deleted.
241      *
242      * @param cache the cache emitting the notification
243      */
244     public void notifyRemoveAll(final Ehcache cache) {
245         if (notAlive()) {
246             return;
247         }
248 
249         if (!replicateRemovals) {
250             return;
251         }
252 
253         addToReplicationQueue(new CacheEventMessage(EventMessage.REMOVE_ALL, cache, null, null));
254     }
255 
256 
257     /***
258      * Adds a message to the queue.
259      * <p/>
260      * This method checks the state of the replication thread and warns
261      * if it has stopped and then discards the message.
262      *
263      * @param cacheEventMessage
264      */
265     protected void addToReplicationQueue(CacheEventMessage cacheEventMessage) {
266         if (!replicationThread.isAlive()) {
267             LOG.severe("CacheEventMessages cannot be added to the replication queue"
268                     + " because the replication thread has died.");
269         } else {
270             synchronized (replicationQueue) {
271                 replicationQueue.add(cacheEventMessage);
272             }
273         }
274     }
275 
276 
277     /***
278      * Gets called once per {@link #asynchronousReplicationInterval}.
279      * <p/>
280      * Sends accumulated messages in bulk to each peer, i.e. if there are 100 messages and 1 peer,
281      * 1 RMI invocation results, not 100. Also, if a peer is unavailable this is discovered in only 1 try.
282      * <p/>
283      * Makes a copy of the queue so as not to hold up the enqueue operations.
284      * <p/>
285      * Any exceptions are caught so that the replication thread does not die, and because errors are expected,
286      * due to peers becoming unavailable.
287      * <p/>
288      * This method issues warnings for problems that can be fixed with configuration changes.
289      */
290     private void flushReplicationQueue() {
291         List replicationQueueCopy;
292         synchronized (replicationQueue) {
293             if (replicationQueue.size() == 0) {
294                 return;
295             }
296 
297             replicationQueueCopy = new ArrayList(replicationQueue);
298             replicationQueue.clear();
299         }
300 
301 
302         Ehcache cache = ((CacheEventMessage) replicationQueueCopy.get(0)).cache;
303         List cachePeers = listRemoteCachePeers(cache);
304 
305         List resolvedEventMessages = extractAndResolveEventMessages(replicationQueueCopy);
306 
307 
308         for (int j = 0; j < cachePeers.size(); j++) {
309             CachePeer cachePeer = (CachePeer) cachePeers.get(j);
310             try {
311                 cachePeer.send(resolvedEventMessages);
312             } catch (UnmarshalException e) {
313                 String message = e.getMessage();
314                 if (message.indexOf("Read time out") != 0) {
315                     LOG.warning("Unable to send message to remote peer due to socket read timeout. Consider increasing" +
316                             " the socketTimeoutMillis setting in the cacheManagerPeerListenerFactory. " +
317                             "Message was: " + e.getMessage());
318                 } else {
319                     LOG.fine("Unable to send message to remote peer.  Message was: " + e.getMessage());
320                 }
321             } catch (Throwable t) {
322                 LOG.log(Level.WARNING, "Unable to send message to remote peer.  Message was: " + t.getMessage(), t);
323             }
324         }
325         if (LOG.isLoggable(Level.WARNING)) {
326             int eventMessagesNotResolved = replicationQueueCopy.size() - resolvedEventMessages.size();
327             if (eventMessagesNotResolved > 0) {
328                 LOG.warning(eventMessagesNotResolved + " messages were discarded on replicate due to reclamation of " +
329                         "SoftReferences by the VM. Consider increasing the maximum heap size and/or setting the " +
330                         "starting heap size to a higher value.");
331             }
332 
333         }
334     }
335 
336     /***
337      * Extracts CacheEventMessages and attempts to get a hard reference to the underlying EventMessage
338      * <p/>
339      * If an EventMessage has been invalidated due to SoftReference collection of the Element, it is not
340      * propagated. This only affects puts and updates via copy.
341      *
342      * @param replicationQueueCopy
343      * @return a list of EventMessages which were able to be resolved
344      */
345     private static List extractAndResolveEventMessages(List replicationQueueCopy) {
346         List list = new ArrayList();
347         for (int i = 0; i < replicationQueueCopy.size(); i++) {
348             EventMessage eventMessage = ((CacheEventMessage) replicationQueueCopy.get(i)).getEventMessage();
349             if (eventMessage != null && eventMessage.isValid()) {
350                 list.add(eventMessage);
351             }
352         }
353         return list;
354     }
355 
356     /***
357      * A background daemon thread that runs the replication loop, flushing queued messages to remote peers.
358      */
359     private final class ReplicationThread extends Thread {
360         public ReplicationThread() {
361             super("Replication Thread");
362             setDaemon(true);
363             setPriority(Thread.NORM_PRIORITY);
364         }
365 
366         /***
367          * RemoteDebugger thread method.
368          */
369         public final void run() {
370             replicationThreadMain();
371         }
372     }
373 
374 
375     /***
376      * A wrapper around an EventMessage, which enables the element to be enqueued along with
377      * what is to be done with it.
378      * <p/>
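379      * The wrapper itself holds the {@link EventMessage} directly; it is the Element value inside the message that is held by a
380      * {@link java.lang.ref.SoftReference} (see the class documentation), so that the queue is never the cause of an {@link OutOfMemoryError}.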
381      */
382     private static class CacheEventMessage {
383 
384         private final Ehcache cache;
385         private final EventMessage eventMessage;
386 
387         public CacheEventMessage(int event, Ehcache cache, Element element, Serializable key) {
388             eventMessage = new EventMessage(event, key, element);
389             this.cache = cache;
390         }
391 
392         /***
393          * Gets the component EventMessage
394          */
395         public final EventMessage getEventMessage() {
396             return eventMessage;
397         }
398 
399     }
400 
401     /***
402      * Give the replicator a chance to flush the replication queue, then cleanup and free resources when no longer needed
403      */
404     public final void dispose() {
405         status = Status.STATUS_SHUTDOWN;
406         flushReplicationQueue();
407     }
408 
409 
410     /***
411      * Creates a clone of this listener. This method will only be called by ehcache before a cache is initialized.
412      * <p/>
413      * This may not be possible for listeners after they have been initialized. Implementations should throw
414      * CloneNotSupportedException if they do not support clone.
415      *
416      * @return a clone
417      * @throws CloneNotSupportedException if the listener could not be cloned.
418      */
419     public Object clone() throws CloneNotSupportedException {
420         //shutup checkstyle
421         super.clone();
422         return new RMIAsynchronousCacheReplicator(replicatePuts, replicateUpdates,
423                 replicateUpdatesViaCopy, replicateRemovals, asynchronousReplicationInterval);
424     }
425 
426 
427 }