from cStringIO import StringIO
from os import SEEK_CUR
-import socket
+from select import select
import paramiko
BUF_SIZE = 4096
MSG_DELIM = ']]>]]>'
+# TODO:
+# Drop SSHClient and use paramiko's low-level API directly, to gain Cisco
+# compatibility and finer control over host key verification,
+# authentication, and error handling.
+
class SSHSession(Session):
def __init__(self, load_known_hosts=True,
self._connected = False
def _fresh_data(self):
+ '''The buffer can have grown by at most BUF_SIZE bytes each time this
+ method is called. Parsing state is retained across calls, so a byte
+ that has already been read is not parsed again.
+ '''
delim = MSG_DELIM
n = len(delim) - 1
state = self._parsing_state
x = buf.read(1)
if not x: # done reading
break
- if x==delim[i]: # what we expected
+ if x==delim[state]: # what we expected
state += 1 # expect the next delim char
else:
state = 0 # reset
self._connected = True
self._post_connect()
-
def run(self):
chan = self._channel
chan.setblocking(0)
q = self._q
try:
while True:
- if chan.closed:
- raise SessionCloseError(self._in_buf.getvalue())
- if chan.send_ready() and not q.empty():
- data = q.get() + MSG_DELIM
- while data:
- n = chan.send(data)
- if n <= 0:
- raise SessionCloseError(self._in_buf.getvalue(), data)
- data = data[n:]
- if chan.recv_ready():
+ # select() on a paramiko SSH channel object never returns it in
+ # the writable list, so the channel does not exactly emulate the
+ # socket API; writability is checked separately via send_ready()
+ r, w, e = select([chan], [], [], 0.1)
+ if r:
data = chan.recv(BUF_SIZE)
if data:
self._in_buf.write(data)
self._fresh_data()
else:
raise SessionCloseError(self._in_buf.getvalue())
+ if not q.empty() and chan.send_ready():
+ data = q.get() + MSG_DELIM
+ while data:
+ n = chan.send(data)
+ if n <= 0:
+ raise SessionCloseError(self._in_buf.getvalue(), data)
+ data = data[n:]
except Exception as e:
logger.debug('*** broke out of main loop ***')
self.dispatch('error', e)