from threading import Event, Lock
from uuid import uuid1
-from weakref import WeakValueDictionary
-from ncclient import content
+from ncclient.xml_ import *
from ncclient.transport import SessionListener
from errors import OperationError, TimeoutExpiredError, MissingCapabilityError
import logging
-logger = logging.getLogger('ncclient.operations.rpc')
+logger = logging.getLogger("ncclient.operations.rpc")
+
+
+class RPCError(OperationError):
+
+ "Represents an `rpc-error`. It is a type of :exc:`OperationError` and can be raised as such."
+
+ # Maps each qualified child tag of the `rpc-error` element to the private attribute that
+ # stores its value. It is looked up through `self` so that a subclass can override it.
+ tag_to_attr = {
+ qualify("error-type"): "_type",
+ qualify("error-tag"): "_tag",
+ qualify("error-severity"): "_severity",
+ qualify("error-info"): "_info",
+ qualify("error-path"): "_path",
+ qualify("error-message"): "_message"
+ }
+
+ def __init__(self, raw):
+ # *raw* is the `rpc-error` element; its direct children are iterated below.
+ self._raw = raw
+ # Default every mapped attribute to None so the properties work for absent children.
+ for attr in self.tag_to_attr.values():
+ setattr(self, attr, None)
+ for subele in raw:
+ attr = self.tag_to_attr.get(subele.tag, None)
+ if attr is not None:
+ # `error-info` is kept as serialized XML; every other child keeps only its text.
+ setattr(self, attr, subele.text if attr != "_info" else to_xml(subele) )
+ # Use the error message as the exception argument when there is one, else the full dict.
+ if self.message is not None:
+ OperationError.__init__(self, self.message)
+ else:
+ OperationError.__init__(self, self.to_dict())
+
+ def to_dict(self):
+ # Keys are the attribute names without the leading underscore, e.g. "type", "tag".
+ return dict([ (attr[1:], getattr(self, attr)) for attr in self.tag_to_attr.values() ])
+
+ @property
+ def xml(self):
+ "The `rpc-error` element as returned in XML."
+ return self._raw
+
+ @property
+ def type(self):
+ "The contents of the `error-type` element."
+ return self._type
+
+ @property
+ def tag(self):
+ "The contents of the `error-tag` element."
+ return self._tag
+
+ @property
+ def severity(self):
+ "The contents of the `error-severity` element."
+ return self._severity
+
+ @property
+ def path(self):
+ "The contents of the `error-path` element if present or `None`."
+ return self._path
+
+ @property
+ def message(self):
+ "The contents of the `error-message` element if present or `None`."
+ return self._message
+
+ @property
+ def info(self):
+ "XML string or `None`; representing the `error-info` element."
+ return self._info
class RPCReply:
- """Represents an *<rpc-reply>*. Only concerns itself with whether the
- operation was successful.
+ """Represents an *rpc-reply*. Only concerns itself with whether the operation was successful.
.. note::
- If the reply has not yet been parsed there is an implicit, one-time
- parsing overhead to accessing the attributes defined by this class and
- any subclasses.
+ If the reply has not yet been parsed there is an implicit, one-time parsing overhead to
+ accessing some of the attributes defined by this class.
"""
-
+
+ ERROR_CLS = RPCError
+ "Subclasses can specify a different error class, but it should be a subclass of `RPCError`."
+
def __init__(self, raw):
self._raw = raw
self._parsed = False
def __repr__(self):
return self._raw
-
- def _parsing_hook(self, root):
- """Subclass can implement.
-
- :type root: :class:`~xml.etree.ElementTree.Element`
- """
- pass
-
+
def parse(self):
- """Parse the *<rpc-reply>*"""
- if self._parsed:
- return
- root = self._root = content.xml2ele(self._raw) # <rpc-reply> element
- # per rfc 4741 an <ok/> tag is sent when there are no errors or warnings
- ok = content.find(root, 'data', nslist=[content.BASE_NS, content.CISCO_BS])
- if ok is not None:
- logger.debug('parsed [%s]' % ok.tag)
- else: # create RPCError objects from <rpc-error> elements
- error = content.find(root, 'data', nslist=[content.BASE_NS, content.CISCO_BS])
+ "Parses the *rpc-reply*."
+ if self._parsed: return
+ root = self._root = to_ele(self._raw) # The <rpc-reply> element
+ # Per RFC 4741 an <ok/> tag is sent when there are no errors or warnings
+ ok = root.find(qualify("ok"))
+ if ok is None:
+ # Create RPCError objects from <rpc-error> elements
+ error = root.find(qualify("rpc-error"))
if error is not None:
- logger.debug('parsed [%s]' % error.tag)
for err in root.getiterator(error.tag):
- # process a particular <rpc-error>
- d = {}
- for err_detail in err.getchildren(): # <error-type> etc..
- tag = content.unqualify(err_detail.tag)
- if tag != 'error-info':
- d[tag] = err_detail.text.strip()
- else:
- d[tag] = content.ele2xml(err_detail)
- self._errors.append(RPCError(d))
+ # Process a particular <rpc-error>
+ self._errors.append(self.ERROR_CLS(err))
self._parsing_hook(root)
self._parsed = True
+ def _parsing_hook(self, root):
+ "No-op by default. Gets passed the *root* element for the reply."
+ pass
+
@property
def xml(self):
- "*<rpc-reply>* as returned"
+ "*rpc-reply* element as returned."
return self._raw
-
+
@property
def ok(self):
"Boolean value indicating if there were no errors."
- if not self._parsed:
- self.parse()
- return not self._errors # empty list => false
-
+ return not self.errors # empty list => false
+
@property
def error(self):
- """Short for :attr:`errors` [0]; :const:`None` if there were no errors.
- """
- if not self._parsed:
- self.parse()
+ "Returns the first :class:`RPCError` and `None` if there were no errors."
+ self.parse()
if self._errors:
return self._errors[0]
else:
return None
-
+
@property
def errors(self):
- """`list` of :class:`RPCError` objects. Will be empty if there were no
- *<rpc-error>* elements in reply.
- """
- if not self._parsed:
- self.parse()
+ "List of `RPCError` objects. Will be empty if there were no *rpc-error* elements in reply."
+ self.parse()
return self._errors
-class RPCError(OperationError): # raise it if you like
-
- """Represents an *<rpc-error>*. It is an instance of :exc:`OperationError`
- so it can be raised like any other exception."""
-
- def __init__(self, err_dict):
- self._dict = err_dict
- if self.message is not None:
- OperationError.__init__(self, self.message)
- else:
- OperationError.__init__(self)
-
- @property
- def type(self):
- "`string` represeting *error-type* element"
- return self.get('error-type', None)
-
- @property
- def severity(self):
- "`string` represeting *error-severity* element"
- return self.get('error-severity', None)
-
- @property
- def tag(self):
- "`string` represeting *error-tag* element"
- return self.get('error-tag', None)
-
- @property
- def path(self):
- "`string` or :const:`None`; represeting *error-path* element"
- return self.get('error-path', None)
-
- @property
- def message(self):
- "`string` or :const:`None`; represeting *error-message* element"
- return self.get('error-message', None)
-
- @property
- def info(self):
- "`string` or :const:`None`, represeting *error-info* element"
- return self.get('error-info', None)
-
- ## dictionary interface
-
- __getitem__ = lambda self, key: self._dict.__getitem__(key)
-
- __iter__ = lambda self: self._dict.__iter__()
-
- __contains__ = lambda self, key: self._dict.__contains__(key)
-
- keys = lambda self: self._dict.keys()
-
- get = lambda self, key, default: self._dict.get(key, default)
-
- iteritems = lambda self: self._dict.iteritems()
-
- iterkeys = lambda self: self._dict.iterkeys()
-
- itervalues = lambda self: self._dict.itervalues()
-
- values = lambda self: self._dict.values()
-
- items = lambda self: self._dict.items()
-
- __repr__ = lambda self: repr(self._dict)
-
-
-class RPCReplyListener(SessionListener):
-
- # internal use
-
- # one instance per session
+class RPCReplyListener(SessionListener): # internal use
+
+ creation_lock = Lock()
+
+ # one instance per session -- maybe there is a better way??
def __new__(cls, session):
- instance = session.get_listener_instance(cls)
- if instance is None:
- instance = object.__new__(cls)
- instance._lock = Lock()
- instance._id2rpc = WeakValueDictionary()
- instance._pipelined = session.can_pipeline
- session.add_listener(instance)
- return instance
+ with RPCReplyListener.creation_lock:
+ instance = session.get_listener_instance(cls)
+ if instance is None:
+ instance = object.__new__(cls)
+ instance._lock = Lock()
+ instance._id2rpc = {}
+ #instance._pipelined = session.can_pipeline
+ session.add_listener(instance)
+ return instance
def register(self, id, rpc):
with self._lock:
def callback(self, root, raw):
tag, attrs = root
- if content.unqualify(tag) != 'rpc-reply':
+ # Only <rpc-reply> messages are handled here; anything else is ignored.
+ if tag != qualify("rpc-reply"):
return
- rpc = None
- for key in attrs:
- if content.unqualify(key) == 'message-id':
- id = attrs[key]
- try:
- with self._lock:
- rpc = self._id2rpc.pop(id)
- except KeyError:
- logger.warning('no object registered for message-id: [%s]' % id)
- except Exception as e:
- logger.debug('error - %r' % e)
- break
- else:
- if not self._pipelined:
+ for key in attrs: # in the <rpc-reply> attributes
+ if key == "message-id": # if we found msgid attr
+ id = attrs[key] # get the msgid
with self._lock:
- assert(len(self._id2rpc) == 1)
- rpc = self._id2rpc.values()[0]
- self._id2rpc.clear()
- else:
- logger.warning('<rpc-reply> without message-id received: %s' % raw)
- logger.debug('delivering to %r' % rpc)
- rpc.deliver_reply(raw)
-
+ try:
+ rpc = self._id2rpc[id] # the corresponding rpc
+ logger.debug("Delivering to %r" % rpc)
+ rpc.deliver_reply(raw)
+ except KeyError:
+ # Interpolate the id into the message rather than passing it as a second argument
+ raise OperationError("Unknown 'message-id': %s" % id)
+ # no catching other exceptions, fail loudly if must
+ else:
+ # if no error delivering, can del the reference to the RPC
+ del self._id2rpc[id]
+ break
+ else:
+ raise OperationError("Could not find 'message-id' attribute in <rpc-reply>")
+
def errback(self, err):
- for rpc in self._id2rpc.values():
- rpc.deliver_error(err)
+ try:
+ for rpc in self._id2rpc.values():
+ rpc.deliver_error(err)
+ finally:
+ self._id2rpc.clear()
-class RPC(object):
+class RaiseMode(object):
- """Base class for all operations.
+ NONE = 0
+ "Don't attempt to raise any type of `rpc-error` as :exc:`RPCError`."
- Directly corresponds to *<rpc>* requests. Handles making the request, and
- taking delivery of the reply.
- """
+ ERRORS = 1
+ "Raise only when the `error-type` indicates it is an honest-to-god error."
- # : Subclasses can specify their dependencies on capabilities. List of URI's
- # or abbreviated names, e.g. ':writable-running'. These are verified at the
- # time of object creation. If the capability is not available, a
- # :exc:`MissingCapabilityError` is raised.
- DEPENDS = []
+ ALL = 2
+ "Don't look at the `error-type`, always raise."
+
+
+class RPC(object):
+
+ """Base class for all operations, directly corresponding to *rpc* requests. Handles making the request, and taking delivery of the reply."""
- # : Subclasses can specify a different reply class, but it must be a
- # subclass of :class:`RPCReply`.
+ DEPENDS = []
+ """Subclasses can specify their dependencies on capabilities as a list of URI's or abbreviated names, e.g. ':writable-running'. These are verified at the time of instantiation. If the capability is not available, :exc:`MissingCapabilityError` is raised."""
+
REPLY_CLS = RPCReply
+ "By default :class:`RPCReply`. Subclasses can specify a :class:`RPCReply` subclass."
+
+ def __init__(self, session, async=False, timeout=30, raise_mode=RaiseMode.NONE):
+ """
+ *session* is the :class:`~ncclient.transport.Session` instance
+
+ *async* specifies whether the request is to be made asynchronously, see :attr:`is_async`
- def __init__(self, session, async=False, timeout=None):
+ *timeout* is the timeout for a synchronous request, see :attr:`timeout`
+
+ *raise_mode* specifies the exception raising mode, see :attr:`raise_mode`
+ """
self._session = session
try:
for cap in self.DEPENDS:
pass
self._async = async
self._timeout = timeout
- # keeps things simple instead of having a class attr that has to be locked
- self._id = uuid1().urn
- # RPCReplyListener itself makes sure there isn't more than one instance -- i.e. multiton
+ self._raise_mode = raise_mode
+ self._id = uuid1().urn # Keeps things simple instead of having a class attr with running ID that has to be locked
self._listener = RPCReplyListener(session)
self._listener.register(self._id, self)
self._reply = None
self._error = None
self._event = Event()
-
- def _build(self, opspec):
- # internal
- spec = {
- 'tag': content.qualify('rpc'),
- 'attrib': {'message-id': self._id},
- 'subtree': [ opspec ]
- }
- return content.dtree2xml(spec)
+
+ def _wrap(self, subele):
+ # internal use
+ ele = new_ele("rpc", {"message-id": self._id}, xmlns=BASE_NS_1_0)
+ ele.append(subele)
+ return to_xml(ele)
def _request(self, op):
- """Subclasses call this method to make the RPC request.
-
- In asynchronous mode, returns an :class:`~threading.Event` which is set
- when the reply has been received or an error occured. It is prudent,
- therefore, to check the :attr:`error` attribute before accesing
- :attr:`reply`.
-
- Otherwise, waits until the reply is received and returns
- :class:`RPCReply`.
-
- :arg opspec: :ref:`dtree` for the operation
- :type opspec: :obj:`dict` or :obj:`string` or :class:`~xml.etree.ElementTree.Element`
- :rtype: :class:`~threading.Event` or :class:`RPCReply`
+ """Implementations of :meth:`request` call this method to send the request and process the reply.
+
+ In synchronous mode, blocks until the reply is received and returns :class:`RPCReply`. Depending on the :attr:`raise_mode` a `rpc-error` element in the reply may lead to an :exc:`RPCError` exception. Raises :exc:`TimeoutExpiredError` if the event is not set once the wait for :attr:`timeout` returns.
+
+ In asynchronous mode, returns immediately, returning `self`. The :attr:`event` attribute will be set when the reply has been received (see :attr:`reply`) or an error occurred (see :attr:`error`).
+
+ *op* is the operation to be requested as an :class:`~xml.etree.ElementTree.Element`
"""
- logger.debug('request %r with opsepc=%r' % (self, op))
- req = self._build(op)
- self.session.send(req)
- if self.async:
- logger.debug('async, returning event')
- return self.event
+ logger.info('Requesting %r' % self.__class__.__name__)
+ req = self._wrap(op)
+ self._session.send(req)
+ if self._async:
+ logger.debug('Async request, returning %r', self)
+ return self
else:
- logger.debug('sync, will wait for timeout=%r' % self.timeout)
- self.event.wait(self.timeout)
- if self.event.isSet():
- if self.error:
+ logger.debug('Sync request, will wait for timeout=%r' % self._timeout)
+ self._event.wait(self._timeout)
+ if self._event.isSet():
+ if self._error:
+ # Error that prevented reply delivery
raise self._error
- self.reply.parse()
- return self.reply
+ self._reply.parse()
+ if self._reply.error is not None:
+ # <rpc-error>'s [ RPCError ]
+ if self._raise_mode == RaiseMode.ALL:
+ raise self._reply.error
+ # "error"/"warning" are values of error-severity; error-type names the layer
+ # (transport, rpc, protocol, application) and can never equal "error".
+ elif (self._raise_mode == RaiseMode.ERRORS and self._reply.error.severity == "error"):
+ raise self._reply.error
+ return self._reply
else:
raise TimeoutExpiredError
- def request(self, *args, **kwds):
- """Subclasses implement this method. Here, the operation is constructed
- in :ref:`dtree`, and the result of :meth:`_request` returned."""
- raise NotImplementedError
-
- def _delivery_hook(self):
- """Subclasses can implement this method. Will be called after
- initialising the :attr:`reply` or :attr:`error` attribute and before
- setting the :attr:`event`"""
+ def request(self):
+ """Subclasses must implement this method. Typically only the request needs to be built as an
+ :class:`~xml.etree.ElementTree.Element` and everything else can be handed off to
+ :meth:`_request`."""
pass
-
+
def _assert(self, capability):
- """Subclasses can use this method to verify that a capability is available
- with the NETCONF server, before making a request that requires it. A
- :exc:`MissingCapabilityError` will be raised if the capability is not
- available."""
+ """Subclasses can use this method to verify that a capability is available with the NETCONF
+ server, before making a request that requires it. A :exc:`MissingCapabilityError` will be
+ raised if the capability is not available."""
if capability not in self._session.server_capabilities:
- raise MissingCapabilityError('Server does not support [%s]' % cap)
-
+ raise MissingCapabilityError('Server does not support [%s]' % capability)
+
def deliver_reply(self, raw):
# internal use
self._reply = self.REPLY_CLS(raw)
- self._delivery_hook()
self._event.set()
def deliver_error(self, err):
# internal use
self._error = err
- self._delivery_hook()
self._event.set()
-
+
@property
def reply(self):
- ":class:`RPCReply` element if reply has been received or :const:`None`"
+ ":class:`RPCReply` element if reply has been received or `None`"
return self._reply
-
+
@property
def error(self):
- """:exc:`Exception` type if an error occured or :const:`None`.
-
- This attribute should be checked if the request was made asynchronously,
- so that it can be determined if :attr:`event` being set is because of a
- reply or error.
-
+ """:exc:`Exception` type if an error occured or `None`.
+
.. note::
- This represents an error which prevented a reply from being
- received. An *<rpc-error>* does not fall in that category -- see
- :class:`RPCReply` for that.
+ This represents an error which prevented a reply from being received. An *rpc-error*
+ does not fall in that category -- see `RPCReply` for that.
"""
return self._error
-
+
@property
def id(self):
- "The *message-id* for this RPC"
+ "The *message-id* for this RPC."
return self._id
-
+
@property
def session(self):
- """The :class:`~ncclient.transport.Session` object associated with this
- RPC"""
+ "The `~ncclient.transport.Session` object associated with this RPC."
return self._session
@property
def event(self):
- """:class:`~threading.Event` that is set when reply has been received or
- error occured."""
+ """:class:`~threading.Event` that is set when reply has been received or when an error preventing
+ delivery of the reply occurs.
+ """
return self._event
- def set_async(self, async=True):
- """Set asynchronous mode for this RPC."""
+ def __set_async(self, async=True):
+ # Setter behind the `is_async` property; raises UserWarning when async mode is requested
+ # on a session that cannot pipeline.
self._async = async
- if async and not session.can_pipeline:
+ # The session has to be reached through the instance; a bare `session` name is undefined here.
+ if async and not self._session.can_pipeline:
raise UserWarning('Asynchronous mode not supported for this device/session')
- def set_timeout(self, timeout):
- """Set the timeout for synchronous waiting defining how long the RPC
- request will block on a reply before raising an error."""
- self._timeout = timeout
+ def __set_raise_mode(self, mode):
+ # Setter behind the `raise_mode` property. *mode* must be one of the RaiseMode constants,
+ # the same values the constructor accepts (its default is RaiseMode.NONE).
+ assert(mode in (RaiseMode.NONE, RaiseMode.ERRORS, RaiseMode.ALL))
+ self._raise_mode = mode
- #: Whether this RPC is asynchronous
- async = property(fget=lambda self: self._async, fset=set_async)
+ def __set_timeout(self, timeout):
+ self._timeout = timeout
- #: Timeout for synchronous waiting
- timeout = property(fget=lambda self: self._timeout, fset=set_timeout)
+ raise_mode = property(fget=lambda self: self._raise_mode, fset=__set_raise_mode)
+ """Depending on this exception raising mode, an `rpc-error` in the reply may be raised as an :exc:`RPCError` exception. Valid values are the constants defined in :class:`RaiseMode`. """
+
+ is_async = property(fget=lambda self: self._async, fset=__set_async)
+ """Specifies whether this RPC will be / was requested asynchronously. By default RPC's are synchronous."""
+
+ timeout = property(fget=lambda self: self._timeout, fset=__set_timeout)
+ """Timeout in seconds for synchronous waiting defining how long the RPC request will block on a reply before raising :exc:`TimeoutExpiredError`.
+
+ Irrelevant for asynchronous usage.
+ """