# Source code for glompo.optimizers.scipy

import warnings
from multiprocessing import Event
from multiprocessing.connection import Connection
from queue import Queue
from typing import Callable, Sequence, Tuple

from scipy.optimize import basinhopping, differential_evolution, dual_annealing, minimize, shgo

from .baseoptimizer import BaseOptimizer, MinimizeResult

# Names exported by ``from <this module> import *``.
__all__ = ("ScipyOptimizeWrapper",)

# Global optimizers that are selected by name. Each key is the name of the
# scipy.optimize function it maps to.
AVAILABLE_OPTS = dict(
    basinhopping=basinhopping,
    dual_annealing=dual_annealing,
    shgo=shgo,
    differential_evolution=differential_evolution,
)


class ScipyOptimizeWrapper(BaseOptimizer):
    """ Wrapper around :func:`scipy.optimize.minimize`, :func:`scipy.optimize.basinhopping`,
    :func:`scipy.optimize.differential_evolution`, :func:`scipy.optimize.shgo`, and
    :func:`scipy.optimize.dual_annealing`.

    .. warning::

       This is quite a rough wrapper around SciPy's optimizers since the code is quite impenetrable to outside code,
       and callbacks do not function consistently. Therefore, most GloMPO functionality like checkpointing and
       information sharing is not available. Users are advised to try :class:`.Nevergrad` instead which offers an
       interface to the SciPy optimizers with full GloMPO functionality.

    .. attention::

       **Must** be used with :attr:`.GloMPOManager.aggressive_kill` as :obj:`True`, this also implies that this
       optimizer can **only** be used with a multiprocessing backend; threads are incompatible.

    Parameters
    ----------
    Inherited, _opt_id _signal_pipe _results_queue _pause_flag workers backend is_log_detailed
        See :class:`.BaseOptimizer`.
    method
        Accepts :code:`'basinhopping'`, :code:`'dual_annealing'`, :code:`'differential_evolution'`, and
        :code:`'shgo'` which will run the :mod:`scipy.optimize` function of the same name. Also accepts all the
        allowed methods to :func:`scipy.optimize.minimize`.
    """

    def __init__(self,
                 _opt_id: int = None,
                 _signal_pipe: Connection = None,
                 _results_queue: Queue = None,
                 _pause_flag: Event = None,
                 workers: int = 1,
                 backend: str = 'processes',
                 is_log_detailed: bool = False,
                 method: str = 'Nelder-Mead'):
        super().__init__(_opt_id, _signal_pipe, _results_queue, _pause_flag, workers, backend, is_log_detailed)
        self.opt_name = method
        # Any name that is not a key of AVAILABLE_OPTS is forwarded as the
        # ``method`` argument of scipy.optimize.minimize (see minimize below).
        self.opt_meth = AVAILABLE_OPTS.get(self.opt_name, minimize)

    def minimize(self,
                 function: Callable[[Sequence[float]], float],
                 x0: Sequence[float],
                 bounds: Sequence[Tuple[float, float]],
                 callbacks: Callable = None,
                 **kwargs) -> MinimizeResult:
        """ Run the selected SciPy optimizer on `function` and repackage its result.

        Parameters
        ----------
        function
            Objective passed as the first positional argument to the SciPy routine.
        x0
            Starting point. Forwarded as ``x0`` to every routine except :func:`scipy.optimize.shgo`, which has no
            such parameter.
        bounds
            Forwarded as ``bounds`` to every routine except :func:`scipy.optimize.basinhopping`.
        callbacks
            A single callable, or an iterable of callables. Each is invoked with whatever arguments the SciPy
            routine hands to its ``callback``; the last non-:obj:`None` return value is returned to SciPy.
        **kwargs
            Passed through unchanged to the SciPy routine.

        Returns
        -------
        MinimizeResult
            With ``x`` and ``fx`` taken from the SciPy result (``x`` and ``fun``), and ``success`` copied over when
            the SciPy result exposes it.
        """
        # Bounds are always supplied to scipy.optimize.minimize below, even for
        # methods that do not use them; silence the resulting warning.
        warnings.filterwarnings('ignore', "Method .+ cannot handle constraints nor bounds.")

        # Normalise `callbacks` once. The annotation advertises a bare callable
        # so that must work, and materialising an iterable to a tuple keeps a
        # one-shot iterator usable on every callback invocation.
        if not callbacks:
            callback_seq = ()
        elif callable(callbacks):
            callback_seq = (callbacks,)
        else:
            callback_seq = tuple(callbacks)

        def callback(*cb_args, **cb_kwargs):
            ret = None
            for cb in callback_seq:
                r = cb(*cb_args, **cb_kwargs)
                if r is not None:
                    ret = r
            return ret

        # scipy.optimize.shgo accepts no starting point; passing x0 would
        # raise a TypeError for an unexpected keyword argument.
        if self.opt_meth is not shgo:
            kwargs['x0'] = x0

        if self.opt_meth is not basinhopping:
            kwargs['bounds'] = bounds

        # Inside this method `minimize` resolves to the module-level
        # scipy.optimize.minimize, not to this method.
        if self.opt_meth is minimize:
            kwargs['method'] = self.opt_name

        sp_result = self.opt_meth(function, callback=callback, **kwargs)

        # Results carrying a `lowest_optimization_result` attribute are
        # unwrapped to it; all others are used as returned.
        sp_result = getattr(sp_result, 'lowest_optimization_result', sp_result)

        if self._results_queue:
            self.message_manager(0, "Optimizer convergence")

        result = MinimizeResult()
        result.x = sp_result.x
        result.fx = sp_result.fun
        # Previously guarded by a flag that was never set, so `success` was
        # never populated. Copy it whenever the SciPy result provides one.
        if hasattr(sp_result, 'success'):
            result.success = bool(sp_result.success)

        return result

    def callstop(self, *args):
        """ Do nothing; accepts and ignores any positional arguments. """
        pass