Skip to content

Commit

Permalink
Improve IO error handling
Browse files (browse the repository at this point in the history)
  • Loading branch information
cbalioglu committed Jan 25, 2025
1 parent 16ec96d commit e8fcbe8
Show file tree
Hide file tree
Showing 8 changed files with 291 additions and 122 deletions.
11 changes: 9 additions & 2 deletions src/fairseq2/assets/_metadata_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ def __init__(

@override
def _load_cache(self) -> dict[str, dict[str, object]]:
path = self._file_system.resolve(self._path)
path = self._path

cache = {}

Expand All @@ -139,7 +139,14 @@ def cache_file(file: Path, source: str) -> None:

cache[name] = metadata

if self._file_system.is_dir(path):
try:
is_dir = self._file_system.is_dir(path)
except OSError as ex:
raise AssetMetadataError(
f"The '{path}' path cannot be accessed. See the nested exception for details."
) from ex

if is_dir:
source = f"directory:{path}"

def on_error(ex: OSError) -> NoReturn:
Expand Down
3 changes: 3 additions & 0 deletions src/fairseq2/checkpoint/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@

from __future__ import annotations

from fairseq2.checkpoint._manager import CheckpointDeleteError as CheckpointDeleteError
from fairseq2.checkpoint._manager import CheckpointError as CheckpointError
from fairseq2.checkpoint._manager import CheckpointLoadError as CheckpointLoadError
from fairseq2.checkpoint._manager import CheckpointManager as CheckpointManager
from fairseq2.checkpoint._manager import (
CheckpointNotFoundError as CheckpointNotFoundError,
)
from fairseq2.checkpoint._manager import CheckpointSaveError as CheckpointSaveError
from fairseq2.checkpoint._manager import FileCheckpointManager as FileCheckpointManager
from fairseq2.checkpoint._metadata_provider import (
FileCheckpointMetadataProvider as FileCheckpointMetadataProvider,
Expand Down
184 changes: 124 additions & 60 deletions src/fairseq2/checkpoint/_manager.py

Large diffs are not rendered by default.

96 changes: 60 additions & 36 deletions src/fairseq2/checkpoint/_metadata_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from __future__ import annotations

from pathlib import Path
from typing import final
from typing import Iterable, final

from typing_extensions import override

Expand Down Expand Up @@ -86,46 +86,62 @@ def add_checkpoint_metadata(name: str, step_nr: int) -> None:

scores = []

try:
for step_dir in self._file_system.glob(self._checkpoint_dir, "step_*"):
if not self._file_system.is_dir(step_dir):
continue
def iter_step_dirs() -> Iterable[Path]:
try:
for step_dir in self._file_system.glob(self._checkpoint_dir, "step_*"):
if not self._file_system.is_dir(step_dir):
continue

try:
step_nr = int(step_dir.name[5:])
except ValueError:
continue
yield step_dir
except OSError as ex:
raise AssetMetadataError(
f"The '{self._checkpoint_dir}' base checkpoint directory cannot be traversed. See the nested exception for details."
) from ex

for step_dir in iter_step_dirs():
try:
step_nr = int(step_dir.name[5:])
except ValueError:
continue

add_checkpoint_metadata(f"checkpoint_step_{step_nr}@", step_nr)

max_step_nr = max(max_step_nr, step_nr)

add_checkpoint_metadata(f"checkpoint_step_{step_nr}@", step_nr)
# Load score.
score_file = step_dir.joinpath("score.txt")

max_step_nr = max(max_step_nr, step_nr)
def load_error() -> AssetMetadataError:
return AssetMetadataError(
f"The score of the training step {step_nr} cannot be loaded from the '{score_file}' file. See the nested exception for details."
)

# Load score.
score_file = step_dir.joinpath("score.txt")
if self._file_system.exists(score_file):
try:
score_exists = self._file_system.exists(score_file)
except OSError as ex:
raise load_error() from ex

if score_exists:
try:
fp = self._file_system.open_text(score_file)
except OSError as ex:
raise load_error() from ex

try:
line = fp.readline()
except OSError as ex:
raise AssetMetadataError(
f"The score of the training step {step_nr} cannot be loaded from the '{score_file}' file. See the nested exception for details."
) from ex
finally:
fp.close()

try:
score = float(line)
except ValueError:
raise AssetMetadataError(
f"The score of the training step {step_nr} cannot be parsed as a floating-point number."
) from None

scores.append((score, step_nr))
except OSError as ex:
raise AssetMetadataError(
f"The base '{self._checkpoint_dir}' checkpoint directory cannot be traversed. See the nested exception for details."
) from ex
try:
line = fp.readline()
except OSError as ex:
raise load_error() from ex
finally:
fp.close()

try:
score = float(line)
except ValueError:
raise AssetMetadataError(
f"The score of the training step {step_nr} cannot be parsed as a floating-point number."
) from None

scores.append((score, step_nr))

if max_step_nr == -1:
return
Expand All @@ -146,6 +162,14 @@ def add_checkpoint_metadata(name: str, step_nr: int) -> None:

def _load_tokenizer(self, cache: dict[str, dict[str, object]]) -> None:
metadata_file = self._checkpoint_dir.joinpath("tokenizer.yaml")
if self._file_system.exists(metadata_file):

try:
tokenizer_exists = self._file_system.exists(metadata_file)
except OSError as ex:
raise AssetMetadataError(
f"The '{metadata_file}' path cannot be accessed. See the nested exception for details."
) from ex

if tokenizer_exists:
for name, metadata in self._metadata_file_loader.load(metadata_file):
cache[name] = metadata
30 changes: 26 additions & 4 deletions src/fairseq2/recipes/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,14 +134,27 @@ def register_extra_asset_paths(

metadata_file_loader = StandardMetadataFileLoader(yaml_loader)

def access_error(path: Path) -> SetupError:
return SetupError(
f"The '{path}' path cannot be accessed. See the nested exception for details."
)

path = config_section.extra_path
if path is not None:
if not file_system.exists(path):
try:
path_exists = file_system.exists(path)
except OSError as ex:
raise access_error(path) from ex

if not path_exists:
log.warning("The '{}' path pointed to by the `extra_asset_card_path` configuration does not exist.", path) # fmt: skip

return

path = file_system.resolve(path)
try:
path = file_system.resolve(path)
except OSError as ex:
raise access_error(path) from ex

context.asset_store.user_metadata_providers.append(
FileAssetMetadataProvider(path, file_system, metadata_file_loader)
Expand All @@ -150,12 +163,21 @@ def register_extra_asset_paths(
path = config_section.checkpoint_dir
if path is not None:
metadata_file = path.joinpath("model.yaml")
if not file_system.exists(metadata_file):

try:
metadata_exists = file_system.exists(metadata_file)
except OSError as ex:
raise access_error(metadata_file) from ex

if not metadata_exists:
log.warning("The checkpoint metadata file (model.yaml) is not found under the '{}' directory. Make sure that the `checkpoint_search_dir` configuration points to the base checkpoint directory used during training.", path) # fmt: skip

return

path = file_system.resolve(path)
try:
path = file_system.resolve(path)
except OSError as ex:
raise access_error(path) from ex

context.asset_store.user_metadata_providers.append(
FileCheckpointMetadataProvider(path, file_system, metadata_file_loader)
Expand Down
14 changes: 12 additions & 2 deletions src/fairseq2/recipes/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,12 @@ def _setup_aten_logging(self, log_file: Path) -> None:

aten_log_file = log_file.parent.joinpath("aten", log_file.name)

self._file_system.make_directory(aten_log_file.parent)
try:
self._file_system.make_directory(aten_log_file.parent)
except OSError as ex:
raise SetupError(
f"The '{aten_log_file.parent}' ATen log directory cannot be created. See the nested exception for details."
) from ex

_enable_aten_logging(aten_log_file)

Expand All @@ -89,7 +94,12 @@ def _setup_nccl_logging(self, log_file: Path) -> None:

nccl_log_file = log_file.parent.joinpath("nccl", log_file.name)

self._file_system.make_directory(nccl_log_file.parent)
try:
self._file_system.make_directory(nccl_log_file.parent)
except OSError as ex:
raise SetupError(
f"The '{nccl_log_file.parent}' NCCL log directory cannot be created. See the nested exception for details."
) from ex

os.environ["NCCL_DEBUG"] = "INFO"
os.environ["NCCL_DEBUG_FILE"] = str(nccl_log_file)
Expand Down
9 changes: 8 additions & 1 deletion src/fairseq2/recipes/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,14 @@ def read(
# Update the configuration with `--config-file`.
if config_files:
for config_file in chain.from_iterable(config_files):
if not self._file_system.is_file(config_file):
try:
is_file = self._file_system.is_file(config_file)
except OSError as ex:
raise SetupError(
f"The '{config_file}' configuration file cannot be accessed. See the nested exception for details."
) from ex

if not is_file:
raise ConfigFileNotFoundError(config_file)

try:
Expand Down
66 changes: 49 additions & 17 deletions src/fairseq2/utils/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,15 @@ def load(
with catch_warnings():
warnings.simplefilter("ignore") # Suppress noisy FSDP warnings.

fp = self._file_system.open(path)
def load_error() -> TensorLoadError:
return TensorLoadError(
f"The '{path}' tensor file cannot be loaded. See the nested exception for details."
)

try:
fp = self._file_system.open(path)
except OSError as ex:
raise load_error() from ex

try:
data: dict[str, object] = torch.load(
Expand All @@ -223,9 +231,7 @@ def load(
except FileNotFoundError:
raise
except (RuntimeError, OSError, PickleError) as ex:
raise TensorLoadError(
f"The '{path}' tensor file cannot be loaded. See the nested exception for details."
) from ex
raise load_error() from ex
finally:
fp.close()

Expand All @@ -244,14 +250,20 @@ def dump(self, data: Mapping[str, object], path: Path) -> None:
with catch_warnings():
warnings.simplefilter("ignore") # Suppress noisy FSDP warnings.

fp = self._file_system.open(path, mode=FileMode.WRITE)
def dump_error() -> TensorDumpError:
return TensorDumpError(
f"The '{path}' tensor file cannot be dumped. See the nested exception for details.",
)

try:
fp = self._file_system.open(path, mode=FileMode.WRITE)
except OSError as ex:
raise dump_error() from ex

try:
torch.save(data, fp)
except (RuntimeError, OSError, PickleError) as ex:
raise TensorDumpError(
f"The '{path}' tensor file cannot be dumped. See the nested exception for details.",
) from ex
raise dump_error() from ex
finally:
fp.close()

Expand Down Expand Up @@ -282,10 +294,21 @@ def load(
"Safetensors only supports `torch.device` and `str` for the `map_location` parameter."
)

if self._file_system.is_dir(path):
file_iter = self._file_system.glob(path, "*.safetensors")
try:
is_dir = self._file_system.is_dir(path)
except OSError as ex:
raise TensorLoadError(
f"The '{path}' path cannot be accessed. See the nested exception for details."
) from ex

if is_dir:
try:
files = list(self._file_system.glob(path, "*.safetensors"))
except OSError as ex:
raise TensorLoadError(
f"The '{path}' directory cannot be traversed. See the nested exception for details."
) from ex

files = list(file_iter)
if not files:
raise TensorLoadError(
f"No Safetensors file found under the '{path}' directory."
Expand Down Expand Up @@ -329,20 +352,29 @@ def __init__(self, file_system: FileSystem) -> None:
def load(
self, path: Path, *, map_location: MapLocation = None
) -> dict[str, object]:
def has_files(path: Path, extension: str) -> bool:
file_iter = self._file_system.glob(path, f"*{extension}")

def has_files(extension: str) -> bool:
try:
next(iter(file_iter))
next(iter(self._file_system.glob(path, f"*{extension}")))
except OSError as ex:
raise TensorLoadError(
f"The '{path}' directory cannot be traversed. See the nested exception for details."
) from ex
except StopIteration:
return False

return True

loader: TensorLoader

if self._file_system.is_dir(path):
if not has_files(path, ".safetensors"):
try:
is_dir = self._file_system.is_dir(path)
except OSError as ex:
raise TensorLoadError(
f"The '{path}' path cannot be accessed. See the nested exception for details."
) from ex

if is_dir:
if not has_files(".safetensors"):
raise TensorLoadError(
f"The '{path}' directory does not contain any supported tensor files."
)
Expand Down

0 comments on commit e8fcbe8

Please sign in to comment.