Revision 5655d560 kamaki/clients/pithos/__init__.py
b/kamaki/clients/pithos/__init__.py | ||
---|---|---|
101 | 101 |
cnt_back_up = self.container |
102 | 102 |
try: |
103 | 103 |
self.container = container or cnt_back_up |
104 |
self.container_delete(until=unicode(time())) |
|
104 |
r = self.container_delete(until=unicode(time()))
|
|
105 | 105 |
finally: |
106 | 106 |
self.container = cnt_back_up |
107 |
return r.headers |
|
107 | 108 |
|
108 | 109 |
def upload_object_unchunked( |
109 | 110 |
self, obj, f, |
... | ... | |
838 | 839 |
ret = [''] * num_of_blocks |
839 | 840 |
self._init_thread_limit() |
840 | 841 |
flying = dict() |
841 |
for blockid, blockhash in enumerate(remote_hashes): |
|
842 |
start = blocksize * blockid |
|
843 |
is_last = start + blocksize > total_size |
|
844 |
end = (total_size - 1) if is_last else (start + blocksize - 1) |
|
845 |
(start, end) = _range_up(start, end, range_str) |
|
846 |
if start < end: |
|
847 |
self._watch_thread_limit(flying.values()) |
|
848 |
flying[blockid] = self._get_block_async(obj, **restargs) |
|
849 |
for runid, thread in flying.items(): |
|
850 |
if (blockid + 1) == num_of_blocks: |
|
851 |
thread.join() |
|
852 |
elif thread.isAlive(): |
|
853 |
continue |
|
854 |
if thread.exception: |
|
855 |
raise thread.exception |
|
856 |
ret[runid] = thread.value.content |
|
857 |
self._cb_next() |
|
858 |
flying.pop(runid) |
|
859 |
return ''.join(ret) |
|
842 |
try: |
|
843 |
for blockid, blockhash in enumerate(remote_hashes): |
|
844 |
start = blocksize * blockid |
|
845 |
is_last = start + blocksize > total_size |
|
846 |
end = (total_size - 1) if is_last else (start + blocksize - 1) |
|
847 |
(start, end) = _range_up(start, end, range_str) |
|
848 |
if start < end: |
|
849 |
self._watch_thread_limit(flying.values()) |
|
850 |
flying[blockid] = self._get_block_async(obj, **restargs) |
|
851 |
for runid, thread in flying.items(): |
|
852 |
if (blockid + 1) == num_of_blocks: |
|
853 |
thread.join() |
|
854 |
elif thread.isAlive(): |
|
855 |
continue |
|
856 |
if thread.exception: |
|
857 |
raise thread.exception |
|
858 |
ret[runid] = thread.value.content |
|
859 |
self._cb_next() |
|
860 |
flying.pop(runid) |
|
861 |
return ''.join(ret) |
|
862 |
except KeyboardInterrupt: |
|
863 |
sendlog.info('- - - wait for threads to finish') |
|
864 |
for thread in activethreads(): |
|
865 |
thread.join() |
|
860 | 866 |
|
861 | 867 |
#Command Progress Bar method |
862 | 868 |
def _cb_next(self, step=1): |
... | ... | |
1028 | 1034 |
raise ClientError( |
1029 | 1035 |
'Container "%s" is not empty' % self.container, |
1030 | 1036 |
r.status_code) |
1037 |
return r.headers |
|
1031 | 1038 |
|
1032 | 1039 |
def get_container_versioning(self, container=None): |
1033 | 1040 |
""" |
... | ... | |
1128 | 1135 |
:param delimiter: (str) |
1129 | 1136 |
""" |
1130 | 1137 |
self._assert_container() |
1131 |
self.object_delete(obj, until=until, delimiter=delimiter) |
|
1138 |
r = self.object_delete(obj, until=until, delimiter=delimiter) |
|
1139 |
return r.headers |
|
1132 | 1140 |
|
1133 | 1141 |
def set_object_meta(self, obj, metapairs): |
1134 | 1142 |
""" |
... | ... | |
1163 | 1171 |
""" |
1164 | 1172 |
:param obj: (str) remote object path |
1165 | 1173 |
""" |
1166 |
self.object_post(obj, update=True, public=False) |
|
1174 |
r = self.object_post(obj, update=True, public=False) |
|
1175 |
return r.headers |
|
1167 | 1176 |
|
1168 | 1177 |
def get_object_info(self, obj, version=None): |
1169 | 1178 |
""" |
... | ... | |
1255 | 1264 |
filesize = fstat(source_file.fileno()).st_size |
1256 | 1265 |
nblocks = 1 + (filesize - 1) // blocksize |
1257 | 1266 |
offset = 0 |
1267 |
headers = {} |
|
1258 | 1268 |
if upload_cb: |
1259 |
upload_gen = upload_cb(nblocks) |
|
1260 |
upload_gen.next() |
|
1261 |
for i in range(nblocks): |
|
1262 |
block = source_file.read(min(blocksize, filesize - offset)) |
|
1263 |
offset += len(block) |
|
1264 |
self.object_post( |
|
1265 |
obj, |
|
1266 |
update=True, |
|
1267 |
content_range='bytes */*', |
|
1268 |
content_type='application/octet-stream', |
|
1269 |
content_length=len(block), |
|
1270 |
data=block) |
|
1269 |
self.progress_bar_gen = upload_cb(nblocks) |
|
1270 |
self._cb_next() |
|
1271 |
flying = {} |
|
1272 |
self._init_thread_limit() |
|
1273 |
try: |
|
1274 |
for i in range(nblocks): |
|
1275 |
block = source_file.read(min(blocksize, filesize - offset)) |
|
1276 |
offset += len(block) |
|
1271 | 1277 |
|
1272 |
if upload_cb: |
|
1273 |
upload_gen.next() |
|
1278 |
self._watch_thread_limit(flying.values()) |
|
1279 |
unfinished = {} |
|
1280 |
flying[i] = SilentEvent( |
|
1281 |
method=self.object_post, |
|
1282 |
obj=obj, |
|
1283 |
update=True, |
|
1284 |
content_range='bytes */*', |
|
1285 |
content_type='application/octet-stream', |
|
1286 |
content_length=len(block), |
|
1287 |
data=block) |
|
1288 |
flying[i].start() |
|
1289 |
|
|
1290 |
for key, thread in flying.items(): |
|
1291 |
if thread.isAlive(): |
|
1292 |
if i < nblocks: |
|
1293 |
unfinished[key] = thread |
|
1294 |
continue |
|
1295 |
thread.join() |
|
1296 |
if thread.exception: |
|
1297 |
raise thread.exception |
|
1298 |
headers[key] = thread.value.headers |
|
1299 |
self._cb_next() |
|
1300 |
flying = unfinished |
|
1301 |
except KeyboardInterrupt: |
|
1302 |
sendlog.info('- - - wait for threads to finish') |
|
1303 |
for thread in activethreads(): |
|
1304 |
thread.join() |
|
1305 |
return headers.values() |
|
1274 | 1306 |
|
1275 | 1307 |
def truncate_object(self, obj, upto_bytes): |
1276 | 1308 |
""" |
Also available in: Unified diff