4 # Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Master daemon program.
24 Some classes deviate from the standard style guide since the
25 inheritance from parent classes requires it.
29 # pylint: disable=C0103
30 # C0103: Invalid name ganeti-masterd
41 from optparse import OptionParser
43 from ganeti import config
44 from ganeti import constants
45 from ganeti import daemon
46 from ganeti import mcpu
47 from ganeti import opcodes
48 from ganeti import jqueue
49 from ganeti import locking
50 from ganeti import luxi
51 from ganeti import utils
52 from ganeti import errors
53 from ganeti import ssconf
54 from ganeti import workerpool
55 from ganeti import rpc
56 from ganeti import bootstrap
57 from ganeti import netutils
58 from ganeti import objects
59 from ganeti import query
60 from ganeti import runtime
61 from ganeti import pathutils
# Number of threads in the "ClientReq" worker pool that serves LUXI client
# requests (passed to workerpool.WorkerPool in MasterServer.setup_queue).
64 CLIENT_REQUEST_WORKERS = 16
# Module-level aliases for exit codes defined in ganeti.constants.
66 EXIT_NOTMASTER = constants.EXIT_NOTMASTER
67 EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
# Worker for the "ClientReq" pool. Each task is a (server, message, client)
# tuple: the raw LUXI message is parsed, dispatched to
# ClientOps.handle_request, and the formatted result is sent back to the
# client.
70 class ClientRequestWorker(workerpool.BaseWorker):
71 # pylint: disable=W0221
72 def RunTask(self, server, message, client):
73 """Process the request.
76 client_ops = ClientOps(server)
79 (method, args, version) = luxi.ParseRequest(message)
80 except luxi.ProtocolError, err:
81 logging.error("Protocol Error: %s", err)
87 # Verify client's version if there was one in the request
88 if version is not None and version != constants.LUXI_VERSION:
89 raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
90 (constants.LUXI_VERSION, version))
92 result = client_ops.handle_request(method, args)
# Ganeti errors (errors.GenericError and subclasses such as the LuxiError
# raised above) are serialized with errors.EncodeException; the fallback
# handler below instead returns a plain "Caught exception: ..." string.
94 except errors.GenericError, err:
95 logging.exception("Unexpected exception")
97 result = errors.EncodeException(err)
99 logging.exception("Unexpected exception")
# NOTE(review): `err[1]` depends on how `err` is bound on a line not shown in
# this excerpt (presumably sys.exc_info(), whose [1] is the exception
# object) — confirm.
101 result = "Caught exception: %s" % str(err[1])
104 reply = luxi.FormatResponse(success, result)
105 client.send_message(reply)
106 # awake the main thread so that it can write out the data.
107 server.awaker.signal()
# Any error while formatting or sending the reply is caught by this bare
# except and logged with a traceback.
108 except: # pylint: disable=W0702
109 logging.exception("Send error")
# Stream handler for one connection on the master socket. Complete messages
# are not processed here; handle_message queues them to the server's
# request_workers pool.
113 class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
114 """Handler for master peers.
119 def __init__(self, server, connected_socket, client_address, family):
120 daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
123 family, self._MAX_UNHANDLED)
# Queue (server, message, handler) for a ClientRequestWorker. The worker
# sends its reply back through this handler's send_message().
126 def handle_message(self, message, _):
127 self.server.request_workers.AddTask((self.server, message, self))
# Stateful callable used by MasterServer.WaitForShutdown. Given the result of
# the job queue's PrepareShutdown(), it tells the caller whether the daemon
# may exit now or how long to wait before checking again.
130 class _MasterShutdownCheck:
131 """Logic for master daemon shutdown.
134 #: How long to wait between checks
135 _CHECK_INTERVAL = 5.0
137 #: How long to wait after all jobs are done (e.g. to give clients time to
138 #: retrieve the job status)
139 _SHUTDOWN_LINGER = 5.0
142 """Initializes this class.
145 self._had_active_jobs = None
146 self._linger_timeout = None
# _had_active_jobs is set to True once the queue has been seen busy;
# _linger_timeout is created lazily on the first idle check after that.
148 def __call__(self, jq_prepare_result):
149 """Determines if master daemon is ready for shutdown.
151 @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown}
152 @rtype: None or number
153 @return: None if master daemon is ready, timeout if the check must be
157 if jq_prepare_result:
158 # Check again shortly
159 logging.info("Job queue has been notified for shutdown but is still"
160 " busy; next check in %s seconds", self._CHECK_INTERVAL)
161 self._had_active_jobs = True
162 return self._CHECK_INTERVAL
164 if not self._had_active_jobs:
165 # Can shut down as there were no active jobs on the first check
168 # No jobs are running anymore, but maybe some clients want to collect some
169 # information. Give them a short amount of time.
170 if self._linger_timeout is None:
171 self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True)
# The timer is created only once, so the linger period runs from the first
# idle check instead of restarting on every call.
173 remaining = self._linger_timeout.Remaining()
175 logging.info("Job queue no longer busy; shutting down master daemon"
176 " in %s seconds", remaining)
178 # TODO: Should the master daemon socket be closed at this point? Doing so
179 # wouldn't affect existing connections.
# Server listening on the master UNIX socket. It creates a MasterClientHandler
# for each accepted connection. Once setup_queue() has run, it also owns the
# GanetiContext and the "ClientReq" worker pool that serves requests.
187 class MasterServer(daemon.AsyncStreamServer):
190 This is the main asynchronous master server. It handles connections to the
194 family = socket.AF_UNIX
196 def __init__(self, address, uid, gid):
197 """MasterServer constructor
199 @param address: the unix socket address to bind the MasterServer to
200 @param uid: The uid of the owner of the socket
201 @param gid: The gid of the owner of the socket
204 temp_name = tempfile.mktemp(dir=os.path.dirname(address))
# The socket is bound under the temporary name, given mode 0770 (no access
# for "other") and owner uid:gid, and only then renamed to `address`. Clients
# therefore only see the socket at `address` after chmod/chown have applied.
# NOTE(review): tempfile.mktemp only picks a currently unused name. Another
# process could create that path before it is bound, so this relies on the
# socket directory being writable only by trusted users — confirm.
205 daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
206 os.chmod(temp_name, 0770)
207 os.chown(temp_name, uid, gid)
208 os.rename(temp_name, address)
210 self.awaker = daemon.AsyncAwaker()
212 # We'll only start threads once we've forked.
214 self.request_workers = None
216 self._shutdown_check = None
218 def handle_connection(self, connected_socket, client_address):
219 # TODO: add connection count and limit the number of open connections to a
220 # maximum number to avoid breaking for lack of file descriptors or memory.
221 MasterClientHandler(self, connected_socket, client_address, self.family)
# Creates the GanetiContext (config, lock manager, RPC runner, job queue) and
# the client request worker pool. Given the "only start threads once we've
# forked" note in __init__, this is presumably called from the daemonized
# process — confirm at the call site.
223 def setup_queue(self):
224 self.context = GanetiContext()
225 self.request_workers = workerpool.WorkerPool("ClientReq",
226 CLIENT_REQUEST_WORKERS,
# Returns the verdict of a lazily created _MasterShutdownCheck: None when the
# daemon may exit, otherwise a timeout before the next check.
229 def WaitForShutdown(self):
230 """Prepares server for shutdown.
233 if self._shutdown_check is None:
234 self._shutdown_check = _MasterShutdownCheck()
236 return self._shutdown_check(self.context.jobqueue.PrepareShutdown())
# Terminates the request worker threads (if the pool was created) and shuts
# down the job queue.
238 def server_cleanup(self):
239 """Cleanup the server.
241 This involves shutting down the processor threads and the master
248 if self.request_workers:
249 self.request_workers.TerminateWorkers()
251 self.context.jobqueue.Shutdown()
255 """Class holding high-level client operations."""
# Keeps the MasterServer so that handle_request and _Query can reach the
# shared GanetiContext through server.context.
256 def __init__(self, server):
# Dispatch one decoded LUXI request. `args` must be a list or tuple whose
# layout depends on `method`; ValueError is raised for any other args type
# and for unknown methods. Job operations go to the job queue, lock queries
# go to the lock manager, and the remaining read-only queries run as opcodes
# through _Query.
259 def handle_request(self, method, args): # pylint: disable=R0911
260 context = self.server.context
261 queue = context.jobqueue
263 # TODO: Parameter validation
264 if not isinstance(args, (tuple, list)):
265 logging.info("Received invalid arguments of type '%s'", type(args))
266 raise ValueError("Invalid arguments type '%s'" % type(args))
268 # TODO: Rewrite to not exit in each 'if/elif' branch
270 if method == luxi.REQ_SUBMIT_JOB:
271 logging.info("Received new job")
273 ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
274 return queue.SubmitJob(ops)
276 elif method == luxi.REQ_SUBMIT_MANY_JOBS:
277 logging.info("Received multiple jobs")
281 jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
282 return queue.SubmitManyJobs(jobs)
284 elif method == luxi.REQ_CANCEL_JOB:
286 logging.info("Received job cancel request for %s", job_id)
287 return queue.CancelJob(job_id)
289 elif method == luxi.REQ_ARCHIVE_JOB:
291 logging.info("Received job archive request for %s", job_id)
292 return queue.ArchiveJob(job_id)
294 elif method == luxi.REQ_AUTO_ARCHIVE_JOBS:
295 (age, timeout) = args
296 logging.info("Received job autoarchive request for age %s, timeout %s",
298 return queue.AutoArchiveJobs(age, timeout)
300 elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
301 (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
302 logging.info("Received job poll request for %s", job_id)
303 return queue.WaitForJobChanges(job_id, fields, prev_job_info,
304 prev_log_serial, timeout)
# Generic query: QR_VIA_OP resources are answered by running an OpQuery,
# locks by the lock manager, and jobs by the job queue. QR_VIA_LUXI types are
# recognised but not implemented here.
306 elif method == luxi.REQ_QUERY:
307 (what, fields, qfilter) = args
309 if what in constants.QR_VIA_OP:
310 result = self._Query(opcodes.OpQuery(what=what, fields=fields,
312 elif what == constants.QR_LOCK:
313 if qfilter is not None:
314 raise errors.OpPrereqError("Lock queries can't be filtered",
316 return context.glm.QueryLocks(fields)
317 elif what == constants.QR_JOB:
318 return queue.QueryJobs(fields, qfilter)
319 elif what in constants.QR_VIA_LUXI:
320 raise NotImplementedError
322 raise errors.OpPrereqError("Resource type '%s' unknown" % what,
327 elif method == luxi.REQ_QUERY_FIELDS:
328 (what, fields) = args
329 req = objects.QueryFieldsRequest(what=what, fields=fields)
332 fielddefs = query.ALL_FIELDS[req.what]
334 raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
337 return query.QueryFields(fielddefs, req.fields)
339 elif method == luxi.REQ_QUERY_JOBS:
340 (job_ids, fields) = args
341 if isinstance(job_ids, (tuple, list)) and job_ids:
342 msg = utils.CommaJoin(job_ids)
345 logging.info("Received job query request for %s", msg)
346 return queue.OldStyleQueryJobs(job_ids, fields)
# The per-resource queries below reject requests that ask for locking
# ("Sync queries are not allowed"). Otherwise they build the matching query
# opcode and run it through _Query.
348 elif method == luxi.REQ_QUERY_INSTANCES:
349 (names, fields, use_locking) = args
350 logging.info("Received instance query request for %s", names)
352 raise errors.OpPrereqError("Sync queries are not allowed",
354 op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
355 use_locking=use_locking)
356 return self._Query(op)
358 elif method == luxi.REQ_QUERY_NODES:
359 (names, fields, use_locking) = args
360 logging.info("Received node query request for %s", names)
362 raise errors.OpPrereqError("Sync queries are not allowed",
364 op = opcodes.OpNodeQuery(names=names, output_fields=fields,
365 use_locking=use_locking)
366 return self._Query(op)
368 elif method == luxi.REQ_QUERY_GROUPS:
369 (names, fields, use_locking) = args
370 logging.info("Received group query request for %s", names)
372 raise errors.OpPrereqError("Sync queries are not allowed",
374 op = opcodes.OpGroupQuery(names=names, output_fields=fields)
375 return self._Query(op)
377 elif method == luxi.REQ_QUERY_EXPORTS:
378 (nodes, use_locking) = args
380 raise errors.OpPrereqError("Sync queries are not allowed",
382 logging.info("Received exports query request")
383 op = opcodes.OpBackupQuery(nodes=nodes, use_locking=use_locking)
384 return self._Query(op)
386 elif method == luxi.REQ_QUERY_CONFIG_VALUES:
388 logging.info("Received config values query request for %s", fields)
389 op = opcodes.OpClusterConfigQuery(output_fields=fields)
390 return self._Query(op)
392 elif method == luxi.REQ_QUERY_CLUSTER_INFO:
393 logging.info("Received cluster info query request")
394 op = opcodes.OpClusterQuery()
395 return self._Query(op)
397 elif method == luxi.REQ_QUERY_TAGS:
399 logging.info("Received tags query request")
400 op = opcodes.OpTagsGet(kind=kind, name=name, use_locking=False)
401 return self._Query(op)
403 elif method == luxi.REQ_SET_DRAIN_FLAG:
404 (drain_flag, ) = args
405 logging.info("Received queue drain flag change request to %s",
407 return queue.SetDrainFlag(drain_flag)
# Watcher pause: a request either lifts the pause or sets a new end time.
# A new end time must be an int/float that is not in the past. The result
# comes from _SetWatcherPause.
409 elif method == luxi.REQ_SET_WATCHER_PAUSE:
413 logging.info("Received request to no longer pause the watcher")
415 if not isinstance(until, (int, float)):
416 raise TypeError("Duration must be an integer or float")
418 if until < time.time():
419 raise errors.GenericError("Unable to set pause end time in the past")
421 logging.info("Received request to pause the watcher until %s", until)
423 return _SetWatcherPause(until)
426 logging.info("Received invalid request '%s'", method)
427 raise ValueError("Invalid operation '%s'" % method)
# Executes a query opcode synchronously in the calling thread, using an
# mcpu.Processor created with no job id and enable_locks=False.
429 def _Query(self, op):
430 """Runs the specified opcode and returns the result.
433 # Queries don't have a job id
434 proc = mcpu.Processor(self.server.context, None, enable_locks=False)
436 # TODO: Executing an opcode using locks will acquire them in blocking mode.
437 # Consider using a timeout for retries.
438 return proc.ExecOpCode(op, None)
# Singleton shared by all master daemon threads. Construction builds the
# configuration writer, the lock manager, the RPC runner and the job queue.
# Once construction has finished, __setattr__ asserts against any further
# attribute assignment. This is an assert, so it is not enforced under
# python -O.
441 class GanetiContext(object):
442 """Context common to all ganeti threads.
444 This class creates and holds common objects shared by all threads.
447 # pylint: disable=W0212
448 # we do want to ensure a singleton here
452 """Constructs a new GanetiContext object.
454 There should be only one GanetiContext object at any time, so this
455 function raises an error if this is not the case.
458 assert self.__class__._instance is None, "double GanetiContext instance"
460 # Create global configuration object
461 self.cfg = config.ConfigWriter()
# Seed the lock manager with the node, node group and instance names that
# are currently in the configuration.
464 self.glm = locking.GanetiLockManager(
465 self.cfg.GetNodeList(),
466 self.cfg.GetNodeGroupList(),
467 self.cfg.GetInstanceList())
469 self.cfg.SetContext(self)
472 self.rpc = rpc.RpcRunner(self.cfg, self.glm.AddToLockMonitor)
475 self.jobqueue = jqueue.JobQueue(self)
477 # setting this also locks the class against attribute modifications
478 self.__class__._instance = self
480 def __setattr__(self, name, value):
481 """Setting GanetiContext attributes is forbidden after initialization.
484 assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
485 object.__setattr__(self, name, value)
487 def AddNode(self, node, ec_id):
488 """Adds a node to the configuration and lock manager.
491 # Add it to the configuration
492 self.cfg.AddNode(node, ec_id)
494 # If preseeding fails it'll not be added
495 self.jobqueue.AddNode(node)
# NOTE(review): the configuration is updated first, so an exception from
# jobqueue.AddNode would leave the node in the config without its node and
# node-res locks — confirm this ordering is intended.
497 # Add the new node to the Ganeti Lock Manager
498 self.glm.add(locking.LEVEL_NODE, node.name)
499 self.glm.add(locking.LEVEL_NODE_RES, node.name)
501 def ReaddNode(self, node):
502 """Updates a node that's already in the configuration
505 # Synchronize the queue again
506 self.jobqueue.AddNode(node)
508 def RemoveNode(self, name):
509 """Removes a node from the configuration and lock manager.
512 # Remove node from configuration
513 self.cfg.RemoveNode(name)
516 self.jobqueue.RemoveNode(name)
518 # Remove the node from the Ganeti Lock Manager
519 self.glm.remove(locking.LEVEL_NODE, name)
520 self.glm.remove(locking.LEVEL_NODE_RES, name)
# Persists the watcher pause state in pathutils.WATCHER_PAUSEFILE.
523 def _SetWatcherPause(until):
524 """Creates or removes the watcher pause file.
526 @type until: None, int or float
527 @param until: Unix timestamp saying until when the watcher shouldn't run
531 utils.RemoveFile(pathutils.WATCHER_PAUSEFILE)
# "%d" stores the timestamp as a whole number, so a float `until` (which
# ClientOps.handle_request accepts) is truncated toward zero.
533 utils.WriteFile(pathutils.WATCHER_PAUSEFILE,
534 data="%d\n" % (until, ))
540 def CheckAgreement():
541 """Check the agreement on who is the master.
543 The function uses a very simple algorithm: we must get more positive
544 than negative answers. Since in most of the cases we are the master,
545 we'll use our own config file for getting the node list. In the
546 future we could collect the current node list from our (possibly
547 obsolete) known nodes.
549 In order to account for cold-start of all nodes, we retry for up to
550 a minute until we get a real answer as the top-voted one. If the
551 nodes are more out-of-sync, for now manual startup of the master
554 Note that for a cluster with an even number of nodes, we need at least half
555 of the nodes (besides ourselves) to vote for us. This creates a
556 problem on two-node clusters, since in this case we require the
557 other node to be up too to confirm our status.
560 myself = netutils.Hostname.GetSysName()
561 #temp instantiation of a config writer, used only to get the node list
562 cfg = config.ConfigWriter()
563 node_list = cfg.GetNodeList()
567 votes = bootstrap.GatherMasterVotes(node_list)
569 # empty node list, this is a one node cluster
# A None top-voted entry means most nodes gave no answer (see the critical
# message logged when retries are exhausted).
571 if votes[0][0] is None:
577 logging.critical("Cluster inconsistent, most of the nodes didn't answer"
578 " after multiple retries. Aborting startup")
579 logging.critical("Use the --no-voting option if you understand what"
580 " effects it has on the cluster state")
582 # here a real node is at the top of the list
583 all_votes = sum(item[1] for item in votes)
584 top_node, top_votes = votes[0]
# Two conditions must hold: this host is the top-voted node, and its votes
# are not outnumbered by all other answers combined. A tie does not trigger
# the second error.
587 if top_node != myself:
588 logging.critical("It seems we are not the master (top-voted node"
589 " is %s with %d out of %d votes)", top_node, top_votes,
591 elif top_votes < all_votes - top_votes:
592 logging.critical("It seems we are not the master (%d votes for,"
593 " %d votes against)", top_votes, all_votes - top_votes)
# Reads the master network parameters and the external MIP script flag from
# the configuration, then asks the master node through a BootstrapRunner RPC
# to activate the master IP address. A failure is logged, not raised.
601 def ActivateMasterIP():
603 cfg = config.ConfigWriter()
604 master_params = cfg.GetMasterNetworkParameters()
605 ems = cfg.GetUseExternalMipScript()
606 runner = rpc.BootstrapRunner()
607 result = runner.call_node_activate_master_ip(master_params.name,
610 msg = result.fail_msg
612 logging.error("Can't activate master IP address: %s", msg)
# Check callback for daemon.GenericMain, run before daemonizing. It:
# validates the command line; verifies this node believes it is master;
# resolves the daemon user/group; and verifies the configuration loads. It
# then confirms mastership with the other nodes (which --no-voting is
# documented to skip) and tries to activate the master IP. Per the TODO,
# failure to activate the master IP is not treated as fatal.
615 def CheckMasterd(options, args):
616 """Initial checks whether to run or exit with a failure.
619 if args: # masterd doesn't take any arguments
620 print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
621 sys.exit(constants.EXIT_FAILURE)
623 ssconf.CheckMaster(options.debug)
# Resolve the daemon user and group now; PrepMasterd passes them to
# MasterServer, which chowns the master socket to them.
626 options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
627 options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
629 print >> sys.stderr, ("User or group not existing on system: %s:%s" %
630 (constants.MASTERD_USER, constants.DAEMONS_GROUP))
631 sys.exit(constants.EXIT_FAILURE)
633 # Determine static runtime architecture information
634 runtime.InitArchInfo()
636 # Check the configuration is sane before anything else
638 config.ConfigWriter()
639 except errors.ConfigVersionMismatch, err:
640 v1 = "%s.%s.%s" % constants.SplitVersion(err.args[0])
641 v2 = "%s.%s.%s" % constants.SplitVersion(err.args[1])
642 print >> sys.stderr, \
643 ("Configuration version mismatch. The current Ganeti software"
644 " expects version %s, but the on-disk configuration file has"
645 " version %s. This is likely the result of upgrading the"
646 " software without running the upgrade procedure. Please contact"
647 " your cluster administrator or complete the upgrade using the"
648 " cfgupgrade utility, after reading the upgrade notes." %
650 sys.exit(constants.EXIT_FAILURE)
651 except errors.ConfigurationError, err:
652 print >> sys.stderr, \
653 ("Configuration error while opening the configuration file: %s\n"
654 "This might be caused by an incomplete software upgrade or"
655 " by a corrupted configuration file. Until the problem is fixed"
656 " the master daemon cannot start." % str(err))
657 sys.exit(constants.EXIT_FAILURE)
659 # If CheckMaster didn't fail we believe we are the master, but we have to
660 # confirm with the other nodes.
661 if options.no_voting:
662 if not options.yes_do_it:
663 sys.stdout.write("The 'no voting' option has been selected.\n")
664 sys.stdout.write("This is dangerous, please confirm by"
665 " typing uppercase 'yes': ")
668 confirmation = sys.stdin.readline().strip()
669 if confirmation != "YES":
670 print >> sys.stderr, "Aborting."
671 sys.exit(constants.EXIT_FAILURE)
674 # CheckAgreement uses RPC and threads, hence it needs to be run in
675 # a separate process before we call utils.Daemonize in the current
677 if not utils.RunInSeparateProcess(CheckAgreement):
678 sys.exit(constants.EXIT_FAILURE)
680 # ActivateMasterIP also uses RPC/threads, so we run it again via a
683 # TODO: decide whether failure to activate the master IP is a fatal error
684 utils.RunInSeparateProcess(ActivateMasterIP)
# Prepare callback for daemon.GenericMain. It removes any leftover master
# socket and creates the main loop plus a MasterServer bound to a fresh
# socket owned by the uid/gid that CheckMasterd resolved. It returns both for
# ExecMasterd.
687 def PrepMasterd(options, _):
688 """Prep master daemon function, executed with the PID file held.
691 # This is safe to do as the pid file guarantees against
692 # concurrent execution.
693 utils.RemoveFile(pathutils.MASTER_SOCKET)
695 mainloop = daemon.Mainloop()
696 master = MasterServer(pathutils.MASTER_SOCKET, options.uid, options.gid)
697 return (mainloop, master)
# Exec callback for daemon.GenericMain. It runs the main loop, with
# MasterServer.WaitForShutdown as the shutdown_wait_fn. Afterwards it cleans
# up the server and removes the master socket. The try/finally structure
# around these calls is not visible in this excerpt.
700 def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
701 """Main master daemon function, executed with the PID file held.
704 (mainloop, master) = prep_data
710 mainloop.Run(shutdown_wait_fn=master.WaitForShutdown)
712 master.server_cleanup()
716 utils.RemoveFile(pathutils.MASTER_SOCKET)
718 logging.info("Clean master daemon shutdown")
# Entry point body: builds the option parser with the masterd-specific
# --no-voting and --yes-do-it flags, then hands control to
# daemon.GenericMain with the CheckMasterd / PrepMasterd / ExecMasterd
# callbacks.
723 parser = OptionParser(description="Ganeti master daemon",
724 usage="%prog [-f] [-d]",
725 version="%%prog (ganeti) %s" %
726 constants.RELEASE_VERSION)
727 parser.add_option("--no-voting", dest="no_voting",
728 help="Do not check that the nodes agree on this node"
729 " being the master and start the daemon unconditionally",
730 default=False, action="store_true")
731 parser.add_option("--yes-do-it", dest="yes_do_it",
732 help="Override interactive check for --no-voting",
733 default=False, action="store_true")
734 daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
735 ExecMasterd, multithreaded=True)