Statistics
| Branch: | Tag: | Revision:

root / ncclient / transport / session.py @ 0cdb8b3c

History | View | Annotate | Download (7.6 kB)

1
# Copyright 2009 Shikhar Bhushan
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#    http://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14

    
15
from Queue import Queue
16
from threading import Thread, Lock, Event
17

    
18
from ncclient import content
19
from ncclient.capabilities import Capabilities
20

    
21
import logging
22
logger = logging.getLogger('ncclient.transport.session')
23

    
24
class Session(Thread):
25
    "This is a base class for use by protocol implementations"
26
    
27
    def __init__(self, capabilities):
28
        Thread.__init__(self)
29
        self.set_daemon(True)
30
        self._listeners = set() # 3.0's weakset ideal
31
        self._lock = Lock()
32
        self.set_name('session')
33
        self._q = Queue()
34
        self._client_capabilities = capabilities
35
        self._server_capabilities = None # yet
36
        self._id = None # session-id
37
        self._connected = False # to be set/cleared by subclass implementation
38
        logger.debug('%r created: client_capabilities=%r' %
39
                     (self, self._client_capabilities))
40
    
41
    def _dispatch_message(self, raw):
42
        try:
43
            root = content.parse_root(raw)
44
        except Exception as e:
45
            logger.error('error parsing dispatch message: %s' % e)
46
            return
47
        with self._lock:
48
            listeners = list(self._listeners)
49
        for l in listeners:
50
            logger.debug('dispatching message to %r' % l)
51
            try:
52
                l.callback(root, raw)
53
            except Exception as e:
54
                logger.warning('[error] %r' % e)
55
    
56
    def _dispatch_error(self, err):
57
        with self._lock:
58
            listeners = list(self._listeners)
59
        for l in listeners:
60
            logger.debug('dispatching error to %r' % l)
61
            try:
62
                l.errback(err)
63
            except Exception as e:
64
                logger.warning('error %r' % e)
65
    
66
    def _post_connect(self):
67
        "Greeting stuff"
68
        init_event = Event()
69
        error = [None] # so that err_cb can bind error[0]. just how it is.
70
        # callbacks
71
        def ok_cb(id, capabilities):
72
            self._id = id
73
            self._server_capabilities = capabilities
74
            init_event.set()
75
        def err_cb(err):
76
            error[0] = err
77
            init_event.set()
78
        listener = HelloHandler(ok_cb, err_cb)
79
        self.add_listener(listener)
80
        self.send(HelloHandler.build(self._client_capabilities))
81
        logger.debug('starting main loop')
82
        self.start()
83
        # we expect server's hello message
84
        init_event.wait()
85
        # received hello message or an error happened
86
        self.remove_listener(listener)
87
        if error[0]:
88
            raise error[0]
89
        logger.info('initialized: session-id=%s | server_capabilities=%s' % (self._id, self._server_capabilities))
90
    
91
    def add_listener(self, listener):
92
        """Register a listener that will be notified of incoming messages and errors.
93
        
94
        :type listener: :class:`SessionListener`
95
        """
96
        logger.debug('installing listener %r' % listener)
97
        if not isinstance(listener, SessionListener):
98
            raise SessionError("Listener must be a SessionListener type")
99
        with self._lock:
100
            self._listeners.add(listener)
101
    
102
    def remove_listener(self, listener):
103
        "Unregister some listener; ignoring if the listener was never registered."
104
        logger.debug('discarding listener %r' % listener)
105
        with self._lock:
106
            self._listeners.discard(listener)
107
    
108
    def get_listener_instance(self, cls):
109
        """If a listener of the specified type is registered, returns it. This is useful when it is desirable to have only one instance of a particular type per session, i.e. a multiton.
110
        
111
        :type cls: :class:`type`
112
        :rtype: :class:`SessionListener` or :const:`None`
113
        """
114
        with self._lock:
115
            for listener in self._listeners:
116
                if isinstance(listener, cls):
117
                    return listener
118
    
119
    def connect(self, *args, **kwds): # subclass implements
120
        raise NotImplementedError
121

    
122
    def run(self): # subclass implements
123
        raise NotImplementedError
124
    
125
    def send(self, message):
126
        """
127
        :param message: XML document
128
        :type message: string
129
        """
130
        logger.debug('queueing %s' % message)
131
        self._q.put(message)
132
    
133
    ### Properties
134

    
135
    @property
136
    def connected(self):
137
        ":rtype: bool"
138
        return self._connected
139

    
140
    @property
141
    def client_capabilities(self):
142
        ":rtype: :class:`Capabilities`"
143
        return self._client_capabilities
144
    
145
    @property
146
    def server_capabilities(self):
147
        ":rtype: :class:`Capabilities` or :const:`None`"
148
        return self._server_capabilities
149
    
150
    @property
151
    def id(self):
152
        ":rtype: :obj:`string` or :const:`None`"
153
        return self._id
154
    
155
    @property
156
    def can_pipeline(self):
157
        ":rtype: :obj:`bool`"
158
        return True
159

    
160

    
161
class SessionListener(object):
162
    
163
    """'Listen' to incoming messages on a NETCONF :class:`Session`
164
    
165
    .. note::
166
        Avoid computationally intensive tasks in the callbacks.
167
    """
168
    
169
    def callback(self, root, raw):
170
        """Called when a new XML document is received. The `root` argument allows the callback to determine whether it wants to further process the document.
171
        
172
        :param root: tuple of (tag, attrs) where tag is the qualified name of the root element and attrs is a dictionary of its attributes (also qualified names)
173
        :param raw: XML document
174
        :type raw: string
175
        """
176
        raise NotImplementedError
177
    
178
    def errback(self, ex):
179
        """Called when an error occurs.
180
        
181
        :type ex: :class:`Exception`
182
        """
183
        raise NotImplementedError
184

    
185

    
186
class HelloHandler(SessionListener):
187
    
188
    def __init__(self, init_cb, error_cb):
189
        self._init_cb = init_cb
190
        self._error_cb = error_cb
191
    
192
    def callback(self, root, raw):
193
        if content.unqualify(root[0]) == 'hello':
194
            try:
195
                id, capabilities = HelloHandler.parse(raw)
196
            except Exception as e:
197
                self._error_cb(e)
198
            else:
199
                self._init_cb(id, capabilities)
200
    
201
    def errback(self, err):
202
        self._error_cb(err)
203
    
204
    @staticmethod
205
    def build(capabilities):
206
        "Given a list of capability URI's returns <hello> message XML string"
207
        spec = {
208
            'tag': content.qualify('hello'),
209
            'subtree': [{
210
                'tag': 'capabilities',
211
                'subtree': # this is fun :-)
212
                    [{'tag': 'capability', 'text': uri} for uri in capabilities]
213
                }]
214
            }
215
        return content.dtree2xml(spec)
216
    
217
    @staticmethod
218
    def parse(raw):
219
        "Returns tuple of (session-id (str), capabilities (Capabilities)"
220
        sid, capabilities = 0, []
221
        root = content.xml2ele(raw)
222
        for child in root.getchildren():
223
            tag = content.unqualify(child.tag)
224
            if tag == 'session-id':
225
                sid = child.text
226
            elif tag == 'capabilities':
227
                for cap in child.getchildren():
228
                    if content.unqualify(cap.tag) == 'capability':
229
                        capabilities.append(cap.text)
230
        return sid, Capabilities(capabilities)