"""
def __init__(self, capabilities=None):
- "TODO: docstring"
self._dict = {}
if isinstance(capabilities, dict):
self._dict = capabilities
self._dict[uri] = Capabilities.guess_shorthand(uri)
def __contains__(self, key):
- "TODO: docstring"
return ( key in self._dict ) or ( key in self._dict.values() )
def __iter__(self):
- "TODO: docstring"
return self._dict.keys().__iter__()
def __repr__(self):
- "TODO: docstring"
return repr(self._dict.keys())
def __list__(self):
return self._dict.keys()
def add(self, uri, shorthand=None):
- "TODO: docstring"
if shorthand is None:
shorthand = Capabilities.guess_shorthand(uri)
self._dict[uri] = shorthand
set = add
def remove(self, key):
- "TODO: docstring"
if key in self._dict:
del self._dict[key]
else:
@staticmethod
def guess_shorthand(uri):
- "TODO: docstring"
if uri.startswith('urn:ietf:params:netconf:capability:'):
return (':' + uri.split(':')[5])
'urn:ietf:params:netconf:capability:confirmed-commit:1.0',
'urn:ietf:params:netconf:capability:rollback-on-error:1.0',
'urn:ietf:params:netconf:capability:startup:1.0',
- 'urn:ietf:params:netconf:capability:url:1.0',
+ 'urn:ietf:params:netconf:capability:url:1.0?scheme=http,ftp,file',
'urn:ietf:params:netconf:capability:validate:1.0',
'urn:ietf:params:netconf:capability:xpath:1.0',
'urn:ietf:params:netconf:capability:notification:1.0',
# See the License for the specific language governing permissions and
# limitations under the License.
-"TODO: docstring"
-
from xml.etree import cElementTree as ET
from ncclient import NCClientError
unqualify = lambda tag: tag[tag.rfind('}')+1:]
-### Other utility functions
-
-iselement = ET.iselement
-
-def namespaced_find(ele, tag, strict=False):
- """In strict mode, doesn't work around Cisco implementations sending incorrectly
- namespaced XML. Supply qualified name if using strict mode.
- """
- found = None
- if strict:
- found = ele.find(tag)
- else:
- for qname in multiqualify(tag):
- found = ele.find(qname)
- if found is not None:
- break
- return found
-
-def parse_root(raw):
- '''Parse the top-level element from XML string.
-
- Returns a `(tag, attributes)` tuple, where `tag` is a string representing
- the qualified name of the root element and `attributes` is an
- `{attribute: value}` dictionary.
- '''
- fp = StringIO(raw[:1024]) # this is a guess but start element beyond 1024 bytes would be a bit absurd
- for event, element in ET.iterparse(fp, events=('start',)):
- return (element.tag, element.attrib)
-
-def root_ensured(rep, req_tag, req_attrs=None):
- rep = to_element(rep)
- if rep.tag not in (req_tag, qualify(req_tag)):
- raise ContentError("Required root element [%s] not found" % req_tag)
- if req_attrs is not None:
- pass # TODO
- return rep
-
### XML with Python data structures
dtree2ele = DictTree.Element
@staticmethod
def Element(xml):
return ET.fromstring(xml)
+
+### Other utility functions
+
+iselement = ET.iselement
+
+def find(ele, tag, strict=False):
+ """Return the first sub-element of `ele` found for `tag`, or None if nothing matches.
+
+ In strict mode, doesn't work around Cisco implementations sending incorrectly
+ namespaced XML. Supply qualified tag name if using strict mode. Otherwise each
+ name produced by `multiqualify(tag)` is tried in turn and the first hit is returned.
+ """
+ if strict:
+ return ele.find(tag)
+ else:
+ for qname in multiqualify(tag):
+ found = ele.find(qname)
+ if found is not None:
+ return found
+
+def parse_root(raw):
+ '''Parse the top-level element from XML string.
+
+ Returns a `(tag, attributes)` tuple, where `tag` is a string representing
+ the qualified name of the root element and `attributes` is an
+ `{attribute: value}` dictionary.
+ '''
+ fp = StringIO(raw[:1024]) # this is a guess but start element beyond 1024 bytes would be a bit absurd
+ for event, element in ET.iterparse(fp, events=('start',)):
+ return (element.tag, element.attrib)
+
+def validated_element(rep, tag, attrs=None):
+ """Convert `rep` with `dtree2ele` and check the resulting element's tag and attributes.
+
+ The element's tag must equal `tag` or `qualify(tag)`. `attrs`, if given, lists
+ attribute names that must be present on the element (compared after `unqualify`);
+ a single name may be passed as a plain string.
+
+ Returns the element. Raises `ContentError` when the tag does not match or a
+ required attribute is missing.
+ """
+ ele = dtree2ele(rep)
+ if ele.tag not in (tag, qualify(tag)):
+ raise ContentError("Required root element [%s] not found" % tag)
+ if attrs is not None:
+ # a bare string would otherwise be iterated character by character
+ attrs = [attrs] if isinstance(attrs, str) else attrs
+ present = set(unqualify(attr) for attr in ele.attrib)
+ missing = [req for req in attrs if req not in present]
+ if missing:
+ raise ContentError("Required attribute [%s] not found in element [%s]" % (missing[0], tag))
+ return ele
class Manager:
- "Thin layer of abstraction for the ncclient API."
+ "Thin layer of abstraction for the API."
- RAISE_ALL = 0
- RAISE_ERROR = 1
- RAISE_NONE = 2
+ RAISE_ALL, RAISE_ERROR, RAISE_NONE = range(3)
- def __init__(self, session, rpc_error=Manager.RAISE_ERROR):
+ def __init__(self, session, rpc_errors=RAISE_ALL):
+ "Store `session` and the `rpc_errors` setting (defaults to RAISE_ALL) on the instance."
self._session = session
- self._raise = rpc_error
+ self._raise = rpc_errors
- def rpc(self, op, *args, **kwds):
+ def do(self, op, *args, **kwds):
op = OPERATIONS[op](self._session)
reply = op.request(*args, **kwds)
if not reply.ok:
self.close()
return False
- def _get(self, type, *args, **kwds):
- reply = self.do(type)
- return reply.data
-
def locked(self, target):
- "Returns a context manager for use withthe 'with' statement.
- `target` is the datastore to lock, e.g. 'candidate'"
+ """Returns a context manager for use with the 'with' statement.
+ `target` is the datastore to lock, e.g. 'candidate'
+ """
return operations.LockContext(self._session, target)
- get = lambda self, *args, **kwds: self._get('get')
+ get = lambda self, *args, **kwds: self.do('get', *args, **kwds).data
- get_config = lambda self, *args, **kwds: self._get('get-config')
+ get_config = lambda self, *args, **kwds: self.do('get-config', *args, **kwds).data
edit_config = lambda self, *args, **kwds: self.do('edit-config', *args, **kwds)
pass
if self._session.connected: # if that didn't work...
self._session.close()
-
+
@property
- def session(self, session):
- return self._session
+ def session(self):
+ "The session object stored on this manager."
+ return self._session
def get_capabilities(self, whose):
- if whose in ('manager', 'client'):
- return self._session._client_capabilities
- elif whose in ('agent', 'server')
- return self._session._server_capabilities
-
-
+ if whose in ('manager', 'client'):
+ return self._session._client_capabilities
+ elif whose in ('agent', 'server'):
+ return self._session._server_capabilities
+
@property
def capabilities(self):
return self._session._client_capabilities
'NETCONF protocol operations'
-from ncclient import NCClientError
-
-from rpc import RPC, RPCError
-from errors import MissingCapabilityError
+from errors import OperationError, MissingCapabilityError
+from rpc import RPCError
from retrieve import Get, GetConfig
from edit import EditConfig, CopyConfig, DeleteConfig, Validate, Commit, DiscardChanges
from session import CloseSession, KillSession
from lock import Lock, Unlock, LockContext
from subscribe import CreateSubscription
+# Maps operation names to the classes implementing them.
+OPERATIONS = {
+ 'get': Get,
+ 'get-config': GetConfig,
+ 'edit-config': EditConfig,
+ 'copy-config': CopyConfig,
+ 'validate': Validate,
+ 'commit': Commit,
+ 'discard-changes': DiscardChanges,
+ 'delete-config': DeleteConfig,
+ 'lock': Lock,
+ 'unlock': Unlock,
+ 'close-session': CloseSession,
+ 'close_session': CloseSession, # underscore spelling kept so existing lookups keep working
+ 'kill-session': KillSession,
+}
+
__all__ = [
- 'RPC',
- 'RPCReply',
'RPCError',
+ 'OPERATIONS',
'Get',
'GetConfig',
'EditConfig',
SPEC = {'tag': 'edit-config', 'subtree': []}
- def request(self, target=None, target_url=None, config=None,
- default_operation=None, test_option=None, error_option=None):
- util.one_of(target, target_url)
+ def request(self, target=None, config=None, default_operation=None,
+ test_option=None, error_option=None):
+ util.one_of(target, config)
spec = EditConfig.SPEC.copy()
subtree = spec['subtree']
- subtree.append({
- 'tag': 'target',
- 'subtree': util.store_or_url(target, target_url, self._assert)
- })
- subtree.append(content.root_ensured(config, 'config'))
+ subtree.append(util.store_or_url('target', target, self._assert))
+ subtree.append(content.validated_root(config, 'config'))
if default_operation is not None:
subtree.append({
'tag': 'default-operation',
SPEC = {'tag': 'delete-config', 'subtree': []}
- def request(self, target=None, target_url=None):
+ def request(self, target):
+ "Build a delete-config spec for `target` (a datastore name or a URL) and pass it to `self._request`."
spec = DeleteConfig.SPEC.copy()
+ spec['subtree'] = list(spec['subtree']) # SPEC.copy() is shallow; don't grow the shared class-level list
- spec['subtree'].append({
- 'tag': 'target',
- 'subtree': util.store_or_url(target, target_url, self._assert)
- })
+ spec['subtree'].append(util.store_or_url('target', target, self._assert))
return self._request(spec)
SPEC = {'tag': 'copy-config', 'subtree': []}
- def request(self, source=None, source_url=None, target=None, target_url=None):
+ def request(self, source, target):
+ "Build a copy-config spec from `source` to `target` (each a datastore name or a URL) and pass it to `self._request`."
spec = CopyConfig.SPEC.copy()
+ spec['subtree'] = list(spec['subtree']) # SPEC.copy() is shallow; don't grow the shared class-level list
- spec['subtree'].append({
- 'tag': 'source',
- 'subtree': util.store_or_url(source, source_url, self._assert)
- })
- spec['subtree'].append({
- 'tag': 'target',
- 'subtree': util.store_or_url(target, target_url, self._assert)
- })
+ spec['subtree'].append(util.store_or_url('source', source, self._assert))
+ spec['subtree'].append(util.store_or_url('target', target, self._assert))
return self._request(spec)
SPEC = {'tag': 'validate', 'subtree': []}
- def request(self, source=None, source_url=None, config=None):
- util.one_of(source, source_url, config)
+ def request(self, source=None, config=None):
+ "Build a validate spec from either `source` (a datastore name or a URL) or `config`; exactly one of the two should be given."
+ util.one_of(source, config)
spec = Validate.SPEC.copy()
+ spec['subtree'] = list(spec['subtree']) # SPEC.copy() is shallow; don't grow the shared class-level list
if config is None:
- spec['subtree'].append({
- 'tag': 'source',
- 'subtree': util.store_or_url(source, source_url, self._assert)
- })
+ spec['subtree'].append(util.store_or_url('source', source, self._assert))
else:
- spec['subtree'].append(content.root_ensured(config, 'config'))
+ spec['subtree'].append(content.validated_element(config, 'config'))
return self._request(spec)
return self._request(Commit.SPEC)
+class DiscardChanges(RPC):
+
+ # tested: no
+ # combed: yes
+
+ DEPENDS = [':candidate']
+
+ SPEC = {'tag': 'discard-changes'}
+
+
class ConfirmedCommit(Commit):
"psuedo-op"
def confirm(self):
"Make the confirming commit"
return Commit.request(self, confirmed=True)
-
-
-class DiscardChanges(RPC):
-
- # tested: no
- # combed: yes
-
- DEPENDS = [':candidate']
- SPEC = {'tag': 'discard-changes'}
-
+ def discard(self):
+ return DiscardChanges(self.session, self.async, self.timeout).request()
+# Copyright 2009 Shikhar Bhushan
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
from ncclient import NCClientError
class OperationError(NCClientError):
def _parsing_hook(self, root):
self._data = None
if not self._errors:
- self._data = content.namespaced_find(root, 'data')
+ self._data = content.find(root, 'data')
@property
- def data_element(self):
+ def data(self):
if not self._parsed:
self.parse()
return self._data
-
- @property
- def data_xml(self):
- return content.element2string(self.data_element)
-
- data = data_element
class Get(RPC):
def request(self, filter=None):
+ "Build a get spec, adding the node from `util.build_filter(filter)` when `filter` is given, and pass it to `self._request`."
spec = Get.SPEC.copy()
+ spec['subtree'] = list(spec['subtree']) # SPEC.copy() is shallow; don't grow the shared class-level list
if filter is not None:
- spec['subtree'].append(util.build_filter(filter)))
+ spec['subtree'].append(util.build_filter(filter))
return self._request(spec)
+
class GetConfig(RPC):
# tested: no
REPLY_CLS = GetReply
- def request(self, source=None, source_url=None, filter=None):
+ def request(self, source, filter=None):
"""
+ `source` is a datastore name or a URL (anything containing '://')
`filter` has to be a tuple of (type, criteria)
The type may be one of 'xpath' or 'subtree'
The criteria may be an ElementTree.Element, an XML fragment, or tree specification
"""
spec = GetConfig.SPEC.copy()
+ spec['subtree'] = list(spec['subtree']) # SPEC.copy() is shallow; don't grow the shared class-level list
- spec['subtree'].append({
- 'tag': 'source',
- 'subtree': util.store_or_url(source, source_url)
- })
+ spec['subtree'].append(util.store_or_url('source', source, self._assert))
if filter is not None:
spec['subtree'].append(util.build_filter(filter))
return self._request(spec)
-
from ncclient import content
-from reply import RPCReply
+from errors import OperationError
import logging
logger = logging.getLogger('ncclient.rpc')
return
root = self._root = content.xml2ele(self._raw) # <rpc-reply> element
# per rfc 4741 an <ok/> tag is sent when there are no errors or warnings
- ok = content.namespaced_find(root, 'ok')
+ ok = content.find(root, 'ok')
if ok is not None:
logger.debug('parsed [%s]' % ok.tag)
else: # create RPCError objects from <rpc-error> elements
- error = content.namespaced_find(root, 'rpc-error')
+ error = content.find(root, 'rpc-error')
if error is not None:
logger.debug('parsed [%s]' % error.tag)
for err in root.getiterator(error.tag):
d = {}
for err_detail in err.getchildren(): # <error-type> etc..
tag = content.unqualify(err_detail.tag)
- d[tag] = (err_detail.text.strip() if tag != 'error-info'
- else content.ele2string(err_detail, 'utf-8'))
+ if tag != 'error-info':
+ d[tag] = err_detail.text.strip()
+ else:
+ d[tag] = content.ele2xml(err_detail)
self._errors.append(RPCError(d))
self._parsing_hook(root)
self._parsed = True
return self._errors
-class RPCError(ncclient.RPCError): # raise it if you like
+class RPCError(OperationError): # raise it if you like
def __init__(self, err_dict):
self._dict = err_dict
if self.message is not None:
- ncclient.RPCError.__init__(self, self.message)
+ OperationError.__init__(self, self.message)
else:
- ncclient.RPCError.__init__(self)
-
- @property
- def raw(self):
- return self._element.tostring()
+ OperationError.__init__(self)
@property
def type(self):
SPEC = { 'tag': 'close-session' }
- def _delivery_hook(self)
+ def _delivery_hook(self):
self.session.close()
'text': session_id
})
return self._request(spec)
-
'Boilerplate ugliness'
-from ncclient import OperationError
-from ncclient.content import qualify as _
-from ncclient.content import root_ensured
+from ncclient import content
-from errors import MissingCapabilityError
+from errors import OperationError, MissingCapabilityError
def one_of(*args):
'Verifies that only one of the arguments is not None'
return
raise OperationError('Insufficient parameters')
-def store_or_url(store, url, capcheck=None):
- one_of(store, url)
- node = {}
- if store is not None:
- node['tag'] = store
- else:
+def store_or_url(wha, loc, capcheck=None):
+ """Build the node for element `wha` (e.g. 'source' or 'target') referring to `loc`.
+
+ If `loc` contains '://' it is treated as a URL: `capcheck(':url')` is called when
+ `capcheck` is given, and the subtree becomes a 'url' node with `loc` as its text.
+ Otherwise `loc` is used as the tag of the subtree node.
+ Returns the node as a dict with 'tag' and 'subtree' keys.
+ """
+ node = { 'tag': wha, 'subtree': {} }
+ if '://' in loc: # e.g. http://, file://, ftp://
if capcheck is not None:
- capcheck(':url') # hmm.. schema check? deem overkill for now
- node['tag'] = 'url'
- node['text'] = url
+ capcheck(':url') # url schema check at some point!
+ node['subtree']['tag'] = 'url'
+ node['subtree']['text'] = loc
+ else:
+ node['subtree']['tag'] = loc
return node
def build_filter(spec, capcheck=None):
'tag': 'filter',
'attributes': {'type': type},
'subtree': criteria
- }
+ }
else:
- rep = root_ensure(spec, 'filter', 'type')
+ rep = content.validated_element(spec, 'filter', 'type')
try:
type = rep['type']
except KeyError:
- type = ele[qualify('type'))
- if type == 'xpath' and capcheck_func is not None:
- capcheck_func(':xpath')
+ type = ele[content.qualify('type')]
+ if type == 'xpath' and capcheck is not None:
+ capcheck(':xpath')
return rep
-
from ncclient import content
-class HelloHandler(Listener):
+class HelloHandler:
def __init__(self, init_cb, error_cb):
self._init_cb = init_cb
self._error_cb = error_cb
- def __str__(self):
- return 'HelloListener'
-
def callback(self, root, raw):
if content.unqualify(root[0]) == 'hello':
try: