masterd: Remove duplicate code
[ganeti-local] / lib / server / masterd.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Master daemon program.
23
24 Some classes deviates from the standard style guide since the
25 inheritance from parent classes requires it.
26
27 """
28
29 # pylint: disable=C0103
30 # C0103: Invalid name ganeti-masterd
31
32 import grp
33 import os
34 import pwd
35 import sys
36 import socket
37 import time
38 import tempfile
39 import logging
40
41 from optparse import OptionParser
42
43 from ganeti import config
44 from ganeti import constants
45 from ganeti import daemon
46 from ganeti import mcpu
47 from ganeti import opcodes
48 from ganeti import jqueue
49 from ganeti import locking
50 from ganeti import luxi
51 from ganeti import utils
52 from ganeti import errors
53 from ganeti import ssconf
54 from ganeti import workerpool
55 from ganeti import rpc
56 from ganeti import bootstrap
57 from ganeti import netutils
58 from ganeti import objects
59 from ganeti import query
60 from ganeti import runtime
61 from ganeti import pathutils
62
63
64 CLIENT_REQUEST_WORKERS = 16
65
66 EXIT_NOTMASTER = constants.EXIT_NOTMASTER
67 EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
68
69
70 def _LogNewJob(status, info, ops):
71   """Log information about a recently submitted job.
72
73   """
74   op_summary = utils.CommaJoin(op.Summary() for op in ops)
75
76   if status:
77     logging.info("New job with id %s, summary: %s", info, op_summary)
78   else:
79     logging.info("Failed to submit job, reason: '%s', summary: %s",
80                  info, op_summary)
81
82
83 class ClientRequestWorker(workerpool.BaseWorker):
84   # pylint: disable=W0221
85   def RunTask(self, server, message, client):
86     """Process the request.
87
88     """
89     client_ops = ClientOps(server)
90
91     try:
92       (method, args, version) = luxi.ParseRequest(message)
93     except luxi.ProtocolError, err:
94       logging.error("Protocol Error: %s", err)
95       client.close_log()
96       return
97
98     success = False
99     try:
100       # Verify client's version if there was one in the request
101       if version is not None and version != constants.LUXI_VERSION:
102         raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
103                                (constants.LUXI_VERSION, version))
104
105       result = client_ops.handle_request(method, args)
106       success = True
107     except errors.GenericError, err:
108       logging.exception("Unexpected exception")
109       success = False
110       result = errors.EncodeException(err)
111     except:
112       logging.exception("Unexpected exception")
113       err = sys.exc_info()
114       result = "Caught exception: %s" % str(err[1])
115
116     try:
117       reply = luxi.FormatResponse(success, result)
118       client.send_message(reply)
119       # awake the main thread so that it can write out the data.
120       server.awaker.signal()
121     except: # pylint: disable=W0702
122       logging.exception("Send error")
123       client.close_log()
124
125
126 class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
127   """Handler for master peers.
128
129   """
130   _MAX_UNHANDLED = 1
131
132   def __init__(self, server, connected_socket, client_address, family):
133     daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
134                                                  client_address,
135                                                  constants.LUXI_EOM,
136                                                  family, self._MAX_UNHANDLED)
137     self.server = server
138
139   def handle_message(self, message, _):
140     self.server.request_workers.AddTask((self.server, message, self))
141
142
143 class _MasterShutdownCheck:
144   """Logic for master daemon shutdown.
145
146   """
147   #: How long to wait between checks
148   _CHECK_INTERVAL = 5.0
149
150   #: How long to wait after all jobs are done (e.g. to give clients time to
151   #: retrieve the job status)
152   _SHUTDOWN_LINGER = 5.0
153
154   def __init__(self):
155     """Initializes this class.
156
157     """
158     self._had_active_jobs = None
159     self._linger_timeout = None
160
161   def __call__(self, jq_prepare_result):
162     """Determines if master daemon is ready for shutdown.
163
164     @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown}
165     @rtype: None or number
166     @return: None if master daemon is ready, timeout if the check must be
167              repeated
168
169     """
170     if jq_prepare_result:
171       # Check again shortly
172       logging.info("Job queue has been notified for shutdown but is still"
173                    " busy; next check in %s seconds", self._CHECK_INTERVAL)
174       self._had_active_jobs = True
175       return self._CHECK_INTERVAL
176
177     if not self._had_active_jobs:
178       # Can shut down as there were no active jobs on the first check
179       return None
180
181     # No jobs are running anymore, but maybe some clients want to collect some
182     # information. Give them a short amount of time.
183     if self._linger_timeout is None:
184       self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True)
185
186     remaining = self._linger_timeout.Remaining()
187
188     logging.info("Job queue no longer busy; shutting down master daemon"
189                  " in %s seconds", remaining)
190
191     # TODO: Should the master daemon socket be closed at this point? Doing so
192     # wouldn't affect existing connections.
193
194     if remaining < 0:
195       return None
196     else:
197       return remaining
198
199
200 class MasterServer(daemon.AsyncStreamServer):
201   """Master Server.
202
203   This is the main asynchronous master server. It handles connections to the
204   master socket.
205
206   """
207   family = socket.AF_UNIX
208
209   def __init__(self, address, uid, gid):
210     """MasterServer constructor
211
212     @param address: the unix socket address to bind the MasterServer to
213     @param uid: The uid of the owner of the socket
214     @param gid: The gid of the owner of the socket
215
216     """
217     temp_name = tempfile.mktemp(dir=os.path.dirname(address))
218     daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
219     os.chmod(temp_name, 0770)
220     os.chown(temp_name, uid, gid)
221     os.rename(temp_name, address)
222
223     self.awaker = daemon.AsyncAwaker()
224
225     # We'll only start threads once we've forked.
226     self.context = None
227     self.request_workers = None
228
229     self._shutdown_check = None
230
231   def handle_connection(self, connected_socket, client_address):
232     # TODO: add connection count and limit the number of open connections to a
233     # maximum number to avoid breaking for lack of file descriptors or memory.
234     MasterClientHandler(self, connected_socket, client_address, self.family)
235
236   def setup_queue(self):
237     self.context = GanetiContext()
238     self.request_workers = workerpool.WorkerPool("ClientReq",
239                                                  CLIENT_REQUEST_WORKERS,
240                                                  ClientRequestWorker)
241
242   def WaitForShutdown(self):
243     """Prepares server for shutdown.
244
245     """
246     if self._shutdown_check is None:
247       self._shutdown_check = _MasterShutdownCheck()
248
249     return self._shutdown_check(self.context.jobqueue.PrepareShutdown())
250
251   def server_cleanup(self):
252     """Cleanup the server.
253
254     This involves shutting down the processor threads and the master
255     socket.
256
257     """
258     try:
259       self.close()
260     finally:
261       if self.request_workers:
262         self.request_workers.TerminateWorkers()
263       if self.context:
264         self.context.jobqueue.Shutdown()
265
266
267 class ClientOps:
268   """Class holding high-level client operations."""
269   def __init__(self, server):
270     self.server = server
271
272   def handle_request(self, method, args): # pylint: disable=R0911
273     context = self.server.context
274     queue = context.jobqueue
275
276     # TODO: Parameter validation
277     if not isinstance(args, (tuple, list)):
278       logging.info("Received invalid arguments of type '%s'", type(args))
279       raise ValueError("Invalid arguments type '%s'" % type(args))
280
281     # TODO: Rewrite to not exit in each 'if/elif' branch
282
283     if method == luxi.REQ_SUBMIT_JOB:
284       logging.info("Receiving new job")
285       (job_def, ) = args
286       ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
287       job_id = queue.SubmitJob(ops)
288       _LogNewJob(True, job_id, ops)
289       return job_id
290
291     elif method == luxi.REQ_SUBMIT_MANY_JOBS:
292       logging.info("Receiving multiple jobs")
293       (job_defs, ) = args
294       jobs = []
295       for ops in job_defs:
296         jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
297       job_ids = queue.SubmitManyJobs(jobs)
298       for ((status, job_id), ops) in zip(job_ids, jobs):
299         _LogNewJob(status, job_id, ops)
300       return job_ids
301
302     elif method == luxi.REQ_CANCEL_JOB:
303       (job_id, ) = args
304       logging.info("Received job cancel request for %s", job_id)
305       return queue.CancelJob(job_id)
306
307     elif method == luxi.REQ_CHANGE_JOB_PRIORITY:
308       (job_id, priority) = args
309       logging.info("Received request to change priority for job %s to %s",
310                    job_id, priority)
311       return queue.ChangeJobPriority(job_id, priority)
312
313     elif method == luxi.REQ_ARCHIVE_JOB:
314       (job_id, ) = args
315       logging.info("Received job archive request for %s", job_id)
316       return queue.ArchiveJob(job_id)
317
318     elif method == luxi.REQ_AUTO_ARCHIVE_JOBS:
319       (age, timeout) = args
320       logging.info("Received job autoarchive request for age %s, timeout %s",
321                    age, timeout)
322       return queue.AutoArchiveJobs(age, timeout)
323
324     elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
325       (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
326       logging.info("Received job poll request for %s", job_id)
327       return queue.WaitForJobChanges(job_id, fields, prev_job_info,
328                                      prev_log_serial, timeout)
329
330     elif method == luxi.REQ_QUERY:
331       (what, fields, qfilter) = args
332
333       if what in constants.QR_VIA_OP:
334         result = self._Query(opcodes.OpQuery(what=what, fields=fields,
335                                              qfilter=qfilter))
336       elif what == constants.QR_LOCK:
337         if qfilter is not None:
338           raise errors.OpPrereqError("Lock queries can't be filtered",
339                                      errors.ECODE_INVAL)
340         return context.glm.QueryLocks(fields)
341       elif what == constants.QR_JOB:
342         return queue.QueryJobs(fields, qfilter)
343       elif what in constants.QR_VIA_LUXI:
344         raise NotImplementedError
345       else:
346         raise errors.OpPrereqError("Resource type '%s' unknown" % what,
347                                    errors.ECODE_INVAL)
348
349       return result
350
351     elif method == luxi.REQ_QUERY_FIELDS:
352       (what, fields) = args
353       req = objects.QueryFieldsRequest(what=what, fields=fields)
354
355       try:
356         fielddefs = query.ALL_FIELDS[req.what]
357       except KeyError:
358         raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
359                                    errors.ECODE_INVAL)
360
361       return query.QueryFields(fielddefs, req.fields)
362
363     elif method == luxi.REQ_QUERY_JOBS:
364       (job_ids, fields) = args
365       if isinstance(job_ids, (tuple, list)) and job_ids:
366         msg = utils.CommaJoin(job_ids)
367       else:
368         msg = str(job_ids)
369       logging.info("Received job query request for %s", msg)
370       return queue.OldStyleQueryJobs(job_ids, fields)
371
372     elif method == luxi.REQ_QUERY_INSTANCES:
373       (names, fields, use_locking) = args
374       logging.info("Received instance query request for %s", names)
375       if use_locking:
376         raise errors.OpPrereqError("Sync queries are not allowed",
377                                    errors.ECODE_INVAL)
378       op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
379                                    use_locking=use_locking)
380       return self._Query(op)
381
382     elif method == luxi.REQ_QUERY_NODES:
383       (names, fields, use_locking) = args
384       logging.info("Received node query request for %s", names)
385       if use_locking:
386         raise errors.OpPrereqError("Sync queries are not allowed",
387                                    errors.ECODE_INVAL)
388       op = opcodes.OpNodeQuery(names=names, output_fields=fields,
389                                use_locking=use_locking)
390       return self._Query(op)
391
392     elif method == luxi.REQ_QUERY_GROUPS:
393       (names, fields, use_locking) = args
394       logging.info("Received group query request for %s", names)
395       if use_locking:
396         raise errors.OpPrereqError("Sync queries are not allowed",
397                                    errors.ECODE_INVAL)
398       op = opcodes.OpGroupQuery(names=names, output_fields=fields)
399       return self._Query(op)
400
401     elif method == luxi.REQ_QUERY_NETWORKS:
402       (names, fields, use_locking) = args
403       logging.info("Received network query request for %s", names)
404       if use_locking:
405         raise errors.OpPrereqError("Sync queries are not allowed",
406                                    errors.ECODE_INVAL)
407       op = opcodes.OpNetworkQuery(names=names, output_fields=fields)
408       return self._Query(op)
409
410     elif method == luxi.REQ_QUERY_EXPORTS:
411       (nodes, use_locking) = args
412       if use_locking:
413         raise errors.OpPrereqError("Sync queries are not allowed",
414                                    errors.ECODE_INVAL)
415       logging.info("Received exports query request")
416       op = opcodes.OpBackupQuery(nodes=nodes, use_locking=use_locking)
417       return self._Query(op)
418
419     elif method == luxi.REQ_QUERY_CONFIG_VALUES:
420       (fields, ) = args
421       logging.info("Received config values query request for %s", fields)
422       op = opcodes.OpClusterConfigQuery(output_fields=fields)
423       return self._Query(op)
424
425     elif method == luxi.REQ_QUERY_CLUSTER_INFO:
426       logging.info("Received cluster info query request")
427       op = opcodes.OpClusterQuery()
428       return self._Query(op)
429
430     elif method == luxi.REQ_QUERY_TAGS:
431       (kind, name) = args
432       logging.info("Received tags query request")
433       op = opcodes.OpTagsGet(kind=kind, name=name, use_locking=False)
434       return self._Query(op)
435
436     elif method == luxi.REQ_SET_DRAIN_FLAG:
437       (drain_flag, ) = args
438       logging.info("Received queue drain flag change request to %s",
439                    drain_flag)
440       return queue.SetDrainFlag(drain_flag)
441
442     elif method == luxi.REQ_SET_WATCHER_PAUSE:
443       (until, ) = args
444
445       if until is None:
446         logging.info("Received request to no longer pause the watcher")
447       else:
448         if not isinstance(until, (int, float)):
449           raise TypeError("Duration must be an integer or float")
450
451         if until < time.time():
452           raise errors.GenericError("Unable to set pause end time in the past")
453
454         logging.info("Received request to pause the watcher until %s", until)
455
456       return _SetWatcherPause(until)
457
458     else:
459       logging.info("Received invalid request '%s'", method)
460       raise ValueError("Invalid operation '%s'" % method)
461
462   def _Query(self, op):
463     """Runs the specified opcode and returns the result.
464
465     """
466     # Queries don't have a job id
467     proc = mcpu.Processor(self.server.context, None, enable_locks=False)
468
469     # TODO: Executing an opcode using locks will acquire them in blocking mode.
470     # Consider using a timeout for retries.
471     return proc.ExecOpCode(op, None)
472
473
474 class GanetiContext(object):
475   """Context common to all ganeti threads.
476
477   This class creates and holds common objects shared by all threads.
478
479   """
480   # pylint: disable=W0212
481   # we do want to ensure a singleton here
482   _instance = None
483
484   def __init__(self):
485     """Constructs a new GanetiContext object.
486
487     There should be only a GanetiContext object at any time, so this
488     function raises an error if this is not the case.
489
490     """
491     assert self.__class__._instance is None, "double GanetiContext instance"
492
493     # Create global configuration object
494     self.cfg = config.ConfigWriter()
495
496     # Locking manager
497     self.glm = locking.GanetiLockManager(
498       self.cfg.GetNodeList(),
499       self.cfg.GetNodeGroupList(),
500       self.cfg.GetInstanceList(),
501       self.cfg.GetNetworkList())
502
503     self.cfg.SetContext(self)
504
505     # RPC runner
506     self.rpc = rpc.RpcRunner(self.cfg, self.glm.AddToLockMonitor)
507
508     # Job queue
509     self.jobqueue = jqueue.JobQueue(self)
510
511     # setting this also locks the class against attribute modifications
512     self.__class__._instance = self
513
514   def __setattr__(self, name, value):
515     """Setting GanetiContext attributes is forbidden after initialization.
516
517     """
518     assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
519     object.__setattr__(self, name, value)
520
521   def AddNode(self, node, ec_id):
522     """Adds a node to the configuration and lock manager.
523
524     """
525     # Add it to the configuration
526     self.cfg.AddNode(node, ec_id)
527
528     # If preseeding fails it'll not be added
529     self.jobqueue.AddNode(node)
530
531     # Add the new node to the Ganeti Lock Manager
532     self.glm.add(locking.LEVEL_NODE, node.name)
533     self.glm.add(locking.LEVEL_NODE_RES, node.name)
534
535   def ReaddNode(self, node):
536     """Updates a node that's already in the configuration
537
538     """
539     # Synchronize the queue again
540     self.jobqueue.AddNode(node)
541
542   def RemoveNode(self, name):
543     """Removes a node from the configuration and lock manager.
544
545     """
546     # Remove node from configuration
547     self.cfg.RemoveNode(name)
548
549     # Notify job queue
550     self.jobqueue.RemoveNode(name)
551
552     # Remove the node from the Ganeti Lock Manager
553     self.glm.remove(locking.LEVEL_NODE, name)
554     self.glm.remove(locking.LEVEL_NODE_RES, name)
555
556
557 def _SetWatcherPause(until):
558   """Creates or removes the watcher pause file.
559
560   @type until: None or int
561   @param until: Unix timestamp saying until when the watcher shouldn't run
562
563   """
564   if until is None:
565     utils.RemoveFile(pathutils.WATCHER_PAUSEFILE)
566   else:
567     utils.WriteFile(pathutils.WATCHER_PAUSEFILE,
568                     data="%d\n" % (until, ))
569
570   return until
571
572
573 @rpc.RunWithRPC
574 def CheckAgreement():
575   """Check the agreement on who is the master.
576
577   The function uses a very simple algorithm: we must get more positive
578   than negative answers. Since in most of the cases we are the master,
579   we'll use our own config file for getting the node list. In the
580   future we could collect the current node list from our (possibly
581   obsolete) known nodes.
582
583   In order to account for cold-start of all nodes, we retry for up to
584   a minute until we get a real answer as the top-voted one. If the
585   nodes are more out-of-sync, for now manual startup of the master
586   should be attempted.
587
588   Note that for a even number of nodes cluster, we need at least half
589   of the nodes (beside ourselves) to vote for us. This creates a
590   problem on two-node clusters, since in this case we require the
591   other node to be up too to confirm our status.
592
593   """
594   myself = netutils.Hostname.GetSysName()
595   #temp instantiation of a config writer, used only to get the node list
596   cfg = config.ConfigWriter()
597   node_list = cfg.GetNodeList()
598   del cfg
599   retries = 6
600   while retries > 0:
601     votes = bootstrap.GatherMasterVotes(node_list)
602     if not votes:
603       # empty node list, this is a one node cluster
604       return True
605     if votes[0][0] is None:
606       retries -= 1
607       time.sleep(10)
608       continue
609     break
610   if retries == 0:
611     logging.critical("Cluster inconsistent, most of the nodes didn't answer"
612                      " after multiple retries. Aborting startup")
613     logging.critical("Use the --no-voting option if you understand what"
614                      " effects it has on the cluster state")
615     return False
616   # here a real node is at the top of the list
617   all_votes = sum(item[1] for item in votes)
618   top_node, top_votes = votes[0]
619
620   result = False
621   if top_node != myself:
622     logging.critical("It seems we are not the master (top-voted node"
623                      " is %s with %d out of %d votes)", top_node, top_votes,
624                      all_votes)
625   elif top_votes < all_votes - top_votes:
626     logging.critical("It seems we are not the master (%d votes for,"
627                      " %d votes against)", top_votes, all_votes - top_votes)
628   else:
629     result = True
630
631   return result
632
633
634 @rpc.RunWithRPC
635 def ActivateMasterIP():
636   # activate ip
637   cfg = config.ConfigWriter()
638   master_params = cfg.GetMasterNetworkParameters()
639   ems = cfg.GetUseExternalMipScript()
640   runner = rpc.BootstrapRunner()
641   result = runner.call_node_activate_master_ip(master_params.name,
642                                                master_params, ems)
643
644   msg = result.fail_msg
645   if msg:
646     logging.error("Can't activate master IP address: %s", msg)
647
648
649 def CheckMasterd(options, args):
650   """Initial checks whether to run or exit with a failure.
651
652   """
653   if args: # masterd doesn't take any arguments
654     print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
655     sys.exit(constants.EXIT_FAILURE)
656
657   ssconf.CheckMaster(options.debug)
658
659   try:
660     options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
661     options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
662   except KeyError:
663     print >> sys.stderr, ("User or group not existing on system: %s:%s" %
664                           (constants.MASTERD_USER, constants.DAEMONS_GROUP))
665     sys.exit(constants.EXIT_FAILURE)
666
667   # Determine static runtime architecture information
668   runtime.InitArchInfo()
669
670   # Check the configuration is sane before anything else
671   try:
672     config.ConfigWriter()
673   except errors.ConfigVersionMismatch, err:
674     v1 = "%s.%s.%s" % constants.SplitVersion(err.args[0])
675     v2 = "%s.%s.%s" % constants.SplitVersion(err.args[1])
676     print >> sys.stderr,  \
677         ("Configuration version mismatch. The current Ganeti software"
678          " expects version %s, but the on-disk configuration file has"
679          " version %s. This is likely the result of upgrading the"
680          " software without running the upgrade procedure. Please contact"
681          " your cluster administrator or complete the upgrade using the"
682          " cfgupgrade utility, after reading the upgrade notes." %
683          (v1, v2))
684     sys.exit(constants.EXIT_FAILURE)
685   except errors.ConfigurationError, err:
686     print >> sys.stderr, \
687         ("Configuration error while opening the configuration file: %s\n"
688          "This might be caused by an incomplete software upgrade or"
689          " by a corrupted configuration file. Until the problem is fixed"
690          " the master daemon cannot start." % str(err))
691     sys.exit(constants.EXIT_FAILURE)
692
693   # If CheckMaster didn't fail we believe we are the master, but we have to
694   # confirm with the other nodes.
695   if options.no_voting:
696     if not options.yes_do_it:
697       sys.stdout.write("The 'no voting' option has been selected.\n")
698       sys.stdout.write("This is dangerous, please confirm by"
699                        " typing uppercase 'yes': ")
700       sys.stdout.flush()
701
702       confirmation = sys.stdin.readline().strip()
703       if confirmation != "YES":
704         print >> sys.stderr, "Aborting."
705         sys.exit(constants.EXIT_FAILURE)
706
707   else:
708     # CheckAgreement uses RPC and threads, hence it needs to be run in
709     # a separate process before we call utils.Daemonize in the current
710     # process.
711     if not utils.RunInSeparateProcess(CheckAgreement):
712       sys.exit(constants.EXIT_FAILURE)
713
714   # ActivateMasterIP also uses RPC/threads, so we run it again via a
715   # separate process.
716
717   # TODO: decide whether failure to activate the master IP is a fatal error
718   utils.RunInSeparateProcess(ActivateMasterIP)
719
720
721 def PrepMasterd(options, _):
722   """Prep master daemon function, executed with the PID file held.
723
724   """
725   # This is safe to do as the pid file guarantees against
726   # concurrent execution.
727   utils.RemoveFile(pathutils.MASTER_SOCKET)
728
729   mainloop = daemon.Mainloop()
730   master = MasterServer(pathutils.MASTER_SOCKET, options.uid, options.gid)
731   return (mainloop, master)
732
733
734 def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
735   """Main master daemon function, executed with the PID file held.
736
737   """
738   (mainloop, master) = prep_data
739   try:
740     rpc.Init()
741     try:
742       master.setup_queue()
743       try:
744         mainloop.Run(shutdown_wait_fn=master.WaitForShutdown)
745       finally:
746         master.server_cleanup()
747     finally:
748       rpc.Shutdown()
749   finally:
750     utils.RemoveFile(pathutils.MASTER_SOCKET)
751
752   logging.info("Clean master daemon shutdown")
753
754
755 def Main():
756   """Main function"""
757   parser = OptionParser(description="Ganeti master daemon",
758                         usage="%prog [-f] [-d]",
759                         version="%%prog (ganeti) %s" %
760                         constants.RELEASE_VERSION)
761   parser.add_option("--no-voting", dest="no_voting",
762                     help="Do not check that the nodes agree on this node"
763                     " being the master and start the daemon unconditionally",
764                     default=False, action="store_true")
765   parser.add_option("--yes-do-it", dest="yes_do_it",
766                     help="Override interactive check for --no-voting",
767                     default=False, action="store_true")
768   daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
769                      ExecMasterd, multithreaded=True)