Statistics
| Branch: | Tag: | Revision:

root / astakosclient / astakosclient / keypath.py @ 26498848

History | View | Annotate | Download (7.1 kB)

1
# Copyright 2012, 2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34

    
35
def dict_merge(a, b):
36
    """
37
    http://www.xormedia.com/recursively-merge-dictionaries-in-python/
38
    """
39
    if not isinstance(b, dict):
40
        return b
41
    result = copy.deepcopy(a)
42
    for k, v in b.iteritems():
43
        if k in result and isinstance(result[k], dict):
44
                result[k] = dict_merge(result[k], v)
45
        else:
46
            result[k] = copy.deepcopy(v)
47
    return result
48

    
49

    
50
def lookup_path(container, path, sep='.', createpath=False):
51
    """
52
    return (['a','b'],
53
            [container['a'], container['a']['b']],
54
            'c')  where path=sep.join(['a','b','c'])
55

56
    """
57
    names = path.split(sep)
58
    dirnames = names[:-1]
59
    basename = names[-1]
60

    
61
    node = container
62
    name_path = []
63
    node_path = [node]
64
    for name in dirnames:
65
        name_path.append(name)
66
        if name not in node:
67
            if not createpath:
68
                m = "'{0}': path not found".format(sep.join(name_path))
69
                raise KeyError(m)
70
            node[name] = {}
71
        try:
72
            node = node[name]
73
        except TypeError as e:
74
            m = "'{0}': cannot traverse path beyond this node: {1}"
75
            m = m.format(sep.join(name_path), str(e))
76
            raise ValueError(m)
77
        node_path.append(node)
78

    
79
    return name_path, node_path, basename
80

    
81

    
82
def walk_paths(container):
83
    for name, node in container.iteritems():
84
        if not hasattr(node, 'items'):
85
            yield [name], [node]
86
        else:
87
            for names, nodes in walk_paths(node):
88
                yield [name] + names, [node] + nodes
89

    
90

    
91
def list_paths(container, sep='.'):
92
    """
93
    >>> sorted(list_paths({'a': {'b': {'c': 'd'}}}))
94
    [('a.b.c', 'd')]
95
    >>> sorted(list_paths({'a': {'b': {'c': 'd'}, 'e': 3}}))
96
    [('a.b.c', 'd'), ('a.e', 3)]
97
    >>> sorted(list_paths({'a': {'b': {'c': 'd'}, 'e': {'f': 3}}}))
98
    [('a.b.c', 'd'), ('a.e.f', 3)]
99
    >>> list_paths({})
100
    []
101

102
    """
103
    return [(sep.join(name_path), node_path[-1])
104
            for name_path, node_path in walk_paths(container)]
105

    
106

    
107
def del_path(container, path, sep='.', collect=True):
108
    """
109
    del container['a']['b']['c'] where path=sep.join(['a','b','c'])
110

111
    >>> d = {'a': {'b': {'c': 'd'}}}; del_path(d, 'a.b.c'); d
112
    {}
113
    >>> d = {'a': {'b': {'c': 'd'}}}; del_path(d, 'a.b.c', collect=False); d
114
    {'a': {'b': {}}}
115
    >>> d = {'a': {'b': {'c': 'd'}}}; del_path(d, 'a.b.c.d')
116
    Traceback (most recent call last):
117
    ValueError: 'a.b.c': cannot traverse path beyond this node:\
118
 'str' object does not support item deletion
119
    """
120

    
121
    name_path, node_path, basename = \
122
            lookup_path(container, path, sep=sep, createpath=False)
123

    
124
    lastnode = node_path.pop()
125
    lastname = basename
126
    try:
127
        if basename in lastnode:
128
            del lastnode[basename]
129
    except (TypeError, KeyError) as e:
130
        m = "'{0}': cannot traverse path beyond this node: {1}"
131
        m = m.format(sep.join(name_path), str(e))
132
        raise ValueError(m)
133

    
134
    if collect:
135
        while node_path and not lastnode:
136
            basename = name_path.pop()
137
            lastnode = node_path.pop()
138
            del lastnode[basename]
139

    
140

    
141
def get_path(container, path, sep='.'):
142
    """
143
    return container['a']['b']['c'] where path=sep.join(['a','b','c'])
144

145
    >>> get_path({'a': {'b': {'c': 'd'}}}, 'a.b.c.d')
146
    Traceback (most recent call last):
147
    ValueError: 'a.b.c.d': cannot traverse path beyond this node:\
148
 string indices must be integers, not str
149
    >>> get_path({'a': {'b': {'c': 1}}}, 'a.b.c.d')
150
    Traceback (most recent call last):
151
    ValueError: 'a.b.c.d': cannot traverse path beyond this node:\
152
 'int' object is unsubscriptable
153
    >>> get_path({'a': {'b': {'c': 1}}}, 'a.b.c')
154
    1
155
    >>> get_path({'a': {'b': {'c': 1}}}, 'a.b')
156
    {'c': 1}
157

158
    """
159
    name_path, node_path, basename = \
160
            lookup_path(container, path, sep=sep, createpath=False)
161
    name_path.append(basename)
162
    node = node_path[-1]
163

    
164
    try:
165
        return node[basename]
166
    except TypeError as e:
167
        m = "'{0}': cannot traverse path beyond this node: {1}"
168
        m = m.format(sep.join(name_path), str(e))
169
        raise ValueError(m)
170
    except KeyError as e:
171
        m = "'{0}': path not found: {1}"
172
        m = m.format(sep.join(name_path), str(e))
173
        raise KeyError(m)
174

    
175

    
176
def set_path(container, path, value, sep='.',
177
             createpath=False, overwrite=True):
178
    """
179
    container['a']['b']['c'] = value where path=sep.join(['a','b','c'])
180

181
    >>> set_path({'a': {'b': {'c': 'd'}}}, 'a.b.c.d', 1)
182
    Traceback (most recent call last):
183
    ValueError: 'a.b.c.d': cannot traverse path beyond this node:\
184
 'str' object does not support item assignment
185
    >>> set_path({'a': {'b': {'c': 'd'}}}, 'a.b.x.d', 1)
186
    Traceback (most recent call last):
187
    KeyError: "'a.b.x': path not found"
188
    >>> set_path({'a': {'b': {'c': 'd'}}}, 'a.b.x.d', 1, createpath=True)
189
    
190
    >>> set_path({'a': {'b': {'c': 'd'}}}, 'a.b.c', 1)
191
     
192
    >>> set_path({'a': {'b': {'c': 'd'}}}, 'a.b.c', 1, overwrite=False)
193
    Traceback (most recent call last):
194
    ValueError: will not overwrite path 'a.b.c'
195

196
    """
197
    name_path, node_path, basename = \
198
            lookup_path(container, path, sep=sep, createpath=createpath)
199
    name_path.append(basename)
200
    node = node_path[-1]
201

    
202
    if basename in node and not overwrite:
203
        m = "will not overwrite path '{0}'".format(path)
204
        raise ValueError(m)
205

    
206
    try:
207
        node[basename] = value
208
    except TypeError as e:
209
        m = "'{0}': cannot traverse path beyond this node: {1}"
210
        m = m.format(sep.join(name_path), str(e))
211
        raise ValueError(m)
212

    
213

    
214
if __name__ == '__main__':
215
    import doctest
216
    doctest.testmod()