Piotr Gabryanczyk’s Blog

Java, Refactoring, AOP, Spring, DDD, TDD, etc.

RmiCluster – working alternative for cluster4spring

Posted by Piotr Gabryanczyk on August 22, 2007

Business problem

I recently spent a couple of weeks writing a distributed Market Data Service (MDS) for my client. The requirement was to spread the load across a couple of machines, as MDS was meant to be used by a grid of a few hundred nodes.

Cluster4Spring

We investigated a couple of options and decided to go with Cluster4Spring. It gave us load distribution, failover and dynamic discovery.

Failed under load

After a couple of days of testing it turned out that we needed to switch dynamic discovery off, as some client nodes were failing to find MDS. This left us with a predefined list of RMI endpoints. Unfortunately, a few days later we realised that some of the nodes were losing connections to all MDS instances in the middle of a job. I ruled out network problems. The MDS instances were not overloaded, so they were not dropping connections. BTW, it struck me that a single MDS node could only handle 50 requests per second, even though the internal processing of a request was taking no more than 1 ms ( return map.get(key) ). I tried different config options but nothing helped.

RmiCluster

At this point I was desperate enough to write my own clustering proxy to replace Cluster4Spring. When I thought about it, it looked like a simple task. You maintain a list of alive nodes and refresh it periodically. If an invocation fails, you remove that node from the list. When invoking, you just pick a random remote node, and that is it.

It took me 150 lines, and hey, it works! A single MDS node is now able to handle in excess of 100 requests per second.
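Before the real thing, here is a minimal sketch of just the idea, with no RMI or Spring involved. The class name AliveNodePool and its methods are made up for illustration and are not part of RmiCluster. It keeps a list of alive nodes, picks one at random per call, drops a node when a call on it fails, and re-admits nodes on refresh. Unlike the real proxy further down, it gives up as soon as no node is alive instead of waiting and retrying.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical illustration only: tracks alive nodes and fails over between them.
class AliveNodePool<N> {
    private final List<N> configured;                 // every node we were told about
    private final List<N> alive = new ArrayList<>();  // nodes currently believed reachable
    private final Random random = new Random();

    AliveNodePool(List<N> configured) {
        this.configured = new ArrayList<>(configured);
        this.alive.addAll(configured);                // optimistic start: assume all are up
    }

    // Picks a random alive node, which spreads the load evenly on average.
    synchronized N pickRandom() {
        if (alive.isEmpty()) {
            throw new IllegalStateException("no alive nodes");
        }
        return alive.get(random.nextInt(alive.size()));
    }

    // Called when an invocation on the node failed.
    synchronized void markDead(N node) {
        alive.remove(node);
    }

    // Periodic refresh: re-admit configured nodes that respond again.
    synchronized void refresh(Predicate<N> isReachable) {
        for (N node : configured) {
            if (!alive.contains(node) && isReachable.test(node)) {
                alive.add(node);
            }
        }
    }

    synchronized int aliveCount() {
        return alive.size();
    }

    // Tries the call on random nodes, dropping each failing node, until one succeeds.
    // Ends with IllegalStateException from pickRandom() once no node is left.
    <R> R invoke(Function<N, R> call) {
        while (true) {
            N node = pickRandom();
            try {
                return call.apply(node);
            } catch (RuntimeException e) {
                markDead(node);
            }
        }
    }
}
```

Random selection needs no shared counter or per-node statistics, which is why the bookkeeping stays this small.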

To use RmiCluster, just expose your beans using Spring's RmiServiceExporter on the server side:

    <!-- serviceName must match the last URL segment used by the clients; RMI registry names are case-sensitive -->
    <bean class="org.springframework.remoting.rmi.RmiServiceExporter">
        <property name="serviceName" value="ServiceX"/>
        <property name="service" ref="MarketDataService"/>
        <property name="serviceInterface" value="com.XXX.marketdata.server.MarketDataServer"/>
        <property name="registry" ref="RMIRegistry"/>
    </bean>

On the client side it could look like this:

    <bean name="MarketDataService" class="org.javaexpert.rmicluster.ClusteringRmiProxyFactoryBean">
        <property name="serviceInterface" value="com.XXX.marketdata.server.MarketDataServer"/>
        <!-- placeholder only: the factory bean overrides serviceUrl in afterPropertiesSet() -->
        <property name="serviceUrl" value="whatever"/>
        <property name="serviceUrls">
            <list>
                <value>rmi://somehost1:1234/ServiceX</value>
                <value>rmi://somehost2:1234/ServiceX</value>
                <value>rmi://somehost3:1234/ServiceX</value>
                <value>rmi://somehost4:1234/ServiceX</value>
            </list>
        </property>
        <property name="lookupStubOnStartup" value="true"/>
    </bean>
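Each entry in serviceUrls has to follow the rmi://host:port/service syntax, because the proxy splits it into a registry host, a registry port and a service name before doing the lookup. As a rough, standalone sketch of that parsing (RmiAddress is a hypothetical helper I am using only for illustration, and this version insists on at least one port digit):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration: splits rmi://host:port/service into its parts.
final class RmiAddress {
    private static final Pattern RMI_URL = Pattern.compile("rmi://(.+):([0-9]+)/(.+)");

    final String host;
    final int port;
    final String serviceName;

    private RmiAddress(String host, int port, String serviceName) {
        this.host = host;
        this.port = port;
        this.serviceName = serviceName;
    }

    // Throws IllegalArgumentException when the URL does not match the expected syntax.
    static RmiAddress parse(String url) {
        Matcher m = RMI_URL.matcher(url);
        if (!m.matches()) {
            throw new IllegalArgumentException(
                    "Wrong address syntax - '" + url + "'. Correct syntax is rmi://host:port/service");
        }
        return new RmiAddress(m.group(1), Integer.parseInt(m.group(2)), m.group(3));
    }
}
```

With the configuration above, rmi://somehost1:1234/ServiceX would yield the host somehost1, the port 1234 and the service name ServiceX, which is then looked up in that host's RMI registry.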

ClusteringRmiProxyFactoryBean:

    package org.javaexpert.rmicluster;

    import java.net.MalformedURLException;
    import java.rmi.NotBoundException;
    import java.rmi.Remote;
    import java.rmi.RemoteException;
    import java.rmi.registry.LocateRegistry;
    import java.rmi.registry.Registry;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Random;
    import java.util.Timer;
    import java.util.TimerTask;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    import org.aopalliance.intercept.MethodInvocation;
    import org.javaexpert.error.ErrorUtils;
    import org.springframework.beans.factory.DisposableBean;
    import org.springframework.remoting.RemoteAccessException;
    import org.springframework.remoting.RemoteLookupFailureException;
    import org.springframework.remoting.rmi.RmiProxyFactoryBean;


    public class ClusteringRmiProxyFactoryBean extends RmiProxyFactoryBean implements DisposableBean {
        // The port needs at least one digit, otherwise Integer.parseInt would fail on an empty string.
        private static final Pattern RMI_ADDRESS_PATTERN = Pattern.compile("rmi://(.+):([0-9]+)/(.+)");

        // Inputs
        private List<String> serviceUrls;
        private int refreshEndpointsMillis = 60 * 1000;

        // Internals (all access to the two maps goes through synchronized methods)
        private final Map<String, Remote> aliveServices = new HashMap<String, Remote>();
        private final Map<Remote, String> aliveServicesReverseMap = new HashMap<Remote, String>();
        private final Random rand = new Random();
        private Timer timer;

        // Picks a random stub among the services that are currently alive.
        protected synchronized Remote lookupStub() throws RemoteLookupFailureException {
            refreshServicesIfNeededUntilFoundOne();
            return new ArrayList<Remote>(aliveServices.values()).get(rand.nextInt(aliveServices.size()));
        }

        // Stub caching is switched off, so every invocation gets a freshly chosen stub.
        protected Remote getStub() throws RemoteLookupFailureException {
            return lookupStub();
        }

        // Blocks, retrying in a loop, until at least one endpoint is reachable.
        private synchronized void refreshServicesIfNeededUntilFoundOne() {
            while (aliveServices.size() == 0) {
                logger.info("No services alive - refreshing endpoints.");
                refreshServices();
            }
        }

        // Tries to connect to every configured endpoint that is not currently alive.
        public void refreshServices() {
            for (String url : serviceUrls) {
                if (!isAlive(url)) {
                    try {
                        Remote remote = createRemote(url);
                        addServiceToAlive(url, remote);
                    } catch (Exception e) {
                        logger.warn("Can not connect to " + url + " - ignoring.");
                    }
                }
            }
        }

        public Remote createRemote(String url) throws RemoteException, NotBoundException, MalformedURLException {
            Matcher res = RMI_ADDRESS_PATTERN.matcher(url);
            if (!res.matches()) {
                throw new MalformedURLException("Wrong address syntax - '" + url + "'. Correct syntax is rmi://host:port/service");
            }

            Registry reg = LocateRegistry.getRegistry(res.group(1), Integer.parseInt(res.group(2)));
            return reg.lookup(res.group(3));
        }

        // Retries on RemoteAccessException, removing the failed stub before the next attempt.
        public Object invoke(final MethodInvocation invocation) throws Throwable {
            return ErrorUtils.executeWithRetry(100, "Invocation failed - retrying...", 5000, new ErrorUtils.CallableWithRecovery() {
                private Remote stub;

                public void recover() throws Throwable {
                    removeServiceFromAlive(stub);
                }

                public Object call() throws Throwable {
                    refreshServicesIfNeededUntilFoundOne();
                    stub = getStub();
                    return doInvoke(invocation, stub);
                }
            }, RemoteAccessException.class);
        }

        // The timer thread reads the map too, so the read has to be synchronized as well.
        private synchronized boolean isAlive(String url) {
            return aliveServices.containsKey(url);
        }

        private synchronized void addServiceToAlive(String url, Remote remote) {
            aliveServices.put(url, remote);
            aliveServicesReverseMap.put(remote, url);
        }

        private synchronized void removeServiceFromAlive(Remote s) {
            String url = aliveServicesReverseMap.remove(s);
            if (url != null) {
                aliveServices.remove(url);
            }
        }

        public void setServiceUrls(List<String> serviceUrls) {
            this.serviceUrls = serviceUrls;
        }

        public void afterPropertiesSet() {
            setRefreshStubOnConnectFailure(false);
            setCacheStub(false);
            // The parent class insists on a serviceUrl; the value is never used for lookups.
            setServiceUrl("legacy");
            super.afterPropertiesSet();

            initRefreshTimer();
        }

        // Periodically brings back endpoints that have become reachable again.
        private void initRefreshTimer() {
            timer = new Timer();
            timer.schedule(new TimerTask() {
                public void run() {
                    try {
                        refreshServices();
                    } catch (Exception e) {
                        logger.warn("Can not refresh services", e);
                    }
                }
            }, refreshEndpointsMillis, refreshEndpointsMillis);
        }

        public void destroy() throws Exception {
            if (timer != null) {
                timer.cancel();
            }
        }

        public void setRefreshEndpointsMillis(int refreshEndpointsMillis) {
            this.refreshEndpointsMillis = refreshEndpointsMillis;
        }
    }

Here is ErrorUtils:

    package org.javaexpert.error;

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;
    import java.util.logging.Logger;


    public class ErrorUtils {
        private static final Logger LOGGER = Logger.getLogger(ErrorUtils.class.getCanonicalName());

        /**
         * Calls the callable up to retryCount times, sleeping sleepMillis after each failure.
         * Despite its name, dontIgnoreExceptions lists the exception types that trigger a retry:
         * when it is non-empty, any other exception is rethrown immediately.
         */
        public static <T> T executeWithRetry(int retryCount, String errorMessage, int sleepMillis, Callable<T> callable, Class... dontIgnoreExceptions) throws Throwable {
            if (retryCount <= 0) {
                // Without this check the method would end up throwing a null exception.
                throw new IllegalArgumentException("retryCount must be positive");
            }
            Set<Class> dontIgnoreSet = convertToSet(dontIgnoreExceptions);
            Throwable lastException = null;
            while (retryCount-- > 0) {
                try {
                    return callable.call();
                } catch (Throwable e) {
                    if (!dontIgnoreSet.isEmpty() && !contains(dontIgnoreSet, e)) throw e;
                    lastException = e;
                    LOGGER.warning(errorMessage + " Cause: " + e.getMessage());
                    LOGGER.info("Sleeping for " + sleepMillis + " millis");
                    sleep(sleepMillis);
                    LOGGER.info("Retrying...");
                    if (callable instanceof CallableWithRecovery) {
                        try {
                            ((CallableWithRecovery) callable).recover();
                        } catch (Throwable throwable) {
                            LOGGER.warning("Recovery action failed.");
                        }
                    }
                }
            }
            throw lastException;
        }

        // True if e is an instance of any of the given classes (subclasses included).
        private static boolean contains(Set<Class> dontIgnoreSet, Throwable e) {
            for (Class c : dontIgnoreSet) {
                if (c.isInstance(e)) return true;
            }
            return false;
        }

        private static Set<Class> convertToSet(Class... classes) {
            return new HashSet<Class>(Arrays.asList(classes));
        }

        private static void sleep(int sleepMillis) {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e1) {
                LOGGER.severe("Interrupted" + e1);
                // Restore the interrupt flag so that callers can still see the interruption.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e1);
            }
        }

        public interface Callable<T> {
            T call() throws Throwable;
        }

        public interface CallableWithRecovery extends Callable {
            void recover() throws Throwable;
        }
    }

7 Responses to “RmiCluster – working alternative for cluster4spring”

  1. Hi Piotr,

    That’s a really strange situation with cluster4spring, since we’ve used it in quite a loaded system and didn’t find any issues…

    If possible, could you please give me more details about issues you’ve found with cluster4spring (errors, logs, XML configurations)?

    Btw, did you specify that endpoints should be refreshed after connection failure in Cluster4Spring configuration?

    Regards,
    Andrew Sazonov

  2. Andrew, thanks for your comment.

    I sent you all the details a couple of weeks ago, but since I didn’t get an answer I wrote RmiCluster.

    >Btw, did you specify that endpoints should be refreshed after connection failure in Cluster4Spring configuration?
    This is a good point – when you use the setting you suggest, it breaks load balancing, as Cluster4Spring refreshes endpoints only when one of them fails. This means that if one endpoint fails and comes back after 10 minutes, it will not be used until another endpoint fails and the endpoints are refreshed.

    Cluster4spring fails in an interesting way – it throws a stack overflow exception. I was surprised, but when I looked in the source code, it indeed used RECURSION to retry when a connection fails! GRRRRR this smells…
    (I am talking about refreshAndRetry method in RemoteClientInterceptor class.)

    My load numbers for the test environment are 100 processes (25 boxes) accessing 5 MDS instances. The number of requests per second is 40~50 x 5 = 200~250.

    The same number of requests can be handled by RmiCluster on 2 MDS instances.

  3. Gisbert Amm said

    It would be very interesting to read how this story went on.

  4. Gisbert Amm said

    I gave cluster4spring a try and got this:

    RMI lookup for bean [contextService] with interface [my.own.Interface] failed ; nested exception is java.lang.ClassCastException: java.lang.String

    Looking a bit closer, I found a quirky bug (as I’d name it) wrt the configuration. The docs say that you configure your services like this:

    rmi://localhost:1097/TestService
    rmi://localhost:1098/TestService
    rmi://localhost:1099/TestService

    However, that’s not correct – at least for RmiUrlListProxyFactoryBean. The method setServiceURLs(List aServiceInfos) expects a list of SI, not a list of Strings. Spring does not complain, because the list’s type parameter has been erased at runtime. But then, in UrlListEndpointProvider (line 108), there is a loop using the same list and bang!, we get a class cast exception:

    for (SI serviceInfo : fServiceMonikers) { …

    Here fServiceMonikers is still a list of Strings, not of SI. I found another setServiceURLs() method in UrlListEndpointProvider; however, it was not used in my case. I worked around the problem by changing the Spring configuration as follows:

    rmi://localhost:1097/TestService

    rmi://localhost:1098/TestService

    rmi://localhost:1099/TestService

    Then it works …

  5. Gisbert Amm said

    Arrggghhh … The blog cut out the XML (of course). Another try to post the config sections:

    In the docs:

    <property name="serviceURLs">
        <list>
            <value>rmi://localhost:1097/TestService</value>
            <value>rmi://localhost:1098/TestService</value>
            <value>rmi://localhost:1099/TestService</value>
        </list>
    </property>

    My version:

    <property name="serviceURLs">
        <list>
            <bean class="org.softamis.cluster4spring.support.ServiceMoniker">
                <constructor-arg><value>rmi://localhost:1097/TestService</value></constructor-arg>
            </bean>
            <bean class="org.softamis.cluster4spring.support.ServiceMoniker">
                <constructor-arg><value>rmi://localhost:1098/TestService</value></constructor-arg>
            </bean>
            <bean class="org.softamis.cluster4spring.support.ServiceMoniker">
                <constructor-arg><value>rmi://localhost:1099/TestService</value></constructor-arg>
            </bean>
        </list>
    </property>

  6. :) I had the same, but all the problems which I describe happen after you fix the problem with SIs…

  7. Ramkumar S said

    I am facing a similar issue with Spring RMI not doing load balancing (as far as I can see in our Spring 3.0.5 version). Did Spring come out with any out-of-the-box solution to address this, or is the solution presented above still good? I saw cluster4spring in the Google Code base, and I find your solution above simpler and more elegant; I feel it will suffice for my needs. I need to talk to a bunch of JBoss machines in the back end using the Spring RMI mechanism, and as far as I can see Spring RMI does not support load balancing or clustering. Thanks

    Is there a downloadable version of the above code available? Will it work with Spring 3.0.5 (or Spring 3.1)?
