Statistics
| Branch: | Tag: | Revision:

root / ncclient / operations / rpc.py @ bd7957fb

History | View | Annotate | Download (12.5 kB)

1
# Copyright 2009 Shikhar Bhushan
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#    http://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14

    
15
from threading import Event, Lock
16
from uuid import uuid1
17

    
18
from ncclient.xml_ import *
19
from ncclient.transport import SessionListener
20

    
21
from errors import OperationError, TimeoutExpiredError, MissingCapabilityError
22

    
23
import logging
24
logger = logging.getLogger("ncclient.operations.rpc")
25

    
26

    
27
class RPCReply:
28

    
29
    """Represents an *<rpc-reply>*. Only concerns itself with whether the
30
    operation was successful.
31

32
    .. note::
33
        If the reply has not yet been parsed there is an implicit, one-time
34
        parsing overhead to accessing the attributes defined by this class and
35
        any subclasses.
36
    """
37

    
38
    def __init__(self, raw):
39
        self._raw = raw
40
        self._parsed = False
41
        self._root = None
42
        self._errors = []
43

    
44
    def __repr__(self):
45
        return self._raw
46

    
47
    def _parsing_hook(self, root):
48
        """Subclass can implement.
49

50
        :type root: :class:`~xml.etree.ElementTree.Element`
51
        """
52
        pass
53

    
54
    def parse(self):
55
        """Parse the *<rpc-reply>*"""
56
        if self._parsed:
57
            return
58
        root = self._root = to_ele(self._raw) # The <rpc-reply> element
59
        # Per RFC 4741 an <ok/> tag is sent when there are no errors or warnings
60
        ok = root.find(qualify("ok"))
61
        if ok is None:
62
            # Create RPCError objects from <rpc-error> elements
63
            error = root.find(qualify("rpc-error"))
64
            if error is not None:
65
                for err in root.getiterator(error.tag):
66
                    # Process a particular <rpc-error>
67
                    self._errors.append(RPCError(err))
68
        self._parsing_hook(root)
69
        self._parsed = True
70

    
71
    @property
72
    def xml(self):
73
        "*<rpc-reply>* as returned"
74
        return self._raw
75

    
76
    @property
77
    def ok(self):
78
        "Boolean value indicating if there were no errors."
79
        if not self._parsed:
80
            self.parse()
81
        return not self._errors # empty list => false
82

    
83
    @property
84
    def error(self):
85
        """Short for :attr:`errors` [0]; :const:`None` if there were no errors.
86
        """
87
        if not self._parsed:
88
            self.parse()
89
        if self._errors:
90
            return self._errors[0]
91
        else:
92
            return None
93

    
94
    @property
95
    def errors(self):
96
        """`list` of :class:`RPCError` objects. Will be empty if there were no
97
        *<rpc-error>* elements in reply.
98
        """
99
        if not self._parsed:
100
            self.parse()
101
        return self._errors
102

    
103

    
104
class RPCError(OperationError):
105

    
106
    """Represents an *<rpc-error>*. It is a type of :exc:`OperationError`
107
    and can be raised like any other exception."""
108

    
109
    def __init__(self, err):
110
        self._type = None
111
        self._tag = None
112
        self._severity = None
113
        self._info = None
114
        self._path = None
115
        self._message = None
116
        for subele in err:
117
            if subele.tag == qualify("error-type"):
118
                self._type = subele.text
119
            elif subele.tag == qualify("error-tag"):
120
                self._tag = subele.text
121
            elif subele.tag == qualify("error-severity"):
122
                self._severity = subele.text
123
            elif subele.tag == qualify("error-info"):
124
                self._info = subele.text
125
            elif subele.tag == qualify("error-path"):
126
                self._path = subele.text
127
            elif subele.tag == qualify("error-message"):
128
                self._message = subele.text
129
        if self.message is not None:
130
            OperationError.__init__(self, self.message)
131
        else:
132
            OperationError.__init__(self, self.to_dict())
133
    
134
    def to_dict(self):
135
        return {
136
            'type': self.type,
137
            'tag': self.tag,
138
            'severity': self.severity,
139
            'path': self.path,
140
            'message': self.message,
141
            'info': self.info
142
        }
