26 |
26 |
import threading
|
27 |
27 |
import errno
|
28 |
28 |
import re
|
|
29 |
import time
|
29 |
30 |
|
30 |
31 |
from ganeti import constants
|
31 |
32 |
from ganeti import serializer
|
... | ... | |
44 |
45 |
|
45 |
46 |
Access is synchronized by the '_lock' attribute.
|
46 |
47 |
|
|
48 |
The 'log' attribute holds the execution log and consists of tuples
|
|
49 |
of the form (timestamp, level, message).
|
|
50 |
|
47 |
51 |
"""
|
48 |
52 |
def __init__(self, op):
|
49 |
|
self.__Setup(op, constants.OP_STATUS_QUEUED, None)
|
|
53 |
self.__Setup(op, constants.OP_STATUS_QUEUED, None, [])
|
50 |
54 |
|
51 |
|
def __Setup(self, input, status, result):
|
|
55 |
def __Setup(self, input_, status, result, log):
|
52 |
56 |
self._lock = threading.Lock()
|
53 |
|
self.input = input
|
|
57 |
self.input = input_
|
54 |
58 |
self.status = status
|
55 |
59 |
self.result = result
|
|
60 |
self.log = log
|
56 |
61 |
|
57 |
62 |
@classmethod
|
58 |
63 |
def Restore(cls, state):
|
59 |
64 |
obj = object.__new__(cls)
|
60 |
65 |
obj.__Setup(opcodes.OpCode.LoadOpCode(state["input"]),
|
61 |
|
state["status"], state["result"])
|
|
66 |
state["status"], state["result"], state["log"])
|
62 |
67 |
return obj
|
63 |
68 |
|
64 |
69 |
@utils.LockedMethod
|
... | ... | |
67 |
72 |
"input": self.input.__getstate__(),
|
68 |
73 |
"status": self.status,
|
69 |
74 |
"result": self.result,
|
|
75 |
"log": self.log,
|
70 |
76 |
}
|
71 |
77 |
|
72 |
78 |
@utils.LockedMethod
|
... | ... | |
98 |
104 |
"""
|
99 |
105 |
return self.result
|
100 |
106 |
|
|
107 |
@utils.LockedMethod
|
|
108 |
def Log(self, *args):
|
|
109 |
"""Append a log entry.
|
|
110 |
|
|
111 |
"""
|
|
112 |
assert len(args) < 2
|
|
113 |
|
|
114 |
if len(args) == 1:
|
|
115 |
log_type = constants.ELOG_MESSAGE
|
|
116 |
log_msg = args[0]
|
|
117 |
else:
|
|
118 |
log_type, log_msg = args
|
|
119 |
self.log.append((time.time(), log_type, log_msg))
|
|
120 |
|
|
121 |
@utils.LockedMethod
|
|
122 |
def RetrieveLog(self, start_at=0):
|
|
123 |
"""Retrieve (a part of) the execution log.
|
|
124 |
|
|
125 |
"""
|
|
126 |
return self.log[start_at:]
|
|
127 |
|
101 |
128 |
|
102 |
129 |
class _QueuedJob(object):
|
103 |
130 |
"""In-memory job representation.
|
... | ... | |
110 |
137 |
# TODO
|
111 |
138 |
raise Exception("No opcodes")
|
112 |
139 |
|
113 |
|
self.__Setup(storage, job_id, [_QueuedOpCode(op) for op in ops])
|
|
140 |
self.__Setup(storage, job_id, [_QueuedOpCode(op) for op in ops], -1)
|
114 |
141 |
|
115 |
|
def __Setup(self, storage, job_id, ops):
|
|
142 |
def __Setup(self, storage, job_id, ops, run_op_index):
|
|
143 |
self._lock = threading.Lock()
|
116 |
144 |
self.storage = storage
|
117 |
145 |
self.id = job_id
|
118 |
146 |
self._ops = ops
|
|
147 |
self.run_op_index = run_op_index
|
119 |
148 |
|
120 |
149 |
@classmethod
|
121 |
150 |
def Restore(cls, storage, state):
|
122 |
151 |
obj = object.__new__(cls)
|
123 |
|
obj.__Setup(storage, state["id"],
|
124 |
|
[_QueuedOpCode.Restore(op_state) for op_state in state["ops"]])
|
|
152 |
op_list = [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]]
|
|
153 |
obj.__Setup(storage, state["id"], op_list, state["run_op_index"])
|
125 |
154 |
return obj
|
126 |
155 |
|
127 |
156 |
def Serialize(self):
|
128 |
157 |
return {
|
129 |
158 |
"id": self.id,
|
130 |
159 |
"ops": [op.Serialize() for op in self._ops],
|
|
160 |
"run_op_index": self.run_op_index,
|
131 |
161 |
}
|
132 |
162 |
|
133 |
163 |
def SetUnclean(self, msg):
|
... | ... | |
162 |
192 |
|
163 |
193 |
return status
|
164 |
194 |
|
|
195 |
@utils.LockedMethod
|
|
196 |
def GetRunOpIndex(self):
|
|
197 |
return self.run_op_index
|
|
198 |
|
165 |
199 |
def Run(self, proc):
|
166 |
200 |
"""Job executor.
|
167 |
201 |
|
... | ... | |
177 |
211 |
for idx, op in enumerate(self._ops):
|
178 |
212 |
try:
|
179 |
213 |
logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
|
|
214 |
|
|
215 |
self._lock.acquire()
|
|
216 |
try:
|
|
217 |
self.run_op_index = idx
|
|
218 |
finally:
|
|
219 |
self._lock.release()
|
|
220 |
|
180 |
221 |
op.SetStatus(constants.OP_STATUS_RUNNING, None)
|
181 |
222 |
self.storage.UpdateJob(self)
|
182 |
223 |
|
183 |
|
result = proc.ExecOpCode(op.input)
|
|
224 |
result = proc.ExecOpCode(op.input, op.Log)
|
184 |
225 |
|
185 |
226 |
op.SetStatus(constants.OP_STATUS_SUCCESS, result)
|
186 |
227 |
self.storage.UpdateJob(self)
|
... | ... | |
207 |
248 |
logging.debug("Worker %s processing job %s",
|
208 |
249 |
self.worker_id, job.id)
|
209 |
250 |
# TODO: feedback function
|
210 |
|
proc = mcpu.Processor(self.pool.context, feedback=lambda x: None)
|
|
251 |
proc = mcpu.Processor(self.pool.context)
|
211 |
252 |
try:
|
212 |
253 |
job.Run(proc)
|
213 |
254 |
finally:
|
... | ... | |
477 |
518 |
row.append([op.GetResult() for op in job._ops])
|
478 |
519 |
elif fname == "opstatus":
|
479 |
520 |
row.append([op.GetStatus() for op in job._ops])
|
|
521 |
elif fname == "ticker":
|
|
522 |
ji = job.GetRunOpIndex()
|
|
523 |
if ji < 0:
|
|
524 |
lmsg = None
|
|
525 |
else:
|
|
526 |
lmsg = job._ops[ji].RetrieveLog(-1)
|
|
527 |
# message might be empty here
|
|
528 |
if lmsg:
|
|
529 |
lmsg = lmsg[0]
|
|
530 |
else:
|
|
531 |
lmsg = None
|
|
532 |
row.append(lmsg)
|
480 |
533 |
else:
|
481 |
534 |
raise errors.OpExecError("Invalid job query field '%s'" % fname)
|
482 |
535 |
return row
|