Source code for homcloud.interface.parallel_volume_computation
from concurrent.futures import ProcessPoolExecutor
def _parallel_task(task_params):
import homcloud.interface as hc
methodname, nth, volume_params = task_params
pair = shared_pd.pair(nth)
volume_params["return_failure_if_not_found"] = True
return getattr(pair, methodname)(**volume_params).dump_to_dict()
def _worker_init(path, degree):
import homcloud.interface as hc
global shared_pd
shared_pd = hc.PDList(path).dth_diagram(degree)
[docs]
def parallel_volume_computation(optimal_or_stable, pd, pairs_and_params, max_workers=None):
"""
Compute optimal / stable volume parallely.
Summary of benchmark result: 3x faster with max_workers=8 for 200 birth-death pairs.
Args:
optimal_or_stable (str): "optimal_volume" or "stable_volume"
pd (:class:`PD`): The diagram used in the volume computation
pairs_and_params (list[tuple[:class:`Pair`, dict]]): The list of tuples of
(pair, volume computation parameter). The computation parameter should be given
by dict whose keys are parameter names.
max_workers (Option[int]): The maximum number of concurrent computation workers.
This parameter is passed to concurrent.futures.ProcessPoolExecutor.
See the python's official documentation of concurrent.futures.
Returns:
list[VolumeFailure | OptimalVolume | StableVolume]: The list of volumes.
VolumeFailure object is used to represent the failure of a computation.
Remarks:
The `save_to` option is required when persistence diagrams are computed if you want
to use this functionality.
If not, this function raises an ValueError exception.
This function is not thread safe. Do not call this function parallely.
Do not call this function under multi-thread environment.
Example:
>>> import homcloud.interface as hc
>>>
>>> hc.PDList(hc.example_data("tetrahedron"), save_boundary_map=True, save_to="tetrahedron.pdgm")
>>> pd1 = hc.PDList("tetrahedron.pdgm").dth_diagram(1)
>>> # Computes all stable volumes of pd1's pairs parallely with eight workers.
>>> results = hc.parallel_volume_computation(
>>> "stable_volume", pd1, [(pair, {"threshold": 0.01}) for pair in pd1.pairs()], 8
>>> )
`results` has all stable volumes.
"""
from homcloud.interface.optimal_volume import Volume
assert optimal_or_stable in ["optimal_volume", "stable_volume"]
if pd.path is None:
raise ValueError("save_to must be given for parallel computation when PDs are computed")
import homcloud.interface as hc
with ProcessPoolExecutor(
max_workers=max_workers,
initializer=_worker_init,
initargs=(pd.path, pd.degree),
) as executor:
results = executor.map(
_parallel_task,
[(optimal_or_stable, pair.nth, param) for (pair, param) in pairs_and_params],
)
return [Volume.restore_from_dict(pd, result) for result in results]