Revision efed7d4c

b/ncclient/operations/__init__.py
17 17
import logging
18 18
logger = logging.getLogger('ncclient.operations')
19 19

  
20
#from ncclient.session import CAPABILITIES
21
#
22 20
#from retrieve import Get, GetConfig
23 21
#from edit import EditConfig, DeleteConfig
24
#from session import CloseSession, KillSession
25
#from lock import Lock, Unlock
22
from session import CloseSession, KillSession
23
from lock import Lock, Unlock
26 24
#from notification import CreateSubscription
27
#
28
#__all__ = [
25

  
26
__all__ = [
29 27
#    'Get',
30 28
#    'GetConfig',
31 29
#    'EditConfig',
32 30
#    'DeleteConfig',
33 31
#    'Lock',
34 32
#    'Unlock',
35
#    'CloseSession',
36
#    'KillSession',
33
    'CloseSession',
34
    'KillSession',
37 35
#    'CreateSubscription',
38 36
#    ]
/dev/null
1
# Copyright 2009 Shikhar Bhushan
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#    http://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14

  
15
from weakref import WeakValueDictionary
16

  
17
from ncclient.content.parsers import RootParser
18

  
19
_listeners = WeakValueDictionary()
20

  
21
def get_listener(session):
22
    try:
23
        return _listeners[session]
24
    except KeyError:
25
        _listeners[session] = MessageListener()
26
        return _listeners[session]
27

  
28
class MessageListener:
29
    
30
    def __init__(self):
31
        # {message-id: RPC}
32
        self._rpc = WeakValueDictionary()
33
        # if the session gets closed by remote endpoint,
34
        # need to know if it is an error event or was requested through
35
        # a NETCONF operation i.e. CloseSession
36
        self._expecting_close = False
37
        # other recognized names and behavior on receiving them
38
        self._recognized = []
39
    
40
    def __str__(self):
41
        return 'MessageListener'
42
    
43
    def expect_close(self):
44
        self._expecting_close = True
45
    
46
    def register(self, id, op):
47
        self._id2rpc[id] = op
48
    
49
    ### Events
50
    
51
    def reply(self, raw):
52
        pass
53
    
54
    def error(self, err):
55
        from ncclient.session.session import SessionCloseError
56
        if err is SessionCloseError:
57
            logger.debug('session closed by remote endpoint, expecting_close=%s' %
58
                         self._expecting_close)
59
            if not self._expecting_close:
60
                raise err
b/ncclient/operations/rpc.py
12 12
# See the License for the specific language governing permissions and
13 13
# limitations under the License.
14 14

  
15
'Remote Procedure Call'
16

  
17 15
from threading import Event, Lock
18 16
from uuid import uuid1
17
from weakref import WeakKeyDictionary, WeakValueDictionary
19 18

  
20 19
from listener import get_listener
21 20
from ncclient.content.builders import RPCBuilder
21
from ncclient.content.parsers import RootParser
22
from ncclient.content.common import qualify as _
23
from ncclient.content.common import BASE_NS
22 24

  
23 25
class RPC:
24 26
    
25
    def __init__(self, session, async=False, parse=True):
27
    _listeners = WeakKeyDictionary()
28
    _lock = Lock()
29
    
30
    def __init__(self, session):
26 31
        self._session = session
27
        self._async = async
32
        self._id = None
33
        self._reply = None # RPCReply
34
        self._reply_event = None
35
    
36
    @property
37
    def _listener(self):
38
        with self._lock:
39
            return self._listeners.setdefault(self._session, MessageListener())
40
    
41
    def _response_cb(self, raw):
42
        self._reply = RPCReply(raw)
43
        reply_event.set()
44
    
45
    def _do_request(self, op, reply_event=None):
28 46
        self._id = uuid1().urn
29
        self._reply = None
30
        self._reply_event = Event()
31
        self.listener.register(self._id, self)
32
        session.add_listener(self.listener)
33
    
34
    def _response_cb(self, reply):
35
        self._reply = reply
36
        self._event.set()
37
    
38
    def _do_request(self, op):
39
        self._session.send(RPCBuilder.build(self._id, op))
40
        if not self._async:
47
        # get the listener instance for this session
48
        # <rpc-reply> with message id will reach response_cb
49
        self._listener.register(self._id, self._response_cb)
50
        # only effective the first time, transport.session.Subject internally
51
        # uses a set type for listeners
52
        self._session.add_listener(self._listener)
53
        req = RPCBuilder.build(self._id, op)
54
        self._session.send(req)
55
        if reply_event is not None: # if we were provided an Event to use
56
            self._reply_event = reply_event
57
        else: # otherwise, block till response received and return it
58
            self._reply_event = Event()
41 59
            self._reply_event.wait()
42
        return self._reply
60
            self._reply.parse()
61
            return self._reply
43 62
    
44
    def request(self):
63
    def request(self, *args, **kwds):
45 64
        raise NotImplementedError
46 65
    
47
    def wait_for_reply(self, timeout=None):
48
        self._reply_event.wait(timeout)
49
    
50 66
    @property
51 67
    def has_reply(self):
52
        return self._reply_event.isSet()
53
    
54
    @property
55
    def is_async(self):
56
        return self._async
68
        try:
69
            return self._reply_event.isSet()
70
        except TypeError: # reply_event is None
71
            return False
57 72
    
58 73
    @property
59 74
    def reply(self):
......
64 79
        return self._id
65 80
    
66 81
    @property
67
    def listener(self):
68
        listener = get_listener(self._session)
69

  
70
    @property
71 82
    def session(self):
72 83
        return self._session
73 84

  
74 85
class RPCReply:
75 86
    
76
    class RPCError:
77
        
78
        pass
87
    def __init__(self, raw):
88
        self._raw = raw
89
        self._parsed = False
90
        self._ok = None
91
        self._errs = []
79 92
    
93
    def __str__(self):
94
        return self._raw
80 95

  
96
    @property
97
    def raw(self):
98
        return self._raw
99
    
100
    def parse(self):
101
        #errs = RPCParser.parse(self._raw)
102
        #for raw, err_dict in errs:
103
        #    self._errs.append(RPCError(raw, err_dict))
104
        self._parsed = True
105
    
106
    @property
107
    def parsed(self):
108
        return self._parsed
109
    
110
    @property
111
    def ok(self):
112
        return True if self._parsed and not self._err else False
113
    
114
    @property
115
    def errors(self):
116
        return self._errs
117
    
118
    @property
119
    def raw(self):
120
        return self._raw
121

  
122
class RPCError(Exception): # raise it if you like
123
    
124
    def __init__(self, raw, err_dict):
125
        self._raw = raw
126
        self._dict = err_dict
127

  
128
    def __str__(self):
129
        # TODO
130
        return self._raw
131
    
132
    def __dict__(self):
133
        return self._dict
134
    
135
    @property
136
    def raw(self):
137
        return self._raw
138
    
139
    @property
140
    def type(self):
141
        return self._dict.get('type', None)
142
    
143
    @property
144
    def severity(self):
145
        return self._dict.get('severity', None)
146
    
147
    @property
148
    def tag(self):
149
        return self._dict.get('tag', None)
150
    
151
    @property
152
    def path(self):
153
        return self._dict.get('path', None)
154
    
155
    @property
156
    def message(self):
157
        return self._dict.get('message', None)
158
    
159
    @property
160
    def info(self):
161
        return self._dict.get('info', None)
162

  
163

  
164
class SessionListener:
165
    
166
    '''This is the glue between received data and the object it should be
167
    forwarded to.
168
    '''
169
    
170
    def __init__(self):
171
        # this dictionary takes care of <rpc-reply> elements received
172
        # { 'message-id': callback } dict
173
        self._id2cb = WeakValueDictionary()
174
        # this is a more generic dict takes care of other top-level elements
175
        # that may be received, e.g. <notification>'s
176
        # {'tag': callback} dict
177
        self._tag2cb = WeakValueDictionary() 
178
        # if we receive a SessionCloseError it might not be one we want to act on
179
        self._expecting_close = False
180
        self._errback = None # error event callback
181
        self._lock = Lock()
182
    
183
    def __str__(self):
184
        return 'SessionListener'
185
    
186
    def register(self, msgid, cb):
187
        with self._lock:
188
            self._id2cb[msgid] = cb
189
    
190
    def recognize(self, tag, cb):
191
        with self._lock:
192
            self._tag2cb[tag] = cb
193
    
194
    def expect_close(self):
195
        self._expecting_close = True
196
    
197
    @property
198
    def _recognized_elements(self):
199
        elems = [_('rpc-reply', BASE_NS)]
200
        with self._lock:
201
            elems.extend(self._tag2cb.keys())
202
        return elems
203
    
204
    def reply(self, raw):
205
        tag, attrs = RootParser.parse(raw, self._recognized_elements)
206
        try:
207
            cb = None
208
            if tag == _('rpc-reply', BASE_NS):
209
                try:
210
                    id = attrs[_('message-id', BASE_NS)]
211
                except KeyError:
212
                    logger.warning('<rpc-reply> w/o message-id attr received: %s'
213
                                   % raw)
214
                cb = self._id2cb.get(id, None)
215
            else:
216
                cb = self._tag2cb.get(tag, None)
217
            if cb is not None:
218
                cb(raw)
219
        except Exception as e:
220
            logger.warning('SessionListener.reply: %r' % e)
221
    
222
    def set_errback(self, errback):
223
        self._errback = errback
224
    
225
    def error(self, err):
226
        from ncclient.transport.error import SessionCloseError
227
        act = True
228
        if isinstance(err, SessionCloseError):
229
            logger.debug('session closed, expecting_close=%s' %
230
                         self._expecting_close)
231
            if self._expecting_close:
232
                act = False
233
        if act:
234
            logger.error('SessionListener.error: %r' % err)
235
            if self._errback is not None:
236
                errback(err)
b/ncclient/operations/session.py
14 14

  
15 15
'Session-related NETCONF operations'
16 16

  
17
from ncclient.content.parsers import RPCParser
18
from rpc import RPC
19

  
20

  
17 21
class CloseSession(RPC):
18
    pass
22
    
23
    def __init__(self):
24
        RPC.__init__(self)
25
        self.spec = { 'tag': 'close-session' }
26
    
27
    def _response_cb(self, reply):
28
        RPC._response_cb(self, reply)
29
        if RPCParser.parse_ok(reply):
30
            self._listener.expect_close()
31
        self._session.close()
32
    
33
    def request(self, *args, **kwds):
34
        self._do_request(spec, *args, **kwds)
35

  
19 36

  
20 37
class KillSession(RPC):
21
    pass
38
    
39
    def __init__(self):
40
        RPC.__init__(self)
41
        self.spec = {
42
            'tag': 'kill-session',
43
            'children': [ { 'tag': 'session-id', 'text': None} ]
44
            }
45
    
46
    def request(self, session_id, reply_event=None):
47
        if not isinstance(session_id, basestring): # just make sure...
48
            session_id = str(session_id)
49
        self.spec['children'][0]['text'] = session_id
50
        self._do_request(self.spec, reply_event)

Also available in: Unified diff