There is no ready-made class for rooted trees with Strahler stream orders, so I implemented one following the pruning algorithm described on Wikipedia:
The same numbers may also be generated via a pruning process in which the tree is simplified in a sequence of stages, where in each stage one removes all leaf nodes and all of the paths of degree-one nodes leading to leaves: the Strahler number of a node is the stage at which it would be removed by this process, and the Strahler number of a tree is the number of stages required to remove all of its nodes. [1]
from typing import Dict, List, Tuple

import networkx as nx
import pandas as pd
from pandas.core.frame import DataFrame
class StrahlerLevel(DataFrame):
    """Class for one level in a Strahler rooted tree.

    Every row describes one node of one tributary. Rows are indexed by
    ``(node, tributary index)`` and carry a single ``type`` column holding
    the node's role code looked up in ``_types`` (leaf, middle or fork).

    NOTE(review): ``_types`` and ``_index_names`` are module-level names
    that this class reads but does not define; they are assumed to be
    declared elsewhere in the module.
    """

    def __init__(self, dat: Dict[Tuple[str, int], int]):
        """Init a data frame for nodes in the lowest level.

        Args:
            dat: maps ``(node, tributary index)`` to the type code of that
                node within the tributary.
        """
        index = pd.MultiIndex.from_tuples(
            list(dat.keys()), names=_index_names[1:3]
        )
        super().__init__(
            list(dat.values()),
            columns=['type'],
            index=index
        )

    @classmethod
    def parse(cls, rt: nx.DiGraph):
        """Collect nodes in the lowest level of a rooted tree.

        Each leaf starts a tributary that is followed upstream until the
        first node with more than one child, or the root, is reached.

        Args:
            rt (nx.DiGraph): a rooted (directed) mathematical tree whose
                edges point away from the root.

        Returns:
            StrahlerLevel: with all nodes in the lowest level.
        """

        def parent_of(node: str):
            """Return the parent of ``node``, or None for a parentless node."""
            # Parents are read straight from the in-edges so the result does
            # not depend on the order in which nodes were added to the graph.
            return next(iter(rt.predecessors(node)), None)

        def parse_tributary(leaf: str, idx: int) -> Dict[Tuple[str, int], int]:
            """Collect nodes in a tributary specified by the leaf.

            Args:
                leaf (str): leaf node of a tributary.
                idx (int): index of the tributary in the current level.

            Returns:
                Dict[Tuple[str, int], int]: containing the leaf, fork
                    and all middle nodes in a tributary.
            """
            dat_tributary = {(leaf, idx): _types['leaf']}
            node = parent_of(leaf)
            while node is not None:
                parent = parent_of(node)
                # The walk ends at the first branching node; the root always
                # ends it too, even when it has a single child.
                if parent is None or rt.out_degree(node) > 1:
                    dat_tributary[(node, idx)] = _types['fork']
                    break
                dat_tributary[(node, idx)] = _types['middle']
                node = parent
            return dat_tributary

        leaves = [node for node, degree in rt.out_degree() if degree == 0]
        dat = {}
        # Tributaries are numbered from 1 in the order their leaves appear.
        for idx, leaf in enumerate(leaves, start=1):
            dat.update(parse_tributary(leaf, idx))
        return cls(dat)

    @property
    def nodes_to_remove(self) -> pd.Index:
        """Collect nodes in the current level excluding forking nodes.

        Rows whose type code is non-negative are kept.
        NOTE(review): this presumes the fork code in ``_types`` is negative
        and the leaf/middle codes are not - confirm against ``_types``.

        Returns:
            pd.Index: unique labels of the ``node`` index level for the
                nodes in the current Strahler level.
        """
        to_remove = self.loc[self['type'] >= 0]
        return to_remove.index.unique('node')
class StrahlerRootedTree(nx.DiGraph):
    """Class for rooted trees with Strahler ordering system.

    The tree is pruned level by level on a working copy; the level at which
    a node is pruned is stored as the ``level`` attribute of that node and
    of its incoming edge.

    Attributes:
        df_strahler (DataFrame): all nodes with Strahler ordering info.
    """

    # Safety cap on the number of pruning levels before giving up.
    _MAX_LEVELS = 100

    def __init__(self, rt: nx.DiGraph):
        """Init a rooted tree with Strahler ordering system.

        Args:
            rt (nx.DiGraph): initiated rooted tree.
        """
        super().__init__(rt)
        self._launch()

    def _set_level(self, nodes: List[str], level: int):
        """Assign Strahler numbers to all nodes and edges in a level.

        Args:
            nodes (List[str]): names of nodes to be assigned a number.
            level (int): which Strahler number to be assigned.
        """
        for n in nodes:
            self.nodes[n]['level'] = level
            # Label the edge entering the node; a node without an incoming
            # edge has nothing to label.
            the_edge = next(iter(self.in_edges(n)), None)
            if the_edge is not None:
                self.edges[the_edge]['level'] = level

    def _launch(self):
        """Analyse the rooted tree and store Strahler numbers.

        Raises:
            RuntimeError: when the analysis is unable to terminate within
                a reasonable number of iterations (``_MAX_LEVELS``).
        """
        rt = nx.DiGraph(self)  # Working copy that is pruned level by level.
        sls = {}
        level = 0
        while len(rt.nodes) > 1:
            if level >= self._MAX_LEVELS:
                # Nodes are still left after the maximum number of levels.
                raise RuntimeError(
                    "The iteration is interrupted by the counter!"
                )
            level += 1
            sls[level] = StrahlerLevel.parse(rt)
            # Evaluate the property once; it filters the frame on each access.
            removed = list(sls[level].nodes_to_remove)
            rt.remove_nodes_from(removed)
            self._set_level(nodes=removed, level=level)

        self.df_strahler = pd.concat(
            list(sls.values()), keys=list(sls.keys()), names=_index_names
        )
        # NOTE(review): ``root`` is not defined in this class as shown; it is
        # assumed to be provided elsewhere - confirm.
        self.nodes[self.root]['level'] = level
[1] https://en.wikipedia.org/wiki/Strahler_number