Statistics
| Branch: | Tag: | Revision:

root / ncclient / operations / rpc.py @ e52e8478

History | View | Annotate | Download (8.5 kB)

1
# Copyright 2009 Shikhar Bhushan
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#    http://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14

    
15
from threading import Event, Lock
16
from uuid import uuid1
17
from weakref import WeakValueDictionary
18

    
19
from ncclient import content
20

    
21
from errors import OperationError
22

    
23
import logging
24
logger = logging.getLogger('ncclient.rpc')
25

    
26

    
27
class RPCReply:
28
    
29
    'NOTES: memory considerations?? storing both raw xml + ET.Element'
30
    
31
    def __init__(self, raw):
32
        self._raw = raw
33
        self._parsed = False
34
        self._root = None
35
        self._errors = []
36
    
37
    def __repr__(self):
38
        return self._raw
39
    
40
    def _parsing_hook(self, root):
41
        pass
42
    
43
    def parse(self):
44
        if self._parsed:
45
            return
46
        root = self._root = content.xml2ele(self._raw) # <rpc-reply> element
47
        # per rfc 4741 an <ok/> tag is sent when there are no errors or warnings
48
        ok = content.find(root, 'ok')
49
        if ok is not None:
50
            logger.debug('parsed [%s]' % ok.tag)
51
        else: # create RPCError objects from <rpc-error> elements
52
            error = content.find(root, 'rpc-error')
53
            if error is not None:
54
                logger.debug('parsed [%s]' % error.tag)
55
                for err in root.getiterator(error.tag):
56
                    # process a particular <rpc-error>
57
                    d = {}
58
                    for err_detail in err.getchildren(): # <error-type> etc..
59
                        tag = content.unqualify(err_detail.tag)
60
                        if tag != 'error-info':
61
                            d[tag] = err_detail.text.strip()
62
                        else:
63
                            d[tag] = content.ele2xml(err_detail)
64
                    self._errors.append(RPCError(d))
65
        self._parsing_hook(root)
66
        self._parsed = True
67
    
68
    @property
69
    def xml(self):
70
        '<rpc-reply> as returned'
71
        return self._raw
72
    
73
    @property
74
    def ok(self):
75
        if not self._parsed:
76
            self.parse()
77
        return not self._errors # empty list => false
78
    
79
    @property
80
    def error(self):
81
        if not self._parsed:
82
            self.parse()
83
        if self._errors:
84
            return self._errors[0]
85
        else:
86
            return None
87
    
88
    @property
89
    def errors(self):
90
        'List of RPCError objects. Will be empty if no <rpc-error> elements in reply.'
91
        if not self._parsed:
92
            self.parse()
93
        return self._errors
94

    
95

    
96
class RPCError(OperationError): # raise it if you like
97
    
98
    def __init__(self, err_dict):
99
        self._dict = err_dict
100
        if self.message is not None:
101
            OperationError.__init__(self, self.message)
102
        else:
103
            OperationError.__init__(self)
104
    
105
    @property
106
    def type(self):
107
        return self.get('error-type', None)
108
    
109
    @property
110
    def severity(self):
111
        return self.get('error-severity', None)
112
    
113
    @property
114
    def tag(self):
115
        return self.get('error-tag', None)
116
    
117
    @property
118
    def path(self):
119
        return self.get('error-path', None)
120
    
121
    @property
122
    def message(self):
123
        return self.get('error-message', None)
124
    
125
    @property
126
    def info(self):
127
        return self.get('error-info', None)
128

    
129
    ## dictionary interface
130
    
131
    __getitem__ = lambda self, key: self._dict.__getitem__(key)
132
    
133
    __iter__ = lambda self: self._dict.__iter__()
134
    
135
    __contains__ = lambda self, key: self._dict.__contains__(key)
136
    
137
    keys = lambda self: self._dict.keys()
138
    
139
    get = lambda self, key, default: self._dict.get(key, default)
140
        
141
    iteritems = lambda self: self._dict.iteritems()
142
    
143
    iterkeys = lambda self: self._dict.iterkeys()
144
    
145
    itervalues = lambda self: self._dict.itervalues()
146
    
147
    values = lambda self: self._dict.values()
148
    
149
    items = lambda self: self._dict.items()
150
    
151
    __repr__ = lambda self: repr(self._dict)
152

    
153

    
154
class RPCReplyListener(object):
155
    
