Revision c1f2901b

b/daemons/ganeti-masterd
27 27
"""
28 28

  
29 29

  
30
import sys
30 31
import SocketServer
31 32
import threading
32 33
import time
......
38 39

  
39 40

  
40 41
from cStringIO import StringIO
42
from optparse import OptionParser
41 43

  
42 44
from ganeti import constants
43 45
from ganeti import mcpu
......
45 47
from ganeti import jqueue
46 48
from ganeti import luxi
47 49
from ganeti import utils
50
from ganeti import errors
51
from ganeti import ssconf
52

  
53

  
54
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
55
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
48 56

  
49 57

  
50 58
class IOServer(SocketServer.UnixStreamServer):
......
62 70
    self.do_quit = False
63 71
    self.queue = jqueue.QueueManager()
64 72
    self.processors = []
73
    signal.signal(signal.SIGINT, self.handle_quit_signals)
74
    signal.signal(signal.SIGTERM, self.handle_quit_signals)
75

  
76
  def setup_processors(self):
77
    """Spawn the processors threads.
78

  
79
    This initializes the queue and the thread processors. It is done
80
    separately from the constructor because we want the clone()
81
    syscalls to happen after the daemonize part.
82

  
83
    """
65 84
    for i in range(self.QUEUE_PROCESSOR_SIZE):
66 85
      self.processors.append(threading.Thread(target=PoolWorker,
67 86
                                              args=(i, self.queue.new_queue)))
68 87
    for t in self.processors:
69 88
      t.start()
70
    signal.signal(signal.SIGINT, self.handle_sigint)
71 89

  
72 90
  def process_request_thread(self, request, client_address):
73 91
    """Process the request.
......
92 110
                         args=(request, client_address))
93 111
    t.start()
94 112

  
95
  def handle_sigint(self, signum, frame):
113
  def handle_quit_signals(self, signum, frame):
96 114
    print "received %s in %s" % (signum, frame)
97 115
    self.do_quit = True
98
    self.server_close()
99
    for i in range(self.QUEUE_PROCESSOR_SIZE):
100
      self.queue.new_queue.put(None)
101 116

  
102 117
  def serve_forever(self):
103 118
    """Handle one request at a time until told to quit."""
104 119
    while not self.do_quit:
105 120
      self.handle_request()
121
      print "served request, quit=%s" % (self.do_quit)
122

  
123
  def server_cleanup(self):
124
    """Cleanup the server.
125

  
126
    This involves shutting down the processor threads and the master
127
    socket.
128

  
129
    """
130
    self.server_close()
131
    utils.RemoveFile(constants.MASTER_SOCKET)
132
    for i in range(self.QUEUE_PROCESSOR_SIZE):
133
      self.queue.new_queue.put(None)
134
    for idx, t in enumerate(self.processors):
135
      print "waiting for processor thread %s..." % idx
136
      t.join()
137
    print "done threads"
106 138

  
107 139

  
108 140
class ClientRqHandler(SocketServer.BaseRequestHandler):
......
235 267
  print "worker %s exiting" % worker_id
236 268

  
237 269

  
270
def CheckMaster(debug):
271
  """Checks the node setup.
272

  
273
  If this is the master, the function will return. Otherwise it will
274
  exit with an exit code based on the node status.
275

  
276
  """
277
  try:
278
    ss = ssconf.SimpleStore()
279
    master_name = ss.GetMasterNode()
280
  except errors.ConfigurationError, err:
281
    print "Cluster configuration incomplete: '%s'" % str(err)
282
    sys.exit(EXIT_NODESETUP_ERROR)
283

  
284
  try:
285
    myself = utils.HostInfo()
286
  except errors.ResolverError, err:
287
    sys.stderr.write("Cannot resolve my own name (%s)\n" % err.args[0])
288
    sys.exit(EXIT_NODESETUP_ERROR)
289

  
290
  if myself.name != master_name:
291
    if debug:
292
      sys.stderr.write("Not master, exiting.\n")
293
    sys.exit(EXIT_NOTMASTER)
294

  
295

  
296
def ParseOptions():
297
  """Parse the command line options.
298

  
299
  Returns:
300
    (options, args) as from OptionParser.parse_args()
301

  
302
  """
303
  parser = OptionParser(description="Ganeti master daemon",
304
                        usage="%prog [-f] [-d]",
305
                        version="%%prog (ganeti) %s" %
306
                        constants.RELEASE_VERSION)
307

  
308
  parser.add_option("-f", "--foreground", dest="fork",
309
                    help="Don't detach from the current terminal",
310
                    default=True, action="store_false")
311
  parser.add_option("-d", "--debug", dest="debug",
312
                    help="Enable some debug messages",
313
                    default=False, action="store_true")
314
  options, args = parser.parse_args()
315
  return options, args
316

  
317

  
238 318
def main():
239 319
  """Main function"""
240 320

  
321
  options, args = ParseOptions()
322
  utils.debug = options.debug
323

  
324
  CheckMaster(options.debug)
325

  
241 326
  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
242
  master.serve_forever()
243 327

  
328
  # become a daemon
329
  if options.fork:
330
    utils.Daemonize(logfile=constants.LOG_MASTERDAEMON,
331
                    noclose_fds=[master.fileno()])
332

  
333
  master.setup_processors()
334
  try:
335
    master.serve_forever()
336
  finally:
337
    master.server_cleanup()
244 338

  
245 339
if __name__ == "__main__":
246 340
  main()
b/lib/constants.py
54 54
LOG_OS_DIR = LOG_DIR + "/os"
55 55
LOG_NODESERVER = LOG_DIR + "/node-daemon.log"
56 56
LOG_WATCHER = LOG_DIR + "/watcher.log"
57
LOG_MASTERDAEMON = LOG_DIR + "/master-daemon.log"
57 58

  
58 59
OS_SEARCH_PATH = _autoconf.OS_SEARCH_PATH
59 60
EXPORT_DIR = _autoconf.EXPORT_DIR

Also available in: Unified diff