Fix inconsistency in the LUXI protocol w.r.t. args
[ganeti-local] / lib / server / masterd.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Master daemon program.
23
24 Some classes deviates from the standard style guide since the
25 inheritance from parent classes requires it.
26
27 """
28
29 # pylint: disable=C0103
30 # C0103: Invalid name ganeti-masterd
31
32 import grp
33 import os
34 import pwd
35 import sys
36 import socket
37 import time
38 import tempfile
39 import logging
40
41 from optparse import OptionParser
42
43 from ganeti import config
44 from ganeti import constants
45 from ganeti import daemon
46 from ganeti import mcpu
47 from ganeti import opcodes
48 from ganeti import jqueue
49 from ganeti import locking
50 from ganeti import luxi
51 from ganeti import utils
52 from ganeti import errors
53 from ganeti import ssconf
54 from ganeti import workerpool
55 from ganeti import rpc
56 from ganeti import bootstrap
57 from ganeti import netutils
58 from ganeti import objects
59 from ganeti import query
60 from ganeti import runtime
61
62
63 CLIENT_REQUEST_WORKERS = 16
64
65 EXIT_NOTMASTER = constants.EXIT_NOTMASTER
66 EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
67
68
69 class ClientRequestWorker(workerpool.BaseWorker):
70   # pylint: disable=W0221
71   def RunTask(self, server, message, client):
72     """Process the request.
73
74     """
75     client_ops = ClientOps(server)
76
77     try:
78       (method, args, version) = luxi.ParseRequest(message)
79     except luxi.ProtocolError, err:
80       logging.error("Protocol Error: %s", err)
81       client.close_log()
82       return
83
84     success = False
85     try:
86       # Verify client's version if there was one in the request
87       if version is not None and version != constants.LUXI_VERSION:
88         raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
89                                (constants.LUXI_VERSION, version))
90
91       result = client_ops.handle_request(method, args)
92       success = True
93     except errors.GenericError, err:
94       logging.exception("Unexpected exception")
95       success = False
96       result = errors.EncodeException(err)
97     except:
98       logging.exception("Unexpected exception")
99       err = sys.exc_info()
100       result = "Caught exception: %s" % str(err[1])
101
102     try:
103       reply = luxi.FormatResponse(success, result)
104       client.send_message(reply)
105       # awake the main thread so that it can write out the data.
106       server.awaker.signal()
107     except: # pylint: disable=W0702
108       logging.exception("Send error")
109       client.close_log()
110
111
112 class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
113   """Handler for master peers.
114
115   """
116   _MAX_UNHANDLED = 1
117
118   def __init__(self, server, connected_socket, client_address, family):
119     daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
120                                                  client_address,
121                                                  constants.LUXI_EOM,
122                                                  family, self._MAX_UNHANDLED)
123     self.server = server
124
125   def handle_message(self, message, _):
126     self.server.request_workers.AddTask((self.server, message, self))
127
128
129 class _MasterShutdownCheck:
130   """Logic for master daemon shutdown.
131
132   """
133   #: How long to wait between checks
134   _CHECK_INTERVAL = 5.0
135
136   #: How long to wait after all jobs are done (e.g. to give clients time to
137   #: retrieve the job status)
138   _SHUTDOWN_LINGER = 5.0
139
140   def __init__(self):
141     """Initializes this class.
142
143     """
144     self._had_active_jobs = None
145     self._linger_timeout = None
146
147   def __call__(self, jq_prepare_result):
148     """Determines if master daemon is ready for shutdown.
149
150     @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown}
151     @rtype: None or number
152     @return: None if master daemon is ready, timeout if the check must be
153              repeated
154
155     """
156     if jq_prepare_result:
157       # Check again shortly
158       logging.info("Job queue has been notified for shutdown but is still"
159                    " busy; next check in %s seconds", self._CHECK_INTERVAL)
160       self._had_active_jobs = True
161       return self._CHECK_INTERVAL
162
163     if not self._had_active_jobs:
164       # Can shut down as there were no active jobs on the first check
165       return None
166
167     # No jobs are running anymore, but maybe some clients want to collect some
168     # information. Give them a short amount of time.
169     if self._linger_timeout is None:
170       self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True)
171
172     remaining = self._linger_timeout.Remaining()
173
174     logging.info("Job queue no longer busy; shutting down master daemon"
175                  " in %s seconds", remaining)
176
177     # TODO: Should the master daemon socket be closed at this point? Doing so
178     # wouldn't affect existing connections.
179
180     if remaining < 0:
181       return None
182     else:
183       return remaining
184
185
186 class MasterServer(daemon.AsyncStreamServer):
187   """Master Server.
188
189   This is the main asynchronous master server. It handles connections to the
190   master socket.
191
192   """
193   family = socket.AF_UNIX
194
195   def __init__(self, address, uid, gid):
196     """MasterServer constructor
197
198     @param address: the unix socket address to bind the MasterServer to
199     @param uid: The uid of the owner of the socket
200     @param gid: The gid of the owner of the socket
201
202     """
203     temp_name = tempfile.mktemp(dir=os.path.dirname(address))
204     daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
205     os.chmod(temp_name, 0770)
206     os.chown(temp_name, uid, gid)
207     os.rename(temp_name, address)
208
209     self.awaker = daemon.AsyncAwaker()
210
211     # We'll only start threads once we've forked.
212     self.context = None
213     self.request_workers = None
214
215     self._shutdown_check = None
216
217   def handle_connection(self, connected_socket, client_address):
218     # TODO: add connection count and limit the number of open connections to a
219     # maximum number to avoid breaking for lack of file descriptors or memory.
220     MasterClientHandler(self, connected_socket, client_address, self.family)
221
222   def setup_queue(self):
223     self.context = GanetiContext()
224     self.request_workers = workerpool.WorkerPool("ClientReq",
225                                                  CLIENT_REQUEST_WORKERS,
226                                                  ClientRequestWorker)
227
228   def WaitForShutdown(self):
229     """Prepares server for shutdown.
230
231     """
232     if self._shutdown_check is None:
233       self._shutdown_check = _MasterShutdownCheck()
234
235     return self._shutdown_check(self.context.jobqueue.PrepareShutdown())
236
237   def server_cleanup(self):
238     """Cleanup the server.
239
240     This involves shutting down the processor threads and the master
241     socket.
242
243     """
244     try:
245       self.close()
246     finally:
247       if self.request_workers:
248         self.request_workers.TerminateWorkers()
249       if self.context:
250         self.context.jobqueue.Shutdown()
251
252
253 class ClientOps:
254   """Class holding high-level client operations."""
255   def __init__(self, server):
256     self.server = server
257
258   def handle_request(self, method, args): # pylint: disable=R0911
259     context = self.server.context
260     queue = context.jobqueue
261
262     # TODO: Parameter validation
263     if not isinstance(args, (tuple, list)):
264       logging.info("Received invalid arguments of type '%s'", type(args))
265       raise ValueError("Invalid arguments type '%s'" % type(args))
266
267     # TODO: Rewrite to not exit in each 'if/elif' branch
268
269     if method == luxi.REQ_SUBMIT_JOB:
270       logging.info("Received new job")
271       (job_def, ) = args
272       ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
273       return queue.SubmitJob(ops)
274
275     if method == luxi.REQ_SUBMIT_MANY_JOBS:
276       logging.info("Received multiple jobs")
277       (job_defs, ) = args
278       jobs = []
279       for ops in job_defs:
280         jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
281       return queue.SubmitManyJobs(jobs)
282
283     elif method == luxi.REQ_CANCEL_JOB:
284       (job_id, ) = args
285       logging.info("Received job cancel request for %s", job_id)
286       return queue.CancelJob(job_id)
287
288     elif method == luxi.REQ_ARCHIVE_JOB:
289       (job_id, ) = args
290       logging.info("Received job archive request for %s", job_id)
291       return queue.ArchiveJob(job_id)
292
293     elif method == luxi.REQ_AUTO_ARCHIVE_JOBS:
294       (age, timeout) = args
295       logging.info("Received job autoarchive request for age %s, timeout %s",
296                    age, timeout)
297       return queue.AutoArchiveJobs(age, timeout)
298
299     elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
300       (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
301       logging.info("Received job poll request for %s", job_id)
302       return queue.WaitForJobChanges(job_id, fields, prev_job_info,
303                                      prev_log_serial, timeout)
304
305     elif method == luxi.REQ_QUERY:
306       (what, fields, qfilter) = args
307
308       if what in constants.QR_VIA_OP:
309         result = self._Query(opcodes.OpQuery(what=what, fields=fields,
310                                              qfilter=qfilter))
311       elif what == constants.QR_LOCK:
312         if qfilter is not None:
313           raise errors.OpPrereqError("Lock queries can't be filtered")
314         return context.glm.QueryLocks(fields)
315       elif what == constants.QR_JOB:
316         return queue.QueryJobs(fields, qfilter)
317       elif what in constants.QR_VIA_LUXI:
318         raise NotImplementedError
319       else:
320         raise errors.OpPrereqError("Resource type '%s' unknown" % what,
321                                    errors.ECODE_INVAL)
322
323       return result
324
325     elif method == luxi.REQ_QUERY_FIELDS:
326       (what, fields) = args
327       req = objects.QueryFieldsRequest(what=what, fields=fields)
328
329       try:
330         fielddefs = query.ALL_FIELDS[req.what]
331       except KeyError:
332         raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
333                                    errors.ECODE_INVAL)
334
335       return query.QueryFields(fielddefs, req.fields)
336
337     elif method == luxi.REQ_QUERY_JOBS:
338       (job_ids, fields) = args
339       if isinstance(job_ids, (tuple, list)) and job_ids:
340         msg = utils.CommaJoin(job_ids)
341       else:
342         msg = str(job_ids)
343       logging.info("Received job query request for %s", msg)
344       return queue.OldStyleQueryJobs(job_ids, fields)
345
346     elif method == luxi.REQ_QUERY_INSTANCES:
347       (names, fields, use_locking) = args
348       logging.info("Received instance query request for %s", names)
349       if use_locking:
350         raise errors.OpPrereqError("Sync queries are not allowed",
351                                    errors.ECODE_INVAL)
352       op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
353                                    use_locking=use_locking)
354       return self._Query(op)
355
356     elif method == luxi.REQ_QUERY_NODES:
357       (names, fields, use_locking) = args
358       logging.info("Received node query request for %s", names)
359       if use_locking:
360         raise errors.OpPrereqError("Sync queries are not allowed",
361                                    errors.ECODE_INVAL)
362       op = opcodes.OpNodeQuery(names=names, output_fields=fields,
363                                use_locking=use_locking)
364       return self._Query(op)
365
366     elif method == luxi.REQ_QUERY_GROUPS:
367       (names, fields, use_locking) = args
368       logging.info("Received group query request for %s", names)
369       if use_locking:
370         raise errors.OpPrereqError("Sync queries are not allowed",
371                                    errors.ECODE_INVAL)
372       op = opcodes.OpGroupQuery(names=names, output_fields=fields)
373       return self._Query(op)
374
375     elif method == luxi.REQ_QUERY_EXPORTS:
376       (nodes, use_locking) = args
377       if use_locking:
378         raise errors.OpPrereqError("Sync queries are not allowed",
379                                    errors.ECODE_INVAL)
380       logging.info("Received exports query request")
381       op = opcodes.OpBackupQuery(nodes=nodes, use_locking=use_locking)
382       return self._Query(op)
383
384     elif method == luxi.REQ_QUERY_CONFIG_VALUES:
385       (fields, ) = args
386       logging.info("Received config values query request for %s", fields)
387       op = opcodes.OpClusterConfigQuery(output_fields=fields)
388       return self._Query(op)
389
390     elif method == luxi.REQ_QUERY_CLUSTER_INFO:
391       logging.info("Received cluster info query request")
392       op = opcodes.OpClusterQuery()
393       return self._Query(op)
394
395     elif method == luxi.REQ_QUERY_TAGS:
396       (kind, name) = args
397       logging.info("Received tags query request")
398       op = opcodes.OpTagsGet(kind=kind, name=name, use_locking=False)
399       return self._Query(op)
400
401     elif method == luxi.REQ_SET_DRAIN_FLAG:
402       (drain_flag, ) = args
403       logging.info("Received queue drain flag change request to %s",
404                    drain_flag)
405       return queue.SetDrainFlag(drain_flag)
406
407     elif method == luxi.REQ_SET_WATCHER_PAUSE:
408       (until, ) = args
409
410       if until is None:
411         logging.info("Received request to no longer pause the watcher")
412       else:
413         if not isinstance(until, (int, float)):
414           raise TypeError("Duration must be an integer or float")
415
416         if until < time.time():
417           raise errors.GenericError("Unable to set pause end time in the past")
418
419         logging.info("Received request to pause the watcher until %s", until)
420
421       return _SetWatcherPause(until)
422
423     else:
424       logging.info("Received invalid request '%s'", method)
425       raise ValueError("Invalid operation '%s'" % method)
426
427   def _Query(self, op):
428     """Runs the specified opcode and returns the result.
429
430     """
431     # Queries don't have a job id
432     proc = mcpu.Processor(self.server.context, None, enable_locks=False)
433
434     # TODO: Executing an opcode using locks will acquire them in blocking mode.
435     # Consider using a timeout for retries.
436     return proc.ExecOpCode(op, None)
437
438
439 class GanetiContext(object):
440   """Context common to all ganeti threads.
441
442   This class creates and holds common objects shared by all threads.
443
444   """
445   # pylint: disable=W0212
446   # we do want to ensure a singleton here
447   _instance = None
448
449   def __init__(self):
450     """Constructs a new GanetiContext object.
451
452     There should be only a GanetiContext object at any time, so this
453     function raises an error if this is not the case.
454
455     """
456     assert self.__class__._instance is None, "double GanetiContext instance"
457
458     # Create global configuration object
459     self.cfg = config.ConfigWriter()
460
461     # Locking manager
462     self.glm = locking.GanetiLockManager(
463                 self.cfg.GetNodeList(),
464                 self.cfg.GetNodeGroupList(),
465                 self.cfg.GetInstanceList())
466
467     self.cfg.SetContext(self)
468
469     # RPC runner
470     self.rpc = rpc.RpcRunner(self.cfg, self.glm.AddToLockMonitor)
471
472     # Job queue
473     self.jobqueue = jqueue.JobQueue(self)
474
475     # setting this also locks the class against attribute modifications
476     self.__class__._instance = self
477
478   def __setattr__(self, name, value):
479     """Setting GanetiContext attributes is forbidden after initialization.
480
481     """
482     assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
483     object.__setattr__(self, name, value)
484
485   def AddNode(self, node, ec_id):
486     """Adds a node to the configuration and lock manager.
487
488     """
489     # Add it to the configuration
490     self.cfg.AddNode(node, ec_id)
491
492     # If preseeding fails it'll not be added
493     self.jobqueue.AddNode(node)
494
495     # Add the new node to the Ganeti Lock Manager
496     self.glm.add(locking.LEVEL_NODE, node.name)
497     self.glm.add(locking.LEVEL_NODE_RES, node.name)
498
499   def ReaddNode(self, node):
500     """Updates a node that's already in the configuration
501
502     """
503     # Synchronize the queue again
504     self.jobqueue.AddNode(node)
505
506   def RemoveNode(self, name):
507     """Removes a node from the configuration and lock manager.
508
509     """
510     # Remove node from configuration
511     self.cfg.RemoveNode(name)
512
513     # Notify job queue
514     self.jobqueue.RemoveNode(name)
515
516     # Remove the node from the Ganeti Lock Manager
517     self.glm.remove(locking.LEVEL_NODE, name)
518     self.glm.remove(locking.LEVEL_NODE_RES, name)
519
520
521 def _SetWatcherPause(until):
522   """Creates or removes the watcher pause file.
523
524   @type until: None or int
525   @param until: Unix timestamp saying until when the watcher shouldn't run
526
527   """
528   if until is None:
529     utils.RemoveFile(constants.WATCHER_PAUSEFILE)
530   else:
531     utils.WriteFile(constants.WATCHER_PAUSEFILE,
532                     data="%d\n" % (until, ))
533
534   return until
535
536
537 @rpc.RunWithRPC
538 def CheckAgreement():
539   """Check the agreement on who is the master.
540
541   The function uses a very simple algorithm: we must get more positive
542   than negative answers. Since in most of the cases we are the master,
543   we'll use our own config file for getting the node list. In the
544   future we could collect the current node list from our (possibly
545   obsolete) known nodes.
546
547   In order to account for cold-start of all nodes, we retry for up to
548   a minute until we get a real answer as the top-voted one. If the
549   nodes are more out-of-sync, for now manual startup of the master
550   should be attempted.
551
552   Note that for a even number of nodes cluster, we need at least half
553   of the nodes (beside ourselves) to vote for us. This creates a
554   problem on two-node clusters, since in this case we require the
555   other node to be up too to confirm our status.
556
557   """
558   myself = netutils.Hostname.GetSysName()
559   #temp instantiation of a config writer, used only to get the node list
560   cfg = config.ConfigWriter()
561   node_list = cfg.GetNodeList()
562   del cfg
563   retries = 6
564   while retries > 0:
565     votes = bootstrap.GatherMasterVotes(node_list)
566     if not votes:
567       # empty node list, this is a one node cluster
568       return True
569     if votes[0][0] is None:
570       retries -= 1
571       time.sleep(10)
572       continue
573     break
574   if retries == 0:
575     logging.critical("Cluster inconsistent, most of the nodes didn't answer"
576                      " after multiple retries. Aborting startup")
577     logging.critical("Use the --no-voting option if you understand what"
578                      " effects it has on the cluster state")
579     return False
580   # here a real node is at the top of the list
581   all_votes = sum(item[1] for item in votes)
582   top_node, top_votes = votes[0]
583
584   result = False
585   if top_node != myself:
586     logging.critical("It seems we are not the master (top-voted node"
587                      " is %s with %d out of %d votes)", top_node, top_votes,
588                      all_votes)
589   elif top_votes < all_votes - top_votes:
590     logging.critical("It seems we are not the master (%d votes for,"
591                      " %d votes against)", top_votes, all_votes - top_votes)
592   else:
593     result = True
594
595   return result
596
597
598 @rpc.RunWithRPC
599 def ActivateMasterIP():
600   # activate ip
601   cfg = config.ConfigWriter()
602   master_params = cfg.GetMasterNetworkParameters()
603   ems = cfg.GetUseExternalMipScript()
604   runner = rpc.BootstrapRunner()
605   result = runner.call_node_activate_master_ip(master_params.name,
606                                                master_params, ems)
607
608   msg = result.fail_msg
609   if msg:
610     logging.error("Can't activate master IP address: %s", msg)
611
612
613 def CheckMasterd(options, args):
614   """Initial checks whether to run or exit with a failure.
615
616   """
617   if args: # masterd doesn't take any arguments
618     print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
619     sys.exit(constants.EXIT_FAILURE)
620
621   ssconf.CheckMaster(options.debug)
622
623   try:
624     options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
625     options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
626   except KeyError:
627     print >> sys.stderr, ("User or group not existing on system: %s:%s" %
628                           (constants.MASTERD_USER, constants.DAEMONS_GROUP))
629     sys.exit(constants.EXIT_FAILURE)
630
631   # Determine static runtime architecture information
632   runtime.InitArchInfo()
633
634   # Check the configuration is sane before anything else
635   try:
636     config.ConfigWriter()
637   except errors.ConfigVersionMismatch, err:
638     v1 = "%s.%s.%s" % constants.SplitVersion(err.args[0])
639     v2 = "%s.%s.%s" % constants.SplitVersion(err.args[1])
640     print >> sys.stderr,  \
641         ("Configuration version mismatch. The current Ganeti software"
642          " expects version %s, but the on-disk configuration file has"
643          " version %s. This is likely the result of upgrading the"
644          " software without running the upgrade procedure. Please contact"
645          " your cluster administrator or complete the upgrade using the"
646          " cfgupgrade utility, after reading the upgrade notes." %
647          (v1, v2))
648     sys.exit(constants.EXIT_FAILURE)
649   except errors.ConfigurationError, err:
650     print >> sys.stderr, \
651         ("Configuration error while opening the configuration file: %s\n"
652          "This might be caused by an incomplete software upgrade or"
653          " by a corrupted configuration file. Until the problem is fixed"
654          " the master daemon cannot start." % str(err))
655     sys.exit(constants.EXIT_FAILURE)
656
657   # If CheckMaster didn't fail we believe we are the master, but we have to
658   # confirm with the other nodes.
659   if options.no_voting:
660     if not options.yes_do_it:
661       sys.stdout.write("The 'no voting' option has been selected.\n")
662       sys.stdout.write("This is dangerous, please confirm by"
663                        " typing uppercase 'yes': ")
664       sys.stdout.flush()
665
666       confirmation = sys.stdin.readline().strip()
667       if confirmation != "YES":
668         print >> sys.stderr, "Aborting."
669         sys.exit(constants.EXIT_FAILURE)
670
671   else:
672     # CheckAgreement uses RPC and threads, hence it needs to be run in
673     # a separate process before we call utils.Daemonize in the current
674     # process.
675     if not utils.RunInSeparateProcess(CheckAgreement):
676       sys.exit(constants.EXIT_FAILURE)
677
678   # ActivateMasterIP also uses RPC/threads, so we run it again via a
679   # separate process.
680
681   # TODO: decide whether failure to activate the master IP is a fatal error
682   utils.RunInSeparateProcess(ActivateMasterIP)
683
684
685 def PrepMasterd(options, _):
686   """Prep master daemon function, executed with the PID file held.
687
688   """
689   # This is safe to do as the pid file guarantees against
690   # concurrent execution.
691   utils.RemoveFile(constants.MASTER_SOCKET)
692
693   mainloop = daemon.Mainloop()
694   master = MasterServer(constants.MASTER_SOCKET, options.uid, options.gid)
695   return (mainloop, master)
696
697
698 def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
699   """Main master daemon function, executed with the PID file held.
700
701   """
702   (mainloop, master) = prep_data
703   try:
704     rpc.Init()
705     try:
706       master.setup_queue()
707       try:
708         mainloop.Run(shutdown_wait_fn=master.WaitForShutdown)
709       finally:
710         master.server_cleanup()
711     finally:
712       rpc.Shutdown()
713   finally:
714     utils.RemoveFile(constants.MASTER_SOCKET)
715
716   logging.info("Clean master daemon shutdown")
717
718
719 def Main():
720   """Main function"""
721   parser = OptionParser(description="Ganeti master daemon",
722                         usage="%prog [-f] [-d]",
723                         version="%%prog (ganeti) %s" %
724                         constants.RELEASE_VERSION)
725   parser.add_option("--no-voting", dest="no_voting",
726                     help="Do not check that the nodes agree on this node"
727                     " being the master and start the daemon unconditionally",
728                     default=False, action="store_true")
729   parser.add_option("--yes-do-it", dest="yes_do_it",
730                     help="Override interactive check for --no-voting",
731                     default=False, action="store_true")
732   daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
733                      ExecMasterd, multithreaded=True)