# Source code for daetools.pyDAE.data_reporters

"""********************************************************************************
                            data_reporters.py
                 DAE Tools: pyDAE module, www.daetools.com
                 Copyright (C) Dragan Nikolic
***********************************************************************************
DAE Tools is free software; you can redistribute it and/or modify it under the
terms of the GNU General Public License version 3 as published by the Free Software
Foundation. DAE Tools is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with the
DAE Tools software; if not, see <http://www.gnu.org/licenses/>.
********************************************************************************"""
import os, sys, numpy, json, itertools
from pyCore import *
from pyDataReporting import *

def _formatName(name):
    """
    Returns the stripped name without its leading (root model) component.

    The name is first passed through daeGetStrippedName (which strips the
    non alpha-numeric characters) and then everything up to and including
    the first '.' is dropped, i.e. _formatName('model1.model2.var') returns
    'model2.var'. A name without any '.' yields an empty string.
    """
    stripped = daeGetStrippedName(name)
    _root, _dot, remainder = stripped.partition('.')
    return remainder

class daePickleDataReporter(daeDataReporterFile):
    """
    Stores the collected results as a Python pickle. The resulting file can be
    opened in DAE Plotter or unpickled directly:

    .. code-block:: none

        f = open(filename, 'rb')
        process = pickle.load(f)
        f.close()

    where process is the dataReceiverProcess instance identical to the
    daeDataReceiverProcess objects used in the other data reporters below.
    """
    def __init__(self):
        daeDataReporterFile.__init__(self)

    def WriteDataToFile(self):
        """
        Pickles self.Process into the file given by self.ConnectString.
        Any error is reported to the standard output and not propagated.
        """
        try:
            # Imported lazily so that a missing dae_plotter package is reported
            # in the same way as any other failure.
            from daetools.dae_plotter.data_receiver_io import pickleProcess
            pickleProcess(self.Process, self.ConnectString)
        except Exception as error:
            message = 'Cannot write results in the Python pickle format:\n' + str(error)
            print(message)
class daeVTKDataReporter(daeDataReporterLocal):
    """
    Saves data in the binary VTK format (.vtr) using the pyEVTK module.
    pyEVTK is included in the daetools installation (daetools.ext_libs.pyevtk).
    A separate file is written into the specified directory for every time point.
    In addition, the 'variableName.visit' files are written for use with the VisIt software.

    Nota bene:

      - The original is available at https://pypi.python.org/pypi/PyEVTK.
        Install using: "pip install pyevtk".
      - It is not an original module available at https://bitbucket.org/pauloh/pyevtk.

    Does not require VTK installed.
    """
    def __init__(self):
        daeDataReporterLocal.__init__(self)

    def Connect(self, ConnectString, ProcessName):
        """
        ConnectString is a directory where the .vtr files will be written.
        The directory is created if it does not exist; False is returned if
        it cannot be created and True otherwise.
        """
        self.ConnectString = ConnectString
        self.ProcessName   = ProcessName
        self.base_folder   = ConnectString
        if not os.path.isdir(self.base_folder):
            try:
                os.makedirs(self.base_folder)
            except Exception as e:
                print('Cannot create directory %s for VTK files:\n%s' % (self.base_folder, str(e)))
                return False
        return True

    def IsConnected(self):
        return True

    def Disconnect(self):
        # The files are written only here, when the reporter gets disconnected.
        self.WriteFiles()
        return True

    def WriteFiles(self):
        """
        Writes one .vtr file per variable and time point into self.base_folder
        (named 'varName-time.vtr') and one 'varName.visit' file per variable
        listing all its .vtr files.

        Variables distributed on more than three domains cannot be placed on
        a 3D grid and are skipped. Any error is reported to the standard
        output and not propagated.
        """
        try:
            from daetools.ext_libs.pyevtk.hl import gridToVTK

            vtk_visit = {}
            for variable_name, (ndarr_values, ndarr_times, l_domains, s_units) in self.Process.dictVariableValues.items():
                nd = len(l_domains)
                if nd > 3:
                    # Skip only this variable; the remaining ones must still be written.
                    continue

                varName = _formatName(variable_name)

                # Axes that the variable is not distributed on collapse to a single point.
                axes = [numpy.array([0.0]), numpy.array([0.0]), numpy.array([0.0])]
                for i, domain_points in enumerate(l_domains):
                    axes[i] = numpy.array(domain_points)
                x, y, z = axes
                grid_shape = (len(x), len(y), len(z))

                for (t,), time in numpy.ndenumerate(ndarr_times):
                    filename = '%s-%.5f' % (varName, time)
                    filepath = os.path.join(self.base_folder, filename)

                    if nd == 0:
                        values = numpy.array([ndarr_values[t]])
                    else:
                        values = numpy.array(ndarr_values[t])
                    # gridToVTK is always given a 3D array matching the (x, y, z) axes.
                    values = values.reshape(grid_shape)

                    gridToVTK(filepath, x, y, z, pointData = {varName : values})
                    vtk_visit.setdefault(varName, []).append(filename + '.vtr')

            for var, files in vtk_visit.items():
                filepath = os.path.join(self.base_folder, '%s.visit' % var)
                with open(filepath, 'w') as f:
                    f.write('\n'.join(files))

        except Exception as e:
            print(('Cannot write results in .vtk format:\n' + str(e)))
