Statistics
| Branch: | Tag: | Revision:

root / ncclient / operations / rpc.py @ 4de03d63

History | View | Annotate | Download (8.5 kB)

1
# Copyright 2009 Shikhar Bhushan
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#    http://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14

    
15
from threading import Event, Lock
16
from uuid import uuid1
17
from weakref import WeakValueDictionary
18

    
19
from ncclient import content
20

    
21
from errors import OperationError
22

    
23
import logging
24
logger = logging.getLogger('ncclient.rpc')
25

    
26

    
27
class RPCReply:
28
    
29
    'NOTES: memory considerations?? storing both raw xml + ET.Element'
30
    
31
    def __init__(self, raw):
32
        self._raw = raw
33
        self._parsed = False
34
        self._root = None
35
        self._errors = []
36
    
37
    def __repr__(self):
38
        return self._raw
39
    
40
    def parse(self):
41
        if self._parsed:
42
            return
43
        root = self._root = content.xml2ele(self._raw) # <rpc-reply> element
44
        # per rfc 4741 an <ok/> tag is sent when there are no errors or warnings
45
        ok = content.find(root, 'ok')
46
        if ok is not None:
47
            logger.debug('parsed [%s]' % ok.tag)
48
        else: # create RPCError objects from <rpc-error> elements
49
            error = content.find(root, 'rpc-error')
50
            if error is not None:
51
                logger.debug('parsed [%s]' % error.tag)
52
                for err in root.getiterator(error.tag):
53
                    # process a particular <rpc-error>
54
                    d = {}
55
                    for err_detail in err.getchildren(): # <error-type> etc..
56
                        tag = content.unqualify(err_detail.tag)
57
                        if tag != 'error-info':
58
                            d[tag] = err_detail.text.strip()
59
                        else:
60
                            d[tag] = content.ele2xml(err_detail)
61
                    self._errors.append(RPCError(d))
62
        self._parsing_hook(root)
63
        self._parsed = True
64
    
65
    @property
66
    def xml(self):
67
        '<rpc-reply> as returned'
68
        return self._raw
69
    
70
    @property
71
    def ok(self):
72
        if not self._parsed:
73
            self.parse()
74
        return not self._errors # empty list => false
75
    
76
    @property
77
    def error(self):
78
        if not self._parsed:
79
            self.parse()
80
        if self._errors:
81
            return self._errors[0]
82
        else:
83
            return None
84
    
85
    @property
86
    def errors(self):
87
        'List of RPCError objects. Will be empty if no <rpc-error> elements in reply.'
88
        if not self._parsed:
89
            self.parse()
90
        return self._errors
91

    
92

    
93
class RPCError(OperationError): # raise it if you like
94
    
95
    def __init__(self, err_dict):
96
        self._dict = err_dict
97
        if self.message is not None:
98
            OperationError.__init__(self, self.message)
99
        else:
100
            OperationError.__init__(self)
101
    
102
    @property
103
    def type(self):
104
        return self.get('error-type', None)
105
    
106
    @property
107
    def severity(self):
108
        return self.get('error-severity', None)
109
    
110
    @property
111
    def tag(self):
112
        return self.get('error-tag', None)
113
    
114
    @property
115
    def path(self):
116
        return self.get('error-path', None)
117
    
118
    @property
119
    def message(self):
120
        return self.get('error-message', None)
121
    
122
    @property
123
    def info(self):
124
        return self.get('error-info', None)
125

    
126
    ## dictionary interface
127
    
128
    __getitem__ = lambda self, key: self._dict.__getitem__(key)
129
    
130
    __iter__ = lambda self: self._dict.__iter__()
131
    
132
    __contains__ = lambda self, key: self._dict.__contains__(key)
133
    
134
    keys = lambda self: self._dict.keys()
135
    
136
    get = lambda self, key, default: self._dict.get(key, default)
137
        
138
    iteritems = lambda self: self._dict.iteritems()
139
    
140
    iterkeys = lambda self: self._dict.iterkeys()
141
    
142
    itervalues = lambda self: self._dict.itervalues()
143
    
144
    values = lambda self: self._dict.values()
145
    
146
    items = lambda self: self._dict.items()
147
    
148
    __repr__ = lambda self: repr(self._dict)
149

    
150

    
151
class RPCReplyListener(object):
152
    
153
    # one instance per session
