Tutorial 2: Scalable Ground Truth Generation

This tutorial demonstrates how to efficiently generate multiple benchmark datasets with parameter variations, manage parallel execution, and monitor progress during generation.

Prerequisites

# Standard library imports
import concurrent.futures
import csv
import itertools
import os
import time
from pathlib import Path

# Third-party imports
import networkx as nx
import numpy as np

# Package imports
from dyn.benchmark.generator.groundtruth_generator import GroundtruthGenerator, ProgressiveGroundtruthGenerator
from dyn.benchmark.generator.communities_generator import CommunitiesGenerator, RelativeOverlap
from dyn.benchmark.generator.edges_generator import SBM, FastBPAM
from dyn.benchmark.generator.nodes_generator import RandomMemberGenerator

1. Creating Parameterized Generators

Creating multiple benchmarks involves defining a parameter space and instantiating generators for each parameter combination. This approach allows systematic exploration of how different parameters affect community evolution.

Basic parameter setup

# Parameter space: every (p_in, p_out, core ratio) triple becomes one benchmark
p_in_values = [0.5, 0.7, 0.9]
p_out_values = [0.05, 0.1, 0.15]
core_nodes_ratios = [0.3, 0.5, 0.7]

# (config_name, generator) pairs, in enumeration order
generators = []

# itertools.product varies its last argument fastest, which is the same order
# as nesting one loop per parameter by hand.
for p_in, p_out, core_ratio in itertools.product(
    p_in_values, p_out_values, core_nodes_ratios
):
    # The name encodes the three swept values so each configuration is unique
    config_name = f"benchmark_pin{p_in}_pout{p_out}_core{core_ratio}"

    # Only the swept parameters change; sizes, snapshot count and seed are fixed
    generator = GroundtruthGenerator(
        community_generator=CommunitiesGenerator(
            community_count=10,
            snapshot_count=8,
            community_size_min=5,
            core_nodes_ratio=core_ratio,
            seed=42,  # Same seed for reproducibility
        ),
        edge_generator=SBM(p_in=p_in, p_out=p_out, seed=42),
        seed=42,
    )

    generators.append((config_name, generator))

print(f"Created {len(generators)} generator configurations")

Each generator is an instance of GroundtruthGenerator configured with specific parameters.

Ensuring Reproducibility with Seed Sequences

When generating multiple benchmarks, ensuring reproducibility requires careful seed management. The GroundtruthGenerator class uses NumPy’s SeedSequence class to spawn child seeds that produce statistically independent random streams.

# Create reproducible variants with different seeds
def create_reproducible_variants(base_generator, num_variants=5, master_seed=42):
    """Create multiple variant generators with independent random streams.

    Args:
        base_generator: The base generator to copy. It must provide a
            ``copy()`` method and a ``configure_seed(seed)`` method.
        num_variants: Number of variant generators to create.
        master_seed: Entropy for the parent ``numpy.random.SeedSequence``.
            The default (42) reproduces the variants of earlier versions of
            this function; the same value always yields the same child seeds.

    Returns:
        List of ``(name, generator)`` tuples named ``variant_0``,
        ``variant_1``, ..., each configured with its own spawned child seed.
    """
    # Children spawned from one parent SeedSequence are statistically
    # independent of each other, yet fully determined by ``master_seed``.
    child_seeds = np.random.SeedSequence(master_seed).spawn(num_variants)

    variants = []
    for index, child_seed in enumerate(child_seeds):
        # Work on a copy so the caller's generator is left untouched
        # (NOTE(review): copy() is expected to be a deep copy — confirm).
        variant = base_generator.copy()

        # NOTE(review): configure_seed() is expected to propagate the seed to
        # the component generators — confirm against the library.
        variant.configure_seed(child_seed)

        variants.append((f"variant_{index}", variant))

    return variants

# Example usage:
# Build one base generator, then derive five separately seeded variants from it.
base_generator = GroundtruthGenerator(seed=42)
variant_generators = create_reproducible_variants(base_generator, num_variants=5)

# Report how many (name, generator) pairs were produced
print(f"Created {len(variant_generators)} reproducible variants")

