root / commissioning / physicals / fscrud / fscrud_server.py @ 9f1a1bd0
History | View | Annotate | Download (5 kB)
1 |
#!/usr/bin/env python
|
---|---|
2 |
|
3 |
from json import dumps as json_dumps, loads as json_loads |
4 |
from os.path import join as path_join, exists, isdir |
5 |
from os import listdir, makedirs, unlink |
6 |
from shutil import move |
7 |
|
8 |
def ensure_directory(dirpath): |
9 |
if not isdir(dirpath): |
10 |
if exists(dirpath):
|
11 |
m = ("path '%s' exists but is not a directory!"
|
12 |
% (dirpath,) ) |
13 |
raise ValueError(m) |
14 |
|
15 |
makedirs(dirpath) |
16 |
|
17 |
try:
|
18 |
test_path = path_join(dirpath, "__test")
|
19 |
with open(test_path, "w") as f: |
20 |
f.write("test")
|
21 |
except (OSError, IOError), e: |
22 |
m = "cannot create files in directory '%s'" % (dirpath,)
|
23 |
raise ValueError(m) |
24 |
|
25 |
unlink(test_path) |
26 |
if exists(test_path):
|
27 |
m = "'%s' exists after unlink" % (test_path,)
|
28 |
raise ValueError(m) |
29 |
|
30 |
|
31 |
class FSCrudServer(object): |
32 |
|
33 |
def __init__(self, queuepath, dataroot): |
34 |
err_path = path_join(queuepath, 'errs')
|
35 |
succeed_path = path_join(queuepath, 'succeeds')
|
36 |
ensure_directory(queuepath) |
37 |
ensure_directory(dataroot) |
38 |
ensure_directory(err_path) |
39 |
ensure_directory(succeed_path) |
40 |
|
41 |
self.queuepath = queuepath
|
42 |
self.dataroot = dataroot
|
43 |
self.err_path = err_path
|
44 |
self.succeed_path = succeed_path
|
45 |
|
46 |
from pyinotify import WatchManager, Notifier, ProcessEvent, IN_MOVED_TO |
47 |
watch_manager = WatchManager() |
48 |
|
49 |
class EventProcessor(ProcessEvent): |
50 |
|
51 |
def __init__(self, backend): |
52 |
self.backend = backend
|
53 |
|
54 |
def process_IN_MOVED_TO(self, event): |
55 |
path = event.path |
56 |
backend = self.backend
|
57 |
if path != backend.queuepath:
|
58 |
m = "notification for unknown directory '%s'!" % (path,)
|
59 |
raise AssertionError(m) |
60 |
|
61 |
jobname = event.name |
62 |
backend.runjob(jobname) |
63 |
|
64 |
def process_IN_Q_OVERFLOW(self, event): |
65 |
raise RuntimError("BOOM :(") |
66 |
|
67 |
event_processor = EventProcessor(self)
|
68 |
notifier = Notifier(watch_manager, event_processor) |
69 |
watch_descriptor = watch_manager.add_watch( queuepath, |
70 |
IN_MOVED_TO, |
71 |
rec=False )
|
72 |
self.watch_manager = watch_manager
|
73 |
self.event_processor = event_processor
|
74 |
self.notifier = notifier
|
75 |
self.watch_descriptor = watch_descriptor
|
76 |
|
77 |
def process_events(self): |
78 |
notifier = self.notifier
|
79 |
if notifier.check_events():
|
80 |
notifier.read_events() |
81 |
|
82 |
notifier.process_events() |
83 |
|
84 |
def event_loop(self): |
85 |
while 1: |
86 |
notifier = self.notifier
|
87 |
if notifier.check_events(timeout=100): |
88 |
notifier.read_events() |
89 |
notifier.process_events() |
90 |
|
91 |
def process_all_jobs(self): |
92 |
jobnames = listdir(self.queuepath)
|
93 |
runjob = self.runjob
|
94 |
for jobname in jobnames: |
95 |
runjob(jobname) |
96 |
|
97 |
@classmethod
|
98 |
def main(cls, argv): |
99 |
argc = len(argv)
|
100 |
usage = """
|
101 |
Usage: ./fscrud_server queuepath=<queuepath> <dataroot> [loop|process]
|
102 |
"""
|
103 |
|
104 |
args = [] |
105 |
append = args.append |
106 |
kw = { |
107 |
'queuepath': ".fscrud/queue", |
108 |
'dataroot': ".fscrud/data", |
109 |
} |
110 |
|
111 |
for arg in argv[1:]: |
112 |
key, sep, val = arg.partition('=')
|
113 |
if not sep: |
114 |
append(arg) |
115 |
else:
|
116 |
kw[key] = val |
117 |
|
118 |
queuepath = kw['queuepath']
|
119 |
dataroot = kw['dataroot']
|
120 |
|
121 |
if not args: |
122 |
print(usage) |
123 |
raise SystemExit |
124 |
|
125 |
cmd = args[0]
|
126 |
|
127 |
backend = cls(queuepath, dataroot) |
128 |
if cmd == 'loop': |
129 |
backend.event_loop() |
130 |
elif cmd == 'process': |
131 |
backend.process_all_jobs() |
132 |
else:
|
133 |
raise ValueError("unknown command '%s'" % (cmd,)) |
134 |
|
135 |
def do_runjob(self, jobpath): |
136 |
with open(jobpath) as f: |
137 |
job = json_loads(f.read()) |
138 |
|
139 |
filepath = path_join(self.dataroot, job['path']) |
140 |
if '..' in filepath: |
141 |
raise ValueError("'..' not allowed in paths") |
142 |
|
143 |
dataspec = job['dataspec']
|
144 |
if dataspec is None: |
145 |
# DELETE
|
146 |
unlink(filepath) |
147 |
return
|
148 |
|
149 |
offset, data = dataspec |
150 |
if data is None: |
151 |
# READ
|
152 |
# Nothing to do here, read them yourself!
|
153 |
pass
|
154 |
|
155 |
if not exists(filepath): |
156 |
# CREATE
|
157 |
with open(filepath, "w") as f: |
158 |
pass
|
159 |
|
160 |
# UPDATE
|
161 |
with open(filepath, "r+") as f: |
162 |
f.seek(offset) |
163 |
f.write(data) |
164 |
|
165 |
def runjob(self, jobname): |
166 |
jobpath = path_join(self.queuepath, jobname)
|
167 |
try:
|
168 |
self.do_runjob(jobpath)
|
169 |
except Exception, e: |
170 |
print e
|
171 |
move(jobpath, self.err_path)
|
172 |
else:
|
173 |
move(jobpath, self.suceed_path)
|
174 |
|
175 |
|
176 |
if __name__ == '__main__': |
177 |
import sys |
178 |
|
179 |
FSCrudServer.main(sys.argv) |
180 |
|