Revision 39dcf2ef
b/daemons/ganeti-masterd | ||
---|---|---|
41 | 41 |
from cStringIO import StringIO |
42 | 42 |
from optparse import OptionParser |
43 | 43 |
|
44 |
from ganeti import config |
|
44 | 45 |
from ganeti import constants |
45 | 46 |
from ganeti import mcpu |
46 | 47 |
from ganeti import opcodes |
47 | 48 |
from ganeti import jqueue |
49 |
from ganeti import locking |
|
48 | 50 |
from ganeti import luxi |
49 | 51 |
from ganeti import utils |
50 | 52 |
from ganeti import errors |
... | ... | |
65 | 67 |
""" |
66 | 68 |
QUEUE_PROCESSOR_SIZE = 1 |
67 | 69 |
|
68 |
def __init__(self, address, rqhandler): |
|
70 |
def __init__(self, address, rqhandler, context):
|
|
69 | 71 |
"""IOServer constructor |
70 | 72 |
|
71 | 73 |
Args: |
72 | 74 |
address: the address to bind this IOServer to |
73 | 75 |
rqhandler: RequestHandler type object |
76 |
context: Context Object common to all worker threads |
|
74 | 77 |
|
75 | 78 |
""" |
76 | 79 |
SocketServer.UnixStreamServer.__init__(self, address, rqhandler) |
77 | 80 |
self.do_quit = False |
78 | 81 |
self.queue = jqueue.QueueManager() |
82 |
self.context = context |
|
79 | 83 |
self.processors = [] |
80 | 84 |
signal.signal(signal.SIGINT, self.handle_quit_signals) |
81 | 85 |
signal.signal(signal.SIGTERM, self.handle_quit_signals) |
... | ... | |
90 | 94 |
""" |
91 | 95 |
for i in range(self.QUEUE_PROCESSOR_SIZE): |
92 | 96 |
self.processors.append(threading.Thread(target=PoolWorker, |
93 |
args=(i, self.queue.new_queue))) |
|
97 |
args=(i, self.queue.new_queue, |
|
98 |
self.context))) |
|
94 | 99 |
for t in self.processors: |
95 | 100 |
t.start() |
96 | 101 |
|
... | ... | |
235 | 240 |
return self.server.queue.query_jobs(fields, names) |
236 | 241 |
|
237 | 242 |
|
238 |
def JobRunner(proc, job): |
|
243 |
def JobRunner(proc, job, context):
|
|
239 | 244 |
"""Job executor. |
240 | 245 |
|
241 | 246 |
This functions processes a single job in the context of given |
... | ... | |
244 | 249 |
Args: |
245 | 250 |
proc: Ganeti Processor to run the job on |
246 | 251 |
job: The job to run (unserialized format) |
252 |
context: Ganeti shared context |
|
247 | 253 |
|
248 | 254 |
""" |
249 | 255 |
job.SetStatus(opcodes.Job.STATUS_RUNNING) |
... | ... | |
263 | 269 |
job.SetStatus(opcodes.Job.STATUS_SUCCESS) |
264 | 270 |
|
265 | 271 |
|
266 |
def PoolWorker(worker_id, incoming_queue): |
|
272 |
def PoolWorker(worker_id, incoming_queue, context):
|
|
267 | 273 |
"""A worker thread function. |
268 | 274 |
|
269 | 275 |
This is the actual processor of a single thread of Job execution. |
... | ... | |
271 | 277 |
Args: |
272 | 278 |
worker_id: the unique id for this worker |
273 | 279 |
incoming_queue: a queue to get jobs from |
280 |
context: the common server context, containing all shared data and |
|
281 |
synchronization structures. |
|
274 | 282 |
|
275 | 283 |
""" |
276 | 284 |
while True: |
... | ... | |
283 | 291 |
try: |
284 | 292 |
proc = mcpu.Processor(feedback=lambda x: None) |
285 | 293 |
try: |
286 |
JobRunner(proc, item) |
|
294 |
JobRunner(proc, item, context)
|
|
287 | 295 |
except errors.GenericError, err: |
288 | 296 |
msg = "ganeti exception %s" % err |
289 | 297 |
item.SetStatus(opcodes.Job.STATUS_FAIL, result=[msg]) |
... | ... | |
305 | 313 |
print "worker %s exiting" % worker_id |
306 | 314 |
|
307 | 315 |
|
316 |
class GanetiContext(object): |
|
317 |
"""Context common to all ganeti threads. |
|
318 |
|
|
319 |
This class creates and holds common objects shared by all threads. |
|
320 |
|
|
321 |
""" |
|
322 |
_instance = None |
|
323 |
|
|
324 |
def __init__(self): |
|
325 |
"""Constructs a new GanetiContext object. |
|
326 |
|
|
327 |
There should be only a GanetiContext object at any time, so this |
|
328 |
function raises an error if this is not the case. |
|
329 |
|
|
330 |
""" |
|
331 |
assert self.__class__._instance is None, "double GanetiContext instance" |
|
332 |
|
|
333 |
# Create a ConfigWriter... |
|
334 |
self.cfg = config.ConfigWriter() |
|
335 |
# And a GanetiLockingManager... |
|
336 |
self.GLM = locking.GanetiLockManager( |
|
337 |
self.cfg.GetNodeList(), |
|
338 |
self.cfg.GetInstanceList()) |
|
339 |
|
|
340 |
# setting this also locks the class against attribute modifications |
|
341 |
self.__class__._instance = self |
|
342 |
|
|
343 |
def __setattr__(self, name, value): |
|
344 |
"""Setting GanetiContext attributes is forbidden after initialization. |
|
345 |
|
|
346 |
""" |
|
347 |
assert self.__class__._instance is None, "Attempt to modify Ganeti Context" |
|
348 |
object.__setattr__(self, name, value) |
|
349 |
|
|
350 |
|
|
308 | 351 |
def CheckMaster(debug): |
309 | 352 |
"""Checks the node setup. |
310 | 353 |
|
... | ... | |
362 | 405 |
|
363 | 406 |
CheckMaster(options.debug) |
364 | 407 |
|
365 |
master = IOServer(constants.MASTER_SOCKET, ClientRqHandler) |
|
408 |
master = IOServer(constants.MASTER_SOCKET, ClientRqHandler, GanetiContext())
|
|
366 | 409 |
|
367 | 410 |
# become a daemon |
368 | 411 |
if options.fork: |
Also available in: Unified diff