Piotr Gabryanczyk’s Blog

Java, Refactoring, AOP, Spring, DDD, TDD, etc.

  • Blogroll

    • I have joined Anti-IF Campaign

Archive for the ‘spring’ Category

Aspect Oriented Programming – real life examples

Posted by Piotr Gabryanczyk on April 1, 2008

Some time ago, on my previous blog, I wrote a short series of articles showing real live examples of using AOP. It is time to group them together and share with you again:

Enjoy!

Posted in aop, aspectj, java, programming, spring | Leave a Comment »

RmiCluster – working alternative for cluster4spring

Posted by Piotr Gabryanczyk on August 22, 2007

Business problem

I recently spent couple of weeks on writing a distributed Market Data Service for my client. The requirement was to distribute the load on couple of machines as MDS was meant to be used by few hundred node grid.

Cluster4Spring

We investigated couple of options and decided to go for Cluster4Spring. It gave us load distribution, failover and dynamic discovery.

Failed under load

After couple of days of testing it turned out that we needed to switch the dynamic discovery off as some client nodes were failing to find MDS. This left us with predefined list of RMI endpoints. Unfortunately few days later we realised that some of the nodes were loosing connections to all MDS instances in the middle of the job. I ruled out the network problems. MDS instances were not overloaded, so they were not dropping connections. BTW it struck me that a single node could only handle 50 requests per second. Internal processing of request was taking no more than 1ms ( return map.get(key) ). I tried different config options but nothing helped.

RmiCluster

At this point I was desperate enough to write my own clustering proxy to replace Cluster4Spring. When I thought about it, it looked like simple task to do. You maintain a list of alive nodes, refresh it periodically. If invocation fails remove node from the list. When invoking you just pick random remote node and that is it.

It took me 150 lines, and hey, it works! Single node is able to handle in excess of 100 requests per second.

To use it just expose your beans using spring RmiServiceExporter on the server side:

  1     <bean class="org.springframework.remoting.rmi.RmiServiceExporter">
  2         <property name="serviceName" value="serviceX"/>
  3         <property name="service" ref="MarketDataService"/>
  4         <property name="serviceInterface" value="com.XXX.marketdata.server.MarketDataServer"/>
  5         <property name="registry" ref="RMIRegistry"/>
  6     </bean>

On the client side it could look like this:

  1     <bean name="MarketDataService" class="org.javaexpert.rmicluster.ClusteringRmiProxyFactoryBean">
  2         <property name="serviceInterface" value="com.XXX.marketdata.server.MarketDataServer"/>
  3         <property name="serviceUrl" value="whatever"/>
  4         <property name="serviceUrls">
  5             <list>
  6                 <value>rmi://somehost1:1234/ServiceX</value>
  7                 <value>rmi://somehost2:1234/ServiceX</value>
  8                 <value>rmi://somehost3:1234/ServiceX</value>
  9                 <value>rmi://somehost4:1234/ServiceX</value>
 10             </list>
 11         </property>
 12         <property name="lookupStubOnStartup" value="true"/>
 13     </bean>
 14 

ClusteringRmiProxyFactoryBean:

  1 package org.javaexpert.rmicluster;
  2 
  3 import java.net.MalformedURLException;
  4 import java.rmi.NotBoundException;
  5 import java.rmi.Remote;
  6 import java.rmi.RemoteException;
  7 import java.rmi.registry.LocateRegistry;
  8 import java.rmi.registry.Registry;
  9 import java.util.ArrayList;
 10 import java.util.HashMap;
 11 import java.util.List;
 12 import java.util.Map;
 13 import java.util.Random;
 14 import java.util.Timer;
 15 import java.util.TimerTask;
 16 import java.util.regex.Matcher;
 17 import java.util.regex.Pattern;
 18 
 19 import org.aopalliance.intercept.MethodInvocation;
 20 import org.javaexpert.error.ErrorUtils;
 21 import org.springframework.beans.factory.DisposableBean;
 22 import org.springframework.remoting.RemoteAccessException;
 23 import org.springframework.remoting.RemoteLookupFailureException;
 24 import org.springframework.remoting.rmi.RmiProxyFactoryBean;
 25 
 26 
 27 public class ClusteringRmiProxyFactoryBean extends RmiProxyFactoryBean implements DisposableBean{
 28     // Inputs
 29     private List<String> serviceUrls;
 30     private int refreshEndpointsMillis = 60*1000;
 31 
 32     // Internals
 33     private Map<String,Remote> aliveServices = new HashMap<String, Remote>();
 34     private Map<Remote, String> aliveServicesReverseMap = new HashMap<Remote, String>();
 35     private Random rand = new Random(System.currentTimeMillis());
 36     private Pattern rmiAddressPattern = Pattern.compile("rmi://(.+):([0-9]*)/(.+)");
 37     private Timer timer;
 38 
 39     protected synchronized Remote lookupStub() throws RemoteLookupFailureException{
 40         refreshServicesIfNeededUntilFoundOne();
 41         return new ArrayList<Remote>(aliveServices.values()).get(rand.nextInt(aliveServices.size()));
 42     }
 43 
 44     protected Remote getStub() throws RemoteLookupFailureException{
 45         return lookupStub();
 46     }
 47 
 48     private synchronized void refreshServicesIfNeededUntilFoundOne(){
 49         while(aliveServices.size() == 0){
 50             logger.info("No services alive - refreshing endpoints.");
 51             refreshServices();
 52         }
 53     }
 54 
 55     public void refreshServices(){
 56         for(String url: serviceUrls){
 57             if (!aliveServices.containsKey(url)){
 58                 try{
 59                     Remote remote = createRemote(url);
 60                     addServiceToAlive(url, remote);
 61                 } catch(Exception e){
 62                     logger.warn("Can not connect to " + url+ " - ignoring.");
 63                 }
 64             }
 65         }
 66     }
 67 
 68     public Remote createRemote(String url) throws RemoteException, NotBoundException, MalformedURLException{
 69         Matcher res = rmiAddressPattern.matcher(url);
 70         if (!res.matches()) throw new MalformedURLException("Wrong address syntax - '" + url +"'. Correct syntax is rmi://host:port/service");
 71 
 72         Registry reg = LocateRegistry.getRegistry(res.group(1), Integer.valueOf(res.group(2)));
 73         return reg.lookup(res.group(3));
 74     }
 75 
 76     public Object invoke(final MethodInvocation invocation) throws Throwable{
 77         return ErrorUtils.executeWithRetry(100, "Invocation failed - retrying...", 5000, new ErrorUtils.CallableWithRecovery(){
 78             private Remote stub;
 79             public void recover() throws Throwable{
 80                 removeServiceFromAlive(stub);
 81             }
 82 
 83             public Object call() throws Throwable{
 84                 refreshServicesIfNeededUntilFoundOne();
 85                 stub = getStub();
 86                 return doInvoke(invocation, stub);
 87             }
 88         }, RemoteAccessException.class);
 89     }
 90 
 91     private synchronized void addServiceToAlive(String url, Remote remote){
 92         aliveServices.put(url, remote);
 93         aliveServicesReverseMap.put(remote, url);
 94     }
 95 
 96     private synchronized void removeServiceFromAlive(Remote s){
 97         String url = aliveServicesReverseMap.remove(s);
 98         aliveServices.remove(url);
 99     }
100 
101     public void setServiceUrls(List<String> serviceUrls){
102         this.serviceUrls = serviceUrls;
103     }
104 
105     public void afterPropertiesSet(){
106         setRefreshStubOnConnectFailure(false);
107         setCacheStub(false);
108         setServiceUrl("legacy");
109         super.afterPropertiesSet();
110 
111         initRefreshTimer();
112     }
113 
114     private void initRefreshTimer(){
115         timer = new Timer();
116         timer.schedule(new TimerTask(){
117             public void run(){
118                 try{
119                     refreshServices();
120                 } catch(Exception e){
121                     logger.warn("Can not refresh services", e);
122                 }
123             }
124         }, refreshEndpointsMillis, refreshEndpointsMillis);
125     }
126 
127     public void destroy() throws Exception{
128         timer.cancel();
129     }
130 
131     public void setRefreshEndpointsMillis(int refreshEndpointsMillis){
132         this.refreshEndpointsMillis = refreshEndpointsMillis;
133     }
134 }

Here is ErrorUtils:

 1 package org.javaexpert.error;
 2 
 3 import java.util.Set;
 4 
 5 
 6 import java.util.Arrays;
 7 import java.util.HashSet;
 8 import java.util.logging.Logger;
 9 
10 
11 public class ErrorUtils {
12     private static final Logger LOGGER = Logger.getLogger(ErrorUtils.class.getCanonicalName());
13 
14     public static <T> T executeWithRetry(int retryCount, String errorMessage, int sleepMillis, Callable<T> callable, Class... dontIgnoreExceptions) throws Throwable {
15         Set dontIgnoreSet = convertToSet(dontIgnoreExceptions);
16         Throwable lastException = null;
17         while (retryCount-- > 0) {
18             try {
19                 return callable.call();
20             } catch (Throwable e) {
21                 if (!dontIgnoreSet.isEmpty() && !contains(dontIgnoreSet, e)) throw e;
22                 lastException = e;
23                 LOGGER.warning(errorMessage + " Cause: " + e.getMessage());
24                 LOGGER.info("Sleeping for " + sleepMillis + " millis");
25                 sleep(sleepMillis);
26                 LOGGER.info("Retrying...");
27                 if (callable instanceof CallableWithRecovery) {
28                     try {
29                         ((CallableWithRecovery) callable).recover();
30                     } catch (Throwable throwable) {
31                         LOGGER.warning("Recovery action failed.");
32                     }
33                 }
34             }
35         }
36         throw lastException;
37     }
38 
39     private static boolean contains(Set<Class> dontIgnoreSet, Throwable e){
40         for(Class c: dontIgnoreSet){
41             if (c.isInstance(e)) return true;
42         }
43         return false;
44 
45     }
46 
47     private static Set convertToSet(Object... objects){
48         Set res = new HashSet();
49         res.addAll(Arrays.asList(objects));
50         return res;
51     }
52 
53     private static void sleep(int sleepMillis) {
54         try {
55             Thread.sleep(sleepMillis);
56         } catch (InterruptedException e1) {
57             LOGGER.severe("Interrupted"+ e1);
58             throw new RuntimeException(e1);
59         }
60     }
61 
62     public interface Callable<T> {
63         T call() throws Throwable;
64     }
65 
66     public interface CallableWithRecovery extends Callable {
67         void recover() throws Throwable;
68     }
69 }

Posted in cluster, concurrent, java, programming, spring | 7 Comments »

Preventing Spring Context to be loaded more than once.

Posted by Piotr Gabryanczyk on March 21, 2007

Problem:

I have nested import statements in spring context files:

a.xml

<beans>
<import resource="b.xml"/>
<import resource="c.xml"/>
...
</beans>

b.xml

<beans>
<import resource="d.xml"/>
<import resource="e.xml"/>
...
</beans>

c.xml

<beans>
<import resource="d.xml"/>
<import resource="g.xml"/>
...
</beans>

When we load a.xml, both b.xml and c.xml will try to load d.xml. Spring will detect it on the bean level and will override beans already loaded and print “Overiding bean definition for bean” message. But it also means instantiation of overriden bean and lot of expensive preprocessing. (more about the problem…) In some cases (JMX bean registration, etc.) it might even cause spring context loader to stop loading the beans and closing the application.

Solution:

We want to prevent Spring from loading files already loaded. The following class will do this for us:

 1 import java.io.IOException;

 2 import java.net.URL;

 3 import java.util.Set;

 4 import java.util.TreeSet;

 5

 6 import org.springframework.beans.factory.BeanDefinitionStoreException;

 7 import org.springframework.beans.factory.support.DefaultListableBeanFactory;

 8 import org.springframework.beans.factory.xml.ResourceEntityResolver;

 9 import org.springframework.beans.factory.xml.XmlBeanDefinitionReader;

10 import org.springframework.context.support.ClassPathXmlApplicationContext;

11 import org.springframework.core.io.Resource;

12

13 public class IgnoringDuplicateBeansClassPathXmlApplicationContext extends ClassPathXmlApplicationContext {

14     private Set<String> loadedLocations;

15

16     // (PG) Lazy init is needed as the method is called from superclass constructor

17     // (PG) I dont like it!!!

18     public Set<String> getLoadedLocations() {

19         if (loadedLocations == null) loadedLocations = new TreeSet<String>();

20         return loadedLocations;

21     }

22

23

24     public IgnoringDuplicateBeansClassPathXmlApplicationContext(String configFile) throws org.springframework.beans.BeansException {

25         super(new String[]{configFile});

26     }

27

28     public IgnoringDuplicateBeansClassPathXmlApplicationContext(String[] configFiles) {

29         super(configFiles);

30     }

31

32     protected DefaultListableBeanFactory createBeanFactory() {

33         return new IgnoringDuplicateBeansBeanFactory(getInternalParentBeanFactory());

34     }

35

36

37     protected void loadBeanDefinitions(DefaultListableBeanFactory beanFactory) throws IOException {

38         // Create a new XmlBeanDefinitionReader for the given BeanFactory.

39         XmlBeanDefinitionReader beanDefinitionReader = new IgnoringDuplicateFilesBeanDefinitionReader(beanFactory);

40

41         // Configure the bean definition reader with this context's

42         // resource loading environment.

43         beanDefinitionReader.setResourceLoader(this);

44         beanDefinitionReader.setEntityResolver(new ResourceEntityResolver(this));

45

46         // Allow a subclass to provide custom initialization of the reader,

47         // then proceed with actually loading the bean definitions.

48         initBeanDefinitionReader(beanDefinitionReader);

49         loadBeanDefinitions(beanDefinitionReader);

50     }

51

52     public class IgnoringDuplicateFilesBeanDefinitionReader extends XmlBeanDefinitionReader {

53

54         public IgnoringDuplicateFilesBeanDefinitionReader(DefaultListableBeanFactory beanFactory) {

55             super(beanFactory);

56         }

57

58         public int loadBeanDefinitions(Resource resource) throws BeanDefinitionStoreException {

59             String location = getURL(resource).toString();

60             if (getLoadedLocations().contains(location)) {

61                 logger.warn("Ignoring already loaded context [" + resource.getFilename() + "] - " + location);

62                 return 0;

63             } else {

64                 getLoadedLocations().add(location);

65                 return super.loadBeanDefinitions(resource);

66             }

67         }

68

69         private URL getURL(Resource resource) {

70             try {

71                 return resource.getURL();

72             } catch (IOException e) {

73                 throw new RuntimeException(e);

74             }

75         }

76     }

77 }

