Revision 78bd53a7 snf-pithos-app/pithos/api/test/containers.py

b/snf-pithos-app/pithos/api/test/containers.py
83 83
        (self.assertTrue('foo%s' % i in r['X-Container-Object-Meta'])
84 84
            for i in range(len(objects)))
85 85

  
86
    def test_get_container_meta_until(self):
87
        self.create_container('apples')
88

  
89
        # populate with objects
90
        objects = {}
91
        metalist = []
92
        for i in range(random.randint(1, 100)):
93
            # upload object
94
            metakey = 'foo%s' % i
95
            meta = {metakey: 'bar'}
96
            name, data, resp = self.upload_object('apples', **meta)
97
            objects[name] = data
98
            metalist.append(metakey) 
99

  
100
        self.update_container_meta('apples', {'foo': 'bar'})
101

  
102
        container_info = self.get_container_info('apples')
103
        t = datetime.datetime.strptime(container_info['Last-Modified'],
104
                                       DATE_FORMATS[2])
105
        t1 = t + datetime.timedelta(seconds=1)
106
        until = int(_time.mktime(t1.timetuple()))
107

  
108
        _time.sleep(2)
109

  
110
        for i in range(random.randint(1, 100)):
111
            # upload object
112
            meta = {'foo%s' % i: 'bar'}
113
            self.upload_object('apples', **meta)
114

  
115
        self.update_container_meta('apples', {'quality': 'AAA'})
116

  
117
        container_info = self.get_container_info('apples')
118
        self.assertTrue('X-Container-Meta-Quality' in container_info)
119
        self.assertTrue('X-Container-Meta-Foo' in container_info)
120
        self.assertTrue('X-Container-Object-Count' in container_info)
121
        self.assertTrue(int(container_info['X-Container-Object-Count']) > len(objects))
122
        self.assertTrue('X-Container-Bytes-Used' in container_info)
123

  
124
        t = datetime.datetime.strptime(container_info['Last-Modified'],
125
                                       DATE_FORMATS[-1])
126
        last_modified = int(_time.mktime(t.timetuple()))
127
        assert until < last_modified
128

  
129
        container_info = self.get_container_info('apples', until=until)
130
        self.assertTrue('X-Container-Meta-Quality' not in container_info)
131
        self.assertTrue('X-Container-Meta-Foo' in container_info)
132
        self.assertTrue('X-Container-Until-Timestamp' in container_info)
133
        t = datetime.datetime.strptime(
134
            container_info['X-Container-Until-Timestamp'], DATE_FORMATS[2])
135
        self.assertTrue(int(_time.mktime(t1.timetuple())) <= until)
136
        self.assertTrue('X-Container-Object-Count' in container_info)
137
        self.assertEqual(int(container_info['X-Container-Object-Count']), len(objects))
138
        self.assertTrue('X-Container-Bytes-Used' in container_info)
139
        self.assertEqual(int(container_info['X-Container-Bytes-Used']),
140
                         sum([len(data) for data in objects.values()]))
141
        self.assertTrue('X-Container-Object-Meta' in container_info)
142
        self.assertEqual(container_info['X-Container-Object-Meta'],
143
                         metalist) 
144

  
86 145

  
87 146
class ContainerGet(PithosAPITest):
88 147
    def setUp(self):
......
102 161
            name, data, resp = self.upload_object('apples', o)
103 162
            self.objects['apples'][name] = data
104 163

  
164
    def test_list_until(self):
165
        account_info = self.get_account_info()
166
        t = datetime.datetime.strptime(account_info['Last-Modified'],
167
                                       DATE_FORMATS[2])
168
        t1 = t + datetime.timedelta(seconds=1)
169
        until = int(_time.mktime(t1.timetuple()))
170

  
171
        _time.sleep(2)
172

  
173
        cname = self.cnames[0]
174
        self.upload_object(cname)
175

  
176
        url = join_urls(self.pithos_path, self.user, cname)
177
        r = self.get('%s?until=%s' % (url, until))
178
        self.assertTrue(r.status_code, 200)
179
        objects = r.content.split('\n')
180
        if '' in objects:
181
            objects.remove('')
182
        self.assertEqual(objects,
183
                         sorted(self.objects[cname].keys()))
184

  
185
        r = self.get('%s?until=%s&format=json' % (url, until))
186
        self.assertTrue(r.status_code, 200)
187
        try:
188
            objects = json.loads(r.content)
189
        except:
190
            self.fail('json format expected')
191
        self.assertEqual([o['name'] for o in objects],
192
                         sorted(self.objects[cname].keys()))
193

  
105 194
    def test_list_shared(self):
106 195
        # share an object
107 196
        cname = self.cnames[0]

Also available in: Unified diff