Statistics
| Branch: | Tag: | Revision:

root / ncclient / operations / rpc.py @ 216bb34c

History | View | Annotate | Download (12.5 kB)

1
# Copyright 2009 Shikhar Bhushan
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#    http://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14

    
15
from threading import Event, Lock
16
from uuid import uuid1
17
from weakref import WeakValueDictionary
18

    
19
from ncclient import content
20
from ncclient.transport import SessionListener
21

    
22
from errors import OperationError, TimeoutExpiredError, MissingCapabilityError
23

    
24
import logging
25
logger = logging.getLogger('ncclient.operations.rpc')
26

    
27

    
28
class RPCReply:
29

    
30
    """Represents an *<rpc-reply>*. Only concerns itself with whether the
31
    operation was successful.
32

33
    .. note::
34
        If the reply has not yet been parsed there is an implicit, one-time
35
        parsing overhead to accessing the attributes defined by this class and
36
        any subclasses.
37
    """
38

    
39
    def __init__(self, raw):
40
        self._raw = raw
41
        self._parsed = False
42
        self._root = None
43
        self._errors = []
44

    
45
    def __repr__(self):
46
        return self._raw
47

    
48
    def _parsing_hook(self, root):
49
        """Subclass can implement.
50

51
        :type root: :class:`~xml.etree.ElementTree.Element`
52
        """
53
        pass
54

    
55
    def parse(self):
56
        """Parse the *<rpc-reply>*"""
57
        if self._parsed:
58
            return
59
        root = self._root = content.xml2ele(self._raw) # <rpc-reply> element
60
        # per rfc 4741 an <ok/> tag is sent when there are no errors or warnings
61
        ok = content.find(root, 'data', nslist=[content.BASE_NS, content.CISCO_BS])
62
        if ok is not None:
63
            logger.debug('parsed [%s]' % ok.tag)
64
        else: # create RPCError objects from <rpc-error> elements
65
            error = content.find(root, 'data', nslist=[content.BASE_NS, content.CISCO_BS])
66
            if error is not None:
67
                logger.debug('parsed [%s]' % error.tag)
68
                for err in root.getiterator(error.tag):
69
                    # process a particular <rpc-error>
70
                    d = {}
71
                    for err_detail in err.getchildren(): # <error-type> etc..
72
                        tag = content.unqualify(err_detail.tag)
73
                        if tag != 'error-info':
74
                            d[tag] = err_detail.text.strip()
75
                        else:
76
                            d[tag] = content.ele2xml(err_detail)
77
                    self._errors.append(RPCError(d))
78
        self._parsing_hook(root)
79
        self._parsed = True
80

    
81
    @property
82
    def xml(self):
83
        "*<rpc-reply>* as returned"
84
        return self._raw
85

    
86
    @property
87
    def ok(self):
88
        "Boolean value indicating if there were no errors."
89
        if not self._parsed:
90
            self.parse()
91
        return not self._errors # empty list => false
92

    
93
    @property
94
    def error(self):
95
        """Short for :attr:`errors` [0]; :const:`None` if there were no errors.
96
        """
97
        if not self._parsed:
98
            self.parse()
99
        if self._errors:
100
            return self._errors[0]
101
        else:
102
            return None
103

    
104
    @property
105
    def errors(self):
106
        """`list` of :class:`RPCError` objects. Will be empty if there were no
107
        *<rpc-error>* elements in reply.
108
        """
109
        if not self._parsed:
110
            self.parse()
111
        return self._errors
112

    
113

    
114
class RPCError(OperationError): # raise it if you like
115

    
116
    """Represents an *<rpc-error>*. It is an instance of :exc:`OperationError`
117
    so it can be raised like any other exception."""
118

    
119
    def __init__(self, err_dict):
120
        self._dict = err_dict
121
        if self.message is not None:
122
            OperationError.__init__(self, self.message)
123
        else:
124
            OperationError.__init__(self)
125

    
126
    @property
127
    def type(self):
128
        "`string` represeting *error-type* element"
129
        return self.get('error-type', None)
130

    
131
    @property
132
    def severity(self):
133
        "`string` represeting *error-severity* element"
134
        return self.get('error-severity', None)
135

    
136
    @property
137
    def tag(self):
138
        "`string` represeting *error-tag* element"
