Statistics
| Branch: | Tag: | Revision:

root / pithos / lib / hashfiler / filer.py @ 5bd53e3b

History | View | Annotate | Download (9.4 kB)

1
from os import makedirs
from os.path import realpath, dirname, exists, isdir
from sqlite3 import connect

from pithos.lib.hashfiler import (Blocker, Mapper, Noder, AccessController,
                                  inf, SERIAL, PARENT, PATH, SIZE,
                                  POPULATION, POPSIZE, SOURCE, MTIME, CLUSTER)
7

    
8
def packversion(serial, mtime):
    """Return a version string combining serial and mtime.

    The result is ``"<serial>:<mtime>"`` where the serial is rendered as
    lowercase hexadecimal without a prefix and the mtime as a decimal
    number with exactly eight fractional digits.
    unpackversion() is the inverse operation.
    """
    return "%x:%.8f" % (serial, mtime)
11

    
12
def unpackversion(strver):
    """Return a (serial, mtime) tuple from a combined version string.

    ``strver`` must have the ``"<hex serial>:<mtime>"`` layout produced
    by packversion(). The serial is parsed as a base-16 integer and the
    mtime as a float.

    Raises ValueError if ``strver`` does not contain exactly one ':'
    or if either part cannot be parsed.
    """
    s, t = strver.split(':')
    return int(s, 16), float(t)
