# Copyright 2009 Shikhar Bhushan
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from Queue import Queue
from threading import Thread, Lock, Event

from ncclient import xml_
from ncclient.capabilities import Capabilities

from errors import TransportError
# Module-level logger shared by every class in this file.
logger = logging.getLogger('ncclient.transport.session')
class Session(Thread):

    """Base class for use by transport protocol implementations.

    Keeps a set of :class:`SessionListener` objects and passes each parsed
    message or error to every one of them. Subclasses supply :meth:`connect`
    and :meth:`run`.
    """

    # NOTE(review): this class was rebuilt from a listing that had lost its
    # indentation and several statements. Lines marked "restored" below were
    # inferred from the surrounding code (orphaned `except` clauses, names
    # used but never bound, imported-but-unused Lock/Queue/Event) and should
    # be confirmed against the transport subclasses.

    def __init__(self, capabilities):
        """Set up an unconnected session.

        :arg capabilities: capabilities the client sends in its `<hello>`

        :type capabilities: :class:`Capabilities`
        """
        Thread.__init__(self)  # restored: the thread name cannot be set without it
        self.daemon = True  # restored -- TODO confirm the daemon flag
        self._listeners = set() # 3.0's weakset would be ideal
        self._lock = Lock()  # restored: guards self._listeners
        self.name = 'session'
        self._q = Queue()  # restored: messages queued by send()
        self._client_capabilities = capabilities
        self._server_capabilities = None # yet
        self._id = None # session-id
        self._connected = False # to be set/cleared by subclass implementation
        logger.debug('%r created: client_capabilities=%r',
                     self, self._client_capabilities)

    def _dispatch_message(self, raw):
        """Parse the root of *raw* and pass it to every listener's callback.

        A message whose root cannot be parsed is logged and dropped. An
        exception raised by one listener is logged and does not stop delivery
        to the remaining listeners.

        :arg raw: XML document
        """
        try:
            root = xml_.parse_root(raw)
        except Exception as e:
            logger.error('error parsing dispatch message: %s', e)
            return
        # Work on a snapshot so the listener set can change while callbacks
        # are running.
        with self._lock:
            listeners = list(self._listeners)
        for listener in listeners:
            logger.debug('dispatching message to %r', listener)
            try:
                listener.callback(root, raw)
            except Exception as e:
                logger.warning('[error] %r', e)

    def _dispatch_error(self, err):
        """Pass *err* to every listener's errback.

        An exception raised by one listener is logged and does not stop
        delivery to the remaining listeners.

        :type err: :exc:`Exception`
        """
        with self._lock:
            listeners = list(self._listeners)
        for listener in listeners:
            logger.debug('dispatching error to %r', listener)
            try:
                listener.errback(err)
            except Exception as e:
                logger.warning('error dispatching to %r: %r', listener, e)

    def _post_connect(self):
        """Exchange `<hello>` messages with the server.

        Sends the client's `<hello>`, then blocks until a temporary
        :class:`HelloHandler` reports either the server's session-id and
        capabilities or an error. A reported error is re-raised here.
        """
        init_event = Event()  # restored: signals that the handler has fired
        error = [None] # so that err_cb can bind error[0]. just how it is.

        def ok_cb(session_id, capabilities):
            self._id = session_id
            self._server_capabilities = capabilities
            init_event.set()

        def err_cb(err):
            error[0] = err
            init_event.set()

        listener = HelloHandler(ok_cb, err_cb)
        self.add_listener(listener)
        self.send(HelloHandler.build(self._client_capabilities))
        logger.debug('starting main loop')
        self.start()  # restored -- runs the subclass's run() in this thread
        # we expect server's hello message
        init_event.wait()
        # received hello message or an error happened
        self.remove_listener(listener)
        if error[0] is not None:
            raise error[0]
        #if ':base:1.0' not in self.server_capabilities:
        #    raise MissingCapabilityError(':base:1.0')
        logger.info('initialized: session-id=%s | server_capabilities=%s',
                    self._id, self._server_capabilities)

    def add_listener(self, listener):
        """Register a listener that will be notified of incoming messages and
        errors.

        :type listener: :class:`SessionListener`
        """
        logger.debug('installing listener %r', listener)
        if not isinstance(listener, SessionListener):
            # NOTE(review): SessionError is not imported in the visible part
            # of this file -- confirm it is in scope, otherwise this line
            # raises NameError instead.
            raise SessionError("Listener must be a SessionListener type")
        with self._lock:
            self._listeners.add(listener)

    def remove_listener(self, listener):
        """Unregister some listener; ignore if the listener was never
        registered.

        :type listener: :class:`SessionListener`
        """
        logger.debug('discarding listener %r', listener)
        with self._lock:
            self._listeners.discard(listener)

    def get_listener_instance(self, cls):
        """If a listener of the specified type is registered, returns the
        instance; otherwise returns :const:`None`.

        :type cls: :class:`SessionListener`
        """
        with self._lock:
            for listener in self._listeners:
                if isinstance(listener, cls):
                    return listener
        return None

    def connect(self, *args, **kwds): # subclass implements
        raise NotImplementedError

    def run(self): # subclass implements
        raise NotImplementedError

    def send(self, message):
        """Send the supplied *message* to NETCONF server.

        :arg message: an XML document

        :type message: `string`

        :raises TransportError: if the session is not connected
        """
        if not self.connected:
            raise TransportError('Not connected to NETCONF server')
        logger.debug('queueing %s', message)
        # restored -- presumably drained by the subclass's run(); confirm
        self._q.put(message)

    ### Properties

    @property
    def connected(self):
        "Connection status of the session."
        return self._connected

    @property
    def client_capabilities(self):
        "Client's :class:`Capabilities`"
        return self._client_capabilities

    @property
    def server_capabilities(self):
        "Server's :class:`Capabilities`"
        return self._server_capabilities

    @property
    def id(self):
        """A `string` representing the `session-id`. If the session has not
        been initialized it will be :const:`None`"""
        return self._id

    @property
    def can_pipeline(self):
        "Whether this session supports pipelining"
        # NOTE(review): the return statement was missing from the listing;
        # True is assumed -- confirm.
        return True
class SessionListener(object):

    """Base class for :class:`Session` listeners, which are notified when a new
    NETCONF message is received or an error occurs.

    .. note::
        Avoid time-intensive tasks in a callback's context.
    """

    def callback(self, root, raw):
        """Called when a new XML document is received. The `root` argument
        allows the callback to determine whether it wants to further process the
        document.

        :arg root: tuple of `(tag, attributes)` where `tag` is the qualified name of the root element and `attributes` is a dictionary of its attributes (also qualified names)

        :arg raw: XML document

        :raises NotImplementedError: always; subclasses must override
        """
        raise NotImplementedError

    def errback(self, ex):
        """Called when an error occurs.

        :type ex: :exc:`Exception`

        :raises NotImplementedError: always; subclasses must override
        """
        raise NotImplementedError
class HelloHandler(SessionListener):

    """Listener for the `<hello>` exchange: parses the server's `<hello>`
    and reports the outcome through one of two callbacks. Also builds the
    client's `<hello>` message.
    """

    # NOTE(review): rebuilt from a listing that had lost its indentation and
    # some statements (the try/else around parse(), the errback body, the
    # @staticmethod decorators and the outer layers of the `spec` dict).
    # Confirm the restored parts against the callers.

    def __init__(self, init_cb, error_cb):
        """
        :arg init_cb: called as `init_cb(session_id, capabilities)` once a `<hello>` has been parsed

        :arg error_cb: called with the exception when parsing fails or an error is reported
        """
        self._init_cb = init_cb
        self._error_cb = error_cb

    def callback(self, root, raw):
        """Parse *raw* if its root element is `<hello>`; ignore anything else.

        :arg root: tuple of `(tag, attributes)` for the root element

        :arg raw: XML document
        """
        if xml_.unqualify(root[0]) == 'hello':
            try:
                session_id, capabilities = HelloHandler.parse(raw)
            except Exception as e:
                self._error_cb(e)
            else:
                # Only reached when parsing succeeded, so both names are bound.
                self._init_cb(session_id, capabilities)

    def errback(self, err):
        """Forward *err* to the error callback.

        :type err: :exc:`Exception`
        """
        self._error_cb(err)

    @staticmethod
    def build(capabilities):
        "Given a list of capability URI's returns <hello> message XML string"
        spec = {
            'tag': 'hello',
            'attrib': {'xmlns': xml_.BASE_NS},
            'subtree': [{
                'tag': 'capabilities',
                'subtree': # this is fun :-)
                    [{'tag': 'capability', 'text': uri} for uri in capabilities]
                }]
            }
        return xml_.dtree2xml(spec)

    @staticmethod
    def parse(raw):
        """Returns tuple of (session-id (str), capabilities (Capabilities)).

        The session-id is `0` when the document has no `session-id` element.

        :arg raw: XML document
        """
        sid, capabilities = 0, []
        root = xml_.xml2ele(raw)
        # Iterate the element directly: getchildren() was removed from
        # xml.etree.ElementTree in Python 3.9.
        for child in root:
            tag = xml_.unqualify(child.tag)
            if tag == 'session-id':
                sid = child.text
            elif tag == 'capabilities':
                for cap in child:
                    if xml_.unqualify(cap.tag) == 'capability':
                        capabilities.append(cap.text)
        return sid, Capabilities(capabilities)