高异步并发在前端大数据渲染场景是常遇到的问题,如何有效管理并发过程,解决资料合并与节流,消息中心等问题一直是该场景下常遇到的问题,但目前为止一直没有一套完善的解决方案。本文阐述了如何利用工作流模式搭配rxjs实践一套有效的流管理工具。工作流模式是2000年由W.M.P. van der Aalst等人提出的一种设计模式,该模式旨在透过清晰的定义各种流动的流场景之处理模式,来简化流程问题的复杂性,并达到流程可视化与自动化的目的。工作流模式可以分为控制流模式,资源模式,数据模式与异常处理模式四大方向,其中控制流模式决定了业务流程的流向与行为,资源模式阐述了业务流程过程中所使用到的资源分配行为,数据模式则讲述了数据在控制流当中的传递情况与使用,异常处理则描述了流异常的情况下终止流的规则与行为。本文中我们运用了rxjs作为实践工具。rxjs是一个基于观察者模式搭建异步序列的工具库,透过名为observer的观察对象发送消息给订阅者。本文中,我们结合了工作流模式与rxjs的pub/sub特性实现了控制流的并行,循环,条件,控制等特性,并透过observer的观察对象,来达成消息中心与异常处理流。解决了高频并发异步场景下的代码复杂问题,并达到流程可视化与自动化的特性。
介绍
工作流模式
工作流模式 (workflow pattern)是设计模式的一种,用于具有流程性质的场景。
工作流模式是一种设计模式,它描述了在工作流程当中对于流程的管控,资源的分配,数据管理与异常处理情况。以下我们将针对四个方向进行介绍。
控制流
控制流是工作流模式中的基础,它描述了如何将工作流程前后串接,并行与条件处理等等。透过将程序想像成河流流动的方式,我们可以很容易的管理不同场景下需要的流动行为,例如,不同的参数需要请求不同的接口,此时出现了条件判断,应该使用排他选择流 (Exclusive Choice),又例如我们希望接收到反应后(response),做一个节流的动作,则我们可以选择使用合并流蒐集反应,再接上节流阀。以下简单介绍几种常见的控制流模式:
- 序列流 (Sequence Flow)
序列流是最常见的一种控制流,它描述了顺序执行的流的特性,如下图:
当程序A执行完毕后,才能执行程序B,然后C,最后有一个终止程序。该流动当中不会出现循环或条件分流的情况。
- 并行流 (Parallel Flow)
当流动出现需要同时处理的情况时,可以选择使用并行流,并行流具有同时触发不同程序的特性,如下图:
- 排它选择流 (Exclusive Choice Flow)
当场景中出现了需要根据情况来流向不同河道的情况时,可以选择使用排它选择流。排它选择流描述了从n中选择1来执行的特性,如下图:
- 循环流 (Cycle Flow)
当场景中出现循环的情况时,如多次请求,数据分批处理等等,可以使用循环流模式。循环流模式具有河道逆流的特性,可以将结果回传到先前的某一程序当中并重新执行,如下图:
程序A顺序执行到C后,会逆流回A并重复执行,直到满足循环条件后终止循环然后进入程序D。程序D的传入参数取决于C的传出参数,若需要蒐集结果,可在C当中做一个闭包来缓存执行内容。
除了上述介绍了4种控制流模式,还有一些高级控制流模式,例如递归流 (Recursive Flow),多路合并流 (Multi Merge Flow),同步合并流 (Synchronize Merge Flow)等等。
资源模式
资源模式描述了在业务流程中,资源如何分配的规则。整個資源模式可以歸類成7大類,43種模式。资源分配亦为高频异步场景中值得探讨的一环,我们可以将資源比喻为线程,将资源模式套用到程序分配的过程当中。资源模式的规模与体系极大,涉及各种不同场景,本文重点还是在于流控制模式的实践与介绍,这里我们便不再展开,只简单介绍几个资源分配的模式以及线程的实现情况。
- 直接分配
直接分配是指在运行期间为活动指定特定的资源,如下图。
在开发过程中,我们会遇到上面两种场景。第一种场景是顺序执行的控制流,这类场景我们可以单纯的使用主线程来达成直接分配,因为每个程序都依赖上一个工作完成后才能执行,故主程序一次只执行一个程序。第二种场景是并行的情况,在这种场景中,我们可以分配给每个程序一个worker来执行程序。
- 推模式 - Single Resource模式
单一资源模式下,当活动确立后,会指派给一个资源,资源不一定要即时处理该活动,它可以选择延时或忽略。当工作项超时后,系统会重新分配该工作项给随机资源。
- 拉模式 - Resource-Initiated Allocation模式
拉模式与推模式相反,他的第一人称主体是资源,资源可以选取活动到队列当中以等待,同样地资源不必马上处理活动,整个情境如下图。
资源模式在worker 组当中占有相当重的份量。worker组是指创建数个worker来协助执行高频触发的事件,此时怎么分配程序给worker就成了一门重要的学问,在本文当中不再展开,有兴趣的朋友们可以期待我最近的文章,我将分享关于worker组的概念与实践。
数据模式
数据模式是用于规范系统运行当中数据的的交互行为。数据模式可以分为四大类,分别为可见性模式(Data Visibility)、交互模式(Data Interaction)、传递模式(Data Transfer)与基于数据的路由模式(Data-based Routing)。可见性模式定义了数据的可见性,例如在某个活动中可见,或是流程中可见等。对应到编成语言中,即为变量的作用域,如函数内可见的局部变量,整个范围可见的全局变量或是在工作流中可见的流程变量等等。交互模式定义了数据的交互行为,例如活动与活动间的交互,活动与子流程间的交互等。在编成语言中,可以转换为数据的交互行为,例如程序中变量经由函数后,与函数进行交互,并修改变量当前值,又例如插件系统中,插件直接对环境变量进行修改的行为等等。传递模式讲述了数据如何在活动间进行传递。转换后我们可以理解为变量的传递方式,例如在函数调用时,若传入变量为整数或文字等类型时,我们传递的是一个数据,编译器会在函数空间中腾出一个记忆体来摆放这个数据,若我们传递的是一个对象类型时,则传递的是一个引用,在函数过程中的修改都会直接反应到原实例当中。基于数据的路由模式是一个比较特殊的定义,它的定义范围只在流程当中,当变量能够影响流程的路由时,我们就定义此变量为基于数据的路由模式。数据模式在实作上并没有太多新鲜的东西,因为这是每个工程师都会有的基本常识,只是工作流模式中将其规范化罢了。本文中也只是在介绍中带过。
异常处理模式
在软件开发当中,我们将不在自己控制范围内因素所造成的问题和没有预料到的情况称为异常。工作流异常和软件开发里异常的概念一致,将流程实例执行过程中出现的问题和错误称为异常,异常情况发生时就会涉及资源分配与异常处理情况。工作流模式将流当中的异常分为五类:工作项执行失效、超时、资源不可用、外部触发与违反约束。
工作项执行失败是指工作项中所执行的工作不能继续执行或无法按照期望执行的情况。在程序上,引发工作项执行失败的原因有很多,例如请求失败、程序撰写错误、异常抛出等等。超时是指工作项没有在指定的时间内完成工作或是在指定的时间内开始工作等等。违反约束的情况包含流程流转的约束,如死循环、死锁等等,资源的约束以及业务约束等等。外部触发的情况通常指由工作流以外的程序所导致的异常情况,例如顶层程序强制要求工作流结束的命令,或是用户终止程序等等。资源不可用的情况发生在当资源不足无法支撑程序执行时,例如记忆体不足(out-of-memory, OOM),线程阻塞等等。
在流程实例级别的异常处理中,异常不仅会影响到与其关连的工作,同时还会影响到同一个流程实例里正在执行的其他工作,甚至是同一个流程实例里正在执行的工作,所以我们需要一个更高层级的处理逻辑来执行异常处理,可能的异常处理方式有三种:
- 继续执行(continue with case, CWC):当前流程实例中其他工作不受影响,流程实例继续执行。异常在工作项级别已经得到完全的处理。
- 当前流程实例中止执行(remove current case,RCC):当前流程实例中的部分或所有工作中止执行,如果所有工作都中止执行,那么即流程实例中止执行。异常所影响的范围限于当前流程实例内。
- 所有属于同一流程定义的流程实例都中止执行(remove all cases,RAC):属于同一流程定义的流程实例中的部分或所有工作中止执行,如果所有工作都中止执行,那么即所有流程实例都中止执行。异常影响所有属于同一流程定义的流程实例。
rxjs 概述
rxjs是一个用于处理异步情况的工具库,它使用了观察者模式来达成异步处理。以下是一个简单的rxjs异步调用的场景:
import { Observable } from 'rxjs';
const url = 'http://mock/api/category';
// 产生一个观察者对象
const observable = new Observable((observer) => {
const request = fetch(url);
request.then((response) => {
observer.next(response);
observer.complete();
}).catch((err) => {
observer.error(err);
});
});
// 注册一个订阅者
observable.subscribe({
next: (value) => console.info(value),
complete: () => console.log('complete'),
error: (err) => consolle.error(err),
});
范例中请求了一个get接口,当请求成功后,会将结果透过观察对象observer传递出去,然后完成一个观察流程,若请求失败,则会进入catch并由观察对象返回error信息。透过observer观察对象,可以很容易的完成异步操作以及发送给所有注册者。
在rxjs当中,也提供了管道(pipeline)的机制。它可以将一系列的观察者对象组合成一个管道,并将一系列动作拆解成许多的操作子(operators),透过管道传递的方式,将参数传递到操作子中进行工作。以下是一个简单的管道范例:
import { interval } from 'rxjs';
import { sample, take } from 'rxjs/operators';
const intervalCount = interval(1000);
const process = intervalCount.pipe(
sample(interval(2000)),
take(5),
map(value => value * 10),
);
process.subscribe(x => console.log(x));
范例中我们建立了一个运算流程。首先,我们生成了一个计时器,每1秒会触发下一个动作。接著,我们串接了sample方法,该方法会生成样本实例,里面又接了interval计时器,故它会每个2秒吐出一个值。下一个动作是take方法,入参为5,表示它共会取5个值。最后一个方法是map,它会将值进行映射,全部乘以10。整个pipeline的过程可以表示为弹珠图:
更多关于rxjs的介绍可以参考他们的官方文档(https://rxjs-dev.firebaseapp.com/guide/overview),有非常详尽的介绍。
高异步场景与其所面临的问题
前端的高异步场景常发生在高频的异步请求当中。例如目录树,流程图(Dag图),或是地图等等。因为请求数据量过大,若一次请求会对性能带来相当程度的考验。一般针对这样的场景,常见的作法是对数据进行切片请求,如google的gmail,或是百度的搜索页等等。前端会给后端一个页码以及请求的数据量作为参数,然后后端会根据该参数设置offset与limit来做查询最后把结果给到前端。但切片请求并不表示高频操作,例如在流程图与地图的场景当中,可以透过可视区范围来决定现在应该请求到哪一个切片片段然后再修改offest与limit并发送请求,这样的操作频率是很低的。但在特定场景中,我们期望一次性的获取所有数据,以达到前端搜索,定位等功能,这时候就有必要在短时间内获取全部数据,也就会造成所谓的高频异步请求。
地图的切片网格系统
流程图
高异步场景下会面临到许多的设计难点,如数据合并,异常处理机制,资源锁定等等,同时也会引发其他的边际效应问题,例如UI hang情况,内存控制等等,一般的设计模式很难处理这样的流式场景,对开发者的素质要求也会相对提高很多。此外,代码的复杂性也会提高很多,以下是一段高频异步场景的代码:
import drawer from './drawer';
function createWorker() {
const worker = new Worker('dataHandler.js');
return worker;
}
function createDB() {
return new Promise((resolve, reject) => {
const request = indexedDB.open("testDB");
request.onsuccess = function onsuccess(e) {
resolve(e.result);
};
request.onupgradeneeded = function onupgradeneeded(e) {
const db = e.target.result;
db.createObjectStore('list', { keyPath: 'id' });
resolve(db);
};
request.onerror = function onerror() {
reject(new Error('The db establish failed'));
};
});
}
function saveToDB(db, data) {
const transaction = db.transaction(['list'], 'readwrite');
const objectStore = transaction.objectStore('list');
objectStore.add(data);
}
function recursiveFetchListData(worker, total, offset = 0) {
fetch(`http://rest/example/list?offset=${offset}`).then(response) => {
worker.postMessage(response);
}).catch((e) => throw e);
if (total > offset) {
setTimeout(() => {
recursiveFetchListData(worker, total, offset);
}, 50);
}
}
function main() {
try {
let count = 0;
let result = [];
const worker = createWorker();
const db = await createDB();
const total = await fetch('http://rest/example/list/count');
worker.onmessage = (e) => {
const data = e.data;
saveToDB(db, data);
if (total === count) {
drawer(result);
} else {
result = result.concat(data);
count += 1;
}
};
recursiveFetchListData(worker, total);
} catch (e) {
throw e;
}
}
main();
上述代码中描述了一个简单的流式场景。首先我们透过createWorker方法创建一个worker线程来处理复杂的数距转换问题,接著使用createDB方法创建数据库模型以缓存这些被处理后的数据。我们请求了list/count接口以获取当前要轮询的次数,接著将结果传递到recursiveFetchListData方法当中。recursiveFetchListData方法会请求example/list接口来获取列表数据,并将数据透过worker.postMessage方法递给worker线程做数据转换。recursiveFetchListData方法会不停的进行递归直到offset大于或等于count为止。另一方面,在worker处理完成后,会透过worker.onmessage方法将结果回传,接著我们透过saveToDB方法将结果记录到indexedDB当中。同时,我们在这里做了一个节流阀,只有当整个请求完成后,即total === count的情况下,才会将完整数据递到drawer方法中进行绘制工作。
因为recursiveFetchListData方法中不阻塞下一个请求的开始,所以请求几乎是同步进行,为了避免过度高频导致主线程被一瞬间的请求返回结果占满,这里设置了一个setTimeout方法,让每个请求之间相隔50毫秒。
整个流式场景中我们并未考虑异常处理状况,如外部中断,网络异常重连,读写异常处理等等,若将这些状况加进来将使整个代码更加复杂,更加难以读懂与维护,因此,我们需要有一个模式来支撑这样复杂的高频异步情况。
控制流开发模式
鉴于高频异步场景的复杂性与开发难度,我们结合了rxjs与工作流模式开发了一套针对高频流式场景的前端开发模式工具:cabala-workflow。
运用rxjs的管道能力,配合它所提供的丰富的操作子 (operators)搭建出业务场景中需要的管道。然而,除了默认的操作子外,我们还需要更多自定义的操作子来满足业务场景。rxjs中提供了扩展操作子的能力,以下我们封装了自定义操作子的逻辑如下:
function createOperator(func: NFlow.TFlowOutput, flow: NFlow.TCreateFlowOutput):
NFlow.TCreateFlowOperatorOutput {
return (
(source: Observable<any>) => (
new Observable((observer) => {
return source.subscribe({
func(value, observer, flow);
},
error(e) {
observer.error(e);
},
complete() { observer.complete(); },
});
})
)
);
}
在上述代码中,我们定义了主要的封装函数createOperator。函数中共有两个传入参数func与flow,func为主要的业务逻辑函数,flow则为我们所封装的一个工作流实例。函数将返回另一个函数,这个函数的传入参数是source,即可观察对象的资源,透过这个参数我们可以对它进行注册(subscribe)以监听上一个工作结束的信号,并执行下一个func方法。
有了该容器后,我们便能封装异步逻辑并组装成各式各样的工作流。在我们的工具中提供了几种常见的控制流模式,分别为序列流、并行流、排它选择流、循环流与递归流,这里我们将一一进行介绍:
序列流
简单的使用范例如下:
import { createFlow, terminal } from '@alife/cabala-workflow';
import * as notification from '@alife/cabala-notification';
import * as operators from './operators';
const flow = createFlow();
flow.pipe([
operators.createContainer(),
operators.fetchLineData(),
operators.drawLine(),
terminal(),
]);
flow.subscribe(notification.create());
flow.start();
上图为rxjs当中常见的弹珠图,横轴表为时间流动,纵轴为程序触发顺序。图中有三个程序分别为createContainer,fetchLineData与drawLine,它们是顺序执行的,首先触发了createContainer方法,该方法会创建一个绘图容器,接著fetchLineData方法会获取绘图的资料,最后drawLine方法会去绘制线段到容器当中。createFlow是我们封装的一个方法,它能够创建一个流,并透过pipe方法来组装序列流。terminal方法为一个终止符,表示该流动已结束。
并行流
简单的使用范例如下:
import { createFlow, terminal, parallel } from '@alife/cabala-workflow';
import * as notification from '@alife/cabala-notification';
import * as operators from './operators';
const flow = createFlow();
flow.pipe([
operators.createContainer(),
parallel(
[operators.fetchLineData('line 1')],
[operators.fetchLineData('line 2')],
[operators.fetchLineData('line 3')],
),
operators.throttle(),
operators.drawLine(),
terminal(),
]);
flow.subscribe(notification.create());
flow.start();
范例中我们沿用了序列流的例子,只是稍作了改变。我们将请求数据的过程改为切片请求,透过parallel方法来同时发送请求,然后再透过节流阀(throttle)来合并结果,最后画出线段。parallel方法可以接受数个传参,表示要并行的所有子程序,每个子程序可以是一个数组表示一系列的工作串接,或是单一一个工作。
排它选择流
简单的使用范例如下:
import { createFlow, terminal, exclusive } from '@alife/cabala-workflow';
import * as notification from '@alife/cabala-notification';
import * as operators from './operators';
const flow = createFlow();
const condition = (value) => {
switch (value) {
case 'line':
return 0;
case 'circle':
default:
return 1;
}
};
flow.pipe([
operators.createContainer(),
exclusive(condition, [
[
fetchLineData(),
operators.drawLine(),
],
[
fetchCircleData(),
operators.drawCircle(),
],
]),
terminal(),
]);
flow.subscribe(notification.create());
flow.start('line');
exclusive方法接收两个传参,第一个传参是判断方法(condition),exclusive方法会在流经该处时调用condition方法来决定流应该流向哪个河道。第二个传参是河道,它是一个数组,其内包含可以流向的所有子程序,子程序可以是一个数组或是单一一个工作。范例中,在start时给定初始的输入参数line,在判断方法中返回了0,故exclusive会选择数组第0位作为河道。
循环流
简单的使用范例如下:
import { createFlow, terminal, circle } from '@alife/cabala-workflow';
import * as notification from '@alife/cabala-notification';
import * as operators from './operators';
const flow = createFlow();
const condition = (value) => value !== [];
flow.pipe([
operators.createContainer(),
circle(condition, [
fetchLineData(),
operators.drawLine(),
]),
terminal(),
]);
flow.subscribe(notification.create());
flow.start();
范例中circle方法共有两个传参,第一个传参是condition,用来判断是否满足循环条件,若回传为true则继续循环,否则退出循环; 第二个传参是要循环的子程序,它可以是一个数组表示一系列的工作,或是单一一个工作。在范例中,初始输入值为undefined,故条件判断condition会得到true,进而执行子程序。子程序中会执行获取线数据并进行绘制的动作。该循环会在获取的数据为空数组后结束。
递归流
简单的使用范例如下:
import { createFlow, terminal } from '@alife/cabala-workflow';
import * as notification from '@alife/cabala-notification';
import * as operators from './operators';
const flow = createFlow();
const fetchLineData = (totalCount) => (
(value, observer) => {
const pageNum = value ? value : 0;
const request = fetch(`/mock/api/category?pageNum=${pageNum}&limit=50`);
request.then((response) => {
observer.next(response);
});
if (totalCount >= pageNum) {
fetchLineData(totalCount)(pageNum + 1, observer);
}
}
);
flow.pipe([
operators.createContainer(),
fetchLineData(5),
operators.drawLine(),
terminal(),
]);
flow.subscribe(notification.create());
flow.start();
递归流运用了rxjs的特性天然地达成了递归流的实践,范例中我们实作了fetchLineData函数,当中会根据pageNum来决定是否继续进行递归,若pageNum小于等于totalCount就继续进行递归。递归的过程中会请求http://mock/api/category接口返回的数据,并在then当中调用observer.next发送结果给订阅者。如此便可实现异步递归的情景。
使用上述的五种工具,我们可以将它们互相结合达成更加复杂的流模式,如多路合并流,同步合并流等等。
控制流异常处理
本章节中我们将讲述在控制流当中常遇到的异常情况与处理方法。根据工作流模式,我们将异常情况定义成5类:
- 资源不可用
- 外部触发
- 违反约束
- 工作项执行失败
- 超时
资源不可用
资源不可用的情况一般发生在线程阻塞或是记忆体不足的情境下,在控制流模式中,若未对资源不可用的情况进行处理,可能导致流的堵塞,最后导致溃堤的情况发生。在前端工程当中,一般的UI rendering与JS代码都运行在同一个线程当中(主线程),为单线程模式。我们常希望将一些繁重的程序运行到另一个线程,透过worker来执行它们。若为主线程阻塞,目前前端并无有效的方法来防范或处理,因此暂不讨论,我们将主轴放在worker的阻塞情况来进行讨论。因为worker本身也是单线程模式执行,当worker中执行的程序进入死循环,或是逻辑过于复杂时,就有可能导致后续的工作进入worker时遭到阻塞。解决worker阻塞的问题我们想到了两种方案来进行异常处理:
方案一: 对worker加上时间限制
在worker上加上计时器,若onmessage在指定时间内未被调用,则强制执行worker.terminate()方法中断执行。这样的处理方式优点是线程一定会被释放,即时碰到死循环的情况,坏处是时间难以估算,可能会因为执行的环境,逻辑复杂度等因素影响运行时间。
方案二:worker当中另起worker来执行工作,可以启数个worker来缓冲工作量
在worker当中我们可以另外起worker来执行工作,因此,我们可以将指派给worker的工作在委派给其他worker执行,如此便能避免单一一个worker导致阻塞的情况发生,这样做的好处是不确定性降低,可以保证正常的工作被执行完毕,坏处是无法处理极端情况,例如死循环这样的场景; 另外,此方案也受到运行环境的影响,如电脑的核心数等。
两种方案各有优劣且相当明显,为了解决此问题,我们想出了一个新的方案,将两者结合在一起进行互补。我们的想法是这样的:
最终方案:对worker加上一个较长的时间限制,同时允许多个worker并行处理
方案中我们依然对worker加上了计时器,但计时器的触发时间较长,为5秒钟(此为我们的场景下最佳的等待时间,依据不同的场景可以进行调整),同时,我们也设置了数个worker来处理工作,减少运行时间过长的可能性。
当异常发生时,我们采用当前流程实例终止执行(RCC)方案,因为我们认为此类问题的影响范围只在同一个流当中,因次不需要终止其他正在运行的流。并且通常此类异常发生时,我们无法获得worker处理后的数据结果,因此无法继续往下一个工作前进。至于记忆体原因所导致的异常,我们认为这是代码层面的问题,且前端目前无有效的机制来避免或处理此类问题,故暂不讨论。
外部触发
外部触发发生在当顶层服务下达终止执行命令时,强制终止流继续流动的异常情况。在控制流开发模式中,我们提供了一套外部触发机制来终止流的运行。它的使用方式如下:
import { interrupt, createFlow, middleware } from '@alife/cabala-workflow';
import * as notification from '@alife/cabala-notification';
import * as operators from './operators';
const clearFlow = createFlow();
const drawLineFlow = createFlow();
function interruptDrawLine(flowId) {
return (
async (value, observer) => {
try {
await interrupt(flowId);
observer.next(value);
} catch (e) {
observer.error(e);
}
}
);
}
clearFlow.pipe(
interruptDrawLine(drawLineFlow.id),
operators.clearContainer(),
);
drawLineFlow.pipe(
operators.createContainer(),
operators.fetchLineData(),
operators.drawLine(),
);
clearFlow.subscribe(notification.create());
drawLineFlow.subscribe(notification.create());
drawLineFlow.start();
setTimeout(() => {
clearFlow.start();
}, 500);
上述范例中,我们创建了两个流,分别为drawLineFlow与clearFlow。drawLineFlow主要的目的是在容器中画线,clearFlow则是将画板清空。我们先运行了drawLineFlow,因为drawLineFlow当中会异步的去请求数据,所以它不会马上完成,接著我们运行了clearFlow将画板清空。
因为drawLineFlow与clearFlow两个流的动作是冲突的。实际场景中,drawLineFlow可能在页面加载过程中被执行,同时,在加载过程中用户可能就先点击了清除按钮。若我们不将drawLineFlow的流中断掉,那么用户就会发现它点击的清除动作可能无效,因为drawLineFlow还会继续执行。
这里我们透过interrrupt方法终止了drawLine流动。interrupt方法有一个传入参数id,它可以透过流的id找到对应的流并中断掉正在执行的流动。id参数可传可不传,若不传递时则默认会将所有正在运行的流终止掉。
实践上我们运用了rxjs的串接特性,在工作跟工作之间接入了中间层,并在中间层当中监控是否需要继续往下流动。以下是一个简易的实践:
function createOperator(func, flow) {
return (
(source) => (
new Observable((observer) => {
return source.subscribe({
next(value) {
if (flow._dispose === DISPOSE_CONDITION.RUNNING) {
flow._dispose = DISPOSE_CONDITION.FINISH;
observer.complete();
} else {
func(value, observer, flow);
}
},
error(e) {
if (flow._dispose === DISPOSE_CONDITION.RUNNING) {
flow._dispose = DISPOSE_CONDITION.FINISH;
}
observer.error(e);
},
complete() { observer.complete(); },
});
})
)
);
}
上述代码中,我们在自定义操作子的封装逻辑中添加了一些判断逻辑。首先,在next方法中,当_dispose参数等于DISPOSE_CONDITION.RUNNING时,我们会直接将_dispose设为完成状态,并将流直接导向为complete状态。在error方法中,我们同样去判断_dispose参数,若为RUNNING时将其设为完成状态。
_dispose参数有三种状态,NONE、RUNNING与FINISH。NONE表示该流处于正常操作中未被外部中断,RUNNING表示流遭遇异常状况,已被外部中断,FINISH表示流的中断程序已经执行完成。会区分这三种状态的原因是因为当我们遭遇外部中断时,流可能还在某一个工作中尚未到达中间层,因此我们需要先设一个标签告诉流当它进入中间层后应该终止流动。
违反约束
控制流当中违反约束的情况常来自于对工作的定义错误或约束条件不正确,例如在工作当中设定了循环却无法满足跳出循环的条件而导致死循环,又例如工作与工作之间的参数传递错误等。这类错误无法从模式方面矫正,只能依赖撰写者修正错误的逻辑代码。
工作项执行失败
工作项执行失败的情况常发生在工作项中未预期到的错误发生,例如请求后端接口失败,或是转译型别出错等等。在控制流模式中,我们可以透过rxjs的特性来处理工作项执行失败的情况,以下是一个简单的范例:
import { createFlow, terminal, circle } from '@alife/cabala-workflow';
import * as notification from '@alife/cabala-notification';
import * as operators from './operators';
function fetchLineData() {
return (
async (value, observer) => {
try {
const response = await fetch('http://mock/api/category');
observer.next(response);
} catch (e) {
observer.error(e);
}
}
);
}
const flow = createFlow();
flow.pipe(
operators.createContainer(),
fetchLineData(),
operators.drawLine(),
);
flow.subscribe(notification.create());
flow.start();
范例中,我们定义了fetchLineData方法,方法中我们会去后端获取数据,并透过observer.next将结果传递到下一个工作当中。若后端接口返回失败时,我们透过try catch方法拦截错误,并将错误情况透过observer.error方法终止流动,并由消息中心将结果抛出来。透过这样的方式,我们便能处理工作项执行失败的情况。
超时
超时指流未在指定时间内完成时发生。在前端场景中,我们很少对流设定超时条件,因为超时的定义相当难以界定,它可能会受到物理环境影响,如用户的电脑好坏、或是CPU占用程度等,也可能受到传输速度的影响,例如网路条件或服务器条件等。但在一些较特殊场景中,我们为了提升性能,在确保没问题的情况下,我们才可能设定一个保守的超时时间。以下我们举一个简单的例子说明如何在控制流模式中设定超时时间。
import { createFlow, terminal, circle } from '@alife/cabala-workflow';
import * as notification from '@alife/cabala-notification';
import * as operators from './operators';
const flow = createFlow();
flow.pipe(
operators.createContainer(),
operators.fetchLineData(),
operators.drawLine(),
);
flow.setTimeout(5000);
flow.start()
上述范例中,我们透过flow.setTimeout方法指定超时时间为5秒钟。它的时间会从start方法执行后开始计算,若超过5秒钟时,flow当中会启动外部触发机制,将流强制中断掉,并返回错误信息。
总结
前端的高频异步场景常发生在高频的异步请求当中。例如目录树,流程图(Dag图),或是地图等等。該場景中会面临到许多的设计难点,如数据合并,异常处理机制,资源锁定等等。鉴于此场景的复杂性与开发难度,我们结合了rxjs与工作流模式开发了一套针对高频流式场景的前端控制流模式开发工具:cabala-workflow。工具中提供了序列流、并行流、排它选择流、循环流与递归流的API实作,同时,我们可以将它们互相结合达成更加复杂的流模式,如多路合并流,同步合并流等。
工作流流程实例执行过程中出现的问题和错误称为异常,异常情况发生时就会涉及资源分配与异常处理情况。工作流模式将流当中的异常分为五类:工作项执行失效、超时、资源不可用、外部触发与违反约束。资源不可用的情况我们提出了「对worker加上一个较长的时间限制,同时允许多个worker并行处理」的作法来解决高频工作进入worker时遭到阻塞的问题; 在控制流开发模式中,我们提供了一套外部触发机制来终止流的运行,使用者可以透过interrrupt方法从外部停止流继续执行; 违反约束的情况我们认为无法从模式方面矫正,只能依赖撰写者修正错误的逻辑代码; 工作项执行失败的情况我们可以透过rxjs的特性来处理,透过try catch方法拦截错误,并将错误情况透过observer.error方法终止流动,并由消息中心将结果抛出来; 在前端场景中,我们很少对流设定超时条件,但在一些较特殊场景中,我们为了提升性能,在确保没问题的情况下,我们才可能设定一个保守的超时时间,透过cabala-workflow提供的setTimeout方法可以启动外部触发机制,将超时的流强制中断掉。
参考文献
- http://www.workflowpatterns.com/documentation/documents/Chinese-Book-WF-Patterns-Appendices.pdf
- http://www.workflowpatterns.com/
- https://www.cnblogs.com/wuhong/archive/2010/12/01/1890830.html
- http://www.padsweb.rwth-aachen.de/wvdaalst/publications/p159.pdf
- https://derickbailey.com/2015/08/07/making-workflow-explicit-in-javascript/
- https://www.sciencedirect.com/topics/computer-science/workflow-pattern
- https://docs.aws.amazon.com/amazonswf/latest/awsrbflowguide/programming-workflow-patterns.html
- https://rxjs-dev.firebaseapp.com/
未完待续
- 卡巴拉树开发秘辛:前端缓存方案优化
- 卡巴拉树开发秘辛:工作组模式
- 卡巴拉树开发秘辛:DataStudio 树的实作全公开
- 卡巴拉树开发秘辛:关于未来