Revision efed7d4c ncclient/operations/rpc.py

b/ncclient/operations/rpc.py
12 12
# See the License for the specific language governing permissions and
13 13
# limitations under the License.
14 14

  
15
'Remote Procedure Call'
16

  
17 15
from threading import Event, Lock
18 16
from uuid import uuid1
17
from weakref import WeakKeyDictionary, WeakValueDictionary
19 18

  
20 19
from listener import get_listener
21 20
from ncclient.content.builders import RPCBuilder
21
from ncclient.content.parsers import RootParser
22
from ncclient.content.common import qualify as _
23
from ncclient.content.common import BASE_NS
22 24

  
23 25
class RPC:
24 26
    
25
    def __init__(self, session, async=False, parse=True):
27
    _listeners = WeakKeyDictionary()
28
    _lock = Lock()
29
    
30
    def __init__(self, session):
26 31
        self._session = session
27
        self._async = async
32
        self._id = None
33
        self._reply = None # RPCReply
34
        self._reply_event = None
35
    
36
    @property
37
    def _listener(self):
38
        with self._lock:
39
            return self._listeners.setdefault(self._session, MessageListener())
40
    
41
    def _response_cb(self, raw):
42
        self._reply = RPCReply(raw)
43
        reply_event.set()
44
    
45
    def _do_request(self, op, reply_event=None):
28 46
        self._id = uuid1().urn
29
        self._reply = None
30
        self._reply_event = Event()
31
        self.listener.register(self._id, self)
32
        session.add_listener(self.listener)
33
    
34
    def _response_cb(self, reply):
35
        self._reply = reply
36
        self._event.set()
37
    
38
    def _do_request(self, op):
39
        self._session.send(RPCBuilder.build(self._id, op))
40
        if not self._async:
47
        # get the listener instance for this session
48
        # <rpc-reply> with message id will reach response_cb
49
        self._listener.register(self._id, self._response_cb)
50
        # only effective the first time, transport.session.Subject internally
51
        # uses a set type for listeners
52
        self._session.add_listener(self._listener)
53
        req = RPCBuilder.build(self._id, op)
54
        self._session.send(req)
55
        if reply_event is not None: # if we were provided an Event to use
56
            self._reply_event = reply_event
57
        else: # otherwise, block till response received and return it
58
            self._reply_event = Event()
41 59
            self._reply_event.wait()
42
        return self._reply
60
            self._reply.parse()
61
            return self._reply
43 62
    
44
    def request(self):
63
    def request(self, *args, **kwds):
45 64
        raise NotImplementedError
46 65
    
47
    def wait_for_reply(self, timeout=None):
48
        self._reply_event.wait(timeout)
49
    
50 66
    @property
51 67
    def has_reply(self):
52
        return self._reply_event.isSet()
53
    
54
    @property
55
    def is_async(self):
56
        return self._async
68
        try:
69
            return self._reply_event.isSet()
70
        except TypeError: # reply_event is None
71
            return False
57 72
    
58 73
    @property
59 74
    def reply(self):
......
64 79
        return self._id
65 80
    
66 81
    @property
67
    def listener(self):
68
        listener = get_listener(self._session)
69

  
70
    @property
71 82
    def session(self):
72 83
        return self._session
73 84

  
74 85
class RPCReply:
75 86
    
76
    class RPCError:
77
        
78
        pass
87
    def __init__(self, raw):
88
        self._raw = raw
89
        self._parsed = False
90
        self._ok = None
91
        self._errs = []
79 92
    
93
    def __str__(self):
94
        return self._raw
80 95

  
96
    @property
97
    def raw(self):
98
        return self._raw
99
    
100
    def parse(self):
101
        #errs = RPCParser.parse(self._raw)
102
        #for raw, err_dict in errs:
103
        #    self._errs.append(RPCError(raw, err_dict))
104
        self._parsed = True
105
    
106
    @property
107
    def parsed(self):
108
        return self._parsed
109
    
110
    @property
111
    def ok(self):
112
        return True if self._parsed and not self._err else False
113
    
114
    @property
115
    def errors(self):
116
        return self._errs
117
    
118
    @property
119
    def raw(self):
120
        return self._raw
121

  
122
class RPCError(Exception): # raise it if you like
123
    
124
    def __init__(self, raw, err_dict):
125
        self._raw = raw
126
        self._dict = err_dict
127

  
128
    def __str__(self):
129
        # TODO
130
        return self._raw
131
    
132
    def __dict__(self):
133
        return self._dict
134
    
135
    @property
136
    def raw(self):
137
        return self._raw
138
    
139
    @property
140
    def type(self):
141
        return self._dict.get('type', None)
142
    
143
    @property
144
    def severity(self):
145
        return self._dict.get('severity', None)
146
    
147
    @property
148
    def tag(self):
149
        return self._dict.get('tag', None)
150
    
151
    @property
152
    def path(self):
153
        return self._dict.get('path', None)
154
    
155
    @property
156
    def message(self):
157
        return self._dict.get('message', None)
158
    
159
    @property
160
    def info(self):
161
        return self._dict.get('info', None)
162

  
163

  
164
class SessionListener:
165
    
166
    '''This is the glue between received data and the object it should be
167
    forwarded to.
168
    '''
169
    
170
    def __init__(self):
171
        # this dictionary takes care of <rpc-reply> elements received
172
        # { 'message-id': callback } dict
173
        self._id2cb = WeakValueDictionary()
174
        # this is a more generic dict takes care of other top-level elements
175
        # that may be received, e.g. <notification>'s
176
        # {'tag': callback} dict
177
        self._tag2cb = WeakValueDictionary() 
178
        # if we receive a SessionCloseError it might not be one we want to act on
179
        self._expecting_close = False
180
        self._errback = None # error event callback
181
        self._lock = Lock()
182
    
183
    def __str__(self):
184
        return 'SessionListener'
185
    
186
    def register(self, msgid, cb):
187
        with self._lock:
188
            self._id2cb[msgid] = cb
189
    
190
    def recognize(self, tag, cb):
191
        with self._lock:
192
            self._tag2cb[tag] = cb
193
    
194
    def expect_close(self):
195
        self._expecting_close = True
196
    
197
    @property
198
    def _recognized_elements(self):
199
        elems = [_('rpc-reply', BASE_NS)]
200
        with self._lock:
201
            elems.extend(self._tag2cb.keys())
202
        return elems
203
    
204
    def reply(self, raw):
205
        tag, attrs = RootParser.parse(raw, self._recognized_elements)
206
        try:
207
            cb = None
208
            if tag == _('rpc-reply', BASE_NS):
209
                try:
210
                    id = attrs[_('message-id', BASE_NS)]
211
                except KeyError:
212
                    logger.warning('<rpc-reply> w/o message-id attr received: %s'
213
                                   % raw)
214
                cb = self._id2cb.get(id, None)
215
            else:
216
                cb = self._tag2cb.get(tag, None)
217
            if cb is not None:
218
                cb(raw)
219
        except Exception as e:
220
            logger.warning('SessionListener.reply: %r' % e)
221
    
222
    def set_errback(self, errback):
223
        self._errback = errback
224
    
225
    def error(self, err):
226
        from ncclient.transport.error import SessionCloseError
227
        act = True
228
        if isinstance(err, SessionCloseError):
229
            logger.debug('session closed, expecting_close=%s' %
230
                         self._expecting_close)
231
            if self._expecting_close:
232
                act = False
233
        if act:
234
            logger.error('SessionListener.error: %r' % err)
235
            if self._errback is not None:
236
                errback(err)

Also available in: Unified diff