Statistics
| Branch: | Tag: | Revision:

root / snf-common / synnefo / util / urltools.py @ 194a7cc0

History | View | Annotate | Download (11 kB)

1
"""
2
Copyright (c) 2013 Roderick Baier
3

4
Permission is hereby granted, free of charge, to any person obtaining a copy of
5
this software and associated documentation files (the "Software"), to deal in
6
the Software without restriction, including without limitation the rights to
7
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
8
the Software, and to permit persons to whom the Software is furnished to do so,
9
subject to the following conditions:
10

11
The above copyright notice and this permission notice shall be included in all
12
copies or substantial portions of the Software.
13

14
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
16
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
17
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
18
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
19
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
20
"""
21

    
22
import os
23
import re
24
import urllib
25
from collections import namedtuple
26
from posixpath import normpath
27

    
28

    
29
__all__ = ["ParseResult", "SplitResult", "split",
30
           "split_netloc", "assemble", "normalize",
31
           "normalize_host", "normalize_path", "normalize_query",
32
           "normalize_fragment", "unquote"]
33

    
34

    
35
#PSL_URL = 'http://mxr.mozilla.org/mozilla-central/source/netwerk/dns/effective_tld_names.dat?raw=1'
36
#
37
#def _get_public_suffix_list():
38
#    """Get the public suffix list.
39
#    """
40
#    local_psl = os.environ.get('PUBLIC_SUFFIX_LIST')
41
#    if local_psl:
42
#        psl_raw = open(local_psl).readlines()
43
#    else:
44
#        psl_raw = urllib.urlopen(PSL_URL).readlines()
45
#    psl = set()
46
#    for line in psl_raw:
47
#        item = line.strip()
48
#        if item != '' and not item.startswith('//'):
49
#            psl.add(item)
50
#    return psl
51
#
52
#PSL = _get_public_suffix_list()
53

    
54

    
55
# Schemes that assemble() writes with a '://' separator and whose paths
# normalize() runs through normalize_path().
SCHEMES = [
    'http', 'https', 'ftp', 'sftp', 'file', 'gopher', 'imap', 'mms', 'news',
    'nntp', 'telnet', 'prospero', 'rsync', 'rtsp', 'rtspu', 'svn', 'git',
    'ws', 'wss',
]

# Characters accepted in a scheme name by split().
SCHEME_CHARS = (
    'abcdefghijklmnopqrstuvwxyz'
    'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
)

# Digits, '.' and ':'; currently only referenced by the commented-out
# split_host() helper.
IP_CHARS = '0123456789.:'

# Port (as a string) that normalize_port() drops for the given scheme.
DEFAULT_PORT = {
    'http': '80',
    'https': '443',
    'ws': '80',
    'wss': '443',
    'ftp': '21',
    'sftp': '22',
}

# Per-component characters that unquote() must leave percent-encoded.
UNQUOTE_EXCEPTIONS = {
    'path': ' /?+#',
    'query': ' &=+#',
    'fragment': ' +#',
}
73

    
74
#_hextochr = {'%02x' % i: chr(i) for i in range(256)}
75
#_hextochr.update({'%02X' % i: chr(i) for i in range(256)})
76
# Lookup table from two-digit hex strings to the character with that code.
# Only the all-lowercase and all-uppercase spellings are present as keys.
_hextochr = dict(
    (fmt % code, chr(code))
    for fmt in ('%02x', '%02X')
    for code in range(256)
)


def _idna_encode(x):
    """Decode ``x`` from UTF-8 and encode it with the 'idna' codec."""
    return x.decode('utf-8').encode('idna')


def _idna_decode(x):
    """Decode ``x`` with the 'idna' codec and re-encode it as UTF-8."""
    return x.decode('idna').encode('utf-8')


# Five-part result returned by split().
SplitResult = namedtuple(
    'SplitResult',
    ['scheme', 'netloc', 'path', 'query', 'fragment'],
)

