博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Hystrix指标窗口实现原理
阅读量:5983 次
发布时间:2019-06-20

本文共 17160 字,大约阅读时间需要 57 分钟。

一、引子

Hystrix是一个熔断中间件,能够实现fast-fail并走备用方案。Hystrix基于滑动窗口判定服务失败占比选择性熔断。滑动窗口的实现方案有很多种,指标计数也有很多种实现常见的就是AtomicInteger进行原子增减维护计数,具体的方案就不探讨了。

Hystrix是基于Rxjava去实现的,那么如何利用RxJava实现指标的汇聚和滑动窗口实现呢?当然本篇不是作为教程去介绍RxJava的使用姿势,本篇文章主要解说Hystrix是什么一个思路完成这项功能。

二、指标数据上传

看HystrixCommand执行的主入口

public Observable
toObservable() { final AbstractCommand
_cmd = this; final Action0 terminateCommandCleanup = new Action0() { @Override public void call() { if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.TERMINAL)) { handleCommandEnd(false); //user code never ran } else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.TERMINAL)) { handleCommandEnd(true); //user code did run } } }; //mark the command as CANCELLED and store the latency (in addition to standard cleanup) final Action0 unsubscribeCommandCleanup = new Action0() { @Override public void call() { if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.UNSUBSCRIBED)) { .......省略干扰代码........... handleCommandEnd(false); //user code never ran } else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.UNSUBSCRIBED)) { .......省略干扰代码........... handleCommandEnd(true); //user code did run } } }; .......省略干扰代码........... return Observable.defer(new Func0
>() { .......省略干扰代码........... return afterCache .doOnTerminate(terminateCommandCleanup) .doOnUnsubscribe(unsubscribeCommandCleanup) .doOnCompleted(fireOnCompletedHook); }});

我们的主入口Observable当doOnTerminate doOnUnsubscribe 的时候触发 handleCommandEnd 方法,从字面意思就是当command执行结束处理一些事情。

private void handleCommandEnd(boolean commandExecutionStarted) {    ........省略干扰代码..........    executionResult = executionResult.markUserThreadCompletion((int) userThreadLatency);    if (executionResultAtTimeOfCancellation == null) {        metrics.markCommandDone(executionResult, commandKey, threadPoolKey, commandExecutionStarted);    } else {        metrics.markCommandDone(executionResultAtTimeOfCancellation, commandKey, threadPoolKey, commandExecutionStarted);    }    ........省略干扰代码..........}

注意看 metrics.markCommandDone,调用了HystrixCommandMetrics的markCommandDone方法,把一个executionResult传入了进来。ExecutionResult 这是个什么鬼呢?

我们截取部分代码浏览下

public class ExecutionResult {    private final EventCounts eventCounts;    private final Exception failedExecutionException;    private final Exception executionException;    private final long startTimestamp;    private final int executionLatency; //time spent in run() method    private final int userThreadLatency; //time elapsed between caller thread submitting request and response being visible to it    private final boolean executionOccurred;    private final boolean isExecutedInThread;    private final HystrixCollapserKey collapserKey;    private static final HystrixEventType[] ALL_EVENT_TYPES = HystrixEventType.values();    private static final int NUM_EVENT_TYPES = ALL_EVENT_TYPES.length;    private static final BitSet EXCEPTION_PRODUCING_EVENTS = new BitSet(NUM_EVENT_TYPES);    private static final BitSet TERMINAL_EVENTS = new BitSet(NUM_EVENT_TYPES);

以大家聪慧的头脑应该能够猜测到这个类就是当前HystrixCommand的 执行结果记录,只不过这个结果不仅仅是结果,也包含了各种状态以及出现的异常。它的身影在里讲的各Observable里出现,跟着HystrixCommand整个生命周期。

回到上面讲,当时command执行完毕后,调用了HystrixCommandMetrics的markCommandDone方法

void markCommandDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey, boolean executionStarted) {    HystrixThreadEventStream.getInstance().executionDone(executionResult, commandKey, threadPoolKey);    if (executionStarted) {        concurrentExecutionCount.decrementAndGet();    }}

