1 /***
2 * Copyright 2003-2008 Luck Consulting Pty Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package net.sf.ehcache.distribution;
18
19 import net.sf.ehcache.CacheException;
20 import net.sf.ehcache.Ehcache;
21 import net.sf.ehcache.Element;
22 import net.sf.ehcache.Status;
23
24
25
26 import java.io.Serializable;
27 import java.rmi.UnmarshalException;
28 import java.util.ArrayList;
29 import java.util.LinkedList;
30 import java.util.List;
31 import java.util.logging.Logger;
32 import java.util.logging.Level;
33
34 /***
35 * Listens to {@link net.sf.ehcache.CacheManager} and {@link net.sf.ehcache.Cache} events and propagates those to
36 * {@link CachePeer} peers of the Cache asynchronously.
37 * <p/>
38 * Updates are guaranteed to be replicated in the order in which they are received.
39 * <p/>
40 * While much faster in operation than {@link RMISynchronousCacheReplicator}, it does suffer from a number
41 * of problems. Elements, which may be being spooled to DiskStore may stay around in memory because references
42 * are being held to them from {@link EventMessage}s which are queued up. The replication thread runs once
43 * per second, limiting the build up. However a lot of elements can be put into a cache in that time. We do not want
44 * to get an {@link OutOfMemoryError} using distribution in circumstances when it would not happen if we were
45 * just using the DiskStore.
46 * <p/>
47 * Accordingly, the Element values in {@link EventMessage}s are held by {@link java.lang.ref.SoftReference} in the queue,
48 * so that they can be discarded if required by the GC to avoid an {@link OutOfMemoryError}. A log message
49 * will be issued on each flush of the queue if there were any forced discards. One problem with GC collection
50 * of SoftReferences is that the VM (JDK1.5 anyway) will do that rather than grow the heap size to the maximum.
51 * The workaround is to either set minimum heap size to the maximum heap size to force heap allocation at start
52 * up, or put up with a few lost messages while the heap grows.
53 *
54 * @author Greg Luck
55 * @version $Id: RMIAsynchronousCacheReplicator.java 744 2008-08-16 20:10:49Z gregluck $
56 */
57 public class RMIAsynchronousCacheReplicator extends RMISynchronousCacheReplicator {
58
59
60 private static final Logger LOG = Logger.getLogger(RMIAsynchronousCacheReplicator.class.getName());
61
62 /***
63 * A thread which handles replication, so that replication can take place asynchronously and not hold up the cache
64 */
65 protected Thread replicationThread = new ReplicationThread();
66
67 /***
68 * The amount of time the replication thread sleeps after it detects the replicationQueue is empty
69 * before checking again.
70 */
71 protected int asynchronousReplicationInterval;
72
73 /***
74 * A queue of updates.
75 */
76 protected final List replicationQueue = new LinkedList();
77
78 /***
79 * Constructor for internal and subclass use
80 *
81 * @param replicatePuts
82 * @param replicateUpdates
83 * @param replicateUpdatesViaCopy
84 * @param replicateRemovals
85 * @param asynchronousReplicationInterval
86 *
87 */
88 public RMIAsynchronousCacheReplicator(
89 boolean replicatePuts,
90 boolean replicateUpdates,
91 boolean replicateUpdatesViaCopy,
92 boolean replicateRemovals,
93 int asynchronousReplicationInterval) {
94 super(replicatePuts,
95 replicateUpdates,
96 replicateUpdatesViaCopy,
97 replicateRemovals);
98 this.asynchronousReplicationInterval = asynchronousReplicationInterval;
99 status = Status.STATUS_ALIVE;
100 replicationThread.start();
101 }
102
103 /***
104 * RemoteDebugger method for the replicationQueue thread.
105 * <p/>
106 * Note that the replicationQueue thread locks the cache for the entire time it is writing elements to the disk.
107 */
108 private void replicationThreadMain() {
109 while (true) {
110
111 while (alive() && replicationQueue != null && replicationQueue.size() == 0) {
112 try {
113 Thread.sleep(asynchronousReplicationInterval);
114 } catch (InterruptedException e) {
115 LOG.fine("Spool Thread interrupted.");
116 return;
117 }
118 }
119 if (notAlive()) {
120 return;
121 }
122 try {
123 if (replicationQueue.size() != 0) {
124 flushReplicationQueue();
125 }
126 } catch (Throwable e) {
127 LOG.log(Level.SEVERE, "Exception on flushing of replication queue: " + e.getMessage()
128 + ". Continuing...", e);
129 }
130 }
131 }
132
133
134 /***
135 * {@inheritDoc}
136 * <p/>
137 * This implementation queues the put notification for in-order replication to peers.
138 *
139 * @param cache the cache emitting the notification
140 * @param element the element which was just put into the cache.
141 */
142 public final void notifyElementPut(final Ehcache cache, final Element element) throws CacheException {
143 if (notAlive()) {
144 return;
145 }
146
147 if (!replicatePuts) {
148 return;
149 }
150
151 if (!element.isSerializable()) {
152 if (LOG.isLoggable(Level.WARNING)) {
153 LOG.warning("Object with key " + element.getObjectKey() + " is not Serializable and cannot be replicated");
154 }
155 return;
156 }
157 addToReplicationQueue(new CacheEventMessage(EventMessage.PUT, cache, element, null));
158 }
159
160 /***
161 * Called immediately after an element has been put into the cache and the element already
162 * existed in the cache. This is thus an update.
163 * <p/>
164 * The {@link net.sf.ehcache.Cache#put(net.sf.ehcache.Element)} method
165 * will block until this method returns.
166 * <p/>
167 * Implementers may wish to have access to the Element's fields, including value, so the element is provided.
168 * Implementers should be careful not to modify the element. The effect of any modifications is undefined.
169 *
170 * @param cache the cache emitting the notification
171 * @param element the element which was just put into the cache.
172 */
173 public final void notifyElementUpdated(final Ehcache cache, final Element element) throws CacheException {
174 if (notAlive()) {
175 return;
176 }
177 if (!replicateUpdates) {
178 return;
179 }
180
181 if (replicateUpdatesViaCopy) {
182 if (!element.isSerializable()) {
183 if (LOG.isLoggable(Level.WARNING)) {
184 LOG.warning("Object with key " + element.getObjectKey() + " is not Serializable and cannot be updated via copy");
185 }
186 return;
187 }
188 addToReplicationQueue(new CacheEventMessage(EventMessage.PUT, cache, element, null));
189 } else {
190 if (!element.isKeySerializable()) {
191 if (LOG.isLoggable(Level.WARNING)) {
192 LOG.warning("Key " + element.getObjectKey() + " is not Serializable and cannot be replicated.");
193 }
194 return;
195 }
196 addToReplicationQueue(new CacheEventMessage(EventMessage.REMOVE, cache, null, element.getKey()));
197 }
198 }
199
200 /***
201 * Called immediately after an attempt to remove an element. The remove method will block until
202 * this method returns.
203 * <p/>
204 * This notification is received regardless of whether the cache had an element matching
205 * the removal key or not. If an element was removed, the element is passed to this method,
206 * otherwise a synthetic element, with only the key set is passed in.
207 * <p/>
208 *
209 * @param cache the cache emitting the notification
210 * @param element the element just deleted, or a synthetic element with just the key set if
211 * no element was removed.
212 */
213 public final void notifyElementRemoved(final Ehcache cache, final Element element) throws CacheException {
214 if (notAlive()) {
215 return;
216 }
217
218 if (!replicateRemovals) {
219 return;
220 }
221
222 if (!element.isKeySerializable()) {
223 if (LOG.isLoggable(Level.WARNING)) {
224 LOG.warning("Key " + element.getObjectKey() + " is not Serializable and cannot be replicated.");
225 }
226 return;
227 }
228 addToReplicationQueue(new CacheEventMessage(EventMessage.REMOVE, cache, null, element.getKey()));
229 }
230
231
232 /***
233 * Called during {@link net.sf.ehcache.Ehcache#removeAll()} to indicate that the all
234 * elements have been removed from the cache in a bulk operation. The usual
235 * {@link #notifyElementRemoved(net.sf.ehcache.Ehcache,net.sf.ehcache.Element)}
236 * is not called.
237 * <p/>
238 * This notification exists because clearing a cache is a special case. It is often
239 * not practical to serially process notifications where potentially millions of elements
240 * have been bulk deleted.
241 *
242 * @param cache the cache emitting the notification
243 */
244 public void notifyRemoveAll(final Ehcache cache) {
245 if (notAlive()) {
246 return;
247 }
248
249 if (!replicateRemovals) {
250 return;
251 }
252
253 addToReplicationQueue(new CacheEventMessage(EventMessage.REMOVE_ALL, cache, null, null));
254 }
255
256
257 /***
258 * Adds a message to the queue.
259 * <p/>
260 * This method checks the state of the replication thread and warns
261 * if it has stopped and then discards the message.
262 *
263 * @param cacheEventMessage
264 */
265 protected void addToReplicationQueue(CacheEventMessage cacheEventMessage) {
266 if (!replicationThread.isAlive()) {
267 LOG.severe("CacheEventMessages cannot be added to the replication queue"
268 + " because the replication thread has died.");
269 } else {
270 synchronized (replicationQueue) {
271 replicationQueue.add(cacheEventMessage);
272 }
273 }
274 }
275
276
277 /***
278 * Gets called once per {@link #asynchronousReplicationInterval}.
279 * <p/>
280 * Sends accumulated messages in bulk to each peer. i.e. if ther are 100 messages and 1 peer,
281 * 1 RMI invocation results, not 100. Also, if a peer is unavailable this is discovered in only 1 try.
282 * <p/>
283 * Makes a copy of the queue so as not to hold up the enqueue operations.
284 * <p/>
285 * Any exceptions are caught so that the replication thread does not die, and because errors are expected,
286 * due to peers becoming unavailable.
287 * <p/>
288 * This method issues warnings for problems that can be fixed with configuration changes.
289 */
290 private void flushReplicationQueue() {
291 List replicationQueueCopy;
292 synchronized (replicationQueue) {
293 if (replicationQueue.size() == 0) {
294 return;
295 }
296
297 replicationQueueCopy = new ArrayList(replicationQueue);
298 replicationQueue.clear();
299 }
300
301
302 Ehcache cache = ((CacheEventMessage) replicationQueueCopy.get(0)).cache;
303 List cachePeers = listRemoteCachePeers(cache);
304
305 List resolvedEventMessages = extractAndResolveEventMessages(replicationQueueCopy);
306
307
308 for (int j = 0; j < cachePeers.size(); j++) {
309 CachePeer cachePeer = (CachePeer) cachePeers.get(j);
310 try {
311 cachePeer.send(resolvedEventMessages);
312 } catch (UnmarshalException e) {
313 String message = e.getMessage();
314 if (message.indexOf("Read time out") != 0) {
315 LOG.warning("Unable to send message to remote peer due to socket read timeout. Consider increasing" +
316 " the socketTimeoutMillis setting in the cacheManagerPeerListenerFactory. " +
317 "Message was: " + e.getMessage());
318 } else {
319 LOG.fine("Unable to send message to remote peer. Message was: " + e.getMessage());
320 }
321 } catch (Throwable t) {
322 LOG.log(Level.WARNING, "Unable to send message to remote peer. Message was: " + t.getMessage(), t);
323 }
324 }
325 if (LOG.isLoggable(Level.WARNING)) {
326 int eventMessagesNotResolved = replicationQueueCopy.size() - resolvedEventMessages.size();
327 if (eventMessagesNotResolved > 0) {
328 LOG.warning(eventMessagesNotResolved + " messages were discarded on replicate due to reclamation of " +
329 "SoftReferences by the VM. Consider increasing the maximum heap size and/or setting the " +
330 "starting heap size to a higher value.");
331 }
332
333 }
334 }
335
336 /***
337 * Extracts CacheEventMessages and attempts to get a hard reference to the underlying EventMessage
338 * <p/>
339 * If an EventMessage has been invalidated due to SoftReference collection of the Element, it is not
340 * propagated. This only affects puts and updates via copy.
341 *
342 * @param replicationQueueCopy
343 * @return a list of EventMessages which were able to be resolved
344 */
345 private static List extractAndResolveEventMessages(List replicationQueueCopy) {
346 List list = new ArrayList();
347 for (int i = 0; i < replicationQueueCopy.size(); i++) {
348 EventMessage eventMessage = ((CacheEventMessage) replicationQueueCopy.get(i)).getEventMessage();
349 if (eventMessage != null && eventMessage.isValid()) {
350 list.add(eventMessage);
351 }
352 }
353 return list;
354 }
355
356 /***
357 * A background daemon thread that writes objects to the file.
358 */
359 private final class ReplicationThread extends Thread {
360 public ReplicationThread() {
361 super("Replication Thread");
362 setDaemon(true);
363 setPriority(Thread.NORM_PRIORITY);
364 }
365
366 /***
367 * RemoteDebugger thread method.
368 */
369 public final void run() {
370 replicationThreadMain();
371 }
372 }
373
374
375 /***
376 * A wrapper around an EventMessage, which enables the element to be enqueued along with
377 * what is to be done with it.
378 * <p/>
379 * The wrapper holds a {@link java.lang.ref.SoftReference} to the {@link EventMessage}, so that the queue is never
380 * the cause of an {@link OutOfMemoryError}
381 */
382 private static class CacheEventMessage {
383
384 private final Ehcache cache;
385 private final EventMessage eventMessage;
386
387 public CacheEventMessage(int event, Ehcache cache, Element element, Serializable key) {
388 eventMessage = new EventMessage(event, key, element);
389 this.cache = cache;
390 }
391
392 /***
393 * Gets the component EventMessage
394 */
395 public final EventMessage getEventMessage() {
396 return eventMessage;
397 }
398
399 }
400
401 /***
402 * Give the replicator a chance to flush the replication queue, then cleanup and free resources when no longer needed
403 */
404 public final void dispose() {
405 status = Status.STATUS_SHUTDOWN;
406 flushReplicationQueue();
407 }
408
409
410 /***
411 * Creates a clone of this listener. This method will only be called by ehcache before a cache is initialized.
412 * <p/>
413 * This may not be possible for listeners after they have been initialized. Implementations should throw
414 * CloneNotSupportedException if they do not support clone.
415 *
416 * @return a clone
417 * @throws CloneNotSupportedException if the listener could not be cloned.
418 */
419 public Object clone() throws CloneNotSupportedException {
420
421 super.clone();
422 return new RMIAsynchronousCacheReplicator(replicatePuts, replicateUpdates,
423 replicateUpdatesViaCopy, replicateRemovals, asynchronousReplicationInterval);
424 }
425
426
427 }