1ef5480f1f1a88342fe767e37bde244487a7ed27
[ganeti-local] / lib / server / masterd.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Master daemon program.
23
24 Some classes deviates from the standard style guide since the
25 inheritance from parent classes requires it.
26
27 """
28
29 # pylint: disable=C0103
30 # C0103: Invalid name ganeti-masterd
31
32 import grp
33 import os
34 import pwd
35 import sys
36 import socket
37 import time
38 import tempfile
39 import logging
40
41 from optparse import OptionParser
42
43 from ganeti import config
44 from ganeti import constants
45 from ganeti import daemon
46 from ganeti import mcpu
47 from ganeti import opcodes
48 from ganeti import jqueue
49 from ganeti import locking
50 from ganeti import luxi
51 from ganeti import utils
52 from ganeti import errors
53 from ganeti import ssconf
54 from ganeti import workerpool
55 from ganeti import rpc
56 from ganeti import bootstrap
57 from ganeti import netutils
58 from ganeti import objects
59 from ganeti import query
60 from ganeti import runtime
61 from ganeti import pathutils
62
63
64 CLIENT_REQUEST_WORKERS = 16
65
66 EXIT_NOTMASTER = constants.EXIT_NOTMASTER
67 EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
68
69
70 def _LogNewJob(status, info, ops):
71   """Log information about a recently submitted job.
72
73   """
74   if status:
75     logging.info("New job with id %s, summary: %s",
76                  info, utils.CommaJoin(op.Summary() for op in ops))
77   else:
78     logging.info("Failed to submit job, reason: '%s', summary: %s",
79                  info, utils.CommaJoin(op.Summary() for op in ops))
80
81
82 class ClientRequestWorker(workerpool.BaseWorker):
83   # pylint: disable=W0221
84   def RunTask(self, server, message, client):
85     """Process the request.
86
87     """
88     client_ops = ClientOps(server)
89
90     try:
91       (method, args, version) = luxi.ParseRequest(message)
92     except luxi.ProtocolError, err:
93       logging.error("Protocol Error: %s", err)
94       client.close_log()
95       return
96
97     success = False
98     try:
99       # Verify client's version if there was one in the request
100       if version is not None and version != constants.LUXI_VERSION:
101         raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
102                                (constants.LUXI_VERSION, version))
103
104       result = client_ops.handle_request(method, args)
105       success = True
106     except errors.GenericError, err:
107       logging.exception("Unexpected exception")
108       success = False
109       result = errors.EncodeException(err)
110     except:
111       logging.exception("Unexpected exception")
112       err = sys.exc_info()
113       result = "Caught exception: %s" % str(err[1])
114
115     try:
116       reply = luxi.FormatResponse(success, result)
117       client.send_message(reply)
118       # awake the main thread so that it can write out the data.
119       server.awaker.signal()
120     except: # pylint: disable=W0702
121       logging.exception("Send error")
122       client.close_log()
123
124
125 class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
126   """Handler for master peers.
127
128   """
129   _MAX_UNHANDLED = 1
130
131   def __init__(self, server, connected_socket, client_address, family):
132     daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
133                                                  client_address,
134                                                  constants.LUXI_EOM,
135                                                  family, self._MAX_UNHANDLED)
136     self.server = server
137
138   def handle_message(self, message, _):
139     self.server.request_workers.AddTask((self.server, message, self))
140
141
142 class _MasterShutdownCheck:
143   """Logic for master daemon shutdown.
144
145   """
146   #: How long to wait between checks
147   _CHECK_INTERVAL = 5.0
148
149   #: How long to wait after all jobs are done (e.g. to give clients time to
150   #: retrieve the job status)
151   _SHUTDOWN_LINGER = 5.0
152
153   def __init__(self):
154     """Initializes this class.
155
156     """
157     self._had_active_jobs = None
158     self._linger_timeout = None
159
160   def __call__(self, jq_prepare_result):
161     """Determines if master daemon is ready for shutdown.
162
163     @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown}
164     @rtype: None or number
165     @return: None if master daemon is ready, timeout if the check must be
166              repeated
167
168     """
169     if jq_prepare_result:
170       # Check again shortly
171       logging.info("Job queue has been notified for shutdown but is still"
172                    " busy; next check in %s seconds", self._CHECK_INTERVAL)
173       self._had_active_jobs = True
174       return self._CHECK_INTERVAL
175
176     if not self._had_active_jobs:
177       # Can shut down as there were no active jobs on the first check
178       return None
179
180     # No jobs are running anymore, but maybe some clients want to collect some
181     # information. Give them a short amount of time.
182     if self._linger_timeout is None:
183       self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True)
184
185     remaining = self._linger_timeout.Remaining()
186
187     logging.info("Job queue no longer busy; shutting down master daemon"
188                  " in %s seconds", remaining)
189
190     # TODO: Should the master daemon socket be closed at this point? Doing so
191     # wouldn't affect existing connections.
192
193     if remaining < 0:
194       return None
195     else:
196       return remaining
197
198
199 class MasterServer(daemon.AsyncStreamServer):
200   """Master Server.
201
202   This is the main asynchronous master server. It handles connections to the
203   master socket.
204
205   """
206   family = socket.AF_UNIX
207
208   def __init__(self, address, uid, gid):
209     """MasterServer constructor
210
211     @param address: the unix socket address to bind the MasterServer to
212     @param uid: The uid of the owner of the socket
213     @param gid: The gid of the owner of the socket
214
215     """
216     temp_name = tempfile.mktemp(dir=os.path.dirname(address))
217     daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
218     os.chmod(temp_name, 0770)
219     os.chown(temp_name, uid, gid)
220     os.rename(temp_name, address)
221
222     self.awaker = daemon.AsyncAwaker()
223
224     # We'll only start threads once we've forked.
225     self.context = None
226     self.request_workers = None
227
228     self._shutdown_check = None
229
230   def handle_connection(self, connected_socket, client_address):
231     # TODO: add connection count and limit the number of open connections to a
232     # maximum number to avoid breaking for lack of file descriptors or memory.
233     MasterClientHandler(self, connected_socket, client_address, self.family)
234
235   def setup_queue(self):
236     self.context = GanetiContext()
237     self.request_workers = workerpool.WorkerPool("ClientReq",
238                                                  CLIENT_REQUEST_WORKERS,
239                                                  ClientRequestWorker)
240
241   def WaitForShutdown(self):
242     """Prepares server for shutdown.
243
244     """
245     if self._shutdown_check is None:
246       self._shutdown_check = _MasterShutdownCheck()
247
248     return self._shutdown_check(self.context.jobqueue.PrepareShutdown())
249
250   def server_cleanup(self):
251     """Cleanup the server.
252
253     This involves shutting down the processor threads and the master
254     socket.
255
256     """
257     try:
258       self.close()
259     finally:
260       if self.request_workers:
261         self.request_workers.TerminateWorkers()
262       if self.context:
263         self.context.jobqueue.Shutdown()
264
265
266 class ClientOps:
267   """Class holding high-level client operations."""
268   def __init__(self, server):
269     self.server = server
270
271   def handle_request(self, method, args): # pylint: disable=R0911
272     context = self.server.context
273     queue = context.jobqueue
274
275     # TODO: Parameter validation
276     if not isinstance(args, (tuple, list)):
277       logging.info("Received invalid arguments of type '%s'", type(args))
278       raise ValueError("Invalid arguments type '%s'" % type(args))
279
280     # TODO: Rewrite to not exit in each 'if/elif' branch
281
282     if method == luxi.REQ_SUBMIT_JOB:
283       logging.info("Receiving new job")
284       (job_def, ) = args
285       ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
286       job_id = queue.SubmitJob(ops)
287       _LogNewJob(True, job_id, ops)
288       return job_id
289
290     elif method == luxi.REQ_SUBMIT_MANY_JOBS:
291       logging.info("Receiving multiple jobs")
292       (job_defs, ) = args
293       jobs = []
294       for ops in job_defs:
295         jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
296       job_ids = queue.SubmitManyJobs(jobs)
297       for ((status, job_id), ops) in zip(job_ids, jobs):
298         _LogNewJob(status, job_id, ops)
299       return job_ids
300
301     elif method == luxi.REQ_CANCEL_JOB:
302       (job_id, ) = args
303       logging.info("Received job cancel request for %s", job_id)
304       return queue.CancelJob(job_id)
305
306     elif method == luxi.REQ_CHANGE_JOB_PRIORITY:
307       (job_id, priority) = args
308       logging.info("Received request to change priority for job %s to %s",
309                    job_id, priority)
310       return queue.ChangeJobPriority(job_id, priority)
311
312     elif method == luxi.REQ_ARCHIVE_JOB:
313       (job_id, ) = args
314       logging.info("Received job archive request for %s", job_id)
315       return queue.ArchiveJob(job_id)
316
317     elif method == luxi.REQ_AUTO_ARCHIVE_JOBS:
318       (age, timeout) = args
319       logging.info("Received job autoarchive request for age %s, timeout %s",
320                    age, timeout)
321       return queue.AutoArchiveJobs(age, timeout)
322
323     elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
324       (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
325       logging.info("Received job poll request for %s", job_id)
326       return queue.WaitForJobChanges(job_id, fields, prev_job_info,
327                                      prev_log_serial, timeout)
328
329     elif method == luxi.REQ_QUERY:
330       (what, fields, qfilter) = args
331
332       if what in constants.QR_VIA_OP:
333         result = self._Query(opcodes.OpQuery(what=what, fields=fields,
334                                              qfilter=qfilter))
335       elif what == constants.QR_LOCK:
336         if qfilter is not None:
337           raise errors.OpPrereqError("Lock queries can't be filtered",
338                                      errors.ECODE_INVAL)
339         return context.glm.QueryLocks(fields)
340       elif what == constants.QR_JOB:
341         return queue.QueryJobs(fields, qfilter)
342       elif what in constants.QR_VIA_LUXI:
343         raise NotImplementedError
344       else:
345         raise errors.OpPrereqError("Resource type '%s' unknown" % what,
346                                    errors.ECODE_INVAL)
347
348       return result
349
350     elif method == luxi.REQ_QUERY_FIELDS:
351       (what, fields) = args
352       req = objects.QueryFieldsRequest(what=what, fields=fields)
353
354       try:
355         fielddefs = query.ALL_FIELDS[req.what]
356       except KeyError:
357         raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
358                                    errors.ECODE_INVAL)
359
360       return query.QueryFields(fielddefs, req.fields)
361
362     elif method == luxi.REQ_QUERY_JOBS:
363       (job_ids, fields) = args
364       if isinstance(job_ids, (tuple, list)) and job_ids:
365         msg = utils.CommaJoin(job_ids)
366       else:
367         msg = str(job_ids)
368       logging.info("Received job query request for %s", msg)
369       return queue.OldStyleQueryJobs(job_ids, fields)
370
371     elif method == luxi.REQ_QUERY_INSTANCES:
372       (names, fields, use_locking) = args
373       logging.info("Received instance query request for %s", names)
374       if use_locking:
375         raise errors.OpPrereqError("Sync queries are not allowed",
376                                    errors.ECODE_INVAL)
377       op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
378                                    use_locking=use_locking)
379       return self._Query(op)
380
381     elif method == luxi.REQ_QUERY_NODES:
382       (names, fields, use_locking) = args
383       logging.info("Received node query request for %s", names)
384       if use_locking:
385         raise errors.OpPrereqError("Sync queries are not allowed",
386                                    errors.ECODE_INVAL)
387       op = opcodes.OpNodeQuery(names=names, output_fields=fields,
388                                use_locking=use_locking)
389       return self._Query(op)
390
391     elif method == luxi.REQ_QUERY_GROUPS:
392       (names, fields, use_locking) = args
393       logging.info("Received group query request for %s", names)
394       if use_locking:
395         raise errors.OpPrereqError("Sync queries are not allowed",
396                                    errors.ECODE_INVAL)
397       op = opcodes.OpGroupQuery(names=names, output_fields=fields)
398       return self._Query(op)
399
400     elif method == luxi.REQ_QUERY_NETWORKS:
401       (names, fields, use_locking) = args
402       logging.info("Received network query request for %s", names)
403       if use_locking:
404         raise errors.OpPrereqError("Sync queries are not allowed",
405                                    errors.ECODE_INVAL)
406       op = opcodes.OpNetworkQuery(names=names, output_fields=fields)
407       return self._Query(op)
408
409     elif method == luxi.REQ_QUERY_EXPORTS:
410       (nodes, use_locking) = args
411       if use_locking:
412         raise errors.OpPrereqError("Sync queries are not allowed",
413                                    errors.ECODE_INVAL)
414       logging.info("Received exports query request")
415       op = opcodes.OpBackupQuery(nodes=nodes, use_locking=use_locking)
416       return self._Query(op)
417
418     elif method == luxi.REQ_QUERY_CONFIG_VALUES:
419       (fields, ) = args
420       logging.info("Received config values query request for %s", fields)
421       op = opcodes.OpClusterConfigQuery(output_fields=fields)
422       return self._Query(op)
423
424     elif method == luxi.REQ_QUERY_CLUSTER_INFO:
425       logging.info("Received cluster info query request")
426       op = opcodes.OpClusterQuery()
427       return self._Query(op)
428
429     elif method == luxi.REQ_QUERY_TAGS:
430       (kind, name) = args
431       logging.info("Received tags query request")
432       op = opcodes.OpTagsGet(kind=kind, name=name, use_locking=False)
433       return self._Query(op)
434
435     elif method == luxi.REQ_SET_DRAIN_FLAG:
436       (drain_flag, ) = args
437       logging.info("Received queue drain flag change request to %s",
438                    drain_flag)
439       return queue.SetDrainFlag(drain_flag)
440
441     elif method == luxi.REQ_SET_WATCHER_PAUSE:
442       (until, ) = args
443
444       if until is None:
445         logging.info("Received request to no longer pause the watcher")
446       else:
447         if not isinstance(until, (int, float)):
448           raise TypeError("Duration must be an integer or float")
449
450         if until < time.time():
451           raise errors.GenericError("Unable to set pause end time in the past")
452
453         logging.info("Received request to pause the watcher until %s", until)
454
455       return _SetWatcherPause(until)
456
457     else:
458       logging.info("Received invalid request '%s'", method)
459       raise ValueError("Invalid operation '%s'" % method)
460
461   def _Query(self, op):
462     """Runs the specified opcode and returns the result.
463
464     """
465     # Queries don't have a job id
466     proc = mcpu.Processor(self.server.context, None, enable_locks=False)
467
468     # TODO: Executing an opcode using locks will acquire them in blocking mode.
469     # Consider using a timeout for retries.
470     return proc.ExecOpCode(op, None)
471
472
473 class GanetiContext(object):
474   """Context common to all ganeti threads.
475
476   This class creates and holds common objects shared by all threads.
477
478   """
479   # pylint: disable=W0212
480   # we do want to ensure a singleton here
481   _instance = None
482
483   def __init__(self):
484     """Constructs a new GanetiContext object.
485
486     There should be only a GanetiContext object at any time, so this
487     function raises an error if this is not the case.
488
489     """
490     assert self.__class__._instance is None, "double GanetiContext instance"
491
492     # Create global configuration object
493     self.cfg = config.ConfigWriter()
494
495     # Locking manager
496     self.glm = locking.GanetiLockManager(
497       self.cfg.GetNodeList(),
498       self.cfg.GetNodeGroupList(),
499       self.cfg.GetInstanceList(),
500       self.cfg.GetNetworkList())
501
502     self.cfg.SetContext(self)
503
504     # RPC runner
505     self.rpc = rpc.RpcRunner(self.cfg, self.glm.AddToLockMonitor)
506
507     # Job queue
508     self.jobqueue = jqueue.JobQueue(self)
509
510     # setting this also locks the class against attribute modifications
511     self.__class__._instance = self
512
513   def __setattr__(self, name, value):
514     """Setting GanetiContext attributes is forbidden after initialization.
515
516     """
517     assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
518     object.__setattr__(self, name, value)
519
520   def AddNode(self, node, ec_id):
521     """Adds a node to the configuration and lock manager.
522
523     """
524     # Add it to the configuration
525     self.cfg.AddNode(node, ec_id)
526
527     # If preseeding fails it'll not be added
528     self.jobqueue.AddNode(node)
529
530     # Add the new node to the Ganeti Lock Manager
531     self.glm.add(locking.LEVEL_NODE, node.name)
532     self.glm.add(locking.LEVEL_NODE_RES, node.name)
533
534   def ReaddNode(self, node):
535     """Updates a node that's already in the configuration
536
537     """
538     # Synchronize the queue again
539     self.jobqueue.AddNode(node)
540
541   def RemoveNode(self, name):
542     """Removes a node from the configuration and lock manager.
543
544     """
545     # Remove node from configuration
546     self.cfg.RemoveNode(name)
547
548     # Notify job queue
549     self.jobqueue.RemoveNode(name)
550
551     # Remove the node from the Ganeti Lock Manager
552     self.glm.remove(locking.LEVEL_NODE, name)
553     self.glm.remove(locking.LEVEL_NODE_RES, name)
554
555
556 def _SetWatcherPause(until):
557   """Creates or removes the watcher pause file.
558
559   @type until: None or int
560   @param until: Unix timestamp saying until when the watcher shouldn't run
561
562   """
563   if until is None:
564     utils.RemoveFile(pathutils.WATCHER_PAUSEFILE)
565   else:
566     utils.WriteFile(pathutils.WATCHER_PAUSEFILE,
567                     data="%d\n" % (until, ))
568
569   return until
570
571
572 @rpc.RunWithRPC
573 def CheckAgreement():
574   """Check the agreement on who is the master.
575
576   The function uses a very simple algorithm: we must get more positive
577   than negative answers. Since in most of the cases we are the master,
578   we'll use our own config file for getting the node list. In the
579   future we could collect the current node list from our (possibly
580   obsolete) known nodes.
581
582   In order to account for cold-start of all nodes, we retry for up to
583   a minute until we get a real answer as the top-voted one. If the
584   nodes are more out-of-sync, for now manual startup of the master
585   should be attempted.
586
587   Note that for a even number of nodes cluster, we need at least half
588   of the nodes (beside ourselves) to vote for us. This creates a
589   problem on two-node clusters, since in this case we require the
590   other node to be up too to confirm our status.
591
592   """
593   myself = netutils.Hostname.GetSysName()
594   #temp instantiation of a config writer, used only to get the node list
595   cfg = config.ConfigWriter()
596   node_list = cfg.GetNodeList()
597   del cfg
598   retries = 6
599   while retries > 0:
600     votes = bootstrap.GatherMasterVotes(node_list)
601     if not votes:
602       # empty node list, this is a one node cluster
603       return True
604     if votes[0][0] is None:
605       retries -= 1
606       time.sleep(10)
607       continue
608     break
609   if retries == 0:
610     logging.critical("Cluster inconsistent, most of the nodes didn't answer"
611                      " after multiple retries. Aborting startup")
612     logging.critical("Use the --no-voting option if you understand what"
613                      " effects it has on the cluster state")
614     return False
615   # here a real node is at the top of the list
616   all_votes = sum(item[1] for item in votes)
617   top_node, top_votes = votes[0]
618
619   result = False
620   if top_node != myself:
621     logging.critical("It seems we are not the master (top-voted node"
622                      " is %s with %d out of %d votes)", top_node, top_votes,
623                      all_votes)
624   elif top_votes < all_votes - top_votes:
625     logging.critical("It seems we are not the master (%d votes for,"
626                      " %d votes against)", top_votes, all_votes - top_votes)
627   else:
628     result = True
629
630   return result
631
632
633 @rpc.RunWithRPC
634 def ActivateMasterIP():
635   # activate ip
636   cfg = config.ConfigWriter()
637   master_params = cfg.GetMasterNetworkParameters()
638   ems = cfg.GetUseExternalMipScript()
639   runner = rpc.BootstrapRunner()
640   result = runner.call_node_activate_master_ip(master_params.name,
641                                                master_params, ems)
642
643   msg = result.fail_msg
644   if msg:
645     logging.error("Can't activate master IP address: %s", msg)
646
647
648 def CheckMasterd(options, args):
649   """Initial checks whether to run or exit with a failure.
650
651   """
652   if args: # masterd doesn't take any arguments
653     print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
654     sys.exit(constants.EXIT_FAILURE)
655
656   ssconf.CheckMaster(options.debug)
657
658   try:
659     options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
660     options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
661   except KeyError:
662     print >> sys.stderr, ("User or group not existing on system: %s:%s" %
663                           (constants.MASTERD_USER, constants.DAEMONS_GROUP))
664     sys.exit(constants.EXIT_FAILURE)
665
666   # Determine static runtime architecture information
667   runtime.InitArchInfo()
668
669   # Check the configuration is sane before anything else
670   try:
671     config.ConfigWriter()
672   except errors.ConfigVersionMismatch, err:
673     v1 = "%s.%s.%s" % constants.SplitVersion(err.args[0])
674     v2 = "%s.%s.%s" % constants.SplitVersion(err.args[1])
675     print >> sys.stderr,  \
676         ("Configuration version mismatch. The current Ganeti software"
677          " expects version %s, but the on-disk configuration file has"
678          " version %s. This is likely the result of upgrading the"
679          " software without running the upgrade procedure. Please contact"
680          " your cluster administrator or complete the upgrade using the"
681          " cfgupgrade utility, after reading the upgrade notes." %
682          (v1, v2))
683     sys.exit(constants.EXIT_FAILURE)
684   except errors.ConfigurationError, err:
685     print >> sys.stderr, \
686         ("Configuration error while opening the configuration file: %s\n"
687          "This might be caused by an incomplete software upgrade or"
688          " by a corrupted configuration file. Until the problem is fixed"
689          " the master daemon cannot start." % str(err))
690     sys.exit(constants.EXIT_FAILURE)
691
692   # If CheckMaster didn't fail we believe we are the master, but we have to
693   # confirm with the other nodes.
694   if options.no_voting:
695     if not options.yes_do_it:
696       sys.stdout.write("The 'no voting' option has been selected.\n")
697       sys.stdout.write("This is dangerous, please confirm by"
698                        " typing uppercase 'yes': ")
699       sys.stdout.flush()
700
701       confirmation = sys.stdin.readline().strip()
702       if confirmation != "YES":
703         print >> sys.stderr, "Aborting."
704         sys.exit(constants.EXIT_FAILURE)
705
706   else:
707     # CheckAgreement uses RPC and threads, hence it needs to be run in
708     # a separate process before we call utils.Daemonize in the current
709     # process.
710     if not utils.RunInSeparateProcess(CheckAgreement):
711       sys.exit(constants.EXIT_FAILURE)
712
713   # ActivateMasterIP also uses RPC/threads, so we run it again via a
714   # separate process.
715
716   # TODO: decide whether failure to activate the master IP is a fatal error
717   utils.RunInSeparateProcess(ActivateMasterIP)
718
719
720 def PrepMasterd(options, _):
721   """Prep master daemon function, executed with the PID file held.
722
723   """
724   # This is safe to do as the pid file guarantees against
725   # concurrent execution.
726   utils.RemoveFile(pathutils.MASTER_SOCKET)
727
728   mainloop = daemon.Mainloop()
729   master = MasterServer(pathutils.MASTER_SOCKET, options.uid, options.gid)
730   return (mainloop, master)
731
732
733 def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
734   """Main master daemon function, executed with the PID file held.
735
736   """
737   (mainloop, master) = prep_data
738   try:
739     rpc.Init()
740     try:
741       master.setup_queue()
742       try:
743         mainloop.Run(shutdown_wait_fn=master.WaitForShutdown)
744       finally:
745         master.server_cleanup()
746     finally:
747       rpc.Shutdown()
748   finally:
749     utils.RemoveFile(pathutils.MASTER_SOCKET)
750
751   logging.info("Clean master daemon shutdown")
752
753
754 def Main():
755   """Main function"""
756   parser = OptionParser(description="Ganeti master daemon",
757                         usage="%prog [-f] [-d]",
758                         version="%%prog (ganeti) %s" %
759                         constants.RELEASE_VERSION)
760   parser.add_option("--no-voting", dest="no_voting",
761                     help="Do not check that the nodes agree on this node"
762                     " being the master and start the daemon unconditionally",
763                     default=False, action="store_true")
764   parser.add_option("--yes-do-it", dest="yes_do_it",
765                     help="Override interactive check for --no-voting",
766                     default=False, action="store_true")
767   daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
768                      ExecMasterd, multithreaded=True)