Statistics
| Branch: | Tag: | Revision:

root / ncclient / operations / rpc.py @ b2d60e49

History | View | Annotate | Download (13 kB)

1
# Copyright 2009 Shikhar Bhushan
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#    http://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14

    
15
from threading import Event, Lock
16
from uuid import uuid1
17

    
18
from ncclient.xml_ import *
19
from ncclient.transport import SessionListener
20

    
21
from errors import OperationError, TimeoutExpiredError, MissingCapabilityError
22

    
23
import logging
24
logger = logging.getLogger("ncclient.operations.rpc")
25

    
26

    
27
class RPCError(OperationError):
28

    
29
    """Represents an *rpc-error*. It is a type of :exc:`OperationError` and can be raised like any
30
    other exception."""
31
    
32
    tag_to_attr = {
33
        qualify("error-type"): "_type",
34
        qualify("error-tag"): "_tag",
35
        qualify("error-severity"): "_severity",
36
        qualify("error-info"): "_info",
37
        qualify("error-path"): "_path",
38
        qualify("error-message"): "_message"
39
    }
40
    
41
    def __init__(self, raw):
42
        self._raw = raw
43
        for attr in RPCError.tag_to_attr.values():
44
            setattr(self, attr, None)
45
        for subele in raw:
46
            attr = RPCError.tag_to_attr.get(subele.tag, None)
47
            if attr is not None:
48
                setattr(self, attr, subele.text if attr != "_info" else to_xml(subele) )
49
        if self.message is not None:
50
            OperationError.__init__(self, self.message)
51
        else:
52
            OperationError.__init__(self, self.to_dict())
53
    
54
    def to_dict(self):
55
        return dict([ (attr[1:], getattr(self, attr)) for attr in RPCError.tag_to_attr.values() ])
56
    
57
    @property
58
    def xml(self):
59
        "*rpc-error* element as returned."
60
        return self._raw
61
    
62
    @property
63
    def type(self):
64
        "`string` representing text of *error-type* element."
65
        return self._type
66
    
67
    @property
68
    def tag(self):
69
        "`string` representing text of *error-tag* element."
70
        return self._tag
71
    
72
    @property
73
    def severity(self):
74
        "`string` representing text of *error-severity* element."
75
        return self._severity
76
    
77
    @property
78
    def path(self):
79
        "`string` or :const:`None`; representing text of *error-path* element."
80
        return self._path
81
    
82
    @property
83
    def message(self):
84
        "`string` or :const:`None`; representing text of *error-message* element."
85
        return self._message
86
    
87
    @property
88
    def info(self):
89
        "`string` (XML) or :const:`None`; representing *error-info* element."
90
        return self._info
91

    
92

    
93
class RPCReply:
94

    
95
    """Represents an *rpc-reply*. Only concerns itself with whether the operation was successful.
96

97
    .. note::
98
        If the reply has not yet been parsed there is an implicit, one-time parsing overhead to
99
        accessing the attributes defined by this class and any subclasses.
100
    """
101
    
102
    ERROR_CLS = RPCError
103
    "Subclasses can specify a different error class, but it should be a subclass of `RPCError`."
104
    
105
    def __init__(self, raw):
106
        self._raw = raw
107
        self._parsed = False
108
        self._root = None
109
        self._errors = []
110

    
111
    def __repr__(self):
112
        return self._raw
113
    
114
    def parse(self):
115
        "Parses the *rpc-reply*."
116
        if self._parsed: return
117
        root = self._root = to_ele(self._raw) # The <rpc-reply> element
118
        # Per RFC 4741 an <ok/> tag is sent when there are no errors or warnings
119
        ok = root.find(qualify("ok"))
120
        if ok is None:
121
            # Create RPCError objects from <rpc-error> elements
122
            error = root.find(qualify("rpc-error"))
123
            if error is not None:
124
                for err in root.getiterator(error.tag):
125
                    # Process a particular <rpc-error>
126
                    self._errors.append(self.ERROR_CLS(err))
127
        self._parsing_hook(root)
128
        self._parsed = True
129

    
130
    def _parsing_hook(self, root):
131
        pass
132
    
133
    @property
134
    def xml(self):
135
        "*rpc-reply* element as returned."
136
        return self._raw
137
    
138
    @property
139
    def ok(self):
140
        "Boolean value indicating if there were no errors."
