1 /***
2 * Copyright 2003-2008 Luck Consulting Pty Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package net.sf.ehcache.distribution.jgroups;
18
19 import java.io.Serializable;
20 import java.rmi.RemoteException;
21 import java.rmi.UnmarshalException;
22 import java.util.ArrayList;
23 import java.util.LinkedList;
24 import java.util.List;
25 import java.util.logging.Logger;
26 import java.util.logging.Level;
27
28
29 import net.sf.ehcache.CacheException;
30 import net.sf.ehcache.Ehcache;
31 import net.sf.ehcache.Element;
32 import net.sf.ehcache.Status;
33 import net.sf.ehcache.distribution.CacheManagerPeerProvider;
34 import net.sf.ehcache.distribution.CachePeer;
35 import net.sf.ehcache.distribution.CacheReplicator;
36
37 /***
38 * @author Pierre Monestie (pmonestie[at]@gmail.com)
39 * @author <a href="mailto:gluck@gregluck.com">Greg Luck</a>
40 * @version $Id: JGroupsCacheReplicator.java 744 2008-08-16 20:10:49Z gregluck $
41 * <p/> This implements CacheReplicator using JGroups as underlying
42 * replication mechanism The peer provider should be of type
43 * JGroupsCacheManagerPeerProvider It is assumed that the cachepeer is
44 * a JGroupManager
45 */
46 public class JGroupsCacheReplicator implements CacheReplicator {
47 /***
48 * Teh default interval for async cache replication
49 */
50 public static final long DEFAULT_ASYNC_INTERVAL = 1000;
51
52 private static final Logger LOG = Logger.getLogger(JGroupsCacheReplicator.class.getName());
53
54 private long asynchronousReplicationInterval = DEFAULT_ASYNC_INTERVAL;
55
56 /***
57 * Whether or not to replicate puts
58 */
59 private boolean replicatePuts;
60
61 /***
62 * Whether or not to replicate updates
63 */
64 private boolean replicateUpdates;
65
66 /***
67 * Replicate update via copying, if false via deleting
68 */
69 private boolean replicateUpdatesViaCopy;
70
71 /***
72 * Whether or not to replicate remove events
73 */
74 private boolean replicateRemovals;
75
76 /***
77 * Weather or not to replicate asynchronously. If true a background thread
78 * is ran and fire update at a set intervale
79 */
80 private boolean replicateAsync;
81
82 private ReplicationThread replicationThread;
83
84 private List replicationQueue = new LinkedList();
85
86 private Status status;
87
88
89
90 /***
91 * Constructor called by factory
92 *
93 * @param replicatePuts
94 * @param replicateUpdates
95 * @param replicateUpdatesViaCopy
96 * @param replicateRemovals
97 * @param replicateAsync
98 */
99 public JGroupsCacheReplicator(boolean replicatePuts, boolean replicateUpdates, boolean replicateUpdatesViaCopy,
100 boolean replicateRemovals, boolean replicateAsync) {
101 super();
102
103 this.replicatePuts = replicatePuts;
104 this.replicateUpdates = replicateUpdates;
105 this.replicateUpdatesViaCopy = replicateUpdatesViaCopy;
106 this.replicateRemovals = replicateRemovals;
107 this.replicateAsync = replicateAsync;
108
109 if (replicateAsync) {
110 replicationThread = new ReplicationThread();
111 replicationThread.start();
112 }
113 status = Status.STATUS_ALIVE;
114 }
115
116 /***
117 * Weather or not the cache is replicated asynchronously. If true a
118 * background thread is ran and fire update at a set intervale
119 *
120 * @return true if replicated asynchronously, false otherwise
121 */
122 public boolean isReplicateAsync() {
123 return replicateAsync;
124 }
125
126 /***
127 * Wether or not puts are replicated
128 *
129 * @return true if puts are replicated, false otherwise
130 */
131 public boolean isReplicatePuts() {
132 return replicatePuts;
133 }
134
135 /***
136 * wether or not removals are replicated
137 *
138 * @return true if removals are replicated, false otherwise
139 */
140 public boolean isReplicateRemovals() {
141 return replicateRemovals;
142 }
143
144 /***
145 * Wether or not updates are replicated
146 *
147 * @return true if replicated, false otherwise
148 */
149 public boolean isReplicateUpdates() {
150 return replicateUpdates;
151 }
152
153 /***
154 * {@inheritDoc}
155 */
156 public boolean alive() {
157 return true;
158 }
159
160 /***
161 * {@inheritDoc}
162 */
163 public boolean isReplicateUpdatesViaCopy() {
164 return replicateUpdatesViaCopy;
165 }
166
167 /***
168 * {@inheritDoc}
169 */
170 public boolean notAlive() {
171 return false;
172 }
173
174 /***
175 * {@inheritDoc}
176 */
177 public void dispose() {
178 status = Status.STATUS_SHUTDOWN;
179 flushReplicationQueue();
180 }
181
182 /***
183 * {@inheritDoc}
184 */
185 public void notifyElementExpired(Ehcache cache, Element element) {
186
187
188
189 }
190
191 /***
192 * Used to send notification to the peer. If Async this method simply add
193 * the element to the replication queue. If not async, searches for the
194 * cachePeer and send the Message. That way the class handles both async and
195 * sync replication Sending is delegated to the peer (of type JGroupManager)
196 *
197 * @param cache
198 * @param e
199 */
200 protected void sendNotification(Ehcache cache, JGroupEventMessage e) {
201
202 if (replicateAsync) {
203 addMessageToQueue(e);
204 return;
205 }
206 CacheManagerPeerProvider provider = cache.getCacheManager().getCachePeerProvider();
207 List l = provider.listRemoteCachePeers(cache);
208 ArrayList a = new ArrayList();
209
210 a.add(e);
211
212 for (int i = 0; i < l.size(); i++) {
213 CachePeer peer = (CachePeer) l.get(i);
214 try {
215 peer.send(a);
216 } catch (RemoteException e1) {
217
218 }
219
220 }
221
222 }
223
224 /***
225 * {@inheritDoc}
226 */
227 public void notifyElementPut(Ehcache cache, Element element) throws CacheException {
228 if (notAlive()) {
229 return;
230 }
231
232 if (isReplicatePuts()) {
233
234
235 replicatePutNotification(cache, element);
236 }
237
238 }
239
240 private void replicatePutNotification(Ehcache cache, Element element) {
241 if (!element.isKeySerializable()) {
242 LOG.warning("Key " + element.getObjectKey() + " is not Serializable and cannot be replicated.");
243 return;
244 }
245 if (!element.isSerializable()) {
246 LOG.warning("Object with key " + element.getObjectKey() + " is not Serializable and cannot be updated via copy");
247 return;
248 }
249 JGroupEventMessage e = new JGroupEventMessage(JGroupEventMessage.PUT, (Serializable) element.getObjectKey(), element,
250 cache, cache.getName());
251
252 sendNotification(cache, e);
253 }
254
255 private void replicateRemoveNotification(Ehcache cache, Element element) {
256 if (!element.isKeySerializable()) {
257 LOG.warning("Key " + element.getObjectKey() + " is not Serializable and cannot be replicated.");
258 return;
259 }
260 JGroupEventMessage e = new JGroupEventMessage(JGroupEventMessage.REMOVE, (Serializable) element.getObjectKey(), null,
261 cache, cache.getName());
262
263 sendNotification(cache, e);
264 }
265
266 /***
267 * {@inheritDoc}
268 */
269 public void notifyElementRemoved(Ehcache cache, Element element) throws CacheException {
270 if (notAlive()) {
271 return;
272 }
273 if (isReplicateRemovals()) {
274 replicateRemoveNotification(cache, element);
275
276 }
277
278 }
279
280 /***
281 * {@inheritDoc}
282 */
283 public void notifyElementUpdated(Ehcache cache, Element element) throws CacheException {
284 if (notAlive()) {
285 return;
286 }
287 if (!replicateUpdates) {
288 return;
289 }
290
291 if (isReplicateUpdatesViaCopy()) {
292 replicatePutNotification(cache, element);
293 } else {
294 replicateRemoveNotification(cache, element);
295 }
296
297 }
298
299 /***
300 * {@inheritDoc}
301 */
302 public void notifyElementEvicted(Ehcache cache, Element element) {
303
304 }
305
306 /***
307 * {@inheritDoc}
308 */
309 public void notifyRemoveAll(Ehcache cache) {
310 if (isReplicateRemovals()) {
311 LOG.finest("Remove all elements called");
312 JGroupEventMessage e = new JGroupEventMessage(JGroupEventMessage.REMOVE_ALL, null, null, cache, cache.getName());
313 sendNotification(cache, e);
314 }
315
316 }
317
318 /***
319 * Package protected List of cache peers
320 *
321 * @param cache
322 * @return a list of {@link CachePeer} peers for the given cache, excluding
323 * the local peer.
324 */
325 static List listRemoteCachePeers(Ehcache cache) {
326 CacheManagerPeerProvider provider = cache.getCacheManager().getCachePeerProvider();
327 return provider.listRemoteCachePeers(cache);
328 }
329
330 /***
331 * {@inheritDoc}
332 */
333 public Object clone() throws CloneNotSupportedException {
334 return super.clone();
335 }
336
337 /***
338 * The replication thread
339 *
340 * @author pierrem
341 *
342 */
343 private final class ReplicationThread extends Thread {
344 public ReplicationThread() {
345 super("Replication Thread");
346 setDaemon(true);
347 setPriority(Thread.NORM_PRIORITY);
348 }
349
350 /***
351 * RemoteDebugger thread method.
352 */
353 public final void run() {
354 replicationThreadMain();
355 }
356 }
357
358 private void replicationThreadMain() {
359 while (true) {
360
361 while (alive() && replicationQueue != null && replicationQueue.size() == 0) {
362 try {
363
364 Thread.sleep(asynchronousReplicationInterval);
365 } catch (InterruptedException e) {
366 LOG.fine("Spool Thread interrupted.");
367 return;
368 }
369 }
370 if (notAlive()) {
371 return;
372 }
373 try {
374 if (replicationQueue.size() != 0) {
375 flushReplicationQueue();
376 }
377 } catch (Throwable e) {
378 LOG.log(Level.WARNING, "Exception on flushing of replication queue: " + e.getMessage() + ". Continuing...", e);
379 }
380 }
381 }
382
383 private void addMessageToQueue(JGroupEventMessage msg) {
384 synchronized (replicationQueue) {
385 replicationQueue.add(msg);
386 }
387 }
388
389 /***
390 * Gets called once per {@link #asynchronousReplicationInterval}. <p/>
391 * Sends accumulated messages in bulk to each peer. i.e. if ther are 100
392 * messages and 1 peer, 1 RMI invocation results, not 100. Also, if a peer
393 * is unavailable this is discovered in only 1 try. <p/> Makes a copy of the
394 * queue so as not to hold up the enqueue operations. <p/> Any exceptions
395 * are caught so that the replication thread does not die, and because
396 * errors are expected, due to peers becoming unavailable. <p/> This method
397 * issues warnings for problems that can be fixed with configuration
398 * changes.
399 */
400 private void flushReplicationQueue() {
401 List resolvedEventMessages;
402 Ehcache cache;
403 synchronized (replicationQueue) {
404 if (replicationQueue.size() == 0) {
405 return;
406 }
407 resolvedEventMessages = extractAndResolveEventMessages(replicationQueue);
408 cache = ((JGroupEventMessage) replicationQueue.get(0)).getCache();
409 replicationQueue.clear();
410 }
411
412 List cachePeers = listRemoteCachePeers(cache);
413
414 for (int j = 0; j < cachePeers.size(); j++) {
415 CachePeer cachePeer = (CachePeer) cachePeers.get(j);
416 try {
417 cachePeer.send(resolvedEventMessages);
418 } catch (UnmarshalException e) {
419 String message = e.getMessage();
420 if (message.indexOf("Read time out") != 0) {
421 LOG.warning("Unable to send message to remote peer due to socket read timeout. Consider increasing"
422 + " the socketTimeoutMillis setting in the cacheManagerPeerListenerFactory. " + "Message was: "
423 + e.getMessage());
424 } else {
425 LOG.fine("Unable to send message to remote peer. Message was: " + e.getMessage());
426 }
427 } catch (Throwable t) {
428 LOG.log(Level.WARNING, "Unable to send message to remote peer. Message was: " + t.getMessage(), t);
429 }
430 }
431
432 }
433
434 /***
435 * Extracts CacheEventMessages and attempts to get a hard reference to the
436 * underlying EventMessage <p/> If an EventMessage has been invalidated due
437 * to SoftReference collection of the Element, it is not propagated. This
438 * only affects puts and updates via copy.
439 *
440 * @param replicationQueueCopy
441 * @return a list of EventMessages which were able to be resolved
442 */
443 private static List extractAndResolveEventMessages(List replicationQueueCopy) {
444 List list = new ArrayList();
445 for (int i = 0; i < replicationQueueCopy.size(); i++) {
446 JGroupEventMessage eventMessage = (JGroupEventMessage) replicationQueueCopy.get(i);
447 if (eventMessage != null && eventMessage.isValid()) {
448 list.add(eventMessage);
449 } else {
450 LOG.severe("Collected soft ref");
451 }
452 }
453 return list;
454 }
455
456 /***
457 * Get the time interval is ms between asynchronous replication
458 *
459 * @return the interval
460 */
461 public long getAsynchronousReplicationInterval() {
462 return asynchronousReplicationInterval;
463 }
464
465 /***
466 * Set the time inteval for asynchronous replication
467 *
468 * @param asynchronousReplicationInterval
469 * the interval between replication
470 */
471 public void setAsynchronousReplicationInterval(long asynchronousReplicationInterval) {
472 this.asynchronousReplicationInterval = asynchronousReplicationInterval;
473 }
474
475 }