1.1.1. 调用流程

调用 start 后,会先初始化 Orchestrator 本身,调用函数:

await this._orchestrator.initialize();

在 Orchestrator 类中的 initialize 方法中,会先启动 Orchestrator 服务本身,调用:

await this.startService(KnownServices.orchestrator);

然后初始化 JobHandler:

await Promise.all(
      [...this._jobHandlers.values()].map(async (x) => x.initialize())
    );

在 initialize 方法中,首先会调用一下父类中的 initialize 方法:

await super.initialize();
public async initialize(): Promise<void> {
    this._isShutdown = false;

    this._jobLauncher.addJobCompletionListener(
      this.jobType,
      this.jobCompletionListener
    );
  }

然后为相应的 job type 添加j JobCompletionListener,event 名为 JobCompleted(实际上 JobCompletionListener 是继承于 EventEmitter)。

接着会调用 JobLauncher 中的 queryExistingJobs 方法(继承与 JobLauncher 的有 ChildProcessLauncher 和 K8sJobLauncher),实际上做的事情是拿到存在的 job。

const existingJobs = await this._jobLauncher.queryExistingJobs(
      this.jobType
    );
private readonly _runningJobs = new Map<string, ChildProcessRunningJob>();

// ...

public async queryExistingJobs(
    jobType: string
  ): Promise<ChildProcessRunningJob[]> {
    return [...this._runningJobs.values()].filter(
      (job) => job.runnableType === jobType
    );
  }

_runningJobs 是一个 value 是 ChildProcessRunningJob 接口类型的 Map,ChildProcessRunningJob 的结构可如下所示:

{
  instanceId: string;
  runnableType: string;
  category: RunnableCategory;    // "Job" or "Service" or "ExternalService"
  confirmationId?: unknown;
  proc?: ChildProcess;
}

然后从 existingJobs 中过滤出 job 类型是 GenericIModelJob 类型(继承 GenericJob 并实现 IModelJob,多了一个 iModelId 的属性)的 job。然后调用 addPreloadedJob。

existingJobs
  .filter((x): x is GenericIModelJob => x instanceof GenericIModelJob)
  .forEach((job) => this._iModelJobsRegistry.addPreloadedJob(job));

_iModelJobsRegistry 类型是 IModelJobsRegistry,调用 addPreloadedJob 实际上做了:

private readonly _jobsContexts = new Map<string, IModelJobsContext<TJob>>();

// ...

public addPreloadedJob(job: TJob): void {
    this._jobsContexts.get(job.iModelId)?.jobs.add(job);
  }

_jobsContexts 是一个 key 类型为 string(存 iModelId),value 类型为 IModelJobsContext 的 Map。它的 job 属性类型为 RunnableRegistry,调用其中的 add 方法,实际上做了:

private _runnables = new Map<string, Map<string, T>>();

// ...

public add(runnable: T): void {
    let runnableInstances = this._runnables.get(runnable.runnableType);
    if (!runnableInstances) {
      runnableInstances = new Map<string, T>();
      this._runnables.set(runnable.runnableType, runnableInstances);
    }
    if (runnableInstances.get(runnable.instanceId))
      throw new Error(
        `Runnable ${runnable.runnableType} with id ${runnable.instanceId} already exists`
      );
    runnableInstances.set(runnable.instanceId, runnable);
  }

_runnables 是一个 key 存 runnableType、value 存 Map 的 Map,其中 value 中的 key 保存 instanceId,value 保存类型为 GenericIModelJob。

容器中的 JobHandler 类型中还存在另一种 JobHandler,它是 CheckpointSchedulerJobHandler,调用它的 initialize 实际上调用的是父类 JobHandler 的 initialize 方法(同上)。

至此,JobHandler 的初始化工作已经完成,Orchestrator 类中的 initialize 方法结束。

接下来会进行服务的初始化:

await this.initializeServices();

在 initializeServices 方法中,首先会通过配置文件(server.config.json)中的配置信息,检查 Messaging 相关的配置是否配置好:

{
  "default": {
    // ...
    "features": {
      "messaging": true,
      "events": false,
      "iModelJsBackends": true
    },
    // ...
  },
    // ...
}

