1.1.1. 调用流程
调用 start 后,会先初始化 Orchestrator 本身,调用函数:
await this._orchestrator.initialize();
在 Orchestrator 类中的 initialize 方法中,会先启动 Orchestrator 服务本身,调用:
await this.startService(KnownServices.orchestrator);
然后初始化 JobHandler:
await Promise.all(
[...this._jobHandlers.values()].map(async (x) => x.initialize())
);
在 initialize 方法中,首先会调用一下父类中的 initialize 方法:
await super.initialize();
public async initialize(): Promise<void> {
this._isShutdown = false;
this._jobLauncher.addJobCompletionListener(
this.jobType,
this.jobCompletionListener
);
}
然后为相应的 job type 添加j JobCompletionListener,event 名为 JobCompleted(实际上 JobCompletionListener 是继承于 EventEmitter)。
接着会调用 JobLauncher 中的 queryExistingJobs 方法(继承与 JobLauncher 的有 ChildProcessLauncher 和 K8sJobLauncher),实际上做的事情是拿到存在的 job。
const existingJobs = await this._jobLauncher.queryExistingJobs(
this.jobType
);
private readonly _runningJobs = new Map<string, ChildProcessRunningJob>();
// ...
public async queryExistingJobs(
jobType: string
): Promise<ChildProcessRunningJob[]> {
return [...this._runningJobs.values()].filter(
(job) => job.runnableType === jobType
);
}
_runningJobs 是一个 value 是 ChildProcessRunningJob 接口类型的 Map,ChildProcessRunningJob 的结构可如下所示:
{
instanceId: string;
runnableType: string;
category: RunnableCategory; // "Job" or "Service" or "ExternalService"
confirmationId?: unknown;
proc?: ChildProcess;
}
然后从 existingJobs 中过滤出 job 类型是 GenericIModelJob 类型(继承 GenericJob 并实现 IModelJob,多了一个 iModelId 的属性)的 job。然后调用 addPreloadedJob。
existingJobs
.filter((x): x is GenericIModelJob => x instanceof GenericIModelJob)
.forEach((job) => this._iModelJobsRegistry.addPreloadedJob(job));
_iModelJobsRegistry 类型是 IModelJobsRegistry,调用 addPreloadedJob 实际上做了:
private readonly _jobsContexts = new Map<string, IModelJobsContext<TJob>>();
// ...
public addPreloadedJob(job: TJob): void {
this._jobsContexts.get(job.iModelId)?.jobs.add(job);
}
_jobsContexts 是一个 key 类型为 string(存 iModelId),value 类型为 IModelJobsContext 的 Map。它的 job 属性类型为 RunnableRegistry,调用其中的 add 方法,实际上做了:
private _runnables = new Map<string, Map<string, T>>();
// ...
public add(runnable: T): void {
let runnableInstances = this._runnables.get(runnable.runnableType);
if (!runnableInstances) {
runnableInstances = new Map<string, T>();
this._runnables.set(runnable.runnableType, runnableInstances);
}
if (runnableInstances.get(runnable.instanceId))
throw new Error(
`Runnable ${runnable.runnableType} with id ${runnable.instanceId} already exists`
);
runnableInstances.set(runnable.instanceId, runnable);
}
_runnables 是一个 key 存 runnableType、value 存 Map 的 Map,其中 value 中的 key 保存 instanceId,value 保存类型为 GenericIModelJob。
容器中的 JobHandler 类型中还存在另一种 JobHandler,它是 CheckpointSchedulerJobHandler,调用它的 initialize 实际上调用的是父类 JobHandler 的 initialize 方法(同上)。
至此,JobHandler 的初始化工作已经完成,Orchestrator 类中的 initialize 方法结束。
接下来会进行服务的初始化:
await this.initializeServices();
在 initializeServices 方法中,首先会通过配置文件(server.config.json)中的配置信息,检查 Messaging 相关的配置是否配置好:
{
"default": {
// ...
"features": {
"messaging": true,
"events": false,
"iModelJsBackends": true
},
// ...
},
// ...
}
如果配置好,则会启动 Messaging 服务,接着会相继启动 Licensing、iModelManager 和 Gateway 服务,启动完成后,就会出现 bank console 中的 log,但不会包含 Messaging 服务启动的 log:
启动服务调用的是这个方法:
await this._orchestrator.startService(KnownServices.licensing) // 如启动 licensing 服务
在调用初期,会先检查 _services 中是否存在此 serviceType 的服务,__services 类型为:
this._services.has(serviceType, bootstrapOptions?.instanceId)
protected readonly _services: RunnableRegistry<GenericService>;
实际上是调用了 RunnableRegistry 类中的 get 方法,从 _runnables 拿值。_runnables 是一个 key 为 string(存 runnableType)、value 为 Map 的 Map。value 的 Map 的 key 存 instanceId,value 存 runnable 类的东西。
如果在 _services 中存在 serviceType 的服务,则调用 findService:
if (this._services.has(serviceType, bootstrapOptions?.instanceId))
return this.findService(
serviceType,
bootstrapOptions?.instanceId
) as Promise<GenericService>;
如果 _services 中不存在 serviceType 的服务,则会:
if (!(await this._boostrapingValidator.isValid(serviceType, bootstrapOptions)))
throw new ResourceDoesNotExistError(
"Starting service is not possible, because dependent resources do not exist",
""
);
在 isValid 方法中,会通过不同的 serviceType 判断其依赖的资源是否存在。
之后,通过 serviceType 调用:
const serviceContextProvider = this._serviceContextProviders.get(
serviceType
);
private readonly _serviceContextProviders = new Map<
string,
ServiceContextProvider<RunnableBootstrapOptions>
>();
_serviceContextProviders 的 value 的类型为 ServiceContextProvider,它是一个抽象类,由 SingletonServiceContextProvider 继承。
接下来会通过 bootstrapOptions 调用 serviceContextProvider 的 createBootstrapper 方法,实际上调用的是
SingletonServiceContextProvider 类中的 createBootstrapper,返回 _bootstrapper:
export class SingletonServiceContextProvider<
T extends RunnableBootstrapOptions
> extends ServiceContextProvider<T> {
public readonly isSingleton = true;
public constructor(
public readonly runnableType: string,
private readonly _bootstrapper: RunnableBootstrapper
) {
super();
}
public createBootstrapper(_options?: T): RunnableBootstrapper {
return this._bootstrapper;
}
}
而 this._bootstrapper 是在初始化依赖容器的时候进行初始化的:
container
.bind(CommonOrchestrationTypes.serviceContextProvider)
.toDynamicValue((context: interfaces.Context) => {
const bootstrappers = context.container.getAll<BaseServiceBootstrapper>(
Types.singletonServiceBootstrapper
);
const result: Array<SingletonServiceContextProvider<any>> = [];
for (const bootstrapper of bootstrappers) {
result.push(
new SingletonServiceContextProvider<RunnableBootstrapOptions>(
bootstrapper.serviceName,
bootstrapper
)
);
}
return result;
})
.inSingletonScope();
在容器中绑定 Types.singletonServiceBootstrapper 名字的有三个类,分别是 GatewayBootstrapper、IModelManagerBootstrapper 和 LicensingBootstrapper,通过 bootstrapper.serviceName 和 bootstrapper 本身进行 SingletonServiceContextProvider 类的初始化。
拿到 bootstrapper 之后,接着调用:
const descriptor = await this._descriptorFactory.createServiceDescriptor(
serviceType,
instanceId,
bootstrapper,
bootstrapOptions
);
protected readonly _descriptorFactory: RunnableDescriptorFactory
public async createServiceDescriptor(
serviceType: string,
instanceId?: string,
bootstrapper?: RunnableBootstrapper,
bootstrapOptions?: unknown
): Promise<RunnableDescriptor<GenericServiceMetadata>> {
const serviceCategory = this.isServiceExternal(serviceType)
? RunnableCategory.ExternalService
: RunnableCategory.Service;
return this.createRunnableDescriptor<GenericServiceMetadata>(
serviceCategory,
serviceType,
instanceId,
bootstrapper,
bootstrapOptions
);
}
在 createServiceDescriptor 方法中,首先调用子类实现的 isServiceExternal (是否外部服务) 确定服务的类型(实现 RunnableDescriptorFactory 类的子类有 LocalhostRunnableDescriptorFactory 和 K8sRunnableDescriptorFactory)。如果是调用的 LocalhostRunnableDescriptorFactory 的 isServiceExternal 方法,则 serviceType 为 orchestrator 或 messaging 的都返回 true,否则返回 false;如果是调用的 K8sRunnableDescriptorFactory 的 isServiceExternal 方法,则 serviceType 为 orchestrator、messaging、gateway、iModelManager 和 licensing 的都返回 true,否则返回 false。如果是外部服务,则 serviceCategory 为 "ExternalService",否则为 “Service”。
接着,调用 createRunnableDescriptor 方法。首先通过 runnableType(serviceType)导入服务的相关配置,然后构造 requiredArgs(可理解为必要的参数):
const requiredArgs = [
...new Set<string>([
...this.getOverriddenCommandLineArguments(
runnableCategory,
runnableType,
instanceId
),
...(bootstrapper?.requiredArgs ?? []),
]),
];
getOverriddenCommandLineArguments 方法是通过 runnableCategory 类型拿到如 protocol、hostname、port、或者空。
接着,构造 optionalArgs(可理解为可选参数),最后过滤掉 requiredArgs 中存在的参数:
let optionalArgs = [
...new Set<string>([
...this.getOverriddenOptionalArguments(runnableCategory),
...(bootstrapper?.requiredArgs ?? []),
]),
];
optionalArgs = optionalArgs.filter((arg) => !requiredArgs.includes(arg));
接着,构造 metadataArgs(可理解为元数据参数):
const metadataArgs = [Args.instanceId].filter(
(arg) => !requiredArgs.includes(arg) && !optionalArgs.includes(arg)
);
然后,通过 resolveMetadataValues 方法,把必要、可选和元数据参数以 key value 的形式存入 metadata 对象中:
const metadata: any = bootstrapOptions ?? {};
await this.resolveMetadataValues(
requiredArgs, // optionalArgs or metadataArgs
metadata,
runnableCategory,
runnableType,
config,
instanceId,
bootstrapper
);
然后,通过刚刚构造好的 metadata,对比 requiredArgs 和 metadataArgs,找到未处理好的参数,如有则抛出错误:
const unresolvedRequiredArgs: string[] = [];
this.addUnresolvedArgs(metadata, requiredArgs, unresolvedRequiredArgs);
this.addUnresolvedArgs(metadata, metadataArgs, unresolvedRequiredArgs);
if (unresolvedRequiredArgs.length)
throw new MissingRequiredPropertiesError(unresolvedRequiredArgs);
接下来,执行:
const args: string[] = [];
this.composeCommandLineArguments(metadata, requiredArgs, args);
this.composeCommandLineArguments(metadata, optionalArgs, args);
private composeCommandLineArguments(
metadata: { [id: string]: string },
argNames: string[],
args: string[]
) {
for (const argName of argNames) {
const value = metadata[argName];
if (value) {
if (argName === Args.entrypoint) args.unshift(value);
else args.push(`--${argName}`, String(value));
}
}
}
将参数名以及参数数值以命令行的形式插入 args 数组。最后,返回一个 RunnableDescriptor,其中包含了该服务的可运行类别(Job、Service、ExternalService)、可运行类型(gateway、licensing等)以及其运行所需要的的参数:
return new RunnableDescriptor<T>(
runnableCategory,
runnableType,
args, // 数组,以命令行的形式保存了参数名以及参数值
metadata // 以 key value 的形式存储必要、可选和元数据参数
);
export class RunnableDescriptor<T extends { [key: string]: any }> {
public constructor(
public readonly category: RunnableCategory,
public readonly runnableType: string,
public readonly args: string[],
public readonly metadata: T
) {}
}
最终把这样一个 RunnableDescriptor 返回给 descriptor:
const descriptor = await this._descriptorFactory.createServiceDescriptor(
serviceType,
instanceId,
bootstrapper,
bootstrapOptions
);
接着,继续调用 Orchestrator 类中的方法:
const startedServer = await this.startServiceInternal(descriptor);
startServiceInternal 方法在 Orchesrtrator 类中是抽象方法,有两个类继承于它,一个是 ChildProcessOrchestrator,另一个是K8sOrchestrator,根据不同调用不同的 startServiceInternal 方法。如果调用的是 ChildProcessOrchestrator 类中的 startServiceInternal 方法。
在 startServiceInternal 方法中,首先会判断服务是否是 ExternalService:
if (descriptor.category === RunnableCategory.ExternalService) {
const externalService = new ExternalService(
descriptor.runnableType,
descriptor.metadata.instanceId,
descriptor.metadata.hostname,
descriptor.metadata.port,
descriptor.metadata.protocol,
descriptor.metadata
);
this._services.add(externalService);
return externalService;
}
如果是,则构造一个 ExternalService 并添加到 _services(RunnableRegistry 类型)中,返回该 ExternalService 类的实例。
如果不是,则调用 ChildProcessLauncher 类中的 startProcess 方法:
const running = this._processLauncher.startProcess(descriptor);
在 startProcess 方法中,将传入参数构造一个 ChildProcessRunningService 类并通过之前配置好的 args,调用
child_process 包中的 spawn 方法开启子进程执行程序:
running.proc = child_process.spawn("node", cmdargs);
running.proc 代表了执行中的子进程。
开启子进程之后,执行对子进程的 stdout 和 stderr 这两个 Readable Stream 的“监听”,获取其中的输出数据。之后便返回构造好的 ChildProcessRunningService 类实例,并把它添加到 _services 中,最后返回 ChildProcessRunningService。
最后,所有返回的 running service 都 push 到 _orchestrator 中。至此,initializeServices 方法执行完毕。
接下来是设置 Fastify 服务器,并设置路由及其 handler,设置的主要路由有:
GET http://orchestrator.url/service/:serviceType/:instanceId
GET http://orchestrator.url/service/:serviceType
POST http://orchestrator.url/service/:serviceType
DELETE http://orchestrator.url/service/:serviceType/:instanceId
POST http://orchestrator.url/job/:jobType
POST http://orchestrator.url/shutdown
接收来自 Orchestrator Client 发过来的请求并调用相应的 handler 去处理。例如,Orchestrator Client 请求 iModel Manager 服务的地址,则会发送 POST 请求 (http://orchestrator.url/service/:serviceType),服务器接收到请求之后,会调用 handler 处理器中的 findServices 方法返回相应的服务(存入result),其中包含该服务的 URL,使得 Gateway 可将请求转发至该服务上:
this._server.get(
"/service/:serviceType",
serviceCollectionResultOptions,
this.getServiceQueryHandler()
);
设置好路由之后,便调用 listen 方法监听端口。
最后,执行:
await this._orchestrator.launch();
public async launch(): Promise<void> {
const tasks: Array<Promise<void>> = [];
for (const jobHandler of this._jobHandlers.values()) {
tasks.push(jobHandler.launch());
}
await Promise.all(tasks);
if (this._config.jobs?.[KnownJobs.checkpointScheduler]) {
const request: JobRequest<JobBootstrapOptions> = {
runnableType: KnownJobs.checkpointScheduler,
options: {},
};
await this.enqueueJob(request);
}
}
对于 job handler,容器中只注入过两种:CheckpointJobHandler 和 CheckpointSchedulerJobHandler。CheckpointSchedulerJobHandler 的 launch 方法调用父类的方法,没有做什么实质操作。而 CheckpointJobHandler 的 launch 方法调用的是它继承的 IModelJobHandler 中的方法:
public async launch(): Promise<void> {
await super.launch(); // 调用父类的方法,没有做什么实质操作
await this.scheduleQueuedJobs();
}
之后调用的 scheduleQueuedJobs 跟 messaging queue 是相关的。
在容器绑定依赖的过程中,如果在配置文件中有 messaging 相关的配置信息,则会绑定 RabbitMQMessagingClient 至容器中。