Statistics
| Branch: | Tag: | Revision:

root / ncclient / operations / rpc.py @ 0304f041

History | View | Annotate | Download (12.9 kB)

1
# Copyright 2009 Shikhar Bhushan
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#    http://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14

    
15
from threading import Event, Lock
16
from uuid import uuid1
17

    
18
from ncclient import xml_
19
from ncclient.transport import SessionListener
20

    
21
from errors import OperationError, TimeoutExpiredError, MissingCapabilityError
22

    
23
import logging
24
logger = logging.getLogger('ncclient.operations.rpc')
25

    
26

    
27
class RPCReply:
28

    
29
    """Represents an *<rpc-reply>*. Only concerns itself with whether the
30
    operation was successful.
31

32
    .. note::
33
        If the reply has not yet been parsed there is an implicit, one-time
34
        parsing overhead to accessing the attributes defined by this class and
35
        any subclasses.
36
    """
37

    
38
    def __init__(self, raw):
39
        self._raw = raw
40
        self._parsed = False
41
        self._root = None
42
        self._errors = []
43

    
44
    def __repr__(self):
45
        return self._raw
46

    
47
    def _parsing_hook(self, root):
48
        """Subclass can implement.
49

50
        :type root: :class:`~xml.etree.ElementTree.Element`
51
        """
52
        pass
53

    
54
    def parse(self):
55
        """Parse the *<rpc-reply>*"""
56
        if self._parsed:
57
            return
58
        root = self._root = xml_.xml2ele(self._raw) # <rpc-reply> element
59
        # per rfc 4741 an <ok/> tag is sent when there are no errors or warnings
60
        ok = xml_.find(root, 'ok', nslist=xml_.NSLIST)
61
        if ok is not None:
62
            logger.debug('parsed [%s]' % ok.tag)
63
        else: # create RPCError objects from <rpc-error> elements
64
            error = xml_.find(root, 'rpc-error', nslist=xml_.NSLIST)
65
            if error is not None:
66
                logger.debug('parsed [%s]' % error.tag)
67
                for err in root.getiterator(error.tag):
68
                    # process a particular <rpc-error>
69
                    d = {}
70
                    for err_detail in err.getchildren(): # <error-type> etc..
71
                        tag = xml_.unqualify(err_detail.tag)
72
                        if tag != 'error-info':
73
                            d[tag] = err_detail.text.strip()
74
                        else:
75
                            d[tag] = xml_.ele2xml(err_detail)
76
                    self._errors.append(RPCError(d))
77
        self._parsing_hook(root)
78
        self._parsed = True
79

    
80
    @property
81
    def xml(self):
82
        "*<rpc-reply>* as returned"
83
        return self._raw
84

    
85
    @property
86
    def ok(self):
87
        "Boolean value indicating if there were no errors."
88
        if not self._parsed:
89
            self.parse()
90
        return not self._errors # empty list => false
91

    
92
    @property
93
    def error(self):
94
        """Short for :attr:`errors` [0]; :const:`None` if there were no errors.
95
        """
96
        if not self._parsed:
97
            self.parse()
98
        if self._errors:
99
            return self._errors[0]
100
        else:
101
            return None
102

    
103
    @property
104
    def errors(self):
105
        """`list` of :class:`RPCError` objects. Will be empty if there were no
106
        *<rpc-error>* elements in reply.
107
        """
108
        if not self._parsed:
109
            self.parse()
110
        return self._errors
111

    
112

    
113
class RPCError(OperationError): # raise it if you like
114

    
115
    """Represents an *<rpc-error>*. It is an instance of :exc:`OperationError`
116
    and can be raised like any other exception."""
117

    
118
    def __init__(self, err_dict):
119
        self._dict = err_dict
120
        if self.message is not None:
121
            OperationError.__init__(self, self.message)
122
        else:
123
            OperationError.__init__(self)
124

    
125
    @property
126
    def type(self):
127
        "`string` representing text of *error-type* element"
128
        return self.get('error-type', None)
129

    
130
    @property
131
    def severity(self):
132
        "`string` representing text of *error-severity* element"
133
        return self.get('error-severity', None)
134

    
135
    @property
136
    def tag(self):
137
        "`string` representing text of *error-tag* element"
138
        return self.get('error-tag', None)
139

    
140
    @property
141
    def path(self):
142
        "`string` or :const:`None`; representing text of *error-path* element"
143
        return self.get('error-path', None)
