Statistics
| Branch: | Tag: | Revision:

root / ncclient / operations / rpc.py @ 0c89e420

History | View | Annotate | Download (13.2 kB)

1
# Copyright 2009 Shikhar Bhushan
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#    http://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14

    
15
from threading import Event, Lock
16
from uuid import uuid1
17

    
18
from ncclient.xml_ import *
19
from ncclient.transport import SessionListener
20

    
21
from errors import OperationError, TimeoutExpiredError, MissingCapabilityError
22

    
23
import logging
24
logger = logging.getLogger("ncclient.operations.rpc")
25

    
26

    
27
class RPCError(OperationError):
28

    
29
    "Represents an `rpc-error`. It is a type of :exc:`OperationError` and can be raised as such."
30
    
31
    tag_to_attr = {
32
        qualify("error-type"): "_type",
33
        qualify("error-tag"): "_tag",
34
        qualify("error-severity"): "_severity",
35
        qualify("error-info"): "_info",
36
        qualify("error-path"): "_path",
37
        qualify("error-message"): "_message"
38
    }
39
    
40
    def __init__(self, raw):
41
        self._raw = raw
42
        for attr in RPCError.tag_to_attr.values():
43
            setattr(self, attr, None)
44
        for subele in raw:
45
            attr = RPCError.tag_to_attr.get(subele.tag, None)
46
            if attr is not None:
47
                setattr(self, attr, subele.text if attr != "_info" else to_xml(subele) )
48
        if self.message is not None:
49
            OperationError.__init__(self, self.message)
50
        else:
51
            OperationError.__init__(self, self.to_dict())
52
    
53
    def to_dict(self):
54
        return dict([ (attr[1:], getattr(self, attr)) for attr in RPCError.tag_to_attr.values() ])
55
    
56
    @property
57
    def xml(self):
58
        "The `rpc-error` element as returned in XML."
59
        return self._raw
60
    
61
    @property
62
    def type(self):
63
        "The contents of the `error-type` element."
64
        return self._type
65
    
66
    @property
67
    def tag(self):
68
        "The contents of the `error-tag` element."
69
        return self._tag
70
    
71
    @property
72
    def severity(self):
73
        "The contents of the `error-severity` element."
74
        return self._severity
75
    
76
    @property
77
    def path(self):
78
        "The contents of the `error-path` element if present or `None`."
79
        return self._path
80
    
81
    @property
82
    def message(self):
83
        "The contents of the `error-message` element if present or `None`."
84
        return self._message
85
    
86
    @property
87
    def info(self):
88
        "XML string or `None`; representing the `error-info` element."
89
        return self._info
90

    
91

    
92
class RPCReply:
93

    
94
    """Represents an *rpc-reply*. Only concerns itself with whether the operation was successful.
95

96
    .. note::
97
        If the reply has not yet been parsed there is an implicit, one-time parsing overhead to
98
        accessing some of the attributes defined by this class.
99
    """
100
    
101
    ERROR_CLS = RPCError
102
    "Subclasses can specify a different error class, but it should be a subclass of `RPCError`."
103
    
104
    def __init__(self, raw):
105
        self._raw = raw
106
        self._parsed = False
107
        self._root = None
108
        self._errors = []
109

    
110
    def __repr__(self):
111
        return self._raw
112
    
113
    def parse(self):
114
        "Parses the *rpc-reply*."
115
        if self._parsed: return
116
        root = self._root = to_ele(self._raw) # The <rpc-reply> element
117
        # Per RFC 4741 an <ok/> tag is sent when there are no errors or warnings
118
        ok = root.find(qualify("ok"))
119
        if ok is None:
120
            # Create RPCError objects from <rpc-error> elements
121
            error = root.find(qualify("rpc-error"))
122
            if error is not None:
123
                for err in root.getiterator(error.tag):
124
                    # Process a particular <rpc-error>
125
                    self._errors.append(self.ERROR_CLS(err))
126
        self._parsing_hook(root)
127
        self._parsed = True
128

    
129
    def _parsing_hook(self, root):
130
        "No-op by default. Gets passed the *root* element for the reply."
131
        pass
132
    
133
    @property
134
    def xml(self):
135
        "*rpc-reply* element as returned."
136
        return self._raw
137
    
138
    @property
139
    def ok(self):
140
        "Boolean value indicating if there were no errors."
141
        return not self.errors # empty list => false
142
    
143
    @property
144
    def error(self):