class daeMatlabMATFileDataReporter(daeDataReporterFile):
    """
    Saves data in the Matlab MAT format (.mat) using the scipy.io.savemat function.
    Every variable is saved as a numpy array under its formatted name. In addition,
    the entries 'varName.Times', 'varName.DomainNames', 'varName.Domains' and
    'varName.Units' are saved for every variable.
    Does not require Matlab installed.
    """
    def __init__(self):
        daeDataReporterFile.__init__(self)

    def WriteDataToFile(self):
        """
        Collects all variables into a single dictionary and saves it to the
        file given by self.ConnectString. Any error is reported to the
        standard output and not propagated.
        """
        try:
            import scipy.io

            contents = {}
            declared = self.Process.dictVariables
            for variable_name, (ndarr_values, ndarr_times, l_domains, s_units) in self.Process.dictVariableValues.items():
                base = _formatName(variable_name)
                domain_names = [_formatName(domain.Name) for domain in declared[variable_name].Domains]

                entries = (
                    (base,                  ndarr_values),
                    (base + '.Times',       ndarr_times),
                    (base + '.DomainNames', numpy.array(domain_names)),
                    (base + '.Domains',     numpy.array(l_domains)),
                    (base + '.Units',       s_units),
                )
                for key, value in entries:
                    contents[key] = value

            scipy.io.savemat(self.ConnectString,
                             contents,
                             appendmat = True,
                             format = '5',
                             long_field_names = True,
                             do_compression = False,
                             oned_as = 'row')

        except Exception as error:
            print(('Cannot write results in .mat format:\n' + str(error)))
class daeJSONFileDataReporter(daeDataReporterFile):
    """
    Saves data in JSON text format using the Python json library.

    The output is a single JSON object keyed by the formatted variable names.
    Every variable is an object with the following items:

      - Values: nested list with the variable values
      - Times: list with the time points
      - DomainNames: names of the domains that the variable is distributed on
      - Domains: list of lists with the domain points
      - Units: variable units
    """
    def __init__(self):
        daeDataReporterFile.__init__(self)

    def WriteDataToFile(self):
        """
        Serialises all variables and writes them to the file given by
        self.ConnectString. Any error is reported to the standard output
        and not propagated.
        """
        try:
            mdict = {}
            variables = self.Process.dictVariables
            for variable_name, (ndarr_values, ndarr_times, l_domains, s_units) in self.Process.dictVariableValues.items():
                name = _formatName(variable_name)
                domainNames = [_formatName(d.Name) for d in variables[variable_name].Domains]
                mdict[name] = {'Values'      : ndarr_values.tolist(),
                               'Times'       : ndarr_times.tolist(),
                               'DomainNames' : domainNames,
                               'Domains'     : [d.tolist() for d in l_domains],
                               'Units'       : s_units
                              }

            # Serialise before opening the file so that a serialisation error
            # does not truncate an already existing file.
            json_text = json.dumps(mdict, sort_keys=True, indent=4)

            # The context manager closes the file even if the write fails.
            with open(self.ConnectString, 'w') as f:
                f.write(json_text)

        except Exception as e:
            print(('Cannot write data in JSON format:\n' + str(e)))
