Tutorial 3: Metrics Computation and Analysis

This tutorial demonstrates how to compute and analyze metrics for evolving communities and temporal networks using the dyn-benchmark package. Throughout this tutorial, we’ll explore:

  1. Static Network and Community Metrics: Snapshot graph structure, community partitioning, static community properties and community graph structure.

  2. Evolving Community and Temporal Metrics: Community lifetimes, snapshot properties, member behavior, and transition patterns

  3. Customizing Metrics: Creating your own metrics, combining metrics, and selecting specific metrics to compute

  4. Comparing Algorithms: Using metrics to evaluate and compare community detection algorithms against ground truth

Prerequisites

First, we need a benchmark to analyze. Let’s create one using the default parameters:

# Install the package if you haven't already
# pip install dyn-benchmark

# Import required modules
from dyn.benchmark.generator.groundtruth_generator import GroundtruthGenerator
import dyn.benchmark.metrics

# Create a generator with default parameters
generator = GroundtruthGenerator(seed=42)

# Generate the benchmark
groundtruth = generator.generate()

print(f"Generated benchmark with {len(groundtruth.graphs)} snapshots")
print(f"Total static communities: {len(set(row.static_community_id for row in groundtruth.tcommlist))}")
print(f"Total events: {len(groundtruth.events)}")

1. Introduction to Metrics Computation

This tutorial explores the comprehensive metrics framework provided by the dyn-benchmark package for analyzing evolving communities and temporal networks. The package organizes metrics into several categories, each focusing on different aspects of network structure and community evolution.

To understand the metrics system, let’s start by learning the basic workflow for computing and accessing metrics using GroundtruthMetricsComputer:

# Create a metrics computer with default metrics
metrics_computer = GroundtruthMetricsComputer()

# Compute all metrics for a groundtruth
metrics = metrics_computer.compute(groundtruth)

# Access metrics by category (e.g., "graph", "static_community", etc.)
graph_metrics = metrics["graph"]

# You can easily convert to pandas DataFrame
import pandas as pd
graph_df = pd.DataFrame(graph_metrics)

Refer to Metrics Description page for a complete list of metrics.

Complete Example

Here’s a complete example for generating a benchmark and computing its metrics:

from dyn.benchmark.generator.groundtruth_generator import GroundtruthGenerator
from dyn.benchmark.metrics import GroundtruthMetricsComputer
import pandas as pd
import os

# Create directory for results
os.makedirs("metrics_results", exist_ok=True)

# Generate a benchmark
generator = GroundtruthGenerator(seed=42)
groundtruth = generator.generate()
print(f"Generated benchmark with {len(groundtruth.graphs)} snapshots")

# Compute all metrics
metrics_computer = GroundtruthMetricsComputer()
metrics = metrics_computer.compute(groundtruth)

# Export metrics to CSV
for key, values in metrics.items():
    if values:
        df = pd.DataFrame(values)
        csv_filename = f"metrics_results/{key}_metrics.csv"
        df.to_csv(csv_filename, index=False)
        print(f"Exported {len(df)} {key} metrics to {csv_filename}")

# Summary of exported metrics
print("\nMetrics summary:")
for key, values in metrics.items():
    if values:
        print(f"- {key}: {len(values)} entries")

Selecting Specific Metrics

In many cases, you may want to compute only specific metrics, especially for large networks where some metrics can be computationally expensive. The GroundtruthMetricsComputer allows you to specify which metrics to include or exclude:

# Import required metrics
from dyn.benchmark.metrics import (
    number_of_nodes, number_of_edges, diameter,
    modularity, coverage,
    static_community_size, static_community_turnover_ratio
)

# Create a metrics computer with specific metrics
selected_metrics_computer = GroundtruthMetricsComputer(
    # Only compute these specific graph metrics
    graph=[number_of_nodes, number_of_edges],

    # Only compute these partition metrics
    graph_partition=[modularity, coverage],

    # Only compute these community metrics
    static_community=[static_community_size, static_community_turnover_ratio],

    # No metrics for other categories
    static_community_graph=None,
    evolving_community=None,
    snapshot=None,
    member=None,
    member_snapshot=None,
    flow=None
)

# Compute only the selected metrics
selected_metrics = selected_metrics_computer.compute(groundtruth)

