
CI hotfix: NGC access #683

Open

dorotat-nv wants to merge 1 commit into main
Conversation

dorotat-nv
Collaborator

Description

The unit tests in CI are failing due to issues with NGC. This PR temporarily changes `source=ngc` to `source=pbss` in the failing unit tests.

Issue to track: #682

Type of changes

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Refactor
  • Documentation update
  • Other (please describe):

CI Pipeline Configuration

Configure CI behavior by applying the relevant labels:

Note

By default, the notebook validation tests are skipped unless explicitly enabled.

Usage

TODO: Add code snippet
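
For reference, an illustrative invocation based on the diff in this PR; the `capture_output`/`text` keyword arguments are assumptions here, not necessarily the test's exact call:

```python
import subprocess

# Same data-download command exercised by the affected unit test,
# now pointing at pbss instead of ngc.
result = subprocess.run(
    ["download_bionemo_data", "--source", "pbss", "single_cell/testdata-20240506"],
    capture_output=True,
    text=True,
)
print(result.returncode, result.stdout)
```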

Pre-submit Checklist

  • I have tested these changes locally
  • I have updated the documentation accordingly
  • I have added/updated tests as needed
  • All existing tests pass successfully

@dorotat-nv added the INCLUDE_NOTEBOOKS_TESTS (Add Jupyter notebook validation to the CI pipeline) and INCLUDE_SLOW_TESTS (Add unit tests marked as slow to CI pipeline) labels Feb 7, 2025
@dorotat-nv self-assigned this Feb 7, 2025
@dorotat-nv enabled auto-merge February 7, 2025 15:34
  result = subprocess.run(
-     ["download_bionemo_data", "--source", "ngc", "single_cell/testdata-20240506"],
+     ["download_bionemo_data", "--source", "pbss", "single_cell/testdata-20240506"],
Collaborator

Can we just xfail this test for now? This will never pass in our pre-merge CI that runs off-prem
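
For reference, a minimal sketch of the xfail suggestion, assuming pytest; the test name and reason string are illustrative, not the repo's actual test:

```python
import pytest


@pytest.mark.xfail(
    reason="NGC download is unreachable from the off-prem pre-merge CI (see #682)",
    strict=False,  # still passes if the download happens to succeed
)
def test_download_bionemo_data_from_ngc():
    ...
```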

Collaborator

The point of this test was that the ngcsdk library uses an asyncio loop that previously failed in a notebook setting. We've now wrapped that such that it works in a notebook, so this is a regression test that makes sure that stays supported.

Can we change the data target? Ideally there would be some public <1kb test file we could download just to ensure the connection is alive.
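
For context, a generic sketch of the notebook-safe wrapping pattern described above; this is not the actual ngcsdk integration in this repo:

```python
import asyncio
import concurrent.futures


def run_coro_blocking(coro):
    """Run an async coroutine to completion, even inside a notebook.

    In a Jupyter notebook an event loop is already running, so a plain
    asyncio.run() raises RuntimeError; in that case run the coroutine
    on a fresh loop in a worker thread instead.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop running (plain script / pytest): asyncio.run is safe.
        return asyncio.run(coro)
    # A loop is already running (notebook): execute on a separate thread.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, coro).result()
```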


codecov-commenter commented Feb 7, 2025

❌ 5 Tests Failed:

| Tests completed | Failed | Passed | Skipped |
| --------------- | ------ | ------ | ------- |
| 891             | 5      | 886    | 12      |
View the top 3 failed test(s) by shortest run time
sub-packages/bionemo-esm2/tests/bionemo/esm2/scripts/test_train_esm2.py::test_val_dataloader_in_main_runs_with_limit_val_batches[4]
Stack Traces | 0.476s run time
monkeypatch = <_pytest.monkeypatch.MonkeyPatch object at 0x706fd04ff590>
tmpdir = local('.../pytest-of-root/pytest-1/test_val_dataloader_in_main_ru2')
dummy_protein_dataset = PosixPath('.../pytest-of-root/pytest-1/test_val_dataloader_in_main_ru2/protein_dataset.db')
dummy_parquet_train_val_inputs = (PosixPath('.../pytest-of-root/pytest-1/test_val_dataloader_in_main_ru2/train_clusters.parquet'), PosixPath('.../pytest-of-root/pytest-1/test_val_dataloader_in_main_ru2/valid_clusters.parquet'))
limit_val_batches = 4

    @pytest.mark.parametrize("limit_val_batches", [0.0, 1.0, 4, None])
    def test_val_dataloader_in_main_runs_with_limit_val_batches(
        monkeypatch, tmpdir, dummy_protein_dataset, dummy_parquet_train_val_inputs, limit_val_batches
    ):
        # TODO: pydantic.
        """Ensures doesn't run out of validation samples whenever updating limit_val_batches logic.
    
        Args:
            monkeypatch: (MonkeyPatch): Monkey patch for environment variables.
            tmpdir (str): Temporary directory.
            dummy_protein_dataset (str): Path to dummy protein dataset.
            dummy_parquet_train_val_inputs (tuple[str, str]): Tuple of dummy protein train and val cluster parquet paths.
            limit_val_batches (Union[int, float, None]): Limit validation batches. None implies 1.0 as in PTL.
        """
        train_cluster_path, valid_cluster_path = dummy_parquet_train_val_inputs
    
        result_dir = Path(tmpdir.mkdir("results"))
    
        with megatron_parallel_state_utils.distributed_model_parallel_state():
>           main(
                train_cluster_path=train_cluster_path,
                train_database_path=dummy_protein_dataset,
                valid_cluster_path=valid_cluster_path,
                valid_database_path=dummy_protein_dataset,
                num_nodes=1,
                devices=1,
                min_seq_length=128,
                max_seq_length=128,
                result_dir=result_dir,
                wandb_project=None,
                wandb_offline=True,
                scheduler_num_steps=None,
                num_steps=5,
                warmup_steps=2,
                limit_val_batches=limit_val_batches,
                val_check_interval=2,
                log_every_n_steps=None,
                num_dataset_workers=1,
                biobert_spec_option=BiobertSpecOption.esm2_bert_layer_with_transformer_engine_spec,
                lr=1e-4,
                micro_batch_size=2,
                accumulate_grad_batches=1,
                precision="bf16-mixed",
                experiment_name="test_experiment",
                resume_if_exists=False,
                create_tensorboard_logger=False,
                num_layers=2,
                num_attention_heads=2,
                hidden_size=4,
                ffn_hidden_size=4 * 4,
            )

.../esm2/scripts/test_train_esm2.py:171: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../local/lib/python3.12.../esm2/scripts/train_esm2.py:335: in main
    llm.train(
.../local/lib/python3.12.../collections/llm/api.py:107: in train
    trainer.fit(model, data)
.../local/lib/python3.12.../pytorch/trainer/trainer.py:538: in fit
    call._call_and_handle_interrupt(
.../local/lib/python3.12.../pytorch/trainer/call.py:46: in _call_and_handle_interrupt
    return trainer.strategy.launcher.launch(trainer_fn, *args, trainer=trainer, **kwargs)
.../local/lib/python3.12.../strategies/launchers/subprocess_script.py:105: in launch
    return function(*args, **kwargs)
.../local/lib/python3.12.../pytorch/trainer/trainer.py:574: in _fit_impl
    self._run(model, ckpt_path=ckpt_path)
.../local/lib/python3.12.../pytorch/trainer/trainer.py:981: in _run
    results = self._run_stage()
.../local/lib/python3.12.../pytorch/trainer/trainer.py:1023: in _run_stage
    self._run_sanity_check()
.../local/lib/python3.12.../pytorch/trainer/trainer.py:1052: in _run_sanity_check
    val_loop.run()
.../local/lib/python3.12.../pytorch/loops/utilities.py:178: in _decorator
    return loop_run(self, *args, **kwargs)
.../local/lib/python3.12.../pytorch/loops/evaluation_loop.py:135: in run
    self._evaluation_step(batch, batch_idx, dataloader_idx, dataloader_iter)
.../local/lib/python3.12.../pytorch/loops/evaluation_loop.py:396: in _evaluation_step
    output = call._call_strategy_hook(trainer, hook_name, *step_args)
.../local/lib/python3.12.../pytorch/trainer/call.py:319: in _call_strategy_hook
    output = fn(*args, **kwargs)
.../local/lib/python3.12.../pytorch/strategies/megatron_strategy.py:640: in validation_step
    out = self.model.validation_step(dataloader_iter, *args, **kwargs)
.../local/lib/python3.12.../nemo/lightning/megatron_parallel.py:349: in validation_step
    return self._step(
.../local/lib/python3.12.../nemo/lightning/megatron_parallel.py:436: in _step
    return self.forward(
.../local/lib/python3.12.../nemo/lightning/megatron_parallel.py:286: in forward
    microbatch_outputs = step()
.../local/lib/python3.12.../nemo/lightning/megatron_parallel.py:1167: in __call__
    return self.forward_backward_func(
.../local/lib/python3.12.../core/pipeline_parallel/schedules.py:471: in forward_backward_no_pipelining
    output_tensor, num_tokens = forward_step(
.../local/lib/python3.12.../core/pipeline_parallel/schedules.py:275: in forward_step
    output_tensor, loss_func = forward_step_func(data_iterator, model)
.../local/lib/python3.12.../nemo/lightning/megatron_parallel.py:499: in wrapped_forward_step_func
    output_tensor = _forward_step(model, batch)
.../local/lib/python3.12.../nemo/lightning/megatron_parallel.py:778: in wrapped
    return attr(*args)
.../local/lib/python3.12.../bionemo/llm/lightning.py:330: in validation_step
    outputs = self.forward_step(batch)
.../local/lib/python3.12.../bionemo/llm/lightning.py:313: in forward_step
    return self._forward_step(self.module, batch)
.../local/lib/python3.12.../model/biobert/lightning.py:149: in bert_forward_step
    forward_results = model.forward(input_ids=batch["text"], attention_mask=batch["attention_mask"])
.../local/lib/python3.12.../core/distributed/data_parallel_base.py:22: in forward
    return self.module(*inputs, **kwargs)
.../local/lib/python3.12.../nn/modules/module.py:1740: in _wrapped_call_impl
    return self._call_impl(*args, **kwargs)
.../local/lib/python3.12.../nn/modules/module.py:1848: in _call_impl
    return inner()
.../local/lib/python3.12.../nn/modules/module.py:1794: in inner
    result = forward_call(*args, **kwargs)
.../local/lib/python3.12.../core/transformer/module.py:178: in forward
    outputs = self.module(*inputs, **kwargs)
.../local/lib/python3.12.../nn/modules/module.py:1740: in _wrapped_call_impl
    return self._call_impl(*args, **kwargs)
.../local/lib/python3.12.../nn/modules/module.py:1848: in _call_impl
    return inner()
.../local/lib/python3.12.../nn/modules/module.py:1794: in inner
    result = forward_call(*args, **kwargs)
.../local/lib/python3.12.../model/biobert/model.py:392: in forward
    encoder_input: Optional[Tensor] = self.embedding_forward(
.../local/lib/python3.12.../esm2/model/model.py:219: in embedding_forward
    return self.embedding(
.../local/lib/python3.12.../nn/modules/module.py:1740: in _wrapped_call_impl
    return self._call_impl(*args, **kwargs)
.../local/lib/python3.12.../nn/modules/module.py:1848: in _call_impl
    return inner()
.../local/lib/python3.12.../nn/modules/module.py:1794: in inner
    result = forward_call(*args, **kwargs)
.../local/lib/python3.12.../esm2/model/embedding.py:112: in forward
    word_embeddings = self.word_embeddings(input_ids)  # [b, s, h]
.../local/lib/python3.12.../nn/modules/module.py:1740: in _wrapped_call_impl
    return self._call_impl(*args, **kwargs)
.../local/lib/python3.12.../nn/modules/module.py:1848: in _call_impl
    return inner()
.../local/lib/python3.12.../nn/modules/module.py:1783: in inner
    args_result = hook(self, args)
.../local/lib/python3.12.../core/distributed/distributed_data_parallel.py:356: in hook
    self.param_to_bucket_group[param].finish_param_sync(
.../local/lib/python3.12.../core/distributed/param_and_grad_buffer.py:237: in finish_param_sync
    self.start_param_sync()
.../local/lib/python3.12.../core/distributed/param_and_grad_buffer.py:192: in start_param_sync
    with _coalescing_manager(
.../usr/lib/python3.12/contextlib.py:144: in __exit__
    next(self.gen)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

group = <torch.distributed.distributed_c10d.ProcessGroup object at 0x706f725ddeb0>
device = None, async_ops = True

    @contextlib.contextmanager
    def _coalescing_manager(
        group: Optional[ProcessGroup] = None,
        device: Optional[torch.device] = None,
        async_ops: Optional[bool] = False,
    ):
        """
        Context manager used to coalesce collectives or P2P operations when possible.
    
        Args:
            group (`ProcessGroup`, optional): The process group to work on. If None,
                the default process group will be used.
            device (`torch.device`, optional): Default is None, set to a device if
                there isn't a `**_coalesced` implementation by the backend.
            async_ops (`bool`, optional): whether the coalesced ops are async ops.
    
        Examples:
            >>> # xdoctest: +SKIP("no rank")
            >>> # Synchronous ops
            >>> with _coalescing_manager():
            >>>     for i in range(num_colls):
            >>>         dist.all_reduce(tensors[i])
            >>> # Asynchronous ops
            >>> with _coalescing_manager(async_ops=True) as cm:
            >>>     for i in range(num_colls):
            >>>         dist.all_reduce(tensors[i])
            >>> cm.wait()
    
        .. warning::
           :func:`_coalescing_manager` currently do not support coalescing
           all-reduces with different reduce operators, e.g.  `ReduceOp.SUM` mixed
           with `ReduceOp.PRODUCT`.
        """
        group = group or _get_default_group()
        op_list = _world.pg_coalesce_state.setdefault(group, [])
        if op_list:
            raise ValueError(
                "ProcessGroup has non-empty op list at the start of coalescing"
            )
        if device:
            group._start_coalescing(device)
        cm = _CoalescingManager()
        yield cm
        op_list = _world.pg_coalesce_state.pop(group)
        if op_list:
            # Collectives supporting "Fast Path" coalescing are captured.
            # See implementation in corresponding collective APIs.
            # Currently supported:
            # - coalesced `all_reduce`
            # - coalesced `all_gather_into_tensor`
            # - coalesced `reduce_scatter_tensor`
            op0 = op_list[0].op
            if op0 == all_reduce:
                tensors = [op.tensor for op in op_list]
                all_reduce_opts = AllreduceCoalescedOptions()
                all_reduce_opts.reduceOp = not_none(op_list[0].redop)
                work = group.allreduce_coalesced(tensors, all_reduce_opts)
            elif op0 == all_gather_into_tensor:
                inputs = []
                outputs = []
                for op in op_list:
                    inputs.append(op.tensor)
                    outputs.append(not_none(op.dst_tensor))
>               work = group.allgather_into_tensor_coalesced(outputs, inputs)
E               torch.distributed.DistBackendError: NCCL error in: .../distributed/c10d/NCCLUtils.hpp:328, unhandled cuda error (run with NCCL_DEBUG=INFO for details), NCCL version 2.25.1
E               ncclUnhandledCudaError: Call to CUDA function failed.
E               Last error:
E               Failed to CUDA calloc async 24 bytes

.../local/lib/python3.12.../torch/distributed/distributed_c10d.py:2524: DistBackendError
sub-packages/bionemo-esm2/tests/bionemo/esm2/scripts/test_train_esm2.py::test_val_dataloader_in_main_runs_with_limit_val_batches[None]
Stack Traces | 0.485s run time
monkeypatch = <_pytest.monkeypatch.MonkeyPatch object at 0x706fd0300a10>
tmpdir = local('.../pytest-of-root/pytest-1/test_val_dataloader_in_main_ru3')
dummy_protein_dataset = PosixPath('.../pytest-of-root/pytest-1/test_val_dataloader_in_main_ru3/protein_dataset.db')
dummy_parquet_train_val_inputs = (PosixPath('.../pytest-of-root/pytest-1/test_val_dataloader_in_main_ru3/train_clusters.parquet'), PosixPath('.../pytest-of-root/pytest-1/test_val_dataloader_in_main_ru3/valid_clusters.parquet'))
limit_val_batches = None

    @pytest.mark.parametrize("limit_val_batches", [0.0, 1.0, 4, None])
    def test_val_dataloader_in_main_runs_with_limit_val_batches(
        monkeypatch, tmpdir, dummy_protein_dataset, dummy_parquet_train_val_inputs, limit_val_batches
    ):
        # TODO: pydantic.
        """Ensures doesn't run out of validation samples whenever updating limit_val_batches logic.
    
        Args:
            monkeypatch: (MonkeyPatch): Monkey patch for environment variables.
            tmpdir (str): Temporary directory.
            dummy_protein_dataset (str): Path to dummy protein dataset.
            dummy_parquet_train_val_inputs (tuple[str, str]): Tuple of dummy protein train and val cluster parquet paths.
            limit_val_batches (Union[int, float, None]): Limit validation batches. None implies 1.0 as in PTL.
        """
        train_cluster_path, valid_cluster_path = dummy_parquet_train_val_inputs
    
        result_dir = Path(tmpdir.mkdir("results"))
    
        with megatron_parallel_state_utils.distributed_model_parallel_state():
>           main(
                train_cluster_path=train_cluster_path,
                train_database_path=dummy_protein_dataset,
                valid_cluster_path=valid_cluster_path,
                valid_database_path=dummy_protein_dataset,
                num_nodes=1,
                devices=1,
                min_seq_length=128,
                max_seq_length=128,
                result_dir=result_dir,
                wandb_project=None,
                wandb_offline=True,
                scheduler_num_steps=None,
                num_steps=5,
                warmup_steps=2,
                limit_val_batches=limit_val_batches,
                val_check_interval=2,
                log_every_n_steps=None,
                num_dataset_workers=1,
                biobert_spec_option=BiobertSpecOption.esm2_bert_layer_with_transformer_engine_spec,
                lr=1e-4,
                micro_batch_size=2,
                accumulate_grad_batches=1,
                precision="bf16-mixed",
                experiment_name="test_experiment",
                resume_if_exists=False,
                create_tensorboard_logger=False,
                num_layers=2,
                num_attention_heads=2,
                hidden_size=4,
                ffn_hidden_size=4 * 4,
            )

.../esm2/scripts/test_train_esm2.py:171: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../local/lib/python3.12.../esm2/scripts/train_esm2.py:335: in main
    llm.train(
.../local/lib/python3.12.../collections/llm/api.py:107: in train
    trainer.fit(model, data)
.../local/lib/python3.12.../pytorch/trainer/trainer.py:538: in fit
    call._call_and_handle_interrupt(
.../local/lib/python3.12.../pytorch/trainer/call.py:46: in _call_and_handle_interrupt
    return trainer.strategy.launcher.launch(trainer_fn, *args, trainer=trainer, **kwargs)
.../local/lib/python3.12.../strategies/launchers/subprocess_script.py:105: in launch
    return function(*args, **kwargs)
.../local/lib/python3.12.../pytorch/trainer/trainer.py:574: in _fit_impl
    self._run(model, ckpt_path=ckpt_path)
.../local/lib/python3.12.../pytorch/trainer/trainer.py:981: in _run
    results = self._run_stage()
.../local/lib/python3.12.../pytorch/trainer/trainer.py:1023: in _run_stage
    self._run_sanity_check()
.../local/lib/python3.12.../pytorch/trainer/trainer.py:1052: in _run_sanity_check
    val_loop.run()
.../local/lib/python3.12.../pytorch/loops/utilities.py:178: in _decorator
    return loop_run(self, *args, **kwargs)
.../local/lib/python3.12.../pytorch/loops/evaluation_loop.py:135: in run
    self._evaluation_step(batch, batch_idx, dataloader_idx, dataloader_iter)
.../local/lib/python3.12.../pytorch/loops/evaluation_loop.py:396: in _evaluation_step
    output = call._call_strategy_hook(trainer, hook_name, *step_args)
.../local/lib/python3.12.../pytorch/trainer/call.py:319: in _call_strategy_hook
    output = fn(*args, **kwargs)
.../local/lib/python3.12.../pytorch/strategies/megatron_strategy.py:640: in validation_step
    out = self.model.validation_step(dataloader_iter, *args, **kwargs)
.../local/lib/python3.12.../nemo/lightning/megatron_parallel.py:349: in validation_step
    return self._step(
.../local/lib/python3.12.../nemo/lightning/megatron_parallel.py:436: in _step
    return self.forward(
.../local/lib/python3.12.../nemo/lightning/megatron_parallel.py:286: in forward
    microbatch_outputs = step()
.../local/lib/python3.12.../nemo/lightning/megatron_parallel.py:1167: in __call__
    return self.forward_backward_func(
.../local/lib/python3.12.../core/pipeline_parallel/schedules.py:471: in forward_backward_no_pipelining
    output_tensor, num_tokens = forward_step(
.../local/lib/python3.12.../core/pipeline_parallel/schedules.py:275: in forward_step
    output_tensor, loss_func = forward_step_func(data_iterator, model)
.../local/lib/python3.12.../nemo/lightning/megatron_parallel.py:499: in wrapped_forward_step_func
    output_tensor = _forward_step(model, batch)
.../local/lib/python3.12.../nemo/lightning/megatron_parallel.py:778: in wrapped
    return attr(*args)
.../local/lib/python3.12.../bionemo/llm/lightning.py:330: in validation_step
    outputs = self.forward_step(batch)
.../local/lib/python3.12.../bionemo/llm/lightning.py:313: in forward_step
    return self._forward_step(self.module, batch)
.../local/lib/python3.12.../model/biobert/lightning.py:149: in bert_forward_step
    forward_results = model.forward(input_ids=batch["text"], attention_mask=batch["attention_mask"])
.../local/lib/python3.12.../core/distributed/data_parallel_base.py:22: in forward
    return self.module(*inputs, **kwargs)
.../local/lib/python3.12.../nn/modules/module.py:1740: in _wrapped_call_impl
    return self._call_impl(*args, **kwargs)
.../local/lib/python3.12.../nn/modules/module.py:1848: in _call_impl
    return inner()
.../local/lib/python3.12.../nn/modules/module.py:1794: in inner
    result = forward_call(*args, **kwargs)
.../local/lib/python3.12.../core/transformer/module.py:178: in forward
    outputs = self.module(*inputs, **kwargs)
.../local/lib/python3.12.../nn/modules/module.py:1740: in _wrapped_call_impl
    return self._call_impl(*args, **kwargs)
.../local/lib/python3.12.../nn/modules/module.py:1848: in _call_impl
    return inner()
.../local/lib/python3.12.../nn/modules/module.py:1794: in inner
    result = forward_call(*args, **kwargs)
.../local/lib/python3.12.../model/biobert/model.py:392: in forward
    encoder_input: Optional[Tensor] = self.embedding_forward(
.../local/lib/python3.12.../esm2/model/model.py:219: in embedding_forward
    return self.embedding(
.../local/lib/python3.12.../nn/modules/module.py:1740: in _wrapped_call_impl
    return self._call_impl(*args, **kwargs)
.../local/lib/python3.12.../nn/modules/module.py:1848: in _call_impl
    return inner()
.../local/lib/python3.12.../nn/modules/module.py:1794: in inner
    result = forward_call(*args, **kwargs)
.../local/lib/python3.12.../esm2/model/embedding.py:112: in forward
    word_embeddings = self.word_embeddings(input_ids)  # [b, s, h]
.../local/lib/python3.12.../nn/modules/module.py:1740: in _wrapped_call_impl
    return self._call_impl(*args, **kwargs)
.../local/lib/python3.12.../nn/modules/module.py:1848: in _call_impl
    return inner()
.../local/lib/python3.12.../nn/modules/module.py:1783: in inner
    args_result = hook(self, args)
.../local/lib/python3.12.../core/distributed/distributed_data_parallel.py:356: in hook
    self.param_to_bucket_group[param].finish_param_sync(
.../local/lib/python3.12.../core/distributed/param_and_grad_buffer.py:237: in finish_param_sync
    self.start_param_sync()
.../local/lib/python3.12.../core/distributed/param_and_grad_buffer.py:192: in start_param_sync
    with _coalescing_manager(
.../usr/lib/python3.12/contextlib.py:144: in __exit__
    next(self.gen)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

group = <torch.distributed.distributed_c10d.ProcessGroup object at 0x706f7231cab0>
device = None, async_ops = True

    @contextlib.contextmanager
    def _coalescing_manager(
        group: Optional[ProcessGroup] = None,
        device: Optional[torch.device] = None,
        async_ops: Optional[bool] = False,
    ):
        """
        Context manager used to coalesce collectives or P2P operations when possible.
    
        Args:
            group (`ProcessGroup`, optional): The process group to work on. If None,
                the default process group will be used.
            device (`torch.device`, optional): Default is None, set to a device if
                there isn't a `**_coalesced` implementation by the backend.
            async_ops (`bool`, optional): whether the coalesced ops are async ops.
    
        Examples:
            >>> # xdoctest: +SKIP("no rank")
            >>> # Synchronous ops
            >>> with _coalescing_manager():
            >>>     for i in range(num_colls):
            >>>         dist.all_reduce(tensors[i])
            >>> # Asynchronous ops
            >>> with _coalescing_manager(async_ops=True) as cm:
            >>>     for i in range(num_colls):
            >>>         dist.all_reduce(tensors[i])
            >>> cm.wait()
    
        .. warning::
           :func:`_coalescing_manager` currently do not support coalescing
           all-reduces with different reduce operators, e.g.  `ReduceOp.SUM` mixed
           with `ReduceOp.PRODUCT`.
        """
        group = group or _get_default_group()
        op_list = _world.pg_coalesce_state.setdefault(group, [])
        if op_list:
            raise ValueError(
                "ProcessGroup has non-empty op list at the start of coalescing"
            )
        if device:
            group._start_coalescing(device)
        cm = _CoalescingManager()
        yield cm
        op_list = _world.pg_coalesce_state.pop(group)
        if op_list:
            # Collectives supporting "Fast Path" coalescing are captured.
            # See implementation in corresponding collective APIs.
            # Currently supported:
            # - coalesced `all_reduce`
            # - coalesced `all_gather_into_tensor`
            # - coalesced `reduce_scatter_tensor`
            op0 = op_list[0].op
            if op0 == all_reduce:
                tensors = [op.tensor for op in op_list]
                all_reduce_opts = AllreduceCoalescedOptions()
                all_reduce_opts.reduceOp = not_none(op_list[0].redop)
                work = group.allreduce_coalesced(tensors, all_reduce_opts)
            elif op0 == all_gather_into_tensor:
                inputs = []
                outputs = []
                for op in op_list:
                    inputs.append(op.tensor)
                    outputs.append(not_none(op.dst_tensor))
>               work = group.allgather_into_tensor_coalesced(outputs, inputs)
E               torch.distributed.DistBackendError: NCCL error in: .../distributed/c10d/NCCLUtils.hpp:328, unhandled cuda error (run with NCCL_DEBUG=INFO for details), NCCL version 2.25.1
E               ncclUnhandledCudaError: Call to CUDA function failed.
E               Last error:
E               Failed to CUDA calloc async 608 bytes

.../local/lib/python3.12.../torch/distributed/distributed_c10d.py:2524: DistBackendError
sub-packages/bionemo-esm2/tests/bionemo/esm2/scripts/test_train_esm2.py::test_val_dataloader_in_main_runs_with_limit_val_batches[1.0]
Stack Traces | 0.584s run time
monkeypatch = <_pytest.monkeypatch.MonkeyPatch object at 0x706fd050e060>
tmpdir = local('.../pytest-of-root/pytest-1/test_val_dataloader_in_main_ru1')
dummy_protein_dataset = PosixPath('.../pytest-of-root/pytest-1/test_val_dataloader_in_main_ru1/protein_dataset.db')
dummy_parquet_train_val_inputs = (PosixPath('.../pytest-of-root/pytest-1/test_val_dataloader_in_main_ru1/train_clusters.parquet'), PosixPath('.../pytest-of-root/pytest-1/test_val_dataloader_in_main_ru1/valid_clusters.parquet'))
limit_val_batches = 1.0

    @pytest.mark.parametrize("limit_val_batches", [0.0, 1.0, 4, None])
    def test_val_dataloader_in_main_runs_with_limit_val_batches(
        monkeypatch, tmpdir, dummy_protein_dataset, dummy_parquet_train_val_inputs, limit_val_batches
    ):
        # TODO: pydantic.
        """Ensures doesn't run out of validation samples whenever updating limit_val_batches logic.
    
        Args:
            monkeypatch: (MonkeyPatch): Monkey patch for environment variables.
            tmpdir (str): Temporary directory.
            dummy_protein_dataset (str): Path to dummy protein dataset.
            dummy_parquet_train_val_inputs (tuple[str, str]): Tuple of dummy protein train and val cluster parquet paths.
            limit_val_batches (Union[int, float, None]): Limit validation batches. None implies 1.0 as in PTL.
        """
        train_cluster_path, valid_cluster_path = dummy_parquet_train_val_inputs
    
        result_dir = Path(tmpdir.mkdir("results"))
    
        with megatron_parallel_state_utils.distributed_model_parallel_state():
>           main(
                train_cluster_path=train_cluster_path,
                train_database_path=dummy_protein_dataset,
                valid_cluster_path=valid_cluster_path,
                valid_database_path=dummy_protein_dataset,
                num_nodes=1,
                devices=1,
                min_seq_length=128,
                max_seq_length=128,
                result_dir=result_dir,
                wandb_project=None,
                wandb_offline=True,
                scheduler_num_steps=None,
                num_steps=5,
                warmup_steps=2,
                limit_val_batches=limit_val_batches,
                val_check_interval=2,
                log_every_n_steps=None,
                num_dataset_workers=1,
                biobert_spec_option=BiobertSpecOption.esm2_bert_layer_with_transformer_engine_spec,
                lr=1e-4,
                micro_batch_size=2,
                accumulate_grad_batches=1,
                precision="bf16-mixed",
                experiment_name="test_experiment",
                resume_if_exists=False,
                create_tensorboard_logger=False,
                num_layers=2,
                num_attention_heads=2,
                hidden_size=4,
                ffn_hidden_size=4 * 4,
            )

.../esm2/scripts/test_train_esm2.py:171: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../local/lib/python3.12.../esm2/scripts/train_esm2.py:335: in main
    llm.train(
.../local/lib/python3.12.../collections/llm/api.py:107: in train
    trainer.fit(model, data)
.../local/lib/python3.12.../pytorch/trainer/trainer.py:538: in fit
    call._call_and_handle_interrupt(
.../local/lib/python3.12.../pytorch/trainer/call.py:46: in _call_and_handle_interrupt
    return trainer.strategy.launcher.launch(trainer_fn, *args, trainer=trainer, **kwargs)
.../local/lib/python3.12.../strategies/launchers/subprocess_script.py:105: in launch
    return function(*args, **kwargs)
.../local/lib/python3.12.../pytorch/trainer/trainer.py:574: in _fit_impl
    self._run(model, ckpt_path=ckpt_path)
.../local/lib/python3.12.../pytorch/trainer/trainer.py:981: in _run
    results = self._run_stage()
.../local/lib/python3.12.../pytorch/trainer/trainer.py:1023: in _run_stage
    self._run_sanity_check()
.../local/lib/python3.12.../pytorch/trainer/trainer.py:1052: in _run_sanity_check
    val_loop.run()
.../local/lib/python3.12.../pytorch/loops/utilities.py:178: in _decorator
    return loop_run(self, *args, **kwargs)
.../local/lib/python3.12.../pytorch/loops/evaluation_loop.py:135: in run
    self._evaluation_step(batch, batch_idx, dataloader_idx, dataloader_iter)
.../local/lib/python3.12.../pytorch/loops/evaluation_loop.py:396: in _evaluation_step
    output = call._call_strategy_hook(trainer, hook_name, *step_args)
.../local/lib/python3.12.../pytorch/trainer/call.py:319: in _call_strategy_hook
    output = fn(*args, **kwargs)
.../local/lib/python3.12.../pytorch/strategies/megatron_strategy.py:640: in validation_step
    out = self.model.validation_step(dataloader_iter, *args, **kwargs)
.../local/lib/python3.12.../nemo/lightning/megatron_parallel.py:349: in validation_step
    return self._step(
.../local/lib/python3.12.../nemo/lightning/megatron_parallel.py:436: in _step
    return self.forward(
.../local/lib/python3.12.../nemo/lightning/megatron_parallel.py:286: in forward
    microbatch_outputs = step()
.../local/lib/python3.12.../nemo/lightning/megatron_parallel.py:1167: in __call__
    return self.forward_backward_func(
.../local/lib/python3.12.../core/pipeline_parallel/schedules.py:471: in forward_backward_no_pipelining
    output_tensor, num_tokens = forward_step(
.../local/lib/python3.12.../core/pipeline_parallel/schedules.py:284: in forward_step
    outputs = loss_func(output_tensor)
.../local/lib/python3.12.../nn/modules/module.py:1740: in _wrapped_call_impl
    return self._call_impl(*args, **kwargs)
.../local/lib/python3.12.../nn/modules/module.py:1848: in _call_impl
    return inner()
.../local/lib/python3.12.../nn/modules/module.py:1794: in inner
    result = forward_call(*args, **kwargs)
.../local/lib/python3.12.../llm/model/loss.py:229: in forward
    reduced_loss = average_losses_across_data_parallel_group([loss_for_microbatch])
.../local/lib/python3.12.../common/megatron/utils.py:198: in average_losses_across_data_parallel_group
    torch.distributed.all_reduce(averaged_losses, group=parallel_state.get_data_parallel_group())
.../local/lib/python3.12.../torch/distributed/c10d_logger.py:81: in wrapper
    return func(*args, **kwargs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

tensor = tensor([4.8837], device='cuda:0'), op = <RedOpType.SUM: 0>
group = <torch.distributed.distributed_c10d.ProcessGroup object at 0x706fd04a16b0>
async_op = False

    @_exception_logger
    def all_reduce(tensor, op=ReduceOp.SUM, group=None, async_op=False):
        """
        Reduces the tensor data across all machines in a way that all get the final result.
    
        After the call ``tensor`` is going to be bitwise identical in all processes.
    
        Complex tensors are supported.
    
        Args:
            tensor (Tensor): Input and output of the collective. The function
                operates in-place.
            op (optional): One of the values from
                ``torch.distributed.ReduceOp``
                enum.  Specifies an operation used for element-wise reductions.
            group (ProcessGroup, optional): The process group to work on. If None,
                the default process group will be used.
            async_op (bool, optional): Whether this op should be an async op
    
        Returns:
            Async work handle, if async_op is set to True.
            None, if not async_op or if not part of the group
    
        Examples:
            >>> # xdoctest: +SKIP("no rank")
            >>> # All tensors below are of torch.int64 type.
            >>> # We have 2 process groups, 2 ranks.
            >>> device = torch.device(f'cuda:{rank}')
            >>> tensor = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank
            >>> tensor
            tensor([1, 2], device='cuda:0') # Rank 0
            tensor([3, 4], device='cuda:1') # Rank 1
            >>> dist.all_reduce(tensor, op=ReduceOp.SUM)
            >>> tensor
            tensor([4, 6], device='cuda:0') # Rank 0
            tensor([4, 6], device='cuda:1') # Rank 1
    
            >>> # All tensors below are of torch.cfloat type.
            >>> # We have 2 process groups, 2 ranks.
            >>> tensor = torch.tensor([1+1j, 2+2j], dtype=torch.cfloat, device=device) + 2 * rank * (1+1j)
            >>> tensor
            tensor([1.+1.j, 2.+2.j], device='cuda:0') # Rank 0
            tensor([3.+3.j, 4.+4.j], device='cuda:1') # Rank 1
            >>> dist.all_reduce(tensor, op=ReduceOp.SUM)
            >>> tensor
            tensor([4.+4.j, 6.+6.j], device='cuda:0') # Rank 0
            tensor([4.+4.j, 6.+6.j], device='cuda:1') # Rank 1
    
        """
        _check_single_tensor(tensor, "tensor")
        if _rank_not_in_group(group):
            _warn_not_in_group("all_reduce")
            return
    
        if tensor.is_complex():
            if not supports_complex(op):
                raise ValueError(f"all_reduce does not support {op} on complex tensors")
            tensor = torch.view_as_real(tensor)
    
        opts = AllreduceOptions()
        opts.reduceOp = op
        if group is None:
            group = _get_default_group()
    
        if group in _world.pg_coalesce_state.keys():
            # We are in coalescing context, do not issue single operation, just append a collective representation
            coll = _CollOp(all_reduce, tensor, None, op, None)
            _world.pg_coalesce_state[group].append(coll)
            if async_op:
                return _IllegalWork()
            else:
                return None
    
>       work = group.allreduce([tensor], opts)
E       torch.distributed.DistBackendError: NCCL error in: .../distributed/c10d/NCCLUtils.hpp:328, unhandled cuda error (run with NCCL_DEBUG=INFO for details), NCCL version 2.25.1
E       ncclUnhandledCudaError: Call to CUDA function failed.
E       Last error:
E       Failed to CUDA calloc async 608 bytes

.../local/lib/python3.12.../torch/distributed/distributed_c10d.py:2729: DistBackendError
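
All three traces end in the same `ncclUnhandledCudaError`, and the error message itself suggests rerunning with `NCCL_DEBUG=INFO` for details. A hedged sketch of a local repro under that setting; the `-k` filter is illustrative:

```python
import os
import subprocess

# Rerun the failing ESM-2 training tests with NCCL debug logging enabled,
# as the NCCL error message recommends.
env = dict(os.environ, NCCL_DEBUG="INFO")
subprocess.run(
    [
        "pytest",
        "sub-packages/bionemo-esm2/tests/bionemo/esm2/scripts/test_train_esm2.py",
        "-k", "test_val_dataloader_in_main_runs_with_limit_val_batches",
    ],
    env=env,
    check=False,
)
```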

To view more test analytics, go to the Test Analytics Dashboard

@dorotat-nv disabled auto-merge February 11, 2025 12:25