Drop objects.QueryRequest
[ganeti-local] / lib / server / masterd.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Master daemon program.
23
24 Some classes deviates from the standard style guide since the
25 inheritance from parent classes requires it.
26
27 """
28
29 # pylint: disable=C0103
30 # C0103: Invalid name ganeti-masterd
31
32 import grp
33 import os
34 import pwd
35 import sys
36 import socket
37 import time
38 import tempfile
39 import logging
40
41 from optparse import OptionParser
42
43 from ganeti import config
44 from ganeti import constants
45 from ganeti import daemon
46 from ganeti import mcpu
47 from ganeti import opcodes
48 from ganeti import jqueue
49 from ganeti import locking
50 from ganeti import luxi
51 from ganeti import utils
52 from ganeti import errors
53 from ganeti import ssconf
54 from ganeti import workerpool
55 from ganeti import rpc
56 from ganeti import bootstrap
57 from ganeti import netutils
58 from ganeti import objects
59 from ganeti import query
60 from ganeti import runtime
61
62
63 CLIENT_REQUEST_WORKERS = 16
64
65 EXIT_NOTMASTER = constants.EXIT_NOTMASTER
66 EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
67
68
69 class ClientRequestWorker(workerpool.BaseWorker):
70   # pylint: disable=W0221
71   def RunTask(self, server, message, client):
72     """Process the request.
73
74     """
75     client_ops = ClientOps(server)
76
77     try:
78       (method, args, version) = luxi.ParseRequest(message)
79     except luxi.ProtocolError, err:
80       logging.error("Protocol Error: %s", err)
81       client.close_log()
82       return
83
84     success = False
85     try:
86       # Verify client's version if there was one in the request
87       if version is not None and version != constants.LUXI_VERSION:
88         raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
89                                (constants.LUXI_VERSION, version))
90
91       result = client_ops.handle_request(method, args)
92       success = True
93     except errors.GenericError, err:
94       logging.exception("Unexpected exception")
95       success = False
96       result = errors.EncodeException(err)
97     except:
98       logging.exception("Unexpected exception")
99       err = sys.exc_info()
100       result = "Caught exception: %s" % str(err[1])
101
102     try:
103       reply = luxi.FormatResponse(success, result)
104       client.send_message(reply)
105       # awake the main thread so that it can write out the data.
106       server.awaker.signal()
107     except: # pylint: disable=W0702
108       logging.exception("Send error")
109       client.close_log()
110
111
112 class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
113   """Handler for master peers.
114
115   """
116   _MAX_UNHANDLED = 1
117
118   def __init__(self, server, connected_socket, client_address, family):
119     daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
120                                                  client_address,
121                                                  constants.LUXI_EOM,
122                                                  family, self._MAX_UNHANDLED)
123     self.server = server
124
125   def handle_message(self, message, _):
126     self.server.request_workers.AddTask((self.server, message, self))
127
128
129 class _MasterShutdownCheck:
130   """Logic for master daemon shutdown.
131
132   """
133   #: How long to wait between checks
134   _CHECK_INTERVAL = 5.0
135
136   #: How long to wait after all jobs are done (e.g. to give clients time to
137   #: retrieve the job status)
138   _SHUTDOWN_LINGER = 5.0
139
140   def __init__(self):
141     """Initializes this class.
142
143     """
144     self._had_active_jobs = None
145     self._linger_timeout = None
146
147   def __call__(self, jq_prepare_result):
148     """Determines if master daemon is ready for shutdown.
149
150     @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown}
151     @rtype: None or number
152     @return: None if master daemon is ready, timeout if the check must be
153              repeated
154
155     """
156     if jq_prepare_result:
157       # Check again shortly
158       logging.info("Job queue has been notified for shutdown but is still"
159                    " busy; next check in %s seconds", self._CHECK_INTERVAL)
160       self._had_active_jobs = True
161       return self._CHECK_INTERVAL
162
163     if not self._had_active_jobs:
164       # Can shut down as there were no active jobs on the first check
165       return None
166
167     # No jobs are running anymore, but maybe some clients want to collect some
168     # information. Give them a short amount of time.
169     if self._linger_timeout is None:
170       self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True)
171
172     remaining = self._linger_timeout.Remaining()
173
174     logging.info("Job queue no longer busy; shutting down master daemon"
175                  " in %s seconds", remaining)
176
177     # TODO: Should the master daemon socket be closed at this point? Doing so
178     # wouldn't affect existing connections.
179
180     if remaining < 0:
181       return None
182     else:
183       return remaining
184
185
186 class MasterServer(daemon.AsyncStreamServer):
187   """Master Server.
188
189   This is the main asynchronous master server. It handles connections to the
190   master socket.
191
192   """
193   family = socket.AF_UNIX
194
195   def __init__(self, address, uid, gid):
196     """MasterServer constructor
197
198     @param address: the unix socket address to bind the MasterServer to
199     @param uid: The uid of the owner of the socket
200     @param gid: The gid of the owner of the socket
201
202     """
203     temp_name = tempfile.mktemp(dir=os.path.dirname(address))
204     daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
205     os.chmod(temp_name, 0770)
206     os.chown(temp_name, uid, gid)
207     os.rename(temp_name, address)
208
209     self.awaker = daemon.AsyncAwaker()
210
211     # We'll only start threads once we've forked.
212     self.context = None
213     self.request_workers = None
214
215     self._shutdown_check = None
216
217   def handle_connection(self, connected_socket, client_address):
218     # TODO: add connection count and limit the number of open connections to a
219     # maximum number to avoid breaking for lack of file descriptors or memory.
220     MasterClientHandler(self, connected_socket, client_address, self.family)
221
222   def setup_queue(self):
223     self.context = GanetiContext()
224     self.request_workers = workerpool.WorkerPool("ClientReq",
225                                                  CLIENT_REQUEST_WORKERS,
226                                                  ClientRequestWorker)
227
228   def WaitForShutdown(self):
229     """Prepares server for shutdown.
230
231     """
232     if self._shutdown_check is None:
233       self._shutdown_check = _MasterShutdownCheck()
234
235     return self._shutdown_check(self.context.jobqueue.PrepareShutdown())
236
237   def server_cleanup(self):
238     """Cleanup the server.
239
240     This involves shutting down the processor threads and the master
241     socket.
242
243     """
244     try:
245       self.close()
246     finally:
247       if self.request_workers:
248         self.request_workers.TerminateWorkers()
249       if self.context:
250         self.context.jobqueue.Shutdown()
251
252
253 class ClientOps:
254   """Class holding high-level client operations."""
255   def __init__(self, server):
256     self.server = server
257
258   def handle_request(self, method, args): # pylint: disable=R0911
259     context = self.server.context
260     queue = context.jobqueue
261
262     # TODO: Parameter validation
263     if not isinstance(args, (tuple, list)):
264       logging.info("Received invalid arguments of type '%s'", type(args))
265       raise ValueError("Invalid arguments type '%s'" % type(args))
266
267     # TODO: Rewrite to not exit in each 'if/elif' branch
268
269     if method == luxi.REQ_SUBMIT_JOB:
270       logging.info("Received new job")
271       ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
272       return queue.SubmitJob(ops)
273
274     if method == luxi.REQ_SUBMIT_MANY_JOBS:
275       logging.info("Received multiple jobs")
276       jobs = []
277       for ops in args:
278         jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
279       return queue.SubmitManyJobs(jobs)
280
281     elif method == luxi.REQ_CANCEL_JOB:
282       (job_id, ) = args
283       logging.info("Received job cancel request for %s", job_id)
284       return queue.CancelJob(job_id)
285
286     elif method == luxi.REQ_ARCHIVE_JOB:
287       (job_id, ) = args
288       logging.info("Received job archive request for %s", job_id)
289       return queue.ArchiveJob(job_id)
290
291     elif method == luxi.REQ_AUTOARCHIVE_JOBS:
292       (age, timeout) = args
293       logging.info("Received job autoarchive request for age %s, timeout %s",
294                    age, timeout)
295       return queue.AutoArchiveJobs(age, timeout)
296
297     elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
298       (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
299       logging.info("Received job poll request for %s", job_id)
300       return queue.WaitForJobChanges(job_id, fields, prev_job_info,
301                                      prev_log_serial, timeout)
302
303     elif method == luxi.REQ_QUERY:
304       (what, fields, qfilter) = args
305
306       if what in constants.QR_VIA_OP:
307         result = self._Query(opcodes.OpQuery(what=what, fields=fields,
308                                              qfilter=qfilter))
309       elif what == constants.QR_LOCK:
310         if qfilter is not None:
311           raise errors.OpPrereqError("Lock queries can't be filtered")
312         return context.glm.QueryLocks(fields)
313       elif what == constants.QR_JOB:
314         return queue.QueryJobs(fields, qfilter)
315       elif what in constants.QR_VIA_LUXI:
316         raise NotImplementedError
317       else:
318         raise errors.OpPrereqError("Resource type '%s' unknown" % what,
319                                    errors.ECODE_INVAL)
320
321       return result
322
323     elif method == luxi.REQ_QUERY_FIELDS:
324       (what, fields) = args
325       req = objects.QueryFieldsRequest(what=what, fields=fields)
326
327       try:
328         fielddefs = query.ALL_FIELDS[req.what]
329       except KeyError:
330         raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
331                                    errors.ECODE_INVAL)
332
333       return query.QueryFields(fielddefs, req.fields)
334
335     elif method == luxi.REQ_QUERY_JOBS:
336       (job_ids, fields) = args
337       if isinstance(job_ids, (tuple, list)) and job_ids:
338         msg = utils.CommaJoin(job_ids)
339       else:
340         msg = str(job_ids)
341       logging.info("Received job query request for %s", msg)
342       return queue.OldStyleQueryJobs(job_ids, fields)
343
344     elif method == luxi.REQ_QUERY_INSTANCES:
345       (names, fields, use_locking) = args
346       logging.info("Received instance query request for %s", names)
347       if use_locking:
348         raise errors.OpPrereqError("Sync queries are not allowed",
349                                    errors.ECODE_INVAL)
350       op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
351                                    use_locking=use_locking)
352       return self._Query(op)
353
354     elif method == luxi.REQ_QUERY_NODES:
355       (names, fields, use_locking) = args
356       logging.info("Received node query request for %s", names)
357       if use_locking:
358         raise errors.OpPrereqError("Sync queries are not allowed",
359                                    errors.ECODE_INVAL)
360       op = opcodes.OpNodeQuery(names=names, output_fields=fields,
361                                use_locking=use_locking)
362       return self._Query(op)
363
364     elif method == luxi.REQ_QUERY_GROUPS:
365       (names, fields, use_locking) = args
366       logging.info("Received group query request for %s", names)
367       if use_locking:
368         raise errors.OpPrereqError("Sync queries are not allowed",
369                                    errors.ECODE_INVAL)
370       op = opcodes.OpGroupQuery(names=names, output_fields=fields)
371       return self._Query(op)
372
373     elif method == luxi.REQ_QUERY_EXPORTS:
374       (nodes, use_locking) = args
375       if use_locking:
376         raise errors.OpPrereqError("Sync queries are not allowed",
377                                    errors.ECODE_INVAL)
378       logging.info("Received exports query request")
379       op = opcodes.OpBackupQuery(nodes=nodes, use_locking=use_locking)
380       return self._Query(op)
381
382     elif method == luxi.REQ_QUERY_CONFIG_VALUES:
383       (fields, ) = args
384       logging.info("Received config values query request for %s", fields)
385       op = opcodes.OpClusterConfigQuery(output_fields=fields)
386       return self._Query(op)
387
388     elif method == luxi.REQ_QUERY_CLUSTER_INFO:
389       logging.info("Received cluster info query request")
390       op = opcodes.OpClusterQuery()
391       return self._Query(op)
392
393     elif method == luxi.REQ_QUERY_TAGS:
394       (kind, name) = args
395       logging.info("Received tags query request")
396       op = opcodes.OpTagsGet(kind=kind, name=name)
397       return self._Query(op)
398
399     elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
400       (drain_flag, ) = args
401       logging.info("Received queue drain flag change request to %s",
402                    drain_flag)
403       return queue.SetDrainFlag(drain_flag)
404
405     elif method == luxi.REQ_SET_WATCHER_PAUSE:
406       (until, ) = args
407
408       if until is None:
409         logging.info("Received request to no longer pause the watcher")
410       else:
411         if not isinstance(until, (int, float)):
412           raise TypeError("Duration must be an integer or float")
413
414         if until < time.time():
415           raise errors.GenericError("Unable to set pause end time in the past")
416
417         logging.info("Received request to pause the watcher until %s", until)
418
419       return _SetWatcherPause(until)
420
421     else:
422       logging.info("Received invalid request '%s'", method)
423       raise ValueError("Invalid operation '%s'" % method)
424
425   def _Query(self, op):
426     """Runs the specified opcode and returns the result.
427
428     """
429     # Queries don't have a job id
430     proc = mcpu.Processor(self.server.context, None)
431
432     # TODO: Executing an opcode using locks will acquire them in blocking mode.
433     # Consider using a timeout for retries.
434     return proc.ExecOpCode(op, None)
435
436
437 class GanetiContext(object):
438   """Context common to all ganeti threads.
439
440   This class creates and holds common objects shared by all threads.
441
442   """
443   # pylint: disable=W0212
444   # we do want to ensure a singleton here
445   _instance = None
446
447   def __init__(self):
448     """Constructs a new GanetiContext object.
449
450     There should be only a GanetiContext object at any time, so this
451     function raises an error if this is not the case.
452
453     """
454     assert self.__class__._instance is None, "double GanetiContext instance"
455
456     # Create global configuration object
457     self.cfg = config.ConfigWriter()
458
459     # Locking manager
460     self.glm = locking.GanetiLockManager(
461                 self.cfg.GetNodeList(),
462                 self.cfg.GetNodeGroupList(),
463                 self.cfg.GetInstanceList())
464
465     self.cfg.SetContext(self)
466
467     # RPC runner
468     self.rpc = rpc.RpcRunner(self.cfg, self.glm.AddToLockMonitor)
469
470     # Job queue
471     self.jobqueue = jqueue.JobQueue(self)
472
473     # setting this also locks the class against attribute modifications
474     self.__class__._instance = self
475
476   def __setattr__(self, name, value):
477     """Setting GanetiContext attributes is forbidden after initialization.
478
479     """
480     assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
481     object.__setattr__(self, name, value)
482
483   def AddNode(self, node, ec_id):
484     """Adds a node to the configuration and lock manager.
485
486     """
487     # Add it to the configuration
488     self.cfg.AddNode(node, ec_id)
489
490     # If preseeding fails it'll not be added
491     self.jobqueue.AddNode(node)
492
493     # Add the new node to the Ganeti Lock Manager
494     self.glm.add(locking.LEVEL_NODE, node.name)
495     self.glm.add(locking.LEVEL_NODE_RES, node.name)
496
497   def ReaddNode(self, node):
498     """Updates a node that's already in the configuration
499
500     """
501     # Synchronize the queue again
502     self.jobqueue.AddNode(node)
503
504   def RemoveNode(self, name):
505     """Removes a node from the configuration and lock manager.
506
507     """
508     # Remove node from configuration
509     self.cfg.RemoveNode(name)
510
511     # Notify job queue
512     self.jobqueue.RemoveNode(name)
513
514     # Remove the node from the Ganeti Lock Manager
515     self.glm.remove(locking.LEVEL_NODE, name)
516     self.glm.remove(locking.LEVEL_NODE_RES, name)
517
518
519 def _SetWatcherPause(until):
520   """Creates or removes the watcher pause file.
521
522   @type until: None or int
523   @param until: Unix timestamp saying until when the watcher shouldn't run
524
525   """
526   if until is None:
527     utils.RemoveFile(constants.WATCHER_PAUSEFILE)
528   else:
529     utils.WriteFile(constants.WATCHER_PAUSEFILE,
530                     data="%d\n" % (until, ))
531
532   return until
533
534
535 @rpc.RunWithRPC
536 def CheckAgreement():
537   """Check the agreement on who is the master.
538
539   The function uses a very simple algorithm: we must get more positive
540   than negative answers. Since in most of the cases we are the master,
541   we'll use our own config file for getting the node list. In the
542   future we could collect the current node list from our (possibly
543   obsolete) known nodes.
544
545   In order to account for cold-start of all nodes, we retry for up to
546   a minute until we get a real answer as the top-voted one. If the
547   nodes are more out-of-sync, for now manual startup of the master
548   should be attempted.
549
550   Note that for a even number of nodes cluster, we need at least half
551   of the nodes (beside ourselves) to vote for us. This creates a
552   problem on two-node clusters, since in this case we require the
553   other node to be up too to confirm our status.
554
555   """
556   myself = netutils.Hostname.GetSysName()
557   #temp instantiation of a config writer, used only to get the node list
558   cfg = config.ConfigWriter()
559   node_list = cfg.GetNodeList()
560   del cfg
561   retries = 6
562   while retries > 0:
563     votes = bootstrap.GatherMasterVotes(node_list)
564     if not votes:
565       # empty node list, this is a one node cluster
566       return True
567     if votes[0][0] is None:
568       retries -= 1
569       time.sleep(10)
570       continue
571     break
572   if retries == 0:
573     logging.critical("Cluster inconsistent, most of the nodes didn't answer"
574                      " after multiple retries. Aborting startup")
575     logging.critical("Use the --no-voting option if you understand what"
576                      " effects it has on the cluster state")
577     return False
578   # here a real node is at the top of the list
579   all_votes = sum(item[1] for item in votes)
580   top_node, top_votes = votes[0]
581
582   result = False
583   if top_node != myself:
584     logging.critical("It seems we are not the master (top-voted node"
585                      " is %s with %d out of %d votes)", top_node, top_votes,
586                      all_votes)
587   elif top_votes < all_votes - top_votes:
588     logging.critical("It seems we are not the master (%d votes for,"
589                      " %d votes against)", top_votes, all_votes - top_votes)
590   else:
591     result = True
592
593   return result
594
595
596 @rpc.RunWithRPC
597 def ActivateMasterIP():
598   # activate ip
599   cfg = config.ConfigWriter()
600   master_params = cfg.GetMasterNetworkParameters()
601   ems = cfg.GetUseExternalMipScript()
602   runner = rpc.BootstrapRunner()
603   result = runner.call_node_activate_master_ip(master_params.name,
604                                                master_params, ems)
605
606   msg = result.fail_msg
607   if msg:
608     logging.error("Can't activate master IP address: %s", msg)
609
610
611 def CheckMasterd(options, args):
612   """Initial checks whether to run or exit with a failure.
613
614   """
615   if args: # masterd doesn't take any arguments
616     print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
617     sys.exit(constants.EXIT_FAILURE)
618
619   ssconf.CheckMaster(options.debug)
620
621   try:
622     options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
623     options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
624   except KeyError:
625     print >> sys.stderr, ("User or group not existing on system: %s:%s" %
626                           (constants.MASTERD_USER, constants.DAEMONS_GROUP))
627     sys.exit(constants.EXIT_FAILURE)
628
629   # Determine static runtime architecture information
630   runtime.InitArchInfo()
631
632   # Check the configuration is sane before anything else
633   try:
634     config.ConfigWriter()
635   except errors.ConfigVersionMismatch, err:
636     v1 = "%s.%s.%s" % constants.SplitVersion(err.args[0])
637     v2 = "%s.%s.%s" % constants.SplitVersion(err.args[1])
638     print >> sys.stderr,  \
639         ("Configuration version mismatch. The current Ganeti software"
640          " expects version %s, but the on-disk configuration file has"
641          " version %s. This is likely the result of upgrading the"
642          " software without running the upgrade procedure. Please contact"
643          " your cluster administrator or complete the upgrade using the"
644          " cfgupgrade utility, after reading the upgrade notes." %
645          (v1, v2))
646     sys.exit(constants.EXIT_FAILURE)
647   except errors.ConfigurationError, err:
648     print >> sys.stderr, \
649         ("Configuration error while opening the configuration file: %s\n"
650          "This might be caused by an incomplete software upgrade or"
651          " by a corrupted configuration file. Until the problem is fixed"
652          " the master daemon cannot start." % str(err))
653     sys.exit(constants.EXIT_FAILURE)
654
655   # If CheckMaster didn't fail we believe we are the master, but we have to
656   # confirm with the other nodes.
657   if options.no_voting:
658     if not options.yes_do_it:
659       sys.stdout.write("The 'no voting' option has been selected.\n")
660       sys.stdout.write("This is dangerous, please confirm by"
661                        " typing uppercase 'yes': ")
662       sys.stdout.flush()
663
664       confirmation = sys.stdin.readline().strip()
665       if confirmation != "YES":
666         print >> sys.stderr, "Aborting."
667         sys.exit(constants.EXIT_FAILURE)
668
669   else:
670     # CheckAgreement uses RPC and threads, hence it needs to be run in
671     # a separate process before we call utils.Daemonize in the current
672     # process.
673     if not utils.RunInSeparateProcess(CheckAgreement):
674       sys.exit(constants.EXIT_FAILURE)
675
676   # ActivateMasterIP also uses RPC/threads, so we run it again via a
677   # separate process.
678
679   # TODO: decide whether failure to activate the master IP is a fatal error
680   utils.RunInSeparateProcess(ActivateMasterIP)
681
682
683 def PrepMasterd(options, _):
684   """Prep master daemon function, executed with the PID file held.
685
686   """
687   # This is safe to do as the pid file guarantees against
688   # concurrent execution.
689   utils.RemoveFile(constants.MASTER_SOCKET)
690
691   mainloop = daemon.Mainloop()
692   master = MasterServer(constants.MASTER_SOCKET, options.uid, options.gid)
693   return (mainloop, master)
694
695
696 def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
697   """Main master daemon function, executed with the PID file held.
698
699   """
700   (mainloop, master) = prep_data
701   try:
702     rpc.Init()
703     try:
704       master.setup_queue()
705       try:
706         mainloop.Run(shutdown_wait_fn=master.WaitForShutdown)
707       finally:
708         master.server_cleanup()
709     finally:
710       rpc.Shutdown()
711   finally:
712     utils.RemoveFile(constants.MASTER_SOCKET)
713
714   logging.info("Clean master daemon shutdown")
715
716
717 def Main():
718   """Main function"""
719   parser = OptionParser(description="Ganeti master daemon",
720                         usage="%prog [-f] [-d]",
721                         version="%%prog (ganeti) %s" %
722                         constants.RELEASE_VERSION)
723   parser.add_option("--no-voting", dest="no_voting",
724                     help="Do not check that the nodes agree on this node"
725                     " being the master and start the daemon unconditionally",
726                     default=False, action="store_true")
727   parser.add_option("--yes-do-it", dest="yes_do_it",
728                     help="Override interactive check for --no-voting",
729                     default=False, action="store_true")
730   daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
731                      ExecMasterd, multithreaded=True)