View Javadoc

1   /***
2    *  Copyright 2003-2008 Luck Consulting Pty Ltd
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   */
16  
17  package net.sf.ehcache.distribution.jgroups;
18  
19  import java.io.Serializable;
20  import java.rmi.RemoteException;
21  import java.rmi.UnmarshalException;
22  import java.util.ArrayList;
23  import java.util.LinkedList;
24  import java.util.List;
25  import java.util.logging.Logger;
26  import java.util.logging.Level;
27  
28  
29  import net.sf.ehcache.CacheException;
30  import net.sf.ehcache.Ehcache;
31  import net.sf.ehcache.Element;
32  import net.sf.ehcache.Status;
33  import net.sf.ehcache.distribution.CacheManagerPeerProvider;
34  import net.sf.ehcache.distribution.CachePeer;
35  import net.sf.ehcache.distribution.CacheReplicator;
36  
37  /***
38   * @author Pierre Monestie (pmonestie[at]@gmail.com)
39   * @author <a href="mailto:gluck@gregluck.com">Greg Luck</a>
40   * @version $Id: JGroupsCacheReplicator.java 744 2008-08-16 20:10:49Z gregluck $
41   *          <p/> This implements CacheReplicator using JGroups as underlying
42   *          replication mechanism The peer provider should be of type
43   *          JGroupsCacheManagerPeerProvider It is assumed that the cachepeer is
44   *          a JGroupManager
45   */
46  public class JGroupsCacheReplicator implements CacheReplicator {
47      /***
48       * Teh default interval for async cache replication
49       */
50      public static final long DEFAULT_ASYNC_INTERVAL = 1000;
51      
52      private static final Logger LOG = Logger.getLogger(JGroupsCacheReplicator.class.getName());
53  
54      private long asynchronousReplicationInterval = DEFAULT_ASYNC_INTERVAL;
55  
56      /***
57       * Whether or not to replicate puts
58       */
59      private boolean replicatePuts;
60  
61      /***
62       * Whether or not to replicate updates
63       */
64      private boolean replicateUpdates;
65  
66      /***
67       * Replicate update via copying, if false via deleting
68       */
69      private boolean replicateUpdatesViaCopy;
70  
71      /***
72       * Whether or not to replicate remove events
73       */
74      private boolean replicateRemovals;
75  
76      /***
77       * Weather or not to replicate asynchronously. If true a background thread
78       * is ran and fire update at a set intervale
79       */
80      private boolean replicateAsync;
81  
82      private ReplicationThread replicationThread;
83  
84      private List replicationQueue = new LinkedList();
85  
86      private Status status;
87  
88      
89  
90      /***
91       * Constructor called by factory
92       * 
93       * @param replicatePuts
94       * @param replicateUpdates
95       * @param replicateUpdatesViaCopy
96       * @param replicateRemovals
97       * @param replicateAsync
98       */
99      public JGroupsCacheReplicator(boolean replicatePuts, boolean replicateUpdates, boolean replicateUpdatesViaCopy,
100             boolean replicateRemovals, boolean replicateAsync) {
101         super();
102 
103         this.replicatePuts = replicatePuts;
104         this.replicateUpdates = replicateUpdates;
105         this.replicateUpdatesViaCopy = replicateUpdatesViaCopy;
106         this.replicateRemovals = replicateRemovals;
107         this.replicateAsync = replicateAsync;
108 
109         if (replicateAsync) {
110             replicationThread = new ReplicationThread();
111             replicationThread.start();
112         }
113         status = Status.STATUS_ALIVE;
114     }
115 
116     /***
117      * Weather or not the cache is replicated asynchronously. If true a
118      * background thread is ran and fire update at a set intervale
119      * 
120      * @return true if replicated asynchronously, false otherwise
121      */
122     public boolean isReplicateAsync() {
123         return replicateAsync;
124     }
125 
126     /***
127      * Wether or not puts are replicated
128      * 
129      * @return true if puts are replicated, false otherwise
130      */
131     public boolean isReplicatePuts() {
132         return replicatePuts;
133     }
134 
135     /***
136      * wether or not removals are replicated
137      * 
138      * @return true if removals are replicated, false otherwise
139      */
140     public boolean isReplicateRemovals() {
141         return replicateRemovals;
142     }
143 
144     /***
145      * Wether or not updates are replicated
146      * 
147      * @return true if replicated, false otherwise
148      */
149     public boolean isReplicateUpdates() {
150         return replicateUpdates;
151     }
152 
153     /***
154      * {@inheritDoc}
155      */
156     public boolean alive() {
157         return true;
158     }
159 
160     /***
161      * {@inheritDoc}
162      */
163     public boolean isReplicateUpdatesViaCopy() {
164         return replicateUpdatesViaCopy;
165     }
166 
167     /***
168      * {@inheritDoc}
169      */
170     public boolean notAlive() {
171         return false;
172     }
173 
174     /***
175      * {@inheritDoc}
176      */
177     public void dispose() {
178         status = Status.STATUS_SHUTDOWN;
179         flushReplicationQueue();
180     }
181 
182     /***
183      * {@inheritDoc}
184      */
185     public void notifyElementExpired(Ehcache cache, Element element) {
186 
187         // LOG.finest("Sending out exp el:"+element);
188 
189     }
190 
191     /***
192      * Used to send notification to the peer. If Async this method simply add
193      * the element to the replication queue. If not async, searches for the
194      * cachePeer and send the Message. That way the class handles both async and
195      * sync replication Sending is delegated to the peer (of type JGroupManager)
196      * 
197      * @param cache
198      * @param e
199      */
200     protected void sendNotification(Ehcache cache, JGroupEventMessage e) {
201 
202         if (replicateAsync) {
203             addMessageToQueue(e);
204             return;
205         }
206         CacheManagerPeerProvider provider = cache.getCacheManager().getCachePeerProvider();
207         List l = provider.listRemoteCachePeers(cache);
208         ArrayList a = new ArrayList();
209 
210         a.add(e);
211 
212         for (int i = 0; i < l.size(); i++) {
213             CachePeer peer = (CachePeer) l.get(i);
214             try {
215                 peer.send(a);
216             } catch (RemoteException e1) {
217                 // e1.printStackTrace();
218             }
219             // peer.
220         }
221 
222     }
223 
224     /***
225      * {@inheritDoc}
226      */
227     public void notifyElementPut(Ehcache cache, Element element) throws CacheException {
228         if (notAlive()) {
229             return;
230         }
231 
232         if (isReplicatePuts()) {
233             // if (log.isTraceEnabled())
234             // LOG.finest("Sending out add/upd el:" + element);
235             replicatePutNotification(cache, element);
236         }
237 
238     }
239 
240     private void replicatePutNotification(Ehcache cache, Element element) {
241         if (!element.isKeySerializable()) {
242             LOG.warning("Key " + element.getObjectKey() + " is not Serializable and cannot be replicated.");
243             return;
244         }
245         if (!element.isSerializable()) {
246             LOG.warning("Object with key " + element.getObjectKey() + " is not Serializable and cannot be updated via copy");
247             return;
248         }
249         JGroupEventMessage e = new JGroupEventMessage(JGroupEventMessage.PUT, (Serializable) element.getObjectKey(), element,
250                 cache, cache.getName());
251 
252         sendNotification(cache, e);
253     }
254 
255     private void replicateRemoveNotification(Ehcache cache, Element element) {
256         if (!element.isKeySerializable()) {
257             LOG.warning("Key " + element.getObjectKey() + " is not Serializable and cannot be replicated.");
258             return;
259         }
260         JGroupEventMessage e = new JGroupEventMessage(JGroupEventMessage.REMOVE, (Serializable) element.getObjectKey(), null,
261                 cache, cache.getName());
262 
263         sendNotification(cache, e);
264     }
265 
266     /***
267      * {@inheritDoc}
268      */
269     public void notifyElementRemoved(Ehcache cache, Element element) throws CacheException {
270         if (notAlive()) {
271             return;
272         }
273         if (isReplicateRemovals()) {
274             replicateRemoveNotification(cache, element);
275 
276         }
277 
278     }
279 
280     /***
281      * {@inheritDoc}
282      */
283     public void notifyElementUpdated(Ehcache cache, Element element) throws CacheException {
284         if (notAlive()) {
285             return;
286         }
287         if (!replicateUpdates) {
288             return;
289         }
290 
291         if (isReplicateUpdatesViaCopy()) {
292             replicatePutNotification(cache, element);
293         } else {
294             replicateRemoveNotification(cache, element);
295         }
296 
297     }
298 
299     /***
300      * {@inheritDoc}
301      */
302     public void notifyElementEvicted(Ehcache cache, Element element) {
303 
304     }
305 
306     /***
307      * {@inheritDoc}
308      */
309     public void notifyRemoveAll(Ehcache cache) {
310         if (isReplicateRemovals()) {
311             LOG.finest("Remove all elements called");
312             JGroupEventMessage e = new JGroupEventMessage(JGroupEventMessage.REMOVE_ALL, null, null, cache, cache.getName());
313             sendNotification(cache, e);
314         }
315 
316     }
317 
318     /***
319      * Package protected List of cache peers
320      * 
321      * @param cache
322      * @return a list of {@link CachePeer} peers for the given cache, excluding
323      *         the local peer.
324      */
325     static List listRemoteCachePeers(Ehcache cache) {
326         CacheManagerPeerProvider provider = cache.getCacheManager().getCachePeerProvider();
327         return provider.listRemoteCachePeers(cache);
328     }
329 
330     /***
331      * {@inheritDoc}
332      */
333     public Object clone() throws CloneNotSupportedException {
334         return super.clone();
335     }
336 
337     /***
338      * The replication thread
339      * 
340      * @author pierrem
341      * 
342      */
343     private final class ReplicationThread extends Thread {
344         public ReplicationThread() {
345             super("Replication Thread");
346             setDaemon(true);
347             setPriority(Thread.NORM_PRIORITY);
348         }
349 
350         /***
351          * RemoteDebugger thread method.
352          */
353         public final void run() {
354             replicationThreadMain();
355         }
356     }
357 
358     private void replicationThreadMain() {
359         while (true) {
360             // Wait for elements in the replicationQueue
361             while (alive() && replicationQueue != null && replicationQueue.size() == 0) {
362                 try {
363 
364                     Thread.sleep(asynchronousReplicationInterval);
365                 } catch (InterruptedException e) {
366                     LOG.fine("Spool Thread interrupted.");
367                     return;
368                 }
369             }
370             if (notAlive()) {
371                 return;
372             }
373             try {
374                 if (replicationQueue.size() != 0) {
375                     flushReplicationQueue();
376                 }
377             } catch (Throwable e) {
378                 LOG.log(Level.WARNING, "Exception on flushing of replication queue: " + e.getMessage() + ". Continuing...", e);
379             }
380         }
381     }
382 
383     private void addMessageToQueue(JGroupEventMessage msg) {
384         synchronized (replicationQueue) {
385             replicationQueue.add(msg);
386         }
387     }
388 
389     /***
390      * Gets called once per {@link #asynchronousReplicationInterval}. <p/>
391      * Sends accumulated messages in bulk to each peer. i.e. if ther are 100
392      * messages and 1 peer, 1 RMI invocation results, not 100. Also, if a peer
393      * is unavailable this is discovered in only 1 try. <p/> Makes a copy of the
394      * queue so as not to hold up the enqueue operations. <p/> Any exceptions
395      * are caught so that the replication thread does not die, and because
396      * errors are expected, due to peers becoming unavailable. <p/> This method
397      * issues warnings for problems that can be fixed with configuration
398      * changes.
399      */
400     private void flushReplicationQueue() {
401         List resolvedEventMessages;
402         Ehcache cache;
403         synchronized (replicationQueue) {
404             if (replicationQueue.size() == 0) {
405                 return;
406             }
407             resolvedEventMessages = extractAndResolveEventMessages(replicationQueue);
408             cache = ((JGroupEventMessage) replicationQueue.get(0)).getCache();
409             replicationQueue.clear();
410         }
411 
412         List cachePeers = listRemoteCachePeers(cache);
413 
414         for (int j = 0; j < cachePeers.size(); j++) {
415             CachePeer cachePeer = (CachePeer) cachePeers.get(j);
416             try {
417                 cachePeer.send(resolvedEventMessages);
418             } catch (UnmarshalException e) {
419                 String message = e.getMessage();
420                 if (message.indexOf("Read time out") != 0) {
421                     LOG.warning("Unable to send message to remote peer due to socket read timeout. Consider increasing"
422                             + " the socketTimeoutMillis setting in the cacheManagerPeerListenerFactory. " + "Message was: "
423                             + e.getMessage());
424                 } else {
425                     LOG.fine("Unable to send message to remote peer.  Message was: " + e.getMessage());
426                 }
427             } catch (Throwable t) {
428                 LOG.log(Level.WARNING, "Unable to send message to remote peer.  Message was: " + t.getMessage(), t);
429             }
430         }
431 
432     }
433 
434     /***
435      * Extracts CacheEventMessages and attempts to get a hard reference to the
436      * underlying EventMessage <p/> If an EventMessage has been invalidated due
437      * to SoftReference collection of the Element, it is not propagated. This
438      * only affects puts and updates via copy.
439      * 
440      * @param replicationQueueCopy
441      * @return a list of EventMessages which were able to be resolved
442      */
443     private static List extractAndResolveEventMessages(List replicationQueueCopy) {
444         List list = new ArrayList();
445         for (int i = 0; i < replicationQueueCopy.size(); i++) {
446             JGroupEventMessage eventMessage = (JGroupEventMessage) replicationQueueCopy.get(i);
447             if (eventMessage != null && eventMessage.isValid()) {
448                 list.add(eventMessage);
449             } else {
450                 LOG.severe("Collected soft ref");
451             }
452         }
453         return list;
454     }
455 
456     /***
457      * Get the time interval is ms between asynchronous replication
458      * 
459      * @return the interval
460      */
461     public long getAsynchronousReplicationInterval() {
462         return asynchronousReplicationInterval;
463     }
464 
465     /***
466      * Set the time inteval for asynchronous replication
467      * 
468      * @param asynchronousReplicationInterval
469      *            the interval between replication
470      */
471     public void setAsynchronousReplicationInterval(long asynchronousReplicationInterval) {
472         this.asynchronousReplicationInterval = asynchronousReplicationInterval;
473     }
474 
475 }