Revision 5655d560 kamaki/clients/pithos/__init__.py

b/kamaki/clients/pithos/__init__.py
101 101
        cnt_back_up = self.container
102 102
        try:
103 103
            self.container = container or cnt_back_up
104
            self.container_delete(until=unicode(time()))
104
            r = self.container_delete(until=unicode(time()))
105 105
        finally:
106 106
            self.container = cnt_back_up
107
        return r.headers
107 108

  
108 109
    def upload_object_unchunked(
109 110
            self, obj, f,
......
838 839
        ret = [''] * num_of_blocks
839 840
        self._init_thread_limit()
840 841
        flying = dict()
841
        for blockid, blockhash in enumerate(remote_hashes):
842
            start = blocksize * blockid
843
            is_last = start + blocksize > total_size
844
            end = (total_size - 1) if is_last else (start + blocksize - 1)
845
            (start, end) = _range_up(start, end, range_str)
846
            if start < end:
847
                self._watch_thread_limit(flying.values())
848
                flying[blockid] = self._get_block_async(obj, **restargs)
849
            for runid, thread in flying.items():
850
                if (blockid + 1) == num_of_blocks:
851
                    thread.join()
852
                elif thread.isAlive():
853
                    continue
854
                if thread.exception:
855
                    raise thread.exception
856
                ret[runid] = thread.value.content
857
                self._cb_next()
858
                flying.pop(runid)
859
        return ''.join(ret)
842
        try:
843
            for blockid, blockhash in enumerate(remote_hashes):
844
                start = blocksize * blockid
845
                is_last = start + blocksize > total_size
846
                end = (total_size - 1) if is_last else (start + blocksize - 1)
847
                (start, end) = _range_up(start, end, range_str)
848
                if start < end:
849
                    self._watch_thread_limit(flying.values())
850
                    flying[blockid] = self._get_block_async(obj, **restargs)
851
                for runid, thread in flying.items():
852
                    if (blockid + 1) == num_of_blocks:
853
                        thread.join()
854
                    elif thread.isAlive():
855
                        continue
856
                    if thread.exception:
857
                        raise thread.exception
858
                    ret[runid] = thread.value.content
859
                    self._cb_next()
860
                    flying.pop(runid)
861
            return ''.join(ret)
862
        except KeyboardInterrupt:
863
            sendlog.info('- - - wait for threads to finish')
864
            for thread in activethreads():
865
                thread.join()
860 866

  
861 867
    #Command Progress Bar method
862 868
    def _cb_next(self, step=1):
......
1028 1034
            raise ClientError(
1029 1035
                'Container "%s" is not empty' % self.container,
1030 1036
                r.status_code)
1037
        return r.headers
1031 1038

  
1032 1039
    def get_container_versioning(self, container=None):
1033 1040
        """
......
1128 1135
        :param delimiter: (str)
1129 1136
        """
1130 1137
        self._assert_container()
1131
        self.object_delete(obj, until=until, delimiter=delimiter)
1138
        r = self.object_delete(obj, until=until, delimiter=delimiter)
1139
        return r.headers
1132 1140

  
1133 1141
    def set_object_meta(self, obj, metapairs):
1134 1142
        """
......
1163 1171
        """
1164 1172
        :param obj: (str) remote object path
1165 1173
        """
1166
        self.object_post(obj, update=True, public=False)
1174
        r = self.object_post(obj, update=True, public=False)
1175
        return r.headers
1167 1176

  
1168 1177
    def get_object_info(self, obj, version=None):
1169 1178
        """
......
1255 1264
        filesize = fstat(source_file.fileno()).st_size
1256 1265
        nblocks = 1 + (filesize - 1) // blocksize
1257 1266
        offset = 0
1267
        headers = {}
1258 1268
        if upload_cb:
1259
            upload_gen = upload_cb(nblocks)
1260
            upload_gen.next()
1261
        for i in range(nblocks):
1262
            block = source_file.read(min(blocksize, filesize - offset))
1263
            offset += len(block)
1264
            self.object_post(
1265
                obj,
1266
                update=True,
1267
                content_range='bytes */*',
1268
                content_type='application/octet-stream',
1269
                content_length=len(block),
1270
                data=block)
1269
            self.progress_bar_gen = upload_cb(nblocks)
1270
            self._cb_next()
1271
        flying = {}
1272
        self._init_thread_limit()
1273
        try:
1274
            for i in range(nblocks):
1275
                block = source_file.read(min(blocksize, filesize - offset))
1276
                offset += len(block)
1271 1277

  
1272
            if upload_cb:
1273
                upload_gen.next()
1278
                self._watch_thread_limit(flying.values())
1279
                unfinished = {}
1280
                flying[i] = SilentEvent(
1281
                    method=self.object_post,
1282
                    obj=obj,
1283
                    update=True,
1284
                    content_range='bytes */*',
1285
                    content_type='application/octet-stream',
1286
                    content_length=len(block),
1287
                    data=block)
1288
                flying[i].start()
1289

  
1290
                for key, thread in flying.items():
1291
                    if thread.isAlive():
1292
                        if i < nblocks:
1293
                            unfinished[key] = thread
1294
                            continue
1295
                        thread.join()
1296
                    if thread.exception:
1297
                        raise thread.exception
1298
                    headers[key] = thread.value.headers
1299
                    self._cb_next()
1300
                flying = unfinished
1301
        except KeyboardInterrupt:
1302
            sendlog.info('- - - wait for threads to finish')
1303
            for thread in activethreads():
1304
                thread.join()
1305
        return headers.values()
1274 1306

  
1275 1307
    def truncate_object(self, obj, upto_bytes):
1276 1308
        """

Also available in: Unified diff