class daeHDF5FileDataReporter(daeDataReporterFile):
    """
    Saves data in HDF5 format using the Python h5py library.
    A separate group is created for every variable and contains the following data sets:

      - Values: multidimensional array with the variable values
      - Times: 1d array with the time points
      - DomainNames: names of the domains that the variable is distributed on
      - Domains: multidimensional array with the domain points
      - Units: variable units as a string
    """
    def __init__(self):
        daeDataReporterFile.__init__(self)

    def WriteDataToFile(self):
        """
        Writes all variables to the HDF5 file given by self.ConnectString
        (an existing file is overwritten). Any error is reported to the
        standard output and not propagated.
        """
        try:
            import h5py

            variables = self.Process.dictVariables

            # The context manager closes the file even if creating a group or
            # a data set fails half-way through.
            with h5py.File(self.ConnectString, "w") as f:
                for variable_name, (ndarr_values, ndarr_times, l_domains, s_units) in self.Process.dictVariableValues.items():
                    name = _formatName(variable_name)
                    # The domain names are stored as utf8-encoded byte strings.
                    domainNames = [_formatName(d.Name).encode('utf8') for d in variables[variable_name].Domains]

                    grp = f.create_group(name)
                    grp.create_dataset("Values",      data = ndarr_values)
                    grp.create_dataset("Times",       data = ndarr_times)
                    grp.create_dataset("DomainNames", data = domainNames)
                    grp.create_dataset("Domains",     data = l_domains)
                    grp.create_dataset("Units",       data = s_units)

        except Exception as e:
            print(('Cannot write data in HDF5 format:\n' + str(e)))
class daeXMLFileDataReporter(daeDataReporterFile):
    """
    Saves data in XML format (.xml) using the Python xml library.
    The numerical data are saved as json strings (for easier parsing).

    The root element is 'Simulation' (with the 'processName' attribute) and
    contains one child element per variable, each with the 'Units',
    'DomainNames', 'Domains', 'Times' and 'Values' child elements.
    """
    def __init__(self):
        daeDataReporterFile.__init__(self)

    def WriteDataToFile(self):
        """
        Builds the XML tree from all variables and writes it to the file given
        by self.ConnectString. Any error is reported to the standard output
        and not propagated.
        """
        try:
            from xml.etree import ElementTree

            class XmlDictObject(dict):
                """
                Adds object like functionality to the standard dictionary.
                """
                def __init__(self, initdict=None):
                    if initdict is None:
                        initdict = {}
                    dict.__init__(self, initdict)

                def __getattr__(self, item):
                    # A missing attribute must raise AttributeError (not KeyError),
                    # otherwise hasattr() and getattr() with a default do not work.
                    try:
                        return self.__getitem__(item)
                    except KeyError:
                        raise AttributeError(item)

                def __setattr__(self, item, value):
                    self.__setitem__(item, value)

                def __str__(self):
                    # dict.has_key does not exist in Python 3; use the 'in' operator.
                    if '_text' in self:
                        return self.__getitem__('_text')
                    else:
                        return ''

                @staticmethod
                def Wrap(x):
                    """
                    Static method to wrap a dictionary recursively as an XmlDictObject
                    """
                    if isinstance(x, dict):
                        return XmlDictObject((k, XmlDictObject.Wrap(v)) for (k, v) in x.items())
                    elif isinstance(x, list):
                        return [XmlDictObject.Wrap(v) for v in x]
                    else:
                        return x

                @staticmethod
                def _UnWrap(x):
                    if isinstance(x, dict):
                        return dict((k, XmlDictObject._UnWrap(v)) for (k, v) in x.items())
                    elif isinstance(x, list):
                        return [XmlDictObject._UnWrap(v) for v in x]
                    else:
                        return x

                def UnWrap(self):
                    """
                    Recursively converts an XmlDictObject to a standard dictionary and returns the result.
                    """
                    return XmlDictObject._UnWrap(self)

            def _ConvertDictToXmlRecurse(parent, dictitem):
                # Lists are expanded by the caller (one element per list item),
                # so a list must never arrive here.
                assert not isinstance(dictitem, list)

                if isinstance(dictitem, dict):
                    for (tag, child) in dictitem.items():
                        if str(tag) == '_text':
                            parent.text = str(child)
                        elif isinstance(child, list):
                            # iterate through the array and convert
                            for listchild in child:
                                elem = ElementTree.Element(tag)
                                parent.append(elem)
                                _ConvertDictToXmlRecurse(elem, listchild)
                        else:
                            elem = ElementTree.Element(tag)
                            parent.append(elem)
                            _ConvertDictToXmlRecurse(elem, child)
                else:
                    parent.text = str(dictitem)

            def ConvertDictToXml(xmldict):
                """
                Converts a dictionary to an XML ElementTree Element
                """
                # The (single) top-level key becomes the root element.
                roottag = list(xmldict.keys())[0]
                root = ElementTree.Element(roottag)
                _ConvertDictToXmlRecurse(root, xmldict[roottag])
                return root

            mdict = XmlDictObject()
            xml_variables = {}
            variables = self.Process.dictVariables
            for variable_name, (ndarr_values, ndarr_times, l_domains, s_units) in sorted(self.Process.dictVariableValues.items()):
                name = _formatName(variable_name)
                domainNames = [_formatName(d.Name) for d in variables[variable_name].Domains]

                variable = {}
                variable['Units']       = s_units
                variable['DomainNames'] = json.dumps(domainNames)
                variable['Domains']     = json.dumps(numpy.array(l_domains).tolist())
                variable['Times']       = json.dumps(ndarr_times.tolist())
                variable['Values']      = json.dumps(ndarr_values.tolist())

                xml_variables[name] = variable

            mdict['Simulation'] = xml_variables

            root = ConvertDictToXml(mdict)
            root.set('processName', self.ProcessName)
            tree = ElementTree.ElementTree(root)
            tree.write(self.ConnectString)

        except Exception as e:
            print(('Cannot write data in XML format:\n' + str(e)))
