Statistics
| Branch: | Tag: | Revision:

root / ncclient / content.py @ 65c6a607

History | View | Annotate | Download (4.5 kB)

1
# Copyright 2009 Shikhar Bhushan
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#    http://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14

    
15
"TODO: docstring"
16

    
17
from xml.etree import cElementTree as ET
18

    
19
from ncclient import NCClientError
20

    
21
class ContentError(NCClientError):
22
    pass
23

    
24
### Namespace-related
25

    
26
BASE_NS = 'urn:ietf:params:xml:ns:netconf:base:1.0'
27
# and this is BASE_NS according to cisco devices...
28
CISCO_BS = 'urn:ietf:params:netconf:base:1.0'
29

    
30
try:
31
    register_namespace = ET.register_namespace
32
except AttributeError:
33
    def register_namespace(prefix, uri):
34
        from xml.etree import ElementTree
35
        # cElementTree uses ElementTree's _namespace_map, so that's ok
36
        ElementTree._namespace_map[uri] = prefix
37

    
38
# we'd like BASE_NS to be prefixed as "netconf"
39
register_namespace('netconf', BASE_NS)
40

    
41
qualify = lambda tag, ns=BASE_NS: tag if ns is None else '{%s}%s' % (ns, tag)
42

    
43
multiqualify = lambda tag, nslist=(BASE_NS, CISCO_BS): [qualify(tag, ns) for ns in nslist]
44

    
45
unqualify = lambda tag: tag[tag.rfind('}')+1:]
46

    
47
### Other utility functions
48

    
49
iselement = ET.iselement
50

    
51
def namespaced_find(ele, tag, strict=False):
52
    """In strict mode, doesn't work around Cisco implementations sending incorrectly
53
    namespaced XML. Supply qualified name if using strict mode.
54
    """
55
    found = None
56
    if strict:
57
        found = ele.find(tag)
58
    else:
59
        for qname in multiqualify(tag):
60
            found = ele.find(qname)
61
            if found is not None:
62
                break
63
    return found
64

    
65
def parse_root(raw):
66
    '''Parse the top-level element from XML string.
67
    
68
    Returns a `(tag, attributes)` tuple, where `tag` is a string representing
69
    the qualified name of the root element and `attributes` is an
70
    `{attribute: value}` dictionary.
71
    '''
72
    fp = StringIO(raw[:1024]) # this is a guess but start element beyond 1024 bytes would be a bit absurd
73
    for event, element in ET.iterparse(fp, events=('start',)):
74
        return (element.tag, element.attrib)
75

    
76
def root_ensured(rep, req_tag, req_attrs=None):
77
    rep = to_element(rep)
78
    if rep.tag not in (req_tag, qualify(req_tag)):
79
        raise ContentError("Required root element [%s] not found" % req_tag)
80
    if req_attrs is not None:
81
        pass # TODO
82
    return rep
83

    
84
### XML with Python data structures
85

    
86
dtree2ele = DictTree.Element
87
dtree2xml = DictTree.XML
88
ele2dtree = Element.DictTree
89
ele2xml = Element.XML
90
xml2dtree = XML.DictTree
91
xml2ele = XML.Element
92

    
93
class DictTree:
94

    
95
    @staticmethod
96
    def Element(spec):
97
        if iselement(spec):
98
            return spec
99
        elif isinstance(spec, basestring):
100
            return XML.Element(spec)
101
        if not isinstance(spec, dict):
102
            raise ContentError("Invalid tree spec")
103
        if 'tag' in spec:
104
            ele = ET.Element(spec.get('tag'), spec.get('attributes', {}))
105
            ele.text = spec.get('text', '')
106
            ele.tail = spec.get('tail', '')
107
            subtree = spec.get('subtree', [])
108
            # might not be properly specified as list but may be dict
109
            if isinstance(subtree, dict):
110
                subtree = [subtree]
111
            for subele in subtree:
112
                ele.append(DictTree.Element(subele))
113
            return ele
114
        elif 'comment' in spec:
115
            return ET.Comment(spec.get('comment'))
116
        else:
117
            raise ContentError('Invalid tree spec')
118
    
119
    @staticmethod
120
    def XML(spec):
121
        Element.XML(DictTree.Element(spec))
122

    
123
class Element:
124
    
125
    @staticmethod
126
    def DictTree(ele):
127
        return {
128
            'tag': ele.tag,
129
            'attributes': ele.attrib,
130
            'text': ele.text,
131
            'tail': ele.tail,
132
            'subtree': [ Element.DictTree(child) for child in root.getchildren() ]
133
        }
134
    
135
    @staticmethod
136
    def XML(ele, encoding='utf-8'):
137
        xml = ET.tostring(ele, encoding)
138
        return xml if xml.startswith('<?xml') else '<?xml version="1.0" encoding="%s"?>\n%s' % (encoding, xml)
139

    
140
class XML:
141
    
142
    @staticmethod
143
    def DictTree(ele):
144
        return Element.DictTree(Element.XML(ele))
145
    
146
    @staticmethod
147
    def Element(xml):
148
        return ET.fromstring(xml)