[Checkpoint] Fix symlink issue where symlink file uploaded before checkpoint files upload #3376

Merged Jul 8, 2024 · 66 commits · changes shown from 62 commits

Commits:
f52c770  a (bigning, Jun 4, 2024)
8ee8364  a (bigning, Jun 5, 2024)
4fecdf6  a (bigning, Jun 6, 2024)
7e53a3b  a (bigning, Jun 6, 2024)
76a5f2d  Merge https://github.com/mosaicml/composer into checkpoint_saver (bigning, Jun 6, 2024)
20cac57  Merge remote-tracking branch 'remotes/origin/checkpoint_saver' into c… (bigning, Jun 6, 2024)
f772e33  a (bigning, Jun 6, 2024)
4e391a6  a (bigning, Jun 8, 2024)
55ac530  a (bigning, Jun 8, 2024)
e2d267b  a (bigning, Jun 10, 2024)
8035f50  fix test (bigning, Jun 11, 2024)
e65110d  a (bigning, Jun 11, 2024)
40cddfb  a (bigning, Jun 11, 2024)
91d838c  a (bigning, Jun 11, 2024)
a23552b  a (bigning, Jun 11, 2024)
cf4e0f1  fix unit test (bigning, Jun 11, 2024)
229b57d  Merge https://github.com/mosaicml/composer into checkpoint_saver (bigning, Jun 12, 2024)
dde135f  Merge branch 'checkpoint_saver' of https://github.com/mosaicml/compos… (bigning, Jun 12, 2024)
e6884fc  a (bigning, Jun 13, 2024)
9911766  a (bigning, Jun 13, 2024)
36a1dc5  a (bigning, Jun 13, 2024)
e4db035  a (bigning, Jun 13, 2024)
081033c  a (bigning, Jun 13, 2024)
ae5ece3  fix 2gpu unit test (bigning, Jun 13, 2024)
2f5d6b0  a (bigning, Jun 13, 2024)
28a36e0  a (bigning, Jun 13, 2024)
703ef5f  Merge https://github.com/mosaicml/composer into checkpoint_saver (bigning, Jun 13, 2024)
c78f475  a (bigning, Jun 13, 2024)
7ecfcf3  a (bigning, Jun 13, 2024)
1280266  fix doctest (bigning, Jun 14, 2024)
c0cb94d  a (bigning, Jun 14, 2024)
95fca9f  fix test and lint (bigning, Jun 14, 2024)
2c77da9  up (bigning, Jun 14, 2024)
ca46b4f  a (bigning, Jun 14, 2024)
4f3108c  a (bigning, Jun 14, 2024)
11307f0  Merge branch 'dev' into checkpoint_saver (bigning, Jun 17, 2024)
f415d60  a (bigning, Jun 18, 2024)
a0a3e92  a (bigning, Jun 18, 2024)
301dd67  a (bigning, Jun 18, 2024)
c4c094b  a (bigning, Jun 18, 2024)
5ec3e28  a (bigning, Jun 20, 2024)
9813816  a (bigning, Jun 20, 2024)
8c3c5cc  address comments (bigning, Jun 20, 2024)
c81cc2f  a (bigning, Jun 20, 2024)
c1174d4  a (bigning, Jun 20, 2024)
df601d2  a (bigning, Jun 20, 2024)
a41f427  a (bigning, Jun 20, 2024)
bc06a7b  rerun test (bigning, Jun 20, 2024)
c87f36c  add logging (bigning, Jun 21, 2024)
1ebf5a7  Merge branch 'dev' into checkpoint_saver (bigning, Jun 21, 2024)
0e8ae23  remove debug comments (bigning, Jun 21, 2024)
c7541c4  comments (bigning, Jun 21, 2024)
a9081c2  a (bigning, Jun 25, 2024)
b98ad33  cleanup (bigning, Jun 26, 2024)
8a6f5d1  a (bigning, Jun 26, 2024)
ebbcc46  linter (bigning, Jun 26, 2024)
3575d1e  lint (bigning, Jun 26, 2024)
fb8dbba  Update composer/callbacks/checkpoint_saver.py (bigning, Jun 28, 2024)
df4f59a  commenst (bigning, Jun 28, 2024)
4971526  a (bigning, Jun 28, 2024)
ebbbf56  fix test (bigning, Jun 28, 2024)
3bb10c9  fix test (bigning, Jun 28, 2024)
0d4c7af  comments (bigning, Jul 2, 2024)
9d4e112  Merge branch 'dev' into checkpoint_saver (bigning, Jul 3, 2024)
6ed9aa7  a (bigning, Jul 3, 2024)
b781375  Merge branch 'dev' into checkpoint_saver (bigning, Jul 8, 2024)
179 changes: 161 additions & 18 deletions composer/callbacks/checkpoint_saver.py
@@ -20,6 +20,8 @@
FORMAT_NAME_WITH_DIST_AND_TIME_TABLE,
FORMAT_NAME_WITH_DIST_TABLE,
PartialFilePath,
RemoteFilesExistingCheckStatus,
RemoteUploader,
checkpoint,
create_interval_scheduler,
create_symlink_file,
@@ -28,6 +30,7 @@
format_name_with_dist,
format_name_with_dist_and_time,
is_model_deepspeed,
parse_uri,
partial_format,
)
from composer.utils.checkpoint import _TORCH_DISTRIBUTED_CHECKPOINTS_METADATA_FILENAME
@@ -287,6 +290,9 @@ def __init__(
num_checkpoints_to_keep: int = -1,
weights_only: bool = False,
ignore_keys: Optional[Union[list[str], Callable[[dict], None]]] = None,
save_folder: str = '',
num_concurrent_uploads: int = 1,
upload_timeout_in_seconds: int = 3600,
):
folder = str(folder)
filename = str(filename)
Expand Down Expand Up @@ -320,6 +326,34 @@ def __init__(

self.start_batch = None

self.remote_uploader = None
backend, _, _ = parse_uri(save_folder)
self.rank_saves_symlinks: bool = False
self.tmp_dir_for_symlink = tempfile.TemporaryDirectory()
self.num_concurrent_uploads = num_concurrent_uploads
self.upload_timeout_in_seconds = upload_timeout_in_seconds
# Allow unit test to override this to make it faster
self._symlink_upload_wait_before_next_try_in_seconds = 30.0
self.pid = os.getpid()
self.symlink_count = 0
self.symlink_upload_tasks = []

if backend != '':
if backend == 'wandb':
raise NotImplementedError(
f'There is no implementation for WandB via URI. Please use '
'WandBLogger with log_artifacts set to True.',
)
elif backend not in ['s3', 'oci', 'gs', 'azure', 'dbfs']:
raise NotImplementedError(
f'There is no implementation for the cloud backend {backend} via URI. Please use '
'one of the supported object stores (s3, oci, gs, azure, dbfs).',
)
self.remote_uploader = RemoteUploader(
remote_folder=save_folder,
bigning marked this conversation as resolved.
Show resolved Hide resolved
num_concurrent_uploads=self.num_concurrent_uploads,
)
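For orientation, here is a minimal sketch of how the new save_folder argument selects the upload path. The URI is hypothetical, and parse_uri is assumed to split a URI into (backend, bucket, path), which is how the __init__ code above uses it:

```python
# Hedged sketch: how save_folder enables the async RemoteUploader path.
from composer.utils import parse_uri

backend, bucket, path = parse_uri('s3://my-bucket/checkpoints')  # hypothetical URI
assert backend == 's3'  # in the supported set: s3, oci, gs, azure, dbfs

# With the default save_folder='' the backend is empty, remote_uploader stays
# None, and uploads fall back to logger.upload_file exactly as before this PR.
backend, _, _ = parse_uri('')
assert backend == ''
```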

def init(self, state: State, logger: Logger) -> None:
# If MLFlowLogger is being used, format MLFlow-specific placeholders in the save folder and paths.
# Assumes that MLFlowLogger comes before CheckpointSaver in the list of loggers.
@@ -346,9 +380,10 @@ def init(self, state: State, logger: Logger) -> None:
self.latest_remote_file_name.filename,
**mlflow_format_kwargs,
)

break

if self.remote_uploader is not None:
self.remote_uploader.init()
folder = format_name_with_dist(self.folder, state.run_name)
os.makedirs(folder, exist_ok=True)

@@ -410,6 +445,27 @@ def load_state_dict(self, state: dict[str, Any]):
load_timestamp.load_state_dict(timestamp_state)
self.all_saved_checkpoints_to_timestamp[save_filename] = load_timestamp

def _upload_checkpoint(
self,
remote_file_name: str,
local_file_name: str,
local_remote_file_names: list[str],
logger: Logger,
):
if self.remote_uploader is not None:
self.remote_uploader.upload_file_async(
remote_file_name=remote_file_name,
file_path=pathlib.Path(local_file_name),
overwrite=self.overwrite,
)
local_remote_file_names.append(remote_file_name)
else:
logger.upload_file(
remote_file_name=remote_file_name,
file_path=local_file_name,
overwrite=self.overwrite,
)
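This helper is what feeds the symlink gate. When the RemoteUploader is active, every remote name is recorded in local_remote_file_names, and _save_checkpoint later runs dist.all_gather_object so each rank sees the union across all ranks. A minimal sketch with hypothetical file names; note that composer's dist.all_gather_object is a collective, which is why even non-saving ranks hit it in the early-return branch below:

```python
# Hedged sketch of the per-rank upload bookkeeping (names are hypothetical).
from composer.utils import dist

local_remote_file_names = ['ckpt/ep0-ba100-rank0.pt']  # this rank's uploads
# One entry per rank; in a single-process run this is just a one-element list.
all_remote_filenames = dist.all_gather_object(local_remote_file_names)

remote_checkpoint_file_names = []
for file_names in all_remote_filenames:
    remote_checkpoint_file_names += file_names  # flatten: every file on every rank
```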

def _save_checkpoint(self, state: State, logger: Logger):
self.last_checkpoint_batch = state.timestamp.batch

@@ -432,7 +488,13 @@ def _save_checkpoint(self, state: State, logger: Logger):
)
log.debug(f'Checkpoint locally saved to {saved_path}')

self.symlink_count += 1
local_remote_file_names = []
all_remote_filenames = []

if not saved_path: # not all ranks save
if self.remote_file_name is not None and self.remote_uploader is not None:
all_remote_filenames = dist.all_gather_object(local_remote_file_names)
return

metadata_local_file_path = None
@@ -443,6 +505,7 @@
state.timestamp,
)

self.rank_saves_symlinks = dist.get_global_rank() == 0 or not state.fsdp_sharded_state_dict_enabled
if self.latest_filename is not None and self.num_checkpoints_to_keep != 0:
symlink = self.latest_filename.format(state, is_deepspeed)
os.makedirs(os.path.dirname(symlink), exist_ok=True)
@@ -455,8 +518,7 @@
src_path = str(pathlib.Path(saved_path).parent)
else:
src_path = saved_path
this_rank_saves_symlinks = dist.get_global_rank() == 0 or not state.fsdp_sharded_state_dict_enabled
if this_rank_saves_symlinks:
if self.rank_saves_symlinks:
os.symlink(os.path.relpath(src_path, os.path.dirname(symlink)), symlink)

# if remote file name provided, upload the checkpoint
@@ -482,10 +544,11 @@
state.timestamp,
)
assert metadata_local_file_path is not None
logger.upload_file(
self._upload_checkpoint(
remote_file_name=metadata_remote_file_name,
file_path=metadata_local_file_path,
overwrite=self.overwrite,
local_file_name=metadata_local_file_path,
local_remote_file_names=local_remote_file_names,
logger=logger,
)
else:
remote_file_name = self.remote_file_name.format(
@@ -495,12 +558,20 @@

log.debug(f'Uploading checkpoint to {remote_file_name}')
try:
logger.upload_file(remote_file_name=remote_file_name, file_path=saved_path, overwrite=self.overwrite)
self._upload_checkpoint(
remote_file_name=remote_file_name,
local_file_name=saved_path,
local_remote_file_names=local_remote_file_names,
logger=logger,
)
except FileExistsError as e:
raise FileExistsError(
f'Uploading checkpoint failed with error: {e}. overwrite was set to {self.overwrite}. To overwrite checkpoints with Trainer, set save_overwrite to True.',
) from e

if self.remote_uploader is not None:
all_remote_filenames = dist.all_gather_object(local_remote_file_names)

# symlinks stay the same with sharded checkpointing
if self.latest_remote_file_name is not None:
symlink_name = self.latest_remote_file_name.format(
Expand All @@ -509,17 +580,31 @@ def _save_checkpoint(self, state: State, logger: Logger):
).lstrip('/') + '.symlink'

# create and upload a symlink file
with tempfile.TemporaryDirectory() as tmpdir:
symlink_filename = os.path.join(tmpdir, 'latest.symlink')
# Sharded checkpoints for torch >2.0 use directories not files for load_paths
if state.fsdp_sharded_state_dict_enabled:
src_path = str(pathlib.Path(remote_file_name).parent)
symlink_filename = os.path.join(
self.tmp_dir_for_symlink.name,
f'latest.{self.symlink_count}.symlink',
)
# Sharded checkpoints for torch >2.0 use directories not files for load_paths
if state.fsdp_sharded_state_dict_enabled:
src_path = str(pathlib.Path(remote_file_name).parent)
else:
src_path = remote_file_name
log.debug(f'Creating symlink file {symlink_filename} -> {src_path}')
if self.rank_saves_symlinks:
create_symlink_file(src_path, symlink_filename)
if self.remote_uploader is not None:
remote_checkpoint_file_names = []
for file_names in all_remote_filenames:
remote_checkpoint_file_names += file_names
check_remote_files_exist_future = self.remote_uploader.check_remote_files_exist_async(
remote_checkpoint_file_names=remote_checkpoint_file_names,
max_wait_time_in_seconds=self.upload_timeout_in_seconds,
wait_before_next_try_in_seconds=self._symlink_upload_wait_before_next_try_in_seconds,
)
self.symlink_upload_tasks.append(
(check_remote_files_exist_future, symlink_filename, symlink_name),
)
else:
src_path = remote_file_name
log.debug(f'Creating symlink file {symlink_filename} -> {src_path}')
this_rank_saves_symlinks = dist.get_global_rank() == 0 or not state.fsdp_sharded_state_dict_enabled
if this_rank_saves_symlinks:
create_symlink_file(src_path, symlink_filename)
logger.upload_file(
remote_file_name=symlink_name,
file_path=symlink_filename,
@@ -532,7 +617,6 @@ def _save_checkpoint(self, state: State, logger: Logger):
self._rotate_checkpoints(sharding_enabled=state.fsdp_sharded_state_dict_enabled)

def _rotate_checkpoints(self, sharding_enabled: bool = False):

while len(self.saved_checkpoints) > self.num_checkpoints_to_keep:
prefix_dir = None
checkpoint_to_delete = self.saved_checkpoints.pop(0)
Expand All @@ -542,3 +626,62 @@ def _rotate_checkpoints(self, sharding_enabled: bool = False):
else:
if dist.get_global_rank() == 0:
shutil.rmtree(prefix_dir)
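_rotate_checkpoints is unchanged in behavior: a keep-last-N policy over self.saved_checkpoints, deleting oldest first. A self-contained sketch with placeholder paths:

```python
# Hedged sketch of the rotation policy (paths are placeholders).
saved_checkpoints = ['ckpt/ep0.pt', 'ckpt/ep1.pt', 'ckpt/ep2.pt']
num_checkpoints_to_keep = 2

while len(saved_checkpoints) > num_checkpoints_to_keep:
    checkpoint_to_delete = saved_checkpoints.pop(0)  # oldest first
    # The callback calls os.remove here, or shutil.rmtree on rank 0 for the
    # prefix directory when sharded checkpointing is enabled.
    print(f'would delete {checkpoint_to_delete}')
```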

def batch_end(self, state: State, logger: Logger) -> None:
del state, logger # unused
if self.remote_uploader is None:
return
self.remote_uploader.check_workers()
if not self.rank_saves_symlinks:
return
undone_symlink_upload_tasks = []
for (check_remote_files_exist_future, local_symlink_file,
remote_symlink_file) in reversed(self.symlink_upload_tasks):
if not check_remote_files_exist_future.done():
undone_symlink_upload_tasks.insert(
0,
(check_remote_files_exist_future, local_symlink_file, remote_symlink_file),
)
continue
if check_remote_files_exist_future.done():
result = check_remote_files_exist_future.result()
if result == RemoteFilesExistingCheckStatus.EXIST:
self.remote_uploader.upload_file_async(
remote_file_name=remote_symlink_file,
file_path=local_symlink_file,
overwrite=True,
)
break
else:
raise RuntimeError(f'Failed to check whether checkpoint file uploads finished: {result}')
self.symlink_upload_tasks = undone_symlink_upload_tasks
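batch_end is where the fix lands for the steady-state case: each symlink upload sits behind a future that resolves only after every checkpoint file is confirmed to exist remotely, and only the newest ready symlink is uploaded. A self-contained sketch of the same gate; check_exists and the print are hypothetical stand-ins for the RemoteUploader calls:

```python
import concurrent.futures

def check_exists(names: list[str]) -> bool:
    """Hypothetical stand-in for the remote-files-exist check worker."""
    return True  # pretend every checkpoint file has landed in the object store

pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
symlink_upload_tasks = [
    (pool.submit(check_exists, ['ckpt/ep0-rank0.pt']), '/tmp/latest.0.symlink', 'latest.symlink'),
]
concurrent.futures.wait([t[0] for t in symlink_upload_tasks])  # determinism for this demo

undone = []
for future, local_file, remote_file in reversed(symlink_upload_tasks):
    if not future.done():
        undone.insert(0, (future, local_file, remote_file))  # retry next batch_end
        continue
    if future.result():
        print(f'uploading {local_file} -> {remote_file}')  # upload_file_async in the callback
        break  # newest ready symlink wins; older pending ones are dropped
    raise RuntimeError('checkpoint files did not appear remotely in time')
symlink_upload_tasks = undone
pool.shutdown()
```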

def fit_end(self, state: State, logger: Logger) -> None:
del state, logger # unused
if self.remote_uploader is None:
return
log.info('Waiting for checkpoint uploading to finish')
self.remote_uploader.wait()
if self.rank_saves_symlinks and len(self.symlink_upload_tasks) > 0:
log.debug('Uploading symlink to the latest checkpoint')
# We only need to upload the latest symlink file, ignoring the old ones
check_remote_files_exist_future, local_symlink_file, remote_symlink_file = self.symlink_upload_tasks[-1]
result = check_remote_files_exist_future.result()
if result == RemoteFilesExistingCheckStatus.EXIST:
symlink_upload_future = self.remote_uploader.upload_file_async(
remote_file_name=remote_symlink_file,
file_path=local_symlink_file,
overwrite=True,
)
symlink_upload_future.result()
else:
raise RuntimeError(f'Failed to check whether checkpoint file uploads finished: {result}')
log.info('Checkpoint uploading finished!')

def post_close(self):
if self.remote_uploader is not None:
# Wait for the symlink file upload to finish and close the remote uploader
try:
self.remote_uploader.wait_and_close()
except Exception as e:
log.error(f'RemoteUploader ran into an exception: {e}')
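Taken together, the callback drives the new RemoteUploader through a small lifecycle. The sketch below uses only the calls and signatures visible in this diff; the folder URI and file names are hypothetical, and running it for real would need object-store credentials, so read it as a call-order summary:

```python
import pathlib
from composer.utils import RemoteUploader

uploader = RemoteUploader(remote_folder='s3://my-bucket/ckpts', num_concurrent_uploads=1)
uploader.init()                           # Event.INIT

uploader.upload_file_async(               # one call per checkpoint file
    remote_file_name='ep0-rank0.pt',
    file_path=pathlib.Path('/tmp/ep0-rank0.pt'),
    overwrite=True,
)
exist_future = uploader.check_remote_files_exist_async(
    remote_checkpoint_file_names=['ep0-rank0.pt'],
    max_wait_time_in_seconds=3600,
    wait_before_next_try_in_seconds=30.0,
)
uploader.check_workers()                  # batch_end: surface worker errors early
uploader.wait()                           # fit_end: drain pending uploads
uploader.wait_and_close()                 # post_close: final cleanup
```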
59 changes: 5 additions & 54 deletions composer/loggers/remote_uploader_downloader.py
@@ -25,19 +25,15 @@
from composer.loggers import Logger, MosaicMLLogger
from composer.loggers.logger_destination import LoggerDestination
from composer.utils import (
GCSObjectStore,
LibcloudObjectStore,
MLFlowObjectStore,
ObjectStore,
ObjectStoreTransientError,
OCIObjectStore,
S3ObjectStore,
SFTPObjectStore,
UCObjectStore,
build_remote_backend,
dist,
format_name_with_dist,
get_file,
retry,
validate_credentials,
)
from composer.utils.object_store.mlflow_object_store import MLFLOW_DBFS_PATH_PREFIX

@@ -50,37 +46,6 @@
__all__ = ['RemoteUploaderDownloader']


def _build_remote_backend(remote_backend_name: str, backend_kwargs: dict[str, Any]):
remote_backend_cls = None
remote_backend_name_to_cls = {
's3': S3ObjectStore,
'oci': OCIObjectStore,
'sftp': SFTPObjectStore,
'libcloud': LibcloudObjectStore,
'gs': GCSObjectStore,
}

# Handle `dbfs` backend as a special case, since it can map to either :class:`.UCObjectStore`
# or :class:`.MLFlowObjectStore`.
if remote_backend_name == 'dbfs':
path = backend_kwargs['path']
if path.startswith(MLFLOW_DBFS_PATH_PREFIX):
remote_backend_cls = MLFlowObjectStore
else:
# Validate if the path conforms to the requirements for UC volume paths
UCObjectStore.validate_path(path)
remote_backend_cls = UCObjectStore
else:
remote_backend_cls = remote_backend_name_to_cls.get(remote_backend_name, None)
if remote_backend_cls is None:
supported_remote_backends = list(remote_backend_name_to_cls.keys()) + ['dbfs']
raise ValueError(
f'The remote backend {remote_backend_name} is not supported. Please use one of ({supported_remote_backends})',
)

return remote_backend_cls(**backend_kwargs)
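The helper deleted above was not dropped outright: an equivalent build_remote_backend now lives in composer.utils and is called at the two sites below, removing the duplicated code. A hedged usage sketch; the bucket name is illustrative:

```python
# Hedged sketch: the shared helper replaces this module-private copy.
from composer.utils import build_remote_backend

# Same dispatch as the deleted function: 's3' -> S3ObjectStore,
# 'gs' -> GCSObjectStore, etc., with 'dbfs' choosing UCObjectStore or
# MLFlowObjectStore based on the path prefix.
store = build_remote_backend('s3', {'bucket': 'my-bucket'})
```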


class RemoteUploaderDownloader(LoggerDestination):
r"""Logger destination that uploads (downloads) files to (from) a remote backend.

@@ -339,7 +304,7 @@ def __init__(
def remote_backend(self) -> ObjectStore:
"""The :class:`.ObjectStore` instance for the main thread."""
if self._remote_backend is None:
self._remote_backend = _build_remote_backend(self.remote_backend_name, self.backend_kwargs)
self._remote_backend = build_remote_backend(self.remote_backend_name, self.backend_kwargs)
return self._remote_backend

def init(self, state: State, logger: Logger) -> None:
@@ -359,7 +324,7 @@ def init(self, state: State, logger: Logger) -> None:
retry(
ObjectStoreTransientError,
self.num_attempts,
)(lambda: _validate_credentials(self.remote_backend, file_name_to_test))()
)(lambda: validate_credentials(self.remote_backend, file_name_to_test))()

# If the remote backend is an `MLFlowObjectStore`, the original path kwarg may have placeholders that can be
# updated with information generated at runtime, i.e., the MLFlow experiment and run IDs. This information
@@ -635,20 +600,6 @@ def _remote_file_name(self, remote_file_name: str):
return key_name


def _validate_credentials(
remote_backend: ObjectStore,
remote_file_name_to_test: str,
) -> None:
# Validates the credentials by attempting to touch a file in the bucket
# raises an error if there was a credentials failure.
with tempfile.NamedTemporaryFile('wb') as f:
f.write(b'credentials_validated_successfully')
remote_backend.upload_object(
object_name=remote_file_name_to_test,
filename=f.name,
)
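Likewise, _validate_credentials moved to composer.utils as validate_credentials; init() above wraps it in retry so transient object-store errors do not abort startup. A hedged sketch of that wiring, assuming an already-constructed remote_backend and a placeholder test file name:

```python
from composer.utils import ObjectStoreTransientError, retry, validate_credentials

# Touch a throwaway object to prove the credentials work, retrying on
# transient errors (mirrors the init() call site above).
retry(
    ObjectStoreTransientError,
    3,  # num_attempts; the logger passes self.num_attempts
)(lambda: validate_credentials(remote_backend, '.credentials_validated_successfully'))()
```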


def _upload_worker(
file_queue: Union[queue.Queue[tuple[str, str, bool]], multiprocessing.JoinableQueue[tuple[str, str, bool]]],
completed_queue: Union[queue.Queue[str], multiprocessing.JoinableQueue[str]],
Expand All @@ -663,7 +614,7 @@ def _upload_worker(
The worker will continuously poll ``file_queue`` for files to upload. Once ``is_finished`` is set, the worker will
exit once ``file_queue`` is empty.
"""
remote_backend = _build_remote_backend(remote_backend_name, backend_kwargs)
remote_backend = build_remote_backend(remote_backend_name, backend_kwargs)
while True:
try:
file_path_to_upload, remote_file_name, overwrite = file_queue.get(block=True, timeout=0.5)