# Source code for cookbase.graph.cbrgraph

from typing import Any, Dict, Hashable, List

import networkx as nx
from cookbase.logging import logger
from cookbase.validation.globals import Definitions
from networkx.readwrite import json_graph


class CBRGraph:
    """A class that provides the structures and methods needed to build,
    manipulate and analyze :doc:`Cookbase Recipe Graphs (CBRGraphs) <cbrg>`.

    Typically, a :doc:`CBRGraph <cbrg>` is generated by extracting the data from
    a complete traversal of a :ref:`Cookbase Recipe (CBR) <cbr>`. It makes use of
    the `NetworkX <https://networkx.github.io/>`_ graph manipulation package.

    Graph nodes carry a ``type`` attribute: ``"cbi"`` for ingredients, ``"cbp"``
    for processes and ``"unref_foodstuff"`` for references that could not be
    resolved while building the graph.

    :ivar g: An instance of a :mod:`networkx` directed graph
    :vartype g: networkx.classes.digraph.DiGraph
    :ivar _appliances: A dictionary of appliance references included in the
        recipe
    :vartype _appliances: dict[str]
    :ivar _pending_processes_edges: A list of 2-tuples denoting the edges
        pending to be added
    :vartype _pending_processes_edges: list[tuple[str, str]]
    """

    def __init__(self):
        """Constructor method."""
        self.g = nx.DiGraph()
        self._appliances = {}
        self._pending_processes_edges = []

    def add_ingredient(self, ingredient_ref: str, ingredient: Dict[str, Any]) -> None:
        """Adds an ingredient to the graph.

        :param str ingredient_ref: A :ref:`CBR Ingredient <cbr-ingredients>`
            reference
        :param ingredient: Dictionary representing a
            :ref:`CBR Ingredient <cbr-ingredients>`
        :type ingredient: dict[str, Any]
        :raises KeyError: If `ingredient` has no ``"cbiId"`` entry
        """
        self.g.add_node(ingredient_ref, type="cbi", cbiId=ingredient["cbiId"])

    def add_appliance(self, appliance_ref: str, appliance: Dict[str, Any]) -> None:
        """Registers an appliance so that processes can later refer to it.

        An appliance carrying a ``"cbaId"`` is stored with type ``"cba"``; any
        other appliance is stored with type ``"cba-virtual"``.

        :param str appliance_ref: A :ref:`CBR Appliance <cbr-appliances>`
            reference
        :param appliance: Dictionary representing a
            :ref:`CBR Appliance <cbr-appliances>`
        :type appliance: dict[str, Any]
        """
        # Appliances are kept in a side table instead of being graph nodes;
        # they are attached to the process nodes that use them in add_process.
        if "cbaId" in appliance:
            entry = {"type": "cba", "cbaId": appliance["cbaId"]}
        else:
            entry = {"type": "cba-virtual"}
        self._appliances[appliance_ref] = entry

    def add_process(self, process_ref: str, process: Dict[str, Any]) -> None:
        """Adds a process and its in-edges to the graph.

        Every foodstuff referenced by the process that is already present in the
        graph (as an ingredient or as a process) gets an edge towards the new
        process node. References that are not present yet are queued and can be
        retried with :meth:`resolve_pending_processes_edges`.

        :param str process_ref: A :ref:`CBR Process <cbr-preparation>` reference
        :param process: Dictionary representing a
            :ref:`CBR Process <cbr-preparation>`
        :type process: dict[str, Any]
        :raises KeyError: If the process refers to an appliance that has not
            been registered, or lacks one of the keys read here
        """
        # Building appliances dictionary. Each process gets its own copy of the
        # registered appliance entry: writing "usedAfter" into the shared entry
        # would make every process using that appliance report the value set
        # by the last process added.
        appliances = {}
        for usage in process["appliances"]:
            app_ref = usage["appliance"]
            appliances[app_ref] = {
                **self._appliances[app_ref],
                "usedAfter": usage["usedAfter"],
            }
        self.g.add_node(
            process_ref, type="cbp", cbpId=process["cbpId"], appliances=appliances
        )

        # Adding foodstuff edges
        for keyword in Definitions.foodstuff_keywords:
            if keyword not in process:
                continue
            foodstuffs = process[keyword]
            if isinstance(foodstuffs, str):
                # A single reference is given as a bare string
                foodstuffs = [foodstuffs]
            for foodstuff_ref in foodstuffs:
                self._add_foodstuff_edge(foodstuff_ref, process_ref)

    def _is_foodstuff_node(self, ref: Hashable) -> bool:
        """Tells whether `ref` is an ingredient or process node of the graph."""
        return ref in self.g and self.g.nodes[ref].get("type") in ("cbi", "cbp")

    def _add_foodstuff_edge(self, foodstuff_ref: Hashable, process_ref: str) -> None:
        """Links a foodstuff to a process, or queues the edge if still unknown."""
        if self._is_foodstuff_node(foodstuff_ref):
            self.g.add_edge(foodstuff_ref, process_ref)
        else:
            self._pending_processes_edges.append((foodstuff_ref, process_ref))

    def resolve_pending_processes_edges(self) -> None:
        """Attempts to add edges that could not have been added to the graph
        before.

        A pending edge is added as soon as its source is an ingredient or a
        process of the graph; the remaining ones stay queued.
        """
        still_pending = []
        for edge in self._pending_processes_edges:
            in_foodstuff, out_process = edge
            if self._is_foodstuff_node(in_foodstuff):
                self.g.add_edge(in_foodstuff, out_process)
            else:
                still_pending.append(edge)
        self._pending_processes_edges = still_pending

    def clear(self) -> None:
        """Clears the graph and internal structures."""
        self.g.clear()
        self._appliances.clear()
        self._pending_processes_edges.clear()

    def build_graph(self, data: Dict[str, Any]) -> None:
        """Builds the graph from the data of a CBR, discarding any previous
        content.

        Ingredients, appliances and processes are added in this order. Foodstuff
        references that remain unresolved at the end are added as nodes of type
        ``"unref_foodstuff"``, linked to the referring process, and reported
        through the logger.

        :param data: A dictionary containing all the data from a CBR
        :type data: dict[str, Any]
        :raises KeyError: If `data` lacks one of the sections read here
        """
        self.clear()
        self.g.graph["name"] = data["info"]["name"]

        for ref, ingredient in data["ingredients"].items():
            self.add_ingredient(ref, ingredient)
        for ref, appliance in data["appliances"].items():
            self.add_appliance(ref, appliance)
        for ref, process in data["preparation"].items():
            self.add_process(ref, process)

        self.resolve_pending_processes_edges()

        # Whatever is still pending refers to nothing known: keep it visible in
        # the graph as a placeholder node instead of silently dropping it.
        for in_foodstuff, out_process in self._pending_processes_edges:
            self.g.add_node(in_foodstuff, type="unref_foodstuff")
            self.g.add_edge(in_foodstuff, out_process)
            logger.error(
                "Neither ingredient nor process found with reference "
                f"'{in_foodstuff}'"
            )

    def aggregated_appliances_graph(self) -> nx.DiGraph:
        """Returns a graph where each node represents a concurrent preparation
        path of a :doc:`CBRGraph <cbrg>`, containing an inverted index on the
        appliances used in that path together with the list of processes that
        used it.

        :return: An aggregated appliances graph
        :rtype: networkx.classes.digraph.DiGraph
        """
        # TODO: This operation is implemented single-threaded. Consider
        # multi-threading.
        roots = self.get_root_processes()
        # Sets: these are only used for membership tests in the loops below
        pj_processes = set(self.path_joining_processes())
        leaf_processes = set(self.get_leaf_processes())

        aggregated_graph = nx.DiGraph()
        # A dictionary with key the ag_id, and value the starting process of a
        # path
        current_ag_leaves = {}
        # A map of the path-starting process reference into the aggregated path
        # id
        first_pg_to_ag = {}
        for ag_id, root in enumerate(roots):
            aggregated_graph.add_node(ag_id)
            current_ag_leaves[ag_id] = root
            first_pg_to_ag[root] = ag_id
        ag_id_counter = len(current_ag_leaves)

        while current_ag_leaves:
            # (ag_id, process) pairs: the processes following each path just
            # walked. Kept in a list so that new ids are handed out in a
            # reproducible order; pairs are unique because every ag_id is
            # visited once and a node's successors are distinct.
            pg_nodes_to_generate = []

            for ag_id, path_start in current_ag_leaves.items():
                appliances = {}
                s = path_start
                while True:
                    # Build inverted index on appliance for given process path
                    for app_ref in self.g.nodes[s]["appliances"]:
                        appliances.setdefault(app_ref, []).append(s)
                    if s in pj_processes or s in leaf_processes:
                        break
                    # Not path-joining nor leaf: check if path continues
                    t = next(self.g.successors(s))
                    if t in pj_processes:
                        break
                    s = t  # Iterate over path
                aggregated_graph.add_node(ag_id, appliances=appliances)
                for v in self.g.successors(s):
                    pg_nodes_to_generate.append((ag_id, v))

            # Every path in the current frontier has been generated
            current_ag_leaves = {}

            for ag_id, v in pg_nodes_to_generate:
                if v in first_pg_to_ag:
                    # The path starting at v already has an aggregated node
                    aggregated_graph.add_edge(ag_id, first_pg_to_ag[v])
                else:
                    aggregated_graph.add_edge(ag_id, ag_id_counter)
                    current_ag_leaves[ag_id_counter] = v
                    first_pg_to_ag[v] = ag_id_counter
                    ag_id_counter += 1

        return aggregated_graph

    def processes_subgraph_view(self) -> nx.DiGraph:
        """Returns the subgraph view of the :doc:`CBRGraph <cbrg>` including only
        its processes.

        :return: The processes' subgraph view from the :doc:`CBRGraph <cbrg>`
        :rtype: networkx.classes.digraph.DiGraph
        """

        def is_process(node):
            return self.g.nodes[node]["type"] == "cbp"

        return nx.subgraph_view(self.g, filter_node=is_process)

    def _nodes_of_type(self, node_type: str) -> List[Hashable]:
        """Returns the nodes whose ``type`` attribute equals `node_type`."""
        return [node for node, t in self.g.nodes(data="type") if t == node_type]

    def get_ingredients(self) -> List[Hashable]:
        """Returns the list of nodes representing ingredients in the
        :doc:`CBRGraph <cbrg>`.

        :return: The list of ingredient nodes in the :doc:`CBRGraph <cbrg>`
        :rtype: list[Hashable]
        """
        return self._nodes_of_type("cbi")

    def get_processes(self) -> List[Hashable]:
        """Returns the list of nodes representing processes in the
        :doc:`CBRGraph <cbrg>`.

        :return: The list of process nodes in the :doc:`CBRGraph <cbrg>`
        :rtype: list[Hashable]
        """
        return self._nodes_of_type("cbp")

    def get_root_processes(self) -> List[Hashable]:
        """Returns the list of root nodes from the processes' subgraph.

        :return: The list of process root nodes from the processes' subgraph
        :rtype: list[Hashable]
        """
        in_degrees = self.processes_subgraph_view().in_degree()
        return [node for node, degree in in_degrees if degree == 0]

    def path_joining_processes(self) -> List[Hashable]:
        """Returns the list of process nodes that represent a junction point of
        two or more preparation paths.

        A process qualifies when, within the processes' subgraph, it has more
        than one predecessor or more than one successor.

        :return: The list of merging process nodes from the processes' subgraph
        :rtype: list[Hashable]
        """
        psw = self.processes_subgraph_view()
        return [
            node
            for node in psw.nodes
            if psw.in_degree(node) > 1 or psw.out_degree(node) > 1
        ]

    def get_leaf_processes(self) -> List[Hashable]:
        """Returns the list of leaf nodes from the processes' subgraph.

        :return: The list of process leaf nodes from the processes' subgraph.
        :rtype: list[Hashable]
        """
        out_degrees = self.processes_subgraph_view().out_degree()
        return [node for node, degree in out_degrees if degree == 0]

    def get_serializable_graph(self) -> Dict[str, Any]:
        """Returns the :doc:`CBRGraph <cbrg>` data in a JSON-serializable format.

        :return: A dict with the :doc:`CBRGraph <cbrg>` data
        :rtype: dict[str, Any]
        """
        return json_graph.node_link_data(self.g)