Revision 23963422
b/kamaki/cli/utils/__init__.py | ||
---|---|---|
67 | 67 |
stdout.write(w) |
68 | 68 |
|
69 | 69 |
|
70 |
def _flush(): |
|
71 |
"""stdout.flush wrapper is used to help unittests check what is called""" |
|
72 |
stdout.flush() |
|
73 |
|
|
74 |
|
|
75 |
def _readline(): |
|
76 |
"""stdout.readline wrapper is used to help unittests""" |
|
77 |
return stdout.readline() |
|
78 |
|
|
79 |
|
|
70 | 80 |
def suggest_missing(miss=None, exclude=[]): |
71 | 81 |
global suggest |
72 | 82 |
sgs = dict(suggest) |
... | ... | |
295 | 305 |
page_hold(i + 1, page_size, len(items)) |
296 | 306 |
|
297 | 307 |
|
298 |
def format_size(size): |
|
299 |
units = ('B', 'KiB', 'MiB', 'GiB', 'TiB') |
|
308 |
def format_size(size, decimal_factors=False): |
|
309 |
units = ('B', 'KB', 'MB', 'GB', 'TB') if decimal_factors else ( |
|
310 |
'B', 'KiB', 'MiB', 'GiB', 'TiB') |
|
311 |
step = 1000 if decimal_factors else 1024 |
|
312 |
fstep = float(step) |
|
300 | 313 |
try: |
301 | 314 |
size = float(size) |
302 |
except ValueError as err: |
|
303 |
raiseCLIError(err, 'Cannot format %s in bytes' % size) |
|
304 |
for unit in units: |
|
305 |
if size < 1024: |
|
315 |
except (ValueError, TypeError) as err: |
|
316 |
raiseCLIError(err, 'Cannot format %s in bytes' % ( |
|
317 |
','.join(size) if isinstance(size, tuple) else size)) |
|
318 |
for i, unit in enumerate(units): |
|
319 |
if size < step or i + 1 == len(units): |
|
306 | 320 |
break |
307 |
size /= 1024.0
|
|
321 |
size /= fstep
|
|
308 | 322 |
s = ('%.2f' % size) |
323 |
s = s.replace('%s' % step, '%s.99' % (step - 1)) if size <= fstep else s |
|
309 | 324 |
while '.' in s and s[-1] in ('0', '.'): |
310 | 325 |
s = s[:-1] |
311 | 326 |
return s + unit |
... | ... | |
317 | 332 |
:param format: (case insensitive) KiB, KB, MiB, MB, GiB, GB, TiB, TB |
318 | 333 |
|
319 | 334 |
:returns: (int) the size in bytes |
335 |
:raises ValueError: if invalid size or format |
|
336 |
:raises AttributeError: if format is not str |
|
337 |
:raises TypeError: if size is not arithmetic or convertible to arithmetic |
|
320 | 338 |
""" |
321 | 339 |
format = format.upper() |
322 | 340 |
if format == 'B': |
... | ... | |
337 | 355 |
|
338 | 356 |
def dict2file(d, f, depth=0): |
339 | 357 |
for k, v in d.items(): |
340 |
f.write('%s%s: ' % ('\t' * depth, k))
|
|
358 |
f.write('%s%s: ' % (' ' * INDENT_TAB * depth, k))
|
|
341 | 359 |
if isinstance(v, dict): |
342 | 360 |
f.write('\n') |
343 | 361 |
dict2file(v, f, depth + 1) |
344 |
elif isinstance(v, list): |
|
362 |
elif isinstance(v, list) or isinstance(v, tuple):
|
|
345 | 363 |
f.write('\n') |
346 | 364 |
list2file(v, f, depth + 1) |
347 | 365 |
else: |
348 |
f.write(' %s\n' % v)
|
|
366 |
f.write('%s\n' % v) |
|
349 | 367 |
|
350 | 368 |
|
351 | 369 |
def list2file(l, f, depth=1): |
352 | 370 |
for item in l: |
353 | 371 |
if isinstance(item, dict): |
354 | 372 |
dict2file(item, f, depth + 1) |
355 |
elif isinstance(item, list): |
|
373 |
elif isinstance(item, list) or isinstance(item, tuple):
|
|
356 | 374 |
list2file(item, f, depth + 1) |
357 | 375 |
else: |
358 |
f.write('%s%s\n' % ('\t' * depth, item))
|
|
376 |
f.write('%s%s\n' % (' ' * INDENT_TAB * depth, item))
|
|
359 | 377 |
|
360 | 378 |
# Split input auxiliary |
361 | 379 |
|
... | ... | |
365 | 383 |
return (re_parser.split(line), re_parser.findall(line)) |
366 | 384 |
|
367 | 385 |
|
368 |
def _sub_split(line): |
|
369 |
terms = [] |
|
370 |
(sub_trivials, sub_interesting) = _parse_with_regex(line, ' ".*?" ') |
|
371 |
for subi, subipart in enumerate(sub_interesting): |
|
372 |
terms += sub_trivials[subi].split() |
|
373 |
terms.append(subipart[2:-2]) |
|
374 |
terms += sub_trivials[-1].split() |
|
375 |
return terms |
|
376 |
|
|
377 |
|
|
378 |
def old_split_input(line): |
|
379 |
"""Use regular expressions to split a line correctly""" |
|
380 |
line = ' %s ' % line |
|
381 |
(trivial_parts, interesting_parts) = _parse_with_regex(line, ' \'.*?\' ') |
|
382 |
terms = [] |
|
383 |
for i, ipart in enumerate(interesting_parts): |
|
384 |
terms += _sub_split(trivial_parts[i]) |
|
385 |
terms.append(ipart[2:-2]) |
|
386 |
terms += _sub_split(trivial_parts[-1]) |
|
387 |
return terms |
|
388 |
|
|
389 |
|
|
390 | 386 |
def _get_from_parsed(parsed_str): |
391 | 387 |
try: |
392 | 388 |
parsed_str = parsed_str.strip() |
393 | 389 |
except: |
394 | 390 |
return None |
395 |
if parsed_str: |
|
396 |
if parsed_str[0] == parsed_str[-1] and parsed_str[0] in ("'", '"'): |
|
397 |
return [parsed_str[1:-1]] |
|
398 |
return parsed_str.split(' ') |
|
399 |
return None |
|
391 |
return ([parsed_str[1:-1]] if ( |
|
392 |
parsed_str[0] == parsed_str[-1] and parsed_str[0] in ("'", '"')) else ( |
|
393 |
parsed_str.split(' '))) if parsed_str else None |
|
400 | 394 |
|
401 | 395 |
|
402 | 396 |
def split_input(line): |
... | ... | |
405 | 399 |
reg_expr = '\'.*?\'|".*?"|^[\S]*$' |
406 | 400 |
(trivial_parts, interesting_parts) = _parse_with_regex(line, reg_expr) |
407 | 401 |
assert(len(trivial_parts) == 1 + len(interesting_parts)) |
408 |
#print(' [split_input] trivial_parts %s are' % trivial_parts) |
|
409 |
#print(' [split_input] interesting_parts %s are' % interesting_parts) |
|
410 | 402 |
terms = [] |
411 | 403 |
for i, tpart in enumerate(trivial_parts): |
412 | 404 |
part = _get_from_parsed(tpart) |
... | ... | |
428 | 420 |
|
429 | 421 |
:returns: (bool) True if reponse in true responses, False otherwise |
430 | 422 |
""" |
431 |
stdout.write('%s [%s/N]: ' % (msg, ', '.join(true_resp)))
|
|
432 |
stdout.flush()
|
|
433 |
user_response = stdin.readline()
|
|
423 |
_write('%s [%s/N]: ' % (msg, ', '.join(true_resp)))
|
|
424 |
_flush()
|
|
425 |
user_response = _readline()
|
|
434 | 426 |
return user_response[0].lower() in true_resp |
435 | 427 |
|
436 | 428 |
|
437 | 429 |
def spiner(size=None): |
438 | 430 |
spins = ('/', '-', '\\', '|') |
439 |
stdout.write(' ')
|
|
431 |
_write(' ')
|
|
440 | 432 |
size = size or -1 |
441 | 433 |
i = 0 |
442 | 434 |
while size - i: |
443 |
stdout.write('\b%s' % spins[i % len(spins)])
|
|
444 |
stdout.flush()
|
|
435 |
_write('\b%s' % spins[i % len(spins)])
|
|
436 |
_flush()
|
|
445 | 437 |
i += 1 |
446 | 438 |
sleep(0.1) |
447 | 439 |
yield |
448 | 440 |
yield |
449 | 441 |
|
450 |
if __name__ == '__main__': |
|
451 |
examples = [ |
|
452 |
'la_la le_le li_li', |
|
453 |
'\'la la\' \'le le\' \'li li\'', |
|
454 |
'\'la la\' le_le \'li li\'', |
|
455 |
'la_la \'le le\' li_li', |
|
456 |
'la_la \'le le\' \'li li\'', |
|
457 |
'"la la" "le le" "li li"', |
|
458 |
'"la la" le_le "li li"', |
|
459 |
'la_la "le le" li_li', |
|
460 |
'"la_la" "le le" "li li"', |
|
461 |
'\'la la\' "le le" \'li li\'', |
|
462 |
'la_la \'le le\' "li li"', |
|
463 |
'la_la \'le le\' li_li', |
|
464 |
'\'la la\' le_le "li li"', |
|
465 |
'"la la" le_le \'li li\'', |
|
466 |
'"la la" \'le le\' li_li', |
|
467 |
'la_la \'le\'le\' "li\'li"', |
|
468 |
'"la \'le le\' la"', |
|
469 |
'\'la "le le" la\'', |
|
470 |
'\'la "la" la\' "le \'le\' le" li_"li"_li', |
|
471 |
'\'\' \'L\' "" "A"'] |
|
472 |
|
|
473 |
for i, example in enumerate(examples): |
|
474 |
print('%s. Split this: (%s)' % (i + 1, example)) |
|
475 |
ret = old_split_input(example) |
|
476 |
print('\t(%s) of size %s' % (ret, len(ret))) |
|
477 |
|
|
478 | 442 |
|
479 | 443 |
def get_path_size(testpath): |
480 | 444 |
if path.isfile(testpath): |
b/kamaki/cli/utils/test.py | ||
---|---|---|
32 | 32 |
# or implied, of GRNET S.A. |
33 | 33 |
|
34 | 34 |
from unittest import TestCase |
35 |
#from tempfile import NamedTemporaryFile
|
|
35 |
from tempfile import NamedTemporaryFile |
|
36 | 36 |
from mock import patch, call |
37 | 37 |
from itertools import product |
38 | 38 |
|
... | ... | |
286 | 286 |
call(i + 1, page_size, len(items))) |
287 | 287 |
ph_counter += 1 |
288 | 288 |
|
289 |
def test_format_size(self): |
|
290 |
from kamaki.cli.utils import format_size |
|
291 |
from kamaki.cli import CLIError |
|
292 |
for v in ('wrong', {1: '1', 2: '2'}, ('tuples', 'not OK'), [1, 2]): |
|
293 |
self.assertRaises(CLIError, format_size, v) |
|
294 |
for step, B, K, M, G, T in ( |
|
295 |
(1000, 'B', 'KB', 'MB', 'GB', 'TB'), |
|
296 |
(1024, 'B', 'KiB', 'MiB', 'GiB', 'TiB')): |
|
297 |
Ki, Mi, Gi = step, step * step, step * step * step |
|
298 |
for before, after in ( |
|
299 |
(0, '0' + B), (512, '512' + B), ( |
|
300 |
Ki - 1, '%s%s' % (step - 1, B)), |
|
301 |
(Ki, '1' + K), (42 * Ki, '42' + K), ( |
|
302 |
Mi - 1, '%s.99%s' % (step - 1, K)), |
|
303 |
(Mi, '1' + M), (42 * Mi, '42' + M), ( |
|
304 |
Ki * Mi - 1, '%s.99%s' % (step - 1, M)), |
|
305 |
(Gi, '1' + G), (42 * Gi, '42' + G), ( |
|
306 |
Mi * Mi - 1, '%s.99%s' % (step - 1, G)), |
|
307 |
(Mi * Mi, '1' + T), (42 * Mi * Mi, '42' + T), ( |
|
308 |
Mi * Gi - 1, '%s.99%s' % (step - 1, T)), ( |
|
309 |
42 * Mi * Gi, '%s%s' % (42 * Ki, T))): |
|
310 |
self.assertEqual(format_size(before, step == 1000), after) |
|
311 |
|
|
312 |
def test_to_bytes(self): |
|
313 |
from kamaki.cli.utils import to_bytes |
|
314 |
for v in ('wrong', 'KABUM', 'kbps', 'kibps'): |
|
315 |
self.assertRaises(ValueError, to_bytes, v, 'B') |
|
316 |
self.assertRaises(ValueError, to_bytes, 42, v) |
|
317 |
for v in ([1, 2, 3], ('kb', 'mb'), {'kb': 1, 'byte': 2}): |
|
318 |
self.assertRaises(TypeError, to_bytes, v, 'B') |
|
319 |
self.assertRaises(AttributeError, to_bytes, 42, v) |
|
320 |
kl, ki = 1000, 1024 |
|
321 |
for size, (unit, factor) in product( |
|
322 |
(0, 42, 3.14, 1023, 10000), |
|
323 |
( |
|
324 |
('B', 1), ('b', 1), |
|
325 |
('KB', kl), ('KiB', ki), |
|
326 |
('mb', kl * kl), ('mIb', ki * ki), |
|
327 |
('gB', kl * kl * kl), ('GIB', ki * ki * ki), |
|
328 |
('TB', kl * kl * kl * kl), ('tiB', ki * ki * ki * ki))): |
|
329 |
self.assertEqual(to_bytes(size, unit), int(size * factor)) |
|
330 |
|
|
331 |
def test_dict2file(self): |
|
332 |
from kamaki.cli.utils import dict2file, INDENT_TAB |
|
333 |
for d, depth in product(( |
|
334 |
{'k': 42}, |
|
335 |
{'k1': 'v1', 'k2': [1, 2, 3], 'k3': {'k': 'v'}}, |
|
336 |
{'k1': { |
|
337 |
'k1.1': 'v1.1', |
|
338 |
'k1.2': [1, 2, 3], |
|
339 |
'k1.3': {'k': 'v'}}}), |
|
340 |
(-42, 0, 42)): |
|
341 |
exp = '' |
|
342 |
exp_d = [] |
|
343 |
exp_l = [] |
|
344 |
exp, exp_d, exp_l = '', [], [] |
|
345 |
with NamedTemporaryFile() as f: |
|
346 |
for k, v in d.items(): |
|
347 |
sfx = '\n' |
|
348 |
if isinstance(v, dict): |
|
349 |
exp_d.append(call(v, f, depth + 1)) |
|
350 |
elif isinstance(v, tuple) or isinstance(v, list): |
|
351 |
exp_l.append(call(v, f, depth + 1)) |
|
352 |
else: |
|
353 |
sfx = '%s\n' % v |
|
354 |
exp += '%s%s: %s' % ( |
|
355 |
' ' * (depth * INDENT_TAB), k, sfx) |
|
356 |
with patch('kamaki.cli.utils.dict2file') as D2F: |
|
357 |
with patch('kamaki.cli.utils.list2file') as L2F: |
|
358 |
dict2file(d, f, depth) |
|
359 |
f.seek(0) |
|
360 |
self.assertEqual(f.read(), exp) |
|
361 |
self.assertEqual(L2F.mock_calls, exp_l) |
|
362 |
self.assertEqual(D2F.mock_calls, exp_d) |
|
363 |
|
|
364 |
def test_list2file(self): |
|
365 |
from kamaki.cli.utils import list2file, INDENT_TAB |
|
366 |
for l, depth in product( |
|
367 |
( |
|
368 |
(1, 2, 3), |
|
369 |
[1, 2, 3], |
|
370 |
('v', [1, 2, 3], (1, 2, 3), {'1': 1, 2: '2', 3: 3}), |
|
371 |
['v', {'k1': 'v1', 'k2': [1, 2, 3], 'k3': {1: '1'}}]), |
|
372 |
(-42, 0, 42)): |
|
373 |
with NamedTemporaryFile() as f: |
|
374 |
exp, exp_d, exp_l = '', [], [] |
|
375 |
for v in l: |
|
376 |
if isinstance(v, dict): |
|
377 |
exp_d.append(call(v, f, depth + 1)) |
|
378 |
elif isinstance(v, list) or isinstance(v, tuple): |
|
379 |
exp_l.append(call(v, f, depth + 1)) |
|
380 |
else: |
|
381 |
exp += '%s%s\n' % (' ' * INDENT_TAB * depth, v) |
|
382 |
with patch('kamaki.cli.utils.dict2file') as D2F: |
|
383 |
with patch('kamaki.cli.utils.list2file') as L2F: |
|
384 |
list2file(l, f, depth) |
|
385 |
f.seek(0) |
|
386 |
self.assertEqual(f.read(), exp) |
|
387 |
self.assertEqual(L2F.mock_calls, exp_l) |
|
388 |
self.assertEqual(D2F.mock_calls, exp_d) |
|
389 |
|
|
390 |
def test__parse_with_regex(self): |
|
391 |
from re import compile as r_compile |
|
392 |
from kamaki.cli.utils import _parse_with_regex |
|
393 |
for args in product( |
|
394 |
( |
|
395 |
'this is a line', |
|
396 |
'this_is_also_a_line', |
|
397 |
'This "text" is quoted', |
|
398 |
'This "quoted" "text" is more "complicated"', |
|
399 |
'Is this \'quoted\' text "double \'quoted\' or not?"', |
|
400 |
'"What \'about\' the" oposite?', |
|
401 |
' Try with a " single double quote', |
|
402 |
'Go "down \'deep " deeper \'bottom \' up" go\' up" !'), |
|
403 |
( |
|
404 |
'\'.*?\'|".*?"|^[\S]*$', |
|
405 |
r'"([A-Za-z0-9_\./\\-]*)"', |
|
406 |
r'\"(.+?)\"', |
|
407 |
'\\^a\\.\\*\\$')): |
|
408 |
r_parser = r_compile(args[1]) |
|
409 |
self.assertEqual( |
|
410 |
_parse_with_regex(*args), |
|
411 |
(r_parser.split(args[0]), r_parser.findall(args[0]))) |
|
412 |
|
|
413 |
def test_split_input(self): |
|
414 |
from kamaki.cli.utils import split_input |
|
415 |
for line, expected in ( |
|
416 |
('unparsable', ['unparsable']), |
|
417 |
('"parsable"', ['parsable']), |
|
418 |
('"parse" out', ['parse', 'out']), |
|
419 |
('"one', ['"one']), |
|
420 |
('two" or" more"', ['two', ' or', 'more"']), |
|
421 |
('Go "down \'deep " deeper \'bottom \' up" go\' up" !', [ |
|
422 |
'Go', "down 'deep ", 'deeper', 'bottom ', |
|
423 |
'up', " go' up", '!']), |
|
424 |
('Is "this" a \'parsed\' string?', [ |
|
425 |
'Is', 'this', 'a', 'parsed', 'string?'])): |
|
426 |
self.assertEqual(split_input(line), expected) |
|
427 |
|
|
428 |
@patch('kamaki.cli.utils._readline', return_value='read line') |
|
429 |
@patch('kamaki.cli.utils._flush') |
|
430 |
@patch('kamaki.cli.utils._write') |
|
431 |
def test_ask_user(self, WR, FL, RL): |
|
432 |
from kamaki.cli.utils import ask_user |
|
433 |
msg = 'some question' |
|
434 |
self.assertFalse(ask_user(msg)) |
|
435 |
WR.assert_called_once_with('%s [y/N]: ' % msg) |
|
436 |
FL.assert_called_once_with() |
|
437 |
RL.assert_called_once_with() |
|
438 |
|
|
439 |
self.assertTrue(ask_user(msg, ('r', ))) |
|
440 |
self.assertEqual(WR.mock_calls[-1], call('%s [r/N]: ' % msg)) |
|
441 |
self.assertEqual(FL.mock_calls, 2 * [call()]) |
|
442 |
self.assertEqual(RL.mock_calls, 2 * [call()]) |
|
443 |
|
|
444 |
self.assertTrue(ask_user(msg, ('Z', 'r', 'k'))) |
|
445 |
self.assertEqual(WR.mock_calls[-1], call('%s [Z, r, k/N]: ' % msg)) |
|
446 |
self.assertEqual(FL.mock_calls, 3 * [call()]) |
|
447 |
self.assertEqual(RL.mock_calls, 3 * [call()]) |
|
448 |
|
|
449 |
@patch('kamaki.cli.utils._flush') |
|
450 |
@patch('kamaki.cli.utils._write') |
|
451 |
def test_spiner(self, WR, FL): |
|
452 |
from kamaki.cli.utils import spiner |
|
453 |
spins = ('/', '-', '\\', '|') |
|
454 |
prev = 1 |
|
455 |
for i, SP in enumerate(spiner(6)): |
|
456 |
if not i: |
|
457 |
self.assertEqual(WR.mock_calls[-2], call(' ')) |
|
458 |
elif i > 5: |
|
459 |
break |
|
460 |
self.assertEqual(SP, None) |
|
461 |
self.assertEqual(WR.mock_calls[-1], call('\b%s' % spins[i % 4])) |
|
462 |
self.assertEqual(FL.mock_calls, prev * [call()]) |
|
463 |
prev += 1 |
|
464 |
|
|
465 |
def test_remove_from_items(self): |
|
466 |
from kamaki.cli.utils import remove_from_items |
|
467 |
for v in ('wrong', [1, 2, 3], [{}, 2, {}]): |
|
468 |
self.assertRaises(AssertionError, remove_from_items, v, 'none') |
|
469 |
d = dict(k1=1, k2=dict(k2=2, k3=3), k3=3, k4=4) |
|
470 |
for k in (d.keys() + ['kN']): |
|
471 |
tmp1, tmp2 = dict(d), dict(d) |
|
472 |
remove_from_items([tmp1, ], k) |
|
473 |
tmp1.pop(k, None) |
|
474 |
self.assert_dicts_are_equal(tmp1, tmp2) |
|
475 |
for k in (d.keys() + ['kN']): |
|
476 |
tmp1, tmp2 = dict(d), dict(d) |
|
477 |
remove_from_items([tmp1, tmp2], k) |
|
478 |
self.assert_dicts_are_equal(tmp1, tmp2) |
|
479 |
|
|
289 | 480 |
|
290 | 481 |
if __name__ == '__main__': |
291 | 482 |
from sys import argv |
Also available in: Unified diff