Learning Pymoo (7): Parallelization: Parallel Computing in Python


This article, contributed by 因吉, covers parallelizing evaluations in pymoo and parallel computing in Python.


Contents
1 Introduction
2 Vectorized Matrix Operations
3 Starmap Interface
3.1 Threads
3.2 Processes
3.3 Dask
4 Custom Parallelization
4.1 Threads
4.2 Dask
References

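1 Introduction

In practice, parallelization can significantly speed up optimization. Population-based algorithms evaluate a whole set of solutions in every generation, and because each solution is evaluated independently, the evaluation itself can be parallelized.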
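Because the individuals in a population are evaluated independently, evaluating a population is essentially a map over its rows. The sketch below uses only the standard library (no pymoo); `sphere` is a hypothetical stand-in for the sum-of-squares objective used throughout this article. It shows that a thread pool's `map` returns the same values, in the same order, as a sequential loop.

```python
import random
from multiprocessing.pool import ThreadPool


def sphere(x):
    """Hypothetical objective: sum of squares of one individual's variables."""
    return sum(v * v for v in x)


# A population of 20 individuals with 10 variables each, drawn from [-5, 5]
random.seed(1)
population = [[random.uniform(-5, 5) for _ in range(10)] for _ in range(20)]

# Sequential evaluation: one objective value per individual
sequential = [sphere(x) for x in population]

# Parallel evaluation: the same map, distributed over 4 threads
with ThreadPool(4) as pool:
    parallel = pool.map(sphere, population)

print(parallel == sequential)  # True: results keep the population order
```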

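2 Vectorized Matrix Operations

One approach is to use NumPy matrix operations, which almost all test problems implemented in pymoo already do. Subclassing Problem gives vectorized evaluation (older pymoo versions expressed this as elementwise_evaluation=False, the default), which means _evaluate receives a whole set of solutions at once. Each row of the input matrix x is therefore one individual, and each column one variable: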

import numpy as np
from pymoo.core.problem import Problem
from pymoo.algorithms.soo.nonconvex.ga import GA
from pymoo.optimize import minimize


class MyProblem(Problem):

    def __init__(self, **kwargs):
        super().__init__(n_var=10, n_obj=1, n_constr=0, xl=-5, xu=5, **kwargs)

    def _evaluate(self, x, out, *args, **kwargs):
        out["F"] = np.sum(x ** 2, axis=1)


res = minimize(MyProblem(), GA())
print('Vectorized:', res.exec_time)

Output:

Vectorized: 1.416006326675415
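Vectorized and elementwise evaluation differ only in what _evaluate receives: the whole population matrix (one row per individual) or a single row. The plain-Python sketch below uses hypothetical function names and lists in place of NumPy arrays, and checks that both contracts yield the same objective values; the NumPy code above computes the vectorized form in one call, np.sum(x ** 2, axis=1).

```python
def evaluate_vectorized(X):
    """Receives the whole population: returns one objective value per row."""
    return [sum(v * v for v in row) for row in X]


def evaluate_elementwise(x):
    """Receives a single individual: returns its objective value."""
    return sum(v * v for v in x)


# 3 individuals (rows) x 2 variables (columns)
X = [[1.0, 2.0],
     [0.0, -3.0],
     [4.0, 0.5]]

vectorized = evaluate_vectorized(X)
elementwise = [evaluate_elementwise(x) for x in X]
print(vectorized)                 # [5.0, 9.0, 16.25]
print(vectorized == elementwise)  # True
```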
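3 Starmap Interface

The starmap interface comes from the Python standard library (multiprocessing.Pool.starmap) and makes parallelization straightforward. It requires elementwise evaluation, meaning each call to _evaluate evaluates only one solution. Older pymoo versions requested this with elementwise_evaluation=True; with the pymoo.core.problem API used here, the problem subclasses ElementwiseProblem instead.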
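starmap works like map, except that each element of the iterable is a tuple of arguments that gets unpacked into the call, so starmap(f, [(a, b), (c, d)]) computes [f(a, b), f(c, d)]. A minimal illustration with the standard library's ThreadPool.starmap and its sequential counterpart itertools.starmap; the two-argument function weighted_sum is hypothetical.

```python
import itertools
from multiprocessing.pool import ThreadPool


def weighted_sum(x, w):
    """Hypothetical two-argument evaluation: w times the sum of squares of x."""
    return w * sum(v * v for v in x)


# One argument tuple per call: (individual, weight)
params = [([1, 2], 1), ([3, 0], 2), ([1, 1], 10)]

# Sequential reference: unpack each tuple into weighted_sum(x, w)
sequential = list(itertools.starmap(weighted_sum, params))

# Same calls, run on a pool of 3 threads; result order matches params
with ThreadPool(3) as pool:
    parallel = pool.starmap(weighted_sum, params)

print(parallel)  # [5, 18, 20]
```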

3.1 Threads
import numpy as np
from multiprocessing.pool import ThreadPool
from pymoo.core.problem import ElementwiseProblem
from pymoo.core.problem import starmap_parallelized_eval
from pymoo.algorithms.soo.nonconvex.pso import PSO
from pymoo.optimize import minimize


class MyProblem(ElementwiseProblem):
    # elementwise: _evaluate receives a single solution x (1-D array)

    def __init__(self, **kwargs):
        # runner and func_eval are forwarded to ElementwiseProblem via **kwargs
        super().__init__(n_var=10, n_obj=1, n_constr=0, xl=-5, xu=5, **kwargs)

    def _evaluate(self, x, out, *args, **kwargs):
        # x is one individual, so sum over all of its variables (no axis)
        out["F"] = np.sum(x ** 2)


n_threads = 8
pool = ThreadPool(n_threads)
# pool.starmap runs the per-individual evaluations on the thread pool
problem = MyProblem(runner=pool.starmap, func_eval=starmap_parallelized_eval)
res = minimize(problem, PSO(), seed=1, n_gen=100)
print('Threads:', res.exec_time)
pool.close()

Output:

Threads: 0.5501224994659424
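The timings in this article come from a very cheap objective, so they mostly measure overhead. Threads pay off when each evaluation spends its time waiting, for example on an external simulator or on I/O, or inside code that releases Python's GIL. The sketch below simulates such an evaluation with time.sleep; the helper slow_eval and the 20 ms delay are assumptions for illustration only.

```python
import time
from multiprocessing.pool import ThreadPool


def slow_eval(x):
    """Hypothetical evaluation that mostly waits, e.g. on an external program."""
    time.sleep(0.02)  # sleeping releases the GIL, so other threads can run
    return sum(v * v for v in x)


population = [[float(i), float(i + 1)] for i in range(16)]

start = time.perf_counter()
sequential = [slow_eval(x) for x in population]
t_seq = time.perf_counter() - start

start = time.perf_counter()
with ThreadPool(8) as pool:
    parallel = pool.map(slow_eval, population)
t_par = time.perf_counter() - start

print(f"sequential: {t_seq:.3f} s, 8 threads: {t_par:.3f} s")
```

For CPU-bound pure-Python objectives the GIL lets only one thread execute Python code at a time, which is where a process pool becomes relevant.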
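3.2 Processes
import multiprocessing

# MyProblem, starmap_parallelized_eval, PSO and minimize as in 3.1.
# The __main__ guard is required where workers are started with "spawn"
# (the default on Windows and macOS), because each worker re-imports this module.
if __name__ == '__main__':
    n_processes = 8
    pool = multiprocessing.Pool(n_processes)
    problem = MyProblem(runner=pool.starmap, func_eval=starmap_parallelized_eval)
    res = minimize(problem, PSO(), seed=1, n_gen=100)
    print('Processes:', res.exec_time)
    pool.close()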

Output:

Processes: 1.1640357971191406
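A process pool must send the function and its arguments to the worker processes, which it does with pickle. Module-level functions pickle fine (by reference), but lambdas and functions defined inside other functions, such as the nested my_eval in section 4.1, do not; that pattern therefore only works with threads. A quick standard-library check, where is_picklable and make_local_eval are hypothetical helpers:

```python
import pickle


def module_level_eval(x):
    """Defined at module level: picklable by reference, usable with a process pool."""
    return sum(v * v for v in x)


def make_local_eval():
    def local_eval(x):  # nested function: cannot be pickled by reference
        return sum(v * v for v in x)
    return local_eval


def is_picklable(obj):
    """Hypothetical helper: True if pickle can serialize obj."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        # the exception type for unpicklable functions varies across Python versions
        return False


print(is_picklable(module_level_eval))  # True (when run as a script)
print(is_picklable(make_local_eval()))  # False
print(is_picklable(lambda x: x))        # False
```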
3.3 Dask

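A more advanced approach is to distribute the evaluations across several workers; pymoo recommends Dask for this.
Note: you may need to install the following packages: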

pip install dask distributed

The code is as follows:

import numpy as np
from dask.distributed import Client
from pymoo.core.problem import dask_parallelized_eval
from pymoo.core.problem import ElementwiseProblem
from pymoo.algorithms.soo.nonconvex.pso import PSO
from pymoo.optimize import minimize


class MyProblem(ElementwiseProblem):

    def __init__(self, **kwargs):
        super().__init__(n_var=10, n_obj=1, n_constr=0, xl=-5, xu=5, **kwargs)

    def _evaluate(self, x, out, *args, **kwargs):
        # x is a single solution
        out["F"] = np.sum(x ** 2)


if __name__ == '__main__':
    # Client() starts a local cluster of worker processes
    client = Client()
    client.restart()
    print("STARTED")

    # reuse the same client as the runner; no second Client() is needed
    problem = MyProblem(runner=client, func_eval=dask_parallelized_eval)

    res = minimize(problem, PSO(), seed=1, n_gen=100)
    print('Dask:', res.exec_time)
    client.close()

Output:

STARTED
Dask: 1.30446195602417
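Dask's Client.submit returns a future, and each result is collected with its .result() method. dask_parallelized_eval appears to follow this submit-then-collect pattern with one job per individual. The sketch below reproduces the pattern with the standard library's concurrent.futures as a stand-in for Dask (this is not Dask's API); evaluate_population is a hypothetical helper.

```python
from concurrent.futures import ThreadPoolExecutor


def sphere(x):
    return sum(v * v for v in x)


def evaluate_population(executor, X):
    """Hypothetical helper: submit one job per individual, then wait for each."""
    jobs = [executor.submit(sphere, x) for x in X]  # returns immediately
    return [job.result() for job in jobs]           # blocks until each is done


X = [[1, 2], [3, 4], [0, 0]]
with ThreadPoolExecutor(max_workers=4) as executor:
    F = evaluate_population(executor, X)

print(F)  # [5, 25, 0]
```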
4 Custom Parallelization
4.1 Threads
import numpy as np
from multiprocessing.pool import ThreadPool
from pymoo.core.problem import Problem
from pymoo.algorithms.soo.nonconvex.pso import PSO
from pymoo.optimize import minimize


class MyProblem(Problem):

    def __init__(self, **kwargs):
        super().__init__(n_var=10, n_obj=1, n_constr=0, xl=-5, xu=5, **kwargs)

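    def _evaluate(self, X, out, *args, **kwargs):
        # vectorized Problem: X holds the whole population, one row per individual

        def my_eval(x):
            return (x ** 2).sum()

        # one argument list per individual, as starmap expects
        params = [[X[k]] for k in range(len(X))]
        # evaluate in parallel on the global thread pool and wait for all results
        F = pool.starmap(my_eval, params)
        out["F"] = np.array(F)


if __name__ == '__main__':
    # must exist before minimize() calls _evaluate, which uses it as a global
    pool = ThreadPool(8)
    problem = MyProblem()
    res = minimize(problem, PSO(), seed=1, n_gen=100)
    print('Threads:', res.exec_time)
    pool.close()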

Output:

Threads: 1.0212376117706299
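Since my_eval takes a single argument, wrapping each row in a one-element list for starmap is not strictly necessary: pool.map(my_eval, X) produces the same list. The sketch below uses plain lists instead of NumPy arrays, and PoolEvaluator is a hypothetical class (not part of pymoo) that receives its pool explicitly instead of reading a global pool variable.

```python
from multiprocessing.pool import ThreadPool


def my_eval(x):
    """Same objective as in section 4.1, on a plain list."""
    return sum(v * v for v in x)


class PoolEvaluator:
    """Hypothetical helper: holds its pool instead of reading a global."""

    def __init__(self, pool):
        self.pool = pool

    def __call__(self, X):
        # my_eval takes one argument, so map is enough (no [[x] for x in X])
        return self.pool.map(my_eval, X)


X = [[1, 1], [2, 0], [1, 3]]
with ThreadPool(4) as pool:
    evaluate = PoolEvaluator(pool)
    F = evaluate(X)
    F_starmap = pool.starmap(my_eval, [[x] for x in X])  # the form used in 4.1

print(F)               # [2, 4, 10]
print(F == F_starmap)  # True
```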
4.2 Dask
import numpy as np
from dask.distributed import Client
from pymoo.algorithms.soo.nonconvex.pso import PSO
from pymoo.core.problem import Problem
from pymoo.optimize import minimize


class MyProblem(Problem):

    def __init__(self, **kwargs):
        # vectorized Problem: _evaluate receives the whole population X
        super().__init__(n_var=10, n_obj=1, n_constr=0, xl=-5, xu=5, **kwargs)

    def _evaluate(self, X, out, *args, **kwargs):
        def fun(x):
            return np.sum(x ** 2)

        # submit one Dask task per individual, then block until each result arrives
        jobs = [client.submit(fun, x) for x in X]
        out["F"] = np.array([job.result() for job in jobs])


if __name__ == '__main__':
    # processes=False: the scheduler and workers run as threads in this process
    client = Client(processes=False)
    problem = MyProblem()
    res = minimize(problem, PSO(), seed=1, n_gen=100)
    print('Dask:', res.exec_time)
    client.close()

Output:

Dask: 19.102460861206055
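This Dask variant is by far the slowest, most likely because it submits one task per individual while each task (a sum of ten squares) costs far less than the scheduling overhead. A common remedy is to submit one task per chunk of rows instead. Below is a sketch of that idea with concurrent.futures standing in for Dask; chunked and evaluate_chunk are hypothetical names, and the chunk size of 25 is an arbitrary choice.

```python
from concurrent.futures import ThreadPoolExecutor


def evaluate_chunk(rows):
    """Evaluate several individuals in one task to amortize per-task overhead."""
    return [sum(v * v for v in x) for x in rows]


def chunked(X, size):
    """Split the population into consecutive chunks of at most `size` rows."""
    return [X[i:i + size] for i in range(0, len(X), size)]


X = [[float(i), 1.0] for i in range(100)]

with ThreadPoolExecutor(max_workers=4) as executor:
    # 4 tasks of 25 rows instead of 100 single-row tasks
    jobs = [executor.submit(evaluate_chunk, chunk) for chunk in chunked(X, 25)]
    F = [f for job in jobs for f in job.result()]  # flatten, preserving order

print(len(jobs), len(F))  # 4 100
```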
References

[1] https://pymoo.org/problems/parallelization.html
[2] https://blog.csdn.net/u013066730/article/details/105821888

The copyright of "Learning Pymoo (7): Parallelization" belongs to 因吉; reuse must follow the CC 4.0 BY-SA license.




