Hystrix的一些问题~

Author Avatar
AF_ 05月 01,2020
  • 使用微信扫码分享

前段时间项目中加入了Hystrix做了服务接口的降级和熔断,随之引发了一些问题。

发现问题

  当Hystrix隔离策略为THREAD模式时,是无法获取到ThreadLocal中的值的;
  当时我们使用Feign调用接口时RequestInterceptor作为拦截器来实现远程调用间Header信息的传递,但是当我打开熔断策略开关后,RequestContextHolder.getRequestAttributes()无法获取到当前请求的头信息,导致后置服务无法通过鉴权。如下代码:

@Configuration
@Slf4j
public class FeignConfig implements RequestInterceptor {
	@Override
	public void apply(RequestTemplate template) {
		//开启熔断后,这里是无法获取到值的
		ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
		...
		...
	}
}

  原因在于,我们使用的隔离策略是THREAD 。而 RequestContextHolder 源码中,使用了两个血淋淋的ThreadLocal

解决问题

  最简单无脑的解决方式就是将隔离策略修改为SEMAPHORE即可,信号量的方式不会导致获取不到当前线程的请求头信息。
  但是这种解决方式略微牵强,因为Hystrix官方强烈建议使用THREAD作为隔离策略。
  那么有什么办法可以在不改变隔离策略的前提下解决这个问题吗?答案是自定义并发策略:

  • 编写自定义并发策略,编写一个类,让其继承HystrixConcurrencyStrategy ,并重写wrapCallable方法即可。
    • wrapCallable 方法拿到 RequestContextHolder.getRequestAttributes() ,也就是我们想传播的对象;
    • WrappedCallable 类中,我们将要传播的对象作为成员变量,并在其中的call方法中,为静态方法设值。
    • 这样,在Hystrix包裹的方法中,就可以使用RequestContextHolder.getRequestAttributes() 获取到相关属性——也就是说,可以拿到RequestContextHolder 中的ThreadLocal 属性。
@Component
public class RequestAttributeHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {
    private static final Log log = LogFactory.getLog(RequestHystrixConcurrencyStrategy.class);

    public RequestHystrixConcurrencyStrategy() {
        HystrixPlugins.reset();
        HystrixPlugins.getInstance().registerConcurrencyStrategy(this);
    }

    @Override
    public <T> Callable<T> wrapCallable(Callable<T> callable) {
        RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
        return new WrappedCallable<>(callable, requestAttributes);
    }

    static class WrappedCallable<T> implements Callable<T> {

        private final Callable<T> target;
        private final RequestAttributes requestAttributes;

        public WrappedCallable(Callable<T> target, RequestAttributes requestAttributes) {
            this.target = target;
            this.requestAttributes = requestAttributes;
        }

        @Override
        public T call() throws Exception {
            try {
                RequestContextHolder.setRequestAttributes(requestAttributes);
                return target.call();
            } finally {
                RequestContextHolder.resetRequestAttributes();
            }
        }
    }
}
  • 至此,已经实现了ThreadLocal属性的传递,然而Hystrix只允许有一个并发策略!这意味着——如果不做任何处理,Sleuth、Spring Security将无法正常拿到上下文!(目前Sleuth、Spring Security都是通过自定义并发策略的方式来传递ThreadLocal对象的。)
    如何解决这个问题呢?参考下Sleuth以及Spring Security的实现:
    • org.springframework.cloud.sleuth.instrument.hystrix.SleuthHystrixConcurrencyStrategy
    • org.springframework.cloud.netflix.hystrix.security.SecurityContextConcurrencyStrategy
  • 模仿它们的写法,改编上面的并发策略:
    • 将现有的并发策略作为新并发策略的成员变量
    • 在新并发策略中,返回现有并发策略的线程池、Queue。
public class RequestAttributeHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {
	private static final Log log = LogFactory.getLog(RequestAttributeHystrixConcurrencyStrategy.class);

	private HystrixConcurrencyStrategy delegate;

	public RequestAttributeHystrixConcurrencyStrategy() {
		try {
			this.delegate = HystrixPlugins.getInstance().getConcurrencyStrategy();
			if (this.delegate instanceof RequestAttributeHystrixConcurrencyStrategy) {
				// Welcome to singleton hell...
				return;
			}
			HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins
					.getInstance().getCommandExecutionHook();
			HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance()
					.getEventNotifier();
			HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance()
					.getMetricsPublisher();
			HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance()
					.getPropertiesStrategy();
			this.logCurrentStateOfHystrixPlugins(eventNotifier, metricsPublisher,
					propertiesStrategy);
			HystrixPlugins.reset();
			HystrixPlugins.getInstance().registerConcurrencyStrategy(this);
			HystrixPlugins.getInstance()
					.registerCommandExecutionHook(commandExecutionHook);
			HystrixPlugins.getInstance().registerEventNotifier(eventNotifier);
			HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher);
			HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy);
		}
		catch (Exception e) {
			log.error("Failed to register Sleuth Hystrix Concurrency Strategy", e);
		}
	}

	private void logCurrentStateOfHystrixPlugins(HystrixEventNotifier eventNotifier,
			HystrixMetricsPublisher metricsPublisher,
			HystrixPropertiesStrategy propertiesStrategy) {
		if (log.isDebugEnabled()) {
			log.debug("Current Hystrix plugins configuration is ["
					+ "concurrencyStrategy [" + this.delegate + "]," + "eventNotifier ["
					+ eventNotifier + "]," + "metricPublisher [" + metricsPublisher + "],"
					+ "propertiesStrategy [" + propertiesStrategy + "]," + "]");
			log.debug("Registering Sleuth Hystrix Concurrency Strategy.");
		}
	}

	@Override
	public <T> Callable<T> wrapCallable(Callable<T> callable) {
		RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
		return new WrappedCallable<>(callable, requestAttributes);
	}

	@Override
	public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
			HystrixProperty<Integer> corePoolSize,
			HystrixProperty<Integer> maximumPoolSize,
			HystrixProperty<Integer> keepAliveTime, TimeUnit unit,
			BlockingQueue<Runnable> workQueue) {
		return this.delegate.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize,
				keepAliveTime, unit, workQueue);
	}

	@Override
	public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
			HystrixThreadPoolProperties threadPoolProperties) {
		return this.delegate.getThreadPool(threadPoolKey, threadPoolProperties);
	}

	@Override
	public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
		return this.delegate.getBlockingQueue(maxQueueSize);
	}

	@Override
	public <T> HystrixRequestVariable<T> getRequestVariable(
			HystrixRequestVariableLifecycle<T> rv) {
		return this.delegate.getRequestVariable(rv);
	}

	static class WrappedCallable<T> implements Callable<T> {

		private final Callable<T> target;
		private final RequestAttributes requestAttributes;

		public WrappedCallable(Callable<T> target, RequestAttributes requestAttributes) {
			this.target = target;
			this.requestAttributes = requestAttributes;
		}

		@Override
		public T call() throws Exception {
			try {
				RequestContextHolder.setRequestAttributes(requestAttributes);
				return target.call();
			}
			finally {
				RequestContextHolder.resetRequestAttributes();
			}
		}
	}
}
  • 至此,解决Hystrix传播ThreadLocal对象的问题。