
Commit

Split tables if table index and many columns
equinor-ruaj committed Jan 22, 2025
1 parent 650e955 commit 34be2f1
Showing 2 changed files with 82 additions and 12 deletions.
92 changes: 81 additions & 11 deletions src/fmu/sumo/sim2sumo/tables.py
@@ -5,8 +5,12 @@
3. Uploads to Sumo
"""

import base64
import hashlib
import logging
import sys
from copy import deepcopy
from itertools import islice
from pathlib import Path
from typing import Union

@@ -38,6 +42,18 @@
"transmissibilities": "transmissibilities",
}

if sys.version_info >= (3, 12):
from itertools import batched
else:
try:
from more_itertools import batched
except ImportError:

def batched(iterable, chunk_size):
iterator = iter(iterable)
while chunk := tuple(islice(iterator, chunk_size)):
yield chunk
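
For context, `batched` (stdlib in Python 3.12, from `more_itertools` otherwise, with the inline fallback above as a last resort) yields successive tuples of at most `chunk_size` items, the last tuple holding the remainder. A standalone sketch of the behaviour (the column names are made up for illustration):

```python
from itertools import islice


def batched(iterable, chunk_size):  # same fallback as above, for a standalone run
    iterator = iter(iterable)
    while chunk := tuple(islice(iterator, chunk_size)):
        yield chunk


columns = [f"COL_{i}" for i in range(7)]  # hypothetical column names
print(list(batched(columns, 3)))
# [('COL_0', 'COL_1', 'COL_2'), ('COL_3', 'COL_4', 'COL_5'), ('COL_6',)]
```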


def table_2_bytestring(table):
"""Convert pa.table to bytestring
@@ -97,32 +113,84 @@ def generate_table_meta(datafile, obj, tagname, config):
return metadata


# TODO: If the split is inherent in this method it should probably be renamed
def convert_table_2_sumo_file(datafile, obj, tagname, config):
"""Convert table to Sumo File ready for shipping to sumo
Args:
        datafile (str|PosixPath): path to datafile connected to extracted object
        obj (pa.Table): The object to prepare for upload
        tagname (str): what submodule the table is extracted from
        config (dict): dictionary with master metadata needed for Sumo
    Returns:
        files (list): List of SumoFile objects with table object
            as bytestring and metadata as dictionary
"""
if obj is None:
return obj

metadata = generate_table_meta(datafile, obj, tagname, config)

print(f"GENERATED METADATA: {metadata}")
# TODO: Use metadata["data"]["table_index"]
# If table_index is not present, do not split

files = []
chunk_size = 500
columns = metadata["data"]["spec"]["columns"]
    table_index = metadata["data"].get("table_index")

if len(columns) > chunk_size and table_index:
cols = [c for c in columns if c not in table_index]
chunks = batched(cols, chunk_size - len(table_index))
for idx, chunk in enumerate(chunks):
chunk_columns = table_index + list(chunk)
table = obj.select(chunk_columns)
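            # Each fragment repeats the table_index columns in front of its
            # slice of payload columns, so every fragment's rows remain
            # addressable on their own.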
chunk_meta = deepcopy(metadata)
bytestring = table_2_bytestring(table)

md5_hex = hashlib.md5(bytestring).hexdigest()
md5_b64 = base64.b64encode(bytes.fromhex(md5_hex)).decode()
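            # The checksum is stored twice below: the hex digest goes into
            # file.checksum_md5, and the base64 encoding of the raw digest
            # bytes (equivalent to md5(bytestring).digest()) goes into
            # _sumo.blob_md5.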
chunk_meta["data"]["spec"]["columns"] = columns
chunk_meta["data"]["spec"]["num_columns"] = table.num_columns
chunk_meta["data"]["spec"]["num_rows"] = table.num_rows
chunk_meta["data"]["spec"]["size"] = (
table.num_columns * table.num_rows
)
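            # Note: spec.size counts cells (columns x rows); the byte count
            # is recorded separately in file.size_bytes.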
chunk_meta["file"]["size_bytes"] = len(bytestring)
chunk_meta["file"]["checksum_md5"] = md5_hex
relative_path = metadata["file"]["relative_path"]
chunk_meta["file"]["relative_path"] = relative_path.replace(
f"--{tagname}", f"--{tagname}:{idx:03d}"
)

            # FileOnJob resets the _sumo block, so create the file first and
            # patch sumo_file.metadata["_sumo"] afterwards.
sumo_file = FileOnJob(bytestring, chunk_meta)
sumo_file.path = chunk_meta["file"]["relative_path"]
sumo_file.metadata_path = ""
sumo_file.size = len(sumo_file.byte_string)

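            # Fragments are uploaded as hidden objects (presumably so only
            # the unsplit table appended at the end is surfaced); "fragment"
            # records the chunk's position.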
            _sumo = sumo_file.metadata.setdefault("_sumo", {})
            _sumo["blob_size"] = len(bytestring)
            _sumo["blob_md5"] = md5_b64
            _sumo["hidden"] = True
            _sumo["fragment"] = idx

files.append(sumo_file)

bytestring = table_2_bytestring(obj)
sumo_file = FileOnJob(bytestring, metadata)
sumo_file.path = metadata["file"]["relative_path"]
sumo_file.metadata_path = ""
sumo_file.size = len(sumo_file.byte_string)
files.append(sumo_file)

    return files
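
To see the new splitting end to end, here is a self-contained sketch with a synthetic pyarrow table and a deliberately tiny chunk size (the commit uses 500). The index columns are repeated in every fragment, and the fragment index is spliced into the relative path exactly as above; the table contents and the path are illustrative only:

```python
import pyarrow as pa

try:  # Python 3.12+
    from itertools import batched
except ImportError:
    from more_itertools import batched

obj = pa.table({
    "DATE": [1, 2],
    "A": [0.1, 0.2],
    "B": [0.3, 0.4],
    "C": [0.5, 0.6],
})
table_index = ["DATE"]
chunk_size = 2  # tiny on purpose; 4 columns > chunk_size triggers the split

cols = [c for c in obj.column_names if c not in table_index]
for idx, chunk in enumerate(batched(cols, chunk_size - len(table_index))):
    fragment = obj.select(table_index + list(chunk))
    relative_path = "share/results/tables/CASE--rft.arrow"  # hypothetical
    print(fragment.column_names,
          relative_path.replace("--rft", f"--rft:{idx:03d}"))
# ['DATE', 'A'] share/results/tables/CASE--rft:000.arrow
# ['DATE', 'B'] share/results/tables/CASE--rft:001.arrow
# ['DATE', 'C'] share/results/tables/CASE--rft:002.arrow
```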


def get_table(
@@ -219,10 +287,11 @@ def upload_vfp_tables_from_simulation_run(
table.schema.metadata[b"TABLE_NUMBER"].decode("utf-8")
)
tagname = f"{keyword}_{table_number}"
        sumo_files = convert_table_2_sumo_file(
            datafile, table, tagname.lower(), config
        )
        for file in sumo_files:
            dispatcher.add(file)


def upload_tables_from_simulation_run(
@@ -257,7 +326,8 @@ def upload_tables_from_simulation_run(
datafile,
)
continue
        sumo_files = convert_table_2_sumo_file(
            datafile, table, submod, config
        )
        for file in sumo_files:
            dispatcher.add(file)
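
Both upload entry points now iterate over the returned list. The condition that decides whether a split happens at all is the commit title in code form: many columns and a table index. A minimal restatement (standalone; the column names are hypothetical):

```python
def split_needed(columns, table_index, chunk_size=500):
    # Mirrors the guard in convert_table_2_sumo_file: split only when the
    # table is wider than chunk_size AND an index exists to repeat in
    # every fragment.
    return len(columns) > chunk_size and bool(table_index)


assert split_needed([f"C{i}" for i in range(501)], ["DATE"])
assert not split_needed([f"C{i}" for i in range(501)], [])  # no index
assert not split_needed(["DATE", "WOPR"], ["DATE"])  # narrow table
```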
2 changes: 1 addition & 1 deletion tests/test_functions.py
@@ -194,7 +194,7 @@ def test_convert_table_2_sumo_file(

file = tables.convert_table_2_sumo_file(
scratch_files[1], reekrft, "rft", config
    )[0]

sumo_conn = SumoConnection(env="dev", token=token)
nodisk_upload([file], case_uuid, "dev", connection=sumo_conn)
