Statistics
| Branch: | Tag: | Revision:

root / ncclient / transport / session.py @ 6e571704

History | View | Annotate | Download (7.6 kB)

1
# Copyright 2009 Shikhar Bhushan
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#    http://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14

    
15
from Queue import Queue
16
from threading import Thread, Lock, Event
17

    
18
from ncclient.xml_ import *
19
from ncclient.capabilities import Capabilities
20

    
21
from errors import TransportError
22

    
23
import logging
24
logger = logging.getLogger('ncclient.transport.session')
25

    
26
class Session(Thread):
27

    
28
    "Base class for use by transport protocol implementations."
29

    
30
    def __init__(self, capabilities):
31
        Thread.__init__(self)
32
        self.setDaemon(True)
33
        self._listeners = set()
34
        self._lock = Lock()
35
        self.setName('session')
36
        self._q = Queue()
37
        self._client_capabilities = capabilities
38
        self._server_capabilities = None # yet
39
        self._id = None # session-id
40
        self._connected = False # to be set/cleared by subclass implementation
41
        logger.debug('%r created: client_capabilities=%r' %
42
                     (self, self._client_capabilities))
43

    
44
    def _dispatch_message(self, raw):
45
        try:
46
            root = parse_root(raw)
47
        except Exception as e:
48
            logger.error('error parsing dispatch message: %s' % e)
49
            return
50
        with self._lock:
51
            listeners = list(self._listeners)
52
        for l in listeners:
53
            logger.debug('dispatching message to %r: %s' % (l, raw))
54
            l.callback(root, raw) # no try-except; fail loudly if you must!
55
    
56
    def _dispatch_error(self, err):
57
        with self._lock:
58
            listeners = list(self._listeners)
59
        for l in listeners:
60
            logger.debug('dispatching error to %r' % l)
61
            try: # here we can be more considerate with catching exceptions
62
                l.errback(err) 
63
            except Exception as e:
64
                logger.warning('error dispatching to %r: %r' % (l, e))
65

    
66
    def _post_connect(self):
67
        "Greeting stuff"
68
        init_event = Event()
69
        error = [None] # so that err_cb can bind error[0]. just how it is.
70
        # callbacks
71
        def ok_cb(id, capabilities):
72
            self._id = id
73
            self._server_capabilities = capabilities
74
            init_event.set()
75
        def err_cb(err):
76
            error[0] = err
77
            init_event.set()
78
        listener = HelloHandler(ok_cb, err_cb)
79
        self.add_listener(listener)
80
        self.send(HelloHandler.build(self._client_capabilities))
81
        logger.debug('starting main loop')
82
        self.start()
83
        # we expect server's hello message
84
        init_event.wait()
85
        # received hello message or an error happened
86
        self.remove_listener(listener)
87
        if error[0]:
88
            raise error[0]
89
        #if ':base:1.0' not in self.server_capabilities:
90
        #    raise MissingCapabilityError(':base:1.0')
91
        logger.info('initialized: session-id=%s | server_capabilities=%s' %
92
                    (self._id, self._server_capabilities))
93

    
94
    def add_listener(self, listener):
95
        """Register a listener that will be notified of incoming messages and
96
        errors.
97

98
        :type listener: :class:`SessionListener`
99
        """
100
        logger.debug('installing listener %r' % listener)
101
        if not isinstance(listener, SessionListener):
102
            raise SessionError("Listener must be a SessionListener type")
103
        with self._lock:
104
            self._listeners.add(listener)
105

    
106
    def remove_listener(self, listener):
107
        """Unregister some listener; ignore if the listener was never
108
        registered.
109

110
        :type listener: :class:`SessionListener`
111
        """
112
        logger.debug('discarding listener %r' % listener)
113
        with self._lock:
114
            self._listeners.discard(listener)
115

    
116
    def get_listener_instance(self, cls):
117
        """If a listener of the specified type is registered, returns the
118
        instance.
119

120
        :type cls: :class:`SessionListener`
121
        """
122
        with self._lock:
123
            for listener in self._listeners:
124
                if isinstance(listener, cls):
125
                    return listener
126

    
127
    def connect(self, *args, **kwds): # subclass implements
128
        raise NotImplementedError
129

    
130
    def run(self): # subclass implements
131
        raise NotImplementedError
132

    
133
    def send(self, message):
134
        """Send the supplied *message* (xml string) to NETCONF server."""
135
        if not self.connected:
136
            raise TransportError('Not connected to NETCONF server')
137
        logger.debug('queueing %s' % message)
138
        self._q.put(message)
139

    
140
    ### Properties
141

    
142
    @property
143
    def connected(self):
144
        "Connection status of the session."
145
        return self._connected
146

    
147
    @property
148
    def client_capabilities(self):
149
        "Client's :class:`Capabilities`"
150
        return self._client_capabilities
151

    
152
    @property
153
    def server_capabilities(self):
154
        "Server's :class:`Capabilities`"
155
        return self._server_capabilities
156

    
157
    @property
158
    def id(self):
159
        """A string representing the `session-id`. If the session has not been initialized it will be `None`"""
160
        return self._id
161

    
162

    
163
class SessionListener(object):
164

    
165
    """Base class for :class:`Session` listeners, which are notified when a new
166
    NETCONF message is received or an error occurs.
167

168
    .. note::
169
        Avoid time-intensive tasks in a callback's context.
170
    """
171

    
172
    def callback(self, root, raw):
173
        """Called when a new XML document is received. The *root* argument allows the callback to determine whether it wants to further process the document.
174

175
        Here, *root* is a tuple of *(tag, attributes)* where *tag* is the qualified name of the root element and *attributes* is a dictionary of its attributes (also qualified names).
176

177
        *raw* will contain the XML document as a string.
178
        """
179
        raise NotImplementedError
180

    
181
    def errback(self, ex):
182
        """Called when an error occurs.
183

184
        :type ex: :exc:`Exception`
185
        """
186
        raise NotImplementedError
187

    
188

    
189
class HelloHandler(SessionListener):
190

    
191
    def __init__(self, init_cb, error_cb):
192
        self._init_cb = init_cb
193
        self._error_cb = error_cb
194

    
195
    def callback(self, root, raw):
196
        tag, attrs = root
197
        if tag == qualify("hello"):
198
            try:
199
                id, capabilities = HelloHandler.parse(raw)
200
            except Exception as e:
201
                self._error_cb(e)
202
            else:
203
                self._init_cb(id, capabilities)
204

    
205
    def errback(self, err):
206
        self._error_cb(err)
207

    
208
    @staticmethod
209
    def build(capabilities):
210
        "Given a list of capability URI's returns <hello> message XML string"
211
        hello = new_ele("hello")
212
        caps = sub_ele(hello, "capabilities")
213
        def fun(uri): sub_ele(caps, "capability").text = uri
214
        map(fun, capabilities)
215
        return to_xml(hello)
216

    
217
    @staticmethod
218
    def parse(raw):
219
        "Returns tuple of (session-id (str), capabilities (Capabilities)"
220
        sid, capabilities = 0, []
221
        root = to_ele(raw)
222
        for child in root.getchildren():
223
            if child.tag == qualify("session-id"):
224
                sid = child.text
225
            elif child.tag == qualify("capabilities"):
226
                for cap in child.getchildren():
227
                    if cap.tag == qualify("capability"):
228
                        capabilities.append(cap.text)
229
        return sid, Capabilities(capabilities)