139
        return self.get('error-tag', None)
140

    
141
    @property
142
    def path(self):
143
        "`string` or :const:`None`; represeting *error-path* element"
144
        return self.get('error-path', None)
145

    
146
    @property
147
    def message(self):
148
        "`string` or :const:`None`; represeting *error-message* element"
149
        return self.get('error-message', None)
150

    
151
    @property
152
    def info(self):
153
        "`string` or :const:`None`, represeting *error-info* element"
154
        return self.get('error-info', None)
155

    
156
    ## dictionary interface
157

    
158
    __getitem__ = lambda self, key: self._dict.__getitem__(key)
159

    
160
    __iter__ = lambda self: self._dict.__iter__()
161

    
162
    __contains__ = lambda self, key: self._dict.__contains__(key)
163

    
164
    keys = lambda self: self._dict.keys()
165

    
166
    get = lambda self, key, default: self._dict.get(key, default)
167

    
168
    iteritems = lambda self: self._dict.iteritems()
169

    
170
    iterkeys = lambda self: self._dict.iterkeys()
171

    
172
    itervalues = lambda self: self._dict.itervalues()
173

    
174
    values = lambda self: self._dict.values()
175

    
176
    items = lambda self: self._dict.items()
177

    
178
    __repr__ = lambda self: repr(self._dict)
179

    
180

    
181
class RPCReplyListener(SessionListener):
182

    
183
    # internal use
184

    
185
    # one instance per session
186
    def __new__(cls, session):
187
        instance = session.get_listener_instance(cls)
188
        if instance is None:
189
            instance = object.__new__(cls)
190
            instance._lock = Lock()
191
            instance._id2rpc = WeakValueDictionary()
192
            instance._pipelined = session.can_pipeline
193
            session.add_listener(instance)
194
        return instance
195

    
196
    def register(self, id, rpc):
197
        with self._lock:
198
            self._id2rpc[id] = rpc
199

    
200
    def callback(self, root, raw):
201
        tag, attrs = root
202
        if content.unqualify(tag) != 'rpc-reply':
203
            return
204
        rpc = None
205
        for key in attrs:
206
            if content.unqualify(key) == 'message-id':
207
                id = attrs[key]
208
                try:
209
                    with self._lock:
210
                        rpc = self._id2rpc.pop(id)
211
                except KeyError:
212
                    logger.warning('no object registered for message-id: [%s]' % id)
213
                except Exception as e:
214
                    logger.debug('error - %r' % e)
215
                break
216
        else:
217
            if not self._pipelined:
218
                with self._lock:
219
                    assert(len(self._id2rpc) == 1)
220
                    rpc = self._id2rpc.values()[0]
221
                    self._id2rpc.clear()
222
            else:
223
                logger.warning('<rpc-reply> without message-id received: %s' % raw)
224
        logger.debug('delivering to %r' % rpc)
225
        rpc.deliver_reply(raw)
226

    
227
    def errback(self, err):
228
        for rpc in self._id2rpc.values():
229
            rpc.deliver_error(err)
230

    
231

    
232
class RPC(object):
233

    
234
    """Base class for all operations.
235

236
    Directly corresponds to *<rpc>* requests. Handles making the request, and
237
    taking delivery of the reply.
238
    """
239

    
240
    # : Subclasses can specify their dependencies on capabilities. List of URI's
241
    # or abbreviated names, e.g. ':writable-running'. These are verified at the
242
    # time of object creation. If the capability is not available, a
243
    # :exc:`MissingCapabilityError` is raised.
244
    DEPENDS = []
245

    
246
    # : Subclasses can specify a different reply class, but it must be a
247
    # subclass of :class:`RPCReply`.
248
    REPLY_CLS = RPCReply
249

    
250
    def __init__(self, session, async=False, timeout=None):
251
        self._session = session
252
        try:
253
            for cap in self.DEPENDS:
254
                self._assert(cap)
255
        except AttributeError:
256
            pass
257
        self._async = async
258
        self._timeout = timeout
259
        # keeps things simple instead of having a class attr that has to be locked
260
        self._id = uuid1().urn
261
        # RPCReplyListener itself makes sure there isn't more than one instance -- i.e. multiton
262
        self._listener = RPCReplyListener(session)
263
        self._listener.register(self._id, self)
264
        self._reply = None
