Statistics
| Branch: | Tag: | Revision:

root / ncclient / operations / rpc.py @ 0b7d3b31

History | View | Annotate | Download (12.7 kB)

1
# Copyright 2009 Shikhar Bhushan
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#    http://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14

    
15
from threading import Event, Lock
16
from uuid import uuid1
17
from weakref import WeakValueDictionary
18

    
19
from ncclient import content
20
from ncclient.transport import SessionListener
21

    
22
from errors import OperationError, TimeoutExpiredError, MissingCapabilityError
23

    
24
import logging
25
logger = logging.getLogger('ncclient.operations.rpc')
26

    
27

    
28
class RPCReply:
29

    
30
    """Represents an *<rpc-reply>*. Only concerns itself with whether the
31
    operation was successful.
32

33
    .. note::
34
        If the reply has not yet been parsed there is an implicit, one-time
35
        parsing overhead to accessing the attributes defined by this class and
36
        any subclasses.
37
    """
38

    
39
    def __init__(self, raw):
40
        self._raw = raw
41
        self._parsed = False
42
        self._root = None
43
        self._errors = []
44

    
45
    def __repr__(self):
46
        return self._raw
47

    
48
    def _parsing_hook(self, root):
49
        """Subclass can implement.
50

51
        :type root: :class:`~xml.etree.ElementTree.Element`
52
        """
53
        pass
54

    
55
    def parse(self):
56
        """Parse the *<rpc-reply>*"""
57
        if self._parsed:
58
            return
59
        root = self._root = content.xml2ele(self._raw) # <rpc-reply> element
60
        # per rfc 4741 an <ok/> tag is sent when there are no errors or warnings
61
        ok = content.find(root, 'data', nslist=[content.BASE_NS, content.CISCO_BS])
62
        if ok is not None:
63
            logger.debug('parsed [%s]' % ok.tag)
64
        else: # create RPCError objects from <rpc-error> elements
65
            error = content.find(root, 'data', nslist=[content.BASE_NS, content.CISCO_BS])
66
            if error is not None:
67
                logger.debug('parsed [%s]' % error.tag)
68
                for err in root.getiterator(error.tag):
69
                    # process a particular <rpc-error>
70
                    d = {}
71
                    for err_detail in err.getchildren(): # <error-type> etc..
72
                        tag = content.unqualify(err_detail.tag)
73
                        if tag != 'error-info':
74
                            d[tag] = err_detail.text.strip()
75
                        else:
76
                            d[tag] = content.ele2xml(err_detail)
77
                    self._errors.append(RPCError(d))
78
        self._parsing_hook(root)
79
        self._parsed = True
80

    
81
    @property
82
    def xml(self):
83
        "*<rpc-reply>* as returned"
84
        return self._raw
85

    
86
    @property
87
    def ok(self):
88
        "Boolean value indicating if there were no errors."
89
        if not self._parsed:
90
            self.parse()
91
        return not self._errors # empty list => false
92

    
93
    @property
94
    def error(self):
95
        """Short for :attr:`errors` [0]; :const:`None` if there were no errors.
96
        """
97
        if not self._parsed:
98
            self.parse()
99
        if self._errors:
100
            return self._errors[0]
101
        else:
102
            return None
103

    
104
    @property
105
    def errors(self):
106
        """`list` of :class:`RPCError` objects. Will be empty if there were no
107
        *<rpc-error>* elements in reply.
108
        """
109
        if not self._parsed:
110
            self.parse()
111
        return self._errors
112

    
113

    
114
class RPCError(OperationError): # raise it if you like
115

    
116
    """Represents an *<rpc-error>*. It is an instance of :exc:`OperationError`
117
    so it can be raised like any other exception."""
118

    
119
    def __init__(self, err_dict):
120
        self._dict = err_dict
121
        if self.message is not None:
122
            OperationError.__init__(self, self.message)
123
        else:
124
            OperationError.__init__(self)
125

    
126
    @property
127
    def type(self):
128
        "`string` represeting *error-type* element"
129
        return self.get('error-type', None)
130

    
131
    @property
132
    def severity(self):
133
        "`string` represeting *error-severity* element"
134
        return self.get('error-severity', None)
135

    
136
    @property
137
    def tag(self):
138
        "`string` represeting *error-tag* element"
139
        return self.get('error-tag', None)
140

    
141
    @property
142
    def path(self):
