Source code for glompo.optimizers.random

from multiprocessing import Event, Queue
from multiprocessing.connection import Connection
from typing import Callable, Sequence, Tuple

import numpy as np

from .baseoptimizer import BaseOptimizer, MinimizeResult

__all__ = ('RandomOptimizer',)


[docs]class RandomOptimizer(BaseOptimizer): """ Evaluates random points within the bounds for a fixed number of iterations. **Not** actually an optimizer. Intended for debugging. Parameters ---------- Inherited, _opt_id _signal_pipe _results_queue _pause_flag workers backend is_log_detailed See :class:`.BaseOptimizer`. iters Number of function evaluations the optimizer will execute before terminating. """ def __init__(self, _opt_id: int = None, _signal_pipe: Connection = None, _results_queue: Queue = None, _pause_flag: Event = None, workers: int = 1, backend: str = 'processes', is_log_detailed: bool = False, iters: int = 100): super().__init__(_opt_id, _signal_pipe, _results_queue, _pause_flag, workers, backend, is_log_detailed) self.max_iters = iters self.used_iters = 0 self.result = MinimizeResult() self.stop_called = False self.logger.debug("Setup optimizer") def minimize(self, function: Callable, x0: Sequence[float], bounds: Sequence[Tuple[float, float]], callbacks: Callable = None, **kwargs) -> MinimizeResult: best_f = self.result.fx if self.is_restart else np.inf while self.used_iters < self.max_iters: if self.stop_called: break self.used_iters += 1 vector = [] for bnd in bounds: vector.append(np.random.uniform(bnd[0], bnd[1])) vector = np.array(vector) self.logger.debug("Generated vector = %s", vector) self.logger.debug("Evaluating function.") fx = function(vector) self.logger.debug("Function returned fx = %f", fx) if callbacks and callbacks(): self.stop_called = True if self._results_queue: self.logger.debug("Checking messages") self.check_messages() self._pause_signal.wait() if fx < best_f: best_f = fx best_x = vector self.logger.debug("Updating best") self.result.x, self.result.fx = best_x, best_f self.result.success = True self.logger.debug("Termination successful") if self._results_queue: self.logger.debug("Messaging manager") self.message_manager(0, "Optimizer convergence") self.check_messages() self.logger.debug("Returning result") return self.result def callstop(self, reason: str = ""): self.stop_called = True