Revision fa9c0c38
b/kamaki/cli/commands/pithos.py | ||
---|---|---|
34 | 34 |
from sys import stdout |
35 | 35 |
from time import localtime, strftime |
36 | 36 |
from os import path, makedirs, walk |
37 |
from io import StringIO |
|
37 | 38 |
|
38 | 39 |
from kamaki.cli import command |
39 | 40 |
from kamaki.cli.command_tree import CommandTree |
40 | 41 |
from kamaki.cli.errors import raiseCLIError, CLISyntaxError, CLIBaseUrlError |
41 | 42 |
from kamaki.cli.utils import ( |
42 |
format_size, to_bytes, print_dict, print_items, page_hold, bold, ask_user,
|
|
43 |
format_size, to_bytes, print_dict, print_items, pager, bold, ask_user,
|
|
43 | 44 |
get_path_size, print_json, guess_mime_type) |
44 | 45 |
from kamaki.cli.argument import FlagArgument, ValueArgument, IntArgument |
45 | 46 |
from kamaki.cli.argument import KeyValueArgument, DateArgument |
... | ... | |
354 | 355 |
format=ValueArgument( |
355 | 356 |
'format to parse until data (default: d/m/Y H:M:S )', '--format'), |
356 | 357 |
shared=FlagArgument('show only shared', '--shared'), |
357 |
more=FlagArgument( |
|
358 |
'output results in pages (-n to set items per page, default 10)', |
|
359 |
'--more'), |
|
358 |
more=FlagArgument('read long results', '--more'), |
|
360 | 359 |
exact_match=FlagArgument( |
361 | 360 |
'Show only objects that match exactly with path', |
362 | 361 |
'--exact-match'), |
... | ... | |
367 | 366 |
if self['json_output']: |
368 | 367 |
print_json(object_list) |
369 | 368 |
return |
370 |
limit = int(self['limit']) if self['limit'] > 0 else len(object_list)
|
|
369 |
out = StringIO() if self['more'] else stdout
|
|
371 | 370 |
for index, obj in enumerate(object_list): |
372 | 371 |
if self['exact_match'] and self.path and not ( |
373 | 372 |
obj['name'] == self.path or 'content_type' in obj): |
... | ... | |
384 | 383 |
isDir = False |
385 | 384 |
size = format_size(obj['bytes']) |
386 | 385 |
pretty_obj['bytes'] = '%s (%s)' % (obj['bytes'], size) |
387 |
oname = bold(obj['name']) |
|
386 |
oname = obj['name'] if self['more'] else bold(obj['name'])
|
|
388 | 387 |
prfx = ('%s%s. ' % (empty_space, index)) if self['enum'] else '' |
389 | 388 |
if self['detail']: |
390 |
print('%s%s' % (prfx, oname))
|
|
391 |
print_dict(pretty_obj, exclude=('name')) |
|
392 |
print
|
|
389 |
out.writelines(u'%s%s\n' % (prfx, oname))
|
|
390 |
print_dict(pretty_obj, exclude=('name'), out=out)
|
|
391 |
out.writelines(u'\n')
|
|
393 | 392 |
else: |
394 |
oname = '%s%9s %s' % (prfx, size, oname) |
|
395 |
oname += '/' if isDir else ''
|
|
396 |
print(oname)
|
|
397 |
if self['more']:
|
|
398 |
page_hold(index, limit, len(object_list))
|
|
393 |
oname = u'%s%9s %s' % (prfx, size, oname)
|
|
394 |
oname += u'/' if isDir else u''
|
|
395 |
out.writelines(oname + u'\n')
|
|
396 |
if self['more']: |
|
397 |
pager(out.getvalue())
|
|
399 | 398 |
|
400 | 399 |
def print_containers(self, container_list): |
401 | 400 |
if self['json_output']: |
402 | 401 |
print_json(container_list) |
403 | 402 |
return |
404 |
limit = int(self['limit']) if self['limit'] > 0\ |
|
405 |
else len(container_list) |
|
403 |
out = StringIO() if self['more'] else stdout |
|
406 | 404 |
for index, container in enumerate(container_list): |
407 | 405 |
if 'bytes' in container: |
408 | 406 |
size = format_size(container['bytes']) |
409 | 407 |
prfx = ('%s. ' % (index + 1)) if self['enum'] else '' |
410 |
cname = '%s%s' % (prfx, bold(container['name'])) |
|
408 |
_cname = container['name'] if ( |
|
409 |
self['more']) else bold(container['name']) |
|
410 |
cname = u'%s%s' % (prfx, _cname) |
|
411 | 411 |
if self['detail']: |
412 |
print(cname)
|
|
412 |
out.writelines(cname + u'\n')
|
|
413 | 413 |
pretty_c = container.copy() |
414 | 414 |
if 'bytes' in container: |
415 | 415 |
pretty_c['bytes'] = '%s (%s)' % (container['bytes'], size) |
416 |
print_dict(pretty_c, exclude=('name')) |
|
417 |
print
|
|
416 |
print_dict(pretty_c, exclude=('name'), out=out)
|
|
417 |
out.writelines(u'\n')
|
|
418 | 418 |
else: |
419 | 419 |
if 'count' in container and 'bytes' in container: |
420 |
print('%s (%s, %s objects)' % ( |
|
421 |
cname, |
|
422 |
size, |
|
423 |
container['count'])) |
|
420 |
out.writelines(u'%s (%s, %s objects)\n' % ( |
|
421 |
cname, size, container['count'])) |
|
424 | 422 |
else: |
425 |
print(cname)
|
|
426 |
if self['more']:
|
|
427 |
page_hold(index + 1, limit, len(container_list))
|
|
423 |
out.writelines(cname + '\n')
|
|
424 |
if self['more']: |
|
425 |
pager(out.getvalue())
|
|
428 | 426 |
|
429 | 427 |
@errors.generic.all |
430 | 428 |
@errors.pithos.connection |
... | ... | |
1239 | 1237 |
* download <container>:<path> <local dir> -R |
1240 | 1238 |
will download all files on <container> prefixed as <path>, |
1241 | 1239 |
to <local dir>/<full path> (or <local dir>\<full path> in windows) |
1242 |
* download <container>:<path> <local dir> --exact-match
|
|
1243 |
will download only one file, exactly matching <path>
|
|
1240 |
* download <container>:<path> <local dir> |
|
1241 |
will download only one file<path> |
|
1244 | 1242 |
ATTENTION: to download cont:dir1/dir2/file there must exist objects |
1245 | 1243 |
cont:dir1 and cont:dir1/dir2 of type application/directory |
1246 | 1244 |
To create directory objects, use /file mkdir |
b/kamaki/cli/utils/__init__.py | ||
---|---|---|
31 | 31 |
# interpreted as representing official policies, either expressed |
32 | 32 |
# or implied, of GRNET S.A. |
33 | 33 |
|
34 |
from sys import stdout |
|
34 |
from sys import stdout, stdin
|
|
35 | 35 |
from re import compile as regex_compile |
36 |
from time import sleep |
|
37 | 36 |
from os import walk, path |
38 | 37 |
from json import dumps |
38 |
from pydoc import pager |
|
39 | 39 |
|
40 | 40 |
from kamaki.cli.errors import raiseCLIError |
41 | 41 |
|
... | ... | |
57 | 57 |
suggest['ansicolors']['active'] = True |
58 | 58 |
|
59 | 59 |
|
60 |
def _print(w): |
|
61 |
"""Print wrapper is used to help unittests check what is printed""" |
|
62 |
print w |
|
63 |
|
|
64 |
|
|
65 |
def _write(w): |
|
66 |
"""stdout.write wrapper is used to help unittests check what is printed""" |
|
67 |
stdout.write(w) |
|
68 |
|
|
69 |
|
|
70 |
def _flush(): |
|
71 |
"""stdout.flush wrapper is used to help unittests check what is called""" |
|
72 |
stdout.flush() |
|
73 |
|
|
74 |
|
|
75 |
def _readline(): |
|
76 |
"""raw_input wrapper is used to help unittests""" |
|
77 |
return raw_input() |
|
78 |
|
|
79 |
|
|
80 | 60 |
def suggest_missing(miss=None, exclude=[]): |
81 | 61 |
global suggest |
82 | 62 |
sgs = dict(suggest) |
... | ... | |
131 | 111 |
return new_d |
132 | 112 |
|
133 | 113 |
|
134 |
def print_json(data): |
|
114 |
def print_json(data, out=stdout):
|
|
135 | 115 |
"""Print a list or dict as json in console |
136 | 116 |
|
137 | 117 |
:param data: json-dumpable data |
138 |
""" |
|
139 |
_print(dumps(data, indent=INDENT_TAB)) |
|
140 |
|
|
141 | 118 |
|
142 |
def pretty_dict(d, *args, **kwargs): |
|
143 |
print_dict(pretty_keys(d, *args, **kwargs)) |
|
119 |
:param out: Input/Output stream to dump values into |
|
120 |
""" |
|
121 |
out.writelines(unicode(dumps(data, indent=INDENT_TAB) + '\n')) |
|
144 | 122 |
|
145 | 123 |
|
146 | 124 |
def print_dict( |
147 | 125 |
d, |
148 | 126 |
exclude=(), indent=0, |
149 |
with_enumeration=False, recursive_enumeration=False): |
|
127 |
with_enumeration=False, recursive_enumeration=False, out=stdout):
|
|
150 | 128 |
"""Pretty-print a dictionary object |
151 | 129 |
<indent>key: <non iterable item> |
152 | 130 |
<indent>key: |
... | ... | |
163 | 141 |
:param recursive_enumeration: (bool) recursively enumerate iterables (does |
164 | 142 |
not enumerate 1st level keys) |
165 | 143 |
|
144 |
:param out: Input/Output stream to dump values into |
|
145 |
|
|
166 | 146 |
:raises CLIError: if preconditions fail |
167 | 147 |
""" |
168 | 148 |
assert isinstance(d, dict), 'print_dict input must be a dict' |
... | ... | |
172 | 152 |
k = ('%s' % k).strip() |
173 | 153 |
if k in exclude: |
174 | 154 |
continue |
175 |
print_str = ' ' * indent |
|
176 |
print_str += '%s.' % (i + 1) if with_enumeration else ''
|
|
177 |
print_str += '%s:' % k |
|
155 |
print_str = u' ' * indent
|
|
156 |
print_str += u'%s.' % (i + 1) if with_enumeration else u''
|
|
157 |
print_str += u'%s:' % k
|
|
178 | 158 |
if isinstance(v, dict): |
179 |
_print(print_str)
|
|
159 |
out.writelines(print_str + '\n')
|
|
180 | 160 |
print_dict( |
181 | 161 |
v, exclude, indent + INDENT_TAB, |
182 |
recursive_enumeration, recursive_enumeration) |
|
162 |
recursive_enumeration, recursive_enumeration, out)
|
|
183 | 163 |
elif isinstance(v, list) or isinstance(v, tuple): |
184 |
_print(print_str)
|
|
164 |
out.writelines(print_str + '\n')
|
|
185 | 165 |
print_list( |
186 | 166 |
v, exclude, indent + INDENT_TAB, |
187 |
recursive_enumeration, recursive_enumeration) |
|
167 |
recursive_enumeration, recursive_enumeration, out)
|
|
188 | 168 |
else: |
189 |
_print('%s %s' % (print_str, v))
|
|
169 |
out.writelines(u'%s %s\n' % (print_str, v))
|
|
190 | 170 |
|
191 | 171 |
|
192 | 172 |
def print_list( |
193 | 173 |
l, |
194 | 174 |
exclude=(), indent=0, |
195 |
with_enumeration=False, recursive_enumeration=False): |
|
175 |
with_enumeration=False, recursive_enumeration=False, out=stdout):
|
|
196 | 176 |
"""Pretty-print a list of items |
197 | 177 |
<indent>key: <non iterable item> |
198 | 178 |
<indent>key: |
... | ... | |
209 | 189 |
:param recursive_enumeration: (bool) recursively enumerate iterables (does |
210 | 190 |
not enumerate 1st level keys) |
211 | 191 |
|
192 |
:param out: Input/Output stream to dump values into |
|
193 |
|
|
212 | 194 |
:raises CLIError: if preconditions fail |
213 | 195 |
""" |
214 | 196 |
assert isinstance(l, list) or isinstance(l, tuple), ( |
... | ... | |
216 | 198 |
assert indent >= 0, 'print_list indent must be >= 0' |
217 | 199 |
|
218 | 200 |
for i, item in enumerate(l): |
219 |
print_str = ' ' * indent |
|
220 |
print_str += '%s.' % (i + 1) if with_enumeration else ''
|
|
201 |
print_str = u' ' * indent
|
|
202 |
print_str += u'%s.' % (i + 1) if with_enumeration else u''
|
|
221 | 203 |
if isinstance(item, dict): |
222 | 204 |
if with_enumeration: |
223 |
_print(print_str)
|
|
205 |
out.writelines(print_str + '\n')
|
|
224 | 206 |
elif i and i < len(l): |
225 |
_print('')
|
|
207 |
out.writelines(u'\n')
|
|
226 | 208 |
print_dict( |
227 | 209 |
item, exclude, |
228 | 210 |
indent + (INDENT_TAB if with_enumeration else 0), |
229 |
recursive_enumeration, recursive_enumeration) |
|
211 |
recursive_enumeration, recursive_enumeration, out)
|
|
230 | 212 |
elif isinstance(item, list) or isinstance(item, tuple): |
231 | 213 |
if with_enumeration: |
232 |
_print(print_str)
|
|
214 |
out.writelines(print_str + '\n')
|
|
233 | 215 |
elif i and i < len(l): |
234 |
_print()
|
|
216 |
out.writelines(u'\n')
|
|
235 | 217 |
print_list( |
236 | 218 |
item, exclude, indent + INDENT_TAB, |
237 |
recursive_enumeration, recursive_enumeration) |
|
219 |
recursive_enumeration, recursive_enumeration, out)
|
|
238 | 220 |
else: |
239 | 221 |
item = ('%s' % item).strip() |
240 | 222 |
if item in exclude: |
241 | 223 |
continue |
242 |
_print('%s%s' % (print_str, item)) |
|
243 |
|
|
244 |
|
|
245 |
def page_hold(index, limit, maxlen): |
|
246 |
"""Check if there are results to show, and hold the page when needed |
|
247 |
:param index: (int) > 0, index of current element |
|
248 |
:param limit: (int) 0 < limit <= max, page hold if limit mod index == 0 |
|
249 |
:param maxlen: (int) Don't hold if index reaches maxlen |
|
250 |
|
|
251 |
:returns: True if there are more to show, False if all results are shown |
|
252 |
""" |
|
253 |
if index >= maxlen: |
|
254 |
return False |
|
255 |
if index and index % limit == 0: |
|
256 |
raw_input('(%s listed - %s more - "enter" to continue)' % ( |
|
257 |
index, maxlen - index)) |
|
258 |
return True |
|
224 |
out.writelines(u'%s%s\n' % (print_str, item)) |
|
259 | 225 |
|
260 | 226 |
|
261 | 227 |
def print_items( |
262 | 228 |
items, title=('id', 'name'), |
263 |
with_enumeration=False, with_redundancy=False, |
|
264 |
page_size=0): |
|
229 |
with_enumeration=False, with_redundancy=False, out=stdout): |
|
265 | 230 |
"""print dict or list items in a list, using some values as title |
266 | 231 |
Objects of next level don't inherit enumeration (default: off) or titles |
267 | 232 |
|
... | ... | |
273 | 238 |
|
274 | 239 |
:param with_redundancy: (boolean) values in title also appear on body |
275 | 240 |
|
276 |
:param page_size: (int) show results in pages of page_size items, enter to |
|
277 |
continue |
|
241 |
:param out: Input/Output stream to dump values into |
|
278 | 242 |
""" |
279 | 243 |
if not items: |
280 | 244 |
return |
281 | 245 |
if not (isinstance(items, dict) or isinstance(items, list) or isinstance( |
282 | 246 |
items, tuple)): |
283 |
_print('%s' % items)
|
|
247 |
out.writelines(u'%s\n' % items)
|
|
284 | 248 |
return |
285 | 249 |
|
286 |
page_size = int(page_size or 0) |
|
287 |
try: |
|
288 |
page_size = page_size if page_size > 0 else len(items) |
|
289 |
except: |
|
290 |
page_size = len(items) |
|
291 |
num_of_pages = len(items) // page_size |
|
292 |
num_of_pages += 1 if len(items) % page_size else 0 |
|
293 | 250 |
for i, item in enumerate(items): |
294 | 251 |
if with_enumeration: |
295 |
_write('%s. ' % (i + 1))
|
|
252 |
out.write(u'%s. ' % (i + 1))
|
|
296 | 253 |
if isinstance(item, dict): |
297 | 254 |
item = dict(item) |
298 | 255 |
title = sorted(set(title).intersection(item)) |
299 | 256 |
pick = item.get if with_redundancy else item.pop |
300 |
header = ' '.join('%s' % pick(key) for key in title)
|
|
301 |
_print(bold(header))
|
|
302 |
print_dict(item, indent=INDENT_TAB) |
|
257 |
header = u' '.join(u'%s' % pick(key) for key in title)
|
|
258 |
out.writelines(unicode(bold(header) + '\n'))
|
|
259 |
print_dict(item, indent=INDENT_TAB, out=out)
|
|
303 | 260 |
elif isinstance(item, list) or isinstance(item, tuple): |
304 |
print_list(item, indent=INDENT_TAB) |
|
261 |
print_list(item, indent=INDENT_TAB, out=out)
|
|
305 | 262 |
else: |
306 |
_print(' %s' % item) |
|
307 |
page_hold(i + 1, page_size, len(items)) |
|
263 |
out.writelines(u' %s\n' % item) |
|
308 | 264 |
|
309 | 265 |
|
310 | 266 |
def format_size(size, decimal_factors=False): |
... | ... | |
418 | 374 |
return terms |
419 | 375 |
|
420 | 376 |
|
421 |
def ask_user(msg, true_resp=('y', )): |
|
377 |
def ask_user(msg, true_resp=('y', ), out=stdout, user_in=stdin):
|
|
422 | 378 |
"""Print msg and read user response |
423 | 379 |
|
424 | 380 |
:param true_resp: (tuple of chars) |
425 | 381 |
|
426 | 382 |
:returns: (bool) True if reponse in true responses, False otherwise |
427 | 383 |
""" |
428 |
_write('%s [%s/N]: ' % (msg, ', '.join(true_resp))) |
|
429 |
_flush() |
|
430 |
user_response = _readline() |
|
431 |
return user_response[0].lower() in true_resp |
|
432 |
|
|
433 |
|
|
434 |
def spiner(size=None): |
|
435 |
spins = ('/', '-', '\\', '|') |
|
436 |
_write(' ') |
|
437 |
size = size or -1 |
|
438 |
i = 0 |
|
439 |
while size - i: |
|
440 |
_write('\b%s' % spins[i % len(spins)]) |
|
441 |
_flush() |
|
442 |
i += 1 |
|
443 |
sleep(0.1) |
|
444 |
yield |
|
445 |
yield |
|
384 |
yep = ', '.join(true_resp) |
|
385 |
nope = '<not %s>' % yep if 'n' in true_resp or 'N' in true_resp else 'N' |
|
386 |
out.write(u'%s [%s/%s]: ' % (msg, yep, nope)) |
|
387 |
out.flush() |
|
388 |
user_response = user_in.readline() |
|
389 |
return user_response[0].lower() in [s.lower() for s in true_resp] |
|
446 | 390 |
|
447 | 391 |
|
448 | 392 |
def get_path_size(testpath): |
b/kamaki/cli/utils/test.py | ||
---|---|---|
35 | 35 |
from tempfile import NamedTemporaryFile |
36 | 36 |
from mock import patch, call |
37 | 37 |
from itertools import product |
38 |
from io import StringIO |
|
38 | 39 |
|
39 | 40 |
|
40 | 41 |
class UtilsMethods(TestCase): |
... | ... | |
64 | 65 |
self.assertRaises(AssertionError, guess_mime_type, *args) |
65 | 66 |
|
66 | 67 |
@patch('kamaki.cli.utils.dumps', return_value='(dumps output)') |
67 |
@patch('kamaki.cli.utils._print') |
|
68 |
def test_print_json(self, PR, JD): |
|
68 |
def test_print_json(self, JD): |
|
69 | 69 |
from kamaki.cli.utils import print_json, INDENT_TAB |
70 |
print_json('some data') |
|
70 |
out = StringIO() |
|
71 |
print_json('some data', out) |
|
71 | 72 |
JD.assert_called_once_with('some data', indent=INDENT_TAB) |
72 |
PR.assert_called_once_with('(dumps output)')
|
|
73 |
self.assertEqual(out.getvalue(), u'(dumps output)\n')
|
|
73 | 74 |
|
74 |
@patch('kamaki.cli.utils._print') |
|
75 |
def test_print_dict(self, PR): |
|
75 |
def test_print_dict(self): |
|
76 | 76 |
from kamaki.cli.utils import print_dict, INDENT_TAB |
77 |
call_counter = 0
|
|
77 |
out = StringIO()
|
|
78 | 78 |
self.assertRaises(AssertionError, print_dict, 'non-dict think') |
79 | 79 |
self.assertRaises(AssertionError, print_dict, {}, indent=-10) |
80 | 80 |
for args in product( |
... | ... | |
104 | 104 |
with patch('kamaki.cli.utils.print_dict') as PD: |
105 | 105 |
with patch('kamaki.cli.utils.print_list') as PL: |
106 | 106 |
pd_calls, pl_calls = 0, 0 |
107 |
print_dict(*args) |
|
108 |
exp_calls = []
|
|
107 |
print_dict(*args, out=out)
|
|
108 |
exp_calls = u''
|
|
109 | 109 |
for i, (k, v) in enumerate(d.items()): |
110 | 110 |
if k in exclude: |
111 | 111 |
continue |
112 |
str_k = ' ' * indent |
|
113 |
str_k += '%s.' % (i + 1) if with_enumeration else ''
|
|
114 |
str_k += '%s:' % k |
|
112 |
str_k = u' ' * indent
|
|
113 |
str_k += u'%s.' % (i + 1) if with_enumeration else u''
|
|
114 |
str_k += u'%s:' % k
|
|
115 | 115 |
if isinstance(v, dict): |
116 | 116 |
self.assertEqual( |
117 | 117 |
PD.mock_calls[pd_calls], |
... | ... | |
120 | 120 |
exclude, |
121 | 121 |
indent + INDENT_TAB, |
122 | 122 |
recursive_enumeration, |
123 |
recursive_enumeration)) |
|
123 |
recursive_enumeration, |
|
124 |
out)) |
|
124 | 125 |
pd_calls += 1 |
125 |
exp_calls.append(call(str_k))
|
|
126 |
exp_calls += str_k + '\n'
|
|
126 | 127 |
elif isinstance(v, list) or isinstance(v, tuple): |
127 | 128 |
self.assertEqual( |
128 | 129 |
PL.mock_calls[pl_calls], |
... | ... | |
131 | 132 |
exclude, |
132 | 133 |
indent + INDENT_TAB, |
133 | 134 |
recursive_enumeration, |
134 |
recursive_enumeration)) |
|
135 |
recursive_enumeration, |
|
136 |
out)) |
|
135 | 137 |
pl_calls += 1 |
136 |
exp_calls.append(call(str_k))
|
|
138 |
exp_calls += str_k + '\n'
|
|
137 | 139 |
else: |
138 |
exp_calls.append(call('%s %s' % (str_k, v))) |
|
139 |
real_calls = PR.mock_calls[call_counter:] |
|
140 |
call_counter = len(PR.mock_calls) |
|
141 |
self.assertEqual(sorted(real_calls), sorted(exp_calls)) |
|
140 |
exp_calls += u'%s %s\n' % (str_k, v) |
|
141 |
self.assertEqual(exp_calls, out.getvalue()) |
|
142 |
out = StringIO() |
|
142 | 143 |
|
143 |
@patch('kamaki.cli.utils._print') |
|
144 |
def test_print_list(self, PR): |
|
144 |
def test_print_list(self): |
|
145 | 145 |
from kamaki.cli.utils import print_list, INDENT_TAB |
146 |
call_counter = 0
|
|
146 |
out = StringIO()
|
|
147 | 147 |
self.assertRaises(AssertionError, print_list, 'non-list non-tuple') |
148 | 148 |
self.assertRaises(AssertionError, print_list, {}, indent=-10) |
149 | 149 |
for args in product( |
... | ... | |
159 | 159 |
with patch('kamaki.cli.utils.print_dict') as PD: |
160 | 160 |
with patch('kamaki.cli.utils.print_list') as PL: |
161 | 161 |
pd_calls, pl_calls = 0, 0 |
162 |
print_list(*args) |
|
163 |
exp_calls = []
|
|
162 |
print_list(*args, out=out)
|
|
163 |
exp_calls = ''
|
|
164 | 164 |
for i, v in enumerate(l): |
165 |
str_v = ' ' * indent |
|
166 |
str_v += '%s.' % (i + 1) if with_enumeration else ''
|
|
165 |
str_v = u' ' * indent
|
|
166 |
str_v += u'%s.' % (i + 1) if with_enumeration else u''
|
|
167 | 167 |
if isinstance(v, dict): |
168 | 168 |
if with_enumeration: |
169 |
exp_calls.append(call(str_v))
|
|
169 |
exp_calls += str_v + '\n'
|
|
170 | 170 |
elif i and i < len(l): |
171 |
exp_calls.append(call())
|
|
171 |
exp_calls += u'\n'
|
|
172 | 172 |
self.assertEqual( |
173 | 173 |
PD.mock_calls[pd_calls], |
174 | 174 |
call( |
... | ... | |
177 | 177 |
indent + ( |
178 | 178 |
INDENT_TAB if with_enumeration else 0), |
179 | 179 |
recursive_enumeration, |
180 |
recursive_enumeration)) |
|
180 |
recursive_enumeration, |
|
181 |
out)) |
|
181 | 182 |
pd_calls += 1 |
182 | 183 |
elif isinstance(v, list) or isinstance(v, tuple): |
183 | 184 |
if with_enumeration: |
184 |
exp_calls.append(call(str_v))
|
|
185 |
exp_calls += str_v + '\n'
|
|
185 | 186 |
elif i and i < len(l): |
186 |
exp_calls.append(call())
|
|
187 |
exp_calls += u'\n'
|
|
187 | 188 |
self.assertEqual( |
188 | 189 |
PL.mock_calls[pl_calls], |
189 | 190 |
call( |
... | ... | |
191 | 192 |
exclude, |
192 | 193 |
indent + INDENT_TAB, |
193 | 194 |
recursive_enumeration, |
194 |
recursive_enumeration)) |
|
195 |
recursive_enumeration, |
|
196 |
out)) |
|
195 | 197 |
pl_calls += 1 |
196 | 198 |
elif ('%s' % v) in exclude: |
197 | 199 |
continue |
198 | 200 |
else: |
199 |
exp_calls.append(call('%s%s' % (str_v, v))) |
|
200 |
real_calls = PR.mock_calls[call_counter:] |
|
201 |
call_counter = len(PR.mock_calls) |
|
202 |
self.assertEqual(sorted(real_calls), sorted(exp_calls)) |
|
203 |
|
|
204 |
@patch('__builtin__.raw_input') |
|
205 |
def test_page_hold(self, RI): |
|
206 |
from kamaki.cli.utils import page_hold |
|
207 |
ri_counter = 0 |
|
208 |
for args, expected in ( |
|
209 |
((0, 0, 0), False), |
|
210 |
((1, 3, 10), True), |
|
211 |
((3, 3, 10), True), |
|
212 |
((5, 3, 10), True), |
|
213 |
((6, 3, 10), True), |
|
214 |
((10, 3, 10), False), |
|
215 |
((11, 3, 10), False)): |
|
216 |
self.assertEqual(page_hold(*args), expected) |
|
217 |
index, limit, maxlen = args |
|
218 |
if index and index < maxlen and index % limit == 0: |
|
219 |
self.assertEqual(ri_counter + 1, len(RI.mock_calls)) |
|
220 |
self.assertEqual(RI.mock_calls[-1], call( |
|
221 |
'(%s listed - %s more - "enter" to continue)' % ( |
|
222 |
index, maxlen - index))) |
|
223 |
else: |
|
224 |
self.assertEqual(ri_counter, len(RI.mock_calls)) |
|
225 |
ri_counter = len(RI.mock_calls) |
|
201 |
exp_calls += u'%s%s\n' % (str_v, v) |
|
202 |
self.assertEqual(out.getvalue(), exp_calls) |
|
203 |
out = StringIO() |
|
226 | 204 |
|
227 |
@patch('kamaki.cli.utils._print') |
|
228 |
@patch('kamaki.cli.utils._write') |
|
229 | 205 |
@patch('kamaki.cli.utils.print_dict') |
230 | 206 |
@patch('kamaki.cli.utils.print_list') |
231 |
@patch('kamaki.cli.utils.page_hold') |
|
232 | 207 |
@patch('kamaki.cli.utils.bold', return_value='bold') |
233 |
def test_print_items(self, bold, PH, PL, PD, WR, PR):
|
|
208 |
def test_print_items(self, bold, PL, PD):
|
|
234 | 209 |
from kamaki.cli.utils import print_items, INDENT_TAB |
235 | 210 |
for args in product( |
236 | 211 |
( |
... | ... | |
239 | 214 |
({'k': 1, 'id': 2}, [5, 6, 7], (8, 9), '10')), |
240 | 215 |
(('id', 'name'), ('something', 2), ('lala', )), |
241 | 216 |
(False, True), |
242 |
(False, True), |
|
243 |
(0, 1, 2, 10)): |
|
244 |
items, title, with_enumeration, with_redundancy, page_size = args |
|
245 |
wr_counter, pr_counter = len(WR.mock_calls), len(PR.mock_calls) |
|
217 |
(False, True)): |
|
218 |
items, title, with_enumeration, with_redundancy = args |
|
246 | 219 |
pl_counter, pd_counter = len(PL.mock_calls), len(PD.mock_calls) |
247 |
bold_counter, ph_counter = len(bold.mock_calls), len(PH.mock_calls) |
|
248 |
print_items(*args) |
|
220 |
bold_counter, out_counter = len(bold.mock_calls), 0 |
|
221 |
out = StringIO() |
|
222 |
print_items(*args, out=out) |
|
223 |
out.seek(0) |
|
249 | 224 |
if not (isinstance(items, dict) or isinstance( |
250 | 225 |
items, list) or isinstance(items, tuple)): |
251 | 226 |
if items: |
252 |
self.assertEqual(PR.mock_calls[-1], call('%s' % items))
|
|
227 |
self.assertEqual(out.getvalue(), '%s\n' % items)
|
|
253 | 228 |
else: |
254 | 229 |
for i, item in enumerate(items): |
255 | 230 |
if with_enumeration: |
256 |
self.assertEqual( |
|
257 |
WR.mock_calls[wr_counter], |
|
258 |
call('%s. ' % (i + 1))) |
|
259 |
wr_counter += 1 |
|
231 |
exp_str = '%s. ' % (i + 1) |
|
232 |
self.assertEqual(out.read(len(exp_str)), exp_str) |
|
260 | 233 |
if isinstance(item, dict): |
261 | 234 |
title = sorted(set(title).intersection(item)) |
262 | 235 |
pick = item.get if with_redundancy else item.pop |
263 | 236 |
header = ' '.join('%s' % pick(key) for key in title) |
264 | 237 |
self.assertEqual( |
265 | 238 |
bold.mock_calls[bold_counter], call(header)) |
266 |
self.assertEqual( |
|
267 |
PR.mock_calls[pr_counter], call('bold')) |
|
239 |
self.assertEqual(out.read(5), 'bold\n') |
|
268 | 240 |
self.assertEqual( |
269 | 241 |
PD.mock_calls[pd_counter], |
270 |
call(item, indent=INDENT_TAB)) |
|
271 |
pr_counter += 1 |
|
242 |
call(item, indent=INDENT_TAB, out=out)) |
|
272 | 243 |
pd_counter += 1 |
273 | 244 |
bold_counter += 1 |
274 | 245 |
elif isinstance(item, list) or isinstance(item, tuple): |
275 | 246 |
self.assertEqual( |
276 | 247 |
PL.mock_calls[pl_counter], |
277 |
call(item, indent=INDENT_TAB)) |
|
248 |
call(item, indent=INDENT_TAB, out=out))
|
|
278 | 249 |
pl_counter += 1 |
279 | 250 |
else: |
280 |
self.assertEqual( |
|
281 |
PR.mock_calls[pr_counter], call(' %s' % item)) |
|
282 |
pr_counter += 1 |
|
283 |
page_size = page_size if page_size > 0 else len(items) |
|
284 |
self.assertEqual( |
|
285 |
PH.mock_calls[ph_counter], |
|
286 |
call(i + 1, page_size, len(items))) |
|
287 |
ph_counter += 1 |
|
251 |
exp_str = u' %s\n' % item |
|
252 |
self.assertEqual(out.read(len(exp_str)), exp_str) |
|
288 | 253 |
|
289 | 254 |
def test_format_size(self): |
290 | 255 |
from kamaki.cli.utils import format_size |
... | ... | |
426 | 391 |
'Is', 'this', 'a', 'parsed', 'string?'])): |
427 | 392 |
self.assertEqual(split_input(line), expected) |
428 | 393 |
|
429 |
@patch('kamaki.cli.utils._readline', return_value='read line') |
|
430 |
@patch('kamaki.cli.utils._flush') |
|
431 |
@patch('kamaki.cli.utils._write') |
|
432 |
def test_ask_user(self, WR, FL, RL): |
|
394 |
def test_ask_user(self): |
|
433 | 395 |
from kamaki.cli.utils import ask_user |
434 |
msg = 'some question' |
|
435 |
self.assertFalse(ask_user(msg)) |
|
436 |
WR.assert_called_once_with('%s [y/N]: ' % msg) |
|
437 |
FL.assert_called_once_with() |
|
438 |
RL.assert_called_once_with() |
|
439 |
|
|
440 |
self.assertTrue(ask_user(msg, ('r', ))) |
|
441 |
self.assertEqual(WR.mock_calls[-1], call('%s [r/N]: ' % msg)) |
|
442 |
self.assertEqual(FL.mock_calls, 2 * [call()]) |
|
443 |
self.assertEqual(RL.mock_calls, 2 * [call()]) |
|
396 |
msg = u'some question' |
|
397 |
out = StringIO() |
|
398 |
user_in = StringIO(u'n') |
|
399 |
self.assertFalse(ask_user(msg, out=out, user_in=user_in)) |
|
400 |
self.assertEqual(out.getvalue(), u'%s [y/N]: ' % msg) |
|
444 | 401 |
|
445 |
self.assertTrue(ask_user(msg, ('Z', 'r', 'k')))
|
|
446 |
self.assertEqual(WR.mock_calls[-1], call('%s [Z, r, k/N]: ' % msg))
|
|
447 |
self.assertEqual(FL.mock_calls, 3 * [call()])
|
|
448 |
self.assertEqual(RL.mock_calls, 3 * [call()])
|
|
402 |
user_in.seek(0)
|
|
403 |
out.seek(0)
|
|
404 |
self.assertTrue(ask_user(msg, ('n', ), out=out, user_in=user_in))
|
|
405 |
self.assertEqual(out.getvalue(), u'%s [n/<not n>]: ' % msg)
|
|
449 | 406 |
|
450 |
@patch('kamaki.cli.utils._flush') |
|
451 |
@patch('kamaki.cli.utils._write') |
|
452 |
def test_spiner(self, WR, FL): |
|
453 |
from kamaki.cli.utils import spiner |
|
454 |
spins = ('/', '-', '\\', '|') |
|
455 |
prev = 1 |
|
456 |
for i, SP in enumerate(spiner(6)): |
|
457 |
if not i: |
|
458 |
self.assertEqual(WR.mock_calls[-2], call(' ')) |
|
459 |
elif i > 5: |
|
460 |
break |
|
461 |
self.assertEqual(SP, None) |
|
462 |
self.assertEqual(WR.mock_calls[-1], call('\b%s' % spins[i % 4])) |
|
463 |
self.assertEqual(FL.mock_calls, prev * [call()]) |
|
464 |
prev += 1 |
|
407 |
user_in = StringIO(unicode('N')) |
|
408 |
out.seek(0) |
|
409 |
self.assertTrue(ask_user(msg, ('r', 'N'), out=out, user_in=user_in)) |
|
410 |
self.assertEqual(out.getvalue(), u'%s [r, N/<not r, N>]: ' % msg) |
|
465 | 411 |
|
466 | 412 |
def test_remove_from_items(self): |
467 | 413 |
from kamaki.cli.utils import remove_from_items |
Also available in: Unified diff