145
        "Returns the first :class:`RPCError` and `None` if there were no errors."
146
        self.parse()
147
        if self._errors:
148
            return self._errors[0]
149
        else:
150
            return None
151
    
152
    @property
153
    def errors(self):
154
        "List of `RPCError` objects. Will be empty if there were no *rpc-error* elements in reply."
155
        self.parse()
156
        return self._errors
157

    
158

    
159
class RPCReplyListener(SessionListener): # internal use
160
    
161
    creation_lock = Lock()
162
    
163
    # one instance per session -- maybe there is a better way??
164
    def __new__(cls, session):
165
        with RPCReplyListener.creation_lock:
166
            instance = session.get_listener_instance(cls)
167
            if instance is None:
168
                instance = object.__new__(cls)
169
                instance._lock = Lock()
170
                instance._id2rpc = {}
171
                #instance._pipelined = session.can_pipeline
172
                session.add_listener(instance)
173
            return instance
174

    
175
    def register(self, id, rpc):
176
        with self._lock:
177
            self._id2rpc[id] = rpc
178

    
179
    def callback(self, root, raw):
180
        tag, attrs = root
181
        if tag != qualify("rpc-reply"):
182
            return
183
        for key in attrs: # in the <rpc-reply> attributes
184
            if key == "message-id": # if we found msgid attr
185
                id = attrs[key] # get the msgid
186
                with self._lock:
187
                    try:
188
                        rpc = self._id2rpc[id] # the corresponding rpc
189
                        logger.debug("Delivering to %r" % rpc)
190
                        rpc.deliver_reply(raw)
191
                    except KeyError:
192
                        raise OperationError("Unknown 'message-id': %s", id)
193
                    # no catching other exceptions, fail loudly if must
194
                    else:
195
                        # if no error delivering, can del the reference to the RPC
196
                        del self._id2rpc[id]
197
                        break
198
        else:
199
            raise OperationError("Could not find 'message-id' attribute in <rpc-reply>")
200
    
201
    def errback(self, err):
202
        try:
203
            for rpc in self._id2rpc.values():
204
                rpc.deliver_error(err)
205
        finally:
206
            self._id2rpc.clear()
207

    
208

    
209
class RaiseMode(object):
210

    
211
    NONE = 0
212
    "Don't attempt to raise any type of `rpc-error` as :exc:`RPCError`."
213

    
214
    ERRORS = 1
215
    "Raise only when the `error-type` indicates it is an honest-to-god error."
216

    
217
    ALL = 2
218
    "Don't look at the `error-type`, always raise."
219

    
220

    
221
class RPC(object):
222
    
223
    """Base class for all operations, directly corresponding to *rpc* requests. Handles making the request, and taking delivery of the reply."""
224

    
225
    DEPENDS = []
226
    """Subclasses can specify their dependencies on capabilities as a list of URI's or abbreviated names, e.g. ':writable-running'. These are verified at the time of instantiation. If the capability is not available, :exc:`MissingCapabilityError` is raised."""
227
    
228
    REPLY_CLS = RPCReply
229
    "By default :class:`RPCReply`. Subclasses can specify a :class:`RPCReply` subclass."
230
    
231
    def __init__(self, session, async=False, timeout=30, raise_mode=RaiseMode.NONE):
232
        """
233
        *session* is the :class:`~ncclient.transport.Session` instance
234

235
        *async* specifies whether the request is to be made asynchronously, see :attr:`is_async`
236

237
        *timeout* is the timeout for a synchronous request, see :attr:`timeout`
238

239
        *raise_mode* specifies the exception raising mode, see :attr:`raise_mode`
240
        """
241
        self._session = session
242
        try:
243
            for cap in self.DEPENDS:
244
                self._assert(cap)
245
        except AttributeError:
246
            pass
247
        self._async = async
248
        self._timeout = timeout
249
        self._raise_mode = raise_mode
250
        self._id = uuid1().urn # Keeps things simple instead of having a class attr with running ID that has to be locked
251
        self._listener = RPCReplyListener(session)
252
        self._listener.register(self._id, self)
253
        self._reply = None
254
        self._error = None
255
        self._event = Event()
256
    
257
    def _wrap(self, subele):
258
        # internal use
259
        ele = new_ele("rpc", {"message-id": self._id})
260
        ele.append(subele)
261
        return to_xml(ele)
262

    
263
    def _request(self, op):
