1 # Copyright 2009 Shikhar Bhushan
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
15 from threading import Event, Lock
16 from uuid import uuid1
18 from ncclient.xml_ import *
19 from ncclient.transport import SessionListener
21 from errors import OperationError, TimeoutExpiredError, MissingCapabilityError
24 logger = logging.getLogger("ncclient.operations.rpc")
27 class RPCError(OperationError):
29 """Represents an *rpc-error*. It is a type of :exc:`OperationError` and can be raised like any
33 qualify("error-type"): "_type",
34 qualify("error-tag"): "_tag",
35 qualify("error-severity"): "_severity",
36 qualify("error-info"): "_info",
37 qualify("error-path"): "_path",
38 qualify("error-message"): "_message"
41 def __init__(self, raw):
43 for attr in RPCError.tag_to_attr.values():
44 setattr(self, attr, None)
46 attr = RPCError.tag_to_attr.get(subele.tag, None)
48 setattr(self, attr, subele.text if attr != "_info" else to_xml(subele) )
49 if self.message is not None:
50 OperationError.__init__(self, self.message)
52 OperationError.__init__(self, self.to_dict())
55 return dict([ (attr[1:], getattr(self, attr)) for attr in RPCError.tag_to_attr.values() ])
59 "*rpc-error* element as returned."
64 "`string` representing text of *error-type* element."
69 "`string` representing text of *error-tag* element."
74 "`string` representing text of *error-severity* element."
79 "`string` or :const:`None`; representing text of *error-path* element."
84 "`string` or :const:`None`; representing text of *error-message* element."
89 "`string` (XML) or :const:`None`; representing *error-info* element."
95 """Represents an *rpc-reply*. Only concerns itself with whether the operation was successful.
98 If the reply has not yet been parsed there is an implicit, one-time parsing overhead to
99 accessing the attributes defined by this class and any subclasses.
103 "Subclasses can specify a different error class, but it should be a subclass of `RPCError`."
105 def __init__(self, raw):
115 "Parses the *rpc-reply*."
116 if self._parsed: return
117 root = self._root = to_ele(self._raw) # The <rpc-reply> element
118 # Per RFC 4741 an <ok/> tag is sent when there are no errors or warnings
119 ok = root.find(qualify("ok"))
121 # Create RPCError objects from <rpc-error> elements
122 error = root.find(qualify("rpc-error"))
123 if error is not None:
124 for err in root.getiterator(error.tag):
125 # Process a particular <rpc-error>
126 self._errors.append(self.ERROR_CLS(err))
127 self._parsing_hook(root)
130 def _parsing_hook(self, root):
135 "*rpc-reply* element as returned."
140 "Boolean value indicating if there were no errors."
141 return not self.errors # empty list => false
145 "Returns the first `RPCError` and :const:`None` if there were no errors."
148 return self._errors[0]
154 """`list` of `RPCError` objects. Will be empty if there were no *rpc-error* elements in
160 class RPCReplyListener(SessionListener): # internal use
162 creation_lock = Lock()
164 # one instance per session -- maybe there is a better way??
165 def __new__(cls, session):
166 with RPCReplyListener.creation_lock:
167 instance = session.get_listener_instance(cls)
169 instance = object.__new__(cls)
170 instance._lock = Lock()
171 instance._id2rpc = {}
172 #instance._pipelined = session.can_pipeline
173 session.add_listener(instance)
176 def register(self, id, rpc):
178 self._id2rpc[id] = rpc
180 def callback(self, root, raw):
182 if tag != qualify("rpc-reply"):
184 for key in attrs: # in the <rpc-reply> attributes
185 if key == "message-id": # if we found msgid attr
186 id = attrs[key] # get the msgid
189 rpc = self._id2rpc[id] # the corresponding rpc
190 logger.debug("Delivering to %r" % rpc)
191 rpc.deliver_reply(raw)
193 raise OperationError("Unknown 'message-id': %s", id)
194 # no catching other exceptions, fail loudly if must
196 # if no error delivering, can del the reference to the RPC
200 raise OperationError("Could not find 'message-id' attribute in <rpc-reply>")
202 def errback(self, err):
204 for rpc in self._id2rpc.values():
205 rpc.deliver_error(err)
212 """Base class for all operations, directly corresponding to *rpc* requests. Handles making the
213 request, and taking delivery of the reply."""
216 """Subclasses can specify their dependencies on capabilities. List of URI's or abbreviated
217 names, e.g. ':writable-running'. These are verified at the time of instantiation. If the
218 capability is not available, a :exc:`MissingCapabilityError` is raised.
222 "Subclasses can specify a different reply class, but it should be a subclass of `RPCReply`."
224 def __init__(self, session, async=False, timeout=None, raise_mode="none"):
225 self._session = session
227 for cap in self.DEPENDS:
229 except AttributeError:
232 self._timeout = timeout
233 self._raise_mode = raise_mode
234 self._id = uuid1().urn # Keeps things simple instead of having a class attr with running ID that has to be locked
235 self._listener = RPCReplyListener(session)
236 self._listener.register(self._id, self)
239 self._event = Event()
241 def _wrap(self, subele):
243 ele = new_ele("rpc", {"message-id": self._id}, xmlns=BASE_NS_1_0)
247 def _request(self, op):
248 """Implementations of :meth:`request` call this method to send the request and process the
251 In synchronous mode, blocks until the reply is received and returns `RPCReply`. Depending on
252 the :attr:`raise_mode` a *rpc-error* element in the reply may lead to an :exc:`RPCError`
255 In asynchronous mode, returns immediately, returning *self*. The :attr:`event` attribute
256 will be set when the reply has been received (see :attr:`reply`) or an error occured (see
259 :param op: operation to be requested
260 :type ops: `~xml.etree.ElementTree.Element`
262 :rtype: `RPCReply` (sync) or `RPC` (async)
264 logger.info('Requesting %r' % self.__class__.__name__)
266 self._session.send(req)
268 logger.debug('Async request, returning %r', self)
271 logger.debug('Sync request, will wait for timeout=%r' % self._timeout)
272 self._event.wait(self._timeout)
273 if self._event.isSet():
275 # Error that prevented reply delivery
278 if self._reply.error is not None:
279 # <rpc-error>'s [ RPCError ]
280 if self._raise_mode == "all":
281 raise self._reply.error
282 elif (self._raise_mode == "errors" and
283 self._reply.error.type == "error"):
284 raise self._reply.error
287 raise TimeoutExpiredError
290 """Subclasses must implement this method. Typically only the request needs to be built as an
291 `~xml.etree.ElementTree.Element` and everything else can be handed off to
295 def _assert(self, capability):
296 """Subclasses can use this method to verify that a capability is available with the NETCONF
297 server, before making a request that requires it. A :exc:`MissingCapabilityError` will be
298 raised if the capability is not available."""
299 if capability not in self._session.server_capabilities:
300 raise MissingCapabilityError('Server does not support [%s]' % capability)
302 def deliver_reply(self, raw):
304 self._reply = self.REPLY_CLS(raw)
307 def deliver_error(self, err):
314 "`RPCReply` element if reply has been received or :const:`None`"
319 """:exc:`Exception` type if an error occured or :const:`None`.
322 This represents an error which prevented a reply from being received. An *rpc-error*
323 does not fall in that category -- see `RPCReply` for that.
329 "The *message-id* for this RPC."
334 "The `~ncclient.transport.Session` object associated with this RPC."
339 """`~threading.Event` that is set when reply has been received or when an error preventing
340 delivery of the reply occurs.
344 def set_async(self, async=True):
346 if async and not session.can_pipeline:
347 raise UserWarning('Asynchronous mode not supported for this device/session')
349 def set_raise_mode(self, mode):
350 assert(choice in ("all", "errors", "none"))
351 self._raise_mode = mode
353 def set_timeout(self, timeout):
354 self._timeout = timeout
356 raise_mode = property(fget=lambda self: self._raise_mode, fset=set_raise_mode)
357 """Depending on this exception raising mode, an *rpc-error* in the reply may be raised as
358 :exc:`RPCError` exceptions. Valid values:
360 * ``"all"`` -- any kind of *rpc-error* (error or warning)
361 * ``"errors"`` -- when the *error-type* element says it is an error
362 * ``"none"`` -- neither
365 is_async = property(fget=lambda self: self._async, fset=set_async)
366 """Specifies whether this RPC will be / was requested asynchronously. By default RPC's are
370 timeout = property(fget=lambda self: self._timeout, fset=set_timeout)
371 """Timeout in seconds for synchronous waiting defining how long the RPC request will block on a
372 reply before raising :exc:`TimeoutExpiredError`. By default there is no timeout, represented by
375 Irrelevant for asynchronous usage.