#!/usr/bin/env python
# coding: utf-8
#
# Copyright © 2016-2020 - Rennes Physics Institute
#
# This file is part of msspec.
#
# msspec is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# msspec is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this msspec. If not, see <http://www.gnu.org/licenses/>.
#
# Source file : src/msspec/looper.py
# Last modified: Thu, 27 Feb 2025 16:33:09 +0100
# Committed by : Sylvain Tricot <sylvain.tricot@univ-rennes.fr>
from collections import OrderedDict
from functools import partial
import itertools
import logging
import multiprocessing as mp
import numpy as np
import pandas as pd
import time
logger = logging.getLogger(__name__)
class Variable:
    """Named placeholder with an optional description.

    Attributes
    ----------
    name : identifier of the variable.
    doc : free-form description text (empty string by default).
    """

    def __init__(self, name, doc=""):
        self.doc = doc
        self.name = name

    def __repr__(self):
        # Rendered as <Variable('name')>
        template = "<Variable('{}')>"
        return template.format(self.name)
class Sweep:
    """A named parameter and the sequence of values it takes in a loop.

    The values are either supplied directly (*values*) or generated from
    *start* and *stop*:

    * with *step*: ``numpy.arange(start, stop, step)``;
    * otherwise *num* points from ``numpy.linspace`` (``scale='lin'``) or
      ``numpy.logspace`` (``scale='log'``, using *base*).

    Indexing a sweep returns a ``(key, value)`` tuple and ``len()`` gives
    the number of values.

    Parameters
    ----------
    key : name of the swept parameter.
    comments : free text stored on the instance.
    unit : unit label, stored on the instance.
    values : explicit sequence of values; takes precedence over
        *start*/*stop*.
    start, stop : bounds handed to the numpy generator; both are required
        when *values* is None.
    step : increment for ``numpy.arange``; takes precedence over *num*.
    num : number of generated points when *step* is None.
    scale : ``'lin'`` or ``'log'``.
    base : base handed to ``numpy.logspace``.
    default, unzip, group, accumulations : stored on the instance; not
        read by the code visible in this module.
    folded : when true, ``SweepRange`` hands the whole ``values`` sequence
        to every row instead of iterating over it.
    linked_to : another ``Sweep`` that this one follows index by index in
        ``SweepRange``.

    Raises
    ------
    ValueError
        If *values* is None and *start* or *stop* is missing, or if
        *scale* is neither ``'lin'`` nor ``'log'``.
    """

    def __init__(self, key, comments="", unit=None,
                 values=None,
                 start=None, stop=None, step=None,
                 num=10, scale='lin', base=10,
                 default=None,
                 folded=False,
                 unzip=False,
                 group=None,
                 accumulations=1,
                 linked_to=None):
        self.key = key  # The variable's name
        self.comments = comments
        self.unit = unit
        self.default = default
        self.folded = folded
        self.unzip = unzip
        self.group = group
        self.accumulations = accumulations
        self.linked_to = linked_to

        # First use case: values are specified explicitly
        if values is not None:
            self.values = values
            return

        # Second use case: values are generated from start/stop
        if start is None or stop is None:
            raise ValueError(
                "Sweep '{}': either 'values' or both 'start' and 'stop' "
                "must be given".format(key))
        self.start = start
        self.stop = stop

        if step is not None:
            self.step = step
            self.values = np.arange(start, stop, step)
            return

        self.num = num
        if scale == 'lin':
            self.values = np.linspace(start, stop, num)
        elif scale == 'log':
            self.base = base
            self.values = np.logspace(start, stop, num, base=base)
        else:
            # Without this, self.values would silently stay undefined
            raise ValueError(
                "Sweep '{}': unknown scale {!r} (expected 'lin' or "
                "'log')".format(key, scale))

    def __getitem__(self, index):
        return (self.key, self.values[index])

    def __len__(self):
        return len(self.values)

    def __repr__(self):
        return "<{}(\'{}\')>".format(self.__class__.__name__, self.key)

    @property
    def unfolded(self):
        """Logical negation of ``folded``."""
        return not self.folded
class SweepRange:
    """Iterator over every combination of values of a set of sweeps.

    Each iteration yields an ``OrderedDict`` whose keys are ``columns``:
    ``'sweep_index'`` followed by the key of every sweep, in the order the
    sweeps were given. The first sweep varies slowest.

    * A folded sweep is not iterated: its whole ``values`` sequence is
      placed in every row.
    * A sweep with ``linked_to`` set is not iterated on its own either: it
      takes the value at the same index as the sweep it is linked to.

    Raises
    ------
    NameError
        If a sweep is linked to a sweep that was not passed in.
    """

    def __init__(self, *sweeps):
        self.sweeps = sweeps
        self.index = 0

        # Check that every link target is part of the loop and group the
        # linked sweeps under the sweep they follow
        followers = {}
        for candidate in sweeps:
            target = candidate.linked_to
            if target is None:
                continue
            if target not in sweeps:
                raise NameError(("The sweep \'{}\' is linked to \'{}\' "
                                 "but is not included in the loop!").format(
                                     candidate.key, target.key))
            followers.setdefault(target, []).append(candidate)

        # Only independent, unfolded sweeps multiply the number of rows
        sizes = [
            len(s) if (s.linked_to is None and s.unfolded) else 1
            for s in sweeps
        ]
        cumulative = np.cumprod(sizes)

        self.links = followers
        self.cn = cumulative
        # The total number of combinations is the last cumulative product
        self.ntot = cumulative[-1]

    def __len__(self):
        return self.ntot

    def __iter__(self):
        self.index = 0
        return self

    def __next__(self):
        if not self.index < self.ntot:
            raise StopIteration

        position = self.index
        self.index += 1

        record = OrderedDict.fromkeys(self.columns)
        for rank, current in enumerate(self.sweeps):
            # A linked sweep gets its value from the sweep it follows
            if current.linked_to is not None:
                continue

            if current.folded:
                record[current.key] = current.values
                continue

            # Number of consecutive rows sharing the same value of `current`
            stride = self.ntot / self.cn[rank]
            pick = int(position / stride) % len(current)
            # Fill in this sweep and every sweep linked to it
            for member in [current] + self.links.get(current, []):
                name, picked = member[pick]
                record[name] = picked

        record['sweep_index'] = position
        return record

    @property
    def columns(self):
        """``['sweep_index']`` followed by the key of every sweep."""
        return ['sweep_index'] + [s.key for s in self.sweeps]

    @property
    def values(self):
        """All rows, each as a plain list of values in ``columns`` order."""
        return [list(record.values()) for record in self]

    @property
    def items(self):
        """All rows, each as the ``OrderedDict`` produced by iteration."""
        return list(self)
