From fcf701ea7907c6b8443b98570eaa16bcb03f2a1c Mon Sep 17 00:00:00 2001 From: Su Zhou Date: Thu, 9 Nov 2023 12:12:17 -0800 Subject: [PATCH] Infra improvements (#66) * set docker context to root of autogluon-bench project to prepare for copying package setup files to docker * install agbench according to local agbench version * use static base dir in docker to increase caching * Use /home as base dir for dependencies * require IMDSV2 in instances * use AWS array job to avoid throttle * raise lambda error * use custom metrics with standard metrics * use custom_configs/ for mounting * handle empty params and default eval_metric for init * add metrics support * update tests * lint * update README --- .dockerignore | 4 + README.md | 14 +- pyproject.toml | 1 + src/autogluon/bench/Dockerfile | 40 ++-- .../lambdas/amlb_configs/__init__.py | 1 - .../custom_configs/amlb_configs/__init__.py | 0 .../batch_stack/lambdas/lambda_function.py | 176 +++++------------- .../bench/cloud/aws/batch_stack/stack.py | 30 ++- src/autogluon/bench/datasets/constants.py | 1 + src/autogluon/bench/entrypoint.sh | 10 +- .../preprocess/preprocess_openml.py | 2 +- .../eval/hardware_metrics/hardware_metrics.py | 37 ++-- .../bench/frameworks/multimodal/exec.py | 33 ++-- .../multimodal/multimodal_benchmark.py | 13 +- .../bench/frameworks/multimodal/setup.sh | 42 ++--- .../bench/frameworks/tabular/exec.sh | 16 +- .../bench/frameworks/tabular/setup.sh | 16 +- .../frameworks/tabular/tabular_benchmark.py | 7 +- .../bench/frameworks/timeseries/exec.sh | 16 +- .../bench/frameworks/timeseries/setup.sh | 18 +- .../timeseries/timeseries_benchmark.py | 6 +- src/autogluon/bench/runbenchmark.py | 156 ++++++++-------- .../bench/utils/hardware_utilization.sh | 3 +- .../unittests/benchmark/test_runbenchmarks.py | 69 ++----- tests/unittests/cloud/aws/test_stack.py | 1 - .../hardware_metrics/test_hardware_metrics.py | 4 +- 26 files changed, 303 insertions(+), 413 deletions(-) create mode 100644 .dockerignore delete mode 100644 src/autogluon/bench/cloud/aws/batch_stack/lambdas/amlb_configs/__init__.py create mode 100644 src/autogluon/bench/cloud/aws/batch_stack/lambdas/custom_configs/amlb_configs/__init__.py diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 00000000..7aa95b5b --- /dev/null +++ b/.dockerignore @@ -0,0 +1,4 @@ +* +!.git/ +!src/ +!pyproject.toml diff --git a/README.md b/README.md index 944c3c1b..cf2a9ca7 100644 --- a/README.md +++ b/README.md @@ -33,12 +33,6 @@ cd autogluon-bench pip install -e ".[tests]" ``` -For development, please be aware that `autogluon.bench` is installed as a dependency in certain places, such as the [Dockerfile](https://github.com/autogluon/autogluon-bench/blob/master/src/autogluon/bench/Dockerfile) and [Multimodal Setup](https://github.com/autogluon/autogluon-bench/blob/master/src/autogluon/bench/frameworks/multimodal/setup.sh). We made it possible to reflect the development changes by pushing the changes to a remote GitHub branch, and providing the URI when testing on benchmark runs: - -``` -agbench run sample_configs/multimodal_cloud_configs.yaml --dev-branch https://github.com//autogluon-bench.git# -``` - ## Run benchmarks locally @@ -144,11 +138,11 @@ After having the configuration file ready, use the command below to initiate ben agbench run /path/to/cloud_config_file ``` -This command automatically sets up an AWS Batch environment using instance specifications defined in the [cloud config files](https://github.com/autogluon/autogluon-bench/tree/master/sample_configs). 
It also creates a lambda function named with your chosen `LAMBDA_FUNCTION_NAME`. This lambda function is automatically invoked with the cloud config file you provided, submitting multiple AWS Batch jobs to the job queue (named with the `PREFIX` you provided).
+This command automatically sets up an AWS Batch environment using instance specifications defined in the [cloud config files](https://github.com/autogluon/autogluon-bench/tree/master/sample_configs). It also creates a lambda function named with your chosen `LAMBDA_FUNCTION_NAME`. This lambda function is automatically invoked with the cloud config file you provided, submitting a single AWS Batch job or a parent job for [Array jobs](https://docs.aws.amazon.com/batch/latest/userguide/array_jobs.html) to the job queue (named with the `PREFIX` you provided).
 
-In order for the Lambda function to submit multiple jobs simultaneously, you need to specify a list of values for each module-specific key. Each combination of configurations is saved and uploaded to your specified `METRICS_BUCKET` in S3, stored under `S3://{METRICS_BUCKET}/configs/{BENCHMARK_NAME}_{timestamp}/{BENCHMARK_NAME}_split_{UID}.yaml`. Here, `UID` is a unique ID assigned to the split.
+In order for the Lambda function to submit multiple Array child jobs simultaneously, you need to specify a list of values for each module-specific key. Each combination of configurations is saved and uploaded to your specified `METRICS_BUCKET` in S3, stored under `S3://{METRICS_BUCKET}/configs/{module}/{BENCHMARK_NAME}_{timestamp}/{BENCHMARK_NAME}_split_{UID}.yaml`. Here, `UID` is a unique ID assigned to the split.
 
-The AWS infrastructure configurations and submitted job IDs are saved locally at `{WORKING_DIR}/{root_dir}/{module}/{benchmark_name}_{timestamp}/aws_configs.yaml`. You can use this file to check the job status at any time:
+The AWS infrastructure configurations and the submitted job ID are saved locally at `{WORKING_DIR}/{root_dir}/{module}/{benchmark_name}_{timestamp}/aws_configs.yaml`.
You can use this file to check the job status at any time: ```bash agbench get-job-status --config-file /path/to/aws_configs.yaml @@ -272,5 +266,5 @@ agbench clean-amlb-results --help Step 3: Run evaluation on multiple cleaned files from `Step 2` ``` -agbench evaluate-amlb-results --frameworks-run framework_1 --frameworks-run framework_2 --results-dir-input data/results/input/prepared/openml/ --paths file_name_1.csv --paths file_name_2.csv --no-clean-data +agbench evaluate-amlb-results --frameworks-run framework_1 --frameworks-run framework_2 --results-dir-input data/results/input/prepared/openml/ --paths file_name_1.csv --paths file_name_2.csv --output-suffix benchmark_name --no-clean-data ``` diff --git a/pyproject.toml b/pyproject.toml index 4309b28e..3fb50330 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -108,3 +108,4 @@ xfail_strict = true [tool.setuptools_scm] write_to = "src/autogluon/bench/version.py" +fallback_version = "0.0.1.dev0" \ No newline at end of file diff --git a/src/autogluon/bench/Dockerfile b/src/autogluon/bench/Dockerfile index ded26faa..59cb6858 100644 --- a/src/autogluon/bench/Dockerfile +++ b/src/autogluon/bench/Dockerfile @@ -2,6 +2,8 @@ ARG AG_BENCH_BASE_IMAGE FROM $AG_BENCH_BASE_IMAGE ENV DEBIAN_FRONTEND=noninteractive +ENV RUNNING_IN_DOCKER=true +ENV AGBENCH_BASE=src/autogluon/bench/ # Install essential packages and Python 3.9 RUN apt-get update && \ @@ -22,48 +24,38 @@ RUN apt-get install -y python3-pip unzip curl git pciutils && \ rm -rf /var/lib/apt/lists/* /usr/local/aws # Application-specific steps -ARG AG_BENCH_DEV_URL ARG AG_BENCH_VERSION ARG CDK_DEPLOY_REGION ARG FRAMEWORK_PATH ARG GIT_URI ARG GIT_BRANCH -ARG BENCHMARK_DIR ARG AMLB_FRAMEWORK ARG AMLB_USER_DIR WORKDIR /app/ -RUN if [ -n "$AG_BENCH_DEV_URL" ]; then \ - echo "Cloning: $AG_BENCH_DEV_URL" \ - && AG_BENCH_DEV_REPO=$(echo "$AG_BENCH_DEV_URL" | cut -d "#" -f 1) \ - && AG_BENCH_DEV_BRANCH=$(echo "$AG_BENCH_DEV_URL" | cut -d "#" -f 2) \ - && git clone --branch "$AG_BENCH_DEV_BRANCH" --single-branch "$AG_BENCH_DEV_REPO" /app/autogluon-bench \ - && python3 -m pip install -e /app/autogluon-bench; \ +# Copying necessary files for autogluon-bench package +COPY . 
/app/ +COPY ${AGBENCH_BASE}entrypoint.sh /app/ +COPY ${AGBENCH_BASE}custom_configs /app/custom_configs/ + +# check if autogluon.bench version contains "dev" tag +RUN if echo "$AG_BENCH_VERSION" | grep -q "dev"; then \ + # install from local source + pip install /app/; \ else \ - output=$(pip install autogluon.bench==$AG_BENCH_VERSION 2>&1) || true; \ - if echo $output | grep -q "No matching distribution"; then \ - echo -e "ERROR: No matching distribution found for autogluon.bench==$AG_BENCH_VERSION\n\ - To resolve the issue, try 'agbench run --dev-branch #'"; \ - exit 1; \ - fi; \ + pip install autogluon.bench==$AG_BENCH_VERSION; \ fi -COPY entrypoint.sh utils/hardware_utilization.sh $FRAMEWORK_PATH/setup.sh custom_configs/ /app/ - -RUN chmod +x setup.sh entrypoint.sh hardware_utilization.sh \ +RUN chmod +x entrypoint.sh \ && if echo "$FRAMEWORK_PATH" | grep -q -E "tabular|timeseries"; then \ if [ -n "$AMLB_USER_DIR" ]; then \ - bash setup.sh $GIT_URI $GIT_BRANCH $BENCHMARK_DIR $AMLB_FRAMEWORK $AMLB_USER_DIR; \ + bash ${AGBENCH_BASE}${FRAMEWORK_PATH}setup.sh $GIT_URI $GIT_BRANCH "/home" $AMLB_FRAMEWORK $AMLB_USER_DIR; \ else \ - bash setup.sh $GIT_URI $GIT_BRANCH $BENCHMARK_DIR $AMLB_FRAMEWORK; \ + bash ${AGBENCH_BASE}${FRAMEWORK_PATH}setup.sh $GIT_URI $GIT_BRANCH "/home" $AMLB_FRAMEWORK; \ fi; \ elif echo "$FRAMEWORK_PATH" | grep -q "multimodal"; then \ - if [ -n "$AG_BENCH_DEV_URL" ]; then \ - bash setup.sh $GIT_URI $GIT_BRANCH $BENCHMARK_DIR --AGBENCH_DEV_URL=$AG_BENCH_DEV_URL; \ - else \ - bash setup.sh $GIT_URI $GIT_BRANCH $BENCHMARK_DIR --AG_BENCH_VER=$AG_BENCH_VERSION; \ - fi; \ + bash ${AGBENCH_BASE}${FRAMEWORK_PATH}setup.sh $GIT_URI $GIT_BRANCH "/home" $AG_BENCH_VERSION; \ fi \ && echo "CDK_DEPLOY_REGION=$CDK_DEPLOY_REGION" >> /etc/environment diff --git a/src/autogluon/bench/cloud/aws/batch_stack/lambdas/amlb_configs/__init__.py b/src/autogluon/bench/cloud/aws/batch_stack/lambdas/amlb_configs/__init__.py deleted file mode 100644 index 161754df..00000000 --- a/src/autogluon/bench/cloud/aws/batch_stack/lambdas/amlb_configs/__init__.py +++ /dev/null @@ -1 +0,0 @@ -# placeholder mounting point for custom user_dir diff --git a/src/autogluon/bench/cloud/aws/batch_stack/lambdas/custom_configs/amlb_configs/__init__.py b/src/autogluon/bench/cloud/aws/batch_stack/lambdas/custom_configs/amlb_configs/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/autogluon/bench/cloud/aws/batch_stack/lambdas/lambda_function.py b/src/autogluon/bench/cloud/aws/batch_stack/lambdas/lambda_function.py index 8aa5df14..d973bf06 100644 --- a/src/autogluon/bench/cloud/aws/batch_stack/lambdas/lambda_function.py +++ b/src/autogluon/bench/cloud/aws/batch_stack/lambdas/lambda_function.py @@ -2,7 +2,6 @@ import itertools import logging import os -import uuid import zipfile import requests @@ -18,7 +17,7 @@ AMLB_DEPENDENT_MODULES = ["tabular", "timeseries"] -def submit_batch_job(env: list, job_name: str, job_queue: str, job_definition: str): +def submit_batch_job(env: list, job_name: str, job_queue: str, job_definition: str, array_size: int): """ Submits a Batch job with the given environment variables, job name, job queue and job definition. @@ -27,17 +26,23 @@ def submit_batch_job(env: list, job_name: str, job_queue: str, job_definition: s job_name (str): Name of the job. job_queue (str): Name of the job queue. job_definition (str): Name of the job definition. + array_size (int): Number of jobs to submit. Returns: str: Job ID. 
""" container_overrides = {"environment": env} - response = aws_batch.submit_job( - jobName=job_name, - jobQueue=job_queue, - jobDefinition=job_definition, - containerOverrides=container_overrides, - ) + job_params = { + "jobName": job_name, + "jobQueue": job_queue, + "jobDefinition": job_definition, + "containerOverrides": container_overrides, + } + if array_size > 1: + job_params["arrayProperties"] = {"size": array_size} + + response = aws_batch.submit_job(**job_params) + logger.info("Job %s submitted to AWS Batch queue %s.", job_name, job_queue) logger.info(response) return response["jobId"] @@ -88,7 +93,7 @@ def download_dir_from_s3(s3_path: str, local_path: str) -> str: return local_path -def upload_config(bucket: str, benchmark_name: str, file: str): +def upload_config(config_list: list, bucket: str, benchmark_name: str): """ Uploads a file to the given S3 bucket. @@ -99,28 +104,9 @@ def upload_config(bucket: str, benchmark_name: str, file: str): Returns: str: S3 path of the uploaded file. """ - file_name = f'{file.split("/")[-1].split(".")[0]}.yaml' - s3_path = f"configs/{benchmark_name}/{file_name}" - s3.upload_file(file, bucket, s3_path) - return f"s3://{bucket}/{s3_path}" - - -def save_configs(configs: dict, uid: str): - """ - Saves the given dictionary of configs to a YAML file with the given UID as a part of the filename. - - Args: - configs (Dict[str, Any]): Dictionary of configurations to be saved. - uid (str): UID to be added to the filename of the saved file. - - Returns: - str: Local path of the saved file. - """ - benchmark_name = configs["benchmark_name"] - config_file_path = os.path.join("/tmp", f"{benchmark_name}_split_{uid}.yaml") - with open(config_file_path, "w+") as f: - yaml.dump(configs, f, default_flow_style=False) - return config_file_path + s3_key = f"configs/{benchmark_name}/{benchmark_name}_job_configs.yaml" + s3.put_object(Body=yaml.dump(config_list), Bucket=bucket, Key=s3_key) + return f"s3://{bucket}/{s3_key}" def download_automlbenchmark_resources(): @@ -217,59 +203,37 @@ def process_benchmark_runs(module_configs: dict, amlb_benchmark_search_dirs: lis module_configs["fold_to_run"][benchmark][task] = amlb_task_folds[benchmark][task] -def process_combination(configs, metrics_bucket, batch_job_queue, batch_job_definition): - """ - Processes a combination of configurations by generating and submitting Batch jobs. - - Args: - combination (Tuple): tuple of configurations to process. - keys (List[str]): list of keys of the configurations. - metrics_bucket (str): name of the bucket to upload metrics to. - batch_job_queue (str): name of the Batch job queue to submit jobs to. - batch_job_definition (str): name of the Batch job definition to use for submitting jobs. - - Returns: - str: job id of the submitted batch job. 
- """ - logger.info(f"Generating config with: {configs}") - config_uid = uuid.uuid1().hex - config_local_path = save_configs(configs=configs, uid=config_uid) - config_s3_path = upload_config( - bucket=metrics_bucket, benchmark_name=configs["benchmark_name"], file=config_local_path - ) - job_name = f"{configs['benchmark_name']}-{configs['module']}-{config_uid}" - env = [{"name": "config_file", "value": config_s3_path}] - - job_id = submit_batch_job( - env=env, - job_name=job_name, - job_queue=batch_job_queue, - job_definition=batch_job_definition, - ) - return job_id, config_s3_path +def get_cloudwatch_logs_url(region: str, job_id: str, log_group_name: str = "aws/batch/job"): + base_url = f"https://console.aws.amazon.com/cloudwatch/home?region={region}" + job_response = aws_batch.describe_job(jobs=[job_id]) + log_stream_name = job_response["jobs"][0]["attempts"][0]["container"]["logStreamName"] + return f"{base_url}#logsV2:log-groups/log-group/{log_group_name.replace('/', '%2F')}/log-events/{log_stream_name.replace('/', '%2F')}" def generate_config_combinations(config, metrics_bucket, batch_job_queue, batch_job_definition): - job_configs = {} - config.pop("cdk_context") + job_configs = [] if config["module"] in AMLB_DEPENDENT_MODULES: - job_configs = generate_amlb_module_config_combinations( - config, metrics_bucket, batch_job_queue, batch_job_definition - ) + job_configs = generate_amlb_module_config_combinations(config) elif config["module"] == "multimodal": - job_configs = generate_multimodal_config_combinations( - config, metrics_bucket, batch_job_queue, batch_job_definition - ) + job_configs = generate_multimodal_config_combinations(config) else: raise ValueError("Invalid module. Choose either 'tabular', 'timeseries', or 'multimodal'.") - response = { - "job_configs": job_configs, - } - return response + benchmark_name = config["benchmark_name"] + config_s3_path = upload_config(config_list=job_configs, bucket=metrics_bucket, benchmark_name=benchmark_name) + env = [{"name": "config_file", "value": config_s3_path}] + job_name = f"{benchmark_name}-array-job" + parent_job_id = submit_batch_job( + env=env, + job_name=job_name, + job_queue=batch_job_queue, + job_definition=batch_job_definition, + array_size=len(job_configs), + ) + return {parent_job_id: config_s3_path} -def generate_multimodal_config_combinations(config, metrics_bucket, batch_job_queue, batch_job_definition): +def generate_multimodal_config_combinations(config): common_keys = [] specific_keys = [] for key in config.keys(): @@ -278,23 +242,21 @@ def generate_multimodal_config_combinations(config, metrics_bucket, batch_job_qu else: common_keys.append(key) - job_configs = {} specific_value_combinations = list( itertools.product(*(config[key] for key in specific_keys if key in config.keys())) ) or [None] + all_configs = [] for combo in specific_value_combinations: new_config = {key: config[key] for key in common_keys} if combo is not None: new_config.update(dict(zip(specific_keys, combo))) + all_configs.append(new_config) - job_id, config_s3_path = process_combination(new_config, metrics_bucket, batch_job_queue, batch_job_definition) - job_configs[job_id] = config_s3_path - - return job_configs + return all_configs -def generate_amlb_module_config_combinations(config, metrics_bucket, batch_job_queue, batch_job_definition): +def generate_amlb_module_config_combinations(config): specific_keys = ["git_uri#branch", "framework", "amlb_constraint", "amlb_user_dir"] exclude_keys = ["amlb_benchmark", "amlb_task", "fold_to_run"] 
common_keys = [] @@ -308,13 +270,13 @@ def generate_amlb_module_config_combinations(config, metrics_bucket, batch_job_q else: common_keys.append(key) - job_configs = {} specific_value_combinations = list( itertools.product(*(config[key] for key in specific_keys if key in config.keys())) ) or [None] # Iterate through the combinations and the amlb benchmark task keys # Generates a config for each combination of specific key and keys in `fold_to_run` + all_configs = [] for combo in specific_value_combinations: for benchmark, tasks in config["fold_to_run"].items(): for task, fold_numbers in tasks.items(): @@ -325,11 +287,9 @@ def generate_amlb_module_config_combinations(config, metrics_bucket, batch_job_q new_config["amlb_benchmark"] = benchmark new_config["amlb_task"] = task new_config["fold_to_run"] = fold_num - job_id, config_s3_path = process_combination( - new_config, metrics_bucket, batch_job_queue, batch_job_definition - ) - job_configs[job_id] = config_s3_path - return job_configs + all_configs.append(new_config) + + return all_configs def handler(event, context): @@ -337,50 +297,6 @@ def handler(event, context): Execution entrypoint for AWS Lambda. Triggers batch jobs with hyperparameter combinations. ENV variables are set by the AWS CDK infra code. - - Sample of cloud_configs.yaml to be supplied by user - - # Infra configurations - cdk_context: - CDK_DEPLOY_ACCOUNT: dummy - CDK_DEPLOY_REGION: dummy - - # Benchmark configurations - module: multimodal - mode: aws - benchmark_name: test_yaml - metrics_bucket: autogluon-benchmark-metrics - - # Module specific configurations - module_configs: - # Multimodal specific - multimodal: - git_uri#branch: https://github.com/autogluon/autogluon#master - dataset_name: melbourne_airbnb - presets: medium_quality - hyperparameters: - optimization.learning_rate: 0.0005 - optimization.max_epochs: 5 - time_limit: 10 - - - # Tabular specific - # You can refer to AMLB (https://github.com/openml/automlbenchmark#quickstart) for more details - tabular: - framework: - - AutoGluon - label: - - stable - amlb_benchmark: - - test - - small - amlb_task: - test: null - small: - - credit-g - - vehicle - amlb_constraint: - - test """ if "config_file" not in event or not event["config_file"].startswith("s3"): raise KeyError("S3 path of config file is required.") diff --git a/src/autogluon/bench/cloud/aws/batch_stack/stack.py b/src/autogluon/bench/cloud/aws/batch_stack/stack.py index c7877a59..0cd0b1fc 100644 --- a/src/autogluon/bench/cloud/aws/batch_stack/stack.py +++ b/src/autogluon/bench/cloud/aws/batch_stack/stack.py @@ -22,8 +22,25 @@ AWS Batch as the compute enviroment in which a docker image runs the benchmarking script. """ + +def find_project_root_or_fallback(start_dir: str, root_identifier: str = "pyproject.toml"): + """Find the project root directory by searching for a specific identifier file. + If not found, fall back to the starting directory. 
+ """ + current_dir = start_dir + + while current_dir != os.path.dirname(current_dir): + if os.path.exists(os.path.join(current_dir, root_identifier)): + return str(current_dir) + current_dir = os.path.dirname(current_dir) + + return start_dir + + with importlib.resources.path("autogluon.bench", "Dockerfile") as file_path: docker_base_dir = os.path.dirname(file_path) + project_root = find_project_root_or_fallback(docker_base_dir) + docker_path = os.path.relpath(file_path, project_root) with importlib.resources.path("autogluon.bench.cloud.aws.batch_stack.lambdas", "lambda_function.py") as file_path: lambda_script_dir = os.path.dirname(file_path) @@ -160,17 +177,16 @@ def __init__(self, scope: Construct, id: str, static_stack: StaticResourceStack, docker_image_asset = ecr_assets.DockerImageAsset( self, f"{prefix}-ecr-docker-image-asset", - directory=docker_base_dir, + directory=project_root, + file=docker_path, follow_symlinks=core.SymlinkFollowMode.ALWAYS, build_args={ "AG_BENCH_BASE_IMAGE": os.environ["AG_BENCH_BASE_IMAGE"], "AG_BENCH_VERSION": os.getenv("AG_BENCH_VERSION", "latest"), - "AG_BENCH_DEV_URL": os.getenv("AG_BENCH_DEV_URL", ""), "CDK_DEPLOY_REGION": os.environ["CDK_DEPLOY_REGION"], "FRAMEWORK_PATH": os.environ["FRAMEWORK_PATH"], "GIT_URI": os.environ["GIT_URI"], "GIT_BRANCH": os.environ["GIT_BRANCH"], - "BENCHMARK_DIR": os.environ["BENCHMARK_DIR"], "AMLB_FRAMEWORK": os.getenv("AMLB_FRAMEWORK", ""), "AMLB_USER_DIR": os.getenv("AMLB_USER_DIR", ""), }, @@ -186,10 +202,6 @@ def __init__(self, scope: Construct, id: str, static_stack: StaticResourceStack, # Bug that this parameter is not rending in the CF stack under cdk.out # https://github.com/aws/aws-cdk/issues/13023 linux_params=ecs.LinuxParameters(self, f"{prefix}-linux_params", shared_memory_size=container_memory), - environment={ - "AG_BENCH_VERSION": os.getenv("AG_BENCH_VERSION", "latest"), - "AG_BENCH_DEV_URL": os.getenv("AG_BENCH_DEV_URL", ""), - }, ) job_definition = batch.JobDefinition( @@ -213,8 +225,8 @@ def __init__(self, scope: Construct, id: str, static_stack: StaticResourceStack, volume=ec2.BlockDeviceVolume.ebs(block_device_volume), ) ], - http_tokens=ec2.LaunchTemplateHttpTokens.OPTIONAL, - http_endpoint=True, + http_tokens=ec2.LaunchTemplateHttpTokens.REQUIRED, + require_imdsv2=True, ) cloudwatch_policy = iam.Policy( diff --git a/src/autogluon/bench/datasets/constants.py b/src/autogluon/bench/datasets/constants.py index 3f1ac480..d9fdc011 100644 --- a/src/autogluon/bench/datasets/constants.py +++ b/src/autogluon/bench/datasets/constants.py @@ -9,3 +9,4 @@ _IMAGE_TEXT_SIMILARITY = "image_text_similarity" _TEXT_SIMILARITY = "text_similarity" _OBJECT_DETECTION = "object_detection" +_FEW_SHOT_CLASSIFICATION = "few_shot_classification" diff --git a/src/autogluon/bench/entrypoint.sh b/src/autogluon/bench/entrypoint.sh index 33fb4537..b71f9fc4 100644 --- a/src/autogluon/bench/entrypoint.sh +++ b/src/autogluon/bench/entrypoint.sh @@ -1,12 +1,6 @@ #!/bin/bash echo "Running hardware utilization monitoring in the background..." 
-./hardware_utilization.sh & +${AGBENCH_BASE}utils/hardware_utilization.sh & -if [ -n "$AG_BENCH_DEV_URL" ]; then - echo "Using Development Branch: $AG_BENCH_DEV_URL" >&2 - agbench run $config_file --skip-setup --dev-branch $AG_BENCH_DEV_URL -else - echo "Using Released autogluon.bench: " >&2 - agbench run $config_file --skip-setup -fi +agbench run $config_file --skip-setup diff --git a/src/autogluon/bench/eval/evaluation/preprocess/preprocess_openml.py b/src/autogluon/bench/eval/evaluation/preprocess/preprocess_openml.py index 79f1ce8d..7c53408b 100644 --- a/src/autogluon/bench/eval/evaluation/preprocess/preprocess_openml.py +++ b/src/autogluon/bench/eval/evaluation/preprocess/preprocess_openml.py @@ -34,7 +34,7 @@ def preprocess_openml_input( # TODO: This is a hack and won't work for all metrics, metric_error should ideally be calculated prior to preprocessing raw_input[METRIC_ERROR] = [ - 1 - score if metric in ["auc", "acc", "balacc", "map", "roc_auc", "r2"] else -score + 1 - score if metric in ["auc", "acc", "accuracy", "balacc", "map", "roc_auc", "r2", "coverage"] else -score for score, metric in zip(raw_input[METRIC_SCORE], raw_input["metric"]) ] diff --git a/src/autogluon/bench/eval/hardware_metrics/hardware_metrics.py b/src/autogluon/bench/eval/hardware_metrics/hardware_metrics.py index 6b0756eb..1c135253 100644 --- a/src/autogluon/bench/eval/hardware_metrics/hardware_metrics.py +++ b/src/autogluon/bench/eval/hardware_metrics/hardware_metrics.py @@ -32,7 +32,7 @@ def find_s3_file(s3_bucket: str, prefix: str, file: str): return None -def get_job_ids(config_file: str): +def get_job_ids(config: dict): """ This function returns a list of job IDs of all jobs ran for a benchmark run Parameters @@ -40,7 +40,7 @@ def get_job_ids(config_file: str): config_file: str, Path to config file containing job IDs """ - job_ids = list(config_file.get("job_configs", {}).keys()) + job_ids = list(config.get("job_configs", {}).keys()) return job_ids @@ -283,18 +283,33 @@ def get_hardware_metrics( aws_account_region = config.get("CDK_DEPLOY_REGION") cloudwatch_client = boto3.client("cloudwatch", region_name=aws_account_region) + batch_client = boto3.client("batch", region_name=aws_account_region) metrics_list = [] for job_id in job_ids: - sub_folder = config["job_configs"][f"{job_id}"].split("/")[-1].split(".")[0].replace("_split", "") - metrics_list += get_metrics( - job_id=job_id, - s3_bucket=s3_bucket, - module=module, - benchmark_name=benchmark_name, - sub_folder=sub_folder, - cloudwatch_client=cloudwatch_client, - ) + response = batch_client.describe_jobs(jobs=[job_id]) + job_detail = response["jobs"][0] + + # Check if the job is an array job + if "arrayProperties" in job_detail and "size" in job_detail["arrayProperties"]: + size = job_detail["arrayProperties"]["size"] + sub_ids = [f"{job_id}:{i}" for i in range(size)] + else: + sub_ids = [job_id] + + for sub_id in sub_ids: + id = sub_id.split(":")[-1] + if id != "": + id = "_" + id + sub_folder = f"{benchmark_name}{id}" + metrics_list += get_metrics( + job_id=sub_id, + s3_bucket=s3_bucket, + module=module, + benchmark_name=benchmark_name, + sub_folder=sub_folder, + cloudwatch_client=cloudwatch_client, + ) if metrics_list: with tempfile.TemporaryDirectory() as temp_dir: local_path = save_results(metrics_list, temp_dir) diff --git a/src/autogluon/bench/frameworks/multimodal/exec.py b/src/autogluon/bench/frameworks/multimodal/exec.py index 5ff7bec9..ab7c4e78 100644 --- a/src/autogluon/bench/frameworks/multimodal/exec.py +++ 
b/src/autogluon/bench/frameworks/multimodal/exec.py @@ -8,16 +8,17 @@ from datetime import datetime from typing import Optional, Union -from autogluon.bench.datasets.constants import ( - _IMAGE_SIMILARITY, - _IMAGE_TEXT_SIMILARITY, - _OBJECT_DETECTION, - _TEXT_SIMILARITY, -) from autogluon.bench.datasets.dataset_registry import multimodal_dataset_registry from autogluon.core.metrics import make_scorer from autogluon.multimodal import MultiModalPredictor from autogluon.multimodal import __version__ as ag_version +from autogluon.multimodal.constants import ( + FEW_SHOT_CLASSIFICATION, + IMAGE_SIMILARITY, + IMAGE_TEXT_SIMILARITY, + OBJECT_DETECTION, + TEXT_SIMILARITY, +) logger = logging.getLogger(__name__) logging.basicConfig(level=logging.INFO) @@ -151,7 +152,7 @@ def run( benchmark_dir: str, metrics_dir: str, constraint: Optional[str] = None, - params: Optional[dict] = {}, + params: Optional[dict] = None, custom_dataloader: Optional[dict] = None, custom_metrics: Optional[dict] = None, ): @@ -191,34 +192,32 @@ def run( label_column = train_data.label_columns[0] except (AttributeError, IndexError): # Object Detection does not have label columns label_column = None - + if params is None: + params = {} predictor_args = { "label": label_column, "problem_type": train_data.problem_type, "presets": params.pop("presets", None), "path": os.path.join(benchmark_dir, "models"), } - - if train_data.problem_type == _IMAGE_SIMILARITY: + if train_data.problem_type == IMAGE_SIMILARITY: predictor_args["query"] = train_data.image_columns[0] predictor_args["response"] = train_data.image_columns[1] predictor_args["match_label"] = train_data.match_label - elif train_data.problem_type == _IMAGE_TEXT_SIMILARITY: + elif train_data.problem_type == IMAGE_TEXT_SIMILARITY: predictor_args["query"] = train_data.text_columns[0] predictor_args["response"] = train_data.image_columns[0] - predictor_args["eval_metric"] = train_data.metric del predictor_args["label"] - elif train_data.problem_type == _TEXT_SIMILARITY: + elif train_data.problem_type == TEXT_SIMILARITY: predictor_args["query"] = train_data.text_columns[0] predictor_args["response"] = train_data.text_columns[1] predictor_args["match_label"] = train_data.match_label - elif train_data.problem_type == _OBJECT_DETECTION: + elif train_data.problem_type == OBJECT_DETECTION: predictor_args["sample_data_path"] = train_data.data metrics_func = None - if custom_metrics is not None: + if custom_metrics is not None and custom_metrics["function_name"] == train_data.metric: metrics_func = load_custom_metrics(custom_metrics=custom_metrics) - predictor_args["eval_metric"] = metrics_func predictor = MultiModalPredictor(**predictor_args) @@ -236,7 +235,7 @@ def run( "metrics": test_data.metric if metrics_func is None else metrics_func, } - if test_data.problem_type == _IMAGE_TEXT_SIMILARITY: + if test_data.problem_type == IMAGE_TEXT_SIMILARITY: evaluate_args["query_data"] = test_data.data[test_data.text_columns[0]].unique().tolist() evaluate_args["response_data"] = test_data.data[test_data.image_columns[0]].unique().tolist() evaluate_args["cutoffs"] = [1, 5, 10] diff --git a/src/autogluon/bench/frameworks/multimodal/multimodal_benchmark.py b/src/autogluon/bench/frameworks/multimodal/multimodal_benchmark.py index a7eddaa7..b5b34d11 100644 --- a/src/autogluon/bench/frameworks/multimodal/multimodal_benchmark.py +++ b/src/autogluon/bench/frameworks/multimodal/multimodal_benchmark.py @@ -29,7 +29,6 @@ def setup( self, git_uri: str = "https://github.com/autogluon/autogluon.git", 
git_branch: str = "master", - agbench_dev_url: str = None, ): """ Sets up the virtual environment for running the benchmark. @@ -42,11 +41,7 @@ def setup( None """ setup_script_path = os.path.abspath(os.path.dirname(__file__)) + "/setup.sh" - command = [setup_script_path, git_uri, git_branch, self.benchmark_dir] - if agbench_dev_url is not None: - command.append(f"--AGBENCH_DEV_URL={agbench_dev_url}") - else: - command.append(f"--AG_BENCH_VER={agbench_version}") + command = [setup_script_path, git_uri, git_branch, self.benchmark_dir, agbench_version] result = subprocess.run(command) if result.returncode != 0: sys.exit(1) @@ -93,7 +88,11 @@ def run( Returns: None """ - PY_EXC_PATH = self.benchmark_dir + "/.venv/bin/python" + if os.environ.get("RUNNING_IN_DOCKER", False): + venv_base_dir = "/home/" + else: + venv_base_dir = self.benchmark_dir + PY_EXC_PATH = os.path.join(venv_base_dir, ".venv/bin/python") exec_path = os.path.abspath(os.path.dirname(__file__)) + "/exec.py" command = [ diff --git a/src/autogluon/bench/frameworks/multimodal/setup.sh b/src/autogluon/bench/frameworks/multimodal/setup.sh index 7d2e7736..4babbac5 100755 --- a/src/autogluon/bench/frameworks/multimodal/setup.sh +++ b/src/autogluon/bench/frameworks/multimodal/setup.sh @@ -4,50 +4,32 @@ set -eo pipefail GIT_URI=$1 BRANCH=$2 -DIR=$3 # from root of benchmark run -ARG=$4 +venv_base_dir=$3 # from root of benchmark run +AG_BENCH_VERSION=$4 -if [ ! -d $DIR ]; then - mkdir -p $DIR +if [ ! -d $venv_base_dir ]; then + mkdir -p $venv_base_dir fi echo "Cloning $GIT_URI#$BRANCH..." repo_name=$(basename -s .git $(echo $GIT_URI)) -git clone --depth 1 --single-branch --branch ${BRANCH} --recurse-submodules ${GIT_URI} $DIR/$repo_name +git clone --depth 1 --single-branch --branch ${BRANCH} --recurse-submodules ${GIT_URI} $venv_base_dir/$repo_name # create virtual env -python3 -m venv $DIR/.venv -source $DIR/.venv/bin/activate +python3 -m venv $venv_base_dir/.venv +source $venv_base_dir/.venv/bin/activate python3 -m pip install --upgrade pip python3 -m pip install --upgrade setuptools wheel -if [[ "$ARG" == "--AGBENCH_DEV_URL="* ]]; then - AGBENCH_DEV_URL="${ARG#*=}" - echo "Installing Dev Branch $AGBENCH_DEV_URL" - AGBENCH_URI=$(echo "$AGBENCH_DEV_URL" | cut -d '#' -f 1) - AGBENCH_BRANCH=$(echo "$AGBENCH_DEV_URL" | cut -d '#' -f 2) - agbench_repo_name=$(basename -s .git $(echo $AGBENCH_URI)) - git clone --single-branch --branch ${AGBENCH_BRANCH} ${AGBENCH_URI} $DIR/$agbench_repo_name - cd $DIR/$agbench_repo_name - python3 -m pip install -e . - cd - -elif [[ "$ARG" == "--AG_BENCH_VER="* ]]; then - AG_BENCH_VER="${ARG#*=}" - output=$(python3 -m pip install autogluon.bench==$AG_BENCH_VER 2>&1) || { - err_message=$output - if [[ $err_message == *"No matching distribution"* ]]; then - echo -e "ERROR: No matching distribution found for autogluon.bench==$AG_BENCH_VER\n \ - To resolve the issue, try 'agbench run --dev-branch #" - fi - exit 1 - } +if echo "$AG_BENCH_VERSION" | grep -q "dev"; then + # install from local source or docker + pip install . 
else - echo "Invalid argument: $ARG" - exit 1 + pip install autogluon.bench==$AG_BENCH_VERSION fi -cd $DIR/$repo_name +cd $venv_base_dir/$repo_name python3 -m pip install -e common python3 -m pip install -e core[all] diff --git a/src/autogluon/bench/frameworks/tabular/exec.sh b/src/autogluon/bench/frameworks/tabular/exec.sh index a990c24f..e4aa6873 100755 --- a/src/autogluon/bench/frameworks/tabular/exec.sh +++ b/src/autogluon/bench/frameworks/tabular/exec.sh @@ -2,11 +2,11 @@ set -eo pipefail -framework=${1} -benchmark=${2} -constraint=${3} -benchmark_dir=${4} # from root of project -metrics_dir=${5} +framework=$1 +benchmark=$2 +constraint=$3 +venv_base_dir=$4 +metrics_dir=$5 shift 5 while getopts "t:f:u:" opt; do @@ -30,11 +30,11 @@ if [ -n "$fold" ]; then fi if [ -n "$user_dir" ]; then - cp -r $user_dir $benchmark_dir + cp -r $user_dir $venv_base_dir amlb_args+=" -u $user_dir" fi -source $benchmark_dir/.venv/bin/activate +source $venv_base_dir/.venv/bin/activate echo "Running AMLB benchmark with args $amlb_args" -python3 $benchmark_dir/automlbenchmark/runbenchmark.py $amlb_args +python3 $venv_base_dir/automlbenchmark/runbenchmark.py $amlb_args diff --git a/src/autogluon/bench/frameworks/tabular/setup.sh b/src/autogluon/bench/frameworks/tabular/setup.sh index 1f979453..1c8d273b 100755 --- a/src/autogluon/bench/frameworks/tabular/setup.sh +++ b/src/autogluon/bench/frameworks/tabular/setup.sh @@ -4,26 +4,26 @@ set -eo pipefail GIT_URI=$1 # AMLB Git URI BRANCH=$2 # AMLB branch -DIR=$3 # from root of benchmark run +venv_base_dir=$3 # from root of benchmark run AMLB_FRAMEWORK=$4 # e.g. AutoGluon_dev:test AMLB_USER_DIR=$5 # directory where AMLB customizations are located -if [ ! -d $DIR ]; then - mkdir -p $DIR +if [ ! -d $venv_base_dir ]; then + mkdir -p $venv_base_dir fi # create virtual env -python3 -m venv $DIR/.venv -source $DIR/.venv/bin/activate +python3 -m venv $venv_base_dir/.venv +source $venv_base_dir/.venv/bin/activate echo "Cloning $GIT_URI#$BRANCH..." 
repo_name=$(basename -s .git $(echo $GIT_URI)) -git clone --depth 1 --branch ${BRANCH} ${GIT_URI} $DIR/$repo_name +git clone --depth 1 --branch ${BRANCH} ${GIT_URI} $venv_base_dir/$repo_name python3 -m pip install --upgrade pip python3 -m pip install --upgrade setuptools wheel -python3 -m pip install -r $DIR/automlbenchmark/requirements.txt +python3 -m pip install -r $venv_base_dir/automlbenchmark/requirements.txt # install amlb framework only echo "Installing framework $AMLB_FRAMEWORK" @@ -33,4 +33,4 @@ if [ -n "$AMLB_USER_DIR" ]; then echo "using user_dir $AMLB_USER_DIR" amlb_args+=" -u $AMLB_USER_DIR" fi -python3 $DIR/automlbenchmark/runbenchmark.py $amlb_args +python3 $venv_base_dir/automlbenchmark/runbenchmark.py $amlb_args diff --git a/src/autogluon/bench/frameworks/tabular/tabular_benchmark.py b/src/autogluon/bench/frameworks/tabular/tabular_benchmark.py index dce04cea..060f58e9 100644 --- a/src/autogluon/bench/frameworks/tabular/tabular_benchmark.py +++ b/src/autogluon/bench/frameworks/tabular/tabular_benchmark.py @@ -52,13 +52,18 @@ def run( None """ + if os.environ.get("RUNNING_IN_DOCKER", False): + venv_base_dir = "/home/" + else: + venv_base_dir = self.benchmark_dir + exec_script_path = os.path.abspath(os.path.dirname(__file__)) + "/exec.sh" command = [ exec_script_path, framework, benchmark, constraint, - self.benchmark_dir, + venv_base_dir, self.metrics_dir, ] diff --git a/src/autogluon/bench/frameworks/timeseries/exec.sh b/src/autogluon/bench/frameworks/timeseries/exec.sh index a990c24f..f526fa8f 100755 --- a/src/autogluon/bench/frameworks/timeseries/exec.sh +++ b/src/autogluon/bench/frameworks/timeseries/exec.sh @@ -2,11 +2,11 @@ set -eo pipefail -framework=${1} -benchmark=${2} -constraint=${3} -benchmark_dir=${4} # from root of project -metrics_dir=${5} +framework=$1 +benchmark=$2 +constraint=$3 +venv_base_dir=$4 +metrics_dir=$5 shift 5 while getopts "t:f:u:" opt; do @@ -30,11 +30,11 @@ if [ -n "$fold" ]; then fi if [ -n "$user_dir" ]; then - cp -r $user_dir $benchmark_dir + cp -r $user_dir $venv_base_dir amlb_args+=" -u $user_dir" fi -source $benchmark_dir/.venv/bin/activate +source $venv_base_dir/.venv/bin/activate echo "Running AMLB benchmark with args $amlb_args" -python3 $benchmark_dir/automlbenchmark/runbenchmark.py $amlb_args +python3 $venv_base_dir/automlbenchmark/runbenchmark.py $amlb_args diff --git a/src/autogluon/bench/frameworks/timeseries/setup.sh b/src/autogluon/bench/frameworks/timeseries/setup.sh index 1d2755b1..c0722b64 100755 --- a/src/autogluon/bench/frameworks/timeseries/setup.sh +++ b/src/autogluon/bench/frameworks/timeseries/setup.sh @@ -4,27 +4,27 @@ set -eo pipefail GIT_URI=$1 # AMLB Git URI BRANCH=$2 # AMLB branch -DIR=$3 # from root of benchmark run +venv_base_dir=$3 # from root of benchmark run AMLB_FRAMEWORK=$4 # e.g. AutoGluon_dev:test AMLB_USER_DIR=$5 # directory where AMLB customizations are located -echo "Setup environment at $DIR" +echo "Setup environment at $venv_base_dir" -if [ ! -d $DIR ]; then - mkdir -p $DIR +if [ ! -d $venv_base_dir ]; then + mkdir -p $venv_base_dir fi # create virtual env -python3 -m venv $DIR/.venv -source $DIR/.venv/bin/activate +python3 -m venv $venv_base_dir/.venv +source $venv_base_dir/.venv/bin/activate echo "Cloning $GIT_URI#$BRANCH..." 
repo_name=$(basename -s .git $(echo $GIT_URI)) -git clone --depth 1 --branch ${BRANCH} ${GIT_URI} $DIR/$repo_name +git clone --depth 1 --branch ${BRANCH} ${GIT_URI} $venv_base_dir/$repo_name python3 -m pip install --upgrade pip python3 -m pip install --upgrade setuptools wheel -python3 -m pip install -r $DIR/automlbenchmark/requirements.txt +python3 -m pip install -r $venv_base_dir/automlbenchmark/requirements.txt # install amlb framework only echo "Installing framework $AMLB_FRAMEWORK" @@ -34,4 +34,4 @@ if [ -n "$AMLB_USER_DIR" ]; then echo "using user_dir $AMLB_USER_DIR" amlb_args+=" -u $AMLB_USER_DIR" fi -python3 $DIR/automlbenchmark/runbenchmark.py $amlb_args +python3 $venv_base_dir/automlbenchmark/runbenchmark.py $amlb_args diff --git a/src/autogluon/bench/frameworks/timeseries/timeseries_benchmark.py b/src/autogluon/bench/frameworks/timeseries/timeseries_benchmark.py index 9e787a13..78ce10fd 100644 --- a/src/autogluon/bench/frameworks/timeseries/timeseries_benchmark.py +++ b/src/autogluon/bench/frameworks/timeseries/timeseries_benchmark.py @@ -50,6 +50,10 @@ def run( Returns: None """ + if os.environ.get("RUNNING_IN_DOCKER", False): + venv_base_dir = "/home/" + else: + venv_base_dir = self.benchmark_dir exec_script_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), "exec.sh") command = [ @@ -57,7 +61,7 @@ def run( framework, benchmark, constraint, - self.benchmark_dir, + venv_base_dir, self.metrics_dir, ] diff --git a/src/autogluon/bench/runbenchmark.py b/src/autogluon/bench/runbenchmark.py index 56f0849a..fa5c04ad 100644 --- a/src/autogluon/bench/runbenchmark.py +++ b/src/autogluon/bench/runbenchmark.py @@ -12,6 +12,7 @@ import typer import yaml +from autogluon.bench import __path__ as agbench_path from autogluon.bench import __version__ as agbench_version from autogluon.bench.cloud.aws.stack_handler import deploy_stack, destroy_stack from autogluon.bench.eval.hardware_metrics.hardware_metrics import get_hardware_metrics @@ -32,7 +33,7 @@ AMLB_DEPENDENT_MODULES = ["tabular", "timeseries"] -def get_kwargs(module: str, configs: dict, agbench_dev_url: str): +def get_kwargs(module: str, configs: dict): """Returns a dictionary of keyword arguments to be used for setting up and running the benchmark. Args: @@ -49,7 +50,6 @@ def get_kwargs(module: str, configs: dict, agbench_dev_url: str): "setup_kwargs": { "git_uri": framework_configs["repo"], "git_branch": framework_configs.get("version", "stable"), - "agbench_dev_url": agbench_dev_url, }, "run_kwargs": { "dataset_name": configs["dataset_name"], @@ -91,7 +91,6 @@ def run_benchmark( benchmark_dir: str, configs: dict, benchmark_dir_s3: str = None, - agbench_dev_url: str = None, skip_setup: str = False, ): """Runs a benchmark based on the provided configuration options. 
@@ -116,7 +115,7 @@ def run_benchmark( benchmark = benchmark_class(benchmark_name=benchmark_name, benchmark_dir=benchmark_dir) - module_kwargs = get_kwargs(module=module_name, configs=configs, agbench_dev_url=agbench_dev_url) + module_kwargs = get_kwargs(module=module_name, configs=configs) if not skip_setup: benchmark.setup(**module_kwargs["setup_kwargs"]) @@ -145,10 +144,15 @@ def invoke_lambda(configs: dict, config_file: str) -> dict: response = lambda_client.invoke( FunctionName=lambda_function_name, InvocationType="RequestResponse", Payload=json.dumps(payload) ) - response = json.loads(response["Payload"].read().decode("utf-8")) + response_payload = json.loads(response["Payload"].read().decode("utf-8")) + + if "FunctionError" in response: + error_payload = response_payload + raise Exception(f"Lambda function error: {error_payload}") + logger.info("AWS Batch jobs submitted by %s.", configs["LAMBDA_FUNCTION_NAME"]) - return response + return response_payload @app.command() @@ -181,7 +185,6 @@ def get_job_status( config = yaml.safe_load(f) job_ids = list(config.get("job_configs", {}).keys()) cdk_deploy_region = config.get("CDK_DEPLOY_REGION", cdk_deploy_region) - if job_ids is None or cdk_deploy_region is None: raise ValueError("Either job_ids or cdk_deploy_region must be provided or configured in the config_file.") @@ -191,8 +194,13 @@ def get_job_status( for job_id in job_ids: response = batch_client.describe_jobs(jobs=[job_id]) - job = response["jobs"][0] - status_dict[job_id] = job["status"] + job_detail = response["jobs"][0] + + # Check if the job is an array job + if "arrayProperties" in job_detail and "size" in job_detail["arrayProperties"]: + status_dict[job_id] = job_detail["arrayProperties"]["statusSummary"] + else: + status_dict[job_id] = job_detail["status"] logger.info(status_dict) return status_dict @@ -205,18 +213,35 @@ def wait_for_jobs( quit_statuses: Optional[list] = ["SUCCEEDED", "FAILED"], frequency: Optional[int] = 120, ): + if config_file is not None: + with open(config_file, "r") as f: + config = yaml.safe_load(f) + job_ids = list(config.get("job_configs", {}).keys()) + aws_region = config.get("CDK_DEPLOY_REGION", aws_region) + + batch_client = boto3.client("batch", region_name=aws_region) while True: all_jobs_completed = True - failed_jobs = [] + failed_jobs = set() try: - job_status = get_job_status(job_ids=job_ids, cdk_deploy_region=aws_region, config_file=config_file) - - for job_id, job_status in job_status.items(): - if job_status == "FAILED": - failed_jobs.append(job_id) - elif job_status not in quit_statuses: - all_jobs_completed = False + job_status = get_job_status(job_ids=job_ids, cdk_deploy_region=aws_region, config_file=None) + + for job_id, status in job_status.items(): + if isinstance(status, str): + if status == "FAILED": + failed_jobs.append(job_id) + elif status not in quit_statuses: + all_jobs_completed = False + elif isinstance(status, dict): + for status, num in status.items(): + if status == "FAILED" and num > 0: + paginator = batch_client.get_paginator("list_jobs") + for page in paginator.paginate(arrayJobId=job_id, jobStatus="FAILED"): + for job in page["jobSummaryList"]: + failed_jobs.add(job["jobId"]) + if status not in quit_statuses and num > 0: + all_jobs_completed = False except botocore.exceptions.ClientError as e: logger.error(f"An error occurred: {e}.") return @@ -224,23 +249,11 @@ def wait_for_jobs( if all_jobs_completed: break else: - time.sleep(frequency) # Poll job statuses every 60 seconds + time.sleep(frequency) # Poll job 
statuses every 120 seconds return failed_jobs -def _get_split_id(file_name: str): - if "split" in file_name: - file_name = os.path.basename(file_name) - match = re.search(r"([a-f0-9]{32})", file_name) - if match: - return match.group(1) - else: - return None - - return None - - def _dump_configs(benchmark_dir: str, configs: dict, file_name: str): os.makedirs(benchmark_dir, exist_ok=True) config_path = os.path.join(benchmark_dir, file_name) @@ -260,14 +273,6 @@ def _get_git_info(git_uri_branch: str): return git_uri, git_branch -def _validate_single_value(configs: dict, key: str): - value = configs[key] - if isinstance(value, str): - configs[key] = [value] - elif isinstance(value, list) and len(value) != 1: - raise ValueError(f"Only single value (str, list[str]) is supported for {key}.") - - def _is_mounted(path: str): with open("/proc/mounts", "r") as mounts: for line in mounts: @@ -286,6 +291,7 @@ def _umount_if_needed(path: str = None): def _mount_dir(orig_path: str, new_path: str): + logging.info(f"Mounting from {orig_path} to {new_path}.") subprocess.run(["sudo", "mount", "--bind", orig_path, new_path]) @@ -302,8 +308,8 @@ def update_custom_dataloader(configs: dict): current_path = os.path.dirname(os.path.abspath(__file__)) custom_dataloader_path = os.path.join(current_path, "custom_configs", "dataloaders") - configs["custom_dataloader"]["dataloader_file"] = f"dataloaders/{dataloader_file_name}" - configs["custom_dataloader"]["dataset_config_file"] = f"dataloaders/{dataset_config_file_name}" + configs["custom_dataloader"]["dataloader_file"] = f"custom_configs/dataloaders/{dataloader_file_name}" + configs["custom_dataloader"]["dataset_config_file"] = f"custom_configs/dataloaders/{dataset_config_file_name}" return original_path, custom_dataloader_path @@ -316,24 +322,25 @@ def update_custom_metrics(configs: dict): current_path = os.path.dirname(os.path.abspath(__file__)) custom_metrics_path = os.path.join(current_path, "custom_configs", "metrics") - configs["custom_metrics"]["metrics_path"] = f"metrics/{metrics_file_name}" + configs["custom_metrics"]["metrics_path"] = f"custom_configs/metrics/{metrics_file_name}" return original_path, custom_metrics_path def get_resource(configs: dict, resource_name: str): - current_path = os.path.dirname(os.path.abspath(__file__)) - default_resource_file = os.path.join(current_path, "resources", f"{resource_name}.yaml") + ag_path = agbench_path[0] + default_resource_file = os.path.join(ag_path, "resources", f"{resource_name}.yaml") with open(default_resource_file, "r") as f: - default_resource = yaml.safe_load(f) + resources = yaml.safe_load(f) + current_path = os.getcwd() if configs.get("custom_resource_dir") is not None: custom_resource_dir = configs["custom_resource_dir"] - if os.path.exists(os.path.join(custom_resource_dir, f"{resource_name}.yaml")): - with open(os.path.join(custom_resource_dir, f"{resource_name}.yaml"), "r") as f: - custom_resource = yaml.safe_load(f) - default_resource.update(custom_resource) - return default_resource + resource_file = os.path.join(current_path, custom_resource_dir, f"{resource_name}.yaml") + if os.path.exists(resource_file): + with open(resource_file, "r") as f: + resources = yaml.safe_load(f) + return resources def update_resource_constraint(configs: dict): @@ -355,7 +362,6 @@ def run( config_file: str = typer.Argument(..., help="Path to custom config file."), remove_resources: bool = typer.Option(False, help="Remove resources after run."), wait: bool = typer.Option(False, help="Whether to block and wait for the 
benchmark to finish, default to False."), - dev_branch: Optional[str] = typer.Option(None, help="Path to a development AutoGluon-Bench branch."), skip_setup: bool = typer.Option( False, help="Whether to skip setting up framework in local mode, default to False." ), @@ -367,6 +373,10 @@ def run( config_file = download_file_from_s3(s3_path=config_file) with open(config_file, "r") as f: configs = yaml.safe_load(f) + if isinstance(configs, list) and os.environ.get( + "AWS_BATCH_JOB_ARRAY_INDEX" + ): # AWS array job sets ARRAY_INDEX environment variable for each child job + configs = configs[int(os.environ["AWS_BATCH_JOB_ARRAY_INDEX"])] benchmark_name = _get_benchmark_name(configs=configs) timestamp_pattern = r"\d{8}T\d{6}" # Timestamp that matches YYYYMMDDTHHMMSS @@ -384,15 +394,9 @@ def run( try: configs["benchmark_name"] = benchmark_name # setting environment variables for docker build ARG - if dev_branch is not None: - os.environ["AG_BENCH_DEV_URL"] = dev_branch # pull dev branch from GitHub - else: - os.environ[ - "AG_BENCH_VERSION" - ] = agbench_version # set the installed version for Dockerfile to align with + os.environ["AG_BENCH_VERSION"] = agbench_version - os.environ["FRAMEWORK_PATH"] = f"frameworks/{module}" - os.environ["BENCHMARK_DIR"] = benchmark_dir + os.environ["FRAMEWORK_PATH"] = f"frameworks/{module}/" if module in AMLB_DEPENDENT_MODULES: os.environ["AMLB_FRAMEWORK"] = configs["framework"] @@ -410,9 +414,10 @@ def run( else: amlb_user_dir_local = amlb_user_dir - custom_configs_path = os.path.join(current_path, "custom_configs/amlb_configs") + default_user_dir = "custom_configs/amlb_configs" + custom_configs_path = os.path.join(current_path, default_user_dir) lambda_custom_configs_path = os.path.join( - current_path, "cloud/aws/batch_stack/lambdas/amlb_configs" + current_path, "cloud/aws/batch_stack/lambdas", default_user_dir ) original_path = amlb_user_dir_local paths += [custom_configs_path, lambda_custom_configs_path] @@ -421,10 +426,9 @@ def run( # to make it available for Docker build _umount_if_needed(path) _mount_dir(orig_path=original_path, new_path=path) - os.environ["AMLB_USER_DIR"] = "amlb_configs" - configs["amlb_user_dir"] = "amlb_configs" + os.environ["AMLB_USER_DIR"] = default_user_dir # For Docker build + configs["amlb_user_dir"] = default_user_dir # For Lambda job config elif module == "multimodal": - update_resource_constraint(configs=configs) if configs.get("custom_dataloader") is not None: original_path, custom_dataloader_path = update_custom_dataloader(configs=configs) paths.append(custom_dataloader_path) @@ -437,15 +441,17 @@ def run( _umount_if_needed(custom_metrics_path) _mount_dir(orig_path=original_path, new_path=custom_metrics_path) + update_resource_constraint(configs=configs) framework_configs = get_framework_configs(configs=configs) - os.environ["GIT_URI"] = framework_configs["repo"] - os.environ["GIT_BRANCH"] = framework_configs.get("version", "stable") if configs.get("custom_resource_dir") is not None: custom_resource_path = os.path.join(current_path, "custom_configs", "resources") paths.append(custom_resource_path) _umount_if_needed(custom_resource_path) _mount_dir(orig_path=configs["custom_resource_dir"], new_path=custom_resource_path) - configs["custom_resource_dir"] = "resources" + configs["custom_resource_dir"] = "custom_configs/resources" + + os.environ["GIT_URI"] = framework_configs["repo"] + os.environ["GIT_BRANCH"] = framework_configs.get("version", "stable") infra_configs = deploy_stack(custom_configs=configs) @@ -454,11 +460,16 @@ def 
run( ) config_s3_path = upload_to_s3( s3_bucket=infra_configs["METRICS_BUCKET"], - s3_dir=f"configs/{benchmark_name}", + s3_dir=f"configs/{module}/{benchmark_name}", local_path=cloud_config_path, ) - lambda_response = invoke_lambda(configs=infra_configs, config_file=config_s3_path) - aws_configs = {**infra_configs, **lambda_response} + + response = invoke_lambda(configs=infra_configs, config_file=config_s3_path) + + job_configs = { + "job_configs": response, + } + aws_configs = {**infra_configs, **job_configs} logger.info(f"Saving infra configs and submitted job configs under {benchmark_dir}.") aws_config_path = _dump_configs( benchmark_dir=benchmark_dir, configs=aws_configs, file_name="aws_configs.yaml" @@ -476,7 +487,7 @@ def run( f"`agbench destroy-stack --config-file {aws_config_path}` " "to delete the stack after jobs have run to completion if you choose to quit now." ) - + time.sleep(10) # wait for Batch to load failed_jobs = wait_for_jobs(config_file=aws_config_path) if len(failed_jobs) > 0: logger.warning("Some jobs have failed: %s.", failed_jobs) @@ -509,7 +520,7 @@ def run( _umount_if_needed(path) elif configs["mode"] == "local": - split_id = _get_split_id(config_file) + split_id = os.environ.get("AWS_BATCH_JOB_ARRAY_INDEX", 0) benchmark_dir_s3 = f"{module}/{benchmark_name}" if split_id is not None: benchmark_dir_s3 += f"/{benchmark_name}_{split_id}" @@ -521,15 +532,12 @@ def run( configs["amlb_user_dir"] = download_dir_from_s3(s3_path=amlb_user_dir, local_path=tmpdir.name) logger.info(f"Running benchmark {benchmark_name} at {benchmark_dir}.") - if dev_branch is not None: - logger.info(f"Using Development Branch: {dev_branch} for set up...") run_benchmark( benchmark_name=benchmark_name, benchmark_dir=benchmark_dir, configs=configs, benchmark_dir_s3=benchmark_dir_s3, - agbench_dev_url=dev_branch, skip_setup=skip_setup, ) else: diff --git a/src/autogluon/bench/utils/hardware_utilization.sh b/src/autogluon/bench/utils/hardware_utilization.sh index 0fb2e1aa..b4ecedca 100755 --- a/src/autogluon/bench/utils/hardware_utilization.sh +++ b/src/autogluon/bench/utils/hardware_utilization.sh @@ -4,7 +4,8 @@ source /etc/environment GPU_EXISTS=$(lspci | grep -i nvidia > /dev/null 2>&1 && echo "true" || echo "false") -INSTANCE_ID=$(curl -s http://169.254.169.254/latest/meta-data/instance-id) +TOKEN=$(curl -X PUT "http://169.254.169.254/latest/api/token" -H "X-aws-ec2-metadata-token-ttl-seconds: 60") +INSTANCE_ID=$(curl -H "X-aws-ec2-metadata-token: $TOKEN" -v http://169.254.169.254/latest/meta-data/instance-id) while true; do if [ "$GPU_EXISTS" = "true" ]; then diff --git a/tests/unittests/benchmark/test_runbenchmarks.py b/tests/unittests/benchmark/test_runbenchmarks.py index 81d45337..c202c4c1 100644 --- a/tests/unittests/benchmark/test_runbenchmarks.py +++ b/tests/unittests/benchmark/test_runbenchmarks.py @@ -102,13 +102,11 @@ def test_get_kwargs_multimodal(): "framework": "AutoGluon_stable", "dataset_name": "dataset", } - agbench_dev_url = "https://github.com/test/autogluon-bench.git#branch" expected_result = { "setup_kwargs": { "git_uri": "https://github.com/autogluon/autogluon.git", "git_branch": "master", - "agbench_dev_url": "https://github.com/test/autogluon-bench.git#branch", }, "run_kwargs": { "dataset_name": "dataset", @@ -120,7 +118,7 @@ def test_get_kwargs_multimodal(): }, } - assert get_kwargs(module, configs, agbench_dev_url) == expected_result + assert get_kwargs(module, configs) == expected_result def test_get_kwargs_tabular(): @@ -134,7 +132,6 @@ def test_get_kwargs_tabular(): 
"fold_to_run": 6, "amlb_user_dir": "sample_configs/amlb_configs", } - agbench_dev_url = None expected_result = { "setup_kwargs": { @@ -153,7 +150,7 @@ def test_get_kwargs_tabular(): }, } - assert get_kwargs(module, configs, agbench_dev_url) == expected_result + assert get_kwargs(module, configs) == expected_result def test_invoke_lambda(mocker): @@ -184,14 +181,13 @@ def test_run_aws_mode(mocker, tmp_path): setup["config_file"], remove_resources=False, wait=False, - dev_branch=None, skip_setup=True, save_hardware_metrics=False, ) setup["mock_deploy_stack"].assert_called_once_with(custom_configs=setup["custom_configs"]) setup["mock_upload_to_s3"].assert_called_once_with( - s3_bucket="test_bucket", s3_dir="configs/test_benchmark_test_time", local_path="test_dump" + s3_bucket="test_bucket", s3_dir="configs/tabular/test_benchmark_test_time", local_path="test_dump" ) setup["mock_invoke_lambda"].assert_called_once_with(configs=setup["infra_configs"], config_file="test_s3_path") setup["mock_wait_for_jobs"].assert_not_called() @@ -206,14 +202,13 @@ def test_run_aws_mode_remove_resources(mocker, tmp_path): setup["config_file"], remove_resources=True, wait=False, - dev_branch=None, skip_setup=True, save_hardware_metrics=False, ) setup["mock_deploy_stack"].assert_called_once_with(custom_configs=setup["custom_configs"]) setup["mock_upload_to_s3"].assert_called_once_with( - s3_bucket="test_bucket", s3_dir="configs/test_benchmark_test_time", local_path="test_dump" + s3_bucket="test_bucket", s3_dir="configs/tabular/test_benchmark_test_time", local_path="test_dump" ) setup["mock_invoke_lambda"].assert_called_once_with(configs=setup["infra_configs"], config_file="test_s3_path") @@ -234,14 +229,13 @@ def test_run_aws_mode_wait(mocker, tmp_path): setup["config_file"], remove_resources=False, wait=True, - dev_branch=None, skip_setup=True, save_hardware_metrics=False, ) setup["mock_deploy_stack"].assert_called_once_with(custom_configs=setup["custom_configs"]) setup["mock_upload_to_s3"].assert_called_once_with( - s3_bucket="test_bucket", s3_dir="configs/test_benchmark_test_time", local_path="test_dump" + s3_bucket="test_bucket", s3_dir="configs/tabular/test_benchmark_test_time", local_path="test_dump" ) setup["mock_invoke_lambda"].assert_called_once_with(configs=setup["infra_configs"], config_file="test_s3_path") @@ -249,27 +243,6 @@ def test_run_aws_mode_wait(mocker, tmp_path): setup["mock_get_hardware_metrics"].assert_not_called() -def test_run_aws_mode_dev_branch(mocker, tmp_path): - setup = setup_mock(mocker, tmp_path) - dev_branch = "dev_branch_url" - - run( - setup["config_file"], - remove_resources=False, - wait=False, - dev_branch=dev_branch, - skip_setup=True, - save_hardware_metrics=False, - ) - - assert os.environ["AG_BENCH_DEV_URL"] == dev_branch - setup["mock_deploy_stack"].assert_called_once_with(custom_configs=setup["custom_configs"]) - setup["mock_upload_to_s3"].assert_called_once_with( - s3_bucket="test_bucket", s3_dir="configs/test_benchmark_test_time", local_path="test_dump" - ) - setup["mock_invoke_lambda"].assert_called_once_with(configs=setup["infra_configs"], config_file="test_s3_path") - - def test_run_aws_tabular_user_dir(mocker, tmp_path): setup = setup_mock(mocker, tmp_path) temp_dir_mock = mocker.patch("tempfile.TemporaryDirectory") @@ -279,17 +252,14 @@ def test_run_aws_tabular_user_dir(mocker, tmp_path): setup["config_file"], remove_resources=False, wait=False, - dev_branch="https://git_url#git_branch", skip_setup=True, save_hardware_metrics=False, ) - assert 
os.environ["AG_BENCH_DEV_URL"] == "https://git_url#git_branch" - assert os.environ["FRAMEWORK_PATH"] == "frameworks/tabular" - assert os.environ["BENCHMARK_DIR"] == "ag_bench_runs/tabular/test_benchmark_test_time" + assert os.environ["FRAMEWORK_PATH"] == "frameworks/tabular/" assert os.environ["GIT_URI"] == "https://github.com/openml/automlbenchmark.git" assert os.environ["GIT_BRANCH"] == "master" assert os.environ["AMLB_FRAMEWORK"] == "AutoGluon:stable" - assert os.environ["AMLB_USER_DIR"] == "amlb_configs" + assert os.environ["AMLB_USER_DIR"] == "custom_configs/amlb_configs" temp_dir_mock.assert_not_called() s3_mock.assert_not_called() assert setup["mock_umount"].call_count == 4 @@ -310,12 +280,14 @@ def test_run_aws_multimodal_custom_dataloader(mocker, tmp_path): setup["config_file"], remove_resources=False, wait=False, - dev_branch="https://git_url#git_branch", skip_setup=True, save_hardware_metrics=False, ) - assert setup["custom_configs"]["custom_dataloader"]["dataloader_file"] == "dataloaders/dataset.py" - assert setup["custom_configs"]["custom_dataloader"]["dataset_config_file"] == "dataloaders/datasets.yaml" + assert setup["custom_configs"]["custom_dataloader"]["dataloader_file"] == "custom_configs/dataloaders/dataset.py" + assert ( + setup["custom_configs"]["custom_dataloader"]["dataset_config_file"] + == "custom_configs/dataloaders/datasets.yaml" + ) assert umount_mock.call_count == 2 assert mount_mock.call_count == 1 @@ -333,11 +305,10 @@ def test_run_aws_multimodal_custom_metrics(mocker, tmp_path): setup["config_file"], remove_resources=False, wait=False, - dev_branch="https://git_url#git_branch", skip_setup=True, save_hardware_metrics=False, ) - assert setup["custom_configs"]["custom_metrics"]["metrics_path"] == "metrics/metrics.py" + assert setup["custom_configs"]["custom_metrics"]["metrics_path"] == "custom_configs/metrics/metrics.py" assert umount_mock.call_count == 2 assert mount_mock.call_count == 1 @@ -362,7 +333,6 @@ def test_run_local_mode(mocker, tmp_path): str(config_file), remove_resources=False, wait=False, - dev_branch=None, skip_setup=False, save_hardware_metrics=False, ) @@ -372,8 +342,7 @@ def test_run_local_mode(mocker, tmp_path): benchmark_name="test_benchmark_test_time", benchmark_dir="ag_bench_runs/tabular/test_benchmark_test_time", configs=configs, - benchmark_dir_s3="tabular/test_benchmark_test_time", - agbench_dev_url=None, + benchmark_dir_s3="tabular/test_benchmark_test_time/test_benchmark_test_time_0", skip_setup=False, ) @@ -477,7 +446,6 @@ def test_get_kwargs_timeseries(): "fold_to_run": 6, "amlb_user_dir": "sample_configs/amlb_configs", } - agbench_dev_url = None expected_result = { "setup_kwargs": { @@ -496,7 +464,7 @@ def test_get_kwargs_timeseries(): }, } - assert get_kwargs(module, configs, agbench_dev_url) == expected_result + assert get_kwargs(module, configs) == expected_result def test_run_aws_timeseries_user_dir(mocker, tmp_path): @@ -508,16 +476,13 @@ def test_run_aws_timeseries_user_dir(mocker, tmp_path): setup["config_file"], remove_resources=False, wait=False, - dev_branch="https://git_url#git_branch", skip_setup=True, ) - assert os.environ["AG_BENCH_DEV_URL"] == "https://git_url#git_branch" - assert os.environ["FRAMEWORK_PATH"] == "frameworks/timeseries" - assert os.environ["BENCHMARK_DIR"] == "ag_bench_runs/timeseries/test_benchmark_test_time" + assert os.environ["FRAMEWORK_PATH"] == "frameworks/timeseries/" assert os.environ["GIT_URI"] == "https://github.com/openml/automlbenchmark.git" assert os.environ["GIT_BRANCH"] == "master" 
assert os.environ["AMLB_FRAMEWORK"] == "AutoGluon:stable" - assert os.environ["AMLB_USER_DIR"] == "amlb_configs" + assert os.environ["AMLB_USER_DIR"] == "custom_configs/amlb_configs" temp_dir_mock.assert_not_called() s3_mock.assert_not_called() assert setup["mock_umount"].call_count == 4 diff --git a/tests/unittests/cloud/aws/test_stack.py b/tests/unittests/cloud/aws/test_stack.py index d2eb2594..2c554fdb 100644 --- a/tests/unittests/cloud/aws/test_stack.py +++ b/tests/unittests/cloud/aws/test_stack.py @@ -59,7 +59,6 @@ def test_batch_job_stack(): os.environ["FRAMEWORK_PATH"] = "frameworks/tabular" os.environ["GIT_URI"] = "https://github.com/openml/automlbenchmark.git" os.environ["GIT_BRANCH"] = "master" - os.environ["BENCHMARK_DIR"] = "benchmark_name_20230725" with patch.object( StaticResourceStack, diff --git a/tests/unittests/evaluation/hardware_metrics/test_hardware_metrics.py b/tests/unittests/evaluation/hardware_metrics/test_hardware_metrics.py index b03e7329..5366552b 100644 --- a/tests/unittests/evaluation/hardware_metrics/test_hardware_metrics.py +++ b/tests/unittests/evaluation/hardware_metrics/test_hardware_metrics.py @@ -136,7 +136,7 @@ def test_get_hardware_metrics(self, mock_boto3, mock_metrics, mock_upload_to_s3, s3_bucket="some bucket", module="tabular", benchmark_name="some_benchmark", - sub_folder="ag_bench_20230720T102030_2d42d496266911ee8df28ee9311e6528", + sub_folder="some_benchmark_123456-abc-efg", cloudwatch_client=ANY, ), call( @@ -144,7 +144,7 @@ def test_get_hardware_metrics(self, mock_boto3, mock_metrics, mock_upload_to_s3, s3_bucket="some bucket", module="tabular", benchmark_name="some_benchmark", - sub_folder="ag_bench_20230720T102030_2d794800266911ee8df28ee9311e6528", + sub_folder="some_benchmark_010101-xxx-zzz", cloudwatch_client=ANY, ), ]