Revision fa9c0c38

b/kamaki/cli/commands/pithos.py
34 34
from sys import stdout
35 35
from time import localtime, strftime
36 36
from os import path, makedirs, walk
37
from io import StringIO
37 38

  
38 39
from kamaki.cli import command
39 40
from kamaki.cli.command_tree import CommandTree
40 41
from kamaki.cli.errors import raiseCLIError, CLISyntaxError, CLIBaseUrlError
41 42
from kamaki.cli.utils import (
42
    format_size, to_bytes, print_dict, print_items, page_hold, bold, ask_user,
43
    format_size, to_bytes, print_dict, print_items, pager, bold, ask_user,
43 44
    get_path_size, print_json, guess_mime_type)
44 45
from kamaki.cli.argument import FlagArgument, ValueArgument, IntArgument
45 46
from kamaki.cli.argument import KeyValueArgument, DateArgument
......
354 355
        format=ValueArgument(
355 356
            'format to parse until data (default: d/m/Y H:M:S )', '--format'),
356 357
        shared=FlagArgument('show only shared', '--shared'),
357
        more=FlagArgument(
358
            'output results in pages (-n to set items per page, default 10)',
359
            '--more'),
358
        more=FlagArgument('read long results', '--more'),
360 359
        exact_match=FlagArgument(
361 360
            'Show only objects that match exactly with path',
362 361
            '--exact-match'),
......
367 366
        if self['json_output']:
368 367
            print_json(object_list)
369 368
            return
370
        limit = int(self['limit']) if self['limit'] > 0 else len(object_list)
369
        out = StringIO() if self['more'] else stdout
371 370
        for index, obj in enumerate(object_list):
372 371
            if self['exact_match'] and self.path and not (
373 372
                    obj['name'] == self.path or 'content_type' in obj):
......
384 383
                isDir = False
385 384
                size = format_size(obj['bytes'])
386 385
                pretty_obj['bytes'] = '%s (%s)' % (obj['bytes'], size)
387
            oname = bold(obj['name'])
386
            oname = obj['name'] if self['more'] else bold(obj['name'])
388 387
            prfx = ('%s%s. ' % (empty_space, index)) if self['enum'] else ''
389 388
            if self['detail']:
390
                print('%s%s' % (prfx, oname))
391
                print_dict(pretty_obj, exclude=('name'))
392
                print
389
                out.writelines(u'%s%s\n' % (prfx, oname))
390
                print_dict(pretty_obj, exclude=('name'), out=out)
391
                out.writelines(u'\n')
393 392
            else:
394
                oname = '%s%9s %s' % (prfx, size, oname)
395
                oname += '/' if isDir else ''
396
                print(oname)
397
            if self['more']:
398
                page_hold(index, limit, len(object_list))
393
                oname = u'%s%9s %s' % (prfx, size, oname)
394
                oname += u'/' if isDir else u''
395
                out.writelines(oname + u'\n')
396
        if self['more']:
397
            pager(out.getvalue())
399 398

  
400 399
    def print_containers(self, container_list):
401 400
        if self['json_output']:
402 401
            print_json(container_list)
403 402
            return
404
        limit = int(self['limit']) if self['limit'] > 0\
405
            else len(container_list)
403
        out = StringIO() if self['more'] else stdout
406 404
        for index, container in enumerate(container_list):
407 405
            if 'bytes' in container:
408 406
                size = format_size(container['bytes'])
409 407
            prfx = ('%s. ' % (index + 1)) if self['enum'] else ''
410
            cname = '%s%s' % (prfx, bold(container['name']))
408
            _cname = container['name'] if (
409
                self['more']) else bold(container['name'])
410
            cname = u'%s%s' % (prfx, _cname)
411 411
            if self['detail']:
412
                print(cname)
412
                out.writelines(cname + u'\n')
413 413
                pretty_c = container.copy()
414 414
                if 'bytes' in container:
415 415
                    pretty_c['bytes'] = '%s (%s)' % (container['bytes'], size)
416
                print_dict(pretty_c, exclude=('name'))
417
                print
416
                print_dict(pretty_c, exclude=('name'), out=out)
417
                out.writelines(u'\n')
418 418
            else:
419 419
                if 'count' in container and 'bytes' in container:
420
                    print('%s (%s, %s objects)' % (
421
                        cname,
422
                        size,
423
                        container['count']))