# Fully decomposed URL consumed by assemble().
ParseResult = namedtuple(
    'ParseResult',
    ['scheme', 'username', 'password', 'subdomain', 'domain', 'tld', 'port',
     'path', 'query', 'fragment'],
)
86

    
87

    
88
def normalize(url):
    """Return the normalized form of ``url``.

    Surrounding whitespace is stripped, the URL is split into its
    components, every component is normalized on its own and the pieces are
    put back together with assemble().  An empty string is returned as is.
    """
    if url == '':
        return ''
    parts = split(url.strip())
    scheme = parts.scheme
    if not scheme:
        # Without a scheme split() returns an empty netloc and leaves the
        # host inside the path; everything before the first '/' is the
        # netloc, the remainder (if any) is the real path.
        netloc, slash, tail = parts.path.partition('/')
        path = normalize_path('/' + tail) if slash else ''
    elif scheme in SCHEMES:
        netloc = parts.netloc
        path = normalize_path(parts.path)
    else:
        # Paths of schemes outside SCHEMES are passed through untouched.
        netloc = parts.netloc
        path = parts.path
    username, password, host, port = split_netloc(netloc)
    normalized = ParseResult(
        scheme=scheme,
        username=username,
        password=password,
        subdomain=None,
        domain=normalize_host(host),
        tld=None,
        port=normalize_port(scheme, port),
        path=path,
        query=normalize_query(parts.query),
        fragment=normalize_fragment(parts.fragment),
    )
    return assemble(normalized)
115

    
116

    
117
#def encode(url):
118
#    """Encode URL
119
#    """
120
#    parts = extract(url)
121
#    encoded = ParseResult(*(_idna_encode(p) for p in parts))
122
#    return assemble(encoded)
123

    
124

    
125
def assemble(parts):
    """Build a URL string from a ParseResult.

    Components that are empty or ``None`` are left out; ``domain`` is always
    emitted.  Schemes listed in SCHEMES are followed by '://', any other
    scheme by ':'.  A password is only written when a username is present.
    """
    pieces = []
    if parts.scheme:
        separator = '://' if parts.scheme in SCHEMES else ':'
        pieces.append(parts.scheme + separator)
    if parts.username:
        if parts.password:
            pieces.append(parts.username + ':' + parts.password + '@')
        else:
            pieces.append(parts.username + '@')
    if parts.subdomain:
        pieces.append(parts.subdomain + '.')
    pieces.append(parts.domain)
    if parts.tld:
        pieces.append('.' + parts.tld)
    if parts.port:
        pieces.append(':' + parts.port)
    if parts.path:
        pieces.append(parts.path)
    if parts.query:
        pieces.append('?' + parts.query)
    if parts.fragment:
        pieces.append('#' + parts.fragment)
    return ''.join(pieces)
152

    
153

    
154
def normalize_host(host):
    """Decode IDNA labels in ``host``.

    Hosts that do not contain 'xn--' are returned unchanged; otherwise every
    dot-separated label is passed through _idna_decode() and the labels are
    joined with '.' again.
    """
    if 'xn--' in host:
        return '.'.join(_idna_decode(label) for label in host.split('.'))
    return host
161

    
162

    
163
def normalize_port(scheme, port):
    """Drop ``port`` when it is the default port of ``scheme``.

    :param scheme: URL scheme; when empty the port is returned untouched
    :param port: port as a string (may be empty)
    :returns: ``port`` if it is non-empty and differs from the scheme's entry
        in DEFAULT_PORT (or the scheme has no entry), otherwise ``None``
    """
    if not scheme:
        return port
    # Use .get(): schemes without an entry in DEFAULT_PORT (e.g. 'git') have
    # no default to strip, and indexing the dict would raise KeyError.
    if port and port != DEFAULT_PORT.get(scheme):
        return port
    return None
170

    
171

    
172
def normalize_path(path):
    """Normalize a URL path.

    Empty, '/' and '//' paths become '/'.  Any other path is unquoted
    (keeping the characters in UNQUOTE_EXCEPTIONS['path'] encoded) and
    collapsed with posixpath.normpath(); a trailing slash on the input is
    kept on the result.
    """
    if path in ('', '/', '//'):
        return '/'
    unquoted = unquote(path, exceptions=UNQUOTE_EXCEPTIONS['path'])
    collapsed = normpath(unquoted)
    # normpath() drops a trailing slash, so put it back when the input had one.
    if collapsed != '/' and path.endswith('/'):
        collapsed += '/'
    return collapsed