141
        return not self.errors # empty list => false
142
    
143
    @property
144
    def error(self):
145
        "Returns the first `RPCError` and :const:`None` if there were no errors."
146
        self.parse()
147
        if self._errors:
148
            return self._errors[0]
149
        else:
150
            return None
151
    
152
    @property
153
    def errors(self):
154
        """`list` of `RPCError` objects. Will be empty if there were no *rpc-error* elements in
155
        reply."""
156
        self.parse()
157
        return self._errors
158

    
159

    
160
class RPCReplyListener(SessionListener): # internal use
161
    
162
    creation_lock = Lock()
163
    
164
    # one instance per session -- maybe there is a better way??
165
    def __new__(cls, session):
166
        with RPCReplyListener.creation_lock:
167
            instance = session.get_listener_instance(cls)
168
            if instance is None:
169
                instance = object.__new__(cls)
170
                instance._lock = Lock()
171
                instance._id2rpc = {}
172
                #instance._pipelined = session.can_pipeline
173
                session.add_listener(instance)
174
            return instance
175

    
176
    def register(self, id, rpc):
177
        with self._lock:
178
            self._id2rpc[id] = rpc
179

    
180
    def callback(self, root, raw):
181
        tag, attrs = root
182
        if tag != qualify("rpc-reply"):
183
            return
184
        for key in attrs: # in the <rpc-reply> attributes
185
            if key == "message-id": # if we found msgid attr
186
                id = attrs[key] # get the msgid
187
                with self._lock:
188
                    try:
189
                        rpc = self._id2rpc[id] # the corresponding rpc
190
                        logger.debug("Delivering to %r" % rpc)
191
                        rpc.deliver_reply(raw)
192
                    except KeyError:
193
                        raise OperationError("Unknown 'message-id': %s", id)
194
                    # no catching other exceptions, fail loudly if must
195
                    else:
196
                        # if no error delivering, can del the reference to the RPC
197
                        del self._id2rpc[id]
198
                        break
199
        else:
200
            raise OperationError("Could not find 'message-id' attribute in <rpc-reply>")
201
    
202
    def errback(self, err):
203
        try:
204
            for rpc in self._id2rpc.values():
205
                rpc.deliver_error(err)
206
        finally:
207
            self._id2rpc.clear()
208

    
209

    
210
class RPC(object):
211
    
212
    """Base class for all operations, directly corresponding to *rpc* requests. Handles making the
213
    request, and taking delivery of the reply."""
214
    
215
    DEPENDS = []
216
    """Subclasses can specify their dependencies on capabilities. List of URI's or abbreviated
217
    names, e.g. ':writable-running'. These are verified at the time of instantiation. If the
218
    capability is not available, a :exc:`MissingCapabilityError` is raised.
219
    """
220
    
221
    REPLY_CLS = RPCReply
222
    "Subclasses can specify a different reply class, but it should be a subclass of `RPCReply`."
223
    
224
    def __init__(self, session, async=False, timeout=None, raise_mode="none"):
225
        self._session = session
226
        try:
227
            for cap in self.DEPENDS:
228
                self._assert(cap)
229
        except AttributeError:
230
            pass
231
        self._async = async
232
        self._timeout = timeout
233
        self._raise_mode = raise_mode
234
        self._id = uuid1().urn # Keeps things simple instead of having a class attr with running ID that has to be locked
235
        self._listener = RPCReplyListener(session)
236
        self._listener.register(self._id, self)
237
        self._reply = None
238
        self._error = None
239
        self._event = Event()
240
    
241
    def _wrap(self, subele):
242
        # internal use
243
        ele = new_ele("rpc", {"message-id": self._id}, xmlns=BASE_NS_1_0)
244
        ele.append(subele)
245
        return to_xml(ele)
246

    
247
    def _request(self, op):
248
        """Implementations of :meth:`request` call this method to send the request and process the
249
        reply.
250
        
251
        In synchronous mode, blocks until the reply is received and returns `RPCReply`. Depending on
252
        the :attr:`raise_mode` a *rpc-error* element in the reply may lead to an :exc:`RPCError`
253
        exception.
254
        
255
        In asynchronous mode, returns immediately, returning *self*. The :attr:`event` attribute
256
        will be set when the reply has been received (see :attr:`reply`) or an error occured (see
257
        :attr:`error`).
258
        
259
        :param op: operation to be requested
260
        :type ops: `~xml.etree.ElementTree.Element`
261
        
262
        :rtype: `RPCReply` (sync) or `RPC` (async)
263
        """
