Merge branch 'stable-2.9' into stable-2.10
[ganeti-local] / lib / server / masterd.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Master daemon program.
23
24 Some classes deviates from the standard style guide since the
25 inheritance from parent classes requires it.
26
27 """
28
29 # pylint: disable=C0103
30 # C0103: Invalid name ganeti-masterd
31
32 import grp
33 import os
34 import pwd
35 import sys
36 import socket
37 import time
38 import tempfile
39 import logging
40
41 from optparse import OptionParser
42
43 from ganeti import config
44 from ganeti import constants
45 from ganeti import daemon
46 from ganeti import mcpu
47 from ganeti import opcodes
48 from ganeti import jqueue
49 from ganeti import locking
50 from ganeti import luxi
51 from ganeti import utils
52 from ganeti import errors
53 from ganeti import ssconf
54 from ganeti import workerpool
55 from ganeti import rpc
56 from ganeti import bootstrap
57 from ganeti import netutils
58 from ganeti import objects
59 from ganeti import query
60 from ganeti import runtime
61 from ganeti import pathutils
62 from ganeti import ht
63
64 from ganeti.utils import version
65
66
67 CLIENT_REQUEST_WORKERS = 16
68
69 EXIT_NOTMASTER = constants.EXIT_NOTMASTER
70 EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
71
72
73 def _LogNewJob(status, info, ops):
74   """Log information about a recently submitted job.
75
76   """
77   op_summary = utils.CommaJoin(op.Summary() for op in ops)
78
79   if status:
80     logging.info("New job with id %s, summary: %s", info, op_summary)
81   else:
82     logging.info("Failed to submit job, reason: '%s', summary: %s",
83                  info, op_summary)
84
85
86 class ClientRequestWorker(workerpool.BaseWorker):
87   # pylint: disable=W0221
88   def RunTask(self, server, message, client):
89     """Process the request.
90
91     """
92     client_ops = ClientOps(server)
93
94     try:
95       (method, args, ver) = luxi.ParseRequest(message)
96     except luxi.ProtocolError, err:
97       logging.error("Protocol Error: %s", err)
98       client.close_log()
99       return
100
101     success = False
102     try:
103       # Verify client's version if there was one in the request
104       if ver is not None and ver != constants.LUXI_VERSION:
105         raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
106                                (constants.LUXI_VERSION, ver))
107
108       result = client_ops.handle_request(method, args)
109       success = True
110     except errors.GenericError, err:
111       logging.exception("Unexpected exception")
112       success = False
113       result = errors.EncodeException(err)
114     except:
115       logging.exception("Unexpected exception")
116       err = sys.exc_info()
117       result = "Caught exception: %s" % str(err[1])
118
119     try:
120       reply = luxi.FormatResponse(success, result)
121       client.send_message(reply)
122       # awake the main thread so that it can write out the data.
123       server.awaker.signal()
124     except: # pylint: disable=W0702
125       logging.exception("Send error")
126       client.close_log()
127
128
129 class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
130   """Handler for master peers.
131
132   """
133   _MAX_UNHANDLED = 1
134
135   def __init__(self, server, connected_socket, client_address, family):
136     daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
137                                                  client_address,
138                                                  constants.LUXI_EOM,
139                                                  family, self._MAX_UNHANDLED)
140     self.server = server
141
142   def handle_message(self, message, _):
143     self.server.request_workers.AddTask((self.server, message, self))
144
145
146 class _MasterShutdownCheck:
147   """Logic for master daemon shutdown.
148
149   """
150   #: How long to wait between checks
151   _CHECK_INTERVAL = 5.0
152
153   #: How long to wait after all jobs are done (e.g. to give clients time to
154   #: retrieve the job status)
155   _SHUTDOWN_LINGER = 5.0
156
157   def __init__(self):
158     """Initializes this class.
159
160     """
161     self._had_active_jobs = None
162     self._linger_timeout = None
163
164   def __call__(self, jq_prepare_result):
165     """Determines if master daemon is ready for shutdown.
166
167     @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown}
168     @rtype: None or number
169     @return: None if master daemon is ready, timeout if the check must be
170              repeated
171
172     """
173     if jq_prepare_result:
174       # Check again shortly
175       logging.info("Job queue has been notified for shutdown but is still"
176                    " busy; next check in %s seconds", self._CHECK_INTERVAL)
177       self._had_active_jobs = True
178       return self._CHECK_INTERVAL
179
180     if not self._had_active_jobs:
181       # Can shut down as there were no active jobs on the first check
182       return None
183
184     # No jobs are running anymore, but maybe some clients want to collect some
185     # information. Give them a short amount of time.
186     if self._linger_timeout is None:
187       self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True)
188
189     remaining = self._linger_timeout.Remaining()
190
191     logging.info("Job queue no longer busy; shutting down master daemon"
192                  " in %s seconds", remaining)
193
194     # TODO: Should the master daemon socket be closed at this point? Doing so
195     # wouldn't affect existing connections.
196
197     if remaining < 0:
198       return None
199     else:
200       return remaining
201
202
203 class MasterServer(daemon.AsyncStreamServer):
204   """Master Server.
205
206   This is the main asynchronous master server. It handles connections to the
207   master socket.
208
209   """
210   family = socket.AF_UNIX
211
212   def __init__(self, address, uid, gid):
213     """MasterServer constructor
214
215     @param address: the unix socket address to bind the MasterServer to
216     @param uid: The uid of the owner of the socket
217     @param gid: The gid of the owner of the socket
218
219     """
220     temp_name = tempfile.mktemp(dir=os.path.dirname(address))
221     daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
222     os.chmod(temp_name, 0770)
223     os.chown(temp_name, uid, gid)
224     os.rename(temp_name, address)
225
226     self.awaker = daemon.AsyncAwaker()
227
228     # We'll only start threads once we've forked.
229     self.context = None
230     self.request_workers = None
231
232     self._shutdown_check = None
233
234   def handle_connection(self, connected_socket, client_address):
235     # TODO: add connection count and limit the number of open connections to a
236     # maximum number to avoid breaking for lack of file descriptors or memory.
237     MasterClientHandler(self, connected_socket, client_address, self.family)
238
239   def setup_queue(self):
240     self.context = GanetiContext()
241     self.request_workers = workerpool.WorkerPool("ClientReq",
242                                                  CLIENT_REQUEST_WORKERS,
243                                                  ClientRequestWorker)
244
245   def WaitForShutdown(self):
246     """Prepares server for shutdown.
247
248     """
249     if self._shutdown_check is None:
250       self._shutdown_check = _MasterShutdownCheck()
251
252     return self._shutdown_check(self.context.jobqueue.PrepareShutdown())
253
254   def server_cleanup(self):
255     """Cleanup the server.
256
257     This involves shutting down the processor threads and the master
258     socket.
259
260     """
261     try:
262       self.close()
263     finally:
264       if self.request_workers:
265         self.request_workers.TerminateWorkers()
266       if self.context:
267         self.context.jobqueue.Shutdown()
268
269
270 class ClientOps:
271   """Class holding high-level client operations."""
272   def __init__(self, server):
273     self.server = server
274
275   def handle_request(self, method, args): # pylint: disable=R0911
276     context = self.server.context
277     queue = context.jobqueue
278
279     # TODO: Parameter validation
280     if not isinstance(args, (tuple, list)):
281       logging.info("Received invalid arguments of type '%s'", type(args))
282       raise ValueError("Invalid arguments type '%s'" % type(args))
283
284     if method not in luxi.REQ_ALL:
285       logging.info("Received invalid request '%s'", method)
286       raise ValueError("Invalid operation '%s'" % method)
287
288     # TODO: Rewrite to not exit in each 'if/elif' branch
289
290     if method == luxi.REQ_SUBMIT_JOB:
291       logging.info("Receiving new job")
292       (job_def, ) = args
293       ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
294       job_id = queue.SubmitJob(ops)
295       _LogNewJob(True, job_id, ops)
296       return job_id
297
298     elif method == luxi.REQ_SUBMIT_JOB_TO_DRAINED_QUEUE:
299       logging.info("Forcefully receiving new job")
300       (job_def, ) = args
301       ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
302       job_id = queue.SubmitJobToDrainedQueue(ops)
303       _LogNewJob(True, job_id, ops)
304       return job_id
305
306     elif method == luxi.REQ_SUBMIT_MANY_JOBS:
307       logging.info("Receiving multiple jobs")
308       (job_defs, ) = args
309       jobs = []
310       for ops in job_defs:
311         jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
312       job_ids = queue.SubmitManyJobs(jobs)
313       for ((status, job_id), ops) in zip(job_ids, jobs):
314         _LogNewJob(status, job_id, ops)
315       return job_ids
316
317     elif method == luxi.REQ_CANCEL_JOB:
318       (job_id, ) = args
319       logging.info("Received job cancel request for %s", job_id)
320       return queue.CancelJob(job_id)
321
322     elif method == luxi.REQ_CHANGE_JOB_PRIORITY:
323       (job_id, priority) = args
324       logging.info("Received request to change priority for job %s to %s",
325                    job_id, priority)
326       return queue.ChangeJobPriority(job_id, priority)
327
328     elif method == luxi.REQ_ARCHIVE_JOB:
329       (job_id, ) = args
330       logging.info("Received job archive request for %s", job_id)
331       return queue.ArchiveJob(job_id)
332
333     elif method == luxi.REQ_AUTO_ARCHIVE_JOBS:
334       (age, timeout) = args
335       logging.info("Received job autoarchive request for age %s, timeout %s",
336                    age, timeout)
337       return queue.AutoArchiveJobs(age, timeout)
338
339     elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
340       (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
341       logging.info("Received job poll request for %s", job_id)
342       return queue.WaitForJobChanges(job_id, fields, prev_job_info,
343                                      prev_log_serial, timeout)
344
345     elif method == luxi.REQ_QUERY:
346       (what, fields, qfilter) = args
347
348       if what in constants.QR_VIA_OP:
349         result = self._Query(opcodes.OpQuery(what=what, fields=fields,
350                                              qfilter=qfilter))
351       elif what == constants.QR_LOCK:
352         if qfilter is not None:
353           raise errors.OpPrereqError("Lock queries can't be filtered",
354                                      errors.ECODE_INVAL)
355         return context.glm.QueryLocks(fields)
356       elif what == constants.QR_JOB:
357         return queue.QueryJobs(fields, qfilter)
358       elif what in constants.QR_VIA_LUXI:
359         raise NotImplementedError
360       else:
361         raise errors.OpPrereqError("Resource type '%s' unknown" % what,
362                                    errors.ECODE_INVAL)
363
364       return result
365
366     elif method == luxi.REQ_QUERY_FIELDS:
367       (what, fields) = args
368       req = objects.QueryFieldsRequest(what=what, fields=fields)
369
370       try:
371         fielddefs = query.ALL_FIELDS[req.what]
372       except KeyError:
373         raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
374                                    errors.ECODE_INVAL)
375
376       return query.QueryFields(fielddefs, req.fields)
377
378     elif method == luxi.REQ_QUERY_JOBS:
379       (job_ids, fields) = args
380       if isinstance(job_ids, (tuple, list)) and job_ids:
381         msg = utils.CommaJoin(job_ids)
382       else:
383         msg = str(job_ids)
384       logging.info("Received job query request for %s", msg)
385       return queue.OldStyleQueryJobs(job_ids, fields)
386
387     elif method == luxi.REQ_QUERY_INSTANCES:
388       (names, fields, use_locking) = args
389       logging.info("Received instance query request for %s", names)
390       if use_locking:
391         raise errors.OpPrereqError("Sync queries are not allowed",
392                                    errors.ECODE_INVAL)
393       op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
394                                    use_locking=use_locking)
395       return self._Query(op)
396
397     elif method == luxi.REQ_QUERY_NODES:
398       (names, fields, use_locking) = args
399       logging.info("Received node query request for %s", names)
400       if use_locking:
401         raise errors.OpPrereqError("Sync queries are not allowed",
402                                    errors.ECODE_INVAL)
403       op = opcodes.OpNodeQuery(names=names, output_fields=fields,
404                                use_locking=use_locking)
405       return self._Query(op)
406
407     elif method == luxi.REQ_QUERY_GROUPS:
408       (names, fields, use_locking) = args
409       logging.info("Received group query request for %s", names)
410       if use_locking:
411         raise errors.OpPrereqError("Sync queries are not allowed",
412                                    errors.ECODE_INVAL)
413       op = opcodes.OpGroupQuery(names=names, output_fields=fields)
414       return self._Query(op)
415
416     elif method == luxi.REQ_QUERY_NETWORKS:
417       (names, fields, use_locking) = args
418       logging.info("Received network query request for %s", names)
419       if use_locking:
420         raise errors.OpPrereqError("Sync queries are not allowed",
421                                    errors.ECODE_INVAL)
422       op = opcodes.OpNetworkQuery(names=names, output_fields=fields)
423       return self._Query(op)
424
425     elif method == luxi.REQ_QUERY_EXPORTS:
426       (nodes, use_locking) = args
427       if use_locking:
428         raise errors.OpPrereqError("Sync queries are not allowed",
429                                    errors.ECODE_INVAL)
430       logging.info("Received exports query request")
431       op = opcodes.OpBackupQuery(nodes=nodes, use_locking=use_locking)
432       return self._Query(op)
433
434     elif method == luxi.REQ_QUERY_CONFIG_VALUES:
435       (fields, ) = args
436       logging.info("Received config values query request for %s", fields)
437       op = opcodes.OpClusterConfigQuery(output_fields=fields)
438       return self._Query(op)
439
440     elif method == luxi.REQ_QUERY_CLUSTER_INFO:
441       logging.info("Received cluster info query request")
442       op = opcodes.OpClusterQuery()
443       return self._Query(op)
444
445     elif method == luxi.REQ_QUERY_TAGS:
446       (kind, name) = args
447       logging.info("Received tags query request")
448       op = opcodes.OpTagsGet(kind=kind, name=name, use_locking=False)
449       return self._Query(op)
450
451     elif method == luxi.REQ_SET_DRAIN_FLAG:
452       (drain_flag, ) = args
453       logging.info("Received queue drain flag change request to %s",
454                    drain_flag)
455       return queue.SetDrainFlag(drain_flag)
456
457     elif method == luxi.REQ_SET_WATCHER_PAUSE:
458       (until, ) = args
459
460       return _SetWatcherPause(context, until)
461
462     else:
463       logging.critical("Request '%s' in luxi.REQ_ALL, but not known", method)
464       raise errors.ProgrammerError("Operation '%s' in luxi.REQ_ALL,"
465                                    " but not implemented" % method)
466
467   def _Query(self, op):
468     """Runs the specified opcode and returns the result.
469
470     """
471     # Queries don't have a job id
472     proc = mcpu.Processor(self.server.context, None, enable_locks=False)
473
474     # TODO: Executing an opcode using locks will acquire them in blocking mode.
475     # Consider using a timeout for retries.
476     return proc.ExecOpCode(op, None)
477
478
479 class GanetiContext(object):
480   """Context common to all ganeti threads.
481
482   This class creates and holds common objects shared by all threads.
483
484   """
485   # pylint: disable=W0212
486   # we do want to ensure a singleton here
487   _instance = None
488
489   def __init__(self):
490     """Constructs a new GanetiContext object.
491
492     There should be only a GanetiContext object at any time, so this
493     function raises an error if this is not the case.
494
495     """
496     assert self.__class__._instance is None, "double GanetiContext instance"
497
498     # Create global configuration object
499     self.cfg = config.ConfigWriter()
500
501     # Locking manager
502     self.glm = locking.GanetiLockManager(
503       self.cfg.GetNodeList(),
504       self.cfg.GetNodeGroupList(),
505       [inst.name for inst in self.cfg.GetAllInstancesInfo().values()],
506       self.cfg.GetNetworkList())
507
508     self.cfg.SetContext(self)
509
510     # RPC runner
511     self.rpc = rpc.RpcRunner(self.cfg, self.glm.AddToLockMonitor)
512
513     # Job queue
514     self.jobqueue = jqueue.JobQueue(self)
515
516     # setting this also locks the class against attribute modifications
517     self.__class__._instance = self
518
519   def __setattr__(self, name, value):
520     """Setting GanetiContext attributes is forbidden after initialization.
521
522     """
523     assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
524     object.__setattr__(self, name, value)
525
526   def AddNode(self, node, ec_id):
527     """Adds a node to the configuration and lock manager.
528
529     """
530     # Add it to the configuration
531     self.cfg.AddNode(node, ec_id)
532
533     # If preseeding fails it'll not be added
534     self.jobqueue.AddNode(node)
535
536     # Add the new node to the Ganeti Lock Manager
537     self.glm.add(locking.LEVEL_NODE, node.uuid)
538     self.glm.add(locking.LEVEL_NODE_RES, node.uuid)
539
540   def ReaddNode(self, node):
541     """Updates a node that's already in the configuration
542
543     """
544     # Synchronize the queue again
545     self.jobqueue.AddNode(node)
546
547   def RemoveNode(self, node):
548     """Removes a node from the configuration and lock manager.
549
550     """
551     # Remove node from configuration
552     self.cfg.RemoveNode(node.uuid)
553
554     # Notify job queue
555     self.jobqueue.RemoveNode(node.name)
556
557     # Remove the node from the Ganeti Lock Manager
558     self.glm.remove(locking.LEVEL_NODE, node.uuid)
559     self.glm.remove(locking.LEVEL_NODE_RES, node.uuid)
560
561
562 def _SetWatcherPause(context, until):
563   """Creates or removes the watcher pause file.
564
565   @type context: L{GanetiContext}
566   @param context: Global Ganeti context
567   @type until: None or int
568   @param until: Unix timestamp saying until when the watcher shouldn't run
569
570   """
571   node_names = context.cfg.GetNodeList()
572
573   if until is None:
574     logging.info("Received request to no longer pause watcher")
575   else:
576     if not ht.TNumber(until):
577       raise TypeError("Duration must be numeric")
578
579     if until < time.time():
580       raise errors.GenericError("Unable to set pause end time in the past")
581
582     logging.info("Received request to pause watcher until %s", until)
583
584   result = context.rpc.call_set_watcher_pause(node_names, until)
585
586   errmsg = utils.CommaJoin("%s (%s)" % (node_name, nres.fail_msg)
587                            for (node_name, nres) in result.items()
588                            if nres.fail_msg and not nres.offline)
589   if errmsg:
590     raise errors.OpExecError("Watcher pause was set where possible, but failed"
591                              " on the following node(s): %s" % errmsg)
592
593   return until
594
595
596 @rpc.RunWithRPC
597 def CheckAgreement():
598   """Check the agreement on who is the master.
599
600   The function uses a very simple algorithm: we must get more positive
601   than negative answers. Since in most of the cases we are the master,
602   we'll use our own config file for getting the node list. In the
603   future we could collect the current node list from our (possibly
604   obsolete) known nodes.
605
606   In order to account for cold-start of all nodes, we retry for up to
607   a minute until we get a real answer as the top-voted one. If the
608   nodes are more out-of-sync, for now manual startup of the master
609   should be attempted.
610
611   Note that for a even number of nodes cluster, we need at least half
612   of the nodes (beside ourselves) to vote for us. This creates a
613   problem on two-node clusters, since in this case we require the
614   other node to be up too to confirm our status.
615
616   """
617   myself = netutils.Hostname.GetSysName()
618   #temp instantiation of a config writer, used only to get the node list
619   cfg = config.ConfigWriter()
620   node_names = cfg.GetNodeNames(cfg.GetNodeList())
621   del cfg
622   retries = 6
623   while retries > 0:
624     votes = bootstrap.GatherMasterVotes(node_names)
625     if not votes:
626       # empty node list, this is a one node cluster
627       return True
628     if votes[0][0] is None:
629       retries -= 1
630       time.sleep(10)
631       continue
632     break
633   if retries == 0:
634     logging.critical("Cluster inconsistent, most of the nodes didn't answer"
635                      " after multiple retries. Aborting startup")
636     logging.critical("Use the --no-voting option if you understand what"
637                      " effects it has on the cluster state")
638     return False
639   # here a real node is at the top of the list
640   all_votes = sum(item[1] for item in votes)
641   top_node, top_votes = votes[0]
642
643   result = False
644   if top_node != myself:
645     logging.critical("It seems we are not the master (top-voted node"
646                      " is %s with %d out of %d votes)", top_node, top_votes,
647                      all_votes)
648   elif top_votes < all_votes - top_votes:
649     logging.critical("It seems we are not the master (%d votes for,"
650                      " %d votes against)", top_votes, all_votes - top_votes)
651   else:
652     result = True
653
654   return result
655
656
657 @rpc.RunWithRPC
658 def ActivateMasterIP():
659   # activate ip
660   cfg = config.ConfigWriter()
661   master_params = cfg.GetMasterNetworkParameters()
662   ems = cfg.GetUseExternalMipScript()
663   runner = rpc.BootstrapRunner()
664   # we use the node name, as the configuration is only available here yet
665   result = runner.call_node_activate_master_ip(
666              cfg.GetNodeName(master_params.uuid), master_params, ems)
667
668   msg = result.fail_msg
669   if msg:
670     logging.error("Can't activate master IP address: %s", msg)
671
672
673 def CheckMasterd(options, args):
674   """Initial checks whether to run or exit with a failure.
675
676   """
677   if args: # masterd doesn't take any arguments
678     print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
679     sys.exit(constants.EXIT_FAILURE)
680
681   ssconf.CheckMaster(options.debug)
682
683   try:
684     options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
685     options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
686   except KeyError:
687     print >> sys.stderr, ("User or group not existing on system: %s:%s" %
688                           (constants.MASTERD_USER, constants.DAEMONS_GROUP))
689     sys.exit(constants.EXIT_FAILURE)
690
691   # Determine static runtime architecture information
692   runtime.InitArchInfo()
693
694   # Check the configuration is sane before anything else
695   try:
696     config.ConfigWriter()
697   except errors.ConfigVersionMismatch, err:
698     v1 = "%s.%s.%s" % version.SplitVersion(err.args[0])
699     v2 = "%s.%s.%s" % version.SplitVersion(err.args[1])
700     print >> sys.stderr,  \
701         ("Configuration version mismatch. The current Ganeti software"
702          " expects version %s, but the on-disk configuration file has"
703          " version %s. This is likely the result of upgrading the"
704          " software without running the upgrade procedure. Please contact"
705          " your cluster administrator or complete the upgrade using the"
706          " cfgupgrade utility, after reading the upgrade notes." %
707          (v1, v2))
708     sys.exit(constants.EXIT_FAILURE)
709   except errors.ConfigurationError, err:
710     print >> sys.stderr, \
711         ("Configuration error while opening the configuration file: %s\n"
712          "This might be caused by an incomplete software upgrade or"
713          " by a corrupted configuration file. Until the problem is fixed"
714          " the master daemon cannot start." % str(err))
715     sys.exit(constants.EXIT_FAILURE)
716
717   # If CheckMaster didn't fail we believe we are the master, but we have to
718   # confirm with the other nodes.
719   if options.no_voting:
720     if not options.yes_do_it:
721       sys.stdout.write("The 'no voting' option has been selected.\n")
722       sys.stdout.write("This is dangerous, please confirm by"
723                        " typing uppercase 'yes': ")
724       sys.stdout.flush()
725
726       confirmation = sys.stdin.readline().strip()
727       if confirmation != "YES":
728         print >> sys.stderr, "Aborting."
729         sys.exit(constants.EXIT_FAILURE)
730
731   else:
732     # CheckAgreement uses RPC and threads, hence it needs to be run in
733     # a separate process before we call utils.Daemonize in the current
734     # process.
735     if not utils.RunInSeparateProcess(CheckAgreement):
736       sys.exit(constants.EXIT_FAILURE)
737
738   # ActivateMasterIP also uses RPC/threads, so we run it again via a
739   # separate process.
740
741   # TODO: decide whether failure to activate the master IP is a fatal error
742   utils.RunInSeparateProcess(ActivateMasterIP)
743
744
745 def PrepMasterd(options, _):
746   """Prep master daemon function, executed with the PID file held.
747
748   """
749   # This is safe to do as the pid file guarantees against
750   # concurrent execution.
751   utils.RemoveFile(pathutils.MASTER_SOCKET)
752
753   mainloop = daemon.Mainloop()
754   master = MasterServer(pathutils.MASTER_SOCKET, options.uid, options.gid)
755   return (mainloop, master)
756
757
758 def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
759   """Main master daemon function, executed with the PID file held.
760
761   """
762   (mainloop, master) = prep_data
763   try:
764     rpc.Init()
765     try:
766       master.setup_queue()
767       try:
768         mainloop.Run(shutdown_wait_fn=master.WaitForShutdown)
769       finally:
770         master.server_cleanup()
771     finally:
772       rpc.Shutdown()
773   finally:
774     utils.RemoveFile(pathutils.MASTER_SOCKET)
775
776   logging.info("Clean master daemon shutdown")
777
778
779 def Main():
780   """Main function"""
781   parser = OptionParser(description="Ganeti master daemon",
782                         usage="%prog [-f] [-d]",
783                         version="%%prog (ganeti) %s" %
784                         constants.RELEASE_VERSION)
785   parser.add_option("--no-voting", dest="no_voting",
786                     help="Do not check that the nodes agree on this node"
787                     " being the master and start the daemon unconditionally",
788                     default=False, action="store_true")
789   parser.add_option("--yes-do-it", dest="yes_do_it",
790                     help="Override interactive check for --no-voting",
791                     default=False, action="store_true")
792   daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
793                      ExecMasterd, multithreaded=True)