Revision 74b7c6dc kamaki/cli/commands/pithos.py

b/kamaki/cli/commands/pithos.py
42 42
from kamaki.cli.commands import (
43 43
    _command_init, errors, addLogSettings, DontRaiseKeyError, _optional_json,
44 44
    _name_filter, _optional_output_cmd)
45
from kamaki.cli.errors import CLIBaseUrlError, CLIError, CLIInvalidArgument
45
from kamaki.cli.errors import (
46
    CLIBaseUrlError, CLIError, CLIInvalidArgument, raiseCLIError)
46 47
from kamaki.cli.argument import (
47 48
    FlagArgument, IntArgument, ValueArgument, DateArgument,
48 49
    ProgressBarArgument, RepeatableArgument)
......
279 280
    def _run(self):
280 281
        self._optional_output(self.client.create_directory(self.path))
281 282

  
282
    def main(self, container___directory):
283
        super(self.__class__, self)._run(
284
            container___directory, path_is_optional=False)
283
    def main(self, path_or_url):
284
        super(self.__class__, self)._run(path_or_url)
285 285
        self._run()
286 286

  
287 287

  
......
388 388
                                    'source_prefix'].parsed_name))])
389 389
                raise
390 390
            dst_path = self.dst_path or self.path
391
            dst_obj = dst_objects.get(dst_path, None) or self.path
391
            dst_obj = dst_objects.get(dst_path or self.path, None)
392 392
            if self['force'] or not dst_obj:
393 393
                pairs.append(
394 394
                    (None if self._is_dir(src_obj) else self.path, dst_path))
......
870 870
    def main(self, path_or_url):
871 871
        super(self.__class__, self)._run(path_or_url)
872 872
        self._run()
873

  
874

  
875
@command(file_cmds)
876
class file_download(_pithos_container):
877
    """Download a remove file or directory object to local file system"""
878

  
879
    arguments = dict(
880
        resume=FlagArgument(
881
            'Resume/Overwrite (attempt resume, else overwrite)',
882
            ('-f', '--resume')),
883
        range=RangeArgument('Download only that range of data', '--range'),
884
        matching_etag=ValueArgument('download iff ETag match', '--if-match'),
885
        non_matching_etag=ValueArgument(
886
            'download iff ETags DO NOT match', '--if-none-match'),
887
        modified_since_date=DateArgument(
888
            'download iff remote file is modified since then',
889
            '--if-modified-since'),
890
        unmodified_since_date=DateArgument(
891
            'show output iff remote file is unmodified since then',
892
            '--if-unmodified-since'),
893
        object_version=ValueArgument(
894
            'download a file of a specific version',
895
            ('-O', '--object-version')),
896
        max_threads=IntArgument('default: 5', '--threads'),
897
        progress_bar=ProgressBarArgument(
898
            'do not show progress bar', ('-N', '--no-progress-bar'),
899
            default=False),
900
        recursive=FlagArgument(
901
            'Download a remote directory object and its contents',
902
            ('-r', '--recursive'))
903
        )
904

  
905
    def _src_dst(self, local_path):
906
        """Create a list of (src, dst) where src is a remote location and dst
907
        is an open file descriptor. Directories are denoted as (None, dirpath)
908
        and they are pretended to other objects in a very strict order (shorter
909
        to longer path)."""
910
        ret = []
911
        try:
912
            obj = self.client.get_object_info(
913
                self.path, version=self['object_version'])
914
            obj.setdefault('name', self.path.strip('/'))
915
        except ClientError as ce:
916
            if ce.status in (404, ):
917
                raiseCLIError(ce, details=[
918
                    'To download an object, the object must exist either as a'
919
                    ' file or as a directory.',
920
                    'For example, to download everything under prefix/ the '
921
                    'directory "prefix" must exist.',
922
                    'To see if an remote object is actually there:',
923
                    '  /file info [/CONTAINER/]OBJECT',
924
                    'To create a directory object:',
925
                    '  /file mkdir [/CONTAINER/]OBJECT'])
926
            if ce.status in (204, ):
927
                raise CLIError(
928
                    'No file or directory objects to download',
929
                    details=[
930
                        'To download a container (e.g., %s):' % self.container,
931
                        '  [kamaki] container download %s [LOCAL_PATH]' % (
932
                            self.container)])
933
            raise
934
        rpath = self.path.strip('/')
935
        local_path = local_path[-1:] if (
936
            local_path.endswith('/')) else local_path
937

  
938
        if self._is_dir(obj):
939
            if self['recursive']:
940
                dirs, files = [obj, ], []
941
                objects = self.client.container_get(
942
                    path=self.path or '/',
943
                    if_modified_since=self['modified_since_date'],
944
                    if_unmodified_since=self['unmodified_since_date'])
945
                for obj in objects.json:
946
                    (dirs if self._is_dir(obj) else files).append(obj)
947

  
948
                #  Put the directories on top of the list
949
                for dpath in sorted(['%s%s' % (
950
                        local_path, d['name'][len(rpath):]) for d in dirs]):
951
                    if path.exists(dpath):
952
                        if path.isdir(dpath):
953
                            continue
954
                        raise CLIError(
955
                            'Cannot replace local file %s with a directory '
956
                            'of the same name' % dpath,
957
                            details=[
958
                                'Either remove the file or specify a'
959
                                'different target location'])
960
                    ret.append((None, dpath, None))
961

  
962
                #  Append the file objects
963
                for opath in [o['name'] for o in files]:
964
                    lpath = '%s%s' % (local_path, opath[len(rpath):])
965
                    if self['resume']:
966
                        fxists = path.exists(lpath)
967
                        if fxists and path.isdir(lpath):
968
                            raise CLIError(
969
                                'Cannot change local dir %s info file' % (
970
                                    lpath),
971
                                details=[
972
                                    'Either remove the file or specify a'
973
                                    'different target location'])
974
                        ret.append((opath, lpath, fxists))
975
                    elif path.exists(lpath):
976
                        raise CLIError(
977
                            'Cannot overwrite %s' % lpath,
978
                            details=['To overwrite/resume, use  %s' % '/'.join(
979
                                self.arguments['resume'].parsed_name)])
980
                    else:
981
                        ret.append((opath, lpath, None))
982
            else:
983
                raise CLIError(
984
                    'Remote object /%s/%s is a directory' % (
985
                        self.container, local_path),
986
                    details=['Use %s to download directories' % '/'.join(
987
                        self.arguments['recursive'].parsed_name)])
988
        else:
989
            #  Remote object is just a file
990
            if path.exists(local_path) and not self['resume']:
991
                raise CLIError(
992
                    'Cannot overwrite local file %s' % (lpath),
993
                    details=['To overwrite/resume, use  %s' % '/'.join(
994
                        self.arguments['resume'].parsed_name)])
995
            ret.append((rpath, local_path, self['resume']))
996
        for r, l, resume in ret:
997
            if r:
998
                with open(l, 'rwb+' if resume else 'wb+') as f:
999
                    yield (r, f)
1000
            else:
1001
                yield (r, l)
1002

  
1003
    @errors.generic.all
1004
    @errors.pithos.connection
1005
    @errors.pithos.container
1006
    @errors.pithos.object_path
1007
    @errors.pithos.local_path
1008
    def _run(self, local_path):
1009
        self.client.MAX_THREADS = self['max_threads'] or 5
1010
        progress_bar = None
1011
        try:
1012
            for rpath, output_file in self._src_dst(local_path):
1013
                if not rpath:
1014
                    self.error('Create local directory %s' % output_file)
1015
                    makedirs(output_file)
1016
                    continue
1017
                self.error('/%s/%s --> %s' % (
1018
                    self.container, rpath, output_file.name))
1019
                progress_bar, download_cb = self._safe_progress_bar(
1020
                    '  download')
1021
                self.client.download_object(
1022
                    rpath, output_file,
1023
                    download_cb=download_cb,
1024
                    range_str=self['range'],
1025
                    version=self['object_version'],
1026
                    if_match=self['matching_etag'],
1027
                    resume=self['resume'],
1028
                    if_none_match=self['non_matching_etag'],
1029
                    if_modified_since=self['modified_since_date'],
1030
                    if_unmodified_since=self['unmodified_since_date'])
1031
        except KeyboardInterrupt:
1032
            from threading import activeCount, enumerate as activethreads
1033
            timeout = 0.5
1034
            while activeCount() > 1:
1035
                self._out.write('\nCancel %s threads: ' % (activeCount() - 1))
1036
                self._out.flush()
1037
                for thread in activethreads():
1038
                    try:
1039
                        thread.join(timeout)
1040
                        self._out.write('.' if thread.isAlive() else '*')
1041
                    except RuntimeError:
1042
                        continue
1043
                    finally:
1044
                        self._out.flush()
1045
                        timeout += 0.1
1046
            self.error('\nDownload canceled by user')
1047
            if local_path is not None:
1048
                self.error('to resume, re-run with --resume')
1049
        except Exception:
1050
            self._safe_progress_bar_finish(progress_bar)
1051
            raise
1052
        finally:
1053
            self._safe_progress_bar_finish(progress_bar)
1054

  
1055
    def main(self, remote_path_or_url, local_path=None):
1056
        super(self.__class__, self)._run(remote_path_or_url)
1057
        local_path = local_path or self.path or '.'
1058
        self._run(local_path=local_path)

Also available in: Unified diff