#
# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4
#
# mPlane Protocol Reference Implementation
# Information Model and Element Registry
#
# (c) 2013-2014 mPlane Consortium (http://www.ict-mplane.eu)
# Author: Brian Trammell <brian@trammell.ch>
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
#
"""
Information model and element registry for the mPlane protocol.
This module implements Statements and Notifications, the core
messages used by the mPlane protocol to describe measurement
and query schemas, and various other classes to support them.
There are three kinds of Statements:
- Capability represents something a component can do
- Specification tells a component to do something it
advertised a Capability for
- Result returns the results for a Specification in-band
Notifications are used to transfer other information between
components and clients. There are four kinds of Notifications:
- Receipt notifies that a Result is not yet ready or
that the results of an operation will be indirectly exported.
- Redemption is a subsequent attempt to redeem a Receipt.
- Withdrawal notifies that a Capability is no longer available.
- Interrupt notifies that a running Specification should be stopped.
To see how all this fits together, let's simulate the message exchange
in a simple ping measurement. Initially, we have to load the default element
registry and programmatically create a new empty Capability, as it would
be advertised by the component.
>>> import mplane
>>> import json
>>> mplane.model.initialize_registry()
>>> cap = mplane.model.Capability()
First, we set a temporal scope for the capability. Probe components
generally advertise a temporal scope from the present stretching
into the indeterminate future. In this case, we advertise that the
measurement performed is periodic, by setting the minimum period
supported by the capability: one ping per second.
>>> cap.set_when("now ... future / 1s")
We can only ping from one IPv4 address, to any IPv4 address.
Adding a parameter without a constraint makes it unconstrained:
>>> cap.add_parameter("source.ip4", "10.0.27.2")
>>> cap.add_parameter("destination.ip4")
Then we define the result columns this measurement can produce. Here,
we want quick reporting of min, max, and mean delays, as well as a
total count of singleton measurements taken and packets lost:
>>> cap.add_result_column("delay.twoway.icmp.us.min")
>>> cap.add_result_column("delay.twoway.icmp.us.max")
>>> cap.add_result_column("delay.twoway.icmp.us.mean")
>>> cap.add_result_column("delay.twoway.icmp.count")
>>> cap.add_result_column("packets.lost")
Now we have a capability we could transform into JSON and make it
available to clients via the mPlane protocol, or via static
download or configuration:
>>> capjson = mplane.model.unparse_json(cap)
>>> capjson # doctest: +SKIP
'{"capability": "measure",
"version": 1,
"registry": "http://ict-mplane.eu/registry/core",
"when": "now ... future / 1s",
"parameters": {"source.ip4": "10.0.27.2",
"destination.ip4": "*"},
"results": ["delay.twoway.icmp.us.min",
"delay.twoway.icmp.us.max",
"delay.twoway.icmp.us.mean",
"delay.twoway.icmp.count",
"packets.lost"]}'
On the client side, we'd receive this capability as a JSON object and turn it
into a capability, from which we generate a specification:
>>> clicap = mplane.model.parse_json(capjson)
>>> spec = mplane.model.Specification(capability=clicap)
>>> spec
<specification: measure when now ... future / 1s token e00d7fe8 schema 5ce99352 p(v)/m/r 2(1)/0/5>
Here we have a specification with a given token, schema, and 2 parameters,
no metadata, and five result columns.
.. note:: The schema of the statement is identified by a
schema hash, the first eight hex digits of which are shown for
diagnostic purposes. Statements with identical sets of parameters
and columns (schemas) will have identical schema hashes. Likewise,
the token is defined by the schema as well as the parameter values.
First, let's fill in a specific temporal scope for the measurement:
>>> spec.set_when("2017-12-24 22:18:42 + 1m / 1s")
And then let's fill in some parameters. All of the parameters whose
single values are already given by their constraints (in this case, source.ip4)
have already been filled in. So let's start with the destination.
Note that strings are accepted and automatically parsed using each
parameter's primitive type:
>>> spec.set_parameter_value("destination.ip4", "10.0.37.2")
And now we can transform this specification and send it back to
the component from which we got the capability:
>>> specjson = mplane.model.unparse_json(spec)
>>> specjson # doctest: +SKIP
'{"specification": "measure",
"version": 1,
"registry": "http://ict-mplane.eu/registry/core",
"token": "ea839b56bc3f6004e95d780d7a64d899",
"when": "2017-12-24 22:18:42.000000 + 1m / 1s",
"parameters": {"source.ip4": "10.0.27.2",
"destination.ip4": "10.0.37.2"},
"results": ["delay.twoway.icmp.us.min",
"delay.twoway.icmp.us.max",
"delay.twoway.icmp.us.mean",
"delay.twoway.icmp.count",
"packets.lost"]}'
On the component side, likewise, we'd receive this specification as a JSON
object and turn it back into a specification:
>>> comspec = mplane.model.parse_json(specjson)
The component would determine the measurement, query, or other operation to
run given by the specification, then extract the necessary parameter values, e.g.:
>>> comspec.get_parameter_value("destination.ip4")
IPv4Address('10.0.37.2')
>>> comspec.when().period()
datetime.timedelta(0, 1)
After running the measurement, the component would return the results
by assigning values to parameters which changed and result columns
measured:
>>> res = mplane.model.Result(specification=comspec)
>>> res.set_when("2017-12-24 22:18:42.993000 ... 2017-12-24 22:19:42.991000")
>>> res.set_result_value("delay.twoway.icmp.us.min", 33155)
>>> res.set_result_value("delay.twoway.icmp.us.mean", 55166)
>>> res.set_result_value("delay.twoway.icmp.us.max", 192307)
>>> res.set_result_value("delay.twoway.icmp.count", 58220)
>>> res.set_result_value("packets.lost", 2)
The result can then be serialized and sent back to the client:
>>> resjson = mplane.model.unparse_json(res)
>>> resjson # doctest: +SKIP
'{"result": "measure",
"version": 1,
"registry": "http://ict-mplane.eu/registry/core",
"token": "ea839b56bc3f6004e95d780d7a64d899",
"when": "2017-12-24 22:18:42.993000 ... 2017-12-24 22:19:42.991000",
"parameters": {"source.ip4": "10.0.27.2",
"destination.ip4": "10.0.37.2"},
"results": ["delay.twoway.icmp.us.min",
"delay.twoway.icmp.us.max",
"delay.twoway.icmp.us.mean",
"delay.twoway.icmp.count",
"packets.lost"],
"resultvalues": [["33155", "192307", "55166", "58220", "2"]]}'
which can transform them back to a result and extract the values:
>>> clires = mplane.model.parse_json(resjson)
>>> clires
<result: measure when 2017-12-24 22:18:42.993000 ... 2017-12-24 22:19:42.991000 token e00d7fe8 schema 5ce99352 p/m/r(r) 2/0/5(1)>
If the component cannot return results immediately (for example, because
the measurement will take some time), it can return a receipt instead:
>>> rcpt = mplane.model.Receipt(specification=comspec)
This receipt contains all the information in the specification, as well as a token
which can be used to quickly identify it in the future.
>>> rcpt.get_token()
'e00d7fe813cf17eeeea37b313dcfa4e7'
.. note:: The mPlane protocol specification allows components to assign tokens
however they like. In the reference implementation, the default token
is based on a hash like the schema hash: statements with the same verb,
schema, parameter values, and metadata will have identical default tokens.
A component could, however, assign serial-number based tokens, or tokens
mapping to structures in its own filesystem, etc.
>>> jsonrcpt = mplane.model.unparse_json(rcpt)
>>> jsonrcpt # doctest: +SKIP
'{"receipt": "measure",
"version": 1,
"registry": "http://ict-mplane.eu/registry/core",
"token": "e00d7fe813cf17eeeea37b313dcfa4e7",
"when": "2017-12-24 22:18:42.000000 + 1m / 1s",
"parameters": {"destination.ip4": "10.0.37.2",
"source.ip4": "10.0.27.2"},
"results": ["delay.twoway.icmp.us.min",
"delay.twoway.icmp.us.max",
"delay.twoway.icmp.us.mean",
"delay.twoway.icmp.count",
"packets.lost"],}'
The component keeps the receipt, keyed by token, and returns it to the
client in a message. The client then generates a future redemption
referring to this receipt to retrieve the results:
>>> clircpt = mplane.model.parse_json(jsonrcpt)
>>> clircpt
<receipt: e00d7fe813cf17eeeea37b313dcfa4e7>
>>> rdpt = mplane.model.Redemption(receipt=clircpt)
>>> rdpt
<redemption: e00d7fe813cf17eeeea37b313dcfa4e7>
Note here that the redemption has the same token as the receipt;
just the token may be sent back to the component to retrieve the
results:
>>> mplane.model.unparse_json(rdpt, token_only=True) # doctest: +SKIP
'{"redemption": "measure",
"version": 1,
"registry": "http://ict-mplane.eu/registry/core",
"token": "e00d7fe813cf17eeeea37b313dcfa4e7"
}'
As long as the measurement is running, the client can stop the measurement by sending an
interrupt:
>>> irpt = mplane.model.Interrupt(specification=spec)
>>> jsonrcpt = mplane.model.unparse_json(irpt)
The component receives the interrupt, stops the measurement and returns the results of
performed measurement.
Otherwise, in case the component cannot perform the specified operation, it sends a
withdrawal to cancel the previously advertised capability:
>>> wtdr = mplane.model.Withdrawal(capability=cap)
>>> mplane.model.unparse_json(wtdr) # doctest: +SKIP
'{"withdrawal": "measure",
"version": 1,
"registry": "http://ict-mplane.eu/registry/core",
"token": "d7e9df75145e209e144bf9c06e7a9d2f",
"when": "now ... future / 1s",
"parameters": {"destination.ip4": "*",
"source.ip4": "10.0.27.2"},
"results": ["delay.twoway.icmp.us.min",
"delay.twoway.icmp.us.max",
"delay.twoway.icmp.us.mean",
"delay.twoway.icmp.count",
"packets.lost"]
}'
Furthermore, several messages can be sent at once by using an Envelope,
e.g. a component could announce several capabilities at once.
In case of our simple ping component we create a second capability
that only provides the mean of the measurement values:
>>> cap2 = mplane.model.Capability()
>>> cap2.set_when("now ... future / 1s")
>>> cap2.add_parameter("source.ip4", "10.0.27.2")
>>> cap2.add_parameter("destination.ip4")
>>> cap2.add_result_column("delay.twoway.icmp.us.mean")
Now we create an Envelope and append the two capabilities.
>>> env = mplane.model.Envelope()
>>> env.append_message(cap)
>>> env.append_message(cap2)
>>> env # doctest: +SKIP
<Envelope message (2):
<capability: measure when now ... future / 1s token d7e9df75 schema 5ce99352 p/m/r 2/0/5>
<capability: measure when now ... future / 1s token a9ec7fce schema ea37cea5 p/m/r 2/0/1>
>
As with every other message, this Envelope is serialized and sent to the client:
>>> envjson = mplane.model.unparse_json(env)
The client receives the Envelope and decomposes the encapsulated messages:
>>> clienv = mplane.model.parse_json(envjson)
>>> messages = [message for message in clienv.messages()]
"""
try:
from ipaddress import ip_address
except ImportError:
from ipaddr import IPAddress as ip_address
from datetime import datetime, timedelta, timezone
from copy import copy, deepcopy
import urllib.request
import urllib.parse
import collections
import functools
import operator
import hashlib
import json
import yaml
import re
import os
from mplane.utils import normalize_path
#######################################################################
# String constants for protocol framing
#######################################################################
# Separator and wildcard tokens used in the textual encodings of
# element names, temporal scopes, sets, and constraints.
ELEMENT_SEP = "."
RANGE_SEP = " ... "
DURATION_SEP = " + "
PERIOD_SEP = " / "
SET_SEP = ","
ANCHOR_SEP = "#"
INNER_WHEN_SEP_START = " { "
INNER_WHEN_SEP_END = " } "
CONSTRAINT_ALL = "*"
VALUE_NONE = "*"
# Symbolic timestamps recognized by parse_time()
TIME_PAST = "past"
TIME_NOW = "now"
TIME_FUTURE = "future"
# Keywords of repeated temporal scopes (see When._parse); the
# surrounding spaces are significant because the strings are used
# directly as split separators.
WHEN_REPEAT = "repeat "
WHEN_CRON = " cron "
# Verb names (the module docstring examples use "measure")
VERB_MEASURE = "measure"
VERB_QUERY = "query"
VERB_COLLECT = "collect"
VERB_STORE = "store"
VERB_CALLBACK = "callback"
# Section keys; several of them appear as JSON object keys in the
# serialized messages shown in the module docstring.
KEY_PARAMETERS = "parameters"
KEY_METADATA = "metadata"
KEY_RESULTS = "results"
KEY_RESULTVALUES = "resultvalues"
KEY_TOKEN = "token"
KEY_MESSAGE = "message"
KEY_LINK = "link"
KEY_EXPORT = "export"
KEY_VERSION = "version"
KEY_WHEN = "when"
KEY_REGISTRY = "registry"
KEY_LABEL = "label"
KEY_CONTENTS = "contents"
# Schedule keys; within this part of the file they are referenced
# only by the commented-out Schedule class.
KEY_MONTHS = "months"
KEY_DAYS = "days"
KEY_WEEKDAYS = "weekdays"
KEY_HOURS = "hours"
KEY_MINUTES = "minutes"
KEY_SECONDS = "seconds"
# Message kinds; the kind is the key that carries the verb in a
# serialized message (e.g. '{"capability": "measure", ...}').
KIND_CAPABILITY = "capability"
KIND_SPECIFICATION = "specification"
KIND_RESULT = "result"
KIND_RECEIPT = "receipt"
KIND_REDEMPTION = "redemption"
KIND_INDIRECTION = "indirection"
KIND_WITHDRAWAL = "withdrawal"
KIND_INTERRUPT = "interrupt"
KIND_EXCEPTION = "exception"
KIND_ENVELOPE = "envelope"
# Envelope content types
# NOTE(review): presumably the allowed values of an envelope's content
# type -- confirm against the Envelope class.
ENVELOPE_MESSAGE = "message"
ENVELOPE_STATEMENT = "statement"
ENVELOPE_NOTIFICATION = "notification"
# Registry keys
# NOTE(review): presumably keys of the element registry file format --
# confirm against the registry loader.
KEY_REGFMT = "registry-format"
KEY_REGREV = "registry-revision"
KEY_REGURI = "registry-uri"
KEY_REGINCLUDE = "includes"
KEY_ELEMENTS = "elements"
KEY_ELEMNAME = "name"
KEY_ELEMPRIM = "prim"
KEY_ELEMDESC = "desc"
# Default registry URI (the value of "registry" in the docstring examples)
REGURI_DEFAULT = "http://ict-mplane.eu/registry/core"
REGFMT_FLAT = "mplane-0"
#######################################################################
# Protocol constants
#######################################################################
MPLANE_VERSION = 1 # version 1 -- D1.4 protocol, interop guarantee
#######################################################################
# Reference implementation constants
#######################################################################
# Hash length in __repr__ strings
REPHL = 8
#######################################################################
# Universal parse and unparse functions for times and durations
#######################################################################
# Timestamp pattern. Groups: (1) date, (2) optional " HH:MM[:SS]",
# (3) optional ":SS" nested inside group 2, (4) optional ".fraction".
# Raw strings keep the regex escapes (\d, \s, \.) out of Python's own
# string-escape processing.
_iso8601_pat = r'(\d+-\d+-\d+)(\s+\d+:\d+(:\d+)?)?(\.\d+)?'
_iso8601_re = re.compile(_iso8601_pat)