181

    
182

    
183
def normalize_query(query):
    """Normalize a query string.

    The query is unquoted (keeping the characters in
    UNQUOTE_EXCEPTIONS['query'] encoded), parameters lacking a name or a
    value are discarded and the remaining ``name=value`` pairs are sorted.
    """
    # Three characters is the shortest possible "k=v"; this also covers ''.
    if len(query) <= 2:
        return ''
    decoded = unquote(query, exceptions=UNQUOTE_EXCEPTIONS['query'])
    # partition() yields an empty value when there is no '=', so such
    # parameters are filtered out together with the "k=" / "=v" forms.
    split_params = (param.partition('=') for param in decoded.split('&'))
    kept = sorted(
        "%s=%s" % (key, value)
        for key, _, value in split_params
        if key and value
    )
    return '&'.join(kept)
198

    
199

    
200
def normalize_fragment(fragment):
    """Unquote ``fragment``, keeping UNQUOTE_EXCEPTIONS['fragment'] encoded.
    """
    keep_quoted = UNQUOTE_EXCEPTIONS['fragment']
    return unquote(fragment, exceptions=keep_quoted)
204

    
205

    
206
def unquote(text, exceptions=()):
    """Percent-decode ``text`` but leave the ``exceptions`` encoded.

    :param text: string that may contain ``%XX`` escapes
    :param exceptions: characters (a string or any container of
        one-character strings) whose escapes must stay encoded
    :returns: the decoded string; escapes that are not two hex digits, or
        that decode to one of the exceptions, are copied through verbatim
    """
    if '%' not in text:
        return text
    chunks = text.split('%')
    res = [chunks[0]]
    for chunk in chunks[1:]:
        # The table only holds all-lowercase and all-uppercase keys, so look
        # the digits up lowercased; otherwise mixed-case escapes such as
        # "%aB" would never be decoded.
        char = _hextochr.get(chunk[:2].lower())
        if char is not None and char not in exceptions:
            res.append(char + chunk[2:])
        else:
            res.append('%' + chunk)
    return ''.join(res)
223

    
224

    
225
#def parse(url):
226
#    """Parse a URL
227
#    """
228
#    parts = split(url)
229
#    if parts.scheme:
230
#        (username, password, host, port) = split_netloc(parts.netloc)
231
#        (subdomain, domain, tld) = split_host(host)
232
#    else:
233
#        username = password = subdomain = domain = tld = port = ''
234
#    return ParseResult(parts.scheme, username, password, subdomain, domain, tld,
235
#                       port, parts.path, parts.query, parts.fragment)
236

    
237

    
238
#def extract(url):
239
#    """Extract as much information from a (relative) URL as possible
240
#    """
241
#    parts = split(url)
242
#    if parts.scheme:
243
#        netloc = parts.netloc
244
#        path = parts.path
245
#    else:
246
#        netloc = parts.path
247
#        path = ''
248
#        if '/' in netloc:
249
#            tmp = netloc.split('/', 1)
250
#            netloc = tmp[0]
251
#            path = '/' + tmp[1]
252
#    (username, password, host, port) = split_netloc(netloc)
253
#    (subdomain, domain, tld) = split_host(host)
254
#    return ParseResult(parts.scheme, username, password, subdomain, domain, tld,
255
#                       port, path, parts.query, parts.fragment)
256

    
257

    
258
def split(url):
    """Split ``url`` into a SplitResult.

    The result holds scheme, netloc, path, query and fragment.  When no
    scheme is recognized the netloc is left empty and whatever precedes the
    query/fragment is returned as the path.
    """
    scheme = ''
    rest = url
    colon = url.find(':')
    bracket = url.find('[')
    # A '[' ahead of the first ':' means that colon belongs to an IPv6
    # literal and not to a scheme.
    if 0 < bracket < colon:
        colon = -1
    if colon > 0:
        candidate = url[:colon]
        if all(ch in SCHEME_CHARS for ch in candidate):
            scheme = candidate.lower()
            rest = url[colon:].lstrip(':/')

    slash = rest.find('/')
    qmark = rest.find('?')
    hashmark = rest.find('#')
    # A delimiter at index 0 is treated the same as a missing one.
    has_query = qmark > 0
    has_frag = hashmark > 0

    path = ''
    if slash > 0:
        if has_query and not has_frag and qmark < slash:
            # The first '/' is part of the query: there is no path at all.
            netloc = rest[:qmark]
        else:
            netloc = rest[:slash]
            if has_query and has_frag:
                path_end = min(qmark, hashmark)
            elif has_query:
                path_end = qmark
            elif has_frag:
                path_end = hashmark
            else:
                path_end = len(rest)
            path = rest[slash:path_end]
    elif has_query:
        netloc = rest[:qmark]
    elif has_frag:
        netloc = rest[:hashmark]
    else:
        netloc = rest

    query = ''
    if has_query:
        query = rest[qmark + 1:hashmark] if has_frag else rest[qmark + 1:]
    fragment = rest[hashmark + 1:] if has_frag else ''

    if not scheme:
        # Without a scheme the would-be netloc is reported as part of the path.
        path = netloc + path
        netloc = ''
    return SplitResult(scheme, netloc, path, query, fragment)