class daeExcelFileDataReporter(daeDataReporterFile):
    """
    Saves data into the Microsoft Excel format (.xlsx) using the openpyxl library
    (https://openpyxl.readthedocs.io). A separate sheet is created for every
    variable: the first column holds the time points and every further column
    holds the values of one point of the variable.
    Does not require Excel installed and works on all operating systems.
    """
    def __init__(self):
        daeDataReporterFile.__init__(self)

    def WriteDataToFile(self):
        """
        Fills one sheet per variable (in the sorted order of variable names)
        and saves the workbook to the file given by self.ConnectString.
        Any error is reported to the standard output and not propagated.
        """
        try:
            import openpyxl

            workbook = openpyxl.Workbook()
            results = sorted(self.Process.dictVariableValues.items())
            for variable_name, (ndarr_values, ndarr_times, l_domains, s_units) in results:
                name  = _formatName(variable_name)
                sheet = workbook.create_sheet(title = name)

                # Header row: the time/units caption followed by one column
                # title per point of the variable (name + list of indexes).
                sheet.cell(row = 1, column = 1, value = 'Time [s] / Values [%s]' % s_units)
                headers = []
                for val_indexes, _unused in numpy.ndenumerate(ndarr_values[0]):
                    suffix = list(val_indexes) if val_indexes else ''
                    headers.append('%s%s' % (name, suffix))
                for column, header in enumerate(headers, start = 2):
                    sheet.cell(row = 1, column = column, value = header)

                # Data rows: one row per time point, starting from the row 2.
                for (t,), time in numpy.ndenumerate(ndarr_times):
                    row = t + 2
                    sheet.cell(row = row, column = 1, value = time)
                    points = numpy.ndenumerate(ndarr_values[t])
                    for column, (_indexes, value) in enumerate(points, start = 2):
                        sheet.cell(row = row, column = column, value = value)

            workbook.save(self.ConnectString)

        except Exception as error:
            print(('Cannot write data to an excel file:\n' + str(error)))
class daePandasDataReporter(daeDataReporterLocal):
    """
    Exposes the collected results as a Pandas data set: the data_frame
    property returns the Pandas DataFrame object built from all variables.
    """
    def __init__(self):
        daeDataReporterLocal.__init__(self)

    def Connect(self, ConnectString, ProcessName):
        return True

    def IsConnected(self):
        return True

    def Disconnect(self):
        return True

    @property
    def data_frame(self):
        """
        DataFrame indexed by the formatted variable names with the columns
        'Units', 'DomainNames', 'Domains', 'Times' and 'Values'.
        None is returned (and the error printed to the standard output) if
        the data frame cannot be generated.
        """
        frame = None
        try:
            from pandas import DataFrame

            index   = []
            records = []
            declared = self.Process.dictVariables
            for variable_name, (ndarr_values, ndarr_times, l_domains, s_units) in self.Process.dictVariableValues.items():
                index.append(_formatName(variable_name))
                domain_names = [_formatName(domain.Name) for domain in declared[variable_name].Domains]
                # One record per variable, in the order of the columns below.
                records.append((s_units, domain_names, l_domains, ndarr_times, ndarr_values))

            frame = DataFrame(data    = records,
                              columns = ['Units', 'DomainNames', 'Domains', 'Times', 'Values'],
                              index   = index)

        except Exception as error:
            print(('Cannot generate Pandas DataFrame:\n' + str(error)))

        return frame
