Statistics
| Branch: | Tag: | Revision:

root / ncclient / operations / rpc.py @ a7cb58ce

History | View | Annotate | Download (12.4 kB)

1
# Copyright 2009 Shikhar Bhushan
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#    http://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14

    
15
from threading import Event, Lock
16
from uuid import uuid1
17
from weakref import WeakValueDictionary
18

    
19
from ncclient import content
20
from ncclient.capabilities import check
21
from ncclient.transport import SessionListener
22

    
23
from errors import OperationError
24

    
25
import logging
26
logger = logging.getLogger('ncclient.operations.rpc')
27

    
28

    
29
class RPCReply:
30

    
31
    """Represents an *<rpc-reply>*. Only concerns itself with whether the
32
    operation was successful. Note that if the reply has not yet been parsed
33
    there is a one-time parsing overhead to accessing the :attr:`ok` and
34
    :attr:`error`/:attr:`errors` attributes."""
35

    
36
    def __init__(self, raw):
37
        self._raw = raw
38
        self._parsed = False
39
        self._root = None
40
        self._errors = []
41

    
42
    def __repr__(self):
43
        return self._raw
44

    
45
    def _parsing_hook(self, root):
46
        """Subclass can implement.
47

48
        :type root: :class:`~xml.etree.ElementTree.Element`
49
        """
50
        pass
51

    
52
    def parse(self):
53
        """Parse the *<rpc-reply>*"""
54
        if self._parsed:
55
            return
56
        root = self._root = content.xml2ele(self._raw) # <rpc-reply> element
57
        # per rfc 4741 an <ok/> tag is sent when there are no errors or warnings
58
        ok = content.find(root, 'data', nslist=[content.BASE_NS, content.CISCO_BS])
59
        if ok is not None:
60
            logger.debug('parsed [%s]' % ok.tag)
61
        else: # create RPCError objects from <rpc-error> elements
62
            error = content.find(root, 'data', nslist=[content.BASE_NS, content.CISCO_BS])
63
            if error is not None:
64
                logger.debug('parsed [%s]' % error.tag)
65
                for err in root.getiterator(error.tag):
66
                    # process a particular <rpc-error>
67
                    d = {}
68
                    for err_detail in err.getchildren(): # <error-type> etc..
69
                        tag = content.unqualify(err_detail.tag)
70
                        if tag != 'error-info':
71
                            d[tag] = err_detail.text.strip()
72
                        else:
73
                            d[tag] = content.ele2xml(err_detail)
74
                    self._errors.append(RPCError(d))
75
        self._parsing_hook(root)
76
        self._parsed = True
77

    
78
    @property
79
    def xml(self):
80
        "*<rpc-reply>* as returned"
81
        return self._raw
82

    
83
    @property
84
    def ok(self):
85
        "Boolean value indicating if there were no errors."
86
        if not self._parsed:
87
            self.parse()
88
        return not self._errors # empty list => false
89

    
90
    @property
91
    def error(self):
92
        "Short for :attr:`errors`[0], returning :const:`None` if there were no errors."
93
        if not self._parsed:
94
            self.parse()
95
        if self._errors:
96
            return self._errors[0]
97
        else:
98
            return None
99

    
100
    @property
101
    def errors(self):
102
        "List of :class:`RPCError` objects. Will be empty if there were no :class:`<rpc-error>` elements in reply."
103
        if not self._parsed:
104
            self.parse()
105
        return self._errors
106

    
107

    
108
class RPCError(OperationError): # raise it if you like
109

    
110
    """Represents an *<rpc-error>*. It is an instance of :exc:`OperationError`
111
    so it can be raised like any other exception."""
112

    
113
    def __init__(self, err_dict):
114
        self._dict = err_dict
115
        if self.message is not None:
116
            OperationError.__init__(self, self.message)
117
        else:
118
            OperationError.__init__(self)
119

    
120
    @property
121
    def type(self):
122
        "`string` represeting *error-type* element"
123
        return self.get('error-type', None)
124

    
125
    @property
126
    def severity(self):
127
        "`string` represeting *error-severity* element"
128
        return self.get('error-severity', None)
129

    
130
    @property
131
    def tag(self):
132
        "`string` represeting *error-tag* element"
133
        return self.get('error-tag', None)