如果配置好,则会启动 Messaging 服务,接着会相继启动 Licensing、iModelManager 和 Gateway 服务,启动完成后,就会出现 bank console 中的 log,但不会包含 Messaging 服务启动的 log:

启动服务调用的是这个方法:

await this._orchestrator.startService(KnownServices.licensing) // 如启动 licensing 服务

在调用初期,会先检查 _services 中是否存在此 serviceType 的服务,__services 类型为:

this._services.has(serviceType, bootstrapOptions?.instanceId)
protected readonly _services: RunnableRegistry<GenericService>;

实际上是调用了 RunnableRegistry 类中的 get 方法,从 _runnables 拿值。_runnables 是一个 key 为 string(存 runnableType)、value 为 Map 的 Map。value 的 Map 的 key 存 instanceId,value 存 runnable 类的东西。

如果在 _services 中存在 serviceType 的服务,则调用 findService:

if (this._services.has(serviceType, bootstrapOptions?.instanceId))
    return this.findService(
    serviceType,
    bootstrapOptions?.instanceId
   ) as Promise<GenericService>;

如果 _services 中不存在 serviceType 的服务,则会:

if (!(await this._boostrapingValidator.isValid(serviceType, bootstrapOptions)))
  throw new ResourceDoesNotExistError(
    "Starting service is not possible, because dependent resources do not exist",
    ""
  );

在 isValid 方法中,会通过不同的 serviceType 判断其依赖的资源是否存在。

之后,通过 serviceType 调用:

const serviceContextProvider = this._serviceContextProviders.get(
  serviceType
);
private readonly _serviceContextProviders = new Map<
    string,
    ServiceContextProvider<RunnableBootstrapOptions>
>();

_serviceContextProviders 的 value 的类型为 ServiceContextProvider,它是一个抽象类,由 SingletonServiceContextProvider 继承。

接下来会通过 bootstrapOptions 调用 serviceContextProvider 的 createBootstrapper 方法,实际上调用的是

SingletonServiceContextProvider 类中的 createBootstrapper,返回 _bootstrapper:

export class SingletonServiceContextProvider<
  T extends RunnableBootstrapOptions
> extends ServiceContextProvider<T> {
  public readonly isSingleton = true;

  public constructor(
    public readonly runnableType: string,
    private readonly _bootstrapper: RunnableBootstrapper
  ) {
    super();
  }

  public createBootstrapper(_options?: T): RunnableBootstrapper {
    return this._bootstrapper;
  }
}

而 this._bootstrapper 是在初始化依赖容器的时候进行初始化的:

container
    .bind(CommonOrchestrationTypes.serviceContextProvider)
    .toDynamicValue((context: interfaces.Context) => {
      const bootstrappers = context.container.getAll<BaseServiceBootstrapper>(
        Types.singletonServiceBootstrapper
      );

      const result: Array<SingletonServiceContextProvider<any>> = [];
      for (const bootstrapper of bootstrappers) {
        result.push(
          new SingletonServiceContextProvider<RunnableBootstrapOptions>(
            bootstrapper.serviceName,
            bootstrapper
          )
        );
      }
      return result;
    })
    .inSingletonScope();

在容器中绑定 Types.singletonServiceBootstrapper 名字的有三个类,分别是 GatewayBootstrapper、IModelManagerBootstrapper 和 LicensingBootstrapper,通过 bootstrapper.serviceName 和 bootstrapper 本身进行 SingletonServiceContextProvider 类的初始化。

拿到 bootstrapper 之后,接着调用:

const descriptor = await this._descriptorFactory.createServiceDescriptor(
  serviceType,
  instanceId,
  bootstrapper,
  bootstrapOptions
);
protected readonly _descriptorFactory: RunnableDescriptorFactory
public async createServiceDescriptor(
    serviceType: string,
    instanceId?: string,
    bootstrapper?: RunnableBootstrapper,
    bootstrapOptions?: unknown
  ): Promise<RunnableDescriptor<GenericServiceMetadata>> {
    const serviceCategory = this.isServiceExternal(serviceType)
      ? RunnableCategory.ExternalService
      : RunnableCategory.Service;
    return this.createRunnableDescriptor<GenericServiceMetadata>(
      serviceCategory,
      serviceType,
      instanceId,
      bootstrapper,
      bootstrapOptions
    );
  }

