capabilities.append(cap.text)
return sid, capabilities
+
class RootParser:
'''Parser for the top-level element of an XML document. Does not look at any
- sub-elements. It is useful for efficiently determining the type of received
- messages.
+ sub-elements. It is useful for efficiently determining the type of a received
+ message.
'''
@staticmethod
'''Parse the top-level element from a string representing an XML document.
recognized is a list of tag names that will be successfully parsed.
- The tag names should not be qualified. This is for simplicity of parsing
- where the NETCONF implementation is non-compliant (e.g. cisco's which
- uses an incorrect namespace)
+ The tag names should be qualified with namespace.
Returns a `(tag, attributes)` tuple, where `tag` is a string representing
the qualified name of the recognized element and `attributes` is an
fp = StringIO(raw)
for event, element in ET.iterparse(fp, events=('start',)):
for ele in recognized:
- if __(element.tag) == ele:
- attrs = {}
- for attr in element.attrib:
- attrs[__(attr)] = element.attrib[attr]
- return (ele, attrs)
+ if element.tag == ele:
+ return (element.tag, element.attrib)
break
class RPCReplyParser:
    '''Parser for the children of an `<rpc-reply>` document: the `<ok>`
    element and any `<rpc-error>` elements, looked up first in the base
    namespace and then in the Cisco namespace.
    '''

    # Child elements of <rpc-error> whose text is collected.
    _ERROR_TAGS = ('error-type', 'error-tag', 'error-severity',
                   'error-app-tag', 'error-path', 'error-message',
                   'error-info')

    @staticmethod
    def parse_rpc_error(root, ns):
        '''Collect the details of a single `<rpc-error>` element.

        `root` is the `<rpc-error>` element and `ns` is the namespace its
        children are qualified with.

        Returns a dict mapping each unqualified tag name that is present to
        the text of that child element. Absent children are omitted.
        '''
        err_dict = {}
        for tag in RPCReplyParser._ERROR_TAGS:
            ele = root.find(_(tag, ns))
            if ele is not None:
                err_dict[tag] = ele.text
        # The dict was previously built but never returned, so callers
        # always received None.
        return err_dict

    @staticmethod
    def parse(raw):
        '''Parse the string `raw` as an XML document and extract its reply
        status.

        Returns an `(ok, errors)` tuple. `ok` is the `<ok>` element, or None
        if there is none. `errors` is a list of `(element, details)` tuples,
        one per `<rpc-error>` element, where `details` is the dict produced
        by `parse_rpc_error`.
        '''
        root = ET.fromstring(raw)
        ok = root.find(_('ok', BASE_NS))
        if ok is None:
            ok = root.find(_('ok', CISCO_BS))
        # Errors are looked up in the base namespace first; the Cisco
        # namespace is only tried when that finds nothing.
        # NOTE(review): the original mixed CISCO_NS and CISCO_BS for this
        # namespace. CISCO_BS is used throughout here -- confirm it is the
        # constant in scope for this module.
        ns = BASE_NS
        errs = root.findall(_('rpc-error', BASE_NS))
        if not errs:
            errs = root.findall(_('rpc-error', CISCO_BS))
            ns = CISCO_BS
        ret = [(err, RPCReplyParser.parse_rpc_error(err, ns)) for err in errs]
        return ok, ret
from ncclient.content.common import unqualify as __
from ncclient.content.common import BASE_NS, CISCO_BS
+q_rpcreply = [_('rpc-reply', BASE_NS), _('rpc-reply', CISCO_BS)]
+
class SessionListener:
'''This is the glue between received data and the object it should be
@property
def _recognized_elements(self):
    '''Return a new list of the tag names this listener recognizes: the
    entries of `q_rpcreply` followed by the keys of `self._tag2obj`.
    '''
    # Copy rather than alias: extending `q_rpcreply` itself would grow the
    # module-level list on every access, and any check of the form
    # `tag in q_rpcreply` would then match non-reply tags as well.
    elems = list(q_rpcreply)
    with self._lock:
        elems.extend(self._tag2obj.keys())
    return elems
def received(self, raw):
res = RootParser.parse(raw, self._recognized_elements)
if res is not None:
- (tag, attrs) = res
+ tag, attrs = res # unpack
else:
return
logger.debug('SessionListener.reply: parsed (%r, %r)' % res)
try:
- cb = None
- if tag == 'rpc-reply':
- id = attrs.get('message-id', None)
- if id is None:
- logger.warning('<rpc-reply> w/o message-id attr received: %s' % raw)
+ obj = None
+ if tag in q_rpcreply:
+ for key in attrs:
+ if __(key) == 'message-id':
+ id = attrs[key]
+ break
else:
- obj = self._id2rpc.get(id, None)
+ logger.warning('<rpc-reply> without message-id received: %s'
+ % raw)
+ obj = self._id2rpc.get(id, None)
else:
obj = self._tag2obj.get(tag, None)
if obj is not None: