Revision 0bbe448c daemons/ganeti-masterd
b/daemons/ganeti-masterd | ||
---|---|---|
225 | 225 |
"""Class holding high-level client operations.""" |
226 | 226 |
def __init__(self, server): |
227 | 227 |
self.server = server |
228 |
self._cpu = None |
|
229 |
|
|
230 |
def _getcpu(self): |
|
231 |
if self._cpu is None: |
|
232 |
self._cpu = mcpu.Processor(lambda x: None) |
|
233 |
return self._cpu |
|
234 |
|
|
235 |
def handle_request(self, operation, args): |
|
236 |
print operation, args |
|
237 |
if operation == "submit": |
|
238 |
return self.put(args) |
|
239 |
elif operation == "query": |
|
240 |
return self.query(args) |
|
241 |
else: |
|
242 |
raise ValueError("Invalid operation") |
|
243 | 228 |
|
244 |
def put(self, args): |
|
245 |
job = luxi.UnserializeJob(args) |
|
246 |
rid = self.server.queue.put(job) |
|
247 |
return rid |
|
248 |
|
|
249 |
def query(self, args): |
|
250 |
path = args["object"] |
|
251 |
fields = args["fields"] |
|
252 |
names = args["names"] |
|
253 |
if path == "instances": |
|
254 |
opclass = opcodes.OpQueryInstances |
|
255 |
elif path == "jobs": |
|
256 |
# early exit because job query-ing is special (not via opcodes) |
|
257 |
return self.query_jobs(fields, names) |
|
258 |
else: |
|
259 |
raise ValueError("Invalid object %s" % path) |
|
229 |
def handle_request(self, method, args): |
|
230 |
queue = self.server.jobqueue |
|
231 |
|
|
232 |
# TODO: Parameter validation |
|
233 |
|
|
234 |
if method == luxi.REQ_SUBMIT_JOB: |
|
235 |
ops = [opcodes.OpCode.LoadOpCode(state) for state in args] |
|
236 |
return queue.SubmitJob(ops) |
|
260 | 237 |
|
261 |
op = opclass(output_fields = fields, names=names) |
|
262 |
cpu = self._getcpu() |
|
263 |
result = cpu.ExecOpCode(op) |
|
264 |
return result |
|
238 |
elif method == luxi.REQ_CANCEL_JOB: |
|
239 |
(job_id, ) = args |
|
240 |
return queue.CancelJob(job_id) |
|
265 | 241 |
|
266 |
def query_jobs(self, fields, names): |
|
267 |
return self.server.queue.query_jobs(fields, names) |
|
242 |
elif method == luxi.REQ_ARCHIVE_JOB: |
|
243 |
(job_id, ) = args |
|
244 |
return queue.ArchiveJob(job_id) |
|
245 |
|
|
246 |
elif method == luxi.REQ_QUERY_JOBS: |
|
247 |
(job_ids, fields) = args |
|
248 |
return queue.QueryJobs(job_ids, fields) |
|
249 |
|
|
250 |
else: |
|
251 |
raise ValueError("Invalid operation") |
|
268 | 252 |
|
269 | 253 |
|
270 | 254 |
def JobRunner(proc, job, context): |
Also available in: Unified diff