Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Some small fix and merge logic #256

Merged
merged 7 commits into from
Jun 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,11 @@

'use strict';

import * as component from "../../../common/component";
import { EventEmitter } from 'events';
import { delay } from "../../../common/utils";
import { CommandChannel, RunnerConnection } from "../commandChannel";
import { EnvironmentInformation, Channel } from "../environment";
import { AMLEnvironmentInformation } from '../aml/amlConfig';
import { EventEmitter } from 'events';
import { AMLEnvironmentService } from "../environments/amlEnvironmentService";
import { STDOUT } from "../../../core/commands";
import { CommandChannel, RunnerConnection } from "../commandChannel";
import { Channel, EnvironmentInformation } from "../environment";

class AMLRunnerConnection extends RunnerConnection {
}
Expand All @@ -19,7 +16,6 @@ export class AMLCommandChannel extends CommandChannel {
private stopping: boolean = false;
private currentMessageIndex: number = -1;
private sendQueues: [EnvironmentInformation, string][] = [];
private metricEmitter: EventEmitter | undefined;
private readonly NNI_METRICS_PATTERN: string = `NNISDK_MEb'(?<metrics>.*?)'`;

public constructor(commandEmitter: EventEmitter) {
Expand All @@ -30,23 +26,25 @@ export class AMLCommandChannel extends CommandChannel {
}

public async config(_key: string, _value: any): Promise<void> {
switch (_key) {
case "MetricEmitter":
this.metricEmitter = _value as EventEmitter;
break;
}
// do nothing
}

public async start(): Promise<void> {
// start command loops
this.receiveLoop();
this.sendLoop();
// do nothing
}

public async stop(): Promise<void> {
this.stopping = true;
}

public async run(): Promise<void> {
// start command loops
await Promise.all([
this.receiveLoop(),
this.sendLoop()
]);
}

protected async sendCommandInternal(environment: EnvironmentInformation, message: string): Promise<void> {
this.sendQueues.push([environment, message]);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import * as component from "../../../common/component";
import { delay } from "../../../common/utils";
import { CommandChannel, RunnerConnection } from "../commandChannel";
import { EnvironmentInformation, Channel } from "../environment";
import { Channel, EnvironmentInformation } from "../environment";
import { StorageService } from "../storageService";

class FileHandler {
Expand Down Expand Up @@ -38,15 +38,21 @@ export class FileCommandChannel extends CommandChannel {
}

public async start(): Promise<void> {
// start command loops
this.receiveLoop();
this.sendLoop();
// do nothing
}

public async stop(): Promise<void> {
this.stopping = true;
}

public async run(): Promise<void> {
// start command loops
await Promise.all([
this.receiveLoop(),
this.sendLoop()
]);
}

protected async sendCommandInternal(environment: EnvironmentInformation, message: string): Promise<void> {
this.sendQueues.push([environment, message]);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ export class WebCommandChannel extends CommandChannel {
}
}

public async run(): Promise<void>{
// do nothing
}

protected async sendCommandInternal(environment: EnvironmentInformation, message: string): Promise<void> {
if (this.webSocketServer === undefined) {
throw new Error(`WebCommandChannel: uninitialized!`)
Expand Down
3 changes: 3 additions & 0 deletions src/nni_manager/training_service/reusable/commandChannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ export abstract class CommandChannel {
public abstract start(): Promise<void>;
public abstract stop(): Promise<void>;

// Pull-based command channels need loop to check messages, the loop should be started with await here.
public abstract run(): Promise<void>;

protected abstract sendCommandInternal(environment: EnvironmentInformation, message: string): Promise<void>;
protected abstract createRunnerConnection(environment: EnvironmentInformation): RunnerConnection;

Expand Down
36 changes: 18 additions & 18 deletions src/nni_manager/training_service/reusable/environment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,6 @@ import { CommandChannel } from "./commandChannel";
export type EnvironmentStatus = 'UNKNOWN' | 'WAITING' | 'RUNNING' | 'SUCCEEDED' | 'FAILED' | 'USER_CANCELED';
export type Channel = "web" | "file" | "aml" | "ut";

export abstract class EnvironmentService {

public abstract get hasStorageService(): boolean;

public abstract config(key: string, value: string): Promise<void>;
public abstract refreshEnvironmentsStatus(environments: EnvironmentInformation[]): Promise<void>;
public abstract startEnvironment(environment: EnvironmentInformation): Promise<void>;
public abstract stopEnvironment(environment: EnvironmentInformation): Promise<void>;

public getCommandChannel(commandEmitter: EventEmitter): CommandChannel {
return new WebCommandChannel(commandEmitter);
}

public createEnviornmentInfomation(envId: string, envName: string): EnvironmentInformation {
return new EnvironmentInformation(envId, envName);
}
}

export class NodeInfomation {
public id: string;
public status: TrialJobStatus = "UNKNOWN";
Expand Down Expand Up @@ -110,3 +92,21 @@ export class EnvironmentInformation {
}
}
}

export abstract class EnvironmentService {

public abstract get hasStorageService(): boolean;

public abstract config(key: string, value: string): Promise<void>;
public abstract refreshEnvironmentsStatus(environments: EnvironmentInformation[]): Promise<void>;
public abstract startEnvironment(environment: EnvironmentInformation): Promise<void>;
public abstract stopEnvironment(environment: EnvironmentInformation): Promise<void>;

public getCommandChannel(commandEmitter: EventEmitter): CommandChannel {
return new WebCommandChannel(commandEmitter);
}

public createEnviornmentInfomation(envId: string, envName: string): EnvironmentInformation {
return new EnvironmentInformation(envId, envName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,9 @@ export class OpenPaiEnvironmentService extends EnvironmentService {
}

// Step 1. Prepare PAI job configuration
environment.runnerWorkingFolder = `${this.paiTrialConfig.containerNFSMountPath}/${this.experimentId}/envs/${environment.id}`;
environment.command = `cd ${environment.runnerWorkingFolder} && ${environment.command}`
const environmentRoot = `${this.paiTrialConfig.containerNFSMountPath}/${this.experimentId}`;
environment.runnerWorkingFolder = `${environmentRoot}/envs/${environment.id}`;
environment.command = `cd ${environmentRoot} && ${environment.command}`
environment.trackingUrl = `${this.protocol}://${this.paiClusterConfig.host}/job-detail.html?username=${this.paiClusterConfig.userName}&jobName=${environment.jobId}`

// Step 2. Generate Job Configuration in yaml format
Expand Down
80 changes: 36 additions & 44 deletions src/nni_manager/training_service/reusable/trialDispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import * as path from 'path';
import { Writable } from 'stream';
import { String } from 'typescript-string-operations';
import * as component from '../../common/component';
import { getExperimentId, getPlatform, getBasePort } from '../../common/experimentStartupInfo';
import { getBasePort, getExperimentId, getPlatform } from '../../common/experimentStartupInfo';
import { getLogger, Logger } from '../../common/log';
import { NNIManagerIpConfig, TrainingService, TrialJobApplicationForm, TrialJobMetric, TrialJobStatus } from '../../common/trainingService';
import { delay, getExperimentRootDir, getLogLevel, getVersion, mkDirPSync, uniqueString } from '../../common/utils';
Expand All @@ -18,11 +18,10 @@ import { GPUSummary } from '../../training_service/common/gpuData';
import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../common/containerJobData';
import { TrialConfig } from '../common/trialConfig';
import { TrialConfigMetadataKey } from '../common/trialConfigMetadataKey';
import { validateCodeDir, execMkdir, execCopydir, tarAdd } from '../common/util';
import { WebCommandChannel } from './channels/webCommandChannel';
import { AMLCommandChannel } from './channels/amlCommandChannel';
import { validateCodeDir } from '../common/util';
import { Command, CommandChannel } from './commandChannel';
import { EnvironmentInformation, EnvironmentService, NodeInfomation, RunnerSettings } from './environment';
import { MountedStorageService } from './storages/mountedStorageService';
import { StorageService } from './storageService';
import { TrialDetail } from './trial';

Expand Down Expand Up @@ -169,59 +168,58 @@ class TrialDispatcher implements TrainingService {
this.runnerSettings.commandChannel = this.commandChannel.channelName;

// for AML channel, other channels can ignore this.
this.commandChannel.config("MetricEmitter", this.metricsEmitter);
await this.commandChannel.config("MetricEmitter", this.metricsEmitter);

// start channel
this.commandEmitter.on("command", (command: Command): void => {
this.handleCommand(command).catch((err: Error) => {
this.log.error(`TrialDispatcher: error on handle env ${command.environment.id} command: ${command.command}, data: ${command.data}, error: ${err}`);
})
});
this.commandChannel.start();
await this.commandChannel.start();
this.log.info(`TrialDispatcher: started channel: ${this.commandChannel.constructor.name}`);

if (this.trialConfig === undefined) {
throw new Error(`trial config shouldn't be undefined in run()`);
}

this.log.info(`TrialDispatcher: copying code and settings.`);
let storageService: StorageService;
if (environmentService.hasStorageService) {
this.log.info(`TrialDispatcher: copying code and settings.`);
const storageService = component.get<StorageService>(StorageService);
// Copy the compressed file to remoteDirectory and delete it
const codeDir = path.resolve(this.trialConfig.codeDir);
const envDir = storageService.joinPath("envs");
const codeFileName = await storageService.copyDirectory(codeDir, envDir, true);
storageService.rename(codeFileName, "nni-code.tar.gz");

const installFileName = storageService.joinPath(envDir, 'install_nni.sh');
await storageService.save(CONTAINER_INSTALL_NNI_SHELL_FORMAT, installFileName);

const runnerSettings = storageService.joinPath(envDir, "settings.json");
await storageService.save(JSON.stringify(this.runnerSettings), runnerSettings);

if (this.isDeveloping) {
let trialToolsPath = path.join(__dirname, "../../../../../tools/nni_trial_tool");
if (false === fs.existsSync(trialToolsPath)) {
trialToolsPath = path.join(__dirname, "..\\..\\..\\..\\..\\tools\\nni_trial_tool");
}
await storageService.copyDirectory(trialToolsPath, envDir, true);
}
this.log.debug(`TrialDispatcher: use existing storage service.`);
storageService = component.get<StorageService>(StorageService);
} else {
//write configuration to local folder, for AML
let environmentLocalTempFolder = path.join(this.experimentRootDir, this.experimentId, "environment-temp", "envs");
await execMkdir(environmentLocalTempFolder);
const runnerSettingsPath = path.join(environmentLocalTempFolder, "settings.json");
this.runnerSettings.command = this.trialConfig.command;
await fs.promises.writeFile(runnerSettingsPath, JSON.stringify(this.runnerSettings), { encoding: 'utf8' });
const installFilePath = path.join(environmentLocalTempFolder, "install_nni.sh");
await fs.promises.writeFile(installFilePath, CONTAINER_INSTALL_NNI_SHELL_FORMAT, { encoding: 'utf8' });
await tarAdd(path.join(environmentLocalTempFolder, 'nni-code.tar.gz'), this.trialConfig.codeDir);
this.log.debug(`TrialDispatcher: create temp storage service to temp folder.`);
storageService = new MountedStorageService();
const environmentLocalTempFolder = path.join(this.experimentRootDir, this.experimentId, "environment-temp");
storageService.initialize(this.trialConfig.codeDir, environmentLocalTempFolder);
}

// Copy the compressed file to remoteDirectory and delete it
const codeDir = path.resolve(this.trialConfig.codeDir);
const envDir = storageService.joinPath("envs");
const codeFileName = await storageService.copyDirectory(codeDir, envDir, true);
storageService.rename(codeFileName, "nni-code.tar.gz");

const installFileName = storageService.joinPath(envDir, 'install_nni.sh');
await storageService.save(CONTAINER_INSTALL_NNI_SHELL_FORMAT, installFileName);

const runnerSettings = storageService.joinPath(envDir, "settings.json");
await storageService.save(JSON.stringify(this.runnerSettings), runnerSettings);

if (this.isDeveloping) {
let trialToolsPath = path.join(__dirname, "../../../../../tools/nni_trial_tool");
if (false === fs.existsSync(trialToolsPath)) {
trialToolsPath = path.join(__dirname, "..\\..\\..\\..\\..\\tools\\nni_trial_tool");
}
await storageService.copyDirectory(trialToolsPath, envDir, true);
}

this.log.info(`TrialDispatcher: run loop started.`);
await Promise.all([
this.environmentMaintenanceLoop(),
this.trialManagementLoop(),
this.commandChannel.run(),
]);
}

Expand Down Expand Up @@ -288,7 +286,7 @@ class TrialDispatcher implements TrainingService {
}

this.commandEmitter.off("command", this.handleCommand);
this.commandChannel.stop();
await this.commandChannel.stop();
}

private async environmentMaintenanceLoop(): Promise<void> {
Expand Down Expand Up @@ -454,13 +452,7 @@ class TrialDispatcher implements TrainingService {
environment.command = "[ -d \"nni_trial_tool\" ] && echo \"nni_trial_tool exists already\" || (mkdir ./nni_trial_tool && tar -xof ../nni_trial_tool.tar.gz -C ./nni_trial_tool) && pip3 install websockets && " + environment.command;
}

if (environmentService.hasStorageService) {
const storageService = component.get<StorageService>(StorageService);
environment.workingFolder = storageService.joinPath("envs", envId);
await storageService.createDirectory(environment.workingFolder);
} else {
environment.command = `mkdir envs/${envId} && cd envs/${envId} && ${environment.command}`;
}
environment.command = `mkdir -p envs/${envId} && cd envs/${envId} && ${environment.command}`;

await environmentService.startEnvironment(environment);
this.environments.set(environment.id, environment);
Expand Down