Revision 307149a8
b/lib/jqueue.py | ||
---|---|---|
37 | 37 |
class _QueuedOpCode(object): |
38 | 38 |
"""Encasulates an opcode object. |
39 | 39 |
|
40 |
Access must be synchronized by using an external lock.
|
|
40 |
Access is synchronized by the '_lock' attribute.
|
|
41 | 41 |
|
42 | 42 |
""" |
43 | 43 |
def __init__(self, op): |
44 | 44 |
self.input = op |
45 | 45 |
self.status = constants.OP_STATUS_QUEUED |
46 | 46 |
self.result = None |
47 |
self._lock = threading.Lock() |
|
48 |
|
|
49 |
@utils.LockedMethod |
|
50 |
def SetStatus(self, status, result): |
|
51 |
"""Update the opcode status and result. |
|
52 |
|
|
53 |
""" |
|
54 |
self.status = status |
|
55 |
self.result = result |
|
56 |
|
|
57 |
@utils.LockedMethod |
|
58 |
def GetStatus(self): |
|
59 |
"""Get the opcode status. |
|
60 |
|
|
61 |
""" |
|
62 |
return self.status |
|
63 |
|
|
64 |
@utils.LockedMethod |
|
65 |
def GetResult(self): |
|
66 |
"""Get the opcode result. |
|
67 |
|
|
68 |
""" |
|
69 |
return self.result |
|
47 | 70 |
|
48 | 71 |
|
49 | 72 |
class _QueuedJob(object): |
... | ... | |
64 | 87 |
# to use it. |
65 | 88 |
self._ops = [_QueuedOpCode(op) for op in ops] |
66 | 89 |
|
67 |
def _GetStatusUnlocked(self):
|
|
90 |
def GetStatus(self):
|
|
68 | 91 |
status = constants.JOB_STATUS_QUEUED |
69 | 92 |
|
70 | 93 |
all_success = True |
71 | 94 |
for op in self._ops: |
72 |
if op.status == constants.OP_STATUS_SUCCESS: |
|
95 |
op_status = op.GetStatus() |
|
96 |
if op_status == constants.OP_STATUS_SUCCESS: |
|
73 | 97 |
continue |
74 | 98 |
|
75 | 99 |
all_success = False |
76 | 100 |
|
77 |
if op.status == constants.OP_STATUS_QUEUED:
|
|
101 |
if op_status == constants.OP_STATUS_QUEUED:
|
|
78 | 102 |
pass |
79 |
elif op.status == constants.OP_STATUS_ERROR:
|
|
103 |
elif op_status == constants.OP_STATUS_ERROR:
|
|
80 | 104 |
status = constants.JOB_STATUS_ERROR |
81 |
elif op.status == constants.OP_STATUS_RUNNING:
|
|
105 |
elif op_status == constants.OP_STATUS_RUNNING:
|
|
82 | 106 |
status = constants.JOB_STATUS_RUNNING |
83 | 107 |
|
84 | 108 |
if all_success: |
... | ... | |
86 | 110 |
|
87 | 111 |
return status |
88 | 112 |
|
89 |
def GetStatus(self): |
|
90 |
self._lock.acquire() |
|
91 |
try: |
|
92 |
return self._GetStatusUnlocked() |
|
93 |
finally: |
|
94 |
self._lock.release() |
|
95 |
|
|
96 | 113 |
def Run(self, proc): |
97 | 114 |
"""Job executor. |
98 | 115 |
|
... | ... | |
107 | 124 |
count = len(self._ops) |
108 | 125 |
for idx, op in enumerate(self._ops): |
109 | 126 |
try: |
110 |
self._lock.acquire() |
|
111 |
try: |
|
112 |
logging.debug("Op %s/%s: Starting %s", idx + 1, count, op) |
|
113 |
op.status = constants.OP_STATUS_RUNNING |
|
114 |
finally: |
|
115 |
self._lock.release() |
|
127 |
logging.debug("Op %s/%s: Starting %s", idx + 1, count, op) |
|
128 |
op.SetStatus(constants.OP_STATUS_RUNNING, None) |
|
116 | 129 |
|
117 | 130 |
result = proc.ExecOpCode(op.input) |
118 | 131 |
|
119 |
self._lock.acquire() |
|
120 |
try: |
|
121 |
logging.debug("Op %s/%s: Successfully finished %s", |
|
122 |
idx + 1, count, op) |
|
123 |
op.status = constants.OP_STATUS_SUCCESS |
|
124 |
op.result = result |
|
125 |
finally: |
|
126 |
self._lock.release() |
|
132 |
op.SetStatus(constants.OP_STATUS_SUCCESS, result) |
|
133 |
logging.debug("Op %s/%s: Successfully finished %s", |
|
134 |
idx + 1, count, op) |
|
127 | 135 |
except Exception, err: |
128 |
self._lock.acquire() |
|
129 |
try: |
|
130 |
logging.debug("Op %s/%s: Error in %s", idx + 1, count, op) |
|
131 |
op.status = constants.OP_STATUS_ERROR |
|
132 |
op.result = str(err) |
|
133 |
finally: |
|
134 |
self._lock.release() |
|
136 |
op.SetStatus(constants.OP_STATUS_ERROR, str(err)) |
|
137 |
logging.debug("Op %s/%s: Error in %s", idx + 1, count, op) |
|
135 | 138 |
raise |
136 | 139 |
|
137 | 140 |
except errors.GenericError, err: |
... | ... | |
227 | 230 |
row.append(job.GetStatus()) |
228 | 231 |
elif fname == "result": |
229 | 232 |
# TODO |
230 |
row.append(map(lambda op: op.result, job._ops))
|
|
233 |
row.append([op.GetResult() for op in job._ops])
|
|
231 | 234 |
else: |
232 | 235 |
raise errors.OpExecError("Invalid job query field '%s'" % fname) |
233 | 236 |
return row |
Also available in: Unified diff