Statistics
| Branch: | Tag: | Revision:

root / ncclient / transport / session.py @ 216bb34c

History | View | Annotate | Download (7.9 kB)

1
# Copyright 2009 Shikhar Bhushan
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#    http://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14

    
15
from Queue import Queue
16
from threading import Thread, Lock, Event
17

    
18
from ncclient import content
19
from ncclient.capabilities import Capabilities
20

    
21
import logging
22
logger = logging.getLogger('ncclient.transport.session')
23

    
24
class Session(Thread):
25

    
26
    "Base class for use by transport protocol implementations."
27

    
28
    def __init__(self, capabilities):
29
        Thread.__init__(self)
30
        self.setDaemon(True)
31
        self._listeners = set() # 3.0's weakset would be ideal
32
        self._lock = Lock()
33
        self.setName('session')
34
        self._q = Queue()
35
        self._client_capabilities = capabilities
36
        self._server_capabilities = None # yet
37
        self._id = None # session-id
38
        self._connected = False # to be set/cleared by subclass implementation
39
        logger.debug('%r created: client_capabilities=%r' %
40
                     (self, self._client_capabilities))
41

    
42
    def _dispatch_message(self, raw):
43
        try:
44
            root = content.parse_root(raw)
45
        except Exception as e:
46
            logger.error('error parsing dispatch message: %s' % e)
47
            return
48
        with self._lock:
49
            listeners = list(self._listeners)
50
        for l in listeners:
51
            logger.debug('dispatching message to %r' % l)
52
            try:
53
                l.callback(root, raw)
54
            except Exception as e:
55
                logger.warning('[error] %r' % e)
56

    
57
    def _dispatch_error(self, err):
58
        with self._lock:
59
            listeners = list(self._listeners)
60
        for l in listeners:
61
            logger.debug('dispatching error to %r' % l)
62
            try:
63
                l.errback(err)
64
            except Exception as e:
65
                logger.warning('error %r' % e)
66

    
67
    def _post_connect(self):
68
        "Greeting stuff"
69
        init_event = Event()
70
        error = [None] # so that err_cb can bind error[0]. just how it is.
71
        # callbacks
72
        def ok_cb(id, capabilities):
73
            self._id = id
74
            self._server_capabilities = capabilities
75
            init_event.set()
76
        def err_cb(err):
77
            error[0] = err
78
            init_event.set()
79
        listener = HelloHandler(ok_cb, err_cb)
80
        self.add_listener(listener)
81
        self.send(HelloHandler.build(self._client_capabilities))
82
        logger.debug('starting main loop')
83
        self.start()
84
        # we expect server's hello message
85
        init_event.wait()
86
        # received hello message or an error happened
87
        self.remove_listener(listener)
88
        if error[0]:
89
            raise error[0]
90
        #if ':base:1.0' not in self.server_capabilities:
91
        #    raise MissingCapabilityError(':base:1.0')
92
        logger.info('initialized: session-id=%s | server_capabilities=%s' %
93
                    (self._id, self._server_capabilities))
94

    
95
    def add_listener(self, listener):
96
        """Register a listener that will be notified of incoming messages and
97
        errors.
98

99
        :type listener: :class:`SessionListener`
100
        """
101
        logger.debug('installing listener %r' % listener)
102
        if not isinstance(listener, SessionListener):
103
            raise SessionError("Listener must be a SessionListener type")
104
        with self._lock:
105
            self._listeners.add(listener)
106

    
107
    def remove_listener(self, listener):
108
        """Unregister some listener; ignore if the listener was never
109
        registered.
110

111
        :type listener: :class:`SessionListener`
112
        """
113
        logger.debug('discarding listener %r' % listener)
114
        with self._lock:
115
            self._listeners.discard(listener)
116

    
117
    def get_listener_instance(self, cls):
118
        """If a listener of the specified type is registered, returns the
119
        instance.
120

121
        :type cls: :class:`SessionListener`
122
        """
