Skip to content

Commit

Permalink
Fix for PytestCollectionWarning: cannot collect test class because it… (
Browse files Browse the repository at this point in the history
#1249)

* Fix for PytestCollectionWarning: cannot collect test class because it has a __init__ constructor

Signed-off-by: noopur <[email protected]>

* Keep local specific imports inside fixtures

Signed-off-by: noopur <[email protected]>

* Replaced virtual memory with process memory

Signed-off-by: noopur <[email protected]>

---------

Signed-off-by: noopur <[email protected]>
  • Loading branch information
noopurintel authored Jan 8, 2025
1 parent 13a727e commit 52c0ba8
Show file tree
Hide file tree
Showing 18 changed files with 187 additions and 154 deletions.
2 changes: 1 addition & 1 deletion tests/end_to_end/test_suites/memory_logs_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import logging
import os

from tests.end_to_end.utils.common_fixtures import fx_federation_tr, fx_federation_tr_dws
from tests.end_to_end.utils.tr_common_fixtures import fx_federation_tr, fx_federation_tr_dws
import tests.end_to_end.utils.constants as constants
from tests.end_to_end.utils import federation_helper as fed_helper, ssh_helper as ssh
from tests.end_to_end.utils.generate_report import generate_memory_report, convert_to_json
Expand Down
2 changes: 1 addition & 1 deletion tests/end_to_end/test_suites/sample_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import pytest
import logging

from tests.end_to_end.utils.common_fixtures import (
from tests.end_to_end.utils.tr_common_fixtures import (
fx_federation_tr,
fx_federation_tr_dws,
)
Expand Down
2 changes: 1 addition & 1 deletion tests/end_to_end/test_suites/task_runner_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import pytest
import logging

from tests.end_to_end.utils.common_fixtures import (
from tests.end_to_end.utils.tr_common_fixtures import (
fx_federation_tr,
fx_federation_tr_dws,
)
Expand Down
2 changes: 1 addition & 1 deletion tests/end_to_end/test_suites/wf_local_func_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import random
from metaflow import Step

from tests.end_to_end.utils.common_fixtures import fx_local_federated_workflow, fx_local_federated_workflow_prvt_attr
from tests.end_to_end.utils.wf_common_fixtures import fx_local_federated_workflow, fx_local_federated_workflow_prvt_attr
from tests.end_to_end.workflow.exclude_flow import TestFlowExclude
from tests.end_to_end.workflow.include_exclude_flow import TestFlowIncludeExclude
from tests.end_to_end.workflow.include_flow import TestFlowInclude
Expand Down
26 changes: 15 additions & 11 deletions tests/end_to_end/utils/generate_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,14 @@ def chapter_body(self, body):

def generate_memory_report(memory_usage_dict, workspace_path):
"""
Generates a memory usage report from a CSV file.
Generates a memory usage report using input dictionary
and saves it to a PDF file.
Content of memory_usage_dict comes from reading the aggregator
and collaborator memory usage json files inside respective logs folder.
Parameters:
file_path (str): The path to the CSV file containing memory usage data.
memory_usage_dict (dict): A dictionary containing memory usage data.
workspace_path (str): The path to the workspace where the report will be saved.
Returns:
None
Expand All @@ -37,22 +41,22 @@ def generate_memory_report(memory_usage_dict, workspace_path):

# Plotting the chart
plt.figure(figsize=(10, 5))
plt.plot(data["round_number"], data["virtual_memory/used"], marker="o")
plt.plot(data["round_number"], data["process_memory"], marker="o")
plt.title("Memory Usage per Round")
plt.xlabel("round_number")
plt.ylabel("Virtual Memory Used (MB)")
plt.ylabel("Process Memory Used (MB)")
plt.grid(True)
output_path = f"{workspace_path}/mem_usage_plot.png"
plt.savefig(output_path)
plt.close()

# Calculate statistics
min_mem = round(data["virtual_memory/used"].min(), 2)
max_mem = round(data["virtual_memory/used"].max(), 2)
mean_mem = round(data["virtual_memory/used"].mean(), 2)
variance_mem = round(data["virtual_memory/used"].var(), 2)
std_dev_mem = round(data["virtual_memory/used"].std(), 2)
slope, _, _, _, _ = linregress(data.index, data["virtual_memory/used"])
min_mem = round(data["process_memory"].min(), 2)
max_mem = round(data["process_memory"].max(), 2)
mean_mem = round(data["process_memory"].mean(), 2)
variance_mem = round(data["process_memory"].var(), 2)
std_dev_mem = round(data["process_memory"].std(), 2)
slope, _, _, _, _ = linregress(data.index, data["process_memory"])
slope = round(slope, 2)
stats_path = f"{workspace_path}/mem_stats.txt"
with open(stats_path, "w") as file:
Expand Down Expand Up @@ -87,7 +91,7 @@ def add_introduction(pdf):
def add_chart_analysis(pdf, output_path, data):
pdf.chapter_title("Chart Analysis")
pdf.image(output_path, w=180)
diffs = data["virtual_memory/used"].diff().round(2)
diffs = data["process_memory"].diff().round(2)
significant_changes = diffs[diffs.abs() > 500]
for index, value in significant_changes.items():
pdf.chapter_body(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import collections
import concurrent.futures
import logging
import numpy as np

import tests.end_to_end.utils.constants as constants
import tests.end_to_end.utils.federation_helper as fh
Expand All @@ -21,10 +20,6 @@
"model_owner, aggregator, collaborators, workspace_path, local_bind_path",
)

workflow_local_fixture = collections.namedtuple(
"workflow_local_fixture",
"aggregator, collaborators, runtime",
)

@pytest.fixture(scope="function")
def fx_federation_tr(request):
Expand Down Expand Up @@ -235,136 +230,3 @@ def fx_federation_tr_dws(request):
workspace_path=workspace_path,
local_bind_path=local_bind_path,
)


@pytest.fixture(scope="function")
def fx_local_federated_workflow(request):
"""
Fixture to set up a local federated workflow for testing.
This fixture initializes an `Aggregator` and sets up a list of collaborators
based on the number specified in the test configuration. It also configures
a `LocalRuntime` with the aggregator, collaborators, and an optional backend
if specified in the test configuration.
Args:
request (FixtureRequest): The pytest request object that provides access
to the test configuration.
Yields:
LocalRuntime: An instance of `LocalRuntime` configured with the aggregator,
collaborators, and backend.
"""
# Import is done inline because Task Runner does not support importing below openfl packages
from openfl.experimental.workflow.interface import Aggregator, Collaborator
from openfl.experimental.workflow.runtime import LocalRuntime
from tests.end_to_end.utils.wf_helper import (
init_collaborator_private_attr_index,
init_collaborator_private_attr_name,
init_collaborate_pvt_attr_np,
init_agg_pvt_attr_np
)
collab_callback_func = request.param[0] if hasattr(request, 'param') and request.param else None
collab_value = request.param[1] if hasattr(request, 'param') and request.param else None
agg_callback_func = request.param[2] if hasattr(request, 'param') and request.param else None

# Get the callback functions from the locals using string
collab_callback_func_name = locals()[collab_callback_func] if collab_callback_func else None
agg_callback_func_name = locals()[agg_callback_func] if agg_callback_func else None
collaborators_list = []

if agg_callback_func_name:
aggregator = Aggregator( name="agg",
private_attributes_callable=agg_callback_func_name)
else:
aggregator = Aggregator()

# Setup collaborators
for i in range(request.config.num_collaborators):
func_var = i if collab_value == "int" else f"collaborator{i}" if collab_value == "str" else None
collaborators_list.append(
Collaborator(
name=f"collaborator{i}",
private_attributes_callable=collab_callback_func_name,
param = func_var
)
)

backend = request.config.backend if hasattr(request.config, 'backend') else None
if backend:
local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators_list, backend=backend)
local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators_list)

# Return the federation fixture
return workflow_local_fixture(
aggregator=aggregator,
collaborators=collaborators_list,
runtime=local_runtime,
)


@pytest.fixture(scope="function")
def fx_local_federated_workflow_prvt_attr(request):
"""
Fixture to set up a local federated workflow for testing.
This fixture initializes an `Aggregator` and sets up a list of collaborators
based on the number specified in the test configuration. It also configures
a `LocalRuntime` with the aggregator, collaborators, and an optional backend
if specified in the test configuration.
Args:
request (FixtureRequest): The pytest request object that provides access
to the test configuration.
Yields:
LocalRuntime: An instance of `LocalRuntime` configured with the aggregator,
collaborators, and backend.
"""
# Import is done inline because Task Runner does not support importing below openfl packages
from openfl.experimental.workflow.interface import Aggregator, Collaborator
from openfl.experimental.workflow.runtime import LocalRuntime
from tests.end_to_end.utils.wf_helper import (
init_collaborator_private_attr_index,
init_collaborator_private_attr_name,
init_collaborate_pvt_attr_np,
init_agg_pvt_attr_np
)
collab_callback_func = request.param[0] if hasattr(request, 'param') and request.param else None
collab_value = request.param[1] if hasattr(request, 'param') and request.param else None
agg_callback_func = request.param[2] if hasattr(request, 'param') and request.param else None

# Get the callback functions from the locals using string
collab_callback_func_name = locals()[collab_callback_func] if collab_callback_func else None
agg_callback_func_name = locals()[agg_callback_func] if agg_callback_func else None
collaborators_list = []

# Setup aggregator
if agg_callback_func_name:
aggregator = Aggregator(name="agg",
private_attributes_callable=agg_callback_func_name)
else:
aggregator = Aggregator()

aggregator.private_attributes = {
"test_loader_pvt": np.random.rand(10, 28, 28) # Random data
}
# Setup collaborators
for i in range(request.config.num_collaborators):
func_var = i if collab_value == "int" else f"collaborator{i}" if collab_value == "str" else None
collab = Collaborator(
name=f"collaborator{i}",
private_attributes_callable=collab_callback_func_name,
param = func_var
)
collab.private_attributes = {
"train_loader_pvt": np.random.rand(i * 50, 28, 28),
"test_loader_pvt": np.random.rand(i * 10, 28, 28),
}
collaborators_list.append(collab)

backend = request.config.backend if hasattr(request.config, 'backend') else None
if backend:
local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators_list, backend=backend)
local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators_list)

# Return the federation fixture
return workflow_local_fixture(
aggregator=aggregator,
collaborators=collaborators_list,
runtime=local_runtime,
)
Loading

0 comments on commit 52c0ba8

Please sign in to comment.