Statistics
| Branch: | Tag: | Revision:

root / ncclient / transport / session.py @ 4f650d54

History | View | Annotate | Download (7.8 kB)

1
# Copyright 2009 Shikhar Bhushan
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#    http://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14

    
15
from Queue import Queue
16
from threading import Thread, Lock, Event
17

    
18
from ncclient import content
19
from ncclient.capabilities import Capabilities
20

    
21
import logging
22
logger = logging.getLogger('ncclient.transport.session')
23

    
24
class Session(Thread):
25
    "Base class for use by transport protocol implementations."
26

    
27
    def __init__(self, capabilities):
28
        Thread.__init__(self)
29
        self.setDaemon(True)
30
        self._listeners = set() # 3.0's weakset would be ideal
31
        self._lock = Lock()
32
        self.setName('session')
33
        self._q = Queue()
34
        self._client_capabilities = capabilities
35
        self._server_capabilities = None # yet
36
        self._id = None # session-id
37
        self._connected = False # to be set/cleared by subclass implementation
38
        logger.debug('%r created: client_capabilities=%r' %
39
                     (self, self._client_capabilities))
40

    
41
    def _dispatch_message(self, raw):
42
        try:
43
            root = content.parse_root(raw)
44
        except Exception as e:
45
            logger.error('error parsing dispatch message: %s' % e)
46
            return
47
        with self._lock:
48
            listeners = list(self._listeners)
49
        for l in listeners:
50
            logger.debug('dispatching message to %r' % l)
51
            try:
52
                l.callback(root, raw)
53
            except Exception as e:
54
                logger.warning('[error] %r' % e)
55

    
56
    def _dispatch_error(self, err):
57
        with self._lock:
58
            listeners = list(self._listeners)
59
        for l in listeners:
60
            logger.debug('dispatching error to %r' % l)
61
            try:
62
                l.errback(err)
63
            except Exception as e:
64
                logger.warning('error %r' % e)
65

    
66
    def _post_connect(self):
67
        "Greeting stuff"
68
        init_event = Event()
69
        error = [None] # so that err_cb can bind error[0]. just how it is.
70
        # callbacks
71
        def ok_cb(id, capabilities):
72
            self._id = id
73
            self._server_capabilities = capabilities
74
            init_event.set()
75
        def err_cb(err):
76
            error[0] = err
77
            init_event.set()
78
        listener = HelloHandler(ok_cb, err_cb)
79
        self.add_listener(listener)
80
        self.send(HelloHandler.build(self._client_capabilities))
81
        logger.debug('starting main loop')
82
        self.start()
83
        # we expect server's hello message
84
        init_event.wait()
85
        # received hello message or an error happened
86
        self.remove_listener(listener)
87
        if error[0]:
88
            raise error[0]
89
        logger.info('initialized: session-id=%s | server_capabilities=%s' %
90
                    (self._id, self._server_capabilities))
91

    
92
    def add_listener(self, listener):
93
        """Register a listener that will be notified of incoming messages and
94
        errors.
95

96
        :arg listener: :class:`SessionListener`
97
        """
98
        logger.debug('installing listener %r' % listener)
99
        if not isinstance(listener, SessionListener):
100
            raise SessionError("Listener must be a SessionListener type")
101
        with self._lock:
102
            self._listeners.add(listener)
103

    
104
    def remove_listener(self, listener):
105
        """Unregister some listener; ignore if the listener was never
106
        registered."""
107
        logger.debug('discarding listener %r' % listener)
108
        with self._lock:
109
            self._listeners.discard(listener)
110

    
111
    def get_listener_instance(self, cls):
112
        """If a listener of the sspecified type is registered, returns the
113
        instance. This is useful when it is desirable to have only one instance
114
        of a particular type per session, i.e. a multiton.
115

116
        :arg cls: class of the listener
117
        """
118
        with self._lock:
119
            for listener in self._listeners:
120
                if isinstance(listener, cls):
121
                    return listener
122

    
123
    def connect(self, *args, **kwds): # subclass implements
124
        raise NotImplementedError
125

    
126
    def run(self): # subclass implements
127
        raise NotImplementedError
128

    
129
    def send(self, message):
130
        """Send the supplied *message* to NETCONF server.
131

132
        :arg message: an XML document
133

134
        :type message: :obj:`string`
135
        """
136
        logger.debug('queueing %s' % message)
137
        self._q.put(message)
138

    
139
    ### Properties
140

    
141
    @property
142
    def connected(self):
143
        "Connection status of the session."
144
        return self._connected
145

    
146
    @property
147
    def client_capabilities(self):
148
        "Client's :class:`Capabilities`"
149
        return self._client_capabilities
150

    
151
    @property
152
    def server_capabilities(self):
153
        "Server's :class:`Capabilities`"
154
        return self._server_capabilities
155

    
156
    @property
157
    def id(self):
158
        """A :obj:`string` representing the `session-id`. If the session has not
159
        been initialized it will be :const:`None`"""
160
        return self._id
161

    
162
    @property
163
    def can_pipeline(self):
164
        "Whether this session supports pipelining"
165
        return True
166

    
167

    
168
class SessionListener(object):
169

    
170
    """Base class for :class:`Session` listeners, which are notified when a new
171
    NETCONF message is received or an error occurs.
172

173
    .. note::
174
        Avoid time-intensive tasks in a callback's context.
175
    """
176

    
177
    def callback(self, root, raw):
178
        """Called when a new XML document is received. The `root` argument
179
        allows the callback to determine whether it wants to further process the
180
        document.
181

182
        :arg root: is a tuple of `(tag, attributes)` where `tag` is the qualified name of the root element and `attributes` is a dictionary of its attributes (also qualified names)
183

184
        :arg raw: XML document
185
        :type raw: :obj:`string`
186
        """
187
        raise NotImplementedError
188

    
189
    def errback(self, ex):
190
        """Called when an error occurs.
191

192
        :type ex: :exc:`Exception`
193
        """
194
        raise NotImplementedError
195

    
196

    
197
class HelloHandler(SessionListener):
198

    
199
    def __init__(self, init_cb, error_cb):
200
        self._init_cb = init_cb
201
        self._error_cb = error_cb
202

    
203
    def callback(self, root, raw):
204
        if content.unqualify(root[0]) == 'hello':
205
            try:
206
                id, capabilities = HelloHandler.parse(raw)
207
            except Exception as e:
208
                self._error_cb(e)
209
            else:
210
                self._init_cb(id, capabilities)
211

    
212
    def errback(self, err):
213
        self._error_cb(err)
214

    
215
    @staticmethod
216
    def build(capabilities):
217
        "Given a list of capability URI's returns <hello> message XML string"
218
        spec = {
219
            'tag': content.qualify('hello'),
220
            'subtree': [{
221
                'tag': 'capabilities',
222
                'subtree': # this is fun :-)
223
                    [{'tag': 'capability', 'text': uri} for uri in capabilities]
224
                }]
225
            }
226
        return content.dtree2xml(spec)
227

    
228
    @staticmethod
229
    def parse(raw):
230
        "Returns tuple of (session-id (str), capabilities (Capabilities)"
231
        sid, capabilities = 0, []
232
        root = content.xml2ele(raw)
233
        for child in root.getchildren():
234
            tag = content.unqualify(child.tag)
235
            if tag == 'session-id':
236
                sid = child.text
237
            elif tag == 'capabilities':
238
                for cap in child.getchildren():
239
                    if content.unqualify(cap.tag) == 'capability':
240
                        capabilities.append(cap.text)
241
        return sid, Capabilities(capabilities)