Piotr Gabryanczyk’s Blog

Java, Refactoring, AOP, Spring, DDD, TDD, etc.

RmiCluster – a working alternative to Cluster4Spring

Posted by Piotr Gabryanczyk on August 22, 2007

Business problem

I recently spent a couple of weeks writing a distributed Market Data Service (MDS) for my client. The requirement was to spread the load across a couple of machines, as the MDS was meant to be used by a grid of a few hundred nodes.

Cluster4Spring

We investigated a couple of options and decided to go for Cluster4Spring. It gave us load distribution, failover and dynamic discovery.

Failed under load

After a couple of days of testing it turned out that we needed to switch the dynamic discovery off, as some client nodes were failing to find the MDS. This left us with a predefined list of RMI endpoints. Unfortunately, a few days later we realised that some of the nodes were losing their connections to all MDS instances in the middle of a job. I ruled out network problems. The MDS instances were not overloaded, so they were not dropping connections. BTW, it struck me that a single node could only handle 50 requests per second, even though the internal processing of a request took no more than 1 ms ( return map.get(key) ), which on its own would allow roughly 1000 requests per second per thread. I tried different config options but nothing helped.

RmiCluster

At this point I was desperate enough to write my own clustering proxy to replace Cluster4Spring. When I thought about it, it looked like a simple task. You maintain a list of alive nodes and refresh it periodically. If an invocation fails, you remove the node from the list. When invoking, you just pick a random remote node, and that is it.

It took me 150 lines, and hey, it works! A single node is now able to handle in excess of 100 requests per second.
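
Stripped of Spring and RMI, the bookkeeping described above can be sketched roughly as follows. This is only an illustration: the AliveNodes class, its method names and the isReachable probe are made up for this example and are not part of RmiCluster, and the code assumes Java 8 or later.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical helper, not part of RmiCluster: tracks which endpoints are believed alive.
final class AliveNodes {
    private final List<String> configured;                 // the fixed list of endpoints
    private final Set<String> alive = new LinkedHashSet<>(); // subset currently believed alive
    private final Random random = new Random();

    AliveNodes(List<String> configured) {
        this.configured = new ArrayList<>(configured);
    }

    // Periodic refresh: probe every endpoint that is not currently marked alive.
    synchronized void refresh(Predicate<String> isReachable) {
        for (String node : configured) {
            if (!alive.contains(node) && isReachable.test(node)) {
                alive.add(node);
            }
        }
    }

    // Called when an invocation against the node has failed.
    synchronized void markDead(String node) {
        alive.remove(node);
    }

    // Picks a random alive node; throws if none is alive right now.
    synchronized String pick() {
        if (alive.isEmpty()) {
            throw new IllegalStateException("no alive nodes");
        }
        List<String> snapshot = new ArrayList<>(alive);
        return snapshot.get(random.nextInt(snapshot.size()));
    }

    synchronized int aliveCount() {
        return alive.size();
    }
}
```

A caller would run refresh(...) from a timer, call pick() before every invocation and markDead(...) whenever that invocation fails. The actual RmiCluster proxy, listed further down, layers the same idea on top of Spring's RmiProxyFactoryBean.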

To use it, just expose your beans using Spring's RmiServiceExporter on the server side:

    <bean class="org.springframework.remoting.rmi.RmiServiceExporter">
        <property name="serviceName" value="serviceX"/>
        <property name="service" ref="MarketDataService"/>
        <property name="serviceInterface" value="com.XXX.marketdata.server.MarketDataServer"/>
        <property name="registry" ref="RMIRegistry"/>
    </bean>

On the client side it could look like this:

    <bean name="MarketDataService" class="org.javaexpert.rmicluster.ClusteringRmiProxyFactoryBean">
        <property name="serviceInterface" value="com.XXX.marketdata.server.MarketDataServer"/>
        <!-- Ignored: afterPropertiesSet() overwrites it, the real endpoints come from serviceUrls -->
        <property name="serviceUrl" value="whatever"/>
        <!-- RMI registry names are case-sensitive, so they must match the exported serviceName -->
        <property name="serviceUrls">
            <list>
                <value>rmi://somehost1:1234/serviceX</value>
                <value>rmi://somehost2:1234/serviceX</value>
                <value>rmi://somehost3:1234/serviceX</value>
                <value>rmi://somehost4:1234/serviceX</value>
            </list>
        </property>
        <property name="lookupStubOnStartup" value="true"/>
    </bean>

ClusteringRmiProxyFactoryBean:

    package org.javaexpert.rmicluster;

    import java.net.MalformedURLException;
    import java.rmi.NotBoundException;
    import java.rmi.Remote;
    import java.rmi.RemoteException;
    import java.rmi.registry.LocateRegistry;
    import java.rmi.registry.Registry;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Random;
    import java.util.Timer;
    import java.util.TimerTask;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    import org.aopalliance.intercept.MethodInvocation;
    import org.javaexpert.error.ErrorUtils;
    import org.springframework.beans.factory.DisposableBean;
    import org.springframework.remoting.RemoteAccessException;
    import org.springframework.remoting.RemoteLookupFailureException;
    import org.springframework.remoting.rmi.RmiProxyFactoryBean;


    public class ClusteringRmiProxyFactoryBean extends RmiProxyFactoryBean implements DisposableBean{
        // Inputs
        private List<String> serviceUrls;
        private int refreshEndpointsMillis = 60*1000;

        // Internals (both maps are guarded by "this")
        private Map<String,Remote> aliveServices = new HashMap<String, Remote>();
        private Map<Remote, String> aliveServicesReverseMap = new HashMap<Remote, String>();
        private Random rand = new Random(System.currentTimeMillis());
        // The port needs at least one digit, otherwise Integer.parseInt below would fail
        private Pattern rmiAddressPattern = Pattern.compile("rmi://(.+):([0-9]+)/(.+)");
        private Timer timer;

        // Picks a random stub among the services currently believed to be alive
        protected synchronized Remote lookupStub() throws RemoteLookupFailureException{
            refreshServicesIfNeededUntilFoundOne();
            return new ArrayList<Remote>(aliveServices.values()).get(rand.nextInt(aliveServices.size()));
        }

        protected Remote getStub() throws RemoteLookupFailureException{
            return lookupStub();
        }

        // Blocks (retrying without a pause) until at least one endpoint answers
        private synchronized void refreshServicesIfNeededUntilFoundOne(){
            while(aliveServices.size() == 0){
                logger.info("No services alive - refreshing endpoints.");
                refreshServices();
            }
        }

        // Tries to reconnect to every configured endpoint that is not marked alive.
        // Synchronized because the timer thread calls it while callers read the maps.
        public synchronized void refreshServices(){
            for(String url: serviceUrls){
                if (!aliveServices.containsKey(url)){
                    try{
                        Remote remote = createRemote(url);
                        addServiceToAlive(url, remote);
                    } catch(Exception e){
                        logger.warn("Can not connect to " + url+ " - ignoring.");
                    }
                }
            }
        }

        // Parses rmi://host:port/service and looks the service up in that registry
        public Remote createRemote(String url) throws RemoteException, NotBoundException, MalformedURLException{
            Matcher res = rmiAddressPattern.matcher(url);
            if (!res.matches()) throw new MalformedURLException("Wrong address syntax - '" + url +"'. Correct syntax is rmi://host:port/service");

            Registry reg = LocateRegistry.getRegistry(res.group(1), Integer.parseInt(res.group(2)));
            return reg.lookup(res.group(3));
        }

        // Retries on RemoteAccessException, dropping the failed stub before the next attempt
        public Object invoke(final MethodInvocation invocation) throws Throwable{
            return ErrorUtils.executeWithRetry(100, "Invocation failed - retrying...", 5000, new ErrorUtils.CallableWithRecovery(){
                private Remote stub;
                public void recover() throws Throwable{
                    removeServiceFromAlive(stub);
                }

                public Object call() throws Throwable{
                    refreshServicesIfNeededUntilFoundOne();
                    stub = getStub();
                    return doInvoke(invocation, stub);
                }
            }, RemoteAccessException.class);
        }

        private synchronized void addServiceToAlive(String url, Remote remote){
            aliveServices.put(url, remote);
            aliveServicesReverseMap.put(remote, url);
        }

        private synchronized void removeServiceFromAlive(Remote s){
            if (s == null) return; // nothing was looked up yet
            String url = aliveServicesReverseMap.remove(s);
            if (url != null) aliveServices.remove(url);
        }

        public void setServiceUrls(List<String> serviceUrls){
            this.serviceUrls = serviceUrls;
        }

        public void afterPropertiesSet(){
            setRefreshStubOnConnectFailure(false);
            setCacheStub(false);
            // The parent class insists on a serviceUrl; the value itself is never used
            setServiceUrl("legacy");
            super.afterPropertiesSet();

            initRefreshTimer();
        }

        private void initRefreshTimer(){
            timer = new Timer();
            timer.schedule(new TimerTask(){
                public void run(){
                    try{
                        refreshServices();
                    } catch(Exception e){
                        logger.warn("Can not refresh services", e);
                    }
                }
            }, refreshEndpointsMillis, refreshEndpointsMillis);
        }

        public void destroy() throws Exception{
            // The timer is null if afterPropertiesSet() failed before creating it
            if (timer != null) timer.cancel();
        }

        public void setRefreshEndpointsMillis(int refreshEndpointsMillis){
            this.refreshEndpointsMillis = refreshEndpointsMillis;
        }
    }
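
To see what createRemote extracts from an endpoint address, here is a small standalone sketch. It uses the same expression shape as rmiAddressPattern, with the port group requiring at least one digit; the RmiUrlDemo class and its parse method are hypothetical names used only for this illustration.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical demo class, not part of RmiCluster.
final class RmiUrlDemo {
    // host : port / service name, as three capture groups
    private static final Pattern RMI_URL = Pattern.compile("rmi://(.+):([0-9]+)/(.+)");

    // Returns {host, port, serviceName}, or throws if the address does not match.
    static String[] parse(String url) {
        Matcher m = RMI_URL.matcher(url);
        if (!m.matches()) {
            throw new IllegalArgumentException(
                    "Expected rmi://host:port/service but got '" + url + "'");
        }
        return new String[] { m.group(1), m.group(2), m.group(3) };
    }

    public static void main(String[] args) {
        String[] parts = parse("rmi://somehost1:1234/serviceX");
        // prints "somehost1 1234 serviceX"
        System.out.println(parts[0] + " " + parts[1] + " " + parts[2]);
    }
}
```

Note that an address without a port, such as rmi://somehost1/serviceX, is rejected, so every entry in serviceUrls has to spell out the registry port.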

Here is ErrorUtils:

    package org.javaexpert.error;

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;
    import java.util.logging.Logger;


    public class ErrorUtils {
        private static final Logger LOGGER = Logger.getLogger(ErrorUtils.class.getCanonicalName());

        /**
         * Calls the callable up to retryCount times. If dontIgnoreExceptions is non-empty,
         * only those exception types trigger a retry; anything else is rethrown at once.
         */
        public static <T> T executeWithRetry(int retryCount, String errorMessage, int sleepMillis, Callable<T> callable, Class<?>... dontIgnoreExceptions) throws Throwable {
            // Without this check "throw lastException" below could throw null
            if (retryCount <= 0) throw new IllegalArgumentException("retryCount must be positive");
            Set<Class<?>> dontIgnoreSet = new HashSet<Class<?>>(Arrays.asList(dontIgnoreExceptions));
            Throwable lastException = null;
            while (retryCount-- > 0) {
                try {
                    return callable.call();
                } catch (Throwable e) {
                    if (!dontIgnoreSet.isEmpty() && !contains(dontIgnoreSet, e)) throw e;
                    lastException = e;
                    LOGGER.warning(errorMessage + " Cause: " + e.getMessage());
                    LOGGER.info("Sleeping for " + sleepMillis + " millis");
                    sleep(sleepMillis);
                    LOGGER.info("Retrying...");
                    // Give the callable a chance to clean up before the next attempt
                    if (callable instanceof CallableWithRecovery) {
                        try {
                            ((CallableWithRecovery<?>) callable).recover();
                        } catch (Throwable throwable) {
                            LOGGER.warning("Recovery action failed.");
                        }
                    }
                }
            }
            throw lastException;
        }

        private static boolean contains(Set<Class<?>> dontIgnoreSet, Throwable e){
            for(Class<?> c: dontIgnoreSet){
                if (c.isInstance(e)) return true;
            }
            return false;
        }

        private static void sleep(int sleepMillis) {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e1) {
                LOGGER.severe("Interrupted"+ e1);
                // Restore the interrupt flag so callers can still see the interruption
                Thread.currentThread().interrupt();
                throw new RuntimeException(e1);
            }
        }

        public interface Callable<T> {
            T call() throws Throwable;
        }

        public interface CallableWithRecovery<T> extends Callable<T> {
            void recover() throws Throwable;
        }
    }
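
The call/recover contract is easiest to see in isolation. The sketch below is a stripped-down stand-in for executeWithRetry (no sleeping, no logging, no exception filter) so that it runs on its own; RetryDemo, Recoverable and retry are hypothetical names for this illustration, and the code assumes Java 8 or later.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of RmiCluster.
final class RetryDemo {
    // Stand-in for ErrorUtils.CallableWithRecovery.
    interface Recoverable<T> {
        T call() throws Exception;
        void recover();
    }

    // Calls the task up to "attempts" times; after each failure recover() runs
    // before the next attempt, just as removeServiceFromAlive does in the proxy.
    static <T> T retry(int attempts, Recoverable<T> task) {
        Exception last = null;
        for (int i = 0; i < attempts; i++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e;
                task.recover();
            }
        }
        throw new IllegalStateException("all " + attempts + " attempts failed", last);
    }

    // Simulates a remote call that fails twice and then succeeds.
    // Returns {result, number of recover() calls}.
    static String[] demo() {
        final AtomicInteger calls = new AtomicInteger();
        final AtomicInteger recoveries = new AtomicInteger();
        String result = retry(5, new Recoverable<String>() {
            public String call() throws Exception {
                if (calls.incrementAndGet() < 3) {
                    throw new java.io.IOException("node down");
                }
                return "quote";
            }
            public void recover() {
                recoveries.incrementAndGet();
            }
        });
        return new String[] { result, String.valueOf(recoveries.get()) };
    }
}
```

Here demo() ends up with the result of the third attempt and two recoveries, one per failed attempt. In the clustering proxy the recovery step is what takes the dead node out of rotation, so the retry lands on a different endpoint.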

Posted in cluster, concurrent, java, programming, spring