Statistics
| Branch: | Tag: | Revision:

root / ncclient / transport / session.py @ 583c11f6

History | View | Annotate | Download (6.2 kB)

1
# Copyright 2009 Shikhar Bhushan
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#    http://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14

    
15
from Queue import Queue
16
from threading import Thread, Lock, Event
17

    
18
from ncclient import content
19
from ncclient.capabilities import Capabilities
20

    
21
import logging
22
logger = logging.getLogger('ncclient.transport.session')
23

    
24
class Session(Thread):
25
    
26
    def __init__(self, capabilities):
27
        Thread.__init__(self)
28
        self.set_daemon(True)
29
        self._listeners = set() # 3.0's weakset ideal
30
        self._lock = Lock()
31
        self.set_name('session')
32
        self._q = Queue()
33
        self._client_capabilities = capabilities
34
        self._server_capabilities = None # yet
35
        self._id = None # session-id
36
        self._connected = False # to be set/cleared by subclass implementation
37
        logger.debug('%r created: client_capabilities=%r' %
38
                     (self, self._client_capabilities))
39
    
40
    def _dispatch_message(self, raw):
41
        "TODO: docstring"
42
        try:
43
            root = content.parse_root(raw)
44
        except Exception as e:
45
            logger.error('error parsing dispatch message: %s' % e)
46
            return
47
        with self._lock:
48
            listeners = list(self._listeners)
49
        for l in listeners:
50
            logger.debug('dispatching message to %r' % l)
51
            try:
52
                l.callback(root, raw)
53
            except Exception as e:
54
                logger.warning('[error] %r' % e)
55
    
56
    def _dispatch_error(self, err):
57
        "TODO: docstring"
58
        with self._lock:
59
            listeners = list(self._listeners)
60
        for l in listeners:
61
            logger.debug('dispatching error to %r' % l)
62
            try:
63
                l.errback(err)
64
            except Exception as e:
65
                logger.warning('error %r' % e)
66
    
67
    def _post_connect(self):
68
        "Greeting stuff"
69
        init_event = Event()
70
        error = [None] # so that err_cb can bind error[0]. just how it is.
71
        # callbacks
72
        def ok_cb(id, capabilities):
73
            self._id = id
74
            self._server_capabilities = capabilities
75
            init_event.set()
76
        def err_cb(err):
77
            error[0] = err
78
            init_event.set()
79
        listener = HelloHandler(ok_cb, err_cb)
80
        self.add_listener(listener)
81
        self.send(HelloHandler.build(self._client_capabilities))
82
        logger.debug('starting main loop')
83
        self.start()
84
        # we expect server's hello message
85
        init_event.wait()
86
        # received hello message or an error happened
87
        self.remove_listener(listener)
88
        if error[0]:
89
            raise error[0]
90
        logger.info('initialized: session-id=%s | server_capabilities=%s' %
91
                     (self._id, self._server_capabilities))
92
    
93
    def add_listener(self, listener):
94
        logger.debug('installing listener %r' % listener)
95
        if not isinstance(listener, SessionListener):
96
            raise SessionError("Listener must be a SessionListener type")
97
        with self._lock:
98
            self._listeners.add(listener)
99
    
100
    def remove_listener(self, listener):
101
        logger.debug('discarding listener %r' % listener)
102
        with self._lock:
103
            self._listeners.discard(listener)
104
    
105
    def get_listener_instance(self, cls):
106
        with self._lock:
107
            for listener in self._listeners:
108
                if isinstance(listener, cls):
109
                    return listener
110
    
111
    def connect(self, *args, **kwds):
112
        raise NotImplementedError
113

    
114
    def run(self):
115
        raise NotImplementedError
116
    
117
    def send(self, message):
118
        logger.debug('queueing %s' % message)
119
        self._q.put(message)
120
    
121
    ### Properties
122
    
123
    @property
124
    def client_capabilities(self):
125
        return self._client_capabilities
126
    
127
    @property
128
    def server_capabilities(self):
129
        return self._server_capabilities
130
    
131
    @property
132
    def connected(self):
133
        return self._connected
134
    
135
    @property
136
    def id(self):
137
        "`session-id` if session is initialized, :const:`None` otherwise"
138
        return self._id
139
    
140
    @property
141
    def can_pipeline(self):
142
        return True
143

    
144

    
145
class SessionListener(object):
146
    
147
    def callback(self, root, raw):
148
        raise NotImplementedError
149
    
150
    def errback(self, ex):
151
        raise NotImplementedError
152

    
153

    
154
class HelloHandler(SessionListener):
155
    
156
    def __init__(self, init_cb, error_cb):
157
        self._init_cb = init_cb
158
        self._error_cb = error_cb
159
    
160
    def callback(self, root, raw):
161
        if content.unqualify(root[0]) == 'hello':
162
            try:
163
                id, capabilities = HelloHandler.parse(raw)
164
            except Exception as e:
165
                self._error_cb(e)
166
            else:
167
                self._init_cb(id, capabilities)
168
    
169
    def errback(self, err):
170
        self._error_cb(err)
171
    
172
    @staticmethod
173
    def build(capabilities):
174
        "Given a list of capability URI's returns <hello> message XML string"
175
        spec = {
176
            'tag': content.qualify('hello'),
177
            'subtree': [{
178
                'tag': 'capabilities',
179
                'subtree': # this is fun :-)
180
                    [{'tag': 'capability', 'text': uri} for uri in capabilities]
181
                }]
182
            }
183
        return content.dtree2xml(spec)
184
    
185
    @staticmethod
186
    def parse(raw):
187
        "Returns tuple of (session-id (str), capabilities (Capabilities)"
188
        sid, capabilities = 0, []
189
        root = content.xml2ele(raw)
190
        for child in root.getchildren():
191
            tag = content.unqualify(child.tag)
192
            if tag == 'session-id':
193
                sid = child.text
194
            elif tag == 'capabilities':
195
                for cap in child.getchildren():
196
                    if content.unqualify(cap.tag) == 'capability':
197
                        capabilities.append(cap.text)
198
        return sid, Capabilities(capabilities)