154
    def __new__(cls, session):
155
        instance = session.get_listener_instance(cls)
156
        if instance is None:
157
            instance = object.__new__(cls)
158
            instance._lock = Lock()
159
            instance._id2rpc = WeakValueDictionary()
160
            instance._pipelined = session.can_pipeline
161
            instance._errback = None
162
            session.add_listener(instance)
163
        return instance
164
    
165
    def register(self, id, rpc):
166
        with self._lock:
167
            self._id2rpc[id] = rpc
168

    
169
    def set_errback(self, errback):
170
        self._errback = errback
171
    
172
    def callback(self, root, raw):
173
        tag, attrs = root
174
        if content.unqualify(tag) != 'rpc-reply':
175
            return
176
        rpc = None
177
        for key in attrs:
178
            if content.unqualify(key) == 'message-id':
179
                id = attrs[key]
180
                try:
181
                    with self._lock:
182
                        rpc = self._id2rpc.pop(id)
183
                except KeyError:
184
                    logger.warning('no object registered for message-id: [%s]' % id)
185
                except Exception as e:
186
                    logger.debug('error - %r' % e)
187
                break
188
        else:
189
            if not self._pipelined:
190
                with self._lock:
191
                    assert(len(self._id2rpc) == 1)
192
                    rpc = self._id2rpc.values()[0]
193
                    self._id2rpc.clear()
194
            else:
195
                logger.warning('<rpc-reply> without message-id received: %s' % raw)
196
        logger.debug('delivering to %r' % rpc)
197
        rpc.deliver(raw)
198
    
199
    def errback(self, err):
200
        if self._errback is not None:
201
            self._errback(err)
202

    
203

    
204
class RPC(object):
205
    
206
    DEPENDS = []
207
    REPLY_CLS = RPCReply
208
    
209
    def __init__(self, session, async=False, timeout=None):
210
        if not session.can_pipeline:
211
            raise UserWarning('Asynchronous mode not supported for this device/session')
212
        self._session = session
213
        try:
214
            for cap in self.DEPENDS:
215
                self._assert(cap)
216
        except AttributeError:
217
            pass        
218
        self._async = async
219
        self._timeout = timeout
220
        # keeps things simple instead of having a class attr that has to be locked
221
        self._id = uuid1().urn
222
        # RPCReplyListener itself makes sure there isn't more than one instance -- i.e. multiton
223
        self._listener = RPCReplyListener(session)
224
        self._listener.register(self._id, self)
225
        self._reply = None
226
        self._reply_event = Event()
227
    
228
    def _build(self, opspec, encoding='utf-8'):
229
        "TODO: docstring"
230
        spec = {
231
            'tag': content.qualify('rpc'),
232
            'attributes': {'message-id': self._id},
233
            'subtree': opspec
234
            }
235
        return content.dtree2xml(encoding)
236
    
237
    def _request(self, op):
238
        req = self._build(op)
239
        self._session.send(req)
240
        if self._async:
241
            return self._reply_event
242
        else:
243
            self._reply_event.wait(self._timeout)
244
            if self._reply_event.isSet():
245
                self._reply.parse()
246
                return self._reply
247
            else:
248
                raise ReplyTimeoutError
249
    
250
    def request(self):
251
        return self._request(self.SPEC)
252
    
253
    def _delivery_hook(self):
254
        'For subclasses'
255
        pass
256
    
257
    def _assert(self, capability):
258
        if capability not in self._session.server_capabilities:
259
            raise MissingCapabilityError('Server does not support [%s]' % cap)
260
    
261
    def deliver(self, raw):
262
        self._reply = self.REPLY_CLS(raw)
263
        self._delivery_hook()
264
        self._reply_event.set()
265
    
266
    @property
267
    def has_reply(self):
268
        return self._reply_event.isSet()
269
    
270
    @property
271
    def reply(self):
272
        return self._reply
273
    
274
    @property
275
    def id(self):
276
        return self._id
277
    
278
    @property
279
    def session(self):
280
        return self._session
281
    
282
    @property
283
    def reply_event(self):
284
        return self._reply_event
285
    
286
    def set_async(self, bool): self._async = bool
287
    async = property(fget=lambda self: self._async, fset=set_async)
288
    
289
    def set_timeout(self, timeout): self._timeout = timeout
290
    timeout = property(fget=lambda self: self._timeout, fset=set_timeout)