Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
garrett4wade committed Sep 3, 2024
1 parent 09647b6 commit 45c97b5
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 53 deletions.
13 changes: 0 additions & 13 deletions realhf/apps/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
logger = logging.getLogger("main", "system")

CONTROLLER_TIME_LIMIT = None
TRACE_TIMEOUT = 360 # Should be larger than TRACER_SAVE_INTERVAL_SECONDS defined in system/worker_base.py


def scheduler_mode(mode: str) -> str:
Expand Down Expand Up @@ -117,8 +116,6 @@ def main_start(args, recover_count: int = 0):
CLUSTER_SPEC_PATH=cluster_spec_path,
REAL_RECOVER_RUN="1" if is_recover_run else "0",
REAL_SAVE_RECOVER_STATES="1" if save_recover_states else "0",
REAL_DUMP_TRACE=os.environ.get("REAL_DUMP_TRACE", "0"),
REAL_DUMP_MEMORY=os.environ.get("REAL_DUMP_MEMORY", "0"),
)
for k, v in BASE_ENVIRONS.items():
os.environ[k] = v
Expand Down Expand Up @@ -195,9 +192,6 @@ def main_start(args, recover_count: int = 0):
args.image_name,
)

timeout = (
None if os.getenv("REAL_TRACE", "0") == "0" else TRACE_TIMEOUT
) # run 5 mins to collect trace
try:
sched.wait(
check_status=(
Expand All @@ -207,13 +201,8 @@ def main_start(args, recover_count: int = 0):
JobState.COMPLETED,
),
remove_status=(),
timeout=timeout,
)
except (KeyboardInterrupt, JobException, TimeoutError) as e:
if os.getenv("REAL_TRACE", "0") != "0" and isinstance(e, TimeoutError):
s = "#" * 30 + " Trace complete. Killing all processes... " + "#" * 30
logger.info("\n" + "#" * len(s) + "\n" + s + "\n" + "#" * len(s))

