# Copyright French Alternative Energies and Atomic Energy Commission
# Contributors: valjean developers
# valjean-support@cea.fr
#
# This software is a computer program whose purpose is to analyze and
# post-process numerical simulation results.
#
# This software is governed by the CeCILL license under French law and abiding
# by the rules of distribution of free software. You can use, modify and/ or
# redistribute the software under the terms of the CeCILL license as circulated
# by CEA, CNRS and INRIA at the following URL: http://www.cecill.info.
#
# As a counterpart to the access to the source code and rights to copy, modify
# and redistribute granted by the license, users are provided only with a
# limited warranty and the software's author, the holder of the economic
# rights, and the successive licensors have only limited liability.
#
# In this respect, the user's attention is drawn to the risks associated with
# loading, using, modifying and/or developing or reproducing the software by
# the user in light of its specific status of free software, that may mean that
# it is complicated to manipulate, and that also therefore means that it is
# reserved for developers and experienced professionals having in-depth
# computer knowledge. Users are therefore encouraged to load and test the
# software's suitability as regards their requirements in conditions enabling
# the security of their systems and/or data to be ensured and, more generally,
# to use and operate it in the same conditions as regards security.
#
# The fact that you are presently reading this means that you have had
# knowledge of the CeCILL license and that you accept its terms.
'''Module performing scanning and parsing of Tripoli-4 outputs.
This module also allows quick checks on outputs:
* presence of ``"NORMAL COMPLETION"``
* presence and values of times (simulation, exploitation)
Some options for debugging are available (end flag).
.. todo::
Change absolute imports in relative ones when main will be moved to
:ref:`cambronne <cambronne-main>`.
'''
import logging
import threading
from pyparsing import ParseException, ParserElement
from . import scan
from .grammar import t4gram
from .common import SpectrumDictBuilderException, MeshDictBuilderException
from ...chrono import Chrono
from ..browser import Browser
from .data_convertor import convert_data
LOGGER = logging.getLogger(__name__)
PYPARSING_LOCK = threading.RLock()
# in Eponine, profile is a key of globals
if 'profile' not in globals()['__builtins__']:
[docs] def profile(fprof):
'''To profile memory usage.'''
return fprof
[docs]class ParserException(Exception):
'''An error that may be raised by the :class:`Parser` class.'''
[docs]class Parser:
'''Scan Tripoli-4 listings, then parse the required batches.'''
[docs] @profile
def __init__(self, jddname):
'''Initialize the :class:`Parser` object.
:param str jddname: path to the Tripoli-4 output
It also initalizes the result of :class:`.scan.Scanner` to ``None``,
then executes the scan. If this step fails an exception is raised.
The Parser main object instance variable is:
`scan_res` (:class:`~.scan.Scanner`)
result from the scan of the Tripoli-4 output. See in the related
documentation the various instance variables available (like
``times``). Inheriting from :class:`collections.abc.Mapping`
various default methods are available like ``len``, ``[]``, etc.
The keys of :class:`~.scan.Scanner` are the batch numbers available
from the Tripoli-4 output. To get their list, use
:meth:`batch_numbers`.
Parsing (e.g. :meth:`parse_from_number`) returns a
:class:`ParseResult`.
'''
LOGGER.info("Parsing %s", jddname)
self.jdd = jddname
try:
with Chrono() as chrono:
self.scan_res = self._scan()
except scan.ScannerException as t4se:
LOGGER.error(t4se)
raise ParserException("Scan failed.") from None
except ParserException as t4pe:
LOGGER.error(t4pe)
raise ParserException("Scan failed.") from None
LOGGER.info("Successful scan in %f s", chrono)
def _scan(self):
'''Scan the parse the given jdd.'''
scan_res = self._scan_listing()
self._check_scan(scan_res)
return scan_res
@profile
def _scan_listing(self):
'''Scan Tripoli-4 listing, calling :mod:`.scan`.
:rtype: Scanner
'''
return scan.Scanner(self.jdd)
def _check_scan(self, scan_res):
'''Check existence of scan result and presence of normal end (per
default NORMAL COMPLETION).
:param Scanner scan_res: scan result
'''
if not scan_res:
raise ParserException(
f"No result found in Tripoli-4 listing {self.jdd}\n"
f"{scan_res.fatal_error()}")
if not scan_res.normalend:
LOGGER.warning("Tripoli-4 listing did not finish with "
"NORMAL COMPLETION.")
[docs] def batch_numbers(self):
'''Help method to get the available batch numbers.
:rtype: list(int)
'''
return list(self.scan_res.keys())
def _parse_listing_worker(self, gram, str_to_parse):
'''Parse the given string and raise exception if parsing failed.'''
try:
with PYPARSING_LOCK:
# Disable packrat caching: it degrades the performance of our
# grammar. We have to disable it globally here, because other
# packages (matplotlib for example) may have enabled it
# globally
ParserElement.disable_memoization()
result = gram.parseString(str_to_parse).asList()
except ParseException as err:
LOGGER.error("Parsing failed in %s, you are probably trying to "
"read a new response. Please update the parser "
"before re-running.", self.jdd)
LOGGER.debug("Exception explanation:\n%s", err.explain(depth=None))
# from None allows to raise a new exception without traceback and
# message of the previous one here.
raise ParserException("Error in parsing") from None
except (SpectrumDictBuilderException, MeshDictBuilderException) as dbe:
LOGGER.error(dbe)
raise ParserException("Error in parsing") from None
return result
def _time_consistency(self, pres, batch_number):
'''Check time consistency between parsed result and scan.'''
if 'batch_data' not in pres:
raise ParserException('No batch_data in parsed result, '
'something looks wrong in the T4 output.')
bdata = pres['batch_data']
try:
time_key = next(k for k in bdata if 'time' in k)
except StopIteration as sit:
raise ParserException(
'No "time" variable found in the Tripoli-4 output, '
'please check it.') from sit
if bdata[time_key] != self.scan_res.times[time_key][batch_number]:
msg = (f'{time_key} looks inconsistent between parsing '
f'({bdata[time_key]}) and scanning '
f'({self.scan_res.times[time_key][batch_number]})')
if self.scan_res.partial:
LOGGER.warning(msg)
else:
raise ParserException(msg)
[docs] def parse_from_number(self, batch_number, name=''):
'''Parse from batch index or batch number.
:param int batch_number: number of the batch to parse
:rtype: ParseResult
'''
LOGGER.debug('Using parse from Parser')
with Chrono() as chrono:
pres, = self._parse_listing_worker(
t4gram, self.scan_res[batch_number])
LOGGER.info("Successful parsing in %f s", chrono)
self._time_consistency(pres, batch_number)
scan_vars = self.scan_res.global_variables(batch_number)
return ParseResult(pres, scan_vars, name)
[docs] def parse_from_index(self, batch_index=-1, name=''):
'''Parse from batch index or batch number.
Per default the last batch is parsed (index = -1).
:param int batch_index: index of the batch in the list of batches
:rtype: ParseResult
'''
batch_number = self.scan_res.batch_number(batch_index)
return self.parse_from_number(batch_number, name)
[docs] def print_stats(self):
'''Print Tripoli-4 statistics (warnings and errors).'''
self.scan_res.print_statistics()
[docs] def check_times(self):
'''Check if running times are well written in Tripoli-4 listings.
These times are at the end of the result block and mark the end flag.
:returns: boolean, True if well present, else False
Returned bool depends on the listing:
* if the job was run in parallel mode (and declared so),
``"simulation time"`` and ``"elapsed time"`` should be present in
that order, only the second one is checked
* if the listing name contains ``"exploit"`` or ``"verif"``, it is most
probably an exploitation job (Green bands for example),
``"exploitation time"`` is checked
* else ``"simulation time"`` is checked
'''
return self.scan_res.check_times()
[docs] def print_times(self):
'''Print time characteristics of the Tripoli-4 result considered.
This print includes initialization time, simulation time, exploitation
time and elapsed time.
'''
for stime, vtime in self.scan_res.times.items():
print(stime.capitalize(), "=", vtime)
[docs]class ParseResult:
'''Class containing a parsing result from Tripoli-4 output for one batch.
The :class:`ParseResult` object is accessible from the instance
attribute
`res`
that is a unique dictionary containing all the results from
scanning and parsing steps. Variables characteristic to a batch are
stored under the key ``'batch_data'`` no matter if they come from
:class:`.Scanner` or from :class:`Parser`. Variables
characteristic to a run (= one execution of Tripoli-4) are stored
under ``'run_data'``, coming from the scanning step.
It is possible to transform the ``res`` dictionary in a
:class:`~valjean.eponine.browser.Browser` thanks to the
method :meth:`to_browser`.
'''
[docs] def __init__(self, parse_res, scan_vars, name=''):
'''Initialize the :class:`ParseResult` from:
:param dict parse_res: result from T4 parsing (for 1 batch)
:param dict scan_vars: variables coming from :class:`.Scanner` global
to job or specific to the batch.
:param str name: name to give to the parse result (will be propagated
to data)
Fill the `res` object.
'''
self.name = name
self._check_batch_number(parse_res, scan_vars)
self.pres = parse_res
self.res = self._build_unique_dict(self._build_datasets(),
scan_vars, name)
@staticmethod
def _check_batch_number(pres, svars):
'''Check that batch number from scan variables and edition batch number
if exists are the same, else emit a warning.
'''
ebn = pres.get('edition_batch_number')
sbn = svars.get('batch_number')
if sbn is None:
LOGGER.warning('No batch number was set in Scanner, please check.')
if ebn is not None and ebn != sbn:
LOGGER.warning('Edition batch number different from batch number')
def _build_datasets(self):
ares = {}
for key, val in self.pres.items():
if key == 'batch_data':
ares[key] = val.copy()
else:
ares[key] = [self._make_datasets(r) for r in val]
return ares
@staticmethod
def _set_array_what(score, resp):
if resp['response_type'] == 'sensitivity':
return 'sensitivity'
if resp['response_type'] == 'spherical_harmonics':
return resp['results']['spherical_harmonics']['what']
# score_name for adjoint criticality edition
what = resp.get('response_function', resp.get('score_name'))
return score.replace('score', what).lower()
@staticmethod
def _set_scalar_what(resn, resp):
if resp['response_type'] == 'sensitivity':
return 'sensitivity'
if 'batches' in resn or 'kij' in resn:
return resn
return resp.get('response_function', resp.get('response_type')).lower()
@staticmethod
def _set_args(score, resn, resp):
rname = '_'.join((score, resn)).replace('not_converged',
resp['response_type'])
sigma = 'sigma' if 'sigma' in resp['results'][resn] else None
if score == 'keff':
sigma = 'sigma%'
rname = score
what = 'keff'
elif 'correlation' in score:
what = 'correlation'
sigma = None
elif score == 'equivalent_keff':
what = 'keff'
rname = score
sigma = None
elif 'vovstar' in score:
sigma = 'vovstar_sigma'
what = 'vovstar'
elif 'uncert' in resn:
rname = rname.replace('uncert_', '')
what = score
elif resn == 'parna_likelihood':
if score == 'mean':
rname = resn
what = resn
else:
rname = '_'.join((resn, score))
what = score
else:
what = resp['response_function'].lower()
return rname, sigma, what
def _dset_from_dict(self, resn, res, resp):
tdict = {}
for arr in res:
if arr in ('units', 'coordinates'):
tdict[arr] = res[arr]
continue
if ((isinstance(res[arr], str)
and arr not in ('not_converged', 'what'))):
tdict[arr] = res[arr]
continue
if arr in ('bins', 'sigma', 'sigma%', 'what'):
continue
if 'array' in arr:
scores = [k for k in res[arr].dtype.names if k != 'sigma']
for score in scores:
rname = ('_'.join([score, arr]) if score not in arr
else arr).replace('_array', '')
what = self._set_array_what(score, resp)
tdict[rname] = convert_data(
resp['results'], resn, array_key=arr, score=score,
name=self.name, what=what)
else:
rname, sigma, what = self._set_args(arr, resn, resp)
tdict[rname] = convert_data(
resp['results'], resn, name=self.name,
what=what, score=arr, sigma=sigma)
return tdict
def _make_datasets(self, resp):
ress = {k: v for k, v in resp.items() if k != 'results'}
res = {}
resp_res = resp['results']
for resn, ires in resp_res.items():
if resn == 'spectrum' and {'mesh', 'spectrum'}.issubset(resp_res):
if not any('entropy' in n for n in resp_res['mesh']):
LOGGER.warning(
'Mesh and spectrum in same result, skipping spectrum. '
'Not foreseen case, please contact a developer')
continue
if isinstance(ires, dict):
res.update(self._dset_from_dict(resn, ires, resp))
elif isinstance(res, str) and resn != 'not_converged':
res[resn] = ires
else:
rname = (resp['response_type'] if resn == 'not_converged'
else resn)
what = self._set_scalar_what(resn, resp)
res[rname] = convert_data(resp_res, resn, name=self.name,
what=what)
ress['results'] = res
return ress
@staticmethod
def _build_unique_dict(pres, svars, name):
'''Build a unique dictionary from parsed result and global variables
from Scanner.
Variables specific to batch are added to the already existing
dictionary under the key ``'batch_data'`` from parsed result, while a
new item is created for the run data (key: ``'run_data'``).
:param dict pres: parsed result
:param dict svars: global variables from Scanner
:param str name: name of the parsed result (be considered as
``'batch_data'``)
:returns: updated parsed result
'''
gvars = svars.copy()
bdata_keys = {'batch_number', 'simulation_time', 'elapsed_time',
'exploitation_time'}
for key in bdata_keys & set(gvars.keys()):
pres['batch_data'].update({key: gvars.pop(key)})
pres['batch_data'].update({'name': name})
pres['run_data'] = gvars
return pres
[docs] def to_browser(self):
'''Get a :class:`~valjean.eponine.browser.Browser` from the
:class:`ParseResult`.
The global variables in Browser are the batch data. You can access the
`run data` only from the parsed result.
:rtype: Browser
'''
list_resps = [resp for key, lresp in self.res.items()
for resp in lresp
if key not in ('batch_data', 'run_data')]
browser = Browser(list_resps, global_vars=self.res['batch_data'])
if browser.is_empty():
LOGGER.error('Browser creation failed, please check what happened')
return browser