"""
The Element class child classes alow parsing the expressions of a
given model element. There are four tipes of elements:
- Auxiliars (Aux class): Auxiliary elements, defined with <aux>.
- Flows (Flow class): Flow elements, defined with <flow>.
- Gfs (Gf class): Lookup elements, defined with <gf>.
- Stocks (Stock class): Data component, defined with <stock>
Moreover, a fith type element is defined ControlElement, which allows parsing
the values of the model control variables (time step, initialtime, final time).
The final result from a parsed element can be exported to an
AbstractElement object in order to build a model in other language.
"""
import re
from typing import Tuple, Union, List
from lxml import etree
import parsimonious
import numpy as np
from ..structures.abstract_model import\
AbstractElement, AbstractControlElement,\
AbstractLookup, AbstractComponent, AbstractSubscriptRange
from ..structures.abstract_expressions import\
AbstractSyntax, CallStructure, ReferenceStructure
from . import xmile_utils as vu
from .xmile_structures import structures, parsing_ops
[docs]
class Element():
"""
Element class. This class provides the shared methods for its
children: Aux, Flow, Gf, Stock, and ControlElement.
Parameters
----------
node: etree._Element
The element node content.
ns: dict
The namespace of the section.
subscripts: dict
The subscript dictionary of the section, necessary to parse
some subscripted elements.
"""
_interp_methods = {
"continuous": "interpolate",
"extrapolate": "extrapolate",
"discrete": "hold_backward"
}
_kind = "Element"
def __init__(self, node: etree._Element, ns: dict, subscripts):
self.node = node
self.ns = ns
self.name = node.attrib["name"].replace("\\n", " ")
self.units = self._get_xpath_text(node, "ns:units") or ""
self.documentation = self._get_xpath_text(node, "ns:doc") or ""
self.limits = (None, None)
self.components = []
self.subscripts = subscripts
def __str__(self): # pragma: no cover
text = "\n%s definition: %s" % (self._kind, self.name)
text += "\nSubscrips: %s" % repr(self.subscripts)\
if self.subscripts else ""
text += "\n\t%s" % self._expression
return text
@property
def _expression(self): # pragma: no cover
if hasattr(self, "ast"):
return str(self.ast).replace("\n", "\n\t")
else:
return self.node.text.replace("\n", "\n\t")
@property
def _verbose(self) -> str: # pragma: no cover
"""Get element information."""
return self.__str__()
@property
def verbose(self): # pragma: no cover
"""Print element information to standard output."""
print(self._verbose)
def _get_xpath_text(self, node: etree._Element,
xpath: str) -> Union[str, None]:
"""Safe access of occassionally missing text"""
try:
return node.xpath(xpath, namespaces=self.ns)[0].text
except IndexError:
return None
def _get_xpath_attrib(self, node: etree._Element,
xpath: str, attrib: str) -> Union[str, None]:
"""Safe access of occassionally missing attributes"""
# defined here to take advantage of NS in default
try:
return node.xpath(xpath, namespaces=self.ns)[0].attrib[attrib]
except IndexError:
return None
def _get_limits(self) -> Tuple[Union[None, str], Union[None, str]]:
"""Get the limits of the element"""
lims = (
self._get_xpath_attrib(self.node, 'ns:range', 'min'),
self._get_xpath_attrib(self.node, 'ns:range', 'max')
)
return tuple(float(x) if x is not None else x for x in lims)
def _get_non_negative(self, behavior):
non_negative = behavior or bool(
self.node.xpath('ns:non_negative', namespaces=self.ns)
)
boolean = self._get_xpath_text(self.node, 'ns:non_negative')
if boolean is not None:
non_negative = 'false' not in boolean.lower()
return non_negative
def _parse_lookup_xml_node(self, node: etree._Element) -> AbstractSyntax:
"""
Parse lookup definition
Returns
-------
AST: AbstractSyntax
"""
ys_node = node.xpath('ns:ypts', namespaces=self.ns)[0]
ys = np.fromstring(
ys_node.text,
dtype=float,
sep=ys_node.attrib['sep'] if 'sep' in ys_node.attrib else ','
)
xscale_node = node.xpath('ns:xscale', namespaces=self.ns)
if len(xscale_node) > 0:
xmin = xscale_node[0].attrib['min']
xmax = xscale_node[0].attrib['max']
xs = np.linspace(float(xmin), float(xmax), len(ys))
else:
xs_node = node.xpath('ns:xpts', namespaces=self.ns)[0]
xs = np.fromstring(
xs_node.text,
dtype=float,
sep=xs_node.attrib['sep'] if 'sep' in xs_node.attrib else ','
)
interp = node.attrib['type'] if 'type' in node.attrib else 'continuous'
return structures["lookup"](
x=tuple(xs[np.argsort(xs)]),
y=tuple(ys[np.argsort(xs)]),
x_limits=(np.min(xs), np.max(xs)),
y_limits=(np.min(ys), np.max(ys)),
type=self._interp_methods[interp]
)
[docs]
def parse(self, behaviors: dict) -> None:
"""
Parse all the components of an element
Parameters
----------
behaviors: dict
Dictionary with keys 'non_negative_flow' and 'non_negative_stock'
and boolean values defining the global behavior for the
stocks and flows.
Returns
-------
None
"""
if self.node.xpath("ns:element", namespaces=self.ns):
# defined in several equations each with one subscript
for subnode in self.node.xpath("ns:element", namespaces=self.ns):
self.components.append(
((subnode.attrib["subscript"].split(","), []),
self._parse_component(subnode, behaviors)[0])
)
else:
# get the subscripts from element
subscripts = [
subnode.attrib["name"]
for subnode
in self.node.xpath("ns:dimensions/ns:dim", namespaces=self.ns)
]
parsed = self._parse_component(self.node, behaviors)
if len(parsed) == 1:
# element defined with one equation
self.components = [((subscripts, []), parsed[0])]
else:
# element defined in several equations, but only the general
# subscripts are given, save each equation with its
# subscrtipts
subs_list = self.subscripts[subscripts[0]]
self.components = [
(([subs], []), parsed_i) for subs, parsed_i in
zip(subs_list, parsed)
]
def _smile_parser(self, expression: str) -> AbstractSyntax:
"""
Parse expression with parsimonious.
Returns
-------
AST: AbstractSyntax
"""
tree = vu.Grammar.get("equations", parsing_ops).parse(
expression.strip())
return EquationVisitor(tree).translation
def _get_empty_abstract_element(self) -> AbstractElement:
"""
Get empty Abstract used for building
Returns
-------
AbstractElement
"""
return AbstractElement(
name=self.name,
units=self.units,
limits=self.limits,
documentation=self.documentation,
components=[])
[docs]
class Aux(Element):
"""
Auxiliary variable defined by <aux> in Xmile.
Parameters
----------
node: etree._Element
The element node content.
ns: dict
The namespace of the section.
subscripts: dict
The subscript dictionary of the section, necessary to parse
some subscripted elements.
"""
_kind = "Flow"
def __init__(self, node, ns, subscripts):
super().__init__(node, ns, subscripts)
self.limits = self._get_limits()
def _parse_component(self, node: etree._Element,
behaviors: dict) -> List[AbstractSyntax]:
"""
Parse one Aux component
Returns
-------
AST: AbstractSyntax
"""
asts = []
for eqn in node.xpath('ns:eqn', namespaces=self.ns):
# Replace new lines with space, and replace 2 or more spaces with
# single space. Then ensure there is no space at start or end of
# equation
eqn = re.sub(r"(\s{2,})", " ", eqn.text.replace("\n", ' ')).strip()
ast = self._smile_parser(eqn)
gf_node = self.node.xpath("ns:gf", namespaces=self.ns)
if len(gf_node) > 0:
ast = structures["inline_lookup"](
ast, self._parse_lookup_xml_node(gf_node[0]))
asts.append(ast)
return asts
[docs]
def get_abstract_element(self) -> AbstractElement:
"""
Get Abstract Element used for building. This method is
automatically called by Sections's get_abstract_section.
Returns
-------
AbstractElement: AbstractElement
Abstract Element object that can be used for building
the model in another language. It contains a list of
AbstractComponents with the Abstract Syntax Tree of each of
the expressions.
"""
ae = self._get_empty_abstract_element()
for component in self.components:
ae.components.append(AbstractComponent(
subscripts=component[0],
ast=component[1]))
return ae
[docs]
class Flow(Aux):
"""
Flow defined by <flow> in Xmile.
Parameters
----------
node: etree._Element
The element node content.
ns: dict
The namespace of the section.
subscripts: dict
The subscript dictionary of the section, necessary to parse
some subscripted elements.
"""
_kind = "Flow"
def __init__(self, node, ns, subscripts):
super().__init__(node, ns, subscripts)
def _parse_component(self, node: etree._Element,
behaviors: dict) -> List[AbstractSyntax]:
"""
Parse one Flow component
Returns
-------
AST: AbstractSyntax
"""
asts = super()._parse_component(node, behaviors)
if self._get_non_negative(behaviors['non_negative_flow']):
# non_negative flows
asts = [
CallStructure(
ReferenceStructure("max"), (ast, 0)
)
for ast in asts
]
return asts
[docs]
class Gf(Element):
"""
Gf variable (lookup) defined by <gf> in Xmile.
Parameters
----------
node: etree._Element
The element node content.
ns: dict
The namespace of the section.
subscripts: dict
The subscript dictionary of the section, necessary to parse
some subscripted elements.
"""
_kind = "Gf component"
def __init__(self, node, ns, subscripts):
super().__init__(node, ns, subscripts)
self.limits = self.get_limits()
[docs]
def get_limits(self) -> Tuple[Union[None, str], Union[None, str]]:
"""Get the limits of the Gf element"""
lims = (
self._get_xpath_attrib(self.node, 'ns:yscale', 'min'),
self._get_xpath_attrib(self.node, 'ns:yscale', 'max')
)
return tuple(float(x) if x is not None else x for x in lims)
def _parse_component(self, node: etree._Element,
behaviors: dict) -> AbstractSyntax:
"""
Parse one Gf component
Returns
-------
AST: AbstractSyntax
"""
return [self._parse_lookup_xml_node(self.node)]
[docs]
def get_abstract_element(self) -> AbstractElement:
"""
Get Abstract Element used for building. This method is
automatically called by Sections's get_abstract_section.
Returns
-------
AbstractElement: AbstractElement
Abstract Element object that can be used for building
the model in another language. It contains a list of
AbstractComponents with the Abstract Syntax Tree of each of
the expressions.
"""
ae = self._get_empty_abstract_element()
for component in self.components:
ae.components.append(AbstractLookup(
subscripts=component[0],
ast=component[1]))
return ae
[docs]
class Stock(Element):
"""
Stock variable defined by <stock> in Xmile.
Parameters
----------
node: etree._Element
The element node content.
ns: dict
The namespace of the section.
subscripts: dict
The subscript dictionary of the section, necessary to parse
some subscripted elements.
"""
_kind = "Stock component"
def __init__(self, node, ns, subscripts):
super().__init__(node, ns, subscripts)
self.limits = self._get_limits()
def _parse_component(self, node, behaviors: dict) -> AbstractSyntax:
"""
Parse one Stock component
Returns
-------
AST: AbstractSyntax
"""
# Parse each flow equations
inflows = [
self._smile_parser(inflow.text)
for inflow in self.node.xpath('ns:inflow', namespaces=self.ns)]
outflows = [
self._smile_parser(outflow.text)
for outflow in self.node.xpath('ns:outflow', namespaces=self.ns)]
if inflows:
# stock has inflows
expr = ["+"] * (len(inflows)-1) + ["-"] * len(outflows)
elif outflows:
# stock has no inflows but outflows
outflows[0] = structures["negative"](outflows[0])
expr = ["-"] * (len(outflows)-1)
else:
# stock is constant
expr = []
inflows = [0]
if expr:
# stock has more than one flow
flows = structures["arithmetic"](expr, inflows+outflows)
else:
# stock has only one flow
flows = inflows[0] if inflows else outflows[0]
# Read the initial value equation for stock element
initial = self._smile_parser(self._get_xpath_text(self.node, 'ns:eqn'))
# Get non-negative information
non_negative = self._get_non_negative(behaviors['non_negative_stock'])
return [structures["stock"](flows, initial, non_negative)]
[docs]
def get_abstract_element(self) -> AbstractElement:
"""
Get Abstract Element used for building. This method is
automatically called by Sections's get_abstract_section.
Returns
-------
AbstractElement: AbstractElement
Abstract Element object that can be used for building
the model in another language. It contains a list of
AbstractComponents with the Abstract Syntax Tree of each of
the expressions.
"""
ae = self._get_empty_abstract_element()
for component in self.components:
ae.components.append(AbstractComponent(
subscripts=component[0],
ast=component[1]))
return ae
class ControlElement(Element):
"""Control variable (lookup)"""
_kind = "Control variable"
def __init__(self, name, units, documentation, eqn):
self.name = name
self.units = units
self.documentation = documentation
self.limits = (None, None)
self.eqn = eqn
def parse(self, behaviors: dict) -> None:
"""
Parse control elment.
Parameters
----------
behaviors: dict
Dictionary with keys 'non_negative_flow' and 'non_negative_stock'
and boolean values defining the global behavior for the
stocks and flows.
Returns
-------
None
"""
self.ast = self._smile_parser(self.eqn)
def get_abstract_element(self) -> AbstractElement:
"""
Get Abstract Element used for building. This method is
automatically called by Sections's get_abstract_section.
Returns
-------
AbstractElement: AbstractElement
Abstract Element object that can be used for building
the model in another language. It contains an AbstractComponent
with the Abstract Syntax Tree of the expression.
"""
return AbstractControlElement(
name=self.name,
units=self.units,
limits=self.limits,
documentation=self.documentation,
components=[
AbstractComponent(subscripts=([], []), ast=self.ast)
]
)
[docs]
class SubscriptRange():
"""Subscript range definition."""
def __init__(self, name: str, definition: List[str],
mapping: List[str] = []):
self.name = name
self.definition = definition
self.mapping = mapping
def __str__(self): # pragma: no cover
return "\nSubscript range definition: %s\n\t%s\n" % (
self.name,
self.definition)
@property
def _verbose(self) -> str: # pragma: no cover
"""Get subscript range information."""
return self.__str__()
@property
def verbose(self): # pragma: no cover
"""Print subscript range information to standard output."""
print(self._verbose)
[docs]
def get_abstract_subscript_range(self) -> AbstractSubscriptRange:
"""
Get Abstract Subscript Range used for building. This method is
automatically called by Sections's get_abstract_section.
Returns
-------
AbstractSubscriptRange: AbstractSubscriptRange
Abstract Subscript Range object that can be used for building
the model in another language.
"""
return AbstractSubscriptRange(
name=self.name,
subscripts=self.definition,
mapping=self.mapping
)
class EquationVisitor(parsimonious.NodeVisitor):
"""Visit the elements of a equation to get the AST"""
def __init__(self, ast):
self.translation = None
self.elements = {}
self.subs = None # the subscripts if given
self.negatives = set()
self.visit(ast)
def visit_expr_type(self, n, vc):
self.translation = self.elements[vc[0]]
def visit_logic2_expr(self, n, vc):
# expressions with logical binary operators (and, or)
return vu.split_arithmetic(
structures["logic"], parsing_ops["logic_ops"],
"".join(vc).strip(), self.elements)
def visit_logic_expr(self, n, vc):
# expressions with logical unitary operators (not)
id = vc[2]
if vc[0].lower() == "not":
id = self.add_element(structures["logic"](
[":NOT:"],
(self.elements[id],)
))
return id
def visit_comp_expr(self, n, vc):
# expressions with comparisons (=, <>, <, <=, >, >=)
return vu.split_arithmetic(
structures["logic"], parsing_ops["comp_ops"],
"".join(vc).strip(), self.elements)
def visit_add_expr(self, n, vc):
# expressions with additions (+, -)
return vu.split_arithmetic(
structures["arithmetic"], parsing_ops["add_ops"],
"".join(vc).strip(), self.elements)
def visit_mod_expr(self, n, vc):
# modulo expressions (mod)
if vc[1].lower().startswith("mod"):
return self.add_element(
structures["call"](
structures["reference"]("modulo"),
(self.elements[vc[0]], self.elements[vc[1][3:]])
))
else:
return vc[0]
def visit_prod_expr(self, n, vc):
# expressions with products (*, /)
return vu.split_arithmetic(
structures["arithmetic"], parsing_ops["prod_ops"],
"".join(vc).strip(), self.elements)
def visit_exp_expr(self, n, vc):
# expressions with exponentials (^)
return vu.split_arithmetic(
structures["arithmetic"], parsing_ops["exp_ops"],
"".join(vc).strip(), self.elements, self.negatives)
def visit_neg_expr(self, n, vc):
id = vc[2]
if vc[0] == "-":
if isinstance(self.elements[id], (float, int)):
self.elements[id] = -self.elements[id]
else:
self.negatives.add(id)
return id
def visit_call(self, n, vc):
func = self.elements[vc[0]]
args = self.elements[vc[4]]
if func.reference in structures:
func_str = structures[func.reference]
if isinstance(func_str, dict):
return self.add_element(func_str[len(args)](*args))
else:
return self.add_element(func_str(*args))
else:
return self.add_element(structures["call"](func, args))
def visit_conditional_statement(self, n, vc):
return self.add_element(structures["if_then_else"](
self.elements[vc[2]],
self.elements[vc[6]],
self.elements[vc[10]]))
def visit_reference(self, n, vc):
id = self.add_element(structures["reference"](
vc[0].lower().replace(" ", "_").strip("\""), self.subs))
self.subs = None
return id
def visit_array(self, n, vc):
if ";" in n.text or "," in n.text:
return self.add_element(np.squeeze(np.array(
[row.split(",") for row in n.text.strip(";").split(";")],
dtype=float)))
else:
return self.add_element(eval(n.text))
def visit_subscript_list(self, n, vc):
subs = [x.strip().replace("_", " ") for x in vc[2].split(",")]
self.subs = structures["subscripts_ref"](subs)
return ""
def visit_name(self, n, vc):
return n.text.strip()
def visit_expr(self, n, vc):
if vc[0] not in self.elements:
return self.add_element(eval(vc[0]))
else:
return vc[0]
def visit_arguments(self, n, vc):
arglist = tuple(x.strip(",") for x in vc)
return self.add_element(tuple(
self.elements[arg] if arg in self.elements
else eval(arg) for arg in arglist))
def visit_parens(self, n, vc):
return vc[2]
def visit__(self, n, vc):
# handles whitespace characters
return ""
def generic_visit(self, n, vc):
return "".join(filter(None, vc)) or n.text
def add_element(self, element):
return vu.add_element(self.elements, element)