recover_states = [
JobState.CANCELLED,
JobState.FAILED,
Expand Down Expand Up @@ -295,8 +284,6 @@ def _main_profile_layers(model_family, model_path):
WANDB_MODE="disabled",
REAL_MODE="slurm",
CLUSTER_SPEC_PATH=os.environ.get("CLUSTER_SPEC_PATH", ""),
REAL_DUMP_TRACE=os.environ.get("REAL_DUMP_TRACE", "0"),
REAL_DUMP_MEMORY=os.environ.get("REAL_DUMP_MEMORY", "0"),
)
clear_name_resolve(expr_name, trial_name)
sched = sched_client.make(
Expand Down
5 changes: 3 additions & 2 deletions realhf/base/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ def get_tensor(self, tensor_shape, dtype, name, force_zero: bool = False):
QUICKSTART_EXPR_CACHE_PATH = f"{cluster_spec.fileroot}/.cache/{getpass.getuser()}/"
BASE_ENVIRONS = {
"PYTHONPATH": "/realhf",
"REAL_TRACE": os.getenv("REAL_TRACE", "0"),
"REAL_IS_REMOTE": "1",
# "NCCL_P2P_DISABLE": "1",
# "NCCL_IB_DISABLE": "1",
Expand Down Expand Up @@ -107,7 +106,9 @@ def get_tensor(self, tensor_shape, dtype, name, force_zero: bool = False):
# https://discuss.pytorch.org/t/cuda-allocation-lifetime-for-inputs-to-distributed-all-reduce/191573
"TORCH_NCCL_AVOID_RECORD_STREAMS": "1",
# Whether to enable time mark to plot timelines.
# "REAL_CUDA_TMARK": "1",
"REAL_CUDA_TMARK": os.getenv("REAL_CUDA_TMARK", "0"),
"REAL_DUMP_TRACE": os.getenv("REAL_DUMP_TRACE", "0"),
"REAL_DUMP_MEMORY": os.getenv("REAL_DUMP_MEMORY", "0"),
}


Expand Down
11 changes: 11 additions & 0 deletions realhf/base/topology.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,17 @@ def destroy_all_comm_groups():
dist.destroy_process_group()


def decompose_to_three_factors(n: int) -> List[Tuple[int, int, int]]:
    """Return all ordered triples (a, b, c) of positive integers with a * b * c == n.

    Enumerates canonical sorted triples i <= j <= k and expands each to its
    distinct permutations, so every ordered factorization appears exactly once.

    :param n: The positive integer to factor (n == 0 yields an empty list).
    :return: List of all ordered 3-tuples whose product is n.
    """
    # Local import keeps this helper self-contained within the module.
    from math import isqrt

    factors = []
    # isqrt is exact; int(n ** 0.5) can round wrong for large n near squares.
    for i in range(1, isqrt(n) + 1):
        if n % i == 0:
            m = n // i
            for j in range(i, isqrt(m) + 1):
                if m % j == 0:
                    k = m // j
                    # set() drops duplicate permutations when factors repeat.
                    factors += list(set(itertools.permutations([i, j, k])))
    return factors


class ProcessCoord(NamedTuple):
pipe: int
data: int
Expand Down
12 changes: 1 addition & 11 deletions realhf/experiments/benchmark/profile_exp.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,12 @@
from realhf.api.quickstart.entrypoint import register_quickstart_exp
from realhf.api.quickstart.model import ModelTrainEvalConfig, ParallelismConfig
from realhf.base import constants, logging
from realhf.base.topology import decompose_to_three_factors
from realhf.experiments.common.common import CommonExperimentConfig

logger = logging.getLogger("Profiling Experiment", "system")


def decompose_to_three_factors(n: int) -> List[Tuple[int, int, int]]:
    """List every ordered triple of positive integers whose product is n.

    Walks sorted triples i <= j <= k and emits the distinct permutations
    of each, so no ordered triple is produced twice.
    """
    result: List[Tuple[int, int, int]] = []
    i = 1
    while i * i <= n:
        if n % i == 0:
            rem = n // i
            j = i
            while j * j <= rem:
                if rem % j == 0:
                    # Deduplicate permutations when two factors coincide.
                    result.extend(set(itertools.permutations((i, j, rem // j))))
                j += 1
        i += 1
    return result


def default_parallel_config(n_gpus: int) -> List[Dict[str, Any]]:
factors = decompose_to_three_factors(n_gpus)
x = [
Expand Down
12 changes: 1 addition & 11 deletions realhf/search_engine/param_realloc.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import realhf.base.topology as topology
from realhf.api.core.config import ModelFamily, ModelName
from realhf.api.core.model_api import ReaLModelConfig
from realhf.base.topology import decompose_to_three_factors


def bcast_cost(
Expand Down Expand Up @@ -107,17 +108,6 @@ def compute_cost(
return max_cost


def decompose_to_three_factors(n: int):
    """Enumerate all ordered 3-tuples (a, b, c) of positive integers with a * b * c == n."""
    out = []
    for a in range(1, int(n ** 0.5) + 1):
        if n % a:
            continue
        rest = n // a
        # b starts at a so each unordered triple is visited exactly once.
        for b in range(a, int(rest ** 0.5) + 1):
            if rest % b:
                continue
            # Distinct permutations only; set() folds repeats of equal factors.
            out += list(set(itertools.permutations([a, b, rest // b])))
    return out


def dump_table(
n_nodes: int,
model_family: ModelFamily,
Expand Down
32 changes: 16 additions & 16 deletions realhf/system/model_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,18 @@ def __maybe_profile_rpc(self, rpc: dfg.MFCDef):
finally:
# Dump profiler results.
pfer.__exit__(None, None, None)

def _get_subdir(name):
subdir = os.path.join(
constants.LOG_ROOT,
constants.experiment_name(),
constants.trial_name(),
name,
f"setup{self._setup_counter}",
)
os.makedirs(subdir, exist_ok=True)
return subdir

if _enable_profiler:
if self._dp_rank == 0 and self._is_dp_head:
blogger.info(
Expand All @@ -690,23 +702,11 @@ def __maybe_profile_rpc(self, rpc: dfg.MFCDef):
"This may take for a while..."
)

def _get_subdir(name):
subdir = os.path.join(
constants.LOG_ROOT,
constants.experiment_name(),
constants.trial_name(),
name,
f"setup{self._setup_counter}",
)
os.makedirs(subdir, exist_ok=True)
return subdir

if _enable_profiler:
pfer.export_chrome_trace(
os.path.join(
_get_subdir("trace"), f"{rpc.name}_r{dist.get_rank()}.json"
)
pfer.export_chrome_trace(
os.path.join(
_get_subdir("trace"), f"{rpc.name}_r{dist.get_rank()}.json"
)
)
if self._dp_rank == 0 and self._is_dp_head:
blogger.info(
f"System metrics collected. Time consumption:"
Expand Down

0 comments on commit 45c97b5

Please sign in to comment.