Revision f1048938 lib/jqueue.py
b/lib/jqueue.py | ||
---|---|---|
26 | 26 |
import threading |
27 | 27 |
import errno |
28 | 28 |
import re |
29 |
import time |
|
29 | 30 |
|
30 | 31 |
from ganeti import constants |
31 | 32 |
from ganeti import serializer |
... | ... | |
44 | 45 |
|
45 | 46 |
Access is synchronized by the '_lock' attribute. |
46 | 47 |
|
48 |
The 'log' attribute holds the execution log and consists of tuples |
|
49 |
of the form (timestamp, level, message). |
|
50 |
|
|
47 | 51 |
""" |
48 | 52 |
def __init__(self, op): |
49 |
self.__Setup(op, constants.OP_STATUS_QUEUED, None) |
|
53 |
self.__Setup(op, constants.OP_STATUS_QUEUED, None, [])
|
|
50 | 54 |
|
51 |
def __Setup(self, input, status, result):
|
|
55 |
def __Setup(self, input_, status, result, log):
|
|
52 | 56 |
self._lock = threading.Lock() |
53 |
self.input = input |
|
57 |
self.input = input_
|
|
54 | 58 |
self.status = status |
55 | 59 |
self.result = result |
60 |
self.log = log |
|
56 | 61 |
|
57 | 62 |
@classmethod |
58 | 63 |
def Restore(cls, state): |
59 | 64 |
obj = object.__new__(cls) |
60 | 65 |
obj.__Setup(opcodes.OpCode.LoadOpCode(state["input"]), |
61 |
state["status"], state["result"]) |
|
66 |
state["status"], state["result"], state["log"])
|
|
62 | 67 |
return obj |
63 | 68 |
|
64 | 69 |
@utils.LockedMethod |
... | ... | |
67 | 72 |
"input": self.input.__getstate__(), |
68 | 73 |
"status": self.status, |
69 | 74 |
"result": self.result, |
75 |
"log": self.log, |
|
70 | 76 |
} |
71 | 77 |
|
72 | 78 |
@utils.LockedMethod |
... | ... | |
98 | 104 |
""" |
99 | 105 |
return self.result |
100 | 106 |
|
107 |
@utils.LockedMethod |
|
108 |
def Log(self, *args): |
|
109 |
"""Append a log entry. |
|
110 |
|
|
111 |
""" |
|
112 |
assert len(args) < 2 |
|
113 |
|
|
114 |
if len(args) == 1: |
|
115 |
log_type = constants.ELOG_MESSAGE |
|
116 |
log_msg = args[0] |
|
117 |
else: |
|
118 |
log_type, log_msg = args |
|
119 |
self.log.append((time.time(), log_type, log_msg)) |
|
120 |
|
|
121 |
@utils.LockedMethod |
|
122 |
def RetrieveLog(self, start_at=0): |
|
123 |
"""Retrieve (a part of) the execution log. |
|
124 |
|
|
125 |
""" |
|
126 |
return self.log[start_at:] |
|
127 |
|
|
101 | 128 |
|
102 | 129 |
class _QueuedJob(object): |
103 | 130 |
"""In-memory job representation. |
... | ... | |
110 | 137 |
# TODO |
111 | 138 |
raise Exception("No opcodes") |
112 | 139 |
|
113 |
self.__Setup(storage, job_id, [_QueuedOpCode(op) for op in ops]) |
|
140 |
self.__Setup(storage, job_id, [_QueuedOpCode(op) for op in ops], -1)
|
|
114 | 141 |
|
115 |
def __Setup(self, storage, job_id, ops): |
|
142 |
def __Setup(self, storage, job_id, ops, run_op_index): |
|
143 |
self._lock = threading.Lock() |
|
116 | 144 |
self.storage = storage |
117 | 145 |
self.id = job_id |
118 | 146 |
self._ops = ops |
147 |
self.run_op_index = run_op_index |
|
119 | 148 |
|
120 | 149 |
@classmethod |
121 | 150 |
def Restore(cls, storage, state): |
122 | 151 |
obj = object.__new__(cls) |
123 |
obj.__Setup(storage, state["id"],
|
|
124 |
[_QueuedOpCode.Restore(op_state) for op_state in state["ops"]])
|
|
152 |
op_list = [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]]
|
|
153 |
obj.__Setup(storage, state["id"], op_list, state["run_op_index"])
|
|
125 | 154 |
return obj |
126 | 155 |
|
127 | 156 |
def Serialize(self): |
128 | 157 |
return { |
129 | 158 |
"id": self.id, |
130 | 159 |
"ops": [op.Serialize() for op in self._ops], |
160 |
"run_op_index": self.run_op_index, |
|
131 | 161 |
} |
132 | 162 |
|
133 | 163 |
def SetUnclean(self, msg): |
... | ... | |
162 | 192 |
|
163 | 193 |
return status |
164 | 194 |
|
195 |
@utils.LockedMethod |
|
196 |
def GetRunOpIndex(self): |
|
197 |
return self.run_op_index |
|
198 |
|
|
165 | 199 |
def Run(self, proc): |
166 | 200 |
"""Job executor. |
167 | 201 |
|
... | ... | |
177 | 211 |
for idx, op in enumerate(self._ops): |
178 | 212 |
try: |
179 | 213 |
logging.debug("Op %s/%s: Starting %s", idx + 1, count, op) |
214 |
|
|
215 |
self._lock.acquire() |
|
216 |
try: |
|
217 |
self.run_op_index = idx |
|
218 |
finally: |
|
219 |
self._lock.release() |
|
220 |
|
|
180 | 221 |
op.SetStatus(constants.OP_STATUS_RUNNING, None) |
181 | 222 |
self.storage.UpdateJob(self) |
182 | 223 |
|
183 |
result = proc.ExecOpCode(op.input) |
|
224 |
result = proc.ExecOpCode(op.input, op.Log)
|
|
184 | 225 |
|
185 | 226 |
op.SetStatus(constants.OP_STATUS_SUCCESS, result) |
186 | 227 |
self.storage.UpdateJob(self) |
... | ... | |
207 | 248 |
logging.debug("Worker %s processing job %s", |
208 | 249 |
self.worker_id, job.id) |
209 | 250 |
# TODO: feedback function |
210 |
proc = mcpu.Processor(self.pool.context, feedback=lambda x: None)
|
|
251 |
proc = mcpu.Processor(self.pool.context) |
|
211 | 252 |
try: |
212 | 253 |
job.Run(proc) |
213 | 254 |
finally: |
... | ... | |
477 | 518 |
row.append([op.GetResult() for op in job._ops]) |
478 | 519 |
elif fname == "opstatus": |
479 | 520 |
row.append([op.GetStatus() for op in job._ops]) |
521 |
elif fname == "ticker": |
|
522 |
ji = job.GetRunOpIndex() |
|
523 |
if ji < 0: |
|
524 |
lmsg = None |
|
525 |
else: |
|
526 |
lmsg = job._ops[ji].RetrieveLog(-1) |
|
527 |
# message might be empty here |
|
528 |
if lmsg: |
|
529 |
lmsg = lmsg[0] |
|
530 |
else: |
|
531 |
lmsg = None |
|
532 |
row.append(lmsg) |
|
480 | 533 |
else: |
481 | 534 |
raise errors.OpExecError("Invalid job query field '%s'" % fname) |
482 | 535 |
return row |
Also available in: Unified diff