Revision 307149a8

b/lib/jqueue.py
37 37
class _QueuedOpCode(object):
38 38
  """Encasulates an opcode object.
39 39

  
40
  Access must be synchronized by using an external lock.
40
  Access is synchronized by the '_lock' attribute.
41 41

  
42 42
  """
43 43
  def __init__(self, op):
44 44
    self.input = op
45 45
    self.status = constants.OP_STATUS_QUEUED
46 46
    self.result = None
47
    self._lock = threading.Lock()
48

  
49
  @utils.LockedMethod
50
  def SetStatus(self, status, result):
51
    """Update the opcode status and result.
52

  
53
    """
54
    self.status = status
55
    self.result = result
56

  
57
  @utils.LockedMethod
58
  def GetStatus(self):
59
    """Get the opcode status.
60

  
61
    """
62
    return self.status
63

  
64
  @utils.LockedMethod
65
  def GetResult(self):
66
    """Get the opcode result.
67

  
68
    """
69
    return self.result
47 70

  
48 71

  
49 72
class _QueuedJob(object):
......
64 87
    # to use it.
65 88
    self._ops = [_QueuedOpCode(op) for op in ops]
66 89

  
67
  def _GetStatusUnlocked(self):
90
  def GetStatus(self):
68 91
    status = constants.JOB_STATUS_QUEUED
69 92

  
70 93
    all_success = True
71 94
    for op in self._ops:
72
      if op.status == constants.OP_STATUS_SUCCESS:
95
      op_status = op.GetStatus()
96
      if op_status == constants.OP_STATUS_SUCCESS:
73 97
        continue
74 98

  
75 99
      all_success = False
76 100

  
77
      if op.status == constants.OP_STATUS_QUEUED:
101
      if op_status == constants.OP_STATUS_QUEUED:
78 102
        pass
79
      elif op.status == constants.OP_STATUS_ERROR:
103
      elif op_status == constants.OP_STATUS_ERROR:
80 104
        status = constants.JOB_STATUS_ERROR
81
      elif op.status == constants.OP_STATUS_RUNNING:
105
      elif op_status == constants.OP_STATUS_RUNNING:
82 106
        status = constants.JOB_STATUS_RUNNING
83 107

  
84 108
    if all_success:
......
86 110

  
87 111
    return status
88 112

  
89
  def GetStatus(self):
90
    self._lock.acquire()
91
    try:
92
      return self._GetStatusUnlocked()
93
    finally:
94
      self._lock.release()
95

  
96 113
  def Run(self, proc):
97 114
    """Job executor.
98 115

  
......
107 124
      count = len(self._ops)
108 125
      for idx, op in enumerate(self._ops):
109 126
        try:
110
          self._lock.acquire()
111
          try:
112
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
113
            op.status = constants.OP_STATUS_RUNNING
114
          finally:
115
            self._lock.release()
127
          logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
128
          op.SetStatus(constants.OP_STATUS_RUNNING, None)
116 129

  
117 130
          result = proc.ExecOpCode(op.input)
118 131

  
119
          self._lock.acquire()
120
          try:
121
            logging.debug("Op %s/%s: Successfully finished %s",
122
                          idx + 1, count, op)
123
            op.status = constants.OP_STATUS_SUCCESS
124
            op.result = result
125
          finally:
126
            self._lock.release()
132
          op.SetStatus(constants.OP_STATUS_SUCCESS, result)
133
          logging.debug("Op %s/%s: Successfully finished %s",
134
                        idx + 1, count, op)
127 135
        except Exception, err:
128
          self._lock.acquire()
129
          try:
130
            logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
131
            op.status = constants.OP_STATUS_ERROR
132
            op.result = str(err)
133
          finally:
134
            self._lock.release()
136
          op.SetStatus(constants.OP_STATUS_ERROR, str(err))
137
          logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
135 138
          raise
136 139

  
137 140
    except errors.GenericError, err:
......
227 230
        row.append(job.GetStatus())
228 231
      elif fname == "result":
229 232
        # TODO
230
        row.append(map(lambda op: op.result, job._ops))
233
        row.append([op.GetResult() for op in job._ops])
231 234
      else:
232 235
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
233 236
    return row

Also available in: Unified diff