Statistics
| Branch: | Tag: | Revision:

root / ncclient / transport / session.py @ 564bee4f

History | View | Annotate | Download (8 kB)

1
# Copyright 2009 Shikhar Bhushan
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#    http://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14

    
15
from Queue import Queue
16
from threading import Thread, Lock, Event
17

    
18
from ncclient import xml_
19
from ncclient.capabilities import Capabilities
20

    
21
from errors import TransportError
22

    
23
import logging
24
logger = logging.getLogger('ncclient.transport.session')
25

    
26
class Session(Thread):
27

    
28
    "Base class for use by transport protocol implementations."
29

    
30
    def __init__(self, capabilities):
31
        Thread.__init__(self)
32
        self.setDaemon(True)
33
        self._listeners = set() # 3.0's weakset would be ideal
34
        self._lock = Lock()
35
        self.setName('session')
36
        self._q = Queue()
37
        self._client_capabilities = capabilities
38
        self._server_capabilities = None # yet
39
        self._id = None # session-id
40
        self._connected = False # to be set/cleared by subclass implementation
41
        logger.debug('%r created: client_capabilities=%r' %
42
                     (self, self._client_capabilities))
43

    
44
    def _dispatch_message(self, raw):
45
        try:
46
            root = xml_.parse_root(raw)
47
        except Exception as e:
48
            logger.error('error parsing dispatch message: %s' % e)
49
            return
50
        with self._lock:
51
            listeners = list(self._listeners)
52
        for l in listeners:
53
            logger.debug('dispatching message to %r' % l)
54
            try:
55
                l.callback(root, raw)
56
            except Exception as e:
57
                logger.warning('[error] %r' % e)
58

    
59
    def _dispatch_error(self, err):
60
        with self._lock:
61
            listeners = list(self._listeners)
62
        for l in listeners:
63
            logger.debug('dispatching error to %r' % l)
64
            try:
65
                l.errback(err)
66
            except Exception as e:
67
                logger.warning('error dispatching to %r: %r' % (l, e))
68

    
69
    def _post_connect(self):
70
        "Greeting stuff"
71
        init_event = Event()
72
        error = [None] # so that err_cb can bind error[0]. just how it is.
73
        # callbacks
74
        def ok_cb(id, capabilities):
75
            self._id = id
76
            self._server_capabilities = capabilities
77
            init_event.set()
78
        def err_cb(err):
79
            error[0] = err
80
            init_event.set()
81
        listener = HelloHandler(ok_cb, err_cb)
82
        self.add_listener(listener)
83
        self.send(HelloHandler.build(self._client_capabilities))
84
        logger.debug('starting main loop')
85
        self.start()
86
        # we expect server's hello message
87
        init_event.wait()
88
        # received hello message or an error happened
89
        self.remove_listener(listener)
90
        if error[0]:
91
            raise error[0]
92
        #if ':base:1.0' not in self.server_capabilities:
93
        #    raise MissingCapabilityError(':base:1.0')
94
        logger.info('initialized: session-id=%s | server_capabilities=%s' %
95
                    (self._id, self._server_capabilities))
96

    
97
    def add_listener(self, listener):
98
        """Register a listener that will be notified of incoming messages and
99
        errors.
100

101
        :type listener: :class:`SessionListener`
102
        """
103
        logger.debug('installing listener %r' % listener)
104
        if not isinstance(listener, SessionListener):
105
            raise SessionError("Listener must be a SessionListener type")
106
        with self._lock:
107
            self._listeners.add(listener)
108

    
109
    def remove_listener(self, listener):
110
        """Unregister some listener; ignore if the listener was never
111
        registered.
112

113
        :type listener: :class:`SessionListener`
114
        """
115
        logger.debug('discarding listener %r' % listener)
116
        with self._lock:
117
            self._listeners.discard(listener)
118

    
119
    def get_listener_instance(self, cls):
120
        """If a listener of the specified type is registered, returns the
121
        instance.
122

123
        :type cls: :class:`SessionListener`
124
        """