# strftime formats used by unparse_time(), keyed by precision name
_iso8601_fmt = {
    'us': '%Y-%m-%d %H:%M:%S.%f',
    's': '%Y-%m-%d %H:%M:%S',
    'm': '%Y-%m-%d %H:%M',
    'd': '%Y-%m-%d',
}

# Duration pattern: optional day, hour, minute, and second components
# in that order. Every component is optional, so the pattern also
# matches the empty string; the digit groups are groups 2, 4, 6, and 8.
_dur_pat = r'((\d+)d)?((\d+)h)?((\d+)m)?((\d+)s)?'
_dur_re = re.compile(_dur_pat)

# (seconds per unit, unit label), in the same order as the groups above
_dur_seclabel = (
    (86400, 'd'),
    (3600, 'h'),
    (60, 'm'),
    (1, 's'),
)

# Inner scope of a repeated when: the text between "{ " and " }"
_innerwhen_pat = r'\{ (.*?) \}'
_innerwhen_re = re.compile(_innerwhen_pat)
def parse_time(valstr):
    """
    Parse an mPlane timestamp string.

    Returns None for None, the time_past / time_future / time_now
    singletons for the corresponding keywords, and otherwise a naive
    datetime parsed from "YYYY-MM-DD[ HH:MM[:SS[.fraction]]]".

    Fractional seconds of any length are accepted; digits beyond
    microsecond precision are truncated.

    Raises ValueError if the string is not a recognizable timestamp.
    """
    if valstr is None:
        return None
    elif valstr == TIME_PAST:
        return time_past
    elif valstr == TIME_FUTURE:
        return time_future
    elif valstr == TIME_NOW:
        return time_now

    m = _iso8601_re.match(valstr)
    if not m:
        raise ValueError(repr(valstr)+" does not appear to be an mPlane timestamp")

    (datestr, timestr, secstr, fracstr) = m.groups()
    if fracstr:
        # A fraction is only meaningful after a seconds field
        if not secstr:
            raise ValueError(repr(valstr)+" does not appear to be an mPlane timestamp")
        dt = datetime.strptime(datestr + timestr, "%Y-%m-%d %H:%M:%S")
        # strptime's %f takes at most six digits, so convert the fraction
        # by hand: drop the leading dot, keep six digits, pad short ones.
        return dt.replace(microsecond=int(fracstr[1:7].ljust(6, "0")))
    elif secstr:
        return datetime.strptime(m.group(0), "%Y-%m-%d %H:%M:%S")
    elif timestr:
        return datetime.strptime(m.group(0), "%Y-%m-%d %H:%M")
    else:
        return datetime.strptime(m.group(0), "%Y-%m-%d")
def unparse_time(valts, precision="us"):
    """
    Render a timestamp as a string.

    datetime values are formatted with the _iso8601_fmt entry selected
    by precision; any other value (such as the symbolic time
    singletons) is rendered with str().
    """
    if not isinstance(valts, datetime):
        return str(valts)
    fmt = _iso8601_fmt[precision]
    return valts.strftime(fmt)
def parse_dur(valstr):
    """
    Parse an mPlane duration string such as "1d2h30m15s" into a timedelta.

    Returns None for None. Surrounding whitespace is ignored.

    Raises ValueError if the string is empty or contains anything other
    than day, hour, minute, and second components in that order.
    """
    if valstr is None:
        return None

    # Every component of the pattern is optional, so a plain match()
    # succeeds on any input by matching nothing. Require the whole
    # string to match and at least one component to be present.
    m = _dur_re.fullmatch(valstr.strip())
    if m is None or not m.group(0):
        raise ValueError(repr(valstr)+" does not appear to be an mPlane duration")

    # The digit groups are every second group, in _dur_seclabel order
    counts = m.groups()[1::2]
    valsec = sum(secs * int(count)
                 for ((secs, _label), count) in zip(_dur_seclabel, counts)
                 if count)
    return timedelta(seconds=valsec)
def unparse_dur(valtd):
    """
    Render a timedelta as an mPlane duration string.

    The duration is truncated to whole seconds and written as day,
    hour, minute, and second components, leaving out units that do not
    fit. A duration with no fitting unit is rendered as "0s".
    """
    remaining = int(valtd.total_seconds())
    pieces = []
    for (unit_secs, unit_label) in _dur_seclabel:
        if remaining >= unit_secs:
            (count, remaining) = divmod(remaining, unit_secs)
            pieces.append(str(count) + unit_label)
    if not pieces:
        return "0s"
    return "".join(pieces)
#######################################################################
# Temporal Scoping and Scheduling
#######################################################################
class _PastTime:
    """
    Class representing the indeterminate past.
    Do not instantiate; use the time_past instance of this class.

    Temporal scopes test for this value by identity (``is time_past``),
    so copying returns the very same object instead of a new instance.
    """

    def __str__(self):
        return TIME_PAST

    def __repr__(self):
        return "mplane.model.time_past"

    def __copy__(self):
        # Preserve identity so that "is time_past" checks keep working
        return self

    def __deepcopy__(self, memo):
        # Preserve identity so that "is time_past" checks keep working
        return self

    def strftime(self, ign):
        """Stand-in for datetime.strftime(); the format is ignored."""
        return str(self)


time_past = _PastTime()
class _NowTime:
    """
    Class representing the present.
    Do not instantiate; use the time_now instance of this class.

    Temporal scopes test for this value by identity (``is time_now``),
    so copying returns the very same object instead of a new instance.
    """

    def __str__(self):
        return TIME_NOW

    def __repr__(self):
        return "mplane.model.time_now"

    def __copy__(self):
        # Preserve identity so that "is time_now" checks keep working
        return self

    def __deepcopy__(self, memo):
        # Preserve identity so that "is time_now" checks keep working
        return self

    def strftime(self, ign):
        """Stand-in for datetime.strftime(); the format is ignored."""
        return str(self)


time_now = _NowTime()
class _FutureTime:
    """
    Class representing the indeterminate future.
    Do not instantiate; use the time_future instance of this class.

    Temporal scopes test for this value by identity (``is time_future``),
    so copying returns the very same object instead of a new instance.
    """

    def __str__(self):
        return TIME_FUTURE

    def __repr__(self):
        return "mplane.model.time_future"

    def __copy__(self):
        # Preserve identity so that "is time_future" checks keep working
        return self

    def __deepcopy__(self, memo):
        # Preserve identity so that "is time_future" checks keep working
        return self

    def strftime(self, ign):
        """Stand-in for datetime.strftime(); the format is ignored."""
        return str(self)


time_future = _FutureTime()
def _parse_numset(valstr):
    """Parse a SET_SEP-separated list of integers into a set of ints."""
    return {int(item) for item in valstr.split(SET_SEP)}
def _unparse_numset(valset):
    """Render a set of numbers as an ascending SET_SEP-separated string."""
    ordered = sorted(valset)
    return SET_SEP.join(str(num) for num in ordered)
# Weekday labels indexed by weekday number (0 = 'mo'), and the reverse
# mapping from label to number.
# NOTE(review): the last label is 'so' where the English abbreviation
# would be 'su' -- confirm which spelling the wire format expects.
_dow_label = ('mo', 'tu', 'we', 'th', 'fr', 'sa', 'so')
_dow_number = dict(zip(_dow_label, range(len(_dow_label))))
def _parse_wdayset(valstr):
    """
    Parse a SET_SEP-separated list of weekday labels into a set of
    weekday numbers. Raises KeyError for a label not in _dow_number.
    """
    return {_dow_number[label] for label in valstr.split(SET_SEP)}
def _unparse_wdayset(valset):
    """
    Render a set of weekday numbers as a SET_SEP-separated string of
    weekday labels, in ascending weekday order.
    """
    return SET_SEP.join(_dow_label[num] for num in sorted(valset))
class _Crontab(object):
    """
    Crontab-like schedule made of six sets of integers: seconds,
    minutes, hours, days of month, weekdays, and months.

    An empty set places no restriction on its field (see
    When.iterator, which skips the check for empty sets).
    """

    def __init__(self):
        super().__init__()
        self._months = set()
        self._days = set()
        self._weekdays = set()
        self._hours = set()
        self._minutes = set()
        self._seconds = set()

    def _parse_value(self, val):
        """
        Parse one crontab field into a set of ints.

        The field is a SET_SEP-separated list whose items are either
        single numbers or inclusive "first-last" ranges, so "1-5",
        "1,2,3", and "1-5,10" are all accepted.

        Raises ValueError if an item is not numeric.
        """
        values = set()
        for item in val.split(SET_SEP):
            bounds = item.split('-')
            if len(bounds) > 1:
                # inclusive range
                values.update(range(int(bounds[0]), int(bounds[1]) + 1))
            else:
                values.add(int(item))
        return values

    def _parse(self, valstr):
        """
        Fill this crontab from a string of six whitespace-separated
        fields: seconds, minutes, hours, days, weekdays, months.
        A '*' field selects every value of that field.

        Raises ValueError if the string does not have exactly six
        fields or a field cannot be parsed; in that case the crontab
        is left unchanged.
        """
        fields = valstr.split()
        if len(fields) != 6:
            raise ValueError(repr(valstr)+" does not appear to be a mPlane crontab")

        # Full value range of each field, in field order
        wildcards = (range(0, 60),   # seconds
                     range(0, 60),   # minutes
                     range(0, 24),   # hours
                     range(1, 32),   # days of month
                     range(0, 7),    # weekdays
                     range(1, 13))   # months

        # Parse everything first so that a bad field leaves the
        # previous state intact
        parsed = [set(full) if field == '*' else self._parse_value(field)
                  for (field, full) in zip(fields, wildcards)]
        (self._seconds, self._minutes, self._hours,
         self._days, self._weekdays, self._months) = parsed

    def __str__(self):
        return " ".join([_unparse_numset(self._seconds),
                         _unparse_numset(self._minutes),
                         _unparse_numset(self._hours),
                         _unparse_numset(self._days),
                         _unparse_numset(self._weekdays),
                         _unparse_numset(self._months)])

    def __repr__(self):
        return "cron "+str(self)
