from kamaki.cli.logger import get_logger
from kamaki.cli.utils import (
print_list, print_dict, print_json, print_items, ask_user,
- filter_dicts_by_dict)
+ filter_dicts_by_dict, DontRaiseUnicodeError, pref_enc)
from kamaki.cli.argument import FlagArgument, ValueArgument
from kamaki.cli.errors import CLIInvalidArgument
from sys import stdin, stdout, stderr
import codecs
-import locale
log = get_logger(__name__)
-pref_enc = locale.getpreferredencoding()
-
-
-def _encode_nicely(somestr, encoding=pref_enc, replacement='?'):
- """Encode somestr as 'encoding', but don't raise errors (replace with ?)
- :param encoding: (str) encode every character in this encoding
- :param replacement: (char) replace each char raising encode-decode errs
- """
- newstr = ''
- for c in somestr:
- try:
- newc = c.encode(encoding)
- newstr = '%s%s' % (newstr, newc)
- except (UnicodeDecodeError, UnicodeEncodeError) as e:
- log.debug('Encoding(%s): %s' % (encoding, e))
- newstr = '%s%s' % (newstr, replacement)
- return newstr
def DontRaiseKeyError(func):
self.auth_base = auth_base or getattr(self, 'auth_base', None)
self.cloud = cloud or getattr(self, 'cloud', None)
+ @DontRaiseUnicodeError
def write(self, s):
self._out.write('%s' % s)
self._out.flush()
+ @DontRaiseUnicodeError
def writeln(self, s=''):
self.write('%s\n' % s)
+ @DontRaiseUnicodeError
def error(self, s=''):
self._err.write('%s\n' % s)
self._err.flush()
if not isinstance(cmd_slice, slice):
lines = [lines, ]
if self['match']:
- lines = [l for l in lines if self._match(l, self['match'])]
+ lines = [l for l in lines if self.history._match(l, self['match'])]
self.print_items([l[:-1] for l in lines])
def main(self, cmd_numbers=''):
from re import compile as regex_compile
from os import walk, path
from json import dumps
+from kamaki.cli.logger import get_logger
+from locale import getpreferredencoding
from kamaki.cli.errors import raiseCLIError
INDENT_TAB = 4
+log = get_logger(__name__)
+pref_enc = getpreferredencoding()
suggest = dict(ansicolors=dict(
suggest['ansicolors']['active'] = True
+def _encode_nicely(somestr, encoding, replacement='?'):
+ """Encode somestr as 'encoding', but don't raise errors (replace with ?)
+ This method is slow. Us it only for grace.
+ :param encoding: (str) encode every character in this encoding
+ :param replacement: (char) replace each char raising encode-decode errs
+ """
+ newstr, err_counter = '', 0
+ for c in somestr:
+ try:
+ newc = c.encode(encoding)
+ newstr = '%s%s' % (newstr, newc)
+ except UnicodeError:
+ newstr = '%s%s' % (newstr, replacement)
+ err_counter += 1
+ if err_counter:
+ log.debug('\t%s character%s failed to be encoded as %s' % (
+ err_counter, 's' if err_counter > 1 else '', encoding))
+ return newstr
+
+
def DontRaiseUnicodeError(foo):
    """Decorator for methods of the form foo(self, s, ...) which encodes s
    in the preferred encoding before calling foo, without ever raising a
    UnicodeError.

    s is taken from the keyword argument 's' if given, else from the first
    positional argument. If neither is given, foo is called untouched.
    Values with no 'encode' method (e.g. None, numbers) are passed through
    as they are, so that foo can format them itself.

    :param foo: (method) the method to wrap, its text argument is named 's'
    :returns: (method) the wrapping method, carrying foo's name and docstring
    """
    # Imported here so that the decorator does not depend on a module-level
    # import
    from functools import wraps

    @wraps(foo)
    def wrap(self, *args, **kwargs):
        if 's' in kwargs:
            s = kwargs.pop('s')
        elif args:
            s, args = args[0], args[1:]
        else:
            # No text was given, let foo fall back to its own defaults
            return foo(self, *args, **kwargs)

        encode = getattr(s, 'encode', None)
        if encode is None:
            # Not a text value, there is nothing to encode
            return foo(self, s, *args, **kwargs)

        try:
            s = encode(pref_enc)
        except UnicodeError as ue:
            log.debug('Encoding(%s): %s' % (pref_enc, ue))
            # Slow path: encode char by char, replace whatever fails
            s = _encode_nicely(s, pref_enc, replacement='?')
        return foo(self, s, *args, **kwargs)
    return wrap
+
+
def suggest_missing(miss=None, exclude=[]):
global suggest
sgs = dict(suggest)
if k.lower in ('x-auth-token', ) and (
not self.LOG_TOKEN):
self._token, v = v, '...'
- v = unquote(v)
+ v = unquote(v).decode('utf-8')
self._headers[k] = v
recvlog.info(' %s: %s%s' % (k, v, plog))
self._content = r.read()