Remove deprecated “QueryLocks” LUXI request
[ganeti-local] / lib / server / masterd.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Master daemon program.
23
24 Some classes deviates from the standard style guide since the
25 inheritance from parent classes requires it.
26
27 """
28
29 # pylint: disable=C0103
30 # C0103: Invalid name ganeti-masterd
31
32 import grp
33 import os
34 import pwd
35 import sys
36 import socket
37 import time
38 import tempfile
39 import logging
40
41 from optparse import OptionParser
42
43 from ganeti import config
44 from ganeti import constants
45 from ganeti import daemon
46 from ganeti import mcpu
47 from ganeti import opcodes
48 from ganeti import jqueue
49 from ganeti import locking
50 from ganeti import luxi
51 from ganeti import utils
52 from ganeti import errors
53 from ganeti import ssconf
54 from ganeti import workerpool
55 from ganeti import rpc
56 from ganeti import bootstrap
57 from ganeti import netutils
58 from ganeti import objects
59 from ganeti import query
60
61
62 CLIENT_REQUEST_WORKERS = 16
63
64 EXIT_NOTMASTER = constants.EXIT_NOTMASTER
65 EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
66
67
68 class ClientRequestWorker(workerpool.BaseWorker):
69   # pylint: disable=W0221
70   def RunTask(self, server, message, client):
71     """Process the request.
72
73     """
74     client_ops = ClientOps(server)
75
76     try:
77       (method, args, version) = luxi.ParseRequest(message)
78     except luxi.ProtocolError, err:
79       logging.error("Protocol Error: %s", err)
80       client.close_log()
81       return
82
83     success = False
84     try:
85       # Verify client's version if there was one in the request
86       if version is not None and version != constants.LUXI_VERSION:
87         raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
88                                (constants.LUXI_VERSION, version))
89
90       result = client_ops.handle_request(method, args)
91       success = True
92     except errors.GenericError, err:
93       logging.exception("Unexpected exception")
94       success = False
95       result = errors.EncodeException(err)
96     except:
97       logging.exception("Unexpected exception")
98       err = sys.exc_info()
99       result = "Caught exception: %s" % str(err[1])
100
101     try:
102       reply = luxi.FormatResponse(success, result)
103       client.send_message(reply)
104       # awake the main thread so that it can write out the data.
105       server.awaker.signal()
106     except: # pylint: disable=W0702
107       logging.exception("Send error")
108       client.close_log()
109
110
111 class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
112   """Handler for master peers.
113
114   """
115   _MAX_UNHANDLED = 1
116
117   def __init__(self, server, connected_socket, client_address, family):
118     daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
119                                                  client_address,
120                                                  constants.LUXI_EOM,
121                                                  family, self._MAX_UNHANDLED)
122     self.server = server
123
124   def handle_message(self, message, _):
125     self.server.request_workers.AddTask((self.server, message, self))
126
127
128 class _MasterShutdownCheck:
129   """Logic for master daemon shutdown.
130
131   """
132   #: How long to wait between checks
133   _CHECK_INTERVAL = 5.0
134
135   #: How long to wait after all jobs are done (e.g. to give clients time to
136   #: retrieve the job status)
137   _SHUTDOWN_LINGER = 5.0
138
139   def __init__(self):
140     """Initializes this class.
141
142     """
143     self._had_active_jobs = None
144     self._linger_timeout = None
145
146   def __call__(self, jq_prepare_result):
147     """Determines if master daemon is ready for shutdown.
148
149     @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown}
150     @rtype: None or number
151     @return: None if master daemon is ready, timeout if the check must be
152              repeated
153
154     """
155     if jq_prepare_result:
156       # Check again shortly
157       logging.info("Job queue has been notified for shutdown but is still"
158                    " busy; next check in %s seconds", self._CHECK_INTERVAL)
159       self._had_active_jobs = True
160       return self._CHECK_INTERVAL
161
162     if not self._had_active_jobs:
163       # Can shut down as there were no active jobs on the first check
164       return None
165
166     # No jobs are running anymore, but maybe some clients want to collect some
167     # information. Give them a short amount of time.
168     if self._linger_timeout is None:
169       self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True)
170
171     remaining = self._linger_timeout.Remaining()
172
173     logging.info("Job queue no longer busy; shutting down master daemon"
174                  " in %s seconds", remaining)
175
176     # TODO: Should the master daemon socket be closed at this point? Doing so
177     # wouldn't affect existing connections.
178
179     if remaining < 0:
180       return None
181     else:
182       return remaining
183
184
185 class MasterServer(daemon.AsyncStreamServer):
186   """Master Server.
187
188   This is the main asynchronous master server. It handles connections to the
189   master socket.
190
191   """
192   family = socket.AF_UNIX
193
194   def __init__(self, address, uid, gid):
195     """MasterServer constructor
196
197     @param address: the unix socket address to bind the MasterServer to
198     @param uid: The uid of the owner of the socket
199     @param gid: The gid of the owner of the socket
200
201     """
202     temp_name = tempfile.mktemp(dir=os.path.dirname(address))
203     daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
204     os.chmod(temp_name, 0770)
205     os.chown(temp_name, uid, gid)
206     os.rename(temp_name, address)
207
208     self.awaker = daemon.AsyncAwaker()
209
210     # We'll only start threads once we've forked.
211     self.context = None
212     self.request_workers = None
213
214     self._shutdown_check = None
215
216   def handle_connection(self, connected_socket, client_address):
217     # TODO: add connection count and limit the number of open connections to a
218     # maximum number to avoid breaking for lack of file descriptors or memory.
219     MasterClientHandler(self, connected_socket, client_address, self.family)
220
221   def setup_queue(self):
222     self.context = GanetiContext()
223     self.request_workers = workerpool.WorkerPool("ClientReq",
224                                                  CLIENT_REQUEST_WORKERS,
225                                                  ClientRequestWorker)
226
227   def WaitForShutdown(self):
228     """Prepares server for shutdown.
229
230     """
231     if self._shutdown_check is None:
232       self._shutdown_check = _MasterShutdownCheck()
233
234     return self._shutdown_check(self.context.jobqueue.PrepareShutdown())
235
236   def server_cleanup(self):
237     """Cleanup the server.
238
239     This involves shutting down the processor threads and the master
240     socket.
241
242     """
243     try:
244       self.close()
245     finally:
246       if self.request_workers:
247         self.request_workers.TerminateWorkers()
248       if self.context:
249         self.context.jobqueue.Shutdown()
250
251
252 class ClientOps:
253   """Class holding high-level client operations."""
254   def __init__(self, server):
255     self.server = server
256
257   def handle_request(self, method, args): # pylint: disable=R0911
258     queue = self.server.context.jobqueue
259
260     # TODO: Parameter validation
261     if not isinstance(args, (tuple, list)):
262       logging.info("Received invalid arguments of type '%s'", type(args))
263       raise ValueError("Invalid arguments type '%s'" % type(args))
264
265     # TODO: Rewrite to not exit in each 'if/elif' branch
266
267     if method == luxi.REQ_SUBMIT_JOB:
268       logging.info("Received new job")
269       ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
270       return queue.SubmitJob(ops)
271
272     if method == luxi.REQ_SUBMIT_MANY_JOBS:
273       logging.info("Received multiple jobs")
274       jobs = []
275       for ops in args:
276         jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
277       return queue.SubmitManyJobs(jobs)
278
279     elif method == luxi.REQ_CANCEL_JOB:
280       (job_id, ) = args
281       logging.info("Received job cancel request for %s", job_id)
282       return queue.CancelJob(job_id)
283
284     elif method == luxi.REQ_ARCHIVE_JOB:
285       (job_id, ) = args
286       logging.info("Received job archive request for %s", job_id)
287       return queue.ArchiveJob(job_id)
288
289     elif method == luxi.REQ_AUTOARCHIVE_JOBS:
290       (age, timeout) = args
291       logging.info("Received job autoarchive request for age %s, timeout %s",
292                    age, timeout)
293       return queue.AutoArchiveJobs(age, timeout)
294
295     elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
296       (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
297       logging.info("Received job poll request for %s", job_id)
298       return queue.WaitForJobChanges(job_id, fields, prev_job_info,
299                                      prev_log_serial, timeout)
300
301     elif method == luxi.REQ_QUERY:
302       (what, fields, qfilter) = args
303       req = objects.QueryRequest(what=what, fields=fields, qfilter=qfilter)
304
305       if req.what in constants.QR_VIA_OP:
306         result = self._Query(opcodes.OpQuery(what=req.what, fields=req.fields,
307                                              qfilter=req.qfilter))
308       elif req.what == constants.QR_LOCK:
309         if req.qfilter is not None:
310           raise errors.OpPrereqError("Lock queries can't be filtered")
311         return self.server.context.glm.QueryLocks(req.fields)
312       elif req.what in constants.QR_VIA_LUXI:
313         raise NotImplementedError
314       else:
315         raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
316                                    errors.ECODE_INVAL)
317
318       return result
319
320     elif method == luxi.REQ_QUERY_FIELDS:
321       (what, fields) = args
322       req = objects.QueryFieldsRequest(what=what, fields=fields)
323
324       try:
325         fielddefs = query.ALL_FIELDS[req.what]
326       except KeyError:
327         raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
328                                    errors.ECODE_INVAL)
329
330       return query.QueryFields(fielddefs, req.fields)
331
332     elif method == luxi.REQ_QUERY_JOBS:
333       (job_ids, fields) = args
334       if isinstance(job_ids, (tuple, list)) and job_ids:
335         msg = utils.CommaJoin(job_ids)
336       else:
337         msg = str(job_ids)
338       logging.info("Received job query request for %s", msg)
339       return queue.QueryJobs(job_ids, fields)
340
341     elif method == luxi.REQ_QUERY_INSTANCES:
342       (names, fields, use_locking) = args
343       logging.info("Received instance query request for %s", names)
344       if use_locking:
345         raise errors.OpPrereqError("Sync queries are not allowed",
346                                    errors.ECODE_INVAL)
347       op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
348                                    use_locking=use_locking)
349       return self._Query(op)
350
351     elif method == luxi.REQ_QUERY_NODES:
352       (names, fields, use_locking) = args
353       logging.info("Received node query request for %s", names)
354       if use_locking:
355         raise errors.OpPrereqError("Sync queries are not allowed",
356                                    errors.ECODE_INVAL)
357       op = opcodes.OpNodeQuery(names=names, output_fields=fields,
358                                use_locking=use_locking)
359       return self._Query(op)
360
361     elif method == luxi.REQ_QUERY_GROUPS:
362       (names, fields, use_locking) = args
363       logging.info("Received group query request for %s", names)
364       if use_locking:
365         raise errors.OpPrereqError("Sync queries are not allowed",
366                                    errors.ECODE_INVAL)
367       op = opcodes.OpGroupQuery(names=names, output_fields=fields)
368       return self._Query(op)
369
370     elif method == luxi.REQ_QUERY_EXPORTS:
371       (nodes, use_locking) = args
372       if use_locking:
373         raise errors.OpPrereqError("Sync queries are not allowed",
374                                    errors.ECODE_INVAL)
375       logging.info("Received exports query request")
376       op = opcodes.OpBackupQuery(nodes=nodes, use_locking=use_locking)
377       return self._Query(op)
378
379     elif method == luxi.REQ_QUERY_CONFIG_VALUES:
380       (fields, ) = args
381       logging.info("Received config values query request for %s", fields)
382       op = opcodes.OpClusterConfigQuery(output_fields=fields)
383       return self._Query(op)
384
385     elif method == luxi.REQ_QUERY_CLUSTER_INFO:
386       logging.info("Received cluster info query request")
387       op = opcodes.OpClusterQuery()
388       return self._Query(op)
389
390     elif method == luxi.REQ_QUERY_TAGS:
391       (kind, name) = args
392       logging.info("Received tags query request")
393       op = opcodes.OpTagsGet(kind=kind, name=name)
394       return self._Query(op)
395
396     elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
397       (drain_flag, ) = args
398       logging.info("Received queue drain flag change request to %s",
399                    drain_flag)
400       return queue.SetDrainFlag(drain_flag)
401
402     elif method == luxi.REQ_SET_WATCHER_PAUSE:
403       (until, ) = args
404
405       if until is None:
406         logging.info("Received request to no longer pause the watcher")
407       else:
408         if not isinstance(until, (int, float)):
409           raise TypeError("Duration must be an integer or float")
410
411         if until < time.time():
412           raise errors.GenericError("Unable to set pause end time in the past")
413
414         logging.info("Received request to pause the watcher until %s", until)
415
416       return _SetWatcherPause(until)
417
418     else:
419       logging.info("Received invalid request '%s'", method)
420       raise ValueError("Invalid operation '%s'" % method)
421
422   def _Query(self, op):
423     """Runs the specified opcode and returns the result.
424
425     """
426     # Queries don't have a job id
427     proc = mcpu.Processor(self.server.context, None)
428
429     # TODO: Executing an opcode using locks will acquire them in blocking mode.
430     # Consider using a timeout for retries.
431     return proc.ExecOpCode(op, None)
432
433
434 class GanetiContext(object):
435   """Context common to all ganeti threads.
436
437   This class creates and holds common objects shared by all threads.
438
439   """
440   # pylint: disable=W0212
441   # we do want to ensure a singleton here
442   _instance = None
443
444   def __init__(self):
445     """Constructs a new GanetiContext object.
446
447     There should be only a GanetiContext object at any time, so this
448     function raises an error if this is not the case.
449
450     """
451     assert self.__class__._instance is None, "double GanetiContext instance"
452
453     # Create global configuration object
454     self.cfg = config.ConfigWriter()
455
456     # Locking manager
457     self.glm = locking.GanetiLockManager(
458                 self.cfg.GetNodeList(),
459                 self.cfg.GetNodeGroupList(),
460                 self.cfg.GetInstanceList())
461
462     self.cfg.SetContext(self)
463
464     # RPC runner
465     self.rpc = rpc.RpcRunner(self.cfg, self.glm.AddToLockMonitor)
466
467     # Job queue
468     self.jobqueue = jqueue.JobQueue(self)
469
470     # setting this also locks the class against attribute modifications
471     self.__class__._instance = self
472
473   def __setattr__(self, name, value):
474     """Setting GanetiContext attributes is forbidden after initialization.
475
476     """
477     assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
478     object.__setattr__(self, name, value)
479
480   def AddNode(self, node, ec_id):
481     """Adds a node to the configuration and lock manager.
482
483     """
484     # Add it to the configuration
485     self.cfg.AddNode(node, ec_id)
486
487     # If preseeding fails it'll not be added
488     self.jobqueue.AddNode(node)
489
490     # Add the new node to the Ganeti Lock Manager
491     self.glm.add(locking.LEVEL_NODE, node.name)
492     self.glm.add(locking.LEVEL_NODE_RES, node.name)
493
494   def ReaddNode(self, node):
495     """Updates a node that's already in the configuration
496
497     """
498     # Synchronize the queue again
499     self.jobqueue.AddNode(node)
500
501   def RemoveNode(self, name):
502     """Removes a node from the configuration and lock manager.
503
504     """
505     # Remove node from configuration
506     self.cfg.RemoveNode(name)
507
508     # Notify job queue
509     self.jobqueue.RemoveNode(name)
510
511     # Remove the node from the Ganeti Lock Manager
512     self.glm.remove(locking.LEVEL_NODE, name)
513     self.glm.remove(locking.LEVEL_NODE_RES, name)
514
515
516 def _SetWatcherPause(until):
517   """Creates or removes the watcher pause file.
518
519   @type until: None or int
520   @param until: Unix timestamp saying until when the watcher shouldn't run
521
522   """
523   if until is None:
524     utils.RemoveFile(constants.WATCHER_PAUSEFILE)
525   else:
526     utils.WriteFile(constants.WATCHER_PAUSEFILE,
527                     data="%d\n" % (until, ))
528
529   return until
530
531
532 @rpc.RunWithRPC
533 def CheckAgreement():
534   """Check the agreement on who is the master.
535
536   The function uses a very simple algorithm: we must get more positive
537   than negative answers. Since in most of the cases we are the master,
538   we'll use our own config file for getting the node list. In the
539   future we could collect the current node list from our (possibly
540   obsolete) known nodes.
541
542   In order to account for cold-start of all nodes, we retry for up to
543   a minute until we get a real answer as the top-voted one. If the
544   nodes are more out-of-sync, for now manual startup of the master
545   should be attempted.
546
547   Note that for a even number of nodes cluster, we need at least half
548   of the nodes (beside ourselves) to vote for us. This creates a
549   problem on two-node clusters, since in this case we require the
550   other node to be up too to confirm our status.
551
552   """
553   myself = netutils.Hostname.GetSysName()
554   #temp instantiation of a config writer, used only to get the node list
555   cfg = config.ConfigWriter()
556   node_list = cfg.GetNodeList()
557   del cfg
558   retries = 6
559   while retries > 0:
560     votes = bootstrap.GatherMasterVotes(node_list)
561     if not votes:
562       # empty node list, this is a one node cluster
563       return True
564     if votes[0][0] is None:
565       retries -= 1
566       time.sleep(10)
567       continue
568     break
569   if retries == 0:
570     logging.critical("Cluster inconsistent, most of the nodes didn't answer"
571                      " after multiple retries. Aborting startup")
572     logging.critical("Use the --no-voting option if you understand what"
573                      " effects it has on the cluster state")
574     return False
575   # here a real node is at the top of the list
576   all_votes = sum(item[1] for item in votes)
577   top_node, top_votes = votes[0]
578
579   result = False
580   if top_node != myself:
581     logging.critical("It seems we are not the master (top-voted node"
582                      " is %s with %d out of %d votes)", top_node, top_votes,
583                      all_votes)
584   elif top_votes < all_votes - top_votes:
585     logging.critical("It seems we are not the master (%d votes for,"
586                      " %d votes against)", top_votes, all_votes - top_votes)
587   else:
588     result = True
589
590   return result
591
592
593 @rpc.RunWithRPC
594 def ActivateMasterIP():
595   # activate ip
596   cfg = config.ConfigWriter()
597   master_params = cfg.GetMasterNetworkParameters()
598   ems = cfg.GetUseExternalMipScript()
599   runner = rpc.BootstrapRunner()
600   result = runner.call_node_activate_master_ip(master_params.name,
601                                                master_params, ems)
602
603   msg = result.fail_msg
604   if msg:
605     logging.error("Can't activate master IP address: %s", msg)
606
607
608 def CheckMasterd(options, args):
609   """Initial checks whether to run or exit with a failure.
610
611   """
612   if args: # masterd doesn't take any arguments
613     print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
614     sys.exit(constants.EXIT_FAILURE)
615
616   ssconf.CheckMaster(options.debug)
617
618   try:
619     options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
620     options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
621   except KeyError:
622     print >> sys.stderr, ("User or group not existing on system: %s:%s" %
623                           (constants.MASTERD_USER, constants.DAEMONS_GROUP))
624     sys.exit(constants.EXIT_FAILURE)
625
626   # Check the configuration is sane before anything else
627   try:
628     config.ConfigWriter()
629   except errors.ConfigVersionMismatch, err:
630     v1 = "%s.%s.%s" % constants.SplitVersion(err.args[0])
631     v2 = "%s.%s.%s" % constants.SplitVersion(err.args[1])
632     print >> sys.stderr,  \
633         ("Configuration version mismatch. The current Ganeti software"
634          " expects version %s, but the on-disk configuration file has"
635          " version %s. This is likely the result of upgrading the"
636          " software without running the upgrade procedure. Please contact"
637          " your cluster administrator or complete the upgrade using the"
638          " cfgupgrade utility, after reading the upgrade notes." %
639          (v1, v2))
640     sys.exit(constants.EXIT_FAILURE)
641   except errors.ConfigurationError, err:
642     print >> sys.stderr, \
643         ("Configuration error while opening the configuration file: %s\n"
644          "This might be caused by an incomplete software upgrade or"
645          " by a corrupted configuration file. Until the problem is fixed"
646          " the master daemon cannot start." % str(err))
647     sys.exit(constants.EXIT_FAILURE)
648
649   # If CheckMaster didn't fail we believe we are the master, but we have to
650   # confirm with the other nodes.
651   if options.no_voting:
652     if not options.yes_do_it:
653       sys.stdout.write("The 'no voting' option has been selected.\n")
654       sys.stdout.write("This is dangerous, please confirm by"
655                        " typing uppercase 'yes': ")
656       sys.stdout.flush()
657
658       confirmation = sys.stdin.readline().strip()
659       if confirmation != "YES":
660         print >> sys.stderr, "Aborting."
661         sys.exit(constants.EXIT_FAILURE)
662
663   else:
664     # CheckAgreement uses RPC and threads, hence it needs to be run in
665     # a separate process before we call utils.Daemonize in the current
666     # process.
667     if not utils.RunInSeparateProcess(CheckAgreement):
668       sys.exit(constants.EXIT_FAILURE)
669
670   # ActivateMasterIP also uses RPC/threads, so we run it again via a
671   # separate process.
672
673   # TODO: decide whether failure to activate the master IP is a fatal error
674   utils.RunInSeparateProcess(ActivateMasterIP)
675
676
677 def PrepMasterd(options, _):
678   """Prep master daemon function, executed with the PID file held.
679
680   """
681   # This is safe to do as the pid file guarantees against
682   # concurrent execution.
683   utils.RemoveFile(constants.MASTER_SOCKET)
684
685   mainloop = daemon.Mainloop()
686   master = MasterServer(constants.MASTER_SOCKET, options.uid, options.gid)
687   return (mainloop, master)
688
689
690 def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
691   """Main master daemon function, executed with the PID file held.
692
693   """
694   (mainloop, master) = prep_data
695   try:
696     rpc.Init()
697     try:
698       master.setup_queue()
699       try:
700         mainloop.Run(shutdown_wait_fn=master.WaitForShutdown)
701       finally:
702         master.server_cleanup()
703     finally:
704       rpc.Shutdown()
705   finally:
706     utils.RemoveFile(constants.MASTER_SOCKET)
707
708   logging.info("Clean master daemon shutdown")
709
710
711 def Main():
712   """Main function"""
713   parser = OptionParser(description="Ganeti master daemon",
714                         usage="%prog [-f] [-d]",
715                         version="%%prog (ganeti) %s" %
716                         constants.RELEASE_VERSION)
717   parser.add_option("--no-voting", dest="no_voting",
718                     help="Do not check that the nodes agree on this node"
719                     " being the master and start the daemon unconditionally",
720                     default=False, action="store_true")
721   parser.add_option("--yes-do-it", dest="yes_do_it",
722                     help="Override interactive check for --no-voting",
723                     default=False, action="store_true")
724   daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
725                      ExecMasterd, multithreaded=True)