Change method dispatch in ClientOps to enforce luxi.REQ_ALL
[ganeti-local] / lib / server / masterd.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Master daemon program.
23
24 Some classes deviates from the standard style guide since the
25 inheritance from parent classes requires it.
26
27 """
28
29 # pylint: disable=C0103
30 # C0103: Invalid name ganeti-masterd
31
32 import grp
33 import os
34 import pwd
35 import sys
36 import socket
37 import time
38 import tempfile
39 import logging
40
41 from optparse import OptionParser
42
43 from ganeti import config
44 from ganeti import constants
45 from ganeti import daemon
46 from ganeti import mcpu
47 from ganeti import opcodes
48 from ganeti import jqueue
49 from ganeti import locking
50 from ganeti import luxi
51 from ganeti import utils
52 from ganeti import errors
53 from ganeti import ssconf
54 from ganeti import workerpool
55 from ganeti import rpc
56 from ganeti import bootstrap
57 from ganeti import netutils
58 from ganeti import objects
59 from ganeti import query
60 from ganeti import runtime
61 from ganeti import pathutils
62 from ganeti import ht
63
64
65 CLIENT_REQUEST_WORKERS = 16
66
67 EXIT_NOTMASTER = constants.EXIT_NOTMASTER
68 EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
69
70
71 def _LogNewJob(status, info, ops):
72   """Log information about a recently submitted job.
73
74   """
75   op_summary = utils.CommaJoin(op.Summary() for op in ops)
76
77   if status:
78     logging.info("New job with id %s, summary: %s", info, op_summary)
79   else:
80     logging.info("Failed to submit job, reason: '%s', summary: %s",
81                  info, op_summary)
82
83
84 class ClientRequestWorker(workerpool.BaseWorker):
85   # pylint: disable=W0221
86   def RunTask(self, server, message, client):
87     """Process the request.
88
89     """
90     client_ops = ClientOps(server)
91
92     try:
93       (method, args, version) = luxi.ParseRequest(message)
94     except luxi.ProtocolError, err:
95       logging.error("Protocol Error: %s", err)
96       client.close_log()
97       return
98
99     success = False
100     try:
101       # Verify client's version if there was one in the request
102       if version is not None and version != constants.LUXI_VERSION:
103         raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
104                                (constants.LUXI_VERSION, version))
105
106       result = client_ops.handle_request(method, args)
107       success = True
108     except errors.GenericError, err:
109       logging.exception("Unexpected exception")
110       success = False
111       result = errors.EncodeException(err)
112     except:
113       logging.exception("Unexpected exception")
114       err = sys.exc_info()
115       result = "Caught exception: %s" % str(err[1])
116
117     try:
118       reply = luxi.FormatResponse(success, result)
119       client.send_message(reply)
120       # awake the main thread so that it can write out the data.
121       server.awaker.signal()
122     except: # pylint: disable=W0702
123       logging.exception("Send error")
124       client.close_log()
125
126
127 class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
128   """Handler for master peers.
129
130   """
131   _MAX_UNHANDLED = 1
132
133   def __init__(self, server, connected_socket, client_address, family):
134     daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
135                                                  client_address,
136                                                  constants.LUXI_EOM,
137                                                  family, self._MAX_UNHANDLED)
138     self.server = server
139
140   def handle_message(self, message, _):
141     self.server.request_workers.AddTask((self.server, message, self))
142
143
144 class _MasterShutdownCheck:
145   """Logic for master daemon shutdown.
146
147   """
148   #: How long to wait between checks
149   _CHECK_INTERVAL = 5.0
150
151   #: How long to wait after all jobs are done (e.g. to give clients time to
152   #: retrieve the job status)
153   _SHUTDOWN_LINGER = 5.0
154
155   def __init__(self):
156     """Initializes this class.
157
158     """
159     self._had_active_jobs = None
160     self._linger_timeout = None
161
162   def __call__(self, jq_prepare_result):
163     """Determines if master daemon is ready for shutdown.
164
165     @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown}
166     @rtype: None or number
167     @return: None if master daemon is ready, timeout if the check must be
168              repeated
169
170     """
171     if jq_prepare_result:
172       # Check again shortly
173       logging.info("Job queue has been notified for shutdown but is still"
174                    " busy; next check in %s seconds", self._CHECK_INTERVAL)
175       self._had_active_jobs = True
176       return self._CHECK_INTERVAL
177
178     if not self._had_active_jobs:
179       # Can shut down as there were no active jobs on the first check
180       return None
181
182     # No jobs are running anymore, but maybe some clients want to collect some
183     # information. Give them a short amount of time.
184     if self._linger_timeout is None:
185       self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True)
186
187     remaining = self._linger_timeout.Remaining()
188
189     logging.info("Job queue no longer busy; shutting down master daemon"
190                  " in %s seconds", remaining)
191
192     # TODO: Should the master daemon socket be closed at this point? Doing so
193     # wouldn't affect existing connections.
194
195     if remaining < 0:
196       return None
197     else:
198       return remaining
199
200
201 class MasterServer(daemon.AsyncStreamServer):
202   """Master Server.
203
204   This is the main asynchronous master server. It handles connections to the
205   master socket.
206
207   """
208   family = socket.AF_UNIX
209
210   def __init__(self, address, uid, gid):
211     """MasterServer constructor
212
213     @param address: the unix socket address to bind the MasterServer to
214     @param uid: The uid of the owner of the socket
215     @param gid: The gid of the owner of the socket
216
217     """
218     temp_name = tempfile.mktemp(dir=os.path.dirname(address))
219     daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
220     os.chmod(temp_name, 0770)
221     os.chown(temp_name, uid, gid)
222     os.rename(temp_name, address)
223
224     self.awaker = daemon.AsyncAwaker()
225
226     # We'll only start threads once we've forked.
227     self.context = None
228     self.request_workers = None
229
230     self._shutdown_check = None
231
232   def handle_connection(self, connected_socket, client_address):
233     # TODO: add connection count and limit the number of open connections to a
234     # maximum number to avoid breaking for lack of file descriptors or memory.
235     MasterClientHandler(self, connected_socket, client_address, self.family)
236
237   def setup_queue(self):
238     self.context = GanetiContext()
239     self.request_workers = workerpool.WorkerPool("ClientReq",
240                                                  CLIENT_REQUEST_WORKERS,
241                                                  ClientRequestWorker)
242
243   def WaitForShutdown(self):
244     """Prepares server for shutdown.
245
246     """
247     if self._shutdown_check is None:
248       self._shutdown_check = _MasterShutdownCheck()
249
250     return self._shutdown_check(self.context.jobqueue.PrepareShutdown())
251
252   def server_cleanup(self):
253     """Cleanup the server.
254
255     This involves shutting down the processor threads and the master
256     socket.
257
258     """
259     try:
260       self.close()
261     finally:
262       if self.request_workers:
263         self.request_workers.TerminateWorkers()
264       if self.context:
265         self.context.jobqueue.Shutdown()
266
267
268 class ClientOps:
269   """Class holding high-level client operations."""
270   def __init__(self, server):
271     self.server = server
272
273   def handle_request(self, method, args): # pylint: disable=R0911
274     context = self.server.context
275     queue = context.jobqueue
276
277     # TODO: Parameter validation
278     if not isinstance(args, (tuple, list)):
279       logging.info("Received invalid arguments of type '%s'", type(args))
280       raise ValueError("Invalid arguments type '%s'" % type(args))
281
282     if method not in luxi.REQ_ALL:
283       logging.info("Received invalid request '%s'", method)
284       raise ValueError("Invalid operation '%s'" % method)
285
286     # TODO: Rewrite to not exit in each 'if/elif' branch
287
288     if method == luxi.REQ_SUBMIT_JOB:
289       logging.info("Receiving new job")
290       (job_def, ) = args
291       ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
292       job_id = queue.SubmitJob(ops)
293       _LogNewJob(True, job_id, ops)
294       return job_id
295
296     elif method == luxi.REQ_SUBMIT_MANY_JOBS:
297       logging.info("Receiving multiple jobs")
298       (job_defs, ) = args
299       jobs = []
300       for ops in job_defs:
301         jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
302       job_ids = queue.SubmitManyJobs(jobs)
303       for ((status, job_id), ops) in zip(job_ids, jobs):
304         _LogNewJob(status, job_id, ops)
305       return job_ids
306
307     elif method == luxi.REQ_CANCEL_JOB:
308       (job_id, ) = args
309       logging.info("Received job cancel request for %s", job_id)
310       return queue.CancelJob(job_id)
311
312     elif method == luxi.REQ_CHANGE_JOB_PRIORITY:
313       (job_id, priority) = args
314       logging.info("Received request to change priority for job %s to %s",
315                    job_id, priority)
316       return queue.ChangeJobPriority(job_id, priority)
317
318     elif method == luxi.REQ_ARCHIVE_JOB:
319       (job_id, ) = args
320       logging.info("Received job archive request for %s", job_id)
321       return queue.ArchiveJob(job_id)
322
323     elif method == luxi.REQ_AUTO_ARCHIVE_JOBS:
324       (age, timeout) = args
325       logging.info("Received job autoarchive request for age %s, timeout %s",
326                    age, timeout)
327       return queue.AutoArchiveJobs(age, timeout)
328
329     elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
330       (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
331       logging.info("Received job poll request for %s", job_id)
332       return queue.WaitForJobChanges(job_id, fields, prev_job_info,
333                                      prev_log_serial, timeout)
334
335     elif method == luxi.REQ_QUERY:
336       (what, fields, qfilter) = args
337
338       if what in constants.QR_VIA_OP:
339         result = self._Query(opcodes.OpQuery(what=what, fields=fields,
340                                              qfilter=qfilter))
341       elif what == constants.QR_LOCK:
342         if qfilter is not None:
343           raise errors.OpPrereqError("Lock queries can't be filtered",
344                                      errors.ECODE_INVAL)
345         return context.glm.QueryLocks(fields)
346       elif what == constants.QR_JOB:
347         return queue.QueryJobs(fields, qfilter)
348       elif what in constants.QR_VIA_LUXI:
349         raise NotImplementedError
350       else:
351         raise errors.OpPrereqError("Resource type '%s' unknown" % what,
352                                    errors.ECODE_INVAL)
353
354       return result
355
356     elif method == luxi.REQ_QUERY_FIELDS:
357       (what, fields) = args
358       req = objects.QueryFieldsRequest(what=what, fields=fields)
359
360       try:
361         fielddefs = query.ALL_FIELDS[req.what]
362       except KeyError:
363         raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
364                                    errors.ECODE_INVAL)
365
366       return query.QueryFields(fielddefs, req.fields)
367
368     elif method == luxi.REQ_QUERY_JOBS:
369       (job_ids, fields) = args
370       if isinstance(job_ids, (tuple, list)) and job_ids:
371         msg = utils.CommaJoin(job_ids)
372       else:
373         msg = str(job_ids)
374       logging.info("Received job query request for %s", msg)
375       return queue.OldStyleQueryJobs(job_ids, fields)
376
377     elif method == luxi.REQ_QUERY_INSTANCES:
378       (names, fields, use_locking) = args
379       logging.info("Received instance query request for %s", names)
380       if use_locking:
381         raise errors.OpPrereqError("Sync queries are not allowed",
382                                    errors.ECODE_INVAL)
383       op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
384                                    use_locking=use_locking)
385       return self._Query(op)
386
387     elif method == luxi.REQ_QUERY_NODES:
388       (names, fields, use_locking) = args
389       logging.info("Received node query request for %s", names)
390       if use_locking:
391         raise errors.OpPrereqError("Sync queries are not allowed",
392                                    errors.ECODE_INVAL)
393       op = opcodes.OpNodeQuery(names=names, output_fields=fields,
394                                use_locking=use_locking)
395       return self._Query(op)
396
397     elif method == luxi.REQ_QUERY_GROUPS:
398       (names, fields, use_locking) = args
399       logging.info("Received group query request for %s", names)
400       if use_locking:
401         raise errors.OpPrereqError("Sync queries are not allowed",
402                                    errors.ECODE_INVAL)
403       op = opcodes.OpGroupQuery(names=names, output_fields=fields)
404       return self._Query(op)
405
406     elif method == luxi.REQ_QUERY_NETWORKS:
407       (names, fields, use_locking) = args
408       logging.info("Received network query request for %s", names)
409       if use_locking:
410         raise errors.OpPrereqError("Sync queries are not allowed",
411                                    errors.ECODE_INVAL)
412       op = opcodes.OpNetworkQuery(names=names, output_fields=fields)
413       return self._Query(op)
414
415     elif method == luxi.REQ_QUERY_EXPORTS:
416       (nodes, use_locking) = args
417       if use_locking:
418         raise errors.OpPrereqError("Sync queries are not allowed",
419                                    errors.ECODE_INVAL)
420       logging.info("Received exports query request")
421       op = opcodes.OpBackupQuery(nodes=nodes, use_locking=use_locking)
422       return self._Query(op)
423
424     elif method == luxi.REQ_QUERY_CONFIG_VALUES:
425       (fields, ) = args
426       logging.info("Received config values query request for %s", fields)
427       op = opcodes.OpClusterConfigQuery(output_fields=fields)
428       return self._Query(op)
429
430     elif method == luxi.REQ_QUERY_CLUSTER_INFO:
431       logging.info("Received cluster info query request")
432       op = opcodes.OpClusterQuery()
433       return self._Query(op)
434
435     elif method == luxi.REQ_QUERY_TAGS:
436       (kind, name) = args
437       logging.info("Received tags query request")
438       op = opcodes.OpTagsGet(kind=kind, name=name, use_locking=False)
439       return self._Query(op)
440
441     elif method == luxi.REQ_SET_DRAIN_FLAG:
442       (drain_flag, ) = args
443       logging.info("Received queue drain flag change request to %s",
444                    drain_flag)
445       return queue.SetDrainFlag(drain_flag)
446
447     elif method == luxi.REQ_SET_WATCHER_PAUSE:
448       (until, ) = args
449
450       return _SetWatcherPause(context, until)
451
452     else:
453       logging.critical("Request '%s' in luxi.REQ_ALL, but not known", method)
454       raise errors.ProgrammerError("Operation '%s' in luxi.REQ_ALL,"
455                                    " but not implemented" % method)
456
457   def _Query(self, op):
458     """Runs the specified opcode and returns the result.
459
460     """
461     # Queries don't have a job id
462     proc = mcpu.Processor(self.server.context, None, enable_locks=False)
463
464     # TODO: Executing an opcode using locks will acquire them in blocking mode.
465     # Consider using a timeout for retries.
466     return proc.ExecOpCode(op, None)
467
468
469 class GanetiContext(object):
470   """Context common to all ganeti threads.
471
472   This class creates and holds common objects shared by all threads.
473
474   """
475   # pylint: disable=W0212
476   # we do want to ensure a singleton here
477   _instance = None
478
479   def __init__(self):
480     """Constructs a new GanetiContext object.
481
482     There should be only a GanetiContext object at any time, so this
483     function raises an error if this is not the case.
484
485     """
486     assert self.__class__._instance is None, "double GanetiContext instance"
487
488     # Create global configuration object
489     self.cfg = config.ConfigWriter()
490
491     # Locking manager
492     self.glm = locking.GanetiLockManager(
493       self.cfg.GetNodeList(),
494       self.cfg.GetNodeGroupList(),
495       self.cfg.GetInstanceList(),
496       self.cfg.GetNetworkList())
497
498     self.cfg.SetContext(self)
499
500     # RPC runner
501     self.rpc = rpc.RpcRunner(self.cfg, self.glm.AddToLockMonitor)
502
503     # Job queue
504     self.jobqueue = jqueue.JobQueue(self)
505
506     # setting this also locks the class against attribute modifications
507     self.__class__._instance = self
508
509   def __setattr__(self, name, value):
510     """Setting GanetiContext attributes is forbidden after initialization.
511
512     """
513     assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
514     object.__setattr__(self, name, value)
515
516   def AddNode(self, node, ec_id):
517     """Adds a node to the configuration and lock manager.
518
519     """
520     # Add it to the configuration
521     self.cfg.AddNode(node, ec_id)
522
523     # If preseeding fails it'll not be added
524     self.jobqueue.AddNode(node)
525
526     # Add the new node to the Ganeti Lock Manager
527     self.glm.add(locking.LEVEL_NODE, node.name)
528     self.glm.add(locking.LEVEL_NODE_RES, node.name)
529
530   def ReaddNode(self, node):
531     """Updates a node that's already in the configuration
532
533     """
534     # Synchronize the queue again
535     self.jobqueue.AddNode(node)
536
537   def RemoveNode(self, name):
538     """Removes a node from the configuration and lock manager.
539
540     """
541     # Remove node from configuration
542     self.cfg.RemoveNode(name)
543
544     # Notify job queue
545     self.jobqueue.RemoveNode(name)
546
547     # Remove the node from the Ganeti Lock Manager
548     self.glm.remove(locking.LEVEL_NODE, name)
549     self.glm.remove(locking.LEVEL_NODE_RES, name)
550
551
552 def _SetWatcherPause(context, until):
553   """Creates or removes the watcher pause file.
554
555   @type context: L{GanetiContext}
556   @param context: Global Ganeti context
557   @type until: None or int
558   @param until: Unix timestamp saying until when the watcher shouldn't run
559
560   """
561   node_names = context.cfg.GetNodeList()
562
563   if until is None:
564     logging.info("Received request to no longer pause watcher")
565   else:
566     if not ht.TNumber(until):
567       raise TypeError("Duration must be numeric")
568
569     if until < time.time():
570       raise errors.GenericError("Unable to set pause end time in the past")
571
572     logging.info("Received request to pause watcher until %s", until)
573
574   result = context.rpc.call_set_watcher_pause(node_names, until)
575
576   errmsg = utils.CommaJoin("%s (%s)" % (node_name, nres.fail_msg)
577                            for (node_name, nres) in result.items()
578                            if nres.fail_msg and not nres.offline)
579   if errmsg:
580     raise errors.OpExecError("Watcher pause was set where possible, but failed"
581                              " on the following node(s): %s" % errmsg)
582
583   return until
584
585
586 @rpc.RunWithRPC
587 def CheckAgreement():
588   """Check the agreement on who is the master.
589
590   The function uses a very simple algorithm: we must get more positive
591   than negative answers. Since in most of the cases we are the master,
592   we'll use our own config file for getting the node list. In the
593   future we could collect the current node list from our (possibly
594   obsolete) known nodes.
595
596   In order to account for cold-start of all nodes, we retry for up to
597   a minute until we get a real answer as the top-voted one. If the
598   nodes are more out-of-sync, for now manual startup of the master
599   should be attempted.
600
601   Note that for a even number of nodes cluster, we need at least half
602   of the nodes (beside ourselves) to vote for us. This creates a
603   problem on two-node clusters, since in this case we require the
604   other node to be up too to confirm our status.
605
606   """
607   myself = netutils.Hostname.GetSysName()
608   #temp instantiation of a config writer, used only to get the node list
609   cfg = config.ConfigWriter()
610   node_list = cfg.GetNodeList()
611   del cfg
612   retries = 6
613   while retries > 0:
614     votes = bootstrap.GatherMasterVotes(node_list)
615     if not votes:
616       # empty node list, this is a one node cluster
617       return True
618     if votes[0][0] is None:
619       retries -= 1
620       time.sleep(10)
621       continue
622     break
623   if retries == 0:
624     logging.critical("Cluster inconsistent, most of the nodes didn't answer"
625                      " after multiple retries. Aborting startup")
626     logging.critical("Use the --no-voting option if you understand what"
627                      " effects it has on the cluster state")
628     return False
629   # here a real node is at the top of the list
630   all_votes = sum(item[1] for item in votes)
631   top_node, top_votes = votes[0]
632
633   result = False
634   if top_node != myself:
635     logging.critical("It seems we are not the master (top-voted node"
636                      " is %s with %d out of %d votes)", top_node, top_votes,
637                      all_votes)
638   elif top_votes < all_votes - top_votes:
639     logging.critical("It seems we are not the master (%d votes for,"
640                      " %d votes against)", top_votes, all_votes - top_votes)
641   else:
642     result = True
643
644   return result
645
646
647 @rpc.RunWithRPC
648 def ActivateMasterIP():
649   # activate ip
650   cfg = config.ConfigWriter()
651   master_params = cfg.GetMasterNetworkParameters()
652   ems = cfg.GetUseExternalMipScript()
653   runner = rpc.BootstrapRunner()
654   result = runner.call_node_activate_master_ip(master_params.name,
655                                                master_params, ems)
656
657   msg = result.fail_msg
658   if msg:
659     logging.error("Can't activate master IP address: %s", msg)
660
661
662 def CheckMasterd(options, args):
663   """Initial checks whether to run or exit with a failure.
664
665   """
666   if args: # masterd doesn't take any arguments
667     print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
668     sys.exit(constants.EXIT_FAILURE)
669
670   ssconf.CheckMaster(options.debug)
671
672   try:
673     options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
674     options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
675   except KeyError:
676     print >> sys.stderr, ("User or group not existing on system: %s:%s" %
677                           (constants.MASTERD_USER, constants.DAEMONS_GROUP))
678     sys.exit(constants.EXIT_FAILURE)
679
680   # Determine static runtime architecture information
681   runtime.InitArchInfo()
682
683   # Check the configuration is sane before anything else
684   try:
685     config.ConfigWriter()
686   except errors.ConfigVersionMismatch, err:
687     v1 = "%s.%s.%s" % constants.SplitVersion(err.args[0])
688     v2 = "%s.%s.%s" % constants.SplitVersion(err.args[1])
689     print >> sys.stderr,  \
690         ("Configuration version mismatch. The current Ganeti software"
691          " expects version %s, but the on-disk configuration file has"
692          " version %s. This is likely the result of upgrading the"
693          " software without running the upgrade procedure. Please contact"
694          " your cluster administrator or complete the upgrade using the"
695          " cfgupgrade utility, after reading the upgrade notes." %
696          (v1, v2))
697     sys.exit(constants.EXIT_FAILURE)
698   except errors.ConfigurationError, err:
699     print >> sys.stderr, \
700         ("Configuration error while opening the configuration file: %s\n"
701          "This might be caused by an incomplete software upgrade or"
702          " by a corrupted configuration file. Until the problem is fixed"
703          " the master daemon cannot start." % str(err))
704     sys.exit(constants.EXIT_FAILURE)
705
706   # If CheckMaster didn't fail we believe we are the master, but we have to
707   # confirm with the other nodes.
708   if options.no_voting:
709     if not options.yes_do_it:
710       sys.stdout.write("The 'no voting' option has been selected.\n")
711       sys.stdout.write("This is dangerous, please confirm by"
712                        " typing uppercase 'yes': ")
713       sys.stdout.flush()
714
715       confirmation = sys.stdin.readline().strip()
716       if confirmation != "YES":
717         print >> sys.stderr, "Aborting."
718         sys.exit(constants.EXIT_FAILURE)
719
720   else:
721     # CheckAgreement uses RPC and threads, hence it needs to be run in
722     # a separate process before we call utils.Daemonize in the current
723     # process.
724     if not utils.RunInSeparateProcess(CheckAgreement):
725       sys.exit(constants.EXIT_FAILURE)
726
727   # ActivateMasterIP also uses RPC/threads, so we run it again via a
728   # separate process.
729
730   # TODO: decide whether failure to activate the master IP is a fatal error
731   utils.RunInSeparateProcess(ActivateMasterIP)
732
733
734 def PrepMasterd(options, _):
735   """Prep master daemon function, executed with the PID file held.
736
737   """
738   # This is safe to do as the pid file guarantees against
739   # concurrent execution.
740   utils.RemoveFile(pathutils.MASTER_SOCKET)
741
742   mainloop = daemon.Mainloop()
743   master = MasterServer(pathutils.MASTER_SOCKET, options.uid, options.gid)
744   return (mainloop, master)
745
746
747 def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
748   """Main master daemon function, executed with the PID file held.
749
750   """
751   (mainloop, master) = prep_data
752   try:
753     rpc.Init()
754     try:
755       master.setup_queue()
756       try:
757         mainloop.Run(shutdown_wait_fn=master.WaitForShutdown)
758       finally:
759         master.server_cleanup()
760     finally:
761       rpc.Shutdown()
762   finally:
763     utils.RemoveFile(pathutils.MASTER_SOCKET)
764
765   logging.info("Clean master daemon shutdown")
766
767
768 def Main():
769   """Main function"""
770   parser = OptionParser(description="Ganeti master daemon",
771                         usage="%prog [-f] [-d]",
772                         version="%%prog (ganeti) %s" %
773                         constants.RELEASE_VERSION)
774   parser.add_option("--no-voting", dest="no_voting",
775                     help="Do not check that the nodes agree on this node"
776                     " being the master and start the daemon unconditionally",
777                     default=False, action="store_true")
778   parser.add_option("--yes-do-it", dest="yes_do_it",
779                     help="Override interactive check for --no-voting",
780                     default=False, action="store_true")
781   daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
782                      ExecMasterd, multithreaded=True)