Statistics
| Branch: | Tag: | Revision:

root / ncclient / operations / rpc.py @ 6c70b245

History | View | Annotate | Download (13.1 kB)

1
# Copyright 2009 Shikhar Bhushan
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#    http://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14

    
15
from threading import Event, Lock
16
from uuid import uuid1
17

    
18
from ncclient import xml_
19
from ncclient.transport import SessionListener
20

    
21
from errors import OperationError, TimeoutExpiredError, MissingCapabilityError
22

    
23
import logging
24
logger = logging.getLogger('ncclient.operations.rpc')
25

    
26

    
27
class RPCReply:
28

    
29
    """Represents an *<rpc-reply>*. Only concerns itself with whether the
30
    operation was successful.
31

32
    .. note::
33
        If the reply has not yet been parsed there is an implicit, one-time
34
        parsing overhead to accessing the attributes defined by this class and
35
        any subclasses.
36
    """
37

    
38
    def __init__(self, raw):
39
        self._raw = raw
40
        self._parsed = False
41
        self._root = None
42
        self._errors = []
43

    
44
    def __repr__(self):
45
        return self._raw
46

    
47
    def _parsing_hook(self, root):
48
        """Subclass can implement.
49

50
        :type root: :class:`~xml.etree.ElementTree.Element`
51
        """
52
        pass
53

    
54
    def parse(self):
55
        """Parse the *<rpc-reply>*"""
56
        if self._parsed:
57
            return
58
        root = self._root = xml_.xml2ele(self._raw) # <rpc-reply> element
59
        # per rfc 4741 an <ok/> tag is sent when there are no errors or warnings
60
        ok = xml_.find(root, 'ok', nslist=xml_.NSLIST)
61
        if ok is not None:
62
            logger.debug('parsed [%s]' % ok.tag)
63
        else: # create RPCError objects from <rpc-error> elements
64
            error = xml_.find(root, 'rpc-error', nslist=xml_.NSLIST)
65
            if error is not None:
66
                logger.debug('parsed [%s]' % error.tag)
67
                for err in root.getiterator(error.tag):
68
                    # process a particular <rpc-error>
69
                    d = {}
70
                    for err_detail in err.getchildren(): # <error-type> etc..
71
                        tag = xml_.unqualify(err_detail.tag)
72
                        if tag != 'error-info':
73
                            d[tag] = err_detail.text.strip()
74
                        else:
75
                            d[tag] = xml_.ele2xml(err_detail)
76
                    self._errors.append(RPCError(d))
77
        self._parsing_hook(root)
78
        self._parsed = True
79

    
80
    @property
81
    def xml(self):
82
        "*<rpc-reply>* as returned"
83
        return self._raw
84

    
85
    @property
86
    def ok(self):
87
        "Boolean value indicating if there were no errors."
88
        if not self._parsed:
89
            self.parse()
90
        return not self._errors # empty list => false
91

    
92
    @property
93
    def error(self):
94
        """Short for :attr:`errors` [0]; :const:`None` if there were no errors.
95
        """
96
        if not self._parsed:
97
            self.parse()
98
        if self._errors:
99
            return self._errors[0]
100
        else:
101
            return None
102

    
103
    @property
104
    def errors(self):
105
        """`list` of :class:`RPCError` objects. Will be empty if there were no
106
        *<rpc-error>* elements in reply.
107
        """
108
        if not self._parsed:
109
            self.parse()
110
        return self._errors
111

    
112

    
113
class RPCError(OperationError): # raise it if you like
114

    
115
    """Represents an *<rpc-error>*. It is an instance of :exc:`OperationError`
116
    and can be raised like any other exception."""
117

    
118
    def __init__(self, err_dict):
119
        self._dict = err_dict
120
        if self.message is not None:
121
            OperationError.__init__(self, self.message)
122
        else:
123
            OperationError.__init__(self)
124

    
125
    @property
126
    def type(self):
127
        "`string` representing text of *error-type* element"
128
        return self.get('error-type', None)
129

    
130
    @property
131
    def severity(self):
132
        "`string` representing text of *error-severity* element"
133
        return self.get('error-severity', None)
134

    
135
    @property
136
    def tag(self):
137
        "`string` representing text of *error-tag* element"
138
        return self.get('error-tag', None)
139

    
140
    @property
141
    def path(self):
142
        "`string` or :const:`None`; representing text of *error-path* element"
143
        return self.get('error-path', None)
144

    
145
    @property
146
    def message(self):