class daePlotDataReporter(daeDataReporterLocal):
    """
    Plots the specified variables using the Matplotlib library (by Caleb Hattingh).
    """
    def __init__(self):
        daeDataReporterLocal.__init__(self)
        self.ProcessName = ""

    def Connect(self, ConnectString, ProcessName):
        self.ProcessName   = ProcessName
        self.ConnectString = ConnectString
        return True

    def IsConnected(self):
        return True

    def Disconnect(self):
        return True

    def Plot(self, *args, **kwargs):
        '''
        ``args`` can be either:

        a) Instances of daeVariable, or
        b) Lists of daeVariable instances, or
        c) A mixture of both.

        Each ``arg`` will get its own subplot. The subplots are all automatically
        arranged such that the resulting figure is as square-like as possible.
        You can however override the shape by supplying ``figRows`` and ``figCols``
        as keyword args.

        Variables that are not found among the reported variables (matched by
        their CanonicalName) are silently left out of the plot. Any error is
        reported (with the traceback) to the standard output and not propagated.

        Basic Example::

            # Create Log, Solver, DataReporter and Simulation object
            log = daePythonStdOutLog()
            daesolver = daeIDAS()

            from daetools.pyDAE.data_reporters import daePlotDataReporter
            datareporter = daePlotDataReporter()

            simulation = simTutorial()

            simulation.m.SetReportingOn(True)

            simulation.ReportingInterval = 20
            simulation.TimeHorizon = 500

            simName = simulation.m.Name + strftime(" [%d.%m.%Y %H:%M:%S]", localtime())
            if(datareporter.Connect("", simName) == False):
                sys.exit()

            simulation.Initialize(daesolver, datareporter, log)

            simulation.m.SaveModelReport(simulation.m.Name + ".xml")
            simulation.m.SaveRuntimeModelReport(simulation.m.Name + "-rt.xml")

            simulation.SolveInitial()
            simulation.Run()

            simulation.Finalize()
            datareporter.Plot(
                simulation.m.Ci,                       # Subplot 1
                [simulation.m.L, simulation.m.event],  # Subplot 2 (2 sets)
                simulation.m.Vp,                       # Subplot 3
                [simulation.m.L, simulation.m.Vp]      # Subplot 4 (2 sets)
                )
        '''
        try:
            import matplotlib.pyplot as plt
            from math import sqrt, ceil

            # Math to make sure we have enough subplots for any number of
            # given variables. A missing (or zero/None) figRows/figCols
            # keyword argument falls back to the automatically computed value.
            n = len(args)
            rows = kwargs.get('figRows') or int(round(sqrt(n)))
            cols = kwargs.get('figCols') or int(ceil(sqrt(n)))

            # Create lookup for convenience.
            # There are two kinds of "variable" identifiers:
            #   a) instances of daeDataReceiverVariable (called processVar)
            #   b) instances of daeVariable (called daeVar)
            # This lookup maps the processVar names to processVar objects
            # for later matching.
            lookup = {}
            for processVar in self.Process.Variables:
                lookup[processVar.Name] = processVar

            def varLabel(daeVar):
                return '{0} ({1})'.format(daeVar.Name, str(daeVar.VariableType.Units))

            def larger_axlim(axlim):
                """
                This function enlarges the y-axis range so that data will
                not lie underneath an axis line.

                axlim: 2-tuple
                result: 2-tuple (enlarged range)

                This was taken from (thanks bernie!):
                http://stackoverflow.com/a/6230993/170656
                """
                axmin, axmax = axlim
                axrng = axmax - axmin
                new_min = axmin - 0.1 * axrng
                new_max = axmax + 0.1 * axrng
                return new_min, new_max

            def plotVariable(daeVar):
                # Plots the variable's data onto whatever is the current set
                # of axes. Note that the matchups between the processVar and
                # daeVar require daeVar.CanonicalName, not merely Name.
                if daeVar.CanonicalName in lookup:
                    processVar = lookup[daeVar.CanonicalName]
                    plt.plot(processVar.TimeValues,
                             processVar.Values,
                             label = varLabel(daeVar))

            # Loop over arguments: some are variables (each one gets its own
            # subplot) and some are lists of variables, these vars will share
            # a plot.
            for i, arg in enumerate(args):
                axes = plt.subplot(rows, cols, i+1)
                plt.xlabel('Time (s)')
                plt.grid(True)
                try:
                    # pad must be given by keyword: it is keyword-only in
                    # the recent Matplotlib versions.
                    plt.tight_layout(pad = 1.2)
                except Exception:
                    # Best effort only: the plot is still usable without
                    # the tight layout.
                    pass

                if isinstance(arg, list):
                    # This arg is a list of variables.
                    # These must all be plotted on the same plot.
                    for v in arg:
                        plotVariable(v)
                    plt.legend()
                else:
                    # This arg is a single variable.
                    plt.ylabel(varLabel(arg))
                    plotVariable(arg)

                axes.set_ylim(larger_axlim(axes.get_ylim()))

            plt.show()

        except Exception:
            import traceback
            print(('Cannot generate matplotlib plots:\n' + traceback.format_exc()))
