From cbf291ac0b058eeedcd257b2b81e671515a2f0e3 Mon Sep 17 00:00:00 2001
From: MasterSkepticista
Date: Thu, 16 Jan 2025 14:00:21 +0530
Subject: [PATCH] Revert "Remove fw code"; keep code until interactive API is removed

This reverts commit 0ca06ba78aac5a9050ab0af09a6830fba7ad2006.

Signed-off-by: MasterSkepticista
---
 openfl/native/__init__.py      |   7 +
 openfl/native/fastestimator.py | 226 ++++++++++++++++++++
 openfl/native/native.py        | 371 +++++++++++++++++++++++++++++++++
 3 files changed, 604 insertions(+)
 create mode 100644 openfl/native/__init__.py
 create mode 100644 openfl/native/fastestimator.py
 create mode 100644 openfl/native/native.py

diff --git a/openfl/native/__init__.py b/openfl/native/__init__.py
new file mode 100644
index 0000000000..230ded708d
--- /dev/null
+++ b/openfl/native/__init__.py
@@ -0,0 +1,7 @@
+# Copyright 2020-2024 Intel Corporation
+# SPDX-License-Identifier: Apache-2.0
+
+
+"""openfl.native package."""
+
+from openfl.native.native import *  # NOQA
diff --git a/openfl/native/fastestimator.py b/openfl/native/fastestimator.py
new file mode 100644
index 0000000000..a95db185d0
--- /dev/null
+++ b/openfl/native/fastestimator.py
@@ -0,0 +1,226 @@
+# Copyright 2020-2024 Intel Corporation
+# SPDX-License-Identifier: Apache-2.0
+
+
+"""FederatedFastEstimator module."""
+
+import os
+from logging import getLogger
+from pathlib import Path
+from sys import path
+
+import fastestimator as fe
+from fastestimator.trace.io.best_model_saver import BestModelSaver
+
+import openfl.native as fx
+from openfl.federated import Plan
+from openfl.federated.data import FastEstimatorDataLoader
+from openfl.federated.task import FastEstimatorTaskRunner
+from openfl.protocols import utils
+from openfl.utilities.split import split_tensor_dict_for_holdouts
+
+
+class FederatedFastEstimator:
+    """A wrapper for fastestimator.estimator that allows running in federated
+    mode.
+
+    Attributes:
+        estimator: The FastEstimator to be used.
+        logger: A logger to record events.
+        rounds: The number of rounds to train.
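+
+    Example:
+        An illustrative sketch, not part of the reverted module. Assumes a
+        compiled ``fe.Estimator`` named ``estimator`` built elsewhere; extra
+        keyword arguments are forwarded to ``fx.init()``:
+
+        >>> fed_est = FederatedFastEstimator(estimator, col_names=["one", "two"])
+        >>> final_model = fed_est.fit()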
+    """
+
+    def __init__(self, estimator, override_config: dict = None, **kwargs):
+        """Initializes a new instance of the FederatedFastEstimator class.
+
+        Args:
+            estimator: The FastEstimator to be used.
+            override_config (dict, optional): A dictionary to override the
+                default configuration. Defaults to None.
+            **kwargs: Additional keyword arguments, forwarded to `fx.init()`.
+        """
+        self.estimator = estimator
+        self.logger = getLogger(__name__)
+        fx.init(**kwargs)
+        if override_config:
+            fx.update_plan(override_config)
+
+    def fit(self):  # noqa: C901
+        """Runs the estimator in federated mode."""
+        file = Path(__file__).resolve()
+        # interface root, containing command modules
+        root = file.parent.resolve()
+        work = Path.cwd().resolve()
+        path.append(str(root))
+        path.insert(0, str(work))
+
+        # TODO: Fix this implementation. The full plan parsing is reused here,
+        # but the model and data will be overwritten based on
+        # user specifications.
+        plan_config = Path(fx.WORKSPACE_PREFIX) / "plan" / "plan.yaml"
+        cols_config = Path(fx.WORKSPACE_PREFIX) / "plan" / "cols.yaml"
+        data_config = Path(fx.WORKSPACE_PREFIX) / "plan" / "data.yaml"
+
+        plan = Plan.parse(
+            plan_config_path=plan_config,
+            cols_config_path=cols_config,
+            data_config_path=data_config,
+        )
+
+        self.rounds = plan.config["aggregator"]["settings"]["rounds_to_train"]
+        data_loader = FastEstimatorDataLoader(self.estimator.pipeline)
+        runner = FastEstimatorTaskRunner(self.estimator, data_loader=data_loader)
+        # Overwrite plan values
+        tensor_pipe = plan.get_tensor_pipe()
+        # Initialize model weights
+        init_state_path = plan.config["aggregator"]["settings"]["init_state_path"]
+        tensor_dict, holdout_params = split_tensor_dict_for_holdouts(
+            self.logger, runner.get_tensor_dict(False)
+        )
+
+        model_snap = utils.construct_model_proto(
+            tensor_dict=tensor_dict, round_number=0, tensor_pipe=tensor_pipe
+        )
+
+        self.logger.info(f"Creating Initial Weights File 🠆 {init_state_path}")
+
+        utils.dump_proto(model_proto=model_snap, fpath=init_state_path)
+
+        self.logger.info("Starting Experiment...")
+
+        aggregator = plan.get_aggregator()
+
+        model_states = dict.fromkeys(plan.authorized_cols, None)
+        runners = {}
+        save_dir = {}
+        data_path = 1
+        for col in plan.authorized_cols:
+            data = self.estimator.pipeline.data
+            train_data, eval_data, test_data = split_data(
+                data["train"],
+                data["eval"],
+                data["test"],
+                data_path,
+                len(plan.authorized_cols),
+            )
+            pipeline_kwargs = {}
+            for k, v in self.estimator.pipeline.__dict__.items():
+                if k in [
+                    "batch_size",
+                    "ops",
+                    "num_process",
+                    "drop_last",
+                    "pad_value",
+                    "collate_fn",
+                ]:
+                    pipeline_kwargs[k] = v
+            pipeline_kwargs.update(
+                {
+                    "train_data": train_data,
+                    "eval_data": eval_data,
+                    "test_data": test_data,
+                }
+            )
+            pipeline = fe.Pipeline(**pipeline_kwargs)
+
+            data_loader = FastEstimatorDataLoader(pipeline)
+            self.estimator.system.pipeline = pipeline
+
+            runners[col] = FastEstimatorTaskRunner(
+                estimator=self.estimator, data_loader=data_loader
+            )
+            runners[col].set_optimizer_treatment("CONTINUE_LOCAL")
+
+            for trace in runners[col].estimator.system.traces:
+                if isinstance(trace, BestModelSaver):
+                    save_dir_path = f"{trace.save_dir}/{col}"
+                    os.makedirs(save_dir_path, exist_ok=True)
+                    save_dir[col] = save_dir_path
+
+            data_path += 1
+
+        # Create the collaborators
+        collaborators = {
+            collaborator: fx.get_collaborator(
+                plan, collaborator, runners[collaborator], aggregator
+            )
+            for collaborator in plan.authorized_cols
+        }
+
+        model = None
+        for round_num in range(self.rounds):
+            for col in plan.authorized_cols:
+                collaborator = collaborators[col]
+
+                if round_num != 0:
+                    # For the FastEstimator Jupyter notebook, models must be
+                    # saved in different directories (i.e. the path must be
+                    # reset here)
+
+                    runners[col].estimator.system.load_state(f"save/{col}_state")
+                    runners[col].rebuild_model(round_num, model_states[col])
+
+                # Reset the save directory if BestModelSaver is present
+                # in traces
+                for trace in runners[col].estimator.system.traces:
+                    if isinstance(trace, BestModelSaver):
+                        trace.save_dir = save_dir[col]
+
+                collaborator.run_simulation()
+
+                model_states[col] = runners[col].get_tensor_dict(with_opt_vars=True)
+                model = runners[col].model
+                runners[col].estimator.system.save_state(f"save/{col}_state")
+
+        # TODO: This will return the model from the last collaborator,
+        # NOT the final aggregated model (though they should be similar).
+        # There should be a method added to the aggregator that will load
+        # the best model from disk and return it.
+        return model
+
+
+def split_data(train, eva, test, rank, collaborator_count):
+    """Split data into N parts, where N is the collaborator count.
+
+    Args:
+        train: The training data.
+        eva: The evaluation data.
+        test: The testing data.
+        rank (int): The rank of the current collaborator.
+        collaborator_count (int): The total number of collaborators.
+
+    Returns:
+        tuple: The training, evaluation, and testing data for the current
+            collaborator.
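+
+    Example:
+        An illustrative sketch, not part of the reverted module. With three
+        collaborators, ``rank=2`` receives the second of three roughly equal
+        shards of each split:
+
+        >>> train_2, eval_2, test_2 = split_data(train, eva, test, 2, 3)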
+    """
+    if collaborator_count == 1:
+        return train, eva, test
+
+    fraction = [1.0 / float(collaborator_count)]
+    fraction *= collaborator_count - 1
+
+    # Expand the split list into individual parameters
+    train_split = train.split(*fraction)
+    eva_split = eva.split(*fraction)
+    test_split = test.split(*fraction)
+
+    train = [train]
+    eva = [eva]
+    test = [test]
+
+    if type(train_split) is not list:
+        train.append(train_split)
+        eva.append(eva_split)
+        test.append(test_split)
+    else:
+        # Combine all partitions into a single list
+        # (the remainder of the original dataset stays at index 0)
+        train = train + train_split
+        eva = eva + eva_split
+        test = test + test_split
+
+    # Extract the right shard
+    train = train[rank - 1]
+    eva = eva[rank - 1]
+    test = test[rank - 1]
+
+    return train, eva, test
diff --git a/openfl/native/native.py b/openfl/native/native.py
new file mode 100644
index 0000000000..117058abbb
--- /dev/null
+++ b/openfl/native/native.py
@@ -0,0 +1,371 @@
+# Copyright 2020-2024 Intel Corporation
+# SPDX-License-Identifier: Apache-2.0
+
+
+"""OpenFL Native functions module.
+
+This file defines openfl entrypoints to be used directly through python (not
+CLI).
+"""
+
+import json
+import logging
+import os
+from copy import copy
+from importlib import util
+from logging import basicConfig, getLogger
+from pathlib import Path
+from sys import path
+
+import flatten_json
+from rich.console import Console
+from rich.logging import RichHandler
+
+import openfl.interface.aggregator as aggregator
+import openfl.interface.collaborator as collaborator
+import openfl.interface.workspace as workspace
+from openfl.federated import Plan
+from openfl.protocols import utils
+from openfl.utilities import add_log_level
+from openfl.utilities.split import split_tensor_dict_for_holdouts
+
+logger = getLogger(__name__)
+
+WORKSPACE_PREFIX = os.path.join(os.path.expanduser("~"), ".local", "workspace")
+
+
+def setup_plan(log_level="CRITICAL"):
+    """Dump the plan with all defaults + overrides set.
+
+    Args:
+        log_level (str, optional): The log level to use while the plan is
+            parsed. Defaults to 'CRITICAL'.
+
+    Returns:
+        plan (Plan): The parsed Plan object.
+    """
+    plan_config = "plan/plan.yaml"
+    cols_config = "plan/cols.yaml"
+    data_config = "plan/data.yaml"
+
+    current_level = logging.root.level
+    getLogger().setLevel(log_level)
+    plan = Plan.parse(
+        plan_config_path=Path(plan_config),
+        cols_config_path=Path(cols_config),
+        data_config_path=Path(data_config),
+        resolve=False,
+    )
+    getLogger().setLevel(current_level)
+
+    return plan
+
+
+def flatten(config, return_complete=False):
+    """Flatten nested config.
+
+    Args:
+        config (dict): The configuration dictionary to flatten.
+        return_complete (bool, optional): Whether to return the complete
+            flattened config, including keys that carry `defaults` entries.
+            Defaults to False.
+
+    Returns:
+        flattened_config (dict): The flattened configuration dictionary.
+    """
+    flattened_config = flatten_json.flatten(config, ".")
+    if not return_complete:
+        keys_to_remove = [k for k, v in flattened_config.items() if ("defaults" in k or v is None)]
+    else:
+        keys_to_remove = [k for k, v in flattened_config.items() if v is None]
+    for k in keys_to_remove:
+        del flattened_config[k]
+
+    return flattened_config
+
+
+def update_plan(override_config, plan=None, resolve=True):  # noqa: C901
+    """Updates the plan with the provided override and saves it to disk.
+
+    For a list of available override options, call `fx.get_plan()`.
+
+    Args:
+        override_config (dict): A dictionary of values to override in the
+            plan.
+        plan (Plan, optional): The plan to update. If None, a new plan is set
+            up. Defaults to None.
+        resolve (bool, optional): Whether to resolve the plan. Defaults to
+            True.
+
+    Returns:
+        plan (Plan): The updated plan.
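+
+    Example:
+        An illustrative sketch, not part of the reverted module. Assumes
+        `fx.init()` has already created a workspace; override keys use the
+        flattened dotted-path notation reported by `fx.get_plan()`:
+
+        >>> plan = update_plan({"aggregator.settings.rounds_to_train": 5})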
+ """ + plan_config = "plan/plan.yaml" + cols_config = "plan/cols.yaml" + data_config = "plan/data.yaml" + + current_level = logging.root.level + getLogger().setLevel(log_level) + plan = Plan.parse( + plan_config_path=Path(plan_config), + cols_config_path=Path(cols_config), + data_config_path=Path(data_config), + resolve=False, + ) + getLogger().setLevel(current_level) + + return plan + + +def flatten(config, return_complete=False): + """ + Flatten nested config. + + Args: + config (dict): The configuration dictionary to flatten. + return_complete (bool, optional): Whether to return the complete + flattened config. Defaults to False. + + Returns: + flattened_config (dict): The flattened configuration dictionary. + """ + flattened_config = flatten_json.flatten(config, ".") + if not return_complete: + keys_to_remove = [k for k, v in flattened_config.items() if ("defaults" in k or v is None)] + else: + keys_to_remove = [k for k, v in flattened_config.items() if v is None] + for k in keys_to_remove: + del flattened_config[k] + + return flattened_config + + +def update_plan(override_config, plan=None, resolve=True): # noqa: C901 + """Updates the plan with the provided override and saves it to disk. + + For a list of available override options, call `fx.get_plan()` + + Args: + override_config (dict): A dictionary of values to override in the plan. + plan (Plan, optional): The plan to update. If None, a new plan is set + up. Defaults to None. + resolve (bool, optional): Whether to resolve the plan. Defaults to + True. + + Returns: + plan (object): The updated plan. + """ + if plan is None: + plan = setup_plan() + flat_plan_config = flatten(plan.config, return_complete=True) + + org_list_keys_with_count = {} + for k in flat_plan_config: + k_split = k.rsplit(".", 1) + if k_split[1].isnumeric(): + if k_split[0] in org_list_keys_with_count: + org_list_keys_with_count[k_split[0]] += 1 + else: + org_list_keys_with_count[k_split[0]] = 1 + + for key, val in override_config.items(): + if key in org_list_keys_with_count: + # remove old list corresponding to this key entirely + for idx in range(org_list_keys_with_count[key]): + del flat_plan_config[f"{key}.{idx}"] + logger.info("Updating %s to %s... ", key, val) + elif key in flat_plan_config: + logger.info("Updating %s to %s... ", key, val) + else: + # TODO: We probably need to validate the new key somehow + logger.info( + "Did not find %s in config. Make sure it should exist. Creating...", + key, + ) + if type(val) is list: + for idx, v in enumerate(val): + flat_plan_config[f"{key}.{idx}"] = v + else: + flat_plan_config[key] = val + + plan.config = unflatten(flat_plan_config, ".") + if resolve: + plan.resolve() + return plan + + +def unflatten(config, separator="."): + """Unfolds `config` settings that have `separator` in their names. + + Args: + config (dict): The flattened configuration dictionary to unfold. + separator (str, optional): The separator used in the flattened config. + Defaults to '.'. + + Returns: + config (dict): The unfolded configuration dictionary. + """ + config = flatten_json.unflatten_list(config, separator) + return config + + +def setup_logging(level="INFO", log_file=None): + """Initializes logging settings. + + Args: + level (str, optional): The log level. Defaults to 'INFO'. + log_file (str, optional): The name of the file to log to. + If None, logs are not saved to a file. Defaults to None. 
+ """ + # Setup logging + + if util.find_spec("tensorflow") is not None: + import tensorflow as tf # pylint: disable=import-outside-toplevel + + tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR) + metric = 25 + add_log_level("METRIC", metric) + + if isinstance(level, str): + level = level.upper() + + handlers = [] + if log_file: + fh = logging.FileHandler(log_file) + formatter = logging.Formatter( + "%(asctime)s %(levelname)s %(message)s %(filename)s:%(lineno)d" + ) + fh.setFormatter(formatter) + handlers.append(fh) + + console = Console(width=160) + handlers.append(RichHandler(console=console)) + basicConfig(level=level, format="%(message)s", datefmt="[%X]", handlers=handlers) + + +def init( + workspace_template: str = "default", + log_level: str = "INFO", + log_file: str = None, + agg_fqdn: str = None, + col_names=None, +): + """ + Initialize the openfl package. + + It performs the following tasks: + + 1. Creates a workspace in ~/.local/workspace (Equivalent to `fx + workspace create --prefix ~/.local/workspace --template + $workspace_template) + 2. Setup certificate authority (equivalent to `fx workspace certify`) + 3. Setup aggregator PKI (equivalent to `fx aggregator + generate-cert-request` followed by `fx aggregator certify`) + 4. Setup list of collaborators (col_names) and their PKI. (Equivalent + to running `fx collaborator generate-cert-request` followed by `fx + collaborator certify` for each of the collaborators in col_names) + 5. Setup logging + + Args: + workspace_template (str): The template that should be used as the + basis for the experiment. Defaults to 'default'. + Other options include are any of the template names + [keras_cnn_mnist, tf_2dunet, tf_cnn_histology, + mtorch_cnn_histology, torch_cnn_mnist]. + log_level (str): Log level for logging. METRIC level is available. + Defaults to 'INFO'. + log_file (str): Name of the file in which the log will be duplicated. + If None, logs are not saved to a file. Defaults to None. + agg_fqdn (str): The local node's fully qualified domain name (if it + can't be resolved automatically). Defaults to None. + col_names (list[str]): The names of the collaborators that will be + created. These collaborators will be set up to participate in the + experiment, but are not required to. Defaults to None. + + Returns: + None + """ + if col_names is None: + col_names = ["one", "two"] + workspace.create(WORKSPACE_PREFIX, workspace_template) + os.chdir(WORKSPACE_PREFIX) + workspace.certify() + aggregator.generate_cert_request(agg_fqdn) + aggregator.certify(agg_fqdn, silent=True) + data_path = 1 + for col_name in col_names: + collaborator.create(col_name, str(data_path), silent=True) + collaborator.generate_cert_request(col_name, silent=True, skip_package=True) + collaborator.certify(col_name, silent=True) + data_path += 1 + + setup_logging(level=log_level, log_file=log_file) + + +def get_collaborator(plan, name, model, aggregator): + """Create the collaborator. + + Using the same plan object to create multiple collaborators leads to + identical collaborator objects. This function can be removed once + collaborator generation is fixed in openfl/federated/plan/plan.py + + Args: + plan (Plan): The plan to use to create the collaborator. + name (str): The name of the collaborator. + model (Model): The model to use for the collaborator. + aggregator (Aggregator): The aggregator to use for the collaborator. + + Returns: + Collaborator: The created collaborator. 
+ """ + plan = copy(plan) + + return plan.get_collaborator(name, task_runner=model, client=aggregator) + + +def run_experiment(collaborator_dict: dict, override_config: dict = None): + """Core function that executes the FL Plan. + + Args: + collaborator_dict (dict): A dictionary mapping collaborator names to + their federated models. + Example: {collaborator_name(str): FederatedModel} + This dictionary defines which collaborators will participate in + the experiment, as well as a reference to that collaborator's + federated model. + override_config (dict, optional): A dictionary of values to override + in the plan. Defaults to None. + Example: dict {flplan.key : flplan.value} + Override any of the plan parameters at runtime using this + dictionary. To get a list of the available options, execute + `fx.get_plan()` + + Returns: + model: Final Federated model. The model resulting from the federated + learning experiment + """ + + if override_config is None: + override_config = {} + + file = Path(__file__).resolve() + root = file.parent.resolve() # interface root, containing command modules + work = Path.cwd().resolve() + + path.append(str(root)) + path.insert(0, str(work)) + + # Update the plan if necessary + plan = update_plan(override_config) + # Overwrite plan values + plan.authorized_cols = list(collaborator_dict) + tensor_pipe = plan.get_tensor_pipe() + + # This must be set to the final index of the list (this is the last + # tensorflow session to get created) + plan.runner_ = list(collaborator_dict.values())[-1] + model = plan.runner_ + + # Initialize model weights + init_state_path = plan.config["aggregator"]["settings"]["init_state_path"] + rounds_to_train = plan.config["aggregator"]["settings"]["rounds_to_train"] + tensor_dict, holdout_params = split_tensor_dict_for_holdouts( + logger, model.get_tensor_dict(False) + ) + + model_snap = utils.construct_model_proto( + tensor_dict=tensor_dict, round_number=0, tensor_pipe=tensor_pipe + ) + + logger.info("Creating Initial Weights File 🠆 %s", init_state_path) + + utils.dump_proto(model_proto=model_snap, fpath=init_state_path) + + logger.info("Starting Experiment...") + + aggregator = plan.get_aggregator() + + # get the collaborators + collaborators = { + collaborator: get_collaborator( + plan, collaborator, collaborator_dict[collaborator], aggregator + ) + for collaborator in plan.authorized_cols + } + + for _ in range(rounds_to_train): + for col in plan.authorized_cols: + collaborator = collaborators[col] + collaborator.run_simulation() + + # Set the weights for the final model + model.rebuild_model(rounds_to_train - 1, aggregator.last_tensor_dict, validation=True) + return model + + +def get_plan(fl_plan=None, indent=4, sort_keys=True): + """Returns a string representation of the current Plan. + + Args: + fl_plan (Plan): The plan to get a string representation of. If None, a + new plan is set up. Defaults to None. + indent (int): The number of spaces to use for indentation in the + string representation. Defaults to 4. + sort_keys (bool): Whether to sort the keys in the string + representation. Defaults to True. + + Returns: + str: A string representation of the plan. + """ + if fl_plan is None: + plan = setup_plan() + else: + plan = fl_plan + flat_plan_config = flatten(plan.config) + return json.dumps(flat_plan_config, indent=indent, sort_keys=sort_keys)