Basic IP pool management logic
[ganeti-local] / lib / server / masterd.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Master daemon program.
23
24 Some classes deviates from the standard style guide since the
25 inheritance from parent classes requires it.
26
27 """
28
29 # pylint: disable=C0103
30 # C0103: Invalid name ganeti-masterd
31
32 import grp
33 import os
34 import pwd
35 import sys
36 import socket
37 import time
38 import tempfile
39 import logging
40
41 from optparse import OptionParser
42
43 from ganeti import config
44 from ganeti import constants
45 from ganeti import daemon
46 from ganeti import mcpu
47 from ganeti import opcodes
48 from ganeti import jqueue
49 from ganeti import locking
50 from ganeti import luxi
51 from ganeti import utils
52 from ganeti import errors
53 from ganeti import ssconf
54 from ganeti import workerpool
55 from ganeti import rpc
56 from ganeti import bootstrap
57 from ganeti import netutils
58 from ganeti import objects
59 from ganeti import query
60 from ganeti import runtime
61 from ganeti import pathutils
62
63
64 CLIENT_REQUEST_WORKERS = 16
65
66 EXIT_NOTMASTER = constants.EXIT_NOTMASTER
67 EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
68
69
70 def _LogNewJob(status, info, ops):
71   """Log information about a recently submitted job.
72
73   """
74   if status:
75     logging.info("New job with id %s, summary: %s",
76                  info, utils.CommaJoin(op.Summary() for op in ops))
77   else:
78     logging.info("Failed to submit job, reason: '%s', summary: %s",
79                  info, utils.CommaJoin(op.Summary() for op in ops))
80
81
82 class ClientRequestWorker(workerpool.BaseWorker):
83   # pylint: disable=W0221
84   def RunTask(self, server, message, client):
85     """Process the request.
86
87     """
88     client_ops = ClientOps(server)
89
90     try:
91       (method, args, version) = luxi.ParseRequest(message)
92     except luxi.ProtocolError, err:
93       logging.error("Protocol Error: %s", err)
94       client.close_log()
95       return
96
97     success = False
98     try:
99       # Verify client's version if there was one in the request
100       if version is not None and version != constants.LUXI_VERSION:
101         raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
102                                (constants.LUXI_VERSION, version))
103
104       result = client_ops.handle_request(method, args)
105       success = True
106     except errors.GenericError, err:
107       logging.exception("Unexpected exception")
108       success = False
109       result = errors.EncodeException(err)
110     except:
111       logging.exception("Unexpected exception")
112       err = sys.exc_info()
113       result = "Caught exception: %s" % str(err[1])
114
115     try:
116       reply = luxi.FormatResponse(success, result)
117       client.send_message(reply)
118       # awake the main thread so that it can write out the data.
119       server.awaker.signal()
120     except: # pylint: disable=W0702
121       logging.exception("Send error")
122       client.close_log()
123
124
125 class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
126   """Handler for master peers.
127
128   """
129   _MAX_UNHANDLED = 1
130
131   def __init__(self, server, connected_socket, client_address, family):
132     daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
133                                                  client_address,
134                                                  constants.LUXI_EOM,
135                                                  family, self._MAX_UNHANDLED)
136     self.server = server
137
138   def handle_message(self, message, _):
139     self.server.request_workers.AddTask((self.server, message, self))
140
141
142 class _MasterShutdownCheck:
143   """Logic for master daemon shutdown.
144
145   """
146   #: How long to wait between checks
147   _CHECK_INTERVAL = 5.0
148
149   #: How long to wait after all jobs are done (e.g. to give clients time to
150   #: retrieve the job status)
151   _SHUTDOWN_LINGER = 5.0
152
153   def __init__(self):
154     """Initializes this class.
155
156     """
157     self._had_active_jobs = None
158     self._linger_timeout = None
159
160   def __call__(self, jq_prepare_result):
161     """Determines if master daemon is ready for shutdown.
162
163     @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown}
164     @rtype: None or number
165     @return: None if master daemon is ready, timeout if the check must be
166              repeated
167
168     """
169     if jq_prepare_result:
170       # Check again shortly
171       logging.info("Job queue has been notified for shutdown but is still"
172                    " busy; next check in %s seconds", self._CHECK_INTERVAL)
173       self._had_active_jobs = True
174       return self._CHECK_INTERVAL
175
176     if not self._had_active_jobs:
177       # Can shut down as there were no active jobs on the first check
178       return None
179
180     # No jobs are running anymore, but maybe some clients want to collect some
181     # information. Give them a short amount of time.
182     if self._linger_timeout is None:
183       self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True)
184
185     remaining = self._linger_timeout.Remaining()
186
187     logging.info("Job queue no longer busy; shutting down master daemon"
188                  " in %s seconds", remaining)
189
190     # TODO: Should the master daemon socket be closed at this point? Doing so
191     # wouldn't affect existing connections.
192
193     if remaining < 0:
194       return None
195     else:
196       return remaining
197
198
199 class MasterServer(daemon.AsyncStreamServer):
200   """Master Server.
201
202   This is the main asynchronous master server. It handles connections to the
203   master socket.
204
205   """
206   family = socket.AF_UNIX
207
208   def __init__(self, address, uid, gid):
209     """MasterServer constructor
210
211     @param address: the unix socket address to bind the MasterServer to
212     @param uid: The uid of the owner of the socket
213     @param gid: The gid of the owner of the socket
214
215     """
216     temp_name = tempfile.mktemp(dir=os.path.dirname(address))
217     daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
218     os.chmod(temp_name, 0770)
219     os.chown(temp_name, uid, gid)
220     os.rename(temp_name, address)
221
222     self.awaker = daemon.AsyncAwaker()
223
224     # We'll only start threads once we've forked.
225     self.context = None
226     self.request_workers = None
227
228     self._shutdown_check = None
229
230   def handle_connection(self, connected_socket, client_address):
231     # TODO: add connection count and limit the number of open connections to a
232     # maximum number to avoid breaking for lack of file descriptors or memory.
233     MasterClientHandler(self, connected_socket, client_address, self.family)
234
235   def setup_queue(self):
236     self.context = GanetiContext()
237     self.request_workers = workerpool.WorkerPool("ClientReq",
238                                                  CLIENT_REQUEST_WORKERS,
239                                                  ClientRequestWorker)
240
241   def WaitForShutdown(self):
242     """Prepares server for shutdown.
243
244     """
245     if self._shutdown_check is None:
246       self._shutdown_check = _MasterShutdownCheck()
247
248     return self._shutdown_check(self.context.jobqueue.PrepareShutdown())
249
250   def server_cleanup(self):
251     """Cleanup the server.
252
253     This involves shutting down the processor threads and the master
254     socket.
255
256     """
257     try:
258       self.close()
259     finally:
260       if self.request_workers:
261         self.request_workers.TerminateWorkers()
262       if self.context:
263         self.context.jobqueue.Shutdown()
264
265
266 class ClientOps:
267   """Class holding high-level client operations."""
268   def __init__(self, server):
269     self.server = server
270
271   def handle_request(self, method, args): # pylint: disable=R0911
272     context = self.server.context
273     queue = context.jobqueue
274
275     # TODO: Parameter validation
276     if not isinstance(args, (tuple, list)):
277       logging.info("Received invalid arguments of type '%s'", type(args))
278       raise ValueError("Invalid arguments type '%s'" % type(args))
279
280     # TODO: Rewrite to not exit in each 'if/elif' branch
281
282     if method == luxi.REQ_SUBMIT_JOB:
283       logging.info("Receiving new job")
284       (job_def, ) = args
285       ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
286       job_id = queue.SubmitJob(ops)
287       _LogNewJob(True, job_id, ops)
288       return job_id
289
290     elif method == luxi.REQ_SUBMIT_MANY_JOBS:
291       logging.info("Receiving multiple jobs")
292       (job_defs, ) = args
293       jobs = []
294       for ops in job_defs:
295         jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
296       job_ids = queue.SubmitManyJobs(jobs)
297       for ((status, job_id), ops) in zip(job_ids, jobs):
298         _LogNewJob(status, job_id, ops)
299       return job_ids
300
301     elif method == luxi.REQ_CANCEL_JOB:
302       (job_id, ) = args
303       logging.info("Received job cancel request for %s", job_id)
304       return queue.CancelJob(job_id)
305
306     elif method == luxi.REQ_CHANGE_JOB_PRIORITY:
307       (job_id, priority) = args
308       logging.info("Received request to change priority for job %s to %s",
309                    job_id, priority)
310       return queue.ChangeJobPriority(job_id, priority)
311
312     elif method == luxi.REQ_ARCHIVE_JOB:
313       (job_id, ) = args
314       logging.info("Received job archive request for %s", job_id)
315       return queue.ArchiveJob(job_id)
316
317     elif method == luxi.REQ_AUTO_ARCHIVE_JOBS:
318       (age, timeout) = args
319       logging.info("Received job autoarchive request for age %s, timeout %s",
320                    age, timeout)
321       return queue.AutoArchiveJobs(age, timeout)
322
323     elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
324       (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
325       logging.info("Received job poll request for %s", job_id)
326       return queue.WaitForJobChanges(job_id, fields, prev_job_info,
327                                      prev_log_serial, timeout)
328
329     elif method == luxi.REQ_QUERY:
330       (what, fields, qfilter) = args
331
332       if what in constants.QR_VIA_OP:
333         result = self._Query(opcodes.OpQuery(what=what, fields=fields,
334                                              qfilter=qfilter))
335       elif what == constants.QR_LOCK:
336         if qfilter is not None:
337           raise errors.OpPrereqError("Lock queries can't be filtered",
338                                      errors.ECODE_INVAL)
339         return context.glm.QueryLocks(fields)
340       elif what == constants.QR_JOB:
341         return queue.QueryJobs(fields, qfilter)
342       elif what in constants.QR_VIA_LUXI:
343         raise NotImplementedError
344       else:
345         raise errors.OpPrereqError("Resource type '%s' unknown" % what,
346                                    errors.ECODE_INVAL)
347
348       return result
349
350     elif method == luxi.REQ_QUERY_FIELDS:
351       (what, fields) = args
352       req = objects.QueryFieldsRequest(what=what, fields=fields)
353
354       try:
355         fielddefs = query.ALL_FIELDS[req.what]
356       except KeyError:
357         raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
358                                    errors.ECODE_INVAL)
359
360       return query.QueryFields(fielddefs, req.fields)
361
362     elif method == luxi.REQ_QUERY_JOBS:
363       (job_ids, fields) = args
364       if isinstance(job_ids, (tuple, list)) and job_ids:
365         msg = utils.CommaJoin(job_ids)
366       else:
367         msg = str(job_ids)
368       logging.info("Received job query request for %s", msg)
369       return queue.OldStyleQueryJobs(job_ids, fields)
370
371     elif method == luxi.REQ_QUERY_INSTANCES:
372       (names, fields, use_locking) = args
373       logging.info("Received instance query request for %s", names)
374       if use_locking:
375         raise errors.OpPrereqError("Sync queries are not allowed",
376                                    errors.ECODE_INVAL)
377       op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
378                                    use_locking=use_locking)
379       return self._Query(op)
380
381     elif method == luxi.REQ_QUERY_NODES:
382       (names, fields, use_locking) = args
383       logging.info("Received node query request for %s", names)
384       if use_locking:
385         raise errors.OpPrereqError("Sync queries are not allowed",
386                                    errors.ECODE_INVAL)
387       op = opcodes.OpNodeQuery(names=names, output_fields=fields,
388                                use_locking=use_locking)
389       return self._Query(op)
390
391     elif method == luxi.REQ_QUERY_GROUPS:
392       (names, fields, use_locking) = args
393       logging.info("Received group query request for %s", names)
394       if use_locking:
395         raise errors.OpPrereqError("Sync queries are not allowed",
396                                    errors.ECODE_INVAL)
397       op = opcodes.OpGroupQuery(names=names, output_fields=fields)
398       return self._Query(op)
399
400     elif method == luxi.REQ_QUERY_EXPORTS:
401       (nodes, use_locking) = args
402       if use_locking:
403         raise errors.OpPrereqError("Sync queries are not allowed",
404                                    errors.ECODE_INVAL)
405       logging.info("Received exports query request")
406       op = opcodes.OpBackupQuery(nodes=nodes, use_locking=use_locking)
407       return self._Query(op)
408
409     elif method == luxi.REQ_QUERY_CONFIG_VALUES:
410       (fields, ) = args
411       logging.info("Received config values query request for %s", fields)
412       op = opcodes.OpClusterConfigQuery(output_fields=fields)
413       return self._Query(op)
414
415     elif method == luxi.REQ_QUERY_CLUSTER_INFO:
416       logging.info("Received cluster info query request")
417       op = opcodes.OpClusterQuery()
418       return self._Query(op)
419
420     elif method == luxi.REQ_QUERY_TAGS:
421       (kind, name) = args
422       logging.info("Received tags query request")
423       op = opcodes.OpTagsGet(kind=kind, name=name, use_locking=False)
424       return self._Query(op)
425
426     elif method == luxi.REQ_SET_DRAIN_FLAG:
427       (drain_flag, ) = args
428       logging.info("Received queue drain flag change request to %s",
429                    drain_flag)
430       return queue.SetDrainFlag(drain_flag)
431
432     elif method == luxi.REQ_SET_WATCHER_PAUSE:
433       (until, ) = args
434
435       if until is None:
436         logging.info("Received request to no longer pause the watcher")
437       else:
438         if not isinstance(until, (int, float)):
439           raise TypeError("Duration must be an integer or float")
440
441         if until < time.time():
442           raise errors.GenericError("Unable to set pause end time in the past")
443
444         logging.info("Received request to pause the watcher until %s", until)
445
446       return _SetWatcherPause(until)
447
448     else:
449       logging.info("Received invalid request '%s'", method)
450       raise ValueError("Invalid operation '%s'" % method)
451
452   def _Query(self, op):
453     """Runs the specified opcode and returns the result.
454
455     """
456     # Queries don't have a job id
457     proc = mcpu.Processor(self.server.context, None, enable_locks=False)
458
459     # TODO: Executing an opcode using locks will acquire them in blocking mode.
460     # Consider using a timeout for retries.
461     return proc.ExecOpCode(op, None)
462
463
464 class GanetiContext(object):
465   """Context common to all ganeti threads.
466
467   This class creates and holds common objects shared by all threads.
468
469   """
470   # pylint: disable=W0212
471   # we do want to ensure a singleton here
472   _instance = None
473
474   def __init__(self):
475     """Constructs a new GanetiContext object.
476
477     There should be only a GanetiContext object at any time, so this
478     function raises an error if this is not the case.
479
480     """
481     assert self.__class__._instance is None, "double GanetiContext instance"
482
483     # Create global configuration object
484     self.cfg = config.ConfigWriter()
485
486     # Locking manager
487     self.glm = locking.GanetiLockManager(
488       self.cfg.GetNodeList(),
489       self.cfg.GetNodeGroupList(),
490       self.cfg.GetInstanceList(),
491       self.cfg.GetNetworkList())
492
493     self.cfg.SetContext(self)
494
495     # RPC runner
496     self.rpc = rpc.RpcRunner(self.cfg, self.glm.AddToLockMonitor)
497
498     # Job queue
499     self.jobqueue = jqueue.JobQueue(self)
500
501     # setting this also locks the class against attribute modifications
502     self.__class__._instance = self
503
504   def __setattr__(self, name, value):
505     """Setting GanetiContext attributes is forbidden after initialization.
506
507     """
508     assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
509     object.__setattr__(self, name, value)
510
511   def AddNode(self, node, ec_id):
512     """Adds a node to the configuration and lock manager.
513
514     """
515     # Add it to the configuration
516     self.cfg.AddNode(node, ec_id)
517
518     # If preseeding fails it'll not be added
519     self.jobqueue.AddNode(node)
520
521     # Add the new node to the Ganeti Lock Manager
522     self.glm.add(locking.LEVEL_NODE, node.name)
523     self.glm.add(locking.LEVEL_NODE_RES, node.name)
524
525   def ReaddNode(self, node):
526     """Updates a node that's already in the configuration
527
528     """
529     # Synchronize the queue again
530     self.jobqueue.AddNode(node)
531
532   def RemoveNode(self, name):
533     """Removes a node from the configuration and lock manager.
534
535     """
536     # Remove node from configuration
537     self.cfg.RemoveNode(name)
538
539     # Notify job queue
540     self.jobqueue.RemoveNode(name)
541
542     # Remove the node from the Ganeti Lock Manager
543     self.glm.remove(locking.LEVEL_NODE, name)
544     self.glm.remove(locking.LEVEL_NODE_RES, name)
545
546
547 def _SetWatcherPause(until):
548   """Creates or removes the watcher pause file.
549
550   @type until: None or int
551   @param until: Unix timestamp saying until when the watcher shouldn't run
552
553   """
554   if until is None:
555     utils.RemoveFile(pathutils.WATCHER_PAUSEFILE)
556   else:
557     utils.WriteFile(pathutils.WATCHER_PAUSEFILE,
558                     data="%d\n" % (until, ))
559
560   return until
561
562
563 @rpc.RunWithRPC
564 def CheckAgreement():
565   """Check the agreement on who is the master.
566
567   The function uses a very simple algorithm: we must get more positive
568   than negative answers. Since in most of the cases we are the master,
569   we'll use our own config file for getting the node list. In the
570   future we could collect the current node list from our (possibly
571   obsolete) known nodes.
572
573   In order to account for cold-start of all nodes, we retry for up to
574   a minute until we get a real answer as the top-voted one. If the
575   nodes are more out-of-sync, for now manual startup of the master
576   should be attempted.
577
578   Note that for a even number of nodes cluster, we need at least half
579   of the nodes (beside ourselves) to vote for us. This creates a
580   problem on two-node clusters, since in this case we require the
581   other node to be up too to confirm our status.
582
583   """
584   myself = netutils.Hostname.GetSysName()
585   #temp instantiation of a config writer, used only to get the node list
586   cfg = config.ConfigWriter()
587   node_list = cfg.GetNodeList()
588   del cfg
589   retries = 6
590   while retries > 0:
591     votes = bootstrap.GatherMasterVotes(node_list)
592     if not votes:
593       # empty node list, this is a one node cluster
594       return True
595     if votes[0][0] is None:
596       retries -= 1
597       time.sleep(10)
598       continue
599     break
600   if retries == 0:
601     logging.critical("Cluster inconsistent, most of the nodes didn't answer"
602                      " after multiple retries. Aborting startup")
603     logging.critical("Use the --no-voting option if you understand what"
604                      " effects it has on the cluster state")
605     return False
606   # here a real node is at the top of the list
607   all_votes = sum(item[1] for item in votes)
608   top_node, top_votes = votes[0]
609
610   result = False
611   if top_node != myself:
612     logging.critical("It seems we are not the master (top-voted node"
613                      " is %s with %d out of %d votes)", top_node, top_votes,
614                      all_votes)
615   elif top_votes < all_votes - top_votes:
616     logging.critical("It seems we are not the master (%d votes for,"
617                      " %d votes against)", top_votes, all_votes - top_votes)
618   else:
619     result = True
620
621   return result
622
623
624 @rpc.RunWithRPC
625 def ActivateMasterIP():
626   # activate ip
627   cfg = config.ConfigWriter()
628   master_params = cfg.GetMasterNetworkParameters()
629   ems = cfg.GetUseExternalMipScript()
630   runner = rpc.BootstrapRunner()
631   result = runner.call_node_activate_master_ip(master_params.name,
632                                                master_params, ems)
633
634   msg = result.fail_msg
635   if msg:
636     logging.error("Can't activate master IP address: %s", msg)
637
638
639 def CheckMasterd(options, args):
640   """Initial checks whether to run or exit with a failure.
641
642   """
643   if args: # masterd doesn't take any arguments
644     print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
645     sys.exit(constants.EXIT_FAILURE)
646
647   ssconf.CheckMaster(options.debug)
648
649   try:
650     options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
651     options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
652   except KeyError:
653     print >> sys.stderr, ("User or group not existing on system: %s:%s" %
654                           (constants.MASTERD_USER, constants.DAEMONS_GROUP))
655     sys.exit(constants.EXIT_FAILURE)
656
657   # Determine static runtime architecture information
658   runtime.InitArchInfo()
659
660   # Check the configuration is sane before anything else
661   try:
662     config.ConfigWriter()
663   except errors.ConfigVersionMismatch, err:
664     v1 = "%s.%s.%s" % constants.SplitVersion(err.args[0])
665     v2 = "%s.%s.%s" % constants.SplitVersion(err.args[1])
666     print >> sys.stderr,  \
667         ("Configuration version mismatch. The current Ganeti software"
668          " expects version %s, but the on-disk configuration file has"
669          " version %s. This is likely the result of upgrading the"
670          " software without running the upgrade procedure. Please contact"
671          " your cluster administrator or complete the upgrade using the"
672          " cfgupgrade utility, after reading the upgrade notes." %
673          (v1, v2))
674     sys.exit(constants.EXIT_FAILURE)
675   except errors.ConfigurationError, err:
676     print >> sys.stderr, \
677         ("Configuration error while opening the configuration file: %s\n"
678          "This might be caused by an incomplete software upgrade or"
679          " by a corrupted configuration file. Until the problem is fixed"
680          " the master daemon cannot start." % str(err))
681     sys.exit(constants.EXIT_FAILURE)
682
683   # If CheckMaster didn't fail we believe we are the master, but we have to
684   # confirm with the other nodes.
685   if options.no_voting:
686     if not options.yes_do_it:
687       sys.stdout.write("The 'no voting' option has been selected.\n")
688       sys.stdout.write("This is dangerous, please confirm by"
689                        " typing uppercase 'yes': ")
690       sys.stdout.flush()
691
692       confirmation = sys.stdin.readline().strip()
693       if confirmation != "YES":
694         print >> sys.stderr, "Aborting."
695         sys.exit(constants.EXIT_FAILURE)
696
697   else:
698     # CheckAgreement uses RPC and threads, hence it needs to be run in
699     # a separate process before we call utils.Daemonize in the current
700     # process.
701     if not utils.RunInSeparateProcess(CheckAgreement):
702       sys.exit(constants.EXIT_FAILURE)
703
704   # ActivateMasterIP also uses RPC/threads, so we run it again via a
705   # separate process.
706
707   # TODO: decide whether failure to activate the master IP is a fatal error
708   utils.RunInSeparateProcess(ActivateMasterIP)
709
710
711 def PrepMasterd(options, _):
712   """Prep master daemon function, executed with the PID file held.
713
714   """
715   # This is safe to do as the pid file guarantees against
716   # concurrent execution.
717   utils.RemoveFile(pathutils.MASTER_SOCKET)
718
719   mainloop = daemon.Mainloop()
720   master = MasterServer(pathutils.MASTER_SOCKET, options.uid, options.gid)
721   return (mainloop, master)
722
723
724 def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
725   """Main master daemon function, executed with the PID file held.
726
727   """
728   (mainloop, master) = prep_data
729   try:
730     rpc.Init()
731     try:
732       master.setup_queue()
733       try:
734         mainloop.Run(shutdown_wait_fn=master.WaitForShutdown)
735       finally:
736         master.server_cleanup()
737     finally:
738       rpc.Shutdown()
739   finally:
740     utils.RemoveFile(pathutils.MASTER_SOCKET)
741
742   logging.info("Clean master daemon shutdown")
743
744
745 def Main():
746   """Main function"""
747   parser = OptionParser(description="Ganeti master daemon",
748                         usage="%prog [-f] [-d]",
749                         version="%%prog (ganeti) %s" %
750                         constants.RELEASE_VERSION)
751   parser.add_option("--no-voting", dest="no_voting",
752                     help="Do not check that the nodes agree on this node"
753                     " being the master and start the daemon unconditionally",
754                     default=False, action="store_true")
755   parser.add_option("--yes-do-it", dest="yes_do_it",
756                     help="Override interactive check for --no-voting",
757                     default=False, action="store_true")
758   daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
759                      ExecMasterd, multithreaded=True)