144

    
145
    @property
146
    def message(self):
147
        "`string` or :const:`None`; representing text of *error-message* element"
148
        return self.get('error-message', None)
149

    
150
    @property
151
    def info(self):
152
        "`string` (XML) or :const:`None`, representing *error-info* element"
153
        return self.get('error-info', None)
154

    
155
    ## dictionary interface
156

    
157
    __getitem__ = lambda self, key: self._dict.__getitem__(key)
158

    
159
    __iter__ = lambda self: self._dict.__iter__()
160

    
161
    __contains__ = lambda self, key: self._dict.__contains__(key)
162

    
163
    keys = lambda self: self._dict.keys()
164

    
165
    get = lambda self, key, default: self._dict.get(key, default)
166

    
167
    iteritems = lambda self: self._dict.iteritems()
168

    
169
    iterkeys = lambda self: self._dict.iterkeys()
170

    
171
    itervalues = lambda self: self._dict.itervalues()
172

    
173
    values = lambda self: self._dict.values()
174

    
175
    items = lambda self: self._dict.items()
176

    
177
    __repr__ = lambda self: repr(self._dict)
178

    
179

    
180
class RPCReplyListener(SessionListener):
181

    
182
    # internal use
183

    
184
    # one instance per session
185
    def __new__(cls, session):
186
        instance = session.get_listener_instance(cls)
187
        if instance is None:
188
            instance = object.__new__(cls)
189
            instance._lock = Lock()
190
            instance._id2rpc = {}
191
            #instance._pipelined = session.can_pipeline
192
            session.add_listener(instance)
193
        return instance
194

    
195
    def register(self, id, rpc):
196
        with self._lock:
197
            self._id2rpc[id] = rpc
198

    
199
    def callback(self, root, raw):
200
        tag, attrs = root
201
        if xml_.unqualify(tag) != 'rpc-reply':
202
            return
203
        for key in attrs: # in the <rpc-reply> attributes
204
            if xml_.unqualify(key) == 'message-id': # if we found msgid attr
205
                id = attrs[key] # get the msgid
206
                try:
207
                    with self._lock:
208
                        rpc = self._id2rpc.get(id) # the corresponding rpc
209
                        logger.debug('delivering to %r' % rpc)
210
                        rpc.deliver_reply(raw)
211
                except KeyError:
212
                    raise OperationError('Unknown message-id: %s', id)
213
                # no catching other exceptions, fail loudly if must
214
                else:
215
                    # if no error delivering, can del the reference to the RPC
216
                    del self._id2rpc[id]
217
                    break
218
        else:
219
            raise OperationError('Could not find "message-id" attribute in <rpc-reply>')
220
    
221
    def errback(self, err):
222
        try:
223
            for rpc in self._id2rpc.values():
224
                rpc.deliver_error(err)
225
        finally:
226
            self._id2rpc.clear()
227

    
228

    
229
class RPC(object):
230

    
231
    """Base class for all operations.
232

233
    Directly corresponds to *<rpc>* requests. Handles making the request, and
234
    taking delivery of the reply.
235
    """
236

    
237
    #: Subclasses can specify their dependencies on capabilities. List of URI's
238
    # or abbreviated names, e.g. ':writable-running'. These are verified at the
239
    # time of object creation. If the capability is not available, a
240
    # :exc:`MissingCapabilityError` is raised.
241
    DEPENDS = []
242

    
243
    #: Subclasses can specify a different reply class, but it must be a
244
    # subclass of :class:`RPCReply`.
245
    REPLY_CLS = RPCReply
246

    
247
    def __init__(self, session, async=False, timeout=None, raise_mode='none'):
248
        self._session = session
249
        try:
250
            for cap in self.DEPENDS:
251
                self._assert(cap)
252
        except AttributeError:
253
            pass
254
        self._async = async
255
        self._timeout = timeout
256
        self._raise_mode = raise_mode
257
        # keeps things simple instead of having a class attr that has to be locked
258
        self._id = uuid1().urn
259
        self._listener = RPCReplyListener(session)
260
        self._listener.register(self._id, self)
261
        self._reply = None
262
        self._error = None
263
        self._event = Event()
264

    
265
    def _build(self, opspec):
266
        # internal
267
        spec = {
268
            'tag': 'rpc',
269
            'attrib': {
270
                'xmlns': xml_.BASE_NS_1_0,
271
                'message-id': self._id
272
                },
273
            'subtree': [ opspec ]
274
            }
