Source code for calculator

#!/usr/bin/env python
#
# Copyright © 2016-2020 - Rennes Physics Institute
#
# This file is part of msspec.
#
# msspec is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# msspec is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this msspec.  If not, see <http://www.gnu.org/licenses/>.
#
# Source file  : src/msspec/calculator.py
# Last modified: ven. 10 avril 2020 17:19:24
# Committed by : "Sylvain Tricot <sylvain.tricot@univ-rennes1.fr>"


"""
Module calculator
=================

This module contains different classes used to define a new calculator for
specific spectroscopies understood by MsSpec.

These spectroscopies are listed :ref:`here <globalparameters-spectroscopy>`.

There is one *calculator* class for each spectroscopy. The class name is based
on the spectroscopy name. For instance, the class for PhotoElectron Diffraction
is called :py:class:`_PED`.

The helper function :py:func:`calculator.MSSPEC` is used to create objects from
these classes by passing the kind of spectroscopy as a keyword argument.

For more information on MsSpec, follow this
`link <https://ipr.univ-rennes1.fr/msspec>`__

"""

import inspect
import logging
import os
import re
import shutil
import sys
import time
from collections import OrderedDict
from datetime import datetime
from io import StringIO
from shutil import copyfile
from shutil import rmtree
from subprocess import PIPE
from subprocess import Popen

import ase.atom
import ase.atoms
import ase.data
from ase.io.extxyz import write_xyz
import numpy as np
from ase.calculators.calculator import Calculator
from terminaltables.ascii_table import AsciiTable

from msspec import iodata
from msspec.calcio import CompCurveIO
from msspec.calcio import PhagenIO
from msspec.calcio import SpecIO
from msspec.data import electron_be
from msspec.misc import get_call_info
from msspec.misc import get_level_from_electron_configuration
from msspec.misc import log_process_output
from msspec.misc import LOGGER
from msspec.misc import set_log_output
from msspec.misc import UREG
from msspec.misc import XRaySource
from msspec.parameters import CalculationParameters
from msspec.parameters import CompCurveParameters
from msspec.parameters import CompCurveGeneralParameters
from msspec.parameters import DetectorParameters
from msspec.parameters import EIGParameters
from msspec.parameters import GlobalParameters
from msspec.parameters import MuffintinParameters
from msspec.parameters import PEDParameters
from msspec.parameters import PhagenMallocParameters
from msspec.parameters import PhagenParameters
from msspec.parameters import ScanParameters
from msspec.parameters import SourceParameters
from msspec.parameters import SpecMallocParameters
from msspec.parameters import SpecParameters
from msspec.parameters import TMatrixParameters
from msspec.phagen.fortran.libphagen import main as do_phagen
from msspec.spec.fortran import _eig_mi
from msspec.spec.fortran import _eig_pw
from msspec.spec.fortran import _phd_mi_noso_nosp_nosym
from msspec.spec.fortran import _phd_se_noso_nosp_nosym
from msspec.spec.fortran import _comp_curves
from msspec.utils import get_atom_index


