# Copyright 2009 Shikhar Bhushan
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from Queue import Queue
from threading import Thread, Lock, Event

from ncclient import content
from ncclient.capabilities import Capabilities

from errors import TransportError

logger = logging.getLogger('ncclient.transport.session')
class Session(Thread):

    """Base class for use by transport protocol implementations.

    A session is a thread. Subclasses implement :meth:`connect` and
    :meth:`run` and set/clear ``_connected``. Received messages and errors
    are passed to every registered :class:`SessionListener`; messages handed
    to :meth:`send` are placed on an internal queue.
    """

    def __init__(self, capabilities):
        """Create the listener registry and the outgoing message queue.

        :arg capabilities: capabilities the client advertises in its hello
        :type capabilities: iterable of capability URIs
        """
        Thread.__init__(self)
        # NOTE(review): daemon thread so that an unclosed session cannot keep
        # the interpreter alive at exit -- confirm this is the intended
        # lifecycle.
        self.daemon = True
        self._listeners = set() # 3.0's weakset would be ideal
        self._lock = Lock() # guards self._listeners
        self.name = 'session'
        # NOTE(review): subclass run() implementations presumably drain this
        # queue; confirm the attribute name matches what they read.
        self._q = Queue()
        self._client_capabilities = capabilities
        self._server_capabilities = None # yet
        self._id = None # session-id
        self._connected = False # to be set/cleared by subclass implementation
        logger.debug('%r created: client_capabilities=%r',
                     self, self._client_capabilities)

    def _dispatch_message(self, raw):
        """Parse *raw* and pass it to the ``callback`` of every listener.

        A message whose root cannot be parsed is logged and dropped. An
        exception raised by one listener is logged and does not prevent
        delivery to the remaining listeners.
        """
        try:
            root = content.parse_root(raw)
        except Exception as e:
            logger.error('error parsing dispatch message: %s', e)
            return
        # Iterate over a snapshot so listeners may (un)register themselves
        # from inside their callback without holding the lock.
        with self._lock:
            listeners = list(self._listeners)
        for listener in listeners:
            logger.debug('dispatching message to %r', listener)
            try:
                listener.callback(root, raw)
            except Exception as e:
                logger.warning('[error] %r', e)

    def _dispatch_error(self, err):
        """Pass *err* to the ``errback`` of every listener.

        An exception raised by one listener is logged and does not prevent
        delivery to the remaining listeners.
        """
        with self._lock:
            listeners = list(self._listeners)
        for listener in listeners:
            logger.debug('dispatching error to %r', listener)
            try:
                listener.errback(err)
            except Exception as e:
                logger.warning('error dispatching to %r: %r', listener, e)

    def _post_connect(self):
        """Exchange hello messages with the server.

        Sends the client's hello, starts the session thread and blocks until
        the server's hello has been handled or an error has been reported.

        :raises: whatever error was reported while waiting for the hello
        """
        init_event = Event()
        error = [None] # so that err_cb can bind error[0]. just how it is.

        def ok_cb(sid, capabilities):
            self._id = sid
            self._server_capabilities = capabilities
            init_event.set()

        def err_cb(err):
            error[0] = err
            init_event.set()

        listener = HelloHandler(ok_cb, err_cb)
        self.add_listener(listener)
        self.send(HelloHandler.build(self._client_capabilities))
        logger.debug('starting main loop')
        self.start()
        # we expect server's hello message
        init_event.wait()
        # received hello message or an error happened
        self.remove_listener(listener)
        if error[0] is not None:
            raise error[0]
        #if ':base:1.0' not in self.server_capabilities:
        #    raise MissingCapabilityError(':base:1.0')
        logger.info('initialized: session-id=%s | server_capabilities=%s',
                    self._id, self._server_capabilities)

    def add_listener(self, listener):
        """Register a listener that will be notified of incoming messages and
        errors.

        :arg listener: the listener to register
        :type listener: :class:`SessionListener`
        """
        logger.debug('installing listener %r', listener)
        if not isinstance(listener, SessionListener):
            # NOTE(review): SessionError is not among the imports visible in
            # this file; confirm where it is defined.
            raise SessionError("Listener must be a SessionListener type")
        with self._lock:
            self._listeners.add(listener)

    def remove_listener(self, listener):
        """Unregister some listener; ignore if the listener was never
        registered.

        :arg listener: the listener to unregister
        :type listener: :class:`SessionListener`
        """
        logger.debug('discarding listener %r', listener)
        with self._lock:
            self._listeners.discard(listener)

    def get_listener_instance(self, cls):
        """If a listener of the specified type is registered, returns the
        instance; otherwise returns :const:`None`.

        :arg cls: listener class to look for
        :type cls: :class:`SessionListener`
        """
        with self._lock:
            for listener in self._listeners:
                if isinstance(listener, cls):
                    return listener
        return None

    def connect(self, *args, **kwds): # subclass implements
        "Establish the transport connection; must be provided by subclasses."
        raise NotImplementedError

    def run(self): # subclass implements
        "Thread body of the session; must be provided by subclasses."
        raise NotImplementedError

    def send(self, message):
        """Send the supplied *message* to NETCONF server.

        :arg message: an XML document
        :type message: `string`

        :raises TransportError: if the session is not connected
        """
        if not self.connected:
            raise TransportError('Not connected to NETCONF server')
        logger.debug('queueing %s', message)
        self._q.put(message)

    ### Properties

    @property
    def connected(self):
        "Connection status of the session."
        return self._connected

    @property
    def client_capabilities(self):
        "Client's :class:`Capabilities`"
        return self._client_capabilities

    @property
    def server_capabilities(self):
        "Server's :class:`Capabilities`"
        return self._server_capabilities

    @property
    def id(self):
        """A `string` representing the `session-id`. If the session has not
        been initialized it will be :const:`None`"""
        return self._id

    @property
    def can_pipeline(self):
        "Whether this session supports pipelining"
        # NOTE(review): the base class is assumed to report True; confirm.
        return True
