Remove the python confd server side code
[ganeti-local] / lib / server / masterd.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Master daemon program.
23
24 Some classes deviates from the standard style guide since the
25 inheritance from parent classes requires it.
26
27 """
28
29 # pylint: disable=C0103
30 # C0103: Invalid name ganeti-masterd
31
32 import grp
33 import os
34 import pwd
35 import sys
36 import socket
37 import time
38 import tempfile
39 import logging
40
41 from optparse import OptionParser
42
43 from ganeti import config
44 from ganeti import constants
45 from ganeti import daemon
46 from ganeti import mcpu
47 from ganeti import opcodes
48 from ganeti import jqueue
49 from ganeti import locking
50 from ganeti import luxi
51 from ganeti import utils
52 from ganeti import errors
53 from ganeti import ssconf
54 from ganeti import workerpool
55 from ganeti import rpc
56 from ganeti import bootstrap
57 from ganeti import netutils
58 from ganeti import objects
59 from ganeti import query
60 from ganeti import runtime
61 from ganeti import pathutils
62
63
64 CLIENT_REQUEST_WORKERS = 16
65
66 EXIT_NOTMASTER = constants.EXIT_NOTMASTER
67 EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
68
69
70 class ClientRequestWorker(workerpool.BaseWorker):
71   # pylint: disable=W0221
72   def RunTask(self, server, message, client):
73     """Process the request.
74
75     """
76     client_ops = ClientOps(server)
77
78     try:
79       (method, args, version) = luxi.ParseRequest(message)
80     except luxi.ProtocolError, err:
81       logging.error("Protocol Error: %s", err)
82       client.close_log()
83       return
84
85     success = False
86     try:
87       # Verify client's version if there was one in the request
88       if version is not None and version != constants.LUXI_VERSION:
89         raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
90                                (constants.LUXI_VERSION, version))
91
92       result = client_ops.handle_request(method, args)
93       success = True
94     except errors.GenericError, err:
95       logging.exception("Unexpected exception")
96       success = False
97       result = errors.EncodeException(err)
98     except:
99       logging.exception("Unexpected exception")
100       err = sys.exc_info()
101       result = "Caught exception: %s" % str(err[1])
102
103     try:
104       reply = luxi.FormatResponse(success, result)
105       client.send_message(reply)
106       # awake the main thread so that it can write out the data.
107       server.awaker.signal()
108     except: # pylint: disable=W0702
109       logging.exception("Send error")
110       client.close_log()
111
112
113 class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
114   """Handler for master peers.
115
116   """
117   _MAX_UNHANDLED = 1
118
119   def __init__(self, server, connected_socket, client_address, family):
120     daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
121                                                  client_address,
122                                                  constants.LUXI_EOM,
123                                                  family, self._MAX_UNHANDLED)
124     self.server = server
125
126   def handle_message(self, message, _):
127     self.server.request_workers.AddTask((self.server, message, self))
128
129
130 class _MasterShutdownCheck:
131   """Logic for master daemon shutdown.
132
133   """
134   #: How long to wait between checks
135   _CHECK_INTERVAL = 5.0
136
137   #: How long to wait after all jobs are done (e.g. to give clients time to
138   #: retrieve the job status)
139   _SHUTDOWN_LINGER = 5.0
140
141   def __init__(self):
142     """Initializes this class.
143
144     """
145     self._had_active_jobs = None
146     self._linger_timeout = None
147
148   def __call__(self, jq_prepare_result):
149     """Determines if master daemon is ready for shutdown.
150
151     @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown}
152     @rtype: None or number
153     @return: None if master daemon is ready, timeout if the check must be
154              repeated
155
156     """
157     if jq_prepare_result:
158       # Check again shortly
159       logging.info("Job queue has been notified for shutdown but is still"
160                    " busy; next check in %s seconds", self._CHECK_INTERVAL)
161       self._had_active_jobs = True
162       return self._CHECK_INTERVAL
163
164     if not self._had_active_jobs:
165       # Can shut down as there were no active jobs on the first check
166       return None
167
168     # No jobs are running anymore, but maybe some clients want to collect some
169     # information. Give them a short amount of time.
170     if self._linger_timeout is None:
171       self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True)
172
173     remaining = self._linger_timeout.Remaining()
174
175     logging.info("Job queue no longer busy; shutting down master daemon"
176                  " in %s seconds", remaining)
177
178     # TODO: Should the master daemon socket be closed at this point? Doing so
179     # wouldn't affect existing connections.
180
181     if remaining < 0:
182       return None
183     else:
184       return remaining
185
186
187 class MasterServer(daemon.AsyncStreamServer):
188   """Master Server.
189
190   This is the main asynchronous master server. It handles connections to the
191   master socket.
192
193   """
194   family = socket.AF_UNIX
195
196   def __init__(self, address, uid, gid):
197     """MasterServer constructor
198
199     @param address: the unix socket address to bind the MasterServer to
200     @param uid: The uid of the owner of the socket
201     @param gid: The gid of the owner of the socket
202
203     """
204     temp_name = tempfile.mktemp(dir=os.path.dirname(address))
205     daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
206     os.chmod(temp_name, 0770)
207     os.chown(temp_name, uid, gid)
208     os.rename(temp_name, address)
209
210     self.awaker = daemon.AsyncAwaker()
211
212     # We'll only start threads once we've forked.
213     self.context = None
214     self.request_workers = None
215
216     self._shutdown_check = None
217
218   def handle_connection(self, connected_socket, client_address):
219     # TODO: add connection count and limit the number of open connections to a
220     # maximum number to avoid breaking for lack of file descriptors or memory.
221     MasterClientHandler(self, connected_socket, client_address, self.family)
222
223   def setup_queue(self):
224     self.context = GanetiContext()
225     self.request_workers = workerpool.WorkerPool("ClientReq",
226                                                  CLIENT_REQUEST_WORKERS,
227                                                  ClientRequestWorker)
228
229   def WaitForShutdown(self):
230     """Prepares server for shutdown.
231
232     """
233     if self._shutdown_check is None:
234       self._shutdown_check = _MasterShutdownCheck()
235
236     return self._shutdown_check(self.context.jobqueue.PrepareShutdown())
237
238   def server_cleanup(self):
239     """Cleanup the server.
240
241     This involves shutting down the processor threads and the master
242     socket.
243
244     """
245     try:
246       self.close()
247     finally:
248       if self.request_workers:
249         self.request_workers.TerminateWorkers()
250       if self.context:
251         self.context.jobqueue.Shutdown()
252
253
254 class ClientOps:
255   """Class holding high-level client operations."""
256   def __init__(self, server):
257     self.server = server
258
259   def handle_request(self, method, args): # pylint: disable=R0911
260     context = self.server.context
261     queue = context.jobqueue
262
263     # TODO: Parameter validation
264     if not isinstance(args, (tuple, list)):
265       logging.info("Received invalid arguments of type '%s'", type(args))
266       raise ValueError("Invalid arguments type '%s'" % type(args))
267
268     # TODO: Rewrite to not exit in each 'if/elif' branch
269
270     if method == luxi.REQ_SUBMIT_JOB:
271       logging.info("Received new job")
272       (job_def, ) = args
273       ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
274       return queue.SubmitJob(ops)
275
276     elif method == luxi.REQ_SUBMIT_MANY_JOBS:
277       logging.info("Received multiple jobs")
278       (job_defs, ) = args
279       jobs = []
280       for ops in job_defs:
281         jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
282       return queue.SubmitManyJobs(jobs)
283
284     elif method == luxi.REQ_CANCEL_JOB:
285       (job_id, ) = args
286       logging.info("Received job cancel request for %s", job_id)
287       return queue.CancelJob(job_id)
288
289     elif method == luxi.REQ_ARCHIVE_JOB:
290       (job_id, ) = args
291       logging.info("Received job archive request for %s", job_id)
292       return queue.ArchiveJob(job_id)
293
294     elif method == luxi.REQ_AUTO_ARCHIVE_JOBS:
295       (age, timeout) = args
296       logging.info("Received job autoarchive request for age %s, timeout %s",
297                    age, timeout)
298       return queue.AutoArchiveJobs(age, timeout)
299
300     elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
301       (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
302       logging.info("Received job poll request for %s", job_id)
303       return queue.WaitForJobChanges(job_id, fields, prev_job_info,
304                                      prev_log_serial, timeout)
305
306     elif method == luxi.REQ_QUERY:
307       (what, fields, qfilter) = args
308
309       if what in constants.QR_VIA_OP:
310         result = self._Query(opcodes.OpQuery(what=what, fields=fields,
311                                              qfilter=qfilter))
312       elif what == constants.QR_LOCK:
313         if qfilter is not None:
314           raise errors.OpPrereqError("Lock queries can't be filtered",
315                                      errors.ECODE_INVAL)
316         return context.glm.QueryLocks(fields)
317       elif what == constants.QR_JOB:
318         return queue.QueryJobs(fields, qfilter)
319       elif what in constants.QR_VIA_LUXI:
320         raise NotImplementedError
321       else:
322         raise errors.OpPrereqError("Resource type '%s' unknown" % what,
323                                    errors.ECODE_INVAL)
324
325       return result
326
327     elif method == luxi.REQ_QUERY_FIELDS:
328       (what, fields) = args
329       req = objects.QueryFieldsRequest(what=what, fields=fields)
330
331       try:
332         fielddefs = query.ALL_FIELDS[req.what]
333       except KeyError:
334         raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
335                                    errors.ECODE_INVAL)
336
337       return query.QueryFields(fielddefs, req.fields)
338
339     elif method == luxi.REQ_QUERY_JOBS:
340       (job_ids, fields) = args
341       if isinstance(job_ids, (tuple, list)) and job_ids:
342         msg = utils.CommaJoin(job_ids)
343       else:
344         msg = str(job_ids)
345       logging.info("Received job query request for %s", msg)
346       return queue.OldStyleQueryJobs(job_ids, fields)
347
348     elif method == luxi.REQ_QUERY_INSTANCES:
349       (names, fields, use_locking) = args
350       logging.info("Received instance query request for %s", names)
351       if use_locking:
352         raise errors.OpPrereqError("Sync queries are not allowed",
353                                    errors.ECODE_INVAL)
354       op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
355                                    use_locking=use_locking)
356       return self._Query(op)
357
358     elif method == luxi.REQ_QUERY_NODES:
359       (names, fields, use_locking) = args
360       logging.info("Received node query request for %s", names)
361       if use_locking:
362         raise errors.OpPrereqError("Sync queries are not allowed",
363                                    errors.ECODE_INVAL)
364       op = opcodes.OpNodeQuery(names=names, output_fields=fields,
365                                use_locking=use_locking)
366       return self._Query(op)
367
368     elif method == luxi.REQ_QUERY_GROUPS:
369       (names, fields, use_locking) = args
370       logging.info("Received group query request for %s", names)
371       if use_locking:
372         raise errors.OpPrereqError("Sync queries are not allowed",
373                                    errors.ECODE_INVAL)
374       op = opcodes.OpGroupQuery(names=names, output_fields=fields)
375       return self._Query(op)
376
377     elif method == luxi.REQ_QUERY_EXPORTS:
378       (nodes, use_locking) = args
379       if use_locking:
380         raise errors.OpPrereqError("Sync queries are not allowed",
381                                    errors.ECODE_INVAL)
382       logging.info("Received exports query request")
383       op = opcodes.OpBackupQuery(nodes=nodes, use_locking=use_locking)
384       return self._Query(op)
385
386     elif method == luxi.REQ_QUERY_CONFIG_VALUES:
387       (fields, ) = args
388       logging.info("Received config values query request for %s", fields)
389       op = opcodes.OpClusterConfigQuery(output_fields=fields)
390       return self._Query(op)
391
392     elif method == luxi.REQ_QUERY_CLUSTER_INFO:
393       logging.info("Received cluster info query request")
394       op = opcodes.OpClusterQuery()
395       return self._Query(op)
396
397     elif method == luxi.REQ_QUERY_TAGS:
398       (kind, name) = args
399       logging.info("Received tags query request")
400       op = opcodes.OpTagsGet(kind=kind, name=name, use_locking=False)
401       return self._Query(op)
402
403     elif method == luxi.REQ_SET_DRAIN_FLAG:
404       (drain_flag, ) = args
405       logging.info("Received queue drain flag change request to %s",
406                    drain_flag)
407       return queue.SetDrainFlag(drain_flag)
408
409     elif method == luxi.REQ_SET_WATCHER_PAUSE:
410       (until, ) = args
411
412       if until is None:
413         logging.info("Received request to no longer pause the watcher")
414       else:
415         if not isinstance(until, (int, float)):
416           raise TypeError("Duration must be an integer or float")
417
418         if until < time.time():
419           raise errors.GenericError("Unable to set pause end time in the past")
420
421         logging.info("Received request to pause the watcher until %s", until)
422
423       return _SetWatcherPause(until)
424
425     else:
426       logging.info("Received invalid request '%s'", method)
427       raise ValueError("Invalid operation '%s'" % method)
428
429   def _Query(self, op):
430     """Runs the specified opcode and returns the result.
431
432     """
433     # Queries don't have a job id
434     proc = mcpu.Processor(self.server.context, None, enable_locks=False)
435
436     # TODO: Executing an opcode using locks will acquire them in blocking mode.
437     # Consider using a timeout for retries.
438     return proc.ExecOpCode(op, None)
439
440
441 class GanetiContext(object):
442   """Context common to all ganeti threads.
443
444   This class creates and holds common objects shared by all threads.
445
446   """
447   # pylint: disable=W0212
448   # we do want to ensure a singleton here
449   _instance = None
450
451   def __init__(self):
452     """Constructs a new GanetiContext object.
453
454     There should be only a GanetiContext object at any time, so this
455     function raises an error if this is not the case.
456
457     """
458     assert self.__class__._instance is None, "double GanetiContext instance"
459
460     # Create global configuration object
461     self.cfg = config.ConfigWriter()
462
463     # Locking manager
464     self.glm = locking.GanetiLockManager(
465       self.cfg.GetNodeList(),
466       self.cfg.GetNodeGroupList(),
467       self.cfg.GetInstanceList())
468
469     self.cfg.SetContext(self)
470
471     # RPC runner
472     self.rpc = rpc.RpcRunner(self.cfg, self.glm.AddToLockMonitor)
473
474     # Job queue
475     self.jobqueue = jqueue.JobQueue(self)
476
477     # setting this also locks the class against attribute modifications
478     self.__class__._instance = self
479
480   def __setattr__(self, name, value):
481     """Setting GanetiContext attributes is forbidden after initialization.
482
483     """
484     assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
485     object.__setattr__(self, name, value)
486
487   def AddNode(self, node, ec_id):
488     """Adds a node to the configuration and lock manager.
489
490     """
491     # Add it to the configuration
492     self.cfg.AddNode(node, ec_id)
493
494     # If preseeding fails it'll not be added
495     self.jobqueue.AddNode(node)
496
497     # Add the new node to the Ganeti Lock Manager
498     self.glm.add(locking.LEVEL_NODE, node.name)
499     self.glm.add(locking.LEVEL_NODE_RES, node.name)
500
501   def ReaddNode(self, node):
502     """Updates a node that's already in the configuration
503
504     """
505     # Synchronize the queue again
506     self.jobqueue.AddNode(node)
507
508   def RemoveNode(self, name):
509     """Removes a node from the configuration and lock manager.
510
511     """
512     # Remove node from configuration
513     self.cfg.RemoveNode(name)
514
515     # Notify job queue
516     self.jobqueue.RemoveNode(name)
517
518     # Remove the node from the Ganeti Lock Manager
519     self.glm.remove(locking.LEVEL_NODE, name)
520     self.glm.remove(locking.LEVEL_NODE_RES, name)
521
522
523 def _SetWatcherPause(until):
524   """Creates or removes the watcher pause file.
525
526   @type until: None or int
527   @param until: Unix timestamp saying until when the watcher shouldn't run
528
529   """
530   if until is None:
531     utils.RemoveFile(pathutils.WATCHER_PAUSEFILE)
532   else:
533     utils.WriteFile(pathutils.WATCHER_PAUSEFILE,
534                     data="%d\n" % (until, ))
535
536   return until
537
538
539 @rpc.RunWithRPC
540 def CheckAgreement():
541   """Check the agreement on who is the master.
542
543   The function uses a very simple algorithm: we must get more positive
544   than negative answers. Since in most of the cases we are the master,
545   we'll use our own config file for getting the node list. In the
546   future we could collect the current node list from our (possibly
547   obsolete) known nodes.
548
549   In order to account for cold-start of all nodes, we retry for up to
550   a minute until we get a real answer as the top-voted one. If the
551   nodes are more out-of-sync, for now manual startup of the master
552   should be attempted.
553
554   Note that for a even number of nodes cluster, we need at least half
555   of the nodes (beside ourselves) to vote for us. This creates a
556   problem on two-node clusters, since in this case we require the
557   other node to be up too to confirm our status.
558
559   """
560   myself = netutils.Hostname.GetSysName()
561   #temp instantiation of a config writer, used only to get the node list
562   cfg = config.ConfigWriter()
563   node_list = cfg.GetNodeList()
564   del cfg
565   retries = 6
566   while retries > 0:
567     votes = bootstrap.GatherMasterVotes(node_list)
568     if not votes:
569       # empty node list, this is a one node cluster
570       return True
571     if votes[0][0] is None:
572       retries -= 1
573       time.sleep(10)
574       continue
575     break
576   if retries == 0:
577     logging.critical("Cluster inconsistent, most of the nodes didn't answer"
578                      " after multiple retries. Aborting startup")
579     logging.critical("Use the --no-voting option if you understand what"
580                      " effects it has on the cluster state")
581     return False
582   # here a real node is at the top of the list
583   all_votes = sum(item[1] for item in votes)
584   top_node, top_votes = votes[0]
585
586   result = False
587   if top_node != myself:
588     logging.critical("It seems we are not the master (top-voted node"
589                      " is %s with %d out of %d votes)", top_node, top_votes,
590                      all_votes)
591   elif top_votes < all_votes - top_votes:
592     logging.critical("It seems we are not the master (%d votes for,"
593                      " %d votes against)", top_votes, all_votes - top_votes)
594   else:
595     result = True
596
597   return result
598
599
600 @rpc.RunWithRPC
601 def ActivateMasterIP():
602   # activate ip
603   cfg = config.ConfigWriter()
604   master_params = cfg.GetMasterNetworkParameters()
605   ems = cfg.GetUseExternalMipScript()
606   runner = rpc.BootstrapRunner()
607   result = runner.call_node_activate_master_ip(master_params.name,
608                                                master_params, ems)
609
610   msg = result.fail_msg
611   if msg:
612     logging.error("Can't activate master IP address: %s", msg)
613
614
615 def CheckMasterd(options, args):
616   """Initial checks whether to run or exit with a failure.
617
618   """
619   if args: # masterd doesn't take any arguments
620     print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
621     sys.exit(constants.EXIT_FAILURE)
622
623   ssconf.CheckMaster(options.debug)
624
625   try:
626     options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
627     options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
628   except KeyError:
629     print >> sys.stderr, ("User or group not existing on system: %s:%s" %
630                           (constants.MASTERD_USER, constants.DAEMONS_GROUP))
631     sys.exit(constants.EXIT_FAILURE)
632
633   # Determine static runtime architecture information
634   runtime.InitArchInfo()
635
636   # Check the configuration is sane before anything else
637   try:
638     config.ConfigWriter()
639   except errors.ConfigVersionMismatch, err:
640     v1 = "%s.%s.%s" % constants.SplitVersion(err.args[0])
641     v2 = "%s.%s.%s" % constants.SplitVersion(err.args[1])
642     print >> sys.stderr,  \
643         ("Configuration version mismatch. The current Ganeti software"
644          " expects version %s, but the on-disk configuration file has"
645          " version %s. This is likely the result of upgrading the"
646          " software without running the upgrade procedure. Please contact"
647          " your cluster administrator or complete the upgrade using the"
648          " cfgupgrade utility, after reading the upgrade notes." %
649          (v1, v2))
650     sys.exit(constants.EXIT_FAILURE)
651   except errors.ConfigurationError, err:
652     print >> sys.stderr, \
653         ("Configuration error while opening the configuration file: %s\n"
654          "This might be caused by an incomplete software upgrade or"
655          " by a corrupted configuration file. Until the problem is fixed"
656          " the master daemon cannot start." % str(err))
657     sys.exit(constants.EXIT_FAILURE)
658
659   # If CheckMaster didn't fail we believe we are the master, but we have to
660   # confirm with the other nodes.
661   if options.no_voting:
662     if not options.yes_do_it:
663       sys.stdout.write("The 'no voting' option has been selected.\n")
664       sys.stdout.write("This is dangerous, please confirm by"
665                        " typing uppercase 'yes': ")
666       sys.stdout.flush()
667
668       confirmation = sys.stdin.readline().strip()
669       if confirmation != "YES":
670         print >> sys.stderr, "Aborting."
671         sys.exit(constants.EXIT_FAILURE)
672
673   else:
674     # CheckAgreement uses RPC and threads, hence it needs to be run in
675     # a separate process before we call utils.Daemonize in the current
676     # process.
677     if not utils.RunInSeparateProcess(CheckAgreement):
678       sys.exit(constants.EXIT_FAILURE)
679
680   # ActivateMasterIP also uses RPC/threads, so we run it again via a
681   # separate process.
682
683   # TODO: decide whether failure to activate the master IP is a fatal error
684   utils.RunInSeparateProcess(ActivateMasterIP)
685
686
687 def PrepMasterd(options, _):
688   """Prep master daemon function, executed with the PID file held.
689
690   """
691   # This is safe to do as the pid file guarantees against
692   # concurrent execution.
693   utils.RemoveFile(pathutils.MASTER_SOCKET)
694
695   mainloop = daemon.Mainloop()
696   master = MasterServer(pathutils.MASTER_SOCKET, options.uid, options.gid)
697   return (mainloop, master)
698
699
700 def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
701   """Main master daemon function, executed with the PID file held.
702
703   """
704   (mainloop, master) = prep_data
705   try:
706     rpc.Init()
707     try:
708       master.setup_queue()
709       try:
710         mainloop.Run(shutdown_wait_fn=master.WaitForShutdown)
711       finally:
712         master.server_cleanup()
713     finally:
714       rpc.Shutdown()
715   finally:
716     utils.RemoveFile(pathutils.MASTER_SOCKET)
717
718   logging.info("Clean master daemon shutdown")
719
720
721 def Main():
722   """Main function"""
723   parser = OptionParser(description="Ganeti master daemon",
724                         usage="%prog [-f] [-d]",
725                         version="%%prog (ganeti) %s" %
726                         constants.RELEASE_VERSION)
727   parser.add_option("--no-voting", dest="no_voting",
728                     help="Do not check that the nodes agree on this node"
729                     " being the master and start the daemon unconditionally",
730                     default=False, action="store_true")
731   parser.add_option("--yes-do-it", dest="yes_do_it",
732                     help="Override interactive check for --no-voting",
733                     default=False, action="store_true")
734   daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
735                      ExecMasterd, multithreaded=True)