diff --git a/.github/actions/tr_post_test_run/action.yml b/.github/actions/tr_post_test_run/action.yml index 04444af4d9..f5c8849283 100644 --- a/.github/actions/tr_post_test_run/action.yml +++ b/.github/actions/tr_post_test_run/action.yml @@ -16,7 +16,7 @@ runs: if: ${{ always() }} run: | export PYTHONPATH="$PYTHONPATH:." - python tests/end_to_end/utils/summary_helper.py + python tests/end_to_end/utils/summary_helper.py --func_name "print_task_runner_score" echo "Test summary printed" shell: bash diff --git a/.github/workflows/federated_runtime.yml b/.github/workflows/federated_runtime.yml new file mode 100644 index 0000000000..717b96e490 --- /dev/null +++ b/.github/workflows/federated_runtime.yml @@ -0,0 +1,66 @@ +#--------------------------------------------------------------------------- +# Workflow to run 301_MNIST_Watermarking notebook +# Authors - Noopur, Payal Chaurasiya +#--------------------------------------------------------------------------- +name: Federated Runtime 301 MNIST Watermarking + +on: + pull_request: + types: [opened, synchronize, reopened, ready_for_review] + + workflow_dispatch: + +permissions: + contents: read + +jobs: + test_federated_runtime_301_watermarking_notebook: + if: github.event.pull_request.draft == false + runs-on: ubuntu-22.04 + timeout-minutes: 20 + steps: + - name: Checkout OpenFL repository + uses: actions/checkout@v4.1.1 + with: + fetch-depth: 2 # needed for detecting changes + submodules: "true" + token: ${{ secrets.GITHUB_TOKEN }} + + - name: Set up Python + uses: actions/setup-python@v3 + with: + python-version: "3.10" + + - name: Install dependencies # Without this step, fx command will not work + id: install_dependencies + run: | + python -m pip install --upgrade pip ipython ipykernel + pip install . + pip install -r test-requirements.txt + + - name: Run Federated Runtime 301 MNIST Watermarking via pytest + id: run_tests + run: | + python -m pytest -s tests/end_to_end/test_suites/wf_federated_runtime_tests.py -k test_federated_runtime_301_watermarking + echo "Federated Runtime 301 MNIST Watermarking test run completed" + + - name: Print test summary + id: print_test_summary + if: ${{ always() }} + run: | + export PYTHONPATH="$PYTHONPATH:." 
+ python tests/end_to_end/utils/summary_helper.py --func_name "print_federated_runtime_score" + echo "Test summary printed" + + - name: Tar files + if: ${{ always() }} # collect artifacts regardless of failures + run: | + tar -cvf notebook_301.tar --exclude="__pycache__" $HOME/results --ignore-failed-read + echo "TAR file created" + + - name: Upload Artifacts + uses: actions/upload-artifact@v4 + if: ${{ always() }} # collect artifacts regardless of failures + with: + name: federated_runtime_301_watermarking_${{ github.run_id }} + path: notebook_301.tar diff --git a/.github/workflows/straggler-handling.yml b/.github/workflows/straggler-handling.yml index 450caf8e8a..64a2f07153 100644 --- a/.github/workflows/straggler-handling.yml +++ b/.github/workflows/straggler-handling.yml @@ -21,7 +21,7 @@ jobs: matrix: os: ['ubuntu-latest', 'windows-latest'] runs-on: ${{ matrix.os }} - timeout-minutes: 15 + timeout-minutes: 30 steps: - uses: actions/checkout@v3 diff --git a/.github/workflows/task_runner_basic_e2e.yml b/.github/workflows/task_runner_basic_e2e.yml index b50eedd526..4c4aaa12d7 100644 --- a/.github/workflows/task_runner_basic_e2e.yml +++ b/.github/workflows/task_runner_basic_e2e.yml @@ -31,7 +31,7 @@ jobs: test_with_tls: name: tr_tls runs-on: ubuntu-22.04 - timeout-minutes: 15 + timeout-minutes: 30 strategy: matrix: # There are open issues for some of the models, so excluding them for now: @@ -74,7 +74,7 @@ jobs: test_with_non_tls: name: tr_non_tls runs-on: ubuntu-22.04 - timeout-minutes: 15 + timeout-minutes: 30 strategy: matrix: # Testing non TLS scenario only for torch_cnn_mnist model and python 3.10 @@ -117,7 +117,7 @@ jobs: test_with_no_client_auth: name: tr_no_client_auth runs-on: ubuntu-22.04 - timeout-minutes: 15 + timeout-minutes: 30 strategy: matrix: # Testing non TLS scenario only for torch_cnn_mnist model and python 3.10 @@ -160,7 +160,7 @@ jobs: test_memory_logs: name: tr_tls_memory_logs runs-on: ubuntu-22.04 - timeout-minutes: 15 + timeout-minutes: 30 strategy: matrix: # Testing non TLS scenario only for torch_cnn_mnist model and python 3.10 diff --git a/.github/workflows/taskrunner.yml b/.github/workflows/taskrunner.yml index d003ad8e1c..088ee60c64 100644 --- a/.github/workflows/taskrunner.yml +++ b/.github/workflows/taskrunner.yml @@ -18,8 +18,8 @@ jobs: build: if: github.event.pull_request.draft == false runs-on: ubuntu-latest - timeout-minutes: 15 - + timeout-minutes: 30 + steps: - uses: actions/checkout@v3 - name: Set up Python diff --git a/.github/workflows/wf_functional_e2e.yml b/.github/workflows/wf_functional_e2e.yml index 923aa73bae..1831949299 100644 --- a/.github/workflows/wf_functional_e2e.yml +++ b/.github/workflows/wf_functional_e2e.yml @@ -29,9 +29,9 @@ env: NUM_COLLABORATORS: ${{ github.event.inputs.num_collaborators || '2' }} jobs: - test_wf_func: + test_wf_functional_local_runtime: if: github.event.pull_request.draft == false - name: wf_func + name: wf_functional_local_runtime runs-on: ubuntu-22.04 timeout-minutes: 15 strategy: @@ -74,7 +74,7 @@ jobs: if: ${{ always() }} run: | export PYTHONPATH="$PYTHONPATH:." 
- python tests/end_to_end/utils/summary_helper.py + python tests/end_to_end/utils/summary_helper.py --func_name "print_local_runtime_score" echo "Test summary printed" - name: Create Tar (exclude cert and data folders) diff --git a/.github/workflows/workflow_interface_101_mnist.yml b/.github/workflows/workflow_interface_101_mnist.yml index 57e1dae46e..1182a74361 100644 --- a/.github/workflows/workflow_interface_101_mnist.yml +++ b/.github/workflows/workflow_interface_101_mnist.yml @@ -17,7 +17,7 @@ jobs: run_notebook: if: github.event.pull_request.draft == false runs-on: ubuntu-22.04 - timeout-minutes: 15 + timeout-minutes: 30 steps: - name: Checkout OpenFL repository uses: actions/checkout@v4.1.1 diff --git a/openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermaking/Bangalore/Bangalore_config.yaml b/openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermarking/Bangalore/Bangalore_config.yaml similarity index 100% rename from openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermaking/Bangalore/Bangalore_config.yaml rename to openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermarking/Bangalore/Bangalore_config.yaml diff --git a/openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermaking/Bangalore/private_attributes.py b/openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermarking/Bangalore/private_attributes.py similarity index 100% rename from openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermaking/Bangalore/private_attributes.py rename to openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermarking/Bangalore/private_attributes.py diff --git a/openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermaking/Bangalore/requirements.txt b/openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermarking/Bangalore/requirements.txt similarity index 100% rename from openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermaking/Bangalore/requirements.txt rename to openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermarking/Bangalore/requirements.txt diff --git a/openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermaking/Bangalore/start_envoy.sh b/openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermarking/Bangalore/start_envoy.sh similarity index 100% rename from openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermaking/Bangalore/start_envoy.sh rename to openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermarking/Bangalore/start_envoy.sh diff --git a/openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermaking/Chandler/Chandler_config.yaml b/openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermarking/Chandler/Chandler_config.yaml similarity index 100% rename from openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermaking/Chandler/Chandler_config.yaml rename to openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermarking/Chandler/Chandler_config.yaml diff --git a/openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermaking/Chandler/private_attributes.py b/openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermarking/Chandler/private_attributes.py similarity index 100% rename from openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermaking/Chandler/private_attributes.py rename to 
openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermarking/Chandler/private_attributes.py diff --git a/openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermaking/Chandler/requirements.txt b/openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermarking/Chandler/requirements.txt similarity index 100% rename from openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermaking/Chandler/requirements.txt rename to openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermarking/Chandler/requirements.txt diff --git a/openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermaking/Chandler/start_envoy.sh b/openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermarking/Chandler/start_envoy.sh similarity index 100% rename from openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermaking/Chandler/start_envoy.sh rename to openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermarking/Chandler/start_envoy.sh diff --git a/openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermaking/README.md b/openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermarking/README.md similarity index 100% rename from openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermaking/README.md rename to openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermarking/README.md diff --git a/openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermaking/director/director_config.yaml b/openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermarking/director/director_config.yaml similarity index 100% rename from openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermaking/director/director_config.yaml rename to openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermarking/director/director_config.yaml diff --git a/openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermaking/director/private_attributes.py b/openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermarking/director/private_attributes.py similarity index 100% rename from openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermaking/director/private_attributes.py rename to openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermarking/director/private_attributes.py diff --git a/openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermaking/director/start_director.sh b/openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermarking/director/start_director.sh similarity index 100% rename from openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermaking/director/start_director.sh rename to openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermarking/director/start_director.sh diff --git a/openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermaking/workspace/MNIST_Watermarking.ipynb b/openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermarking/workspace/MNIST_Watermarking.ipynb similarity index 95% rename from openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermaking/workspace/MNIST_Watermarking.ipynb rename to openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermarking/workspace/MNIST_Watermarking.ipynb index 040fb2cb26..0ee4c67681 100644 --- 
a/openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermaking/workspace/MNIST_Watermarking.ipynb +++ b/openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermarking/workspace/MNIST_Watermarking.ipynb @@ -39,7 +39,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 1, "id": "d79eacbd", "metadata": {}, "outputs": [], @@ -66,7 +66,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 2, "id": "f7475cba", "metadata": {}, "outputs": [], @@ -95,7 +95,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 3, "id": "9bd8ac2d", "metadata": {}, "outputs": [], @@ -193,7 +193,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 4, "id": "89cf4866", "metadata": {}, "outputs": [], @@ -245,7 +245,7 @@ " watermark_pretrain_optimizer=None,\n", " watermark_retrain_optimizer=None,\n", " round_number=0,\n", - " n_rounds=1,\n", + " n_rounds=3,\n", " **kwargs,\n", " ):\n", " super().__init__(**kwargs)\n", @@ -425,7 +425,20 @@ " + f\" Acc: {self.watermark_retrain_validation_score:<.6f}\")\n", " retrain_round += 1\n", "\n", - " self.next(self.end)\n", + " self.next(self.internal_loop)\n", + " \n", + " @aggregator\n", + " def internal_loop(self):\n", + " \"\"\"\n", + " Internal loop to continue the Federated Learning process.\n", + " \"\"\"\n", + " if self.round_number == self.n_rounds - 1:\n", + " print(f\"\\nCompleted training for all {self.n_rounds} round(s)\")\n", + " self.next(self.end)\n", + " else:\n", + " self.round_number += 1\n", + " print(f\"\\nCompleted round: {self.round_number}\")\n", + " self.next(self.aggregated_model_validation, foreach='collaborators')\n", "\n", " @aggregator\n", " def end(self):\n", @@ -449,7 +462,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 6, "id": "1715a373", "metadata": {}, "outputs": [], @@ -468,7 +481,7 @@ "federated_runtime = FederatedRuntime(\n", " collaborators=authorized_collaborators,\n", " director=director_info, \n", - " notebook_path='./MNIST_Watermarking.ipynb'\n", + " notebook_path='./MNIST_Watermarking.ipynb',\n", ")" ] }, @@ -552,7 +565,7 @@ ], "metadata": { "kernelspec": { - "display_name": "fed_run", + "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, @@ -566,7 +579,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.10.15" + "version": "3.10.12" } }, "nbformat": 4, diff --git a/openfl/experimental/workflow/runtime/federated_runtime.py b/openfl/experimental/workflow/runtime/federated_runtime.py index b484860a8d..efa90e2a24 100644 --- a/openfl/experimental/workflow/runtime/federated_runtime.py +++ b/openfl/experimental/workflow/runtime/federated_runtime.py @@ -193,8 +193,12 @@ def get_flow_state(self) -> Tuple[bool, Any]: return status, flow_object - def get_envoys(self) -> None: - """Prints the status of Envoys in a formatted way.""" + def get_envoys(self) -> List[str]: + """ + Prints the status of Envoys in a formatted way. + Returns: + online_envoys (List[str]): List of online envoys. 
+        """
         # Fetch envoy data
         envoys = self._dir_client.get_envoys()
         DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"
@@ -204,6 +208,7 @@ def get_envoys(self) -> None:
         headers = ["Name", "Online", "Last Updated", "Experiment Running", "Experiment Name"]
         # Prepare the table rows
         rows = []
+        online_envoys = []
         for envoy in envoys.envoy_infos:
             rows.append(
                 [
@@ -214,11 +219,15 @@ def get_envoys(self) -> None:
                     envoy.experiment_name if envoy.experiment_name else "None",
                 ]
             )
+            if envoy.is_online:
+                online_envoys.append(envoy.envoy_name)
+
         # Use tabulate to format the table
         result = tabulate(rows, headers=headers, tablefmt="grid")
         # Display the current timestamp
         print(f"Status of Envoys connected to Federation at: {now}\n")
         print(result)
+        return online_envoys
 
     def stream_experiment_stdout(self, experiment_name) -> None:
         """Stream experiment stdout.
diff --git a/test-requirements.txt b/test-requirements.txt
index c047afcbdf..b07ea268b9 100644
--- a/test-requirements.txt
+++ b/test-requirements.txt
@@ -6,4 +6,5 @@ pytest-asyncio==0.25.2
 pytest-mock==3.14.0
 defusedxml==0.7.1
 matplotlib==3.10.0
-fpdf==1.7.2
\ No newline at end of file
+fpdf==1.7.2
+papermill==2.6.0
diff --git a/tests/end_to_end/pytest.ini b/tests/end_to_end/pytest.ini
index ed865c99c6..e26b1337c6 100644
--- a/tests/end_to_end/pytest.ini
+++ b/tests/end_to_end/pytest.ini
@@ -8,5 +8,6 @@ markers =
     log_memory_usage: mark a test as a log memory usage test.
     task_runner_basic: mark a test as a task runner basic test.
     task_runner_dockerized_ws: mark a test as a task runner dockerized workspace test.
+    federated_runtime_301_watermarking: mark a test as a federated runtime 301 watermarking test.
 asyncio_mode=auto
 asyncio_default_fixture_loop_scope="function"
diff --git a/tests/end_to_end/test_suites/wf_federated_runtime_tests.py b/tests/end_to_end/test_suites/wf_federated_runtime_tests.py
new file mode 100644
index 0000000000..18d9f681b2
--- /dev/null
+++ b/tests/end_to_end/test_suites/wf_federated_runtime_tests.py
@@ -0,0 +1,94 @@
+# Copyright 2020-2025 Intel Corporation
+# SPDX-License-Identifier: Apache-2.0
+
+import pytest
+import logging
+import os
+import time
+import concurrent.futures
+
+import tests.end_to_end.utils.federation_helper as fh
+
+log = logging.getLogger(__name__)
+
+
+@pytest.mark.federated_runtime_301_watermarking
+def test_federated_runtime_301_watermarking(request):
+    """
+    Test the Federated Runtime via the 301 MNIST Watermarking notebook.
+    Args:
+        request (Fixture): Pytest fixture
+    """
+    envoys = ["Bangalore", "Chandler"]
+    workspace_path = os.path.join(
+        os.getcwd(),
+        "openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermarking",
+    )
+    # Activate the experimental feature
+    cmd = "fx experimental activate"
+    error_msg = "Failed to activate the experimental feature"
+    return_code, output, error = fh.run_command(
+        cmd,
+        workspace_path=workspace_path,
+        error_msg=error_msg,
+        return_error=True,
+    )
+
+    if error:
+        # Check if the experimental feature is already activated
+        if any("No such command 'activate'" in err for err in error):
+            log.info("Experimental feature is already activated. Ignoring the error.")
Ignore the error.") + else: + log.error(f"{error_msg}: {error}") + raise Exception(error) + + log.info(f"Activated the experimental feature.") + + # Create result log files for the director and envoys + result_path, participant_res_files = fh.create_federated_runtime_participant_res_files( + request.config.results_dir, envoys + ) + + # Start the director + fh.start_director(workspace_path, participant_res_files["director"]) + + # Start envoys Bangalore and Chandler and connect them to the director + executor = concurrent.futures.ThreadPoolExecutor() + results = [ + executor.submit( + fh.start_envoy, + envoy_name=envoy, + workspace_path=workspace_path, + res_file=participant_res_files[envoy.lower()], + ) + for envoy in envoys + ] + assert all([f.result() for f in results]), "Failed to start one or more envoys" + + # Based on the pattern, the envoys take time to connect to the director + # Hence, adding a sleep of 10 seconds anyways. + time.sleep(10) + nb_workspace_path = os.path.join(workspace_path, "workspace") + notebook_path = nb_workspace_path + "/" + "MNIST_Watermarking.ipynb" + + assert fh.check_envoys_director_conn_federated_runtime( + notebook_path=notebook_path, expected_envoys=envoys + ), "Envoys are not connected to the director" + + # IMP - Notebook 301 Watermarking has hard coded notebook path set, hence changing the directory + # This might not be true for all notebooks, thus keeping it as a separate step + os.chdir(nb_workspace_path) + + assert fh.run_notebook( + notebook_path=notebook_path, + output_notebook_path=result_path + "/" + "MNIST_Watermarking_output.ipynb" + ), "Notebook run failed" + + # Change the directory back to the original directory + os.chdir(os.getcwd()) + + assert fh.verify_federated_runtime_experiment_completion( + participant_res_files + ), "Experiment failed" + + log.info("Experiment completed successfully") diff --git a/tests/end_to_end/utils/exceptions.py b/tests/end_to_end/utils/exceptions.py index 4cccce0e5f..31fa596ac0 100644 --- a/tests/end_to_end/utils/exceptions.py +++ b/tests/end_to_end/utils/exceptions.py @@ -71,3 +71,18 @@ class WorkspaceLoadException(Exception): class ReferenceFlowException(Exception): """Exception for reference flow""" pass + + +class NotebookRunException(Exception): + """Exception for notebook run""" + pass + + +class EnvoyStartException(Exception): + """Exception for envoy start""" + pass + + +class DirectorStartException(Exception): + """Exception for director start""" + pass diff --git a/tests/end_to_end/utils/federation_helper.py b/tests/end_to_end/utils/federation_helper.py index 698e580179..50910c4f2e 100644 --- a/tests/end_to_end/utils/federation_helper.py +++ b/tests/end_to_end/utils/federation_helper.py @@ -7,6 +7,7 @@ import os import json import re +import papermill as pm from pathlib import Path import tests.end_to_end.utils.constants as constants @@ -16,6 +17,7 @@ from tests.end_to_end.models import collaborator as col_model log = logging.getLogger(__name__) +home_dir = Path().home() def setup_pki_for_collaborators(collaborators, model_owner, local_bind_path): @@ -542,6 +544,7 @@ def run_command( bg_file=None, print_output=False, with_docker=False, + return_error=False, ): """ Run the command @@ -553,6 +556,7 @@ def run_command( bg_file (str): Background file (with path) print_output (bool): Print the output with_docker (bool): Flag specific to dockerized workspace scenario. Default is False. 
+        return_error (bool): Return the error instead of raising, when the command fails
     Returns:
         tuple: Return code, output and error
     """
@@ -591,7 +595,7 @@ def run_command(
         )
     else:
         return_code, output, error = ssh.run_command(command)
-    if return_code != 0:
+    if return_code != 0 and not return_error:
         log.error(f"{error_msg}: {error}")
         raise Exception(f"{error_msg}: {error}")
 
@@ -752,3 +756,185 @@ def start_docker_containers_for_dws(
         raise ex.DockerException(
             f"Failed to start {participant.name} docker environment: {e}"
         )
+
+
+def start_director(workspace_path, dir_res_file):
+    """
+    Start the director.
+    Args:
+        workspace_path (str): Workspace path
+        dir_res_file (str): Director result file
+    Returns:
+        bool: True if successful, else False
+    """
+    try:
+        error_msg = "Failed to start the director"
+        return_code, output, error = run_command(
+            "./start_director.sh",
+            error_msg=error_msg,
+            workspace_path=os.path.join(workspace_path, "director"),
+            run_in_background=True,
+            bg_file=dir_res_file,
+        )
+        log.debug(f"Director start: Return code: {return_code}, Output: {output}, Error: {error}")
+        log.info(
+            "Waiting for 30s for the director to start. With no retry mechanism in place, "
+            "envoys will fail immediately if the director is not ready."
+        )
+        time.sleep(30)
+    except Exception as e:
+        raise ex.DirectorStartException(f"{error_msg}: {e}")
+    return True
+
+
+def start_envoy(envoy_name, workspace_path, res_file):
+    """
+    Start given envoy.
+    Args:
+        envoy_name (str): Name of the envoy. For e.g. Bangalore, Chandler (case sensitive)
+        workspace_path (str): Workspace path
+        res_file (str): Result file to track the logs.
+    Returns:
+        bool: True if successful, else False
+    """
+    try:
+        error_msg = f"Failed to start {envoy_name} envoy"
+        return_code, output, error = run_command(
+            f"./start_envoy.sh {envoy_name} {envoy_name}_config.yaml",
+            error_msg=error_msg,
+            workspace_path=os.path.join(workspace_path, envoy_name),
+            run_in_background=True,
+            bg_file=res_file,
+        )
+        log.debug(f"{envoy_name} start: Return code: {return_code}, Output: {output}, Error: {error}")
+    except Exception as e:
+        raise ex.EnvoyStartException(f"{error_msg}: {e}")
+    return True
+
+
+def create_federated_runtime_participant_res_files(results_dir, envoys, model_name="301_mnist_watermarking"):
+    """
+    Create result log files for the director and envoys.
+    Args:
+        results_dir (str): Results directory
+        envoys (list): List of envoys
+        model_name (str): Model name
+    Returns:
+        tuple: Result path and participant result files (including director)
+    """
+    participant_res_files = {}
+    result_path = os.path.join(home_dir, results_dir, model_name)
+    os.makedirs(result_path, exist_ok=True)
+
+    for participant in envoys + ["director"]:
+        res_file = os.path.join(result_path, f"{participant.lower()}.log")
+        participant_res_files[participant.lower()] = res_file
+        # Create the empty log file upfront
+        open(res_file, 'w').close()
+
+    return result_path, participant_res_files
+
+
+def check_envoys_director_conn_federated_runtime(
+    notebook_path, expected_envoys, director_node_fqdn="localhost", director_port=50050
+):
+    """
+    Check if the envoys are connected to the director for Federated Runtime notebooks.
+    Args:
+        notebook_path (str): Path to the notebook
+        expected_envoys (list): List of expected envoys
+        director_node_fqdn (str): Director node FQDN
+        director_port (int): Director port
+    Returns:
+        bool: True if all the envoys are connected to the director, else False
+    """
+    from openfl.experimental.workflow.runtime import FederatedRuntime
+
+    # Number of retries and delay between retries in seconds
+    MAX_RETRIES = RETRY_DELAY = 5
+
+    federated_runtime = FederatedRuntime(
+        collaborators=expected_envoys,
+        director={
+            "director_node_fqdn": director_node_fqdn,
+            "director_port": director_port,
+        },
+        notebook_path=notebook_path,
+    )
+    # Retry logic
+    for attempt in range(MAX_RETRIES):
+        actual_envoys = federated_runtime.get_envoys()
+        if sorted(expected_envoys) == sorted(actual_envoys):
+            log.info("All the envoys are connected to the director")
+            return True
+        else:
+            log.warning(
+                f"Attempt {attempt + 1}/{MAX_RETRIES}: Not all envoys are connected. Retrying in {RETRY_DELAY} seconds..."
+            )
+            time.sleep(RETRY_DELAY)
+
+    return False
+
+
+def run_notebook(notebook_path, output_notebook_path):
+    """
+    Run the notebook via papermill.
+    Args:
+        notebook_path (str): Path to the notebook
+        output_notebook_path (str): Path to the executed output notebook
+    Returns:
+        bool: True if successful, else False
+    """
+    try:
+        log.info(f"Running the notebook: {notebook_path} with output notebook path: {output_notebook_path}")
+        pm.execute_notebook(
+            input_path=notebook_path,
+            output_path=output_notebook_path,
+            request_save_on_cell_execute=True,
+            autosave_cell_every=5,  # autosave every 5 seconds
+            log_output=True,
+        )
+    except pm.exceptions.PapermillExecutionError as e:
+        log.error(f"PapermillExecutionError: {e}")
+        raise e
+    except Exception as e:
+        log.error(f"Failed to run the notebook: {e}")
+        raise ex.NotebookRunException(f"Failed to run the notebook: {e}")
+    return True
+
+
+def verify_federated_runtime_experiment_completion(participant_res_files):
+    """
+    Verify the completion of the experiment using the participant logs.
+    Args:
+        participant_res_files (dict): Dictionary containing participant names and their result log files
+    Returns:
+        bool: True if successful, else False
+    """
+    # Check participant logs for successful completion.
+    # Logs are written in the background, so only the tail of each file is inspected.
+    for name, result_file in participant_res_files.items():
+        with open(result_file, "r") as file:
+            lines = [line.strip() for line in file.readlines()]
+            last_7_lines = list(filter(str.rstrip, lines))[-7:]
+        if name == "director" and any(
+            "Experiment FederatedFlow_MNIST_Watermarking was finished successfully" in content
+            for content in last_7_lines
+        ):
+            log.debug(f"Process completed for {name}")
+            continue
+        elif name != "director" and any(
+            "End of Federation reached." in content for content in last_7_lines
+        ):
+            log.debug(f"Process completed for {name}")
+            continue
+        else:
+            log.error(f"Process failed for {name}")
+            return False
+    return True
diff --git a/tests/end_to_end/utils/summary_helper.py b/tests/end_to_end/utils/summary_helper.py
index cfdbc17a9e..0ed8aa0a3a 100644
--- a/tests/end_to_end/utils/summary_helper.py
+++ b/tests/end_to_end/utils/summary_helper.py
@@ -1,9 +1,11 @@
-# Copyright 2020-2023 Intel Corporation
+# Copyright 2020-2025 Intel Corporation
 # SPDX-License-Identifier: Apache-2.0
 
+import argparse
 from defusedxml.ElementTree import parse as defused_parse
 from lxml import etree
 import os
+import re
 from pathlib import Path
 
 import tests.end_to_end.utils.constants as constants
@@ -100,7 +102,7 @@ def get_testcase_result():
     return database_list
 
 
-def main():
+def print_task_runner_score():
     """
     Main function to get the test case results and aggregator logs
     And write the results to GitHub step summary
@@ -166,5 +168,69 @@
     )
 
 
+def print_federated_runtime_score():
+    summary_file = os.getenv("GITHUB_STEP_SUMMARY")
+
+    search_string = "Aggregated model validation score"
+
+    last_occurrence = aggregated_model_score = None
+
+    # Assumption - the result directory lives in the home directory
+    # ($HOME/results, the same location archived by the workflows)
+    result_path = os.path.join(Path.home(), "results")
+    dir_res_file = os.path.join(
+        result_path,
+        "301_mnist_watermarking",
+        "director.log",
+    )
+
+    # Open and read the log file
+    with open(dir_res_file, "r") as file:
+        for line in file:
+            if search_string in line:
+                last_occurrence = line
+
+    # Extract the value from the last occurrence
+    if last_occurrence:
+        match = re.search(
+            r"Aggregated model validation score = (\d+\.\d+)", last_occurrence
+        )
+        if match:
+            aggregated_model_score = match.group(1)
+            print(f"Last Aggregated model validation score: {aggregated_model_score}")
+        else:
+            print("No valid score found in the last occurrence.")
+    else:
+        print(f"No occurrences of '{search_string}' found in the log file.")
+
+    # Write the results to GitHub step summary file
+    # This file is created at runtime by the GitHub action, thus we cannot verify its existence beforehand
+    with open(summary_file, "a") as fh:
+        # DO NOT change the print statements
+        print("| Aggregated model validation score |", file=fh)
+        print("| ------------- |", file=fh)
+        print(f"| {aggregated_model_score} |", file=fh)
+
+
+def fetch_args():
+    """
+    Fetch the command-line arguments.
+    Returns:
+        Parsed arguments
+    """
+    # Initialize the parser and add arguments
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        "--func_name", required=True, type=str, help="Name of function to be called"
+    )
+    args = parser.parse_args()
+    return args
+
+
 if __name__ == "__main__":
-    main()
+    # Fetch input arguments
+    args = fetch_args()
+    func_name = args.func_name
+    if func_name in ["print_task_runner_score", "print_local_runtime_score"]:
+        print_task_runner_score()
+    elif func_name == "print_federated_runtime_score":
+        print_federated_runtime_score()
+    else:
+        raise ValueError(f"Unknown function name: {func_name}")
diff --git a/tests/end_to_end/utils/wf_common_fixtures.py b/tests/end_to_end/utils/wf_common_fixtures.py
index 44e6629990..2243ec5ccd 100644
--- a/tests/end_to_end/utils/wf_common_fixtures.py
+++ b/tests/end_to_end/utils/wf_common_fixtures.py
@@ -142,14 +142,3 @@ def fx_local_federated_workflow_prvt_attr(request):
         collaborators=collaborators_list,
         runtime=local_runtime,
     )
-
-
-@pytest.fixture(scope="function")
-def fx_federated_runtime(request):
-    request.config.test_env = "workflow_federation_runtime"
-
-    envoys = ["Portland, Seattle, Chandler, Bangalore"]
-    return workflow_federated_runtime_fixture(
-        director="director",
-        envoys=envoys
-    )
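For reviewers who want to exercise the new CI path locally, the following is a minimal sketch mirroring the workflow steps added in this diff. It assumes an OpenFL checkout with `pip install .` and `pip install -r test-requirements.txt` already done; the `/tmp/step_summary.md` path is illustrative, since `GITHUB_STEP_SUMMARY` is normally set by GitHub Actions:

# Run the 301 watermarking federated runtime suite exactly as the workflow does
export PYTHONPATH="$PYTHONPATH:."
python -m pytest -s tests/end_to_end/test_suites/wf_federated_runtime_tests.py -k test_federated_runtime_301_watermarking

# Print the summary; the helper appends to $GITHUB_STEP_SUMMARY, so point it at a scratch file outside CI
export GITHUB_STEP_SUMMARY=/tmp/step_summary.md
python tests/end_to_end/utils/summary_helper.py --func_name "print_federated_runtime_score"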