class When(object):
    """
    Defines the temporal scopes for capabilities, results, or
    single measurement specifications.

    A scope has a start (a), an optional end (b) or duration, and an
    optional period. A repeated scope may additionally carry a crontab
    and an inner duration and period; these are handed to the scopes
    generated by iterator().
    """

    def __init__(self, valstr=None, a=None, b=None, duration=None, period=None,
                 repeated=False, inner_duration=None, inner_period=None, crontab=None):
        """
        Create a temporal scope from its parts or from a string.

        If valstr is given it is parsed, and the parsed start, end,
        duration, period, crontab, and inner scope replace the
        corresponding keyword arguments.
        """
        super().__init__()
        self._a = a
        self._b = b
        self._duration = duration
        self._period = period
        self._repeated = repeated
        self._inner_duration = inner_duration
        self._inner_period = inner_period
        self._crontab = crontab

        if valstr is not None:
            self._parse(valstr)

    @staticmethod
    def _utcnow():
        """Return the current UTC time as a naive datetime."""
        # Same value as the deprecated datetime.utcnow(); kept naive so
        # that it compares with the naive datetimes made by parse_time()
        return datetime.now(timezone.utc).replace(tzinfo=None)

    def _parse(self, valstr):
        """
        Fill this scope from its string form.

        Raises ValueError if a repeated when has an inner when that
        does not start with "now", has neither period nor crontab, or
        has both; also propagates the ValueErrors of the time,
        duration, and crontab parsers.
        """
        # First check if this is a repeated measurement
        valsplit = valstr.split(WHEN_REPEAT)
        if len(valsplit) > 1:
            self._repeated = True

            # Remove 'repeat '
            valstr = valsplit[1]

            # The parsed string replaces any inner scope set before
            self._inner_period = None
            self._inner_duration = None

            # Check for inner when
            innermatch = _innerwhen_re.search(valstr)
            if innermatch:
                innervalstr = innermatch.group(1)

                # check if inner-when begins with 'now'
                if not innervalstr.startswith(TIME_NOW):
                    raise ValueError(repr(valstr)+" does not appear to be an mPlane repeated-when (inner when has to be relative to now)")

                # Separate the period from the value and parse it
                valsplit = innervalstr.split(PERIOD_SEP)
                if len(valsplit) > 1:
                    (innervalstr, perstr) = valsplit
                    self._inner_period = parse_dur(perstr)

                # then try to split duration
                valsplit = innervalstr.split(DURATION_SEP)
                if len(valsplit) > 1:
                    (innervalstr, durstr) = valsplit
                    self._inner_duration = parse_dur(durstr)

            # Remove inner when (a no-op if there is none)
            valstr = valstr.split(INNER_WHEN_SEP_START)[0]

            # Finally check for a crontab
            valsplit = valstr.split(WHEN_CRON)
            if len(valsplit) > 1:
                self._crontab = _Crontab()
                self._crontab._parse(valsplit[1])
            else:
                self._crontab = None

            # Remove crontab
            valstr = valsplit[0]
        else:
            self._inner_duration = None
            self._inner_period = None
            self._crontab = None

        # Outer-when or simple-when
        # separate the period from the value and parse it
        valsplit = valstr.split(PERIOD_SEP)
        if len(valsplit) > 1:
            (valstr, perstr) = valsplit
            self._period = parse_dur(perstr)
        else:
            self._period = None

        # then try to split duration or range
        valsplit = valstr.split(DURATION_SEP)
        if len(valsplit) > 1:
            (valstr, durstr) = valsplit
            self._duration = parse_dur(durstr)
            valsplit = [valstr]
        else:
            self._duration = None
            valsplit = valstr.split(RANGE_SEP)

        # if this is a repeated-when without a cron, period has to be set
        if self._repeated and self._crontab is None and self._period is None:
            raise ValueError(repr(valstr)+" does not appear to be an mPlane repeated-when (no duration or cron set)")

        # if this is a repeated-when with cron, period must not be set
        if self._repeated and self._crontab and self._period:
            raise ValueError(repr(valstr)+" does not appear to be an mPlane repeated-when (duration and cron set at the same time)")

        self._a = parse_time(valsplit[0])
        if len(valsplit) > 1:
            self._b = parse_time(valsplit[1])
        else:
            self._b = None

    def __str__(self):
        parts = []
        if self._repeated:
            parts.append(WHEN_REPEAT)
        parts.append(unparse_time(self._a))

        # an explicit end takes precedence over a duration
        if self._b is not None:
            parts.extend((RANGE_SEP, unparse_time(self._b)))
        elif self._duration is not None:
            parts.extend((DURATION_SEP, unparse_dur(self._duration)))

        if self._period is not None:
            parts.extend((PERIOD_SEP, unparse_dur(self._period)))

        if self._crontab is not None:
            parts.extend((WHEN_CRON, str(self._crontab)))

        # Write the inner when if any part of it is set, so that an
        # inner period without an inner duration is not dropped
        if self._inner_duration is not None or self._inner_period is not None:
            parts.extend((INNER_WHEN_SEP_START, TIME_NOW))
            if self._inner_duration is not None:
                parts.extend((DURATION_SEP, unparse_dur(self._inner_duration)))
            if self._inner_period is not None:
                parts.extend((PERIOD_SEP, unparse_dur(self._inner_period)))
            parts.append(INNER_WHEN_SEP_END)

        return "".join(parts)

    def __repr__(self):
        return "<When: "+str(self)+">"

    def is_immediate(self):
        """Returns True if this scope starts now."""
        return self._a is time_now

    def is_forever(self):
        """Returns True if this scope ends in the indeterminate future."""
        return self._b is time_future

    def is_past(self):
        """Returns True if this is an indefinite past scope."""
        return self._a is time_past and self._b is time_now

    def is_future(self):
        """Returns True if this is an indefinite future scope."""
        return self._a is time_now and self._b is time_future

    def is_infinite(self):
        """
        Returns True if this scope is completely infinite
        (from the infinite past to the infinite future).
        """
        return self._a is time_past and self._b is time_future

    def is_definite(self):
        """
        Returns True if this scope defines a definite time
        or a definite time interval.
        """
        if self._b is None:
            return isinstance(self._a, datetime)
        else:
            return isinstance(self._a, datetime) and isinstance(self._b, datetime)

    def is_singleton(self):
        """
        Returns True if this temporal scope refers to a
        singleton measurement: it has a start, but neither
        an end nor a duration. Used in scheduling an enclosing
        Specification; has no meaning for Capabilities
        or Results.
        """
        # Check the stored duration, not duration(): the latter returns
        # a zero timedelta rather than None for a scope without an end
        return self._a is not None and self._b is None and self._duration is None

    def is_repeated(self):
        """
        Return True if this temporal scope refers to a
        repeated when.
        """
        return self._repeated

    def datetimes(self, tzero=None):
        """
        Return start and end times as absolute timestamps
        for this temporal scope, relative to a given tzero,
        which defaults to the current UTC time.

        A start in the indeterminate past and an end in the
        indeterminate future are both returned as None.
        """
        if tzero is None:
            tzero = self._utcnow()

        if self._a is time_now:
            start = tzero
        elif self._a is time_past:
            start = None
        else:
            start = self._a

        if self._b is time_now:
            end = tzero
        elif self._b is time_future:
            end = None
        elif self._b is None:
            # without an end, duration() is the stored duration or zero
            if self.duration() is not None:
                end = start + self.duration()
            else:
                end = start
        else:
            end = self._b

        return (start, end)

    def duration(self, tzero=None):
        """
        Return the duration of this temporal scope as a timedelta.

        A scope with neither end nor duration has a zero duration.
        Returns None if the scope ends in the indeterminate future
        or starts in the indeterminate past.
        """
        if self._duration is not None:
            return self._duration
        elif self._b is None:
            return timedelta()
        elif self._b is time_future:
            return None
        else:
            (start, end) = self.datetimes(tzero)
            if start is None:
                # starts in the indeterminate past: no definite length
                return None
            return end - start

    def period(self):
        """Returns the period of this temporal scope."""
        return self._period

    def timer_delays(self, tzero=None):
        """
        Returns a tuple with delays in seconds for timers to signal the
        start and end of a temporal scope, given a specified time zero,
        which defaults to the current UTC time.

        The start delay is zero if the start time has already passed
        (including a start in the indeterminate past) or the temporal
        scope starts now.

        If the scope has an end other than the indeterminate future,
        the end delay is measured from time zero to that end. Otherwise
        it is the start delay plus the duration, which is zero for a
        scope with neither end nor duration. The end delay is None if
        the scope runs into the indeterminate future without a duration.

        If the end delay is negative, the scope has expired and both
        delays are None.

        Used in scheduling an enclosing Specification for execution.
        Has no meaning for Capabilities or Results.
        """
        # default to current time
        if tzero is None:
            tzero = self._utcnow()

        # get datetimes
        (start, end) = self.datetimes(tzero=tzero)

        # determine start delay, account for late start
        if start is None:
            # the indeterminate past has already passed
            sd = 0
        else:
            sd = (start - tzero).total_seconds()
            if sd < 0:
                sd = 0

        # determine end delay
        if self._b is not None and self._b is not time_future:
            ed = (end - tzero).total_seconds()
        elif self.duration() is not None:
            ed = sd + self.duration().total_seconds()
        else:
            ed = None

        # detect expired temporal scope
        if ed is not None and ed < 0:
            sd = None
            ed = None

        return (sd, ed)

    def sort_scope(self, t, tzero=None):
        """
        Returns < 0 if time t falls before this scope,
        0 if time t falls within the scope,
        or > 0 if time t falls after this scope.

        For a datetime the nonzero value is the distance in seconds to
        the nearer end of the scope. time_past and time_future sort as
        negative and positive infinity unless the scope is open on that
        side, in which case they fall within it.
        """
        # Special handling for "now"
        if t is time_now:
            if self._a is time_now or self._b is time_now:
                return 0
            else:
                if tzero is None:
                    tzero = self._utcnow()
                t = tzero

        # Get concrete time range
        (start, end) = self.datetimes(tzero=tzero)

        # The indeterminate times cannot be compared with datetimes
        if t is time_past:
            return 0 if start is None else float("-inf")
        if t is time_future:
            return 0 if end is None else float("inf")

        if start and t < start:
            return (t - start).total_seconds()
        elif end and t > end:
            return (t - end).total_seconds()
        else:
            return 0

    def in_scope(self, t, tzero=None):
        """
        Returns True if time t falls within this scope.
        """
        return self.sort_scope(t, tzero) == 0

    def follows(self, s, tzero=None):
        """
        Returns True if this scope follows another scope s.

        If s has a period, this scope's period (its inner period, for
        a repeated scope) must be set and must not be shorter. Beyond
        that, either the start of this scope, or its end if that is a
        definite time, must fall within s.
        """
        if not self._repeated and s.period() is not None and (self.period() is None or self.period() < s.period()):
            return False

        if self._repeated and s.period() is not None and (self._inner_period is None or self._inner_period < s.period()):
            return False

        if s.in_scope(self._a, tzero):
            return True

        if isinstance(self._b, datetime) and s.in_scope(self._b, tzero):
            return True
        else:
            return False

    def _crontab_matches(self, t):
        """Returns True if datetime t satisfies every non-empty crontab field."""
        cron = self._crontab
        # crontab weekdays count from Sunday = 0, datetime from Monday = 0
        checks = ((cron._seconds, t.second),
                  (cron._minutes, t.minute),
                  (cron._hours, t.hour),
                  (cron._days, t.day),
                  (cron._weekdays, (t.weekday() + 1) % 7),
                  (cron._months, t.month))
        return all(not allowed or value in allowed
                   for (allowed, value) in checks)

    def iterator(self, tzero=None):
        """
        Returns an iterator over When statements generated by a repeated when.

        Starting at tzero (default: the current UTC time with zero
        microseconds), or at the start of the scope if that is later,
        time advances in steps of the period (default one second) until
        it leaves the scope. Each step, or each step matching the
        crontab if one is set, yields a When that starts at that time
        and carries the inner period and inner duration.

        Raises Exception if this is not a repeated when.
        """
        if not self._repeated:
            raise Exception("Can't get iterator for non-repeated when")

        # default to now, zero microseconds
        if tzero is None:
            t = self._utcnow().replace(microsecond=0)
        else:
            t = tzero

        # get base period (default 1s)
        period = self.period()
        if period is None:
            period = timedelta(seconds=1)

        # fast forward if necessary
        lag = self.sort_scope(t, tzero)
        if lag < 0:
            t += timedelta(seconds=-lag)

        # pin "now" to a single reference time for all later scope checks
        tzero = t

        # loop through time by period and check for match;
        # start one period early because the loop advances first
        t -= period
        while True:
            t += period
            if self.sort_scope(t, tzero) > 0:
                break
            if self._crontab and not self._crontab_matches(t):
                continue
            yield When(a=t, period=self._inner_period, duration=self._inner_duration)
