Example: parallelisation with dask.delayed
1 In:
# general python
import warnings
warnings.filterwarnings("ignore", category=UserWarning)
import numpy as np
import os
from pathlib import Path
import yaml
import pandas as pd
import matplotlib.pyplot as plt
from datetime import datetime
import scipy
import xarray as xr
from tqdm import tqdm
import glob
from devtools import pprint
from tqdm import tqdm
2 In:
import ewatercycle.models
3 In:
# pip uninstall ewatercycle_parallelisation_sleep -y
4 In:
# pip install git+https://github.com/Daafip/ewatercycle-test-parallelisation@main
5 In:
from ewatercycle.models import ParallelisationSleep
6 In:
model = ParallelisationSleep()
7 In:
config, _ = model.setup(sleepiness=1)
8 In:
model.initialize(config)
9 In:
import time
10 In:
%%time
model.update()
CPU times: user 0 ns, sys: 5.38 ms, total: 5.38 ms
Wall time: 1 s
Import the DA module:
Locally:
41 In:
import importlib.util


def module_from_file(module_name, file_path):
    """Load a Python source file from an explicit path and return it as a module.

    The module is executed but is not registered in ``sys.modules``; the
    caller keeps the only reference through the return value.

    Args:
        module_name: Name assigned to the loaded module.
        file_path: Location of the source file to execute.

    Returns:
        The module object obtained by executing ``file_path``.

    Raises:
        ImportError: If no import spec or loader can be built for
            ``file_path`` (for example, an unsupported file suffix).
    """
    spec = importlib.util.spec_from_file_location(module_name, file_path)
    # spec_from_file_location returns None when no loader matches the file;
    # fail with a clear message instead of an AttributeError on `spec.loader`.
    if spec is None or spec.loader is None:
        raise ImportError(
            f"Cannot load module {module_name!r} from {str(file_path)!r}"
        )
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module
42 In:
DA = module_from_file("DA",r'../eWaterCycle-DA/src/ewatercycle_DA/DA.py')
or from git:
pip install git+https://github.com/Daafip/eWaterCycle-DA@main
43 In:
pip install git+https://github.com/Daafip/eWaterCycle-DA@main
44 In:
from ewatercycle_DA import DA
setup Ensemble
16 In:
n_particles = 3
17 In:
ensemble = DA.Ensemble(N=n_particles)
ensemble.setup()
18 In:
setup_kwargs= {'sleepiness':1}
19 In:
# this initializes the models for all ensemble members.
ensemble.initialize(model_name="ParallelisationSleep",
forcing=None,
setup_kwargs=setup_kwargs)
Time current update step
20 In:
%%time
ensemble.update(assimilate=False)
CPU times: user 8.26 ms, sys: 2.75 ms, total: 11 ms
Wall time: 3.01 s
21 In:
%%time
for ensemble_member in ensemble.ensemble_list:
ensemble_member.update()
CPU times: user 7.51 ms, sys: 1.09 ms, total: 8.59 ms
Wall time: 3.01 s
using dask
62 In:
import dask
from dask import delayed
23 In:
@delayed
def update_member(ensemble, i):
    """Lazily advance ensemble member ``i`` and return its "sleep" value.

    Wrapped in ``dask.delayed``, so calling this only builds a task; the
    member's ``update()`` runs when the task is computed.
    """
    member = ensemble.ensemble_list[i]
    member.update()
    return member.get_value("sleep")
24 In:
@delayed
def gather(*args):
    """Collect the results of all upstream delayed tasks into a single list."""
    return [*args]
25 In:
gathered = gather(*[update_member(ensemble,i) for i in range(ensemble.N)])
26 In:
gathered.visualize()
26 Out:
27 In:
%%time
gathered.compute()
CPU times: user 8.86 ms, sys: 9.63 ms, total: 18.5 ms
Wall time: 1.02 s
27 Out:
[array([1.]), array([1.]), array([1.])]
28 In:
ensemble.finalize()
repeat with more particles & longer time
48 In:
n_particles = 30
49 In:
ensemble = DA.Ensemble(N=n_particles)
ensemble.setup()
50 In:
setup_kwargs= {'sleepiness':1}
51 In:
%%time
# NOTE: starting up this many Docker containers also takes a noticeable amount of time.
# this initializes the models for all ensemble members.
ensemble.initialize(model_name="ParallelisationSleep",
forcing=None,
setup_kwargs=setup_kwargs)
CPU times: user 516 ms, sys: 208 ms, total: 725 ms
Wall time: 41.7 s
Time current update step
52 In:
%%time
ensemble.update(assimilate=False)
CPU times: user 63.1 ms, sys: 103 ms, total: 166 ms
Wall time: 30.1 s
using dask
53 In:
gathered = gather(*[update_member(ensemble,i) for i in range(ensemble.N)])
54 In:
gathered.visualize()
54 Out:
55 In:
%%time
_ = gathered.compute()
CPU times: user 48.8 ms, sys: 72.3 ms, total: 121 ms
Wall time: 3.05 s
The number of workers can also be specified:
56 In:
import psutil
logical_n_psu = psutil.cpu_count(logical=True)
logical_n_psu
56 Out:
12
57 In:
%%time
_ = gathered.compute(num_workers=logical_n_psu)
CPU times: user 50.8 ms, sys: 30.9 ms, total: 81.8 ms
Wall time: 3.03 s
58 In:
%%time
_ = gathered.compute(num_workers= 2 * logical_n_psu)
CPU times: user 51 ms, sys: 27.6 ms, total: 78.6 ms
Wall time: 2.02 s
59 In:
%%time
_ = gathered.compute(num_workers= 4 * logical_n_psu)
CPU times: user 44.8 ms, sys: 25.3 ms, total: 70.1 ms
Wall time: 1.02 s
or
63 In:
%%time
with dask.config.set({"multiprocessing.context": "spawn", 'num_workers': 4 * logical_n_psu}):
_ = gathered.compute()
CPU times: user 38.1 ms, sys: 12.8 ms, total: 50.9 ms
Wall time: 1.02 s
Excluding the initialization: x10 speed-up with the default number of workers (30.1 s → 3.05 s), and up to ~x30 with more workers (1.02 s).
64 In:
ensemble.finalize()