420
                    out.writelines(u'%s (%s, %s objects)\n' % (
421
                        cname, size, container['count']))
424 422
                else:
425
                    print(cname)
426
            if self['more']:
427
                page_hold(index + 1, limit, len(container_list))
423
                    out.writelines(cname + '\n')
424
        if self['more']:
425
            pager(out.getvalue())
428 426

  
429 427
    @errors.generic.all
430 428
    @errors.pithos.connection
......
1239 1237
    *   download <container>:<path> <local dir> -R
1240 1238
    will download all files on <container> prefixed as <path>,
1241 1239
    to <local dir>/<full path> (or <local dir>\<full path> in windows)
1242
    *   download <container>:<path> <local dir> --exact-match
1243
    will download only one file, exactly matching <path>
1240
    *   download <container>:<path> <local dir>
1241
    will download only one file<path>
1244 1242
    ATTENTION: to download cont:dir1/dir2/file there must exist objects
1245 1243
    cont:dir1 and cont:dir1/dir2 of type application/directory
1246 1244
    To create directory objects, use /file mkdir
b/kamaki/cli/utils/__init__.py
31 31
# interpreted as representing official policies, either expressed
32 32
# or implied, of GRNET S.A.
33 33

  
34
from sys import stdout
34
from sys import stdout, stdin
35 35
from re import compile as regex_compile
36
from time import sleep
37 36
from os import walk, path
38 37
from json import dumps
38
from pydoc import pager
39 39

  
40 40
from kamaki.cli.errors import raiseCLIError
41 41

  
......
57 57
    suggest['ansicolors']['active'] = True
58 58

  
59 59

  
60
def _print(w):
61
    """Print wrapper is used to help unittests check what is printed"""
62
    print w
63

  
64

  
65
def _write(w):
66
    """stdout.write wrapper is used to help unittests check what is printed"""
67
    stdout.write(w)
68

  
69

  
70
def _flush():
71
    """stdout.flush wrapper is used to help unittests check what is called"""
72
    stdout.flush()
73

  
74

  
75
def _readline():
76
    """raw_input wrapper is used to help unittests"""
77
    return raw_input()
78

  
79

  
80 60
def suggest_missing(miss=None, exclude=[]):
81 61
    global suggest
82 62
    sgs = dict(suggest)
......
131 111
    return new_d
132 112

  
133 113

  
134
def print_json(data):
114
def print_json(data, out=stdout):
135 115
    """Print a list or dict as json in console
136 116

  
137 117
    :param data: json-dumpable data
138
    """
139
    _print(dumps(data, indent=INDENT_TAB))
140

  
141 118

  
142
def pretty_dict(d, *args, **kwargs):
143
    print_dict(pretty_keys(d, *args, **kwargs))
119
    :param out: Input/Output stream to dump values into
120
    """
121
    out.writelines(unicode(dumps(data, indent=INDENT_TAB) + '\n'))
144 122

  
145 123

  
146 124
def print_dict(
147 125
        d,
148 126
        exclude=(), indent=0,
149
        with_enumeration=False, recursive_enumeration=False):
127
        with_enumeration=False, recursive_enumeration=False, out=stdout):
150 128
    """Pretty-print a dictionary object
151 129
    <indent>key: <non iterable item>
152 130
    <indent>key:
......
163 141
    :param recursive_enumeration: (bool) recursively enumerate iterables (does
164 142
        not enumerate 1st level keys)
165 143

  
144
    :param out: Input/Output stream to dump values into
145

  
166 146
    :raises CLIError: if preconditions fail
167 147
    """
168 148
    assert isinstance(d, dict), 'print_dict input must be a dict'
......
172 152
        k = ('%s' % k).strip()
173 153
        if k in exclude:
174 154
            continue
175
        print_str = ' ' * indent
176
        print_str += '%s.' % (i + 1) if with_enumeration else ''
177
        print_str += '%s:' % k
155
        print_str = u' ' * indent
156
        print_str += u'%s.' % (i + 1) if with_enumeration else u''
157
        print_str += u'%s:' % k
178 158
        if isinstance(v, dict):
179
            _print(print_str)
159
            out.writelines(print_str + '\n')
180 160
            print_dict(
181 161
                v, exclude, indent + INDENT_TAB,
182
                recursive_enumeration, recursive_enumeration)
162
                recursive_enumeration, recursive_enumeration, out)
183 163
        elif isinstance(v, list) or isinstance(v, tuple):
184
            _print(print_str)
164
            out.writelines(print_str + '\n')
185 165
            print_list(
186 166
                v, exclude, indent + INDENT_TAB,
187
                recursive_enumeration, recursive_enumeration)
167
                recursive_enumeration, recursive_enumeration, out)
188 168
        else:
189
            _print('%s %s' % (print_str, v))
169
            out.writelines(u'%s %s\n' % (print_str, v))
190 170

  
191 171

  
192 172
def print_list(
193 173
        l,
194 174
        exclude=(), indent=0,
195
        with_enumeration=False, recursive_enumeration=False):
175
        with_enumeration=False, recursive_enumeration=False, out=stdout):
196 176
    """Pretty-print a list of items