在 createServiceDescriptor 方法中,首先调用子类实现的 isServiceExternal (是否外部服务) 确定服务的类型(实现 RunnableDescriptorFactory 类的子类有 LocalhostRunnableDescriptorFactory 和 K8sRunnableDescriptorFactory)。如果是调用的 LocalhostRunnableDescriptorFactory 的 isServiceExternal 方法,则 serviceType 为 orchestrator 或 messaging 的都返回 true,否则返回 false;如果是调用的 K8sRunnableDescriptorFactory 的 isServiceExternal 方法,则 serviceType 为 orchestrator、messaging、gateway、iModelManager 和 licensing 的都返回 true,否则返回 false。如果是外部服务,则 serviceCategory 为 "ExternalService",否则为 “Service”。

接着,调用 createRunnableDescriptor 方法。首先通过 runnableType(serviceType)导入服务的相关配置,然后构造 requiredArgs(可理解为必要的参数):

const requiredArgs = [
      ...new Set<string>([
        ...this.getOverriddenCommandLineArguments(
          runnableCategory,
          runnableType,
          instanceId
        ),
        ...(bootstrapper?.requiredArgs ?? []),
      ]),
    ];

getOverriddenCommandLineArguments 方法是通过 runnableCategory 类型拿到如 protocol、hostname、port、或者空。

接着,构造 optionalArgs(可理解为可选参数),最后过滤掉 requiredArgs 中存在的参数:

let optionalArgs = [
      ...new Set<string>([
        ...this.getOverriddenOptionalArguments(runnableCategory),
        ...(bootstrapper?.requiredArgs ?? []),
      ]),
    ];
    optionalArgs = optionalArgs.filter((arg) => !requiredArgs.includes(arg));

接着,构造 metadataArgs(可理解为元数据参数):

const metadataArgs = [Args.instanceId].filter(
      (arg) => !requiredArgs.includes(arg) && !optionalArgs.includes(arg)
    );

然后,通过 resolveMetadataValues 方法,把必要、可选和元数据参数以 key value 的形式存入 metadata 对象中:

const metadata: any = bootstrapOptions ?? {};
await this.resolveMetadataValues(
      requiredArgs,        // optionalArgs or metadataArgs
      metadata,
      runnableCategory,
      runnableType,
      config,
      instanceId,
      bootstrapper
);

然后,通过刚刚构造好的 metadata,对比 requiredArgs 和 metadataArgs,找到未处理好的参数,如有则抛出错误:

const unresolvedRequiredArgs: string[] = [];
this.addUnresolvedArgs(metadata, requiredArgs, unresolvedRequiredArgs);
this.addUnresolvedArgs(metadata, metadataArgs, unresolvedRequiredArgs);

if (unresolvedRequiredArgs.length)
  throw new MissingRequiredPropertiesError(unresolvedRequiredArgs);

接下来,执行:

const args: string[] = [];
this.composeCommandLineArguments(metadata, requiredArgs, args);
this.composeCommandLineArguments(metadata, optionalArgs, args);
private composeCommandLineArguments(
    metadata: { [id: string]: string },
    argNames: string[],
    args: string[]
  ) {
    for (const argName of argNames) {
      const value = metadata[argName];
      if (value) {
        if (argName === Args.entrypoint) args.unshift(value);
        else args.push(`--${argName}`, String(value));
      }
    }
  }

将参数名以及参数数值以命令行的形式插入 args 数组。最后,返回一个 RunnableDescriptor,其中包含了该服务的可运行类别(Job、Service、ExternalService)、可运行类型(gateway、licensing等)以及其运行所需要的的参数:

return new RunnableDescriptor<T>(
  runnableCategory,
  runnableType,
  args,                   // 数组,以命令行的形式保存了参数名以及参数值
  metadata                                // 以 key value 的形式存储必要、可选和元数据参数
);
export class RunnableDescriptor<T extends { [key: string]: any }> {
  public constructor(
    public readonly category: RunnableCategory,
    public readonly runnableType: string,
    public readonly args: string[],
    public readonly metadata: T
  ) {}
}

最终把这样一个 RunnableDescriptor 返回给 descriptor:

const descriptor = await this._descriptorFactory.createServiceDescriptor(
  serviceType,
  instanceId,
  bootstrapper,
  bootstrapOptions
);

接着,继续调用 Orchestrator 类中的方法:

const startedServer = await this.startServiceInternal(descriptor);

startServiceInternal 方法在 Orchesrtrator 类中是抽象方法,有两个类继承于它,一个是 ChildProcessOrchestrator,另一个是K8sOrchestrator,根据不同调用不同的 startServiceInternal 方法。如果调用的是 ChildProcessOrchestrator 类中的 startServiceInternal 方法。

在 startServiceInternal 方法中,首先会判断服务是否是 ExternalService:

if (descriptor.category === RunnableCategory.ExternalService) {
  const externalService = new ExternalService(
    descriptor.runnableType,
    descriptor.metadata.instanceId,
    descriptor.metadata.hostname,
    descriptor.metadata.port,
    descriptor.metadata.protocol,
    descriptor.metadata
  );
  this._services.add(externalService);
  return externalService;
}

如果是,则构造一个 ExternalService 并添加到 _services(RunnableRegistry 类型)中,返回该 ExternalService 类的实例。

如果不是,则调用 ChildProcessLauncher 类中的 startProcess 方法:

const running = this._processLauncher.startProcess(descriptor);

在 startProcess 方法中,将传入参数构造一个 ChildProcessRunningService 类并通过之前配置好的 args,调用

child_process 包中的 spawn 方法开启子进程执行程序:

running.proc = child_process.spawn("node", cmdargs);

running.proc 代表了执行中的子进程。

开启子进程之后,执行对子进程的 stdout 和 stderr 这两个 Readable Stream 的“监听”,获取其中的输出数据。之后便返回构造好的 ChildProcessRunningService 类实例,并把它添加到 _services 中,最后返回 ChildProcessRunningService。

最后,所有返回的 running service 都 push 到 _orchestrator 中。至此,initializeServices 方法执行完毕。

接下来是设置 Fastify 服务器,并设置路由及其 handler,设置的主要路由有:

GET http://orchestrator.url/service/:serviceType/:instanceId
GET http://orchestrator.url/service/:serviceType
POST http://orchestrator.url/service/:serviceType
DELETE http://orchestrator.url/service/:serviceType/:instanceId
POST http://orchestrator.url/job/:jobType
POST http://orchestrator.url/shutdown

接收来自 Orchestrator Client 发过来的请求并调用相应的 handler 去处理。例如,Orchestrator Client 请求 iModel Manager 服务的地址,则会发送 POST 请求 (http://orchestrator.url/service/:serviceType),服务器接收到请求之后,会调用 handler 处理器中的 findServices 方法返回相应的服务(存入result),其中包含该服务的 URL,使得 Gateway 可将请求转发至该服务上:

this._server.get(
  "/service/:serviceType",
  serviceCollectionResultOptions,
  this.getServiceQueryHandler()
);

设置好路由之后,便调用 listen 方法监听端口。

最后,执行:

await this._orchestrator.launch();
public async launch(): Promise<void> {
    const tasks: Array<Promise<void>> = [];
    for (const jobHandler of this._jobHandlers.values()) {
      tasks.push(jobHandler.launch());
    }
    await Promise.all(tasks);

    if (this._config.jobs?.[KnownJobs.checkpointScheduler]) {
      const request: JobRequest<JobBootstrapOptions> = {
        runnableType: KnownJobs.checkpointScheduler,
        options: {},
      };
      await this.enqueueJob(request);
    }
  }

对于 job handler,容器中只注入过两种:CheckpointJobHandler 和 CheckpointSchedulerJobHandler。CheckpointSchedulerJobHandler 的 launch 方法调用父类的方法,没有做什么实质操作。而 CheckpointJobHandler 的 launch 方法调用的是它继承的 IModelJobHandler 中的方法:

public async launch(): Promise<void> {
    await super.launch();     // 调用父类的方法,没有做什么实质操作

    await this.scheduleQueuedJobs();
  }

之后调用的 scheduleQueuedJobs 跟 messaging queue 是相关的。

在容器绑定依赖的过程中,如果在配置文件中有 messaging 相关的配置信息,则会绑定 RabbitMQMessagingClient 至容器中。

results matching ""

    No results matching ""