# Source code for glompo.optimizers.cmawrapper

""" Implementation of CMA-ES as a GloMPO compatible optimizer.
        Adapted from:   SCM ParAMS
"""
import copy
import logging
import pickle
import warnings
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
from multiprocessing import Event, Queue
from multiprocessing.connection import Connection
from typing import Any, Callable, Optional, Sequence, Tuple

import cma
import numpy as np
from cma.restricted_gaussian_sampler import GaussVDSampler, GaussVkDSampler

from .baseoptimizer import BaseOptimizer, MinimizeResult

# Public API of this module: only the optimizer class is exported.
__all__ = ('CMAOptimizer',)


class CMAOptimizer(BaseOptimizer):
    """ Wrapper around a CMA-ES python implementation [c]_.

    Note that this class is also *stand-alone*, this means it can be used independently of the GloMPO framework.
    It is also built in such a way that :meth:`minimize` can be called multiple times on different functions.

    Parameters
    ----------
    Inherited, _opt_id _signal_pipe _results_queue _pause_flag workers backend is_log_detailed
        See :class:`.BaseOptimizer`.
    sampler
        Selects the CMA sampler. :code:`'vd'` applies the :code:`GaussVDSampler` options and :code:`'vkd'` applies
        the :code:`GaussVkDSampler` options; any other value (including the default :code:`'full'`) leaves CMA's
        default sampler settings unchanged.
    verbose
        If :obj:`True`, print status messages during the optimization, else no output will be printed.
    keep_files
        If :obj:`True`, the final CMA result (:code:`es.result`) is pickled to :code:`cma_[opt<id>_]results.pkl` in
        the current working directory at the end of :meth:`minimize`. Otherwise this file is not produced.
    force_injects
        If :obj:`True`, injections of parameter vectors into the solver will be exact, guaranteeing that that
        solution will be in the next iteration's population. If :obj:`False`, the injection will result in a
        direction relative nudge towards the vector. Forcing the injection can limit global exploration but
        non-forced injections may have little effect.
    injection_frequency
        If :obj:`None` (or 0), injections are ignored by the optimizer. If a positive :obj:`int` is provided then the
        incumbent is only injected if more than `injection_frequency` iterations have passed since the last
        injection, and only if the incumbent is better than every member of the current population.
    **cmasettings
        `CMA-ES <http://cma.gforge.inria.fr/apidocs-pycma/>`_ package-specific settings. See
        :code:`cma.s.pprint(cma.CMAOptions())` for a list of available options. Most useful keys are:
        :code:`'timeout'`, :code:`'tolstagnation'`, :code:`'popsize'`. Keys which are not native CMA options are
        moved into CMA's :code:`'vv'` option. :code:`'verbose'` is always set to -3, and :code:`'maxiter'` defaults
        to infinity if not given.

    Notes
    -----
    #. Although not the default, by adjusting the injection settings above, the optimizer will inject the saved
       incumbent solution into the solver influencing the points sampled by the following iteration. The incumbent
       begins at :code:`x0` and is updated by the inject method called by the GloMPO manager.

    #. If :code:`'popsize'` is not provided during optimizer initialisation, it will be set to the number of
       :attr:`~.BaseOptimizer.workers` if this is larger than 1, else it will be set to the default:
       :code:`4 + int(3 * log(d))`.
    """

    def __init__(self,
                 _opt_id: Optional[int] = None,
                 _signal_pipe: Optional[Connection] = None,
                 _results_queue: Optional[Queue] = None,
                 _pause_flag: Optional[Event] = None,
                 workers: int = 1,
                 backend: str = 'threads',
                 is_log_detailed: bool = False,
                 sampler: str = 'full',
                 verbose: bool = True,
                 keep_files: bool = False,
                 force_injects: Optional[bool] = None,
                 injection_frequency: Optional[int] = None,
                 **cmasettings):
        super().__init__(_opt_id, _signal_pipe, _results_queue, _pause_flag, workers, backend, is_log_detailed)

        self.verbose = verbose
        self.es = None
        self.result = None
        self.keep_files = keep_files
        self.cmasettings = cmasettings
        self.popsize = cmasettings.get('popsize')  # None when the user did not fix a population size
        self.force_injects = force_injects
        self.injection_frequency = injection_frequency
        self.injection_counter = 0  # Iteration number at which the last injection happened

        # Sort all non-native CMA options into the custom cmaoptions key 'vv'.
        # The set of native keys is built once rather than once per user key.
        native_keys = cma.CMAOptions().keys()
        customopts = {key: val for key, val in self.cmasettings.items() if key not in native_keys}
        for key in customopts:
            del self.cmasettings[key]
        self.cmasettings['vv'] = customopts

        self.cmasettings['verbose'] = -3  # Silence CMA Logger

        # Deactivated to not interfere with GloMPO hunting
        if 'maxiter' not in self.cmasettings:
            self.cmasettings['maxiter'] = float('inf')

        if sampler == 'vd':
            self.cmasettings = GaussVDSampler.extend_cma_options(self.cmasettings)
        elif sampler == 'vkd':
            self.cmasettings = GaussVkDSampler.extend_cma_options(self.cmasettings)

    def minimize(self,
                 function: Callable[[Sequence[float]], float],
                 x0: Sequence[float],
                 bounds: Sequence[Tuple[float, float]],
                 callbacks: Optional[Callable[[], Optional[Any]]] = None,
                 sigma0: float = 0,
                 **kwargs) -> MinimizeResult:
        """ Begin CMA-ES minimization loop.

        Parameters
        ----------
        Inherited, function bounds callbacks
            See :meth:`.BaseOptimizer.minimize`. If `callbacks` returns a truthy value, :meth:`callstop` is called.
        x0
            Initial mean of the multivariate normal distribution from which trials are drawn. Force injected into
            the solver to guarantee it is evaluated.
        sigma0
            Initial standard deviation of the multivariate normal distribution from which trials are drawn. One
            value for all parameters which means that all parameters must be scaled accordingly. Default is zero
            which will not be accepted by the optimizer, thus this argument must be provided.
        **kwargs
            Not used by this implementation.

        Returns
        -------
        MinimizeResult
            Location, function value and other optimization information about the lowest value found by the
            optimizer.

        Raises
        ------
        ValueError
            If `sigma0` is not changed from the default value of zero (or is otherwise not positive).
        """
        task_settings = copy.deepcopy(self.cmasettings)

        if sigma0 <= 0:
            self.logger.critical('sigma0 value invalid. Please select a positive value.')
            raise ValueError('sigma0 value invalid. Please select a positive value.')

        if not self.popsize:
            # NOTE(review): self.popsize is overwritten below, so on a second call to minimize() this branch is
            # skipped and 'popsize' is not passed to CMA (CMA then uses its own default). Confirm whether repeated
            # calls should recompute it.
            if self.workers > 1:
                task_settings['popsize'] = self.workers
            else:
                task_settings['popsize'] = 4 + int(3 * np.log(len(x0)))
            self.popsize = task_settings['popsize']
        if self.popsize < self.workers:
            warnings.warn(f"'popsize'={self.popsize} is less than 'workers'={self.workers}. "
                          f"This is an inefficient use of resources", stacklevel=2)
            self.logger.warning("'popsize'=%d is less than 'workers'=%d. "
                                "This is an inefficient use of resources", self.popsize, self.workers)

        if not self.is_restart:
            self.logger.info("Setting up fresh CMA")
            self.result = MinimizeResult()
            task_settings.update({'bounds': np.transpose(bounds).tolist()})
            self.es = cma.CMAEvolutionStrategy(x0, sigma0, task_settings)
            self.es.inject([x0], force=True)

        self.logger.debug("Entering optimization loop")

        i = self.es.countiter
        x = None  # Last population returned by es.ask(); stays None if the loop body never runs
        while not self.es.stop():
            i += 1
            self.logger.debug("Asking for parameter vectors")
            x = self.es.ask()
            self.logger.debug("Parameter vectors generated")
            fx = self._parallel_map(function, x)

            # _parallel_map returns fewer values than vectors when evaluations were cancelled by a stop request.
            if len(x) != len(fx):
                self.logger.debug("Unfinished evaluation detected. Breaking out of loop")
                break

            if i == 1:
                # Assumes the force-injected x0 is the first vector of the first population.
                self.incumbent = {'x': x0, 'fx': fx[0]}

            self.es.tell(x, fx)
            self.logger.debug("Told solutions")
            self._extract_result(x)
            self.logger.debug("Extracted x and fx from result")

            if self.verbose and (i % 10 == 0 or i == 1):
                print(f"@ iter = {i} fx={self.result.fx:.2E} sigma={self.es.sigma:.3E}")

            if callbacks and callbacks():
                self.callstop("Callbacks termination.")

            if self._results_queue:
                self.check_messages()
                self.logger.debug("Checked messages")
                self._pause_signal.wait()
                self.logger.debug("Passed pause test")
            self.logger.debug("callbacks called")

            # Check that injections are enabled before reading the incumbent at all.
            if self.injection_frequency and i - self.injection_counter > self.injection_frequency and \
                    self.incumbent['fx'] < min(fx):
                self.injection_counter = i
                self.es.inject([self.incumbent['x']], force=self.force_injects)
                self.logger.debug("Incumbent solution injected.")
                if self.verbose:
                    print("Incumbent solution injected.")

        self.logger.debug("Exited optimization loop")
        self._extract_result(x)
        # bool() so that a numpy bool is not leaked into the result object.
        self.result.success = bool(np.isfinite(self.result.fx) and self.result.success)

        if self.verbose:
            print(f"Optimization terminated: success = {self.result.success}")
            print(f"Optimizer convergence {self.es.stop()}")
            print(f"Final fx={self.result.fx:.2E}")

        if self._results_queue:
            self.logger.debug("Messaging termination to manager.")
            self.message_manager(0, f"Optimizer convergence {self.es.stop()}")

        # NOTE(review): in pycma es.stop() presumably returns a dict of triggered termination criteria, in which case
        # this comparison with a string is always unequal and the guard never skips. Confirm the intended check.
        if self.es.stop() != "Checkpoint Shutdown" and self.keep_files:
            name = 'cma_'
            if self._opt_id:
                name += f'opt{self._opt_id}_'
            name += 'results.pkl'
            with open(name, 'wb') as file:
                self.logger.debug("Pickling results")
                pickle.dump(self.es.result, file)

        return self.result

    def _extract_result(self, x: Optional[Sequence[Sequence[float]]]):
        """ Copy the best location and value recorded by CMA (:code:`es.result[:2]`) into :attr:`result`.

        If the recorded value is infinite (no valid evaluation yet), :code:`result.x` falls back to the first vector
        of the latest population `x`, when one exists.
        """
        self.result.x, self.result.fx = self.es.result[:2]
        if self.result.fx == float('inf'):
            self.logger.warning("CMA iteration found no valid results. "
                                "fx = 'inf' and x = (first vector generated by es.ask())")
            if x is not None:
                self.result.x = x[0]

    def _parallel_map(self, function: Callable[[Sequence[float]], float],
                      x: Sequence[Sequence[float]]) -> Sequence[float]:
        """ Returns the function evaluations for a given set of trial parameters, x.

        Calculations are distributed over threads or processes depending on the number of workers and backend
        selected. When a results queue is attached, messages are checked after each completed evaluation. If a stop
        is requested, futures which have not yet run are cancelled and omitted, so the returned list may be shorter
        than `x`.
        """
        if self.workers > 1:
            pool_executor = ProcessPoolExecutor if self._backend == 'processes' else ThreadPoolExecutor
            self.logger.debug("Executing within %s with %d workers", pool_executor.__name__, self.workers)
            with pool_executor(max_workers=self.workers) as executor:
                submitted = {slot: executor.submit(function, parms) for slot, parms in enumerate(x)}
                # For very slow evaluations this will allow evaluations to be interrupted.
                if self._results_queue:
                    loop = 0
                    for _ in as_completed(submitted.values()):
                        loop += 1
                        self.logger.debug("Result %d/%d returned.", loop, len(x))
                        self._pause_signal.wait()
                        self.check_messages()
                        if self.es.callbackstop == 1:
                            self.logger.debug("Stop command received during function evaluations.")
                            cancelled = [future.cancel() for future in submitted.values()]
                            if self.logger.isEnabledFor(logging.DEBUG):
                                self.logger.debug("Aborted %d calls.", sum(cancelled))
                            break
                # Dict preserves submission order, so values line up with x (minus any cancelled calls).
                fx = [future.result() for future in submitted.values() if not future.cancelled()]
        else:
            self.logger.debug("Executing serially")
            fx = [function(i) for i in x]
        return fx

    def callstop(self, reason: str = "Manager termination signal"):
        """ Ask CMA to stop by setting :code:`es.callbackstop`.

        :code:`result.success` is set to :obj:`False` if `reason` is :code:`"GloMPO Crash"` or
        :code:`"Manager termination signal"`, and :obj:`True` otherwise. A non-empty `reason` is printed when
        :attr:`verbose` is set.
        """
        if reason and self.verbose:
            print(reason)
        self.logger.debug("Calling stop. Reason = %s", reason)
        self.es.callbackstop = 1
        self.result.success = reason not in ("GloMPO Crash", "Manager termination signal")