Add per-opcode results to job processing
authorIustin Pop <iustin@google.com>
Thu, 10 Apr 2008 15:36:29 +0000 (15:36 +0000)
committerIustin Pop <iustin@google.com>
Thu, 10 Apr 2008 15:36:29 +0000 (15:36 +0000)
This patch changes the definition of a job and introduces per-opcode
results.

First, the result and status fields of a job are condensed into a single
'status' attribute. Then, we introduce a per-opcode status list and a
per-opcode result list, which allow jobs to return values.

The gnt-job script is also modified to allow these new fields to be
queried.

Note that the patch renames the 'opcodes' query field to 'op_list', and
changes its return value from a string to a list of (serialized) opcodes.

Reviewed-by: ultrotter

daemons/ganeti-masterd
lib/jqueue.py
lib/opcodes.py
scripts/gnt-job

index 24fec0e..d67b6fd 100644 (file)
@@ -236,9 +236,20 @@ def JobRunner(proc, job):
 
   """
   job.SetStatus(opcodes.Job.STATUS_RUNNING)
-  for op in job.data.op_list:
-    proc.ExecOpCode(op)
-  job.SetStatus(opcodes.Job.STATUS_FINISHED, result=opcodes.Job.RESULT_OK)
+  fail = False
+  for idx, op in enumerate(job.data.op_list):
+    job.data.op_status[idx] = opcodes.Job.STATUS_RUNNING
+    try:
+      job.data.op_result[idx] = proc.ExecOpCode(op)
+      job.data.op_status[idx] = opcodes.Job.STATUS_SUCCESS
+    except (errors.OpPrereqError, errors.OpExecError), err:
+      fail = True
+      job.data.op_result[idx] = str(err)
+      job.data.op_status[idx] = opcodes.Job.STATUS_FAIL
+  if fail:
+    job.SetStatus(opcodes.Job.STATUS_FAIL)
+  else:
+    job.SetStatus(opcodes.Job.STATUS_SUCCESS)
 
 
 def PoolWorker(worker_id, incoming_queue):
index b0b8916..783c597 100644 (file)
@@ -38,6 +38,8 @@ class JobObject:
     self.data = jdesc
     jdesc.status = opcodes.Job.STATUS_PENDING
     jdesc.job_id = jid
+    jdesc.op_status = [opcodes.Job.STATUS_PENDING for i in jdesc.op_list]
+    jdesc.op_result = [None for i in jdesc.op_list]
     self.lock = threading.Lock()
 
   def SetStatus(self, status, result=None):
@@ -114,8 +116,12 @@ class QueueManager:
             row.append(jdata.job_id)
           elif fname == "status":
             row.append(jdata.status)
-          elif fname == "opcodes":
-            row.append(",".join([op.OP_ID for op in jdata.op_list]))
+          elif fname == "op_list":
+            row.append([op.__getstate__() for op in jdata.op_list])
+          elif fname == "op_status":
+            row.append(jdata.op_status)
+          elif fname == "op_result":
+            row.append(jdata.op_result)
           else:
             raise errors.OpExecError("Invalid job query field '%s'" %
                                            fname)
index a5e41de..a5d8ded 100644 (file)
@@ -74,15 +74,31 @@ class BaseJO(object):
 
 
 class Job(BaseJO):
-  """Job definition structure"""
+  """Job definition structure
+
+  The Job definition has two sets of parameters:
+    - the parameters of the job itself (all filled by server):
+      - job_id,
+      - status: pending, running, successful, failed, aborted
+    - opcode parameters:
+      - op_list, list of opcodes, the client creates this
+      - op_status, status for each opcode, server fills in
+      - op_result, result for each opcode, server fills in
+
+  """
   STATUS_PENDING = 1
   STATUS_RUNNING = 2
-  STATUS_FINISHED = 3
-  RESULT_OK = 1
-  RESULT_FAIL = 2
-  RESULT_ABORT = 3
+  STATUS_SUCCESS = 3
+  STATUS_FAIL = 4
+  STATUS_ABORT = 5
 
-  __slots__ = ["job_id", "op_list", "status", "result"]
+  __slots__ = [
+    "job_id",
+    "status",
+    "op_list",
+    "op_status",
+    "op_result",
+    ]
 
   def __getstate__(self):
     """Specialized getstate for jobs
index e438f5a..e98f637 100644 (file)
@@ -53,7 +53,9 @@ def ListJobs(opts, args):
     headers = {
       "id": "ID",
       "status": "Status",
-      "opcodes": "OpCodes",
+      "op_list": "OpCodes",
+      "op_status": "OpStatus",
+      "op_result": "OpResult",
       }
   else:
     headers = None
@@ -71,8 +73,12 @@ def ListJobs(opts, args):
           val = "pending"
         elif val == opcodes.Job.STATUS_RUNNING:
           val = "running"
-        elif val == opcodes.Job.STATUS_FINISHED:
+        elif val == opcodes.Job.STATUS_SUCCESS:
           val = "finished"
+        elif val == opcodes.Job.STATUS_FAIL:
+          val = "failed"
+        elif val == opcodes.Job.STATUS_ABORT:
+          val = "aborted"
         else:
           raise errors.ProgrammerError("Unknown job status code '%s'" % val)
 
@@ -91,7 +97,8 @@ commands = {
   'list': (ListJobs, ARGS_NONE,
             [DEBUG_OPT, NOHDR_OPT, SEP_OPT, USEUNITS_OPT, FIELDS_OPT],
             "", "List the jobs and their status. The available fields are"
-           " (see the man page for details): id, status, opcodes."
+           " (see the man page for details): id, status, op_list,"
+           " op_status, op_result."
            " The default field"
            " list is (in order): id, status."),
   }