# limitations under the License.
'''
-NOTES
+TODO
=====
-
-- operations complete
-- parse into dicts??
-- code freeze and reST doc
+* code freeze and reST doc
'''
import sys
+# actually no reason why shouldn't work on 2.5 but that's... untested -- TODO
if sys.version_info < (2, 6):
raise RuntimeError('You need Python 2.6+ for this module.')
__version__ = "0.05"
-class NCClientError(Exception):
- pass
-
-class TransportError(NCClientError):
- pass
-
-class RPCError(NCClientError):
- pass
-
-class OperationError(NCClientError):
- pass
-
-class ContentError(NCClientError):
- pass
from xml.etree import cElementTree as ET
-iselement = ET.iselement
-element2string = ET.tostring
### Namespace-related ###
if found is not None:
break
return found
-
+
### Build XML using Python data structures ###
"TODO: docstring"
self._root = XMLConverter.build(spec)
- def to_string(self, encoding='utf-8'):
+ def tostring(self, encoding='utf-8'):
"TODO: docstring"
xml = ET.tostring(self._root, encoding)
# some etree versions don't include xml decl with utf-8
return spec
elif isinstance(spec, basestring):
return ET.XML(spec)
- ## assume isinstance(spec, dict)
+ # assume isinstance(spec, dict)
if 'tag' in spec:
ele = ET.Element(spec.get('tag'), spec.get('attributes', {}))
ele.text = spec.get('text', '')
return ele
elif 'comment' in spec:
return ET.Comment(spec.get('comment'))
+ # TODO elif DOM rep
else:
raise ContentError('Invalid tree spec')
@staticmethod
- def from_string(xml):
+ def fromstring(xml):
return XMLConverter.parse(ET.fromstring(xml))
@staticmethod
'tail': root.tail,
'subtree': [ XMLConverter.parse(child) for child in root.getchildren() ]
}
+
+## utility functions
+
+iselement = ET.iselement
+
+def isdom(x): return True # TODO
+
+def root_ensured(rep, tag):
+ if isinstance(rep, basestring):
+ rep = ET.XML(rep)
+ err = False
+ if ((iselement(rep) and (rep.tag not in (tag, qualify(tag))) or (isdom(x)))):
+ raise ArgumentError("Expected root element [%s] not found" % tag)
+ else:
+ return rep
def parse_root(raw):
- '''Parse the top-level element from a string representing an XML document.
+ '''Internal use.
+ Parse the top-level element from XML string.
Returns a `(tag, attributes)` tuple, where `tag` is a string representing
the qualified name of the root element and `attributes` is an
class Manager:
- 'Facade for the API'
+ "Thin layer of abstraction for the ncclient API."
RAISE_ALL = 0
RAISE_ERROR = 1
self._session = session
self._raise = rpc_error
- def do(self, op, *args, **kwds):
+ def rpc(self, op, *args, **kwds):
op = OPERATIONS[op](self._session)
reply = op.request(*args, **kwds)
if not reply.ok:
return reply.data
def locked(self, target):
- "For use with 'with'. target is the datastore, e.g. 'candidate'"
+ "Returns a context manager for use withthe 'with' statement.
+ `target` is the datastore to lock, e.g. 'candidate'"
return operations.LockContext(self._session, target)
-
+
get = lambda self, *args, **kwds: self._get('get')
get_config = lambda self, *args, **kwds: self._get('get-config')
pass
if self._session.connected: # if that didn't work...
self._session.close()
+
+ @property
+ def session(self, session):
+ return self._session
+
+ def get_capabilities(self, whose):
+ if whose in ('manager', 'client'):
+ return self._session._client_capabilities
+ elif whose in ('agent', 'server')
+ return self._session._server_capabilities
+
+
+ @property
+ def capabilities(self):
+ return self._session._client_capabilities
# See the License for the specific language governing permissions and
# limitations under the License.
-from ncclient.rpc import RPC
from ncclient.content import iselement
+from rpc import RPC
+
import util
-"""
-"""
-# NOTES
-# - consider class for helping define <config> for EditConfig??
+"Operations related to configuration editing"
class EditConfig(RPC):
# tested: no
- # combed: no
+ # combed: yes
- SPEC = {
- 'tag': 'edit-config',
- 'subtree': []
- }
+ SPEC = {'tag': 'edit-config', 'subtree': []}
def request(self, target=None, target_url=None, config=None,
default_operation=None, test_option=None, error_option=None):
subtree = spec['subtree']
subtree.append({
'tag': 'target',
- 'subtree': util.store_or_url(target, target_url)
- })
- subtree.append({
- 'tag': 'config',
- 'subtree': config
+ 'subtree': util.store_or_url(target, target_url, self._assert)
})
+ subtree.append(config)
if default_operation is not None:
subtree.append({
'tag': 'default-operation',
# tested: no
# combed: yes
- SPEC = {
- 'tag': 'delete-config',
- 'subtree': [ { 'tag': 'target', 'subtree': None } ]
- }
+ SPEC = {'tag': 'delete-config', 'subtree': []}
def request(self, target=None, target_url=None):
spec = DeleteConfig.SPEC.copy()
- spec['subtree'][0]['subtree'] = util.store_or_url(target, target_url)
+ spec['subtree'].append({
+ 'tag': 'target',
+ 'subtree': util.store_or_url(target, target_url, self._assert)
+ })
return self._request(spec)
# tested: no
# combed: yes
- SPEC = {
- 'tag': 'copy-config',
- 'subtree': []
- }
+ SPEC = {'tag': 'copy-config', 'subtree': []}
def request(self, source=None, source_url=None, target=None, target_url=None):
spec = CopyConfig.SPEC.copy()
spec['subtree'].append({
- 'tag': 'target',
- 'subtree': util.store_or_url(source, source_url)
+ 'tag': 'source',
+ 'subtree': util.store_or_url(source, source_url, self._assert)
})
spec['subtree'].append({
'tag': 'target',
- 'subtree': util.store_or_url(target, target_url)
+ 'subtree': util.store_or_url(target, target_url, self._assert)
})
return self._request(spec)
DEPENDS = [':validate']
- SPEC = {
- 'tag': 'validate',
- 'subtree': []
- }
+ SPEC = {'tag': 'validate', 'subtree': []}
- def request(self, source=None, config=None):
- util.one_of(source, capability)
- spec = SPEC.copy()
- if source is not None:
+ def request(self, source=None, source_url=None, config=None):
+ util.one_of(source, source_url, config)
+ spec = Validate.SPEC.copy()
+ if config is None:
spec['subtree'].append({
'tag': 'source',
- 'subtree': {'tag': source}
+ 'subtree': util.store_or_url(source, source_url, self._assert)
})
else:
- spec['subtree'].append({
- 'tag': 'config',
- 'subtree': config
- })
+ spec['subtree'].append(config)
return self._request(spec)
DEPENDS = [':candidate']
- SPEC = { 'tag': 'commit', 'subtree': [] }
+ SPEC = {'tag': 'commit', 'subtree': []}
def _parse_hook(self):
pass
def request(self, confirmed=False, timeout=None):
spec = SPEC.copy()
if confirmed:
+ self._assert(':confirmed-commit')
spec['subtree'].append({'tag': 'confirmed'})
if timeout is not None:
spec['subtree'].append({
def request(self, filter=None):
spec = Get.SPEC.copy()
if filter is not None:
- spec['subtree'].append(util.build_filter(*filter))
+ spec['subtree'].append(content.rootchecked(filter. 'filter', 'type'))
return self._request(spec)
class GetConfig(RPC):
'subtree': util.store_or_url(source, source_url)
})
if filter is not None:
- spec['subtree'].append(util.build_filter(*filter))
+ spec['subtree'].append(content.rootchecked(filter, 'filter', 'type'))
return self._request(spec)
+
from rpc import RPC
from reply import RPCReply, RPCError
-import ncclient
-
-class ReplyTimeoutError(ncclient.RPCError):
- pass
-
__all__ = [
'RPC',
'RPCReply',
- 'RPCError',
- 'ReplyTimeoutError'
+ 'RPCError'
]
# See the License for the specific language governing permissions and
# limitations under the License.
-'Boilerplate'
+'Boilerplate ugliness'
from ncclient import OperationError
+from ncclient.content import qualify as _
+from ncclient.content import ensure_root
-from . import MissingCapabilityError
+from ncclient.errors import MissingCapabilityError, ArgumentError
def one_of(*args):
'Verifies that only one of the arguments is not None'
return
raise OperationError('Insufficient parameters')
-def store_or_url(store, url):
+def store_or_url(store, url, capcheck_func=None):
one_of(store, url)
node = {}
if store is not None:
node['tag'] = store
else:
+ if capcheck_func is not None:
+ capcheck_func(':url') # hmm.. schema check? deem overkill for now
node['tag'] = 'url'
node['text'] = url
return node
-def build_filter(type, criteria):
- filter = {
- 'tag': 'filter',
- 'attributes': {'type': type}
- }
- if type == 'xpath':
- filter['attributes']['select'] = criteria
- else:
- filter['subtree'] = [criteria]
- return filter
+++ /dev/null
-# Copyright 2009 Shikhar Bhushan
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"TODO: docstrings"
-
-from ncclient import NCClientError
-
-class TransportError(NCClientError):
- pass
-
-class AuthenticationError(TransportError):
- pass
-
-class SessionCloseError(TransportError):
-
- def __init__(self, in_buf, out_buf=None):
- msg = 'Unexpected session close.'
- if in_buf:
- msg += ' IN_BUFFER: {%s}' % in_buf
- if out_buf:
- msg += ' OUT_BUFFER: {%s}' % out_buf
- SSHError.__init__(self, msg)
-
-class SSHError(TransportError):
- pass
-
-class SSHUnknownHostError(SSHError):
-
- def __init__(self, hostname, key):
- from binascii import hexlify
- SSHError(self, 'Unknown host key [%s] for [%s]'
- % (hexlify(key.get_fingerprint()), hostname))
- self.hostname = hostname
- self.key = key