Revision 78bd53a7
b/snf-pithos-app/pithos/api/test/accounts.py | ||
---|---|---|
42 | 42 |
import time as _time |
43 | 43 |
import datetime |
44 | 44 |
|
45 |
import django.utils.simplejson as json |
|
45 | 46 |
|
46 | 47 |
class AccountHead(PithosAPITest): |
47 | 48 |
def test_get_account_meta(self): |
... | ... | |
81 | 82 |
usage) |
82 | 83 |
|
83 | 84 |
def test_get_account_meta_until(self): |
85 |
cnames = ['apples', 'bananas', 'kiwis'] |
|
86 |
|
|
87 |
# create containers |
|
88 |
uploaded_bytes = 0 |
|
89 |
for cname in cnames: |
|
90 |
self.create_container(cname) |
|
91 |
|
|
92 |
# upload object |
|
93 |
name, data, resp = self.upload_object(cname) |
|
94 |
uploaded_bytes += len(data) |
|
95 |
|
|
84 | 96 |
self.update_account_meta({'foo': 'bar'}) |
85 | 97 |
|
86 | 98 |
account_info = self.get_account_info() |
... | ... | |
90 | 102 |
until = int(_time.mktime(t1.timetuple())) |
91 | 103 |
|
92 | 104 |
_time.sleep(2) |
105 |
|
|
106 |
# add containers |
|
107 |
cnames = ['oranges', 'pears'] |
|
108 |
for cname in cnames: |
|
109 |
self.create_container(cname) |
|
110 |
|
|
111 |
# upload object |
|
112 |
self.upload_object(cname) |
|
113 |
|
|
93 | 114 |
self.update_account_meta({'quality': 'AAA'}) |
94 | 115 |
|
95 | 116 |
account_info = self.get_account_info() |
... | ... | |
108 | 129 |
t = datetime.datetime.strptime( |
109 | 130 |
account_info['X-Account-Until-Timestamp'], DATE_FORMATS[2]) |
110 | 131 |
self.assertTrue(int(_time.mktime(t1.timetuple())) <= until) |
132 |
self.assertTrue('X-Account-Container-Count' in account_info) |
|
133 |
self.assertEqual(int(account_info['X-Account-Container-Count']), 3) |
|
134 |
self.assertTrue('X-Account-Bytes-Used' in account_info) |
|
111 | 135 |
|
112 | 136 |
def test_get_account_meta_until_invalid_date(self): |
113 | 137 |
self.update_account_meta({'quality': 'AAA'}) |
... | ... | |
135 | 159 |
self.assertEquals(containers, |
136 | 160 |
['apples', 'bananas', 'kiwis', 'oranges', 'pears']) |
137 | 161 |
|
162 |
def test_list_until(self): |
|
163 |
account_info = self.get_account_info() |
|
164 |
t = datetime.datetime.strptime(account_info['Last-Modified'], |
|
165 |
DATE_FORMATS[2]) |
|
166 |
t1 = t + datetime.timedelta(seconds=1) |
|
167 |
until = int(_time.mktime(t1.timetuple())) |
|
168 |
|
|
169 |
_time.sleep(2) |
|
170 |
|
|
171 |
self.create_container() |
|
172 |
|
|
173 |
url = join_urls(self.pithos_path, self.user) |
|
174 |
r = self.get('%s?until=%s' % (url, until)) |
|
175 |
self.assertEqual(r.status_code, 200) |
|
176 |
containers = r.content.split('\n') |
|
177 |
if '' in containers: |
|
178 |
containers.remove('') |
|
179 |
self.assertEqual(containers, |
|
180 |
['apples', 'bananas', 'kiwis', 'oranges', 'pears']) |
|
181 |
|
|
182 |
|
|
183 |
r = self.get('%s?until=%s&format=json' % (url, until)) |
|
184 |
self.assertEqual(r.status_code, 200) |
|
185 |
try: |
|
186 |
containers = json.loads(r.content) |
|
187 |
except: |
|
188 |
self.fail('json format expected') |
|
189 |
self.assertEqual([c['name'] for c in containers], |
|
190 |
['apples', 'bananas', 'kiwis', 'oranges', 'pears']) |
|
191 |
|
|
138 | 192 |
def test_list_shared(self): |
139 | 193 |
# upload and publish object |
140 | 194 |
oname, data, resp = self.upload_object('apples') |
b/snf-pithos-app/pithos/api/test/containers.py | ||
---|---|---|
83 | 83 |
(self.assertTrue('foo%s' % i in r['X-Container-Object-Meta']) |
84 | 84 |
for i in range(len(objects))) |
85 | 85 |
|
86 |
def test_get_container_meta_until(self): |
|
87 |
self.create_container('apples') |
|
88 |
|
|
89 |
# populate with objects |
|
90 |
objects = {} |
|
91 |
metalist = [] |
|
92 |
for i in range(random.randint(1, 100)): |
|
93 |
# upload object |
|
94 |
metakey = 'foo%s' % i |
|
95 |
meta = {metakey: 'bar'} |
|
96 |
name, data, resp = self.upload_object('apples', **meta) |
|
97 |
objects[name] = data |
|
98 |
metalist.append(metakey) |
|
99 |
|
|
100 |
self.update_container_meta('apples', {'foo': 'bar'}) |
|
101 |
|
|
102 |
container_info = self.get_container_info('apples') |
|
103 |
t = datetime.datetime.strptime(container_info['Last-Modified'], |
|
104 |
DATE_FORMATS[2]) |
|
105 |
t1 = t + datetime.timedelta(seconds=1) |
|
106 |
until = int(_time.mktime(t1.timetuple())) |
|
107 |
|
|
108 |
_time.sleep(2) |
|
109 |
|
|
110 |
for i in range(random.randint(1, 100)): |
|
111 |
# upload object |
|
112 |
meta = {'foo%s' % i: 'bar'} |
|
113 |
self.upload_object('apples', **meta) |
|
114 |
|
|
115 |
self.update_container_meta('apples', {'quality': 'AAA'}) |
|
116 |
|
|
117 |
container_info = self.get_container_info('apples') |
|
118 |
self.assertTrue('X-Container-Meta-Quality' in container_info) |
|
119 |
self.assertTrue('X-Container-Meta-Foo' in container_info) |
|
120 |
self.assertTrue('X-Container-Object-Count' in container_info) |
|
121 |
self.assertTrue(int(container_info['X-Container-Object-Count']) > len(objects)) |
|
122 |
self.assertTrue('X-Container-Bytes-Used' in container_info) |
|
123 |
|
|
124 |
t = datetime.datetime.strptime(container_info['Last-Modified'], |
|
125 |
DATE_FORMATS[-1]) |
|
126 |
last_modified = int(_time.mktime(t.timetuple())) |
|
127 |
assert until < last_modified |
|
128 |
|
|
129 |
container_info = self.get_container_info('apples', until=until) |
|
130 |
self.assertTrue('X-Container-Meta-Quality' not in container_info) |
|
131 |
self.assertTrue('X-Container-Meta-Foo' in container_info) |
|
132 |
self.assertTrue('X-Container-Until-Timestamp' in container_info) |
|
133 |
t = datetime.datetime.strptime( |
|
134 |
container_info['X-Container-Until-Timestamp'], DATE_FORMATS[2]) |
|
135 |
self.assertTrue(int(_time.mktime(t1.timetuple())) <= until) |
|
136 |
self.assertTrue('X-Container-Object-Count' in container_info) |
|
137 |
self.assertEqual(int(container_info['X-Container-Object-Count']), len(objects)) |
|
138 |
self.assertTrue('X-Container-Bytes-Used' in container_info) |
|
139 |
self.assertEqual(int(container_info['X-Container-Bytes-Used']), |
|
140 |
sum([len(data) for data in objects.values()])) |
|
141 |
self.assertTrue('X-Container-Object-Meta' in container_info) |
|
142 |
self.assertEqual(container_info['X-Container-Object-Meta'], |
|
143 |
metalist) |
|
144 |
|
|
86 | 145 |
|
87 | 146 |
class ContainerGet(PithosAPITest): |
88 | 147 |
def setUp(self): |
... | ... | |
102 | 161 |
name, data, resp = self.upload_object('apples', o) |
103 | 162 |
self.objects['apples'][name] = data |
104 | 163 |
|
164 |
def test_list_until(self): |
|
165 |
account_info = self.get_account_info() |
|
166 |
t = datetime.datetime.strptime(account_info['Last-Modified'], |
|
167 |
DATE_FORMATS[2]) |
|
168 |
t1 = t + datetime.timedelta(seconds=1) |
|
169 |
until = int(_time.mktime(t1.timetuple())) |
|
170 |
|
|
171 |
_time.sleep(2) |
|
172 |
|
|
173 |
cname = self.cnames[0] |
|
174 |
self.upload_object(cname) |
|
175 |
|
|
176 |
url = join_urls(self.pithos_path, self.user, cname) |
|
177 |
r = self.get('%s?until=%s' % (url, until)) |
|
178 |
self.assertTrue(r.status_code, 200) |
|
179 |
objects = r.content.split('\n') |
|
180 |
if '' in objects: |
|
181 |
objects.remove('') |
|
182 |
self.assertEqual(objects, |
|
183 |
sorted(self.objects[cname].keys())) |
|
184 |
|
|
185 |
r = self.get('%s?until=%s&format=json' % (url, until)) |
|
186 |
self.assertTrue(r.status_code, 200) |
|
187 |
try: |
|
188 |
objects = json.loads(r.content) |
|
189 |
except: |
|
190 |
self.fail('json format expected') |
|
191 |
self.assertEqual([o['name'] for o in objects], |
|
192 |
sorted(self.objects[cname].keys())) |
|
193 |
|
|
105 | 194 |
def test_list_shared(self): |
106 | 195 |
# share an object |
107 | 196 |
cname = self.cnames[0] |
Also available in: Unified diff