Skip to content

Commit

Permalink
Add correct NODE_RANK and NNODES env vars for torch
Browse files Browse the repository at this point in the history
  • Loading branch information
kuizhiqing committed Jun 28, 2023
1 parent ad5c282 commit d29362c
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 32 deletions.
9 changes: 4 additions & 5 deletions manifests/base/crds/kubeflow.org_pytorchjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -552,11 +552,6 @@ spec:
to null.
format: int32
type: integer
nProcPerNode:
description: 'Number of workers per node; supported values: [auto,
cpu, gpu, int].'
format: int32
type: integer
rdzvBackend:
type: string
rdzvConf:
Expand Down Expand Up @@ -585,6 +580,10 @@ spec:
set values are ignored.
type: boolean
type: object
nprocPerNode:
description: Number of workers per node
format: int32
type: integer
pytorchReplicaSpecs:
additionalProperties:
description: ReplicaSpec is a description of the replica
Expand Down
15 changes: 5 additions & 10 deletions pkg/controller.v1/pytorch/elastic.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,7 @@ const (
// EnvStartMethod is the environment variable name for the multiprocessing start method to use when creating workers, which could be fork, spawn and forkserver.
EnvStartMethod = "PET_START_METHOD"

// Worker/node size related arguments.

// EnvNprocPerNode is the environment variable name for the number of processes per node.
EnvNprocPerNode = "PET_NPROC_PER_NODE"
// EnvNNodes is the environment variable name for the number of nodes.
EnvNNodes = "PET_NNODES"
// NOTE: the worker/node size env var names (EnvNprocPerNode, EnvNnodes, EnvNodeRank) are defined in envvar.go.
)