The spawn method of SeedSequence creates new sequences that are statistically independent from the parent sequence. This ensures that each generator creates different random outcomes while maintaining reproducibility across runs.

Parameter Grid Exploration

For systematic exploration of parameter spaces, you can use parameter grids to generate all combinations:

def create_parameter_grid(param_grid):
    """Expand a parameter grid into every combination of its values.

    Args:
        param_grid: Dictionary mapping each parameter name to the list of
            values it may take.

    Returns:
        List of dictionaries, one per combination, each mapping every
        parameter name to a single value. Combinations follow
        ``itertools.product`` order (the last parameter varies fastest).
    """
    names = tuple(param_grid)

    # One dict per point of the Cartesian product of all value lists
    return [
        dict(zip(names, combo))
        for combo in itertools.product(*param_grid.values())
    ]

# Example of a parameter grid
param_grid = {
    'community_count': [5, 10, 15],
    'core_nodes_ratio': [0.3, 0.5, 0.7],
    'p_in': [0.6, 0.8],
    'p_out': [0.05, 0.1],
}

# Expand the grid into one parameter dictionary per combination
parameter_combinations = create_parameter_grid(param_grid)
print(f"Created {len(parameter_combinations)} parameter combinations")

# (config_name, generator) pairs, one per combination
grid_generators = []

# One parent seed sequence, one spawned child per combination: the whole grid
# is reproducible while each configuration gets its own seed.
master_seed = np.random.SeedSequence(42)
child_seeds = master_seed.spawn(len(parameter_combinations))

for i, params in enumerate(parameter_combinations):
    seed = child_seeds[i]  # child seed paired with this combination
    config_name = f"grid_search_{i}"

    # NOTE(review): the same SeedSequence object is handed to all three
    # components; whether they end up with independent streams depends on how
    # the library consumes it — confirm.
    generator = GroundtruthGenerator(
        community_generator=CommunitiesGenerator(
            community_count=params['community_count'],
            snapshot_count=8,
            community_size_min=5,
            core_nodes_ratio=params['core_nodes_ratio'],
            seed=seed,
        ),
        edge_generator=SBM(
            p_in=params['p_in'],
            p_out=params['p_out'],
            seed=seed,
        ),
        seed=seed,
    )

    grid_generators.append((config_name, generator))

The itertools.product() function efficiently creates all combinations of parameter values, which are then used to instantiate the generators. Each generator receives a unique spawned seed to ensure both reproducibility and uniqueness.

2. Parallel Generation with Progress Monitoring

For large-scale benchmark generation, parallel processing significantly reduces computation time:

Generation and Saving Function

