RmiCluster – working alternative for cluster4spring
Posted by Piotr Gabryanczyk on August 22, 2007
Business problem
I recently spent a couple of weeks writing a distributed Market Data Service (MDS) for my client. The requirement was to spread the load across a couple of machines, as MDS was meant to be used by a grid of a few hundred nodes.
Cluster4Spring
We investigated a couple of options and decided to go for Cluster4Spring. It gave us load distribution, failover and dynamic discovery.
Failed under load
After a couple of days of testing it turned out that we needed to switch dynamic discovery off, as some client nodes were failing to find MDS. This left us with a predefined list of RMI endpoints. Unfortunately, a few days later we realised that some of the nodes were losing connections to all MDS instances in the middle of a job. I ruled out network problems. The MDS instances were not overloaded, so they were not dropping connections. BTW, it struck me that a single node could only handle 50 requests per second, even though the internal processing of a request took no more than 1 ms ( return map.get(key) ). I tried different config options but nothing helped.
RmiCluster
At this point I was desperate enough to write my own clustering proxy to replace Cluster4Spring. When I thought about it, it looked like a simple task. You maintain a list of alive nodes and refresh it periodically. If an invocation fails, you remove the node from the list. When invoking, you just pick a random remote node, and that is it.
It took me 150 lines, and hey, it works! A single node is able to handle in excess of 100 requests per second.
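Stripped of Spring and RMI, the bookkeeping described above can be sketched roughly as follows. This is only an illustration of the idea, not the RmiCluster code itself: the class `AliveNodes`, its `Connector` interface and all method names are made up for this example, and the connector stands in for whatever actually opens a connection to a node.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical sketch: these names are illustrative and not part of RmiCluster.
class AliveNodes<T> {
    /** Hypothetical connector: turns a URL into a handle, or throws if the node is unreachable. */
    interface Connector<T> {
        T connect(String url) throws Exception;
    }

    private final List<String> urls;
    private final Connector<T> connector;
    private final Map<String, T> alive = new LinkedHashMap<String, T>();
    private final Random random = new Random();

    AliveNodes(List<String> urls, Connector<T> connector) {
        this.urls = new ArrayList<String>(urls);
        this.connector = connector;
    }

    /** Try to connect every configured URL that is not currently in the alive list. */
    synchronized void refresh() {
        for (String url : urls) {
            if (!alive.containsKey(url)) {
                try {
                    alive.put(url, connector.connect(url));
                } catch (Exception e) {
                    // Node is still down; it is tried again on the next refresh.
                }
            }
        }
    }

    /** Remove a node from the alive list after a failed invocation. */
    synchronized void markDead(String url) {
        alive.remove(url);
    }

    /** Pick the URL of a random alive node, or null when no node is alive. */
    synchronized String pickUrl() {
        if (alive.isEmpty()) {
            return null;
        }
        List<String> keys = new ArrayList<String>(alive.keySet());
        return keys.get(random.nextInt(keys.size()));
    }

    /** Return the handle stored for an alive node, or null if the node is not alive. */
    synchronized T handle(String url) {
        return alive.get(url);
    }

    synchronized int aliveCount() {
        return alive.size();
    }
}
```

A caller would invoke `refresh()` from a timer, call `pickUrl()` before each invocation, and call `markDead(url)` when the invocation fails.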
To use RmiCluster, just expose your beans with Spring's RmiServiceExporter on the server side:
<bean class="org.springframework.remoting.rmi.RmiServiceExporter">
    <property name="serviceName" value="serviceX"/>
    <property name="service" ref="MarketDataService"/>
    <property name="serviceInterface" value="com.XXX.marketdata.server.MarketDataServer"/>
    <property name="registry" ref="RMIRegistry"/>
</bean>
On the client side it could look like this:
<bean name="MarketDataService" class="org.javaexpert.rmicluster.ClusteringRmiProxyFactoryBean">
    <property name="serviceInterface" value="com.XXX.marketdata.server.MarketDataServer"/>
    <property name="serviceUrl" value="whatever"/>
    <property name="serviceUrls">
        <list>
            <value>rmi://somehost1:1234/ServiceX</value>
            <value>rmi://somehost2:1234/ServiceX</value>
            <value>rmi://somehost3:1234/ServiceX</value>
            <value>rmi://somehost4:1234/ServiceX</value>
        </list>
    </property>
    <property name="lookupStubOnStartup" value="true"/>
</bean>
ClusteringRmiProxyFactoryBean:
package org.javaexpert.rmicluster;

import java.net.MalformedURLException;
import java.rmi.NotBoundException;
import java.rmi.Remote;
import java.rmi.RemoteException;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.aopalliance.intercept.MethodInvocation;
import org.javaexpert.error.ErrorUtils;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.remoting.RemoteAccessException;
import org.springframework.remoting.RemoteLookupFailureException;
import org.springframework.remoting.rmi.RmiProxyFactoryBean;


public class ClusteringRmiProxyFactoryBean extends RmiProxyFactoryBean implements DisposableBean{
    // Inputs
    private List<String> serviceUrls;
    private int refreshEndpointsMillis = 60*1000;

    // Internals
    private Map<String,Remote> aliveServices = new HashMap<String, Remote>();
    private Map<Remote, String> aliveServicesReverseMap = new HashMap<Remote, String>();
    private Random rand = new Random(System.currentTimeMillis());
    private Pattern rmiAddressPattern = Pattern.compile("rmi://(.+):([0-9]*)/(.+)");
    private Timer timer;

    protected synchronized Remote lookupStub() throws RemoteLookupFailureException{
        refreshServicesIfNeededUntilFoundOne();
        return new ArrayList<Remote>(aliveServices.values()).get(rand.nextInt(aliveServices.size()));
    }

    protected Remote getStub() throws RemoteLookupFailureException{
        return lookupStub();
    }

    private synchronized void refreshServicesIfNeededUntilFoundOne(){
        while(aliveServices.size() == 0){
            logger.info("No services alive - refreshing endpoints.");
            refreshServices();
        }
    }

    public void refreshServices(){
        for(String url: serviceUrls){
            if (!aliveServices.containsKey(url)){
                try{
                    Remote remote = createRemote(url);
                    addServiceToAlive(url, remote);
                } catch(Exception e){
                    logger.warn("Can not connect to " + url+ " - ignoring.");
                }
            }
        }
    }

    public Remote createRemote(String url) throws RemoteException, NotBoundException, MalformedURLException{
        Matcher res = rmiAddressPattern.matcher(url);
        if (!res.matches()) throw new MalformedURLException("Wrong address syntax - '" + url +"'. Correct syntax is rmi://host:port/service");

        Registry reg = LocateRegistry.getRegistry(res.group(1), Integer.valueOf(res.group(2)));
        return reg.lookup(res.group(3));
    }

    public Object invoke(final MethodInvocation invocation) throws Throwable{
        return ErrorUtils.executeWithRetry(100, "Invocation failed - retrying...", 5000, new ErrorUtils.CallableWithRecovery(){
            private Remote stub;
            public void recover() throws Throwable{
                removeServiceFromAlive(stub);
            }

            public Object call() throws Throwable{
                refreshServicesIfNeededUntilFoundOne();
                stub = getStub();
                return doInvoke(invocation, stub);
            }
        }, RemoteAccessException.class);
    }

    private synchronized void addServiceToAlive(String url, Remote remote){
        aliveServices.put(url, remote);
        aliveServicesReverseMap.put(remote, url);
    }

    private synchronized void removeServiceFromAlive(Remote s){
        String url = aliveServicesReverseMap.remove(s);
        aliveServices.remove(url);
    }

    public void setServiceUrls(List<String> serviceUrls){
        this.serviceUrls = serviceUrls;
    }

    public void afterPropertiesSet(){
        setRefreshStubOnConnectFailure(false);
        setCacheStub(false);
        setServiceUrl("legacy");
        super.afterPropertiesSet();

        initRefreshTimer();
    }

    private void initRefreshTimer(){
        timer = new Timer();
        timer.schedule(new TimerTask(){
            public void run(){
                try{
                    refreshServices();
                } catch(Exception e){
                    logger.warn("Can not refresh services", e);
                }
            }
        }, refreshEndpointsMillis, refreshEndpointsMillis);
    }

    public void destroy() throws Exception{
        timer.cancel();
    }

    public void setRefreshEndpointsMillis(int refreshEndpointsMillis){
        this.refreshEndpointsMillis = refreshEndpointsMillis;
    }
}
Here is ErrorUtils:
package org.javaexpert.error;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.logging.Logger;


public class ErrorUtils {
    private static final Logger LOGGER = Logger.getLogger(ErrorUtils.class.getCanonicalName());

    // Despite the parameter name, only exceptions matching dontIgnoreExceptions are retried;
    // any other exception is rethrown immediately. An empty list means "retry on everything".
    public static <T> T executeWithRetry(int retryCount, String errorMessage, int sleepMillis, Callable<T> callable, Class... dontIgnoreExceptions) throws Throwable {
        Set dontIgnoreSet = convertToSet(dontIgnoreExceptions);
        Throwable lastException = null;
        while (retryCount-- > 0) {
            try {
                return callable.call();
            } catch (Throwable e) {
                if (!dontIgnoreSet.isEmpty() && !contains(dontIgnoreSet, e)) throw e;
                lastException = e;
                LOGGER.warning(errorMessage + " Cause: " + e.getMessage());
                LOGGER.info("Sleeping for " + sleepMillis + " millis");
                sleep(sleepMillis);
                LOGGER.info("Retrying...");
                // Give the callable a chance to clean up (e.g. drop the dead endpoint) before the next attempt.
                if (callable instanceof CallableWithRecovery) {
                    try {
                        ((CallableWithRecovery) callable).recover();
                    } catch (Throwable throwable) {
                        LOGGER.warning("Recovery action failed.");
                    }
                }
            }
        }
        throw lastException;
    }

    private static boolean contains(Set<Class> dontIgnoreSet, Throwable e){
        for(Class c: dontIgnoreSet){
            if (c.isInstance(e)) return true;
        }
        return false;
    }

    private static Set convertToSet(Object... objects){
        Set res = new HashSet();
        res.addAll(Arrays.asList(objects));
        return res;
    }

    private static void sleep(int sleepMillis) {
        try {
            Thread.sleep(sleepMillis);
        } catch (InterruptedException e1) {
            LOGGER.severe("Interrupted"+ e1);
            // Restore the interrupt flag so callers can still see the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e1);
        }
    }

    public interface Callable<T> {
        T call() throws Throwable;
    }

    public interface CallableWithRecovery extends Callable {
        void recover() throws Throwable;
    }
}

Andrew Sazonov said
Hi Piotr,
That’s a really strange situation with cluster4spring, since we’ve used it in quite a loaded system and didn’t find any issues…
If possible, could you please give me more details about issues you’ve found with cluster4spring (errors, logs, XML configurations)?
Btw, did you specify that endpoints should be refreshed after connection failure in Cluster4Spring configuration?
Regards,
Andrew Sazonov
Piotr Gabryanczyk said
Andrew thanks for your comment.
I sent you all the details a couple of weeks ago, but since I didn’t get an answer I wrote RmiCluster.
>Btw, did you specify that endpoints should be refreshed after connection failure in Cluster4Spring configuration?
This is a good point – when you use the setting you suggest, it breaks load balancing, as Cluster4Spring refreshes endpoints only when one of them fails. This means that if one endpoint fails and comes back after 10 minutes, it will not be used until another endpoint fails and the endpoints are refreshed.
Cluster4spring fails in an interesting way – it throws a stack overflow exception. I was surprised, but when I looked in the source code, it indeed used RECURSION to retry when a connection fails! GRRRRR this smells…
(I am talking about refreshAndRetry method in RemoteClientInterceptor class.)
My load numbers for the test environment are 100 processes (25 boxes) accessing 5 MDS instances. The number of requests per second is 40~50 x 5 = 200~250.
The same number of requests can be handled by RmiCluster on 2 MDS instances.
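For readers wondering why retrying through recursion is a problem, here is a generic sketch of the two styles. It is not Cluster4Spring's refreshAndRetry code; the class `RetryStyles` and both method names are invented for this illustration. The recursive variant adds a stack frame for every failed attempt, so a long enough run of failures can exhaust the stack, while the loop variant keeps the stack depth constant.

```java
import java.util.concurrent.Callable;

// Hypothetical illustration only; not taken from Cluster4Spring or RmiCluster.
class RetryStyles {
    /** Recursive retry: each failed attempt nests one call deeper. */
    static <T> T retryRecursively(Callable<T> action, int attemptsLeft) throws Exception {
        try {
            return action.call();
        } catch (Exception e) {
            if (attemptsLeft <= 1) {
                throw e;
            }
            // The frame of this call stays on the stack until the nested call returns.
            return retryRecursively(action, attemptsLeft - 1);
        }
    }

    /** Iterative retry: the stack depth does not grow with the number of failures. */
    static <T> T retryInLoop(Callable<T> action, int maxAttempts) throws Exception {
        if (maxAttempts <= 0) {
            throw new IllegalArgumentException("maxAttempts must be positive");
        }
        Exception last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e; // remember the failure and try again
            }
        }
        throw last;
    }
}
```

Both methods behave the same for a handful of failures; the difference only shows when an endpoint stays down for many consecutive attempts.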
Gisbert Amm said
It would be very interesting to read how this story went on.
Gisbert Amm said
I gave cluster4spring a try and got this:
RMI lookup for bean [contextService] with interface [my.own.Interface] failed ; nested exception is java.lang.ClassCastException: java.lang.String
Looking a bit closer, I found a quirky bug (as I’d call it) in the configuration. The docs say that you configure your services like this:
rmi://localhost:1097/TestService
rmi://localhost:1098/TestService
rmi://localhost:1099/TestService
However, that’s not correct – at least for RmiUrlListProxyFactoryBean. The method setServiceURLs(List<SI> aServiceInfos) expects a list of SI, not a list of Strings. Spring does not complain, because the list’s type parameter has been erased at runtime. But then, in UrlListEndpointProvider (line 108), there is a loop using the same list and bang!, we get a class cast exception:
for (SI serviceInfo : fServiceMonikers) { …
Here fServiceMonikers is still a list of Strings, not of SI. I found another setServiceURLs() method in UrlListEndpointProvider; however, it was not used in my case. I worked around the problem by changing the Spring configuration as follows:
rmi://localhost:1097/TestService
rmi://localhost:1098/TestService
rmi://localhost:1099/TestService
Then it works …
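The effect described in this comment can be reproduced in plain Java. The sketch below is a generic illustration of type erasure and does not use Cluster4Spring's classes: `ErasureDemo`, `ServiceInfo` and `Holder` are hypothetical stand-ins, and it assumes the SI type parameter has an upper bound, which is what makes the compiler insert a cast inside the loop.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the erasure problem; not Cluster4Spring code.
class ErasureDemo {
    /** Stand-in for the type that the SI parameter is assumed to be bounded by. */
    static class ServiceInfo {
        final String url;
        ServiceInfo(String url) { this.url = url; }
    }

    /** Stand-in for a bean with a generic list setter. */
    static class Holder<SI extends ServiceInfo> {
        private List<SI> infos;

        void setServiceURLs(List<SI> infos) {
            this.infos = infos; // no element check happens here
        }

        int totalUrlLength() {
            int total = 0;
            for (SI info : infos) {        // the compiler casts each element to ServiceInfo here
                total += info.url.length();
            }
            return total;
        }
    }

    /** Mimics a reflective caller (such as a DI container) that only sees the raw List type. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    static boolean failsWithClassCast() {
        List strings = new ArrayList();
        strings.add("rmi://localhost:1097/TestService");
        Holder holder = new Holder();
        holder.setServiceURLs(strings);    // accepted: the element type is erased at runtime
        try {
            holder.totalUrlLength();
            return false;
        } catch (ClassCastException e) {
            return true;                   // the failure only surfaces inside the loop
        }
    }
}
```

The setter accepts the list of Strings without complaint, and the ClassCastException appears later, when the elements are first read as the bounded type.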
Gisbert Amm said
Arrggghhh … The blog cut out the XML (of course). Another try to post the config sections:
In the docs:
<property name="serviceURLs">
<list>
<value>rmi://localhost:1097/TestService</value>
<value>rmi://localhost:1098/TestService</value>
<value>rmi://localhost:1099/TestService</value>
</list>
</property>
My version:
<property name="serviceURLs">
<list>
<bean class="org.softamis.cluster4spring.support.ServiceMoniker">
<constructor-arg><value>rmi://localhost:1097/TestService</value></constructor-arg>
</bean>
<bean class="org.softamis.cluster4spring.support.ServiceMoniker">
<constructor-arg><value>rmi://localhost:1098/TestService</value></constructor-arg>
</bean>
<bean class="org.softamis.cluster4spring.support.ServiceMoniker">
<constructor-arg><value>rmi://localhost:1099/TestService</value></constructor-arg>
</bean>
</list>
</property>
Piotr Gabryanczyk said
Ramkumar S said
I am facing a similar issue with Spring RMI not doing load balancing (as far as I can see in our Spring 3.0.5 version). Did Spring come out with any out-of-the-box solution to address this? Or is the solution presented above still good? I saw cluster4spring in the Google Code base, and I find your solution above simpler and more elegant; I feel it will suffice for my needs. I need to talk to a bunch of JBoss machines in the back using the Spring RMI mechanism, and I realized that Spring RMI does not support load balancing / clustering as far as I can see. Thanks
Is there a downloadable version of the above code available? Will it work with Spring 3.0.5 (or Spring 3.1)?