fix that last commit.. hrm
[ncclient] / ncclient / operations / rpc.py
1 # Copyright 2009 Shikhar Bhushan
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #    http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 from threading import Event, Lock
16 from uuid import uuid1
17
18 from ncclient.xml_ import *
19 from ncclient.transport import SessionListener
20
21 from errors import OperationError, TimeoutExpiredError, MissingCapabilityError
22
23 import logging
24 logger = logging.getLogger("ncclient.operations.rpc")
25
26
27 class RPCReply:
28
29     """Represents an *<rpc-reply>*. Only concerns itself with whether the
30     operation was successful.
31
32     .. note::
33         If the reply has not yet been parsed there is an implicit, one-time
34         parsing overhead to accessing the attributes defined by this class and
35         any subclasses.
36     """
37
38     def __init__(self, raw):
39         self._raw = raw
40         self._parsed = False
41         self._root = None
42         self._errors = []
43
44     def __repr__(self):
45         return self._raw
46
47     def _parsing_hook(self, root):
48         """Subclass can implement.
49
50         :type root: :class:`~xml.etree.ElementTree.Element`
51         """
52         pass
53
54     def parse(self):
55         """Parse the *<rpc-reply>*"""
56         if self._parsed:
57             return
58         root = self._root = to_ele(self._raw) # The <rpc-reply> element
59         # Per RFC 4741 an <ok/> tag is sent when there are no errors or warnings
60         ok = root.find(qualify("ok"))
61         if ok is None:
62             # Create RPCError objects from <rpc-error> elements
63             error = root.find(qualify("rpc-error"))
64             if error is not None:
65                 for err in root.getiterator(error.tag):
66                     # Process a particular <rpc-error>
67                     self._errors.append(RPCError(err))
68         self._parsing_hook(root)
69         self._parsed = True
70
71     @property
72     def xml(self):
73         "*<rpc-reply>* as returned"
74         return self._raw
75
76     @property
77     def ok(self):
78         "Boolean value indicating if there were no errors."
79         if not self._parsed:
80             self.parse()
81         return not self._errors # empty list => false
82
83     @property
84     def error(self):
85         """Short for :attr:`errors` [0]; :const:`None` if there were no errors.
86         """
87         if not self._parsed:
88             self.parse()
89         if self._errors:
90             return self._errors[0]
91         else:
92             return None
93
94     @property
95     def errors(self):
96         """`list` of :class:`RPCError` objects. Will be empty if there were no
97         *<rpc-error>* elements in reply.
98         """
99         if not self._parsed:
100             self.parse()
101         return self._errors
102
103
104 class RPCError(OperationError):
105
106     """Represents an *<rpc-error>*. It is a type of :exc:`OperationError`
107     and can be raised like any other exception."""
108
109     def __init__(self, err):
110         self._type = None
111         self._tag = None
112         self._severity = None
113         self._info = None
114         self._path = None
115         self._message = None
116         for subele in err:
117             if subele.tag == qualify("error-type"):
118                 self._type = subele.text
119             elif subele.tag == qualify("error-tag"):
120                 self._tag = subele.text
121             elif subele.tag == qualify("error-severity"):
122                 self._severity = subele.text
123             elif subele.tag == qualify("error-info"):
124                 self._info = subele.text
125             elif subele.tag == qualify("error-path"):
126                 self._path = subele.text
127             elif subele.tag == qualify("error-message"):
128                 self._message = subele.text
129         if self.message is not None:
130             OperationError.__init__(self, self.message)
131         else:
132             OperationError.__init__(self, self.to_dict())
133     
134     def to_dict(self):
135         return {
136             'type': self.type,
137             'tag': self.tag,
138             'severity': self.severity,
139             'path': self.path,
140             'message': self.message,
141             'info': self.info
142         }
143     
144     @property
145     def type(self):
146         "`string` representing text of *error-type* element"
147         return self._type
148     
149     @property
150     def tag(self):
151         "`string` representing text of *error-tag* element"
152         return self._tag
153     
154     @property
155     def severity(self):
156         "`string` representing text of *error-severity* element"
157         return self._severity
158     
159     @property
160     def path(self):
161         "`string` or :const:`None`; representing text of *error-path* element"
162         return self._path
163
164     @property
165     def message(self):
166         "`string` or :const:`None`; representing text of *error-message* element"
167         return self._message
168
169     @property
170     def info(self):
171         "`string` (XML) or :const:`None`, representing *error-info* element"
172         return self._info
173
174
175 class RPCReplyListener(SessionListener):
176
177     # internal use
178
179     # one instance per session -- maybe there is a better way??
180     def __new__(cls, session):
181         instance = session.get_listener_instance(cls)
182         if instance is None:
183             instance = object.__new__(cls)
184             instance._lock = Lock()
185             instance._id2rpc = {}
186             #instance._pipelined = session.can_pipeline
187             session.add_listener(instance)
188         return instance
189
190     def register(self, id, rpc):
191         with self._lock:
192             self._id2rpc[id] = rpc
193
194     def callback(self, root, raw):
195         tag, attrs = root
196         if tag != qualify("rpc-reply"):
197             return
198         for key in attrs: # in the <rpc-reply> attributes
199             if key == "message-id": # if we found msgid attr
200                 id = attrs[key] # get the msgid
201                 with self._lock:
202                     try:                    
203                         rpc = self._id2rpc[id] # the corresponding rpc
204                         logger.debug("Delivering to %r" % rpc)
205                         rpc.deliver_reply(raw)
206                     except KeyError:
207                         raise OperationError("Unknown message-id: %s", id)
208                     # no catching other exceptions, fail loudly if must
209                     else:
210                         # if no error delivering, can del the reference to the RPC
211                         del self._id2rpc[id]
212                         break
213         else:
214             raise OperationError("Could not find 'message-id' attribute in <rpc-reply>")
215     
216     def errback(self, err):
217         try:
218             for rpc in self._id2rpc.values():
219                 rpc.deliver_error(err)
220         finally:
221             self._id2rpc.clear()
222
223
224 class RPC(object):
225
226     """Base class for all operations.
227
228     Directly corresponds to *<rpc>* requests. Handles making the request, and
229     taking delivery of the reply.
230     """
231
232     #: Subclasses can specify their dependencies on capabilities. List of URI's
233     # or abbreviated names, e.g. ':writable-running'. These are verified at the
234     # time of object creation. If the capability is not available, a
235     # :exc:`MissingCapabilityError` is raised.
236     DEPENDS = []
237
238     #: Subclasses can specify a different reply class, but it must be a
239     # subclass of :class:`RPCReply`.
240     REPLY_CLS = RPCReply
241
242     def __init__(self, session, async=False, timeout=None, raise_mode="none"):
243         self._session = session
244         try:
245             for cap in self.DEPENDS:
246                 self._assert(cap)
247         except AttributeError:
248             pass
249         self._async = async
250         self._timeout = timeout
251         self._raise_mode = raise_mode
252         self._id = uuid1().urn # Keeps things simple instead of having a class attr that has to be locked
253         self._listener = RPCReplyListener(session)
254         self._listener.register(self._id, self)
255         self._reply = None
256         self._error = None
257         self._event = Event()
258
259     def _build(self, subele):
260         # internal
261         ele = new_ele("rpc", {"message-id": self._id}, xmlns=BASE_NS_1_0)
262         ele.append(subele)
263         return to_xml(ele)
264
265     def _request(self, op):
266         """Subclasses call this method to make the RPC request.
267         
268         In synchronous mode, waits until the reply is received and returns
269         :class:`RPCReply`.
270         
271         In asynchronous mode, returns immediately, returning a reference to this
272         object. The :attr:`event` attribute will be set when the reply has been
273         received (see :attr:`reply`) or an error occured (see :attr:`error`).
274         
275         :type opspec: :obj:`dict` or :obj:`string` or :class:`~xml.etree.ElementTree.Element`
276         :rtype: :class:`RPCReply` (sync) or :class:`RPC` (async)
277         """
278         logger.info('Requesting %r' % self.__class__.__name__)
279         req = self._build(op)
280         self._session.send(req)
281         if self._async:
282             logger.debug('Async request, returning %r', self)
283             return self
284         else:
285             logger.debug('Sync request, will wait for timeout=%r' %
286                          self._timeout)
287             self._event.wait(self._timeout)
288             if self._event.isSet():
289                 if self._error:
290                     # Error that prevented reply delivery
291                     raise self._error
292                 self._reply.parse()
293                 if self._reply.error is not None:
294                     # <rpc-error>'s [ RPCError ]
295                     if self._raise_mode == "all":
296                         raise self._reply.error
297                     elif (self._raise_mode == "errors" and
298                           self._reply.error.type == "error"):
299                         raise self._reply.error
300                 return self._reply
301             else:
302                 raise TimeoutExpiredError
303
304     def request(self, *args, **kwds):
305         "Subclasses implement this method."
306         return self._request(self.SPEC)
307     
308     def _assert(self, capability):
309         """Subclasses can use this method to verify that a capability is available
310         with the NETCONF server, before making a request that requires it. A
311         :exc:`MissingCapabilityError` will be raised if the capability is not
312         available."""
313         if capability not in self._session.server_capabilities:
314             raise MissingCapabilityError('Server does not support [%s]' %
315                                          capability)
316     
317     def deliver_reply(self, raw):
318         # internal use
319         self._reply = self.REPLY_CLS(raw)
320         self._event.set()
321
322     def deliver_error(self, err):
323         # internal use
324         self._error = err
325         self._event.set()
326
327     @property
328     def reply(self):
329         ":class:`RPCReply` element if reply has been received or :const:`None`"
330         return self._reply
331
332     @property
333     def error(self):
334         """:exc:`Exception` type if an error occured or :const:`None`.
335         
336         .. note::
337             This represents an error which prevented a reply from being
338             received. An *<rpc-error>* does not fall in that category -- see
339             :class:`RPCReply` for that.
340         """
341         return self._error
342
343     @property
344     def id(self):
345         "The *message-id* for this RPC"
346         return self._id
347
348     @property
349     def session(self):
350         """The :class:`~ncclient.transport.Session` object associated with this
351         RPC"""
352         return self._session
353
354     @property
355     def event(self):
356         """:class:`~threading.Event` that is set when reply has been received or
357         error occured."""
358         return self._event
359
360     def set_async(self, async=True):
361         """Set asynchronous mode for this RPC."""
362         self._async = async
363         if async and not session.can_pipeline:
364             raise UserWarning('Asynchronous mode not supported for this device/session')
365
366     def set_raise_mode(self, mode):
367         assert(choice in ("all", "errors", "none"))
368         self._raise_mode = mode
369
370     def set_timeout(self, timeout):
371         """Set the timeout for synchronous waiting; defining how long the RPC
372         request will block on a reply before raising an error. Irrelevant for
373         asynchronous usage."""
374         self._timeout = timeout
375
376     #: Whether this RPC is asynchronous
377     is_async = property(fget=lambda self: self._async, fset=set_async)
378
379     #: Timeout for synchronous waiting
380     timeout = property(fget=lambda self: self._timeout, fset=set_timeout)