Revision 50a3fbb2

b/daemons/ganeti-masterd
82 82
    self.queue = jqueue.QueueManager()
83 83
    self.context = context
84 84
    self.processors = []
85

  
86
    # We'll only start threads once we've forked.
87
    self.jobqueue = None
88

  
85 89
    signal.signal(signal.SIGINT, self.handle_quit_signals)
86 90
    signal.signal(signal.SIGTERM, self.handle_quit_signals)
87 91

  
92
  def setup_queue(self):
93
    self.jobqueue = jqueue.JobQueue(self.context)
94

  
88 95
  def setup_processors(self):
89 96
    """Spawn the processors threads.
90 97

  
......
140 147
    socket.
141 148

  
142 149
    """
143
    self.server_close()
144
    utils.RemoveFile(constants.MASTER_SOCKET)
145
    for i in range(self.QUEUE_PROCESSOR_SIZE):
146
      self.queue.new_queue.put(None)
147
    for idx, t in enumerate(self.processors):
148
      logging.debug("waiting for processor thread %s...", idx)
149
      t.join()
150
    logging.debug("threads done")
150
    try:
151
      self.server_close()
152
      utils.RemoveFile(constants.MASTER_SOCKET)
153
      for i in range(self.QUEUE_PROCESSOR_SIZE):
154
        self.queue.new_queue.put(None)
155
      for idx, t in enumerate(self.processors):
156
        logging.debug("waiting for processor thread %s...", idx)
157
        t.join()
158
      logging.debug("threads done")
159
    finally:
160
      if self.jobqueue:
161
        self.jobqueue.Shutdown()
151 162

  
152 163

  
153 164
class ClientRqHandler(SocketServer.BaseRequestHandler):
......
419 430

  
420 431
  try:
421 432
    master.setup_processors()
433
    master.setup_queue()
422 434
    try:
423 435
      master.serve_forever()
424 436
    finally:

Also available in: Unified diff