Statistics
| Branch: | Tag: | Revision:

root / ncclient / transport / session.py @ 9a9af391

History | View | Annotate | Download (7.9 kB)

1
# Copyright 2009 Shikhar Bhushan
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#    http://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14

    
15
from Queue import Queue
16
from threading import Thread, Lock, Event
17

    
18
from ncclient import xml_
19
from ncclient.capabilities import Capabilities
20

    
21
from errors import TransportError
22

    
23
import logging
24
logger = logging.getLogger('ncclient.transport.session')
25

    
26
class Session(Thread):
27

    
28
    "Base class for use by transport protocol implementations."
29

    
30
    def __init__(self, capabilities):
31
        Thread.__init__(self)
32
        self.setDaemon(True)
33
        self._listeners = set() # 3.0's weakset would be ideal
34
        self._lock = Lock()
35
        self.setName('session')
36
        self._q = Queue()
37
        self._client_capabilities = capabilities
38
        self._server_capabilities = None # yet
39
        self._id = None # session-id
40
        self._connected = False # to be set/cleared by subclass implementation
41
        logger.debug('%r created: client_capabilities=%r' %
42
                     (self, self._client_capabilities))
43

    
44
    def _dispatch_message(self, raw):
45
        try:
46
            root = xml_.parse_root(raw)
47
        except Exception as e:
48
            logger.error('error parsing dispatch message: %s' % e)
49
            return
50
        with self._lock:
51
            listeners = list(self._listeners)
52
        for l in listeners:
53
            logger.debug('dispatching message to %r' % l)
54
            l.callback(root, raw) # no try-except; fail loudly if you must!
55
    
56
    def _dispatch_error(self, err):
57
        with self._lock:
58
            listeners = list(self._listeners)
59
        for l in listeners:
60
            logger.debug('dispatching error to %r' % l)
61
            try: # here we can be more considerate with catching exceptions
62
                l.errback(err) 
63
            except Exception as e:
64
                logger.warning('error dispatching to %r: %r' % (l, e))
65

    
66
    def _post_connect(self):
67
        "Greeting stuff"
68
        init_event = Event()
69
        error = [None] # so that err_cb can bind error[0]. just how it is.
70
        # callbacks
71
        def ok_cb(id, capabilities):
72
            self._id = id
73
            self._server_capabilities = capabilities
74
            init_event.set()
75
        def err_cb(err):
76
            error[0] = err
77
            init_event.set()
78
        listener = HelloHandler(ok_cb, err_cb)
79
        self.add_listener(listener)
80
        self.send(HelloHandler.build(self._client_capabilities))
81
        logger.debug('starting main loop')
82
        self.start()
83
        # we expect server's hello message
84
        init_event.wait()
85
        # received hello message or an error happened
86
        self.remove_listener(listener)
87
        if error[0]:
88
            raise error[0]
89
        #if ':base:1.0' not in self.server_capabilities:
90
        #    raise MissingCapabilityError(':base:1.0')
91
        logger.info('initialized: session-id=%s | server_capabilities=%s' %
92
                    (self._id, self._server_capabilities))
93

    
94
    def add_listener(self, listener):
95
        """Register a listener that will be notified of incoming messages and
96
        errors.
97

98
        :type listener: :class:`SessionListener`
99
        """
100
        logger.debug('installing listener %r' % listener)
101
        if not isinstance(listener, SessionListener):
102
            raise SessionError("Listener must be a SessionListener type")
103
        with self._lock:
104
            self._listeners.add(listener)
105

    
106
    def remove_listener(self, listener):
107
        """Unregister some listener; ignore if the listener was never
108
        registered.
109

110
        :type listener: :class:`SessionListener`
111
        """
112
        logger.debug('discarding listener %r' % listener)
113
        with self._lock:
114
            self._listeners.discard(listener)
115

    
116
    def get_listener_instance(self, cls):
117
        """If a listener of the specified type is registered, returns the
118
        instance.
119

120
        :type cls: :class:`SessionListener`
121
        """
