a84b112099ea5c8e2bcebd62f1c4f6fce3416001
[ncclient] / ncclient / operations / rpc.py
1 # Copyright 2009 Shikhar Bhushan
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #    http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 from threading import Event, Lock
16 from uuid import uuid1
17
18 from ncclient.xml_ import *
19 from ncclient.transport import SessionListener
20
21 from errors import OperationError, TimeoutExpiredError, MissingCapabilityError
22
23 import logging
24 logger = logging.getLogger("ncclient.operations.rpc")
25
26
27 class RPCReply:
28
29     """Represents an *<rpc-reply>*. Only concerns itself with whether the
30     operation was successful.
31
32     .. note::
33         If the reply has not yet been parsed there is an implicit, one-time
34         parsing overhead to accessing the attributes defined by this class and
35         any subclasses.
36     """
37
38     def __init__(self, raw):
39         self._raw = raw
40         self._parsed = False
41         self._root = None
42         self._errors = []
43
44     def __repr__(self):
45         return self._raw
46
47     def _parsing_hook(self, root):
48         """Subclass can implement.
49
50         :type root: :class:`~xml.etree.ElementTree.Element`
51         """
52         pass
53
54     def parse(self):
55         """Parse the *<rpc-reply>*"""
56         if self._parsed:
57             return
58         root = self._root = to_ele(self._raw) # The <rpc-reply> element
59         # Per RFC 4741 an <ok/> tag is sent when there are no errors or warnings
60         ok = root.find(qualify("ok"))
61         if ok is None:
62             # Create RPCError objects from <rpc-error> elements
63             error = root.find(qualify("rpc-error"))
64             if error is not None:
65                 for err in root.getiterator(error.tag):
66                     # Process a particular <rpc-error>
67                     self._errors.append(RPCError(err))
68         self._parsing_hook(root)
69         self._parsed = True
70
71     @property
72     def xml(self):
73         "*<rpc-reply>* as returned"
74         return self._raw
75
76     @property
77     def ok(self):
78         "Boolean value indicating if there were no errors."
79         if not self._parsed:
80             self.parse()
81         return not self._errors # empty list => false
82
83     @property
84     def error(self):
85         """Short for :attr:`errors` [0]; :const:`None` if there were no errors.
86         """
87         if not self._parsed:
88             self.parse()
89         if self._errors:
90             return self._errors[0]
91         else:
92             return None
93
94     @property
95     def errors(self):
96         """`list` of :class:`RPCError` objects. Will be empty if there were no
97         *<rpc-error>* elements in reply.
98         """
99         if not self._parsed:
100             self.parse()
101         return self._errors
102
103
104 class RPCError(OperationError):
105
106     """Represents an *<rpc-error>*. It is a type of :exc:`OperationError`
107     and can be raised like any other exception."""
108
109     def __init__(self, err):
110         self._type = None
111         self._severity = None
112         self._info = None
113         self._tag = None
114         self._path = None
115         self._message = None
116         for subele in err:
117             if subele.tag == qualify("error-tag"):
118                 self._tag = subele.text
119             elif subele.tag == qualify("error-severity"):
120                 self._severity = subele.text
121             elif subele.tag == qualify("error-info"):
122                 self._info = subele.text
123             elif subele.tag == qualify("error-path"):
124                 self._path = subele.text
125             elif subele.tag == qualify("error-message"):
126                 self._message = subele.text
127         if self.message is not None:
128             OperationError.__init__(self, self.message)
129         else:
130             OperationError.__init__(self)
131
132     @property
133     def type(self):
134         "`string` representing text of *error-type* element"
135         return self._type
136
137     @property
138     def severity(self):
139         "`string` representing text of *error-severity* element"
140         return self._severity
141
142     @property
143     def tag(self):
144         "`string` representing text of *error-tag* element"
145         return self._tag
146
147     @property
148     def path(self):
149         "`string` or :const:`None`; representing text of *error-path* element"
150         return self._path
151
152     @property
153     def message(self):
154         "`string` or :const:`None`; representing text of *error-message* element"
155         return self._message
156
157     @property
158     def info(self):
159         "`string` (XML) or :const:`None`, representing *error-info* element"
160         return self._info
161
162
163 class RPCReplyListener(SessionListener):
164
165     # internal use
166
167     # one instance per session -- maybe there is a better way??
168     def __new__(cls, session):
169         instance = session.get_listener_instance(cls)
170         if instance is None:
171             instance = object.__new__(cls)
172             instance._lock = Lock()
173             instance._id2rpc = {}
174             #instance._pipelined = session.can_pipeline
175             session.add_listener(instance)
176         return instance
177
178     def register(self, id, rpc):
179         with self._lock:
180             self._id2rpc[id] = rpc
181
182     def callback(self, root, raw):
183         tag, attrs = root
184         if tag != qualify("rpc-reply"):
185             return
186         for key in attrs: # in the <rpc-reply> attributes
187             if key == "message-id": # if we found msgid attr
188                 id = attrs[key] # get the msgid
189                 with self._lock:
190                     try:                    
191                         rpc = self._id2rpc[id] # the corresponding rpc
192                         logger.debug("Delivering to %r" % rpc)
193                         rpc.deliver_reply(raw)
194                     except KeyError:
195                         raise OperationError("Unknown message-id: %s", id)
196                     # no catching other exceptions, fail loudly if must
197                     else:
198                         # if no error delivering, can del the reference to the RPC
199                         del self._id2rpc[id]
200                         break
201         else:
202             raise OperationError("Could not find 'message-id' attribute in <rpc-reply>")
203     
204     def errback(self, err):
205         try:
206             for rpc in self._id2rpc.values():
207                 rpc.deliver_error(err)
208         finally:
209             self._id2rpc.clear()
210
211
212 class RPC(object):
213
214     """Base class for all operations.
215
216     Directly corresponds to *<rpc>* requests. Handles making the request, and
217     taking delivery of the reply.
218     """
219
220     #: Subclasses can specify their dependencies on capabilities. List of URI's
221     # or abbreviated names, e.g. ':writable-running'. These are verified at the
222     # time of object creation. If the capability is not available, a
223     # :exc:`MissingCapabilityError` is raised.
224     DEPENDS = []
225
226     #: Subclasses can specify a different reply class, but it must be a
227     # subclass of :class:`RPCReply`.
228     REPLY_CLS = RPCReply
229
230     def __init__(self, session, async=False, timeout=None, raise_mode="none"):
231         self._session = session
232         try:
233             for cap in self.DEPENDS:
234                 self._assert(cap)
235         except AttributeError:
236             pass
237         self._async = async
238         self._timeout = timeout
239         self._raise_mode = raise_mode
240         self._id = uuid1().urn # Keeps things simple instead of having a class attr that has to be locked
241         self._listener = RPCReplyListener(session)
242         self._listener.register(self._id, self)
243         self._reply = None
244         self._error = None
245         self._event = Event()
246
247     def _build(self, subele):
248         # internal
249         ele = new_ele("rpc", {"message-id": self._id}, xmlns=BASE_NS_1_0)
250         ele.append(subele)
251         return to_xml(ele)
252
253     def _request(self, op):
254         """Subclasses call this method to make the RPC request.
255         
256         In synchronous mode, waits until the reply is received and returns
257         :class:`RPCReply`.
258         
259         In asynchronous mode, returns immediately, returning a reference to this
260         object. The :attr:`event` attribute will be set when the reply has been
261         received (see :attr:`reply`) or an error occured (see :attr:`error`).
262         
263         :type opspec: :obj:`dict` or :obj:`string` or :class:`~xml.etree.ElementTree.Element`
264         :rtype: :class:`RPCReply` (sync) or :class:`RPC` (async)
265         """
266         logger.info('Requesting %r' % self.__class__.__name__)
267         req = self._build(op)
268         self._session.send(req)
269         if self._async:
270             logger.debug('Async request, returning %r', self)
271             return self
272         else:
273             logger.debug('Sync request, will wait for timeout=%r' %
274                          self._timeout)
275             self._event.wait(self._timeout)
276             if self._event.isSet():
277                 if self._error:
278                     # Error that prevented reply delivery
279                     raise self._error
280                 self._reply.parse()
281                 if self._reply.error is not None:
282                     # <rpc-error>'s [ RPCError ]
283                     if self._raise_mode == "all":
284                         raise self._reply.error
285                     elif (self._raise_mode == "errors" and
286                           self._reply.error.type == "error"):
287                         raise self._reply.error
288                 return self._reply
289             else:
290                 raise TimeoutExpiredError
291
292     def request(self, *args, **kwds):
293         "Subclasses implement this method."
294         return self._request(self.SPEC)
295     
296     def _assert(self, capability):
297         """Subclasses can use this method to verify that a capability is available
298         with the NETCONF server, before making a request that requires it. A
299         :exc:`MissingCapabilityError` will be raised if the capability is not
300         available."""
301         if capability not in self._session.server_capabilities:
302             raise MissingCapabilityError('Server does not support [%s]' %
303                                          capability)
304     
305     def deliver_reply(self, raw):
306         # internal use
307         self._reply = self.REPLY_CLS(raw)
308         self._event.set()
309
310     def deliver_error(self, err):
311         # internal use
312         self._error = err
313         self._event.set()
314
315     @property
316     def reply(self):
317         ":class:`RPCReply` element if reply has been received or :const:`None`"
318         return self._reply
319
320     @property
321     def error(self):
322         """:exc:`Exception` type if an error occured or :const:`None`.
323         
324         .. note::
325             This represents an error which prevented a reply from being
326             received. An *<rpc-error>* does not fall in that category -- see
327             :class:`RPCReply` for that.
328         """
329         return self._error
330
331     @property
332     def id(self):
333         "The *message-id* for this RPC"
334         return self._id
335
336     @property
337     def session(self):
338         """The :class:`~ncclient.transport.Session` object associated with this
339         RPC"""
340         return self._session
341
342     @property
343     def event(self):
344         """:class:`~threading.Event` that is set when reply has been received or
345         error occured."""
346         return self._event
347
348     def set_async(self, async=True):
349         """Set asynchronous mode for this RPC."""
350         self._async = async
351         if async and not session.can_pipeline:
352             raise UserWarning('Asynchronous mode not supported for this device/session')
353
354     def set_raise_mode(self, mode):
355         assert(choice in ("all", "errors", "none"))
356         self._raise_mode = mode
357
358     def set_timeout(self, timeout):
359         """Set the timeout for synchronous waiting; defining how long the RPC
360         request will block on a reply before raising an error. Irrelevant for
361         asynchronous usage."""
362         self._timeout = timeout
363
364     #: Whether this RPC is asynchronous
365     is_async = property(fget=lambda self: self._async, fset=set_async)
366
367     #: Timeout for synchronous waiting
368     timeout = property(fget=lambda self: self._timeout, fset=set_timeout)