143
    
144
    @property
145
    def type(self):
146
        "`string` representing text of *error-type* element"
147
        return self._type
148
    
149
    @property
150
    def tag(self):
151
        "`string` representing text of *error-tag* element"
152
        return self._tag
153
    
154
    @property
155
    def severity(self):
156
        "`string` representing text of *error-severity* element"
157
        return self._severity
158
    
159
    @property
160
    def path(self):
161
        "`string` or :const:`None`; representing text of *error-path* element"
162
        return self._path
163

    
164
    @property
165
    def message(self):
166
        "`string` or :const:`None`; representing text of *error-message* element"
167
        return self._message
168

    
169
    @property
170
    def info(self):
171
        "`string` (XML) or :const:`None`, representing *error-info* element"
172
        return self._info
173

    
174

    
175
class RPCReplyListener(SessionListener):
176

    
177
    # internal use
178

    
179
    # one instance per session -- maybe there is a better way??
180
    def __new__(cls, session):
181
        instance = session.get_listener_instance(cls)
182
        if instance is None:
183
            instance = object.__new__(cls)
184
            instance._lock = Lock()
185
            instance._id2rpc = {}
186
            #instance._pipelined = session.can_pipeline
187
            session.add_listener(instance)
188
        return instance
189

    
190
    def register(self, id, rpc):
191
        with self._lock:
192
            self._id2rpc[id] = rpc
193

    
194
    def callback(self, root, raw):
195
        tag, attrs = root
196
        if tag != qualify("rpc-reply"):
197
            return
198
        for key in attrs: # in the <rpc-reply> attributes
199
            if key == "message-id": # if we found msgid attr
200
                id = attrs[key] # get the msgid
201
                with self._lock:
202
                    try:                    
203
                        rpc = self._id2rpc[id] # the corresponding rpc
204
                        logger.debug("Delivering to %r" % rpc)
205
                        rpc.deliver_reply(raw)
206
                    except KeyError:
207
                        raise OperationError("Unknown message-id: %s", id)
208
                    # no catching other exceptions, fail loudly if must
209
                    else:
210
                        # if no error delivering, can del the reference to the RPC
211
                        del self._id2rpc[id]
212
                        break
213
        else:
214
            raise OperationError("Could not find 'message-id' attribute in <rpc-reply>")
215
    
216
    def errback(self, err):
217
        try:
218
            for rpc in self._id2rpc.values():
219
                rpc.deliver_error(err)
220
        finally:
221
            self._id2rpc.clear()
222

    
223

    
224
class RPC(object):
225

    
226
    """Base class for all operations.
227

228
    Directly corresponds to *<rpc>* requests. Handles making the request, and
229
    taking delivery of the reply.
230
    """
231

    
232
    #: Subclasses can specify their dependencies on capabilities. List of URI's
233
    # or abbreviated names, e.g. ':writable-running'. These are verified at the
234
    # time of object creation. If the capability is not available, a
235
    # :exc:`MissingCapabilityError` is raised.
236
    DEPENDS = []
237

    
238
    #: Subclasses can specify a different reply class, but it must be a
239
    # subclass of :class:`RPCReply`.
240
    REPLY_CLS = RPCReply
241

    
242
    def __init__(self, session, async=False, timeout=None, raise_mode="none"):
243
        self._session = session
244
        try:
245
            for cap in self.DEPENDS:
246
                self._assert(cap)
247
        except AttributeError:
248
            pass
249
        self._async = async
250
        self._timeout = timeout
251
        self._raise_mode = raise_mode
252
        self._id = uuid1().urn # Keeps things simple instead of having a class attr that has to be locked
253
        self._listener = RPCReplyListener(session)
254
        self._listener.register(self._id, self)
255
        self._reply = None
256
        self._error = None
257
        self._event = Event()
258

    
259
    def _build(self, subele):
260
        # internal
261
        ele = new_ele("rpc", {"message-id": self._id}, xmlns=BASE_NS_1_0)
262
        ele.append(subele)
263
        return to_xml(ele)
264

    
265
    def _request(self, op):
