Tutorial 2: Scalable Ground-Truth Generation
This tutorial demonstrates how to efficiently generate multiple benchmark datasets with parameter variations, manage parallel execution, and monitor progress during generation.
Prerequisites
# Standard library imports
import concurrent.futures
import csv
import itertools
import os
import time
from pathlib import Path

# Third-party imports
import networkx as nx
import numpy as np

# Package imports
from dyn.benchmark.generator.groundtruth_generator import GroundtruthGenerator, ProgressiveGroundtruthGenerator
from dyn.benchmark.generator.communities_generator import CommunitiesGenerator, RelativeOverlap
from dyn.benchmark.generator.edges_generator import SBM, FastBPAM
from dyn.benchmark.generator.nodes_generator import RandomMemberGenerator
1. Creating Parameterized Generators
Creating multiple benchmarks involves defining a parameter space and instantiating generators for each parameter combination. This approach allows systematic exploration of how different parameters affect community evolution.
Basic parameter setup
# Define parameter variations
p_in_values = [0.5, 0.7, 0.9]
p_out_values = [0.05, 0.1, 0.15]
core_nodes_ratios = [0.3, 0.5, 0.7]

# Build one generator per (p_in, p_out, core_ratio) combination.
# itertools.product walks the combinations in the same order as three nested
# loops (the last list varies fastest).
generators = []
for p_in, p_out, core_ratio in itertools.product(p_in_values, p_out_values, core_nodes_ratios):
    # Human-readable name that encodes this configuration
    config_name = f"benchmark_pin{p_in}_pout{p_out}_core{core_ratio}"
    # Every configuration uses seed 42, so only the swept parameters differ
    communities = CommunitiesGenerator(
        community_count=10,
        snapshot_count=8,
        community_size_min=5,
        core_nodes_ratio=core_ratio,
        seed=42,
    )
    edges = SBM(p_in=p_in, p_out=p_out, seed=42)
    generator = GroundtruthGenerator(
        community_generator=communities,
        edge_generator=edges,
        seed=42,
    )
    generators.append((config_name, generator))
print(f"Created {len(generators)} generator configurations")
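Each generator is an instance of GroundtruthGenerator configured with specific parameters. With three values for each of the three swept parameters, the list holds 3 × 3 × 3 = 27 named configurations.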
Ensuring Reproducibility with Seed Sequences
When generating multiple benchmarks, ensuring reproducibility requires careful seed management. The GroundtruthGenerator class uses NumPy’s SeedSequence class to spawn child seeds that produce statistically independent random streams.
# Create reproducible variants with different seeds
def create_reproducible_variants(base_generator, num_variants=5, master_seed=42):
    """Create multiple variant generators with independent random streams.

    Args:
        base_generator: Generator to copy. It must provide ``copy()`` and
            ``configure_seed(seed)``.
        num_variants: Number of variant generators to create.
        master_seed: Root of the seed tree. Pass an int (used as entropy for
            ``np.random.SeedSequence``) or an existing ``SeedSequence``. The
            default (42) reproduces the previous behaviour. Other values give
            a different, but still reproducible, family of variants.

    Returns:
        List of ``(name, generator)`` tuples named ``variant_0``,
        ``variant_1``, ... Each generator is configured with its own spawned
        child seed.
    """
    if isinstance(master_seed, np.random.SeedSequence):
        root = master_seed
    else:
        root = np.random.SeedSequence(master_seed)
    # Children spawned from one SeedSequence give statistically independent
    # streams, and the whole family can be rebuilt from master_seed alone.
    child_seeds = root.spawn(num_variants)

    variants = []
    for i, seed in enumerate(child_seeds):
        # copy() is described as a deep copy. Re-seeding the copy is assumed
        # not to touch the base generator (TODO confirm for every component).
        variant = base_generator.copy()
        # Per the tutorial, configure_seed propagates the seed to the
        # component generators.
        variant.configure_seed(seed)
        variants.append((f"variant_{i}", variant))
    return variants
# Example usage: derive five independently seeded copies of one base generator
base_generator = GroundtruthGenerator(seed=42)
variant_generators = create_reproducible_variants(
    base_generator,
    num_variants=5,
)
print(f"Created {len(variant_generators)} reproducible variants")
The spawn method of SeedSequence creates new sequences that are statistically independent from the parent sequence. This ensures that each generator creates different random outcomes while maintaining reproducibility across runs.
Parameter Grid Exploration
For systematic exploration of parameter spaces, you can use parameter grids to generate all combinations:
def create_parameter_grid(param_grid):
    """Expand a parameter grid into every combination of its values.

    Args:
        param_grid: Mapping of parameter name to the list of values to try.

    Returns:
        A list containing one ``{name: value}`` dict per combination, in
        ``itertools.product`` order (the last parameter varies fastest).
    """
    names = list(param_grid)
    grid = []
    for combo in itertools.product(*(param_grid[name] for name in names)):
        grid.append(dict(zip(names, combo)))
    return grid
# Example of a parameter grid (3 x 3 x 2 x 2 = 36 combinations)
param_grid = {
    'community_count': [5, 10, 15],
    'core_nodes_ratio': [0.3, 0.5, 0.7],
    'p_in': [0.6, 0.8],
    'p_out': [0.05, 0.1],
}
# Generate all parameter combinations
parameter_combinations = create_parameter_grid(param_grid)
print(f"Created {len(parameter_combinations)} parameter combinations")

# Create generators for each parameter combination with reproducible seeds
grid_generators = []
# A single master seed sequence makes the whole grid reproducible...
master_seed = np.random.SeedSequence(42)
# ...and one spawned child per combination keeps configurations independent.
child_seeds = master_seed.spawn(len(parameter_combinations))
for i, (params, seed) in enumerate(zip(parameter_combinations, child_seeds)):
    config_name = f"grid_search_{i}"
    # Give every component its own grandchild seed. If each component builds
    # its RNG directly from the SeedSequence it receives (for example with
    # np.random.default_rng(seed)), sharing one SeedSequence would give them
    # identical random streams. That would defeat the point of spawning.
    community_seed, edge_seed, groundtruth_seed = seed.spawn(3)
    generator = GroundtruthGenerator(
        community_generator=CommunitiesGenerator(
            community_count=params['community_count'],
            snapshot_count=8,
            community_size_min=5,
            core_nodes_ratio=params['core_nodes_ratio'],
            seed=community_seed,
        ),
        edge_generator=SBM(
            p_in=params['p_in'],
            p_out=params['p_out'],
            seed=edge_seed,
        ),
        seed=groundtruth_seed,
    )
    grid_generators.append((config_name, generator))
The itertools.product() function efficiently creates all combinations of parameter values, which are then used to instantiate the generators. Each generator receives a unique spawned seed to ensure both reproducibility and uniqueness.
2. Parallel Generation with Progress Monitoring
For large-scale benchmark generation, parallel processing significantly reduces computation time:
Generation and Saving Function
def generate_and_save(config_name, generator, output_dir):
    """Generate one benchmark and write it to ``output_dir/config_name``.

    Files written:
        tcommlist.csv               one CSV row per entry of ``groundtruth.tcommlist``
        graphs/graph_<t>.edgelist   one edge list per entry of ``groundtruth.graphs``
        metadata.txt                generation time, snapshot count and event count

    Args:
        config_name: Name of the configuration; used as the sub-directory name.
        generator: Groundtruth generator exposing ``generate()``.
        output_dir: Directory (str or Path) under which the benchmark is saved.

    Returns:
        Tuple of ``(config_name, elapsed_seconds)``.
    """
    # perf_counter is monotonic, so clock adjustments cannot skew (or make
    # negative) the measured duration the way time.time() can.
    start_time = time.perf_counter()

    benchmark_dir = Path(output_dir) / config_name
    benchmark_dir.mkdir(parents=True, exist_ok=True)

    groundtruth = generator.generate()

    # Save the temporal community list (pickle or other formats also work)
    with open(benchmark_dir / "tcommlist.csv", "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(["node_id", "evolving_community", "snapshot", "static_community_id"])
        writer.writerows(
            (row.node_id, row.evolving_community_id, row.snapshot, row.static_community_id)
            for row in groundtruth.tcommlist
        )

    # Save one edge list per snapshot graph
    graphs_dir = benchmark_dir / "graphs"
    graphs_dir.mkdir(exist_ok=True)
    for t, graph in groundtruth.graphs.items():
        nx.write_edgelist(graph, graphs_dir / f"graph_{t}.edgelist")

    # Measure once so metadata.txt and the return value report the same duration
    elapsed = time.perf_counter() - start_time
    with open(benchmark_dir / "metadata.txt", "w", encoding="utf-8") as f:
        f.write(f"Generation time: {elapsed:.2f} seconds\n")
        f.write(f"Snapshots: {len(groundtruth.graphs)}\n")
        f.write(f"Events: {len(groundtruth.events)}\n")
    return config_name, elapsed
Parallel Generation Execution
# Root folder for all benchmarks; each configuration gets its own sub-folder.
# exist_ok=True lets the tutorial be re-run without an error.
output_dir = Path("benchmarks")
output_dir.mkdir(exist_ok=True)
# Function to generate all benchmarks in parallel
def generate_all_benchmarks(generators, output_dir, max_workers=None):
    """Generate all benchmarks in parallel and print progress as they finish.

    Args:
        generators: List of ``(name, generator)`` tuples.
        output_dir: Directory to save benchmarks.
        max_workers: Maximum number of worker processes. Defaults to
            ``min(CPU count, number of generators)``.

    Returns:
        List of ``(name, elapsed_time)`` tuples for the successful runs, in
        completion order. Failed configurations are reported and skipped.
    """
    results = []
    if not generators:
        # Nothing to schedule; ProcessPoolExecutor would also reject 0 workers.
        return results

    total = len(generators)
    if max_workers is None:
        # os.cpu_count() returns None when the CPU count cannot be determined.
        max_workers = min(os.cpu_count() or 1, total)
    print(f"Starting parallel generation using {max_workers} workers")

    with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
        future_to_config = {
            executor.submit(generate_and_save, config_name, generator, output_dir): config_name
            for config_name, generator in generators
        }
        # Count every finished task, successful or not, so the counter always
        # reaches `total`.
        finished = concurrent.futures.as_completed(future_to_config)
        for done, future in enumerate(finished, start=1):
            config_name = future_to_config[future]
            try:
                name, elapsed_time = future.result()
            except Exception as e:
                # One failing configuration should not abort the whole batch
                print(f"Error generating {config_name}: {e}")
                continue
            results.append((name, elapsed_time))
            print(f"Completed {done}/{total}: {config_name} in {elapsed_time:.2f} seconds")
    return results
# Example usage with a subset of generators (only the first 4 configurations)
subset_generators = grid_generators[:4]
# Start the pool only from the main script. With the "spawn" start method
# (the default on Windows and macOS), every worker re-imports the main module.
# An unguarded call would then try to launch a new pool inside each worker.
if __name__ == "__main__":
    results = generate_all_benchmarks(subset_generators, output_dir, max_workers=2)
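The concurrent.futures.ProcessPoolExecutor runs generation tasks in parallel processes. The as_completed iterator yields results as they become available, providing immediate feedback. With the "spawn" start method (the default on Windows and macOS), worker processes re-import the main module. Code that starts a process pool should therefore run under an `if __name__ == "__main__":` guard, as the complete example below does.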
Enhanced Progress Monitoring with alive-progress
For better progress visualization, you can use the alive-progress package (included when installing with dyn-benchmark[pretty]):
try:
    from alive_progress import alive_bar
except ImportError:
    print("For better progress visualization, install 'dyn-benchmark[pretty]'")
else:
    # Only the import sits inside `try`. An unrelated ImportError raised while
    # generating is therefore not misreported as a missing alive-progress install.

    def generate_all_benchmarks_with_alive(generators, output_dir, max_workers=None):
        """Generate all benchmarks in parallel behind an alive-progress bar.

        Args:
            generators: List of ``(name, generator)`` tuples.
            output_dir: Directory to save benchmarks.
            max_workers: Maximum number of worker processes. Defaults to
                ``min(CPU count, number of generators)``.

        Returns:
            List of ``(name, elapsed_time)`` tuples for the successful runs,
            in completion order.
        """
        results = []
        if not generators:
            # Nothing to schedule; ProcessPoolExecutor would also reject 0 workers.
            return results
        if max_workers is None:
            # os.cpu_count() returns None when the CPU count cannot be determined.
            max_workers = min(os.cpu_count() or 1, len(generators))
        print(f"Starting parallel generation using {max_workers} workers")

        with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
            future_to_config = {
                executor.submit(generate_and_save, config_name, generator, output_dir): config_name
                for config_name, generator in generators
            }
            # Use alive_bar for a live progress display
            with alive_bar(len(generators), title="Generating Benchmarks", force_tty=True) as bar:
                for future in concurrent.futures.as_completed(future_to_config):
                    config_name = future_to_config[future]
                    try:
                        name, elapsed_time = future.result()
                    except Exception as e:
                        # bar.text is replaced on the next update, so also print the
                        # error to keep its details visible.
                        print(f"Error generating {config_name}: {e}")
                        bar.text(f"Error: {config_name}")
                    else:
                        results.append((name, elapsed_time))
                        bar.text(f"Completed: {config_name} in {elapsed_time:.2f}s")
                    bar()
        return results

    # Example usage with the visualization. The call is guarded because worker
    # processes may re-import this module (spawn start method).
    if __name__ == "__main__":
        results = generate_all_benchmarks_with_alive(subset_generators, output_dir, max_workers=2)
The alive-progress package provides an animated progress bar with customizable features, enhancing the user experience during long-running generations.
3. Complete Example
Here’s a complete example that demonstrates a scalable approach for generating multiple benchmarks with varied parameters. This implementation:
Defines a parameter grid for systematic exploration
Creates reproducible generators for each parameter combination
Uses ProgressiveGroundtruthGenerator for visual feedback during generation
Implements parallel processing with multiple workers
Saves the generated benchmarks and summarizes the results
Handles errors gracefully and logs them for review
"""
Complete example of multiple benchmark generation with grid exploration and parallel processing
"""
from pathlib import Path
import concurrent.futures
import os
import time
import itertools
import csv
import numpy as np
import networkx as nx
from dyn.benchmark.generator.groundtruth_generator import ProgressiveGroundtruthGenerator
from dyn.benchmark.generator.communities_generator import CommunitiesGenerator
from dyn.benchmark.generator.edges_generator import SBM, FastBPAM
# Parameter grid: two edge models crossed with two core-node ratios and two
# p_in / p_out values each (2 x 1 x 2 x 2 x 2 = 16 combinations).
param_grid = dict(
    model=['SBM', 'FastBPAM'],
    community_count=[10],
    core_nodes_ratio=[0.5, 0.7],
    p_in=[0.4, 0.7],
    p_out=[0.05, 0.1],
)
# Create all parameter combinations using itertools.product
def create_parameter_grid(param_grid):
    """Expand a ``{name: [values, ...]}`` grid into every parameter combination.

    Combinations follow ``itertools.product`` order: the last key varies fastest.

    Returns:
        A list with one ``{name: value}`` dict per combination.
    """
    names = tuple(param_grid)
    return [dict(zip(names, values)) for values in itertools.product(*param_grid.values())]
# Create generators based on parameter combinations
def create_generators(combinations, master_seed=42):
    """Create one ProgressiveGroundtruthGenerator per parameter combination.

    Args:
        combinations: List of parameter dicts (as produced by
            ``create_parameter_grid``). Each dict must provide 'model'
            ('SBM' or 'FastBPAM'), 'community_count', 'core_nodes_ratio',
            'p_in' and 'p_out'.
        master_seed: Entropy for the root ``np.random.SeedSequence``.

    Returns:
        List of ``(config_name, generator)`` tuples.

    Raises:
        ValueError: If a combination names an unsupported model. Previously,
            any unknown name (e.g. a typo such as 'sbm') silently fell back
            to FastBPAM.
    """
    generators = []
    # Master seed sequence for reproducibility; one spawned child per combination
    child_seeds = np.random.SeedSequence(master_seed).spawn(len(combinations))
    for i, (params, seed) in enumerate(zip(combinations, child_seeds)):
        model = params['model']
        if model not in ('SBM', 'FastBPAM'):
            raise ValueError(f"Unknown edge model {model!r}; expected 'SBM' or 'FastBPAM'")

        config_name = (
            f"{model}_pin{params['p_in']}_pout{params['p_out']}"
            f"_core{params['core_nodes_ratio']}_comm{params['community_count']}_{i}"
        )

        # Give each component its own grandchild seed. If each component builds
        # its RNG directly from the SeedSequence it receives, sharing one
        # sequence would give them identical random streams.
        community_seed, edge_seed, groundtruth_seed = seed.spawn(3)

        community_generator = CommunitiesGenerator(
            community_count=params['community_count'],
            snapshot_count=8,
            community_size_min=5,
            core_nodes_ratio=params['core_nodes_ratio'],
            seed=community_seed,
        )

        if model == 'SBM':
            edge_generator = SBM(p_in=params['p_in'], p_out=params['p_out'], seed=edge_seed)
        else:
            # FastBPAM takes gamma_in/gamma_out; the grid's p_in/p_out values are reused for them
            edge_generator = FastBPAM(
                gamma_in=params['p_in'], gamma_out=params['p_out'], m=5, seed=edge_seed
            )

        # ProgressiveGroundtruthGenerator provides visual feedback while generating
        generator = ProgressiveGroundtruthGenerator(
            community_generator=community_generator,
            edge_generator=edge_generator,
            seed=groundtruth_seed,
        )
        generators.append((config_name, generator))
    return generators
# Function to generate and save a benchmark
def generate_and_save(config_name, generator, output_dir):
    """Generate one benchmark and write it to ``output_dir/config_name``.

    Files written:
        tcommlist.csv               one CSV row per entry of ``groundtruth.tcommlist``
        graphs/graph_<t>.edgelist   one edge list per entry of ``groundtruth.graphs``
        metadata.txt                generation time, snapshot count and event count

    Args:
        config_name: Name of the configuration; used as the sub-directory name.
        generator: Groundtruth generator exposing ``generate()``.
        output_dir: Directory (str or Path) under which the benchmark is saved.

    Returns:
        Tuple of ``(config_name, elapsed_seconds)``.
    """
    # perf_counter is monotonic, so clock adjustments cannot skew (or make
    # negative) the measured duration the way time.time() can.
    start_time = time.perf_counter()

    benchmark_dir = Path(output_dir) / config_name
    benchmark_dir.mkdir(parents=True, exist_ok=True)

    groundtruth = generator.generate()

    # Save the temporal community list
    with open(benchmark_dir / "tcommlist.csv", "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(["node_id", "evolving_community", "snapshot", "static_community_id"])
        writer.writerows(
            (row.node_id, row.evolving_community_id, row.snapshot, row.static_community_id)
            for row in groundtruth.tcommlist
        )

    # Save one edge list per snapshot graph
    graphs_dir = benchmark_dir / "graphs"
    graphs_dir.mkdir(exist_ok=True)
    for t, graph in groundtruth.graphs.items():
        nx.write_edgelist(graph, graphs_dir / f"graph_{t}.edgelist")

    # Measure once so metadata.txt and the return value report the same duration
    elapsed = time.perf_counter() - start_time
    with open(benchmark_dir / "metadata.txt", "w", encoding="utf-8") as f:
        f.write(f"Generation time: {elapsed:.2f} seconds\n")
        f.write(f"Snapshots: {len(groundtruth.graphs)}\n")
        f.write(f"Events: {len(groundtruth.events)}\n")
    return config_name, elapsed
# Generate benchmarks in parallel with progress reporting
def generate_all_benchmarks(generators, output_dir, max_workers=None):
    """Generate all benchmarks in parallel, report progress, and summarise.

    Progress is shown with ``alive_progress`` when it is installed, and with
    plain ``print`` lines otherwise. Results are collected as tasks finish
    (not in submission order), so progress reflects actual completion.

    Args:
        generators: List of ``(config_name, generator)`` tuples.
        output_dir: Directory (str or Path) the benchmarks are written to.
            If any run fails, a ``generation_errors.txt`` log is added there.
        max_workers: Maximum number of worker processes. Defaults to
            ``min(CPU count, number of generators)``, never less than 1.

    Returns:
        List of ``(config_name, elapsed_seconds)`` tuples for successful runs,
        in completion order.
    """
    # Accept str paths too; the error log below joins with `/`.
    output_dir = Path(output_dir)
    total = len(generators)
    if max_workers is None:
        # os.cpu_count() may return None, and ProcessPoolExecutor rejects 0 workers.
        max_workers = max(1, min(os.cpu_count() or 1, total))
    print(f"Starting parallel generation with {max_workers} workers")

    try:
        from alive_progress import alive_bar
    except ImportError:
        alive_bar = None
        print("For better visualization install with: pip install 'dyn-benchmark[pretty]'")

    results = []
    errors = []
    with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
        future_to_config = {
            executor.submit(generate_and_save, config_name, generator, output_dir): config_name
            for config_name, generator in generators
        }
        # as_completed yields futures as they finish, so a slow first task
        # does not hold back the progress display.
        completed = concurrent.futures.as_completed(future_to_config)
        if alive_bar is not None:
            with alive_bar(total, title="Generating benchmarks", force_tty=True) as bar:
                for future in completed:
                    config_name = future_to_config[future]
                    try:
                        name, elapsed_time = future.result()
                    except Exception as e:
                        errors.append((config_name, str(e)))
                        bar.text(f"Error in {config_name}: {str(e)[:50]}...")
                    else:
                        results.append((name, elapsed_time))
                        bar.text(f"Completed: {name} in {elapsed_time:.2f}s")
                    bar()
        else:
            # Count failures too, so the counter always reaches `total`
            for done, future in enumerate(completed, start=1):
                config_name = future_to_config[future]
                try:
                    name, elapsed_time = future.result()
                except Exception as e:
                    errors.append((config_name, str(e)))
                    print(f"Progress: {done}/{total} - Error in {config_name}: {e}")
                else:
                    results.append((name, elapsed_time))
                    print(f"Progress: {done}/{total} - Completed {name} in {elapsed_time:.2f}s")

    _print_generation_summary(total, results, errors)
    if errors:
        _write_error_log(output_dir, total, errors)
    return results


def _print_generation_summary(total, results, errors):
    """Print success/failure counts, each failure, and the mean generation time."""
    print("\nGeneration Summary:")
    print(f"Total configurations: {total}")
    print(f"Successfully generated: {len(results)}")
    print(f"Failed generations: {len(errors)}")
    if errors:
        print("\nErrors:")
        for config_name, error in errors:
            print(f"- {config_name}: {error}")
    if results:
        avg_time = sum(elapsed for _, elapsed in results) / len(results)
        print(f"Average generation time: {avg_time:.2f} seconds")


def _write_error_log(output_dir, total, errors):
    """Write a timestamped ``generation_errors.txt`` in *output_dir* listing every failure."""
    with open(output_dir / "generation_errors.txt", "w", encoding="utf-8") as f:
        f.write(f"Generation errors: {time.strftime('%Y-%m-%d %H:%M:%S')}\n")
        f.write(f"Total configurations: {total}\n")
        f.write(f"Failed generations: {len(errors)}\n\n")
        f.write("Details:\n")
        f.writelines(f"{config_name}: {error}\n" for config_name, error in errors)
# Main execution (guarded so worker processes that re-import this module do not re-run it)
if __name__ == "__main__":
    # Every benchmark folder, plus the run summary, lives under ./benchmarks
    output_dir = Path("benchmarks")
    output_dir.mkdir(exist_ok=True, parents=True)

    # Expand the grid into concrete configurations
    combinations = create_parameter_grid(param_grid)
    print(f"Created {len(combinations)} parameter combinations")

    generators = create_generators(combinations)
    print(f"Created {len(generators)} generator configurations")

    results = generate_all_benchmarks(generators, output_dir, max_workers=3)

    # Persist a summary: a header, then one line per successful configuration
    summary_lines = [
        f"Generation completed: {time.strftime('%Y-%m-%d %H:%M:%S')}\n",
        f"Total configurations: {len(generators)}\n",
        f"Successfully generated: {len(results)}\n\n",
        "Details:\n",
    ]
    summary_lines.extend(
        f"{name}: {elapsed_time:.2f}s\n" for name, elapsed_time in sorted(results)
    )
    with open(output_dir / "generation_summary.txt", "w") as summary_file:
        summary_file.writelines(summary_lines)