Statistics
| Branch: | Tag: | Revision:

root / ncclient / operations / rpc.py @ 188649fa

History | View | Annotate | Download (12.8 kB)

1
# Copyright 2009 Shikhar Bhushan
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#    http://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14

    
15
from threading import Event, Lock
16
from uuid import uuid1
17
from weakref import WeakValueDictionary
18

    
19
from ncclient import content
20
from ncclient.transport import SessionListener
21

    
22
from errors import OperationError, TimeoutExpiredError, MissingCapabilityError
23

    
24
import logging
25
logger = logging.getLogger('ncclient.operations.rpc')
26

    
27

    
28
class RPCReply:
29

    
30
    """Represents an *<rpc-reply>*. Only concerns itself with whether the
31
    operation was successful.
32

33
    .. note::
34
        If the reply has not yet been parsed there is an implicit, one-time
35
        parsing overhead to accessing the attributes defined by this class and
36
        any subclasses.
37
    """
38

    
39
    def __init__(self, raw):
40
        self._raw = raw
41
        self._parsed = False
42
        self._root = None
43
        self._errors = []
44

    
45
    def __repr__(self):
46
        return self._raw
47

    
48
    def _parsing_hook(self, root):
49
        """Subclass can implement.
50

51
        :type root: :class:`~xml.etree.ElementTree.Element`
52
        """
53
        pass
54

    
55
    def parse(self):
56
        """Parse the *<rpc-reply>*"""
57
        if self._parsed:
58
            return
59
        root = self._root = content.xml2ele(self._raw) # <rpc-reply> element
60
        # per rfc 4741 an <ok/> tag is sent when there are no errors or warnings
61
        ok = content.find(root, 'ok', nslist=[content.BASE_NS, content.CISCO_BS])
62
        if ok is not None:
63
            logger.debug('parsed [%s]' % ok.tag)
64
        else: # create RPCError objects from <rpc-error> elements
65
            error = content.find(root, 'rpc-error', nslist=[content.BASE_NS, content.CISCO_BS])
66
            if error is not None:
67
                logger.debug('parsed [%s]' % error.tag)
68
                for err in root.getiterator(error.tag):
69
                    # process a particular <rpc-error>
70
                    d = {}
71
                    for err_detail in err.getchildren(): # <error-type> etc..
72
                        tag = content.unqualify(err_detail.tag)
73
                        if tag != 'error-info':
74
                            d[tag] = err_detail.text.strip()
75
                        else:
76
                            d[tag] = content.ele2xml(err_detail)
77
                    self._errors.append(RPCError(d))
78
        self._parsing_hook(root)
79
        self._parsed = True
80

    
81
    @property
82
    def xml(self):
83
        "*<rpc-reply>* as returned"
84
        return self._raw
85

    
86
    @property
87
    def ok(self):
88
        "Boolean value indicating if there were no errors."
89
        if not self._parsed:
90
            self.parse()
91
        return not self._errors # empty list => false
92

    
93
    @property
94
    def error(self):
95
        """Short for :attr:`errors` [0]; :const:`None` if there were no errors.
96
        """
97
        if not self._parsed:
98
            self.parse()
99
        if self._errors:
100
            return self._errors[0]
101
        else:
102
            return None
103

    
104
    @property
105
    def errors(self):
106
        """`list` of :class:`RPCError` objects. Will be empty if there were no
107
        *<rpc-error>* elements in reply.
108
        """
109
        if not self._parsed:
110
            self.parse()
111
        return self._errors
112

    
113

    
114
class RPCError(OperationError): # raise it if you like
115

    
116
    """Represents an *<rpc-error>*. It is an instance of :exc:`OperationError`
117
    so it can be raised like any other exception."""
118

    
119
    def __init__(self, err_dict):
120
        self._dict = err_dict
121
        if self.message is not None:
122
            OperationError.__init__(self, self.message)
123
        else:
124
            OperationError.__init__(self)
125

    
126
    @property
127
    def type(self):
128
        "`string` represeting *error-type* element"
129
        return self.get('error-type', None)
130

    
131
    @property
132
    def severity(self):
133
        "`string` represeting *error-severity* element"
134
        return self.get('error-severity', None)
135

    
136
    @property
137
    def tag(self):
138
        "`string` represeting *error-tag* element"
139
        return self.get('error-tag', None)
140

    
141
    @property
142
    def path(self):
