Revision 0bbe448c

b/daemons/ganeti-masterd
225 225
  """Class holding high-level client operations."""
226 226
  def __init__(self, server):
227 227
    self.server = server
228
    self._cpu = None
229

  
230
  def _getcpu(self):
231
    if self._cpu is None:
232
      self._cpu = mcpu.Processor(lambda x: None)
233
    return self._cpu
234

  
235
  def handle_request(self, operation, args):
236
    print operation, args
237
    if operation == "submit":
238
      return self.put(args)
239
    elif operation == "query":
240
      return self.query(args)
241
    else:
242
      raise ValueError("Invalid operation")
243 228

  
244
  def put(self, args):
245
    job = luxi.UnserializeJob(args)
246
    rid = self.server.queue.put(job)
247
    return rid
248

  
249
  def query(self, args):
250
    path = args["object"]
251
    fields = args["fields"]
252
    names = args["names"]
253
    if path == "instances":
254
      opclass = opcodes.OpQueryInstances
255
    elif path == "jobs":
256
      # early exit because job query-ing is special (not via opcodes)
257
      return self.query_jobs(fields, names)
258
    else:
259
      raise ValueError("Invalid object %s" % path)
229
  def handle_request(self, method, args):
230
    queue = self.server.jobqueue
231

  
232
    # TODO: Parameter validation
233

  
234
    if method == luxi.REQ_SUBMIT_JOB:
235
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
236
      return queue.SubmitJob(ops)
260 237

  
261
    op = opclass(output_fields = fields, names=names)
262
    cpu = self._getcpu()
263
    result = cpu.ExecOpCode(op)
264
    return result
238
    elif method == luxi.REQ_CANCEL_JOB:
239
      (job_id, ) = args
240
      return queue.CancelJob(job_id)
265 241

  
266
  def query_jobs(self, fields, names):
267
    return self.server.queue.query_jobs(fields, names)
242
    elif method == luxi.REQ_ARCHIVE_JOB:
243
      (job_id, ) = args
244
      return queue.ArchiveJob(job_id)
245

  
246
    elif method == luxi.REQ_QUERY_JOBS:
247
      (job_ids, fields) = args
248
      return queue.QueryJobs(job_ids, fields)
249

  
250
    else:
251
      raise ValueError("Invalid operation")
268 252

  
269 253

  
270 254
def JobRunner(proc, job, context):
b/lib/cli.py
41 41
                      Option, OptionValueError, SUPPRESS_HELP)
42 42

  
43 43
__all__ = ["DEBUG_OPT", "NOHDR_OPT", "SEP_OPT", "GenericMain",
44
           "SubmitOpCode", "SubmitJob", "SubmitQuery",
44
           "SubmitOpCode",
45 45
           "cli_option", "GenerateTable", "AskUser",
46 46
           "ARGS_NONE", "ARGS_FIXED", "ARGS_ATLEAST", "ARGS_ANY", "ARGS_ONE",
47 47
           "USEUNITS_OPT", "FIELDS_OPT", "FORCE_OPT",
......
370 370

  
371 371

  
372 372
def SubmitOpCode(op, proc=None, feedback_fn=None):
373
  """Function to submit an opcode.
373
  """Legacy function to submit an opcode.
374 374

  
375 375
  This is just a simple wrapper over the construction of the processor
376 376
  instance. It should be extended to better handle feedback and
......
379 379
  """
380 380
  # TODO: Fix feedback_fn situation.
381 381
  cl = luxi.Client()
382
  job = opcodes.Job(op_list=[op])
383
  jid = SubmitJob(job, cl)
384 382

  
385
  query = {
386
    "object": "jobs",
387
    "fields": ["status"],
388
    "names": [jid],
389
    }
383
  job_id = cl.SubmitJob([op])
390 384

  
391 385
  while True:
392
    jdata = SubmitQuery(query, cl)
393
    if not jdata:
386
    jobs = cl.QueryJobs([job_id], ["status"])
387
    if not jobs:
394 388
      # job not found, go away!
395
      raise errors.JobLost("Job with id %s lost" % jid)
389
      raise errors.JobLost("Job with id %s lost" % job_id)
396 390

  
397
    status = jdata[0][0]
398
    if status in (opcodes.Job.STATUS_SUCCESS, opcodes.Job.STATUS_FAIL):
391
    # TODO: Handle canceled and archived jobs
392
    status = jobs[0][0]
393
    if status in (constants.JOB_STATUS_SUCCESS, constants.JOB_STATUS_ERROR):
399 394
      break
400 395
    time.sleep(1)
401 396

  
402
  query["fields"].extend(["op_list", "op_status", "op_result"])
403
  jdata = SubmitQuery(query, cl)
404
  if not jdata:
405
    raise errors.JobLost("Job with id %s lost" % jid)
406
  status, op_list, op_status, op_result = jdata[0]
407
  if status != opcodes.Job.STATUS_SUCCESS:
408
    raise errors.OpExecError(op_result[0])
409
  return op_result[0]
397
  jobs = cl.QueryJobs([job_id], ["status", "result"])
398
  if not jobs:
399
    raise errors.JobLost("Job with id %s lost" % job_id)
410 400

  
411

  
412
def SubmitJob(job, cl=None):
413
  if cl is None:
414
    cl = luxi.Client()
415
  return cl.SubmitJob(job)
416

  
417

  
418
def SubmitQuery(data, cl=None):
419
  if cl is None:
420
    cl = luxi.Client()
421
  return cl.Query(data)
401
  status, result = jobs[0]
402
  if status == constants.JOB_STATUS_SUCCESS:
403
    return result[0]
404
  else:
405
    raise errors.OpExecError(result)
422 406

  
423 407

  
424 408
def FormatError(err):
b/lib/luxi.py
45 45
KEY_SUCCESS = "success"
46 46
KEY_RESULT = "result"
47 47

  
48
REQ_SUBMIT = 'submit'
49
REQ_ABORT = 'abort'
50
REQ_QUERY = 'query'
48
REQ_SUBMIT_JOB = "SubmitJob"
49
REQ_CANCEL_JOB = "CancelJob"
50
REQ_ARCHIVE_JOB = "ArchiveJob"
51
REQ_QUERY_JOBS = "QueryJobs"
51 52

  
52 53
DEF_CTMO = 10
53 54
DEF_RWTO = 60
......
294 295

  
295 296
    return data[KEY_RESULT]
296 297

  
297
  def SubmitJob(self, job):
298
    """Submit a job"""
299
    return self.CallMethod(REQ_SUBMIT, SerializeJob(job))
300

  
301
  def Query(self, data):
302
    """Make a query"""
303
    result = self.CallMethod(REQ_QUERY, data)
304
    if data["object"] == "jobs":
305
      # custom job processing of query values
306
      for row in result:
307
        for idx, field in enumerate(data["fields"]):
308
          if field == "op_list":
309
            row[idx] = [opcodes.OpCode.LoadOpCode(i) for i in row[idx]]
310
    return result
298
  def SubmitJob(self, ops):
299
    ops_state = map(lambda op: op.__getstate__(), ops)
300
    return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
301

  
302
  def CancelJob(self, job_id):
303
    return self.CallMethod(REQ_CANCEL_JOB, job_id)
304

  
305
  def ArchiveJob(self, job_id):
306
    return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
307

  
308
  def QueryJobs(self, job_ids, fields):
309
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
311 310

  
312 311
# TODO: class Server(object)

Also available in: Unified diff