Statistics
| Branch: | Tag: | Revision:

root / ncclient / operations / rpc.py @ 0cdb8b3c

History | View | Annotate | Download (8.7 kB)

1
# Copyright 2009 Shikhar Bhushan
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#    http://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14

    
15
from threading import Event, Lock
16
from uuid import uuid1
17
from weakref import WeakValueDictionary
18

    
19
from ncclient import content
20
from ncclient.transport import SessionListener
21

    
22
from errors import OperationError
23

    
24
import logging
25
logger = logging.getLogger('ncclient.rpc')
26

    
27

    
28
class RPCReply:
29
    
30
    'NOTES: memory considerations?? storing both raw xml + ET.Element'
31
    
32
    def __init__(self, raw):
33
        self._raw = raw
34
        self._parsed = False
35
        self._root = None
36
        self._errors = []
37
    
38
    def __repr__(self):
39
        return self._raw
40
    
41
    def _parsing_hook(self, root):
42
        pass
43
    
44
    def parse(self):
45
        if self._parsed:
46
            return
47
        root = self._root = content.xml2ele(self._raw) # <rpc-reply> element
48
        # per rfc 4741 an <ok/> tag is sent when there are no errors or warnings
49
        ok = content.find(root, 'data', strict=False)
50
        if ok is not None:
51
            logger.debug('parsed [%s]' % ok.tag)
52
        else: # create RPCError objects from <rpc-error> elements
53
            error = content.find(root, 'data', strict=False)
54
            if error is not None:
55
                logger.debug('parsed [%s]' % error.tag)
56
                for err in root.getiterator(error.tag):
57
                    # process a particular <rpc-error>
58
                    d = {}
59
                    for err_detail in err.getchildren(): # <error-type> etc..
60
                        tag = content.unqualify(err_detail.tag)
61
                        if tag != 'error-info':
62
                            d[tag] = err_detail.text.strip()
63
                        else:
64
                            d[tag] = content.ele2xml(err_detail)
65
                    self._errors.append(RPCError(d))
66
        self._parsing_hook(root)
67
        self._parsed = True
68
    
69
    @property
70
    def xml(self):
71
        '<rpc-reply> as returned'
72
        return self._raw
73
    
74
    @property
75
    def ok(self):
76
        if not self._parsed:
77
            self.parse()
78
        return not self._errors # empty list => false
79
    
80
    @property
81
    def error(self):
82
        if not self._parsed:
83
            self.parse()
84
        if self._errors:
85
            return self._errors[0]
86
        else:
87
            return None
88
    
89
    @property
90
    def errors(self):
91
        'List of RPCError objects. Will be empty if no <rpc-error> elements in reply.'
92
        if not self._parsed:
93
            self.parse()
94
        return self._errors
95

    
96

    
97
class RPCError(OperationError): # raise it if you like
98
    
99
    def __init__(self, err_dict):
100
        self._dict = err_dict
101
        if self.message is not None:
102
            OperationError.__init__(self, self.message)
103
        else:
104
            OperationError.__init__(self)
105
    
106
    @property
107
    def type(self):
108
        return self.get('error-type', None)
109
    
110
    @property
111
    def severity(self):
112
        return self.get('error-severity', None)
113
    
114
    @property
115
    def tag(self):
116
        return self.get('error-tag', None)
117
    
118
    @property
119
    def path(self):
120
        return self.get('error-path', None)
121
    
122
    @property
123
    def message(self):
124
        return self.get('error-message', None)
125
    
126
    @property
127
    def info(self):
128
        return self.get('error-info', None)
129

    
130
    ## dictionary interface
131
    
132
    __getitem__ = lambda self, key: self._dict.__getitem__(key)
133
    
134
    __iter__ = lambda self: self._dict.__iter__()
135
    
136
    __contains__ = lambda self, key: self._dict.__contains__(key)
137
    
138
    keys = lambda self: self._dict.keys()
139
    
140
    get = lambda self, key, default: self._dict.get(key, default)
141
        
142
    iteritems = lambda self: self._dict.iteritems()
143
    
144
    iterkeys = lambda self: self._dict.iterkeys()
145
    
146
    itervalues = lambda self: self._dict.itervalues()
147
    
148
    values = lambda self: self._dict.values()
149
    
150
    items = lambda self: self._dict.items()
151
    
152
    __repr__ = lambda self: repr(self._dict)
153

    
154

    
155
class RPCReplyListener(SessionListener):
156
    