123
        with self._lock:
124
            for listener in self._listeners:
125
                if isinstance(listener, cls):
126
                    return listener
127

    
128
    def connect(self, *args, **kwds): # subclass implements
129
        raise NotImplementedError
130

    
131
    def run(self): # subclass implements
132
        raise NotImplementedError
133

    
134
    def send(self, message):
135
        """Send the supplied *message* to NETCONF server.
136

137
        :arg message: an XML document
138

139
        :type message: `string`
140
        """
141
        logger.debug('queueing %s' % message)
142
        self._q.put(message)
143

    
144
    ### Properties
145

    
146
    @property
147
    def connected(self):
148
        "Connection status of the session."
149
        return self._connected
150

    
151
    @property
152
    def client_capabilities(self):
153
        "Client's :class:`Capabilities`"
154
        return self._client_capabilities
155

    
156
    @property
157
    def server_capabilities(self):
158
        "Server's :class:`Capabilities`"
159
        return self._server_capabilities
160

    
161
    @property
162
    def id(self):
163
        """A `string` representing the `session-id`. If the session has not
164
        been initialized it will be :const:`None`"""
165
        return self._id
166

    
167
    @property
168
    def can_pipeline(self):
169
        "Whether this session supports pipelining"
170
        return True
171

    
172

    
173
class SessionListener(object):
174

    
175
    """Base class for :class:`Session` listeners, which are notified when a new
176
    NETCONF message is received or an error occurs.
177

178
    .. note::
179
        Avoid time-intensive tasks in a callback's context.
180
    """
181

    
182
    def callback(self, root, raw):
183
        """Called when a new XML document is received. The `root` argument
184
        allows the callback to determine whether it wants to further process the
185
        document.
186

187
        :arg root: is a tuple of `(tag, attributes)` where `tag` is the qualified name of the root element and `attributes` is a dictionary of its attributes (also qualified names)
188
        :type root: `tuple`
189

190
        :arg raw: XML document
191
        :type raw: `string`
192
        """
193
        raise NotImplementedError
194

    
195
    def errback(self, ex):
196
        """Called when an error occurs.
197

198
        :type ex: :exc:`Exception`
199
        """
200
        raise NotImplementedError
201

    
202

    
203
class HelloHandler(SessionListener):
204

    
205
    def __init__(self, init_cb, error_cb):
206
        self._init_cb = init_cb
207
        self._error_cb = error_cb
208

    
209
    def callback(self, root, raw):
210
        if content.unqualify(root[0]) == 'hello':
211
            try:
212
                id, capabilities = HelloHandler.parse(raw)
213
            except Exception as e:
214
                self._error_cb(e)
215
            else:
216
                self._init_cb(id, capabilities)
217

    
218
    def errback(self, err):
219
        self._error_cb(err)
220

    
221
    @staticmethod
222
    def build(capabilities):
223
        "Given a list of capability URI's returns <hello> message XML string"
224
        spec = {
225
            'tag': content.qualify('hello'),
226
            'subtree': [{
227
                'tag': 'capabilities',
228
                'subtree': # this is fun :-)
229
                    [{'tag': 'capability', 'text': uri} for uri in capabilities]
230
                }]
231
            }
232
        return content.dtree2xml(spec)
233

    
234
    @staticmethod
235
    def parse(raw):
236
        "Returns tuple of (session-id (str), capabilities (Capabilities)"
237
        sid, capabilities = 0, []
238
        root = content.xml2ele(raw)
239
        for child in root.getchildren():
240
            tag = content.unqualify(child.tag)
241
            if tag == 'session-id':
242
                sid = child.text
243
            elif tag == 'capabilities':
244
                for cap in child.getchildren():
245
                    if content.unqualify(cap.tag) == 'capability':
246
                        capabilities.append(cap.text)
247
        return sid, Capabilities(capabilities)