# Scope running from the indeterminate past to the indeterminate future
when_infinite = When(a=time_past, b=time_future)
# class Schedule(object):
# """
# Defines a schedule for repeated operations based on crontab-like
# sets of months, days, days of weeks, hours, minutes, and seconds.
# Used to specify repetitions of single measurements in a Specification.
# Designed to be broadly compatible with LMAP calendar-based scheduling.
# This class is not yet fully implemented or integrated into the
# information model.
# """
# def __init__(self, dictval=None, when=None):
# super().__init__()
# self._when = when
# self._months = set()
# self._days = set()
# self._weekdays = set()
# self._hours = set()
# self._minutes = set()
# self._seconds = set()
# if dictval is not None:
# self._from_dict(dictval)
# def __repr__(self):
# rs = "<Schedule "
# if self._when is not None:
# rs += repr(self._when) + " "
# rs += "cron "
# rs += "/".join(map(str, [len(self._months),
# len(self._days),
# len(self._weekdays),
# len(self._hours),
# len(self._minutes),
# len(self._seconds)]))
# rs += ">"
# return rs
# def to_dict(self):
# """
# Represents this schedule as a dictionary (for serialization).
# """
# d = {}
# if self._when:
# d[KEY_WHEN] = str(self._when)
# if len(self._months):
# d[KEY_MONTHS] = _unparse_numset(self._months)
# if len(self._days):
# d[KEY_DAYS] = _unparse_numset(self._days)
# if len(self._weekdays):
# d[KEY_WEEKDAYS] = _unparse_wdayset(self._weekdays)
# if len(self._hours):
# d[KEY_HOURS] = _unparse_numset(self._hours)
# if len(self._minutes):
# d[KEY_MINUTES] = _unparse_numset(self._minutes)
# if len(self._seconds):
# d[KEY_SECONDS] = _unparse_numset(self._seconds)
# return d
# def _from_dict(self, d):
# if KEY_WHEN in d:
# self._when = When(valstr=d[KEY_WHEN])
# if KEY_MONTHS in d:
# self._months = _parse_numset(d[KEY_MONTHS])
# if KEY_DAYS in d:
# self._days = _parse_numset(d[KEY_DAYS])
# if KEY_WEEKDAYS in d:
# self._weekdays = _parse_wdayset(d[KEY_WEEKDAYS])
# if KEY_HOURS in d:
# self._hours = _parse_numset(d[KEY_HOURS])
# if KEY_MINUTES in d:
# self._minutes = _parse_numset(d[KEY_MINUTES])
# if KEY_SECONDS in d:
# self._seconds = _parse_numset(d[KEY_SECONDS])
# def datetime_iterator(self, t=None):
# """
# Returns an iterator over datetimes generated by the schedule
# and period.
# """
# # default to now, zero microseconds, initialize minus one second
# if t is None:
# t = datetime.utcnow().replace(microsecond=0)
# # get base period (default 1s) and
# period = None
# if self._when is not None:
# period = self._when.period()
# if period is None:
# period = timedelta(seconds=1)
# # fast forward if necessary
# lag = self._when.sort_scope(t)
# if lag < 0:
# t += timedelta(seconds=-lag)
# # loop through time by period and check for match
# t -= period
# while True:
# t += period
# if self._when is not None and not self._when.in_scope(t):
# break
# if len(self._seconds) and (t.second not in self._seconds):
# continue
# if len(self._minutes) and (t.minute not in self._minutes):
# continue
# if len(self._hours) and (t.hour not in self._hours):
# continue
# if len(self._days) and (t.day not in self._days):
# continue
# if len(self._weekdays) and (t.weekday() not in self._weekdays):
# continue
# if len(self._months) and (t.month not in self._months):
# continue
# yield t
def test_tscope():
    """
    Exercise When with definite, immediate, infinite, repeated and
    cron-driven temporal scopes.

    The repeated and cron sections pull successive inner scopes from
    When.iterator(), so the order of the next() calls matters.
    """
    # Definite scope
    wdef = When("2009-02-20 13:00:00 ... 2009-02-20 15:00:00")
    assert wdef.duration() == timedelta(0,7200)
    assert wdef.period() is None
    assert wdef.is_definite()
    assert not wdef.is_infinite()
    assert not wdef.is_repeated()
    assert wdef.in_scope(parse_time("2009-02-20 14:15:16"))
    assert not wdef.in_scope(parse_time("2009-02-21 14:15:16"))
    assert wdef.sort_scope(parse_time("2009-01-20 22:30:15")) < 0
    assert wdef.sort_scope(parse_time("2010-07-27 22:30:15")) > 0
    assert wdef.datetimes() == (parse_time("2009-02-20 13:00:00"),
                                parse_time("2009-02-20 15:00:00"))
    assert wdef.timer_delays(tzero=parse_time("2009-02-20 12:00:00")) == (3600, 10800)
    # Immediate scope with period
    wrel = When("now + 30m / 15s")
    assert wrel.duration() == timedelta(0,1800)
    assert wrel.period() == timedelta(0,15)
    assert not wrel.is_definite()
    assert not wrel.is_repeated()
    assert wrel.is_immediate()
    assert wrel.in_scope(parse_time("2009-02-20 13:44:45"), tzero=parse_time("2009-02-20 13:30:00"))
    assert wrel.datetimes(tzero=parse_time("2009-02-20 13:30:00")) == \
        (parse_time("2009-02-20 13:30:00"), parse_time("2009-02-20 14:00:00"))
    assert wrel.follows(wdef, tzero=parse_time("2009-02-20 13:30:00"))
    assert wrel.timer_delays(tzero=parse_time("2009-02-20 12:00:00")) == (0, 1800)
    # Infinite scope
    assert when_infinite.duration() is None
    assert when_infinite.period() is None
    assert not when_infinite.is_repeated()
    assert wdef.follows(when_infinite)
    assert wrel.follows(when_infinite)
    assert (when_infinite.datetimes()) == (None, None)
    # repeated when without inner-when (should be the same as immediate scope with period)
    wrep = When("repeat now + 30m / 15s")
    assert wrep.is_repeated()
    assert wrep.duration() == timedelta(0,1800)
    assert wrep.period() == timedelta(0,15)
    assert not wrep.is_definite()
    assert wrep.is_immediate()
    assert wrep.in_scope(parse_time("2009-02-20 13:44:45"), tzero=parse_time("2009-02-20 13:30:00"))
    assert wrep.datetimes(tzero=parse_time("2009-02-20 13:30:00")) == \
        (parse_time("2009-02-20 13:30:00"), parse_time("2009-02-20 14:00:00"))
    assert wrep.follows(wdef, tzero=parse_time("2009-02-20 13:30:00"))
    assert wrep.timer_delays(tzero=parse_time("2009-02-20 12:00:00")) == (0, 1800)
    # repeated when with period and inner-when
    wrep = When("repeat now + 30m / 1m { now + 5s / 1s }")
    assert wrep.is_repeated()
    assert wrep.duration() == timedelta(0,1800)
    assert wrep.period() == timedelta(0,60)
    assert not wrep.is_definite()
    assert wrep.is_immediate()
    assert wrep.in_scope(parse_time("2009-02-20 13:44:45"), tzero=parse_time("2009-02-20 13:30:00"))
    assert wrep.datetimes(tzero=parse_time("2009-02-20 13:30:00")) == \
        (parse_time("2009-02-20 13:30:00"), parse_time("2009-02-20 14:00:00"))
    assert wrep.follows(wdef, tzero=parse_time("2009-02-20 13:30:00"))
    assert wrep.timer_delays(tzero=parse_time("2009-02-20 12:00:00")) == (0, 1800)
    # check when from subspec (first iteration)
    # NOTE: the local name `iter` shadows the builtin for the rest of this function
    iter = wrep.iterator(parse_time("2009-02-20 13:30:00"))
    wrep_subspec = next(iter)
    assert wrep_subspec.duration() == timedelta(0,5)
    assert wrep_subspec.period() == timedelta(0,1)
    assert wrep_subspec.is_definite()
    assert not wrep_subspec.is_immediate()
    assert wrep_subspec.in_scope(parse_time("2009-02-20 13:30:04"), tzero=parse_time("2009-02-20 13:30:00"))
    assert not wrep_subspec.in_scope(parse_time("2009-02-20 13:30:06"), tzero=parse_time("2009-02-20 13:30:00"))
    assert wrep_subspec.datetimes(tzero=parse_time("2009-02-20 13:30:00")) == \
        (parse_time("2009-02-20 13:30:00"), parse_time("2009-02-20 13:30:05"))
    assert wrep_subspec.follows(wdef, tzero=parse_time("2009-02-20 13:30:00"))
    assert wrep_subspec.timer_delays(tzero=parse_time("2009-02-20 12:00:00")) == (5400, 5405)
    # check when from subspec (second iteration): one outer period (1m) later
    wrep_subspec = next(iter)
    assert wrep_subspec.duration() == timedelta(0,5)
    assert wrep_subspec.period() == timedelta(0,1)
    assert wrep_subspec.is_definite()
    assert not wrep_subspec.is_immediate()
    assert wrep_subspec.in_scope(parse_time("2009-02-20 13:31:04"), tzero=parse_time("2009-02-20 13:30:00"))
    assert not wrep_subspec.in_scope(parse_time("2009-02-20 13:31:06"), tzero=parse_time("2009-02-20 13:30:00"))
    assert wrep_subspec.datetimes(tzero=parse_time("2009-02-20 13:30:00")) == \
        (parse_time("2009-02-20 13:31:00"), parse_time("2009-02-20 13:31:05"))
    assert wrep_subspec.follows(wdef, tzero=parse_time("2009-02-20 13:30:00"))
    assert wrep_subspec.timer_delays(tzero=parse_time("2009-02-20 12:00:00")) == (5460, 5465)
    # cron when (first monday each month)
    wcron = When("repeat now ... future cron 0 0 * 1,2,3,4,5,6,7 1 * { now + 5s / 1s }")
    assert wcron.is_repeated()
    assert not wcron.is_definite()
    assert wcron.is_immediate()
    assert wcron.in_scope(parse_time("2009-02-20 13:44:45"), tzero=parse_time("2009-02-20 13:30:00"))
    # check when from subspec (first iteration)
    iter = wcron.iterator(parse_time("2009-02-20 13:30:00"))
    wrep_subspec = next(iter)
    assert wrep_subspec.duration() == timedelta(0,5)
    assert wrep_subspec.period() == timedelta(0,1)
    assert wrep_subspec.is_definite()
    assert not wrep_subspec.is_immediate()
    assert wrep_subspec.in_scope(parse_time("2009-03-02 00:00:04"), tzero=parse_time("2009-02-20 13:30:00"))
    assert not wrep_subspec.in_scope(parse_time("2009-02-20 00:06:06"), tzero=parse_time("2009-02-20 13:30:00"))
    assert wrep_subspec.datetimes(tzero=parse_time("2009-02-20 13:30:00")) == \
        (parse_time("2009-03-02 00:00:00"), parse_time("2009-03-02 00:00:05"))
    assert wrep_subspec.follows(When("2009-03-02 00:00:00 ... 2009-03-02 15:00:00"), tzero=parse_time("2009-03-02 00:00:03"))
    assert wrep_subspec.timer_delays(tzero=parse_time("2009-03-01 23:00:00")) == (3600, 3605)
#######################################################################
# Primitive Types
#######################################################################
class _Primitive(object):
"""
Represents a primitive mPlane data type. Primitive types define
textual and native representations for data elements, and convert
between the two.
In general, client code will not need to interact with Primitives;
conversion between strings and values is handled automatically by
the Statement and Notification classes.
"""
def __init__(self, name):
super().__init__()
self.name = name
def __str__(self):
return self.name
def __repr__(self):
return "<special mplane primitive "+self.name+">"
def parse(self, sval):
"""
Converts a string to a value; default implementation
returns the string directly, returning None for the
special string "*", which represents "all values" in
mPlane.
"""
if sval is None or sval == VALUE_NONE:
return None
else:
return sval
def unparse(self, val):
"""
Converts a value to a string; default implementation
uses native __str__ representation, replaces None with a
the special string "*", representing all values.
"""
if val is None:
return VALUE_NONE
else:
return str(val)
class _StringPrimitive(_Primitive):
    """
    String primitive; parsing and unparsing are inherited unchanged
    from _Primitive.

    Element uses this internally. Code that needs the primitive
    directly should use the shared prim_string instance.
    """

    def __init__(self):
        super().__init__("string")

    def __repr__(self):
        # Name of the shared module-level instance
        return "mplane.model.prim_string"
class _NaturalPrimitive(_Primitive):
    """
    Represents a natural number (unsigned integer).
    Uses a Python int as the native representation.

    If necessary, use the prim_natural instance of this class;
    in general, however, this is used internally by Element.
    """

    def __init__(self):
        super().__init__("natural")

    def __repr__(self):
        return "mplane.model.prim_natural"

    def parse(self, sval):
        """
        Convert a string to a natural value.

        Plain integer strings are converted exactly. Anything int()
        rejects (for example "100.0" or "10E2") is parsed as a float
        and truncated. Returns None for None or the "all values"
        string.

        Raises ValueError if the string is not numeric at all.
        """
        if sval is None or sval == VALUE_NONE:
            return None
        try:
            # Exact path first: a detour through float would silently
            # round integers larger than 2**53.
            return int(sval)
        except ValueError:
            # also converts values like 100.0 or 10E2
            return int(float(sval))
class _RealPrimitive(_Primitive):
    """
    Real-number primitive, held natively as a Python float.

    Element uses this internally. Code that needs the primitive
    directly should use the shared prim_real instance.
    """

    def __init__(self):
        super().__init__("real")

    def __repr__(self):
        return "mplane.model.prim_real"

    def parse(self, sval):
        """Return the float for a string, or None for a missing value."""
        missing = sval is None or sval == VALUE_NONE
        return None if missing else float(sval)
class _BooleanPrimitive(_Primitive):
    """
    Represents a boolean (truth) value.
    Uses a Python bool as the native representation.

    If necessary, use the prim_boolean instance of this class;
    in general, however, this is used internally by Element.
    """

    # Accepted textual forms; '1' and '0' are accepted as well
    _LITERALS = {'True': True, 'False': False, '1': True, '0': False}

    def __init__(self):
        super().__init__("boolean")

    def __repr__(self):
        return "mplane.model.prim_boolean"

    def parse(self, sval):
        """
        Convert a string to a boolean value.

        Accepts 'True', 'False', '1' and '0'. A value that already is a
        bool is returned as-is, matching the numeric primitives, which
        also accept native values. Returns None for None or the
        "all values" string.

        Raises ValueError for anything else.
        """
        if sval is None or sval == VALUE_NONE:
            return None
        if isinstance(sval, bool):
            return sval
        try:
            return self._LITERALS[sval]
        except (KeyError, TypeError):
            # str() keeps the message identical for string input and
            # prevents a TypeError while building it for other input.
            raise ValueError("Invalid boolean value " + str(sval)) from None
class _AddressPrimitive(_Primitive):
    """
    Address primitive for IPv4 and IPv6 addresses.

    Native values are whatever ip_address() returns for the string.
    """

    def __init__(self):
        super().__init__("address")

    def __repr__(self):
        return "mplane.model.prim_address"

    def parse(self, sval):
        """Return the address for a string, or None for a missing value."""
        if sval is None or sval == VALUE_NONE:
            return None
        return ip_address(sval)
class _URLPrimitive(_Primitive):
    """
    URL primitive.

    URLs are currently handled as plain strings: the inherited parse()
    and unparse() are used, so nothing is parsed or validated.
    """

    def __init__(self):
        super().__init__("url")

    def __repr__(self):
        # Name of the shared module-level instance
        return "mplane.model.prim_url"
class _TimePrimitive(_Primitive):
    """
    Timestamp primitive.

    Conversion is delegated entirely to the module's parse_time() and
    unparse_time() helpers, which also deal with the special mPlane
    timestamps ("now", "past", "future").
    """

    def __init__(self):
        super().__init__("time")

    def __repr__(self):
        return "mplane.model.prim_time"

    def parse(self, valstr):
        """Convert a string to a time value via parse_time()."""
        parsed = parse_time(valstr)
        return parsed

    def unparse(self, val):
        """Convert a time value to a string via unparse_time()."""
        text = unparse_time(val)
        return text
# Shared instances, one per primitive type.
prim_string = _StringPrimitive()
prim_natural = _NaturalPrimitive()
prim_real = _RealPrimitive()
prim_boolean = _BooleanPrimitive()
prim_time = _TimePrimitive()
prim_address = _AddressPrimitive()
prim_url = _URLPrimitive()