197 177
    <indent>key: <non iterable item>
198 178
    <indent>key:
......
209 189
    :param recursive_enumeration: (bool) recursively enumerate iterables (does
210 190
        not enumerate 1st level keys)
211 191

  
192
    :param out: Input/Output stream to dump values into
193

  
212 194
    :raises CLIError: if preconditions fail
213 195
    """
214 196
    assert isinstance(l, list) or isinstance(l, tuple), (
......
216 198
    assert indent >= 0, 'print_list indent must be >= 0'
217 199

  
218 200
    for i, item in enumerate(l):
219
        print_str = ' ' * indent
220
        print_str += '%s.' % (i + 1) if with_enumeration else ''
201
        print_str = u' ' * indent
202
        print_str += u'%s.' % (i + 1) if with_enumeration else u''
221 203
        if isinstance(item, dict):
222 204
            if with_enumeration:
223
                _print(print_str)
205
                out.writelines(print_str + '\n')
224 206
            elif i and i < len(l):
225
                _print('')
207
                out.writelines(u'\n')
226 208
            print_dict(
227 209
                item, exclude,
228 210
                indent + (INDENT_TAB if with_enumeration else 0),
229
                recursive_enumeration, recursive_enumeration)
211
                recursive_enumeration, recursive_enumeration, out)
230 212
        elif isinstance(item, list) or isinstance(item, tuple):
231 213
            if with_enumeration:
232
                _print(print_str)
214
                out.writelines(print_str + '\n')
233 215
            elif i and i < len(l):
234
                _print()
216
                out.writelines(u'\n')
235 217
            print_list(
236 218
                item, exclude, indent + INDENT_TAB,
237
                recursive_enumeration, recursive_enumeration)
219
                recursive_enumeration, recursive_enumeration, out)
238 220
        else:
239 221
            item = ('%s' % item).strip()
240 222
            if item in exclude:
241 223
                continue
242
            _print('%s%s' % (print_str, item))
243

  
244

  
245
def page_hold(index, limit, maxlen):
246
    """Check if there are results to show, and hold the page when needed
247
    :param index: (int) > 0, index of current element
248
    :param limit: (int) 0 < limit <= max, page hold if limit mod index == 0
249
    :param maxlen: (int) Don't hold if index reaches maxlen
250

  
251
    :returns: True if there are more to show, False if all results are shown
252
    """
253
    if index >= maxlen:
254
        return False
255
    if index and index % limit == 0:
256
        raw_input('(%s listed - %s more - "enter" to continue)' % (
257
            index, maxlen - index))
258
    return True
224
            out.writelines(u'%s%s\n' % (print_str, item))
259 225

  
260 226

  
261 227
def print_items(
262 228
        items, title=('id', 'name'),
263
        with_enumeration=False, with_redundancy=False,
264
        page_size=0):
229
        with_enumeration=False, with_redundancy=False, out=stdout):
265 230
    """print dict or list items in a list, using some values as title
266 231
    Objects of next level don't inherit enumeration (default: off) or titles
267 232

  
......
273 238

  
274 239
    :param with_redundancy: (boolean) values in title also appear on body
275 240

  
276
    :param page_size: (int) show results in pages of page_size items, enter to
277
        continue
241
    :param out: Input/Output stream to dump values into
