# Source code for glompo.optimizers.gfls

from concurrent.futures import FIRST_COMPLETED, ProcessPoolExecutor, ThreadPoolExecutor, wait
from functools import partial
from multiprocessing import Event
from multiprocessing.connection import Connection
from queue import Queue
from typing import Callable, Optional, Sequence, Tuple

import numpy as np
from optsam import GFLS, Hook, Logger, Reporter

from .baseoptimizer import BaseOptimizer, MinimizeResult


def _function_pack(ask, func):
    """ Evaluate ``func`` at a trial point and bundle the point with its evaluation.

    Defined at module level (rather than as a closure) so that it can be wrapped in a :func:`functools.partial` and
    submitted to an executor pool.

    Parameters
    ----------
    ask
        Either a zero-argument callable returning a ``(x, is_constrained)`` pair, or such a pair itself. Accepting the
        pair directly allows the caller to obtain the trial point itself and hand over only the function evaluation.
    func
        Object exposing ``detailed_call(x)``. Only the first two elements of its return are used: the function value
        and the residuals.

    Returns
    -------
    tuple
        ``(x, is_constrained, fx, resids)``
    """
    # A callable is invoked (original behaviour); anything else is taken to be the already-asked pair.
    _x, _is_constrained = ask() if callable(ask) else ask
    _fx, _resids = func.detailed_call(_x)[:2]
    return _x, _is_constrained, _fx, _resids