275
        return xml_.dtree2xml(spec)
276

    
277
    def _request(self, op):
278
        """Subclasses call this method to make the RPC request.
279
        
280
        In synchronous mode, waits until the reply is received and returns
281
        :class:`RPCReply`.
282
        
283
        In asynchronous mode, returns immediately, returning a reference to this
284
        object. The :attr:`event` attribute will be set when the reply has been
285
        received (see :attr:`reply`) or an error occured (see :attr:`error`).
286
        
287
        :arg opspec: :ref:`dtree` for the operation
288
        :type opspec: :obj:`dict` or :obj:`string` or :class:`~xml.etree.ElementTree.Element`
289
        :rtype: :class:`RPCReply` (sync) or :class:`RPC` (async)
290
        """
291
        logger.debug('request %r with opsepc=%r' % (self, op))
292
        req = self._build(op)
293
        self._session.send(req)
294
        if self._async:
295
            logger.debug('async, returning')
296
            return self
297
        else:
298
            logger.debug('sync, will wait for timeout=%r' % self._timeout)
299
            self._event.wait(self._timeout)
300
            if self._event.isSet():
301
                if self._error:
302
                    # Error that prevented reply delivery
303
                    raise self._error
304
                self._reply.parse()
305
                if self._reply.error is not None:
306
                    # <rpc-error>'s [ RPCError ]
307
                    if self._raise_mode == "all":
308
                        raise self._reply._error
309
                    elif (self._raise_mode == "errors" and
310
                          self._reply.error.type == "error"):
311
                        raise self._reply.error
312
                return self._reply
313
            else:
314
                raise TimeoutExpiredError
315

    
316
    def request(self, *args, **kwds):
317
        """Subclasses implement this method. Here, the operation is constructed
318
        in :ref:`dtree`, and the result of :meth:`_request` returned."""
319
        return self._request(self.SPEC)
320
    
321
    def _assert(self, capability):
322
        """Subclasses can use this method to verify that a capability is available
323
        with the NETCONF server, before making a request that requires it. A
324
        :exc:`MissingCapabilityError` will be raised if the capability is not
325
        available."""
326
        if capability not in self._session.server_capabilities:
327
            raise MissingCapabilityError('Server does not support [%s]' %
328
                                         capability)
329
    
330
    def deliver_reply(self, raw):
331
        # internal use
332
        self._reply = self.REPLY_CLS(raw)
333
        #self._delivery_hook() -- usecase?!
334
        self._event.set()
335

    
336
    def deliver_error(self, err):
337
        # internal use
338
        self._error = err
339
        #self._delivery_hook() -- usecase?!
340
        self._event.set()
341

    
342
    @property
343
    def reply(self):
344
        ":class:`RPCReply` element if reply has been received or :const:`None`"
345
        return self._reply
346

    
347
    @property
348
    def error(self):
349
        """:exc:`Exception` type if an error occured or :const:`None`.
350
        
351
        .. note::
352
            This represents an error which prevented a reply from being
353
            received. An *<rpc-error>* does not fall in that category -- see
354
            :class:`RPCReply` for that.
355
        """
356
        return self._error
357

    
358
    @property
359
    def id(self):
360
        "The *message-id* for this RPC"
361
        return self._id
362

    
363
    @property
364
    def session(self):
365
        """The :class:`~ncclient.transport.Session` object associated with this
366
        RPC"""
367
        return self._session
368

    
369
    @property
370
    def event(self):
371
        """:class:`~threading.Event` that is set when reply has been received or
372
        error occured."""
373
        return self._event
374

    
375
    def set_async(self, async=True):
376
        """Set asynchronous mode for this RPC."""
377
        self._async = async
378
        if async and not session.can_pipeline:
379
            raise UserWarning('Asynchronous mode not supported for this device/session')
380

    
381
    def set_raise_mode(self, mode):
382
        assert(choice in ('all', 'errors', 'none'))
383
        self._raise_mode = mode
384

    
385
    def set_timeout(self, timeout):
386
        """Set the timeout for synchronous waiting defining how long the RPC
387
        request will block on a reply before raising an error."""
388
        self._timeout = timeout
389

    
390
    #: Whether this RPC is asynchronous
391
    async = property(fget=lambda self: self._async, fset=set_async)
392

    
393
    #: Timeout for synchronous waiting
394
    timeout = property(fget=lambda self: self._timeout, fset=set_timeout)