Statistics
| Branch: | Tag: | Revision:

root / ncclient / operations / rpc.py @ 6a2dfeb4

History | View | Annotate | Download (12.8 kB)

1
# Copyright 2009 Shikhar Bhushan
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#    http://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14

    
15
from threading import Event, Lock
16
from uuid import uuid1
17

    
18
from ncclient import content
19
from ncclient.transport import SessionListener
20

    
21
from errors import OperationError, TimeoutExpiredError, MissingCapabilityError
22

    
23
import logging
24
logger = logging.getLogger('ncclient.operations.rpc')
25

    
26

    
27
class RPCReply:
28

    
29
    """Represents an *<rpc-reply>*. Only concerns itself with whether the
30
    operation was successful.
31

32
    .. note::
33
        If the reply has not yet been parsed there is an implicit, one-time
34
        parsing overhead to accessing the attributes defined by this class and
35
        any subclasses.
36
    """
37

    
38
    def __init__(self, raw):
39
        self._raw = raw
40
        self._parsed = False
41
        self._root = None
42
        self._errors = []
43

    
44
    def __repr__(self):
45
        return self._raw
46

    
47
    def _parsing_hook(self, root):
48
        """Subclass can implement.
49

50
        :type root: :class:`~xml.etree.ElementTree.Element`
51
        """
52
        pass
53

    
54
    def parse(self):
55
        """Parse the *<rpc-reply>*"""
56
        if self._parsed:
57
            return
58
        root = self._root = content.xml2ele(self._raw) # <rpc-reply> element
59
        # per rfc 4741 an <ok/> tag is sent when there are no errors or warnings
60
        ok = content.find(root, 'ok', nslist=[content.BASE_NS, content.CISCO_BS])
61
        if ok is not None:
62
            logger.debug('parsed [%s]' % ok.tag)
63
        else: # create RPCError objects from <rpc-error> elements
64
            error = content.find(root, 'rpc-error', nslist=[content.BASE_NS, content.CISCO_BS])
65
            if error is not None:
66
                logger.debug('parsed [%s]' % error.tag)
67
                for err in root.getiterator(error.tag):
68
                    # process a particular <rpc-error>
69
                    d = {}
70
                    for err_detail in err.getchildren(): # <error-type> etc..
71
                        tag = content.unqualify(err_detail.tag)
72
                        if tag != 'error-info':
73
                            d[tag] = err_detail.text.strip()
74
                        else:
75
                            d[tag] = content.ele2xml(err_detail)
76
                    self._errors.append(RPCError(d))
77
        self._parsing_hook(root)
78
        self._parsed = True
79

    
80
    @property
81
    def xml(self):
82
        "*<rpc-reply>* as returned"
83
        return self._raw
84

    
85
    @property
86
    def ok(self):
87
        "Boolean value indicating if there were no errors."
88
        if not self._parsed:
89
            self.parse()
90
        return not self._errors # empty list => false
91

    
92
    @property
93
    def error(self):
94
        """Short for :attr:`errors` [0]; :const:`None` if there were no errors.
95
        """
96
        if not self._parsed:
97
            self.parse()
98
        if self._errors:
99
            return self._errors[0]
100
        else:
101
            return None
102

    
103
    @property
104
    def errors(self):
105
        """`list` of :class:`RPCError` objects. Will be empty if there were no
106
        *<rpc-error>* elements in reply.
107
        """
108
        if not self._parsed:
109
            self.parse()
110
        return self._errors
111

    
112

    
113
class RPCError(OperationError): # raise it if you like
114

    
115
    """Represents an *<rpc-error>*. It is an instance of :exc:`OperationError`
116
    so it can be raised like any other exception."""
117

    
118
    def __init__(self, err_dict):
119
        self._dict = err_dict
120
        if self.message is not None:
121
            OperationError.__init__(self, self.message)
122
        else:
123
            OperationError.__init__(self)
124

    
125
    @property
126
    def type(self):
127
        "`string` represeting *error-type* element"
128
        return self.get('error-type', None)
129

    
130
    @property
131
    def severity(self):
132
        "`string` represeting *error-severity* element"
133
        return self.get('error-severity', None)
134

    
135
    @property
136
    def tag(self):
137
        "`string` represeting *error-tag* element"
138
        return self.get('error-tag', None)
139

    
140
    @property
141
    def path(self):
142
        "`string` or :const:`None`; represeting *error-path* element"