class GFLSOptimizer(BaseOptimizer):
    """ Wrapper around the :class:`!optsam.GFLS` algorithm [d]_. Note that this class is also *stand-alone*, this means
    it can be used independently of the GloMPO framework.

    Parameters
    ----------
    Inherited, _opt_id _signal_pipe _results_queue _pause_flag workers backend is_log_detailed
        See :class:`.BaseOptimizer`.
    logger
        If :obj:`True` a :class:`!optsam.Logger` will be run along with the optimization and saved after the
        minimization.
    verbose
        If :obj:`True` an :class:`!optsam.Reporter` will be run along with the optimisation to print progress in
        realtime.
    other_hooks
        Any extra :class:`!optsam.Hook` instances which should be included.
    **gfls_algo_kwargs
        Keyword arguments for the optsam GFLS class. If :obj:`None`, the default arguments are used:

        ====================  ===============
        Setting               Default
        ====================  ===============
        :code:`tr_max`        :code:`0.5`
        :code:`xtol`          :code:`1e-3`
        :code:`ftol`          :code:`1e-7`
        :code:`constraints`   :code:`()`
        :code:`tr_min`        :code:`None`
        :code:`tr_scale`      :code:`0.9`
        :code:`noise_scale`   :code:`0.1`
        :code:`pop_size`      :code:`None`
        :code:`diis_mode`     :code:`"qrsvd"`
        :code:`seed`          :code:`None`
        ====================  ===============

    Notes
    -----
    :class:`GFLSOptimizer` requires residuals (differences between a training set and evaluated values) to work. Thus,
    it cannot be used on all global optimization cases. To ensure compatibility and allow simultaneous use of multiple
    optimizer types, :class:`GFLSOptimizer` will automatically use :meth:`~.BaseFunction.detailed_call` when evaluating
    the function. It is assumed that the first element of the return is the total error and the second element is the
    list of residuals. Other returns are ignored.
    """

    def __init__(self,
                 _opt_id: Optional[int] = None,
                 _signal_pipe: Optional[Connection] = None,
                 _results_queue: Optional[Queue] = None,
                 _pause_flag: Optional[Event] = None,
                 workers: int = 1,
                 backend: str = 'threads',
                 is_log_detailed: bool = False,
                 logger: bool = False,
                 verbose: bool = False,
                 other_hooks: Optional[Sequence[Hook]] = None,
                 **gfls_algo_kwargs):
        super().__init__(_opt_id, _signal_pipe, _results_queue, _pause_flag, workers, backend, is_log_detailed)

        self.gfls = None  # optsam GFLS instance, created in minimize()
        self.result = None  # MinimizeResult, created in minimize()
        self.stopcond = None  # Falsy while running; any truthy value ends the optimization loop

        self.algo_kwargs = gfls_algo_kwargs if gfls_algo_kwargs else {}
        # Only tr_max is defaulted here; every other setting is left to GFLS itself.
        if 'tr_max' not in self.algo_kwargs:
            self.algo_kwargs['tr_max'] = 0.5

        self.hooks = list(other_hooks) if other_hooks else []
        if logger:
            self.hooks.append(Logger())
        if verbose:
            self.hooks.append(Reporter())

        # Used to manage async evaluation
        self._pool_evaluator = None
        self._futures = set()
        self._wrapped_func = None

    def minimize(self,
                 function,
                 x0: Sequence[float],
                 bounds: Sequence[Tuple[float, float]],
                 callbacks: Optional[Callable] = None,
                 **kwargs) -> MinimizeResult:
        """ Run the ask/evaluate/tell loop until a stop condition is set.

        Parameters
        ----------
        function
            Object to minimize; it is evaluated through its ``detailed_call`` method.
        x0
            Starting parameter vector.
        bounds
            Passed to GFLS as its ``bounds`` keyword argument.
        callbacks
            Optional callable invoked without arguments after every ``tell``. A truthy return value becomes the stop
            condition.
        **kwargs
            Accepted for interface compatibility; not used here.

        Returns
        -------
        MinimizeResult
            ``self.result``. Within this class only its ``success`` attribute is written (see :meth:`callstop`).
        """
        self._wrapped_func = partial(_function_pack, func=function)

        if not self.is_restart:
            self.logger.info("Setting up fresh GFLS")
            self.algo_kwargs['bounds'] = bounds
            self.gfls = GFLS(np.array(x0, dtype=float), **self.algo_kwargs)
            self.result = MinimizeResult()
            for hook in self.hooks:
                hook.before_start(self.gfls)
        else:
            # On a restart the GFLS state and result are kept; only the evaluation machinery is reset.
            self._pool_evaluator = None
            self._futures = set()

        # Open executor pool
        if self.workers > 1:
            executor_cls = ProcessPoolExecutor if self._backend == 'processes' else ThreadPoolExecutor
            self._pool_evaluator = executor_cls(max_workers=self.workers)

        try:
            self.logger.debug("Entering optimization loop")
            while not self.stopcond:
                x, is_constrained, _, resids = self.get_evaluation()
                self.gfls.tell(np.array(x), is_constrained, np.array(resids))

                # Any hook may request a stop by returning a truthy stop condition.
                for hook in self.hooks:
                    new_stopcond = hook.after_tell(self.gfls, self.stopcond)
                    if new_stopcond:
                        self.stopcond = new_stopcond

                if callbacks:
                    new_stopcond = callbacks()
                    if new_stopcond:
                        self.stopcond = new_stopcond

                # Only when attached to a manager: process its messages and honour pause requests.
                if self._results_queue:
                    self.check_messages()
                    self._pause_signal.wait()

            self.logger.debug("Exited optimization loop")

            for hook in self.hooks:
                hook.after_stop(self.gfls, self.stopcond)
        finally:
            # Always release the pool, also when an evaluation or a hook raised.
            if self._pool_evaluator:
                self._pool_evaluator.shutdown()

        if self._results_queue:
            self.logger.debug("Messaging termination to manager.")
            self.message_manager(0, f"Optimizer convergence {self.stopcond}")

        return self.result

    def get_evaluation(self) -> Tuple[Sequence[float], bool, float, Sequence[float]]:
        """ Returns a parameter vector and its evaluation.

        Depending on the configuration of the optimizer this can be a simple serial evaluation or retrieving from a
        list of completed evaluations from a pool of asynchronous parallel evaluations.

        Returns
        -------
        tuple
            Whatever the wrapped function returns; :meth:`minimize` unpacks it as
            ``(x, is_constrained, fx, resids)``.
        """
        # NOTE(review): the *result* of ask() is handed to the wrapped function, not the bound method, so ask() always
        # executes in this thread/process. The wrapped callable must therefore accept an already-obtained pair --
        # confirm against `_function_pack`.
        if self.workers > 1:
            # Keep the pool saturated: top up until there is one pending evaluation per worker.
            while len(self._futures) < self.workers:
                future = self._pool_evaluator.submit(self._wrapped_func, self.gfls.ask())
                self._futures.add(future)
                self.gfls.state["ncall"] += 1

            done, not_done = wait(self._futures, return_when=FIRST_COMPLETED)
            result = done.pop().result()
            # To prevent conditions where an evaluation may be accidentally dropped: several futures may have
            # completed at once, so the unconsumed ones are kept for the next call.
            self._futures = done | not_done
            return result

        self.gfls.state["ncall"] += 1
        return self._wrapped_func(self.gfls.ask())

    def callstop(self, reason: str = "Manager termination signal"):
        """ Request termination of the optimization loop.

        Sets ``reason`` as the stop condition and marks the result as unsuccessful when the reason is
        ``"GloMPO Crash"`` or ``"Manager termination signal"``, successful otherwise.
        """
        self.logger.debug("Calling stop. Reason = %s", reason)
        self.stopcond = reason
        self.result.success = reason not in ("GloMPO Crash", "Manager termination signal")