Revision ef2df7d3
b/daemons/ganeti-masterd | ||
---|---|---|
35 | 35 |
import signal |
36 | 36 |
import logging |
37 | 37 |
|
38 |
from cStringIO import StringIO |
|
39 | 38 |
from optparse import OptionParser |
40 | 39 |
|
41 | 40 |
from ganeti import config |
b/lib/jqueue.py | ||
---|---|---|
153 | 153 |
@ivar received_timestamp: the timestamp for when the job was received |
154 | 154 |
@ivar start_timestamp: the timestamp for start of execution |
155 | 155 |
@ivar end_timestamp: the timestamp for end of execution |
156 |
@ivar lock_status: In-memory locking information for debugging |
|
156 | 157 |
@ivar change: a Condition variable we use for waiting for job changes |
157 | 158 |
|
158 | 159 |
""" |
159 | 160 |
__slots__ = ["queue", "id", "ops", "run_op_index", "log_serial", |
160 | 161 |
"received_timestamp", "start_timestamp", "end_timestamp", |
161 |
"change", |
|
162 |
"lock_status", "change",
|
|
162 | 163 |
"__weakref__"] |
163 | 164 |
|
164 | 165 |
def __init__(self, queue, job_id, ops): |
... | ... | |
186 | 187 |
self.start_timestamp = None |
187 | 188 |
self.end_timestamp = None |
188 | 189 |
|
190 |
# In-memory attributes |
|
191 |
self.lock_status = None |
|
192 |
|
|
189 | 193 |
# Condition to wait for changes |
190 | 194 |
self.change = threading.Condition(self.queue._lock) |
191 | 195 |
|
... | ... | |
209 | 213 |
obj.start_timestamp = state.get("start_timestamp", None) |
210 | 214 |
obj.end_timestamp = state.get("end_timestamp", None) |
211 | 215 |
|
216 |
# In-memory attributes |
|
217 |
obj.lock_status = None |
|
218 |
|
|
212 | 219 |
obj.ops = [] |
213 | 220 |
obj.log_serial = 0 |
214 | 221 |
for op_state in state["ops"]: |
... | ... | |
334 | 341 |
not_marked = False |
335 | 342 |
|
336 | 343 |
|
337 |
class _OpCodeExecCallbacks(mcpu.OpExecCbBase):
|
|
344 |
class _OpExecCallbacks(mcpu.OpExecCbBase): |
|
338 | 345 |
def __init__(self, queue, job, op): |
339 | 346 |
"""Initializes this class. |
340 | 347 |
|
... | ... | |
368 | 375 |
assert self._op.status in (constants.OP_STATUS_WAITLOCK, |
369 | 376 |
constants.OP_STATUS_CANCELING) |
370 | 377 |
|
378 |
# All locks are acquired by now |
|
379 |
self._job.lock_status = None |
|
380 |
|
|
371 | 381 |
# Cancel here if we were asked to |
372 | 382 |
if self._op.status == constants.OP_STATUS_CANCELING: |
373 | 383 |
raise CancelJob() |
... | ... | |
401 | 411 |
finally: |
402 | 412 |
self._queue.release() |
403 | 413 |
|
414 |
def ReportLocks(self, msg): |
|
415 |
"""Write locking information to the job. |
|
416 |
|
|
417 |
Called whenever the LU processor is waiting for a lock or has acquired one. |
|
418 |
|
|
419 |
""" |
|
420 |
# Not getting the queue lock because this is a single assignment |
|
421 |
self._job.lock_status = msg |
|
422 |
|
|
404 | 423 |
|
405 | 424 |
class _JobQueueWorker(workerpool.BaseWorker): |
406 | 425 |
"""The actual job workers. |
... | ... | |
457 | 476 |
|
458 | 477 |
# Make sure not to hold queue lock while calling ExecOpCode |
459 | 478 |
result = proc.ExecOpCode(input_opcode, |
460 |
_OpCodeExecCallbacks(queue, job, op))
|
|
479 |
_OpExecCallbacks(queue, job, op)) |
|
461 | 480 |
|
462 | 481 |
queue.acquire() |
463 | 482 |
try: |
... | ... | |
505 | 524 |
queue.acquire() |
506 | 525 |
try: |
507 | 526 |
try: |
527 |
job.lock_status = None |
|
508 | 528 |
job.run_op_index = -1 |
509 | 529 |
job.end_timestamp = TimeStampNow() |
510 | 530 |
queue.UpdateJobUnlocked(job) |
... | ... | |
513 | 533 |
status = job.CalcStatus() |
514 | 534 |
finally: |
515 | 535 |
queue.release() |
536 |
|
|
516 | 537 |
logging.info("Worker %s finished job %s, status = %s", |
517 | 538 |
self.worker_id, job_id, status) |
518 | 539 |
|
... | ... | |
1081 | 1102 |
|
1082 | 1103 |
return results |
1083 | 1104 |
|
1084 |
|
|
1085 | 1105 |
@_RequireOpenQueue |
1086 | 1106 |
def UpdateJobUnlocked(self, job): |
1087 | 1107 |
"""Update a job's on disk storage. |
b/lib/mcpu.py | ||
---|---|---|
36 | 36 |
from ganeti import rpc |
37 | 37 |
from ganeti import cmdlib |
38 | 38 |
from ganeti import locking |
39 |
from ganeti import utils |
|
39 | 40 |
|
40 | 41 |
|
41 | 42 |
class OpExecCbBase: |
... | ... | |
55 | 56 |
|
56 | 57 |
""" |
57 | 58 |
|
59 |
def ReportLocks(self, msg): |
|
60 |
"""Report lock operations. |
|
61 |
|
|
62 |
""" |
|
63 |
|
|
58 | 64 |
|
59 | 65 |
class Processor(object): |
60 | 66 |
"""Object which runs OpCodes""" |
... | ... | |
128 | 134 |
self.rpc = rpc.RpcRunner(context.cfg) |
129 | 135 |
self.hmclass = HooksMaster |
130 | 136 |
|
137 |
def _ReportLocks(self, level, names, shared, acquired): |
|
138 |
"""Reports lock operations. |
|
139 |
|
|
140 |
@type level: int |
|
141 |
@param level: Lock level |
|
142 |
@type names: list or string |
|
143 |
@param names: Lock names |
|
144 |
@type shared: bool |
|
145 |
@param shared: Whether the lock should be acquired in shared mode |
|
146 |
@type acquired: bool |
|
147 |
@param acquired: Whether the lock has already been acquired |
|
148 |
|
|
149 |
""" |
|
150 |
parts = [] |
|
151 |
|
|
152 |
# Build message |
|
153 |
if acquired: |
|
154 |
parts.append("acquired") |
|
155 |
else: |
|
156 |
parts.append("waiting") |
|
157 |
|
|
158 |
parts.append(locking.LEVEL_NAMES[level]) |
|
159 |
|
|
160 |
if names == locking.ALL_SET: |
|
161 |
parts.append("ALL") |
|
162 |
elif isinstance(names, basestring): |
|
163 |
parts.append(names) |
|
164 |
else: |
|
165 |
parts.append(",".join(names)) |
|
166 |
|
|
167 |
if shared: |
|
168 |
parts.append("shared") |
|
169 |
else: |
|
170 |
parts.append("exclusive") |
|
171 |
|
|
172 |
msg = "/".join(parts) |
|
173 |
|
|
174 |
logging.debug("LU locks %s", msg) |
|
175 |
|
|
176 |
if self._cbs: |
|
177 |
self._cbs.ReportLocks(msg) |
|
178 |
|
|
131 | 179 |
def _ExecLU(self, lu): |
132 | 180 |
"""Logical Unit execution sequence. |
133 | 181 |
|
... | ... | |
184 | 232 |
share = lu.share_locks[level] |
185 | 233 |
if acquiring_locks: |
186 | 234 |
needed_locks = lu.needed_locks[level] |
235 |
|
|
236 |
self._ReportLocks(level, needed_locks, share, False) |
|
187 | 237 |
lu.acquired_locks[level] = self.context.glm.acquire(level, |
188 | 238 |
needed_locks, |
189 | 239 |
shared=share) |
240 |
self._ReportLocks(level, needed_locks, share, True) |
|
241 |
|
|
190 | 242 |
else: # adding_locks |
191 | 243 |
add_locks = lu.add_locks[level] |
192 | 244 |
lu.remove_locks[level] = add_locks |
... | ... | |
234 | 286 |
# Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a |
235 | 287 |
# shared fashion otherwise (to prevent concurrent run with an exclusive |
236 | 288 |
# LU). |
237 |
self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL], |
|
238 |
shared=not lu_class.REQ_BGL) |
|
289 |
self._ReportLocks(locking.LEVEL_CLUSTER, [locking.BGL], |
|
290 |
not lu_class.REQ_BGL, False) |
|
291 |
try: |
|
292 |
self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL], |
|
293 |
shared=not lu_class.REQ_BGL) |
|
294 |
finally: |
|
295 |
self._ReportLocks(locking.LEVEL_CLUSTER, [locking.BGL], |
|
296 |
not lu_class.REQ_BGL, True) |
|
239 | 297 |
try: |
240 | 298 |
self.exclusive_BGL = lu_class.REQ_BGL |
241 | 299 |
lu = lu_class(self, op, self.context, self.rpc) |
Also available in: Unified diff