16

    
17
class Filer(Noder, AccessController, Mapper, Blocker):
    """File-level facade over the node, access, map and block layers.

    One sqlite3 connection and one cursor are opened on ``filepath`` and
    handed to every base-class initializer through ``params``.

    Instances can be used as context managers: entering begins a
    transaction; leaving commits it, or rolls it back when an exception
    escaped the ``with`` body.
    """

    version = None

    def __init__(self, **params):
        """Open (creating if needed) the database and initialize all layers.

        ``params`` must contain ``filepath``, the path of the sqlite3
        database file; its parent directory is created when missing.
        The same ``params`` dict, extended with ``connection``,
        ``cursor`` and ``namelen``, is forwarded to the base-class
        initializers.

        Raises KeyError if ``filepath`` is not given, and ValueError if
        the parent of ``filepath`` exists but is not a directory.
        """
        filepath = realpath(params['filepath'])
        basepath = dirname(filepath)
        if not exists(basepath):
            makedirs(basepath)
        elif not isdir(basepath):
            raise ValueError("%s is not a directory" % (basepath,))

        conn = connect(filepath, check_same_thread=0)
        cur = conn.cursor()
        self.filepath = filepath
        self.conn = conn
        self.cur = cur
        # Shortcuts bound to the single shared cursor.
        self.execute = cur.execute
        self.executemany = cur.executemany
        self.fetchone = cur.fetchone
        self.fetchall = cur.fetchall

        # Local serial window; see alloc_serial().
        self.serial = 0
        self.limit_serial = 0
        self.serial_step = 256

        self.filer_init()

        params['connection'] = conn
        params['cursor'] = cur
        Blocker.__init__(self, **params)
        # Blocker must be initialized first: self.hashlen is read here and
        # is not set anywhere in this class (presumably Blocker sets it).
        params['namelen'] = self.hashlen
        Noder.__init__(self, **params)
        AccessController.__init__(self, **params)
        Mapper.__init__(self, **params)

    def filer_init(self):
        """Create the ``conf`` table and seed its ``serial`` parameter.

        Both steps are skipped when the table / the row already exist.
        """
        execute = self.execute

        execute(""" create table if not exists conf
                          ( param text,
                            value text,
                            primary key(param) )""")

        execute(""" select value from conf where param = 'serial' """)
        r = self.fetchone()
        if r is None:
            # alloc_serial() reads this value back with int(value, 16).
            execute(""" insert into conf values('serial', 100) """)

    def alloc_serial(self, nr=1):
        """Allocate global serial identifiers.

        Identifiers are handed out from a locally cached window, to
        reduce load to the database. When advancing the local counter by
        ``nr`` would reach the end of the window, a new window is
        reserved by increasing the hex-encoded ``serial`` parameter in
        ``conf`` by ``serial_step``, and the local counter restarts from
        the value that was stored before the increase.

        Return the new value of the local counter.
        """
        serial = self.serial + nr
        if serial >= self.limit_serial:
            # Window exhausted (or never reserved yet): claim the next one.
            db_limit_serial = int(self.get_parameter('serial'), 16)
            limit_serial = db_limit_serial + self.serial_step
            self.set_parameter('serial', hex(limit_serial))
            self.limit_serial = limit_serial
            # NOTE(review): nr is not taken into account when a new window
            # is opened; confirm whether any caller passes nr > 1.
            serial = db_limit_serial
        self.serial = serial
        return serial

    def quit(self):
        """Do nothing; the connection is left open."""
        return

    def begin(self):
        """Commit the current transaction (if any) and start a new one."""
        # "begin" fails inside an open transaction, and recent sqlite3
        # module versions no longer commit implicitly before it,
        # so commit explicitly first.
        self.conn.commit()
        self.cur.execute("begin deferred")

    def commit(self):
        """Commit the current transaction (if any)
           but do not start a new one.
        """
        self.conn.commit()

    def rollback(self):
        """Abort the current transaction and undo the changes it made."""
        self.conn.rollback()

    def __enter__(self):
        """Begin a transaction and return this Filer."""
        self.begin()
        return self

    def __exit__(self, exc, val, trace):
        """Commit on a clean exit, roll back if an exception was raised.

        Nothing is returned, so the exception (if any) propagates.
        """
        if not exc:
            self.commit()
        else:
            self.rollback()

    def get_parameter(self, name):
        """Return the value stored for ``name`` in ``conf``, or None."""
        c = self.cur.execute("select value from conf where param = ?", (name,))
        r = c.fetchone()
        if r is None:
            return r
        return r[0]

    def set_parameter(self, name, value):
        """Store ``value`` under ``name`` in ``conf``, replacing any old one."""
        q = "insert or replace into conf values (?, ?)"
        self.cur.execute(q, (name, value))

    def list_parameters(self):
        """Return all rows of the ``conf`` table."""
        c = self.cur.execute("select * from conf")
        return c.fetchall()

    def file_create(self, parent, path, size, cluster=0):
        """Create a new file on path in parent's namespace with the given size.
           Return the (serial, timestamp) of the created file.
        """
        # The new node is created with no source (None).
        return self.node_create(parent, path, size, None, cluster=cluster)

    def file_list(self, parent, prefix='',
                  start='', delimiter=None,
                  after=0.0, before=inf,
                  filterq=None, versions=0,
                  cluster=0, limit=10000):
        """Alias for node_list(); every argument is forwarded unchanged."""
        return self.node_list(parent, prefix=prefix,
                              start=start, delimiter=delimiter,
                              after=after, before=before,
                              filterq=filterq, versions=versions,
                              cluster=cluster, limit=limit)

    def file_get_properties(self, serial):
        """Alias for node_get_properties()"""
        return self.node_get_properties(serial)

    def file_set_attributes(self, serial, items):
        """Alias for node_attr_set()."""
        return self.node_attr_set(serial, items)

    def file_increment(self, serial, size=None):
        """Alias for node_increment()."""
        # Forward the caller's size; it used to be dropped in favour of None.
        return self.node_increment(serial, size=size)

    def file_list_attributes(self, parent):
        """List file attributes. Alias for node_attr_list_keys()."""
        return self.node_attr_list_keys(parent)

    def file_get_attributes(self, serial, keys=()):
        """Alias for node_attr_get()."""
        return self.node_attr_get(serial, keys=keys)

    def file_del_attributes(self, serial):
        """Alias for node_attr_del()."""
        return self.node_attr_del(serial)

    def file_remove(self, serial, recursive=0):
        """Remove the file given.
           If the file's namespace is not empty,
           remove nothing and return false.
           Return true if the file was removed.
        """
        if not self.node_remove(serial, recursive=recursive):
            return 0

        # The node is gone; drop its block map as well.
        self.map_remv(serial)
        return 1

    def file_purge_cluster(self, parent, cluster=0):
        """Purge a whole cluster from the namespace of a node.
           Return the serials of the purged nodes.
        """
        # Forward the requested cluster; it used to be hard-coded to 0.
        purged = self.node_delete(parent, cluster=cluster)
        # The first field of each returned record is used as the serial.
        purged = [props[0] for props in purged]
        map_remv = self.map_remv
        for serial in purged:
            # XXX: victim caches?
            map_remv(serial)
        return purged

    def file_purge_path(self, parent, path, after=0, before=inf, cluster=0):
        """Purge all versions in (after, before)
           for the given parent and path and cluster.
           Return the list of serials deleted.
        """
        # NOTE(review): `before` is accepted but never forwarded to
        # node_purge(); confirm node_purge()'s signature before passing it.
        serials = self.node_purge(parent, path, after, cluster=cluster)
        map_remv = self.map_remv
        for serial in serials:
            map_remv(serial)
        return serials

    def file_lookup(self, parent, path, before=inf,
                    create=0, size=0, cluster=0):
        """Lookup the latest node in parent and path.
           If the object is not found and create is true,
           it is created with the given size.
           Also, accept a before time limit and a cluster.

           Return whatever node_lookup() returned when the node was
           found or create is false; otherwise return the tuple
           (serial, parent, path, size, population, popsize, source, mtime)
           describing the node just created.
        """
        # Honour the caller's time limit; it used to be hard-coded to inf.
        props = self.node_lookup(parent, path, before=before, cluster=cluster)
        if props or not create:
            return props

        source = None
        population = 0
        popsize = 0
        r = self.node_create(parent, path, size, source, cluster=cluster)
        serial, mtime = r
        # NOTE(review): this tuple has no cluster field; confirm whether
        # callers index the result with CLUSTER.
        return (serial, parent, path, size,
                population, popsize, source, mtime)

    def file_copy(self, source, parent=None, path=None):
        """Create a file as a copy (data and attributes) of another.
           If parent or path is not specified, it remains the same.
           Return the (serial, mtime) of the copy.
        """
        serial, mtime = self.node_copy(source, parent, path)
        self.node_attr_copy(source, serial)
        return serial, mtime

    def file_read(self, serial, nr, blkoff=0):
        """Read a number of blocks at block offset from a file."""
        hashes = self.map_retr(serial, blkoff=blkoff, nr=nr)
        blocks = self.block_retr(hashes)
        return blocks

    def file_write(self, serial, blocks, blkoff=0, fork=0):
        """Write blocks at block offset and create a new version.
           If fork is true, the old version is preserved.

           Return (serial, mtime, newsize, hashes, missing), where
           serial is the one written to (the clone's when forking),
           newsize is the end offset of the written data, and hashes
           and missing are the two values returned by block_stor().

           Raises IndexError if blocks is empty.
        """
        blocksize = self.blocksize
        # Every block but the last one is counted as a full block.
        datasize = (len(blocks) - 1) * blocksize + len(blocks[-1])
        props = self.node_get_properties(serial)
        oldsize = props[3]
        newsize = blkoff * blocksize + datasize

        if fork:
            # NOTE(review): file_clone() is not defined in this class;
            # confirm a base class provides it (file_copy() has the same
            # call shape).
            serial, mtime = self.file_clone(serial, props[1], props[2])

        hashes, missing = self.block_stor(blocks)
        self.map_stor(serial, hashes, blkoff=blkoff)

        node_increment = self.node_increment
        if newsize > oldsize:
            # The write extended the file.
            mtime = node_increment(serial, newsize)
        elif not fork:
            # Size is unchanged; when forking, the clone's mtime is kept.
            mtime = node_increment(serial, oldsize)

        return serial, mtime, newsize, hashes, missing

    def file_move(self, serial, parent, path=None):
        """Move a file, possibly to another parent and path.
           If path is not given, the file keeps its current path.
        """
        if path is None:
            props = self.node_get_properties(serial)
            path = props[PATH]

        return self.node_move(serial, parent, path)

    def file_recluster(self, serial, cluster):
        """Send file to the given cluster."""
        self.node_set_properties(serial, (('cluster', cluster),))
271

    
272

    
273
if __name__ == "__main__":
    # Manual smoke test: build a Filer whose database, maps and blocks
    # live under ./.hf-test (paths are relative to the working directory).
    hf = Filer(filepath=".hf-test/nodes.sqlite3",
               mappath=".hf-test/maps",
               blockpath=".hf-test/blocks",
               blocksize=64*1024,
               hashtype='sha256')

    #hf.quit()
282