156
    # one instance per session
157
    def __new__(cls, session):
158
        instance = session.get_listener_instance(cls)
159
        if instance is None:
160
            instance = object.__new__(cls)
161
            instance._lock = Lock()
162
            instance._id2rpc = WeakValueDictionary()
163
            instance._pipelined = session.can_pipeline
164
            instance._errback = None
165
            session.add_listener(instance)
166
        return instance
167
    
168
    def register(self, id, rpc):
169
        with self._lock:
170
            self._id2rpc[id] = rpc
171

    
172
    def set_errback(self, errback):
173
        self._errback = errback
174
    
175
    def callback(self, root, raw):
176
        tag, attrs = root
177
        if content.unqualify(tag) != 'rpc-reply':
178
            return
179
        rpc = None
180
        for key in attrs:
181
            if content.unqualify(key) == 'message-id':
182
                id = attrs[key]
183
                try:
184
                    with self._lock:
185
                        rpc = self._id2rpc.pop(id)
186
                except KeyError:
187
                    logger.warning('no object registered for message-id: [%s]' % id)
188
                except Exception as e:
189
                    logger.debug('error - %r' % e)
190
                break
191
        else:
192
            if not self._pipelined:
193
                with self._lock:
194
                    assert(len(self._id2rpc) == 1)
195
                    rpc = self._id2rpc.values()[0]
196
                    self._id2rpc.clear()
197
            else:
198
                logger.warning('<rpc-reply> without message-id received: %s' % raw)
199
        logger.debug('delivering to %r' % rpc)
200
        rpc.deliver(raw)
201
    
202
    def errback(self, err):
203
        if self._errback is not None:
204
            self._errback(err)
205

    
206

    
207
class RPC(object):
208
    
209
    DEPENDS = []
210
    REPLY_CLS = RPCReply
211
    
212
    def __init__(self, session, async=False, timeout=None):
213
        if not session.can_pipeline:
214
            raise UserWarning('Asynchronous mode not supported for this device/session')
215
        self._session = session
216
        try:
217
            for cap in self.DEPENDS:
218
                self._assert(cap)
219
        except AttributeError:
220
            pass        
221
        self._async = async
222
        self._timeout = timeout
223
        # keeps things simple instead of having a class attr that has to be locked
224
        self._id = uuid1().urn
225
        # RPCReplyListener itself makes sure there isn't more than one instance -- i.e. multiton
226
        self._listener = RPCReplyListener(session)
227
        self._listener.register(self._id, self)
228
        self._reply = None
229
        self._reply_event = Event()
230
    
231
    def _build(self, opspec):
232
        "TODO: docstring"
233
        spec = {
234
            'tag': content.qualify('rpc'),
235
            'attributes': {'message-id': self._id},
236
            'subtree': opspec
237
            }
238
        return content.dtree2xml(spec)
239
    
240
    def _request(self, op):
241
        req = self._build(op)
242
        self._session.send(req)
243
        if self._async:
244
            return self._reply_event
245
        else:
246
            self._reply_event.wait(self._timeout)
247
            if self._reply_event.isSet():
248
                self._reply.parse()
249
                return self._reply
250
            else:
251
                raise ReplyTimeoutError
252
    
253
    def request(self):
254
        return self._request(self.SPEC)
255
    
256
    def _delivery_hook(self):
257
        'For subclasses'
258
        pass
259
    
260
    def _assert(self, capability):
261
        if capability not in self._session.server_capabilities:
262
            raise MissingCapabilityError('Server does not support [%s]' % cap)
263
    
264
    def deliver(self, raw):
265
        self._reply = self.REPLY_CLS(raw)
266
        self._delivery_hook()
267
        self._reply_event.set()
268
    
269
    @property
270
    def has_reply(self):
271
        return self._reply_event.isSet()
272
    
273
    @property
274
    def reply(self):
275
        return self._reply
276
    
277
    @property
278
    def id(self):
279
        return self._id
280
    
281
    @property
282
    def session(self):
283
        return self._session
284
    
285
    @property
286
    def reply_event(self):
287
        return self._reply_event
288
    
289
    def set_async(self, bool): self._async = bool
290
    async = property(fget=lambda self: self._async, fset=set_async)
291
    
292
    def set_timeout(self, timeout): self._timeout = timeout
293
    timeout = property(fget=lambda self: self._timeout, fset=set_timeout)