122
        with self._lock:
123
            for listener in self._listeners:
124
                if isinstance(listener, cls):
125
                    return listener
126

    
127
    def connect(self, *args, **kwds): # subclass implements
128
        raise NotImplementedError
129

    
130
    def run(self): # subclass implements
131
        raise NotImplementedError
132

    
133
    def send(self, message):
134
        """Send the supplied *message* to NETCONF server.
135

136
        :arg message: an XML document
137

138
        :type message: `string`
139
        """
140
        if not self.connected:
141
            raise TransportError('Not connected to NETCONF server')
142
        logger.debug('queueing %s' % message)
143
        self._q.put(message)
144

    
145
    ### Properties
146

    
147
    @property
148
    def connected(self):
149
        "Connection status of the session."
150
        return self._connected
151

    
152
    @property
153
    def client_capabilities(self):
154
        "Client's :class:`Capabilities`"
155
        return self._client_capabilities
156

    
157
    @property
158
    def server_capabilities(self):
159
        "Server's :class:`Capabilities`"
160
        return self._server_capabilities
161

    
162
    @property
163
    def id(self):
164
        """A `string` representing the `session-id`. If the session has not
165
        been initialized it will be :const:`None`"""
166
        return self._id
167

    
168

    
169
class SessionListener(object):
170

    
171
    """Base class for :class:`Session` listeners, which are notified when a new
172
    NETCONF message is received or an error occurs.
173

174
    .. note::
175
        Avoid time-intensive tasks in a callback's context.
176
    """
177

    
178
    def callback(self, root, raw):
179
        """Called when a new XML document is received. The `root` argument
180
        allows the callback to determine whether it wants to further process the
181
        document.
182

183
        :arg root: tuple of `(tag, attributes)` where `tag` is the qualified name of the root element and `attributes` is a dictionary of its attributes (also qualified names)
184
        :type root: `tuple`
185

186
        :arg raw: XML document
187
        :type raw: `string`
188
        """
189
        raise NotImplementedError
190

    
191
    def errback(self, ex):
192
        """Called when an error occurs.
193

194
        :type ex: :exc:`Exception`
195
        """
196
        raise NotImplementedError
197

    
198

    
199
class HelloHandler(SessionListener):
200

    
201
    def __init__(self, init_cb, error_cb):
202
        self._init_cb = init_cb
203
        self._error_cb = error_cb
204

    
205
    def callback(self, root, raw):
206
        if xml_.unqualify(root[0]) == 'hello':
207
            try:
208
                id, capabilities = HelloHandler.parse(raw)
209
            except Exception as e:
210
                self._error_cb(e)
211
            else:
212
                self._init_cb(id, capabilities)
213

    
214
    def errback(self, err):
215
        self._error_cb(err)
216

    
217
    @staticmethod
218
    def build(capabilities):
219
        "Given a list of capability URI's returns <hello> message XML string"
220
        spec = {
221
            'tag': 'hello',
222
            'attrib': {'xmlns': xml_.BASE_NS_1_0},
223
            'subtree': [{
224
                'tag': 'capabilities',
225
                'subtree': # this is fun :-)
226
                    [{'tag': 'capability', 'text': uri} for uri in capabilities]
227
                }]
228
            }
229
        return xml_.dtree2xml(spec)
230

    
231
    @staticmethod
232
    def parse(raw):
233
        "Returns tuple of (session-id (str), capabilities (Capabilities)"
234
        sid, capabilities = 0, []
235
        root = xml_.xml2ele(raw)
236
        for child in root.getchildren():
237
            tag = xml_.unqualify(child.tag)
238
            if tag == 'session-id':
239
                sid = child.text
240
            elif tag == 'capabilities':
241
                for cap in child.getchildren():
242
                    if xml_.unqualify(cap.tag) == 'capability':
243
                        capabilities.append(cap.text)
244
        return sid, Capabilities(capabilities)