# Lookup table from primitive name to the shared instance.
_prim = {
    primitive.name: primitive
    for primitive in (
        prim_string,
        prim_natural,
        prim_real,
        prim_boolean,
        prim_time,
        prim_address,
        prim_url,
    )
}
def test_primitives():
    """Check parse() and unparse() of every shared primitive instance."""
    import math

    v4 = "10.0.27.101"
    v6 = "2001:db8:1:33::c0:ffee"
    stamp = datetime(2013, 7, 30, 23, 19, 42)

    # (primitive, text to parse, expected native value)
    parse_cases = (
        (prim_string, "foo", 'foo'),
        (prim_natural, "42", 42),
        (prim_real, "4.2e6", 4200000.0),
        (prim_boolean, "True", True),
        (prim_address, v4, ip_address(v4)),
        (prim_address, v6, ip_address(v6)),
        (prim_time, "2013-07-30 23:19:42", stamp),
    )
    for prim, text, expected in parse_cases:
        assert prim.parse(text) == expected

    # (primitive, native value to unparse, expected text)
    unparse_cases = (
        (prim_string, "foo", 'foo'),
        (prim_natural, 27, '27'),
        (prim_real, math.pi, '3.141592653589793'),
        (prim_boolean, False, 'False'),
        (prim_address, ip_address(v4), v4),
        (prim_address, ip_address(v6), v6),
        (prim_time, stamp, '2013-07-30 23:19:42.000000'),
    )
    for prim, value, expected in unparse_cases:
        assert prim.unparse(value) == expected

    # The "all values" marker maps to None and back
    assert prim_string.parse("*") is None
    assert prim_string.unparse(None) == '*'

    # Special timestamps map to their singleton objects and back
    specials = (("now", time_now), ("past", time_past), ("future", time_future))
    for text, special in specials:
        assert prim_time.parse(text) is special
    for text, special in specials:
        assert prim_time.unparse(special) == text
#######################################################################
# Elements and registries
#######################################################################
class Element(object):
    """
    A named type of data with a specific semantic meaning, comparable
    to an IPFIX Information Element or a named column in a relational
    database.

    Every Element carries a name, by which it is compared with other
    Elements, and a primitive type, which converts its values to and
    from strings.

    A default registry of elements ships with the mPlane reference
    implementation; call initialize_registry() to use it.
    """

    def __init__(self, name, prim, desc=None, namespace=REGURI_DEFAULT):
        super().__init__()
        self._name, self._prim, self._desc = name, prim, desc
        # Qualified name: namespace, anchor separator, then the name
        self._qualname = ANCHOR_SEP.join((namespace, name))

    def __str__(self):
        return self._name

    def __repr__(self):
        return "".join(("<Element ", self._qualname, " ", repr(self._prim), " >"))

    def name(self):
        """Returns this Element's name"""
        return self._name

    def desc(self):
        """Returns this Element's description (may be None)"""
        return self._desc

    def qualified_name(self):
        """Returns this Element's name prefixed with its namespace"""
        return self._qualname

    def primitive_name(self):
        """Returns the name of the primitive this Element uses"""
        return self._prim.name

    def parse(self, sval):
        """
        Converts a string to a value of this Element by handing it
        to the primitive.
        """
        primitive = self._prim
        return primitive.parse(sval)

    def unparse(self, val):
        """
        Converts a value of this Element to a string by handing it
        to the primitive.
        """
        primitive = self._prim
        return primitive.unparse(val)

    def compatible_with(self, rval):
        """
        Tells whether this element is compatible with element rval,
        that is, whether transformation_to will return a function
        turning a value of this element into one of the other.

        Only name equality is checked for now; compatibility derived
        from the structure of the names is a future feature.
        """
        return rval._name == self._name
class Registry(object):
    """
    A Registry is a collection of named Elements associated with a
    namespace URI, from which it is retrieved.
    """

    def __init__(self, uri=None, filename=None, noparse=False):
        """
        Build a registry.

        With a uri, the registry is parsed from that URI unless noparse
        is true, in which case only the URI is recorded and the
        registry stays empty. Without a uri, the registry is parsed
        from filename, or from the registry.json next to this module
        when no filename is given.
        """
        super().__init__()
        self._revision = None
        self._elements = collections.OrderedDict()
        self._namespaces = set()

        # stash URI and parse the registry
        if not uri:
            self._parse_from_file(filename or None)
        elif noparse:
            self._uri = uri
        else:
            self._parse_from_uri(uri)

    def __len__(self):
        return len(self._elements)

    def __getitem__(self, name):
        return self._elements[name]

    def _add_element(self, elem):
        # A later element with the same name replaces the earlier one
        self._elements[elem.name()] = elem

    def _include_registry(self, other_registry):
        """Copy every element of other_registry into this registry."""
        for elem in other_registry._elements.values():
            self._add_element(elem)

    def _parse_json_bytestream(self, stream):
        """
        Populate this registry from a stream holding a JSON registry
        document. The stream may yield either bytes or str.

        Raises ValueError for an unsupported registry format or an
        include loop.
        """
        # Turn the stream into a dict
        s = stream.read()
        if isinstance(s, bytes):
            s = s.decode("utf-8")
        d = json.loads(s)

        # check format
        if d[KEY_REGFMT] != REGFMT_FLAT:
            raise ValueError("Unsupported registry format "+str(d[KEY_REGFMT]))

        # stash revision
        self._revision = int(d[KEY_REGREV])

        # get namespace, store it and check for loops
        # NOTE(review): _namespaces is per instance, while included
        # registries are built as separate Registry objects through
        # registry_for_uri(); this check looks like it can only catch a
        # repeat within one instance. Confirm whether include loops
        # across registries are guarded elsewhere.
        self._uri = d[KEY_REGURI]
        if self._uri in self._namespaces:
            raise ValueError("Registry include loop at "+self._uri)
        self._namespaces.add(self._uri)

        # now parse includes depth-first
        if KEY_REGINCLUDE in d:
            for incuri in d[KEY_REGINCLUDE]:
                self._include_registry(registry_for_uri(incuri))

        # finally, iterate over elements and add them to the table;
        # they are added after the includes, so they override included
        # elements of the same name
        for elem in d[KEY_ELEMENTS]:
            name = elem[KEY_ELEMNAME]
            prim = _prim[elem[KEY_ELEMPRIM]]
            desc = elem.get(KEY_ELEMDESC)
            # Add the element in the subordinate in the parent namespace --
            # FIXME probably want to check to make sure this is the right
            # thing to do
            self._add_element(Element(name, prim, desc, self._uri))

    def _parse_from_file(self, filename=None):
        """
        Parse a registry file; with no filename, parse the
        registry.json located next to this module.
        """
        if filename is None:
            filename = os.path.join(os.path.dirname(__file__), "registry.json")
        # JSON text is UTF-8; do not depend on the locale's default
        with open(filename, "r", encoding="utf-8") as stream:
            self._parse_json_bytestream(stream)

    def _parse_from_uri(self, uri):
        """
        Parse a registry from a URI.

        The default registry URI maps to the bundled registry.json.
        A URI with a "file" scheme or without any scheme is treated as
        a local path; everything else is fetched with urllib.

        Raises ValueError if the registry cannot be retrieved or
        parsed; the underlying error is chained as its cause.
        """
        if uri == REGURI_DEFAULT:
            self._parse_from_file()
            return

        # normalize path if is a file or if no scheme is given
        # (we assume that it is a file)
        scheme = urllib.parse.urlparse(uri).scheme
        if scheme == "file" or scheme == "":
            uri = "file://" + normalize_path(uri)
        try:
            with urllib.request.urlopen(uri) as stream:
                self._parse_json_bytestream(stream)
        except Exception as err:
            # Exception rather than a bare except, so KeyboardInterrupt
            # and SystemExit still propagate
            raise ValueError("Invalid Registry uri: " + uri) from err

    def _dump_json(self):
        """
        Serialize this registry as a JSON string.

        Needs a parsed registry: _revision stays None until a registry
        document has been parsed, and int(None) raises TypeError.
        """
        d = collections.OrderedDict()
        d[KEY_REGFMT] = REGFMT_FLAT
        d[KEY_REGREV] = int(self._revision)
        d[KEY_REGURI] = self._uri
        d[KEY_ELEMENTS] = []
        for elem in self._elements.values():
            ed = collections.OrderedDict()
            ed[KEY_ELEMNAME] = elem.name()
            ed[KEY_ELEMPRIM] = elem.primitive_name()
            desc = elem.desc()
            if desc is not None:
                ed[KEY_ELEMDESC] = desc
            d[KEY_ELEMENTS].append(ed)
        return json.dumps(d, indent=4)

    def uri(self):
        """
        Returns the URI by which this registry is known.
        """
        return self._uri
_base_registry = None
_registries = {}
def preload_registry(filename=None):
    """
    Load a registry from a file and put it into the registry cache.

    The registry is cached under the URI it reports through uri(), so
    a later registry_for_uri() call with that URI reuses it instead of
    loading it again. With filename None, Registry falls back to the
    registry.json located next to this module.
    """
    registry = Registry(filename=filename)
    # The original indexed the cache with an undefined name `uri`,
    # which raised NameError on every call.
    _registries[registry.uri()] = registry
def initialize_registry(uri=REGURI_DEFAULT):
    """
    Set up the base mPlane registry from a URI.

    Without a URI the internal core registry is loaded. The registry
    becomes the base registry and is also cached under the given URI.
    Call this before doing anything else.
    """
    global _base_registry
    _base_registry = _registries[uri] = Registry(uri=uri)
def registry_for_uri(uri):
    """
    Return the registry for a URI, loading and caching it on first use.

    Called while parsing statements; generally not useful in
    client code.
    """
    if uri in _registries:
        return _registries[uri]
    loaded = Registry(uri=uri)
    _registries[uri] = loaded
    return _registries[uri]
def element(name, reguri=None):
    """
    Look up an Element by name.

    The registry cached under reguri is searched when reguri is given,
    the base registry otherwise.
    """
    registry = _registries[reguri] if reguri else _base_registry
    return registry[name]
def test_registry():
    """
    Check element lookup in the default registry, in a registry that
    includes a parent, and through the module-level element() helper.
    """
    start_desc = "Start time of an event/flow that may have a non-zero duration"

    # default registry through the Registry object
    base_registry = Registry()
    start = base_registry["start"]
    assert repr(start) == "<Element " + REGURI_DEFAULT + "#start mplane.model.prim_time >"
    assert start.name() == "start"
    assert start.primitive_name() == "time"
    assert start.desc() == start_desc

    # registry with a parent
    path = os.path.join(os.path.dirname(__file__), os.pardir,
                        "testdata", "registry_with_parent.json")
    child_registry = Registry(path)
    own = child_registry["testName"]
    assert repr(own) == "<Element testdata/registry_with_parent.json#testName mplane.model.prim_time >"
    assert own.name() == "testName"
    assert own.primitive_name() == "time"
    assert own.desc() == "testDesc"

    # element from the parent registry
    inherited = child_registry["start"]
    assert repr(inherited) == "<Element http://ict-mplane.eu/registry/core#start mplane.model.prim_time >"
    assert inherited.name() == "start"
    assert inherited.primitive_name() == "time"
    assert inherited.desc() == start_desc

    # overwritten element
    overridden = child_registry["end"]
    assert repr(overridden) == "<Element testdata/registry_with_parent.json#end mplane.model.prim_time >"
    assert overridden.name() == "end"
    assert overridden.primitive_name() == "time"
    assert overridden.desc() == "overwritten end"

    # default registry through the element() function
    initialize_registry()
    assert repr(element("start")) == "<Element http://ict-mplane.eu/registry/core#start mplane.model.prim_time >"
    assert element("start").name() == "start"
    assert element("start").primitive_name() == "time"
    assert element("start").desc() == start_desc
#######################################################################
# Constraints
#######################################################################
class _Constraint(object):
    """
    A set of acceptable values for an element.

    This base class is the default constraint and accepts every
    value; the shared instance constraint_all stands for it.
    Clients and components normally reach the Constraint classes
    through Parameters.
    """

    def __init__(self, prim):
        super().__init__()
        self._prim = prim

    def __str__(self):
        """String form of this Constraint"""
        return CONSTRAINT_ALL

    def __repr__(self):
        return "mplane.model.constraint_all"

    def met_by(self, val):
        """Tells whether val satisfies this constraint; always True here."""
        return True

    def single_value(self):
        """
        Returns the one value this constraint allows, or None if it
        allows more than one. The default constraint allows
        everything, so the answer is always None.
        """
        return None


# The default constraint accepts everything, so it needs no primitive.
constraint_all = _Constraint(prim=None)
class _RangeConstraint(_Constraint):
    """Represents acceptable values for an element as an inclusive range"""

    def __init__(self, prim, sval=None, a=None, b=None):
        """
        Build a range either from a string (two values joined by the
        range separator, each parsed with prim) or from explicit
        bounds a and b. The bounds are swapped if given in
        descending order.

        Raises ValueError if neither a string nor both bounds
        are given.
        """
        super().__init__(prim)
        if sval is not None:
            (astr, bstr) = sval.split(RANGE_SEP)
            self.a = prim.parse(astr)
            self.b = prim.parse(bstr)
        elif a is not None and b is not None:
            self.a = a
            self.b = b
        else:
            # Raising a plain string is itself a TypeError in Python 3
            raise ValueError("RangeConstraint needs either a string "
                             "or an explicit range")
        # Normalize so that a <= b
        if self.a > self.b:
            (self.a, self.b) = (self.b, self.a)

    def __str__(self):
        return self._prim.unparse(self.a) + \
               RANGE_SEP + \
               self._prim.unparse(self.b)

    def __repr__(self):
        return "mplane.model.RangeConstraint("+repr(self._prim)+\
               ", "+repr(str(self))+")"

    def met_by(self, val):
        """Determines if the value is within the range"""
        return (val >= self.a) and (val <= self.b)

    def single_value(self):
        """If this constraint only allows a single value, return it. Otherwise, return None."""
        if self.a == self.b:
            return self.a
        else:
            return None
class _SetConstraint(_Constraint):
    """A constraint whose acceptable values form a discrete set."""

    def __init__(self, prim, sval=None, vs=None):
        super().__init__(prim)
        if sval is not None:
            # Parse each separator-delimited item with the primitive
            self.vs = {self._prim.parse(item) for item in sval.split(SET_SEP)}
        elif vs is not None:
            self.vs = vs
        else:
            self.vs = set()

    def __str__(self):
        # NOTE(review): the items come out in the iteration order of
        # self.vs, which is unordered for a set. Confirm that nothing
        # depends on a stable string form.
        return SET_SEP.join(self._prim.unparse(member) for member in self.vs)

    def __repr__(self):
        return "".join(("mplane.model.SetConstraint(", repr(self._prim),
                        ", ", repr(str(self)), ")"))

    def met_by(self, val):
        """Tells whether the value is a member of the set"""
        return val in self.vs

    def single_value(self):
        """Returns the only member of a one-element set, None otherwise."""
        if len(self.vs) != 1:
            return None
        return next(iter(self.vs))
