Source code for randonneur.edge_functions

import warnings
from copy import deepcopy
from typing import List

from randonneur.config import MigrationConfig
from randonneur.utils import EXCLUDED_ATTRS, FlexibleLookupDict, rescale_edge


class WarningSemaphore:
    """Hold flags that record which one-off warnings were already emitted."""

    def __init__(self) -> None:
        # Starts out False; the edge-migration functions flip it to True after
        # warning about a missing edges label so the warning is not repeated.
        self.missing_edges_label: bool = False


# Single shared instance consulted by the migration functions in this module.
warning_semaphore = WarningSemaphore()
def migrate_edges_create(
    node: dict,
    migration_fld: List[dict],
    config: MigrationConfig,
) -> dict:
    """Append newly created edges to ``node``.

    For every entry in ``migration_fld`` a deep copy of its ``"target"`` value
    is appended to ``node[config.edges_label]``, unless ``config.edge_filter``
    is set and returns a falsy value for that copy.

    If ``node`` has no ``config.edges_label`` key, nothing is created; a
    warning is emitted the first time this happens (tracked through the
    module-level ``warning_semaphore``) and silently skipped afterwards.

    Args:
        node: The node whose edge list is extended in place.
        migration_fld: Migration entries, each providing a ``"target"`` edge.
        config: Supplies ``edges_label`` and the optional ``edge_filter``.

    Returns:
        The same ``node`` object that was passed in.

    Raises:
        KeyError: If a migration entry has no ``"target"`` key.
    """
    if config.edges_label not in node:
        if not warning_semaphore.missing_edges_label:
            # f-string so the node and label are actually interpolated; the
            # placeholders were previously emitted verbatim.
            warnings.warn(
                f"Skipping edge creation in `{node}` as edges label "
                f"`{config.edges_label}` missing"
            )
            warning_semaphore.missing_edges_label = True
        return node

    for edge in migration_fld:
        # Copy so the node never shares mutable state with the migration data.
        new_edge = deepcopy(edge["target"])
        if config.edge_filter and not config.edge_filter(new_edge):
            continue
        node[config.edges_label].append(new_edge)
    return node
def migrate_edges_delete(
    node: dict,
    migration_fld: FlexibleLookupDict,
    config: MigrationConfig,
) -> dict:
    """Remove every edge of ``node`` that has a match in ``migration_fld``.

    Edges rejected by ``config.edge_filter`` (when one is set) are never
    removed. An edge counts as matched when ``migration_fld[edge]`` does not
    raise ``KeyError``.

    If ``node`` has no ``config.edges_label`` key, nothing is deleted; a
    warning is emitted the first time this happens (tracked through the
    module-level ``warning_semaphore``) and silently skipped afterwards.

    Args:
        node: The node whose edge list is filtered.
        migration_fld: Lookup indexed by the edge itself.
        config: Supplies ``edges_label`` and the optional ``edge_filter``.

    Returns:
        The same ``node`` object; its edge list is replaced by a new list only
        when at least one edge was removed.
    """
    if config.edges_label not in node:
        if not warning_semaphore.missing_edges_label:
            # f-string so the node and label are actually interpolated; the
            # placeholders were previously emitted verbatim.
            warnings.warn(
                f"Skipping edge deletion in `{node}` as edges label "
                f"`{config.edges_label}` missing"
            )
            warning_semaphore.missing_edges_label = True
        return node

    # Edges are tracked by identity rather than by value, so only the exact
    # objects that matched are dropped.
    edges_to_remove = set()
    for edge in node[config.edges_label]:
        if config.edge_filter and not config.edge_filter(edge):
            continue
        try:
            migration_fld[edge]  # Lookup only; a KeyError means "no match".
        except KeyError:
            continue
        else:
            edges_to_remove.add(id(edge))

    if edges_to_remove:
        node[config.edges_label] = [
            edge for edge in node[config.edges_label] if id(edge) not in edges_to_remove
        ]
    return node