143
        "`string` or :const:`None`; represeting *error-path* element"
144
        return self.get('error-path', None)
145

    
146
    @property
147
    def message(self):
148
        "`string` or :const:`None`; represeting *error-message* element"
149
        return self.get('error-message', None)
150

    
151
    @property
152
    def info(self):
153
        "`string` or :const:`None`, represeting *error-info* element"
154
        return self.get('error-info', None)
155

    
156
    ## dictionary interface
157

    
158
    __getitem__ = lambda self, key: self._dict.__getitem__(key)
159

    
160
    __iter__ = lambda self: self._dict.__iter__()
161

    
162
    __contains__ = lambda self, key: self._dict.__contains__(key)
163

    
164
    keys = lambda self: self._dict.keys()
165

    
166
    get = lambda self, key, default: self._dict.get(key, default)
167

    
168
    iteritems = lambda self: self._dict.iteritems()
169

    
170
    iterkeys = lambda self: self._dict.iterkeys()
171

    
172
    itervalues = lambda self: self._dict.itervalues()
173

    
174
    values = lambda self: self._dict.values()
175

    
176
    items = lambda self: self._dict.items()
177

    
178
    __repr__ = lambda self: repr(self._dict)
179

    
180

    
181
class RPCReplyListener(SessionListener):
182

    
183
    # internal use
184

    
185
    # one instance per session
186
    def __new__(cls, session):
187
        instance = session.get_listener_instance(cls)
188
        if instance is None:
189
            instance = object.__new__(cls)
190
            instance._lock = Lock()
191
            instance._id2rpc = WeakValueDictionary()
192
            instance._pipelined = session.can_pipeline
193
            session.add_listener(instance)
194
        return instance
195

    
196
    def register(self, id, rpc):
197
        with self._lock:
198
            self._id2rpc[id] = rpc
199

    
200
    def callback(self, root, raw):
201
        tag, attrs = root
202
        if content.unqualify(tag) != 'rpc-reply':
203
            return
204
        rpc = None
205
        for key in attrs:
206
            if content.unqualify(key) == 'message-id':
207
                id = attrs[key]
208
                try:
209
                    with self._lock:
210
                        rpc = self._id2rpc.pop(id)
211
                except KeyError:
212
                    logger.warning('no object registered for message-id: [%s]' % id)
213
                except Exception as e:
214
                    logger.debug('error - %r' % e)
215
                break
216
        else:
217
            if not self._pipelined:
218
                with self._lock:
219
                    assert(len(self._id2rpc) == 1)
220
                    rpc = self._id2rpc.values()[0]
221
                    self._id2rpc.clear()
222
            else:
223
                logger.warning('<rpc-reply> without message-id received: %s' % raw)
224
        logger.debug('delivering to %r' % rpc)
225
        rpc.deliver_reply(raw)
226

    
227
    def errback(self, err):
228
        for rpc in self._id2rpc.values():
229
            rpc.deliver_error(err)
230

    
231

    
232
class RPC(object):
233

    
234
    """Base class for all operations.
235

236
    Directly corresponds to *<rpc>* requests. Handles making the request, and
237
    taking delivery of the reply.
238
    """
239

    
240
    # : Subclasses can specify their dependencies on capabilities. List of URI's
241
    # or abbreviated names, e.g. ':writable-running'. These are verified at the
242
    # time of object creation. If the capability is not available, a
243
    # :exc:`MissingCapabilityError` is raised.
244
    DEPENDS = []
245

    
246
    # : Subclasses can specify a different reply class, but it must be a
247
    # subclass of :class:`RPCReply`.
248
    REPLY_CLS = RPCReply
249

    
250
    def __init__(self, session, async=False, timeout=None):
251
        self._session = session
252
        try:
253
            for cap in self.DEPENDS:
254
                self._assert(cap)
255
        except AttributeError:
256
            pass
257
        self._async = async
258
        self._timeout = timeout
259
        # keeps things simple instead of having a class attr that has to be locked
260
        self._id = uuid1().urn
261
        # RPCReplyListener itself makes sure there isn't more than one instance -- i.e. multiton
262
        self._listener = RPCReplyListener(session)
263
        self._listener.register(self._id, self)
264
        self._reply = None
265
        self._error = None
266
        self._event = Event()
267

    
268
    def _build(self, opspec):
269
        # internal
