Revision 39dcf2ef

b/daemons/ganeti-masterd
41 41
from cStringIO import StringIO
42 42
from optparse import OptionParser
43 43

  
44
from ganeti import config
44 45
from ganeti import constants
45 46
from ganeti import mcpu
46 47
from ganeti import opcodes
47 48
from ganeti import jqueue
49
from ganeti import locking
48 50
from ganeti import luxi
49 51
from ganeti import utils
50 52
from ganeti import errors
......
65 67
  """
66 68
  QUEUE_PROCESSOR_SIZE = 1
67 69

  
68
  def __init__(self, address, rqhandler):
70
  def __init__(self, address, rqhandler, context):
69 71
    """IOServer constructor
70 72

  
71 73
    Args:
72 74
      address: the address to bind this IOServer to
73 75
      rqhandler: RequestHandler type object
76
      context: Context Object common to all worker threads
74 77

  
75 78
    """
76 79
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
77 80
    self.do_quit = False
78 81
    self.queue = jqueue.QueueManager()
82
    self.context = context
79 83
    self.processors = []
80 84
    signal.signal(signal.SIGINT, self.handle_quit_signals)
81 85
    signal.signal(signal.SIGTERM, self.handle_quit_signals)
......
90 94
    """
91 95
    for i in range(self.QUEUE_PROCESSOR_SIZE):
92 96
      self.processors.append(threading.Thread(target=PoolWorker,
93
                                              args=(i, self.queue.new_queue)))
97
                                              args=(i, self.queue.new_queue,
98
                                                    self.context)))
94 99
    for t in self.processors:
95 100
      t.start()
96 101

  
......
235 240
    return self.server.queue.query_jobs(fields, names)
236 241

  
237 242

  
238
def JobRunner(proc, job):
243
def JobRunner(proc, job, context):
239 244
  """Job executor.
240 245

  
241 246
  This functions processes a single job in the context of given
......
244 249
  Args:
245 250
    proc: Ganeti Processor to run the job on
246 251
    job: The job to run (unserialized format)
252
    context: Ganeti shared context
247 253

  
248 254
  """
249 255
  job.SetStatus(opcodes.Job.STATUS_RUNNING)
......
263 269
    job.SetStatus(opcodes.Job.STATUS_SUCCESS)
264 270

  
265 271

  
266
def PoolWorker(worker_id, incoming_queue):
272
def PoolWorker(worker_id, incoming_queue, context):
267 273
  """A worker thread function.
268 274

  
269 275
  This is the actual processor of a single thread of Job execution.
......
271 277
  Args:
272 278
    worker_id: the unique id for this worker
273 279
    incoming_queue: a queue to get jobs from
280
    context: the common server context, containing all shared data and
281
             synchronization structures.
274 282

  
275 283
  """
276 284
  while True:
......
283 291
    try:
284 292
      proc = mcpu.Processor(feedback=lambda x: None)
285 293
      try:
286
        JobRunner(proc, item)
294
        JobRunner(proc, item, context)
287 295
      except errors.GenericError, err:
288 296
        msg = "ganeti exception %s" % err
289 297
        item.SetStatus(opcodes.Job.STATUS_FAIL, result=[msg])
......
305 313
  print "worker %s exiting" % worker_id
306 314

  
307 315

  
316
class GanetiContext(object):
317
  """Context common to all ganeti threads.
318

  
319
  This class creates and holds common objects shared by all threads.
320

  
321
  """
322
  _instance = None
323

  
324
  def __init__(self):
325
    """Constructs a new GanetiContext object.
326

  
327
    There should be only a GanetiContext object at any time, so this
328
    function raises an error if this is not the case.
329

  
330
    """
331
    assert self.__class__._instance is None, "double GanetiContext instance"
332

  
333
    # Create a ConfigWriter...
334
    self.cfg = config.ConfigWriter()
335
    # And a GanetiLockingManager...
336
    self.GLM = locking.GanetiLockManager(
337
                self.cfg.GetNodeList(),
338
                self.cfg.GetInstanceList())
339

  
340
    # setting this also locks the class against attribute modifications
341
    self.__class__._instance = self
342

  
343
  def __setattr__(self, name, value):
344
    """Setting GanetiContext attributes is forbidden after initialization.
345

  
346
    """
347
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
348
    object.__setattr__(self, name, value)
349

  
350

  
308 351
def CheckMaster(debug):
309 352
  """Checks the node setup.
310 353

  
......
362 405

  
363 406
  CheckMaster(options.debug)
364 407

  
365
  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
408
  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler, GanetiContext())
366 409

  
367 410
  # become a daemon
368 411
  if options.fork:

Also available in: Unified diff