5)使用Spring AOP 来指定 dataSource 的 key ,从而dataSource会根据key选择 dataSourceMaster 和 dataSourceSlave:
package net.aazj.aop; import net.aazj.enums.DataSources; import net.aazj.util.DataSourceTypeManager; import org.aspectj.lang.JoinPoint; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Before; import org.aspectj.lang.annotation.Pointcut; import org.springframework.stereotype.Component; @Aspect // for aop @Component // for auto scan @Order(0) // execute before @Transactional public class DataSourceInterceptor { @Pointcut("execution(public * net.aazj.service..*.getUser(..))") public void dataSourceSlave(){}; @Before("dataSourceSlave()") public void before(JoinPoint jp) { DataSourceTypeManager.set(DataSources.SLAVE); } // ... ... } |
这里我们定义了一个 Aspect 类,我们使用 @Before 来在符合 @Pointcut("execution(public * net.aazj.service..*.getUser(..))") 中的方法被调用之前,调用 DataSourceTypeManager.set(DataSources.SLAVE) 设置了 key 的类型为 DataSources.SLAVE,所以 dataSource 会根据key=DataSources.SLAVE 选择 dataSourceSlave 这个dataSource。所以该方法对于的sql语句会在slave数据库上执行(经网友老刘1987提醒,这里存在多个Aspect之间的一个执行顺序的问题,必须保证切换数据源的Aspect必须在@Transactional这个Aspect之前执行,所以这里使用了@Order(0)来保证切换数据源先于@Transactional执行)。
我们可以不断的扩充 DataSourceInterceptor 这个 Aspect,在中进行各种各样的定义,来为某个service的某个方法指定合适的数据源对应的dataSource。
这样我们就可以使用 Spring AOP 的强大功能来,十分灵活进行配置了。
6)AbstractRoutingDataSource原理剖析
ThreadLocalRountingDataSource继承了AbstractRoutingDataSource,实现其抽象方法protected abstract Object determineCurrentLookupKey(); 从而实现对不同数据源的路由功能。我们从源码入手分析下其中原理:
public abstract class AbstractRoutingDataSource extends AbstractDataSource implements InitializingBean AbstractRoutingDataSource 实现了 InitializingBean 那么spring在初始化该bean时,会调用InitializingBean的接口 void afterPropertiesSet() throws Exception; 我们看下AbstractRoutingDataSource是如何实现这个接口的: @Override public void afterPropertiesSet() { if (this.targetDataSources == null) { throw new IllegalArgumentException("Property 'targetDataSources' is required"); } this.resolvedDataSources = new HashMap<Object, DataSource>(this.targetDataSources.size()); for (Map.Entry<Object, Object> entry : this.targetDataSources.entrySet()) { Object lookupKey = resolveSpecifiedLookupKey(entry.getKey()); DataSource dataSource = resolveSpecifiedDataSource(entry.getValue()); this.resolvedDataSources.put(lookupKey, dataSource); } if (this.defaultTargetDataSource != null) { this.resolvedDefaultDataSource = resolveSpecifiedDataSource(this.defaultTargetDataSource); } } |
targetDataSources 是我们在xml配置文件中注入的 dataSourceMaster 和 dataSourceSlave. afterPropertiesSet方法就是使用注入的
dataSourceMaster 和 dataSourceSlave来构造一个HashMap——resolvedDataSources。方便后面根据 key 从该map 中取得对应的dataSource。
我们在看下 AbstractDataSource 接口中的 Connection getConnection() throws SQLException; 是如何实现的:
@Override
public Connection getConnection() throws SQLException {
return determineTargetDataSource().getConnection();
}
关键在于 determineTargetDataSource(),根据方法名就可以看出,应该此处就决定了使用哪个 dataSource :
protected DataSource determineTargetDataSource() {
Assert.notNull(this.resolvedDataSources, "DataSource router not initialized");
Object lookupKey = determineCurrentLookupKey();
DataSource dataSource = this.resolvedDataSources.get(lookupKey);
if (dataSource == null && (this.lenientFallback || lookupKey == null)) {
dataSource = this.resolvedDefaultDataSource;
}
if (dataSource == null) {
throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]");
}
return dataSource;
}
Object lookupKey = determineCurrentLookupKey(); 该方法是我们实现的,在其中获取ThreadLocal中保存的 key 值。获得了key之后,在从afterPropertiesSet()中初始化好了的resolvedDataSources这个map中获得key对应的dataSource。而ThreadLocal中保存的 key 值是通过AOP的方式在调用service中相关方法之前设置好的。OK,到此搞定!
7)扩展 ThreadLocalRountingDataSource
上面我们只是实现了 master-slave 数据源的选择。如果有多台 master 或者有多台 slave。多台master组成一个HA,要实现当其中一台master挂了是,自动切换到另一台master,这个功能可以使用LVS/Keepalived来实现,也可以通过进一步扩展ThreadLocalRountingDataSource来实现,可以另外加一个线程专门来每个一秒来测试mysql是否正常来实现。同样对于多台slave之间要实现负载均衡,同时当一台slave挂了时,要实现将其从负载均衡中去除掉,这个功能既可以使用LVS/Keepalived来实现,同样也可以通过近一步扩展ThreadLocalRountingDataSource来实现。
3. 总结
从本文中我们可以体会到AOP的强大和灵活。
本文使用的是mybatis,其实使用Hibernate也应该是相似的配置。