diff --git a/tests/end_to_end/test_suites/memory_logs_tests.py b/tests/end_to_end/test_suites/memory_logs_tests.py index 662099273f..e4b8c3f128 100644 --- a/tests/end_to_end/test_suites/memory_logs_tests.py +++ b/tests/end_to_end/test_suites/memory_logs_tests.py @@ -5,7 +5,7 @@ import logging import os -from tests.end_to_end.utils.common_fixtures import fx_federation_tr, fx_federation_tr_dws +from tests.end_to_end.utils.tr_common_fixtures import fx_federation_tr, fx_federation_tr_dws import tests.end_to_end.utils.constants as constants from tests.end_to_end.utils import federation_helper as fed_helper, ssh_helper as ssh from tests.end_to_end.utils.generate_report import generate_memory_report, convert_to_json diff --git a/tests/end_to_end/test_suites/sample_tests.py b/tests/end_to_end/test_suites/sample_tests.py index 01d5fd9394..cea7add2ec 100644 --- a/tests/end_to_end/test_suites/sample_tests.py +++ b/tests/end_to_end/test_suites/sample_tests.py @@ -4,7 +4,7 @@ import pytest import logging -from tests.end_to_end.utils.common_fixtures import ( +from tests.end_to_end.utils.tr_common_fixtures import ( fx_federation_tr, fx_federation_tr_dws, ) diff --git a/tests/end_to_end/test_suites/task_runner_tests.py b/tests/end_to_end/test_suites/task_runner_tests.py index eb9c344da8..a6df29af3a 100644 --- a/tests/end_to_end/test_suites/task_runner_tests.py +++ b/tests/end_to_end/test_suites/task_runner_tests.py @@ -4,7 +4,7 @@ import pytest import logging -from tests.end_to_end.utils.common_fixtures import ( +from tests.end_to_end.utils.tr_common_fixtures import ( fx_federation_tr, fx_federation_tr_dws, ) diff --git a/tests/end_to_end/test_suites/wf_local_func_tests.py b/tests/end_to_end/test_suites/wf_local_func_tests.py index 223ecbfdaa..5f00b7631f 100644 --- a/tests/end_to_end/test_suites/wf_local_func_tests.py +++ b/tests/end_to_end/test_suites/wf_local_func_tests.py @@ -8,7 +8,7 @@ import random from metaflow import Step -from tests.end_to_end.utils.common_fixtures import fx_local_federated_workflow, fx_local_federated_workflow_prvt_attr +from tests.end_to_end.utils.wf_common_fixtures import fx_local_federated_workflow, fx_local_federated_workflow_prvt_attr from tests.end_to_end.workflow.exclude_flow import TestFlowExclude from tests.end_to_end.workflow.include_exclude_flow import TestFlowIncludeExclude from tests.end_to_end.workflow.include_flow import TestFlowInclude diff --git a/tests/end_to_end/utils/generate_report.py b/tests/end_to_end/utils/generate_report.py index 1e014fa3d1..bba5a79052 100644 --- a/tests/end_to_end/utils/generate_report.py +++ b/tests/end_to_end/utils/generate_report.py @@ -24,10 +24,14 @@ def chapter_body(self, body): def generate_memory_report(memory_usage_dict, workspace_path): """ - Generates a memory usage report from a CSV file. + Generates a memory usage report using input dictionary + and saves it to a PDF file. + Content of memory_usage_dict comes from reading the aggregator + and collaborator memory usage json files inside respective logs folder. Parameters: - file_path (str): The path to the CSV file containing memory usage data. + memory_usage_dict (dict): A dictionary containing memory usage data. + workspace_path (str): The path to the workspace where the report will be saved. Returns: None @@ -37,22 +41,22 @@ def generate_memory_report(memory_usage_dict, workspace_path): # Plotting the chart plt.figure(figsize=(10, 5)) - plt.plot(data["round_number"], data["virtual_memory/used"], marker="o") + plt.plot(data["round_number"], data["process_memory"], marker="o") plt.title("Memory Usage per Round") plt.xlabel("round_number") - plt.ylabel("Virtual Memory Used (MB)") + plt.ylabel("Process Memory Used (MB)") plt.grid(True) output_path = f"{workspace_path}/mem_usage_plot.png" plt.savefig(output_path) plt.close() # Calculate statistics - min_mem = round(data["virtual_memory/used"].min(), 2) - max_mem = round(data["virtual_memory/used"].max(), 2) - mean_mem = round(data["virtual_memory/used"].mean(), 2) - variance_mem = round(data["virtual_memory/used"].var(), 2) - std_dev_mem = round(data["virtual_memory/used"].std(), 2) - slope, _, _, _, _ = linregress(data.index, data["virtual_memory/used"]) + min_mem = round(data["process_memory"].min(), 2) + max_mem = round(data["process_memory"].max(), 2) + mean_mem = round(data["process_memory"].mean(), 2) + variance_mem = round(data["process_memory"].var(), 2) + std_dev_mem = round(data["process_memory"].std(), 2) + slope, _, _, _, _ = linregress(data.index, data["process_memory"]) slope = round(slope, 2) stats_path = f"{workspace_path}/mem_stats.txt" with open(stats_path, "w") as file: @@ -87,7 +91,7 @@ def add_introduction(pdf): def add_chart_analysis(pdf, output_path, data): pdf.chapter_title("Chart Analysis") pdf.image(output_path, w=180) - diffs = data["virtual_memory/used"].diff().round(2) + diffs = data["process_memory"].diff().round(2) significant_changes = diffs[diffs.abs() > 500] for index, value in significant_changes.items(): pdf.chapter_body( diff --git a/tests/end_to_end/utils/common_fixtures.py b/tests/end_to_end/utils/tr_common_fixtures.py similarity index 58% rename from tests/end_to_end/utils/common_fixtures.py rename to tests/end_to_end/utils/tr_common_fixtures.py index 951670bc69..593e57e473 100644 --- a/tests/end_to_end/utils/common_fixtures.py +++ b/tests/end_to_end/utils/tr_common_fixtures.py @@ -5,7 +5,6 @@ import collections import concurrent.futures import logging -import numpy as np import tests.end_to_end.utils.constants as constants import tests.end_to_end.utils.federation_helper as fh @@ -21,10 +20,6 @@ "model_owner, aggregator, collaborators, workspace_path, local_bind_path", ) -workflow_local_fixture = collections.namedtuple( - "workflow_local_fixture", - "aggregator, collaborators, runtime", -) @pytest.fixture(scope="function") def fx_federation_tr(request): @@ -235,136 +230,3 @@ def fx_federation_tr_dws(request): workspace_path=workspace_path, local_bind_path=local_bind_path, ) - - -@pytest.fixture(scope="function") -def fx_local_federated_workflow(request): - """ - Fixture to set up a local federated workflow for testing. - This fixture initializes an `Aggregator` and sets up a list of collaborators - based on the number specified in the test configuration. It also configures - a `LocalRuntime` with the aggregator, collaborators, and an optional backend - if specified in the test configuration. - Args: - request (FixtureRequest): The pytest request object that provides access - to the test configuration. - Yields: - LocalRuntime: An instance of `LocalRuntime` configured with the aggregator, - collaborators, and backend. - """ - # Import is done inline because Task Runner does not support importing below openfl packages - from openfl.experimental.workflow.interface import Aggregator, Collaborator - from openfl.experimental.workflow.runtime import LocalRuntime - from tests.end_to_end.utils.wf_helper import ( - init_collaborator_private_attr_index, - init_collaborator_private_attr_name, - init_collaborate_pvt_attr_np, - init_agg_pvt_attr_np - ) - collab_callback_func = request.param[0] if hasattr(request, 'param') and request.param else None - collab_value = request.param[1] if hasattr(request, 'param') and request.param else None - agg_callback_func = request.param[2] if hasattr(request, 'param') and request.param else None - - # Get the callback functions from the locals using string - collab_callback_func_name = locals()[collab_callback_func] if collab_callback_func else None - agg_callback_func_name = locals()[agg_callback_func] if agg_callback_func else None - collaborators_list = [] - - if agg_callback_func_name: - aggregator = Aggregator( name="agg", - private_attributes_callable=agg_callback_func_name) - else: - aggregator = Aggregator() - - # Setup collaborators - for i in range(request.config.num_collaborators): - func_var = i if collab_value == "int" else f"collaborator{i}" if collab_value == "str" else None - collaborators_list.append( - Collaborator( - name=f"collaborator{i}", - private_attributes_callable=collab_callback_func_name, - param = func_var - ) - ) - - backend = request.config.backend if hasattr(request.config, 'backend') else None - if backend: - local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators_list, backend=backend) - local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators_list) - - # Return the federation fixture - return workflow_local_fixture( - aggregator=aggregator, - collaborators=collaborators_list, - runtime=local_runtime, - ) - - -@pytest.fixture(scope="function") -def fx_local_federated_workflow_prvt_attr(request): - """ - Fixture to set up a local federated workflow for testing. - This fixture initializes an `Aggregator` and sets up a list of collaborators - based on the number specified in the test configuration. It also configures - a `LocalRuntime` with the aggregator, collaborators, and an optional backend - if specified in the test configuration. - Args: - request (FixtureRequest): The pytest request object that provides access - to the test configuration. - Yields: - LocalRuntime: An instance of `LocalRuntime` configured with the aggregator, - collaborators, and backend. - """ - # Import is done inline because Task Runner does not support importing below openfl packages - from openfl.experimental.workflow.interface import Aggregator, Collaborator - from openfl.experimental.workflow.runtime import LocalRuntime - from tests.end_to_end.utils.wf_helper import ( - init_collaborator_private_attr_index, - init_collaborator_private_attr_name, - init_collaborate_pvt_attr_np, - init_agg_pvt_attr_np - ) - collab_callback_func = request.param[0] if hasattr(request, 'param') and request.param else None - collab_value = request.param[1] if hasattr(request, 'param') and request.param else None - agg_callback_func = request.param[2] if hasattr(request, 'param') and request.param else None - - # Get the callback functions from the locals using string - collab_callback_func_name = locals()[collab_callback_func] if collab_callback_func else None - agg_callback_func_name = locals()[agg_callback_func] if agg_callback_func else None - collaborators_list = [] - - # Setup aggregator - if agg_callback_func_name: - aggregator = Aggregator(name="agg", - private_attributes_callable=agg_callback_func_name) - else: - aggregator = Aggregator() - - aggregator.private_attributes = { - "test_loader_pvt": np.random.rand(10, 28, 28) # Random data - } - # Setup collaborators - for i in range(request.config.num_collaborators): - func_var = i if collab_value == "int" else f"collaborator{i}" if collab_value == "str" else None - collab = Collaborator( - name=f"collaborator{i}", - private_attributes_callable=collab_callback_func_name, - param = func_var - ) - collab.private_attributes = { - "train_loader_pvt": np.random.rand(i * 50, 28, 28), - "test_loader_pvt": np.random.rand(i * 10, 28, 28), - } - collaborators_list.append(collab) - - backend = request.config.backend if hasattr(request.config, 'backend') else None - if backend: - local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators_list, backend=backend) - local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators_list) - - # Return the federation fixture - return workflow_local_fixture( - aggregator=aggregator, - collaborators=collaborators_list, - runtime=local_runtime, - ) diff --git a/tests/end_to_end/utils/wf_common_fixtures.py b/tests/end_to_end/utils/wf_common_fixtures.py new file mode 100644 index 0000000000..44e6629990 --- /dev/null +++ b/tests/end_to_end/utils/wf_common_fixtures.py @@ -0,0 +1,155 @@ +# Copyright 2020-2023 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import pytest +import collections +import numpy as np + +from openfl.experimental.workflow.interface import Aggregator, Collaborator +from openfl.experimental.workflow.runtime import LocalRuntime + +# Define a named tuple to store the objects for model owner, aggregator, and collaborators +workflow_local_fixture = collections.namedtuple( + "workflow_local_fixture", + "aggregator, collaborators, runtime", +) + + +@pytest.fixture(scope="function") +def fx_local_federated_workflow(request): + """ + Fixture to set up a local federated workflow for testing. + This fixture initializes an `Aggregator` and sets up a list of collaborators + based on the number specified in the test configuration. It also configures + a `LocalRuntime` with the aggregator, collaborators, and an optional backend + if specified in the test configuration. + Args: + request (FixtureRequest): The pytest request object that provides access + to the test configuration. + Yields: + LocalRuntime: An instance of `LocalRuntime` configured with the aggregator, + collaborators, and backend. + """ + # Inline import + from tests.end_to_end.utils.wf_helper import ( + init_collaborator_private_attr_index, + init_collaborator_private_attr_name, + init_collaborate_pvt_attr_np, + init_agg_pvt_attr_np + ) + collab_callback_func = request.param[0] if hasattr(request, 'param') and request.param else None + collab_value = request.param[1] if hasattr(request, 'param') and request.param else None + agg_callback_func = request.param[2] if hasattr(request, 'param') and request.param else None + + # Get the callback functions from the locals using string + collab_callback_func_name = locals()[collab_callback_func] if collab_callback_func else None + agg_callback_func_name = locals()[agg_callback_func] if agg_callback_func else None + collaborators_list = [] + + if agg_callback_func_name: + aggregator = Aggregator( name="agg", + private_attributes_callable=agg_callback_func_name) + else: + aggregator = Aggregator() + + # Setup collaborators + for i in range(request.config.num_collaborators): + func_var = i if collab_value == "int" else f"collaborator{i}" if collab_value == "str" else None + collaborators_list.append( + Collaborator( + name=f"collaborator{i}", + private_attributes_callable=collab_callback_func_name, + param = func_var + ) + ) + + backend = request.config.backend if hasattr(request.config, 'backend') else None + if backend: + local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators_list, backend=backend) + local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators_list) + + # Return the federation fixture + return workflow_local_fixture( + aggregator=aggregator, + collaborators=collaborators_list, + runtime=local_runtime, + ) + + +@pytest.fixture(scope="function") +def fx_local_federated_workflow_prvt_attr(request): + """ + Fixture to set up a local federated workflow for testing. + This fixture initializes an `Aggregator` and sets up a list of collaborators + based on the number specified in the test configuration. It also configures + a `LocalRuntime` with the aggregator, collaborators, and an optional backend + if specified in the test configuration. + Args: + request (FixtureRequest): The pytest request object that provides access + to the test configuration. + Yields: + LocalRuntime: An instance of `LocalRuntime` configured with the aggregator, + collaborators, and backend. + """ + # Inline import + from tests.end_to_end.utils.wf_helper import ( + init_collaborator_private_attr_index, + init_collaborator_private_attr_name, + init_collaborate_pvt_attr_np, + init_agg_pvt_attr_np + ) + collab_callback_func = request.param[0] if hasattr(request, 'param') and request.param else None + collab_value = request.param[1] if hasattr(request, 'param') and request.param else None + agg_callback_func = request.param[2] if hasattr(request, 'param') and request.param else None + + # Get the callback functions from the locals using string + collab_callback_func_name = locals()[collab_callback_func] if collab_callback_func else None + agg_callback_func_name = locals()[agg_callback_func] if agg_callback_func else None + collaborators_list = [] + + # Setup aggregator + if agg_callback_func_name: + aggregator = Aggregator(name="agg", + private_attributes_callable=agg_callback_func_name) + else: + aggregator = Aggregator() + + aggregator.private_attributes = { + "test_loader_pvt": np.random.rand(10, 28, 28) # Random data + } + # Setup collaborators + for i in range(request.config.num_collaborators): + func_var = i if collab_value == "int" else f"collaborator{i}" if collab_value == "str" else None + collab = Collaborator( + name=f"collaborator{i}", + private_attributes_callable=collab_callback_func_name, + param = func_var + ) + collab.private_attributes = { + "train_loader_pvt": np.random.rand(i * 50, 28, 28), + "test_loader_pvt": np.random.rand(i * 10, 28, 28), + } + collaborators_list.append(collab) + + backend = request.config.backend if hasattr(request.config, 'backend') else None + if backend: + local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators_list, backend=backend) + local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators_list) + + # Return the federation fixture + return workflow_local_fixture( + aggregator=aggregator, + collaborators=collaborators_list, + runtime=local_runtime, + ) + + +@pytest.fixture(scope="function") +def fx_federated_runtime(request): + request.config.test_env = "workflow_federation_runtime" + + envoys = ["Portland, Seattle, Chandler, Bangalore"] + return workflow_federated_runtime_fixture( + director="director", + envoys=envoys + ) diff --git a/tests/end_to_end/workflow/exclude_flow.py b/tests/end_to_end/workflow/exclude_flow.py index f364dcbbaa..3f68033363 100644 --- a/tests/end_to_end/workflow/exclude_flow.py +++ b/tests/end_to_end/workflow/exclude_flow.py @@ -12,6 +12,7 @@ class TestFlowExclude(FLSpec): """ Testflow to validate exclude functionality in Federated Flow """ + __test__ = False # to prevent pytest from trying to discover tests in the class @aggregator def start(self): diff --git a/tests/end_to_end/workflow/include_exclude_flow.py b/tests/end_to_end/workflow/include_exclude_flow.py index b30e00d8d1..4a78787ac4 100644 --- a/tests/end_to_end/workflow/include_exclude_flow.py +++ b/tests/end_to_end/workflow/include_exclude_flow.py @@ -11,6 +11,7 @@ class TestFlowIncludeExclude(FLSpec): """ Testflow to validate include and exclude functionality in Federated Flow. """ + __test__ = False # to prevent pytest from trying to discover tests in the class @aggregator def start(self): diff --git a/tests/end_to_end/workflow/include_flow.py b/tests/end_to_end/workflow/include_flow.py index 7009e50a46..4cb83cf25a 100644 --- a/tests/end_to_end/workflow/include_flow.py +++ b/tests/end_to_end/workflow/include_flow.py @@ -12,6 +12,7 @@ class TestFlowInclude(FLSpec): """ Testflow to validate include functionality in Federated Flow """ + __test__ = False # to prevent pytest from trying to discover tests in the class @aggregator def start(self): diff --git a/tests/end_to_end/workflow/internal_loop.py b/tests/end_to_end/workflow/internal_loop.py index 8c506018eb..709121876f 100644 --- a/tests/end_to_end/workflow/internal_loop.py +++ b/tests/end_to_end/workflow/internal_loop.py @@ -13,6 +13,8 @@ log = logging.getLogger(__name__) class TestFlowInternalLoop(FLSpec): + __test__ = False # to prevent pytest from trying to discover tests in the class + def __init__(self, model=None, optimizer=None, rounds=None, **kwargs): super().__init__(**kwargs) self.training_rounds = rounds diff --git a/tests/end_to_end/workflow/private_attr_both.py b/tests/end_to_end/workflow/private_attr_both.py index 44f171f723..a44f6332b5 100644 --- a/tests/end_to_end/workflow/private_attr_both.py +++ b/tests/end_to_end/workflow/private_attr_both.py @@ -15,6 +15,7 @@ class TestFlowPrivateAttributesBoth(FLSpec): Testflow to validate Aggregator private attributes are not accessible to collaborators and vice versa """ + __test__ = False # to prevent pytest from trying to discover tests in the class @aggregator def start(self): diff --git a/tests/end_to_end/workflow/private_attr_wo_callable.py b/tests/end_to_end/workflow/private_attr_wo_callable.py index b32758178f..171594d914 100644 --- a/tests/end_to_end/workflow/private_attr_wo_callable.py +++ b/tests/end_to_end/workflow/private_attr_wo_callable.py @@ -14,6 +14,7 @@ class TestFlowPrivateAttributesWoCallable(FLSpec): Testflow to validate Aggregator private attributes are not accessible to collaborators and vice versa """ + __test__ = False # to prevent pytest from trying to discover tests in the class @aggregator def start(self): diff --git a/tests/end_to_end/workflow/private_attributes_flow.py b/tests/end_to_end/workflow/private_attributes_flow.py index 92c9f90d2a..713aee6f04 100644 --- a/tests/end_to_end/workflow/private_attributes_flow.py +++ b/tests/end_to_end/workflow/private_attributes_flow.py @@ -16,6 +16,7 @@ class TestFlowPrivateAttributes(FLSpec): Testflow to validate Aggregator private attributes are not accessible to collaborators and vice versa """ + __test__ = False # to prevent pytest from trying to discover tests in the class @aggregator def start(self): @@ -98,6 +99,7 @@ def end(self): log.info("Testing FederatedFlow - Ending Test for accessibility of private attributes") log.info("...Test case passed...") + def validate_collab_private_attr(self, private_attr, step_name): """ Validates the private attributes of the aggregator and collaborators. diff --git a/tests/end_to_end/workflow/reference_exclude.py b/tests/end_to_end/workflow/reference_exclude.py index 6f8ee766d0..d650e1d6c3 100644 --- a/tests/end_to_end/workflow/reference_exclude.py +++ b/tests/end_to_end/workflow/reference_exclude.py @@ -33,7 +33,7 @@ class TestFlowReferenceWithExclude(FLSpec): """ Testflow to validate references of collaborator attributes in Federated Flow with exclude. """ - + __test__ = False # to prevent pytest from trying to discover tests in the class step_one_collab_attrs = [] step_two_collab_attrs = [] diff --git a/tests/end_to_end/workflow/reference_flow.py b/tests/end_to_end/workflow/reference_flow.py index 24f2cc9850..8e15c24a5c 100644 --- a/tests/end_to_end/workflow/reference_flow.py +++ b/tests/end_to_end/workflow/reference_flow.py @@ -33,6 +33,7 @@ class TestFlowReference(FLSpec): """ Testflow to validate references of collaborator attributes in Federated Flow. """ + __test__ = False # to prevent pytest from trying to discover tests in the class step_one_collab_attrs = [] step_two_collab_attrs = [] all_ref_error_dict = {} diff --git a/tests/end_to_end/workflow/reference_include_flow.py b/tests/end_to_end/workflow/reference_include_flow.py index 65acccc866..187beb3148 100644 --- a/tests/end_to_end/workflow/reference_include_flow.py +++ b/tests/end_to_end/workflow/reference_include_flow.py @@ -29,6 +29,7 @@ def forward(self, x): class TestFlowReferenceWithInclude(FLSpec): + __test__ = False # to prevent pytest from trying to discover tests in the class step_one_collab_attrs = [] step_two_collab_attrs = [] diff --git a/tests/end_to_end/workflow/subset_flow.py b/tests/end_to_end/workflow/subset_flow.py index c4fa8cae39..b84498b582 100644 --- a/tests/end_to_end/workflow/subset_flow.py +++ b/tests/end_to_end/workflow/subset_flow.py @@ -14,6 +14,7 @@ class TestFlowSubsetCollaborators(FLSpec): """ Testflow to validate working of Subset Collaborators in Federated Flow. """ + __test__ = False # to prevent pytest from trying to discover tests in the class def __init__(self, random_ints=[], **kwargs) -> None: """