278 242
    """
279 243
    if not items:
280 244
        return
281 245
    if not (isinstance(items, dict) or isinstance(items, list) or isinstance(
282 246
                items, tuple)):
283
        _print('%s' % items)
247
        out.writelines(u'%s\n' % items)
284 248
        return
285 249

  
286
    page_size = int(page_size or 0)
287
    try:
288
        page_size = page_size if page_size > 0 else len(items)
289
    except:
290
        page_size = len(items)
291
    num_of_pages = len(items) // page_size
292
    num_of_pages += 1 if len(items) % page_size else 0
293 250
    for i, item in enumerate(items):
294 251
        if with_enumeration:
295
            _write('%s. ' % (i + 1))
252
            out.write(u'%s. ' % (i + 1))
296 253
        if isinstance(item, dict):
297 254
            item = dict(item)
298 255
            title = sorted(set(title).intersection(item))
299 256
            pick = item.get if with_redundancy else item.pop
300
            header = ' '.join('%s' % pick(key) for key in title)
301
            _print(bold(header))
302
            print_dict(item, indent=INDENT_TAB)
257
            header = u' '.join(u'%s' % pick(key) for key in title)
258
            out.writelines(unicode(bold(header) + '\n'))
259
            print_dict(item, indent=INDENT_TAB, out=out)
303 260
        elif isinstance(item, list) or isinstance(item, tuple):
304
            print_list(item, indent=INDENT_TAB)
261
            print_list(item, indent=INDENT_TAB, out=out)
305 262
        else:
306
            _print(' %s' % item)
307
        page_hold(i + 1, page_size, len(items))
263
            out.writelines(u' %s\n' % item)
308 264

  
309 265

  
310 266
def format_size(size, decimal_factors=False):
......
418 374
    return terms
419 375

  
420 376

  
421
def ask_user(msg, true_resp=('y', )):
377
def ask_user(msg, true_resp=('y', ), out=stdout, user_in=stdin):
422 378
    """Print msg and read user response
423 379

  
424 380
    :param true_resp: (tuple of chars)
425 381

  
426 382
    :returns: (bool) True if reponse in true responses, False otherwise
