Statistics
| Branch: | Tag: | Revision:

root / ncclient / content.py @ 6ab782f7

History | View | Annotate | Download (4 kB)

1
# Copyright 2009 Shikhar Bhushan
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#    http://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14

    
15
"TODO: docstring"
16

    
17
from xml.etree import cElementTree as ET
18

    
19
from ncclient import NCClientError
20

    
21
class ContentError(NCClientError):
22
    pass
23

    
24
### Namespace-related ###
25

    
26
BASE_NS = 'urn:ietf:params:xml:ns:netconf:base:1.0'
27
# and this is BASE_NS according to cisco devices...
28
CISCO_BS = 'urn:ietf:params:netconf:base:1.0'
29

    
30
try:
31
    register_namespace = ET.register_namespace
32
except AttributeError:
33
    def register_namespace(prefix, uri):
34
        from xml.etree import ElementTree
35
        # cElementTree uses ElementTree's _namespace_map, so that's ok
36
        ElementTree._namespace_map[uri] = prefix
37

    
38
# we'd like BASE_NS to be prefixed as "netconf"
39
register_namespace('netconf', BASE_NS)
40

    
41
qualify = lambda tag, ns=BASE_NS: '{%s}%s' % (ns, tag)
42

    
43
# i would have written a def if lambdas weren't so much fun
44
multiqualify = lambda tag, nslist=(BASE_NS, CISCO_BS): [qualify(tag, ns)
45
                                                        for ns in nslist]
46

    
47
unqualify = lambda tag: tag[tag.rfind('}')+1:]
48

    
49
### XML with Python data structures
50

    
51
def to_element(spec):
52
    "TODO: docstring"
53
    if iselement(spec):
54
        return spec
55
    elif isinstance(spec, basestring):
56
        return ET.XML(spec)
57
    if not isinstance(spec, dict):
58
        raise ContentError("Invalid tree spec")
59
    if 'tag' in spec:
60
        ele = ET.Element(spec.get('tag'), spec.get('attributes', {}))
61
        ele.text = spec.get('text', '')
62
        ele.tail = spec.get('tail', '')
63
        subtree = spec.get('subtree', [])
64
        # might not be properly specified as list but may be dict
65
        if isinstance(subtree, dict):
66
            subtree = [subtree]
67
        for subele in subtree:
68
            ele.append(XMLConverter.build(subele))
69
        return ele
70
    elif 'comment' in spec:
71
        return ET.Comment(spec.get('comment'))
72
    else:
73
        raise ContentError('Invalid tree spec')
74

    
75
def from_xml(xml):
76
    return ET.fromstring(xml)
77

    
78
def to_xml(repr, encoding='utf-8'):
79
    "TODO: docstring"
80
    xml = ET.tostring(to_element(repr), encoding)
81
    # some etree versions don't include xml decl with utf-8
82
    # this is a problem with some devices
83
    return (xml if xml.startswith('<?xml')
84
            else '<?xml version="1.0" encoding="%s"?>%s' % (encoding, xml))
85

    
86

    
87
## Utility functions
88

    
89
iselement = ET.iselement
90

    
91
def namespaced_find(ele, tag, strict=False):
92
    """In strict mode, doesn't work around Cisco implementations sending incorrectly
93
    namespaced XML. Supply qualified name if using strict mode.
94
    """
95
    found = None
96
    if strict:
97
        found = ele.find(tag)
98
    else:
99
        for qname in multiqualify(tag):
100
            found = ele.find(qname)
101
            if found is not None:
102
                break
103
    return found
104

    
105
def parse_root(raw):
106
    '''Internal use.
107
    Parse the top-level element from XML string.
108
    
109
    Returns a `(tag, attributes)` tuple, where `tag` is a string representing
110
    the qualified name of the root element and `attributes` is an
111
    `{attribute: value}` dictionary.
112
    '''
113
    fp = StringIO(raw[:1024]) # this is a guess but start element beyond 1024 bytes would be a bit absurd
114
    for event, element in ET.iterparse(fp, events=('start',)):
115
        return (element.tag, element.attrib)
116

    
117
def root_ensured(rep, req_tag, req_attrs=None):
118
    rep = to_element(rep)
119
    if rep.tag not in (req_tag, qualify(req_tag)):
120
        raise ContentError("Required root element [%s] not found" % req_tag)
121
    if req_attrs is not None:
122
        pass # TODO
123
    return rep