Statistics
| Branch: | Tag: | Revision:

root / ncclient / operations / rpc.py @ a14c36f9

History | View | Annotate | Download (8.4 kB)

1
# Copyright 2009 Shikhar Bhushan
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#    http://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14

    
15
from threading import Event, Lock
16
from uuid import uuid1
17
from weakref import WeakValueDictionary
18

    
19
import ncclient.content
20

    
21
from reply import RPCReply
22

    
23
import logging
24
logger = logging.getLogger('ncclient.rpc')
25

    
26

    
27
class RPC(object):
28
    
29
    DEPENDS = []
30
    REPLY_CLS = RPCReply
31
    
32
    def __init__(self, session, async=False, timeout=None):
33
        if not session.can_pipeline:
34
            raise UserWarning('Asynchronous mode not supported for this device/session')
35
        self._session = session
36
        try:
37
            for cap in self.DEPENDS:
38
                self._assert(cap)
39
        except AttributeError:
40
            pass        
41
        self._async = async
42
        self._timeout = timeout
43
        self._id = uuid1().urn
44
        self._listener = RPCReplyListener(session)
45
        self._listener.register(self._id, self)
46
        self._reply = None
47
        self._reply_event = Event()
48
    
49
    def _build(self, opspec, encoding='utf-8'):
50
        "TODO: docstring"
51
        spec = {
52
            'tag': content.qualify('rpc'),
53
            'attributes': {'message-id': self._id},
54
            'subtree': opspec
55
            }
56
        return XMLConverter(spec).to_string(encoding)
57
    
58
    def _request(self, op):
59
        req = self._build(op)
60
        self._session.send(req)
61
        if self._async:
62
            return self._reply_event
63
        else:
64
            self._reply_event.wait(self._timeout)
65
            if self._reply_event.isSet():
66
                self._reply.parse()
67
                return self._reply
68
            else:
69
                raise ReplyTimeoutError
70
    
71
    def request(self):
72
        return self._request(self.SPEC)
73
    
74
    def _delivery_hook(self):
75
        'For subclasses'
76
        pass
77
    
78
    def _assert(self, capability):
79
        if capability not in self._session.server_capabilities:
80
            raise MissingCapabilityError('Server does not support [%s]' % cap)
81
    
82
    def deliver(self, raw):
83
        self._reply = self.REPLY_CLS(raw)
84
        self._delivery_hook()
85
        self._reply_event.set()
86
    
87
    @property
88
    def has_reply(self):
89
        return self._reply_event.isSet()
90
    
91
    @property
92
    def reply(self):
93
        return self._reply
94
    
95
    @property
96
    def id(self):
97
        return self._id
98
    
99
    @property
100
    def session(self):
101
        return self._session
102
    
103
    @property
104
    def reply_event(self):
105
        return self._reply_event
106
    
107
    def set_async(self, bool): self._async = bool
108
    async = property(fget=lambda self: self._async, fset=set_async)
109
    
110
    def set_timeout(self, timeout): self._timeout = timeout
111
    timeout = property(fget=lambda self: self._timeout, fset=set_timeout)
112

    
113

    
114
class RPCReply:
115
    
116
    'NOTES: memory considerations?? storing both raw xml + ET.Element'
117
    
118
    def __init__(self, raw):
119
        self._raw = raw
120
        self._parsed = False
121
        self._root = None
122
        self._errors = []
123
    
124
    def __repr__(self):
125
        return self._raw
126
    
127
    def parse(self):
128
        if self._parsed: return
129
        root = self._root = content.to_element(self._raw) # <rpc-reply> element
130
        # per rfc 4741 an <ok/> tag is sent when there are no errors or warnings
131
        ok = content.namespaced_find(root, 'ok')
132
        if ok is not None:
133
            logger.debug('parsed [%s]' % ok.tag)
134
        else: # create RPCError objects from <rpc-error> elements
135
            error = content.namespaced_find(root, 'rpc-error')
136
            if error is not None:
137
                logger.debug('parsed [%s]' % error.tag)
138
                for err in root.getiterator(error.tag):
139
                    # process a particular <rpc-error>
140
                    d = {}
141
                    for err_detail in err.getchildren(): # <error-type> etc..
142
                        tag = content.unqualify(err_detail.tag)
143
                        d[tag] = (err_detail.text.strip() if tag != 'error-info'
144
                                  else content.ele2string(err_detail, 'utf-8'))
145
                    self._errors.append(RPCError(d))
146
        self._parsing_hook(root)
147
        self._parsed = True
148
    
149
    @property
150
    def xml(self):
151
        '<rpc-reply> as returned'
152
        return self._raw
153
    
154
    @property
155
    def ok(self):
156
        if not self._parsed:
157
            self.parse()
158
        return not self._errors # empty list => false
159
    
160
    @property
161
    def error(self):
162
        if not self._parsed:
163
            self.parse()
164
        if self._errors:
165
            return self._errors[0]
166
        else:
167
            return None
168
    
169
    @property
170
    def errors(self):
171
        'List of RPCError objects. Will be empty if no <rpc-error> elements in reply.'
172
        if not self._parsed:
173
            self.parse()
174
        return self._errors
175

    
176

    
177
class RPCError(ncclient.RPCError): # raise it if you like
178
    
179
    def __init__(self, err_dict):
180
        self._dict = err_dict
181
        if self.message is not None:
182
            ncclient.RPCError.__init__(self, self.message)
183
        else:
184
            ncclient.RPCError.__init__(self)
185
    
186
    @property
187
    def raw(self):
188
        return self._element.tostring()
189
    
190
    @property
191
    def type(self):
192
        return self.get('error-type', None)
193
    
194
    @property
195
    def severity(self):
196
        return self.get('error-severity', None)
197
    
198
    @property
199
    def tag(self):
200
        return self.get('error-tag', None)
201
    
202
    @property
203
    def path(self):
204
        return self.get('error-path', None)
205
    
206
    @property
207
    def message(self):
208
        return self.get('error-message', None)
209
    
210
    @property
211
    def info(self):
212
        return self.get('error-info', None)
213

    
214
    ## dictionary interface
215
    
216
    __getitem__ = lambda self, key: self._dict.__getitem__(key)
217
    
218
    __iter__ = lambda self: self._dict.__iter__()
219
    
220
    __contains__ = lambda self, key: self._dict.__contains__(key)
221
    
222
    keys = lambda self: self._dict.keys()
223
    
224
    get = lambda self, key, default: self._dict.get(key, default)
225
        
226
    iteritems = lambda self: self._dict.iteritems()
227
    
228
    iterkeys = lambda self: self._dict.iterkeys()
229
    
230
    itervalues = lambda self: self._dict.itervalues()
231
    
232
    values = lambda self: self._dict.values()
233
    
234
    items = lambda self: self._dict.items()
235
    
236
    __repr__ = lambda self: repr(self._dict)
237

    
238

    
239
class RPCReplyListener:
240
    
241
    # one instance per session
242
    def __new__(cls, session):
243
        instance = session.get_listener_instance(cls)
244
        if instance is None:
245
            instance = object.__new__(cls)
246
            instance._lock = Lock()
247
            instance._id2rpc = WeakValueDictionary()
248
            instance._pipelined = session.can_pipeline
249
            instance._errback = None
250
            session.add_listener(instance)
251
        return instance
252
    
253
    def register(self, id, rpc):
254
        with self._lock:
255
            self._id2rpc[id] = rpc
256

    
257
    def set_errback(self, errback):
258
        self._errback = errback
259
    
260
    def callback(self, root, raw):
261
        tag, attrs = root
262
        if content.unqualify(tag) != 'rpc-reply':
263
            return
264
        rpc = None
265
        for key in attrs:
266
            if content.unqualify(key) == 'message-id':
267
                id = attrs[key]
268
                try:
269
                    with self._lock:
270
                        rpc = self._id2rpc.pop(id)
271
                except KeyError:
272
                    logger.warning('no object registered for message-id: [%s]' % id)
273
                except Exception as e:
274
                    logger.debug('error - %r' % e)
275
                break
276
        else:
277
            if not self._pipelined:
278
                with self._lock:
279
                    assert(len(self._id2rpc) == 1)
280
                    rpc = self._id2rpc.values()[0]
281
                    self._id2rpc.clear()
282
            else:
283
                logger.warning('<rpc-reply> without message-id received: %s' % raw)
284
        logger.debug('delivering to %r' % rpc)
285
        rpc.deliver(raw)
286
    
287
    def errback(self, err):
288
        if self._errback is not None:
289
            self._errback(err)