143
        return self.get('error-path', None)
144

    
145
    @property
146
    def message(self):
147
        "`string` or :const:`None`; represeting *error-message* element"
148
        return self.get('error-message', None)
149

    
150
    @property
151
    def info(self):
152
        "`string` or :const:`None`, represeting *error-info* element"
153
        return self.get('error-info', None)
154

    
155
    ## dictionary interface
156

    
157
    __getitem__ = lambda self, key: self._dict.__getitem__(key)
158

    
159
    __iter__ = lambda self: self._dict.__iter__()
160

    
161
    __contains__ = lambda self, key: self._dict.__contains__(key)
162

    
163
    keys = lambda self: self._dict.keys()
164

    
165
    get = lambda self, key, default: self._dict.get(key, default)
166

    
167
    iteritems = lambda self: self._dict.iteritems()
168

    
169
    iterkeys = lambda self: self._dict.iterkeys()
170

    
171
    itervalues = lambda self: self._dict.itervalues()
172

    
173
    values = lambda self: self._dict.values()
174

    
175
    items = lambda self: self._dict.items()
176

    
177
    __repr__ = lambda self: repr(self._dict)
178

    
179

    
180
class RPCReplyListener(SessionListener):
181

    
182
    # internal use
183

    
184
    # one instance per session
185
    def __new__(cls, session):
186
        instance = session.get_listener_instance(cls)
187
        if instance is None:
188
            instance = object.__new__(cls)
189
            instance._lock = Lock()
190
            instance._id2rpc = {}
191
            instance._pipelined = session.can_pipeline
192
            session.add_listener(instance)
193
        return instance
194

    
195
    def register(self, id, rpc):
196
        with self._lock:
197
            self._id2rpc[id] = rpc
198

    
199
    def callback(self, root, raw):
200
        tag, attrs = root
201
        if content.unqualify(tag) != 'rpc-reply':
202
            return
203
        rpc = None
204
        for key in attrs:
205
            if content.unqualify(key) == 'message-id':
206
                id = attrs[key]
207
                try:
208
                    with self._lock:
209
                        rpc = self._id2rpc.pop(id)
210
                except KeyError:
211
                    logger.warning('no object registered for message-id: [%s]' % id)
212
                except Exception as e:
213
                    logger.debug('error - %r' % e)
214
                break
215
        else:
216
            if not self._pipelined:
217
                with self._lock:
218
                    assert(len(self._id2rpc) == 1)
219
                    rpc = self._id2rpc.values()[0]
220
                    self._id2rpc.clear()
221
            else:
222
                logger.warning('<rpc-reply> without message-id received: %s' % raw)
223
        logger.debug('delivering to %r' % rpc)
224
        rpc.deliver_reply(raw)
225

    
226
    def errback(self, err):
227
        try:
228
            for rpc in self._id2rpc.values():
229
                rpc.deliver_error(err)
230
        finally:
231
            self._id2rpc.clear()
232

    
233

    
234
class RPC(object):
235

    
236
    """Base class for all operations.
237

238
    Directly corresponds to *<rpc>* requests. Handles making the request, and
239
    taking delivery of the reply.
240
    """
241

    
242
    # : Subclasses can specify their dependencies on capabilities. List of URI's
243
    # or abbreviated names, e.g. ':writable-running'. These are verified at the
244
    # time of object creation. If the capability is not available, a
245
    # :exc:`MissingCapabilityError` is raised.
246
    DEPENDS = []
247

    
248
    # : Subclasses can specify a different reply class, but it must be a
249
    # subclass of :class:`RPCReply`.
250
    REPLY_CLS = RPCReply
251

    
252
    def __init__(self, session, async=False, timeout=None):
253
        self._session = session
254
        try:
255
            for cap in self.DEPENDS:
256
                self._assert(cap)
257
        except AttributeError:
258
            pass
259
        self._async = async
260
        self._timeout = timeout
261
        # keeps things simple instead of having a class attr that has to be locked
262
        self._id = uuid1().urn
263
        # RPCReplyListener itself makes sure there isn't more than one instance -- i.e. multiton
264
        self._listener = RPCReplyListener(session)
265
        self._listener.register(self._id, self)
266
        self._reply = None
267
        self._error = None
268
        self._event = Event()
269

    
270
    def _build(self, opspec):
271
        # internal
