def abbreviate(uri):
+ """Return the shorthand name for a NETCONF capability URI.
+
+ URIs under 'urn:ietf:params:netconf:capability:' map to ':' plus their
+ sixth colon-separated field (e.g. ':xpath'); URIs under
+ 'urn:ietf:params:netconf:base:' map to ':base'. Any other URI matches
+ neither branch, so the function implicitly returns None.
+ """
if uri.startswith('urn:ietf:params:netconf:capability:'):
- return (':' + uri.split(':')[5])
+ return ':' + uri.split(':')[5]
+ elif uri.startswith('urn:ietf:params:netconf:base:'):
+ return ':base'
-def schemes(uri):
- return uri.partition("?scheme=")[2].split(',')
+def version(uri):
+    """Return the version field of a NETCONF capability URI, or None.
+
+    The version is the colon-separated field that follows the capability
+    name ('urn:ietf:params:netconf:capability:<name>:<version>') or the
+    'base' keyword ('urn:ietf:params:netconf:base:<version>'). A query
+    string (e.g. '?scheme=http,ftp') is dropped first so it does not end up
+    in the version. None is returned for URIs outside these two prefixes
+    and for URIs that carry no version field.
+    """
+    if uri.startswith('urn:ietf:params:netconf:capability:'):
+        index = 6
+    elif uri.startswith('urn:ietf:params:netconf:base:'):
+        index = 5
+    else:
+        return None
+    # Split only the part before '?': parameters are not part of the version.
+    fields = uri.partition('?')[0].split(':')
+    if len(fields) <= index:
+        return None
+    return fields[index]
class Capabilities:
-
- """Represent the capabilities of client or server. Also facilitates using
- abbreviated capability names in addition to complete URI.
- """
-
+
def __init__(self, capabilities=None):
self._dict = {}
if isinstance(capabilities, dict):
self._dict = capabilities
elif isinstance(capabilities, list):
for uri in capabilities:
- self._dict[uri] = abbreviate(uri)
-
+ self._dict[uri] = (abbreviate(uri), version(uri))
+
def __contains__(self, key):
- return ( key in self._dict ) or ( key in self._dict.values() )
-
+ if key in self._dict:
+ return True
+ for info in self._dict.values():
+ if key == info[0]:
+ return True
+ return False
+
def __iter__(self):
return self._dict.keys().__iter__()
-
+
def __repr__(self):
return repr(self._dict.keys())
-
+
def __list__(self):
return self._dict.keys()
-
- def add(self, uri, shorthand=None):
- if shorthand is None:
- shorthand = abbreviate(uri)
- self._dict[uri] = shorthand
-
+
+ def add(self, uri, info=None):
+ if info is None:
+ info = (abbreviate(uri), version(uri))
+ self._dict[uri] = info
+
set = add
-
+
def remove(self, key):
+ """Remove a capability given its full URI or its shorthand name.
+
+ A key that matches neither is silently ignored.
+ """
if key in self._dict:
del self._dict[key]
else:
for uri in self._dict:
- if self._dict[uri] == key:
+ # Compare against the shorthand only (first element of the info
+ # tuple), as __contains__ does; an `in` test on the whole tuple
+ # would also match the version string.
+ if self._dict[uri][0] == key:
del self._dict[uri]
break
-# : the capabilities currently supported by ncclient
+ def get_uri(self, shortname):
+ """Return the full URI stored under the shorthand `shortname`.
+
+ Scans the stored info entries and returns the first URI whose
+ shorthand (element 0 of the entry) equals `shortname`. Returns None
+ implicitly when no entry matches.
+ """
+ for uri, info in self._dict.items():
+ if info[0] == shortname:
+ return uri
+
+    def url_schemes(self):
+        """Return the URL schemes advertised by the ':url' capability.
+
+        The schemes are read from the '?scheme=' parameter of the URI stored
+        under the ':url' shorthand. An empty list is returned when that
+        capability is absent or its URI names no schemes.
+        """
+        # get_uri is a method, so it has to be looked up on self.
+        url_uri = self.get_uri(':url')
+        if url_uri is None:
+            return []
+        schemes = url_uri.partition("?scheme=")[2]
+        # ''.split(',') would yield [''], so handle a missing parameter explicitly.
+        return schemes.split(',') if schemes else []
+
+ def version(self, key):
+ """Return the version recorded for a capability.
+
+ `key` is first tried as a full URI. On KeyError the entries are
+ scanned for one whose shorthand (element 0) equals `key`. Returns
+ None implicitly when nothing matches.
+ """
+ try:
+ return self._dict[key][1]
+ except KeyError:
+ for uri, info in self._dict.items():
+ if info[0] == key:
+ return info[1]
+
+
+#: the capabilities supported by NCClient
CAPABILITIES = Capabilities([
'urn:ietf:params:netconf:base:1.0',
'urn:ietf:params:netconf:capability:writable-running:1.0',
'urn:ietf:params:netconf:capability:xpath:1.0',
#'urn:ietf:params:netconf:capability:notification:1.0', # TODO
#'urn:ietf:params:netconf:capability:interleave:1.0' # theoretically already supported
-])
\ No newline at end of file
+])
# See the License for the specific language governing permissions and
# limitations under the License.
+
+"""The :mod:`content` module provides methods for creating XML documents, parsing XML, and converting between different XML representations. It uses :mod:`~xml.etree.ElementTree` internally.
+"""
+
from cStringIO import StringIO
from xml.etree import cElementTree as ET
### Namespace-related
-# : Base NETCONf namespace
+#: Base NETCONF namespace
BASE_NS = 'urn:ietf:params:xml:ns:netconf:base:1.0'
-# : ... and this is BASE_NS according to Cisco devices tested
+#: ... and this is BASE_NS according to Cisco devices tested
CISCO_BS = 'urn:ietf:params:netconf:base:1.0'
try:
# limitations under the License.
import capabilities
-import operations
+from operations import OPERATIONS
import transport
connect = ssh_connect # default session type
-RAISE_ALL, RAISE_ERROR, RAISE_NONE = range(3)
+#: Raise all errors
+RAISE_ALL = 0
+#:
+RAISE_ERR = 1
+#:
+RAISE_NONE = 2
class Manager:
def __init__(self, session):
self._session = session
- self._rpc_error_handling = RAISE_ALL
+ self._rpc_error_action = RAISE_ALL
- def set_rpc_error_option(self, option):
+ def set_rpc_error_action(self, action):
+ """Set the action taken on RPC errors.
+
+ `action` is expected to be one of the RAISE_* constants defined in
+ this module.
+ """
- self._rpc_error_handling = option
+ # Store under the attribute __init__ initialises, using this method's
+ # own parameter name (`option` no longer exists after the rename).
+ self._rpc_error_action = action
def do(self, op, *args, **kwds):
- op = operations.OPERATIONS[op](self._session)
+ op = OPERATIONS[op](self._session)
reply = op.request(*args, **kwds)
if not reply.ok:
if self._raise == RAISE_ALL:
def locked(self, target):
"""Returns a context manager for use with the 'with' statement.
- `target` is the datastore to lock, e.g. 'candidate
+
+ :arg target: name of the datastore to lock
+ :type target: `string`
"""
- return operations.LockContext(self._session, target)
+ # The module-level import is now `from operations import OPERATIONS`, so
+ # the name `operations` is no longer bound; import LockContext locally.
+ from operations import LockContext
+ return LockContext(self._session, target)
- get = lambda self, *args, **kwds: self.do('get', *args, **kwds).data
+ def get(self, filter=None):
+ pass
+
+ def get_config(self, source, filter=None):
+ pass
+
+ def copy_config(self, source, target):
+ pass
+
+ def validate(self, source):
+ pass
+
+ def commit(self, target):
+ pass
+
+ def discard_changes(self):
+ pass
+
+ def delete_config(self, target):
+ pass
- get_config = lambda self, *args, **kwds: self.do('get-config', *args, **kwds).data
+ def lock(self, target):
+ pass
- edit_config = lambda self, *args, **kwds: self.do('edit-config', *args, **kwds)
+ def unlock(self, target):
+ pass
- copy_config = lambda self, *args, **kwds: self.do('copy-config', *args, **kwds)
+ def close_session(self):
+ pass
- validate = lambda self, *args, **kwds: self.do('validate', *args, **kwds)
+ def kill_session(self, session_id):
+ pass
- commit = lambda self, *args, **kwds: self.do('commit', *args, **kwds)
+ def confirmed_commit(self, timeout=None):
+ pass
- discard_changes = lambda self, *args, **kwds: self.do('discard-changes', *args, **kwds)
+ def confirm(self):
+ # give confirmation
+ pass
- delete_config = lambda self, *args, **kwds: self.do('delete-config', *args, **kwds)
+ def discard_changes(self):
+ pass
lock = lambda self, *args, **kwds: self.do('lock', *args, **kwds)
def close(self):
try: # try doing it clean
self.close_session()
- except:
- pass
+ except Exception as e:
+ logger.debug('error doing <close-session> -- %r' % e)
if self._session.connected: # if that didn't work...
self._session.close()
def session(self, session):
return self._session
- def get_capabilities(self, whose):
- if whose in ('manager', 'client'):
- return self._session._client_capabilities
- elif whose in ('agent', 'server'):
- return self._session._server_capabilities
-
@property
- def capabilities(self):
+ def client_capabilities(self):
return self._session._client_capabilities
@property
def server_capabilities(self):
return self._session._server_capabilities
+
+ @property
+ def session_id(self):
+ return self._session.id
# See the License for the specific language governing permissions and
# limitations under the License.
-'NETCONF protocol operations'
-
from errors import OperationError, MissingCapabilityError
-from rpc import RPCError
-from retrieve import Get, GetConfig
-from edit import EditConfig, CopyConfig, DeleteConfig, Validate, Commit, DiscardChanges
+from rpc import RPC, RPCReply, RPCError
+from retrieve import Get, GetConfig, GetReply
+from edit import EditConfig, CopyConfig, DeleteConfig, Validate, Commit, DiscardChanges, ConfirmedCommit
from session import CloseSession, KillSession
from lock import Lock, Unlock, LockContext
-from subscribe import CreateSubscription
+#from subscribe import CreateSubscription
OPERATIONS = {
'get': Get,
}
__all__ = [
+ 'RPC',
+ 'RPCReply',
'RPCError',
'OPERATIONS',
'Get',
'GetConfig',
+ 'GetReply',
'EditConfig',
'CopyConfig',
'Validate',
'Commit',
+ # Trailing comma required: without it Python concatenates this literal
+ # with the next one into 'ConfirmedCommitDiscardChanges'.
+ 'ConfirmedCommit',
'DiscardChanges',
'DeleteConfig',
'Lock',
'Unlock',
'LockContext',
'CloseSession',
- 'KillSession',
- 'CreateSubscription',
+ 'KillSession'
]
"Operations related to configuration editing"
class EditConfig(RPC):
-
- # tested: no
- # combed: yes
-
+
SPEC = {'tag': 'edit-config', 'subtree': []}
-
+
def request(self, target=None, config=None, default_operation=None,
test_option=None, error_option=None):
util.one_of(target, config)
'tag': 'error-option',
'text': error_option
})
+ return self._request(spec)
class DeleteConfig(RPC):
-
- # tested: no
- # combed: yes
-
+
SPEC = {'tag': 'delete-config', 'subtree': []}
-
+
def request(self, target):
spec = DeleteConfig.SPEC.copy()
- spec['subtree'].append(util.store_or_url('source', source, self._assert))
+ spec['subtree'].append(util.store_or_url('target', target, self._assert))
return self._request(spec)
class CopyConfig(RPC):
-
- # tested: no
- # combed: yes
-
+
SPEC = {'tag': 'copy-config', 'subtree': []}
-
+
def request(self, source, target):
spec = CopyConfig.SPEC.copy()
spec['subtree'].append(util.store_or_url('source', source, self._assert))
- spec['subtree'].append(util.store_or_url('target', source, self._assert))
+ spec['subtree'].append(util.store_or_url('target', target, self._assert))
return self._request(spec)
class Validate(RPC):
-
- # tested: no
- # combed: yes
-
- 'config attr shd not include <config> root'
-
+
+ "*<validate>* RPC. Declares a dependency on the ':validate' capability."
+
DEPENDS = [':validate']
-
+
SPEC = {'tag': 'validate', 'subtree': []}
-
- def request(self, source=None, config=None):
- util.one_of(source, config)
+
+ def request(self, source):
+ """Request validation of `source`.
+
+ `source` is first treated as a <config> element and checked with
+ content.validated_root; if that raises ContentError it is passed to
+ util.store_or_url instead.
+ """
+ # determine if source is a <config> element
+ # NOTE(review): dict.copy() is shallow, so the 'subtree' list appended to
+ # below is the same list object as Validate.SPEC['subtree'].
spec = Validate.SPEC.copy()
- if config is None:
+ try:
+ spec['subtree'].append({
+ 'tag': 'source',
+ # `config` is no longer a parameter; the candidate element is `source`.
+ 'subtree': content.validated_root(source, ('config', content.qualify('config')))
+ })
+ except ContentError:
spec['subtree'].append(util.store_or_url('source', source, self._assert))
- else:
- spec['subtree'].append(content.validated_root(config, 'config'))
return self._request(spec)
class Commit(RPC):
-
- # tested: no
- # combed: yes
-
+
DEPENDS = [':candidate']
-
+
SPEC = {'tag': 'commit', 'subtree': []}
-
+
def _parse_hook(self):
pass
-
- def request(self, confirmed=False, timeout=None):
+
+ def request(self, confirmed=False):
spec = SPEC.copy()
if confirmed:
self._assert(':confirmed-commit')
class DiscardChanges(RPC):
-
- # tested: no
- # combed: yes
-
+
DEPENDS = [':candidate']
-
+
SPEC = {'tag': 'discard-changes'}
class ConfirmedCommit(Commit):
"psuedo-op"
-
- # tested: no
- # combed: yes
-
+
DEPENDS = [':candidate', ':confirmed-commit']
-
- def request(self, timeout=None):
- "Commit changes; requireing that a confirming commit follow"
- return Commit.request(self, confirmed=True, timeout=timeout)
-
+
+ def request(self):
+ "Commit changes requiring that a confirm/discard follow"
+ return Commit.request(self, confirmed=True)
+
def confirm(self):
- "Make the confirming commit"
+ "Confirm changes"
return Commit.request(self, confirmed=True)
-
+
def discard(self):
+ "Discard changes"
return DiscardChanges(self.session, self.async, self.timeout).request()
class MissingCapabilityError(NCClientError):
pass
-
from rpc import RPC
class Lock(RPC):
-
- # tested: no
- # combed: yes
-
+
SPEC = {
'tag': 'lock',
'subtree': {
'subtree': {'tag': None }
}
}
-
- def request(self, target):
+
+ def request(self, target, *args, **kwds):
spec = Lock.SPEC.copy()
spec['subtree']['subtree']['tag'] = target
- return self._request(spec)
+ return self._request(spec, *args, **kwds)
class Unlock(RPC):
-
- # tested: no
- # combed: yes
-
+
SPEC = {
'tag': 'unlock',
'subtree': {
'subtree': {'tag': None }
}
}
-
- def request(self, target):
+
+ def request(self, target, *args, **kwds):
spec = Unlock.SPEC.copy()
spec['subtree']['subtree']['tag'] = target
- return self._request(spec)
+ return self._request(spec, *args, **kwds)
class LockContext:
-
- # tested: no
- # combed: yes
-
+
def __init__(self, session, target):
self.session = session
self.target = target
-
+
def __enter__(self):
reply = Lock(self.session).request(self.target)
if not reply.ok:
raise reply.error
else:
return self
-
+
def __exit__(self, *args):
reply = Unlock(session).request(self.target)
if not reply.ok:
import util
class GetReply(RPCReply):
-
- 'Adds data attribute'
-
- # tested: no
- # combed: yes
-
+
+ """Adds attributes for the *<data>* element to :class:`RPCReply`, pertinent
+ to the *<get>* or *<get-config>* operations."""
+
def _parsing_hook(self, root):
self._data = None
if not self._errors:
- self._data = content.find(root, 'data', nslist=[content.BASE_NS, content.CISCO_BS])
-
+ self._data = content.find(root, 'data',
+ nslist=[content.BASE_NS,
+ content.CISCO_BS])
+
@property
- def data(self):
+ def data_ele(self):
+ "As an :class:`~xml.etree.ElementTree.Element`"
if not self._parsed:
self.parse()
return self._data
+ @property
+ def data_xml(self):
+ "As an XML string"
+ if not self._parsed:
+ self.parse()
+ return content.ele2xml(self._data)
+
+ data = data_ele
+
+
class Get(RPC):
-
- # tested: no
- # combed: yes
-
+
+ "*<get>* RPC"
+
SPEC = {
'tag': 'get',
'subtree': []
}
-
+
REPLY_CLS = GetReply
-
+
def request(self, filter=None):
spec = Get.SPEC.copy()
if filter is not None:
class GetConfig(RPC):
- # tested: no
- # combed: yes
-
+ "*<get-config>* RPC"
+
SPEC = {
'tag': 'get-config',
'subtree': []
}
-
+
REPLY_CLS = GetReply
-
+
def request(self, source, filter=None):
- """
- `filter` has to be a tuple of (type, criteria)
- The type may be one of 'xpath' or 'subtree'
- The criteria may be an ElementTree.Element, an XML fragment, or tree specification
- """
spec = GetConfig.SPEC.copy()
spec['subtree'].append(util.store_or_url('source', source, self._assert))
if filter is not None:
from weakref import WeakValueDictionary
from ncclient import content
+from ncclient.capabilities import check
from ncclient.transport import SessionListener
from errors import OperationError
class RPCReply:
+ """Represents an *<rpc-reply>*. Only concerns itself with whether the
+ operation was successful. Note that if the reply has not yet been parsed
+ there is a one-time parsing overhead to accessing the :attr:`ok` and
+ :attr:`error`/:attr:`errors` attributes."""
+
def __init__(self, raw):
self._raw = raw
self._parsed = False
def __repr__(self):
return self._raw
- def _parsing_hook(self, root): pass
+ def _parsing_hook(self, root):
+ """Subclass can implement.
+
+ :type root: :class:`~xml.etree.ElementTree.Element`
+ """
+ pass
def parse(self):
+ """Parse the *<rpc-reply>*"""
if self._parsed:
return
root = self._root = content.xml2ele(self._raw) # <rpc-reply> element
@property
def xml(self):
- '<rpc-reply> as returned'
+ "*<rpc-reply>* as returned"
return self._raw
@property
def ok(self):
+ "Boolean value indicating if there were no errors."
if not self._parsed:
self.parse()
return not self._errors # empty list => false
@property
def error(self):
+ "Short for :attr:`errors`[0], returning :const:`None` if there were no errors."
if not self._parsed:
self.parse()
if self._errors:
@property
def errors(self):
- 'List of RPCError objects. Will be empty if no <rpc-error> elements in reply.'
+ "List of :class:`RPCError` objects. Will be empty if there were no :class:`<rpc-error>` elements in reply."
if not self._parsed:
self.parse()
return self._errors
class RPCError(OperationError): # raise it if you like
+ """Represents an *<rpc-error>*. It is an instance of :exc:`OperationError`
+ so it can be raised like any other exception."""
+
def __init__(self, err_dict):
self._dict = err_dict
if self.message is not None:
@property
def type(self):
+ "`string` represeting *error-type* element"
return self.get('error-type', None)
@property
def severity(self):
+ "`string` represeting *error-severity* element"
return self.get('error-severity', None)
@property
def tag(self):
+ "`string` represeting *error-tag* element"
return self.get('error-tag', None)
@property
def path(self):
+ "`string` or :const:`None`; represeting *error-path* element"
return self.get('error-path', None)
@property
def message(self):
+ "`string` or :const:`None`; represeting *error-message* element"
return self.get('error-message', None)
@property
def info(self):
+ "`string` or :const:`None`, represeting *error-info* element"
return self.get('error-info', None)
## dictionary interface
class RPCReplyListener(SessionListener):
+ # internal use
+
# one instance per session
def __new__(cls, session):
instance = session.get_listener_instance(cls)
else:
logger.warning('<rpc-reply> without message-id received: %s' % raw)
logger.debug('delivering to %r' % rpc)
- rpc.deliver(raw)
+ rpc.deliver_reply(raw)
def errback(self, err):
for rpc in self._id2rpc.values():
- rpc.error(err)
+ rpc.deliver_error(err)
class RPC(object):
+ "Directly corresponds to *<rpc>* requests. Handles making the request, and taking delivery of the reply."
+
+ # : Subclasses can specify their dependencies on capabilities. List of URI's
+ # or abbreviated names, e.g. ':writable-running'. These are verified at the
+ # time of object creation. If the capability is not available, a
+ # :exc:`MissingCapabilityError` is raised.
DEPENDS = []
+
+ # : Subclasses can specify a different reply class, but it must be a
+ # subclass of :class:`RPCReply`.
REPLY_CLS = RPCReply
def __init__(self, session, async=False, timeout=None):
- if not session.can_pipeline:
- raise UserWarning('Asynchronous mode not supported for this device/session')
self._session = session
try:
for cap in self.DEPENDS:
self._listener.register(self._id, self)
self._reply = None
self._error = None
- self._reply_event = Event()
+ self._event = Event()
def _build(self, opspec):
- "TODO: docstring"
+ # internal
spec = {
'tag': content.qualify('rpc'),
'attrib': {'message-id': self._id},
- 'subtree': opspec
+ 'subtree': [ opspec ]
}
return content.dtree2xml(spec)
def _request(self, op):
+ """Subclasses call this method to make the RPC request.
+
+ In asynchronous mode, returns an :class:`~threading.Event` which is set
+ when the reply has been received or an error occured. It is prudent,
+ therefore, to check the :attr:`error` attribute before accesing
+ :attr:`reply`.
+
+ Otherwise, waits until the reply is received and returns
+ :class:`RPCReply`.
+
+ :arg opspec: :ref:`dtree` for the operation
+ :type opspec: :obj:`dict` or :obj:`string` or :class:`~xml.etree.ElementTree.Element`
+ :rtype: :class:`~threading.Event` or :class:`RPCReply`
+ """
req = self._build(op)
self._session.send(req)
if self._async:
- return self._reply_event
+ return self._event
else:
- self._reply_event.wait(self._timeout)
- if self._reply_event.isSet():
+ self._event.wait(self._timeout)
+ if self._event.isSet():
if self._error:
raise self._error
self._reply.parse()
else:
raise ReplyTimeoutError
- def request(self):
- return self._request(self.SPEC)
+ def request(self, *args, **kwds):
+ "Subclasses implement this method. Here, the operation is to be constructed as a :ref:`dtree`, and the result of :meth:`_request` returned."
+ return self._request(self.SPEC, *args, **kwds)
def _delivery_hook(self):
- 'For subclasses'
+ """Subclasses can implement this method. Will be called after
+ initialising the :attr:`reply` or :attr:`error` attribute and before
+ setting the :attr:`event`"""
pass
def _assert(self, capability):
+ """Subclasses can use this method to verify that a capability is available
+ with the NETCONF server, before making a request that requires it. A
+ :class:`MissingCapabilityError` will be raised if the capability is not
+ available."""
if capability not in self._session.server_capabilities:
- raise MissingCapabilityError('Server does not support [%s]' % cap)
+ # Format with this method's own parameter; `cap` is not bound here.
+ raise MissingCapabilityError('Server does not support [%s]' % capability)
- def deliver(self, raw):
+ def deliver_reply(self, raw):
+ # internal use
self._reply = self.REPLY_CLS(raw)
self._delivery_hook()
- self._reply_event.set()
+ self._event.set()
- def error(self, err):
+ def deliver_error(self, err):
+ # internal use
self._error = err
- self._reply_event.set()
-
- @property
- def has_reply(self):
- return self._reply_event.is_set()
+ self._delivery_hook()
+ self._event.set()
@property
def reply(self):
- if self.error:
- raise self._error
+ ":class:`RPCReply` element if reply has been received or :const:`None`"
return self._reply
@property
+ def error(self):
+ """:exc:`Exception` type if an error occured or :const:`None`.
+
+ This attribute should be checked if the request was made asynchronously,
+ so that it can be determined if :attr:`event` being set is because of a
+ reply or error.
+
+ .. note::
+ This represents an error which prevented a reply from being
+ received. An *<rpc-error>* does not fall in that category -- see
+ :class:`RPCReply` for that.
+ """
+ return self._error
+
+ @property
def id(self):
+ "The *message-id* for this RPC"
return self._id
@property
def session(self):
+ """The :class:`~ncclient.transport.Session` object associated with this
+ RPC"""
return self._session
@property
- def reply_event(self):
- return self._reply_event
+ def event(self):
+ """:class:`~threading.Event` that is set when reply has been received or
+ error occured."""
+ return self._event
+
+    def set_async(self, async=True):
+        """Set asynchronous mode for this RPC.
+
+        :arg async: when true, :meth:`_request` returns the event instead of
+            waiting for the reply
+        :raises UserWarning: if asynchronous mode is requested but the session
+            reports that it cannot pipeline
+        """
+        # `session` is not a local name here; the session is kept on
+        # self._session. Validate before assigning so that a rejected request
+        # leaves the current mode unchanged.
+        if async and not self._session.can_pipeline:
+            raise UserWarning('Asynchronous mode not supported for this device/session')
+        self._async = async
+
+ def set_timeout(self, timeout):
+ """Set the timeout for synchronous waiting defining how long the RPC
+ request will block on a reply before raising an error."""
+ self._timeout = timeout
- def set_async(self, bool): self._async = bool
+ #: Whether this RPC is asynchronous
async = property(fget=lambda self: self._async, fset=set_async)
- def set_timeout(self, timeout): self._timeout = timeout
+ #: Timeout for synchronous waiting
timeout = property(fget=lambda self: self._timeout, fset=set_timeout)
from rpc import RPC
class CloseSession(RPC):
- # tested: no
- # combed: yes
-
+
+ "*<close-session>* RPC. The connection to NETCONF server is also closed."
+
SPEC = { 'tag': 'close-session' }
-
+
def _delivery_hook(self):
+ # Must keep the exact name `_delivery_hook`: that is the hook RPC calls
+ # after a reply or error is delivered. A misspelt name is never invoked,
+ # leaving the session open.
self.session.close()
class KillSession(RPC):
- # tested: no
- # combed: yes
-
+
+ "*<kill-session>* RPC."
+
SPEC = {
'tag': 'kill-session',
'subtree': []
}
-
- def request(self, session_id):
+
+ def request(self, session_id, *args, **kwds):
spec = KillSession.SPEC.copy()
if not isinstance(session_id, basestring): # just making sure...
session_id = str(session_id)
'tag': 'session-id',
'text': session_id
})
- return self._request(spec)
+ return self._request(spec, *args, **kwds)
'subtree': criteria
}
else:
- rep = content.validated_element(spec, 'filter', 'type')
+ rep = content.validated_element(spec, ['filter', content.qualify('filter')],
+ attrs=[('type', content.qualify('type'))])
try:
type = rep['type']
except KeyError:
if saved_exception is not None:
# need pep-3134 to do this right
- raise SSHAuthenticationError(repr(saved_exception))
+ raise AuthenticationError(repr(saved_exception))
- raise SSHAuthenticationError('No authentication methods available')
+ raise AuthenticationError('No authentication methods available')
def run(self):
chan = self._channel