#
# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4
##
# mPlane Protocol Reference Implementation
# Client SDK API implementation
#
# (c) 2013-2015 mPlane Consortium (http://www.ict-mplane.eu)
# Author: Brian Trammell <brian@trammell.ch>
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
#
import mplane.model
import mplane.utils
from datetime import datetime
import html.parser
import urllib3
from threading import Thread
import queue
import tornado.web
import tornado.httpserver
import tornado.ioloop
CAPABILITY_PATH_ELEM = "capability"
FORGED_DN_HEADER = "Forged-MPlane-Identity"
DEFAULT_IDENTITY = "default"
DEFAULT_PORT = 8888
DEFAULT_HOST = "127.0.0.1"
DEFAULT_REGISTRATION_PATH = "register/capability"
DEFAULT_SPECIFICATION_PATH = "show/specification"
DEFAULT_RESULT_PATH = "register/result"
[docs]class BaseClient(object):
"""
Core implementation of a generic programmatic client.
Used for common client state management between
Client and ClientListener; use one of these instead.
"""
def __init__(self, tls_state, supervisor=False, exporter=None):
self._tls_state = tls_state
self._capabilities = {}
self._capability_labels = {}
self._capability_identities = {}
self._receipt_identities = {}
self._receipts = {}
self._receipt_labels = {}
self._results = {}
self._result_labels = {}
self._supervisor = supervisor
if self._supervisor:
self._exporter = exporter
def _add_capability(self, msg, identity):
"""
Add a capability to internal state. The capability will be recallable
by token, and, if present, by label.
Internal use only; use handle_message instead.
"""
# FIXME retoken on token collision with another identity
token = msg.get_token()
self._capabilities[token] = msg
if msg.get_label():
self._capability_labels[msg.get_label()] = msg
if identity:
self._capability_identities[token] = identity
def _remove_capability(self, msg):
token = msg.get_token()
if token in self._capabilities:
label = self._capabilities[token].get_label()
del self._capabilities[token]
if label and label in self._capability_labels:
del self._capability_labels[label]
def _withdraw_capability(self, msg, identity):
"""
Process a withdrawal. Match the withdrawal to the capability,
first by token, then by schema. Withdrawals that do not match
any known capabilities are dropped silently.
Internal use only; use handle_message instead.
"""
token = msg.get_token()
# FIXME check identity, exception on mismatch
if token in self._capabilities:
self._remove_capability(self._capabilities[token])
else:
# Search all capabilities by schema
for cap in self.capabilities_matching_schema(msg):
self._remove_capability(cap.get_token())
[docs] def capability_for(self, token_or_label):
"""
Retrieve a capability given a token or label.
"""
if token_or_label in self._capability_labels:
return self._capability_labels[token_or_label]
elif token_or_label in self._capabilities:
return self._capabilities[token_or_label]
else:
raise KeyError("no capability for token or label "+token_or_label)
[docs] def identity_for(self, token_or_label, receipt=False):
"""
Retrieve an identity given a capability token or label, or a receipt token.
"""
if not receipt:
if token_or_label in self._capability_identities:
return self._capability_identities[token_or_label]
elif token_or_label in self._capability_labels:
return self._capability_identities[self._capability_labels[token_or_label].get_token()]
else:
raise KeyError("no identity for capability token or label "+token_or_label)
else:
if token_or_label in self._receipt_identities:
return self._receipt_identities[token_or_label]
else:
raise KeyError("no identity for receipt token " + token_or_label)
[docs] def capabilities_matching_schema(self, schema_capability):
"""
Given a capability, return *all* known capabilities matching the
given schema capability. A capability matches a schema capability
if and only if: (1) the capability schemas match and (2) all
constraints in the capability are contained by all constraints
in the schema capability.
Used to programmatically select capabilities matching an
aggregation or other collection operation (e.g. at a supervisor).
"""
# FIXME write this, maybe refactor part back into model.
pass
def _spec_for(self, cap_tol, when, params, relabel=None):
"""
Given a capability token or label, a temporal scope, a dictionary
of parameters, and an optional new label, derive a specification
ready for invocation, and return the capability and specification.
Used internally by derived classes; use invoke_capability instead.
"""
cap = self.capability_for(cap_tol)
spec = mplane.model.Specification(capability=cap)
# set temporal scope
spec.set_when(when)
# fill in parameters
# spec.set_single_values() # this is automatic now
for pname in spec.parameter_names():
if spec.get_parameter_value(pname) is None:
if pname in params:
spec.set_parameter_value(pname, params[pname])
else:
raise KeyError("missing parameter "+pname)
# regenerate token based on parameters and temporal scope
spec.retoken()
# generate label
if relabel:
spec.set_label(relabel)
else:
spec.set_label(cap.get_label() + "-" + str(self._ssn))
self._ssn += 1
return (cap, spec)
def _handle_receipt(self, msg, identity):
self._add_receipt(msg, identity)
def _add_receipt(self, msg, identity):
"""
Add a receipt to internal state. The receipt will be recallable
by token, and, if present, by label.
Internal use only; use handle_message instead.
"""
self._receipt_identities[msg.get_token()] = identity
self._receipts[msg.get_token()] = msg
if msg.get_label():
self._receipt_labels[msg.get_label()] = msg
def _remove_receipt(self, msg):
token = msg.get_token()
if token in self._receipts:
receipt = self._receipts[token]
del self._receipts[token]
label = receipt.get_label()
if label and label in self._receipt_labels:
del self._receipt_labels[label]
def _handle_result(self, msg, identity):
# FIXME check the result identity
# against where we sent the specification to
self._add_result(msg, identity)
def _add_result(self, msg, identity=None):
"""
Add a result to internal state. The result will supercede any receipt
stored for the same token, and will be recallable by token, and,
if present, by label.
Internal use only; use handle_message instead.
"""
receipt = None
try:
if isinstance(msg, mplane.model.Envelope):
# if the result is an envelope containing multijob
# results, keep the receipt until the multijob ends
(start, end) = msg.when().datetimes()
if end < datetime.utcnow():
if self._supervisor:
self._exporter.put_nowait([msg, identity])
receipt = self._receipts[msg.get_token()]
self._remove_receipt(receipt)
else:
receipt = self._receipts[msg.get_token()]
self._remove_receipt(receipt)
except KeyError:
pass
self._results[msg.get_token()] = msg
if not isinstance(msg, mplane.model.Exception):
if msg.get_label():
self._result_labels[msg.get_label()] = msg
else:
if receipt is not None:
self._result_labels[receipt.get_label()] = msg
def _remove_result(self, msg):
token = msg.get_token()
if token in self._results:
label = self._results[token].get_label()
del self._results[token]
if label and label in self._result_labels:
del self._result_labels[label]
[docs] def result_for(self, token_or_label):
"""
return a result for the token if available;
return the receipt for the token otherwise.
"""
# first look in state
if token_or_label in self._receipt_labels:
return self._receipt_labels[token_or_label]
elif token_or_label in self._receipts:
return self._receipts[token_or_label]
elif token_or_label in self._result_labels:
return self._result_labels[token_or_label]
elif token_or_label in self._results:
return self._results[token_or_label]
else:
raise KeyError("no such token or label "+token_or_label)
def _handle_exception(self, msg, identity):
self._add_result(msg)
[docs] def handle_message(self, msg, identity=None):
"""
Handle a message. Used internally to process
mPlane messages received from a component. Can also be used
to inject messages into a client's state.
"""
if (self._supervisor and
not isinstance(msg, mplane.model.Envelope)):
self._exporter.put_nowait([msg, identity])
if isinstance(msg, mplane.model.Capability):
self._add_capability(msg, identity)
elif isinstance(msg, mplane.model.Withdrawal):
self._withdraw_capability(msg, identity)
elif isinstance(msg, mplane.model.Receipt):
self._handle_receipt(msg, identity)
elif isinstance(msg, mplane.model.Result):
self._handle_result(msg, identity)
elif isinstance(msg, mplane.model.Exception):
self._handle_exception(msg, identity)
elif isinstance(msg, mplane.model.Envelope):
if msg.get_token() in self._receipts:
self._handle_result(msg, identity)
else:
for imsg in msg.messages():
self.handle_message(imsg, identity)
else:
raise ValueError("Internal error: unknown message "+repr(msg))
[docs] def forget(self, token_or_label):
"""
forget all receipts and results for the given token or label
"""
if token_or_label in self._result_labels:
result = self._result_labels[token_or_label]
del self._result_labels[token_or_label]
del self._results[result.get_token()]
if token_or_label in self._results:
result = self._results[token_or_label]
del self._results[token_or_label]
if result.get_label():
del self._result_labels[result.get_label()]
if token_or_label in self._receipt_labels:
receipt = self._receipt_labels[token_or_label]
del self._receipt_labels[token_or_label]
del self._receipts[receipt.get_token()]
if token_or_label in self._receipts:
receipt = self._receipts[token_or_label]
del self._receipts[token_or_label]
if receipt.get_label():
del self._receipt_labels[receipt.get_label()]
[docs] def receipt_tokens(self):
"""
list all tokens for outstanding receipts
"""
return tuple(self._receipts.keys())
[docs] def receipt_labels(self):
"""
list all labels for outstanding receipts
"""
return tuple(self._receipt_labels.keys())
[docs] def result_tokens(self):
"""
list all tokens for stored results
"""
return tuple(self._results.keys())
[docs] def result_labels(self):
"""
list all labels for stored results
"""
return tuple(self._result_labels.keys())
[docs] def capability_tokens(self):
"""
list all tokens for stored capabilities
"""
return tuple(self._capabilities.keys())
[docs] def capability_labels(self):
"""
list all labels for stored capabilities
"""
return tuple(self._capability_labels.keys())
[docs]class CrawlParser(html.parser.HTMLParser):
"""
HTML parser class to extract all URLS in a href attributes in
an HTML page. Used to extract links to Capabilities exposed
as link collections.
"""
def __init__(self, **kwargs):
super(CrawlParser, self).__init__(**kwargs)
self.urls = []
def handle_starttag(self, tag, attrs):
attrs = {k: v for (k,v) in attrs}
if tag == "a" and "href" in attrs:
self.urls.append(attrs["href"])
[docs]class HttpInitiatorClient(BaseClient):
"""
Core implementation of an mPlane JSON-over-HTTP(S) client.
Supports client-initiated workflows. Intended for building
client UIs and bots.
"""
def __init__(self, tls_state, default_url=None,
supervisor=False, exporter=None):
"""
initialize a client with a given
default URL an a given TLS state
"""
super().__init__(tls_state, supervisor=supervisor,
exporter=exporter)
self._default_url = default_url
# specification serial number
# used to create labels programmatically
self._ssn = 0
def set_default_url(self, url):
if isinstance(url, str):
self._default_url = urllib3.util.parse_url(url)
else:
self._default_url = url
[docs] def send_message(self, msg, dst_url=None):
"""
send a message, store any result in client state.
"""
# figure out where to send the message
if not dst_url:
dst_url = self._default_url
if isinstance(dst_url, str):
dst_url = urllib3.util.parse_url(dst_url)
pool = self._tls_state.pool_for(dst_url.scheme, dst_url.host, dst_url.port)
headers = {"Content-Type": "application/x-mplane+json"}
if self._tls_state.forged_identity():
headers[FORGED_DN_HEADER] = self._tls_state.forged_identity()
if dst_url.path is not None:
path = dst_url.path
else:
path = "/"
res = pool.urlopen('POST', path,
body=mplane.model.unparse_json(msg).encode("utf-8"),
headers=headers)
if (res.status == 200 and
res.getheader("Content-Type") == "application/x-mplane+json"):
component_identity = self._tls_state.extract_peer_identity(dst_url)
self.handle_message(mplane.model.parse_json(res.data.decode("utf-8")), component_identity)
else:
# Didn't get an mPlane reply. What now?
pass
[docs] def result_for(self, token_or_label):
"""
return a result for the token if available;
attempt to redeem the receipt for the token otherwise;
if not yet redeemable, return the receipt instead.
"""
# go get a raw receipt or result
rr = super().result_for(token_or_label)
# check if it's a Job or Multijob result
if (isinstance(rr, mplane.model.Result) or
isinstance(rr, mplane.model.Envelope)):
# If it's a Multijob result, there may be a receipt
# to retrieve further data.
# In that case, ignore the current result and
# retrieve up-to-date results from the component
if (rr.get_token() not in self.receipt_tokens() and
rr.get_label() not in self.receipt_labels()):
return rr
else:
rr = self._receipts[rr.get_token()]
elif isinstance(rr, mplane.model.Exception):
return rr
# if we're here, we have a receipt. try to redeem it.
self.send_message(mplane.model.Redemption(receipt=rr))
# see if we got a result
if token_or_label in self._result_labels:
return self._result_labels[token_or_label]
elif token_or_label in self._results:
return self._results[token_or_label]
else:
# Nope. Return the receipt.
return rr
[docs] def invoke_capability(self, cap_tol, when, params, relabel=None):
"""
Given a capability token or label, a temporal scope, a dictionary
of parameters, and an optional new label, derive a specification
and send it to the appropriate destination.
"""
(cap, spec) = self._spec_for(cap_tol, when, params, relabel)
spec.validate()
dst_url = urllib3.util.Url(scheme=self._default_url.scheme,
host=self._default_url.host,
port=self._default_url.port,
path=cap.get_link())
self.send_message(spec, dst_url)
return spec
def interrupt_capability(self, cap_tol):
# get the receipt
rr = super().result_for(cap_tol)
interrupt = mplane.model.Interrupt(specification=rr)
dst_url = urllib3.util.Url(scheme=self._default_url.scheme,
host=self._default_url.host,
port=self._default_url.port,
path=self._default_url.path)
self.send_message(interrupt, dst_url)
[docs] def retrieve_capabilities(self, url, urlchain=[], pool=None, identity=None):
"""
connect to the given URL, retrieve and process the
capabilities/withdrawals found there
"""
# detect loops in capability links
if url in urlchain:
return
if not self._default_url:
self.set_default_url(url)
if isinstance(url, str):
url = urllib3.util.parse_url(url)
if identity is None:
identity = self._tls_state.extract_peer_identity(url)
if pool is None:
if url.host is not None:
pool = self._tls_state.pool_for(url.scheme, url.host, url.port)
else:
print("ConnectionPool not defined")
exit(1)
if url.path is not None:
path = url.path
else:
path = "/"
res = pool.request('GET', path)
if res.status == 200:
ctype = res.getheader("Content-Type")
if ctype == "application/x-mplane+json":
# Probably an envelope. Process the message.
self.handle_message(
mplane.model.parse_json(res.data.decode("utf-8")), identity)
elif ctype == "text/html":
# Treat as a list of links to capability messages.
parser = CrawlParser(strict=False)
parser.feed(res.data.decode("utf-8"))
parser.close()
for capurl in parser.urls:
self.retrieve_capabilities(url=capurl,
urlchain=urlchain + [url],
pool=pool, identity=identity)
[docs]class HttpListenerClient(BaseClient):
"""
Core implementation of an mPlane JSON-over-HTTP(S) client.
Supports component-initiated workflows. Intended for building
supervisors.
"""
def __init__(self, config, tls_state=None,
supervisor=False, exporter=None, io_loop=None):
super().__init__(tls_state, supervisor=supervisor,
exporter=exporter)
listen_host = DEFAULT_HOST
if "listen-host" in config["client"]:
listen_host = config["client"]["listen-host"]
listen_port = DEFAULT_PORT
if "listen-port" in config["client"]:
listen_port = config.getint("client", "listen-port")
registration_path = DEFAULT_REGISTRATION_PATH
if "registration-path" in config["client"]:
registration_path = config["client"]["registration-path"]
specification_path = DEFAULT_SPECIFICATION_PATH
if "registration-path" in config["client"]:
specification_path = config["client"]["specification-path"]
result_path = DEFAULT_RESULT_PATH
if "result-path" in config["client"]:
result_path = config["client"]["result-path"]
# Outgoing messages per component identifier
self._outgoing = {}
# specification serial number
# used to create labels programmatically
self._ssn = 0
# Capability
self._callback_capability = {}
# Create a request handler pointing at this client
self._tornado_application = tornado.web.Application([
(r"/" + registration_path, RegistrationHandler, {'listenerclient': self, 'tlsState': self._tls_state}),
(r"/" + registration_path + "/", RegistrationHandler, {'listenerclient': self, 'tlsState': self._tls_state}),
(r"/" + specification_path, SpecificationHandler, {'listenerclient': self, 'tlsState': self._tls_state}),
(r"/" + specification_path + "/", SpecificationHandler, {'listenerclient': self, 'tlsState': self._tls_state}),
(r"/" + result_path, ResultHandler, {'listenerclient': self, 'tlsState': self._tls_state}),
(r"/" + result_path + "/", ResultHandler, {'listenerclient': self, 'tlsState': self._tls_state}),
])
http_server = tornado.httpserver.HTTPServer(self._tornado_application, ssl_options=tls_state.get_ssl_options())
# run the server
http_server.listen(listen_port, listen_host)
if io_loop is not None:
cli_t = Thread(target=self.listen_in_background(io_loop))
else:
cli_t = Thread(target=self.listen_in_background)
cli_t.daemon = True
cli_t.start()
[docs] def listen_in_background(self, io_loop=None):
"""
The server listens for requests in background, while
the supervisor console remains accessible
"""
if io_loop is None:
tornado.ioloop.IOLoop.instance().start()
def _push_outgoing(self, identity, msg):
if identity not in self._outgoing:
self._outgoing[identity] = []
self._outgoing[identity].append(msg)
[docs] def invoke_capability(self, cap_tol, when, params, relabel=None, callback_when=None):
"""
Given a capability token or label, a temporal scope, a dictionary
of parameters, and an optional new label, derive a specification
and queue it for retrieval by the appropriate identity (i.e., the
one associated with the capability).
If the identity has indicated it supports callback control,
the optional callback_when parameter queues a callback spec to
schedule the next callback.
"""
# grab cap, spec, and identity
(cap, spec) = self._spec_for(cap_tol, when, params, relabel)
identity = self.identity_for(cap.get_token())
callback_cap = None
if identity in self._callback_capability:
# prepare a callback spec if we need to
callback_cap = self._callback_capability[identity]
if callback_cap and callback_when:
callback_spec = mplane.model.Specification(capability=callback_cap)
callback_spec.set_when(callback_when)
envelope = mplane.model.Envelope()
envelope.append_message(callback_spec)
envelope.append_message(spec)
self._push_outgoing(identity, envelope)
else:
self._push_outgoing(identity, spec)
return spec
def interrupt_capability(self, cap_tol):
# get the receipt
rr = super().result_for(cap_tol)
identity = self.identity_for(rr.get_token(), receipt=True)
interrupt = mplane.model.Interrupt(specification=rr)
self._push_outgoing(identity, interrupt)
def _add_capability(self, msg, identity):
"""
Override Client's add_capability, check for callback control
"""
if msg.verb() == mplane.model.VERB_CALLBACK:
# FIXME this is kind of dodgy; we should do better checks to
# make sure this is a real callback capability
self._callback_capability[identity] = msg
else:
# not a callback control cap, just add the capability
super()._add_capability(msg, identity)
[docs]class MPlaneHandler(tornado.web.RequestHandler):
"""
Abstract tornado RequestHandler that allows a
handler to respond with an mPlane Message.
"""
def _respond_message(self, msg):
"""
Returns an HTTP response containing a JSON message
"""
self.set_status(200)
self.set_header("Content-Type", "application/x-mplane+json")
self.write(mplane.model.unparse_json(msg))
self.finish()
def _respond_plain_text(self, code, text = None):
"""
Returns an HTTP response containing a plain text message
"""
self.set_status(code)
if text is not None:
self.set_header("Content-Type", "text/plain")
self.write(text)
self.finish()
def _respond_json_text(self, code, text = None):
"""
Returns an HTTP response containing a plain text message
"""
self.set_status(code)
if text is not None:
self.set_header("Content-Type", "application/x-mplane+json")
self.write(text)
self.finish()
[docs]class RegistrationHandler(MPlaneHandler):
"""
Handles the probes that want to register to this supervisor
Each capability is registered indipendently
"""
def initialize(self, listenerclient, tlsState):
self._listenerclient = listenerclient
self._tls = tlsState
def post(self):
# unwrap json message from body
if (self.request.headers["Content-Type"] == "application/x-mplane+json"):
env = mplane.model.parse_json(self.request.body.decode("utf-8"))
else:
self._respond_plain_text(400, "Invalid format")
return
self._listenerclient.handle_message(env,
self._tls.extract_peer_identity(self.request))
# reply to the component
response = ""
for new_cap in env.messages():
if isinstance(new_cap, mplane.model.Capability):
response = response + "\"" + new_cap.get_label() + "\":{\"registered\":\"ok\"},"
else:
response = response + "\"" + new_cap.get_label() + "\":{\"registered\":\"no\", \"reason\":\"Not a capability\"},"
response = "{" + response[:-1].replace("\n", "") + "}"
self._respond_json_text(200, response)
return
[docs]class SpecificationHandler(MPlaneHandler):
"""
Exposes the specifications, that will be periodically pulled by the
components
"""
def initialize(self, listenerclient, tlsState):
self._listenerclient = listenerclient
self._tls = tlsState
def get(self):
identity = self._tls.extract_peer_identity(self.request)
specs = self._listenerclient._outgoing.pop(identity, [])
env = mplane.model.Envelope()
for spec in specs:
env.append_message(spec)
if isinstance(spec, mplane.model.Specification):
print("Specification " + spec.get_label() + " successfully pulled by " + identity)
else:
print("Interrupt " + spec.get_token() + " successfully pulled by " + identity)
self._respond_json_text(200, mplane.model.unparse_json(env))
[docs]class ResultHandler(MPlaneHandler):
"""
Receives results of specifications
"""
def initialize(self, listenerclient, tlsState):
self._listenerclient = listenerclient
self._tls = tlsState
def post(self):
# unwrap json message from body
if (self.request.headers["Content-Type"] == "application/x-mplane+json"):
env = mplane.model.parse_json(self.request.body.decode("utf-8"))
else:
self._respond_plain_text(400, "Invalid format")
return
self._listenerclient.handle_message(env,
self._tls.extract_peer_identity(self.request))
self._respond_plain_text(200)
return