def parse_constraint(prim, sval):
    """
    Turn a constraint string (as produced by str(constraint)) into an
    instance of the matching constraint class, parsing its values
    with the given primitive.
    """
    if sval == CONSTRAINT_ALL:
        return constraint_all
    # A range separator anywhere after the first character marks a range
    if sval.find(RANGE_SEP) > 0:
        return _RangeConstraint(prim=prim, sval=sval)
    return _SetConstraint(prim=prim, sval=sval)
def test_constraints():
    """Check the all-values, range and set constraints."""
    # The default constraint accepts anything, None included
    for anything in ("whatever", None):
        assert constraint_all.met_by(anything)

    # Inclusive range
    rc = parse_constraint(prim_natural, "0 ... 99")
    assert not rc.met_by(-1)
    for inside in (0, 33, 99):
        assert rc.met_by(inside)
    assert not rc.met_by(100)
    assert str(rc) == "0 ... 99"

    # Discrete set
    sc = parse_constraint(prim_address, "10.0.27.100,10.0.28.103")
    member = ip_address('10.0.28.103')
    stranger = ip_address('10.0.27.103')
    assert sc.met_by(member)
    assert not sc.met_by(stranger)
#######################################################################
# Statements
#######################################################################
class Parameter(Element):
    """
    An element that carries a constraint and, optionally, a value.

    Parameters in Capabilities have a constraint but no value;
    Parameters in Specifications and Results have both.
    """

    def __init__(self, parent_element, constraint=constraint_all, val=None):
        # Only the name and primitive of the parent element are carried
        # over; description and namespace take Element's defaults.
        super().__init__(parent_element._name, parent_element._prim)
        self._val = None

        if isinstance(constraint, _Constraint):
            self._constraint = constraint
        elif isinstance(constraint, str):
            self._constraint = parse_constraint(self._prim, constraint)
        else:
            # Any other object is taken as the single allowed value
            self._constraint = _SetConstraint(vs={constraint}, prim=self._prim)

        self.set_value(val)

    def __repr__(self):
        pieces = ("<Parameter", str(self), repr(self._prim),
                  str(self._constraint), "value", repr(self._val))
        return " ".join(pieces) + ">"

    def has_value(self):
        """Returns True if this Parameter has a value."""
        return self._val is not None

    def is_single_value(self):
        """
        Returns True if the Constraint of this Parameter allows
        exactly one value
        """
        only = self._constraint.single_value()
        return only is not None

    def get_single_value(self):
        """
        Returns the only value the Constraint allows, or None if the
        Constraint allows more than one
        """
        return self._constraint.single_value()

    def set_single_value(self):
        """
        Gives a Parameter without a value the only value its Constraint
        allows. For a Constraint allowing more than one value the
        Parameter stays without a value.
        """
        if self.has_value():
            return
        self._val = self._constraint.single_value()

    def can_set_value(self, val):
        """
        Returns True if this Parameter can take the given value and
        False otherwise.

        The value is either of the native type of the associated
        Primitive or a string, which is parsed into that type first.
        """
        candidate = self._prim.parse(val) if isinstance(val, str) else val
        return self._constraint.met_by(candidate)

    def set_value(self, val):
        """
        Sets the value of this Parameter.

        The value is either of the native type of the associated
        Primitive or a string, which is parsed into that type first.
        None is always accepted and clears the value.

        Raises ValueError if the Constraint does not allow the value.
        """
        if isinstance(val, str):
            val = self._prim.parse(val)
        if val is not None and not self._constraint.met_by(val):
            raise ValueError(repr(self) + " cannot take value " + repr(val))
        self._val = val

    def get_value(self):
        """Returns the value of this Parameter"""
        return self._val

    def _as_tuple(self):
        # (name, unparsed value) when a value is set,
        # (name, constraint string) otherwise
        if self._val is None:
            shown = str(self._constraint)
        else:
            shown = self._prim.unparse(self._val)
        return (self._name, shown)

    def _clear_constraint(self):
        # Fall back to the accept-everything constraint
        self._constraint = constraint_all
class ResultColumn(Element):
    """
    An element holding an array of values.

    The array is empty in Capabilities and Specifications. In Results
    it holds one or more values, and all ResultColumns of a Result
    hold the same number of values.
    """

    def __init__(self, parent_element):
        # Only the name and primitive of the parent element are carried over
        super().__init__(parent_element._name, parent_element._prim)
        self._vals = []

    def __repr__(self):
        return "".join(("<ResultColumn ", str(self), " ", repr(self._prim),
                        " with ", str(len(self)), " values>"))

    def __len__(self):
        return len(self._vals)

    def __getitem__(self, key):
        return self._vals[key]

    def __setitem__(self, key, val):
        # Strings are parsed into native values first
        if isinstance(val, str):
            val = self._prim.parse(val)
        # Pad with None until index key is the next free slot
        while len(self) < key:
            self._vals.append(None)
        # Then overwrite an existing slot or append a new one
        if key == len(self):
            self._vals.append(val)
        else:
            self._vals[key] = val

    def __delitem__(self, key):
        del self._vals[key]

    def __iter__(self):
        return iter(self._vals)

    def clear(self):
        """ Removes all values. """
        del self._vals[:]
[docs]class Statement(object):
"""
A Statement is an assertion about the properties of a measurement
or other action performed by an mPlane component. This class
contains common implementation for the three kinds of mPlane
statement. Clients and components should use the
:class:`mplane.model.Capability`, :class:`mplane.model.Specification`,
and :class:`mplane.model.Result` classes instead.
"""
def __init__(self, dictval=None, verb=VERB_MEASURE, label=None, token=None, when=None, reguri=None):
    """
    Create a statement, either from a dictionary or from the
    keyword arguments.

    With dictval, the content is filled in by _from_dict() and the
    other arguments are ignored. Otherwise verb, label and token are
    stored as given; when defaults to the infinite scope and may be a
    When or a string; reguri defaults to the first registry URI in
    the registry cache. If reguri is None and the cache is empty,
    _reguri is left unset.
    """
    super().__init__()

    # Start from a blank statement
    self._version = MPLANE_VERSION
    self._params = collections.OrderedDict()
    self._metadata = collections.OrderedDict()
    self._resultcolumns = collections.OrderedDict()
    self._verb = self._label = self._token = None
    self._link = self._export = None

    if dictval is not None:
        # Everything comes from the dictionary
        self._from_dict(dictval)
        return

    # Everything comes from the arguments and their defaults
    self._verb, self._label, self._token = verb, label, token

    if when is None:
        self._when = when_infinite
    elif isinstance(when, str):
        self._when = When(when)
    else:
        self._when = when

    if reguri is not None:
        self._reguri = reguri
    elif _registries:
        # First URI in the registry cache
        self._reguri = next(iter(_registries))
def __repr__(self):
    # Kind, verb and optional label, then temporal scope and the
    # token and schema hashes, both limited to REPHL characters
    pieces = [
        "<", self.kind_str(), ": ", self._verb, self._label_repr(),
        " when ", str(self._when),
        " token ", self.get_token(REPHL),
        " schema ", self._schema_hash(REPHL),
        self._more_repr(), ">",
    ]
    return "".join(pieces)
def _more_repr(self):
return ""
def _label_repr(self):
if self._label is None:
return ""
else:
return " ("+self._label+")"
def kind_str(self):
    # Placeholder on the base class: always raises. Presumably
    # overridden by the concrete statement classes.
    reason = "Cannot instantiate a raw Statement"
    raise NotImplementedError(reason)
def validate(self):
    # Placeholder on the base class: always raises. Presumably
    # overridden by the concrete statement classes.
    reason = "Cannot instantiate a raw Statement"
    raise NotImplementedError(reason)
def verb(self):
    """Returns the verb of this statement"""
    current = self._verb
    return current
def add_parameter(self, elem_name, constraint=constraint_all, val=None):
    """
    Adds a parameter to this Statement. The element is looked up by
    name in this statement's registry; an existing parameter of the
    same name is replaced.
    """
    base = element(elem_name, reguri=self._reguri)
    self._params[elem_name] = Parameter(base, constraint=constraint, val=val)
def has_parameter(self, elem_name):
    """Tells whether this statement has a parameter of the given name."""
    known = self._params
    return elem_name in known
def parameter_names(self):
    """Generates the names of the parameters in this Statement."""
    for elem_name in self._params.keys():
        yield elem_name
def parameter_values(self):
    """
    Returns a dict mapping parameter names to values
    for each parameter with a value.

    A parameter has a value when its value is not None; falsy values
    such as 0, False or an empty string are kept.
    """
    values = {}
    for name in self.parameter_names():
        value = self.get_parameter_value(name)
        # Compare against None: a truthiness test would drop
        # legitimate values such as 0 or False
        if value is not None:
            values[name] = value
    return values
def count_parameters(self):
    """Returns how many parameters this Statement has."""
    total = len(self._params)
    return total
def count_parameter_values(self):
    """Returns how many parameters of this Statement have a value."""
    return sum(param.has_value() for param in self._params.values())
def get_parameter_value(self, elem_name):
    """Returns the value of the named parameter of this Statement."""
    param = self._params[elem_name]
    return param.get_value()
def set_parameter_value(self, elem_name, value):
    """Sets the value of the named parameter of this Statement."""
    self._params[elem_name].set_value(value)
def can_set_parameter_value(self, elem_name, value):
    """Tells whether the named Parameter can take the given value."""
    return self._params[elem_name].can_set_value(value)
def get_single_parameter_value(self, elem_name):
    """
    Returns the only value the named parameter allows, or None if
    the parameter is not single-valued
    """
    param = self._params[elem_name]
    return param.get_single_value()
def add_result_column(self, elem_name):
    """
    Adds a result column to this Statement. The element is looked up
    by name in this statement's registry; an existing column of the
    same name is replaced.
    """
    base = element(elem_name, reguri=self._reguri)
    self._resultcolumns[elem_name] = ResultColumn(base)
def has_result_column(self, elem_name):
    """Tells whether this statement has a result column of the given name."""
    columns = self._resultcolumns
    return elem_name in columns
def result_column_names(self):
    """Generates the names of the result columns in this Statement."""
    for elem_name in self._resultcolumns.keys():
        yield elem_name
def count_result_columns(self):
    """Returns how many result columns this Statement has."""
    total = len(self._resultcolumns)
    return total
def count_result_rows(self):
    """
    Returns the number of result rows in this Statement: the length
    of the longest result column, or 0 without any columns.
    """
    lengths = (len(column) for column in self._resultcolumns.values())
    return max(lengths, default=0)
def get_link(self):
    """
    Returns the link URL of this statement, that is, where the next
    message in the workflow should be sent to or retrieved from.
    """
    link = self._link
    return link
def set_link(self, link):
    """Stores the link URL of this Statement"""
    setattr(self, "_link", link)
def get_export(self):
    """
    Returns the export URL of this Statement, that is, where results
    will be indirectly exported.
    """
    export = self._export
    return export
def set_export(self, export):
    """Stores the export URL of this Statement."""
    setattr(self, "_export", export)
def get_label(self):
    """Returns the label of this Statement."""
    label = self._label
    return label
def set_label(self, label):
    """Stores the label of this statement"""
    setattr(self, "_label", label)
def when(self):
    """Returns the temporal scope of this statement."""
    scope = self._when
    return scope
def set_when(self, when, force=False):
    """
    Replaces the temporal scope of this statement.

    Accepts an instance of mplane.model.When or a string describing
    the scope. Unless force is True, the new scope has to follow the
    current one (when.follows(current)).

    Raises ValueError if that check fails.
    """
    if isinstance(when, str):
        when = When(when)

    must_check = not force and self._when is not None
    if must_check and not when.follows(self._when):
        raise ValueError("Cannot set temporal scope "+str(when)+
                         " within "+str(self._when))

    self._when = when
def _schema_hash(self, lim=None):
    """
    Computes the MD5 hex digest of the statement's schema: the
    registry URI plus the sorted parameter names and the sorted
    result column names. When lim is given, only the first lim
    characters of the digest are returned.
    """
    param_names = " ".join(sorted(self._params.keys()))
    column_names = " ".join(sorted(self._resultcolumns.keys()))
    schema = self._reguri + " p " + param_names + " r " + column_names
    digest = hashlib.md5(schema.encode('utf-8')).hexdigest()
    return digest if lim is None else digest[:lim]
def _pv_hash(self, lim=None, astr=None):
    """
    Computes the MD5 hex digest over the registry URI, verb,
    temporal scope, sorted parameter names, their unparsed values
    and the sorted result column names. Used as a specification key.

    astr, when non-empty, is appended to the hashed text. When lim
    is given, only the first lim characters of the digest are returned.
    """
    names = sorted(self._params.keys())
    values = []
    for name in names:
        param = self._params[name]
        values.append(param.unparse(param.get_value()))
    pieces = [self._reguri, self._verb,
              " w ", str(self._when),
              " pk ", " ".join(names),
              " pv ", " ".join(values),
              " r ", " ".join(sorted(self._resultcolumns.keys()))]
    if astr:
        pieces.append(astr)
    digest = hashlib.md5("".join(pieces).encode('utf-8')).hexdigest()
    return digest if lim is None else digest[:lim]
