From 858b2ce01d80298246a59e4cb6ff403fb06dfead Mon Sep 17 00:00:00 2001 From: Shailesh Pant Date: Mon, 16 Dec 2024 22:34:11 +0530 Subject: [PATCH] - implement a new mode_based_filtering decorator in Assigner class - update all the sub-classes that use task_groups to use the decorator - update fedeval sample workspace to use default assigner, tasks and aggregator - use of federated-evaluation/aggregator.yaml for FedEval specific workspace example to use round_number as 1 - removed assigner and tasks yaml from defaults/federated-evaluation, superseded by default assigner/tasks - Rebase 21-Jan-2025.2 - added additional checks for assigner sub-classes that might not have task_groups - Addressing review comments Signed-off-by: Shailesh Pant --- .../torch_cnn_mnist_fed_eval/plan/plan.yaml | 8 +-- .../workspace/plan/defaults/assigner.yaml | 5 ++ .../federated-evaluation/assigner.yaml | 7 --- .../federated-evaluation/tasks_torch.yaml | 7 --- openfl/component/__init__.py | 1 + openfl/component/assigner/__init__.py | 1 + openfl/component/assigner/assigner.py | 50 ++++++++++++++++++- .../assigner/random_grouped_assigner.py | 3 +- .../assigner/static_grouped_assigner.py | 1 + 9 files changed, 64 insertions(+), 19 deletions(-) delete mode 100644 openfl-workspace/workspace/plan/defaults/federated-evaluation/assigner.yaml delete mode 100644 openfl-workspace/workspace/plan/defaults/federated-evaluation/tasks_torch.yaml diff --git a/openfl-workspace/torch_cnn_mnist_fed_eval/plan/plan.yaml b/openfl-workspace/torch_cnn_mnist_fed_eval/plan/plan.yaml index 580ce79760..274674af7c 100644 --- a/openfl-workspace/torch_cnn_mnist_fed_eval/plan/plan.yaml +++ b/openfl-workspace/torch_cnn_mnist_fed_eval/plan/plan.yaml @@ -32,10 +32,12 @@ network : defaults : plan/defaults/network.yaml assigner : - defaults : plan/defaults/federated-evaluation/assigner.yaml - + defaults : plan/defaults/assigner.yaml + settings : + mode : evaluate + tasks : - defaults : plan/defaults/federated-evaluation/tasks_torch.yaml + defaults : plan/defaults/tasks_torch.yaml compression_pipeline : defaults : plan/defaults/compression_pipeline.yaml diff --git a/openfl-workspace/workspace/plan/defaults/assigner.yaml b/openfl-workspace/workspace/plan/defaults/assigner.yaml index 6a5903794f..d5f0e66d30 100644 --- a/openfl-workspace/workspace/plan/defaults/assigner.yaml +++ b/openfl-workspace/workspace/plan/defaults/assigner.yaml @@ -7,3 +7,8 @@ settings : - aggregated_model_validation - train - locally_tuned_model_validation + - name : evaluation + percentage : 1.0 + tasks : + - aggregated_model_validation + selected_task_group: learning diff --git a/openfl-workspace/workspace/plan/defaults/federated-evaluation/assigner.yaml b/openfl-workspace/workspace/plan/defaults/federated-evaluation/assigner.yaml deleted file mode 100644 index c660659e83..0000000000 --- a/openfl-workspace/workspace/plan/defaults/federated-evaluation/assigner.yaml +++ /dev/null @@ -1,7 +0,0 @@ -template : openfl.component.RandomGroupedAssigner -settings : - task_groups : - - name : evaluation - percentage : 1.0 - tasks : - - aggregated_model_validation \ No newline at end of file diff --git a/openfl-workspace/workspace/plan/defaults/federated-evaluation/tasks_torch.yaml b/openfl-workspace/workspace/plan/defaults/federated-evaluation/tasks_torch.yaml deleted file mode 100644 index f497ca845c..0000000000 --- a/openfl-workspace/workspace/plan/defaults/federated-evaluation/tasks_torch.yaml +++ /dev/null @@ -1,7 +0,0 @@ -aggregated_model_validation: - function : validate_task - kwargs : - apply : global - metrics : - - acc - \ No newline at end of file diff --git a/openfl/component/__init__.py b/openfl/component/__init__.py index 3b787f87d0..45c5226cf0 100644 --- a/openfl/component/__init__.py +++ b/openfl/component/__init__.py @@ -1,6 +1,7 @@ # Copyright 2020-2024 Intel Corporation # SPDX-License-Identifier: Apache-2.0 +"""OpenFL Component Module.""" from openfl.component.aggregator.aggregator import Aggregator from openfl.component.assigner.assigner import Assigner diff --git a/openfl/component/assigner/__init__.py b/openfl/component/assigner/__init__.py index 980a524b7f..18adaab240 100644 --- a/openfl/component/assigner/__init__.py +++ b/openfl/component/assigner/__init__.py @@ -1,6 +1,7 @@ # Copyright 2020-2024 Intel Corporation # SPDX-License-Identifier: Apache-2.0 +"""OpenFL Assigner Module.""" from openfl.component.assigner.assigner import Assigner from openfl.component.assigner.random_grouped_assigner import RandomGroupedAssigner diff --git a/openfl/component/assigner/assigner.py b/openfl/component/assigner/assigner.py index c86aea1ae1..ed05ba33f8 100644 --- a/openfl/component/assigner/assigner.py +++ b/openfl/component/assigner/assigner.py @@ -4,6 +4,11 @@ """Assigner module.""" +import logging +from functools import wraps + +logger = logging.getLogger(__name__) + class Assigner: r""" @@ -35,18 +40,23 @@ class Assigner: \* - ``tasks`` argument is taken from ``tasks`` section of FL plan YAML file. """ - def __init__(self, tasks, authorized_cols, rounds_to_train, **kwargs): + def __init__( + self, tasks, authorized_cols, rounds_to_train, + selected_task_group: str = "learning", **kwargs + ): """Initializes the Assigner. Args: tasks (list of object): List of tasks to assign. authorized_cols (list of str): Collaborators. rounds_to_train (int): Number of training rounds. + selected_task_group (str, optional): Selected task_group. Defaults to "learning". **kwargs: Additional keyword arguments. """ self.tasks = tasks self.authorized_cols = authorized_cols self.rounds = rounds_to_train + self.selected_task_group = selected_task_group self.all_tasks_in_groups = [] self.task_group_collaborators = {} @@ -93,3 +103,41 @@ def get_aggregation_type_for_task(self, task_name): if "aggregation_type" not in self.tasks[task_name]: return None return self.tasks[task_name]["aggregation_type"] + + @classmethod + def task_group_filtering(cls, func): + """Decorator to filter task groups based on selected_task_group. + + This decorator should be applied to define_task_assignments() method + in Assigner subclasses to handle task_group filtering. + """ + + @wraps(func) + def wrapper(self, *args, **kwargs): + # First check if selection of task_group is applicable + if hasattr(self, "selected_task_group"): + # Verify task_groups exists before attempting filtering + if not hasattr(self, "task_groups"): + logger.warning( + "Task group specified for selection but no task_groups found. " + "Skipping filtering. This might be intentional for custom assigners." + ) + return func(self, *args, **kwargs) + + assert self.task_groups, "No task_groups defined in assigner." + + # Perform the filtering + self.task_groups = [ + group for group in self.task_groups if group["name"] == self.selected_task_group + ] + + assert self.task_groups, f"No task groups found for : {self.selected_task_group}" + + # Mode-specific validations + if self.selected_task_group == "evaluation": + assert self.rounds == 1, "Number of rounds should be 1 for evaluation" + + # Call the original method + return func(self, *args, **kwargs) + + return wrapper diff --git a/openfl/component/assigner/random_grouped_assigner.py b/openfl/component/assigner/random_grouped_assigner.py index dea00022a4..7c55f624a1 100644 --- a/openfl/component/assigner/random_grouped_assigner.py +++ b/openfl/component/assigner/random_grouped_assigner.py @@ -38,11 +38,12 @@ def __init__(self, task_groups, **kwargs): Args: task_groups (list of object): Task groups to assign. - **kwargs: Additional keyword arguments. + **kwargs: Additional keyword arguments, including mode. """ self.task_groups = task_groups super().__init__(**kwargs) + @Assigner.task_group_filtering def define_task_assignments(self): """Define task assignments for each round and collaborator. diff --git a/openfl/component/assigner/static_grouped_assigner.py b/openfl/component/assigner/static_grouped_assigner.py index fcb5a59034..3067cbdd67 100644 --- a/openfl/component/assigner/static_grouped_assigner.py +++ b/openfl/component/assigner/static_grouped_assigner.py @@ -42,6 +42,7 @@ def __init__(self, task_groups, **kwargs): self.task_groups = task_groups super().__init__(**kwargs) + @Assigner.task_group_filtering def define_task_assignments(self): """Define task assignments for each round and collaborator.