var (
Expand Down Expand Up @@ -89,7 +84,7 @@ func (e ElasticEnvVarGenerator) Generate(
// Generate RDZV_BACKEND.
envVars = append(envVars, e.generateEnvBackend(elasticPolicy))
// Generate NNODES.
if envVar, err := e.generateEnvNNodes(job); err != nil {
if envVar, err := e.generateEnvNnodes(job); err != nil {
return nil, err
} else {
envVars = append(envVars, *envVar)
Expand Down Expand Up @@ -120,23 +115,23 @@ func (e ElasticEnvVarGenerator) Generate(
return envVars, nil
}

func (e ElasticEnvVarGenerator) generateEnvNNodes(job *kubeflowv1.PyTorchJob) (*corev1.EnvVar, error) {
func (e ElasticEnvVarGenerator) generateEnvNnodes(job *kubeflowv1.PyTorchJob) (*corev1.EnvVar, error) {
// Return worker.replicas if there is no max and min replicas specified.
if job.Spec.ElasticPolicy.MinReplicas == nil &&
job.Spec.ElasticPolicy.MaxReplicas == nil {
if job.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeWorker] == nil {
return nil, fmt.Errorf("cannot find the worker spec")
}
return &corev1.EnvVar{
Name: EnvNNodes,
Name: EnvNnodes,
Value: strconv.Itoa(
int(*job.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeWorker].
Replicas)),
}, nil
}

return &corev1.EnvVar{
Name: EnvNNodes,
Name: EnvNnodes,
Value: fmt.Sprintf("%d:%d",
*job.Spec.ElasticPolicy.MinReplicas, *job.Spec.ElasticPolicy.MaxReplicas),
}, nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller.v1/pytorch/elastic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func TestElasticGenerate(t *testing.T) {
Value: "rdzv-conf-name=rdzv-conf-value,rdzv-conf-name-1=rdzv-conf-value-1",
},
{
Name: EnvNNodes,
Name: EnvNnodes,
Value: "1:3",
},
},
Expand Down
40 changes: 25 additions & 15 deletions pkg/controller.v1/pytorch/envvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,17 @@ import (
kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
)

const (
	// Worker/node size related arguments.

	// EnvNprocPerNode is the environment variable name for the number of processes per node.
	EnvNprocPerNode = "PET_NPROC_PER_NODE"
	// EnvNnodes is the environment variable name for the number of nodes.
	// In elastic mode it holds a "min:max" range (e.g. "1:4"); otherwise a plain integer.
	EnvNnodes = "PET_NNODES"
	// EnvNodeRank is the environment variable name for the node rank.
	EnvNodeRank = "PET_NODE_RANK"
)

// EnvVarGenerator is the environment variable generator interface.
type EnvVarGenerator interface {
Generate(job *kubeflowv1.PyTorchJob) ([]corev1.EnvVar, error)
Expand All @@ -49,6 +60,10 @@ func setPodEnv(obj interface{}, podTemplateSpec *corev1.PodTemplateSpec, rtype,
Value: "0",
})

totalReplicas := getTotalReplicas(pytorchjob)
nprocPerNode := getNprocPerNode(pytorchjob)
worldSize := totalReplicas * nprocPerNode

// If the master is not null, then we need to set the MASTER_ADDR and RANK.
if pytorchjob.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeMaster] != nil {
envVars, err := GetMasterEnvVarGenerator().Generate(pytorchjob)
Expand All @@ -68,10 +83,6 @@ func setPodEnv(obj interface{}, podTemplateSpec *corev1.PodTemplateSpec, rtype,
rank = rank + 1
}

totalReplicas := getTotalReplicas(pytorchjob)
worldSize := getWorldSize(pytorchjob)
nprocPerNode := getNprocPerNode(pytorchjob)

podTemplateSpec.Spec.Containers[i].Env = append(podTemplateSpec.Spec.Containers[i].Env, corev1.EnvVar{
Name: "WORLD_SIZE",
Value: strconv.Itoa(int(worldSize)),
Expand All @@ -85,12 +96,14 @@ func setPodEnv(obj interface{}, podTemplateSpec *corev1.PodTemplateSpec, rtype,
Value: strconv.Itoa(int(nprocPerNode)),
})
podTemplateSpec.Spec.Containers[i].Env = append(podTemplateSpec.Spec.Containers[i].Env, corev1.EnvVar{
Name: EnvNNodes,
Value: strconv.Itoa(int(totalReplicas)),
Name: EnvNodeRank,
Value: strconv.Itoa(rank),
})
}

// Set the elastic environment variables if the elasticPolicy is not null.
// nnodes is set in range format in elastic mode, e.g. nnodes=1:4
// otherwise, nnodes is set by int, e.g. nnodes=2
if pytorchjob.Spec.ElasticPolicy != nil {
envVars, err := GetElasticEnvVarGenerator().Generate(pytorchjob)
if err != nil {
Expand All @@ -99,6 +112,12 @@ func setPodEnv(obj interface{}, podTemplateSpec *corev1.PodTemplateSpec, rtype,
// Set elastic related environment variables.
podTemplateSpec.Spec.Containers[i].Env = append(
podTemplateSpec.Spec.Containers[i].Env, envVars...)
} else {
podTemplateSpec.Spec.Containers[i].Env = append(
podTemplateSpec.Spec.Containers[i].Env, corev1.EnvVar{
Name: EnvNnodes,
Value: strconv.Itoa(int(totalReplicas)),
})
}
}

Expand All @@ -113,15 +132,6 @@ func getNprocPerNode(job *kubeflowv1.PyTorchJob) int32 {
}
}

func getWorldSize(job *kubeflowv1.PyTorchJob) int32 {
worldSize := int32(0)
nprocPerNode := getNprocPerNode(job)
for _, r := range job.Spec.PyTorchReplicaSpecs {
worldSize += *r.Replicas * nprocPerNode
}
return worldSize
}

func getTotalReplicas(job *kubeflowv1.PyTorchJob) int32 {
jobReplicas := int32(0)
for _, r := range job.Spec.PyTorchReplicaSpecs {
Expand Down
11 changes: 11 additions & 0 deletions pkg/controller.v1/pytorch/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ var (
onceMaster sync.Once
EnvMasterPort = "MASTER_PORT"
EnvMasterAddr = "MASTER_ADDR"

PETMasterPort = "PET_MASTER_PORT"
PETMasterAddr = "PET_MASTER_ADDR"
)

// MasterEnvVarGenerator is the environment variable generator for Master related arguments.
Expand Down Expand Up @@ -42,10 +45,18 @@ func (e MasterEnvVarGenerator) Generate(
Name: EnvMasterPort,
Value: strconv.Itoa(int(masterPort)),
})
envVars = append(envVars, corev1.EnvVar{
Name: PETMasterPort,
Value: strconv.Itoa(int(masterPort)),
})
envVars = append(envVars, corev1.EnvVar{
Name: EnvMasterAddr,
Value: masterAddr,
})
envVars = append(envVars, corev1.EnvVar{
Name: PETMasterAddr,
Value: masterAddr,
})
}
return envVars, nil
}
2 changes: 1 addition & 1 deletion pkg/controller.v1/pytorch/pytorchjob_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ var _ = Describe("PyTorchJob controller", func() {
Name: EnvRDZVBackend,
Value: string(backendC10D),
}, corev1.EnvVar{
Name: EnvNNodes,
Name: EnvNnodes,
Value: fmt.Sprintf("%d:%d", *minReplicas, *maxReplicas),
}, corev1.EnvVar{
Name: EnvRDZVEndpoint,
Expand Down

0 comments on commit d29362c

Please sign in to comment.