143
        "`string` or :const:`None`; represeting *error-path* element"
144
        return self.get('error-path', None)
145

    
146
    @property
147
    def message(self):
148
        "`string` or :const:`None`; represeting *error-message* element"
149
        return self.get('error-message', None)
150

    
151
    @property
152
    def info(self):
153
        "`string` or :const:`None`, represeting *error-info* element"
154
        return self.get('error-info', None)
155

    
156
    ## dictionary interface
157

    
158
    __getitem__ = lambda self, key: self._dict.__getitem__(key)
159

    
160
    __iter__ = lambda self: self._dict.__iter__()
161

    
162
    __contains__ = lambda self, key: self._dict.__contains__(key)
163

    
164
    keys = lambda self: self._dict.keys()
165

    
166
    get = lambda self, key, default: self._dict.get(key, default)
167

    
168
    iteritems = lambda self: self._dict.iteritems()
169

    
170
    iterkeys = lambda self: self._dict.iterkeys()
171

    
172
    itervalues = lambda self: self._dict.itervalues()
173

    
174
    values = lambda self: self._dict.values()
175

    
176
    items = lambda self: self._dict.items()
177

    
178
    __repr__ = lambda self: repr(self._dict)
179

    
180

    
181
class RPCReplyListener(SessionListener):
182

    
183
    # internal use
184

    
185
    # one instance per session
186
    def __new__(cls, session):
187
        instance = session.get_listener_instance(cls)
188
        if instance is None:
189
            instance = object.__new__(cls)
190
            instance._lock = Lock()
191
            instance._id2rpc = WeakValueDictionary()
192
            instance._pipelined = session.can_pipeline
193
            session.add_listener(instance)
194
        return instance
195

    
196
    def register(self, id, rpc):
197
        with self._lock:
198
            self._id2rpc[id] = rpc
199

    
200
    def callback(self, root, raw):
201
        tag, attrs = root
202
        if content.unqualify(tag) != 'rpc-reply':
203
            return
204
        rpc = None
205
        for key in attrs:
206
            if content.unqualify(key) == 'message-id':
207
                id = attrs[key]
208
                try:
209
                    with self._lock:
210
                        rpc = self._id2rpc.pop(id)
211
                except KeyError:
212
                    logger.warning('no object registered for message-id: [%s]' % id)
213
                except Exception as e:
214
                    logger.debug('error - %r' % e)
215
                break
216
        else:
217
            if not self._pipelined:
218
                with self._lock:
219
                    assert(len(self._id2rpc) == 1)
220
                    rpc = self._id2rpc.values()[0]
221
                    self._id2rpc.clear()
222
            else:
223
                logger.warning('<rpc-reply> without message-id received: %s' % raw)
224
        logger.debug('delivering to %r' % rpc)
225
        rpc.deliver_reply(raw)
226

    
227
    def errback(self, err):
228
        for rpc in self._id2rpc.values():
229
            rpc.deliver_error(err)
230

    
231

    
232
class RPC(object):
233

    
234
    """Base class for all operations.
235

236
    Directly corresponds to *<rpc>* requests. Handles making the request, and
237
    taking delivery of the reply.
238
    """
239

    
240
    # : Subclasses can specify their dependencies on capabilities. List of URI's
241
    # or abbreviated names, e.g. ':writable-running'. These are verified at the
242
    # time of object creation. If the capability is not available, a
243
    # :exc:`MissingCapabilityError` is raised.
244
    DEPENDS = []
245

    
246
    # : Subclasses can specify a different reply class, but it must be a
247
    # subclass of :class:`RPCReply`.
248
    REPLY_CLS = RPCReply
249

    
250
    def __init__(self, session, async=False, timeout=None):
251
        self._session = session
252
        try:
253
            for cap in self.DEPENDS:
254
                self._assert(cap)
255
        except AttributeError:
256
            pass
257
        self._async = async
258
        self._timeout = timeout
259
        # keeps things simple instead of having a class attr that has to be locked
260
        self._id = uuid1().urn
261
        # RPCReplyListener itself makes sure there isn't more than one instance -- i.e. multiton
262
        self._listener = RPCReplyListener(session)
263
        self._listener.register(self._id, self)
264
        self._reply = None
265
        self._error = None
266
        self._event = Event()
267

    
268
    def _build(self, opspec):
269
        # internal