270
        spec = {
271
            'tag': content.qualify('rpc'),
272
            'attrib': {'message-id': self._id},
273
            'subtree': [ opspec ]
274
            }
275
        return content.dtree2xml(spec)
276

    
277
    def _request(self, op):
278
        """Subclasses call this method to make the RPC request.
279

280
        In asynchronous mode, returns an :class:`~threading.Event` which is set
281
        when the reply has been received or an error occured. It is prudent,
282
        therefore, to check the :attr:`error` attribute before accesing
283
        :attr:`reply`.
284

285
        Otherwise, waits until the reply is received and returns
286
        :class:`RPCReply`.
287

288
        :arg opspec: :ref:`dtree` for the operation
289
        :type opspec: :obj:`dict` or :obj:`string` or :class:`~xml.etree.ElementTree.Element`
290
        :rtype: :class:`~threading.Event` or :class:`RPCReply`
291
        """
292
        logger.debug('request %r with opsepc=%r' % (self, op))
293
        req = self._build(op)
294
        self.session.send(req)
295
        if self.async:
296
            logger.debug('async, returning event')
297
            return self.event
298
        else:
299
            logger.debug('sync, will wait for timeout=%r' % self.timeout)
300
            self.event.wait(self.timeout)
301
            if self.event.isSet():
302
                if self.error:
303
                    raise self._error
304
                self.reply.parse()
305
                return self.reply
306
            else:
307
                raise TimeoutExpiredError
308

    
309
    def request(self, *args, **kwds):
310
        """Subclasses implement this method. Here, the operation is constructed
311
        in :ref:`dtree`, and the result of :meth:`_request` returned."""
312
        raise NotImplementedError
313

    
314
    def _delivery_hook(self):
315
        """Subclasses can implement this method. Will be called after
316
        initialising the :attr:`reply` or :attr:`error` attribute and before
317
        setting the :attr:`event`"""
318
        pass
319

    
320
    def _assert(self, capability):
321
        """Subclasses can use this method to verify that a capability is available
322
        with the NETCONF server, before making a request that requires it. A
323
        :exc:`MissingCapabilityError` will be raised if the capability is not
324
        available."""
325
        if capability not in self._session.server_capabilities:
326
            raise MissingCapabilityError('Server does not support [%s]' % cap)
327

    
328
    def deliver_reply(self, raw):
329
        # internal use
330
        self._reply = self.REPLY_CLS(raw)
331
        self._delivery_hook()
332
        self._event.set()
333

    
334
    def deliver_error(self, err):
335
        # internal use
336
        self._error = err
337
        self._delivery_hook()
338
        self._event.set()
339

    
340
    @property
341
    def reply(self):
342
        ":class:`RPCReply` element if reply has been received or :const:`None`"
343
        return self._reply
344

    
345
    @property
346
    def error(self):
347
        """:exc:`Exception` type if an error occured or :const:`None`.
348

349
        This attribute should be checked if the request was made asynchronously,
350
        so that it can be determined if :attr:`event` being set is because of a
351
        reply or error.
352

353
        .. note::
354
            This represents an error which prevented a reply from being
355
            received. An *<rpc-error>* does not fall in that category -- see
356
            :class:`RPCReply` for that.
357
        """
358
        return self._error
359

    
360
    @property
361
    def id(self):
362
        "The *message-id* for this RPC"
363
        return self._id
364

    
365
    @property
366
    def session(self):
367
        """The :class:`~ncclient.transport.Session` object associated with this
368
        RPC"""
369
        return self._session
370

    
371
    @property
372
    def event(self):
373
        """:class:`~threading.Event` that is set when reply has been received or
374
        error occured."""
375
        return self._event
376

    
377
    def set_async(self, async=True):
378
        """Set asynchronous mode for this RPC."""
379
        self._async = async
380
        if async and not session.can_pipeline:
381
            raise UserWarning('Asynchronous mode not supported for this device/session')
382

    
383
    def set_timeout(self, timeout):
384
        """Set the timeout for synchronous waiting defining how long the RPC
385
        request will block on a reply before raising an error."""
386
        self._timeout = timeout
387

    
388
    #: Whether this RPC is asynchronous
389
    async = property(fget=lambda self: self._async, fset=set_async)
390

    
391
    #: Timeout for synchronous waiting
392
    timeout = property(fget=lambda self: self._timeout, fset=set_timeout)