class Looper:
    """Call a pipeline for every combination of values of a set of sweeps.

    Assign the callable to ``pipeline``; it is invoked with keyword
    arguments only. The table produced by the last call to ``run`` is kept
    in ``data`` (``None`` until ``run`` has been called).
    """

    def __init__(self):
        self.data = None

    @property
    def pipeline(self):
        """The callable to run, or ``None`` if none has been assigned."""
        return getattr(self, '_pipeline', None)

    @pipeline.setter
    def pipeline(self, value):
        self._pipeline = value

    def _wrapper(self, x):
        """Call the pipeline with the mapping *x* expanded as keywords."""
        logger.debug("Pipeline called with %s", x)
        return self.pipeline(**x)

    def run(self, *sweeps, ncpu=1, **kwargs):
        """Run the pipeline over all combinations of *sweeps*.

        Parameters
        ----------
        *sweeps : the sweeps to combine (handed to ``SweepRange``).
        ncpu : with 1 the pipeline is called serially in this process;
            with any other value the calls are distributed with
            ``multiprocessing.Pool(processes=ncpu)``, which requires the
            pipeline and its arguments to be picklable.
        **kwargs : constant keyword arguments added to every pipeline call
            and to every row of the result.

        Returns
        -------
        pandas.DataFrame
            One row per combination, with one column per sweep key, one
            per *kwargs* key and an ``'output'`` column holding the value
            returned by the pipeline. Also stored in ``self.data``.

        Raises
        ------
        ValueError
            If a key of *kwargs* is also a sweep key or ``'sweep_index'``.
        """
        logger.info("Loop starts...")

        # prepare the list of inputs
        sr = SweepRange(*sweeps)
        sweep_columns = sr.columns

        # A clash would overwrite the swept value and leave the rows
        # shorter than the column list; fail before running anything.
        clashes = [key for key in kwargs if key in sweep_columns]
        if clashes:
            raise ValueError(
                "Keyword argument(s) {} clash with sweep columns".format(
                    clashes))

        items = sr.items
        for values in items:
            values.update(kwargs)

        t0 = time.time()
        if ncpu == 1:
            # serial processing...
            logger.info("serial processing...")
            data = [self._wrapper(values) for values in items]
        else:
            # Parallel processing...
            chunksize = 1
            logger.info(("Parallel processing over {:d} cpu (chunksize={:d})..."
                         "").format(ncpu, chunksize))
            pool = mp.Pool(processes=ncpu)
            try:
                data = pool.map(self._wrapper, items, chunksize=chunksize)
                pool.close()
            except BaseException:
                # Do not leave worker processes behind when a call fails
                pool.terminate()
                raise
            finally:
                pool.join()
        dt = time.time() - t0
        logger.info(("Processed {:d} sets of inputs in {:.3f} seconds"
                     "").format(len(sr), dt))

        # Create the DataFrame: inputs of each call followed by its output
        columns = sweep_columns + list(kwargs) + ['output']
        rows = [list(values.values()) + [output]
                for values, output in zip(items, data)]
        df = pd.DataFrame(rows, columns=columns)
        df = df.drop(columns=['sweep_index'])

        self.data = df
        return self.data
def _demo_pipeline(**kwargs):
    """Demo pipeline: 10 points evenly spaced from 0 to the sweep index."""
    # Defined at module level (not inside the __main__ guard) so that
    # worker processes can import it when the start method is not 'fork'.
    i = kwargs.get('sweep_index')
    return np.linspace(0, i, 10)


def _demo():
    """Run a small parallel loop and print the resulting table."""
    logging.basicConfig(level=logging.DEBUG)

    theta = Sweep(key='theta', comments="The polar angle",
                  start=-70, stop=70, num=3,
                  unit='degree',
                  folded=True)
    emitter_plane = Sweep('emitter_plane', comments="The emitter plane",
                          start=0, stop=3, step=1)
    emitter = Sweep(key='emitter', values=('Ti', 'Sr'))
    levels = Sweep(key='level', values=('2p', '3d'), linked_to=emitter)
    uij = Sweep(key='uij', values=(0.01, 0.02, 0.03))

    looper = Looper()
    looper.pipeline = _demo_pipeline
    other_kws = {'un': 1, 'deux': 2}
    data = looper.run(emitter, emitter_plane, uij, theta, levels,
                      ncpu=4, **other_kws)

    # Print the dataframe
    print(data)

    # Accessing the parameters and output values for a given sweep
    # (e.g. the last one)
    print(looper.data.iloc[-1])

    # Post-process the output values. For example here, the output is a
    # 1D-array, make the sum of sweeps with 'Sr' emitter
    X = np.array([x for x in data[data.emitter == 'Sr'].output]).sum(axis=0)
    print(X)


if __name__ == "__main__":
    _demo()