147
        "`string` or :const:`None`; representing text of *error-message* element"
148
        return self.get('error-message', None)
149

    
150
    @property
151
    def info(self):
152
        "`string` (XML) or :const:`None`, representing *error-info* element"
153
        return self.get('error-info', None)
154

    
155
    ## dictionary interface
156

    
157
    __getitem__ = lambda self, key: self._dict.__getitem__(key)
158

    
159
    __iter__ = lambda self: self._dict.__iter__()
160

    
161
    __contains__ = lambda self, key: self._dict.__contains__(key)
162

    
163
    keys = lambda self: self._dict.keys()
164

    
165
    get = lambda self, key, default: self._dict.get(key, default)
166

    
167
    iteritems = lambda self: self._dict.iteritems()
168

    
169
    iterkeys = lambda self: self._dict.iterkeys()
170

    
171
    itervalues = lambda self: self._dict.itervalues()
172

    
173
    values = lambda self: self._dict.values()
174

    
175
    items = lambda self: self._dict.items()
176

    
177
    __repr__ = lambda self: repr(self._dict)
178

    
179

    
180
class RPCReplyListener(SessionListener):
181

    
182
    # internal use
183

    
184
    # one instance per session
185
    def __new__(cls, session):
186
        instance = session.get_listener_instance(cls)
187
        if instance is None:
188
            instance = object.__new__(cls)
189
            instance._lock = Lock()
190
            instance._id2rpc = {}
191
            instance._pipelined = session.can_pipeline
192
            session.add_listener(instance)
193
        return instance
194

    
195
    def register(self, id, rpc):
196
        with self._lock:
197
            self._id2rpc[id] = rpc
198

    
199
    def callback(self, root, raw):
200
        tag, attrs = root
201
        if xml_.unqualify(tag) != 'rpc-reply':
202
            return
203
        rpc = None
204
        for key in attrs:
205
            if xml_.unqualify(key) == 'message-id':
206
                id = attrs[key]
207
                try:
208
                    with self._lock:
209
                        rpc = self._id2rpc.pop(id)
210
                except KeyError:
211
                    logger.warning('no object registered for message-id: [%s]' % id)
212
                except Exception as e:
213
                    logger.debug('error - %r' % e)
214
                break
215
        else:
216
            if not self._pipelined:
217
                with self._lock:
218
                    assert(len(self._id2rpc) == 1)
219
                    rpc = self._id2rpc.values()[0]
220
                    self._id2rpc.clear()
221
            else:
222
                logger.warning('<rpc-reply> without message-id received: %s' % raw)
223
        logger.debug('delivering to %r' % rpc)
224
        rpc.deliver_reply(raw)
225

    
226
    def errback(self, err):
227
        try:
228
            for rpc in self._id2rpc.values():
229
                rpc.deliver_error(err)
230
        finally:
231
            self._id2rpc.clear()
232

    
233

    
234
class RPC(object):
235

    
236
    """Base class for all operations.
237

238
    Directly corresponds to *<rpc>* requests. Handles making the request, and
239
    taking delivery of the reply.
240
    """
241

    
242
    #: Subclasses can specify their dependencies on capabilities. List of URI's
243
    # or abbreviated names, e.g. ':writable-running'. These are verified at the
244
    # time of object creation. If the capability is not available, a
245
    # :exc:`MissingCapabilityError` is raised.
246
    DEPENDS = []
247

    
248
    #: Subclasses can specify a different reply class, but it must be a
249
    # subclass of :class:`RPCReply`.
250
    REPLY_CLS = RPCReply
251

    
252
    def __init__(self, session, async=False, timeout=None, raise_mode='none'):
253
        self._session = session
254
        try:
255
            for cap in self.DEPENDS:
256
                self._assert(cap)
257
        except AttributeError:
258
            pass
259
        self._async = async
260
        self._timeout = timeout
261
        self._raise_mode = raise_mode
262
        # keeps things simple instead of having a class attr that has to be locked
263
        self._id = uuid1().urn
264
        self._listener = RPCReplyListener(session)
265
        self._listener.register(self._id, self)
266
        self._reply = None
267
        self._error = None
268
        self._event = Event()
269

    
270
    def _build(self, opspec):
271
        # internal
272
        spec = {
273
            'tag': 'rpc',
274
            'attrib': {
275
                'xmlns': xml_.BASE_NS_1_0,
276
                'message-id': self._id
277
                },
278
            'subtree': [ opspec ]
279
            }
