Example parallelisation with dask.delayed

1         In:
# general python
import warnings
warnings.filterwarnings("ignore", category=UserWarning)
import numpy as np
import os
from pathlib import Path
import yaml
import pandas as pd
import matplotlib.pyplot as plt
from datetime import datetime
import scipy
import xarray as xr
from tqdm import tqdm
import glob
from devtools import pprint
from tqdm import tqdm
2         In:
import ewatercycle.models
3         In:
# pip uninstall ewatercycle_parallelisation_sleep -y
4         In:
# pip install git+https://github.com/Daafip/ewatercycle-test-parallelisation@main
5         In:
from ewatercycle.models import ParallelisationSleep
6         In:
model = ParallelisationSleep()
7         In:
config, _ = model.setup(sleepiness=1)
8         In:
model.initialize(config)
9         In:
import time
10         In:
%%time
model.update()
CPU times: user 0 ns, sys: 5.38 ms, total: 5.38 ms
Wall time: 1 s

Import the DA module:

Locally:

41         In:
import importlib.util

def module_from_file(module_name, file_path):
    """Load a Python source file as a module object.

    Builds an import spec for ``file_path``, creates a fresh module from it
    and executes the file's code in that module's namespace. The module is
    returned to the caller; this function does not add it to ``sys.modules``.

    Args:
        module_name: Name given to the resulting module (its ``__name__``).
        file_path: Location of the source file to execute.

    Returns:
        The executed module object.

    Raises:
        ImportError: If no import spec or loader can be found for
            ``file_path`` (for example, an unrecognised file suffix).
    """
    spec = importlib.util.spec_from_file_location(module_name, file_path)
    # spec_from_file_location returns None when no loader matches the path;
    # fail with a clear message instead of an AttributeError further down.
    if spec is None or spec.loader is None:
        raise ImportError(
            f"Cannot load module {module_name!r} from {file_path!r}"
        )
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module
42         In:
DA = module_from_file("DA",r'../eWaterCycle-DA/src/ewatercycle_DA/DA.py')

or from git:

pip install git+https://github.com/Daafip/eWaterCycle-DA@main
43         In:
pip install git+https://github.com/Daafip/eWaterCycle-DA@main
44         In:
from ewatercycle_DA import DA

setup Ensemble

16         In:
n_particles = 3
17         In:
ensemble = DA.Ensemble(N=n_particles)
ensemble.setup()
18         In:
setup_kwargs= {'sleepiness':1}
19         In:
# this initializes the models for all ensemble members.
ensemble.initialize(model_name="ParallelisationSleep",
                    forcing=None,
                    setup_kwargs=setup_kwargs)

Time the current update step

20         In:
%%time
ensemble.update(assimilate=False)
CPU times: user 8.26 ms, sys: 2.75 ms, total: 11 ms
Wall time: 3.01 s
21         In:
%%time
for ensemble_member in ensemble.ensemble_list:
    ensemble_member.update()
CPU times: user 7.51 ms, sys: 1.09 ms, total: 8.59 ms
Wall time: 3.01 s

using dask

62         In:
import dask
from dask import delayed
23         In:
@delayed
def update_member(ensemble, i):
    """Advance ensemble member ``i`` by one update and return its "sleep" value.

    Wrapped with ``dask.delayed``, so calling this builds a task rather than
    running the update immediately.
    """
    member = ensemble.ensemble_list[i]
    member.update()
    return member.get_value("sleep")
24         In:
@delayed
def gather(*args):
    """Collect all positional arguments into a single list.

    Wrapped with ``dask.delayed``, so it serves as the final node that joins
    the individual member tasks into one result.
    """
    return [*args]
25         In:
gathered = gather(*[update_member(ensemble,i) for i in range(ensemble.N)])
26         In:
gathered.visualize()
26       Out:
_images/example_parallelisation_31_0.png
27         In:
%%time
gathered.compute()
CPU times: user 8.86 ms, sys: 9.63 ms, total: 18.5 ms
Wall time: 1.02 s
27       Out:
[array([1.]), array([1.]), array([1.])]
28         In:
ensemble.finalize()

repeat with more particles & longer time

48         In:
n_particles = 30
49         In:
ensemble = DA.Ensemble(N=n_particles)
ensemble.setup()
50         In:
setup_kwargs= {'sleepiness':1}
51         In:
%%time
# Note: starting up this many Docker containers takes a while as well.
# this initializes the models for all ensemble members.
ensemble.initialize(model_name="ParallelisationSleep",
                    forcing=None,
                    setup_kwargs=setup_kwargs)
CPU times: user 516 ms, sys: 208 ms, total: 725 ms
Wall time: 41.7 s

Time the current update step

52         In:
%%time
ensemble.update(assimilate=False)
CPU times: user 63.1 ms, sys: 103 ms, total: 166 ms
Wall time: 30.1 s

using dask

53         In:
gathered = gather(*[update_member(ensemble,i) for i in range(ensemble.N)])
54         In:
gathered.visualize()
54       Out:
_images/example_parallelisation_43_0.png
55         In:
%%time
_ = gathered.compute()
CPU times: user 48.8 ms, sys: 72.3 ms, total: 121 ms
Wall time: 3.05 s

The number of workers can also be specified:

56         In:
import psutil
logical_n_psu = psutil.cpu_count(logical=True)
logical_n_psu
56       Out:
12
57         In:
%%time
_ = gathered.compute(num_workers=logical_n_psu)
CPU times: user 50.8 ms, sys: 30.9 ms, total: 81.8 ms
Wall time: 3.03 s
58         In:
%%time
_ = gathered.compute(num_workers= 2 * logical_n_psu)
CPU times: user 51 ms, sys: 27.6 ms, total: 78.6 ms
Wall time: 2.02 s
59         In:
%%time
_ = gathered.compute(num_workers= 4 * logical_n_psu)
CPU times: user 44.8 ms, sys: 25.3 ms, total: 70.1 ms
Wall time: 1.02 s

or

63         In:
%%time
with dask.config.set({"multiprocessing.context": "spawn", 'num_workers': 4 * logical_n_psu}):
    _ = gathered.compute()
CPU times: user 38.1 ms, sys: 12.8 ms, total: 50.9 ms
Wall time: 1.02 s

Excluding the initialization: x10 speed-up

64         In:
ensemble.finalize()