78 
 1 import org.springframework.beans.factory.BeanDefinitionStoreException;

 2 import org.springframework.beans.factory.BeanFactory;

 3 import org.springframework.beans.factory.config.BeanDefinition;

 4 import org.springframework.beans.factory.support.DefaultListableBeanFactory;

 5 import org.springframework.util.Assert;

 6

 7 public class IgnoringDuplicateBeansBeanFactory extends DefaultListableBeanFactory {

 8     public IgnoringDuplicateBeansBeanFactory(BeanFactory beanFactory) {

 9         super(beanFactory);

10     }

11

12     public void registerBeanDefinition(String beanName, BeanDefinition beanDefinition)

13             throws BeanDefinitionStoreException {

14

15         Assert.hasText(beanName, "'beanName' must not be empty");

16         Assert.notNull(beanDefinition, "Bean definition must not be null");

17         if (containsBeanDefinition(beanName)) {

18             logger.warn("Ignoring bean definition [" + beanDefinition + "] for bean '" + beanName +

19                     "': there's already [" + getBeanDefinition(beanName) + "] bound");

20         } else {

21             super.registerBeanDefinition(beanName, beanDefinition);

22         }

23     }

24

25 }

26 

You can use it as normal ClassPathXmlApplicationContext:

 1         ConfigurableApplicationContext ctx;

 2

 3         ctx = new IgnoringDuplicateBeansClassPathXmlApplicationContext("a.xml");

 4         SomeBean b = (SomeBean) ctx.getBean("xyzBean");

Posted in java, spring | Leave a Comment »

@Configurable without Spring in 10 minutes

Posted by Piotr Gabryanczyk on December 1, 2006

Why

I was asked recently to remove Spring from my project… I know, I know one step forward two steps back… I wasn’t happy at all. It was because Eclipse was crashing when two plugins had its own copy of spring library… Good excuse to remove Spring…

Problem

We are heavily using @Configurable and AspectJ so we needed some mechanism which could replace spring aspect in injecting dependencies.

How

Just look below:

 1 @Aspect()
 2 public class ConfigurableHackAspect {
 3     private static Map<String, Object> beanMap = new HashMap<String, Object>();
 4 
 5     public static void registerBean(String id, Object bean){
 6         beanMap.put(id, bean);
 7     }
 8 
 9     @After("@within(com.xyz.common.ConfigurableHack) && execution(*.new(..))")
10     public void afterConstructor(JoinPoint jp){
11         for(Field f : getAllFields(jp)){
12             if (f.isAnnotationPresent(InjectDependency.class) && beanMap.containsKey( f.getName())){
13                 try {
14                     f.set(jp.getTarget(), beanMap.get(f.getName()));
15                 } catch (IllegalAccessException e) {
16                     throw new RuntimeException(e);
17                 }
18             }
19         }
20     }
21 
22     private Field[] getAllFields(JoinPoint jp) {
23      return jp.getTarget().getClass().getFields();
24     }
25 }

Example usage

 1 class XYZ{
 2      3     @InjectDependency public IMarketDataSupplier marketDataSupplier;
 4      5 }
 6 

Summary

It is not perfect as it requires injected field to be public, but it does the job and is good enough for 10 minutes… 🙂

Posted in annotations, aop, aspectj, java, spring, Uncategorized | 2 Comments »