Source code for pyiron.base.pyio.parser

# coding: utf-8
# Copyright (c) Max-Planck-Institut für Eisenforschung GmbH - Computational Materials Design (CM) Department
# Distributed under the terms of "New BSD License", see the LICENSE file.

from __future__ import print_function
import ast
import numpy as np

"""
General purpose output parser
"""

__author__ = "Joerg Neugebauer"
__copyright__ = (
    "Copyright 2020, Max-Planck-Institut für Eisenforschung GmbH - "
    "Computational Materials Design (CM) Department"
)
__version__ = "1.0"
__maintainer__ = "Jan Janssen"
__email__ = "janssen@mpie.de"
__status__ = "production"
__date__ = "Sep 1, 2017"


[docs]def extract_data_from_str_lst(str_lst, tag, num_args=1): """ General purpose routine to extract any static from a log (text) file Args: file_name (str): file name or path to the file, can either be absolute or relative tag (str): string at the beginning of the line num_args (int): number of arguments separated by ' ' or ',' to extract after the tag Returns: list: List of arguments extracted as strings """ def multiple_delimiter_split(s, seps): res = [s] for sep in seps: s, res = res, [] for seq in s: res += seq.split(sep) while "" in res: res.remove("") return res collector = [] ind_start = len(tag.split()) for line_in_file in str_lst: if line_in_file.startswith(tag): collector = [] vals = multiple_delimiter_split(line_in_file, (" ", ",")) if num_args == 1: collector.append(vals[ind_start]) else: collector.append(vals[ind_start : num_args + ind_start]) return collector
[docs]def extract_data_from_file(file_name, tag, num_args=1): """ General purpose routine to extract any static from a log (text) file Args: file_name (str): file name or path to the file, can either be absolute or relative tag (str): string at the beginning of the line num_args (int): number of arguments separated by ' ' or ',' to extract after the tag Returns: list: List of arguments extracted as strings """ with open(file_name) as infile: content = infile.readlines() return extract_data_from_str_lst(str_lst=content, tag=tag, num_args=num_args)
[docs]class Logstatus(object): """ Generic Parser for parsing output files by searching for a specific pattern structure and extracting the data that follows the pattern into the status_dict dictionary. Args: iter_levels (int): Levels of iteration - default = 1 """ def __init__(self, h5=None, iter_levels=1): # path = None, # path of h5 file if h5 is not None: h5.add_group("generic") h5.move_up() self.h5 = h5 self.h5_group_data = h5.getGroup().logStatus self.status_dict = {} self.iter_levels = iter_levels self.iter = iter_levels * [0] self.store_as_vector = [] self.h5_open = False
[docs] def reset_iter(self, dim=0): """ Reset iteration level Args: dim (int): reset value - default = 0 """ for i in range(dim, self.iter_levels): self.iter[i] = 0
[docs] def raise_iter(self, dim=0): """ Increase the iteration level Args: dim (int): position - default = 0 """ self.iter[dim] += 1
[docs] def append(self, title, data_to_append, vec=False): """ Append data to the LogStatus object status_dict dictionary Args: title (str): Title of the data to append data_to_append (list,dict): the data can be of various types vec (bool): [True/False] if the data is a single vector instead of a matrix or a tensor """ if title in self.status_dict.keys(): if vec: raise ValueError( "For appending matrix rather than vector option needed!" ) self.status_dict[title].append([list(self.iter), data_to_append]) else: self.status_dict[title] = [[list(self.iter), data_to_append]]
[docs] def to_hdf(self, hdf): """ Store the LogStatus object status_dict dictionary in an HDF5 file Args: hdf (ProjectHDFio): HDF5 object to store the dictionary in. """ for key, value in self.status_dict.items(): if key in self.store_as_vector: if len(value) > 1: raise ValueError( "Multi-dimensional array cannot be saved as vector" ) hdf[key] = np.array(value[0][1]) else: hdf[key] = np.array([val for _, val in value])
[docs] def combine_xyz(self, x_key, y_key, z_key, combined_key, as_vector=False): """ Combine three lists representing the x,y,z coordinates, by accessing them from the status_dict dictionary, combining them, store them under the combined_key and remove the other three keys. Args: x_key (str): key of the x coordinates y_key (str): key of the y coordinates z_key (str): key of the z coordinates combined_key (str): name of the combined coordinates """ if ( x_key in self.status_dict and y_key in self.status_dict and z_key in self.status_dict ): combined_lst = [] if as_vector: time_x, val_x = self.status_dict[x_key][0] time_y, val_y = self.status_dict[y_key][0] time_z, val_z = self.status_dict[z_key][0] for val_t_x, val_t_y, val_t_z in zip(val_x, val_y, val_z): combined_lst.append([time_x, [val_t_x, val_t_y, val_t_z]]) else: for var_x, var_y, var_z in zip( self.status_dict[x_key], self.status_dict[y_key], self.status_dict[z_key], ): time_x, val_x = var_x time_y, val_y = var_y time_z, val_z = var_z combined_lst.append( [ time_x, [ [val_t_x, val_t_y, val_t_z] for val_t_x, val_t_y, val_t_z in zip( val_x, val_y, val_z ) ], ] ) del self.status_dict[x_key] del self.status_dict[y_key] del self.status_dict[z_key] self.status_dict[combined_key] = combined_lst
[docs] def combine_mat(self, x_key, xy_key, xz_key, y_key, yz_key, z_key, combined_key): """ Combine three lists representing the x,y,z coordinates, by accessing them from the status_dict dictionary, combining them, store them under the combined_key and remove the other three keys. Args: x_key (str): key of the x coordinates y_key (str): key of the y coordinates z_key (str): key of the z coordinates combined_key (str): name of the combined coordinates """ if ( x_key in self.status_dict and y_key in self.status_dict and z_key in self.status_dict ): combined_lst = [] for var_xx, var_xy, var_xz, var_yy, var_yz, var_zz in zip( self.status_dict[x_key], self.status_dict[xy_key], self.status_dict[xz_key], self.status_dict[y_key], self.status_dict[yz_key], self.status_dict[z_key], ): time_xx, val_xx = var_xx time_xy, val_xy = var_xy time_xz, val_xz = var_xz time_yy, val_yy = var_yy time_yz, val_yz = var_yz time_zz, val_zz = var_zz combined_lst.append( [ time_xx, [ [ [var_t_xx, var_t_xy, var_t_xz], [var_t_yx, var_t_yy, var_t_yz], [var_t_zx, var_t_zy, var_t_zz], ] for var_t_xx, var_t_xy, var_t_xz, var_t_yx, var_t_yy, var_t_yz, var_t_zx, var_t_zy, var_t_zz in zip( val_xx, val_xy, val_xz, val_xy, val_yy, val_yz, val_xz, val_yz, val_zz, ) ], ] ) del self.status_dict[x_key] del self.status_dict[xy_key] del self.status_dict[xz_key] del self.status_dict[y_key] del self.status_dict[yz_key] del self.status_dict[z_key] self.status_dict[combined_key] = combined_lst
[docs] def convert_unit(self, key, factor): if key in self.status_dict: return_lst = [] for step in self.status_dict[key]: time, values = step return_lst.append([time, (np.array(values) * factor).tolist()]) self.status_dict[key] = return_lst
[docs] @staticmethod def extract_item(l_item): """ Method to extract information from a single line - currently very specific for the Lammps output Args: l_item (str): line to extract information from Returns: str, list: the tag_string as string and the arguments as list """ item_list = l_item.split() first_item = item_list[1] if first_item == "NUMBER": num_elements = 3 elif first_item == "BOX": num_elements = 2 else: num_elements = 1 tag = item_list[1 : num_elements + 1] tag_string = " ".join(el for el in tag) if len(item_list) == num_elements + 1: args = None else: args = item_list[num_elements + 1 : :] return tag_string, args
[docs] def extract_from_list(self, list_of_lines, tag_dict, h5_dict=None, key_dict=None): """ Main function of the LogStatus class to extract data from an output file by searching for the tag dictionary Args: file_name (str): absolute path to the output file tag_dict (dict): Dictionary with tags/patterns as key and an additional dictionary to describe the data structure. The data structure dictionary can contain the following keys: - "arg": position of the argument - or dimension (":", ":,:") - "type": Python data type - "h5": HDF5 key to store the information - "rows": number of rows from the line where the tag was found - "splitTag": split the tag - [True/False] - "splitArg": split the argument - [True/False] - "lineSkip": skip a line - "func": function to convert the data h5_dict (dict): Translation dictionary of output tags as keys to the tags used on the HDF5 file as values. key_dict (dict): Translation dictionary of python internal tags as keys to the output tags as values. """ val_item = {} tag_vals = {} tag = LogTag(tag_dict, h5_dict, key_dict) iterate_over_lines = iter(list_of_lines) for line_read in iterate_over_lines: while True: if tag.is_item(line_read): # items): tag_name = tag.tag_name if tag.rows() == 0: # read single line_read tag.set_item(tag_vals, self) try: line_read = next(iterate_over_lines) except StopIteration: break else: for _ in range(tag.line_skip()): line_read = next(iterate_over_lines) if isinstance(tag.rows(), str): i_line = 0 while True: try: line_read = next(iterate_over_lines) except StopIteration: break if line_read.find(tag.rows().strip()) > -1: break if "WARNING:" in line_read: break val_line = [ [ast.literal_eval(l) for l in line_read.split()] ] if i_line == 0: val_array = np.array(val_line) else: val_array = np.append( arr=val_array, values=val_line, axis=0 ) i_line += 1 else: for i_line in range(tag.rows()): try: line_read = next(iterate_over_lines) except StopIteration: break val_line = [ [ast.literal_eval(l) for l in line_read.split()] ] if i_line == 0: val_array = np.array(val_line) else: val_array = np.append( arr=val_array, values=val_line, axis=0 ) if tag.is_func(): val_array = tag.apply_func(val_array) val_item[tag_name] = val_array if np.shape(val_array) == (1, 1): self.append(tag.h5(), val_array[0, 0]) elif tag.test_split(): tag_list = None if tag.split_tag: tag_list = tag_name.split() elif tag.split_arg: if "header" not in tag_dict[tag_name].keys(): tag_list = tag.val_list else: tag_list = tag_dict[tag_name]["header"] for i, t in enumerate(tag_list): if "header" not in tag_dict[tag_name].keys(): self.append( tag.translate(t), np.copy(val_array[:, i]) ) else: self.append(t, np.copy(val_array[:, i])) else: self.append(tag.h5(), np.copy(val_array)) else: try: line_read = next(iterate_over_lines) except StopIteration: break
[docs] def extract_file(self, file_name, tag_dict, h5_dict=None, key_dict=None): """ Main function of the LogStatus class to extract data from an output file by searching for the tag dictionary Args: file_name (str): absolute path to the output file tag_dict (dict): Dictionary with tags/patterns as key and an additional dictionary to describe the data structure. The data structure dictionary can contain the following keys: - "arg": position of the argument - or dimension (":", ":,:") - "type": Python data type - "h5": HDF5 key to store the information - "rows": number of rows from the line where the tag was found - "splitTag": split the tag - [True/False] - "splitArg": split the argument - [True/False] - "lineSkip": skip a line - "func": function to convert the data h5_dict (dict): Translation dictionary of output tags as keys to the tags used on the HDF5 file as values. key_dict (dict): Translation dictionary of python internal tags as keys to the output tags as values. """ with open(file_name, "r") as f: content = f.readlines() self.extract_from_list( list_of_lines=content, tag_dict=tag_dict, h5_dict=h5_dict, key_dict=key_dict )
[docs]class LogTag(object): """ LogTag object to parse for a specific pattern in the output file Args: tag_dict (dict): Dictionary with tags/patterns as key and an additional dictionary to describe the data structure. The data structure dictionary can contain the following keys: - "arg": position of the argument - or dimension (":", ":,:") - "type": Python data type - "h5": HDF5 key to store the information - "rows": number of rows from the line where the tag was found - "splitTag": split the tag - [True/False] - "splitArg": split the argument - [True/False] - "lineSkip": skip a line - "func": function to convert the data h5_dict (dict): Translation dictionary of output tags as keys to the tags used on the HDF5 file as values. key_dict (dict): Translation dictionary of python internal tags as keys to the output tags as values. """ def __init__(self, tag_dict, h5_dict=None, key_dict=None): self._tag_dict = None self._tag_first_word = None self._current = None self._dyn_tags = None self._key_dict = None self._h5_dict = None self._tag_name = None self.tag_dict = tag_dict self.key_dict = key_dict self.h5_dict = h5_dict @property def current(self): """ Get the current tag Returns: dict: current tag """ return self._current @current.setter def current(self, tag_name): """ Set the current tag Args: tag_name (str): current tag """ if tag_name not in self.tag_dict.keys(): raise ValueError("Unknown tag_name: " + tag_name) self._tag_name = tag_name self._current = self.tag_dict[tag_name] @property def tag_name(self): """ Get tag name Returns: str: tag name """ return self._tag_name @property def tag_dict(self): """ Get tag dictionary with tags/patterns as key and an additional dictionary to describe the data structure. The data structure dictionary can contain the following keys: - "arg": position of the argument - or dimension (":", ":,:") - "type": Python data type - "h5": HDF5 key to store the information - "rows": number of rows from the line where the tag was found - "splitTag": split the tag - [True/False] - "splitArg": split the argument - [True/False] - "lineSkip": skip a line - "func": function to convert the data Returns: dict: tag dictionary """ return self._tag_dict @tag_dict.setter def tag_dict(self, tag_dict): """ Set tag dictionary with tags/patterns as key and an additional dictionary to describe the data structure. The data structure dictionary can contain the following keys: - "arg": position of the argument - or dimension (":", ":,:") - "type": Python data type - "h5": HDF5 key to store the information - "rows": number of rows from the line where the tag was found - "splitTag": split the tag - [True/False] - "splitArg": split the argument - [True/False] - "lineSkip": skip a line - "func": function to convert the data Args: tag_dict (dict): tag dictionary """ self._tag_dict = tag_dict self._tag_first_word = tuple(self.tag_dict.keys()) self.dyn_tags = tag_dict @property def tag_first_word(self): """ Get first word of the tag Returns: str: first word """ return self._tag_first_word @property def dyn_tags(self): """ Get dynamic tags Returns: dict: dynamic tags """ return self._dyn_tags @dyn_tags.setter def dyn_tags(self, tag_dict): """ Set dynamic tags Args: tag_dict (dict): tag dictionary """ dyn_tags = {} for w in tag_dict.keys(): items = w.split() if items[0][:1] == "$": dyn_tags[w[1:]] = w self._dyn_tags = dyn_tags @property def key_dict(self): """ Get translation dictionary of python internal tags as keys to the output tags as values. Returns: dict: key dictionary """ return self._key_dict @key_dict.setter def key_dict(self, key_dict): """ Set translation dictionary of python internal tags as keys to the output tags as values. Args: key_dict (dict): key dictionary """ self._key_dict = key_dict @property def h5_dict(self): """ Get translation dictionary of output tags as keys to the tags used on the HDF5 file as values. Returns: dict: h5 dictionary """ return self._h5_dict @h5_dict.setter def h5_dict(self, h5_dict): """ Set translation dictionary of output tags as keys to the tags used on the HDF5 file as values. Args: h5_dict (dict): h5 dictionary """ self._h5_dict = h5_dict
[docs] def is_item(self, item_line, start=0): """ Check if the current line - item_line - matches one of the provided tags, if that is the case set the tag to be the current tag and update the val_list with the corresponding values. Args: item_line (str): Line of the output file start (int): Character to start with when parsing the item_line - default=0 Returns: bool: [True/False] """ l = item_line.strip() if not l.startswith( self.tag_first_word, start ): # start -> line must start with tag return False tag = None for tag in self.tag_first_word: if start == l.find(tag, start): break items = [ls.strip() for ls in l[len(tag) :].split()] self.current = tag self.val_list = items return True
[docs] def get_item(self, item, default): """ If item is part of the current dictionary keys the corresponding value is returned otherwise the default is returned. Args: item (str): dictionary key default (list, dict, int, float): Default value Returns: list, dict, int, float: The values connected to the key item in the current dictionary and if item is not a key in the current dictionary return the default value. """ if self.current is None: raise ValueError("current tag not defined!") if item in self.current.keys(): return self.current[item] else: return default
[docs] def h5(self): """ Translate current tag to HDF5 tag using the tag dictionary Returns: str: hdf5 key name """ return self.get_item(item="h5", default=self.tag_name)
[docs] def translate(self, item): """ Translate current tag to HDF5 tag using the h5_dict dictionary Args: item (str): Python tag Returns: str: HDF5 tag """ if self.h5_dict is None: raise ValueError("h5_dict is None!" + item) if item in self.h5_dict.keys(): return self.h5_dict[item] else: raise ValueError("tag not in h5_dict: " + item)
[docs] def arg(self): """ Get tag argument Returns: str: tag arguments """ l_arg = self.get_item(item="arg", default=0) if isinstance(l_arg, str): return l_arg else: return str(l_arg)
[docs] def line_skip(self): """ Check how many lines should be skipped. Returns: bool: [True/ False] """ return bool(self.get_item(item="lineSkip", default=0))
[docs] def rows(self): """ Number of rows to parse Returns: int, str: number of rows """ rows = self.get_item(item="rows", default=0) try: return int(rows) except ValueError: return rows
[docs] def test_split(self): """ Check if the argument or the tag should be split - if "splitArg" or "splitTag" is included in the tag_dict dictionary. Returns: bool: [True/ False] """ self.split_arg = self.get_item(item="splitArg", default=False) self.split_tag = self.get_item(item="splitTag", default=False) return self.split_arg or self.split_tag
[docs] def is_func(self): """ Check if a function is defined to convert the data - if "func" is included in the tag_dict dictionary Returns: bool: [True/ False] """ my_func = self.get_item(item="func", default=None) return my_func is not None
[docs] def apply_func(self, val): """ Apply the function on a given value Args: val (dict, list, float, int): value to apply the function on Returns: dict, list, float, int: result of applying the function """ my_func = self.get_item(item="func", default=None) if my_func is not None: return my_func(val)
[docs] def set_item(self, tag_vals, log_file): """ Set LogTag item Args: tag_vals (dict): tag value dictionary log_file (Logstatus): Logstatus object Returns: list: tag name, tag values, rows, line skip [True/False] """ tag_name = self.tag_name if self.rows() == 0: if not len(self.arg()) == 1: val = [] for i_item in ast.literal_eval(self.arg()): val.append(ast.literal_eval("self.val_list[" + i_item + "]")) else: # input is an array val = eval("self.val_list[" + self.arg() + "]") if isinstance(val, str): val = ast.literal_eval(val) tag_vals[tag_name] = val if len(self.arg()) == 1: log_file.append(self.h5(), data_to_append=val) else: for i_num, i_val in enumerate(val): log_file.append(self.h5()[i_num], data_to_append=i_val) if tag_name in self.dyn_tags.keys(): self.resolve_dynamic_variable(val) return tag_name, tag_vals, self.rows(), self.line_skip()
[docs] def resolve_dynamic_variable(self, val): """ Resolve dynamic variable using the key_dict dictionary Args: val: values to resolve """ d_name = self.dyn_tags[self.tag_name] if self.key_dict is not None: val = [self.key_dict[v] for v in val if v in self.key_dict.keys()] resolved_name = " ".join(val) v = self.tag_dict[d_name] self.tag_dict[resolved_name] = v del self.tag_dict[d_name] self.dyn_tags = self.tag_dict self._tag_first_word = tuple(self.tag_dict.keys())