[docs]class _MSCALCULATOR(Calculator): """ This class defines an ASE calculator for doing Multiple scattering calculations. """ implemented_properties = ['', ] __data = {} def __init__(self, spectroscopy='PED', algorithm='expansion', polarization=None, dichroism=None, spinpol=False, folder='./calc', txt='-', **kwargs): stdout = sys.stdout if isinstance(txt, str) and txt != '-': stdout = open(txt, 'w') #elif isinstance(txt, buffer): # stdout = txt elif txt == None: stdout = open('/dev/null', 'a') #set_log_output(stdout) ######################################################################## LOGGER.debug('Initialization of %s', self.__class__.__name__) LOGGER.debug(get_call_info(inspect.currentframe())) ######################################################################## # Init the upper class Calculator.__init__(self, **kwargs) ######################################################################## LOGGER.debug(' create low level parameters') ######################################################################## self.phagen_parameters = PhagenParameters() self.phagen_malloc_parameters = PhagenMallocParameters() self.spec_parameters = SpecParameters() self.spec_malloc_parameters = SpecMallocParameters() ######################################################################## LOGGER.debug(' create higher level parameters') ######################################################################## self.tmatrix_parameters = TMatrixParameters(self.phagen_parameters) self.muffintin_parameters = MuffintinParameters(self.phagen_parameters, self.spec_parameters) self.global_parameters = GlobalParameters(self.phagen_parameters, self.spec_parameters) if spectroscopy == 'PED': self.spectroscopy_parameters = PEDParameters(self.phagen_parameters, self.spec_parameters) elif spectroscopy == 'EIG': self.spectroscopy_parameters = EIGParameters(self.phagen_parameters, self.spec_parameters) else: raise NameError('No such spectroscopy') self.source_parameters = SourceParameters(self.global_parameters, self.phagen_parameters, self.spec_parameters) self.detector_parameters = DetectorParameters(self.global_parameters, self.phagen_parameters, self.spec_parameters) self.scan_parameters = ScanParameters(self.global_parameters, self.phagen_parameters, self.spec_parameters) self.calculation_parameters = CalculationParameters( self.global_parameters, self.phagen_parameters, self.spec_parameters) # initialize all parameters with defaults values LOGGER.info("Set default values =========================================") for p in (list(self.global_parameters) + list(self.muffintin_parameters) + list(self.tmatrix_parameters) + list(self.source_parameters) + list(self.detector_parameters) + list(self.scan_parameters) + list(self.calculation_parameters) + list(self.spectroscopy_parameters)): p.set(p.default) LOGGER.info("End of default values ======================================") # updated global parameters with provided keywords self.global_parameters.spectroscopy = spectroscopy self.global_parameters.algorithm = algorithm self.global_parameters.polarization = polarization self.global_parameters.dichroism = dichroism self.global_parameters.spinpol = spinpol self.global_parameters.folder = folder self.phagenio = PhagenIO(self.phagen_parameters, self.phagen_malloc_parameters) self.specio = SpecIO(self.spec_parameters, self.spec_malloc_parameters, self.phagenio) ######################################################################## LOGGER.debug(' create a space dedicated to the calculation') ######################################################################## self.init_folder = os.getcwd() #self.msspec_folder = os.path.join(MSSPEC_ROOT) self.tmp_folder = os.path.abspath(folder) LOGGER.debug(' folder: \'%s\'', self.tmp_folder) if not os.path.exists(self.tmp_folder): os.makedirs(self.tmp_folder) os.makedirs(os.path.join(self.tmp_folder, 'input')) os.makedirs(os.path.join(self.tmp_folder, 'output')) #copyfile(os.path.join(self.msspec_folder, 'ase', 'Makefile'), # os.path.join(self.tmp_folder, 'Makefile')) #os.chdir(self.tmp_folder) #inv = cor = 'NO' #if algorithm == 'expansion': # pass #elif algorithm == 'inversion': # inv = 'YES' #elif algorithm == 'correlation': # cor = 'YES' # spin orbit resolved (not yet) #sorb = 'NO' # spin resolved #dichro_spinpol = False #if dichroism in ('sum_over_spin', 'spin_resolved'): dichro_spinpol = True #spin = 'NO' #if spinpol or dichro_spinpol: # spin = 'YES' #if spin == 'YES': # LOGGER.error('Option not implemented!') # raise NotImplementedError( # 'Spin polarization is not implemeted yet!') calctype_spectro = self.spec_parameters.get_parameter('calctype_spectro') calctype_spectro = calctype_spectro.value #self._make_opts = (MSSPEC_ROOT, calctype_spectro, inv, cor, # spin, sorb, self.tmp_folder) # Initialize the working environment #self._make('init') self.modified = False self.resources = {} ######################################################################## LOGGER.debug(' initialization done.\n') ######################################################################## def _make(self, target): LOGGER.debug(get_call_info(inspect.currentframe())) os.chdir(self.tmp_folder) cmd = ("make__SPACE__ROOT_FOLDER=\"{}\"__SPACE__SPEC=\"{}\"__SPACE__INV=\"{}\"__SPACE__COR=\"{" "}\"__SPACE__" "SPIN=\"{}\"__SPACE__SO=\"{}\"__SPACE__CALC_FOLDER=\"{}\"__SPACE__{}").format(*(self._make_opts + ( target,))).split('__SPACE__') #cmd = cmd.replace(' ', '\ ') #cmd = cmd.split('__SPACE__') LOGGER.debug(' the full command is: %s', cmd) child = Popen(cmd,stdout=PIPE, stderr=PIPE) logger_name = LOGGER.name if target == 'tmatrix': logger_name = 'Phagen' elif target == 'compute': logger_name = 'Spec' log_process_output(child, logger=logger_name) os.chdir(self.init_folder) if child.returncode != 0: LOGGER.error("Unable to successfully run the target: {}".format(target)) sys.exit(1) def _guess_ke(self, level): """ Try to guess the kinetic energy based on the level and the source energy. If the kinetic energy cannot be infered because the level is not reported in the database, the returned value is None. """ try: state = get_level_from_electron_configuration(level) absorber_atomic_number = self.atoms[self.atoms.absorber].number lines = electron_be[absorber_atomic_number] binding_energy = lines[state] except KeyError: # unable to find a binding energy in the database return None # let's assume work function energy (in eV) wf = 4.5 source_energy = self.source_parameters.get_parameter('energy').value ke = source_energy - binding_energy - wf #return np.array(ke, dtype=np.float).flatten() return ke def run_phagen(self): input_fname = os.path.join(self.tmp_folder, 'input/input.ms') modified = self.phagenio.write_input_file(filename=input_fname) # Write the external potential file pot = self.tmatrix_parameters.potential if pot != 'muffin_tin': inpot_fname = os.path.join(self.tmp_folder, 'output/fort.2') mflag = pot.write(inpot_fname, self.phagenio.prototypical_atoms) modified = modified or mflag # Make a copy of the input potential file dest = os.path.join(self.tmp_folder, 'input/extpot.dat') shutil.copy(inpot_fname, dest) # Run if input files changed os.chdir(os.path.join(self.tmp_folder, 'output')) modified = True if modified: # Run phagen do_phagen() # Copy some output files to be more explicit shutil.copy('clus/clus.out', 'cluster.clu') shutil.copy('fort.35', 'tmatrix.tl') shutil.copy('fort.55', 'tmatrix.rad') # Load data self.phagenio.load_tl_file('tmatrix.tl') self.phagenio.load_cluster_file('cluster.clu') self.phagenio.load_prototypical_atoms('div/molinpot3.out') # Change back to the init_folder os.chdir(self.init_folder) def run_spec(self, malloc={}): def get_li(level): orbitals = 'spdfghi' m = re.match(r'\d(?P<l>[%s])(\d/2)?' % orbitals, level) return orbitals.index(m.group('l')) #include_fname = os.path.join(self.tmp_folder, 'src/spec.inc') input_fname = os.path.join(self.tmp_folder, 'input/spec.dat') kdirs_fname = os.path.join(self.tmp_folder, 'input/kdirs.dat') mod0 = self.specio.write_input_file(filename=input_fname) #mod1 = self.specio.write_include_file(filename=include_fname) mod2 = self.specio.write_kdirs_file(filename=kdirs_fname) #self.modified = self.modified or mod0 or mod1 or mod2 self.modified = self.modified or mod0 or mod2 #self._make('tmatrix') #self._make('bin/spec') #t0 = time.time() #self._make('compute') #t1 = time.time() #self.resources['spec_time'] = t1 - t0 self.modified = True if self.modified: #self.get_tmatrix() t0 = time.time() os.chdir(os.path.join(self.tmp_folder, 'output')) # set/get the dimension values requirements = OrderedDict({ 'NATP_M' : self.phagenio.nat, 'NATCLU_M' : len(self.atoms), 'NAT_EQ_M' : self.phagenio.nateqm, 'N_CL_L_M' : 1, 'NE_M' : self.phagenio.ne, 'NL_M' : self.phagenio.nlmax + 1, 'LI_M' : get_li(self.spec_parameters.extra_level) + 1, 'NEMET_M' : 1, 'NO_ST_M' : self.spec_parameters.calc_no, 'NDIF_M' : 10, 'NSO_M' : 2, 'NTEMP_M' : 1, 'NODES_EX_M' : 3, 'NSPIN_M' : 1, # to change for spin dependent 'NTH_M' : 2000, 'NPH_M' : 8000, 'NDIM_M' : 100000, 'N_TILT_M' : 11, # to change see extdir.f 'N_ORD_M' : 250, 'NPATH_M' : self.spec_malloc_parameters.NPATH_M, 'NGR_M' : 10,}) # update with provided values for key, value in malloc.items(): requirements[key] = value # set some automatic values for memory allocation for key, value in requirements.items(): setattr(self.spec_malloc_parameters, key, value) # Get the spec function to launch if self.global_parameters.spectroscopy == 'PED': if self.global_parameters.algorithm == 'expansion': do_spec = _phd_se_noso_nosp_nosym.run elif self.global_parameters.algorithm == 'inversion': do_spec = _phd_mi_noso_nosp_nosym.run else: LOGGER.error("\'{}\' spectroscopy with \'{}\' algorithm is not " "an allowed combination.".format(self.global_parameters.spectroscopy, self.global_parameters.algorithm)) raise ValueError elif self.global_parameters.spectroscopy == 'EIG': if self.global_parameters.algorithm == 'inversion': do_spec = _eig_mi.run elif self.global_parameters.algorithm == 'power': do_spec = _eig_pw.run else: LOGGER.error("\'{}\' spectroscopy with \'{}\' algorithm is not " "an allowed combination.".format(self.global_parameters.spectroscopy, self.global_parameters.algorithm)) raise ValueError else: LOGGER.error("\'{}\' spectroscopy is not supported yet.".format(self.global_parameters.spectroscopy)) raise NotImplementedError # cannot use this, unfortunately #do_spec(*requirements.values()) do_spec( requirements['NATP_M'], requirements['NATCLU_M'], requirements['NAT_EQ_M'], requirements['N_CL_L_M'], requirements['NE_M'], requirements['NL_M'], requirements['LI_M'], requirements['NEMET_M'], requirements['NO_ST_M'], requirements['NDIF_M'], requirements['NSO_M'], requirements['NTEMP_M'], requirements['NODES_EX_M'], requirements['NSPIN_M'], requirements['NTH_M'], requirements['NPH_M'], requirements['NDIM_M'], requirements['N_TILT_M'], requirements['N_ORD_M'], requirements['NPATH_M'], requirements['NGR_M']) t1 = time.time() self.resources['spec_time'] = t1 - t0 os.chdir(self.init_folder) def get_tmatrix(self): LOGGER.info("Getting the TMatrix...") LOGGER.debug(get_call_info(inspect.currentframe())) # If using an external potential, Phagen should be run twice, # the first time with the internal MT potential. pot = self.tmatrix_parameters.potential if pot != 'muffin_tin': LOGGER.info("Phagen is running first with MuffinTin potential...") self.tmatrix_parameters.potential = "muffin_tin" self.tmatrix_parameters.exchange_correlation = "hedin_lundqvist_complex" self.run_phagen() self.tmatrix_parameters.potential = pot self.run_phagen() tl = self.phagenio.tl tl_threshold = self.tmatrix_parameters.get_parameter('tl_threshold') if tl_threshold.value != None: LOGGER.debug(" applying tl_threshold to %s...", tl_threshold.value) go_on = True while go_on: go_on = False for ia in range(self.phagenio.nat): for ie in range(self.phagenio.ne): last_tl = tl[ia][ie][-1, -2:] # convert to complex last_tl = last_tl[0] + last_tl[1] * 1j if np.abs(last_tl) < tl_threshold.value: # remove last line of tl tl[ia][ie] = tl[ia][ie][:-1, :] go_on = True max_tl = self.tmatrix_parameters.get_parameter('max_tl').value cluster = self.phagen_parameters.get_parameter('atoms').value #proto_indices = cluster.get_array('proto_indices') proto_indices = np.array( [atom.get('proto_index') for atom in self.atoms]) if max_tl != None: LOGGER.debug(" applying max_tl: %s", max_tl) for ia in range(self.phagenio.nat): for ie in range(self.phagenio.ne): try: # for each set of tl: # 1. get the symbol of the prototipical atom j = np.where(proto_indices == ia+1)[0] symbol = cluster[j][0].symbol # 2. get the number of max tl allowed ntl = max_tl[symbol] # 3. reshape the tl set accordingly tl[ia][ie] = tl[ia][ie][:ntl, :] except KeyError: pass self.phagenio.write_tl_file( os.path.join(self.tmp_folder, 'output/tmatrix.tl')) # update spec extra parameters here self.spec_parameters.set_parameter('extra_nat', self.phagenio.nat) self.spec_parameters.set_parameter('extra_nlmax', self.phagenio.nlmax)
[docs] def set_atoms(self, atoms): """Defines the cluster on which the calculator will work. :param atoms: The cluster to attach the calculator to. :type atoms: :py:class:`ase.Atoms` """ if atoms.absorber == None: LOGGER.error("You must define the absorber before setting the atoms to the" "calculator.") self.atoms = atoms self.phagen_parameters.set_parameter('atoms', atoms) self.spec_parameters.set_parameter('extra_atoms', atoms)
[docs] def get_parameters(self): """Get all the defined parameters in the calculator. :return: A list of all parameters objects. :rtype: List of :py:class:`parameters.Parameter` """ _ = [] for section in ('global', 'muffintin', 'tmatrix', 'spectroscopy', 'source', 'detector', 'scan', 'calculation'): parameters = getattr(self, section + '_parameters') for p in parameters: _.append(p) return _
def add_cluster_to_dset(self, dset): clusbuf = StringIO() self.atoms.info['absorber'] = self.atoms.absorber #self.atoms.write(clusbuf, format='xyz') write_xyz(clusbuf, self.atoms) dset.add_parameter(group='Cluster', name='cluster', value=clusbuf.getvalue(), hidden="True")
[docs] def shutdown(self): """Removes the temporary folder and all its content. The user may whish to keep the calculation folder (see :ref:`globalparameters-folder`) so it is not removed at the end of the calculation. The calculation folder contains raw results from *Phagen* and *Spec* programs as well as their input files and configuration. It allows the program to save some time by not repeating some tasks (such as the Fortran code generation, building the binaries, computing things that did not changed between runs...). Calling this function at the end of the calculation will erase this calculation folder. .. warning:: Calling this function will erase the produced data without prompting you for confirmation, so take care of explicitly saving your results in your script, by using the :py:func:`iodata.Data.save` method for example. """ LOGGER.info('Deleting temporary files...') rmtree(self.tmp_folder)
[docs]class _PED(_MSCALCULATOR): """This class creates a calculator object for PhotoElectron DIffraction spectroscopy. :param algorithm: The algorithm to use for the computation. See :ref:`globalparameters-algorithm` for more details about the allowed values and the type. :param polarization: The incoming light polarization (see :ref:`globalparameters-polarization`) :param dichroism: Wether to enable or not the dichroism (see :ref:`globalparameters-dichroism`) :param spinpol: Enable or disable the spin polarization in the calculation (see :ref:`globalparameters-spinpol`) :param folder: The path to the temporary folder for the calculations. See :ref:`globalparameters-folder` :param txt: The name of a file where to redirect standard output. The string '-' will redirect the standard output to the screen (default). :type txt: str .. note:: This class constructor is not meant to be called directly by the user. Use the :py:func:`MSSPEC` to instanciate any calculator. """ def __init__(self, algorithm='expansion', polarization=None, dichroism=None, spinpol=False, folder='./calc', txt='-'): _MSCALCULATOR.__init__(self, spectroscopy='PED', algorithm=algorithm, polarization=polarization, dichroism=dichroism, spinpol=spinpol, folder=folder, txt=txt) self.iodata = iodata.Data('PED Simulation') def _get_scan(self, scan_type='theta', phi=0, theta=np.linspace(-70, 70, 141), level=None, kinetic_energy=None, data=None, malloc={}, other_parameters={}): LOGGER.info("Computting the %s scan...", scan_type) if data: self.iodata = data if kinetic_energy is None: # try to guess the kinetic energy kinetic_energy = self._guess_ke(level) # if still None... if kinetic_energy is None: LOGGER.error('Unable to guess the kinetic energy!') raise ValueError('You must define a kinetic_energy value.') # update the parameters self.scan_parameters.set_parameter('kinetic_energy', kinetic_energy) all_ke = self.scan_parameters.get_parameter('ke_array') if np.any(all_ke.value < 0): LOGGER.error('Source energy is not high enough or level too deep!') raise ValueError('Kinetic energy is < 0! ({})'.format( kinetic_energy)) self.scan_parameters.set_parameter('type', scan_type) # make sure there is only one energy point in scatf scan if scan_type == 'scatf': assert len(all_ke) == 1, ('kinetic_energy should not be an array ' 'in scatf scan') if scan_type != 'scatf': self.scan_parameters.set_parameter('phi', phi) self.scan_parameters.set_parameter('theta', theta) self.spectroscopy_parameters.set_parameter('level', level) # It is still possible to modify any option right before runing phagen # and spec for k, v in other_parameters.items(): grp_str, param_str = k.split('.') grp = getattr(self, grp_str) grp.set_parameter(param_str, v, force=True) self.get_tmatrix() self.run_spec(malloc) # Now load the data ndset = len(self.iodata) dset = self.iodata.add_dset('{} scan [{:d}]'.format(scan_type, ndset)) for p in self.get_parameters(): bundle = {'group': str(p.group), 'name': str(p.name), 'value': str(p.value), 'unit': '' if p.unit is None else str(p.unit)} dset.add_parameter(**bundle) if scan_type in ('theta', 'phi', 'energy'): results_fname = os.path.join(self.tmp_folder, 'output/results.dat') data = self.specio.load_results(results_fname) for _plane, _theta, _phi, _energy, _dirsig, _cs in data.T: if _plane == -1: dset.add_row(theta=_theta, phi=_phi, energy=_energy, cross_section=_cs, direct_signal=_dirsig) elif scan_type in ('scatf',): results_fname = os.path.join(self.tmp_folder, 'output/facdif1.dat') data = self.specio.load_facdif(results_fname) data = data[:, [1, 4, 5, 6, 8]].T _proto, _sf_real, _sf_imag, _theta, _energy = data _sf = _sf_real + _sf_imag * 1j dset.add_columns(proto_index=_proto, sf_real=np.real(_sf), sf_imag=np.imag(_sf), sf_module=np.abs(_sf), theta=_theta, energy=_energy) elif scan_type in ('theta_phi',): results_fname = os.path.join(self.tmp_folder, 'output/results.dat') data = self.specio.load_results(results_fname) #theta_c, phi_c = data[[2, 3], :] #xsec_c = data[-1, :] #dirsig_c = data[-2, :] #dset.add_columns(theta=theta_c) #dset.add_columns(phi=phi_c) #dset.add_columns(cross_section=xsec_c) #dset.add_columns(direct_signal=dirsig_c) for _plane, _theta, _phi, _energy, _dirsig, _cs in data.T: if _plane == -1: dset.add_row(theta=_theta, phi=_phi, energy=_energy, cross_section=_cs, direct_signal=_dirsig) # create a view title = '' for ke in all_ke.value: if scan_type == 'theta': absorber_symbol = self.atoms[self.atoms.absorber].symbol title = 'Polar scan of {}({}) at {:.2f} eV'.format( absorber_symbol, level, ke) xlabel = r'Angle $\theta$($\degree$)' ylabel = r'Signal (a. u.)' view = dset.add_view("E = {:.2f} eV".format(ke), title=title, xlabel=xlabel, ylabel=ylabel) for angle_phi in self.scan_parameters.get_parameter( 'phi').value: where = ("energy=={:.2f} and phi=={:.2f}" "").format(ke, angle_phi) legend = r'$\phi$ = {:.1f} $\degree$'.format(angle_phi) view.select('theta', 'cross_section', where=where, legend=legend) if scan_type == 'phi': absorber_symbol = self.atoms[self.atoms.absorber].symbol title = 'Azimuthal scan of {}({}) at {:.2f} eV'.format( absorber_symbol, level, ke) xlabel = r'Angle $\phi$($\degree$)' ylabel = r'Signal (a. u.)' view = dset.add_view("E = {:.2f} eV".format(ke), title=title, xlabel=xlabel, ylabel=ylabel) for angle_theta in self.scan_parameters.get_parameter( 'theta').value: where = ("energy=={:.2f} and theta=={:.2f}" "").format(ke, angle_theta) legend = r'$\theta$ = {:.1f} $\degree$'.format(angle_theta) view.select('phi', 'cross_section', where=where, legend=legend) if scan_type == 'theta_phi': absorber_symbol = self.atoms[self.atoms.absorber].symbol title = ('Stereographic projection of {}({}) at {:.2f} eV' '').format(absorber_symbol, level, ke) xlabel = r'Angle $\phi$($\degree$)' ylabel = r'Signal (a. u.)' view = dset.add_view("E = {:.2f} eV".format(ke), title=title, xlabel=xlabel, ylabel=ylabel, projection='stereo', colorbar=True, autoscale=True) view.select('theta', 'phi', 'cross_section') if scan_type == 'scatf': for i in range(self.phagenio.nat): proto_index = i+1 title = 'Scattering factor at {:.3f} eV'.format(kinetic_energy) mini = min(map(np.min, [dset.sf_real, dset.sf_imag, dset.sf_module])) maxi = max(map(np.max, [dset.sf_real, dset.sf_imag, dset.sf_module])) view = dset.add_view("Proto. atom #{:d}".format(proto_index), title=title, projection='polar', ylim=[mini, maxi]) where = "proto_index=={:d}".format(proto_index) view.select('theta', 'sf_module', where=where, legend=r'$|f(\theta)|$') view.select('theta', 'sf_real', where=where, legend=r'$\Re(f(\theta))$') view.select('theta', 'sf_imag', where=where, legend=r'$\Im(f(\theta))$') if scan_type == 'energy': absorber_symbol = self.atoms[self.atoms.absorber].symbol title = (r'Energy scan of {}({}) at $\theta$={:.2f}$\degree$ and ' '$\phi$={:.2f}$\degree$').format( absorber_symbol, level, theta, phi) xlabel = r'Photoelectron kinetic energy (eV)' ylabel = r'Signal (a. u.)' view = dset.add_view("EnergyScan".format(ke), title=title, xlabel=xlabel, ylabel=ylabel) view.select('energy', 'cross_section') # save the cluster #clusbuf = StringIO() #self.atoms.info['absorber'] = self.atoms.absorber #self.atoms.write(clusbuf, format='xyz') #dset.add_parameter(group='Cluster', name='cluster', value=clusbuf.getvalue(), hidden="True") self.add_cluster_to_dset(dset) LOGGER.info('%s scan computing done!', scan_type) return self.iodata
[docs] def get_potential(self, atom_index=None, data=None, units={'energy': 'eV', 'space': 'angstrom'}): """Computes the coulombic part of the atomic potential. :param atom_index: The atom indices to get the potential of, either as a list or as a single integer :param data: The data object to store the results to :param units: The units to be used. A dictionary with the keys 'energy' and 'space' :return: A Data object """ LOGGER.info("Getting the Potential...") LOGGER.debug(get_call_info(inspect.currentframe())) _units = {'energy': 'eV', 'space': 'angstrom'} _units.update(units) if data: self.iodata = data self.run_phagen() filename = os.path.join(self.tmp_folder, 'output/tmatrix.tl') tl = self.phagenio.load_tl_file(filename) filename = os.path.join(self.tmp_folder, 'output/cluster.clu') self.phagenio.load_cluster_file(filename) if self.phagen_parameters.potgen in ('in'): filename = os.path.join(self.tmp_folder, 'output/plot/plot_vc.dat') else: filename = os.path.join(self.tmp_folder, 'output/plot/plot_v.dat') pot_data = self.phagenio.load_potential_file(filename) cluster = self.phagen_parameters.get_parameter('atoms').value dset = self.iodata.add_dset('Potential [{:d}]'.format(len(self.iodata))) r = [] v = [] index = np.empty((0,1), dtype=int) absorber_position = cluster[cluster.absorber].position for _pot_data in pot_data: # find the proto index of these data at_position = (_pot_data['coord'] * UREG.bohr_radius).to('angstrom').magnitude + absorber_position at_index = get_atom_index(cluster, *at_position) at_proto_index = cluster[at_index].get('proto_index') #values = np.asarray(_pot_data['values']) values = _pot_data['values'] index = np.append(index, np.ones(values.shape[0], dtype=int) * at_proto_index) r = np.append(r, (values[:, 0] * UREG.bohr_radius).to(_units['space']).magnitude) v = np.append(v, (values[:, 1] * UREG.rydberg).to(_units['energy']).magnitude) dset.add_columns(distance=r, potential=v, index=index) view = dset.add_view('potential data', title='Potential energy of atoms', xlabel='distance from atomic center [{:s}]'.format(_units['space']), ylabel='energy [{:s}]'.format(_units['energy']), scale='linear', autoscale=True) if atom_index == None: for i in range(pot_data[len(pot_data) - 1]['index']): view.select('distance', 'potential', where="index=={:d}".format(i), legend="Atom index #{:d}".format(i + 1)) else: for i in atom_index: view.select('distance', 'potential', where="index=={:d}".format(cluster[i].get('proto_index') - 1), legend="Atom index #{:d}".format(i)) return self.iodata
[docs] def get_scattering_factors(self, level='1s', kinetic_energy=None, data=None, **kwargs): """Computes the scattering factors of all prototypical atoms in the cluster. This function computes the real and imaginery parts of the scattering factor as well as its modulus for each non symetrically equivalent atom in the cluster. The results are stored in the *data* object if provided as a parameter. :param level: The electronic level. See :ref:`pedparameters-level`. :param kinetic_energy: see :ref:`scanparameters-kinetic_energy`. :param data: a :py:class:`iodata.Data` object to append the results to or None. :returns: The modified :py:class:`iodata.Data` object passed as an argument or a new :py:class:`iodata.Data` object. """ data = self._get_scan(scan_type='scatf', level=level, data=data, kinetic_energy=kinetic_energy, **kwargs) return data
[docs] def get_theta_scan(self, phi=0, theta=np.linspace(-70, 70, 141), level=None, kinetic_energy=None, data=None, **kwargs): """Computes a polar scan of the emitted photoelectrons. :param phi: The azimuthal angle in degrees. See :ref:`scanparameters-phi`. :param theta: All the values of the polar angle to be computed. See :ref:`scanparameters-theta`. :param level: The electronic level. See :ref:`pedparameters-level`. :param kinetic_energy: see :ref:`scanparameters-kinetic_energy`. :param data: a :py:class:`iodata.Data` object to append the results to or None. :returns: The modified :py:class:`iodata.Data` object passed as an argument or a new :py:class:`iodata.Data` object. """ data = self._get_scan(scan_type='theta', level=level, theta=theta, phi=phi, kinetic_energy=kinetic_energy, data=data, **kwargs) return data
[docs] def get_phi_scan(self, phi=np.linspace(0, 359, 359), theta=0, level=None, kinetic_energy=None, data=None, **kwargs): """Computes an azimuthal scan of the emitted photoelectrons. :param phi: All the values of the azimuthal angle to be computed. See :ref:`scanparameters-phi`. :param theta: The polar angle in degrees. See :ref:`scanparameters-theta`. :param level: The electronic level. See :ref:`pedparameters-level`. :param kinetic_energy: see :ref:`scanparameters-kinetic_energy`. :param data: a :py:class:`iodata.Data` object to append the results to or None. :returns: The modified :py:class:`iodata.Data` object passed as an argument or a new :py:class:`iodata.Data` object. """ data = self._get_scan(scan_type='phi', level=level, theta=theta, phi=phi, kinetic_energy=kinetic_energy, data=data, **kwargs) return data
[docs] def get_theta_phi_scan(self, phi=np.linspace(0, 360), theta=np.linspace(0, 90, 45), level=None, kinetic_energy=None, data=None, **kwargs): """Computes a stereographic scan of the emitted photoelectrons. The azimuth ranges from 0 to 360° and the polar angle ranges from 0 to 90°. :param level: The electronic level. See :ref:`pedparameters-level`. :param kinetic_energy: see :ref:`scanparameters-kinetic_energy`. :param data: a :py:class:`iodata.Data` object to append the results to or None. :returns: The modified :py:class:`iodata.Data` object passed as an argument or a new :py:class:`iodata.Data` object. """ data = self._get_scan(scan_type='theta_phi', level=level, theta=theta, phi=phi, kinetic_energy=kinetic_energy, data=data, **kwargs) return data
[docs] def get_energy_scan(self, phi=0, theta=0, level=None, kinetic_energy=None, data=None, **kwargs): """Computes an energy scan of the emitted photoelectrons. :param phi: All the values of the azimuthal angle to be computed. See :ref:`scanparameters-phi`. :param theta: The polar angle in degrees. See :ref:`scanparameters-theta`. :param level: The electronic level. See :ref:`pedparameters-level`. :param kinetic_energy: see :ref:`scanparameters-kinetic_energy`. :param data: a :py:class:`iodata.Data` object to append the results to or None. :returns: The modified :py:class:`iodata.Data` object passed as an argument or a new :py:class:`iodata.Data` object. """ data = self._get_scan(scan_type='energy', level=level, theta=theta, phi=phi, kinetic_energy=kinetic_energy, data=data, **kwargs) return data
[docs]class _EIG(_MSCALCULATOR): """ .. note:: This class constructor is not meant to be called directly by the user. Use the :py:func:`MSSPEC` to instanciate any calculator. """ def __init__(self, algorithm='inversion', polarization=None, dichroism=None, spinpol=False, folder='./calc', txt='-'): _MSCALCULATOR.__init__(self, spectroscopy='EIG', algorithm=algorithm, polarization=polarization, dichroism=dichroism, spinpol=spinpol, folder=folder, txt=txt) if algorithm not in ('inversion', 'power'): LOGGER.error("Only the 'inversion' or the 'power' algorithms " "are supported in EIG spectroscopy mode") exit(1) self.iodata = iodata.Data('EIG Simulation') # ensure no core-hole is created by using the "led" (LEED) mode # of Phagen self.phagen_parameters.calctype = "led" def get_eigen_values(self, kinetic_energy, data=None): LOGGER.info("Computting the eigen values...") if data: self.iodata = data # update the parameters self.scan_parameters.set_parameter('kinetic_energy', kinetic_energy) all_ke = self.scan_parameters.get_parameter('ke_array').value if np.any(all_ke < 0): LOGGER.error('Source energy is not high enough or level too deep!') raise ValueError('Kinetic energy is < 0! ({})'.format( kinetic_energy)) # ensure that if the alogorithm is 'inversion' the IPWM must be set to 0 if self.global_parameters.algorithm.lower() == 'inversion': self.spec_parameters.eigval_ipwm = 0 self.get_tmatrix() self.run_spec() # Now create a dataset ndset = len(self.iodata) dset = self.iodata.add_dset('Eigen values calculation [{:d}]'.format(ndset)) for p in self.get_parameters(): bundle = {'group': str(p.group), 'name': str(p.name), 'value': str(p.value), 'unit': '' if p.unit is None else str(p.unit)} dset.add_parameter(**bundle) results_fname = os.path.join(self.tmp_folder, 'output/results.dat') data = self.specio.load_results(results_fname) energy, spec_rad = data dset.add_columns(energy=energy, spectral_radius=spec_rad) # create a view title = 'Spectral radii versus kinetic energy' xlabel = r'Kinetic energy (eV)' ylabel = r'Spectral radius' view = dset.add_view("Spectral radius", title=title, xlabel=xlabel, ylabel=ylabel, marker='o') view.select('energy', 'spectral_radius') # add the cluster to the dataset self.add_cluster_to_dset(dset) return self.iodata
[docs]def MSSPEC(spectroscopy='PED', **kwargs): """ The MsSpec calculator constructor. Instanciates a calculator for the given spectroscopy. :param spectroscopy: See :ref:`globalparameters-spectroscopy`. :param kwargs: Other keywords are passed to the spectroscopy-specific constructor. :returns: A :py:class:`calculator._MSCALCULATOR` object. """ module = sys.modules[__name__] cls = getattr(module, '_' + spectroscopy) return cls(**kwargs)
class RFACTOR(object): def __init__(self, folder='./rfc'): self.iodata = iodata.Data('R-Factor analysis') self.folder = folder self._params = CompCurveParameters() self.general_parameters = CompCurveGeneralParameters( self._params) self.io = CompCurveIO(self._params) # prepare the working area if not os.path.exists(self.folder): os.makedirs(self.folder) os.makedirs(os.path.join(self.folder, 'rfc')) os.makedirs(os.path.join(self.folder, 'exp/div')) os.makedirs(os.path.join(self.folder, 'calc/div')) os.makedirs(os.path.join(self.folder, 'plot')) # store the r-factor analysis results # self.variables = {'variable0_name': [value0, value1, ...], # 'variable1_name': [value0, value1, ...], # ...} self.variables = {} # self.rfc = [[rf0_0, rf0_1, ... , rf0_11], <-- run 0 # [rf1_0, rf1_1, ... , rf1_11], <-- run 1 # ............................, # [rfn_0, rfn_1, ... , rfn_11]] <-- run n self.rfc = [] # The x and y array to compare to self.xref = self.yref = [0,] # The index of the best value self.index = 0 # The best values as a dictionary self.best_values = {} # The number of calculation files in the stack. This counter is # inremented each time calculation is run self.stack_count = 0 # initialize all parameters with defaults values self.logger = logging.getLogger("RFACTOR") self.logger.info("Set default values =========================================") for p in (list(self.general_parameters)): p.set(p.default) self.logger.info("End of default values ======================================") #exit() def set_references(self, X, Y): self.xref = X self.yref = Y def run(self, *args, data=None, **kwargs): # Get the data object if provided #data = kwargs.pop('data', None) if data: assert isinstance(data, iodata.Data), ("Unsupported type for data" "keyword.") self.iodata = data # Move to the working folder cwd = os.getcwd() os.chdir(self.folder) # Write the reference data np.savetxt('exp/experiment.txt', np.transpose([self.xref, self.yref])) # Write all the input calculation files # Number of input files noif = int(len(args)/2) for i in range(noif): X, Y = args[2*i], args[2*i+1] fname = os.path.join('calc', f'calculation{self.stack_count:d}.txt') # And save to the working space np.savetxt(fname, np.transpose([X, Y])) self.stack_count += 1 # Update the list of input calculation files self._params.calc_filename = [] for i in range(self.stack_count): fname = os.path.join('calc', f'calculation{i:d}.txt') self._params.calc_filename.append(fname) # Write the input file self.io.write_input_file('comp_curves.dat') # And finally run _comp_curves.run() ####################################################################### # Load the results ####################################################################### self.rfc = [] for i in range(self.stack_count): # Read results files and get the R-Factors, exp data and calc # data for file index 'i' rfc, exp_data, calc_data = self.io.load_results(index=i) # Update the list of R-Factors results self.rfc.append(rfc) # Update the variables values for i in range(noif): for k,v in kwargs.items(): try: vi = v[i] except (IndexError, TypeError) as err: vi = v try: self.variables[k].append(vi) except KeyError as err: self.variables[k] = [vi,] ####################################################################### # Analysis ####################################################################### rfc = np.array(self.rfc) # Get the index of the minimum for each R-Factor (each column) all_min = np.argmin(rfc, axis=0) # Iterate over each run and get the number of R-Factors that are min # for this set all_counts = np.zeros(self.stack_count, dtype=int) for i in range(self.stack_count): all_counts[i] = len(np.where(all_min==i)[0]) # The best set of variables (ie the run index) is the one with the # highest number of counts: self.index = np.argmax(all_counts) # Update the "best values" dict self.best_values = {k:self.variables[k][self.index] for k in self.variables.keys()} # with np.printoptions(precision=6, linewidth=300, threshold=200): # print('rfc: ') # print(rfc) # print('all_min: ', all_min) # print('all_counts: ', all_counts) # print('variables: ', self.variables) ####################################################################### # Store values ####################################################################### # Three datasets will be created or existing ones will be reused if # any. dset_values_title = "CurveComparison Values" dset_results_title = "CurveComparison Results" dset_rfc_title = "CurveComparison R-Factors" # Create (or re-create) the datasets dset_values = self.iodata.add_dset(dset_values_title, overwrite=True) dset_values.add_columns(x=[], yref=[]) view_values = dset_values.add_view("Comparison", xlabel="X", ylabel="Y", autoscale=True, overwrite=True) dset_results = self.iodata.add_dset(dset_results_title, overwrite=True) dset_results.add_columns(variable_set=list(range(self.stack_count)), counts=all_counts.copy()) dset_results.add_columns(**self.variables) view_results = dset_results.add_view("R-Factor analysis", xlabel="variable set number", ylabel="counts", title=("Number of R-Factors " "minima for each set of " "variables")) dset_rfc = self.iodata.add_dset(dset_rfc_title, overwrite=True) dset_rfc.add_columns(rfactor_number=list(range(12))) view_rfc = dset_rfc.add_view("R-Factor results", xlabel="R-Factor #", ylabel="R-Factor value", title="", autoscale=True, marker="s") for i in range(self.stack_count): rfc, exp_data, calc_data = self.io.load_results(index=i) # Store the experimental data dset_values.x, dset_values.yref = exp_data.T # Append the calculated values ycalc = calc_data[:,1] dset_values.add_columns(**{f"calc{i:d}": ycalc}) dset_rfc.add_columns(**{f'variable_set{i:d}': rfc}) # Plot the curves view_values.select('x', 'yref', legend='Reference values') title = '' for k,v in self.best_values.items(): title += f'{k}={v} ' view_values.select('x', f"calc{self.index:d}", legend="Best calculated values") view_values.set_plot_options(title=title) view_results.select('counts') for i in range(self.stack_count): view_rfc.select('rfactor_number', f'variable_set{i:d}', legend=f"variables set #{i:d}") # Save the parameters for p in self.get_parameters(): bundle = {'group': str(p.group), 'name': str(p.name), 'value': str(p.value), 'unit': '' if p.unit is None else str(p.unit)} dset_results.add_parameter(**bundle) # Move back to the initial folder os.chdir(cwd) # And return the Data object return self.iodata def get_parameters(self): """Get all the defined parameters in the calculator. :return: A list of all parameters objects. :rtype: List of :py:class:`parameters.Parameter` """ _ = [] for section in ('general', ): parameters = getattr(self, section + '_parameters') for p in parameters: _.append(p) return _ if __name__ == "__main__": pass