265
        self._error = None
266
        self._event = Event()
267

    
268
    def _build(self, opspec):
269
        # internal
270
        spec = {
271
            'tag': content.qualify('rpc'),
272
            'attrib': {'message-id': self._id},
273
            'subtree': [ opspec ]
274
            }
275
        return content.dtree2xml(spec)
276

    
277
    def _request(self, op):
278
        """Subclasses call this method to make the RPC request.
279

280
        In asynchronous mode, returns an :class:`~threading.Event` which is set
281
        when the reply has been received or an error occured. It is prudent,
282
        therefore, to check the :attr:`error` attribute before accesing
283
        :attr:`reply`.
284

285
        Otherwise, waits until the reply is received and returns
286
        :class:`RPCReply`.
287

288
        :arg opspec: :ref:`dtree` for the operation
289
        :type opspec: :obj:`dict` or :obj:`string` or :class:`~xml.etree.ElementTree.Element`
290
        :rtype: :class:`~threading.Event` or :class:`RPCReply`
291
        """
292
        req = self._build(op)
293
        self._session.send(req)
294
        if self._async:
295
            return self._event
296
        else:
297
            self._event.wait(self._timeout)
298
            if self._event.isSet():
299
                if self._error:
300
                    raise self._error
301
                self._reply.parse()
302
                return self._reply
303
            else:
304
                raise TimeoutExpiredError
305

    
306
    def request(self, *args, **kwds):
307
        """Subclasses implement this method. Here, the operation is constructed
308
        in :ref:`dtree`, and the result of :meth:`_request` returned."""
309
        raise NotImplementedError
310

    
311
    def _delivery_hook(self):
312
        """Subclasses can implement this method. Will be called after
313
        initialising the :attr:`reply` or :attr:`error` attribute and before
314
        setting the :attr:`event`"""
315
        pass
316

    
317
    def _assert(self, capability):
318
        """Subclasses can use this method to verify that a capability is available
319
        with the NETCONF server, before making a request that requires it. A
320
        :exc:`MissingCapabilityError` will be raised if the capability is not
321
        available."""
322
        if capability not in self._session.server_capabilities:
323
            raise MissingCapabilityError('Server does not support [%s]' % cap)
324

    
325
    def deliver_reply(self, raw):
326
        # internal use
327
        self._reply = self.REPLY_CLS(raw)
328
        self._delivery_hook()
329
        self._event.set()
330

    
331
    def deliver_error(self, err):
332
        # internal use
333
        self._error = err
334
        self._delivery_hook()
335
        self._event.set()
336

    
337
    @property
338
    def reply(self):
339
        ":class:`RPCReply` element if reply has been received or :const:`None`"
340
        return self._reply
341

    
342
    @property
343
    def error(self):
344
        """:exc:`Exception` type if an error occured or :const:`None`.
345

346
        This attribute should be checked if the request was made asynchronously,
347
        so that it can be determined if :attr:`event` being set is because of a
348
        reply or error.
349

350
        .. note::
351
            This represents an error which prevented a reply from being
352
            received. An *<rpc-error>* does not fall in that category -- see
353
            :class:`RPCReply` for that.
354
        """
355
        return self._error
356

    
357
    @property
358
    def id(self):
359
        "The *message-id* for this RPC"
360
        return self._id
361

    
362
    @property
363
    def session(self):
364
        """The :class:`~ncclient.transport.Session` object associated with this
365
        RPC"""
366
        return self._session
367

    
368
    @property
369
    def event(self):
370
        """:class:`~threading.Event` that is set when reply has been received or
371
        error occured."""
372
        return self._event
373

    
374
    def set_async(self, async=True):
375
        """Set asynchronous mode for this RPC."""
376
        self._async = async
377
        if async and not session.can_pipeline:
378
            raise UserWarning('Asynchronous mode not supported for this device/session')
379

    
380
    def set_timeout(self, timeout):
381
        """Set the timeout for synchronous waiting defining how long the RPC
382
        request will block on a reply before raising an error."""
383
        self._timeout = timeout
384

    
385
    #: Whether this RPC is asynchronous
386
    async = property(fget=lambda self: self._async, fset=set_async)
387

    
388
    #: Timeout for synchronous waiting
389
    timeout = property(fget=lambda self: self._timeout, fset=set_timeout)