from __future__ import annotations
import warnings
from collections import defaultdict
from typing import TYPE_CHECKING, NamedTuple
import numpy as np
import numpy.typing as npt
from topo_metrics.rings import node_list_to_ring
from topo_metrics.symbols import CARVS, VertexSymbol
from topo_metrics.utils import uniform_repr
if TYPE_CHECKING: # pragma: no cover
from topo_metrics.rings import Ring
from topo_metrics.topology import Topology
[docs]
class Cluster(NamedTuple):
""" The node ID of the central node in the cluster. """
""" The rings that form the cluster. """
def __repr__(self) -> str:
info = {
"node_id": self.central_node_id,
"n_rings": len(self.rings),
"CARVS": str(get_carvs_vector(self)),
}
return uniform_repr("Cluster", **info, indent_size=4)
############################### PROPERTIES ###############################
[docs]
def smallest_ring_size(cluster: Cluster) -> int:
"""
Get the smallest ring size in the ``cluster``.
"""
return min(len(ring) for ring in cluster.rings)
[docs]
def largest_ring_size(cluster: Cluster) -> int:
"""
Get the largest ring size in the ``cluster``.
"""
if not cluster.rings:
return 0
return max(len(ring) for ring in cluster.rings)
[docs]
def number_of_rings(cluster: Cluster) -> int:
"""
Get the number of rings in the ``cluster``.
"""
return len(cluster.rings)
[docs]
def get_unique_angles(cluster: Cluster) -> int:
"""
Get the number of angles in the ``cluster``.
"""
return list(set(ring.angle for ring in cluster.rings))
[docs]
def number_of_angles(cluster: Cluster) -> int:
"""
Get the number of angles in the ``cluster``.
"""
return len(get_unique_angles(cluster))
[docs]
def get_ring_sizes(cluster: Cluster) -> list:
"""
Get the ring sizes in the ``cluster``.
"""
return sorted([ring.size for ring in cluster.rings])
############################### HELPERS ###############################
[docs]
def shares_edge(angle1: tuple, angle2: tuple) -> bool:
"""
Checks if two angles share an edge.
Parameters
----------
angle1
The first angle.
angle2
The second angle.
Returns
-------
True if the angles share an edge, False otherwise.
"""
return not set(angle1).isdisjoint(angle2)
[docs]
def get_opposite_angles(angles: list) -> list:
"""
Finds the opposite angles in a cluster.
Parameters
----------
angles
A list of angles in the cluster.
Returns
-------
A list of opposite angle pairs.
"""
opposite_pairs = []
remaining_angles = angles.copy()
while remaining_angles:
angle1 = remaining_angles.pop(0)
for angle2 in remaining_angles:
if not shares_edge(angle1, angle2):
opposite_pairs.append((angle1, angle2))
remaining_angles.remove(angle2)
break
return opposite_pairs
[docs]
def only_smallest_ring_size(ring_sizes: list[int]) -> list[int]:
"""
Retains only the smallest ring size in this list. If there are multiple
smallest ring sizes, they are all retained.
Parameters
----------
ring_sizes
A list of ring sizes.
Returns
-------
A list of the smallest ring sizes.
"""
return [size for size in ring_sizes if size == min(ring_sizes)]
[docs]
def get_ring_size_by_angle(
cluster: Cluster,
all_rings: bool = False,
) -> dict[int, list[int]]:
"""
Get the ring sizes by angle in a cluster.
Parameters
----------
cluster
The cluster to get the ring sizes by angle of.
all_rings
Whether to consider all rings in the cluster or just the smallest ring
at each angle.
Returns
-------
A dictionary of ring sizes by angle.
"""
angle_to_rings = defaultdict(list)
for ring in cluster.rings:
angle_to_rings[ring.angle].append(ring.size)
sorted_ring_sizes = {
angle: sorted(sizes) for angle, sizes in angle_to_rings.items()
}
if not all_rings:
sorted_ring_sizes = {
angle: only_smallest_ring_size(sizes)
for angle, sizes in sorted_ring_sizes.items()
}
sorted_ring_sizes = dict(
sorted(
sorted_ring_sizes.items(),
key=lambda item: tuple(item[1]),
)
)
return sorted_ring_sizes
############################### CONVERSIONS ###############################
[docs]
def ring_list_to_cluster(
topology: Topology,
ring_list: list[list[tuple[int, npt.NDArray[np.int_]]]],
central_node_id: int,
) -> Cluster:
"""Convert a list of rings to a Cluster object.
Parameters
----------
topology
The topology object containing the nodes.
ring_list
A list of lists of node IDs and image shifts.
central_node_id
The node ID of the central node.
Returns
-------
A Cluster object with the rings in the correct positions.
"""
rings = [node_list_to_ring(topology, x, central_node_id) for x in ring_list]
return Cluster(central_node_id, rings)
[docs]
def get_clusters(
topology: Topology,
all_ring_list: list[list[list[tuple[int, npt.NDArray[np.int_]]]]],
):
"""Convert a list of lists of rings to a list of Cluster objects.
Parameters
----------
topology
The topology object containing the nodes.
all_ring_list
A list of lists of lists of node IDs and image shifts.
Returns
-------
A list of Cluster objects with the rings in the correct positions.
"""
return [
ring_list_to_cluster(topology, ring_list, central_node_id)
for central_node_id, ring_list in enumerate(all_ring_list, start=1)
]
############################### ANALYSIS ###############################
def _get_single_vertex_symbol(cluster: Cluster) -> VertexSymbol:
"""
Obtain the vertex symbol of a cluster, storing both the `all_rings=False`
and `all_rings=True` vectors.
For 4-connected nets (with six angles), opposite angles are paired. If ring
sizes coincide, the 'all_rings' vector is used to break ties.
Please note, while I have attempted to match the ordering scheme with
respect to ToposPro, the exact ordering may differ slightly in cases where
the minimum ring sizes are identical at different angles but the 'all_rings'
vector differs.
TODO: I derive the angles from the rings found about the central node. This
therefore will miss any angles that do not contain any rings, which needs
correcting to match the ToposPro implementation (and indeed, provide the
exact Vertex Symbol)!
Parameters
----------
cluster
The cluster from which to obtain the vertex symbol.
Returns
-------
The vertex symbol with both standard and all-rings representations.
"""
# Calculate ring sizes with all_rings set to False and True.
ring_sizes_by_angle = get_ring_size_by_angle(cluster, all_rings=False)
all_ring_sizes_by_angle = get_ring_size_by_angle(cluster, all_rings=True)
# Prepare the two vectors we wish to return.
standard_vector = []
all_rings_vector = []
# If there are six angles (4-connected nets), pair opposite angles.
if number_of_angles(cluster) == 6:
pairs_data = []
for angle_pair in get_opposite_angles(get_unique_angles(cluster)):
# Sort the pair by ring sizes for each angle.
pair_dict = {a: ring_sizes_by_angle[a] for a in angle_pair}
sorted_pairs = sorted(pair_dict.items(), key=lambda x: tuple(x[1]))
ring_sizes_tuple = tuple(x[1] for x in sorted_pairs)
angles_tuple = tuple(x[0] for x in sorted_pairs)
# If ring sizes are identical, sort by the 'all_rings' vector.
if len({tuple(s) for s in ring_sizes_tuple}) == 1:
all_pair_dict = {
a: all_ring_sizes_by_angle[a] for a in angle_pair
}
all_sorted_pairs = sorted(
all_pair_dict.items(), key=lambda x: tuple(x[1])
)
angles_tuple = tuple(x[0] for x in all_sorted_pairs)
pairs_data.append((ring_sizes_tuple, angles_tuple))
# Sort the collected data by ring sizes.
pairs_data.sort(
key=lambda data: tuple(i for sizes in data[0] for i in sizes)
)
# Extend our final vectors.
for ring_sizes_tuple, angles_tuple in pairs_data:
standard_vector.extend(ring_sizes_tuple)
for angle in angles_tuple:
all_rings_vector.append(all_ring_sizes_by_angle[angle])
else:
# If not a 4-connected net with six angles, just flatten the ring sizes.
for sizes in ring_sizes_by_angle.values():
standard_vector.append(sizes)
for angle in ring_sizes_by_angle:
all_rings_vector.append(all_ring_sizes_by_angle[angle])
return VertexSymbol(
vector=standard_vector,
vector_all_rings=all_rings_vector,
)
[docs]
def get_vertex_symbol(
clusters: Cluster | list[Cluster],
) -> VertexSymbol | list[VertexSymbol]:
"""
Obtain the vertex symbol(s) for one or more clusters, storing both the
`all_rings=False` and `all_rings=True` vectors. If a list of clusters is
provided, this function returns a list of vertex symbols in the same order.
Parameters
----------
clusters
Either a single cluster, or a list of cluster objects from which to
obtain vertex symbols.
Returns
-------
VertexSymbol or list of VertexSymbol
- If a single Cluster is passed, returns a single VertexSymbol.
- If a list of Cluster objects is passed, returns a list of VertexSymbols.
"""
if isinstance(clusters, list):
return [_get_single_vertex_symbol(cluster) for cluster in clusters]
else:
return _get_single_vertex_symbol(clusters)
[docs]
def get_carvs_std_dev(carvs_vectors: npt.NDArray[np.int_]) -> float:
"""Compute the standard deviation amongst the node environments.
Parameters
----------
carvs_vectors
The CARVS vectors for each cluster in the network.
Returns
-------
The standard deviation of the CARVS vectors.
"""
carvs_vectors = np.asarray(carvs_vectors, dtype=np.float64)
if carvs_vectors.ndim == 1:
return 0.0
# ``average node environment``.
carvs_mean = np.mean(carvs_vectors, axis=0)
# distance of each node from the average node environment.
carvs_dist_from_mean = np.linalg.norm(carvs_vectors - carvs_mean, axis=1)
# spread of node environments.
carvs_std_dev = np.sqrt(
np.square(carvs_dist_from_mean).sum() / carvs_dist_from_mean.shape[0]
)
return float(carvs_std_dev)
[docs]
def get_carvs_vector(
cluster: list[Cluster] | Cluster,
max_size: int | None = None,
return_per_cluster: bool = False,
unique: bool = False,
) -> CARVS | npt.NDArray[np.floating]:
"""
Get the Cummulative All-Rings Vertex Symbol (CARVS) vector.
If the input is a ``Cluster`` object, the CARVS is calculated for that
specific cluster. If the input is a list of ``Cluster`` objects, the CARVS
vector is calculated as an average over all ``clusters`` in the entire
network, by calling the ``get_carvs_vector`` method for each cluster.
Parameters
----------
cluster
The input topology object (either a ``Cluster`` or a list of ``Cluster``
objects).
max_size
The maximum ring size to consider. If not specified, the maximum ring
size in the network is used.
return_per_cluster
Whether to return the CARVS vector for each cluster in the network, or
to return the average CARVS vector over all clusters.
unique
If returning the CARVS vector for each cluster, whether to return only
unique CARVS vectors.
Returns
-------
The CARVS vector.
"""
if isinstance(cluster, Cluster):
size, counts = np.unique(get_ring_sizes(cluster), return_counts=True)
# guard against empty clusters.
if not len(size):
size = np.array([1])
counts = np.array([0])
max_size = max(size) if max_size is None else max_size
carvs = np.zeros(shape=(max_size,), dtype=np.int_)
carvs[size - 1] = counts
return CARVS(vector=carvs, spread=0.0, is_single_node=True)
elif isinstance(cluster, list):
found_max_size = max([largest_ring_size(c) for c in cluster])
if max_size is not None and found_max_size > max_size:
max_size = found_max_size
warnings.warn(
f"The maximum ring size in the network is {max_size}, "
f"which is greater than the specified maximum size. "
f"Setting the maximum size to {max_size}.",
UserWarning,
stacklevel=2,
)
if max_size is None:
max_size = found_max_size
carvs = np.zeros(shape=(len(cluster), max_size))
for i, c in enumerate(cluster):
carvs[i] = get_carvs_vector(c, max_size=max_size).vector
if return_per_cluster:
carvs = carvs.astype(np.int_)
if unique:
return np.unique(carvs, axis=0)
return carvs
else:
raise TypeError(
f"Expected a Cluster or list of Cluster objects, "
f"but received {type(cluster)}."
)
return CARVS(
vector=np.mean(carvs, axis=0),
spread=get_carvs_std_dev(carvs),
is_single_node=False,
)