# Check what we computed
for key, values in selected_metrics.items():
    if values:  # Skip empty groups
        print(f"- {key}: {len(values)} entries with fields {list(values[0].keys())}")

For example, some metrics like diameter and average_shortest_path_length can be particularly expensive to compute on large networks, as they require calculating all-pairs shortest paths with O(n³) complexity. Excluding them can significantly improve computation time:

from dyn.benchmark.metrics import graph_heavy_metrics

# Create metrics computer excluding heavy metrics
light_metrics_computer = GroundtruthMetricsComputer(
    graph=[m for m in graph_handler.funcs if m not in graph_heavy_metrics]
)

# Now we can compute metrics even for large networks
light_metrics = light_metrics_computer.compute(large_groundtruth)

Creating Custom Metrics with Handlers

The dyn-benchmark package provides flexible ways to customize your metrics computation by defining new metrics or modifying existing ones.

The package uses a handler system with decorators to organize metrics into categories. Each category has its own handler (e.g., graph_handler, static_community_handler, etc.). You can create custom metrics by decorating functions with the appropriate handler:

from dyn.benchmark.metrics import graph_handler, static_community_handler
import numpy as np

# Define a custom graph metric using the graph_handler decorator
@graph_handler
def density(graph):
    """Return the graph density (fraction of possible edges that exist)"""
    n = len(graph.nodes)
    max_edges = n * (n - 1) / 2  # Maximum possible edges in undirected graph
    if max_edges == 0:  # Handle single-node graphs
        return 0
    return len(graph.edges) / max_edges

# Define a custom community metric using the static_community_handler decorator
@static_community_handler
def perimeter_ratio(flow_graph, static_community):
    """Ratio of external connections to community size"""
    size = flow_graph.node_size(static_community)
    if size == 0:
        return 0

    # Count edges to other communities
    external = 0
    for target in flow_graph.successors(static_community):
        if flow_graph.node_community(target) != flow_graph.node_community(static_community):
            external += 1

    return external / size

# Create a metrics computer (our custom metrics are automatically included)
custom_metrics_computer = GroundtruthMetricsComputer()

# Compute metrics with our new functions included
custom_metrics = custom_metrics_computer.compute(groundtruth)

# Extract and display our custom metrics
graph_metrics_df = pd.DataFrame(custom_metrics["graph"])
print(graph_metrics_df[["snapshot", "density"]].head())

static_community_df = pd.DataFrame(custom_metrics["static_community"])
print(static_community_df[["snapshot", "static_community", "perimeter_ratio"]].head())

The decorator system works by registering your function with the appropriate handler. The handler determines:

  1. What arguments your function will receive (e.g., a graph for graph_handler, a flow graph and static community for static_community_handler)

  2. When your function will be called during metrics computation

  3. How your function’s results will be stored in the metrics output

When you decorate a function with a handler, it’s automatically included in that handler’s collection of metrics. When you create a new GroundtruthMetricsComputer, it pulls in all metrics from each handler.

2. Computing Events Describing the Evolution of Communities

Community detection in temporal networks requires not only identifying communities at each snapshot but also understanding how these communities evolve over time.

Basic Event Detection

To detect events in community evolution, this package provide an implementation of the ICEM (Iterative Community Evolution Method) algorithm introduced by Kadkhoda Mohammadmosaferi et al. [KMN20]:

from dyn.events.icem_calculator import ICEMCalculator
from dyn.core.communities import Membership

# First, we need a benchmark with communities
groundtruth = generator.generate()

# Create membership object from tcommlist
membership = Membership.from_tcommlist(groundtruth.tcommlist)

# Create ICEM calculator with default parameters
# alpha: threshold for surviving communities
# beta: threshold for partial survival
icem = ICEMCalculator(alpha=0, beta=0.5)

# Calculate events
events = icem.calculate_events(membership.community_graph)

# Display number of events by type
from collections import Counter
event_counts = Counter(event.label for event in events)
print("Community events detected:")
for event_type, count in event_counts.items():
    print(f"- {event_type}: {count}")

Types of Community Evolution Events