157
    # one instance per session
158
    def __new__(cls, session):
159
        instance = session.get_listener_instance(cls)
160
        if instance is None:
161
            instance = object.__new__(cls)
162
            instance._lock = Lock()
163
            instance._id2rpc = WeakValueDictionary()
164
            instance._pipelined = session.can_pipeline
165
            session.add_listener(instance)
166
        return instance
167
    
168
    def register(self, id, rpc):
169
        with self._lock:
170
            self._id2rpc[id] = rpc
171
    
172
    def callback(self, root, raw):
173
        tag, attrs = root
174
        if content.unqualify(tag) != 'rpc-reply':
175
            return
176
        rpc = None
177
        for key in attrs:
178
            if content.unqualify(key) == 'message-id':
179
                id = attrs[key]
180
                try:
181
                    with self._lock:
182
                        rpc = self._id2rpc.pop(id)
183
                except KeyError:
184
                    logger.warning('no object registered for message-id: [%s]' % id)
185
                except Exception as e:
186
                    logger.debug('error - %r' % e)
187
                break
188
        else:
189
            if not self._pipelined:
190
                with self._lock:
191
                    assert(len(self._id2rpc) == 1)
192
                    rpc = self._id2rpc.values()[0]
193
                    self._id2rpc.clear()
194
            else:
195
                logger.warning('<rpc-reply> without message-id received: %s' % raw)
196
        logger.debug('delivering to %r' % rpc)
197
        rpc.deliver(raw)
198
    
199
    def errback(self, err):
200
        for rpc in self._id2rpc.values():
201
            rpc.error(err)
202

    
203

    
204
class RPC(object):
205
    
206
    DEPENDS = []
207
    REPLY_CLS = RPCReply
208
    
209
    def __init__(self, session, async=False, timeout=None):
210
        if not session.can_pipeline:
211
            raise UserWarning('Asynchronous mode not supported for this device/session')
212
        self._session = session
213
        try:
214
            for cap in self.DEPENDS:
215
                self._assert(cap)
216
        except AttributeError:
217
            pass        
218
        self._async = async
219
        self._timeout = timeout
220
        # keeps things simple instead of having a class attr that has to be locked
221
        self._id = uuid1().urn
222
        # RPCReplyListener itself makes sure there isn't more than one instance -- i.e. multiton
223
        self._listener = RPCReplyListener(session)
224
        self._listener.register(self._id, self)
225
        self._reply = None
226
        self._reply_event = Event()
227
    
228
    def _build(self, opspec):
229
        "TODO: docstring"
230
        spec = {
231
            'tag': content.qualify('rpc'),
232
            'attributes': {'message-id': self._id},
233
            'subtree': opspec
234
            }
235
        return content.dtree2xml(spec)
236
    
237
    def _request(self, op):
238
        req = self._build(op)
239
        self._session.send(req)
240
        if self._async:
241
            return (self._reply_event, self._error_event)
242
        else:
243
            self._reply_event.wait(self._timeout)
244
            if self._reply_event.is_set():
245
                if self._error:
246
                    raise self._error
247
                self._reply.parse()
248
                return self._reply
249
            else:
250
                raise ReplyTimeoutError
251
    
252
    def request(self):
253
        return self._request(self.SPEC)
254
    
255
    def _delivery_hook(self):
256
        'For subclasses'
257
        pass
258
    
259
    def _assert(self, capability):
260
        if capability not in self._session.server_capabilities:
261
            raise MissingCapabilityError('Server does not support [%s]' % cap)
262
    
263
    def deliver(self, raw):
264
        self._reply = self.REPLY_CLS(raw)
265
        self._delivery_hook()
266
        self._reply_event.set()
267
    
268
    def error(self, err):
269
        self._error = err
270
        self._reply_event.set()
271
    
272
    @property
273
    def has_reply(self):
274
        return self._reply_event.is_set()
275
    
276
    @property
277
    def reply(self):
278
        return self._reply
279
    
280
    @property
281
    def id(self):
282
        return self._id
283
    
284
    @property
285
    def session(self):
286
        return self._session
287
    
288
    @property
289
    def reply_event(self):
290
        return self._reply_event
291
    
292
    def set_async(self, bool): self._async = bool
293
    async = property(fget=lambda self: self._async, fset=set_async)
294
    
295
    def set_timeout(self, timeout): self._timeout = timeout
296
    timeout = property(fget=lambda self: self._timeout, fset=set_timeout)