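Zuul Architecture Diagram
(Figure: the architecture diagram from the official Zuul documentation)
As the architecture diagram shows, Zuul implements intelligent routing and filtering through the ZuulServlet and a chain of ZuulFilters.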
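Overview of How Zuul Works (reposted)
In Zuul, a request flows as follows. The request is first handed to ZuulServlet. ZuulServlet holds a ZuulRunner, which initializes the RequestContext. RequestContext is the context for the whole request: it carries the request data and is shared by every ZuulFilter. ZuulRunner also works with a FilterProcessor, which manages the execution of all ZuulFilters. FilterProcessor obtains the ZuulFilters from FilterLoader. The ZuulFilters themselves are loaded by FilterFileManager, which supports hot-reloading of Groovy filters by polling.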
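With the filters in place, ZuulServlet runs the pre filters first, then the route filters, and finally the post filters.
If an error occurs while these filters run, the error filters are executed. Once the filters have run, the result of the request is returned to the client.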
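The ordering described above can be modelled in a few lines of plain Java. This is only a sketch of the control flow, not Zuul code: the class FilterLifecycleSketch, the exception FilterFailure, and the method names are hypothetical. It assumes, as the ZuulServlet.service source later in this article shows, that post filters still run after a pre or route failure has been handled by the error filters.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the filter lifecycle; these are not Zuul's classes.
class FilterLifecycleSketch {

    /** Hypothetical stand-in for ZuulException. */
    static class FilterFailure extends Exception {
        FilterFailure(String message) {
            super(message);
        }
    }

    /** Records the phase, then fails if it is the phase we want to break. */
    static void runPhase(String phase, String failingPhase, List<String> trace) throws FilterFailure {
        trace.add(phase);
        if (phase.equals(failingPhase)) {
            throw new FilterFailure(phase + " failed");
        }
    }

    /** Runs post filters; a failure there is handled by the error phase. */
    static void runPost(String failingPhase, List<String> trace) {
        try {
            runPhase("post", failingPhase, trace);
        } catch (FilterFailure e) {
            trace.add("error");
        }
    }

    /** Returns the phases executed, in order, for one simulated request. */
    static List<String> handle(String failingPhase) {
        List<String> trace = new ArrayList<>();
        try {
            runPhase("pre", failingPhase, trace);
            runPhase("route", failingPhase, trace);
        } catch (FilterFailure e) {
            // pre or route failed: error phase first, post phase still runs
            trace.add("error");
        }
        runPost(failingPhase, trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(handle(null));    // normal request
        System.out.println(handle("route")); // route phase fails
    }
}
```

Calling handle with "pre", "route", or "post" shows where the error phase is inserted for each kind of failure.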
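Zuul Startup: Source Code Analysis
Adding the @EnableZuulProxy annotation to the application's startup class enables the features Zuul provides. The source of the annotation is: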
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Import({ZuulProxyMarkerConfiguration.class})
public @interface EnableZuulProxy {
}
In the source, @EnableZuulProxy imports the ZuulProxyMarkerConfiguration configuration class. Following ZuulProxyMarkerConfiguration:
public class ZuulProxyMarkerConfiguration {
public ZuulProxyMarkerConfiguration() {
}
@Bean
public ZuulProxyMarkerConfiguration.Marker zuulProxyMarkerBean() {
return new ZuulProxyMarkerConfiguration.Marker();
}
class Marker {
Marker() {
}
}
}
The ZuulProxyMarkerConfiguration class does nothing except register a bean of type ZuulProxyMarkerConfiguration.Marker. That suggests some other configuration class depends on this bean, and indeed ZuulProxyAutoConfiguration is conditional on the ZuulProxyMarkerConfiguration.Marker bean.
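The marker-bean idea can be illustrated without Spring. The sketch below is a plain-Java model under stated assumptions: Container, enableProxy, and autoConfigure are hypothetical names that stand in for the application context, the effect of @EnableZuulProxy, and the @ConditionalOnBean check respectively. It is not how Spring evaluates conditions internally.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of "a marker bean gates auto-configuration"; no Spring involved.
class MarkerGateSketch {

    /** Empty marker type; its presence is the only signal it carries. */
    static class Marker {
    }

    /** Hypothetical minimal bean container keyed by bean type. */
    static class Container {
        private final Map<Class<?>, Object> beans = new HashMap<>();

        void register(Object bean) {
            beans.put(bean.getClass(), bean);
        }

        boolean has(Class<?> type) {
            return beans.containsKey(type);
        }
    }

    /** Stand-in for the effect of @EnableZuulProxy: register the marker. */
    static void enableProxy(Container container) {
        container.register(new Marker());
    }

    /** Stand-in for @ConditionalOnBean(Marker.class): configure only if the marker exists. */
    static boolean autoConfigure(Container container) {
        if (!container.has(Marker.class)) {
            return false; // auto-configuration is skipped entirely
        }
        container.register("routing filters would be registered here");
        return true;
    }

    public static void main(String[] args) {
        Container container = new Container();
        System.out.println(autoConfigure(container)); // marker absent
        enableProxy(container);
        System.out.println(autoConfigure(container)); // marker present
    }
}
```

The design choice is that the annotation itself carries no logic: it only drops a marker into the context, and the auto-configuration decides what to do when it sees it.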
Following ZuulProxyAutoConfiguration:
@Configuration
@Import({RestClientRibbonConfiguration.class, OkHttpRibbonConfiguration.class, HttpClientRibbonConfiguration.class, HttpClientConfiguration.class})
@ConditionalOnBean({Marker.class})
public class ZuulProxyAutoConfiguration extends ZuulServerAutoConfiguration {
@Bean
@ConditionalOnMissingBean({DiscoveryClientRouteLocator.class})
public DiscoveryClientRouteLocator discoveryRouteLocator() {
return new DiscoveryClientRouteLocator(this.server.getServlet().getContextPath(), this.discovery, this.zuulProperties, this.serviceRouteMapper, this.registration);
}
@Bean
@ConditionalOnMissingBean({PreDecorationFilter.class})
public PreDecorationFilter preDecorationFilter(RouteLocator routeLocator, ProxyRequestHelper proxyRequestHelper) {
return new PreDecorationFilter(routeLocator, this.server.getServlet().getContextPath(), this.zuulProperties, proxyRequestHelper);
}
@Bean
@ConditionalOnMissingBean({RibbonRoutingFilter.class})
public RibbonRoutingFilter ribbonRoutingFilter(ProxyRequestHelper helper, RibbonCommandFactory<?> ribbonCommandFactory) {
RibbonRoutingFilter filter = new RibbonRoutingFilter(helper, ribbonCommandFactory, this.requestCustomizers);
return filter;
}
@Bean
@ConditionalOnMissingBean({SimpleHostRoutingFilter.class, CloseableHttpClient.class})
public SimpleHostRoutingFilter simpleHostRoutingFilter(ProxyRequestHelper helper, ZuulProperties zuulProperties, ApacheHttpClientConnectionManagerFactory connectionManagerFactory, ApacheHttpClientFactory httpClientFactory) {
return new SimpleHostRoutingFilter(helper, zuulProperties, connectionManagerFactory, httpClientFactory);
}
@Bean
@ConditionalOnMissingBean({SimpleHostRoutingFilter.class})
public SimpleHostRoutingFilter simpleHostRoutingFilter2(ProxyRequestHelper helper, ZuulProperties zuulProperties, CloseableHttpClient httpClient) {
return new SimpleHostRoutingFilter(helper, zuulProperties, httpClient);
}
@Bean
@ConditionalOnMissingBean({ServiceRouteMapper.class})
public ServiceRouteMapper serviceRouteMapper() {
return new SimpleServiceRouteMapper();
}
}
ZuulProxyAutoConfiguration imports RestClientRibbonConfiguration, OkHttpRibbonConfiguration, HttpClientRibbonConfiguration, and HttpClientConfiguration. By default Zuul uses HttpClientRibbonConfiguration for its load-balancing setup, and DiscoveryClient and the Ribbon configuration are injected to support load balancing. The class also registers a series of filters:
pre type: PreDecorationFilter (decorates the request)
route type: RibbonRoutingFilter, SimpleHostRoutingFilter (routing filters)
The parent class, ZuulServerAutoConfiguration, contributes further configuration:
@EnableConfigurationProperties({ZuulProperties.class})
@ConditionalOnClass({ZuulServlet.class, ZuulServletFilter.class})
@ConditionalOnBean({Marker.class})
public class ZuulServerAutoConfiguration {
// Register `ZuulServlet` when no bean named zuulServlet exists (and zuul.use-filter is false or unset)
@Bean
@ConditionalOnMissingBean(
name = {"zuulServlet"}
)
@ConditionalOnProperty(
name = {"zuul.use-filter"},
havingValue = "false",
matchIfMissing = true
)
public ServletRegistrationBean zuulServlet() {
ServletRegistrationBean<ZuulServlet> servlet = new ServletRegistrationBean(new ZuulServlet(), new String[]{this.zuulProperties.getServletPattern()});
servlet.addInitParameter("buffer-requests", "false");
return servlet;
}
// Register `ZuulServletFilter` when no bean named zuulServletFilter exists (and zuul.use-filter is true)
@Bean
@ConditionalOnMissingBean(
name = {"zuulServletFilter"}
)
@ConditionalOnProperty(
name = {"zuul.use-filter"},
havingValue = "true",
matchIfMissing = false
)
public FilterRegistrationBean zuulServletFilter() {
FilterRegistrationBean<ZuulServletFilter> filterRegistration = new FilterRegistrationBean();
filterRegistration.setUrlPatterns(Collections.singleton(this.zuulProperties.getServletPattern()));
filterRegistration.setFilter(new ZuulServletFilter());
filterRegistration.setOrder(2147483647);
filterRegistration.addInitParameter("buffer-requests", "false");
return filterRegistration;
}
// Register `ServletDetectionFilter`
@Bean
public ServletDetectionFilter servletDetectionFilter() {
return new ServletDetectionFilter();
}
// Register `FormBodyWrapperFilter`
@Bean
public FormBodyWrapperFilter formBodyWrapperFilter() {
return new FormBodyWrapperFilter();
}
// Register `DebugFilter`
@Bean
public DebugFilter debugFilter() {
return new DebugFilter();
}
// Register `Servlet30WrapperFilter`
@Bean
public Servlet30WrapperFilter servlet30WrapperFilter() {
return new Servlet30WrapperFilter();
}
// Register `SendResponseFilter`
@Bean
public SendResponseFilter sendResponseFilter(ZuulProperties properties) {
return new SendResponseFilter(this.zuulProperties);
}
// Register `SendErrorFilter`
@Bean
public SendErrorFilter sendErrorFilter() {
return new SendErrorFilter();
}
// Register `SendForwardFilter`
@Bean
public SendForwardFilter sendForwardFilter() {
return new SendForwardFilter();
}
@Configuration
protected static class ZuulFilterConfiguration {
@Autowired
private Map<String, ZuulFilter> filters;
protected ZuulFilterConfiguration() {
}
// ZuulFilterInitializer: on initialization it registers the filters with FilterRegistry
@Bean
public ZuulFilterInitializer zuulFilterInitializer(CounterFactory counterFactory, TracerFactory tracerFactory) {
FilterLoader filterLoader = FilterLoader.getInstance();
FilterRegistry filterRegistry = FilterRegistry.instance();
return new ZuulFilterInitializer(this.filters, counterFactory, tracerFactory, filterLoader, filterRegistry);
}
}
}
In the parent class ZuulServerAutoConfiguration, ZuulServlet or ZuulServletFilter is registered when no bean of that name already exists. Several other filters are registered as well:
pre type: ServletDetectionFilter, FormBodyWrapperFilter, DebugFilter, Servlet30WrapperFilter
post type: SendResponseFilter (writes the response)
route type: SendForwardFilter (handles forwards)
error type: SendErrorFilter (handles errors)
Finally, ZuulFilterInitializer is created, and it registers all of the filters with FilterRegistry. Here is part of the ZuulFilterInitializer class:
public class ZuulFilterInitializer {
// After construction, register all filters
@PostConstruct
public void contextInitialized() {
log.info("Starting filter initializer");
TracerFactory.initialize(this.tracerFactory);
CounterFactory.initialize(this.counterFactory);
Iterator var1 = this.filters.entrySet().iterator();
while(var1.hasNext()) {
Entry<String, ZuulFilter> entry = (Entry)var1.next();
this.filterRegistry.put((String)entry.getKey(), (ZuulFilter)entry.getValue());
}
}
// Before destruction, remove all registered filters
@PreDestroy
public void contextDestroyed() {
log.info("Stopping filter initializer");
Iterator var1 = this.filters.entrySet().iterator();
while(var1.hasNext()) {
Entry<String, ZuulFilter> entry = (Entry)var1.next();
this.filterRegistry.remove((String)entry.getKey());
}
this.clearLoaderCache();
TracerFactory.initialize((TracerFactory)null);
CounterFactory.initialize((CounterFactory)null);
}
}
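Zuul Routing: Source Code Analysis
Filter Execution
So far we have analyzed Zuul's startup from the source. Next we use the source to see how the registered filters are executed during a single routed request. As described earlier, ZuulServlet, Zuul's core class, is the entry point for every request, so we start with its source: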
public class ZuulServlet extends HttpServlet {
public ZuulServlet() {
}
// The init method creates the ZuulRunner
public void init(ServletConfig config) throws ServletException {
super.init(config);
String bufferReqsStr = config.getInitParameter("buffer-requests");
boolean bufferReqs = bufferReqsStr != null && bufferReqsStr.equals("true");
this.zuulRunner = new ZuulRunner(bufferReqs);
}
// The standard Servlet service method
public void service(ServletRequest servletRequest, ServletResponse servletResponse) throws ServletException, IOException {
try {
// Delegates to ZuulRunner.init
this.init((HttpServletRequest)servletRequest, (HttpServletResponse)servletResponse);
RequestContext context = RequestContext.getCurrentContext();
context.setZuulEngineRan();
try {
// Delegates to ZuulRunner.preRoute
this.preRoute();
} catch (ZuulException var12) {
this.error(var12);
// Delegates to ZuulRunner.postRoute
this.postRoute();
return;
}
try {
this.route();
} catch (ZuulException var13) {
this.error(var13);
this.postRoute();
return;
}
try {
this.postRoute();
} catch (ZuulException var11) {
this.error(var11);
}
} catch (Throwable var14) {
this.error(new ZuulException(var14, 500, "UNHANDLED_EXCEPTION_" + var14.getClass().getName()));
} finally {
RequestContext.getCurrentContext().unset();
}
}
}
In the ZuulServlet source, the service method calls this.init(request, response). Following init(request, response):
void init(HttpServletRequest servletRequest, HttpServletResponse servletResponse) {
this.zuulRunner.init(servletRequest, servletResponse);
}
This calls the init method of the ZuulRunner held by ZuulServlet. Going into ZuulRunner.init:
public void init(HttpServletRequest servletRequest, HttpServletResponse servletResponse) {
RequestContext ctx = RequestContext.getCurrentContext();
if (this.bufferRequests) {
ctx.setRequest(new HttpServletRequestWrapper(servletRequest));
} else {
ctx.setRequest(servletRequest);
}
ctx.setResponse(new HttpServletResponseWrapper(servletResponse));
}
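ZuulRunner.init only stores the request and response in the RequestContext: the response is always wrapped, and the request is wrapped only when bufferRequests is true.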
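Because RequestContext.getCurrentContext() is called with no arguments everywhere, the context has to be bound to the current thread. A minimal sketch of that idea follows; it assumes a ThreadLocal-held map, and the names RequestContextSketch, Context, current, and unset are hypothetical stand-ins rather than Zuul's implementation.

```java
import java.util.concurrent.ConcurrentHashMap;

// Simplified model of a thread-bound request context; not Zuul's RequestContext.
class RequestContextSketch {

    /** Hypothetical context: a concurrent map held in a ThreadLocal. */
    static class Context extends ConcurrentHashMap<String, Object> {
        private static final ThreadLocal<Context> CURRENT = ThreadLocal.withInitial(Context::new);

        static Context current() {
            return CURRENT.get();
        }

        /** Drops the calling thread's context, like the finally block in service(). */
        static void unset() {
            CURRENT.remove();
        }
    }

    /** One "filter" writes a value and a later "filter" reads it back. */
    static Object roundTrip(String key, Object value) {
        try {
            Context.current().put(key, value);  // e.g. written by a pre filter
            return Context.current().get(key);  // e.g. read by a route filter
        } finally {
            Context.unset();                    // always clean up after the request
        }
    }

    /** Checks whether a value set on this thread is visible from another thread. */
    static boolean visibleFromOtherThread() {
        Context.current().put("serviceId", "users");
        final boolean[] seen = new boolean[1];
        Thread other = new Thread(() -> seen[0] = Context.current().containsKey("serviceId"));
        other.start();
        try {
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            Context.unset();
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("serviceId", "users"));
        System.out.println(visibleFromOtherThread());
    }
}
```

Clearing the context in a finally block matters when threads are pooled: without it, data from one request could leak into the next request served by the same thread.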
Back in ZuulServlet, once init has returned, service calls this.preRoute(). Following preRoute():
void preRoute() throws ZuulException {
this.zuulRunner.preRoute();
}
This calls the preRoute() method of ZuulRunner. Going into ZuulRunner.preRoute():
public void preRoute() throws ZuulException {
FilterProcessor.getInstance().preRoute();
}
ZuulRunner.preRoute() obtains the FilterProcessor instance and calls its preRoute() method. Going into FilterProcessor.preRoute():
public void preRoute() throws ZuulException {
try {
this.runFilters("pre");
} catch (ZuulException var2) {
throw var2;
} catch (Throwable var3) {
throw new ZuulException(var3, 500, "UNCAUGHT_EXCEPTION_IN_PRE_FILTER_" + var3.getClass().getName());
}
}
FilterProcessor.preRoute() calls this.runFilters() with the argument pre. Going into this.runFilters():
public Object runFilters(String sType) throws Throwable {
if (RequestContext.getCurrentContext().debugRouting()) {
Debug.addRoutingDebug("Invoking {" + sType + "} type filters");
}
boolean bResult = false;
// Get all filters of the given type (here, pre) from the FilterLoader instance
List<ZuulFilter> list = FilterLoader.getInstance().getFiltersByType(sType);
if (list != null) {
for(int i = 0; i < list.size(); ++i) {
ZuulFilter zuulFilter = (ZuulFilter)list.get(i);
// Execute the filter
Object result = this.processZuulFilter(zuulFilter);
if (result != null && result instanceof Boolean) {
bResult |= (Boolean)result;
}
}
}
return bResult;
}
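this.runFilters() gets all pre filters from the FilterLoader instance and calls this.processZuulFilter(zuulFilter) on each of them.
The analysis above shows that pre filters run first in a routed request.
(Figure: a simple diagram of this process)
That is the complete execution path for one filter type; route and post filters are executed the same way.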
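The selection and aggregation logic of runFilters can be reproduced in a small self-contained model. The Filter interface and the helper named below are hypothetical and only loosely shaped like ZuulFilter. Sorting by a numeric order reflects how I understand FilterLoader.getFiltersByType to behave; treat that as an assumption, since the source shown here does not include it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Simplified model of FilterProcessor.runFilters; these are not Zuul's classes.
class RunFiltersSketch {

    /** Hypothetical minimal filter contract. */
    interface Filter {
        String type();
        int order();
        Object run();
    }

    /** Builds a filter that records its name in the trace when it runs. */
    static Filter named(String name, String type, int order, Object result, List<String> trace) {
        return new Filter() {
            public String type() { return type; }
            public int order() { return order; }
            public Object run() { trace.add(name); return result; }
        };
    }

    /** Selects the filters of one type, lowest order first (assumed behavior). */
    static List<Filter> byType(List<Filter> all, String type) {
        List<Filter> selected = new ArrayList<>();
        for (Filter f : all) {
            if (f.type().equals(type)) {
                selected.add(f);
            }
        }
        selected.sort(Comparator.comparingInt(Filter::order));
        return selected;
    }

    /** Runs every filter of the type and ORs together any Boolean results. */
    static boolean runFilters(List<Filter> all, String type) {
        boolean result = false;
        for (Filter f : byType(all, type)) {
            Object r = f.run();
            if (r instanceof Boolean) {
                result |= (Boolean) r;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> trace = new ArrayList<>();
        List<Filter> all = Arrays.asList(
                named("decorate", "pre", 5, null, trace),
                named("forward", "route", 10, Boolean.TRUE, trace),
                named("detect", "pre", -3, Boolean.TRUE, trace));
        System.out.println(runFilters(all, "pre") + " " + trace);
    }
}
```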
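Filter Routing
The previous part covered the entry point for filter execution. This part looks at how Zuul selects a route and performs load balancing.
During startup, Zuul registered the pre filter PreDecorationFilter. Its name suggests that it decorates the request, so let us go into the source of PreDecorationFilter.run():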
public Object run() {
RequestContext ctx = RequestContext.getCurrentContext();
String requestURI = this.urlPathHelper.getPathWithinApplication(ctx.getRequest());
// Look up the route that matches the request path
Route route = this.routeLocator.getMatchingRoute(requestURI);
String location;
if (route != null) {
location = route.getLocation();
if (location != null) {
// Store the request path and the route id in the RequestContext
ctx.put("requestURI", route.getPath());
ctx.put("proxy", route.getId());
if (!route.isCustomSensitiveHeaders()) {
this.proxyRequestHelper.addIgnoredHeaders((String[])this.properties.getSensitiveHeaders().toArray(new String[0]));
} else {
this.proxyRequestHelper.addIgnoredHeaders((String[])route.getSensitiveHeaders().toArray(new String[0]));
}
// Store in the RequestContext whether this request may be retried
if (route.getRetryable() != null) {
ctx.put("retryable", route.getRetryable());
}
if (!location.startsWith("http:") && !location.startsWith("https:")) {
if (location.startsWith("forward:")) {
ctx.set("forward.to", StringUtils.cleanPath(location.substring("forward:".length()) + route.getPath()));
ctx.setRouteHost((URL)null);
return null;
}
// Set the serviceId configured for the route
ctx.set("serviceId", location);
ctx.setRouteHost((URL)null);
ctx.addOriginResponseHeader("X-Zuul-ServiceId", location);
} else {
ctx.setRouteHost(this.getUrl(location));
ctx.addOriginResponseHeader("X-Zuul-Service", location);
}
}
} else {
log.warn("No route found for uri: " + requestURI);
location = this.getForwardUri(requestURI);
ctx.set("forward.to", location);
}
return null;
}
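The run method looks up the route that matches the current path and stores the route information in the RequestContext. The routes themselves are read from the configuration file. It also stores an important flag, retryable, which is the switch that decides whether this request may be retried; its value comes from the zuul.retryable setting in the configuration.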
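The branch on the route's location is the part of run() that determines which route filter handles the request later. Going by the code above, there are three outcomes: an http(s) URL sets the route host, a forward: prefix sets forward.to, and anything else is treated as a serviceId. The sketch below isolates that decision as a pure function; the class and enum names are hypothetical, and it skips the StringUtils.cleanPath normalization that the real filter applies.

```java
// Pure-function model of the location branch in PreDecorationFilter.run(); names are hypothetical.
class RouteTargetSketch {

    /** The three outcomes the filter distinguishes. */
    enum Kind { HOST, FORWARD, SERVICE_ID }

    /** Classifies a route location using the same prefix checks as the filter. */
    static Kind classify(String location) {
        if (location.startsWith("http:") || location.startsWith("https:")) {
            return Kind.HOST;        // context.setRouteHost(...)
        }
        if (location.startsWith("forward:")) {
            return Kind.FORWARD;     // context.set("forward.to", ...)
        }
        return Kind.SERVICE_ID;      // context.set("serviceId", ...)
    }

    /** For forward: locations, strips the prefix and appends the route path (no path cleaning). */
    static String forwardTarget(String location, String routePath) {
        return location.substring("forward:".length()) + routePath;
    }

    public static void main(String[] args) {
        System.out.println(classify("https://example.com/api"));
        System.out.println(classify("forward:/local"));
        System.out.println(classify("user-service"));
        System.out.println(forwardTarget("forward:/local", "/orders"));
    }
}
```

As far as I know, a HOST result is later picked up by SimpleHostRoutingFilter, a SERVICE_ID result by RibbonRoutingFilter, and a FORWARD result by SendForwardFilter.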
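So how does Zuul actually perform load balancing?
Zuul's load balancing is built on Ribbon, and at startup a RibbonRoutingFilter was registered. This class is important because it is what forwards the request. Here is its run method: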
public Object run() {
RequestContext context = RequestContext.getCurrentContext();
this.helper.addIgnoredHeaders(new String[0]);
try {
RibbonCommandContext commandContext = this.buildCommandContext(context);
ClientHttpResponse response = this.forward(commandContext);
this.setResponse(response);
return response;
} catch (ZuulException var4) {
throw new ZuulRuntimeException(var4);
} catch (Exception var5) {
throw new ZuulRuntimeException(var5);
}
}
run first builds a RibbonCommandContext and then forwards the request through the forward() method. Going into forward:
protected ClientHttpResponse forward(RibbonCommandContext context) throws Exception {
Map<String, Object> info = this.helper.debug(context.getMethod(), context.getUri(), context.getHeaders(), context.getParams(), context.getRequestEntity());
RibbonCommand command = this.ribbonCommandFactory.create(context);
try {
ClientHttpResponse response = (ClientHttpResponse)command.execute();
this.helper.appendDebug(info, response.getRawStatusCode(), response.getHeaders());
return response;
} catch (HystrixRuntimeException var5) {
return this.handleException(info, var5);
}
}
forward creates a RibbonCommand through the RibbonCommandFactory and then calls the execute method of that RibbonCommand.
When is this RibbonCommandFactory registered?
At startup, the ZuulProxyAutoConfiguration configuration class imports the HttpClientRibbonConfiguration configuration class:
@Configuration
@Import({RestClientRibbonConfiguration.class, OkHttpRibbonConfiguration.class, HttpClientRibbonConfiguration.class, HttpClientConfiguration.class})
@ConditionalOnBean({Marker.class})
public class ZuulProxyAutoConfiguration extends ZuulServerAutoConfiguration {
}
HttpClientRibbonConfiguration registers the RibbonCommandFactory. Its source:
@Configuration
@RibbonCommandFactoryConfiguration.ConditionalOnRibbonHttpClient
protected static class HttpClientRibbonConfiguration {
@Autowired(
required = false
)
private Set<FallbackProvider> zuulFallbackProviders = Collections.emptySet();
protected HttpClientRibbonConfiguration() {
}
@Bean
@ConditionalOnMissingBean
public RibbonCommandFactory<?> ribbonCommandFactory(SpringClientFactory clientFactory, ZuulProperties zuulProperties) {
return new HttpClientRibbonCommandFactory(clientFactory, zuulProperties, this.zuulFallbackProviders);
}
}
That shows where the RibbonCommandFactory comes from.
Next, look at RibbonCommandFactory.create(), the method that builds the RibbonCommand. Zuul uses HttpClientRibbonCommandFactory by default, so we go into its create() method:
public HttpClientRibbonCommand create(final RibbonCommandContext context) {
/**
* Fallback provider (service degradation)
*/
FallbackProvider zuulFallbackProvider = this.getFallbackProvider(context.getServiceId());
String serviceId = context.getServiceId();
/**
* Load-balancing client, which also forwards the request
*/
RibbonLoadBalancingHttpClient client = (RibbonLoadBalancingHttpClient)this.clientFactory.getClient(serviceId, RibbonLoadBalancingHttpClient.class);
client.setLoadBalancer(this.clientFactory.getLoadBalancer(serviceId));
/**
* Wrap the fallback, the load-balancing and forwarding client, and other data
* into an HttpClientRibbonCommand (this class extends HystrixCommand)
*/
return new HttpClientRibbonCommand(serviceId, client, context, this.zuulProperties, zuulFallbackProvider, this.clientFactory.getClientConfig(serviceId));
}
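The create method obtains a FallbackProvider and a RibbonLoadBalancingHttpClient. As the names suggest, FallbackProvider relates to circuit breaking and fallback, and RibbonLoadBalancingHttpClient relates to load balancing. Both are then passed as arguments to create the HttpClientRibbonCommand. Consider the inheritance hierarchy of HttpClientRibbonCommand.
(Figure: class hierarchy of HttpClientRibbonCommand)
The hierarchy shows that HttpClientRibbonCommand extends AbstractRibbonCommand, which in turn extends HystrixCommand. That is how HystrixCommand is integrated.
HystrixCommand declares the abstract method run, and AbstractRibbonCommand implements it.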
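The relationship between execute, run, and the fallback is a template-method pattern. The sketch below shows that shape with hypothetical classes (Command, AbstractForwardCommand, HttpForwardCommand) that mirror the three-level hierarchy described above. It deliberately leaves out everything else Hystrix does, such as timeouts, circuit breaking, and isolation.

```java
import java.io.IOException;

// Template-method model of execute() -> run() -> fallback; not Hystrix code.
class CommandSketch {

    /** Hypothetical base class playing the role of HystrixCommand. */
    static abstract class Command<R> {
        /** Subclasses put the real work here. */
        protected abstract R run() throws Exception;

        /** Default: no fallback available, so the failure is rethrown. */
        protected R fallback(Exception cause) {
            throw new IllegalStateException("no fallback", cause);
        }

        /** Callers only ever invoke execute(); it decides between run and fallback. */
        final R execute() {
            try {
                return run();
            } catch (Exception e) {
                return fallback(e);
            }
        }
    }

    /** Middle layer implementing run(), like AbstractRibbonCommand. */
    static abstract class AbstractForwardCommand extends Command<String> {
        private final boolean backendUp;

        AbstractForwardCommand(boolean backendUp) {
            this.backendUp = backendUp;
        }

        @Override
        protected String run() throws Exception {
            if (!backendUp) {
                throw new IOException("connection refused");
            }
            return "200 from backend";
        }
    }

    /** Concrete command that inherits run() and supplies a fallback. */
    static class HttpForwardCommand extends AbstractForwardCommand {
        HttpForwardCommand(boolean backendUp) {
            super(backendUp);
        }

        @Override
        protected String fallback(Exception cause) {
            return "503 fallback";
        }
    }

    public static void main(String[] args) {
        System.out.println(new HttpForwardCommand(true).execute());
        System.out.println(new HttpForwardCommand(false).execute());
    }
}
```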
Back in RibbonRoutingFilter:
protected ClientHttpResponse forward(RibbonCommandContext context) throws Exception {
Map<String, Object> info = this.helper.debug(context.getMethod(), context.getUri(), context.getHeaders(), context.getParams(), context.getRequestEntity());
RibbonCommand command = this.ribbonCommandFactory.create(context);
try {
ClientHttpResponse response = (ClientHttpResponse)command.execute();
this.helper.appendDebug(info, response.getRawStatusCode(), response.getHeaders());
return response;
} catch (HystrixRuntimeException var5) {
return this.handleException(info, var5);
}
}
Once the RibbonCommand has been created, command.execute() is called. From the analysis above, command is really an HttpClientRibbonCommand, and HttpClientRibbonCommand extends HystrixCommand, so calling command.execute() ends up invoking the run method of HttpClientRibbonCommand. HttpClientRibbonCommand does not define run itself, but its parent AbstractRibbonCommand does, so the method that actually executes is AbstractRibbonCommand.run:
protected ClientHttpResponse run() throws Exception {
RequestContext context = RequestContext.getCurrentContext();
RQ request = this.createRequest();
boolean retryableClient = this.client instanceof AbstractLoadBalancingClient && ((AbstractLoadBalancingClient)this.client).isClientRetryable((ContextAwareRequest)request);
HttpResponse response;
if (retryableClient) {
response = (HttpResponse)this.client.execute(request, this.config);
} else {
response = (HttpResponse)this.client.executeWithLoadBalancer(request, this.config);
}
context.set("ribbonResponse", response);
if (this.isResponseTimedOut() && response != null) {
response.close();
}
return new RibbonHttpResponse(response);
}
The run method first checks whether the client is a retryable client. According to the analysis, on the first execution the client is a RibbonLoadBalancingHttpClient, so executeWithLoadBalancer() is called. RibbonLoadBalancingHttpClient does not define executeWithLoadBalancer() itself; the class hierarchy shows that its ancestor AbstractLoadBalancerAwareClient implements it. Going into AbstractLoadBalancerAwareClient.executeWithLoadBalancer():
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
LoadBalancerCommand command = this.buildLoadBalancerCommand(request, requestConfig);
try {
return (IResponse)command.submit(new ServerOperation<T>() {
public Observable<T> call(Server server) {
URI finalUri = AbstractLoadBalancerAwareClient.this.reconstructURIWithServer(server, request.getUri());
ClientRequest requestForServer = request.replaceUri(finalUri);
try {
return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
} catch (Exception var5) {
return Observable.error(var5);
}
}
}).toBlocking().single();
} catch (Exception var6) {
Throwable t = var6.getCause();
if (t instanceof ClientException) {
throw (ClientException)t;
} else {
throw new ClientException(var6);
}
}
}
executeWithLoadBalancer first calls buildLoadBalancerCommand():
protected LoadBalancerCommand<T> buildLoadBalancerCommand(S request, IClientConfig config) {
/**
* Create a RetryHandler. This is important: it is the criterion
* the RxJava Observable uses to decide whether to retry.
*/
RequestSpecificRetryHandler handler = this.getRequestSpecificRetryHandler(request, config);
Builder<T> builder = LoadBalancerCommand.builder().withLoadBalancerContext(this).withRetryHandler(handler).withLoadBalancerURI(request.getUri());
this.customizeLoadBalancerCommandBuilder(request, config, builder);
return builder.build();
}
buildLoadBalancerCommand() calls getRequestSpecificRetryHandler() to obtain a RequestSpecificRetryHandler. This is an abstract method, implemented in the subclass AbstractLoadBalancingClient:
public RequestSpecificRetryHandler getRequestSpecificRetryHandler(final S request, final IClientConfig requestConfig) {
if (this.okToRetryOnAllOperations) {
return new RequestSpecificRetryHandler(true, true, this.getRetryHandler(), requestConfig);
} else {
return !request.getContext().getMethod().equals("GET") ? new RequestSpecificRetryHandler(true, false, this.getRetryHandler(), requestConfig) : new RequestSpecificRetryHandler(true, true, this.getRetryHandler(), requestConfig);
}
}
This method shows the default retry behavior. Unless ribbon.OkToRetryOnAllOperations=true is configured, a retry happens only for connection failures and for failed GET requests.
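The decision reduces to two boolean flags, which are the first two constructor arguments of RequestSpecificRetryHandler in the code above: whether to retry on connection errors and whether to retry on all errors. The sketch below restates the branching as a pure function. Decision and decide are hypothetical names introduced only for this illustration.

```java
// Pure-function model of getRequestSpecificRetryHandler's branching; names are hypothetical.
class RetryPolicySketch {

    /** The two flags handed to RequestSpecificRetryHandler in the source above. */
    static final class Decision {
        final boolean retryOnConnectErrors;
        final boolean retryOnAllErrors;

        Decision(boolean retryOnConnectErrors, boolean retryOnAllErrors) {
            this.retryOnConnectErrors = retryOnConnectErrors;
            this.retryOnAllErrors = retryOnAllErrors;
        }

        @Override
        public String toString() {
            return "connect=" + retryOnConnectErrors + ", all=" + retryOnAllErrors;
        }
    }

    /** Mirrors the if/else: everything is retried when the switch is on, otherwise only GET is. */
    static Decision decide(boolean okToRetryOnAllOperations, String httpMethod) {
        if (okToRetryOnAllOperations) {
            return new Decision(true, true);
        }
        boolean isGet = "GET".equals(httpMethod);
        // Connection errors are always retriable; other errors only for GET.
        return new Decision(true, isGet);
    }

    public static void main(String[] args) {
        System.out.println("GET,  default: " + decide(false, "GET"));
        System.out.println("POST, default: " + decide(false, "POST"));
        System.out.println("POST, all ops: " + decide(true, "POST"));
    }
}
```

Restricting the default to GET is a safety choice: repeating a non-idempotent request such as POST after it may already have reached the server could apply the same change twice.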
Back in executeWithLoadBalancer():
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
LoadBalancerCommand command = this.buildLoadBalancerCommand(request, requestConfig);
try {
return (IResponse)command.submit(new ServerOperation<T>() {
public Observable<T> call(Server server) {
URI finalUri = AbstractLoadBalancerAwareClient.this.reconstructURIWithServer(server, request.getUri());
ClientRequest requestForServer = request.replaceUri(finalUri);
try {
return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
} catch (Exception var5) {
return Observable.error(var5);
}
}
}).toBlocking().single();
} catch (Exception var6) {
Throwable t = var6.getCause();
if (t instanceof ClientException) {
throw (ClientException)t;
} else {
throw new ClientException(var6);
}
}
}
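Calling command.submit() creates an Observable (RxJava) and eventually invokes AbstractLoadBalancerAwareClient.this.execute(), which dispatches to the execute() implementation of the concrete client: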
public RibbonApacheHttpResponse execute(RibbonApacheHttpRequest request, final IClientConfig configOverride) throws Exception {
IClientConfig config = configOverride != null ? configOverride : this.config;
RibbonProperties ribbon = RibbonProperties.from(config);
RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(ribbon.connectTimeout(this.connectTimeout)).setSocketTimeout(ribbon.readTimeout(this.readTimeout)).setRedirectsEnabled(ribbon.isFollowRedirects(this.followRedirects)).setContentCompressionEnabled(ribbon.isGZipPayload(this.gzipPayload)).build();
request = this.getSecureRequest(request, configOverride);
HttpUriRequest httpUriRequest = request.toRequest(requestConfig);
HttpResponse httpResponse = ((CloseableHttpClient)this.delegate).execute(httpUriRequest);
return new RibbonApacheHttpResponse(httpResponse, httpUriRequest.getURI());
}
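The execute() method builds the request configuration, sends the HTTP request, and returns the result.
Back in submit(), the Observable that was created observes the outcome of execute() and uses it to decide whether to retry the request: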
public Observable<T> submit(final ServerOperation<T> operation) {
final LoadBalancerCommand<T>.ExecutionInfoContext context = new LoadBalancerCommand.ExecutionInfoContext();
if (this.listenerInvoker != null) {
try {
this.listenerInvoker.onExecutionStart();
} catch (AbortExecutionException var6) {
return Observable.error(var6);
}
}
/**
* Number of retries on the same server, not counting the first attempt
*/
final int maxRetrysSame = this.retryHandler.getMaxRetriesOnSameServer();
/**
* Number of retries on other servers in the cluster
**/
final int maxRetrysNext = this.retryHandler.getMaxRetriesOnNextServer();
/**
* Create an Observable (RxJava)
**/
Observable<T> o = (this.server == null ? this.selectServer() : Observable.just(this.server)).concatMap(new Func1<Server, Observable<T>>() {
public Observable<T> call(Server server) {
context.setServer(server);
final ServerStats stats = LoadBalancerCommand.this.loadBalancerContext.getServerStats(server);
Observable<T> o = Observable.just(server).concatMap(new Func1<Server, Observable<T>>() {
public Observable<T> call(final Server server) {
context.incAttemptCount();
LoadBalancerCommand.this.loadBalancerContext.noteOpenConnection(stats);
if (LoadBalancerCommand.this.listenerInvoker != null) {
try {
LoadBalancerCommand.this.listenerInvoker.onStartWithServer(context.toExecutionInfo());
} catch (AbortExecutionException var3) {
return Observable.error(var3);
}
}
final Stopwatch tracer = LoadBalancerCommand.this.loadBalancerContext.getExecuteTracer().start();
return operation.call(server).doOnEach(new Observer<T>() {
private T entity;
public void onCompleted() {
this.recordStats(tracer, stats, this.entity, (Throwable)null);
}
public void onError(Throwable e) {
this.recordStats(tracer, stats, (Object)null, e);
LoadBalancerCommand.logger.debug("Got error {} when executed on server {}", e, server);
if (LoadBalancerCommand.this.listenerInvoker != null) {
LoadBalancerCommand.this.listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo());
}
}
public void onNext(T entity) {
this.entity = entity;
if (LoadBalancerCommand.this.listenerInvoker != null) {
LoadBalancerCommand.this.listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo());
}
}
private void recordStats(Stopwatch tracerx, ServerStats statsx, Object entity, Throwable exception) {
tracerx.stop();
LoadBalancerCommand.this.loadBalancerContext.noteRequestCompletion(statsx, entity, exception, tracerx.getDuration(TimeUnit.MILLISECONDS), LoadBalancerCommand.this.retryHandler);
}
});
}
});
if (maxRetrysSame > 0) {
o = o.retry(LoadBalancerCommand.this.retryPolicy(maxRetrysSame, true));
}
return o;
}
});
if (maxRetrysNext > 0 && this.server == null) {
o = o.retry(this.retryPolicy(maxRetrysNext, false));
}
return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
public Observable<T> call(Throwable e) {
if (context.getAttemptCount() > 0) {
if (maxRetrysNext > 0 && context.getServerAttemptCount() == maxRetrysNext + 1) {
e = new ClientException(ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED, "Number of retries on next server exceeded max " + maxRetrysNext + " retries, while making a call for: " + context.getServer(), (Throwable)e);
} else if (maxRetrysSame > 0 && context.getAttemptCount() == maxRetrysSame + 1) {
e = new ClientException(ErrorType.NUMBEROF_RETRIES_EXEEDED, "Number of retries exceeded max " + maxRetrysSame + " retries, while making a call for: " + context.getServer(), (Throwable)e);
}
}
if (LoadBalancerCommand.this.listenerInvoker != null) {
LoadBalancerCommand.this.listenerInvoker.onExecutionFailed((Throwable)e, context.toFinalExecutionInfo());
}
return Observable.error((Throwable)e);
}
});
}
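The two nested retry calls in submit() are easier to reason about as nested loops. The sketch below is an imperative model, not RxJava: it assumes every failure is retriable and that each next-server retry starts a fresh round of same-server attempts, which is how I read the inner and outer retry above. RetryBudgetSketch and attempts are hypothetical names.

```java
import java.util.function.IntPredicate;

// Imperative model of the same-server / next-server retry budget; not RxJava code.
class RetryBudgetSketch {

    /**
     * Returns the 1-based attempt number on which the call succeeded,
     * or the total number of attempts made if every attempt failed.
     * succeedsOnAttempt receives the global attempt number.
     */
    static int attempts(int maxRetriesSame, int maxRetriesNext, IntPredicate succeedsOnAttempt) {
        int attempt = 0;
        // Outer loop: the first server plus up to maxRetriesNext other servers.
        for (int server = 0; server <= maxRetriesNext; server++) {
            // Inner loop: the first try plus up to maxRetriesSame retries on that server.
            for (int tryOnServer = 0; tryOnServer <= maxRetriesSame; tryOnServer++) {
                attempt++;
                if (succeedsOnAttempt.test(attempt)) {
                    return attempt;
                }
            }
        }
        return attempt;
    }

    public static void main(String[] args) {
        // Worst case with 1 same-server retry and 1 next-server retry.
        System.out.println(attempts(1, 1, n -> false));
        // Success on the third attempt, i.e. the first try on the second server.
        System.out.println(attempts(1, 2, n -> n == 3));
    }
}
```

Under these assumptions the worst case is (maxRetriesSame + 1) * (maxRetriesNext + 1) calls to the backend, which is worth keeping in mind when choosing the surrounding Hystrix timeout.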
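That completes one full routed request. To recap the routing process:
A. Zuul forwards requests through the RibbonRoutingFilter.
B. Before forwarding, Zuul wraps the request in a RibbonCommand. The RibbonCommand implementation extends HystrixCommand and holds a RibbonLoadBalancingHttpClient and a FallbackProvider, which is exactly what gives Zuul its fallback (service degradation) and load-balancing capabilities. HystrixCommand also enforces a timeout (1s by default), and Zuul's default isolation strategy is semaphore mode.
C. Inside the RibbonCommand, Zuul wraps the request once more in an Observable (for RxJava background, see its official documentation). Retry counts are configured on the Observable; by default only failed GET requests and connection failures are retried.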