Source code for randonneur.templates

from collections import defaultdict
from pathlib import Path
from typing import List, Optional

import openpyxl
import xlsxwriter

from randonneur import Datapackage, MappingConstants
from randonneur.licenses import LICENSES as ALL_LICENSES

[docs] ROLES = [ "author", "publisher", "maintainer", "wrangler", "contributor", ]
[docs] MAPPINGS = [ "SIMAPRO_CSV", "ECOSPOLD2", "ECOSPOLD1_BIO", "ECOSPOLD2_BIO", "ECOSPOLD2_BIO_FLOWMAPPER", "CUSTOM", ]
[docs] LICENSES = [ "CC-BY-4.0", "CC-BY-NC-SA-4.0", "CC-BY-ND-4.0", "CC-BY-SA-4.0", "CC0-1.0", "CDL-1.0", "MIT", "ODC-By-1.0", "ODbL-1.0", "OML", "OSL-3.0", "PDDL-1.0", "PROPRIETARY", ]
[docs] def create_excel_template(data: List[dict], filepath: Path, replace_existing: bool = False) -> Path: """ Create an Excel template with optionally some data for a new matching data file. `data` should be a list of dictionaries like `{'source': {}, 'target': {}}`. The keys and values in these sub-dictionaries should be strings, or castable to strings in a way that can be reversed. This function doesn't do any type conversion or other data handling. `filepath` should be the complete filepath of the file to be created, including directory and suffix. `replace_existing`: Flag on whether to overwrite `filepath` if it exists. Returns the filepath of the created file. """ filepath = Path(filepath) if filepath.is_file() and not replace_existing: raise OSError("File `{filepath.name}` already exists and `replace_existing` is false.") source_fields, target_fields = set(), set() for obj in data: source_fields.update(set(obj.get("source", []))) target_fields.update(set(obj.get("target", []))) source_fields, target_fields = sorted(source_fields), sorted(target_fields) source_offset = len(source_fields) workbook = xlsxwriter.Workbook(filepath) header = workbook.add_format({"bold": True, "font_size": 16}) orange = workbook.add_format({"bg_color": "#ffdfba", "bold": True, "font_size": 14}) green = workbook.add_format({"bg_color": "#baffc9", "bold": True, "font_size": 14}) blue = workbook.add_format({"bg_color": "#bae1ff", "bold": True, "font_size": 14}) yellow = workbook.add_format({"bg_color": "#ffffba", "bold": True, "font_size": 14}) red = workbook.add_format({"bg_color": "#ffb3ba", "bold": True, "font_size": 14}) italic = workbook.add_format({"italic": True, "font_size": 12}) normal = workbook.add_format({"font_size": 12}) sheet = workbook.add_worksheet("Matching") sheet.write_string(0, 0, "Randonneur template matching file", header) sheet.write_string(2, 0, "Metadata", header) sheet.write_string(3, 0, "Name", orange) sheet.write_string(4, 0, "Description", orange) sheet.write_string(4, 2, "String; optional", italic) sheet.write_string(5, 0, "License", orange) sheet.write_string(6, 0, "Version", orange) sheet.write_string(6, 2, 'String like "1.0"; optional', italic) sheet.write_string(7, 0, "Source ID", orange) sheet.write_string(7, 2, 'String like "SimaPro-9"; optional', italic) sheet.write_string(8, 0, "Target ID", orange) sheet.write_string(8, 2, 'String like "lcacommons-2024"; optional', italic) sheet.write_string(10, 0, "Contributors", yellow) sheet.write_string(10, 1, "You can add additional rows via copying if needed", italic) sheet.write_string(11, 0, "Name", yellow) sheet.write_string(11, 1, "Role", yellow) sheet.write_string(11, 2, "Homepage", yellow) sheet.write_string(14, 0, "Field mapping", red) sheet.write_string(14, 1, "CUSTOM mapping needs to be defined on import", italic) sheet.write_string(15, 0, "Source mapping", red) sheet.write_string(16, 0, "Target mapping", red) sheet.write_string(18, 0, "Data", header) template_offset = 19 sheet.data_validation(5, 1, 5, 1, {"validate": "list", "source": LICENSES}) sheet.write_string(5, 1, "CC-BY-4.0", normal) sheet.data_validation(12, 1, 12, 1, {"validate": "list", "source": ROLES}) sheet.write_string(12, 1, "author", normal) sheet.data_validation(15, 1, 16, 1, {"validate": "list", "source": list(MAPPINGS)}) sheet.write_string(15, 1, "SIMAPRO_CSV", normal) sheet.write_string(16, 1, "ECOSPOLD2", normal) sheet.write_string(template_offset, 0, "Source", blue) for index in range(1, len(source_fields)): sheet.write_string(template_offset, index, "", blue) sheet.write_string(template_offset, source_offset, "Target", green) for index in range(1, len(target_fields)): sheet.write_string(template_offset, index + source_offset, "", green) for i, label in enumerate(source_fields): sheet.write_string(template_offset + 1, i, label, blue) for i, label in enumerate(target_fields): sheet.write_string(template_offset + 1, i + source_offset, label, green) for row_index, row in enumerate(data): for column_index, label in enumerate(source_fields): if row["source"].get(label): sheet.write_string( template_offset + row_index + 2, column_index, row["source"][label], normal ) for column_index, label in enumerate(target_fields): if row["target"].get(label): sheet.write_string( template_offset + row_index + 2, source_offset + column_index, row["target"][label], normal, ) sheet.autofit() # Shrinks target columns too small sheet.set_column(source_offset, source_offset + len(target_fields), 25) workbook.close() return filepath
[docs] def read_excel_template( filepath: Path, worksheet: str = "Matching", license_mapping: Optional[dict] = None, field_mapping: Optional[dict] = None, ) -> Datapackage: wb = openpyxl.load_workbook(filepath) ws = wb[worksheet] SECTIONS = { "Metadata", "Contributors", "Field mapping", "Data", } CONTRIBUTORS = { "Homepage": "path", "Name": "title", "Role": "roles", } METADATA_MAPPING = { "Name": "name", "Description": "description", "License": "licenses", "Version": "version", "Source ID": "source_id", "Target ID": "target_id", "Source mapping": "mapping_source", "Target mapping": "mapping_target", } current_section = None metadata = {} raw_data = defaultdict(list) for line in [[cell.value for cell in row] for row in ws.iter_rows(max_row=ws.max_row)]: if line[0] == "Randonneur template matching file": continue elif not any(line): continue elif line[0] in SECTIONS and current_section != "Data": current_section = line[0] else: raw_data[current_section].append(line) for section in SECTIONS: if section not in raw_data: raise ValueError(f"Missing required section {section}") for line in raw_data["Metadata"]: key, value = line[0], line[1] if key == "License": try: metadata["licenses"] = [ ( license_mapping[value] if (isinstance(license_mapping, dict) and value in license_mapping) else ALL_LICENSES[value] ) ] except KeyError as exc: raise KeyError(f"Can't find given license short name {value}") from exc elif value: try: metadata[METADATA_MAPPING[key]] = str(value) except KeyError as exc: raise KeyError(f"Can't understand metadata field {key}") from exc for line in raw_data["Field mapping"]: if isinstance(field_mapping, dict) and line[1] in field_mapping: mapping = field_mapping[line[1]] else: mapping = getattr(MappingConstants, line[1], None) if mapping is None: raise KeyError( f"Can't find mapping {line[1]} in built-in or custom field mapping dictionaries" ) try: metadata[METADATA_MAPPING[line[0]]] = mapping except KeyError as exc: raise KeyError(f"Can't understand field mapping field {line[0]}") from exc labels = [elem for elem in raw_data["Contributors"][0] if elem] metadata["contributors"] = [] for line in raw_data["Contributors"][1:]: try: metadata["contributors"].append( { CONTRIBUTORS[key]: ([value] if CONTRIBUTORS[key] == "roles" else value) for key, value in zip(labels, line) if value and key } ) except KeyError as exc: raise KeyError("Can't find given contributor field") from exc if "Source" not in raw_data["Data"][0]: raise ValueError("Can't find 'Source' in data headers") if "Target" not in raw_data["Data"][0]: raise ValueError("Can't find 'Target' in data headers") source_index, target_index = raw_data["Data"][0].index("Source"), raw_data["Data"][0].index( "Target" ) if source_index != 0: raise ValueError("'Source' must be the first column in data headers") labels = list( zip( ["source"] * target_index + ["target"] * (len(raw_data["Data"][0]) - target_index), raw_data["Data"][1], ) ) for source_target, label in labels: if label not in metadata[f"mapping_{source_target}"]["labels"]: given_labels = metadata[f"mapping_{source_target}"]["labels"] raise ValueError( f"Data label {label} not defined in {source_target} mapping:\n{given_labels}" ) if len(raw_data["Data"]) <= 2: raise ValueError("No data found") data = [] for line in raw_data["Data"][2:]: this = {"source": {}, "target": {}} for (source_target, column), value in zip(labels, line): this[source_target][column] = value data.append(this) rd = Datapackage(**metadata) rd.add_data("update", data) return rd