266
        """Subclasses call this method to make the RPC request.
267
        
268
        In synchronous mode, waits until the reply is received and returns
269
        :class:`RPCReply`.
270
        
271
        In asynchronous mode, returns immediately, returning a reference to this
272
        object. The :attr:`event` attribute will be set when the reply has been
273
        received (see :attr:`reply`) or an error occured (see :attr:`error`).
274
        
275
        :type opspec: :obj:`dict` or :obj:`string` or :class:`~xml.etree.ElementTree.Element`
276
        :rtype: :class:`RPCReply` (sync) or :class:`RPC` (async)
277
        """
278
        logger.info('Requesting %r' % self.__class__.__name__)
279
        req = self._build(op)
280
        self._session.send(req)
281
        if self._async:
282
            logger.debug('Async request, returning %r', self)
283
            return self
284
        else:
285
            logger.debug('Sync request, will wait for timeout=%r' %
286
                         self._timeout)
287
            self._event.wait(self._timeout)
288
            if self._event.isSet():
289
                if self._error:
290
                    # Error that prevented reply delivery
291
                    raise self._error
292
                self._reply.parse()
293
                if self._reply.error is not None:
294
                    # <rpc-error>'s [ RPCError ]
295
                    if self._raise_mode == "all":
296
                        raise self._reply.error
297
                    elif (self._raise_mode == "errors" and
298
                          self._reply.error.type == "error"):
299
                        raise self._reply.error
300
                return self._reply
301
            else:
302
                raise TimeoutExpiredError
303

    
304
    def request(self, *args, **kwds):
305
        "Subclasses implement this method."
306
        return self._request(self.SPEC)
307
    
308
    def _assert(self, capability):
309
        """Subclasses can use this method to verify that a capability is available
310
        with the NETCONF server, before making a request that requires it. A
311
        :exc:`MissingCapabilityError` will be raised if the capability is not
312
        available."""
313
        if capability not in self._session.server_capabilities:
314
            raise MissingCapabilityError('Server does not support [%s]' %
315
                                         capability)
316
    
317
    def deliver_reply(self, raw):
318
        # internal use
319
        self._reply = self.REPLY_CLS(raw)
320
        self._event.set()
321

    
322
    def deliver_error(self, err):
323
        # internal use
324
        self._error = err
325
        self._event.set()
326

    
327
    @property
328
    def reply(self):
329
        ":class:`RPCReply` element if reply has been received or :const:`None`"
330
        return self._reply
331

    
332
    @property
333
    def error(self):
334
        """:exc:`Exception` type if an error occured or :const:`None`.
335
        
336
        .. note::
337
            This represents an error which prevented a reply from being
338
            received. An *<rpc-error>* does not fall in that category -- see
339
            :class:`RPCReply` for that.
340
        """
341
        return self._error
342

    
343
    @property
344
    def id(self):
345
        "The *message-id* for this RPC"
346
        return self._id
347

    
348
    @property
349
    def session(self):
350
        """The :class:`~ncclient.transport.Session` object associated with this
351
        RPC"""
352
        return self._session
353

    
354
    @property
355
    def event(self):
356
        """:class:`~threading.Event` that is set when reply has been received or
357
        error occured."""
358
        return self._event
359

    
360
    def set_async(self, async=True):
361
        """Set asynchronous mode for this RPC."""
362
        self._async = async
363
        if async and not session.can_pipeline:
364
            raise UserWarning('Asynchronous mode not supported for this device/session')
365

    
366
    def set_raise_mode(self, mode):
367
        assert(choice in ("all", "errors", "none"))
368
        self._raise_mode = mode
369

    
370
    def set_timeout(self, timeout):
371
        """Set the timeout for synchronous waiting; defining how long the RPC
372
        request will block on a reply before raising an error. Irrelevant for
373
        asynchronous usage."""
374
        self._timeout = timeout
375

    
376
    #: Whether this RPC is asynchronous
377
    is_async = property(fget=lambda self: self._async, fset=set_async)
378

    
379
    #: Timeout for synchronous waiting
380
    timeout = property(fget=lambda self: self._timeout, fset=set_timeout)