Skip to content

Commit

Permalink
PEFT eval fix (#7626) (#7638)
Browse files Browse the repository at this point in the history
* fix issue where peft weights are not loaded for distributed checkpoints



* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

---------

Signed-off-by: Chen Cui <[email protected]>
Co-authored-by: Chen Cui <[email protected]>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Oct 11, 2023
1 parent 9bc6f0a commit f03dd66
Showing 1 changed file with 174 additions and 0 deletions.
174 changes: 174 additions & 0 deletions nemo/collections/nlp/parts/nlp_overrides.py
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,180 @@ def dummy():
return instance


class PEFTSaveRestoreConnector(NLPSaveRestoreConnector):
"""
PEFT models require the ability to load/save a small subset of the full model (once PEFT params have been infused into the base model.)
The PEFTSaveRestoreConnector is used to allow loading and saving only the PEFT params while not saving the entire model.
Args:
peft_model_nemo_path: Used to provide the .nemo file corresponding to a PEFT model (which will only contain a small set of params)
peft_model_ckpt_path: Used to provide the path to .ckpt files of a PEFT model. This is required when no .nemo is available (yet) such as during resumed training.
peft_model_ckpt_name: The filename of the ckpt file inside the peft_model_ckpt_path folder
If both are provided the peft_model_ckpt_path takes precedence.
If neither are provided, PEFT params are initialized at random (not loaded from any external source).
"""

def __init__(
self,
peft_model_nemo_path: Optional[str] = None,
peft_model_ckpt_path: Optional[str] = None,
peft_model_ckpt_name: Optional[str] = "model_weights.ckpt",
) -> None:
super().__init__()
self.peft_model_ckpt_name = peft_model_ckpt_name
if peft_model_ckpt_path:
# First we will try to load a adapter ckpt path
# this is given priority over loading from nemo path to make resumption of training possible
ckpt_name = os.path.basename(peft_model_ckpt_path)
if not ckpt_name.strip() == '':
# update the weights file name inside the ckpt path rank folders
self.peft_model_ckpt_name = ckpt_name
self.peft_model_ckpt_dir = os.path.dirname(peft_model_ckpt_path)
assert os.path.isdir(self.peft_model_ckpt_dir)
self.peft_model_nemo_path = None
elif peft_model_nemo_path:
# If resumption is not possible we will try to load a adapter nemo path
self.peft_model_nemo_path = peft_model_nemo_path
assert os.path.exists(self.peft_model_nemo_path)
self.peft_model_ckpt_dir = None
else:
# We are not resuming training from a nemo file or a ckpt
# We are training the adapter from randomly initialization
self.peft_model_nemo_path = None
self.peft_model_ckpt_dir = None

def _load_state_dict_from_disk(self, model_weights, map_location=None):
"""
Infuse the state_dict of the base model with PEFT params from either a peft_model_nemo_path or peft_model_ckpt_path
"""
# first load based model weights
base_model_state_dict = super()._load_state_dict_from_disk(model_weights, map_location)

# if distributed checkpointing, load peft weights in restore_from
if base_model_state_dict:
# Next, We want to load PEFT model's weights
if self.peft_model_nemo_path:
# if the PEFT weights are provided in a .nemo file
# we need to untar the .nemo if its still tarred
with tempfile.TemporaryDirectory() as tmpdir:
self._unpack_nemo_file(self.peft_model_nemo_path, tmpdir)
model_weights_path = self._inject_model_parallel_rank_for_ckpt(tmpdir, self.peft_model_ckpt_name)
peft_state_dict = torch.load(model_weights_path, map_location)
elif self.peft_model_ckpt_dir:
# if the PEFT weights are provided in a ckpt path file
# we don't need to untar
model_weights_path = self._inject_model_parallel_rank_for_ckpt(
self.peft_model_ckpt_dir, self.peft_model_ckpt_name
)
peft_state_dict = torch.load(model_weights_path, map_location)['state_dict']
else:
peft_state_dict = {}
base_model_state_dict.update(peft_state_dict) # add the PEFT state_dict into the base model's state_dict
return base_model_state_dict

def restore_from(
self,
calling_cls,
restore_path: str,
override_config_path: Optional[Union[OmegaConf, str]] = None,
map_location: Optional[torch.device] = None,
strict: bool = True,
return_config: bool = False,
trainer: Trainer = None,
):
"""
Extends the restore_from method of the `NLPSaveRestoreConnector` so that PEFT params are inserted into the state_dict which is required when training a PEFT model from scratch.
"""
# Get path where the command is executed - the artifacts will be "retrieved" there
# (original .nemo behavior)
loaded_params = super().load_config_and_state_dict(
calling_cls, restore_path, override_config_path, map_location, strict, return_config, trainer,
)
if not isinstance(loaded_params, tuple) or return_config is True:
return loaded_params
conf, instance, state_dict = loaded_params

# if we're using dist checkpointing then state_dict will be None
if state_dict is None:
# dist checkpointing needs torch.distributed to load the checkpoint
if parallel_state.is_unitialized():

def dummy():
return

if trainer.strategy.launcher is not None:
trainer.strategy.launcher.launch(dummy, trainer=trainer)
trainer.strategy.setup_environment()

with tempfile.TemporaryDirectory() as tmpdir:
# Check if self.model_extracted_dir is set, and is a valid path
if self.model_extracted_dir is not None and os.path.isdir(self.model_extracted_dir):
# Log that NeMo will use the provided `model_extracted_dir`
logging.info(
f"Restoration will occur within pre-extracted directory : " f"`{self.model_extracted_dir}`."
)

# Override `tmpdir` above with the pre-extracted `model_extracted_dir`
tmpdir = self.model_extracted_dir

else:
# Extract the nemo file into the temporary directory
self._unpack_nemo_file(
path2file=restore_path, out_folder=tmpdir, extract_config_only=return_config is True
)
checkpoint = {}
sharded_state_dict = instance.sharded_state_dict()

# if distributed checkpointing, load peft weights here instead of in _load_state_dict_from_disk
if self.peft_model_nemo_path:
# if the PEFT weights are provided in a .nemo file
# we need to untar the .nemo if its still tarred
with tempfile.TemporaryDirectory() as tmpdir2:
self._unpack_nemo_file(self.peft_model_nemo_path, tmpdir2)
model_weights_path = self._inject_model_parallel_rank_for_ckpt(
tmpdir2, self.peft_model_ckpt_name
)
peft_state_dict = torch.load(model_weights_path, map_location)
elif self.peft_model_ckpt_dir:
# if the PEFT weights are provided in a ckpt path file
# we don't need to untar
model_weights_path = self._inject_model_parallel_rank_for_ckpt(
self.peft_model_ckpt_dir, self.peft_model_ckpt_name
)
peft_state_dict = torch.load(model_weights_path, map_location)['state_dict']
else:
peft_state_dict = instance.get_peft_state_dict()

if conf.peft.peft_scheme != "ptuning":
for k in peft_state_dict.keys():
sharded_state_dict.pop(k)

checkpoint['state_dict'] = sharded_state_dict
# remove model weights extension
tmp_model_weights_ckpt = os.path.join(tmpdir, self.model_weights_ckpt)
tmp_model_weights_dir = os.path.splitext(tmp_model_weights_ckpt)[0]
assert os.path.isdir(tmp_model_weights_dir), f'Expected {tmp_model_weights_dir} to be a directory.'
checkpoint = dist_checkpointing.load(
sharded_state_dict=checkpoint, checkpoint_dir=tmp_model_weights_dir
)
checkpoint['state_dict'].update(peft_state_dict)
instance.on_load_checkpoint(checkpoint)
if hasattr(instance, 'setup_transformer_engine_tp_groups'):
instance.setup_transformer_engine_tp_groups()

else:
if (
self.peft_model_nemo_path is None and self.peft_model_ckpt_dir is None
): # we have this check only for training PEFT from scratch
peft_state_dict = instance.get_peft_state_dict()
state_dict.update(peft_state_dict)
state_dict = self.modify_state_dict(conf, state_dict)
self.load_instance_with_state_dict(instance, state_dict, strict)

logging.info(f'Model {instance.__class__.__name__} was successfully restored from {restore_path}.')
return instance


class PipelineMixedPrecisionPlugin(MixedPrecisionPlugin):
""" Overrides PTL autocasting to not wrap training/val/test_step.
We do this because we have the megatron-core fwd/bwd functions in training_step.
Expand Down

0 comments on commit f03dd66

Please sign in to comment.