134

    
135
    @property
136
    def path(self):
137
        "`string` or :const:`None`; represeting *error-path* element"
138
        return self.get('error-path', None)
139

    
140
    @property
141
    def message(self):
142
        "`string` or :const:`None`; represeting *error-message* element"
143
        return self.get('error-message', None)
144

    
145
    @property
146
    def info(self):
147
        "`string` or :const:`None`, represeting *error-info* element"
148
        return self.get('error-info', None)
149

    
150
    ## dictionary interface
151

    
152
    __getitem__ = lambda self, key: self._dict.__getitem__(key)
153

    
154
    __iter__ = lambda self: self._dict.__iter__()
155

    
156
    __contains__ = lambda self, key: self._dict.__contains__(key)
157

    
158
    keys = lambda self: self._dict.keys()
159

    
160
    get = lambda self, key, default: self._dict.get(key, default)
161

    
162
    iteritems = lambda self: self._dict.iteritems()
163

    
164
    iterkeys = lambda self: self._dict.iterkeys()
165

    
166
    itervalues = lambda self: self._dict.itervalues()
167

    
168
    values = lambda self: self._dict.values()
169

    
170
    items = lambda self: self._dict.items()
171

    
172
    __repr__ = lambda self: repr(self._dict)
173

    
174

    
175
class RPCReplyListener(SessionListener):
176

    
177
    # internal use
178

    
179
    # one instance per session
180
    def __new__(cls, session):
181
        instance = session.get_listener_instance(cls)
182
        if instance is None:
183
            instance = object.__new__(cls)
184
            instance._lock = Lock()
185
            instance._id2rpc = WeakValueDictionary()
186
            instance._pipelined = session.can_pipeline
187
            session.add_listener(instance)
188
        return instance
189

    
190
    def register(self, id, rpc):
191
        with self._lock:
192
            self._id2rpc[id] = rpc
193

    
194
    def callback(self, root, raw):
195
        tag, attrs = root
196
        if content.unqualify(tag) != 'rpc-reply':
197
            return
198
        rpc = None
199
        for key in attrs:
200
            if content.unqualify(key) == 'message-id':
201
                id = attrs[key]
202
                try:
203
                    with self._lock:
204
                        rpc = self._id2rpc.pop(id)
205
                except KeyError:
206
                    logger.warning('no object registered for message-id: [%s]' % id)
207
                except Exception as e:
208
                    logger.debug('error - %r' % e)
209
                break
210
        else:
211
            if not self._pipelined:
212
                with self._lock:
213
                    assert(len(self._id2rpc) == 1)
214
                    rpc = self._id2rpc.values()[0]
215
                    self._id2rpc.clear()
216
            else:
217
                logger.warning('<rpc-reply> without message-id received: %s' % raw)
218
        logger.debug('delivering to %r' % rpc)
219
        rpc.deliver_reply(raw)
220

    
221
    def errback(self, err):
222
        for rpc in self._id2rpc.values():
223
            rpc.deliver_error(err)
224

    
225

    
226
class RPC(object):
227

    
228
    "Directly corresponds to *<rpc>* requests. Handles making the request, and taking delivery of the reply."
229

    
230
    # : Subclasses can specify their dependencies on capabilities. List of URI's
231
    # or abbreviated names, e.g. ':writable-running'. These are verified at the
232
    # time of object creation. If the capability is not available, a
233
    # :exc:`MissingCapabilityError` is raised.
234
    DEPENDS = []
235

    
236
    # : Subclasses can specify a different reply class, but it must be a
237
    # subclass of :class:`RPCReply`.
238
    REPLY_CLS = RPCReply
239

    
240
    def __init__(self, session, async=False, timeout=None):
241
        self._session = session
242
        try:
243
            for cap in self.DEPENDS:
244
                self._assert(cap)
245
        except AttributeError:
246
            pass
247
        self._async = async
248
        self._timeout = timeout
249
        # keeps things simple instead of having a class attr that has to be locked
250
        self._id = uuid1().urn
251
        # RPCReplyListener itself makes sure there isn't more than one instance -- i.e. multiton
252
        self._listener = RPCReplyListener(session)
253
        self._listener.register(self._id, self)
254
        self._reply = None
255
        self._error = None
256
        self._event = Event()
