"""A collection of Evolutionary Multiobjective Optimization Algorithms.

The algorithm implementation is provided by the *evoalgos* package. This module
implements them for SWIM.

The algorithms are described here:

To run the algorithm either via SLURM (sbatch jobfiles) or python native
multiprocessing, import any of the algorithms in your file and
execute them as a project method. Refer to the docs of the __call__ method for
from __future__ import print_function, absolute_import
import random
import os.path as osp
import warnings
import collections
import datetime as dt

import numpy as np
import pandas as pd
from modelmanager.settings import parse_settings
from modelmanager.utils import propertyplugin
from modelmanager.plugins.pandas import ProjectOrRunData
from modelmanager.plugins.browser import try_again

from swimpy.plot import plot_objective_scatter, plot_function, plt

    from evoalgos import algo
    from evoalgos.individual import SBXIndividual
    from optproblems import Problem
except ImportError:
    raise ImportError('The %s module requires the evoalgos ' % __name__ +
                      'package. Try `pip install evoalgos`')

[docs]class _EvoalgosSwimProblem(Problem): '''The evoalgos.Problem to be run with any of the algorithms provided.''' #: Default algorithm algorithm = None #: Value to use if nan is returned by objective function nanval = 2e31 #: Attributes of individuals written to output output_attribute_columns = ['generation', 'id_number', 'clone', 'birthgeneration'] #: factor to multiply the average runtime for job time out estimation time_safty_factor = 3 plugin = ['__call__'] def __init__(self, project): self.project = project self.algorithm = self.algorithm or self.__class__.__name__ msg = ('Either set the algorithm class attribute or name the subclass ' 'after a valid evoalgos algorithm.') assert hasattr(algo, self.algorithm), msg self.clones = [] self.__call__.__func__.__doc__ = getattr(algo, self.algorithm).__doc__ # output interface self.project.settings(propertyplugin(optimization_populations)) return
[docs] @parse_settings def __call__(self, parameters=None, objectives=None, population_size=10, max_generations=10, output=None, restart=False, test=None, prefix=None, keep_clones=False, **kwargs): """Run the optimisation algorithm. Arguments --------- parameters : dict of {'name': (lower, upper)} Dictionary of parameter names and upper and lower boundaries as values (by default) or any other values to determine their initial value if the ``create_start_population`` method is overridden. This is only a keyword parameter to allow setting it via the settings file. objectives : list of strings or dict of strings Performance indicators passed to Only strings are accepted. Pass a dict to redefine names. population_size : int Population size the algorithm is run with. By default this is also the ``num_offspring`` argument to the algorithm. max_generation : int Maximum number of iterations the algorithm is run with. output : string path, optional Output .csv file with saved parameters and performances for each generation and individual (default: [<prefix>_]<algorithm>_populations.csv). test : bool | None Only run tests before running algorithm if True, dont run them if False and run them before running the algorithm if None (default). prefix : str, optional A prefix to use for run tags and project clones. Defaults to algorithm name. restart : bool Attempt to restart the algorithm if output exists. keep_clones : bool Do not remove the project clones when completed and run the final population in them. kwargs : Any overriding parameter parsed to the algorithm (for details see the algorithm descriptions at: Returns ------- <browser.Run> : A run instance with the mean objective values as indicators, the mean parameter values of the last generation and the output file attached. """ st = assert type(parameters) == dict self.parameters = collections.OrderedDict(sorted(parameters.items())) self.objectives, self.indicators = self._parse_objectives(objectives) self.prefix = prefix or self.algorithm do = prefix+'_'+self.algorithm if prefix else self.algorithm defout = osp.join(self.project.projectdir, do+'_populations.csv') self.output = output or defout self.restart = restart and osp.exists(self.output) # init problem Problem.__init__(self, lambda dummy: dummy, num_objectives=len(objectives), name=self.__class__.__name__) # set defaults self.population_size = kwargs.setdefault('population_size', population_size) self.max_generations = kwargs.setdefault("max_generations", max_generations) self.num_offspring = kwargs.setdefault("num_offspring", self.population_size) stpop = self.restart_population() if self.restart \ else self.create_start_population() self.start_population = kwargs.setdefault('start_population', stpop) # run tests if test is True (and exit) or None (and continue) self.evaltimes = [] self.max_run_time = None if test is not False and not self.restart: print('Testing single run...') tst = self.run_tests() self.evaltimes.append( self.max_run_time = self.evaltimes[0] if test is True: return # initialise algorithm self.ea = getattr(algo, self.algorithm)(self, **kwargs) # attach observer function self.ea.attach(self.observe_population) # set generation if restart if self.restart: self.ea.generation = self.read_populations().index.levels[0][-1] self.ea.remaining_generations = max_generations-self.ea.generation print('Restarting from generation %i' % self.ea.generation) else: # write initial population to file self.observe_population(self.ea, initial=True) # create clones self.clones = self._create_clones() # process # run final population again or remove if keep_clones: self.batch_evaluate(self.ea.population) else: for c in self.clones: self.project.clone[c].remove() # reset runIDs self.project.browser.runs.reset_ids() run = self._save_run() print('Elapsed time: %s hh:mm:ss' % ( return run
def _parse_objectives(self, objectives): assert type(objectives) in [list, dict] if type(objectives) == dict: assert all([type(v) == str for v in objectives.values()]) o, i = zip(*sorted(objectives.items())) o, i = list(o), list(i) else: o = sorted(objectives) i = sorted(objectives) assert all([type(k) == str for k in objectives]) return o, i def _save_run(self): # insert run pops = self.read_populations() indi = {pops.indicators[n]: {'mean_final_population': v} for n, v in pops.lastgen[pops.objectives].mean().items()} pars = [{'name': n, 'value': v, "tags": 'mean_final_population'} for n, v in pops.lastgen[pops.parameters].mean().items()] # unique tags tags = ' '.join(set([self.prefix, self.algorithm])) notes = ("population_size=%s, max_generations=%s" % (self.population_size, self.max_generations)) run = self.project.save_run(indicators=indi, files={tags: pops}, parameters=pars, tags=tags, notes=notes) return run
[docs] def batch_evaluate(self, individuals): """Evaluate/run a batch of individuals/projects in parallel. Arguments --------- individuals : list of <evoalgos.Individual>s """ # only run if there is anything to evaluate, not checked in evoalgos if not individuals: return st = # start swim runs pnames = self.parameters.keys() for i, ind in enumerate(individuals): clone = self.project.clone[self.clones[i]] ind.clonename = clone.clonename self.set_parameters(clone, dict(zip(pnames, ind.phenome))) del clone runs = self.batch_run() self.project.browser.settings.unset() self.project.browser.settings.setup() objective_values = self.retrieve_objectives(runs) mrt = max(runs.values_list('run_time', flat=True)) self.max_run_time = max(self.max_run_time or dt.timedelta(0), mrt) # delte all runs again runs.delete() # assign values to individuals for i in individuals: i.objective_values = objective_values[i.clonename] self.evaltimes += [] return
def batch_run(self): mrt = self.max_run_time rt = (int(round(mrt.total_seconds()*self.time_safty_factor/60. + 0.5)) if mrt else None) # process clones and wait for runs runs = self.project.cluster.run_parallel( self.clones, time=rt, indicators=self.indicators, parameters=False) return runs def _create_clones(self): cn = self.prefix+('_%'+'0%0ii' % len(str(self.population_size-1))) cnames = [] for i in range(self.population_size): name = cn % i self.project.clone(name) cnames.append(name) return cnames @try_again def retrieve_objectives(self, runs): """Get objective values from the browser database. Arguments --------- runs : browser.Run QuerySet """ obvals = {} for r in runs: t, clonename = r.tags.split() ri = r.indicators.all() vals = [] for i in self.indicators: rv = ri.filter(name=i) if len(rv) == 1: vals += [float(rv[0].value)] else: print(i+' for '+clonename+' returned %s.' % rv + 'Will set it to %s.' % self.nanval) vals += [self.nanval] obvals[clonename] = vals return obvals
[docs] def run_tests(self, quiet=False): """Execute a series of tests before running the algorithm. Tests: - creates clone - calls self.set_parameters - runs clone - checks if returned run contains the same number of indicators as self.indicators - checks if retrived_objectives return same number of values as self.objectives """ try: clone = self.project.clone(self.prefix+'__test', fresh=True) params0 = dict(zip(self.parameters.keys(), self.start_population[0].genome)) self.set_parameters(clone, params0) run =, quiet=quiet, tags='run_test '+clone.clonename, parameters=False) assert run.indicators.all().count() == len(self.indicators) runqset = clone.browser.runs.filter(tags__contains=clone.clonename) assert runqset.count() == 1 obj_vals = self.retrieve_objectives(runqset)[clone.clonename] assert len(obj_vals) == len(self.objectives) obj_str = ['%s=%s' % (k, v) for k, v in zip(self.objectives, obj_vals)] print('Test objective values:\n%s' % ('\n'.join(obj_str))) except Exception: raise finally: try: clone.remove() run.delete() except NameError: pass return
[docs] def create_start_population(self): """Create the initial population out of the parameter boundaries. Returns ------- list of evoalgos.individual.SBXIndividual s """ population = [] lo, up = zip(*self.parameters.values()) for _ in range(self.population_size): parameters = [random.uniform(l, u) for l, u in zip(lo, up)] kw = {"min_bounds": lo, "max_bounds": up, 'clonename': None} population.append(self.create_individual(parameters, **kw)) return population
[docs] def restart_population(self): """Read output and return the last generation as population.""" opop = self.read_populations() assert set(opop.parameters) == set(self.parameters), \ 'Restart parameters dont match parsed.' params = self.parameters.keys() # OrderedDict assert len(opop.last_generation) == self.population_size # using the old parameter ranges from file lo, up = zip(*[opop.parameter_ranges[i] for i in params]) population = [] for id, prs in opop.last_generation.iterrows(): parameters = prs[params].tolist() kw = {"min_bounds": lo, "max_bounds": up, 'id': id, 'clonename': prs['clone'], 'objective_values': prs[self.objectives].tolist(), 'date_of_birth': prs['birthgeneration']} population.append(self.create_individual(parameters, **kw)) return population
[docs] def create_individual(self, parameters, **kwargs): """Create a single evoalgos individual with parameters and bounds.""" indiv = SBXIndividual(genome=parameters) for k, v in kwargs.items(): setattr(indiv, k, v) return indiv
[docs] def set_parameters(self, clonedproject, parameters): """Default parameter setting in method for convenient overriding. Arguments --------- clonedproject : <swimpy.Project> Instance of the cloned project. parameters : dict Parameters to set. """ if bool(self.project.basin_parameters['subcatch']): warnings.warn('subcatch parameters are active.') clonedproject.basin_parameters(**parameters) return
[docs] def observe_population(self, ea, initial=False): """Evoalgos function to write the population to the output file. The function only takes the algorithm instance as first argument and doesnt return anything. """ # columns to write out pars = ['parameter:%s:%r' % kv for kv in self.parameters.items()] objs = ['objective:%s:%s' % oi for oi in zip(self.objectives, self.indicators)] cols = self.output_attribute_columns + objs + pars # collect population info popinfolist = [] for i in self.ea.population: iline = [self.ea.generation+(0 if initial else 1), i.id_number, i.clonename, i.date_of_birth] iline += list(i.objective_values or [None]*len(self.objectives)) iline += list(i.genome) popinfolist.append(iline) # make dataframe and write out popframe = pd.DataFrame(popinfolist, columns=cols) with open(self.output, 'w' if initial else 'a') as f: popframe.to_csv(f, header=initial, index=False) # report stats if not initial: obj_stats = popframe[objs].describe().T[['50%', 'min']] mt = self.mean_generation_time() rt = self.max_run_time mg = mt*(self.max_generations-self.ea.generation-1) ovstr = ['%s: %3.6f %3.6f' % (o, i[0], i[1]) for o, i in zip(self.objectives, obj_stats.values)] msg = ('Generation %s completed in %s, mean generation time %s, ' + 'max run time %s, max_generations in ~%s hh:mm:ss\n' + 'Objectives (median, min):\n' + '\n'.join(ovstr)) print(msg % (self.ea.generation+1, self.evaltimes[-1], mt, rt, mg)) return
[docs] def mean_generation_time(self): """Calculate the average time taken for a generation to finish. Returns ------- datetime.timedelta """ # giving dt.timedelta(0) as the start value makes sum work on tds nt = max(len(self.evaltimes), 1) return sum(self.evaltimes, dt.timedelta(0)) / nt
def read_populations(self, filepath=None): assert filepath or self.output path = filepath or self.output return optimization_populations(self.project).from_path(path)
class SMSEMOA(_EvoalgosSwimProblem): pass class CommaEA(_EvoalgosSwimProblem): pass class CMSAES(_EvoalgosSwimProblem): pass class NSGA2b(_EvoalgosSwimProblem): pass class optimization_populations(ProjectOrRunData): """Dataframe to handle successive populations from a genetic optimisation. Note ---- Class of propertyplugin that will be added to settings by the algorithm plugin. It enables reading population files via:: project.optimization_populations.from_path(path) run.optimization_populations # if the run was saved with algorithm project.algorithm.read_populations # with_EvoalgosSwimProblem subclass """ path = None index_col = (0, 1) _metadata = ['parameters', 'parameter_ranges', 'objectives', 'indicators'] plugin = ['plot_generation_objectives', 'plot_objective_scatter', 'plot_parameter_distribution', 'best_tradeoff'] def from_csv(self, path, **readkw): """Read csv file and interpret objectives and parameter ranges from columns. """ df = pd.read_csv(path, index_col=self.index_col, **readkw) # interpret columns cols = [] self.parameters = [] self.parameter_ranges = {} self.objectives = [] self.indicators = {} for c in df.columns: pr = c.split(':') if len(pr) >= 3: c = pr[1] if pr[0] == 'parameter': self.parameters.append(c) self.parameter_ranges[c] = eval(pr[2]) elif pr[0] == 'objective': self.objectives.append(c) self.indicators[c] = pr[2] cols.append(c) df.columns = cols return df from_project = from_csv def to_csv(self, *args, **kwargs): """Put parameter ranges back into columns and save as csv. """ df = self.copy() cols = [] for c in df.columns: if c in self.parameters: c = 'parameter:'+c+':%r' % (self.parameter_ranges[c],) elif c in self.objectives: c = 'objective:'+c+':%s' % self.indicators[c] cols.append(c) df.columns = cols df.to_csv(*args, **kwargs) return def to_run(self, run, tags=''): """Save with run as compressed csv. """ tags = (tags+' ' if tags else '')+'optimization_populations' fn = osp.basename(osp.splitext(self.path)[0])+'.csv.gzip' tmpf = osp.join(self.project.browser.settings.tmpfilesdir, fn) self.to_csv(tmpf, compression='gzip') f = self.project.browser.insert('file', run=run, tags=tags, file=tmpf) return f @property def last_generation(self): return self.loc[max(self.index.levels[0])] # alias lastgen = last_generation @plot_function def plot_generation_objectives(self, ax=None, output=None, **kw): """Show the median (min-max) objective values over all generations. """ genperf = self[self.objectives].groupby(axis=0, level=0) axs = genperf.median().plot(subplots=True, ax=ax, legend=False, rot=0, sharex=True, title=self.objectives) for a, col in zip(axs, self.objectives): color = a.get_lines()[0].get_color() a.fill_between(genperf.groups.keys(), genperf.min()[col], genperf.max()[col], color=color, alpha=0.15) plt.tight_layout() return axs @plot_function def plot_objective_scatter(self, generation=None, best=None, selected=None, selected_color='r', ax=None, runs=None, output=None, **scatterkwargs): """Plot all objectives against each other in a stepped subplot. Arguments --------- generation : int, optional The generation to plot objectives from. Default: last. best : bool | min. objectives Highlight the best tradeoff solution. Takes precendence over selected. selected : dict-like Highlight selected point(s). selected_color : matplotlib.color spec | str Color for the selected points. scatterkwargs : Any keyword passed onto the scatter function. """ # get objectives to plot gen = self.loc[generation] if generation else self.lastgen if best is not None: selected = self.best_tradeoff(best) ax = plot_objective_scatter(gen[self.objectives], selected=selected, ax=ax, **scatterkwargs) return ax @plot_function def plot_parameter_distribution(self, parameters=None, generation=None, runs=None, ax=None, output=None, **histkw): '''Plot parameter distribution histograms of all parameters. Arguments --------- parameters : list Limit what parameters to show. Default: all generation : int, optional Generation to plot. Default: last histkw : Any keyword parsed to the plt.hist call (excpt for ``range``) and the subsequent Patch. A useful keyword for multiple calls is ``histtype='step'`` which is the default if runs are parsed. ''' gen = self.loc[generation] if generation else self.lastgen parameters = parameters or self.parameters # setup nice plots sq = np.sqrt(len(parameters)) ncols = int(sq) nrows = ncols + (1 if sq-ncols else 0) if ax: f = ax.get_figure() axs = f.get_axes() if len(axs) == ncols*nrows: ax = np.array(axs) else: f.clear() ax = None else: f = plt.figure() if ax is None: ax = f.subplots(nrows, ncols, squeeze=False, sharey=True).flatten() histkw.setdefault("bins", 10) if runs: histkw.setdefault('histtype', 'step') hdls = [] for a, par in zip(ax, parameters): bars = a.hist(gen[par], range=self.parameter_ranges[par], **histkw) hdls.append(bars) a.set_title(par) a.set_xlim(*self.parameter_ranges[par]) ax[0].set_ylabel('N runs') # remove not needed axes for i in range(len(parameters), ncols*nrows): ax[i].set_axis_off() plt.tight_layout() return hdls def best_tradeoff(self, minobjectives=None): '''Select from last generation the parameter with the shortest distance to the scaled Pareto front to the origin. The front is either scaled by the max of each objective dimension or by minobjectives, which should be a dictionary of objectives to take account for.''' if minobjectives is not None: minobjectives = pd.Series(minobjectives) else: # scale by objective maximum minobjectives = self.lastgen[self.objectives].max() scobs = self.lastgen[self.objectives] / minobjectives # distance to origin dist = np.sqrt((scobs**2).sum(1)) # store best parameter set with the lowest distance best = self.lastgen.loc[dist.idxmin()] return best def select_min_objectives(self, minobjectives=None, **minobjkwargs): if minobjectives is not None: msg = 'minobjectives must be the same length as objectives' assert len(minobjectives) == len(self.objectives), msg minobjkwargs.update(dict(zip(self.objectives, minobjectives))) lg = self.lastgen.copy() for o, v in minobjkwargs.items(): lg = lg[lg[o] < v] return lg