最终调用量HystrixThreadEventStream. executionDone方法的HystrixThreadEventStream是ThreadLocal方式,和当前线程绑定

//HystrixThreadEventStream.threadLocalStreamsprivate static final ThreadLocal
threadLocalStreams = new ThreadLocal
() { @Override protected HystrixThreadEventStream initialValue() { return new HystrixThreadEventStream(Thread.currentThread()); }};

executionDone代码如下

public void executionDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey) {    HystrixCommandCompletion event = HystrixCommandCompletion.from(executionResult, commandKey, threadPoolKey);    writeOnlyCommandCompletionSubject.onNext(event);}

这里根据 executionResult, threadpoolkey,comandKey,生成 了一个HystrixCommandCompletion然后通过writeOnlyCommandCompletionSubject写入,writeOnlyCommandCompletionSubject整个东西,我们等会再看。现在思考下HystrixCommandCompletion是什么?HystrixCommandCompletion包含了 ExecutionResultHystrixRequestContext,它是一种HystrixEvent,标识着command执行完成的一个事件,该事件是当前这个点HystrixCommand的请求信息,执行结果,状态等数据的载体。

从上面类图可以看到不仅仅HystrixCommandCompletion一种还有其它的Event,这里就不一一介绍了。

writeOnlyCommandCompletionSubject onNext的时候会触发 writeCommandCompletionsToShardedStreams执行里面的call()方法。

private static final Action1
writeCommandCompletionsToShardedStreams = new Action1
() { @Override public void call(HystrixCommandCompletion commandCompletion) { HystrixCommandCompletionStream commandStream = HystrixCommandCompletionStream.getInstance(commandCompletion.getCommandKey()); commandStream.write(commandCompletion); if (commandCompletion.isExecutedInThread() || commandCompletion.isResponseThreadPoolRejected()) { HystrixThreadPoolCompletionStream threadPoolStream = HystrixThreadPoolCompletionStream.getInstance(commandCompletion.getThreadPoolKey()); threadPoolStream.write(commandCompletion); } }};

这个方法的意思是,会把HystrixCommandCompletion 通过HystrixCommandCompletionStream 写入,如果当前command使用的是线程池隔离策略的话 会通过 HystrixThreadPoolCompletionStream 再写一遍。HystrixCommandCompletionStream HystrixThreadPoolCompletionStream 他们两个概念类似,我们拿着前者解释,这个是个什么东西。

HystrixCommandCompletionStream 以commandKey为key,维护在内存中,调用它的write的方法实则是调用内部属性 writeOnlySubject的方法,writeOnlySubject是一个Subject(RxJava的东西),通过SerializedSubject保证其写入的顺序性,调用其share()方法获得一个Observable也就是readOnlyStream,让外界能够读这个Subject的数据。总结下Subject是连接两个Observable之间的桥梁,它有两个泛型元素标识着进出数据类型,全部都是HystrixCommandCompletion类型

HystrixCommandCompletionStream(final HystrixCommandKey commandKey) {        this.commandKey = commandKey;        this.writeOnlySubject = new SerializedSubject
(PublishSubject.
create()); this.readOnlyStream = writeOnlySubject.share(); }

我们从源头开始梳理,明白了这个HystrixCommandCompletion数据流是如何写入的(其它类型的的思路一致,就不一一解释了),那它是如何被搜集起来呢?

三、指标数据搜集

追溯至AbstractCommand初始化

protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,        HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,        HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,        HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {    ........省略代码........    this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties);    ........省略代码........}

初始化command指标

HystrixCommandMetrics(final HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixThreadPoolKey threadPoolKey, HystrixCommandProperties properties, HystrixEventNotifier eventNotifier) {    super(null);    this.key = key;    this.group = commandGroup;    this.threadPoolKey = threadPoolKey;    this.properties = properties;    healthCountsStream = HealthCountsStream.getInstance(key, properties);    rollingCommandEventCounterStream = RollingCommandEventCounterStream.getInstance(key, properties);    cumulativeCommandEventCounterStream = CumulativeCommandEventCounterStream.getInstance(key, properties);    rollingCommandLatencyDistributionStream = RollingCommandLatencyDistributionStream.getInstance(key, properties);    rollingCommandUserLatencyDistributionStream = RollingCommandUserLatencyDistributionStream.getInstance(key, properties);    rollingCommandMaxConcurrencyStream = RollingCommandMaxConcurrencyStream.getInstance(key, properties);}