257

    
258
    def _build(self, opspec):
259
        # internal
260
        spec = {
261
            'tag': content.qualify('rpc'),
262
            'attrib': {'message-id': self._id},
263
            'subtree': [ opspec ]
264
            }
265
        return content.dtree2xml(spec)
266

    
267
    def _request(self, op):
268
        """Subclasses call this method to make the RPC request.
269

270
        In asynchronous mode, returns an :class:`~threading.Event` which is set
271
        when the reply has been received or an error occured. It is prudent,
272
        therefore, to check the :attr:`error` attribute before accesing
273
        :attr:`reply`.
274

275
        Otherwise, waits until the reply is received and returns
276
        :class:`RPCReply`.
277

278
        :arg opspec: :ref:`dtree` for the operation
279
        :type opspec: :obj:`dict` or :obj:`string` or :class:`~xml.etree.ElementTree.Element`
280
        :rtype: :class:`~threading.Event` or :class:`RPCReply`
281
        """
282
        req = self._build(op)
283
        self._session.send(req)
284
        if self._async:
285
            return self._event
286
        else:
287
            self._event.wait(self._timeout)
288
            if self._event.isSet():
289
                if self._error:
290
                    raise self._error
291
                self._reply.parse()
292
                return self._reply
293
            else:
294
                raise ReplyTimeoutError
295

    
296
    def request(self, *args, **kwds):
297
        "Subclasses implement this method. Here, the operation is to be constructed as a :ref:`dtree`, and the result of :meth:`_request` returned."
298
        return self._request(self.SPEC, *args, **kwds)
299

    
300
    def _delivery_hook(self):
301
        """Subclasses can implement this method. Will be called after
302
        initialising the :attr:`reply` or :attr:`error` attribute and before
303
        setting the :attr:`event`"""
304
        pass
305

    
306
    def _assert(self, capability):
307
        """Subclasses can use this method to verify that a capability is available
308
        with the NETCONF server, before making a request that requires it. A
309
        :class:`MissingCapabilityError` will be raised if the capability is not
310
        available."""
311
        if capability not in self._session.server_capabilities:
312
            raise MissingCapabilityError('Server does not support [%s]' % cap)
313

    
314
    def deliver_reply(self, raw):
315
        # internal use
316
        self._reply = self.REPLY_CLS(raw)
317
        self._delivery_hook()
318
        self._event.set()
319

    
320
    def deliver_error(self, err):
321
        # internal use
322
        self._error = err
323
        self._delivery_hook()
324
        self._event.set()
325

    
326
    @property
327
    def reply(self):
328
        ":class:`RPCReply` element if reply has been received or :const:`None`"
329
        return self._reply
330

    
331
    @property
332
    def error(self):
333
        """:exc:`Exception` type if an error occured or :const:`None`.
334

335
        This attribute should be checked if the request was made asynchronously,
336
        so that it can be determined if :attr:`event` being set is because of a
337
        reply or error.
338

339
        .. note::
340
            This represents an error which prevented a reply from being
341
            received. An *<rpc-error>* does not fall in that category -- see
342
            :class:`RPCReply` for that.
343
        """
344
        return self._error
345

    
346
    @property
347
    def id(self):
348
        "The *message-id* for this RPC"
349
        return self._id
350

    
351
    @property
352
    def session(self):
353
        """The :class:`~ncclient.transport.Session` object associated with this
354
        RPC"""
355
        return self._session
356

    
357
    @property
358
    def event(self):
359
        """:class:`~threading.Event` that is set when reply has been received or
360
        error occured."""
361
        return self._event
362

    
363
    def set_async(self, async=True):
364
        """Set asynchronous mode for this RPC."""
365
        self._async = async
366
        if async and not session.can_pipeline:
367
            raise UserWarning('Asynchronous mode not supported for this device/session')
368

    
369
    def set_timeout(self, timeout):
370
        """Set the timeout for synchronous waiting defining how long the RPC
371
        request will block on a reply before raising an error."""
372
        self._timeout = timeout
373

    
374
    #: Whether this RPC is asynchronous
375
    async = property(fget=lambda self: self._async, fset=set_async)
376

    
377
    #: Timeout for synchronous waiting
378
    timeout = property(fget=lambda self: self._timeout, fset=set_timeout)