class SessionListener(object):

    """Base class for :class:`Session` listeners, which are notified when a new
    NETCONF message is received or an error occurs.

    .. note::
        Avoid time-intensive tasks in a callback's context.
    """

    def callback(self, root, raw):
        """Called when a new XML document is received. The `root` argument
        allows the callback to determine whether it wants to further process the
        document.

        :arg root: tuple of `(tag, attributes)` where `tag` is the qualified
            name of the root element and `attributes` is a dictionary of its
            attributes (also qualified names)
        :type root: `tuple`

        :arg raw: XML document
        :type raw: `string`
        """
        raise NotImplementedError

    def errback(self, ex):
        """Called when an error occurs.

        :arg ex: the error being reported
        :type ex: :exc:`Exception`
        """
        raise NotImplementedError
class HelloHandler(SessionListener):

    """Listener that handles the hello exchange.

    Reports the outcome through two callables: `init_cb(session_id,
    capabilities)` once a hello message has been parsed, and `error_cb(error)`
    if parsing fails or an error is reported to the listener.
    """

    def __init__(self, init_cb, error_cb):
        """
        :arg init_cb: called as `init_cb(session_id, capabilities)`
        :arg error_cb: called as `error_cb(error)`
        """
        self._init_cb = init_cb
        self._error_cb = error_cb

    def callback(self, root, raw):
        "Parse *raw* if its root element is `hello`; ignore anything else."
        if content.unqualify(root[0]) == 'hello':
            try:
                sid, capabilities = HelloHandler.parse(raw)
            except Exception as e:
                self._error_cb(e)
            else:
                # Outside the try block so that an exception raised by the
                # success callback is not reported as a parse error.
                self._init_cb(sid, capabilities)

    def errback(self, err):
        "Forward *err* to the error callback."
        self._error_cb(err)

    @staticmethod
    def build(capabilities):
        "Given a list of capability URI's returns <hello> message XML string"
        spec = {
            'tag': 'hello',
            'attrib': {'xmlns': content.BASE_NS},
            'subtree': [{
                'tag': 'capabilities',
                'subtree': # this is fun :-)
                    [{'tag': 'capability', 'text': uri} for uri in capabilities]
                }]
            }
        return content.dtree2xml(spec)

    @staticmethod
    def parse(raw):
        "Returns tuple of (session-id (str), capabilities (Capabilities)"
        # sid stays 0 when the message carries no session-id element.
        sid, capabilities = 0, []
        root = content.xml2ele(raw)
        # Iterate elements directly: getchildren() is deprecated.
        for child in root:
            tag = content.unqualify(child.tag)
            if tag == 'session-id':
                sid = child.text
            elif tag == 'capabilities':
                for cap in child:
                    if content.unqualify(cap.tag) == 'capability':
                        capabilities.append(cap.text)
        return sid, Capabilities(capabilities)