1 # Copyright 2009 Shikhar Bhushan
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
15 from threading import Event, Lock
16 from uuid import uuid1
17 from weakref import WeakValueDictionary
19 from ncclient import content
20 from ncclient.transport import SessionListener
22 from errors import OperationError
25 logger = logging.getLogger('ncclient.operations.rpc')
30 def __init__(self, raw):
39 def _parsing_hook(self, root): pass
44 root = self._root = content.xml2ele(self._raw) # <rpc-reply> element
45 # per rfc 4741 an <ok/> tag is sent when there are no errors or warnings
46 ok = content.find(root, 'data', nslist=[content.BASE_NS, content.CISCO_BS])
48 logger.debug('parsed [%s]' % ok.tag)
49 else: # create RPCError objects from <rpc-error> elements
50 error = content.find(root, 'data', nslist=[content.BASE_NS, content.CISCO_BS])
52 logger.debug('parsed [%s]' % error.tag)
53 for err in root.getiterator(error.tag):
54 # process a particular <rpc-error>
56 for err_detail in err.getchildren(): # <error-type> etc..
57 tag = content.unqualify(err_detail.tag)
58 if tag != 'error-info':
59 d[tag] = err_detail.text.strip()
61 d[tag] = content.ele2xml(err_detail)
62 self._errors.append(RPCError(d))
63 self._parsing_hook(root)
68 '<rpc-reply> as returned'
75 return not self._errors # empty list => false
82 return self._errors[0]
88 'List of RPCError objects. Will be empty if no <rpc-error> elements in reply.'
94 class RPCError(OperationError): # raise it if you like
96 def __init__(self, err_dict):
98 if self.message is not None:
99 OperationError.__init__(self, self.message)
101 OperationError.__init__(self)
105 return self.get('error-type', None)
109 return self.get('error-severity', None)
113 return self.get('error-tag', None)
117 return self.get('error-path', None)
121 return self.get('error-message', None)
125 return self.get('error-info', None)
## dictionary interface
# Read-only mapping protocol; every call is forwarded to the wrapped
# self._dict.

def __getitem__(self, key):
    """Return the value stored under *key*; raises KeyError if absent."""
    return self._dict[key]

def __iter__(self):
    """Iterate over the keys of the underlying dictionary."""
    return iter(self._dict)

def __contains__(self, key):
    """Return True if *key* is present in the underlying dictionary."""
    return key in self._dict

def keys(self):
    """Return the keys of the underlying dictionary."""
    return self._dict.keys()

def get(self, key, default=None):
    """Return the value for *key*, or *default* when *key* is absent.

    *default* is optional and falls back to None, mirroring dict.get, so
    both ``err.get('error-tag')`` and ``err.get('error-tag', None)`` work.
    """
    return self._dict.get(key, default)

def iteritems(self):
    """Forward to the underlying dictionary's iteritems()."""
    return self._dict.iteritems()

def iterkeys(self):
    """Forward to the underlying dictionary's iterkeys()."""
    return self._dict.iterkeys()

def itervalues(self):
    """Forward to the underlying dictionary's itervalues()."""
    return self._dict.itervalues()

def values(self):
    """Return the values of the underlying dictionary."""
    return self._dict.values()

def items(self):
    """Return the (key, value) pairs of the underlying dictionary."""
    return self._dict.items()

def __repr__(self):
    """Use the underlying dictionary's repr."""
    return repr(self._dict)
152 class RPCReplyListener(SessionListener):
154 # one instance per session
155 def __new__(cls, session):
156 instance = session.get_listener_instance(cls)
158 instance = object.__new__(cls)
159 instance._lock = Lock()
160 instance._id2rpc = WeakValueDictionary()
161 instance._pipelined = session.can_pipeline
162 session.add_listener(instance)
165 def register(self, id, rpc):
167 self._id2rpc[id] = rpc
169 def callback(self, root, raw):
171 if content.unqualify(tag) != 'rpc-reply':
175 if content.unqualify(key) == 'message-id':
179 rpc = self._id2rpc.pop(id)
181 logger.warning('no object registered for message-id: [%s]' % id)
182 except Exception as e:
183 logger.debug('error - %r' % e)
186 if not self._pipelined:
188 assert(len(self._id2rpc) == 1)
189 rpc = self._id2rpc.values()[0]
192 logger.warning('<rpc-reply> without message-id received: %s' % raw)
193 logger.debug('delivering to %r' % rpc)
196 def errback(self, err):
197 for rpc in self._id2rpc.values():
206 def __init__(self, session, async=False, timeout=None):
207 if not session.can_pipeline:
208 raise UserWarning('Asynchronous mode not supported for this device/session')
209 self._session = session
211 for cap in self.DEPENDS:
213 except AttributeError:
216 self._timeout = timeout
217 # keeps things simple instead of having a class attr that has to be locked
218 self._id = uuid1().urn
219 # RPCReplyListener itself makes sure there isn't more than one instance -- i.e. multiton
220 self._listener = RPCReplyListener(session)
221 self._listener.register(self._id, self)
224 self._reply_event = Event()
226 def _build(self, opspec):
229 'tag': content.qualify('rpc'),
230 'attrib': {'message-id': self._id},
233 return content.dtree2xml(spec)
235 def _request(self, op):
236 req = self._build(op)
237 self._session.send(req)
239 return self._reply_event
241 self._reply_event.wait(self._timeout)
242 if self._reply_event.isSet():
248 raise ReplyTimeoutError
251 return self._request(self.SPEC)
253 def _delivery_hook(self):
def _assert(self, capability):
    """Check that the server side of the session offers *capability*.

    Looks *capability* up in ``self._session.server_capabilities`` and
    raises MissingCapabilityError, naming the capability, when it is not
    there. Returns None otherwise.
    """
    if capability not in self._session.server_capabilities:
        # The message previously formatted an undefined name (`cap`), which
        # turned this path into a NameError; use the actual parameter.
        raise MissingCapabilityError(
            'Server does not support [%s]' % (capability,))
def deliver(self, raw):
    """Accept the raw reply for this RPC.

    Wraps *raw* in ``self.REPLY_CLS``, stores the result on ``self._reply``,
    runs ``self._delivery_hook()`` and finally sets ``self._reply_event``.
    """
    reply = self.REPLY_CLS(raw)
    self._reply = reply
    self._delivery_hook()
    # Signal only after the reply is stored and the hook has run.
    self._reply_event.set()
266 def error(self, err):
268 self._reply_event.set()
272 return self._reply_event.is_set()
def reply_event(self):
    """Return the event object stored on ``self._reply_event``."""
    event = self._reply_event
    return event
# Setter used by the `async` property below; stores the given flag on
# self._async.
# NOTE(review): `async` became a reserved keyword in Python 3.7, so this
# property name (and the `async` parameter used elsewhere in this file) only
# parses on earlier interpreters. Renaming it would change the public
# interface, so it is left untouched here.
292 def set_async(self, bool): self._async = bool
# Read/write property exposing self._async.
293 async = property(fget=lambda self: self._async, fset=set_async)
def set_timeout(self, timeout):
    """Store *timeout* on the instance as ``_timeout``."""
    self._timeout = timeout

# Read/write property: reads return self._timeout, writes go through
# set_timeout.
timeout = property(lambda self: self._timeout, set_timeout)