313

    
314

    
315
def _clean_netloc(netloc):
    """Strip trailing '.' and ':' from ``netloc`` and lowercase it.

    Pure-ASCII input is lowercased directly.  Non-ASCII byte strings are
    decoded from UTF-8 first so that non-ASCII letters are lowercased too,
    then encoded back to UTF-8; non-ASCII text strings are lowercased as is.
    """
    stripped = netloc.rstrip('.:')
    try:
        netloc.encode('ascii')
    except UnicodeError:
        # Only byte strings can (and need to) be decoded; calling .decode on
        # a text string fails instead of lowercasing it.
        if isinstance(stripped, bytes):
            return stripped.decode('utf-8').lower().encode('utf-8')
        return stripped.lower()
    return stripped.lower()
324

    
325

    
326
def split_netloc(netloc):
    """Break ``netloc`` into ``(username, password, host, port)``.

    Parts that are absent come back as empty strings.  The host/port portion
    is cleaned with _clean_netloc() before the port is split off.
    """
    username = password = port = ''
    userinfo, at_sign, hostport = netloc.partition('@')
    if at_sign:
        username, _, password = userinfo.partition(':')
    else:
        hostport = netloc
    hostport = _clean_netloc(hostport)
    # A netloc ending in ']' is a bare IPv6 literal: its colons are part of
    # the address, not a port separator.
    if ':' in hostport and not hostport.endswith(']'):
        host, port = hostport.rsplit(':', 1)
    else:
        host = hostport
    return username, password, host, port
342

    
343

    
344
#def split_host(host):
345
#    """Use the Public Suffix List to split host into subdomain, domain and tld
346
#    """
347
#    if '[' in host:
348
#        return '', host, ''
349
#    domain = subdomain = tld = ''
350
#    for c in host:
351
#        if c not in IP_CHARS:
352
#            break
353
#    else:
354
#        return '', host, ''
355
#    parts = host.split('.')
356
#    for i in range(len(parts)):
357
#        tld = '.'.join(parts[i:])
358
#        wildcard_tld = '*.' + tld
359
#        exception_tld = '!' + tld
360
#        if exception_tld in PSL:
361
#            domain = '.'.join(parts[:i+1])
362
#            tld = '.'.join(parts[i+1:])
363
#            break
364
#        if tld in PSL:
365
#            domain = '.'.join(parts[:i])
366
#            break
367
#        if wildcard_tld in PSL:
368
#            domain = '.'.join(parts[:i-1])
369
#            tld = '.'.join(parts[i-1:])
370
#            break
371
#    if '.' in domain:
372
#        (subdomain, domain) = domain.rsplit('.', 1)
373
#    return subdomain, domain, tld