class daeCSVFileDataReporter(daeDataReporterFile):
    """
    Saves the results in the comma-separated values (CSV) format.
    The separator is ',' and the variable names are double quoted '"'.

    The first column holds the time points ('time'); every further column holds
    the values of a single point of a variable. The columns of distributed
    variables are named 'varName' followed by the comma-separated domain
    indexes enclosed in brackets, i.e. 'var[0,1]'.

    Arguments:

      - uniqueTimeValues: if True only the last row written for every time
        value is kept and the rows are sorted by the time value.
      - nameBrackets: two-character string with the left and the right
        bracket used around the domain indexes in the column names.
    """
    def __init__(self, uniqueTimeValues = False, nameBrackets = '[]'):
        daeDataReporterFile.__init__(self)
        self.uniqueTimeValues = uniqueTimeValues
        self.lBracket = nameBrackets[0]
        self.rBracket = nameBrackets[1]

    def WriteDataToFile(self):
        """
        Writes the header and one row per time point to the file given by
        self.ConnectString. Errors raised while assembling the table or
        writing the file are reported to the standard output and not propagated.
        """
        variable_names  = ['time']
        variable_values = []
        times           = []
        max_n_times     = 0

        for variable_name, (ndarr_values, ndarr_times, l_domains, s_units) in self.Process.dictVariableValues.items():
            varName = _formatName(variable_name)

            # The time column is taken from the variable with the largest number
            # of time points: variables with a single value (i.e. parameters)
            # must not overwrite it.
            n_times = ndarr_values.shape[0]
            if n_times > max_n_times:
                max_n_times = n_times
                times = ndarr_times.tolist()

            # One column per combination of the domain indexes. For non-distributed
            # variables the product yields a single empty combination.
            domain_ranges = [range(len(d)) for d in l_domains]
            for index_list in itertools.product(*domain_ranges):
                if index_list:
                    inds = '%s%s%s' % (self.lBracket, ','.join([str(item) for item in index_list]), self.rBracket)
                else:
                    inds = ''
                variable_names.append('%s%s' % (varName, inds))

                # Multidimensional indexing requires a tuple: a list that contains
                # a slice is rejected by the recent NumPy versions.
                var_indexes = (slice(0, n_times),) + tuple(index_list)
                variable_values.append(ndarr_values[var_indexes].tolist())

        # Add time values
        variable_values.insert(0, times)

        try:
            # Some variables (i.e. parameters) have only one value so generate
            # values for all time points
            variable_values = [values * max_n_times if len(values) == 1 else values
                               for values in variable_values]

            # Create ndarray for easier slicing (asarray, since copy=False raises
            # in NumPy 2 whenever a copy is required, as it is for lists).
            vals = numpy.asarray(variable_values)

            if self.uniqueTimeValues:
                # Later rows replace the earlier ones with the same time value.
                unique_vals = {}
                for i in range(max_n_times):
                    row_vals = vals[..., i]
                    unique_vals[row_vals[0]] = row_vals
                rows = [unique_vals[t] for t in sorted(unique_vals)]
            else:
                rows = [vals[..., i] for i in range(max_n_times)]

            # The context manager closes the file even if writing fails.
            with open(self.ConnectString, 'w') as f:
                # Write variable names in the header
                f.write(','.join(['"%s"' % v for v in variable_names]) + '\n')

                # Write rows
                for row_vals in rows:
                    f.write(','.join(['%.16e' % v for v in row_vals]) + '\n')

        except Exception as e:
            print(('Cannot write data in CSV format:\n' + str(e)))