Tutorial 2: Scalable Ground Truth Generation
============================================

This tutorial demonstrates how to efficiently generate multiple benchmark datasets with parameter variations, manage parallel execution, and monitor progress during generation.

Prerequisites
-------------

.. code-block:: python

    # Standard library imports
    import concurrent.futures
    import csv
    import itertools
    import os
    import time
    from pathlib import Path

    # Third-party imports
    import networkx as nx
    import numpy as np

    # Package imports
    from dyn.benchmark.generator.groundtruth_generator import GroundtruthGenerator, ProgressiveGroundtruthGenerator
    from dyn.benchmark.generator.communities_generator import CommunitiesGenerator, RelativeOverlap
    from dyn.benchmark.generator.edges_generator import SBM, FastBPAM
    from dyn.benchmark.generator.nodes_generator import RandomMemberGenerator

1. Creating Parameterized Generators
------------------------------------

Creating multiple benchmarks involves defining a parameter space and instantiating a generator for each parameter combination. This approach allows systematic exploration of how different parameters affect community evolution.

Basic parameter setup
~~~~~~~~~~~~~~~~~~~~~

.. code-block:: python

    # Define parameter variations
    p_in_values = [0.5, 0.7, 0.9]
    p_out_values = [0.05, 0.1, 0.15]
    core_nodes_ratios = [0.3, 0.5, 0.7]

    # Create a generator for each parameter combination
    generators = []
    for p_in in p_in_values:
        for p_out in p_out_values:
            for core_ratio in core_nodes_ratios:
                # Create a unique name for this configuration
                config_name = f"benchmark_pin{p_in}_pout{p_out}_core{core_ratio}"

                # Create a generator with these parameters
                generator = GroundtruthGenerator(
                    community_generator=CommunitiesGenerator(
                        community_count=10,
                        snapshot_count=8,
                        community_size_min=5,
                        core_nodes_ratio=core_ratio,
                        seed=42,  # Same seed for reproducibility
                    ),
                    edge_generator=SBM(
                        p_in=p_in,
                        p_out=p_out,
                        seed=42,
                    ),
                    seed=42,
                )

                # Store the generator with its configuration name
                generators.append((config_name, generator))

    print(f"Created {len(generators)} generator configurations")

Each generator is an instance of :class:`~dyn.benchmark.generator.groundtruth_generator.GroundtruthGenerator` configured with specific parameters; the three nested loops above produce 3 × 3 × 3 = 27 of them.

Ensuring Reproducibility with Seed Sequences
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

When generating multiple benchmarks, reproducibility requires careful seed management. The :class:`~dyn.benchmark.generator.groundtruth_generator.GroundtruthGenerator` class uses NumPy's ``SeedSequence`` class to spawn child seeds that produce statistically independent random streams.

.. code-block:: python

    # Create reproducible variants with different seeds
    def create_reproducible_variants(base_generator, num_variants=5):
        """Create multiple variant generators with independent random streams.

        Args:
            base_generator: The base generator to copy
            num_variants: Number of variant generators to create

        Returns:
            List of (name, generator) tuples with different seeds
        """
        variants = []

        # Create the master seed sequence
        master_seed = np.random.SeedSequence(42)

        # Spawn child seeds; they produce statistically independent streams
        child_seeds = master_seed.spawn(num_variants)

        for i, seed in enumerate(child_seeds):
            # Copy the base generator (deep copy)
            variant = base_generator.copy()

            # Configure the copy with its new seed;
            # this internally propagates the seed to all component generators
            variant.configure_seed(seed)

            variants.append((f"variant_{i}", variant))

        return variants

    # Example usage:
    base_generator = GroundtruthGenerator(seed=42)
    variant_generators = create_reproducible_variants(base_generator, num_variants=5)
    print(f"Created {len(variant_generators)} reproducible variants")

The ``spawn`` method of ``SeedSequence`` creates child sequences whose random streams are statistically independent of one another and of the parent. Each variant therefore produces different random outcomes, while rerunning the script with the same master seed (here ``42``) reproduces every variant exactly.

Parameter Grid Exploration
~~~~~~~~~~~~~~~~~~~~~~~~~~

For systematic exploration of parameter spaces, you can use a parameter grid to generate all combinations:

.. code-block:: python

    def create_parameter_grid(param_grid):
        """Create all combinations from a parameter grid dictionary.

        Args:
            param_grid: Dictionary mapping parameter names to their possible values

        Returns:
            List of parameter dictionaries, one for each combination
        """
        # Get all parameter names and their possible values
        param_names = list(param_grid.keys())
        param_values = list(param_grid.values())

        # Generate all combinations
        combinations = list(itertools.product(*param_values))

        # Convert to a list of parameter dictionaries
        result = []
        for combo in combinations:
            params = {name: value for name, value in zip(param_names, combo)}
            result.append(params)

        return result

    # Example of a parameter grid
    param_grid = {
        'community_count': [5, 10, 15],
        'core_nodes_ratio': [0.3, 0.5, 0.7],
        'p_in': [0.6, 0.8],
        'p_out': [0.05, 0.1],
    }

    # Generate all parameter combinations
    parameter_combinations = create_parameter_grid(param_grid)
    print(f"Created {len(parameter_combinations)} parameter combinations")

    # Create a generator for each parameter combination with reproducible seeds
    grid_generators = []

    # Create the master seed sequence
    master_seed = np.random.SeedSequence(42)

    # Spawn child seeds, one for each combination
    child_seeds = master_seed.spawn(len(parameter_combinations))

    for i, (params, seed) in enumerate(zip(parameter_combinations, child_seeds)):
        config_name = f"grid_search_{i}"

        generator = GroundtruthGenerator(
            community_generator=CommunitiesGenerator(
                community_count=params['community_count'],
                snapshot_count=8,
                community_size_min=5,
                core_nodes_ratio=params['core_nodes_ratio'],
                seed=seed,  # Use the spawned seed
            ),
            edge_generator=SBM(
                p_in=params['p_in'],
                p_out=params['p_out'],
                seed=seed,  # Use the same spawned seed
            ),
            seed=seed,  # Use the same spawned seed for consistency
        )

        grid_generators.append((config_name, generator))

The ``itertools.product()`` function builds the Cartesian product of the parameter value lists, so the grid above yields 3 × 3 × 2 × 2 = 36 combinations, each of which is used to instantiate a generator. Each generator receives its own spawned seed, which makes every configuration reproducible while keeping the random streams of different configurations independent.
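The grid-and-seed pattern above can be checked in isolation, without dyn-benchmark, using only the standard library and NumPy. This minimal sketch rebuilds the same 36-point grid and confirms the two properties the text relies on: re-spawning from the same master entropy reproduces every child stream, and different children yield different streams. The helper ``first_draws`` is hypothetical and exists only for this check; it relies on the standard NumPy behavior of ``np.random.default_rng`` accepting a ``SeedSequence``.

```python
import itertools
import math

import numpy as np

# Same grid as in the tutorial above
param_grid = {
    "community_count": [5, 10, 15],
    "core_nodes_ratio": [0.3, 0.5, 0.7],
    "p_in": [0.6, 0.8],
    "p_out": [0.05, 0.1],
}

# One dict per point of the Cartesian product of the value lists
combinations = [
    dict(zip(param_grid, values))
    for values in itertools.product(*param_grid.values())
]

# The grid size is the product of the list lengths: 3 * 3 * 2 * 2
expected = math.prod(len(values) for values in param_grid.values())
print(len(combinations), expected)  # 36 36


def first_draws(master_entropy, n_children):
    """Hypothetical helper: first float drawn from each spawned child's stream."""
    children = np.random.SeedSequence(master_entropy).spawn(n_children)
    return [np.random.default_rng(child).random() for child in children]


run_a = first_draws(42, len(combinations))
run_b = first_draws(42, len(combinations))

# Same master entropy: identical child streams, run after run
print(run_a == run_b)  # True
# Different children: different streams (no repeated first draws expected)
print(len(set(run_a)) == len(run_a))
```

Note that ``spawn`` keeps a counter on the ``SeedSequence`` object, so calling it twice on the *same* object returns different children; the helper builds a fresh master sequence on each call for that reason.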
2. Parallel Generation with Progress Monitoring
-----------------------------------------------

For large-scale benchmark generation, running the generators in parallel worker processes can substantially reduce wall-clock time, because each benchmark is generated independently of the others.

Generation and Saving Function
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. code-block:: python

    def generate_and_save(config_name, generator, output_dir):
        """Generate a benchmark and save it to disk.

        Args:
            config_name: Name of the configuration
            generator: GroundtruthGenerator instance
            output_dir: Directory to save the benchmark

        Returns:
            Tuple of (config_name, elapsed_time)
        """
        start_time = time.time()

        # Create the output directory
        benchmark_dir = Path(output_dir) / config_name
        benchmark_dir.mkdir(parents=True, exist_ok=True)

        # Generate the groundtruth
        groundtruth = generator.generate()

        # Save benchmark components (you might prefer pickle or another format)

        # Save the tcommlist as CSV
        tcommlist_file = benchmark_dir / "tcommlist.csv"
        with open(tcommlist_file, 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(['node_id', 'evolving_community', 'snapshot', 'static_community_id'])
            for row in groundtruth.tcommlist:
                writer.writerow([row.node_id, row.evolving_community_id, row.snapshot, row.static_community_id])

        # Save each snapshot graph as an edge list
        graphs_dir = benchmark_dir / "graphs"
        graphs_dir.mkdir(exist_ok=True)
        for t, graph in groundtruth.graphs.items():
            nx.write_edgelist(graph, graphs_dir / f"graph_{t}.edgelist")

        # Save metadata about the generation
        with open(benchmark_dir / "metadata.txt", "w") as f:
            f.write(f"Generation time: {time.time() - start_time:.2f} seconds\n")
            f.write(f"Snapshots: {len(groundtruth.graphs)}\n")
            f.write(f"Events: {len(groundtruth.events)}\n")

        return config_name, time.time() - start_time

Parallel Generation Execution
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. code-block:: python

    # Define the output directory
    output_dir = Path("benchmarks")
    output_dir.mkdir(exist_ok=True)

    # Function to generate all benchmarks in parallel
    def generate_all_benchmarks(generators, output_dir, max_workers=None):
        """Generate all benchmarks in parallel.

        Args:
            generators: List of (name, generator) tuples
            output_dir: Directory to save benchmarks
            max_workers: Maximum number of workers (defaults to the CPU count,
                capped at the number of generators)

        Returns:
            List of (name, elapsed_time) tuples
        """
        results = []

        # Determine an appropriate number of workers
        # (os.cpu_count() may return None, hence the fallback to 1)
        if max_workers is None:
            max_workers = min(os.cpu_count() or 1, len(generators))

        print(f"Starting parallel generation using {max_workers} workers")

        with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
            # Submit all generation tasks
            future_to_config = {
                executor.submit(generate_and_save, config_name, generator, output_dir): config_name
                for config_name, generator in generators
            }

            # Process results as they complete (successes and failures both count)
            completed = 0
            for future in concurrent.futures.as_completed(future_to_config):
                config_name = future_to_config[future]
                completed += 1
                try:
                    name, elapsed_time = future.result()
                    results.append((name, elapsed_time))
                    # Display progress
                    print(f"Completed {completed}/{len(generators)}: {config_name} in {elapsed_time:.2f} seconds")
                except Exception as e:
                    print(f"Error generating {config_name}: {e}")

        return results

    # Example usage with a subset of generators. The __main__ guard is required
    # on platforms where worker processes are not forked (e.g. Windows, macOS).
    if __name__ == "__main__":
        subset_generators = grid_generators[:4]  # Only the first 4 configurations
        results = generate_all_benchmarks(subset_generators, output_dir, max_workers=2)

``concurrent.futures.ProcessPoolExecutor`` runs the generation tasks in separate worker processes, so the generators and every other argument passed to ``generate_and_save`` must be picklable. ``concurrent.futures.as_completed`` yields each future as soon as it finishes, regardless of submission order, so progress is reported immediately instead of waiting for the slowest task.
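The completion-order behavior of ``as_completed`` can be observed without dyn-benchmark at all. In this minimal sketch, ``fake_generate`` and ``run_all`` are hypothetical stand-ins for ``generate_and_save`` and ``generate_all_benchmarks``, and a ``ThreadPoolExecutor`` replaces the process pool so the snippet runs anywhere without pickling or a ``__main__`` guard; both executors share the same ``submit``/``as_completed`` interface. The jobs are submitted slowest first, yet the fast job should be reported first, and the failing job's exception is re-raised by ``future.result()`` in the caller.

```python
import concurrent.futures
import time


def fake_generate(config_name, delay):
    """Hypothetical stand-in for generate_and_save: sleep, then report."""
    if delay < 0:
        raise ValueError(f"negative delay for {config_name}")
    time.sleep(delay)
    return config_name, delay


def run_all(jobs, executor_cls=concurrent.futures.ThreadPoolExecutor, max_workers=3):
    """Submit every job, then collect outcomes in completion order."""
    completed, errors = [], []
    with executor_cls(max_workers=max_workers) as executor:
        future_to_name = {
            executor.submit(fake_generate, name, delay): name
            for name, delay in jobs
        }
        # as_completed yields each future as soon as it finishes,
        # independently of the order in which it was submitted
        for future in concurrent.futures.as_completed(future_to_name):
            name = future_to_name[future]
            try:
                completed.append(future.result()[0])
            except ValueError as exc:  # raised in the worker, re-raised here
                errors.append((name, str(exc)))
    return completed, errors


# Submitted slowest first; the fast job should still be reported first
jobs = [("slow", 0.5), ("fast", 0.05), ("broken", -1)]
completed, errors = run_all(jobs)
print(completed)  # expected: 'fast' before 'slow'
print(errors)     # [('broken', 'negative delay for broken')]
```

Passing ``executor_cls=concurrent.futures.ProcessPoolExecutor`` switches the sketch to real processes; ``fake_generate`` is already defined at module level, so it can be pickled, but the call must then sit under an ``if __name__ == "__main__":`` guard.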
Enhanced Progress Monitoring with ``alive-progress``
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

For better progress visualization, you can use the ``alive-progress`` package (included when installing ``dyn-benchmark[pretty]``):

.. code-block:: python

    # Only the import is guarded, so an ImportError raised during generation
    # is not mistaken for the optional dependency being missing
    try:
        from alive_progress import alive_bar
    except ImportError:
        alive_bar = None
        print("For better progress visualization, install 'dyn-benchmark[pretty]'")


    def generate_all_benchmarks_with_alive(generators, output_dir, max_workers=None):
        """Generate all benchmarks with an alive-progress bar.

        Args:
            generators: List of (name, generator) tuples
            output_dir: Directory to save benchmarks
            max_workers: Maximum number of workers

        Returns:
            List of (name, elapsed_time) tuples
        """
        results = []

        if max_workers is None:
            max_workers = min(os.cpu_count() or 1, len(generators))

        print(f"Starting parallel generation using {max_workers} workers")

        with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
            future_to_config = {
                executor.submit(generate_and_save, config_name, generator, output_dir): config_name
                for config_name, generator in generators
            }

            # Use alive_bar for an animated progress display
            with alive_bar(len(generators), title="Generating Benchmarks", force_tty=True) as bar:
                for future in concurrent.futures.as_completed(future_to_config):
                    config_name = future_to_config[future]
                    try:
                        name, elapsed_time = future.result()
                        results.append((name, elapsed_time))
                        bar.text(f"Completed: {config_name} in {elapsed_time:.2f}s")
                    except Exception as e:
                        bar.text(f"Error: {config_name}: {e}")
                    bar()  # Advance the bar by one finished task

        return results


    # Example usage with the visualization; the __main__ guard keeps worker
    # processes from re-running this call when they are spawned
    if __name__ == "__main__" and alive_bar is not None:
        results = generate_all_benchmarks_with_alive(subset_generators, output_dir, max_workers=2)

The ``alive-progress`` package provides an animated progress bar with customizable features, which makes long-running generations easier to follow.

3. Complete Example
-------------------

Here's a complete example that demonstrates a scalable approach for generating multiple benchmarks with varied parameters.
This implementation:

1. Defines a parameter grid for systematic exploration
2. Creates reproducible generators for each parameter combination
3. Uses :class:`~dyn.benchmark.generator.groundtruth_generator.ProgressiveGroundtruthGenerator` for visual feedback during generation
4. Runs the generation in parallel with multiple worker processes
5. Saves the generated benchmarks and summarizes the results
6. Handles errors gracefully and logs them for review

.. code-block:: python

    """
    Complete example of multiple benchmark generation with grid exploration
    and parallel processing.
    """
    import concurrent.futures
    import csv
    import itertools
    import os
    import time
    from pathlib import Path

    import networkx as nx
    import numpy as np

    from dyn.benchmark.generator.groundtruth_generator import ProgressiveGroundtruthGenerator
    from dyn.benchmark.generator.communities_generator import CommunitiesGenerator
    from dyn.benchmark.generator.edges_generator import SBM, FastBPAM

    # Define the parameter grid (2 * 1 * 2 * 2 * 2 = 16 combinations)
    param_grid = {
        'model': ['SBM', 'FastBPAM'],
        'community_count': [10],
        'core_nodes_ratio': [0.5, 0.7],
        'p_in': [0.4, 0.7],
        'p_out': [0.05, 0.1],
    }


    def create_parameter_grid(param_grid):
        """Create all combinations from a parameter grid using itertools.product."""
        param_names = list(param_grid.keys())
        param_values = list(param_grid.values())

        # Generate all combinations using itertools.product
        combinations = list(itertools.product(*param_values))

        # Convert to a list of parameter dictionaries
        result = []
        for combo in combinations:
            params = {name: value for name, value in zip(param_names, combo)}
            result.append(params)

        return result


    def create_generators(combinations, master_seed=42):
        """Create generators for all parameter combinations."""
        generators = []

        # Create the master seed sequence for reproducibility
        master_seed_seq = np.random.SeedSequence(master_seed)

        # Spawn child seeds, one for each combination
        child_seeds = master_seed_seq.spawn(len(combinations))

        for i, (params, seed) in enumerate(zip(combinations, child_seeds)):
            # Create a configuration name based on the parameters
            config_name = (
                f"{params['model']}_pin{params['p_in']}_pout{params['p_out']}"
                f"_core{params['core_nodes_ratio']}_comm{params['community_count']}_{i}"
            )

            # Create the community generator
            community_generator = CommunitiesGenerator(
                community_count=params['community_count'],
                snapshot_count=8,
                community_size_min=5,
                core_nodes_ratio=params['core_nodes_ratio'],
                seed=seed,
            )

            # Create the edge generator based on the model type
            if params['model'] == 'SBM':
                edge_generator = SBM(p_in=params['p_in'], p_out=params['p_out'], seed=seed)
            else:  # FastBPAM
                edge_generator = FastBPAM(gamma_in=params['p_in'], gamma_out=params['p_out'], m=5, seed=seed)

            # Use ProgressiveGroundtruthGenerator for visual feedback
            generator = ProgressiveGroundtruthGenerator(
                community_generator=community_generator,
                edge_generator=edge_generator,
                seed=seed,
            )

            generators.append((config_name, generator))

        return generators


    def generate_and_save(config_name, generator, output_dir):
        """Generate a benchmark and save it to disk.

        Args:
            config_name: Name of the configuration
            generator: GroundtruthGenerator instance
            output_dir: Directory to save the benchmark

        Returns:
            Tuple of (config_name, elapsed_time)
        """
        start_time = time.time()

        # Create the output directory
        benchmark_dir = Path(output_dir) / config_name
        benchmark_dir.mkdir(parents=True, exist_ok=True)

        # Generate the groundtruth
        groundtruth = generator.generate()

        # Save the tcommlist as CSV
        tcommlist_file = benchmark_dir / "tcommlist.csv"
        with open(tcommlist_file, 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(['node_id', 'evolving_community', 'snapshot', 'static_community_id'])
            for row in groundtruth.tcommlist:
                writer.writerow([row.node_id, row.evolving_community_id, row.snapshot, row.static_community_id])

        # Save each snapshot graph as an edge list
        graphs_dir = benchmark_dir / "graphs"
        graphs_dir.mkdir(exist_ok=True)
        for t, graph in groundtruth.graphs.items():
            nx.write_edgelist(graph, graphs_dir / f"graph_{t}.edgelist")

        # Save metadata about the generation
        with open(benchmark_dir / "metadata.txt", "w") as f:
            f.write(f"Generation time: {time.time() - start_time:.2f} seconds\n")
            f.write(f"Snapshots: {len(groundtruth.graphs)}\n")
            f.write(f"Events: {len(groundtruth.events)}\n")

        return config_name, time.time() - start_time


    def generate_all_benchmarks(generators, output_dir, max_workers=None):
        """Generate all benchmarks in parallel with progress reporting."""
        if max_workers is None:
            # os.cpu_count() may return None, hence the fallback to 1
            max_workers = min(os.cpu_count() or 1, len(generators))

        print(f"Starting parallel generation with {max_workers} workers")

        try:
            from alive_progress import alive_bar
            has_alive = True
        except ImportError:
            has_alive = False
            print("For better visualization install with: pip install 'dyn-benchmark[pretty]'")

        results = []
        errors = []
        total = len(generators)

        with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
            future_to_config = {
                executor.submit(generate_and_save, config_name, generator, output_dir): config_name
                for config_name, generator in generators
            }

            # Report each benchmark as soon as it finishes
            if has_alive:
                with alive_bar(total, title="Generating benchmarks", force_tty=True) as bar:
                    for future in concurrent.futures.as_completed(future_to_config):
                        config_name = future_to_config[future]
                        try:
                            name, elapsed_time = future.result()
                            results.append((name, elapsed_time))
                            bar.text(f"Completed: {name} in {elapsed_time:.2f}s")
                        except Exception as e:
                            errors.append((config_name, str(e)))
                            bar.text(f"Error in {config_name}: {str(e)[:50]}...")
                        bar()
            else:
                completed = 0
                for future in concurrent.futures.as_completed(future_to_config):
                    config_name = future_to_config[future]
                    completed += 1
                    try:
                        name, elapsed_time = future.result()
                        results.append((name, elapsed_time))
                        print(f"Progress: {completed}/{total} - Completed {name} in {elapsed_time:.2f}s")
                    except Exception as e:
                        errors.append((config_name, str(e)))
                        print(f"Error in {config_name}: {e}")

        # Print a summary
        print("\nGeneration Summary:")
        print(f"Total configurations: {total}")
        print(f"Successfully generated: {len(results)}")
        print(f"Failed generations: {len(errors)}")

        if errors:
            print("\nErrors:")
            for config_name, error in errors:
                print(f"- {config_name}: {error}")

        if results:
            avg_time = sum(r[1] for r in results) / len(results)
            print(f"Average generation time: {avg_time:.2f} seconds")

        # Save an error log
        if errors:
            with open(Path(output_dir) / "generation_errors.txt", "w") as f:
                f.write(f"Generation errors: {time.strftime('%Y-%m-%d %H:%M:%S')}\n")
                f.write(f"Total configurations: {total}\n")
                f.write(f"Failed generations: {len(errors)}\n\n")
                f.write("Details:\n")
                for config_name, error in errors:
                    f.write(f"{config_name}: {error}\n")

        return results


    # Main execution (the guard is required when worker processes are spawned)
    if __name__ == "__main__":
        # Define the output directory
        output_dir = Path("benchmarks")
        output_dir.mkdir(exist_ok=True, parents=True)

        # Create parameter combinations
        combinations = create_parameter_grid(param_grid)
        print(f"Created {len(combinations)} parameter combinations")

        # Create generators
        generators = create_generators(combinations)
        print(f"Created {len(generators)} generator configurations")

        # Run the generation
        results = generate_all_benchmarks(generators, output_dir, max_workers=3)

        # Save a results summary
        with open(output_dir / "generation_summary.txt", "w") as f:
            f.write(f"Generation completed: {time.strftime('%Y-%m-%d %H:%M:%S')}\n")
            f.write(f"Total configurations: {len(generators)}\n")
            f.write(f"Successfully generated: {len(results)}\n\n")
            f.write("Details:\n")
            for name, elapsed_time in sorted(results):
                f.write(f"{name}: {elapsed_time:.2f}s\n")
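To sanity-check what a run wrote to disk, the directory layout used by ``generate_and_save`` (``tcommlist.csv`` plus ``graphs/graph_<t>.edgelist``) can be read back with the standard library alone. This sketch is illustrative and not part of dyn-benchmark: ``summarize_benchmark`` is a hypothetical helper, and the tiny benchmark it reads is hand-written toy data in a temporary directory, not real generator output.

```python
import csv
import tempfile
from collections import Counter
from pathlib import Path


def summarize_benchmark(benchmark_dir):
    """Hypothetical helper: count tcommlist rows per snapshot and edges per graph file."""
    benchmark_dir = Path(benchmark_dir)
    with open(benchmark_dir / "tcommlist.csv", newline="") as f:
        rows_per_snapshot = Counter(int(row["snapshot"]) for row in csv.DictReader(f))
    edges_per_graph = {}
    # Lexical order: graph_10 would sort before graph_2
    for path in sorted((benchmark_dir / "graphs").glob("graph_*.edgelist")):
        with open(path) as f:
            # One edge per non-empty line, e.g. "0 1 {}"
            edges_per_graph[path.stem] = sum(1 for line in f if line.strip())
    return rows_per_snapshot, edges_per_graph


# Toy benchmark in the same layout (hand-written, not generator output)
with tempfile.TemporaryDirectory() as tmp:
    bench = Path(tmp) / "toy_benchmark"
    (bench / "graphs").mkdir(parents=True)
    with open(bench / "tcommlist.csv", "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["node_id", "evolving_community", "snapshot", "static_community_id"])
        writer.writerows([[0, 0, 0, 0], [1, 0, 0, 0], [0, 0, 1, 1]])
    (bench / "graphs" / "graph_0.edgelist").write_text("0 1 {}\n")
    (bench / "graphs" / "graph_1.edgelist").write_text("")
    rows, edges = summarize_benchmark(bench)

print(dict(rows))  # {0: 2, 1: 1}
print(edges)       # {'graph_0': 1, 'graph_1': 0}
```

``nx.write_edgelist`` stores edges only, so a node that is isolated in a snapshot does not appear in that snapshot's edge list file; count snapshot members from the tcommlist rather than from the edge lists.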