def _mpcv_hash(self, lim=None, astr=None):
    """
    Computes the MD5 hex digest over the registry URI, verb,
    temporal scope, sorted parameter names with their constraints
    and unparsed values, sorted metadata names with their unparsed
    values, sorted result column names and the export URL.
    Used as a complete token for statements.

    astr, when non-empty, is appended to the hashed text. When lim
    is given, only the first lim characters of the digest are returned.
    """
    param_names = sorted(self._params.keys())
    constraints = []
    param_values = []
    for name in param_names:
        param = self._params[name]
        constraints.append(str(param._constraint))
        param_values.append(param.unparse(param.get_value()))

    meta_names = sorted(self._metadata.keys())
    meta_values = []
    for name in meta_names:
        meta = self._metadata[name]
        meta_values.append(meta.unparse(meta.get_value()))

    pieces = [self._reguri, self._verb,
              " w ", str(self._when),
              " pk ", " ".join(param_names),
              " pc ", " ".join(constraints),
              " pv ", " ".join(param_values),
              " mk ", " ".join(meta_names),
              " mv ", " ".join(meta_values),
              " r ", " ".join(sorted(self._resultcolumns.keys())),
              " ex ", str(self._export)]
    if astr:
        pieces.append(astr)
    digest = hashlib.md5("".join(pieces).encode('utf-8')).hexdigest()
    return digest if lim is None else digest[:lim]
def get_token(self, lim=None):
    """
    Returns the Statement's token.

    When no token has been set yet, the default token for the
    Statement type is generated and stored first. When lim is given
    and the token is longer than lim, only its first lim characters
    are returned.
    """
    if self._token is None:
        self._token = self._default_token()
    token = self._token
    if lim is None or len(token) <= lim:
        return token
    return token[:lim]
def set_token(self, token):
    """Stores token as this Statement's token."""
    setattr(self, "_token", token)
def _default_token(self):
    """Returns the default token: the unlimited _mpcv_hash() of this Statement."""
    token = self._mpcv_hash()
    return token
def _result_rows(self):
    """
    Builds the result values as a list of rows, each row being a
    list with one unparsed value per result column. A column that
    has no entry at a given row index contributes VALUE_NONE.
    """
    row_count = self.count_result_rows()
    table = []
    for index in range(row_count):
        cells = []
        for column in self._resultcolumns.values():
            try:
                cell = column._prim.unparse(column[index])
            except IndexError:
                # column is shorter than the longest one
                cell = VALUE_NONE
            cells.append(cell)
        table.append(cells)
    return table
def to_dict(self, token_only=False):
    """
    Validates this Statement and converts it to an ordered
    dictionary (for further conversion to JSON or YAML), which can
    be passed as the dictval argument of the appropriate statement
    constructor.

    token_only is accepted for interface compatibility; this
    implementation does not use it.
    """
    self.validate()

    out = collections.OrderedDict()
    out[self.kind_str()] = self._verb
    out[KEY_VERSION] = self._version
    out[KEY_REGISTRY] = self._reguri

    # Optional header fields are emitted only when they are set.
    for key, value in ((KEY_LABEL, self._label),
                       (KEY_LINK, self._link),
                       (KEY_EXPORT, self._export),
                       (KEY_TOKEN, self._token)):
        if value is not None:
            out[key] = value

    out[KEY_WHEN] = str(self._when)

    if self.count_parameters() > 0:
        pairs = [param._as_tuple() for param in self._params.values()]
        out[KEY_PARAMETERS] = {pair[0]: pair[1] for pair in pairs}

    if self.count_metadata() > 0:
        pairs = [meta._as_tuple() for meta in self._metadata.values()]
        out[KEY_METADATA] = {pair[0]: pair[1] for pair in pairs}

    if self.count_result_columns() > 0:
        out[KEY_RESULTS] = list(self._resultcolumns.keys())
        # result values only make sense alongside their columns
        if self.count_result_rows() > 0:
            out[KEY_RESULTVALUES] = self._result_rows()

    return out
def _params_from_dict(self, d):
    """
    Adds one parameter per entry of d; used internally.
    This default implementation passes each dictionary value
    to add_parameter() as the parameter's value.
    """
    for name in d:
        self.add_parameter(name, val=d[name])
def _from_dict(self, d):
    """
    Fills in this Statement with values from a dictionary
    produced with to_dict (i.e., as taken from JSON or YAML).

    The verb is read from the key named by kind_str() and is
    mandatory; every other section is optional. Ignores result
    values, as these are handled by :func:`Result._from_dict()`;
    ignores the schedule section, as this is handled in
    :func:`Specification._from_dict()`.

    Raises ValueError if the dictionary declares a version greater
    than MPLANE_VERSION, and KeyError if the verb key is missing.
    """
    self._verb = d[self.kind_str()]

    if KEY_VERSION in d:
        if int(d[KEY_VERSION]) > MPLANE_VERSION:
            raise ValueError("Version mismatch")

    if KEY_REGISTRY in d:
        self._reguri = d[KEY_REGISTRY]
        registry_for_uri(self._reguri) # make sure the registry is loaded

    if KEY_LABEL in d:
        self._label = d[KEY_LABEL]

    if KEY_LINK in d:
        self._link = d[KEY_LINK]

    if KEY_EXPORT in d:
        # the export URL belongs in _export; storing it in _link
        # would overwrite the link and lose the export on a round trip
        self._export = d[KEY_EXPORT]

    if KEY_TOKEN in d:
        self._token = d[KEY_TOKEN]

    if KEY_WHEN in d:
        self._when = When(d[KEY_WHEN])

    if KEY_PARAMETERS in d:
        # subclasses decide whether values are parameter values or constraints
        self._params_from_dict(d[KEY_PARAMETERS])

    if KEY_METADATA in d:
        for (k, v) in d[KEY_METADATA].items():
            self.add_metadata(k, v)

    if KEY_RESULTS in d:
        for v in d[KEY_RESULTS]:
            self.add_result_column(v)
def _clear_constraints(self):
    """Calls _clear_constraint() on every parameter of this Statement."""
    for name in self._params:
        self._params[name]._clear_constraint()
class Capability(Statement):
    """
    A Capability represents something an mPlane component can do.

    Capabilities contain verbs (strings identifying the thing the
    component can do), parameters (which must be given by a client
    in a Specification in order for the component to do that thing),
    metadata (additional information about the process used to do
    that thing), and result columns (the data that thing will return).

    Capabilities can either be created programmatically, using the
    add_parameter(), add_metadata(), and add_result_column()
    methods, or by reading from a JSON object using parse_json().
    """

    def __init__(self, dictval=None, verb=VERB_MEASURE, label=None, token=None, when=None, reguri=None):
        super().__init__(dictval=dictval, verb=verb, label=label,
                         token=token, when=when, reguri=reguri)

    def _more_repr(self):
        # parameter / metadata / result column counts
        counts = (self.count_parameters(),
                  self.count_metadata(),
                  self.count_result_columns())
        return " p/m/r " + "/".join([str(count) for count in counts])

    def kind_str(self):
        return KIND_CAPABILITY

    def set_when(self, when, force=True):
        """
        Sets the capability's temporal scope. Unlike the base
        implementation, force defaults to True, so changes are not
        checked against the previous scope unless force=False is passed.
        """
        super().set_when(when, force)

    def validate(self):
        """
        Checks that this is a valid Capability; i.e., that it has
        neither parameter values nor result values. Raises ValueError
        otherwise.
        """
        # the list is built first so every parameter is consulted
        any_param_value = any([p.has_value() for p in self._params.values()])
        if any_param_value or self.count_result_rows() > 0:
            raise ValueError("Capabilities must have neither parameter nor "+
                             "result values.")

    def _params_from_dict(self, d):
        """
        Adds one parameter per entry of d; used internally.
        The Capability implementation passes each dictionary value
        to add_parameter() as the parameter's constraint.
        """
        for name in d:
            self.add_parameter(name, constraint=d[name])
class Specification(Statement):
    """
    A Specification represents a request for an mPlane component to do
    something it has advertised in a Capability.

    Like the Capability it is derived from, a Specification carries
    a verb, parameters (which must all have values for the
    Specification to be valid), metadata, and result columns.

    Specifications are created either by passing a Capability the
    Specification is intended to use as the capability= argument of
    the constructor, or by reading from a JSON object (see model.parse_json()).
    """

    def __init__(self, dictval=None, capability=None, verb=VERB_MEASURE, label=None, token=None, when=None, schedule=None):
        super().__init__(dictval=dictval, verb=verb, label=label, token=token, when=when)
        if dictval is None and capability is not None:
            # Build a statement from a capability
            self._verb = capability._verb
            self._label = capability._label
            # metadata is shared with the capability (same object, not a copy)
            self._metadata = capability._metadata
            self._params = deepcopy(capability._params)
            self._resultcolumns = deepcopy(capability._resultcolumns)
            self._reguri = capability._reguri

            # inherit from capability only when necessary
            if when is None:
                self._when = capability._when

            # now set values we know we can
            for param in self._params.values():
                param.set_single_value()

    def _more_repr(self):
        return " p(v)/m/r "+str(self.count_parameters())+"("+\
               str(self.count_parameter_values())+")/"+\
               str(self.count_metadata())+"/"+\
               str(self.count_result_columns())

    def kind_str(self):
        return KIND_SPECIFICATION

    def fulfills(self, capability):
        """
        Returns True if this Specification fulfills the Capability:
        both have the same schema hash and this Specification's
        temporal scope follows the capability's.
        """
        # verify that the schema hash is equal
        if self._schema_hash() != capability._schema_hash():
            return False

        # Verify that the specification is within the capability's temporal scope
        if not self._when.follows(capability.when()):
            return False

        # Works for me.
        return True

    def validate(self):
        """
        Checks that this is a valid Specification; i.e., that all
        parameters have values and that there are no result rows.
        Raises ValueError otherwise.
        """
        pval = functools.reduce(operator.__and__,
                                (p.has_value() for p in self._params.values()),
                                True)
        if (not pval) or (self.count_result_rows() > 0):
            raise ValueError("Specifications must have parameter values.")

    def is_schedulable(self):
        """
        Determine if a specification can be scheduled -- i.e., that its
        temporal scope refers to some time in the future at which a
        measurement or other operation should take place, or some range of
        time for which existing data should be searched and/or retrieved.

        Currently, this just checks the verb: every verb except
        VERB_QUERY is reported as schedulable.
        """
        return self._verb != VERB_QUERY

    def _default_token(self):
        return self._pv_hash()

    def retoken(self, force=False):
        """
        Generates a new token, if necessary, taking into account the current time
        if a specification has a relative temporal scope.

        With force=True the default token is always regenerated. Otherwise
        a new token is generated only when the temporal scope is not
        definite; the scope's current datetimes() are mixed into the hash.
        """
        if force:
            self._token = self._default_token()
        elif not self._when.is_definite():
            self._token = self._pv_hash(astr = repr(self._when.datetimes()))

    def subspec_iterator(self):
        """
        Iterates over subordinate specifications if this specification is repeated
        (i.e., has a repeated Temporal Scope); otherwise yields self once. Each subordinate
        specification has an absolute temporal scope derived from this specification's
        relative temporal scope and schedule.

        Note that a single deep copy of this specification is made and
        the same object is yielded every time, with its temporal scope
        and token updated in place before each yield.
        """
        if self._when.is_repeated():
            subspec = deepcopy(self) # Brian does not like that but he said it is okay
            scopes = self._when.iterator()
            while True:
                try:
                    scope = next(scopes)
                except StopIteration:
                    # A StopIteration escaping a generator body becomes a
                    # RuntimeError (PEP 479); end the iteration cleanly.
                    return
                subspec._when = scope
                subspec.retoken(True)
                yield subspec
        else:
            yield self
class Result(Statement):
    """
    A result is a statement that a component measured
    a given set of values at a given point in time, according to a specification.

    Results are generally created by passing the specification the new
    result responds to as the specification= argument to the constructor.
    A result inherits its token from the specification it responds to.
    """

    def __init__(self, dictval=None, specification=None, verb=VERB_MEASURE, label=None, token=None, when=None):
        super().__init__(dictval=dictval, verb=verb, label=label, token=token, when=when)
        if dictval is None and specification is not None:
            self._verb = specification._verb
            self._label = specification._label
            # metadata is shared with the specification (same object, not a copy)
            self._metadata = specification._metadata
            self._params = deepcopy(specification._params)
            self._resultcolumns = deepcopy(specification._resultcolumns)
            self._reguri = specification._reguri

            # assign token from specification
            self._token = specification.get_token()

            # allow parameters to take values other than constrained
            self._clear_constraints()

            # inherit from specification only when necessary: an explicitly
            # supplied temporal scope must not be overwritten by the
            # specification's, and a missing one falls back to it
            if when is None:
                self._when = specification._when

    def _more_repr(self):
        return " p/m/r(r) "+str(self.count_parameters())+"/"+\
               str(self.count_metadata())+"/"+\
               str(self.count_result_columns())+"("+\
               str(self.count_result_rows())+")"

    def kind_str(self):
        return KIND_RESULT

    def validate(self):
        """
        Checks that this is a valid Result; i.e., that all parameters
        have values and that the temporal scope is definite.
        Raises ValueError otherwise.
        """
        pval = functools.reduce(operator.__and__,
                                (p.has_value() for p in self._params.values()),
                                True)
        if (not pval):
            raise ValueError("Results must have parameter values.")

        if (not self._when.is_definite()):
            raise ValueError("Results must have definite temporal scope.")

    def _from_dict(self, d):
        """
        Fill in this Result with values from a dictionary
        produced with to_dict(). After the common Statement fields
        are loaded, each value of each result row is stored in the
        result column at the same position.
        """
        super()._from_dict(d)

        # columns in declaration order, so row position j maps to a column
        column_key = list(self._resultcolumns.keys())

        if KEY_RESULTVALUES in d:
            for i, row in enumerate(d[KEY_RESULTVALUES]):
                for j, val in enumerate(row):
                    self._resultcolumns[column_key[j]][i] = val

    def set_result_value(self, elem_name, val, row_index=0):
        """
        Sets a single result value: stores val at row_index in the
        result column named elem_name.
        """
        self._resultcolumns[elem_name][row_index] = val

    def schema_dict_iterator(self):
        """
        Iterates over each row in this result, yielding a dictionary
        mapping all parameter and result column names to their values.
        """
        for i in range(self.count_result_rows()):
            d = self.parameter_values()
            for k in self.result_column_names():
                d[k] = self._resultcolumns[k][i]
            yield d