270
        spec = {
271
            'tag': 'rpc',
272
            'attrib': {
273
                'xmlns': content.BASE_NS,
274
                'message-id': self._id
275
                },
276
            'subtree': [ opspec ]
277
            }
278
        return content.dtree2xml(spec)
279

    
280
    def _request(self, op):
281
        """Subclasses call this method to make the RPC request.
282

283
        In asynchronous mode, returns an :class:`~threading.Event` which is set
284
        when the reply has been received or an error occured. It is prudent,
285
        therefore, to check the :attr:`error` attribute before accesing
286
        :attr:`reply`.
287

288
        Otherwise, waits until the reply is received and returns
289
        :class:`RPCReply`.
290

291
        :arg opspec: :ref:`dtree` for the operation
292
        :type opspec: :obj:`dict` or :obj:`string` or :class:`~xml.etree.ElementTree.Element`
293
        :rtype: :class:`~threading.Event` or :class:`RPCReply`
294
        """
295
        logger.debug('request %r with opsepc=%r' % (self, op))
296
        req = self._build(op)
297
        self._session.send(req)
298
        if self._async:
299
            logger.debug('async, returning event')
300
            return self._event
301
        else:
302
            logger.debug('sync, will wait for timeout=%r' % self._timeout)
303
            self._event.wait(self._timeout)
304
            if self._event.isSet():
305
                if self._error:
306
                    raise self._error
307
                self._reply.parse()
308
                return self._reply
309
            else:
310
                raise TimeoutExpiredError
311

    
312
    def request(self, *args, **kwds):
313
        """Subclasses implement this method. Here, the operation is constructed
314
        in :ref:`dtree`, and the result of :meth:`_request` returned."""
315
        raise NotImplementedError
316

    
317
    def _delivery_hook(self):
318
        """Subclasses can implement this method. Will be called after
319
        initialising the :attr:`reply` or :attr:`error` attribute and before
320
        setting the :attr:`event`"""
321
        pass
322

    
323
    def _assert(self, capability):
324
        """Subclasses can use this method to verify that a capability is available
325
        with the NETCONF server, before making a request that requires it. A
326
        :exc:`MissingCapabilityError` will be raised if the capability is not
327
        available."""
328
        if capability not in self._session.server_capabilities:
329
            raise MissingCapabilityError('Server does not support [%s]' % cap)
330

    
331
    def deliver_reply(self, raw):
332
        # internal use
333
        self._reply = self.REPLY_CLS(raw)
334
        self._delivery_hook()
335
        self._event.set()
336

    
337
    def deliver_error(self, err):
338
        # internal use
339
        self._error = err
340
        self._delivery_hook()
341
        self._event.set()
342

    
343
    @property
344
    def reply(self):
345
        ":class:`RPCReply` element if reply has been received or :const:`None`"
346
        return self._reply
347

    
348
    @property
349
    def error(self):
350
        """:exc:`Exception` type if an error occured or :const:`None`.
351

352
        This attribute should be checked if the request was made asynchronously,
353
        so that it can be determined if :attr:`event` being set is because of a
354
        reply or error.
355

356
        .. note::
357
            This represents an error which prevented a reply from being
358
            received. An *<rpc-error>* does not fall in that category -- see
359
            :class:`RPCReply` for that.
360
        """
361
        return self._error
362

    
363
    @property
364
    def id(self):
365
        "The *message-id* for this RPC"
366
        return self._id
367

    
368
    @property
369
    def session(self):
370
        """The :class:`~ncclient.transport.Session` object associated with this
371
        RPC"""
372
        return self._session
373

    
374
    @property
375
    def event(self):
376
        """:class:`~threading.Event` that is set when reply has been received or
377
        error occured."""
378
        return self._event
379

    
380
    def set_async(self, async=True):
381
        """Set asynchronous mode for this RPC."""
382
        self._async = async
383
        if async and not session.can_pipeline:
384
            raise UserWarning('Asynchronous mode not supported for this device/session')
385

    
386
    def set_timeout(self, timeout):
387
        """Set the timeout for synchronous waiting defining how long the RPC
388
        request will block on a reply before raising an error."""
389
        self._timeout = timeout
390

    
391
    #: Whether this RPC is asynchronous
392
    async = property(fget=lambda self: self._async, fset=set_async)
393

    
394
    #: Timeout for synchronous waiting
395
    timeout = property(fget=lambda self: self._timeout, fset=set_timeout)