Skip to content

API Reference

assess_optimal_chunk_size(n_simulations=1000, chunk_step_size=100, plot_outfile=None)

This runs the same set of doublet simulations using different chunk sizes and prints the results to the terminal to show which chunk size is optimal. It runs each chunk size 3 times and takes the average of their times to find the time taken for that chunk size. The chunk sizes which are tested start at 1 and increase in steps of chunk_step_size up to at most n_simulations + 1.

Parameters:

Name Type Description Default
n_simulations int

The number of simulations which are run in parallel

1000
chunk_step_size int

The step size of chunks to test; going from 1 to n_simulations

100
plot_outfile str | Path

If provided a plot is made showing the time taken per-chunk size and saved to this path

None
Source code in src/pythermogis/dask_utils/assess_optimal_chunk_size.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
def assess_optimal_chunk_size(n_simulations: int = 1000, chunk_step_size: int = 100, plot_outfile : str | Path = None):
    """
    This runs the same set of doublet simulations using different chunk sizes and prints the results to the terminal to show find which chunk size is optimal.
    It runs each chunk size 3 times and takes the average of their times to find the time taken for that chunk size.
    The chunks which are tested go from 1 -> n_simulations with steps of chunk_step_size.

    Parameters
    ----------
    n_simulations : int
        The number of simulations which are run in parralel
    chunk_step_size : int
        The step size of chunks to test; going from 1 to n_simulations
    plot_outfile : str | Path
        If provided a plot is made showing the time taken per-chunk size and saved to this path

    """
    reservoir_properties = xr.Dataset(
        {
            "thickness": (["sample"], np.ones(n_simulations) * 200),
            "porosity": (["sample"], np.ones(n_simulations)),
            "ntg": (["sample"], np.ones(n_simulations)),
            "depth": (["sample"], np.ones(n_simulations) * 1000),
            "permeability": (["sample"], np.ones(n_simulations) * 500),
        },
        coords={"sample": np.arange(n_simulations)}
    )

    n_attempts = 3 # do the same operation n_attempts and take an average of their times.
    sample_chunks = np.arange(1, n_simulations + 2, chunk_step_size)

    # run in series
    time_attempt = []
    for attempt in range(n_attempts):
        start = timeit.default_timer()
        simulation_benchmark = calculate_doublet_performance(reservoir_properties)
        time_attempt.append(timeit.default_timer() - start)
    normal_time = np.mean(time_attempt)
    print(f"non-parralel simulation took {normal_time:.1f} seconds, {n_simulations / normal_time:.1f} samples per second")


    # run in parallel:
    mean_time = []
    std_time = []
    for sample_chunk in sample_chunks:
        time_attempt = []
        for attempt in range(n_attempts):
            start = timeit.default_timer()
            simulations_parallel = calculate_doublet_performance(reservoir_properties, chunk_size=sample_chunk, print_execution_duration=False)
            time_attempt.append(timeit.default_timer() - start)

            # additional check that the results are identical
            xr.testing.assert_allclose(simulation_benchmark, simulations_parallel)
            xr.testing.assert_equal(simulation_benchmark, simulations_parallel)

        mean_time.append(np.mean(time_attempt))
        std_time.append(np.std(time_attempt))
        print(f"parralel simulation, chunk size: {sample_chunk}, took {np.mean(time_attempt):.1f} seconds to run {n_simulations} simulations, {n_simulations / mean_time[-1]:.1f} samples per second")

    if plot_outfile is None:
        return

    fig, ax = plt.subplots(1, 1, figsize=(8, 5))
    ax.errorbar(sample_chunks, mean_time, yerr=std_time, fmt='o', capsize=5, label='parralel simulation')
    ax.axhline(normal_time, label="non-parralel simulation", color="tab:orange", linestyle="--")
    ax.set_xlabel("chunk size")
    ax.set_ylabel("time (s)")
    ax.set_title("Chunk size assessment")
    ax.legend()
    plt.savefig(plot_outfile)