Revision f5c75f88 ncclient/session/session.py

b/ncclient/session/session.py
12 12
# See the License for the specific language governing permissions and
13 13
# limitations under the License.
14 14

  
15
import logging
16 15
from threading import Thread, Lock, Event
17 16
from Queue import Queue
18 17

  
18
from . import logger
19 19
from capabilities import Capabilities, CAPABILITIES
20 20

  
21
logger = logging.getLogger('ncclient.session')
22 21

  
23
class SessionError(Exception):
24
    
25
    pass
22
class Subject:
26 23

  
27
class SessionCloseError(SessionError):
28
    
29
    def __init__(self, in_buf, out_buf=None):
30
        SessionError.__init__(self)
31
        self._in_buf, self._out_buf = in_buf, out_buf
24
    def __init__(self):
25
        self._listeners = set([])
26
        self._lock = Lock()
32 27
        
33
    def __str__(self):
34
        msg = 'Session closed by remote endpoint.'
35
        if self._in_buf:
36
            msg += '\nIN_BUFFER: %s' % self._in_buf
37
        if self._out_buf:
38
            msg += '\nOUT_BUFFER: %s' % self._out_buf
39
        return msg
28
    def has_listener(self, listener):
29
        with self._lock:
30
            return (listener in self._listeners)
40 31
    
41
class Session(Thread):
32
    def add_listener(self, listener):
33
        with self._lock:
34
            self._listeners.add(listener)
35
    
36
    def remove_listener(self, listener):
37
        with self._lock:
38
            self._listeners.discard(listener)
39
    
40
    def dispatch(self, event, *args, **kwds):
41
        # holding the lock while doing callbacks could lead to a deadlock
42
        # if one of the above methods is called
43
        with self._lock:
44
            listeners = list(self._listeners)
45
        for l in listeners:
46
            try:
47
                logger.debug('dispatching [%s] to [%s]' % (event, l))
48
                getattr(l, event)(*args, **kwds)
49
            except Exception as e:
50
                pass # if a listener doesn't care for some event we don't care
51

  
52

  
53
class Session(Thread, Subject):
42 54
    
43 55
    def __init__(self):
44 56
        Thread.__init__(self, name='session')
57
        Subject.__init__(self)
45 58
        self._client_capabilities = CAPABILITIES
46 59
        self._server_capabilities = None # yet
47 60
        self._id = None # session-id
48 61
        self._q = Queue()
49 62
        self._connected = False # to be set/cleared by subclass implementation
50
        self._listeners = set([])
51
        self._lock = Lock()
52 63
    
53 64
    def _post_connect(self):
54 65
        from ncclient.content.builders import HelloBuilder
55
        # queue client's hello message for sending
56 66
        self.send(HelloBuilder.build(self._client_capabilities))
57
        
58 67
        error = None
59
        proceed = Event()
68
        init_event = Event()
60 69
        def ok_cb(id, capabilities):
61 70
            self._id, self._capabilities = id, Capabilities(capabilities)
62
            proceed.set()
71
            init_event.set()
63 72
        def err_cb(err):
64 73
            error = err
65
            proceed.set()
74
            init_event.set()
66 75
        listener = HelloListener(ok_cb, err_cb)
67 76
        self.add_listener(listener)
68
        
69 77
        # start the subclass' main loop
70 78
        self.start()        
71 79
        # we expect server's hello message
72
        proceed.wait()
80
        init_event.wait()
73 81
        # received hello message or an error happened
74 82
        self.remove_listener(listener)
75 83
        if error:
76
            self._close()
77
            raise self._error
84
            raise error
85
        logger.debug('initialized:session-id:%s' % self._id)
78 86
    
79 87
    def send(self, message):
80
        logger.debug('queueing message: \n%s' % message)
88
        logger.debug('queueing:%s' % message)
81 89
        self._q.put(message)
82 90
    
83 91
    def connect(self):
......
92 100
        elif whose == 'server':
93 101
            return self._server_capabilities
94 102
    
95
    ### Session is a subject for arbitary listeners
96
    
97
    def has_listener(self, listener):
98
        with self._lock:
99
            return (listener in self._listeners)
100
    
101
    def add_listener(self, listener):
102
        with self._lock:
103
            self._listeners.add(listener)
104
    
105
    def remove_listener(self, listener):
106
        with self._lock:
107
            self._listeners.discard(listener)
108
    
109
    def dispatch(self, event, *args, **kwds):
110
        # holding the lock while doing callbacks could lead to a deadlock
111
        # if one of the above methods is called
112
        with self._lock:
113
            listeners = list(self._listeners)
114
        for l in listeners:
115
            try:
116
                logger.debug('dispatching [%s] to [%s]' % (event, l))
117
                getattr(l, event)(*args, **kwds)
118
            except Exception as e:
119
                logger.warning(e)
120
    
121 103
    ### Properties
122 104
    
123 105
    @property
......
147 129
    
148 130
    ### Events
149 131
    
150
    def reply(self, raw):
132
    def received(self, raw):
133
        logger.debug(raw)
151 134
        from ncclient.content.parsers import HelloParser
152 135
        try:
153 136
            id, capabilities = HelloParser.parse(raw)
......
165 148
    def __str__(self):
166 149
        return 'DebugListener'
167 150
    
168
    def reply(self, raw):
169
        logger.debug('DebugListener:reply:%s' % raw)
151
    def received(self, raw):
152
        logger.debug('DebugListener:[received]:%s' % raw)
170 153
    
171 154
    def error(self, err):
172
        logger.debug('DebugListener:error:%s' % err)
155
        logger.debug('DebugListener:[error]:%s' % err)

Also available in: Unified diff