Add function for checking file access permissions
[ganeti-local] / lib / server / masterd.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Master daemon program.
23
24 Some classes deviates from the standard style guide since the
25 inheritance from parent classes requires it.
26
27 """
28
29 # pylint: disable=C0103
30 # C0103: Invalid name ganeti-masterd
31
32 import grp
33 import os
34 import pwd
35 import sys
36 import socket
37 import time
38 import tempfile
39 import logging
40
41 from optparse import OptionParser
42
43 from ganeti import config
44 from ganeti import constants
45 from ganeti import daemon
46 from ganeti import mcpu
47 from ganeti import opcodes
48 from ganeti import jqueue
49 from ganeti import locking
50 from ganeti import luxi
51 from ganeti import utils
52 from ganeti import errors
53 from ganeti import ssconf
54 from ganeti import workerpool
55 from ganeti import rpc
56 from ganeti import bootstrap
57 from ganeti import netutils
58 from ganeti import objects
59 from ganeti import query
60 from ganeti import runtime
61 from ganeti import pathutils
62 from ganeti import ht
63
64
65 CLIENT_REQUEST_WORKERS = 16
66
67 EXIT_NOTMASTER = constants.EXIT_NOTMASTER
68 EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
69
70
71 def _LogNewJob(status, info, ops):
72   """Log information about a recently submitted job.
73
74   """
75   op_summary = utils.CommaJoin(op.Summary() for op in ops)
76
77   if status:
78     logging.info("New job with id %s, summary: %s", info, op_summary)
79   else:
80     logging.info("Failed to submit job, reason: '%s', summary: %s",
81                  info, op_summary)
82
83
84 class ClientRequestWorker(workerpool.BaseWorker):
85   # pylint: disable=W0221
86   def RunTask(self, server, message, client):
87     """Process the request.
88
89     """
90     client_ops = ClientOps(server)
91
92     try:
93       (method, args, version) = luxi.ParseRequest(message)
94     except luxi.ProtocolError, err:
95       logging.error("Protocol Error: %s", err)
96       client.close_log()
97       return
98
99     success = False
100     try:
101       # Verify client's version if there was one in the request
102       if version is not None and version != constants.LUXI_VERSION:
103         raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
104                                (constants.LUXI_VERSION, version))
105
106       result = client_ops.handle_request(method, args)
107       success = True
108     except errors.GenericError, err:
109       logging.exception("Unexpected exception")
110       success = False
111       result = errors.EncodeException(err)
112     except:
113       logging.exception("Unexpected exception")
114       err = sys.exc_info()
115       result = "Caught exception: %s" % str(err[1])
116
117     try:
118       reply = luxi.FormatResponse(success, result)
119       client.send_message(reply)
120       # awake the main thread so that it can write out the data.
121       server.awaker.signal()
122     except: # pylint: disable=W0702
123       logging.exception("Send error")
124       client.close_log()
125
126
127 class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
128   """Handler for master peers.
129
130   """
131   _MAX_UNHANDLED = 1
132
133   def __init__(self, server, connected_socket, client_address, family):
134     daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
135                                                  client_address,
136                                                  constants.LUXI_EOM,
137                                                  family, self._MAX_UNHANDLED)
138     self.server = server
139
140   def handle_message(self, message, _):
141     self.server.request_workers.AddTask((self.server, message, self))
142
143
144 class _MasterShutdownCheck:
145   """Logic for master daemon shutdown.
146
147   """
148   #: How long to wait between checks
149   _CHECK_INTERVAL = 5.0
150
151   #: How long to wait after all jobs are done (e.g. to give clients time to
152   #: retrieve the job status)
153   _SHUTDOWN_LINGER = 5.0
154
155   def __init__(self):
156     """Initializes this class.
157
158     """
159     self._had_active_jobs = None
160     self._linger_timeout = None
161
162   def __call__(self, jq_prepare_result):
163     """Determines if master daemon is ready for shutdown.
164
165     @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown}
166     @rtype: None or number
167     @return: None if master daemon is ready, timeout if the check must be
168              repeated
169
170     """
171     if jq_prepare_result:
172       # Check again shortly
173       logging.info("Job queue has been notified for shutdown but is still"
174                    " busy; next check in %s seconds", self._CHECK_INTERVAL)
175       self._had_active_jobs = True
176       return self._CHECK_INTERVAL
177
178     if not self._had_active_jobs:
179       # Can shut down as there were no active jobs on the first check
180       return None
181
182     # No jobs are running anymore, but maybe some clients want to collect some
183     # information. Give them a short amount of time.
184     if self._linger_timeout is None:
185       self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True)
186
187     remaining = self._linger_timeout.Remaining()
188
189     logging.info("Job queue no longer busy; shutting down master daemon"
190                  " in %s seconds", remaining)
191
192     # TODO: Should the master daemon socket be closed at this point? Doing so
193     # wouldn't affect existing connections.
194
195     if remaining < 0:
196       return None
197     else:
198       return remaining
199
200
201 class MasterServer(daemon.AsyncStreamServer):
202   """Master Server.
203
204   This is the main asynchronous master server. It handles connections to the
205   master socket.
206
207   """
208   family = socket.AF_UNIX
209
210   def __init__(self, address, uid, gid):
211     """MasterServer constructor
212
213     @param address: the unix socket address to bind the MasterServer to
214     @param uid: The uid of the owner of the socket
215     @param gid: The gid of the owner of the socket
216
217     """
218     temp_name = tempfile.mktemp(dir=os.path.dirname(address))
219     daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
220     os.chmod(temp_name, 0770)
221     os.chown(temp_name, uid, gid)
222     os.rename(temp_name, address)
223
224     self.awaker = daemon.AsyncAwaker()
225
226     # We'll only start threads once we've forked.
227     self.context = None
228     self.request_workers = None
229
230     self._shutdown_check = None
231
232   def handle_connection(self, connected_socket, client_address):
233     # TODO: add connection count and limit the number of open connections to a
234     # maximum number to avoid breaking for lack of file descriptors or memory.
235     MasterClientHandler(self, connected_socket, client_address, self.family)
236
237   def setup_queue(self):
238     self.context = GanetiContext()
239     self.request_workers = workerpool.WorkerPool("ClientReq",
240                                                  CLIENT_REQUEST_WORKERS,
241                                                  ClientRequestWorker)
242
243   def WaitForShutdown(self):
244     """Prepares server for shutdown.
245
246     """
247     if self._shutdown_check is None:
248       self._shutdown_check = _MasterShutdownCheck()
249
250     return self._shutdown_check(self.context.jobqueue.PrepareShutdown())
251
252   def server_cleanup(self):
253     """Cleanup the server.
254
255     This involves shutting down the processor threads and the master
256     socket.
257
258     """
259     try:
260       self.close()
261     finally:
262       if self.request_workers:
263         self.request_workers.TerminateWorkers()
264       if self.context:
265         self.context.jobqueue.Shutdown()
266
267
268 class ClientOps:
269   """Class holding high-level client operations."""
270   def __init__(self, server):
271     self.server = server
272
273   def handle_request(self, method, args): # pylint: disable=R0911
274     context = self.server.context
275     queue = context.jobqueue
276
277     # TODO: Parameter validation
278     if not isinstance(args, (tuple, list)):
279       logging.info("Received invalid arguments of type '%s'", type(args))
280       raise ValueError("Invalid arguments type '%s'" % type(args))
281
282     # TODO: Rewrite to not exit in each 'if/elif' branch
283
284     if method == luxi.REQ_SUBMIT_JOB:
285       logging.info("Receiving new job")
286       (job_def, ) = args
287       ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
288       job_id = queue.SubmitJob(ops)
289       _LogNewJob(True, job_id, ops)
290       return job_id
291
292     elif method == luxi.REQ_SUBMIT_MANY_JOBS:
293       logging.info("Receiving multiple jobs")
294       (job_defs, ) = args
295       jobs = []
296       for ops in job_defs:
297         jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
298       job_ids = queue.SubmitManyJobs(jobs)
299       for ((status, job_id), ops) in zip(job_ids, jobs):
300         _LogNewJob(status, job_id, ops)
301       return job_ids
302
303     elif method == luxi.REQ_CANCEL_JOB:
304       (job_id, ) = args
305       logging.info("Received job cancel request for %s", job_id)
306       return queue.CancelJob(job_id)
307
308     elif method == luxi.REQ_CHANGE_JOB_PRIORITY:
309       (job_id, priority) = args
310       logging.info("Received request to change priority for job %s to %s",
311                    job_id, priority)
312       return queue.ChangeJobPriority(job_id, priority)
313
314     elif method == luxi.REQ_ARCHIVE_JOB:
315       (job_id, ) = args
316       logging.info("Received job archive request for %s", job_id)
317       return queue.ArchiveJob(job_id)
318
319     elif method == luxi.REQ_AUTO_ARCHIVE_JOBS:
320       (age, timeout) = args
321       logging.info("Received job autoarchive request for age %s, timeout %s",
322                    age, timeout)
323       return queue.AutoArchiveJobs(age, timeout)
324
325     elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
326       (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
327       logging.info("Received job poll request for %s", job_id)
328       return queue.WaitForJobChanges(job_id, fields, prev_job_info,
329                                      prev_log_serial, timeout)
330
331     elif method == luxi.REQ_QUERY:
332       (what, fields, qfilter) = args
333
334       if what in constants.QR_VIA_OP:
335         result = self._Query(opcodes.OpQuery(what=what, fields=fields,
336                                              qfilter=qfilter))
337       elif what == constants.QR_LOCK:
338         if qfilter is not None:
339           raise errors.OpPrereqError("Lock queries can't be filtered",
340                                      errors.ECODE_INVAL)
341         return context.glm.QueryLocks(fields)
342       elif what == constants.QR_JOB:
343         return queue.QueryJobs(fields, qfilter)
344       elif what in constants.QR_VIA_LUXI:
345         raise NotImplementedError
346       else:
347         raise errors.OpPrereqError("Resource type '%s' unknown" % what,
348                                    errors.ECODE_INVAL)
349
350       return result
351
352     elif method == luxi.REQ_QUERY_FIELDS:
353       (what, fields) = args
354       req = objects.QueryFieldsRequest(what=what, fields=fields)
355
356       try:
357         fielddefs = query.ALL_FIELDS[req.what]
358       except KeyError:
359         raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
360                                    errors.ECODE_INVAL)
361
362       return query.QueryFields(fielddefs, req.fields)
363
364     elif method == luxi.REQ_QUERY_JOBS:
365       (job_ids, fields) = args
366       if isinstance(job_ids, (tuple, list)) and job_ids:
367         msg = utils.CommaJoin(job_ids)
368       else:
369         msg = str(job_ids)
370       logging.info("Received job query request for %s", msg)
371       return queue.OldStyleQueryJobs(job_ids, fields)
372
373     elif method == luxi.REQ_QUERY_INSTANCES:
374       (names, fields, use_locking) = args
375       logging.info("Received instance query request for %s", names)
376       if use_locking:
377         raise errors.OpPrereqError("Sync queries are not allowed",
378                                    errors.ECODE_INVAL)
379       op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
380                                    use_locking=use_locking)
381       return self._Query(op)
382
383     elif method == luxi.REQ_QUERY_NODES:
384       (names, fields, use_locking) = args
385       logging.info("Received node query request for %s", names)
386       if use_locking:
387         raise errors.OpPrereqError("Sync queries are not allowed",
388                                    errors.ECODE_INVAL)
389       op = opcodes.OpNodeQuery(names=names, output_fields=fields,
390                                use_locking=use_locking)
391       return self._Query(op)
392
393     elif method == luxi.REQ_QUERY_GROUPS:
394       (names, fields, use_locking) = args
395       logging.info("Received group query request for %s", names)
396       if use_locking:
397         raise errors.OpPrereqError("Sync queries are not allowed",
398                                    errors.ECODE_INVAL)
399       op = opcodes.OpGroupQuery(names=names, output_fields=fields)
400       return self._Query(op)
401
402     elif method == luxi.REQ_QUERY_NETWORKS:
403       (names, fields, use_locking) = args
404       logging.info("Received network query request for %s", names)
405       if use_locking:
406         raise errors.OpPrereqError("Sync queries are not allowed",
407                                    errors.ECODE_INVAL)
408       op = opcodes.OpNetworkQuery(names=names, output_fields=fields)
409       return self._Query(op)
410
411     elif method == luxi.REQ_QUERY_EXPORTS:
412       (nodes, use_locking) = args
413       if use_locking:
414         raise errors.OpPrereqError("Sync queries are not allowed",
415                                    errors.ECODE_INVAL)
416       logging.info("Received exports query request")
417       op = opcodes.OpBackupQuery(nodes=nodes, use_locking=use_locking)
418       return self._Query(op)
419
420     elif method == luxi.REQ_QUERY_CONFIG_VALUES:
421       (fields, ) = args
422       logging.info("Received config values query request for %s", fields)
423       op = opcodes.OpClusterConfigQuery(output_fields=fields)
424       return self._Query(op)
425
426     elif method == luxi.REQ_QUERY_CLUSTER_INFO:
427       logging.info("Received cluster info query request")
428       op = opcodes.OpClusterQuery()
429       return self._Query(op)
430
431     elif method == luxi.REQ_QUERY_TAGS:
432       (kind, name) = args
433       logging.info("Received tags query request")
434       op = opcodes.OpTagsGet(kind=kind, name=name, use_locking=False)
435       return self._Query(op)
436
437     elif method == luxi.REQ_SET_DRAIN_FLAG:
438       (drain_flag, ) = args
439       logging.info("Received queue drain flag change request to %s",
440                    drain_flag)
441       return queue.SetDrainFlag(drain_flag)
442
443     elif method == luxi.REQ_SET_WATCHER_PAUSE:
444       (until, ) = args
445
446       return _SetWatcherPause(context, until)
447
448     else:
449       logging.info("Received invalid request '%s'", method)
450       raise ValueError("Invalid operation '%s'" % method)
451
452   def _Query(self, op):
453     """Runs the specified opcode and returns the result.
454
455     """
456     # Queries don't have a job id
457     proc = mcpu.Processor(self.server.context, None, enable_locks=False)
458
459     # TODO: Executing an opcode using locks will acquire them in blocking mode.
460     # Consider using a timeout for retries.
461     return proc.ExecOpCode(op, None)
462
463
464 class GanetiContext(object):
465   """Context common to all ganeti threads.
466
467   This class creates and holds common objects shared by all threads.
468
469   """
470   # pylint: disable=W0212
471   # we do want to ensure a singleton here
472   _instance = None
473
474   def __init__(self):
475     """Constructs a new GanetiContext object.
476
477     There should be only a GanetiContext object at any time, so this
478     function raises an error if this is not the case.
479
480     """
481     assert self.__class__._instance is None, "double GanetiContext instance"
482
483     # Create global configuration object
484     self.cfg = config.ConfigWriter()
485
486     # Locking manager
487     self.glm = locking.GanetiLockManager(
488       self.cfg.GetNodeList(),
489       self.cfg.GetNodeGroupList(),
490       self.cfg.GetInstanceList(),
491       self.cfg.GetNetworkList())
492
493     self.cfg.SetContext(self)
494
495     # RPC runner
496     self.rpc = rpc.RpcRunner(self.cfg, self.glm.AddToLockMonitor)
497
498     # Job queue
499     self.jobqueue = jqueue.JobQueue(self)
500
501     # setting this also locks the class against attribute modifications
502     self.__class__._instance = self
503
504   def __setattr__(self, name, value):
505     """Setting GanetiContext attributes is forbidden after initialization.
506
507     """
508     assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
509     object.__setattr__(self, name, value)
510
511   def AddNode(self, node, ec_id):
512     """Adds a node to the configuration and lock manager.
513
514     """
515     # Add it to the configuration
516     self.cfg.AddNode(node, ec_id)
517
518     # If preseeding fails it'll not be added
519     self.jobqueue.AddNode(node)
520
521     # Add the new node to the Ganeti Lock Manager
522     self.glm.add(locking.LEVEL_NODE, node.name)
523     self.glm.add(locking.LEVEL_NODE_RES, node.name)
524
525   def ReaddNode(self, node):
526     """Updates a node that's already in the configuration
527
528     """
529     # Synchronize the queue again
530     self.jobqueue.AddNode(node)
531
532   def RemoveNode(self, name):
533     """Removes a node from the configuration and lock manager.
534
535     """
536     # Remove node from configuration
537     self.cfg.RemoveNode(name)
538
539     # Notify job queue
540     self.jobqueue.RemoveNode(name)
541
542     # Remove the node from the Ganeti Lock Manager
543     self.glm.remove(locking.LEVEL_NODE, name)
544     self.glm.remove(locking.LEVEL_NODE_RES, name)
545
546
547 def _SetWatcherPause(context, until):
548   """Creates or removes the watcher pause file.
549
550   @type context: L{GanetiContext}
551   @param context: Global Ganeti context
552   @type until: None or int
553   @param until: Unix timestamp saying until when the watcher shouldn't run
554
555   """
556   node_names = context.cfg.GetNodeList()
557
558   if until is None:
559     logging.info("Received request to no longer pause watcher")
560   else:
561     if not ht.TNumber(until):
562       raise TypeError("Duration must be numeric")
563
564     if until < time.time():
565       raise errors.GenericError("Unable to set pause end time in the past")
566
567     logging.info("Received request to pause watcher until %s", until)
568
569   result = context.rpc.call_set_watcher_pause(node_names, until)
570
571   errmsg = utils.CommaJoin("%s (%s)" % (node_name, nres.fail_msg)
572                            for (node_name, nres) in result.items()
573                            if nres.fail_msg and not nres.offline)
574   if errmsg:
575     raise errors.OpExecError("Watcher pause was set where possible, but failed"
576                              " on the following node(s): %s" % errmsg)
577
578   return until
579
580
581 @rpc.RunWithRPC
582 def CheckAgreement():
583   """Check the agreement on who is the master.
584
585   The function uses a very simple algorithm: we must get more positive
586   than negative answers. Since in most of the cases we are the master,
587   we'll use our own config file for getting the node list. In the
588   future we could collect the current node list from our (possibly
589   obsolete) known nodes.
590
591   In order to account for cold-start of all nodes, we retry for up to
592   a minute until we get a real answer as the top-voted one. If the
593   nodes are more out-of-sync, for now manual startup of the master
594   should be attempted.
595
596   Note that for a even number of nodes cluster, we need at least half
597   of the nodes (beside ourselves) to vote for us. This creates a
598   problem on two-node clusters, since in this case we require the
599   other node to be up too to confirm our status.
600
601   """
602   myself = netutils.Hostname.GetSysName()
603   #temp instantiation of a config writer, used only to get the node list
604   cfg = config.ConfigWriter()
605   node_list = cfg.GetNodeList()
606   del cfg
607   retries = 6
608   while retries > 0:
609     votes = bootstrap.GatherMasterVotes(node_list)
610     if not votes:
611       # empty node list, this is a one node cluster
612       return True
613     if votes[0][0] is None:
614       retries -= 1
615       time.sleep(10)
616       continue
617     break
618   if retries == 0:
619     logging.critical("Cluster inconsistent, most of the nodes didn't answer"
620                      " after multiple retries. Aborting startup")
621     logging.critical("Use the --no-voting option if you understand what"
622                      " effects it has on the cluster state")
623     return False
624   # here a real node is at the top of the list
625   all_votes = sum(item[1] for item in votes)
626   top_node, top_votes = votes[0]
627
628   result = False
629   if top_node != myself:
630     logging.critical("It seems we are not the master (top-voted node"
631                      " is %s with %d out of %d votes)", top_node, top_votes,
632                      all_votes)
633   elif top_votes < all_votes - top_votes:
634     logging.critical("It seems we are not the master (%d votes for,"
635                      " %d votes against)", top_votes, all_votes - top_votes)
636   else:
637     result = True
638
639   return result
640
641
642 @rpc.RunWithRPC
643 def ActivateMasterIP():
644   # activate ip
645   cfg = config.ConfigWriter()
646   master_params = cfg.GetMasterNetworkParameters()
647   ems = cfg.GetUseExternalMipScript()
648   runner = rpc.BootstrapRunner()
649   result = runner.call_node_activate_master_ip(master_params.name,
650                                                master_params, ems)
651
652   msg = result.fail_msg
653   if msg:
654     logging.error("Can't activate master IP address: %s", msg)
655
656
657 def CheckMasterd(options, args):
658   """Initial checks whether to run or exit with a failure.
659
660   """
661   if args: # masterd doesn't take any arguments
662     print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
663     sys.exit(constants.EXIT_FAILURE)
664
665   ssconf.CheckMaster(options.debug)
666
667   try:
668     options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
669     options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
670   except KeyError:
671     print >> sys.stderr, ("User or group not existing on system: %s:%s" %
672                           (constants.MASTERD_USER, constants.DAEMONS_GROUP))
673     sys.exit(constants.EXIT_FAILURE)
674
675   # Determine static runtime architecture information
676   runtime.InitArchInfo()
677
678   # Check the configuration is sane before anything else
679   try:
680     config.ConfigWriter()
681   except errors.ConfigVersionMismatch, err:
682     v1 = "%s.%s.%s" % constants.SplitVersion(err.args[0])
683     v2 = "%s.%s.%s" % constants.SplitVersion(err.args[1])
684     print >> sys.stderr,  \
685         ("Configuration version mismatch. The current Ganeti software"
686          " expects version %s, but the on-disk configuration file has"
687          " version %s. This is likely the result of upgrading the"
688          " software without running the upgrade procedure. Please contact"
689          " your cluster administrator or complete the upgrade using the"
690          " cfgupgrade utility, after reading the upgrade notes." %
691          (v1, v2))
692     sys.exit(constants.EXIT_FAILURE)
693   except errors.ConfigurationError, err:
694     print >> sys.stderr, \
695         ("Configuration error while opening the configuration file: %s\n"
696          "This might be caused by an incomplete software upgrade or"
697          " by a corrupted configuration file. Until the problem is fixed"
698          " the master daemon cannot start." % str(err))
699     sys.exit(constants.EXIT_FAILURE)
700
701   # If CheckMaster didn't fail we believe we are the master, but we have to
702   # confirm with the other nodes.
703   if options.no_voting:
704     if not options.yes_do_it:
705       sys.stdout.write("The 'no voting' option has been selected.\n")
706       sys.stdout.write("This is dangerous, please confirm by"
707                        " typing uppercase 'yes': ")
708       sys.stdout.flush()
709
710       confirmation = sys.stdin.readline().strip()
711       if confirmation != "YES":
712         print >> sys.stderr, "Aborting."
713         sys.exit(constants.EXIT_FAILURE)
714
715   else:
716     # CheckAgreement uses RPC and threads, hence it needs to be run in
717     # a separate process before we call utils.Daemonize in the current
718     # process.
719     if not utils.RunInSeparateProcess(CheckAgreement):
720       sys.exit(constants.EXIT_FAILURE)
721
722   # ActivateMasterIP also uses RPC/threads, so we run it again via a
723   # separate process.
724
725   # TODO: decide whether failure to activate the master IP is a fatal error
726   utils.RunInSeparateProcess(ActivateMasterIP)
727
728
729 def PrepMasterd(options, _):
730   """Prep master daemon function, executed with the PID file held.
731
732   """
733   # This is safe to do as the pid file guarantees against
734   # concurrent execution.
735   utils.RemoveFile(pathutils.MASTER_SOCKET)
736
737   mainloop = daemon.Mainloop()
738   master = MasterServer(pathutils.MASTER_SOCKET, options.uid, options.gid)
739   return (mainloop, master)
740
741
742 def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
743   """Main master daemon function, executed with the PID file held.
744
745   """
746   (mainloop, master) = prep_data
747   try:
748     rpc.Init()
749     try:
750       master.setup_queue()
751       try:
752         mainloop.Run(shutdown_wait_fn=master.WaitForShutdown)
753       finally:
754         master.server_cleanup()
755     finally:
756       rpc.Shutdown()
757   finally:
758     utils.RemoveFile(pathutils.MASTER_SOCKET)
759
760   logging.info("Clean master daemon shutdown")
761
762
763 def Main():
764   """Main function"""
765   parser = OptionParser(description="Ganeti master daemon",
766                         usage="%prog [-f] [-d]",
767                         version="%%prog (ganeti) %s" %
768                         constants.RELEASE_VERSION)
769   parser.add_option("--no-voting", dest="no_voting",
770                     help="Do not check that the nodes agree on this node"
771                     " being the master and start the daemon unconditionally",
772                     default=False, action="store_true")
773   parser.add_option("--yes-do-it", dest="yes_do_it",
774                     help="Override interactive check for --no-voting",
775                     default=False, action="store_true")
776   daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
777                      ExecMasterd, multithreaded=True)