Statistics
| Branch: | Tag: | Revision:

root / commissioning / physicals / fscrud / fscrud_server.py @ 9f1a1bd0

History | View | Annotate | Download (5 kB)

1
#!/usr/bin/env python
2

    
3
from json import dumps as json_dumps, loads as json_loads
4
from os.path import join as path_join, exists, isdir
5
from os import listdir, makedirs, unlink
6
from shutil import move
7

    
8
def ensure_directory(dirpath):
9
    if not isdir(dirpath):
10
        if exists(dirpath):
11
            m = ("path '%s' exists but is not a directory!"
12
                    % (dirpath,)  )
13
            raise ValueError(m)
14

    
15
        makedirs(dirpath)
16

    
17
    try:
18
        test_path = path_join(dirpath, "__test")
19
        with open(test_path, "w") as f:
20
            f.write("test")
21
    except (OSError, IOError), e:
22
        m = "cannot create files in directory '%s'" % (dirpath,)
23
        raise ValueError(m)
24

    
25
    unlink(test_path)
26
    if exists(test_path):
27
        m = "'%s' exists after unlink" % (test_path,)
28
        raise ValueError(m)
29

    
30

    
31
class FSCrudServer(object):
32

    
33
    def __init__(self, queuepath, dataroot):
34
        err_path = path_join(queuepath, 'errs')
35
        succeed_path =  path_join(queuepath, 'succeeds')
36
        ensure_directory(queuepath)
37
        ensure_directory(dataroot)
38
        ensure_directory(err_path)
39
        ensure_directory(succeed_path)
40

    
41
        self.queuepath = queuepath
42
        self.dataroot = dataroot
43
        self.err_path = err_path
44
        self.succeed_path = succeed_path
45

    
46
        from pyinotify import WatchManager, Notifier, ProcessEvent, IN_MOVED_TO
47
        watch_manager = WatchManager()
48

    
49
        class EventProcessor(ProcessEvent):
50

    
51
            def __init__(self, backend):
52
                self.backend = backend
53

    
54
            def process_IN_MOVED_TO(self, event):
55
                path = event.path
56
                backend = self.backend
57
                if path != backend.queuepath:
58
                    m = "notification for unknown directory '%s'!" % (path,)
59
                    raise AssertionError(m)
60

    
61
                jobname = event.name
62
                backend.runjob(jobname)
63

    
64
            def process_IN_Q_OVERFLOW(self, event):
65
                raise RuntimError("BOOM :(")
66

    
67
        event_processor = EventProcessor(self)
68
        notifier = Notifier(watch_manager, event_processor)
69
        watch_descriptor = watch_manager.add_watch( queuepath,
70
                                                    IN_MOVED_TO,
71
                                                    rec=False   )
72
        self.watch_manager = watch_manager
73
        self.event_processor = event_processor
74
        self.notifier = notifier
75
        self.watch_descriptor = watch_descriptor
76

    
77
    def process_events(self):
78
        notifier = self.notifier
79
        if notifier.check_events():
80
            notifier.read_events()
81

    
82
        notifier.process_events()
83

    
84
    def event_loop(self):
85
        while 1:
86
            notifier = self.notifier
87
            if notifier.check_events(timeout=100):
88
                notifier.read_events()
89
            notifier.process_events()
90

    
91
    def process_all_jobs(self):
92
        jobnames = listdir(self.queuepath)
93
        runjob = self.runjob
94
        for jobname in jobnames:
95
            runjob(jobname)
96

    
97
    @classmethod
98
    def main(cls, argv):
99
        argc = len(argv)
100
        usage = """
101
Usage: ./fscrud_server queuepath=<queuepath> <dataroot> [loop|process]
102
"""
103

    
104
        args = []
105
        append = args.append
106
        kw = {
107
            'queuepath':    ".fscrud/queue",
108
            'dataroot':     ".fscrud/data",
109
        }
110

    
111
        for arg in argv[1:]:
112
            key, sep, val = arg.partition('=')
113
            if not sep:
114
                append(arg)
115
            else:
116
                kw[key] = val
117

    
118
        queuepath = kw['queuepath']
119
        dataroot = kw['dataroot']
120

    
121
        if not args:
122
            print(usage)
123
            raise SystemExit
124

    
125
        cmd = args[0]
126

    
127
        backend = cls(queuepath, dataroot)
128
        if cmd == 'loop':
129
            backend.event_loop()
130
        elif cmd == 'process':
131
            backend.process_all_jobs()
132
        else:
133
            raise ValueError("unknown command '%s'" % (cmd,))
134

    
135
    def do_runjob(self, jobpath):
136
        with open(jobpath) as f:
137
            job = json_loads(f.read())
138

    
139
        filepath = path_join(self.dataroot, job['path'])
140
        if '..' in filepath:
141
            raise ValueError("'..' not allowed in paths")
142

    
143
        dataspec = job['dataspec']
144
        if dataspec is None:
145
            # DELETE
146
            unlink(filepath)
147
            return
148

    
149
        offset, data = dataspec
150
        if data is None:
151
            # READ
152
            # Nothing to do here, read them yourself!
153
            pass
154

    
155
        if not exists(filepath):
156
            # CREATE
157
            with open(filepath, "w") as f:
158
                pass
159

    
160
        # UPDATE
161
        with open(filepath, "r+") as f:
162
            f.seek(offset)
163
            f.write(data)
164

    
165
    def runjob(self, jobname):
166
        jobpath = path_join(self.queuepath, jobname)
167
        try:
168
            self.do_runjob(jobpath)
169
        except Exception, e:
170
            print e
171
            move(jobpath, self.err_path)
172
        else:
173
            move(jobpath, self.suceed_path)
174

    
175

    
176
if __name__ == '__main__':
177
    import sys
178

    
179
    FSCrudServer.main(sys.argv)
180