264
        logger.info('Requesting %r' % self.__class__.__name__)
265
        req = self._wrap(op)
266
        self._session.send(req)
267
        if self._async:
268
            logger.debug('Async request, returning %r', self)
269
            return self
270
        else:
271
            logger.debug('Sync request, will wait for timeout=%r' % self._timeout)
272
            self._event.wait(self._timeout)
273
            if self._event.isSet():
274
                if self._error:
275
                    # Error that prevented reply delivery
276
                    raise self._error
277
                self._reply.parse()
278
                if self._reply.error is not None:
279
                    # <rpc-error>'s [ RPCError ]
280
                    if self._raise_mode == "all":
281
                        raise self._reply.error
282
                    elif (self._raise_mode == "errors" and
283
                          self._reply.error.type == "error"):
284
                        raise self._reply.error
285
                return self._reply
286
            else:
287
                raise TimeoutExpiredError
288

    
289
    def request(self):
290
        """Subclasses must implement this method. Typically only the request needs to be built as an
291
        `~xml.etree.ElementTree.Element` and everything else can be handed off to
292
        :meth:`_request`."""
293
        pass
294
    
295
    def _assert(self, capability):
296
        """Subclasses can use this method to verify that a capability is available with the NETCONF
297
        server, before making a request that requires it. A :exc:`MissingCapabilityError` will be
298
        raised if the capability is not available."""
299
        if capability not in self._session.server_capabilities:
300
            raise MissingCapabilityError('Server does not support [%s]' % capability)
301
    
302
    def deliver_reply(self, raw):
303
        # internal use
304
        self._reply = self.REPLY_CLS(raw)
305
        self._event.set()
306

    
307
    def deliver_error(self, err):
308
        # internal use
309
        self._error = err
310
        self._event.set()
311
    
312
    @property
313
    def reply(self):
314
        "`RPCReply` element if reply has been received or :const:`None`"
315
        return self._reply
316
    
317
    @property
318
    def error(self):
319
        """:exc:`Exception` type if an error occured or :const:`None`.
320
        
321
        .. note::
322
            This represents an error which prevented a reply from being received. An *rpc-error*
323
            does not fall in that category -- see `RPCReply` for that.
324
        """
325
        return self._error
326
    
327
    @property
328
    def id(self):
329
        "The *message-id* for this RPC."
330
        return self._id
331
    
332
    @property
333
    def session(self):
334
        "The `~ncclient.transport.Session` object associated with this RPC."
335
        return self._session
336

    
337
    @property
338
    def event(self):
339
        """`~threading.Event` that is set when reply has been received or when an error preventing
340
        delivery of the reply occurs.
341
        """
342
        return self._event
343

    
344
    def set_async(self, async=True):
345
        self._async = async
346
        if async and not session.can_pipeline:
347
            raise UserWarning('Asynchronous mode not supported for this device/session')
348

    
349
    def set_raise_mode(self, mode):
350
        assert(choice in ("all", "errors", "none"))
351
        self._raise_mode = mode
352

    
353
    def set_timeout(self, timeout):
354
        self._timeout = timeout
355

    
356
    raise_mode = property(fget=lambda self: self._raise_mode, fset=set_raise_mode)
357
    """Depending on this exception raising mode, an *rpc-error* in the reply may be raised as
358
    :exc:`RPCError` exceptions. Valid values:
359
    
360
    * ``"all"`` -- any kind of *rpc-error* (error or warning)
361
    * ``"errors"`` -- when the *error-type* element says it is an error
362
    * ``"none"`` -- neither
363
    """
364
    
365
    is_async = property(fget=lambda self: self._async, fset=set_async)
366
    """Specifies whether this RPC will be / was requested asynchronously. By default RPC's are
367
    synchronous.
368
    """
369
    
370
    timeout = property(fget=lambda self: self._timeout, fset=set_timeout)
371
    """Timeout in seconds for synchronous waiting defining how long the RPC request will block on a
372
    reply before raising :exc:`TimeoutExpiredError`. By default there is no timeout, represented by
373
    :const:`None`.
374
    
375
    Irrelevant for asynchronous usage.
376
    """