The ICEM algorithm identifies the following types of events that describe community evolution:

  • Form: A new community appears in the network

  • Dissolve: A community disappears from the network

  • Continue: A community continues without significant change

  • Grow: A community increases in size without structural changes

  • Shrink: A community decreases in size without structural changes

  • Partial survive and grow: Some members of a community survive and are joined by new members

  • Divide: A community splits into multiple separate communities

  • Divide and grow: A community splits and one part grows significantly

  • Merge: Multiple communities merge into a single community

  • Merge and grow: Multiple communities merge and the resulting community grows further

  • Partial merge: Part of a community merges with another community

  • Partial merge and grow: Part of a community merges with another and grows

These events represent the fundamental patterns of evolution in community structures over time. The ICEM algorithm identifies these events by tracking how members flow between communities across consecutive snapshots and applying thresholds to determine significant changes.

3. Comparing Algorithms with Ground Truth Metrics

One powerful application of metrics is to evaluate how well community detection algorithms recover ground truth communities. This comparison involves three main steps:

  1. Generating ground truth benchmarks (as covered in Tutorial 1: Benchmark Generation)

  2. Applying community detection algorithms to each snapshot

  3. Evaluating the detected communities against the ground truth

Generating Ground Truth Benchmarks

As covered in Tutorial 1, we can generate benchmarks with known ground truth communities:

from dyn.benchmark.generator.groundtruth_generator import GroundtruthGenerator

# Generate ground truth with specific parameters
generator = GroundtruthGenerator(seed=42)
groundtruth = generator.generate()

print(f"Generated benchmark with {len(groundtruth.graphs)} snapshots")
print(f"Total static communities: {len(set(row.static_community_id for row in groundtruth.tcommlist))}")

Applying Community Detection Algorithms

For this example, we’ll use a naive approach that applies community detection algorithms to each snapshot independently, without considering the temporal dimension. This serves as a baseline for comparison with more sophisticated temporal community detection methods.

The detection algorithms need to output their results in a format compatible with our evaluation functions. Specifically, we need to convert the detected communities into a Tcommlist structure, which contains TcommlistRow representing node-community assignments at each snapshot.

from dyn.core.communities import Tcommlist, TcommlistRow
import networkx.algorithms.community as nx_comm

# Function to detect communities using different algorithms
def detect_communities(graph, algorithm="louvain"):
    """Detect communities using specified algorithm"""
    if algorithm == "louvain":
        return list(nx_comm.louvain_communities(graph))
    elif algorithm == "label_propagation":
        return list(nx_comm.label_propagation_communities(graph))
    elif algorithm == "greedy_modularity":
        return list(nx_comm.greedy_modularity_communities(graph))
    else:
        raise ValueError(f"Unknown algorithm: {algorithm}")

# Apply community detection algorithms to each snapshot
algorithm = "louvain"  # We'll use Louvain algorithm in this example
detected_rows = []

for t, graph in groundtruth.graphs.items():
    # Detect communities in this snapshot
    communities = detect_communities(graph, algorithm=algorithm)

    # Convert to TcommlistRow format
    for comm_id, comm_nodes in enumerate(communities):
        for node in comm_nodes:
            # Create a row for this node-community assignment
            detected_rows.append(TcommlistRow(
                node_id=node,
                evolving_community_id=f"{t}_{comm_id}",
                snapshot=t
            ))

# Create Tcommlist from detected communities
detected_tcommlist = Tcommlist(detected_rows)

print(f"Detected {len(set(row.static_community_id for row in detected_rows))} static communities")

Evaluating Detection Accuracy

Once we have both ground truth and detected communities in compatible formats, we can evaluate how well the algorithm performed using two key aspects:

  1. Partition Metrics: How well the algorithm recovers community memberships at each snapshot

  2. Transition Metrics: How well the algorithm captures community evolution over time

The assess_partition_metrics and assess_transition_metrics functions from the dyn.benchmark.assess module provide comprehensive evaluation metrics for both aspects. Refer to Metrics Description page for a complete list of comparison metrics.

from dyn.benchmark.assess import assess_partition_metrics, assess_transition_metrics
import pandas as pd

# Evaluate partition accuracy at each snapshot
partition_metrics = assess_partition_metrics(groundtruth.tcommlist, detected_tcommlist)

# Convert to DataFrame for analysis
partition_df = pd.DataFrame([
    {"snapshot": snapshot, **metrics}
    for snapshot, metrics in partition_metrics.items()
])