264
        """Implementations of :meth:`request` call this method to send the request and process the reply.
265
        
266
        In synchronous mode, blocks until the reply is received and returns :class:`RPCReply`. Depending on the :attr:`raise_mode` a `rpc-error` element in the reply may lead to an :exc:`RPCError` exception.
267
        
268
        In asynchronous mode, returns immediately, returning `self`. The :attr:`event` attribute will be set when the reply has been received (see :attr:`reply`) or an error occured (see :attr:`error`).
269
        
270
        *op* is the operation to be requested as an :class:`~xml.etree.ElementTree.Element`
271
        """
272
        logger.info('Requesting %r' % self.__class__.__name__)
273
        req = self._wrap(op)
274
        self._session.send(req)
275
        if self._async:
276
            logger.debug('Async request, returning %r', self)
277
            return self
278
        else:
279
            logger.debug('Sync request, will wait for timeout=%r' % self._timeout)
280
            self._event.wait(self._timeout)
281
            if self._event.isSet():
282
                if self._error:
283
                    # Error that prevented reply delivery
284
                    raise self._error
285
                self._reply.parse()
286
                if self._reply.error is not None:
287
                    # <rpc-error>'s [ RPCError ]
288
                    if self._raise_mode == RaiseMode.ALL:
289
                        raise self._reply.error
290
                    elif (self._raise_mode == RaiseMode.ERRORS and self._reply.error.type == "error"):
291
                        raise self._reply.error
292
                return self._reply
293
            else:
294
                raise TimeoutExpiredError
295

    
296
    def request(self):
297
        """Subclasses must implement this method. Typically only the request needs to be built as an
298
        :class:`~xml.etree.ElementTree.Element` and everything else can be handed off to
299
        :meth:`_request`."""
300
        pass
301
    
302
    def _assert(self, capability):
303
        """Subclasses can use this method to verify that a capability is available with the NETCONF
304
        server, before making a request that requires it. A :exc:`MissingCapabilityError` will be
305
        raised if the capability is not available."""
306
        if capability not in self._session.server_capabilities:
307
            raise MissingCapabilityError('Server does not support [%s]' % capability)
308
    
309
    def deliver_reply(self, raw):
310
        # internal use
311
        self._reply = self.REPLY_CLS(raw)
312
        self._event.set()
313

    
314
    def deliver_error(self, err):
315
        # internal use
316
        self._error = err
317
        self._event.set()
318
    
319
    @property
320
    def reply(self):
321
        ":class:`RPCReply` element if reply has been received or `None`"
322
        return self._reply
323
    
324
    @property
325
    def error(self):
326
        """:exc:`Exception` type if an error occured or `None`.
327
        
328
        .. note::
329
            This represents an error which prevented a reply from being received. An *rpc-error*
330
            does not fall in that category -- see `RPCReply` for that.
331
        """
332
        return self._error
333
    
334
    @property
335
    def id(self):
336
        "The *message-id* for this RPC."
337
        return self._id
338
    
339
    @property
340
    def session(self):
341
        "The `~ncclient.transport.Session` object associated with this RPC."
342
        return self._session
343

    
344
    @property
345
    def event(self):
346
        """:class:`~threading.Event` that is set when reply has been received or when an error preventing
347
        delivery of the reply occurs.
348
        """
349
        return self._event
350

    
351
    def __set_async(self, async=True):
352
        self._async = async
353
        if async and not session.can_pipeline:
354
            raise UserWarning('Asynchronous mode not supported for this device/session')
355

    
356
    def __set_raise_mode(self, mode):
357
        assert(choice in ("all", "errors", "none"))
358
        self._raise_mode = mode
359

    
360
    def __set_timeout(self, timeout):
361
        self._timeout = timeout
362

    
363
    raise_mode = property(fget=lambda self: self._raise_mode, fset=__set_raise_mode)
364
    """Depending on this exception raising mode, an `rpc-error` in the reply may be raised as an :exc:`RPCError` exception. Valid values are the constants defined in :class:`RaiseMode`. """
365
    
366
    is_async = property(fget=lambda self: self._async, fset=__set_async)
367
    """Specifies whether this RPC will be / was requested asynchronously. By default RPC's are synchronous."""
368
    
369
    timeout = property(fget=lambda self: self._timeout, fset=__set_timeout)
370
    """Timeout in seconds for synchronous waiting defining how long the RPC request will block on a reply before raising :exc:`TimeoutExpiredError`.
371
    
372
    Irrelevant for asynchronous usage.
373
    """