def generate_and_save(config_name, generator, output_dir):
    """Generate a benchmark and save it to disk.

    Writes, under ``output_dir/config_name``:

    * ``tcommlist.csv`` - one row per entry of ``groundtruth.tcommlist``
    * ``graphs/graph_<t>.edgelist`` - one edge list per snapshot graph
    * ``metadata.txt`` - generation time, snapshot count and event count

    Relies on the ``csv`` and ``networkx`` (as ``nx``) imports listed in the
    Prerequisites section.

    Args:
        config_name: Name of the configuration (used as the sub-directory name)
        generator: GroundtruthGenerator instance (anything exposing
            ``generate()``)
        output_dir: Directory to save the benchmark (``str`` or ``Path``)

    Returns:
        Tuple of (config_name, elapsed_time), elapsed_time in seconds
    """
    start_time = time.time()

    # Create output directory
    benchmark_dir = Path(output_dir) / config_name
    benchmark_dir.mkdir(parents=True, exist_ok=True)

    # Generate groundtruth
    groundtruth = generator.generate()

    # Save benchmark components (you might want to use pickle or other formats)
    # Save tcommlist. newline='' is required by the csv module; the explicit
    # encoding keeps the output independent of the platform locale.
    tcommlist_file = benchmark_dir / "tcommlist.csv"
    with open(tcommlist_file, 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow(['node_id', 'evolving_community', 'snapshot', 'static_community_id'])
        writer.writerows(
            (row.node_id, row.evolving_community_id, row.snapshot, row.static_community_id)
            for row in groundtruth.tcommlist
        )

    # Save graphs (as edgelists), one file per snapshot key
    graphs_dir = benchmark_dir / "graphs"
    graphs_dir.mkdir(exist_ok=True)
    for t, graph in groundtruth.graphs.items():
        nx.write_edgelist(graph, graphs_dir / f"graph_{t}.edgelist")

    # Save metadata about the generation
    with open(benchmark_dir / "metadata.txt", "w", encoding="utf-8") as f:
        f.write(f"Generation time: {time.time() - start_time:.2f} seconds\n")
        f.write(f"Snapshots: {len(groundtruth.graphs)}\n")
        f.write(f"Events: {len(groundtruth.events)}\n")

    return config_name, time.time() - start_time

Parallel Generation Execution

# Define output directory
output_dir = Path("benchmarks")
# parents=True keeps this working if the path is later changed to a nested one
output_dir.mkdir(parents=True, exist_ok=True)

# Function to generate all benchmarks in parallel
def generate_all_benchmarks(generators, output_dir, max_workers=None):
    """Generate all benchmarks in parallel.

    Each ``(name, generator)`` pair is submitted to a process pool as one
    ``generate_and_save`` task. A failing task is reported on stdout and
    skipped; it does not abort the remaining tasks.

    Args:
        generators: List of (name, generator) tuples
        output_dir: Directory to save benchmarks
        max_workers: Maximum number of workers (defaults to the CPU count,
            capped at the number of generators)

    Returns:
        List of (name, elapsed_time) tuples for the successful tasks, in
        completion order. Empty when ``generators`` is empty.
    """
    results = []

    # Nothing to run: a pool cannot be created with zero workers
    if not generators:
        return results

    # Determine appropriate number of workers
    if max_workers is None:
        # os.cpu_count() returns None when the count cannot be determined
        max_workers = min(os.cpu_count() or 1, len(generators))

    print(f"Starting parallel generation using {max_workers} workers")

    with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
        # Submit all generation tasks, remembering which config each future is
        future_to_config = {
            executor.submit(generate_and_save, config_name, generator, output_dir): config_name
            for config_name, generator in generators
        }

        # Process results as they complete
        completed = 0
        for future in concurrent.futures.as_completed(future_to_config):
            config_name = future_to_config[future]
            try:
                # Re-raises whatever the worker raised
                name, elapsed_time = future.result()
            except Exception as e:
                # Best effort: report and continue with the other configs
                print(f"Error generating {config_name}: {e}")
            else:
                results.append((name, elapsed_time))
                completed += 1

                # Display progress
                print(f"Completed {completed}/{len(generators)}: {config_name} in {elapsed_time:.2f} seconds")

    return results

# Example usage with a subset of generators
subset_generators = grid_generators[:4]  # Using only the first 4 configurations for example

# Start worker processes only when this file is the program entry point. With
# the "spawn" start method every worker re-imports the main module, so an
# unguarded call here would be executed again inside each worker.
if __name__ == "__main__":
    results = generate_all_benchmarks(subset_generators, output_dir, max_workers=2)

The concurrent.futures.ProcessPoolExecutor runs generation tasks in parallel processes. The as_completed iterator processes results as they become available, providing immediate feedback.

Enhanced Progress Monitoring with alive-progress

For better progress visualization, you can use the alive-progress package (included when installing with dyn-benchmark[pretty]):

# Keep the try body to the import alone, so an ImportError raised later (for
# example while running a benchmark) is not mistaken for a missing package.
try:
    from alive_progress import alive_bar
except ImportError:
    print("For better progress visualization, install 'dyn-benchmark[pretty]'")
else:

    def generate_all_benchmarks_with_alive(generators, output_dir, max_workers=None):
        """Generate all benchmarks with alive-progress bar.

        A failing task is reported and skipped; the bar still advances so it
        reaches 100% once every task has finished.

        Args:
            generators: List of (name, generator) tuples
            output_dir: Directory to save benchmarks
            max_workers: Maximum number of workers (defaults to the CPU
                count, capped at the number of generators)

        Returns:
            List of (name, elapsed_time) tuples for the successful tasks, in
            completion order. Empty when ``generators`` is empty.
        """
        results = []

        # Nothing to run: a pool cannot be created with zero workers
        if not generators:
            return results

        if max_workers is None:
            # os.cpu_count() returns None when the count cannot be determined
            max_workers = min(os.cpu_count() or 1, len(generators))

        print(f"Starting parallel generation using {max_workers} workers")

        with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
            future_to_config = {
                executor.submit(generate_and_save, config_name, generator, output_dir): config_name
                for config_name, generator in generators
            }

            # Use alive_bar for nice progress display
            with alive_bar(len(generators), title="Generating Benchmarks", force_tty=True) as bar:
                for future in concurrent.futures.as_completed(future_to_config):
                    config_name = future_to_config[future]
                    try:
                        # Re-raises whatever the worker raised
                        name, elapsed_time = future.result()
                    except Exception as e:
                        # Surface the reason instead of dropping it
                        print(f"Error generating {config_name}: {e}")
                        bar.text(f"Error: {config_name}")
                    else:
                        results.append((name, elapsed_time))
                        bar.text(f"Completed: {config_name} in {elapsed_time:.2f}s")
                    # One tick per finished task, successful or not
                    bar()

        return results

    # Example usage with the visualization. Guarded so that worker processes
    # re-importing the main module do not start the generation again.
    if __name__ == "__main__":
        results = generate_all_benchmarks_with_alive(subset_generators, output_dir, max_workers=2)

The alive-progress package provides an animated progress bar with customizable features, enhancing the user experience during long-running generations.

3. Complete Example

Here’s a complete example that demonstrates a scalable approach for generating multiple benchmarks with varied parameters. This implementation:

  1. Defines a parameter grid for systematic exploration

  2. Creates reproducible generators for each parameter combination

  3. Uses ProgressiveGroundtruthGenerator for visual feedback during generation

  4. Implements parallel processing with multiple workers

  5. Saves the generated benchmarks and summarizes the results

  6. Handles errors gracefully and logs them for review

"""
Complete example of multiple benchmark generation with grid exploration and parallel processing
"""
from pathlib import Path
import concurrent.futures
import os
import time
import itertools
import csv

import numpy as np
import networkx as nx

from dyn.benchmark.generator.groundtruth_generator import ProgressiveGroundtruthGenerator
from dyn.benchmark.generator.communities_generator import CommunitiesGenerator
from dyn.benchmark.generator.edges_generator import SBM, FastBPAM

# Parameter grid for the complete example: one list of candidate values per
# parameter name. For the 'FastBPAM' model, create_generators passes p_in and
# p_out as gamma_in and gamma_out.
param_grid = dict(
    model=['SBM', 'FastBPAM'],
    community_count=[10],
    core_nodes_ratio=[0.5, 0.7],
    p_in=[0.4, 0.7],
    p_out=[0.05, 0.1],
)

# Create all parameter combinations using itertools.product
def create_parameter_grid(param_grid):
    """Return one ``{name: value}`` dict per combination of the grid's values.

    Combinations are enumerated with ``itertools.product`` over the value
    lists, in the grid's key order (the last key varies fastest).
    """
    keys = list(param_grid)
    value_lists = [param_grid[key] for key in keys]

    # Pair every product tuple back up with the parameter names
    return [dict(zip(keys, picked)) for picked in itertools.product(*value_lists)]

# Create generators based on parameter combinations
def create_generators(combinations, master_seed=42):
    """Build one named generator per parameter combination.

    Args:
        combinations: Sequence of parameter dictionaries with the keys
            'model', 'community_count', 'core_nodes_ratio', 'p_in', 'p_out'.
        master_seed: Entropy of the parent seed sequence; one child seed is
            spawned per combination.

    Returns:
        List of (config_name, generator) tuples, in the order of
        ``combinations``.
    """
    # One spawned child seed per combination keeps the batch reproducible
    child_seeds = np.random.SeedSequence(master_seed).spawn(len(combinations))

    generators = []
    for i, params in enumerate(combinations):
        seed = child_seeds[i]

        # The trailing index keeps names unique even if parameters repeat
        config_name = f"{params['model']}_pin{params['p_in']}_pout{params['p_out']}_core{params['core_nodes_ratio']}_comm{params['community_count']}_{i}"

        community_generator = CommunitiesGenerator(
            community_count=params['community_count'],
            snapshot_count=8,
            community_size_min=5,
            core_nodes_ratio=params['core_nodes_ratio'],
            seed=seed,
        )

        # Any model other than 'SBM' falls back to FastBPAM, which receives
        # p_in / p_out as gamma_in / gamma_out.
        in_value, out_value = params['p_in'], params['p_out']
        if params['model'] == 'SBM':
            edge_generator = SBM(p_in=in_value, p_out=out_value, seed=seed)
        else:
            edge_generator = FastBPAM(gamma_in=in_value, gamma_out=out_value, m=5, seed=seed)

        # Use ProgressiveGroundtruthGenerator for visual feedback
        generators.append((
            config_name,
            ProgressiveGroundtruthGenerator(
                community_generator=community_generator,
                edge_generator=edge_generator,
                seed=seed,
            ),
        ))

    return generators

# Function to generate and save a benchmark
def generate_and_save(config_name, generator, output_dir):
    """Run one generator and persist its output under ``output_dir/config_name``.

    Files written: ``tcommlist.csv``, ``graphs/graph_<t>.edgelist`` (one per
    snapshot graph) and ``metadata.txt``.

    Args:
        config_name: Name of the configuration (also the sub-directory name)
        generator: Object whose ``generate()`` returns the groundtruth
        output_dir: Directory to save the benchmark

    Returns:
        Tuple of (config_name, elapsed_time), elapsed_time in seconds
    """
    start_time = time.time()

    # Per-configuration directory, created together with any missing parents
    benchmark_dir = Path(output_dir).joinpath(config_name)
    benchmark_dir.mkdir(parents=True, exist_ok=True)

    groundtruth = generator.generate()

    # Community membership table: header first, then one line per entry
    with open(benchmark_dir / "tcommlist.csv", 'w', newline='') as table_file:
        table = csv.writer(table_file)
        table.writerow(['node_id', 'evolving_community', 'snapshot', 'static_community_id'])
        table.writerows(
            [entry.node_id, entry.evolving_community_id, entry.snapshot, entry.static_community_id]
            for entry in groundtruth.tcommlist
        )

    # Snapshot graphs, each as its own edge list file
    graphs_dir = benchmark_dir / "graphs"
    graphs_dir.mkdir(exist_ok=True)
    for t, graph in groundtruth.graphs.items():
        nx.write_edgelist(graph, graphs_dir / f"graph_{t}.edgelist")

    # Short summary of this run
    with open(benchmark_dir / "metadata.txt", "w") as meta_file:
        meta_file.write(f"Generation time: {time.time() - start_time:.2f} seconds\n")
        meta_file.write(f"Snapshots: {len(groundtruth.graphs)}\n")
        meta_file.write(f"Events: {len(groundtruth.events)}\n")

    # Total wall-clock time, including the time spent writing files
    elapsed = time.time() - start_time
    return config_name, elapsed

# Generate benchmarks in parallel with progress reporting
def _collect_results(future_to_config, results, errors, on_success, on_error):
    """Drain futures in completion order, sorting outcomes into results/errors."""
    for future in concurrent.futures.as_completed(future_to_config):
        config_name = future_to_config[future]
        try:
            # Re-raises whatever the worker raised
            name, elapsed_time = future.result()
        except Exception as e:
            # Best effort: record the failure and keep going
            errors.append((config_name, str(e)))
            on_error(config_name, e)
        else:
            results.append((name, elapsed_time))
            on_success(name, elapsed_time)


def generate_all_benchmarks(generators, output_dir, max_workers=None):
    """Generate all benchmarks in parallel with progress reporting.

    Each ``(name, generator)`` pair is run through ``generate_and_save`` in a
    process pool. Progress is shown with an ``alive_progress`` bar when that
    package is installed, otherwise with plain ``print`` lines. Failures are
    collected, listed in the printed summary and written to
    ``generation_errors.txt`` inside ``output_dir``.

    Args:
        generators: List of (name, generator) tuples
        output_dir: Directory to save benchmarks (``str`` or ``Path``)
        max_workers: Maximum number of workers (defaults to the CPU count,
            capped at the number of generators, and never below 1)

    Returns:
        List of (name, elapsed_time) tuples for the successful tasks, in
        completion order.
    """
    if max_workers is None:
        # os.cpu_count() may return None, and a pool needs at least one worker
        max_workers = max(1, min(os.cpu_count() or 1, len(generators)))

    print(f"Starting parallel generation with {max_workers} workers")

    try:
        from alive_progress import alive_bar
    except ImportError:
        alive_bar = None
        print("For better visualization install with: pip install 'dyn-benchmark[pretty]'")

    results = []
    errors = []

    # Skip the pool entirely when there is nothing to submit
    if generators:
        with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
            future_to_config = {
                executor.submit(generate_and_save, config_name, generator, output_dir): config_name
                for config_name, generator in generators
            }
            total = len(future_to_config)

            if alive_bar is not None:
                with alive_bar(total, title="Generating benchmarks", force_tty=True) as bar:

                    def on_success(name, elapsed_time):
                        bar.text(f"Completed: {name} in {elapsed_time:.2f}s")
                        bar()

                    def on_error(config_name, e):
                        # Truncated so a long traceback message fits the bar
                        bar.text(f"Error in {config_name}: {str(e)[:50]}...")
                        bar()

                    _collect_results(future_to_config, results, errors, on_success, on_error)
            else:

                def on_success(name, elapsed_time):
                    # len(results) already counts the task being reported
                    print(f"Progress: {len(results)}/{total} - Completed {name} in {elapsed_time:.2f}s")

                def on_error(config_name, e):
                    print(f"Error in {config_name}: {e}")

                _collect_results(future_to_config, results, errors, on_success, on_error)

    # Print summary
    print("\nGeneration Summary:")
    print(f"Total configurations: {len(generators)}")
    print(f"Successfully generated: {len(results)}")
    print(f"Failed generations: {len(errors)}")

    if errors:
        print("\nErrors:")
        for config_name, error in errors:
            print(f"- {config_name}: {error}")

    if results:
        avg_time = sum(r[1] for r in results) / len(results)
        print(f"Average generation time: {avg_time:.2f} seconds")

    # Save error log
    if errors:
        # Path() accepts both str and Path; the directory may not exist yet if
        # every task failed before creating it.
        log_dir = Path(output_dir)
        log_dir.mkdir(parents=True, exist_ok=True)
        with open(log_dir / "generation_errors.txt", "w") as f:
            f.write(f"Generation errors: {time.strftime('%Y-%m-%d %H:%M:%S')}\n")
            f.write(f"Total configurations: {len(generators)}\n")
            f.write(f"Failed generations: {len(errors)}\n\n")

            f.write("Details:\n")
            for config_name, error in errors:
                f.write(f"{config_name}: {error}\n")

    return results

# Main execution
if __name__ == "__main__":
    # Destination for every benchmark and for the summary file
    output_dir = Path("benchmarks")
    output_dir.mkdir(parents=True, exist_ok=True)

    # Expand the grid, then build one generator per combination
    combinations = create_parameter_grid(param_grid)
    print(f"Created {len(combinations)} parameter combinations")

    generators = create_generators(combinations)
    print(f"Created {len(generators)} generator configurations")

    # Run the generation with three worker processes
    results = generate_all_benchmarks(generators, output_dir, max_workers=3)

    # Assemble the summary first, then write it in one go. Sorting the
    # (name, elapsed_time) tuples orders the detail lines by name.
    summary_lines = [
        f"Generation completed: {time.strftime('%Y-%m-%d %H:%M:%S')}\n",
        f"Total configurations: {len(generators)}\n",
        f"Successfully generated: {len(results)}\n\n",
        "Details:\n",
    ]
    summary_lines.extend(
        f"{name}: {elapsed_time:.2f}s\n" for name, elapsed_time in sorted(results)
    )

    with open(output_dir / "generation_summary.txt", "w") as f:
        f.writelines(summary_lines)