from copy import deepcopy
-from ncclient import content
+from ncclient import xml_
from rpc import RPC
import logging
logger = logging.getLogger('ncclient.operations.edit')
-
-
"Operations related to changing device configuration"
class EditConfig(RPC):
'tag': 'default-operation',
'text': default_operation
})
- subtree.append(content.validated_element(config, ('config', content.qualify('config'))))
+ subtree.append(xml_.validated_element(config, ('config', xml_.qualify('config'))))
return self._request(spec)
class DeleteConfig(RPC):
"""
spec = deepcopy(Validate.SPEC)
try:
- src = content.validated_element(source, ('config', content.qualify('config')))
+ src = markup.validated_element(source, ('config', markup.qualify('config')))
except Exception as e:
logger.debug(e)
src = util.store_or_url('source', source, self._assert)
from rpc import RPC, RPCReply
-from ncclient import content
+from ncclient import xml_
from copy import deepcopy
import util
def _parsing_hook(self, root):
self._data = None
if not self._errors:
- self._data = content.find(root, 'data',
- nslist=[content.BASE_NS,
- content.CISCO_BS])
+ self._data = xml_.find(root, 'data',
+ nslist=[xml_.BASE_NS,
+ xml_.CISCO_BS])
@property
def data_ele(self):
"*<data>* element as an XML string"
if not self._parsed:
self.parse()
- return content.ele2xml(self._data)
+ return xml_.ele2xml(self._data)
@property
def data_dtree(self):
"*<data>* element in :ref:`dtree`"
- return content.ele2dtree(self._data)
+ return xml_.ele2dtree(self._data)
#: Same as :attr:`data_ele`
data = data_ele
from threading import Event, Lock
from uuid import uuid1
-from ncclient import content
+from ncclient import xml_
from ncclient.transport import SessionListener
from errors import OperationError, TimeoutExpiredError, MissingCapabilityError
"""Parse the *<rpc-reply>*"""
if self._parsed:
return
- root = self._root = content.xml2ele(self._raw) # <rpc-reply> element
+ root = self._root = xml_.xml2ele(self._raw) # <rpc-reply> element
# per rfc 4741 an <ok/> tag is sent when there are no errors or warnings
- ok = content.find(root, 'ok', nslist=[content.BASE_NS, content.CISCO_BS])
+ ok = xml_.find(root, 'ok', nslist=[xml_.BASE_NS, xml_.CISCO_BS])
if ok is not None:
logger.debug('parsed [%s]' % ok.tag)
else: # create RPCError objects from <rpc-error> elements
- error = content.find(root, 'rpc-error', nslist=[content.BASE_NS, content.CISCO_BS])
+ error = xml_.find(root, 'rpc-error', nslist=[xml_.BASE_NS, xml_.CISCO_BS])
if error is not None:
logger.debug('parsed [%s]' % error.tag)
for err in root.getiterator(error.tag):
# process a particular <rpc-error>
d = {}
for err_detail in err.getchildren(): # <error-type> etc..
- tag = content.unqualify(err_detail.tag)
+ tag = xml_.unqualify(err_detail.tag)
if tag != 'error-info':
d[tag] = err_detail.text.strip()
else:
- d[tag] = content.ele2xml(err_detail)
+ d[tag] = xml_.ele2xml(err_detail)
self._errors.append(RPCError(d))
self._parsing_hook(root)
self._parsed = True
def callback(self, root, raw):
tag, attrs = root
- if content.unqualify(tag) != 'rpc-reply':
+ if xml_.unqualify(tag) != 'rpc-reply':
return
rpc = None
for key in attrs:
- if content.unqualify(key) == 'message-id':
+ if xml_.unqualify(key) == 'message-id':
id = attrs[key]
try:
with self._lock:
spec = {
'tag': 'rpc',
'attrib': {
- 'xmlns': content.BASE_NS,
+ 'xmlns': xml_.BASE_NS,
'message-id': self._id
},
'subtree': [ opspec ]
}
- return content.dtree2xml(spec)
+ return xml_.dtree2xml(spec)
def _request(self, op):
"""Subclasses call this method to make the RPC request.
from rpc import RPC
-from ncclient.content import qualify as _
-from ncclient.transport import SessionListener
-
-NOTIFICATION_NS = 'urn:ietf:params:xml:ns:netconf:notification:1.0'
-
-# TODO when can actually test it...
-
-class CreateSubscription(RPC):
- # tested: no
-
- SPEC = {
- 'tag': _('create-subscription', NOTIFICATION_NS)
- }
-
-class Notification: pass
-
-class NotificationListener(SessionListener): pass
+#from ncclient.xml import qualify as _
+#from ncclient.transport import SessionListener
+#
+#NOTIFICATION_NS = 'urn:ietf:params:xml:ns:netconf:notification:1.0'
+#
+## TODO when can actually test it...
+#
+#class CreateSubscription(RPC):
+# # tested: no
+#
+# SPEC = {
+# 'tag': _('create-subscription', NOTIFICATION_NS)
+# }
+#
+#class Notification: pass
+#
+#class NotificationListener(SessionListener): pass
'Boilerplate ugliness'
-from ncclient import content
+from ncclient import xml_
from errors import OperationError, MissingCapabilityError
else:
raise OperationError("Invalid filter type")
else:
- rep = content.validated_element(spec, ['filter', content.qualify('filter')],
- attrs=[('type', content.qualify('type'))])
+ rep = xml_.validated_element(spec, ['filter', xml_.qualify('filter')],
+ attrs=[('type', xml_.qualify('type'))])
if type == 'xpath' and capcheck is not None:
capcheck(':xpath')
return rep
from Queue import Queue
from threading import Thread, Lock, Event
-from ncclient import content
+from ncclient import xml_
from ncclient.capabilities import Capabilities
from errors import TransportError
def _dispatch_message(self, raw):
try:
- root = content.parse_root(raw)
+ root = xml_.parse_root(raw)
except Exception as e:
logger.error('error parsing dispatch message: %s' % e)
return
self._error_cb = error_cb
def callback(self, root, raw):
- if content.unqualify(root[0]) == 'hello':
+ if xml_.unqualify(root[0]) == 'hello':
try:
id, capabilities = HelloHandler.parse(raw)
except Exception as e:
"Given a list of capability URI's returns <hello> message XML string"
spec = {
'tag': 'hello',
- 'attrib': {'xmlns': content.BASE_NS},
+ 'attrib': {'xmlns': xml_.BASE_NS},
'subtree': [{
'tag': 'capabilities',
'subtree': # this is fun :-)
[{'tag': 'capability', 'text': uri} for uri in capabilities]
}]
}
- return content.dtree2xml(spec)
+ return xml_.dtree2xml(spec)
@staticmethod
def parse(raw):
"Returns tuple of (session-id (str), capabilities (Capabilities)"
sid, capabilities = 0, []
- root = content.xml2ele(raw)
+ root = xml_.xml2ele(raw)
for child in root.getchildren():
- tag = content.unqualify(child.tag)
+ tag = xml_.unqualify(child.tag)
if tag == 'session-id':
sid = child.text
elif tag == 'capabilities':
for cap in child.getchildren():
- if content.unqualify(cap.tag) == 'capability':
+ if xml_.unqualify(cap.tag) == 'capability':
capabilities.append(cap.text)
return sid, Capabilities(capabilities)
# limitations under the License.
-"""The :mod:`content` module provides methods for creating XML documents, parsing XML, and converting between different XML representations. It uses :mod:`~xml.etree.ElementTree` internally.
+"""The :mod:`xml` module provides methods for creating XML documents, parsing
+XML, and converting between different XML representations. It uses
+:mod:`~xml.etree.ElementTree` internally.
"""
from cStringIO import StringIO
### Namespace-related
-#: Base NETCONf namespace
+#: Base NETCONF namespace
BASE_NS = 'urn:ietf:params:xml:ns:netconf:base:1.0'
#: ... and this is BASE_NS according to Cisco devices tested
CISCO_BS = 'urn:ietf:params:netconf:base:1.0'
#: namespace for Tail-f data model
TAILF_AAA_1_1 = 'http://tail-f.com/ns/aaa/1.1'
+#: namespace for Tail-f data model
TAILF_EXECD_1_1 = 'http://tail-f.com/ns/execd/1.1'
+#: namespace for Cisco data model
+CISCO_CPI_10 = 'http://www.cisco.com/cpi_10/schema'
try:
register_namespace = ET.register_namespace
# cElementTree uses ElementTree's _namespace_map, so that's ok
ElementTree._namespace_map[uri] = prefix
-# we'd like BASE_NS to be prefixed as "netconf"
register_namespace('netconf', BASE_NS)
register_namespace('aaa', TAILF_AAA_1_1)
register_namespace('execd', TAILF_EXECD_1_1)
+register_namespace('cpi', CISCO_CPI_10)
+
qualify = lambda tag, ns=BASE_NS: tag if ns is None else '{%s}%s' % (ns, tag)
iselement = ET.iselement
def find(ele, tag, nslist=[]):
- """If *nslist* is empty, same as :meth:`xml.etree.ElementTree.Element.find`. If it is not, *tag* is interpreted as an unqualified name and qualified using each item in *nslist* (with a :const:`None` item in *nslit* meaning no qualification is done). The first match is returned.
+ """If *nslist* is empty, same as :meth:`xml.etree.ElementTree.Element.find`.
+ If it is not, *tag* is interpreted as an unqualified name and qualified
+ using each item in *nslist* (with a :const:`None` item in *nslit* meaning no
+ qualification is done). The first match is returned.
:arg nslist: optional list of namespaces
:type nslit: `string` `list`
return (element.tag, element.attrib)
def validated_element(rep, tags=None, attrs=None, text=None):
- """Checks if the root element meets the supplied criteria. Returns a :class:`~xml.etree.ElementTree.Element` instance if so, otherwise raises :exc:`ContentError`.
+ """Checks if the root element meets the supplied criteria. Returns a
+ :class:`~xml.etree.ElementTree.Element` instance if so, otherwise raises
+ :exc:`ContentError`.
:arg tags: tag name or a list of allowable tag names
:arg attrs: list of required attribute names, each item may be a list of allowable alternatives