# See the License for the specific language governing permissions and
# limitations under the License.
+from threading import Lock
+
class Subject:
+ 'Thread-safe abstract class for event-dispatching subjects'
+
+ # NOTE(review): the default `listeners=[]` below is one list object shared
+ # by every Subject created without an explicit argument.
def __init__(self, listeners=[]):
self._listeners = listeners
-
+ self._lock = Lock()
+
def has_listener(self, listener):
- return (listener in self._listeners)
+ with self._lock:
+ return (listener in self._listeners)
def add_listener(self, listener):
+ with self._lock:
self._listeners.append(listener)
-
+
def remove_listener(self, listener):
- self._listeners.remove(listener)
-
+ with self._lock:
+ try:
+ self._listeners.remove(listener)
+ except ValueError:
+ pass
+
def dispatch(self, event, *args, **kwds):
try:
func = getattr(Listener, event)
- for l in listeners:
- func(l, data)
+ # Snapshot the registered listeners while holding the lock, then
+ # release it before invoking any callback: Lock is not reentrant, so
+ # a listener calling add_listener/remove_listener from inside its
+ # callback would otherwise deadlock.
+ with self._lock:
+ listeners = list(self._listeners)
+ # (lock released) deliver the event to every listener in the snapshot
+ for l in listeners:
+ func(l, *args, **kwds)
except AttributeError:
pass
class Listener:
+ """Abstract class for NETCONF protocol message listeners, defining 2 events:
+ - reply
+ - error
+ """
+
@override
def reply(self, *args, **kwds):
raise NotImplementedError
--- /dev/null
+# Copyright 2009 Shikhar Bhushan
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from listener import Listener
+
+from threading import Event
+
+# NOTE(review): NETCONFError is neither imported nor defined in this new file -- confirm where it should come from.
+class OperationError(NETCONFError): pass
+
+class RPC:
+ # Common base class of RPCRequest and RPCReply; defines nothing itself.
+ pass
+
+class RPCRequest(RPC):
+
+ # Last message-id handed out, keyed by session id; shared by all requests
+ cur_msg_id = {}
+
+ @classmethod
+ def next_id(cls, session_id):
+ 'Return the next message-id for session_id; ids start at 1 for each session'
+ cls.cur_msg_id[session_id] = cls.cur_msg_id.get(session_id, 0) + 1
+ return cls.cur_msg_id[session_id]
+
+ def __init__(self):
+ self._reply = None
+ self._event = Event()
+ self._async = None
+
+ def get_reply(self, timeout=2.0):
+ 'Wait up to timeout seconds for the event; return the reply, or None if it was not set'
+ self._event.wait(timeout)
+ if self._event.isSet():
+ return self._reply
+
+ # NOTE(review): `async` became a reserved word in Python 3.7; renaming it
+ # would change the public interface, so it is left as is here.
+ def do(self, session, async=False):
+ self._async = async
+
+ @property
+ def async(self):
+ return self._async
+
+class RPCReply(RPC):
+ # Placeholder: adds nothing to RPC yet.
+ pass
+
+class RPCError(OperationError):
+ # Placeholder: adds nothing to OperationError yet.
+ pass
+
+class Operation(RPCRequest):
+ # Placeholder: adds nothing to RPCRequest yet.
+ pass
+
+class ReplyListener(Listener):
+ # Holds self._id2op (initially empty); presumably maps message ids to
+ # pending operations -- TODO confirm.
+
+ def __init__(self):
+ self._id2op = {}
+
+ def reply(self, msg):
+ # NOTE(review): unfinished sketch. `id2op` and `parsed_msg` are not
+ # defined in this scope and `id` resolves to the builtin; presumably
+ # self._id2op and values parsed from msg are intended -- confirm.
+ # if all good:
+ op = id2op[id]
+ op._reply = parsed_msg
+ # else:
+ self._error = True
+
+ op._event.set()
+ pass
+
+ def error(self, buf):
+ pass
self._id = None # session-id
self._connected = False
self._initialised = False
+ self._error = False
self._q = Queue.Queue()
def _init(self, id, capabilities):
- if isinstance(id, int) and isinstance(capabilities, Capabilities):
- self.id = id
- self.capabilities[SERVER] = capabilities
- self.initialised = True
- else: # there was an error in parsing or such
- raise ValueError
+ # Store on the private attributes that the id, ready and capability
+ # properties read; `id` is exposed as a getter-only property.
+ self._id = id
+ self._capabilities[SERVER] = capabilities
+ self._initialised = True
def _greet(self):
+ # Not implemented yet: instantiates a Creator, then raises.
hello = Creator()
raise NotImplementedError
def send(self, message):
- if self.ready:
- self._q.add(message)
+ 'Blocks if session not initialised yet'
+ # Poll until the session is ready or initialisation has failed;
+ # raises SessionError in the latter case.
+ while not (self.ready or self._error):
+ time.sleep(0.1)
+ if self._error:
+ raise SessionError
else:
- raise SessionError('Session not ready')
-
- ### Thread methods
+ # Queue objects provide put(); they have no add() method
+ self._q.put(message)
def run(self):
+ # Always raises NotImplementedError in this class.
raise NotImplementedError
- ### Listener methods - these are relevant for the initial greeting only
+ ### Listener methods - relevant for the initial greeting
- def reply(self, data):
+ def reply(self, data, *args, **kwds):
id, capabilities = None, None
try:
p = Parser()
# ...
self._init(id, capabilities)
- except: # ...
- pass
+ # Any failure while handling the greeting marks the session as failed.
+ # Catch Exception rather than using a bare except so that
+ # KeyboardInterrupt/SystemExit still propagate.
+ except Exception:
+ self._error = True
finally:
self.remove_listener(self)
- def error(self, data):
+ def error(self, data, *args, **kwds):
+ # Calls _close(), unregisters this object as a listener and sets the
+ # _error flag that send() polls.
self._close()
- raise SSHError('Session initialization failed')
-
- ### Getter methods and properties
-
- def get_capabilities(self, whose):
- return self._capabilities[whose]
+ self.remove_listener(self)
+ self._error = True
- ready = property(lambda self: self._connected and self._initialised)
+ ### Properties
+
+ @property
+ def client_capabilities(self): return self._capabilities[CLIENT]
- id = property(lambda self: self._id)
+ # Keeps the public name of the property it replaces (server_capabilities)
+ @property
+ def server_capabilities(self): return self._capabilities[SERVER]
- client_capabilities = property(lambda self: self._capabilities[CLIENT])
+ @property
+ def ready(self): return (self._connected and self._initialised)
- server_capabilities = property(lambda self: self._capabilities[SERVER])
+ @property
+ def id(self): return self._id
+
\ No newline at end of file