125
        with self._lock:
126
            for listener in self._listeners:
127
                if isinstance(listener, cls):
128
                    return listener
129

    
130
    def connect(self, *args, **kwds): # subclass implements
131
        raise NotImplementedError
132

    
133
    def run(self): # subclass implements
134
        raise NotImplementedError
135

    
136
    def send(self, message):
137
        """Send the supplied *message* to NETCONF server.
138

139
        :arg message: an XML document
140

141
        :type message: `string`
142
        """
143
        if not self.connected:
144
            raise TransportError('Not connected to NETCONF server')
145
        logger.debug('queueing %s' % message)
146
        self._q.put(message)
147

    
148
    ### Properties
149

    
150
    @property
151
    def connected(self):
152
        "Connection status of the session."
153
        return self._connected
154

    
155
    @property
156
    def client_capabilities(self):
157
        "Client's :class:`Capabilities`"
158
        return self._client_capabilities
159

    
160
    @property
161
    def server_capabilities(self):
162
        "Server's :class:`Capabilities`"
163
        return self._server_capabilities
164

    
165
    @property
166
    def id(self):
167
        """A `string` representing the `session-id`. If the session has not
168
        been initialized it will be :const:`None`"""
169
        return self._id
170

    
171
    @property
172
    def can_pipeline(self):
173
        "Whether this session supports pipelining"
174
        return True
175

    
176

    
177
class SessionListener(object):
178

    
179
    """Base class for :class:`Session` listeners, which are notified when a new
180
    NETCONF message is received or an error occurs.
181

182
    .. note::
183
        Avoid time-intensive tasks in a callback's context.
184
    """
185

    
186
    def callback(self, root, raw):
187
        """Called when a new XML document is received. The `root` argument
188
        allows the callback to determine whether it wants to further process the
189
        document.
190

191
        :arg root: tuple of `(tag, attributes)` where `tag` is the qualified name of the root element and `attributes` is a dictionary of its attributes (also qualified names)
192
        :type root: `tuple`
193

194
        :arg raw: XML document
195
        :type raw: `string`
196
        """
197
        raise NotImplementedError
198

    
199
    def errback(self, ex):
200
        """Called when an error occurs.
201

202
        :type ex: :exc:`Exception`
203
        """
204
        raise NotImplementedError
205

    
206

    
207
class HelloHandler(SessionListener):
208

    
209
    def __init__(self, init_cb, error_cb):
210
        self._init_cb = init_cb
211
        self._error_cb = error_cb
212

    
213
    def callback(self, root, raw):
214
        if xml_.unqualify(root[0]) == 'hello':
215
            try:
216
                id, capabilities = HelloHandler.parse(raw)
217
            except Exception as e:
218
                self._error_cb(e)
219
            else:
220
                self._init_cb(id, capabilities)
221

    
222
    def errback(self, err):
223
        self._error_cb(err)
224

    
225
    @staticmethod
226
    def build(capabilities):
227
        "Given a list of capability URI's returns <hello> message XML string"
228
        spec = {
229
            'tag': 'hello',
230
            'attrib': {'xmlns': xml_.BASE_NS},
231
            'subtree': [{
232
                'tag': 'capabilities',
233
                'subtree': # this is fun :-)
234
                    [{'tag': 'capability', 'text': uri} for uri in capabilities]
235
                }]
236
            }
237
        return xml_.dtree2xml(spec)
238

    
239
    @staticmethod
240
    def parse(raw):
241
        "Returns tuple of (session-id (str), capabilities (Capabilities)"
242
        sid, capabilities = 0, []
243
        root = xml_.xml2ele(raw)
244
        for child in root.getchildren():
245
            tag = xml_.unqualify(child.tag)
246
            if tag == 'session-id':
247
                sid = child.text
248
            elif tag == 'capabilities':
249
                for cap in child.getchildren():
250
                    if xml_.unqualify(cap.tag) == 'capability':
251
                        capabilities.append(cap.text)
252
        return sid, Capabilities(capabilities)