280
        return xml_.dtree2xml(spec)
281

    
282
    def _request(self, op):
283
        """Subclasses call this method to make the RPC request.
284

285
        In asynchronous mode, returns an :class:`~threading.Event` which is set
286
        when the reply has been received or an error occured. It is prudent,
287
        therefore, to check the :attr:`error` attribute before accesing
288
        :attr:`reply`.
289

290
        Otherwise, waits until the reply is received and returns
291
        :class:`RPCReply`.
292

293
        :arg opspec: :ref:`dtree` for the operation
294
        :type opspec: :obj:`dict` or :obj:`string` or :class:`~xml.etree.ElementTree.Element`
295
        :rtype: :class:`~threading.Event` or :class:`RPCReply`
296
        """
297
        logger.debug('request %r with opsepc=%r' % (self, op))
298
        req = self._build(op)
299
        self._session.send(req)
300
        if self._async:
301
            logger.debug('async, returning event')
302
            return self._event
303
        else:
304
            logger.debug('sync, will wait for timeout=%r' % self._timeout)
305
            self._event.wait(self._timeout)
306
            if self._event.isSet():
307
                if self._error:
308
                    # Error that prevented reply delivery
309
                    raise self._error
310
                self._reply.parse()
311
                if self._reply.error is not None:
312
                    # <rpc-error>'s [ RPCError ]
313
                    if self._raise_mode == "all":
314
                        raise self._reply._error
315
                    elif (self._raise_mode == "errors" and
316
                          self._reply.error.type == "error"):
317
                        raise self._reply.error
318
                return self._reply
319
            else:
320
                raise TimeoutExpiredError
321

    
322
    def request(self, *args, **kwds):
323
        """Subclasses implement this method. Here, the operation is constructed
324
        in :ref:`dtree`, and the result of :meth:`_request` returned."""
325
        return self._request(self.SPEC)
326
    
327
    def _assert(self, capability):
328
        """Subclasses can use this method to verify that a capability is available
329
        with the NETCONF server, before making a request that requires it. A
330
        :exc:`MissingCapabilityError` will be raised if the capability is not
331
        available."""
332
        if capability not in self._session.server_capabilities:
333
            raise MissingCapabilityError('Server does not support [%s]' %
334
                                         capability)
335

    
336
    def deliver_reply(self, raw):
337
        # internal use
338
        self._reply = self.REPLY_CLS(raw)
339
        self._delivery_hook()
340
        self._event.set()
341

    
342
    def deliver_error(self, err):
343
        # internal use
344
        self._error = err
345
        self._delivery_hook()
346
        self._event.set()
347

    
348
    @property
349
    def reply(self):
350
        ":class:`RPCReply` element if reply has been received or :const:`None`"
351
        return self._reply
352

    
353
    @property
354
    def error(self):
355
        """:exc:`Exception` type if an error occured or :const:`None`.
356

357
        This attribute should be checked if the request was made asynchronously,
358
        so that it can be determined if :attr:`event` being set is because of a
359
        reply or error.
360

361
        .. note::
362
            This represents an error which prevented a reply from being
363
            received. An *<rpc-error>* does not fall in that category -- see
364
            :class:`RPCReply` for that.
365
        """
366
        return self._error
367

    
368
    @property
369
    def id(self):
370
        "The *message-id* for this RPC"
371
        return self._id
372

    
373
    @property
374
    def session(self):
375
        """The :class:`~ncclient.transport.Session` object associated with this
376
        RPC"""
377
        return self._session
378

    
379
    @property
380
    def event(self):
381
        """:class:`~threading.Event` that is set when reply has been received or
382
        error occured."""
383
        return self._event
384

    
385
    def set_async(self, async=True):
386
        """Set asynchronous mode for this RPC."""
387
        self._async = async
388
        if async and not session.can_pipeline:
389
            raise UserWarning('Asynchronous mode not supported for this device/session')
390

    
391
    def set_raise_mode(self, mode):
392
        assert(choice in ('all', 'errors', 'none'))
393
        self._raise_mode = mode
394

    
395
    def set_timeout(self, timeout):
396
        """Set the timeout for synchronous waiting defining how long the RPC
397
        request will block on a reply before raising an error."""
398
        self._timeout = timeout
399

    
400
    #: Whether this RPC is asynchronous
401
    async = property(fget=lambda self: self._async, fset=set_async)
402

    
403
    #: Timeout for synchronous waiting
404
    timeout = property(fget=lambda self: self._timeout, fset=set_timeout)