def migrate_edges_disaggregate(
    node: dict,
    migration_fld: FlexibleLookupDict,
    config: MigrationConfig,
) -> dict:
    """Split each matching edge of ``node`` into one new edge per target.

    For an edge with a match in ``migration_fld``, every entry of the match's
    ``"targets"`` list yields a deep copy of the edge that is rescaled by the
    entry's ``"allocation"`` via ``rescale_edge`` and then updated with the
    entry itself. When ``config.add_extra_attributes`` is truthy, the match's
    keys not listed in ``EXCLUDED_ATTRS`` are copied onto each new edge too.
    The original edge is dropped and the new edges are appended after the
    remaining ones.

    Edges rejected by ``config.edge_filter`` and edges for which any step
    raises ``KeyError`` are left untouched. Replacement is all-or-nothing per
    edge: if a later target fails, the new edges already built for that edge
    are discarded rather than being added next to the original.

    Args:
        node: The node whose edge list is rewritten.
        migration_fld: Lookup indexed by the edge itself.
        config: Supplies ``edges_label``, ``edge_filter`` and
            ``add_extra_attributes``.

    Returns:
        The same ``node`` object; its edge list is replaced by a new list only
        when at least one edge was disaggregated.
    """
    edges_to_add, edges_to_remove = [], set()

    for edge in node.get(config.edges_label, []):
        if config.edge_filter and not config.edge_filter(edge):
            continue
        try:
            # Look the migration up once instead of once per target.
            migration = migration_fld[edge]
            targets = migration["targets"]
            extra = (
                {k: v for k, v in migration.items() if k not in EXCLUDED_ATTRS}
                if config.add_extra_attributes
                else {}
            )
            # Buffer per edge so a failure halfway through leaves no stray
            # partial replacements behind.
            replacements = []
            for allocated in targets:
                new_edge = rescale_edge(deepcopy(edge), allocated["allocation"])
                new_edge.update(allocated)
                new_edge.update(extra)
                replacements.append(new_edge)
        except KeyError:
            continue

        # NOTE(review): an empty `targets` list removes the edge without adding
        # any replacement -- confirm this is the intended behaviour.
        edges_to_add.extend(replacements)
        edges_to_remove.add(id(edge))

    if edges_to_remove:
        node[config.edges_label] = [
            edge for edge in node[config.edges_label] if id(edge) not in edges_to_remove
        ] + edges_to_add
    return node
def migrate_edges_replace(
    node: dict,
    migration_fld: FlexibleLookupDict,
    config: MigrationConfig,
) -> dict:
    """Overwrite attributes of each matching edge of ``node`` in place.

    For an edge with a match in ``migration_fld`` the edge is first rescaled
    with ``rescale_edge`` using the first factor found, in this order: the
    target's ``"conversion_factor"``, the match's own ``"conversion_factor"``,
    the target's ``"allocation"``. When none is present the edge is not
    rescaled. The edge is then updated with the match's ``"target"`` and, if
    ``config.add_extra_attributes`` is truthy, with the match's keys that are
    not listed in ``EXCLUDED_ATTRS``.

    Edges rejected by ``config.edge_filter`` and edges for which a
    ``KeyError`` is raised are skipped.

    Args:
        node: The node whose edges are modified.
        migration_fld: Lookup indexed by the edge itself.
        config: Supplies ``edges_label``, ``edge_filter`` and
            ``add_extra_attributes``.

    Returns:
        The same ``node`` object that was passed in.
    """
    for edge in node.get(config.edges_label, []):
        accepted = not config.edge_filter or config.edge_filter(edge)
        if not accepted:
            continue
        try:
            migration = migration_fld[edge]
            target = migration["target"]
            # Ordered by precedence; only the first hit is applied.
            factor_sources = (
                (target, "conversion_factor"),
                (migration, "conversion_factor"),
                (target, "allocation"),
            )
            for holder, key in factor_sources:
                if key in holder:
                    rescale_edge(edge, holder[key])
                    break
            edge.update(target)
            if config.add_extra_attributes:
                extras = {
                    name: value
                    for name, value in migration.items()
                    if name not in EXCLUDED_ATTRS
                }
                edge.update(extras)
        except KeyError:
            continue
    return node
def migrate_edges_update(
    node: dict,
    migration_fld: FlexibleLookupDict,
    config: MigrationConfig,
) -> dict:
    """Update matching edges of ``node``; delegates to ``migrate_edges_replace``.

    The implementation is shared with ``migrate_edges_replace``; the two
    differ only in the intent of the data developer.

    Args:
        node: The node whose edges are modified.
        migration_fld: Lookup indexed by the edge itself.
        config: Migration configuration forwarded unchanged.

    Returns:
        Whatever ``migrate_edges_replace`` returns for the same arguments.
    """
    return migrate_edges_replace(node, migration_fld, config)