# Copyright 2013 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

"""
This is the burnin class that tests the Pithos functionality

"""

import random
from datetime import datetime
from tempfile import NamedTemporaryFile

from synnefo_tools.burnin.common import BurninTests, Proper
from kamaki.clients import ClientError


def sample_block(f, block):
    block_size = 4 * 1024 * 1024
    f.seek(block * block_size)
    ch = [f.read(1)]
    f.seek(block_size / 2, 1)
    ch.append(f.read(1))
    f.seek((block + 1) * block_size - 1)
    ch.append(f.read(1))
    return ch


# Too many public methods. pylint: disable-msg=R0904
class PithosTestSuite(BurninTests):
    """Test Pithos functionality"""
    containers = Proper(value=None)
    created_container = Proper(value=None)
    now_unformated = Proper(value=datetime.utcnow())
    obj_metakey = Proper(value=None)
    large_file = Proper(value=None)

    def test_005_account_head(self):
        """Test account HEAD"""
        self._set_pithos_account(self._get_uuid())
        pithos = self.clients.pithos
        r = pithos.account_head()
        self.assertEqual(r.status_code, 204)
        self.info('Returns 204')

        r = pithos.account_head(until='1000000000')
        self.assertEqual(r.status_code, 204)
        datestring = unicode(r.headers['x-account-until-timestamp'])
        self.assertEqual(u'Sun, 09 Sep 2001 01:46:40 GMT', datestring)
        self.assertTrue('x-account-policy-quota' in r.headers)
        self.info('Until and policy quota exist')

        for format in pithos.DATE_FORMATS:
            now_formated = self.now_unformated.strftime(format)
            r1 = pithos.account_head(
                if_modified_since=now_formated, success=(204, 304, 412))
            r2 = pithos.account_head(
                if_unmodified_since=now_formated, success=(204, 304, 412))
            self.assertNotEqual(r1.status_code, r2.status_code)
        self.info('If_(un)modified_since is OK')

    def test_010_account_get(self):
        """Test account GET"""
        self.info('Preparation')
        pithos = self.clients.pithos
        for i in range(1, 3):
            cont_name = "cont%s_%s%s" % (
                i, self.run_id or 0, random.randint(1000, 9999))
            self._create_pithos_container(cont_name)
        pithos.container, obj = cont_name, 'shared_file'
        pithos.create_object(obj)
        pithos.set_object_sharing(obj, read_permission='*')
        self.info('Created object /%s/%s' % (cont_name, obj))

        #  Try to re-create the same container
        pithos.create_container(cont_name)

        r = pithos.list_containers()
        fullLen = len(r)
        self.assertTrue(fullLen > 2)
        self.info('Normal use is OK')

        cnames = [c['name'] for c in r]
        self.assertEqual(sorted(list(set(cnames))), sorted(cnames))
        self.info('Containers have unique names')

        r = pithos.account_get(limit=1)
        self.assertEqual(len(r.json), 1)
        self.info('Limit works')

        r = pithos.account_get(marker='cont')
        cont1, cont3 = r.json[0], r.json[2]
        self.info('Marker works')

        r = pithos.account_get(limit=2, marker='cont')
        conames = [container['name'] for container in r.json if (
            container['name'].lower().startswith('cont'))]
        self.assertTrue(cont1['name'] in conames)
        self.assertFalse(cont3['name'] in conames)
        self.info('Marker-limit combination works')

        r = pithos.account_get(show_only_shared=True)
        self.assertTrue(cont_name in [c['name'] for c in r.json])
        self.info('Show-only-shared works')

        r = pithos.account_get(until=1342609206.0)
        self.assertTrue(len(r.json) <= fullLen)
        self.info('Until works')

        for format in pithos.DATE_FORMATS:
            now_formated = self.now_unformated.strftime(format)
            r1 = pithos.account_get(
                if_modified_since=now_formated, success=(200, 304, 412))
            r2 = pithos.account_get(
                if_unmodified_since=now_formated, success=(200, 304, 412))
            self.assertNotEqual(r1.status_code, r2.status_code)
        self.info('If_(un)modified_since is OK')

    def test_015_account_post(self):
        """Test account POST"""
        pithos = self.clients.pithos
        r = pithos.account_post()
        self.assertEqual(r.status_code, 202)
        self.info('Status code is OK')

        rand_num = '%s%s' % (self.run_id or 0, random.randint(1000, 9999))
        grpName = 'grp%s' % rand_num

        u1, u2 = pithos.account, 'invalid-user-uuid-%s' % rand_num
        self.assertRaises(
            ClientError, pithos.set_account_group, grpName, [u1, u2])
        self.info('Invalid uuid is handled correctly')

        pithos.set_account_group(grpName, [u1])
        r = pithos.get_account_group()
        self.assertEqual(r['x-account-group-' + grpName], '%s' % u1)
        self.info('Account group is OK')
        pithos.del_account_group(grpName)
        r = pithos.get_account_group()
        self.assertTrue('x-account-group-' + grpName not in r)
        self.info('Removed account group')

        mprefix = 'meta%s' % rand_num
        pithos.set_account_meta({
            mprefix + '1': 'v1', mprefix + '2': 'v2'})
        r = pithos.get_account_meta()
        self.assertEqual(r['x-account-meta-' + mprefix + '1'], 'v1')
        self.assertEqual(r['x-account-meta-' + mprefix + '2'], 'v2')
        self.info('Account meta is OK')

        pithos.del_account_meta(mprefix + '1')
        r = pithos.get_account_meta()
        self.assertTrue('x-account-meta-' + mprefix + '1' not in r)
        self.assertTrue('x-account-meta-' + mprefix + '2' in r)
        self.info('Selective removal of account meta is OK')

        pithos.del_account_meta(mprefix + '2')
        r = pithos.get_account_meta()
        self.assertTrue('x-account-meta-' + mprefix + '2' not in r)
        self.info('Temporary account meta are removed')

    def test_020_container_head(self):
        """Test container HEAD"""
        pithos = self.clients.pithos
        r = pithos.container_head()
        self.assertEqual(r.status_code, 204)
        self.info('Status code is OK')

        r = pithos.container_head(until=1000000, success=(204, 404))
        self.assertEqual(r.status_code, 404)
        self.info('Until works')

        for format in pithos.DATE_FORMATS:
            now_formated = self.now_unformated.strftime(format)
            r1 = pithos.container_head(
                if_modified_since=now_formated, success=(204, 304, 412))
            r2 = pithos.container_head(
                if_unmodified_since=now_formated, success=(204, 304, 412))
            self.assertNotEqual(r1.status_code, r2.status_code)

        k = 'metakey%s' % random.randint(1000, 9999)
        pithos.set_container_meta({k: 'our value'})
        r = pithos.get_container_meta()
        k = 'x-container-meta-%s' % k
        self.assertIn(k, r)
        self.assertEqual('our value', r[k])
        self.info('Container meta exists')

        self.obj_metakey = 'metakey%s' % random.randint(1000, 9999)
        obj = 'object_with_meta'
        pithos.create_object(obj)
        pithos.set_object_meta(obj, {self.obj_metakey: 'our value'})
        r = pithos.get_container_object_meta()
        self.assertIn('x-container-object-meta', r)
        self.assertIn(
            self.obj_metakey, r['x-container-object-meta'].lower())
        self.info('Container object meta exists')

    def test_025_container_get(self):
        """Test container GET"""
        pithos = self.clients.pithos

        r = pithos.container_get()
        self.assertEqual(r.status_code, 200)
        self.info('Status code is OK')

        fullLen = len(r.json)
        self.assertGreater(fullLen, 0)
        self.info('There are enough (%s) containers' % fullLen)

        obj1 = 'test%s' % random.randint(1000, 9999)
        pithos.create_object(obj1)
        obj2 = 'test%s' % random.randint(1000, 9999)
        pithos.create_object(obj2)
        obj3 = 'another%s.test' % random.randint(1000, 9999)
        pithos.create_object(obj3)

        r = pithos.container_get(prefix='test')
        self.assertTrue(len(r.json) > 1)
        test_objects = [o for o in r.json if o['name'].startswith('test')]
        self.assertEqual(len(r.json), len(test_objects))
        self.info('Prefix is OK')

        r = pithos.container_get(limit=1)
        self.assertEqual(len(r.json), 1)
        self.info('Limit is OK')

        r = pithos.container_get(marker=obj3[:-5])
        self.assertTrue(len(r.json) > 1)
        aoobjects = [obj for obj in r.json if obj['name'] > obj3[:-5]]
        self.assertEqual(len(r.json), len(aoobjects))
        self.info('Marker is OK')

        r = pithos.container_get(prefix=obj3, delimiter='.')
        self.assertTrue(fullLen > len(r.json))
        self.info('Delimiter is OK')

        r = pithos.container_get(path='/')
        fullLen += 3
        self.assertEqual(fullLen, len(r.json))
        self.info('Path is OK')

        r = pithos.container_get(format='xml')
        self.assertEqual(r.text.split()[4], 'name="' + pithos.container + '">')
        self.info('Format is OK')

        r = pithos.container_get(meta=[self.obj_metakey, ])
        self.assertTrue(len(r.json) > 0)
        self.info('Meta is OK')

        r = pithos.container_get(show_only_shared=True)
        self.assertTrue(len(r.json) < fullLen)
        self.info('Show-only-shared is OK')

        try:
            r = pithos.container_get(until=1000000000)
            datestring = unicode(r.headers['x-account-until-timestamp'])
            self.assertEqual(u'Sun, 09 Sep 2001 01:46:40 GMT', datestring)
            self.info('Until is OK')
        except ClientError:
            pass

    def test_030_container_put(self):
        """Test container PUT"""
        pithos = self.clients.pithos
        pithos.container = 'cont%s%s' % (
            self.run_id or 0, random.randint(1000, 9999))
        self.temp_containers.append(pithos.container)

        r = pithos.create_container()
        self.assertTrue(isinstance(r, dict))

        r = pithos.get_container_limit(pithos.container)
        cquota = r.values()[0]
        newquota = 2 * int(cquota)
        self.info('Limit is OK')
        pithos.del_container()

        r = pithos.create_container(sizelimit=newquota)
        self.assertTrue(isinstance(r, dict))

        r = pithos.get_container_limit(pithos.container)
        xquota = int(r.values()[0])
        self.assertEqual(newquota, xquota)
        self.info('Can set container limit')
        pithos.del_container()

        r = pithos.create_container(versioning='auto')
        self.assertTrue(isinstance(r, dict))

        r = pithos.get_container_versioning(pithos.container)
        nvers = r.values()[0]
        self.assertEqual('auto', nvers)
        self.info('Versioning=auto is OK')
        pithos.del_container()

        r = pithos.container_put(versioning='none')
        self.assertEqual(r.status_code, 201)

        r = pithos.get_container_versioning(pithos.container)
        nvers = r.values()[0]
        self.assertEqual('none', nvers)
        self.info('Versioning=none is OK')
        pithos.del_container()

        r = pithos.create_container(metadata={'m1': 'v1', 'm2': 'v2'})
        self.assertTrue(isinstance(r, dict))

        r = pithos.get_container_meta(pithos.container)
        self.assertTrue('x-container-meta-m1' in r)
        self.assertEqual(r['x-container-meta-m1'], 'v1')
        self.assertTrue('x-container-meta-m2' in r)
        self.assertEqual(r['x-container-meta-m2'], 'v2')

        r = pithos.container_put(metadata={'m1': '', 'm2': 'v2a'})
        self.assertEqual(r.status_code, 202)

        r = pithos.get_container_meta(pithos.container)
        self.assertTrue('x-container-meta-m1' not in r)
        self.assertTrue('x-container-meta-m2' in r)
        self.assertEqual(r['x-container-meta-m2'], 'v2a')
        self.info('Container meta is OK')

        pithos.del_container_meta(pithos.container)

    def test_035_container_post(self):
        """Test container POST"""
        pithos = self.clients.pithos

        r = pithos.container_post()
        self.assertEqual(r.status_code, 202)
        self.info('Status is OK')

        pithos.set_container_meta({'m1': 'v1', 'm2': 'v2'})
        r = pithos.get_container_meta(pithos.container)
        self.assertTrue('x-container-meta-m1' in r)
        self.assertEqual(r['x-container-meta-m1'], 'v1')
        self.assertTrue('x-container-meta-m2' in r)
        self.assertEqual(r['x-container-meta-m2'], 'v2')
        self.info('Set metadata works')

        r = pithos.del_container_meta('m1')
        r = pithos.set_container_meta({'m2': 'v2a'})
        r = pithos.get_container_meta(pithos.container)
        self.assertTrue('x-container-meta-m1' not in r)
        self.assertTrue('x-container-meta-m2' in r)
        self.assertEqual(r['x-container-meta-m2'], 'v2a')
        self.info('Delete metadata works')

        r = pithos.get_container_limit(pithos.container)
        cquota = r.values()[0]
        newquota = 2 * int(cquota)
        r = pithos.set_container_limit(newquota)
        r = pithos.get_container_limit(pithos.container)
        xquota = int(r.values()[0])
        self.assertEqual(newquota, xquota)
        self.info('Set quota works')

        pithos.set_container_versioning('auto')
        r = pithos.get_container_versioning(pithos.container)
        nvers = r.values()[0]
        self.assertEqual('auto', nvers)
        pithos.set_container_versioning('none')
        r = pithos.get_container_versioning(pithos.container)
        nvers = r.values()[0]
        self.assertEqual('none', nvers)
        self.info('Set versioning works')

        f = self._create_large_file(1024 * 1024 * 100)
        self.large_file = f
        self.info('Created file %s of 100 MB' % f.name)

        pithos.create_directory('dir')
        self.info('Upload the file ...')
        r = pithos.upload_object('/dir/sample.file', f)
        for term in ('content-length', 'content-type', 'x-object-version'):
            self.assertTrue(term in r)
        r = pithos.get_object_info('/dir/sample.file')
        self.assertTrue(int(r['content-length']) > 100000000)
        self.info('Made remote directory /dir and object /dir/sample.file')

        """What is tranfer_encoding? What should I check about it? """
        #TODO

        obj = 'object_with_meta'
        pithos.container = self.temp_containers[-2]
        r = pithos.object_post(
            obj, update='False', metadata={'newmeta': 'newval'})

        r = pithos.get_object_info(obj)
        self.assertTrue('x-object-meta-newmeta' in r)
        self.assertFalse('x-object-meta-%s' % self.obj_metakey not in r)
        self.info('Metadata with update=False works')

    def test_040_container_delete(self):
        """Test container DELETE"""
        pithos = self.clients.pithos

        r = pithos.container_delete(success=409)
        self.assertEqual(r.status_code, 409)
        self.assertRaises(ClientError, pithos.container_delete)
        self.info('Successfully failed to delete non-empty container')

        r = pithos.container_delete(until='1000000000')
        self.assertEqual(r.status_code, 204)
        self.info('Successfully failed to delete old-timestamped container')

        obj_names = [o['name'] for o in pithos.container_get().json]
        pithos.del_container(delimiter='/')
        r = pithos.container_get()
        self.assertEqual(len(r.json), 0)
        self.info('Successfully emptied container')

        for obj in obj_names:
            r = pithos.get_object_versionlist(obj)
            self.assertTrue(len(r) > 0)
        self.info('Versions are still there')

        pithos.purge_container()
        for obj in obj_names:
            self.assertRaises(ClientError, pithos.get_object_versionlist, obj)
        self.info('Successfully purged container')

        self.temp_containers.remove(pithos.container)
        pithos.container = self.temp_containers[-1]

    def test_045_object_head(self):
        """Test object HEAD"""
        pithos = self.clients.pithos

        obj = 'dir/sample.file'
        r = pithos.object_head(obj)
        self.assertEqual(r.status_code, 200)
        self.info('Status code is OK')
        etag = r.headers['etag']
        real_version = r.headers['x-object-version']

        self.assertRaises(ClientError, pithos.object_head, obj, version=-10)
        r = pithos.object_head(obj, version=real_version)
        self.assertEqual(r.headers['x-object-version'], real_version)
        self.info('Version works')

        r = pithos.object_head(obj, if_etag_match=etag)
        self.assertEqual(r.status_code, 200)
        self.info('if-etag-match is OK')

        r = pithos.object_head(
            obj, if_etag_not_match=etag, success=(200, 412, 304))
        self.assertNotEqual(r.status_code, 200)
        self.info('if-etag-not-match works')

        r = pithos.object_head(
            obj, version=real_version, if_etag_match=etag, success=200)
        self.assertEqual(r.status_code, 200)
        self.info('Version with if-etag-match works')

        for format in pithos.DATE_FORMATS:
            now_formated = self.now_unformated.strftime(format)
            r1 = pithos.object_head(
                obj, if_modified_since=now_formated, success=(200, 304, 412))
            r2 = pithos.object_head(
                obj, if_unmodified_since=now_formated, success=(200, 304, 412))
            self.assertNotEqual(r1.status_code, r2.status_code)
        self.info('if-(un)modified-since works')

    def test_050_object_get(self):
        """Test object GET"""
        pithos = self.clients.pithos
        obj = 'dir/sample.file'

        r = pithos.object_get(obj)
        self.assertEqual(r.status_code, 200)
        self.info('Status code is OK')

        osize = int(r.headers['content-length'])
        etag = r.headers['etag']

        r = pithos.object_get(obj, hashmap=True)
        self.assertEqual(
            set(('hashes', 'block_size', 'block_hash', 'bytes')), set(r.json))
        self.info('Hashmap works')
        hash0 = r.json['hashes'][0]

        r = pithos.object_get(obj, format='xml', hashmap=True)
        self.assertTrue(r.text.split('hash>')[1].startswith(hash0))
        self.info('Hashmap with XML format works')

        rangestr = 'bytes=%s-%s' % (osize / 3, osize / 2)
        r = pithos.object_get(obj, data_range=rangestr, success=(200, 206))
        partsize = int(r.headers['content-length'])
        self.assertTrue(0 < partsize and partsize <= 1 + osize / 3)
        self.info('Range x-y works')
        orig = r.text

        rangestr = 'bytes=%s' % (osize / 3)
        r = pithos.object_get(
            obj, data_range=rangestr, if_range=True, success=(200, 206))
        partsize = int(r.headers['content-length'])
        self.assertTrue(partsize, 1 + (osize / 3))
        diff = set(r.text).symmetric_difference(set(orig[:partsize]))
        self.assertEqual(len(diff), 0)
        self.info('Range x works')

        rangestr = 'bytes=-%s' % (osize / 3)
        r = pithos.object_get(
            obj, data_range=rangestr, if_range=True, success=(200, 206))
        partsize = int(r.headers['content-length'])
        self.assertTrue(partsize, osize / 3)
        diff = set(r.text).symmetric_difference(set(orig[-partsize:]))
        self.assertEqual(len(diff), 0)
        self.info('Range -x works')

        r = pithos.object_get(obj, if_etag_match=etag)
        self.assertEqual(r.status_code, 200)
        self.info('if-etag-match works')

        r = pithos.object_get(obj, if_etag_not_match=etag + 'LALALA')
        self.assertEqual(r.status_code, 200)
        self.info('if-etag-not-match works')

        for format in pithos.DATE_FORMATS:
            now_formated = self.now_unformated.strftime(format)
            r1 = pithos.object_get(
                obj, if_modified_since=now_formated, success=(200, 304, 412))
            r2 = pithos.object_get(
                obj, if_unmodified_since=now_formated, success=(200, 304, 412))
            self.assertNotEqual(r1.status_code, r2.status_code)
        self.info('if(un)modified-since works')

        obj, dnl_f = 'dir/sample.file', NamedTemporaryFile()
        self.info('Download %s as %s ...' % (obj, dnl_f.name))
        pithos.download_object(obj, dnl_f)
        self.info('Download is completed')

        f_size = len(orig)
        for pos in (0, f_size / 2, f_size - 128):
            dnl_f.seek(pos)
            self.large_file.seek(pos)
            self.assertEqual(self.large_file.read(64), dnl_f.read(64))
        self.info('Sampling shows that files match')

        self.info('Create a boring file of 42 blocks...')
        bor_f = self._create_boring_file(42)
        trg_fname = 'dir/uploaded.file'
        self.info('Now, upload the boring file as %s...' % trg_fname)
        pithos.upload_object(trg_fname, bor_f)
        self.info('Boring file %s is uploaded as %s' % (bor_f.name, trg_fname))
        dnl_f = NamedTemporaryFile()
        self.info('Download boring file as %s' % dnl_f.name)
        pithos.download_object(trg_fname, dnl_f)
        self.info('File is downloaded')

        for i in range(42):
            self.assertEqual(sample_block(bor_f, i), sample_block(dnl_f, i))

    def test_055_object_put(self):
        """Test object PUT"""
        pithos = self.clients.pithos
        obj = 'sample.file'

        pithos.create_object(obj + '.FAKE')
        r = pithos.get_object_info(obj + '.FAKE')
        self.assertEqual(
            set(r['content-type']), set('application/octer-stream'))
        self.info('Simple call creates a new object correctly')

        r = pithos.object_put(
            obj,
            data='a',
            content_type='application/octer-stream',
            permissions=dict(
                read=['accX:groupA', 'u1', 'u2'],
                write=['u2', 'u3']),
            metadata=dict(key1='val1', key2='val2'),
            content_encoding='UTF-8',
            content_disposition='attachment; filename="fname.ext"')
        self.assertEqual(r.status_code, 201)
        self.info('Status code is OK')
        etag = r.headers['etag']

        r = pithos.get_object_info(obj)
        self.assertTrue('content-disposition' in r)
        self.assertEqual(
            r['content-disposition'], 'attachment; filename="fname.ext"')
        self.info('Content-disposition is OK')

        sharing = r['x-object-sharing'].split('; ')
        self.assertTrue(sharing[0].startswith('read='))
        read = set(sharing[0][5:].split(','))
        self.assertEqual(set(('u1', 'accx:groupa')), read)
        self.assertTrue(sharing[1].startswith('write='))
        write = set(sharing[1][6:].split(','))
        self.assertEqual(set(('u2', 'u3')), write)
        self.info('Permissions are OK')

        r = pithos.get_object_meta(obj)
        self.assertEqual(r['x-object-meta-key1'], 'val1')
        self.assertEqual(r['x-object-meta-key2'], 'val2')
        self.info('Meta are OK')

        pithos.object_put(
            obj,
            if_etag_match=etag,
            data='b',
            content_type='application/octet-stream',
            public=True)
        self.info('If-etag-match is OK')

        r = pithos.object_get(obj)
        self.assertTrue('x-object-public' in r.headers)
        self.info('Publishing works')

        etag = r.headers['etag']
        self.assertEqual(r.text, 'b')
        self.info('Remote object content is correct')

        r = pithos.object_put(
            obj,
            if_etag_not_match=etag,
            data='c',
            content_type='application/octet-stream',
            success=(201, 412))
        self.assertEqual(r.status_code, 412)
        self.info('If-etag-not-match is OK')

        r = pithos.get_object_info('dir')
        self.assertEqual(r['content-type'], 'application/directory')
        self.info('Directory has been created correctly')

        r = pithos.object_put(
            '%s_v2' % obj,
            format=None,
            copy_from='/%s/%s' % (pithos.container, obj),
            content_encoding='application/octet-stream',
            source_account=pithos.account,
            content_length=0,
            success=201)
        self.assertEqual(r.status_code, 201)
        r1 = pithos.get_object_info(obj)
        r2 = pithos.get_object_info('%s_v2' % obj)
        self.assertEqual(r1['x-object-hash'], r2['x-object-hash'])
        self.info('Object has being copied in same container, OK')

        pithos.copy_object(
            src_container=pithos.container,
            src_object=obj,
            dst_container=self.temp_containers[-2],
            dst_object='%s_new' % obj)
        pithos.container = self.temp_containers[-2]
        r1 = pithos.get_object_info('%s_new' % obj)
        pithos.container = self.temp_containers[-1]
        r2 = pithos.get_object_info(obj)
        self.assertEqual(r1['x-object-hash'], r2['x-object-hash'])
        self.info('Object has being copied in another container, OK')

        fromstr = '/%s/%s_new' % (self.temp_containers[-2], obj)
        r = pithos.object_put(
            obj,
            format=None,
            copy_from=fromstr,
            content_encoding='application/octet-stream',
            source_account=pithos.account,
            content_length=0,
            success=201)
        self.assertEqual(r.status_code, 201)
        self.info('Cross container put accepts content_encoding')

        r = pithos.get_object_info(obj)
        self.assertEqual(r['etag'], etag)
        self.info('Etag is OK')

        r = pithos.object_put(
            '%s_v3' % obj,
            format=None,
            move_from=fromstr,
            content_encoding='application/octet-stream',
            source_account='nonExistendAddress@NeverLand.com',
            content_length=0,
            success=(403, ))
        self.info('Fake source account is handled correctly')

        r1 = pithos.get_object_info(obj)
        pithos.container = self.temp_containers[-2]
        pithos.move_object(
            src_container=self.temp_containers[-1],
            src_object=obj,
            dst_container=pithos.container,
            dst_object=obj + '_new')
        r0 = pithos.get_object_info(obj + '_new')
        self.assertEqual(r1['x-object-hash'], r0['x-object-hash'])
        self.info('Cross container move is OK')

        pithos.container = self.temp_containers[-1]
        pithos.create_container(versioning='auto')
        pithos.upload_from_string(obj, 'first version')
        source_hashmap = pithos.get_object_hashmap(obj)['hashes']
        pithos.upload_from_string(obj, 'second version')
        pithos.upload_from_string(obj, 'third version')
        versions = pithos.get_object_versionlist(obj)
        self.assertEqual(len(versions), 3)
        vers0 = versions[0][0]

        pithos.container = self.temp_containers[-2]
        pithos.object_put(
            obj,
            format=None,
            move_from='/%s/%s' % (self.temp_containers[-1], obj),
            source_version=vers0,
            content_encoding='application/octet-stream',
            content_length=0, success=201)
        target_hashmap = pithos.get_object_hashmap(obj)['hashes']
        self.info('Source-version is probably not OK (Check bug #4963)')
        source_hashmap, target_hashmap = source_hashmap, target_hashmap
        #  Comment out until it's fixed
        #  self.assertEqual(source_hashmap, target_hashmap)
        #  self.info('Source-version is OK')

        mobj = 'manifest.test'
        txt = ''
        for i in range(10):
            txt += '%s' % i
            pithos.object_put(
                '%s/%s' % (mobj, i),
                data='%s' % i,
                content_length=1,
                success=201,
                content_type='application/octet-stream',
                content_encoding='application/octet-stream')
        pithos.object_put(
            mobj,
            content_length=0,
            content_type='application/octet-stream',
            manifest='%s/%s' % (pithos.container, mobj))
        r = pithos.object_get(mobj)
        self.assertEqual(r.text, txt)
        self.info('Manifest file creation works')

        f = self._create_large_file(1024 * 10)
        pithos.upload_object('sample.file', f)
        r = pithos.get_object_info('sample.file')
        self.assertEqual(int(r['content-length']), 10240)
        self.info('Overwrite is OK')

        """MISSING: test transfer-encoding?"""

    def test_060_object_copy(self):
        pithos = self.clients.pithos
        obj, trg = 'source.file2copy', 'copied.file'
        data = '{"key1":"val1", "key2":"val2"}'

        r = pithos.object_put(
            obj,
            content_type='application/octet-stream',
            data=data,
            metadata=dict(mkey1='mval1', mkey2='mval2'),
            permissions=dict(
                read=['accX:groupA', 'u1', 'u2'],
                write=['u2', 'u3']),
            content_disposition='attachment; filename="fname.ext"')
        self.info('Prepared a file /%s/%s' % (pithos.container, obj))

        r = pithos.object_copy(
            obj,
            destination='/%s/%s' % (pithos.container, trg),
            ignore_content_type=False, content_type='application/json',
            metadata={'mkey2': 'mval2a', 'mkey3': 'mval3'},
            permissions={'write': ['u5', 'accX:groupB']})
        self.assertEqual(r.status_code, 201)
        self.info('Status code is OK')

        r = pithos.get_object_info(trg)
        self.assertTrue('content-disposition' in r)
        self.info('Content-disposition is OK')

        self.assertEqual(r['x-object-meta-mkey1'], 'mval1')
        self.assertEqual(r['x-object-meta-mkey2'], 'mval2a')
        self.assertEqual(r['x-object-meta-mkey3'], 'mval3')
        self.info('Metadata are OK')

        r = pithos.get_object_sharing(trg)
        self.assertFalse('read' in r or 'u2' in r['write'])
        self.assertTrue('accx:groupb' in r['write'])
        self.info('Sharing is OK')

        r = pithos.object_copy(
            obj,
            destination='/%s/%s' % (pithos.container, obj),
            content_encoding='utf8',
            content_type='application/json',
            destination_account='nonExistendAddress@NeverLand.com',
            success=(201, 404))
        self.assertEqual(r.status_code, 404)
        self.info('Non existing UUID correctly causes a 404')

        r = pithos.object_copy(
            obj,
            destination='/%s/%s' % (self.temp_containers[-1], obj),
            content_encoding='utf8',
            content_type='application/json')
        self.assertEqual(r.status_code, 201)
        self.assertEqual(
            r.headers['content-type'],
            'application/json; charset=UTF-8')

        pithos.container = self.temp_containers[-1]
        r = pithos.object_get(obj)
        etag = r.headers['etag']
        ctype = r.headers['content-type']
        self.assertEqual(ctype, 'application/json')
        self.info('Cross container copy w. content-type/encoding is OK')

        r = pithos.object_copy(
            obj,
            destination='/%s/%s0' % (pithos.container, obj),
            ignore_content_type=True,
            content_type='text/x-python')
        self.assertEqual(r.status_code, 201)
        self.assertNotEqual(r.headers['content-type'], 'application/json')
        r = pithos.object_get(obj + '0')
        self.assertNotEqual(r.headers['content-type'], 'text/x-python')

        r = pithos.object_copy(
            obj,
            destination='/%s/%s1' % (pithos.container, obj),
            if_etag_match=etag)
        self.assertEqual(r.status_code, 201)
        self.info('if-etag-match is OK')

        r = pithos.object_copy(
            obj,
            destination='/%s/%s2' % (pithos.container, obj),
            if_etag_not_match='lalala')
        self.assertEqual(r.status_code, 201)
        self.info('if-etag-not-match is OK')

        r = pithos.object_copy(
            '%s2' % obj,
            destination='/%s/%s3' % (pithos.container, obj),
            format='xml',
            public=True)
        self.assertEqual(r.status_code, 201)
        self.assertTrue('xml' in r.headers['content-type'])

        r = pithos.get_object_info(obj + '3')
        self.assertTrue('x-object-public' in r)
        self.info('Publish, format and source-version are OK')

    def test_065_object_move(self):
        """Test object MOVE"""
        pithos = self.clients.pithos
        obj = 'source.file2move'
        data = '{"key1": "val1", "key2": "val2"}'

        r = pithos.object_put(
            obj,
            content_type='application/octet-stream',
            data=data,
            metadata=dict(mkey1='mval1', mkey2='mval2'),
            permissions=dict(
                read=['accX:groupA', 'u1', 'u2'],
                write=['u2', 'u3']))
        self.info('Prepared a file /%s/%s' % (pithos.container, obj))

        r = pithos.object_move(
            obj,
            destination='/%s/%s0' % (pithos.container, obj),
            ignore_content_type=False, content_type='application/json',
            metadata={'mkey2': 'mval2a', 'mkey3': 'mval3'},
            permissions={'write': ['u5', 'accX:groupB']})
        self.assertEqual(r.status_code, 201)
        self.info('Status code is OK')

        r = pithos.get_object_meta(obj + '0')
        self.assertEqual(r['x-object-meta-mkey1'], 'mval1')
        self.assertEqual(r['x-object-meta-mkey2'], 'mval2a')
        self.assertEqual(r['x-object-meta-mkey3'], 'mval3')
        self.info('Metadata are OK')

        r = pithos.get_object_sharing(obj + '0')
        self.assertFalse('read' in r)
        self.assertTrue('u5' in r['write'])
        self.assertTrue('accx:groupb' in r['write'])
        self.info('Sharing is OK')

        self.assertRaises(ClientError, pithos.get_object_info, obj)
        self.info('Old object is not there, which is OK')

        r = pithos.object_move(
            obj + '0',
            destination='/%s/%s' % (pithos.container, obj),
            content_encoding='utf8',
            content_type='application/json',
            destination_account='nonExistendAddress@NeverLand.com',
            success=(201, 404))
        self.assertEqual(r.status_code, 404)
        self.info('Non existing UUID correctly causes a 404')

        r = pithos.object_move(
            obj + '0',
            destination='/%s/%s' % (self.temp_containers[-2], obj),
            content_encoding='utf8',
            content_type='application/json',
            content_disposition='attachment; filename="fname.ext"')
        self.assertEqual(r.status_code, 201)
        self.assertEqual(
            r.headers['content-type'],
            'application/json; charset=UTF-8')

        pithos.container = self.temp_containers[-2]
        r = pithos.object_get(obj)
        etag = r.headers['etag']
        ctype = r.headers['content-type']
        self.assertEqual(ctype, 'application/json')
        self.assertTrue('fname.ext' in r.headers['content-disposition'])
        self.info('Cross container copy w. content-type/encoding is OK')

        r = pithos.object_move(
            obj,
            destination='/%s/%s0' % (pithos.container, obj),
            ignore_content_type=True,
            content_type='text/x-python')
        self.assertEqual(r.status_code, 201)
        self.assertNotEqual(r.headers['content-type'], 'application/json')
        r = pithos.object_get(obj + '0')
        self.assertNotEqual(r.headers['content-type'], 'text/x-python')

        r = pithos.object_move(
            obj + '0',
            destination='/%s/%s' % (pithos.container, obj),
            if_etag_match=etag)
        self.assertEqual(r.status_code, 201)
        self.info('if-etag-match is OK')

        r = pithos.object_move(
            obj,
            destination='/%s/%s0' % (pithos.container, obj),
            if_etag_not_match='lalala')
        self.assertEqual(r.status_code, 201)
        self.info('if-etag-not-match is OK')

        r = pithos.object_move(
            obj + '0',
            destination='/%s/%s' % (pithos.container, obj),
            format='xml',
            public=True)
        self.assertEqual(r.status_code, 201)
        self.assertTrue('xml' in r.headers['content-type'])

        r = pithos.get_object_info(obj)
        self.assertTrue('x-object-public' in r)
        self.info('Publish, format and source-version are OK')

    def test_070_object_post(self):
        """Test object POST"""
        pithos = self.clients.pithos
        obj = 'sample2post.file'
        newf = NamedTemporaryFile()
        newf.writelines([
            'ello!\n',
            'This is a test line\n',
            'inside a test file\n'])
        newf.flush()

        r = pithos.object_put(
            obj,
            content_type='application/octet-stream',
            data='H',
            metadata=dict(mkey1='mval1', mkey2='mval2'),
            permissions=dict(
                read=['accX:groupA', 'u1', 'u2'],
                write=['u2', 'u3']))
        self.info(
            'Prepared a local file %s & a remote object %s', newf.name, obj)

        newf.seek(0)
        pithos.append_object(obj, newf)
        r = pithos.object_get(obj)
        self.assertEqual(r.text[:5], 'Hello')
        self.info('Append is OK')

        newf.seek(0)
        # r = pithos.overwrite_object(obj, 0, 10, newf, 'text/x-python')
        # r = pithos.object_get(obj)
        # print r.text, r.headers
        # self.assertTrue(r.text.startswith('ello!'))
        # self.assertEqual(r.headers['content-type'], 'text/x-python')
        # self.info('Overwrite (involves content-legth/range/type) is OK')
        self.info('ATTENTION: Overwrite is probably NOT OK')
        #  This is just to mock the effects of the commented action
        pithos.object_delete(obj)
        pithos.upload_object(obj, newf, content_type='text/x-python')
        r = pithos.object_post(
            obj,
            update=True,
            content_type='text/x-python',
            metadata=dict(mkey1='mval1', mkey2='mval2'),
            permissions=dict(
                read=['accX:groupA', 'u1', 'u2'],
                write=['u2', 'u3']))
        #  ---

        r = pithos.truncate_object(obj, 5)
        r = pithos.object_get(obj)
        self.assertEqual(r.text, 'ello!')
        self.assertEqual(r.headers['content-type'], 'text/x-python')
        self.info(
            'Truncate (involves content-range, object-bytes and source-object)'
            ' is OK')

        pithos.set_object_meta(obj, {'mkey2': 'mval2a', 'mkey3': 'mval3'})
        r = pithos.get_object_meta(obj)
        self.assertEqual(r['x-object-meta-mkey1'], 'mval1')
        self.assertEqual(r['x-object-meta-mkey2'], 'mval2a')
        self.assertEqual(r['x-object-meta-mkey3'], 'mval3')
        pithos.del_object_meta(obj, 'mkey1')
        r = pithos.get_object_meta(obj)
        self.assertFalse('x-object-meta-mkey1' in r)
        self.info('Metadata are OK')

        pithos.set_object_sharing(
            obj, read_permission=['u4', 'u5'], write_permission=['u4'])
        r = pithos.get_object_sharing(obj)
        self.assertTrue('read' in r)
        self.assertTrue('u5' in r['read'])
        self.assertTrue('write' in r)
        self.assertTrue('u4' in r['write'])
        pithos.del_object_sharing(obj)
        r = pithos.get_object_sharing(obj)
        self.assertTrue(len(r) == 0)
        self.info('Sharing is OK')

        pithos.publish_object(obj)
        r = pithos.get_object_info(obj)
        self.assertTrue('x-object-public' in r)
        pithos.unpublish_object(obj)
        r = pithos.get_object_info(obj)
        self.assertFalse('x-object-public' in r)
        self.info('Publishing is OK')

        etag = r['etag']
        r = pithos.object_post(
            obj,
            update=True,
            public=True,
            if_etag_not_match=etag,
            success=(412, 202, 204))
        self.assertEqual(r.status_code, 412)
        self.info('if-etag-not-match is OK')

        r = pithos.object_post(
            obj,
            update=True,
            public=True,
            if_etag_match=etag,
            content_type='application/octet-srteam',
            content_encoding='application/json')

        r = pithos.get_object_info(obj)
        helloVersion = r['x-object-version']
        self.assertTrue('x-object-public' in r)
        #self.assertEqual(r['content-type'], 'application/octet-srteam')
        #self.info('If-etag-match is OK')
        self.info('If-etag-match is probably not OK')

        pithos.container = self.temp_containers[-1]
        pithos.create_object(obj)
        r = pithos.object_post(
            obj,
            update=True,
            content_type='application/octet-srteam',
            content_length=5,
            content_range='bytes 1-5/*',
            source_object='/%s/%s' % (self.temp_containers[-2], obj),
            source_account='thisAccountWillNeverExist@adminland.com',
            source_version=helloVersion,
            data='12345',
            success=(403, 202, 204))
        self.assertEqual(r.status_code, 403)
        self.info('Successfully failed with invalud user UUID')

        r = pithos.object_post(
            obj,
            update=True,
            content_type='application/octet-srteam',
            content_length=5,
            content_range='bytes 1-5/*',
            source_object='/%s/%s' % (self.temp_containers[-1], obj),
            source_account=pithos.account,
            source_version=helloVersion,
            data='12345',
            content_disposition='attachment; filename="fname.ext"')

        r = pithos.object_get(obj)
        self.assertEqual(r.text, 'eello!')
        self.info('Cross container POST with source-version/account are OK')

        self.assertTrue('content-disposition' in r.headers)
        self.assertTrue('fname.ext' in r.headers['content-disposition'])
        self.info('Content-disposition POST is OK')

        mobj = 'manifest.test'
        txt = ''
        for i in range(10):
            txt += '%s' % i
            r = pithos.object_put(
                '%s/%s' % (mobj, i),
                data='%s' % i,
                content_length=1,
                success=201,
                content_encoding='application/octet-stream',
                content_type='application/octet-stream')

        pithos.create_object_by_manifestation(
            mobj, content_type='application/octet-stream')

        r = pithos.object_post(
            mobj, manifest='%s/%s' % (pithos.container, mobj))

        r = pithos.object_get(mobj)
        self.assertEqual(r.text, txt)
        self.info('Manifestation is OK')

        """We need to check transfer_encoding """

    @classmethod
    def tearDownClass(cls):  # noqa
        """Clean up"""
        from kamaki.cli.logger import deactivate
        deactivate('kamaki.clients.send')
        deactivate('kamaki.clients.recv')
        pithos = cls.clients.pithos
        for c in getattr(cls, 'temp_containers', []):
            pithos.container = c
            try:
                pithos.del_container(delimiter='/')
                pithos.purge_container(c)
            except ClientError as ce:
                print ('Failed to destroy container (%s)' % ce)