272
        spec = {
273
            'tag': 'rpc',
274
            'attrib': {
275
                'xmlns': content.BASE_NS,
276
                'message-id': self._id
277
                },
278
            'subtree': [ opspec ]
279
            }
280
        return content.dtree2xml(spec)
281

    
282
    def _request(self, op):
283
        """Subclasses call this method to make the RPC request.
284

285
        In asynchronous mode, returns an :class:`~threading.Event` which is set
286
        when the reply has been received or an error occured. It is prudent,
287
        therefore, to check the :attr:`error` attribute before accesing
288
        :attr:`reply`.
289

290
        Otherwise, waits until the reply is received and returns
291
        :class:`RPCReply`.
292

293
        :arg opspec: :ref:`dtree` for the operation
294
        :type opspec: :obj:`dict` or :obj:`string` or :class:`~xml.etree.ElementTree.Element`
295
        :rtype: :class:`~threading.Event` or :class:`RPCReply`
296
        """
297
        logger.debug('request %r with opsepc=%r' % (self, op))
298
        req = self._build(op)
299
        self._session.send(req)
300
        if self._async:
301
            logger.debug('async, returning event')
302
            return self._event
303
        else:
304
            logger.debug('sync, will wait for timeout=%r' % self._timeout)
305
            self._event.wait(self._timeout)
306
            if self._event.isSet():
307
                if self._error:
308
                    raise self._error
309
                self._reply.parse()
310
                return self._reply
311
            else:
312
                raise TimeoutExpiredError
313

    
314
    def request(self, *args, **kwds):
315
        """Subclasses implement this method. Here, the operation is constructed
316
        in :ref:`dtree`, and the result of :meth:`_request` returned."""
317
        raise NotImplementedError
318

    
319
    def _delivery_hook(self):
320
        """Subclasses can implement this method. Will be called after
321
        initialising the :attr:`reply` or :attr:`error` attribute and before
322
        setting the :attr:`event`"""
323
        pass
324

    
325
    def _assert(self, capability):
326
        """Subclasses can use this method to verify that a capability is available
327
        with the NETCONF server, before making a request that requires it. A
328
        :exc:`MissingCapabilityError` will be raised if the capability is not
329
        available."""
330
        if capability not in self._session.server_capabilities:
331
            raise MissingCapabilityError('Server does not support [%s]' % cap)
332

    
333
    def deliver_reply(self, raw):
334
        # internal use
335
        self._reply = self.REPLY_CLS(raw)
336
        self._delivery_hook()
337
        self._event.set()
338

    
339
    def deliver_error(self, err):
340
        # internal use
341
        self._error = err
342
        self._delivery_hook()
343
        self._event.set()
344

    
345
    @property
346
    def reply(self):
347
        ":class:`RPCReply` element if reply has been received or :const:`None`"
348
        return self._reply
349

    
350
    @property
351
    def error(self):
352
        """:exc:`Exception` type if an error occured or :const:`None`.
353

354
        This attribute should be checked if the request was made asynchronously,
355
        so that it can be determined if :attr:`event` being set is because of a
356
        reply or error.
357

358
        .. note::
359
            This represents an error which prevented a reply from being
360
            received. An *<rpc-error>* does not fall in that category -- see
361
            :class:`RPCReply` for that.
362
        """
363
        return self._error
364

    
365
    @property
366
    def id(self):
367
        "The *message-id* for this RPC"
368
        return self._id
369

    
370
    @property
371
    def session(self):
372
        """The :class:`~ncclient.transport.Session` object associated with this
373
        RPC"""
374
        return self._session
375

    
376
    @property
377
    def event(self):
378
        """:class:`~threading.Event` that is set when reply has been received or
379
        error occured."""
380
        return self._event
381

    
382
    def set_async(self, async=True):
383
        """Set asynchronous mode for this RPC."""
384
        self._async = async
385
        if async and not session.can_pipeline:
386
            raise UserWarning('Asynchronous mode not supported for this device/session')
387

    
388
    def set_timeout(self, timeout):
389
        """Set the timeout for synchronous waiting defining how long the RPC
390
        request will block on a reply before raising an error."""
391
        self._timeout = timeout
392

    
393
    #: Whether this RPC is asynchronous
394
    async = property(fget=lambda self: self._async, fset=set_async)
395

    
396
    #: Timeout for synchronous waiting
397
    timeout = property(fget=lambda self: self._timeout, fset=set_timeout)