有很多各种 XXXStream.getInstance(),这些Stream就是针对各类用途进行指标搜集,统计的具体实现,下面可以看下他们的UML类图

Hystrix几个别Stream类图(并非所有子类)

BucketedCounterStream实现了基本的桶计数器,BucketedCumulativeCounterStream基于父类实现了累计计数,BucketedRollingCounterStream基于父类实现了滑动窗口计数。两者的子类就是对特定指标的具体实现。

接下来分两块累计计数和滑动窗口计数,挑选其对应的CumulativeCommandEventCounterStream和HealthCountsStream进行详细说明。

3.1、BucketedCounterStream 基本桶的实现

数据采集示意图

protected BucketedCounterStream(final HystrixEventStream
inputEventStream, final int numBuckets, final int bucketSizeInMs, final Func2
appendRawEventToBucket) { this.numBuckets = numBuckets; this.reduceBucketToSummary = new Func1
, Observable
>() { @Override public Observable
call(Observable
eventBucket) { return eventBucket.reduce(getEmptyBucketSummary(), appendRawEventToBucket); } }; final List
emptyEventCountsToStart = new ArrayList
(); for (int i = 0; i < numBuckets; i++) { emptyEventCountsToStart.add(getEmptyBucketSummary()); } this.bucketedStream = Observable.defer(new Func0
>() { @Override public Observable
call() { return inputEventStream .observe() .window(bucketSizeInMs, TimeUnit.MILLISECONDS) .flatMap(reduceBucketToSummary) .startWith(emptyEventCountsToStart); } });}

这里父类的构造方法主要成三个部分分别是

I. reduceBucketToSummary 每个桶如何计算聚合的数据

appendRawEventToBucket的实现由其子类决定,不过大同小异,我们自行拔下代码看下HealthCountsStream, 可以看到他用的是HystrixCommandMetrics.appendEventToBucket