427 383
    """
428
    _write('%s [%s/N]: ' % (msg, ', '.join(true_resp)))
429
    _flush()
430
    user_response = _readline()
431
    return user_response[0].lower() in true_resp
432

  
433

  
434
def spiner(size=None):
435
    spins = ('/', '-', '\\', '|')
436
    _write(' ')
437
    size = size or -1
438
    i = 0
439
    while size - i:
440
        _write('\b%s' % spins[i % len(spins)])
441
        _flush()
442
        i += 1
443
        sleep(0.1)
444
        yield
445
    yield
384
    yep = ', '.join(true_resp)
385
    nope = '<not %s>' % yep if 'n' in true_resp or 'N' in true_resp else 'N'
386
    out.write(u'%s [%s/%s]: ' % (msg, yep, nope))
387
    out.flush()
388
    user_response = user_in.readline()
389
    return user_response[0].lower() in [s.lower() for s in true_resp]
446 390

  
447 391

  
448 392
def get_path_size(testpath):
b/kamaki/cli/utils/test.py
35 35
from tempfile import NamedTemporaryFile
36 36
from mock import patch, call
37 37
from itertools import product
38
from io import StringIO
38 39

  
39 40

  
40 41
class UtilsMethods(TestCase):
......
64 65
                self.assertRaises(AssertionError, guess_mime_type, *args)
65 66

  
66 67
    @patch('kamaki.cli.utils.dumps', return_value='(dumps output)')
67
    @patch('kamaki.cli.utils._print')
68
    def test_print_json(self, PR, JD):
68
    def test_print_json(self, JD):
69 69
        from kamaki.cli.utils import print_json, INDENT_TAB
70
        print_json('some data')
70
        out = StringIO()
71
        print_json('some data', out)
71 72
        JD.assert_called_once_with('some data', indent=INDENT_TAB)
72
        PR.assert_called_once_with('(dumps output)')
73
        self.assertEqual(out.getvalue(), u'(dumps output)\n')
73 74

  
74
    @patch('kamaki.cli.utils._print')
75
    def test_print_dict(self, PR):
75
    def test_print_dict(self):
76 76
        from kamaki.cli.utils import print_dict, INDENT_TAB
77
        call_counter = 0
77
        out = StringIO()
78 78
        self.assertRaises(AssertionError, print_dict, 'non-dict think')
79 79
        self.assertRaises(AssertionError, print_dict, {}, indent=-10)
80 80
        for args in product(
......
104 104
            with patch('kamaki.cli.utils.print_dict') as PD:
105 105
                with patch('kamaki.cli.utils.print_list') as PL:
106 106
                    pd_calls, pl_calls = 0, 0
107
                    print_dict(*args)
108
                    exp_calls = []
107
                    print_dict(*args, out=out)
108
                    exp_calls = u''
109 109
                    for i, (k, v) in enumerate(d.items()):
110 110
                        if k in exclude:
111 111
                            continue
112
                        str_k = ' ' * indent
113
                        str_k += '%s.' % (i + 1) if with_enumeration else ''
114
                        str_k += '%s:' % k
112
                        str_k = u' ' * indent
113
                        str_k += u'%s.' % (i + 1) if with_enumeration else u''
114
                        str_k += u'%s:' % k
115 115
                        if isinstance(v, dict):
116 116
                            self.assertEqual(
117 117
                                PD.mock_calls[pd_calls],
......
120 120
                                    exclude,
121 121
                                    indent + INDENT_TAB,
122 122
                                    recursive_enumeration,
123
                                    recursive_enumeration))
123
                                    recursive_enumeration,
124
                                    out))
124 125
                            pd_calls += 1
125
                            exp_calls.append(call(str_k))
126
                            exp_calls += str_k + '\n'
126 127
                        elif isinstance(v, list) or isinstance(v, tuple):
127 128
                            self.assertEqual(
128 129
                                PL.mock_calls[pl_calls],
......
131 132
                                    exclude,
132 133
                                    indent + INDENT_TAB,
133 134
                                    recursive_enumeration,
134
                                    recursive_enumeration))
135
                                    recursive_enumeration,
136
                                    out))
135 137
                            pl_calls += 1
136
                            exp_calls.append(call(str_k))
138
                            exp_calls += str_k + '\n'
137 139
                        else:
138
                            exp_calls.append(call('%s %s' % (str_k, v)))
139
                    real_calls = PR.mock_calls[call_counter:]
140
                    call_counter = len(PR.mock_calls)
141
                    self.assertEqual(sorted(real_calls), sorted(exp_calls))
140
                            exp_calls += u'%s %s\n' % (str_k, v)
141
                    self.assertEqual(exp_calls, out.getvalue())
142
                    out = StringIO()
142 143

  
143
    @patch('kamaki.cli.utils._print')
144
    def test_print_list(self, PR):
144
    def test_print_list(self):
145 145
        from kamaki.cli.utils import print_list, INDENT_TAB
146
        call_counter = 0
146
        out = StringIO()
147 147
        self.assertRaises(AssertionError, print_list, 'non-list non-tuple')
148 148
        self.assertRaises(AssertionError, print_list, {}, indent=-10)
149 149
        for args in product(
......
159 159
            with patch('kamaki.cli.utils.print_dict') as PD:
160 160
                with patch('kamaki.cli.utils.print_list') as PL:
161 161
                    pd_calls, pl_calls = 0, 0
162
                    print_list(*args)
163
                    exp_calls = []
162
                    print_list(*args, out=out)
163
                    exp_calls = ''
164 164
                    for i, v in enumerate(l):
165
                        str_v = ' ' * indent
166
                        str_v += '%s.' % (i + 1) if with_enumeration else ''
165
                        str_v = u' ' * indent
166
                        str_v += u'%s.' % (i + 1) if with_enumeration else u''
167 167
                        if isinstance(v, dict):
168 168
                            if with_enumeration:
169
                                exp_calls.append(call(str_v))
169
                                exp_calls += str_v + '\n'
170 170
                            elif i and i < len(l):
171
                                exp_calls.append(call())
171
                                exp_calls += u'\n'
172 172
                            self.assertEqual(
173 173
                                PD.mock_calls[pd_calls],
174 174
                                call(
......
177 177
                                    indent + (
178 178
                                        INDENT_TAB if with_enumeration else 0),
179 179
                                    recursive_enumeration,
180
                                    recursive_enumeration))
180
                                    recursive_enumeration,
181
                                    out))
181 182
                            pd_calls += 1
182 183
                        elif isinstance(v, list) or isinstance(v, tuple):
183 184
                            if with_enumeration:
184
                                exp_calls.append(call(str_v))
185
                                exp_calls += str_v + '\n'
185 186
                            elif i and i < len(l):
186
                                exp_calls.append(call())
187
                                exp_calls += u'\n'
187 188
                            self.assertEqual(
188 189
                                PL.mock_calls[pl_calls],
189 190
                                call(
......
191 192
                                    exclude,
192 193
                                    indent + INDENT_TAB,
193 194
                                    recursive_enumeration,
194
                                    recursive_enumeration))
195
                                    recursive_enumeration,
196
                                    out))
195 197
                            pl_calls += 1
196 198
                        elif ('%s' % v) in exclude:
197 199
                            continue
198 200
                        else:
199
                            exp_calls.append(call('%s%s' % (str_v, v)))
200
                    real_calls = PR.mock_calls[call_counter:]
201
                    call_counter = len(PR.mock_calls)
202
                    self.assertEqual(sorted(real_calls), sorted(exp_calls))
203

  
204
    @patch('__builtin__.raw_input')
205
    def test_page_hold(self, RI):
206
        from kamaki.cli.utils import page_hold
207
        ri_counter = 0
208
        for args, expected in (
209
                ((0, 0, 0), False),
210
                ((1, 3, 10), True),
211
                ((3, 3, 10), True),
212
                ((5, 3, 10), True),
213
                ((6, 3, 10), True),
214
                ((10, 3, 10), False),
215
                ((11, 3, 10), False)):
216
            self.assertEqual(page_hold(*args), expected)
217
            index, limit, maxlen = args
218
            if index and index < maxlen and index % limit == 0:
219
                self.assertEqual(ri_counter + 1, len(RI.mock_calls))
220
                self.assertEqual(RI.mock_calls[-1], call(
221
                    '(%s listed - %s more - "enter" to continue)' % (
222
                        index, maxlen - index)))
223
            else:
224
                self.assertEqual(ri_counter, len(RI.mock_calls))
225
            ri_counter = len(RI.mock_calls)
201
                            exp_calls += u'%s%s\n' % (str_v, v)
202
                    self.assertEqual(out.getvalue(), exp_calls)
203
                    out = StringIO()
226 204

  
227
    @patch('kamaki.cli.utils._print')
228
    @patch('kamaki.cli.utils._write')
229 205
    @patch('kamaki.cli.utils.print_dict')
230 206
    @patch('kamaki.cli.utils.print_list')
231
    @patch('kamaki.cli.utils.page_hold')
232 207
    @patch('kamaki.cli.utils.bold', return_value='bold')
233
    def test_print_items(self, bold, PH, PL, PD, WR, PR):
208
    def test_print_items(self, bold, PL, PD):
234 209
        from kamaki.cli.utils import print_items, INDENT_TAB
235 210
        for args in product(
236 211
                (
......
239 214
                    ({'k': 1, 'id': 2}, [5, 6, 7], (8, 9), '10')),
240 215
                (('id', 'name'), ('something', 2), ('lala', )),
241 216
                (False, True),
242
                (False, True),
243
                (0, 1, 2, 10)):
244
            items, title, with_enumeration, with_redundancy, page_size = args
245
            wr_counter, pr_counter = len(WR.mock_calls), len(PR.mock_calls)
217
                (False, True)):
218
            items, title, with_enumeration, with_redundancy = args
246 219
            pl_counter, pd_counter = len(PL.mock_calls), len(PD.mock_calls)
247
            bold_counter, ph_counter = len(bold.mock_calls), len(PH.mock_calls)
248
            print_items(*args)
220
            bold_counter, out_counter = len(bold.mock_calls), 0
221
            out = StringIO()
222
            print_items(*args, out=out)
223
            out.seek(0)
249 224
            if not (isinstance(items, dict) or isinstance(
250 225
                    items, list) or isinstance(items, tuple)):
251 226
                if items:
252
                    self.assertEqual(PR.mock_calls[-1], call('%s' % items))
227
                    self.assertEqual(out.getvalue(), '%s\n' % items)
253 228
            else:
254 229
                for i, item in enumerate(items):
255 230
                    if with_enumeration:
256
                        self.assertEqual(
257
                            WR.mock_calls[wr_counter],
258
                            call('%s. ' % (i + 1)))
259
                        wr_counter += 1
231
                        exp_str = '%s. ' % (i + 1)
232
                        self.assertEqual(out.read(len(exp_str)), exp_str)
260 233
                    if isinstance(item, dict):
261 234
                        title = sorted(set(title).intersection(item))
262 235
                        pick = item.get if with_redundancy else item.pop
263 236
                        header = ' '.join('%s' % pick(key) for key in title)
264 237
                        self.assertEqual(
265 238
                            bold.mock_calls[bold_counter], call(header))
266
                        self.assertEqual(
267
                            PR.mock_calls[pr_counter], call('bold'))
239
                        self.assertEqual(out.read(5), 'bold\n')
268 240
                        self.assertEqual(
269 241
                            PD.mock_calls[pd_counter],
270
                            call(item, indent=INDENT_TAB))
271
                        pr_counter += 1
242
                            call(item, indent=INDENT_TAB, out=out))
272 243
                        pd_counter += 1
273 244
                        bold_counter += 1
274 245
                    elif isinstance(item, list) or isinstance(item, tuple):
275 246
                        self.assertEqual(
276 247
                            PL.mock_calls[pl_counter],
277
                            call(item, indent=INDENT_TAB))
248
                            call(item, indent=INDENT_TAB, out=out))
278 249
                        pl_counter += 1
279 250
                    else:
280
                        self.assertEqual(
281
                            PR.mock_calls[pr_counter], call(' %s' % item))
282
                        pr_counter += 1
283
                    page_size = page_size if page_size > 0 else len(items)
284
                    self.assertEqual(
285
                        PH.mock_calls[ph_counter],
286
                        call(i + 1, page_size, len(items)))
287
                    ph_counter += 1
251
                        exp_str = u' %s\n' % item
252
                        self.assertEqual(out.read(len(exp_str)), exp_str)
288 253

  
289 254
    def test_format_size(self):
290 255
        from kamaki.cli.utils import format_size
......
426 391
                    'Is', 'this', 'a', 'parsed', 'string?'])):
427 392
            self.assertEqual(split_input(line), expected)
428 393

  
429
    @patch('kamaki.cli.utils._readline', return_value='read line')
430
    @patch('kamaki.cli.utils._flush')
431
    @patch('kamaki.cli.utils._write')
432
    def test_ask_user(self, WR, FL, RL):
394
    def test_ask_user(self):
433 395
        from kamaki.cli.utils import ask_user
434
        msg = 'some question'
435
        self.assertFalse(ask_user(msg))
436
        WR.assert_called_once_with('%s [y/N]: ' % msg)
437
        FL.assert_called_once_with()
438
        RL.assert_called_once_with()
439

  
440
        self.assertTrue(ask_user(msg, ('r', )))
441
        self.assertEqual(WR.mock_calls[-1], call('%s [r/N]: ' % msg))
442
        self.assertEqual(FL.mock_calls, 2 * [call()])
443
        self.assertEqual(RL.mock_calls, 2 * [call()])
396
        msg = u'some question'
397
        out = StringIO()
398
        user_in = StringIO(u'n')
399
        self.assertFalse(ask_user(msg, out=out, user_in=user_in))
400
        self.assertEqual(out.getvalue(), u'%s [y/N]: ' % msg)
444 401

  
445
        self.assertTrue(ask_user(msg, ('Z', 'r', 'k')))
446
        self.assertEqual(WR.mock_calls[-1], call('%s [Z, r, k/N]: ' % msg))
447
        self.assertEqual(FL.mock_calls, 3 * [call()])
448
        self.assertEqual(RL.mock_calls, 3 * [call()])
402
        user_in.seek(0)
403
        out.seek(0)
404
        self.assertTrue(ask_user(msg, ('n', ), out=out, user_in=user_in))
405
        self.assertEqual(out.getvalue(), u'%s [n/<not n>]: ' % msg)
449 406

  
450
    @patch('kamaki.cli.utils._flush')
451
    @patch('kamaki.cli.utils._write')
452
    def test_spiner(self, WR, FL):
453
        from kamaki.cli.utils import spiner
454
        spins = ('/', '-', '\\', '|')
455
        prev = 1
456
        for i, SP in enumerate(spiner(6)):
457
            if not i:
458
                self.assertEqual(WR.mock_calls[-2], call(' '))
459
            elif i > 5:
460
                break
461
            self.assertEqual(SP, None)
462
            self.assertEqual(WR.mock_calls[-1], call('\b%s' % spins[i % 4]))
463
            self.assertEqual(FL.mock_calls, prev * [call()])
464
            prev += 1
407
        user_in = StringIO(unicode('N'))
408
        out.seek(0)
409
        self.assertTrue(ask_user(msg, ('r', 'N'), out=out, user_in=user_in))
410
        self.assertEqual(out.getvalue(), u'%s [r, N/<not r, N>]: ' % msg)
465 411

  
466 412
    def test_remove_from_items(self):
467 413
        from kamaki.cli.utils import remove_from_items

Also available in: Unified diff