Ribbon是一個客戶端負載均衡器,運行在客戶端上,Feign已經默認使用了Ribbon。
Ribbon在Netflix組件是非常重要的一個組件,在Zuul中使用Ribbon做負載均衡,以及Feign組件的結合等。做的最多的可能是將RestTemplate和Ribbon相結合,你可能會這樣寫:
@Configuration
public class RibbonConfig {
@Bean
@LoadBalanced
RestTemplate restTemplate() {
return new RestTemplate();
}
}
消費另外一個的服務的接口,差不多是這樣的:
@Service
public class RibbonService {
@Autowired
RestTemplate restTemplate;
public String hi(String name) {
return restTemplate.getForObject("http://eureka-client/hi?name="+name,String.class);
}
}
1. LoadBalancerClient
在Riibon中一個非常重要的組件為LoadBalancerClient,它作為負載均衡的一個客戶端。LoadBalancerClient是一個接口,它繼承ServiceInstanceChooser,它的實現類是RibbonLoadBalancerClient,這三者之間的關係如下圖:
其中LoadBalancerClient接口,有如下三個方法,其中excute()為執行請求,reconstructURI()用來重構url:
public interface LoadBalancerClient extends ServiceInstanceChooser {
/**
* execute request using a ServiceInstance from the LoadBalancer for the specified
* service
*/
<T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;
/**
* execute request using a ServiceInstance from the LoadBalancer for the specified
* service
*/
<T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException;
/**
* Create a proper URI with a real host and port for systems to utilize.
* Some systems use a URI with the logical serivce name as the host,
* such as http://myservice/path/to/service. This will replace the
* service name with the host:port from the ServiceInstance.
*/
URI reconstructURI(ServiceInstance instance, URI original);
}
ServiceInstanceChooser接口,主要有一個方法,用來根據serviceId來獲取ServiceInstance,代碼如下:
public interface ServiceInstanceChooser {
/**
* Choose a ServiceInstance from the LoadBalancer for the specified service
*/
ServiceInstance choose(String serviceId);
}
LoadBalancerClient的實現類為RibbonLoadBalancerClient,這個類是非常重要的一個類,最終的負載均衡的請求處理,由它來執行。它的部分源碼如下:
public class RibbonLoadBalancerClient implements LoadBalancerClient {
...//省略代碼
@Override
public ServiceInstance choose(String serviceId) {
Server server = getServer(serviceId);
if (server == null) {
return null;
}
return new RibbonServer(serviceId, server, isSecure(server, serviceId),
serverIntrospector(serviceId).getMetadata(server));
}
protected Server getServer(String serviceId) {
return getServer(getLoadBalancer(serviceId));
}
protected Server getServer(ILoadBalancer loadBalancer) {
if (loadBalancer == null) {
return null;
}
return loadBalancer.chooseServer("default"); // TODO: better handling of key
}
protected ILoadBalancer getLoadBalancer(String serviceId) {
return this.clientFactory.getLoadBalancer(serviceId);
}
...//省略代碼
在RibbonLoadBalancerClient的源碼中,其中choose()方法是選擇具體服務實例的一個方法。該方法通過getServer()方法去獲取實例,經過源碼跟蹤,最終交給了ILoadBalancer類去選擇服務實例。
ILoadBalancer在ribbon-loadbalancer的jar包下,它是定義了實現軟件負載均衡的一個接口,它需要一組可供選擇的服務註冊列表信息,以及根據特定方法去選擇服務,它的源碼如下 :
public interface ILoadBalancer {
/**
* Initial list of servers.
* This API also serves to add additional ones at a later time
* The same logical server (host:port) could essentially be added multiple times
* (helpful in cases where you want to give more "weightage" perhaps ..)
*/
public void addServers(List<Server> newServers);
/**
* Choose a server from load balancer.
*/
public Server chooseServer(Object key);
/**
* To be called by the clients of the load balancer to notify that a Server is down
* else, the LB will think its still Alive until the next Ping cycle - potentially
* (assuming that the LB Impl does a ping)
*/
public void markServerDown(Server server);
/**
* @deprecated 2016-01-20 This method is deprecated in favor of the
* cleaner {@link #getReachableServers} (equivalent to availableOnly=true)
* and {@link #getAllServers} API (equivalent to availableOnly=false).
*
* Get the current list of servers.
*/
@Deprecated
public List<Server> getServerList(boolean availableOnly);
/**
* @return Only the servers that are up and reachable.
*/
public List<Server> getReachableServers();
/**
* @return All known servers, both reachable and unreachable.
*/
public List<Server> getAllServers();
}
addServers()方法是添加一個Server集合;
chooseServer()方法是根據key去獲取Server;
markServerDown()方法用來標記某個服務下線;
getReachableServers()獲取可用的Server集合;
getAllServers()獲取所有的Server集合。
2. DynamicServerListLoadBalancer
BaseLoadBalancer的實現類為DynamicServerListLoadBalancer,這三者之間的關係如下:
查看上述三個類的源碼,可以發現配置以下信息,IClientConfig、IRule、IPing、ServerList、ServerListFilter和ILoadBalancer,查看BaseLoadBalancer類,它默認的情況下,實現了以下配置:
IClientConfig ribbonClientConfig: DefaultClientConfigImpl配置
IRule ribbonRule: RoundRobinRule 路由策略
IPing ribbonPing: DummyPing
ServerList ribbonServerList: ConfigurationBasedServerList
ServerListFilter ribbonServerListFilter: ZonePreferenceServerListFilter
ILoadBalancer ribbonLoadBalancer: ZoneAwareLoadBalancer
IClientConfig 用於對客戶端或者負載均衡的配置,它的默認實現類為DefaultClientConfigImpl。
IRule用於負載均衡的策略,它有三個方法,其中choose()是根據key來獲取server,setLoadBalancer()和getLoadBalancer()是用來設置和獲取ILoadBalancer的,它的源碼如下:
public interface IRule {
public Server choose(Object key);
public void setLoadBalancer(ILoadBalancer lb);
public ILoadBalancer getLoadBalancer();
}
IRule有很多默認的實現類,這些實現類根據不同的算法和邏輯來處理負載均衡,Ribbon實現的IRule有以下幾種,在大多數情況下,這些默認的實現類是可以滿足需求的,如果有特性的需求,可以自己實現:
BestAvailableRule 選擇最小請求數
ClientConfigEnabledRoundRobinRule 輪詢
RandomRule 隨機選擇一個server
RoundRobinRule 輪詢選擇server
RetryRule 根據輪詢的方式重試
WeightedResponseTimeRule 根據響應時間去分配一個weight,weight越低,被選擇的可能性就越低
ZoneAvoidanceRule 根據server的zone區域和可用性來輪詢選擇
IPing是用來向server發送”ping”命令,根據該server是否有響應判斷該server是否可用。它有一個isAlive()方法,它的源碼如下:
public interface IPing {
/**
* Checks whether the given <code>Server</code> is "alive" i.e. should be
* considered a candidate while loadbalancing
*/
public boolean isAlive(Server server);
}
IPing的實現類有PingUrl、PingConstant、NoOpPing、DummyPing和NIWSDiscoveryPing。之間的關係如下:
PingUrl 真實的去ping 某個url,判斷其是否alive
PingConstant 固定返回某服務是否可用,默認返回true,即可用
NoOpPing 不去ping,直接返回true,即可用。
DummyPing 直接返回true,並實現了initWithNiwsConfig方法。
NIWSDiscoveryPing,根據DiscoveryEnabledServer的InstanceInfo的InstanceStatus去判斷,如果為InstanceStatus.UP,則為可用,否則不可用。
ServerList是定義獲取所有的server的註冊列表信息的接口,它的代碼如下:
/**
* Interface that defines the methods sed to obtain the List of Servers
* @author stonse
*/
public interface ServerList<T extends Server> {
public List<T> getInitialListOfServers();
/**
* Return updated list of servers. This is called say every 30 secs
* (configurable) by the Loadbalancer's Ping cycle
*/
public List<T> getUpdatedListOfServers();
}
ServerListFilter接口,定義了可根據配置去過濾或者根據特性動態獲取符合條件的server列表的方法,代碼如下:
/**
* This interface allows for filtering the configured or dynamically obtained
* List of candidate servers with desirable characteristics.
*/
public interface ServerListFilter<T extends Server> {
public List<T> getFilteredListOfServers(List<T> servers);
}
閱讀DynamicServerListLoadBalancer的源碼,DynamicServerListLoadBalancer的構造函數中有個initWithNiwsConfig()方法。在該方法中,經過一系列的初始化配置,最終執行了restOfInit()方法。其代碼如下:
public DynamicServerListLoadBalancer(IClientConfig clientConfig) {
initWithNiwsConfig(clientConfig);
}
@Override
public void initWithNiwsConfig(IClientConfig clientConfig) {
try {
super.initWithNiwsConfig(clientConfig);
String niwsServerListClassName = clientConfig.getPropertyAsString(
CommonClientConfigKey.NIWSServerListClassName,
DefaultClientConfigImpl.DEFAULT_SEVER_LIST_CLASS);
ServerList<T> niwsServerListImpl = (ServerList<T>) ClientFactory
.instantiateInstanceWithClientConfig(niwsServerListClassName, clientConfig);
this.serverListImpl = niwsServerListImpl;
if (niwsServerListImpl instanceof AbstractServerList) {
AbstractServerListFilter<T> niwsFilter = ((AbstractServerList) niwsServerListImpl)
.getFilterImpl(clientConfig);
niwsFilter.setLoadBalancerStats(getLoadBalancerStats());
this.filter = niwsFilter;
}
String serverListUpdaterClassName = clientConfig.getPropertyAsString(
CommonClientConfigKey.ServerListUpdaterClassName,
DefaultClientConfigImpl.DEFAULT_SERVER_LIST_UPDATER_CLASS
);
this.serverListUpdater = (ServerListUpdater) ClientFactory
.instantiateInstanceWithClientConfig(serverListUpdaterClassName, clientConfig);
restOfInit(clientConfig);
} catch (Exception e) {
throw new RuntimeException(
"Exception while initializing NIWSDiscoveryLoadBalancer:"
+ clientConfig.getClientName()
+ ", niwsClientConfig:" + clientConfig, e);
}
}
在restOfInit()方法上,有一個updateListOfServers()的方法,該方法是用來獲取所有的ServerList的。
void restOfInit(IClientConfig clientConfig) {
boolean primeConnection = this.isEnablePrimingConnections();
// turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
this.setEnablePrimingConnections(false);
enableAndInitLearnNewServersFeature();
updateListOfServers();
if (primeConnection && this.getPrimeConnections() != null) {
this.getPrimeConnections()
.primeConnections(getReachableServers());
}
this.setEnablePrimingConnections(primeConnection);
LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
}
進一步跟蹤updateListOfServers()方法的源碼,最終由serverListImpl.getUpdatedListOfServers()獲取所有的服務列表的,代碼如下:
@VisibleForTesting
public void updateListOfServers() {
List<T> servers = new ArrayList<T>();
if (serverListImpl != null) {
servers = serverListImpl.getUpdatedListOfServers();
LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
if (filter != null) {
servers = filter.getFilteredListOfServers(servers);
LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
}
}
updateAllServerList(servers);
}
而serverListImpl是ServerList接口的具體實現類。跟蹤代碼,ServerList的實現類為DiscoveryEnabledNIWSServerList。其中DiscoveryEnabledNIWSServerList有getInitialListOfServers()和getUpdatedListOfServers()方法,具體代碼如下:
@Override
public List<DiscoveryEnabledServer> getInitialListOfServers(){
return obtainServersViaDiscovery();
}
@Override
public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
return obtainServersViaDiscovery();
}
繼續跟蹤obtainServersViaDiscovery(),它根據eurekaClientProvider.get()來獲取EurekaClient,再根據EurekaClient來獲取註冊列表信息,代碼如下:
private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();
if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
logger.warn("EurekaClient has not been initialized yet, returning an empty list");
return new ArrayList<DiscoveryEnabledServer>();
}
EurekaClient eurekaClient = eurekaClientProvider.get();
if (vipAddresses!=null){
for (String vipAddress : vipAddresses.split(",")) {
// if targetRegion is null, it will be interpreted as the same region of client
List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
for (InstanceInfo ii : listOfInstanceInfo) {
if (ii.getStatus().equals(InstanceStatus.UP)) {
if(shouldUseOverridePort){
if(logger.isDebugEnabled()){
logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);
}
// copy is necessary since the InstanceInfo builder just uses the original reference,
// and we don't want to corrupt the global eureka copy of the object which may be
// used by other clients in our system
InstanceInfo copy = new InstanceInfo(ii);
if(isSecure){
ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
}else{
ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
}
}
DiscoveryEnabledServer des = new DiscoveryEnabledServer(ii, isSecure, shouldUseIpAddr);
des.setZone(DiscoveryClient.getZone(ii));
serverList.add(des);
}
}
if (serverList.size()>0 && prioritizeVipAddressBasedServers){
break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers
}
}
}
return serverList;
}
其中eurekaClientProvider的實現類是LegacyEurekaClientProvider,它是一個獲取eurekaClient類,通過靜態的方法去獲取eurekaClient,其代碼如下:
/**
* A legacy class to provide eurekaclient via static singletons
*/
class LegacyEurekaClientProvider implements Provider<EurekaClient> {
private volatile EurekaClient eurekaClient;
@Override
public synchronized EurekaClient get() {
if (eurekaClient == null) {
eurekaClient = DiscoveryManager.getInstance().getDiscoveryClient();
}
return eurekaClient;
}
}
EurekaClient的實現類為DiscoveryClient,在之前已經分析了它具有服務註冊、獲取服務註冊列表等的全部功能。
由此可見,負載均衡器是從EurekaClient獲取服務信息,並根據IRule去路由,並且根據IPing去判斷服務的可用性。
那麼現在還有個問題,負載均衡器多久一次去獲取一次從Eureka Client獲取註冊信息呢。
在BaseLoadBalancer類下,BaseLoadBalancer的構造函數,該構造函數開啟了一個PingTask任務,代碼如下:
public BaseLoadBalancer(String name, IRule rule, LoadBalancerStats stats,
IPing ping, IPingStrategy pingStrategy) {
...//代碼省略
setupPingTask();
...//代碼省略
}
setupPingTask()的具體代碼邏輯,它開啟了ShutdownEnabledTimer執行PingTask任務,在默認情況下pingIntervalSeconds為10,即每10秒鐘,想EurekaClient發送一次”ping”。
void setupPingTask() {
if (canSkipPing()) {
return;
}
if (lbTimer != null) {
lbTimer.cancel();
}
lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
true);
lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
forceQuickPing();
}
PingTask源碼,即new一個Pinger對象,並執行runPinger()方法。
class PingTask extends TimerTask {
public void run() {
try {
new Pinger(pingStrategy).runPinger();
} catch (Exception e) {
logger.error("LoadBalancer [{}]: Error pinging", name, e);
}
}
}
查看Pinger的runPinger()方法,最終根據pingerStrategy.pingServers(ping, allServers)來獲取服務的可用性,如果該返回結果,如之前相同,則不去向EurekaClient獲取註冊列表,如果不同則通知ServerStatusChangeListener或者changeListeners發生了改變,進行更新或者重新拉取。
public void runPinger() throws Exception {
if (!pingInProgress.compareAndSet(false, true)) {
return; // Ping in progress - nothing to do
}
// we are "in" - we get to Ping
Server[] allServers = null;
boolean[] results = null;
Lock allLock = null;
Lock upLock = null;
try {
/*
* The readLock should be free unless an addServer operation is
* going on...
*/
allLock = allServerLock.readLock();
allLock.lock();
allServers = allServerList.toArray(new Server[allServerList.size()]);
allLock.unlock();
int numCandidates = allServers.length;
results = pingerStrategy.pingServers(ping, allServers);
final List<Server> newUpList = new ArrayList<Server>();
final List<Server> changedServers = new ArrayList<Server>();
for (int i = 0; i < numCandidates; i++) {
boolean isAlive = results[i];
Server svr = allServers[i];
boolean oldIsAlive = svr.isAlive();
svr.setAlive(isAlive);
if (oldIsAlive != isAlive) {
changedServers.add(svr);
logger.debug("LoadBalancer [{}]: Server [{}] status changed to {}",
name, svr.getId(), (isAlive ? "ALIVE" : "DEAD"));
}
if (isAlive) {
newUpList.add(svr);
}
}
upLock = upServerLock.writeLock();
upLock.lock();
upServerList = newUpList;
upLock.unlock();
notifyServerStatusChangeListener(changedServers);
} finally {
pingInProgress.set(false);
}
}
BaseLoadBalancer中定義了一個IPingStrategy,用來描述服務檢查策略,IPingStrategy默認實現採用了SerialPingStrategy實現,SerialPingStrategy中的pingServers方法就是遍歷所有的服務實例,一個一個發送請求,查看這些服務實例是否還有效,如果網絡環境不好的話,這種檢查策略效率會很低,如果我們想自定義檢查策略的話,可以重寫SerialPingStrategy的pingServers方法。
由此可見,LoadBalancerClient是在初始化的時候,會向Eureka回去服務註冊列表,並且向通過10s一次向EurekaClient發送“ping”,來判斷服務的可用性,如果服務的可用性發生了改變或者服務數量和之前的不一致,則更新或者重新拉取。LoadBalancerClient有了這些服務註冊列表,就可以根據具體的IRule來進行負載均衡。
3. RestTemplate是如何和Ribbon結合的
最後,回答問題的本質,為什麼在RestTemplate加一個@LoadBalance註解就可可以開啟負載均衡呢?
@LoadBalanced
RestTemplate restTemplate() {
return new RestTemplate();
}
全局搜索@LoadBalanced有哪些類用到了,發現LoadBalancerAutoConfiguration類,即LoadBalancer自動配置類。
@Configuration
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerRetryProperties.class)
public class LoadBalancerAutoConfiguration {
@LoadBalanced
@Autowired(required = false)
private List<RestTemplate> restTemplates = Collections.emptyList();
}
@Bean
public SmartInitializingSingleton loadBalancedRestTemplateInitializer(
final List<RestTemplateCustomizer> customizers) {
return new SmartInitializingSingleton() {
@Override
public void afterSingletonsInstantiated() {
for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
for (RestTemplateCustomizer customizer : customizers) {
customizer.customize(restTemplate);
}
}
}
};
}
@Configuration
@ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
static class LoadBalancerInterceptorConfig {
@Bean
public LoadBalancerInterceptor ribbonInterceptor(
LoadBalancerClient loadBalancerClient,
LoadBalancerRequestFactory requestFactory) {
return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
}
@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(
final LoadBalancerInterceptor loadBalancerInterceptor) {
return new RestTemplateCustomizer() {
@Override
public void customize(RestTemplate restTemplate) {
List<ClientHttpRequestInterceptor> list = new ArrayList<>(
restTemplate.getInterceptors());
list.add(loadBalancerInterceptor);
restTemplate.setInterceptors(list);
}
};
}
}
}
在該類中,首先維護了一個被@LoadBalanced修飾的RestTemplate對象的List,在初始化的過程中,通過調用customizer.customize(restTemplate)方法來給RestTemplate增加攔截器LoadBalancerInterceptor。
而LoadBalancerInterceptor,用於實時攔截,在LoadBalancerInterceptor這裡實現來負載均衡。LoadBalancerInterceptor的攔截方法如下:
@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) throws IOException {
final URI originalUri = request.getURI();
String serviceName = originalUri.getHost();
Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));
}
總結
綜上所述,Ribbon的負載均衡,主要通過LoadBalancerClient來實現的,而LoadBalancerClient具體交給了ILoadBalancer來處理,ILoadBalancer通過配置IRule、IPing等信息,並向EurekaClient獲取註冊列表的信息,並默認10秒一次向EurekaClient發送“ping”,進而檢查是否更新服務列表,最後,得到註冊列表後,ILoadBalancer根據IRule的策略進行負載均衡。
而RestTemplate 被@LoadBalance註解後,能過用負載均衡,主要是維護了一個被@LoadBalance註解的RestTemplate列表,並給列表中的RestTemplate添加攔截器,進而交給負載均衡器去處理。