public static final Func2
appendEventToBucket = new Func2
() { @Override public long[] call(long[] initialCountArray, HystrixCommandCompletion execution) { ExecutionResult.EventCounts eventCounts = execution.getEventCounts(); for (HystrixEventType eventType: ALL_EVENT_TYPES) { switch (eventType) { case EXCEPTION_THROWN: break; //this is just a sum of other anyway - don't do the work here default: initialCountArray[eventType.ordinal()] += eventCounts.getCount(eventType); break; } } return initialCountArray; } };}

这个方法就是将一个桶时长内的数据进行累计计数相加。initialCountArray可以看出一个桶内前面的n个数据流的计算结果,数组的下标就是HystrixEventType 枚举里事件的下标值。

II. emptyEventCountsToStart 第一个桶的定义,装逼点叫创世桶

III. window窗口的定义,这里第一个参数就是每个桶的时长,第二个参数时间的单位。利用RxJava的window帮我们做聚合数据。

.window(bucketSizeInMs, TimeUnit.MILLISECONDS)

Bucket 时长如何计算

每个桶的时长如何得出的?这个也是基于我们的配置得出,拿HealthCountsStream举例子。
metrics.rollingStats.timeInMilliseconds 滑动窗口时长 默认10000ms
metrics.healthSnapshot.intervalInMilliseconds 检测健康状态的时间片,默认500ms 在这里对应一个bucket的时长

滑动窗口内桶的个数 = 滑动窗口时长 / bucket时长

而 CumulativeCommandEventCounterStream

metrics.rollingStats.timeInMilliseconds 滑动窗口时长 默认10000ms
metrics.rollingStats.numBuckets 滑动窗口要切的桶个数

bucket时长 = 滑动窗口时长 / 桶个数

不同职能的 XXXStream对应的算法和对应的配置也不一样,不过都一个套路,就不一一去展示了。

inputEventStream

inputEventStream 可以认为是窗口采集的数据流,这个数据流由其子类去传递,大致看了下

//HealthCountsStreamprivate HealthCountsStream(final HystrixCommandKey commandKey, final int numBuckets, final int bucketSizeInMs,                               Func2
reduceCommandCompletion) { super(HystrixCommandCompletionStream.getInstance(commandKey), numBuckets, bucketSizeInMs, reduceCommandCompletion, healthCheckAccumulator);}//RollingThreadPoolEventCounterStreamprivate RollingThreadPoolEventCounterStream(HystrixThreadPoolKey threadPoolKey, int numCounterBuckets, int counterBucketSizeInMs, Func2
reduceCommandCompletion, Func2
reduceBucket) { super(HystrixThreadPoolCompletionStream.getInstance(threadPoolKey), numCounterBuckets, counterBucketSizeInMs, reduceCommandCompletion, reduceBucket);}

我们发现这个 inputEventStream,其实就是 HystrixCommandCompletionStream、HystrixThreadPoolCompletionStream或者其它的,我们挑其中HystrixCommandCompletionStream看下,这个就是上面第二部分指标数据上传里讲的写数据那个stream,inputEventStream.observe()也就是 HystrixCommandCompletionStream的 readOnlyStreamSubject的只读Observable。(这里如果没明白可以回到第二点看下结尾的部分)

3.2、累计计数器之CumulativeCommandEventCounterStream

先看下累计计数器的父类BucketedCumulativeCounterStream

protected BucketedCumulativeCounterStream(HystrixEventStream
stream, int numBuckets, int bucketSizeInMs, Func2
reduceCommandCompletion, Func2
reduceBucket) { super(stream, numBuckets, bucketSizeInMs, reduceCommandCompletion); this.sourceStream = bucketedStream .scan(getEmptyOutputValue(), reduceBucket) .skip(numBuckets) ........省略代码........ }

bucketedStream就是3.1里的数据汇聚后的一个一个桶流,这里执行了scan方法,scan方法的意思就是会将当前窗口内已经提交的数据流进行按照顺序进行遍历并执行指定的function逻辑,scan里有两个参数第一个参数表示上一次执行function的结果,第二个参数就是每次遍历要执行的function,scan完毕后skip numBuckets 个bucket,可以认为丢弃掉已经计算过的bucket。

scan里的function是如何实现呢?它也是实现累计计数的关键,由子类实现,本小节也就是CumulativeCommandEventCounterStream来实现

CumulativeCommandEventCounterStream newStream = new CumulativeCommandEventCounterStream(commandKey, numBuckets, bucketSizeInMs,HystrixCommandMetrics.appendEventToBucket, HystrixCommandMetrics.bucketAggregator);

发现调用的是 HystrixCommandMetrics.bucketAggregator,我们看下其函数体

public static final Func2
bucketAggregator = new Func2
() { @Override public long[] call(long[] cumulativeEvents, long[] bucketEventCounts) { for (HystrixEventType eventType: ALL_EVENT_TYPES) { switch (eventType) { case EXCEPTION_THROWN: for (HystrixEventType exceptionEventType: HystrixEventType.EXCEPTION_PRODUCING_EVENT_TYPES) { cumulativeEvents[eventType.ordinal()] += bucketEventCounts[exceptionEventType.ordinal()]; } break; default: cumulativeEvents[eventType.ordinal()] += bucketEventCounts[eventType.ordinal()]; break; } } return cumulativeEvents; }};

call() 方法有两个参数第一个参数指的之前的计算结果,第二个参数指的当前桶内的计数,方法体不难理解,就是对各个时间的count计数累加。

如此,一个command的计数就实现了,其它累计计数也雷同。

3.3、滑动窗口之HealthCountsStream

直接父类代码

protected BucketedRollingCounterStream(HystrixEventStream
stream, final int numBuckets, int bucketSizeInMs, final Func2
appendRawEventToBucket, final Func2
reduceBucket) { super(stream, numBuckets, bucketSizeInMs, appendRawEventToBucket); Func1
, Observable
> reduceWindowToSummary = new Func1
, Observable
>() { @Override public Observable call(Observable
window) { return window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets); } }; this.sourceStream = bucketedStream .window(numBuckets, 1) .flatMap(reduceWindowToSummary) ........省略代码........}

依然像累计计数器一样对父级的桶流数据进行操作,这里用的是window(),第一个参数表示桶的个数,第二个参数表示一次移动的个数。这里numBuckets就是我们的滑动窗口桶个数

滑动窗口

第一排我们可以认为是移动前的滑动窗口的数据,在执行完 flatMap里的function之后,滑动窗口向前移动一个桶位,那么 23 5 2 0 这个桶就被丢弃了,然后新进了最新的桶 45 6 2 0

那么每次滑动窗口内的数据是如何被处理呢?就是flatMap里的function做的,reduceWindowToSummary 最终被具体的子类stream实现,我们就研究下HealthCountsStream

private static final Func2
healthCheckAccumulator = new Func2
() { @Override public HystrixCommandMetrics.HealthCounts call(HystrixCommandMetrics.HealthCounts healthCounts, long[] bucketEventCounts) { return healthCounts.plus(bucketEventCounts); }};//HystrixCommandMetrics.HealthCounts#pluspublic HealthCounts plus(long[] eventTypeCounts) { long updatedTotalCount = totalCount; long updatedErrorCount = errorCount; long successCount = eventTypeCounts[HystrixEventType.SUCCESS.ordinal()]; long failureCount = eventTypeCounts[HystrixEventType.FAILURE.ordinal()]; long timeoutCount = eventTypeCounts[HystrixEventType.TIMEOUT.ordinal()]; long threadPoolRejectedCount = eventTypeCounts[HystrixEventType.THREAD_POOL_REJECTED.ordinal()]; long semaphoreRejectedCount = eventTypeCounts[HystrixEventType.SEMAPHORE_REJECTED.ordinal()]; updatedTotalCount += (successCount + failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount); updatedErrorCount += (failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount); return new HealthCounts(updatedTotalCount, updatedErrorCount);}

方法的实现也显而易见,统计了当前滑动窗口内成功数、失败数、线程拒绝数,超时数.....

该stream的职责就是探测服务的可用性,也是Hystrix熔断器是否生效依赖的数据源。

四、回顾

Hystrix的滑动窗口设计相对于其它可能稍微偏难理解些,其主要原因还是因为我们对RxJava的了解不够,不过这不重要,只要耐心的多看几遍就没有什么问题。

本篇主要从指标数据上报到指标数据收集来逐步解开Hystrix指标搜集的神秘面纱。最后借用一大牛的图汇总下本篇的内容

参考文档


系列文章推荐

转载地址:http://yqrox.baihongyu.com/

你可能感兴趣的文章
从1G到5G,46年屏幕变迁下,富士康、苹果、三星、华为的浴火重生路 ...
查看>>
用flash测试你的ircd
查看>>
白话红黑树系列之二——红黑树的构建
查看>>
客户的一张表中出现重复数据,而该列由唯一键约束,重复值如何产生的呢?...
查看>>
MySQL5.6中新增特性、不推荐使用的功能以及废弃的功能
查看>>
OnePlus安装Kali-NetHunter
查看>>
JavaScript:Array 对象
查看>>
PDFCreator:一款免费,开源的PDF(Tiff,pcx,png,jpeg,bmp,PS,EPS)打印机(VB,GPL),并提供了COM接口,方便使用各种编程语言调用...
查看>>
Note 1773479 - SYB: Displaying multiple triggers per object
查看>>
联手云计算核心技术开发,BoCloud与中科院软件所战略合作
查看>>
2017年背景下的SSD选购技巧有哪些变化?
查看>>
2016年的数据存储和管理的成本将何去何从?
查看>>
Airpods 并非无用,而是苹果借助语音交互布局物联网的新“棋子”
查看>>
SQL中存储过程的创建和使用
查看>>
荷兰政府:保证不强制在任何产品中留有后门
查看>>
编写单元测试的10条理由
查看>>
LINUX-SAMBA服务配置
查看>>
图像处理------光束效果
查看>>
剑指offer 面试题6:重建二叉树
查看>>
基于ES5`defineProperty` 实现简单的 Mvvm框架
查看>>