#######################################################################
# Notifications
#######################################################################
class BareNotification(object):
    """
    Notifications are used to send additional information between
    mPlane clients and components other than measurement statements.

    Notifications can either be part of a normal measurement workflow
    (as Receipts and Redemptions) or signal exceptional conditions
    (as Withdrawals and Interrupts).

    This class contains implementation common to all Notifications
    which do not contain any information from a related Capability
    or Specification.
    """

    def __init__(self, dictval=None, token=None):
        super().__init__()
        if dictval is None:
            self._token = token
        else:
            # _from_dict() is not defined here; it is supplied by subclasses
            self._from_dict(dictval)
class Exception(BareNotification):
    """
    A Component sends an Exception to a Client, or a Client to a
    Component, to present a human-readable message about a failure
    or non-nominal condition.

    The status field is used to store an HTTP
    status code corresponding to the exception to the
    client and component frameworks.

    NOTE: this class has the same name as the builtin Exception and
    shadows it for code in this module that follows the definition.
    """

    def __init__(self, token=None, dictval=None, errmsg=None, status=None):
        super().__init__(dictval=dictval, token=token)
        if dictval is None:
            if errmsg is None:
                errmsg = "Unspecified exception"
            self._errmsg = errmsg
        # status is not part of the dictionary form, so it always
        # comes from the constructor argument
        self.status = status

    def __repr__(self):
        # the token is optional (None by default), so convert it
        # explicitly instead of concatenating it directly
        return "<Exception: "+str(self.get_token())+" "+str(self._errmsg)+">"

    def kind_str(self):
        return KIND_EXCEPTION

    def get_token(self):
        """
        Returns a token that originates from a message that has caused
        the Exception, or None if the token was explicitly not set.
        """
        return self._token

    def set_token(self, token):
        """Sets the token of the message that caused the Exception."""
        self._token = token

    def to_dict(self, token_only=False):
        """
        Converts the Exception to an ordered dictionary holding the
        token under KIND_EXCEPTION and the message under KEY_MESSAGE.
        token_only is accepted for interface compatibility and not used.
        """
        d = collections.OrderedDict()
        d[KIND_EXCEPTION] = self._token
        d[KEY_MESSAGE] = self._errmsg
        return d

    def _from_dict(self, d):
        """Loads token and message from a dictionary produced by to_dict()."""
        self._token = d[KIND_EXCEPTION]
        self._errmsg = d[KEY_MESSAGE]
class _StatementNotification(Statement):
    """
    Shared base class for the notifications that may carry all or
    part of a related Capability or Specification.

    Clients and components should use :class:`mplane.model.Receipt`,
    :class:`mplane.model.Redemption`, and :class:`mplane.model.Withdrawal`
    directly.
    """

    def __init__(self, dictval=None, statement=None, verb=VERB_MEASURE, token=None):
        super().__init__(dictval=dictval, verb=verb, token=token)
        if dictval is not None or statement is None:
            return
        # Take over the related statement's content; parameters and
        # result columns are copied, metadata is shared.
        self._label = statement._label
        self._verb = statement._verb
        self._when = statement._when
        self._reguri = statement._reguri
        self._metadata = statement._metadata
        self._params = deepcopy(statement._params)
        self._resultcolumns = deepcopy(statement._resultcolumns)
        self._token = statement.get_token()

    def __repr__(self):
        parts = ("<", self.kind_str(), ": ",
                 self._label_repr(), self.get_token(), ">")
        return "".join(parts)

    def to_dict(self, token_only=False):
        """
        Converts the notification to a dictionary. When token_only is
        True and a token is set, the parameters, metadata, results,
        link and temporal scope sections are left out.
        """
        out = super().to_dict(token_only)
        if not token_only or self._token is None:
            return out
        for section in (KEY_PARAMETERS, KEY_METADATA, KEY_RESULTS, KEY_LINK, KEY_WHEN):
            out.pop(section, None)
        return out

    def kind_str(self):
        raise NotImplementedError("Cannot instantiate a raw StatementNotification")
class Receipt(_StatementNotification):
    """
    A component presents a Receipt to a client in lieu of a Result,
    when the result will not be available in a reasonable amount of
    time; or to confirm a Specification.
    """

    def __init__(self, dictval=None, specification=None, token=None):
        super().__init__(statement=specification,
                         dictval=dictval,
                         token=token)

    def kind_str(self):
        return KIND_RECEIPT

    def validate(self):
        """
        Checks that this is a valid Receipt by applying the
        Specification validation rules to it.
        """
        Specification.validate(self)
class Redemption(_StatementNotification):
    """
    A client presents a Redemption to the component it received
    a Receipt from, in order to get the associated Result.
    """

    def __init__(self, dictval=None, receipt=None, token=None):
        super().__init__(statement=receipt,
                         dictval=dictval,
                         token=token)
        # without an explicit token, reuse the receipt's
        if token is None and receipt is not None:
            self._token = receipt.get_token()

    def kind_str(self):
        return KIND_REDEMPTION

    def validate(self):
        """
        Checks that this is a valid Redemption by applying the
        Specification validation rules to it.
        """
        Specification.validate(self)
class Withdrawal(_StatementNotification):
    """A Withdrawal cancels a Capability."""

    def __init__(self, dictval=None, capability=None, token=None):
        super().__init__(statement=capability,
                         dictval=dictval,
                         token=token)

    def kind_str(self):
        return KIND_WITHDRAWAL

    def validate(self):
        """
        Checks that this is a valid Withdrawal by applying the
        Capability validation rules to it.
        """
        Capability.validate(self)
class Interrupt(_StatementNotification):
    """An Interrupt cancels a Specification."""

    def __init__(self, dictval=None, specification=None, token=None):
        super().__init__(statement=specification,
                         dictval=dictval,
                         token=token)

    def kind_str(self):
        return KIND_INTERRUPT

    def validate(self):
        """
        Checks that this is a valid Interrupt by applying the
        Specification validation rules to it.
        """
        Specification.validate(self)
#######################################################################
# Envelope
#######################################################################
class Envelope(object):
    """
    Envelopes are used to contain other Messages.

    An envelope carries a content type, an ordered list of messages
    and, optionally, a token, a label and a temporal scope.
    """

    def __init__(self, dictval=None, content_type=ENVELOPE_MESSAGE, token=None, label=None, when=None):
        super().__init__()
        self._version = MPLANE_VERSION
        self._messages = []
        self._content_type = content_type
        self._token = token
        self._label = label
        self._when = None
        if when:
            # build a new When from the current datetimes and period
            # of the scope that was passed in
            (start, end) = when.datetimes()
            self._when = When(a=start,
                              b=end,
                              period=when.period())
        if dictval is not None:
            # values from the dictionary take precedence over the arguments
            self._from_dict(dictval)

    def __repr__(self):
        if self._token:
            token_part = " token "+self.get_token()
        else:
            token_part = ""
        return "<envelope: "+self._content_type+\
               " ("+str(len(self._messages))+")"+token_part+": "+\
               " ".join(map(repr, self._messages))+">"

    def __len__(self):
        return len(self._messages)

    def trim(self, n):
        """ Removes everything except the last n elements """
        if n == 0:
            # [:-0] is the empty slice [:0]; keeping the last zero
            # messages means removing all of them
            del self._messages[:]
        else:
            del self._messages[:-n]

    def append_message(self, msg):
        """ Appends a message to an Envelope """
        self._messages.append(msg)

    def messages(self):
        """ Returns an iterator to iterate over all messages in an Envelope """
        return iter(self._messages)

    def kind_str(self):
        return KIND_ENVELOPE

    def to_dict(self, token_only=False):
        """
        Converts the Envelope to a dictionary; token_only is passed
        on to the to_dict() of every contained message. Token,
        temporal scope and label are included only when set.
        """
        d = {}
        d[self.kind_str()] = self._content_type
        d[KEY_VERSION] = self._version
        d[KEY_CONTENTS] = [m.to_dict(token_only=token_only) for m in self.messages()]
        if self._token is not None:
            d[KEY_TOKEN] = self._token
        if self._when is not None:
            d[KEY_WHEN] = str(self._when)
        if self._label is not None:
            d[KEY_LABEL] = self._label
        return d

    def _from_dict(self, d):
        """
        Fills in this Envelope from a dictionary produced by to_dict().
        Raises ValueError if the dictionary declares a version greater
        than MPLANE_VERSION. Every entry of the contents section is
        decoded with message_from_dict() and appended.
        """
        self._content_type = d[self.kind_str()]

        if KEY_VERSION in d:
            if int(d[KEY_VERSION]) > MPLANE_VERSION:
                raise ValueError("Version mismatch")

        if KEY_TOKEN in d:
            self._token = d[KEY_TOKEN]

        if KEY_WHEN in d:
            self._when = When(d[KEY_WHEN])

        if KEY_LABEL in d:
            self._label = d[KEY_LABEL]

        for md in d[KEY_CONTENTS]:
            self.append_message(message_from_dict(md))

    def get_label(self):
        """ Returns the label or None if no label has been set """
        return self._label

    def get_token(self, lim=None):
        """
        Returns the token or None if no token has been set. When lim
        is given and the token is longer than lim, only its first lim
        characters are returned.
        """
        if self._token is not None and lim is not None and len(self._token) > lim:
            return self._token[:lim]
        else:
            return self._token

    def set_token(self, token):
        """ Sets the envelope's token """
        self._token = token

    def when(self):
        """ Returns the envelope's temporal scope. (If it's a bunch of multijob results) """
        return self._when
#######################################################################
# Utility methods
#######################################################################
def message_from_dict(d):
    """
    Decodes a dictionary returned from to_dict() into the mPlane
    message (statement, notification or envelope) it represents.

    The message class is selected by the first known kind key found
    in d; ValueError is raised when d contains none of them.
    """
    classmap = {
        KIND_CAPABILITY: Capability,
        KIND_SPECIFICATION: Specification,
        KIND_RESULT: Result,
        KIND_RECEIPT: Receipt,
        KIND_REDEMPTION: Redemption,
        KIND_WITHDRAWAL: Withdrawal,
        KIND_INTERRUPT: Interrupt,
        KIND_EXCEPTION: Exception,
        KIND_ENVELOPE: Envelope,
    }
    for kind, message_class in classmap.items():
        if kind in d:
            return message_class(dictval = d)
    raise ValueError("Cannot determine message type from "+repr(d))
def parse_json(jstr):
    """
    Decodes the JSON object held in the string jstr and returns the
    mPlane message it represents.
    """
    decoded = json.loads(jstr)
    return message_from_dict(decoded)
def unparse_json(msg, token_only=False):
    """
    Serializes an mPlane message to a JSON string, with sorted keys
    and two-space indentation. If token_only is True, uses tokens
    only for message types for which that is appropriate
    (i.e. Receipts, Redemptions, Withdrawals, and Interrupts).
    """
    as_dict = msg.to_dict(token_only=token_only)
    return json.dumps(as_dict,
                      sort_keys=True,
                      indent=2,
                      separators=(',',': '))
def parse_yaml(ystr):
    """
    Parse a YAML document in a string and return the associated
    mPlane message.
    """
    # NOTE: this previously called yaml.load() without a Loader. That
    # form can construct arbitrary Python objects from untrusted input
    # and is rejected by PyYAML >= 6, which requires an explicit Loader.
    # safe_load() is used instead; confirm that no caller relies on
    # Python-specific YAML tags.
    return message_from_dict(yaml.safe_load(ystr))
def unparse_yaml(msg):
    """
    Serializes an mPlane message to a YAML string in block style
    with four-space indentation.
    """
    # to_dict() output is converted to a plain dict before dumping
    plain = dict(msg.to_dict())
    return yaml.dump(plain, default_flow_style=False, indent=4)
def render_text(message):
    """
    Prints the text rendering of a message. For an Envelope, each
    contained message is rendered and printed in turn.
    """
    if isinstance(message, Envelope):
        to_print = message.messages()
    else:
        to_print = (message,)
    for item in to_print:
        print(render(item))
def render(msg):
    """
    Returns a multi-line, human-readable text rendering of a message.

    The first line names the message kind followed by its verb (or,
    for an Exception, its token). It is followed by the simple
    header fields that are present, the parameter and metadata
    sections, and finally either the result values (one block per
    row) or, when there are no values, the result column names.
    """
    d = msg.to_dict()
    lines = []

    if msg.kind_str() == KIND_EXCEPTION:
        # formatted with %s because an Exception's token may be None,
        # which cannot be concatenated to a string
        lines.append("%s: %s\n" % (KIND_EXCEPTION, d[KIND_EXCEPTION]))
    else:
        lines.append("%s: %s\n" % (msg.kind_str(), msg.verb()))

    for section in (KEY_MESSAGE, KEY_LABEL, KEY_LINK,
                    KEY_EXPORT, KEY_TOKEN, KEY_WHEN):
        if section in d:
            lines.append(" %-12s: %s\n" % (section, d[section]))

    for section in (KEY_PARAMETERS, KEY_METADATA):
        if section in d:
            lines.append(" %-12s(%2u): \n" % (section, len(d[section])))
            for element, value in d[section].items():
                lines.append(" %32s: %s\n" % (element, value))

    if KEY_RESULTVALUES in d:
        lines.append(" %-12s(%2u):\n" % (KEY_RESULTVALUES, len(d[KEY_RESULTVALUES])))
        for i, row in enumerate(d[KEY_RESULTVALUES]):
            lines.append(" result %u:\n" % (i,))
            for j, val in enumerate(row):
                # label each value with the column at the same position
                lines.append(" %32s: %s\n" % (d[KEY_RESULTS][j], val))
    elif KEY_RESULTS in d:
        lines.append(" %-12s(%2u):\n" % (KEY_RESULTS, len(d[KEY_RESULTS])))
        for element in d[KEY_RESULTS]:
            lines.append(" %s\n" % (element,))

    return "".join(lines)