Pymoo Learning (7): Parallelization
This article, contributed by 因吉, covers parallelization in pymoo, i.e. how to parallelize the evaluation step of an optimization in Python.
In practice, parallelization can significantly speed up an optimization. For population-based algorithms, a whole set of solutions has to be evaluated in each iteration, so the evaluation itself can be parallelized.
2 Vectorized matrix operations
One approach is to use NumPy matrix operations, which is how almost all test problems shipped with pymoo are implemented. By default, elementwise_evaluation is set to False, which means that _evaluate receives a whole set of solutions at once: each row of the input matrix x is one individual and each column is one variable:
import numpy as np

from pymoo.core.problem import Problem
from pymoo.algorithms.soo.nonconvex.ga import GA
from pymoo.optimize import minimize


class MyProblem(Problem):

    def __init__(self, **kwargs):
        super().__init__(n_var=10, n_obj=1, n_constr=0, xl=-5, xu=5, **kwargs)

    def _evaluate(self, x, out, *args, **kwargs):
        # x has shape (pop_size, n_var): one row per individual,
        # so the whole population is evaluated in a single vectorized call
        out["F"] = np.sum(x ** 2, axis=1)


res = minimize(MyProblem(), GA())
print('Threads:', res.exec_time)

Output:
Threads: 1.416006326675415
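As a quick sanity check (not part of the original post), the row-per-individual convention can be seen by evaluating a small random batch directly through Problem.evaluate, reusing the MyProblem class defined above:

import numpy as np

problem = MyProblem()
X = np.random.uniform(-5, 5, size=(4, problem.n_var))  # 4 individuals, 10 variables each
F = problem.evaluate(X)                                 # one objective value per row of X
print(F)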
3 Starmap interface
The starmap interface, provided by the Python standard library as multiprocessing.Pool.starmap, is a convenient way to parallelize the evaluation. It requires elementwise evaluation, i.e. each call to _evaluate handles exactly one solution; in the older API this meant setting elementwise_evaluation=True, while current pymoo versions express it by subclassing ElementwiseProblem.
3.1 Threads

import numpy as np

from multiprocessing.pool import ThreadPool

from pymoo.core.problem import ElementwiseProblem
from pymoo.core.problem import starmap_parallelized_eval
from pymoo.algorithms.soo.nonconvex.pso import PSO
from pymoo.optimize import minimize


class MyProblem(ElementwiseProblem):

    def __init__(self, **kwargs):
        super().__init__(n_var=10, n_obj=1, n_constr=0, xl=-5, xu=5, **kwargs)

    def _evaluate(self, x, out, *args, **kwargs):
        # x is a single solution vector here, not the whole population
        out["F"] = np.sum(x ** 2)


# hand the pool's starmap over to the problem so the elementwise
# evaluations are distributed across the worker threads
n_threads = 8
pool = ThreadPool(n_threads)

problem = MyProblem(runner=pool.starmap, func_eval=starmap_parallelized_eval)
res = minimize(problem, PSO(), seed=1, termination=("n_gen", 100))
print('Threads:', res.exec_time)

Output:
Threads: 0.5501224994659424
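For an objective as cheap as the sphere function, threads mostly add overhead; the benefit appears once a single evaluation is expensive and releases the GIL, e.g. I/O or an external simulation call. A minimal sketch of that situation, simulating the cost with time.sleep (SlowProblem is a made-up name, and the setup simply mirrors the thread example above):

import time

from multiprocessing.pool import ThreadPool

from pymoo.core.problem import ElementwiseProblem
from pymoo.core.problem import starmap_parallelized_eval
from pymoo.algorithms.soo.nonconvex.pso import PSO
from pymoo.optimize import minimize


class SlowProblem(ElementwiseProblem):

    def __init__(self, **kwargs):
        super().__init__(n_var=10, n_obj=1, n_constr=0, xl=-5, xu=5, **kwargs)

    def _evaluate(self, x, out, *args, **kwargs):
        time.sleep(0.01)          # stand-in for an expensive simulation call
        out["F"] = (x ** 2).sum()


pool = ThreadPool(8)
problem = SlowProblem(runner=pool.starmap, func_eval=starmap_parallelized_eval)
res = minimize(problem, PSO(), seed=1, termination=("n_gen", 20))
print('Slow objective, 8 threads:', res.exec_time)
pool.close()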
3.2 Processes
import multiprocessing

# MyProblem and starmap_parallelized_eval are reused from the threads example above;
# with a process-based pool this should be run under an `if __name__ == '__main__':` guard
n_processes = 8
pool = multiprocessing.Pool(n_processes)

problem = MyProblem(runner=pool.starmap, func_eval=starmap_parallelized_eval)
res = minimize(problem, PSO(), seed=1, termination=("n_gen", 100))
print('Processes:', res.exec_time)

Output:
Processes: 1.1640357971191406
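If the pool is not needed after the run, it can be shut down explicitly (this clean-up step is not in the original post):

pool.close()  # stop accepting new tasks
pool.join()   # wait for the worker processes to exit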
3.3 Dask
A more advanced approach is to distribute the evaluation function across several workers. For this, pymoo recommends Dask.
Note: the following packages may need to be installed first:
pip install dask distributed
The code is as follows:
import numpy as np

from dask.distributed import Client

from pymoo.core.problem import ElementwiseProblem
from pymoo.core.problem import dask_parallelized_eval
from pymoo.algorithms.soo.nonconvex.pso import PSO
from pymoo.optimize import minimize


class MyProblem(ElementwiseProblem):

    def __init__(self, **kwargs):
        super().__init__(n_var=10, n_obj=1, n_constr=0, xl=-5, xu=5, **kwargs)

    def _evaluate(self, x, out, *args, **kwargs):
        # one solution per call; Dask distributes these calls across its workers
        out["F"] = np.sum(x ** 2)


if __name__ == '__main__':
    # start a local Dask cluster and connect a client to it
    client = Client()
    client.restart()
    print("STARTED")

    problem = MyProblem(runner=client, func_eval=dask_parallelized_eval)
    res = minimize(problem, PSO(), seed=1, termination=("n_gen", 100))
    print('Dask:', res.exec_time)

    client.close()

Output:
STARTED
Dask: 1.30446195602417
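Note that Client() with no arguments starts a local Dask cluster on the current machine; to use an already running distributed cluster, the scheduler address can be passed instead, e.g. Client("tcp://192.0.2.10:8786") (the address here is only a placeholder).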
4 Custom parallelization
4.1 Threads
import numpy as np

from multiprocessing.pool import ThreadPool

from pymoo.core.problem import Problem
from pymoo.algorithms.soo.nonconvex.pso import PSO
from pymoo.optimize import minimize


class MyProblem(Problem):

    def __init__(self, **kwargs):
        super().__init__(n_var=10, n_obj=1, n_constr=0, xl=-5, xu=5, **kwargs)

    def _evaluate(self, X, out, *args, **kwargs):
        # objective for a single individual
        def my_eval(x):
            return (x ** 2).sum()

        # one argument tuple per row of X; the module-level `pool` created
        # below distributes the calls across its worker threads
        params = [[X[k]] for k in range(len(X))]
        F = pool.starmap(my_eval, params)

        # store the results in the expected output format
        out["F"] = np.array(F)


if __name__ == '__main__':
    pool = ThreadPool(8)

    problem = MyProblem()
    res = minimize(problem, PSO(), seed=1, termination=("n_gen", 100))
    print('Threads:', res.exec_time)

Output:
Threads: 1.0212376117706299
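The same pattern works with any executor that can map the objective over the rows of X. As an alternative sketch (not from the original post), the standard-library concurrent.futures module can be used directly inside _evaluate; MyThreadedProblem is a hypothetical name:

from concurrent.futures import ThreadPoolExecutor

import numpy as np

from pymoo.core.problem import Problem
from pymoo.algorithms.soo.nonconvex.pso import PSO
from pymoo.optimize import minimize


class MyThreadedProblem(Problem):

    def __init__(self, **kwargs):
        super().__init__(n_var=10, n_obj=1, n_constr=0, xl=-5, xu=5, **kwargs)

    def _evaluate(self, X, out, *args, **kwargs):
        # objective for a single individual
        def my_eval(x):
            return (x ** 2).sum()

        # map each row of X to a worker thread
        with ThreadPoolExecutor(max_workers=8) as executor:
            F = list(executor.map(my_eval, X))
        out["F"] = np.array(F)


if __name__ == '__main__':
    res = minimize(MyThreadedProblem(), PSO(), seed=1, termination=("n_gen", 100))
    print('concurrent.futures threads:', res.exec_time)

Creating the executor inside _evaluate avoids the module-level pool variable, at the cost of starting a fresh thread pool for every generation.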
4.2 Dask
import numpy as np

from dask.distributed import Client

from pymoo.core.problem import Problem
from pymoo.algorithms.soo.nonconvex.pso import PSO
from pymoo.optimize import minimize


class MyProblem(Problem):

    def __init__(self, *args, **kwargs):
        # matrix-based evaluation: _evaluate receives the whole population
        super().__init__(n_var=10, n_obj=1, n_constr=0, xl=-5, xu=5, *args, **kwargs)

    def _evaluate(self, X, out, *args, **kwargs):
        # objective for a single individual
        def fun(x):
            return np.sum(x ** 2)

        # submit one Dask task per row of X and gather the results
        jobs = [client.submit(fun, x) for x in X]
        out["F"] = np.row_stack([job.result() for job in jobs])


if __name__ == '__main__':
    client = Client(processes=False)

    problem = MyProblem()
    res = minimize(problem, PSO(), seed=1, termination=("n_gen", 100))
    print('Dask:', res.exec_time)

    client.close()

Output:
Dask: 19.102460861206055
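The time here is dominated by Dask's per-task scheduling and communication overhead rather than by the objective itself, which is why this run is far slower than the thread-based version; distributing evaluations over workers only pays off when a single evaluation is genuinely expensive.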