# Display key partition metrics
print("\nPartition accuracy at each snapshot:")
print(partition_df[["snapshot", "adjusted_rand", "normalized_mutual_information"]].head())

# Evaluate transition accuracy for different time windows
transition_metrics = assess_transition_metrics(
    groundtruth.tcommlist, detected_tcommlist, dt_min=1, dt_max=3
)

# Convert to DataFrame for analysis
transition_df = pd.DataFrame([
    {"t1": t1, "t2": t2, "dt": t2-t1, **metrics}
    for (t1, t2), metrics in transition_metrics.items()
])

# Display key transition metrics
print("\nTransition accuracy for different time windows:")
print(transition_df[["t1", "t2", "dt", "normalized_mutual_information"]].head())

4. Complete Example

Here’s a complete example that demonstrates how to compare different community detection algorithms against ground truth:

from dyn.benchmark.generator.groundtruth_generator import GroundtruthGenerator
from dyn.benchmark.assess import assess_partition_metrics, assess_transition_metrics
from dyn.core.communities import Tcommlist, TcommlistRow
import networkx as nx
import networkx.algorithms.community as nx_comm
import pandas as pd

# Generate ground truth
generator = GroundtruthGenerator(seed=42)
groundtruth = generator.generate()
print(f"Generated benchmark with {len(groundtruth.graphs)} snapshots")

# Define community detection algorithms to test
algorithms = ["louvain", "label_propagation", "greedy_modularity"]

# Function to detect communities
def detect_communities(graph, algorithm="louvain"):
    """Detect communities using specified algorithm"""
    if algorithm == "louvain":
        return list(nx_comm.louvain_communities(graph))
    elif algorithm == "label_propagation":
        return list(nx_comm.label_propagation_communities(graph))
    elif algorithm == "greedy_modularity":
        return list(nx_comm.greedy_modularity_communities(graph))
    else:
        raise ValueError(f"Unknown algorithm: {algorithm}")

# Run community detection algorithms
algorithm_results = {}

for algo in algorithms:
    print(f"Running {algo} algorithm...")
    rows = []

    for t, graph in groundtruth.graphs.items():
        communities = detect_communities(graph, algorithm=algo)

        for comm_id, comm_nodes in enumerate(communities):
            for node in comm_nodes:
                rows.append(TcommlistRow(
                    node_id=node,
                    evolving_community_id=f"{t}_{comm_id}",
                    snapshot=t
                ))

    algorithm_results[algo] = Tcommlist(rows)

# Evaluate partition metrics
partition_results = {}
for algo, tcommlist in algorithm_results.items():
    partition_results[algo] = assess_partition_metrics(groundtruth.tcommlist, tcommlist)

# Convert to DataFrame
partition_df = pd.DataFrame([
    {"algorithm": algo, "snapshot": snapshot, **metrics}
    for algo, snapshots in partition_results.items()
    for snapshot, metrics in snapshots.items()
])

# Display average partition metrics by algorithm
print("\nAverage partition metrics by algorithm:")
avg_partition = partition_df.groupby("algorithm")[
    ["adjusted_rand", "normalized_mutual_information", "fowlkes_mallows"]
].mean()
print(avg_partition)

# Evaluate transition metrics
transition_results = {}
for algo, tcommlist in algorithm_results.items():
    transition_results[algo] = assess_transition_metrics(
        groundtruth.tcommlist, tcommlist, dt_min=1, dt_max=2
    )

# Convert to DataFrame
transition_df = pd.DataFrame([
    {"algorithm": algo, "t1": t1, "t2": t2, "dt": t2-t1, **metrics}
    for algo, transitions in transition_results.items()
    for (t1, t2), metrics in transitions.items()
])

# Display average transition metrics by algorithm and time window
print("\nAverage transition metrics by algorithm and time window:")
avg_transition = transition_df.groupby(["algorithm", "dt"])[
    ["normalized_mutual_information"]
].mean().unstack()
print(avg_transition)

print("\nAlgorithm comparison complete.")

References

[KMN20]

Kaveh Kadkhoda Mohammadmosaferi and Hassan Naderi. Evolution of communities in dynamic social networks: an efficient map-based approach. Expert Systems with Applications, 147:113221, 2020. doi:https://doi.org/10.1016/j.eswa.2020.113221.