# Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Master daemon program.

Some classes deviate from the standard style guide since the
inheritance from parent classes requires it.

"""
# pylint: disable=C0103
# C0103: Invalid name ganeti-masterd
import grp
import os
import pwd
import sys
import socket
import time
import tempfile
import logging

from optparse import OptionParser

from ganeti import config
from ganeti import constants
from ganeti import daemon
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import workerpool
from ganeti import rpc
from ganeti import bootstrap
from ganeti import netutils
from ganeti import objects
from ganeti import query
from ganeti import runtime
from ganeti import pathutils
#: Size of the worker pool answering LUXI client requests
CLIENT_REQUEST_WORKERS = 16

#: Exit codes, mirrored from L{constants}
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
def _LogNewJob(status, info, ops):
  """Log information about a recently submitted job.

  @type status: bool
  @param status: whether the job was accepted by the queue
  @param info: the new job's ID if C{status} is true, otherwise the reason
      the submission failed
  @param ops: the opcodes making up the job, summarized in the log line

  """
  summary = utils.CommaJoin(op.Summary() for op in ops)
  if not status:
    logging.info("Failed to submit job, reason: '%s', summary: %s",
                 info, summary)
    return
  logging.info("New job with id %s, summary: %s", info, summary)
class ClientRequestWorker(workerpool.BaseWorker):
  """Worker processing one LUXI client request per task.

  Tasks are C{(server, message, client)} tuples as queued by
  L{MasterClientHandler.handle_message}.

  """
  # pylint: disable=W0221
  def RunTask(self, server, message, client):
    """Process the request.

    Parses C{message}, executes it through L{ClientOps} and sends the
    formatted reply back over C{client}. A request that cannot be parsed
    gets no reply; its error is logged and the connection is closed.

    @param server: the server that accepted the connection
    @param message: the raw LUXI request
    @param client: the connection handler to reply on

    """
    client_ops = ClientOps(server)

    try:
      (method, args, version) = luxi.ParseRequest(message)
    except luxi.ProtocolError as err:
      logging.error("Protocol Error: %s", err)
      client.close_log()
      return

    (success, result) = self._RunClientRequest(client_ops, method, args,
                                               version)

    try:
      reply = luxi.FormatResponse(success, result)
      client.send_message(reply)
      # awake the main thread so that it can write out the data.
      server.awaker.signal()
    except: # pylint: disable=W0702
      logging.exception("Send error")
      client.close_log()

  @staticmethod
  def _RunClientRequest(client_ops, method, args, version):
    """Executes a parsed request and turns any failure into an error result.

    @rtype: tuple
    @return: C{(success, result)}; on failure C{result} describes the error

    """
    try:
      # Verify client's version if there was one in the request
      if version is not None and version != constants.LUXI_VERSION:
        raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
                               (constants.LUXI_VERSION, version))

      return (True, client_ops.handle_request(method, args))
    except errors.GenericError as err:
      logging.exception("Unexpected exception")
      return (False, errors.EncodeException(err))
    except: # pylint: disable=W0702
      logging.exception("Unexpected exception")
      return (False, "Caught exception: %s" % str(sys.exc_info()[1]))
class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
  """Handler for master peers.

  Each message received on the connection is queued, together with the
  server and this handler, as a task on the server's request worker pool.

  """
  #: Unhandled-message limit handed to the parent class constructor
  _MAX_UNHANDLED = 1

  def __init__(self, server, connected_socket, client_address, family):
    parent_init = daemon.AsyncTerminatedMessageStream.__init__
    parent_init(self, connected_socket, client_address, constants.LUXI_EOM,
                family, self._MAX_UNHANDLED)
    self.server = server

  def handle_message(self, message, _):
    task = (self.server, message, self)
    self.server.request_workers.AddTask(task)
142 class _MasterShutdownCheck:
143 """Logic for master daemon shutdown.
146 #: How long to wait between checks
147 _CHECK_INTERVAL = 5.0
149 #: How long to wait after all jobs are done (e.g. to give clients time to
150 #: retrieve the job status)
151 _SHUTDOWN_LINGER = 5.0
154 """Initializes this class.
157 self._had_active_jobs = None
158 self._linger_timeout = None
160 def __call__(self, jq_prepare_result):
161 """Determines if master daemon is ready for shutdown.
163 @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown}
164 @rtype: None or number
165 @return: None if master daemon is ready, timeout if the check must be
169 if jq_prepare_result:
170 # Check again shortly
171 logging.info("Job queue has been notified for shutdown but is still"
172 " busy; next check in %s seconds", self._CHECK_INTERVAL)
173 self._had_active_jobs = True
174 return self._CHECK_INTERVAL
176 if not self._had_active_jobs:
177 # Can shut down as there were no active jobs on the first check
180 # No jobs are running anymore, but maybe some clients want to collect some
181 # information. Give them a short amount of time.
182 if self._linger_timeout is None:
183 self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True)
185 remaining = self._linger_timeout.Remaining()
187 logging.info("Job queue no longer busy; shutting down master daemon"
188 " in %s seconds", remaining)
190 # TODO: Should the master daemon socket be closed at this point? Doing so
191 # wouldn't affect existing connections.
class MasterServer(daemon.AsyncStreamServer):
  """Master Server.

  This is the main asynchronous master server. It handles connections to the
  master socket.

  """
  family = socket.AF_UNIX

  def __init__(self, address, uid, gid):
    """MasterServer constructor

    The socket is bound under a temporary name in the same directory, given
    its mode and ownership, and only then renamed to C{address}. If setting
    it up fails, the temporary socket file is removed again.

    @param address: the unix socket address to bind the MasterServer to
    @param uid: The uid of the owner of the socket
    @param gid: The gid of the owner of the socket

    """
    # NOTE(review): tempfile.mktemp only picks an unused name (racy); this
    # presumably relies on the socket directory not being writable by
    # untrusted users -- confirm
    temp_name = tempfile.mktemp(dir=os.path.dirname(address))
    daemon.AsyncStreamServer.__init__(self, self.family, temp_name)

    renamed = False
    try:
      os.chmod(temp_name, 0o770)
      os.chown(temp_name, uid, gid)
      os.rename(temp_name, address)
      renamed = True
    finally:
      if not renamed:
        # Don't leave a stray socket file behind in the socket directory
        utils.RemoveFile(temp_name)

    self.awaker = daemon.AsyncAwaker()

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

    self._shutdown_check = None

  def handle_connection(self, connected_socket, client_address):
    # TODO: add connection count and limit the number of open connections to a
    # maximum number to avoid breaking for lack of file descriptors or memory.
    MasterClientHandler(self, connected_socket, client_address, self.family)

  def setup_queue(self):
    """Creates the global context and the client request worker pool.

    """
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool("ClientReq",
                                                 CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def WaitForShutdown(self):
    """Prepares server for shutdown.

    @rtype: None or number
    @return: None when ready to shut down, otherwise how long to wait before
        checking again (see L{_MasterShutdownCheck})

    """
    if self._shutdown_check is None:
      self._shutdown_check = _MasterShutdownCheck()

    return self._shutdown_check(self.context.jobqueue.PrepareShutdown())

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.close()
    finally:
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.context:
        self.context.jobqueue.Shutdown()
class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server

  def handle_request(self, method, args):
    """Dispatches one LUXI request to the matching handler.

    @param method: the requested operation, compared against the
        C{luxi.REQ_*} names
    @type args: tuple or list
    @param args: the positional arguments of the request
    @return: whatever the selected handler returns
    @raise ValueError: if C{args} is neither a tuple nor a list, or if
        C{method} is not a known request

    """
    context = self.server.context
    queue = context.jobqueue

    # TODO: Parameter validation
    if not isinstance(args, (tuple, list)):
      logging.info("Received invalid arguments of type '%s'", type(args))
      raise ValueError("Invalid arguments type '%s'" % type(args))

    # Matched in order using "==", exactly like an if/elif chain, so
    # "method" does not have to be hashable
    handlers = [
      (luxi.REQ_SUBMIT_JOB, self._HandleSubmitJob),
      (luxi.REQ_SUBMIT_MANY_JOBS, self._HandleSubmitManyJobs),
      (luxi.REQ_CANCEL_JOB, self._HandleCancelJob),
      (luxi.REQ_CHANGE_JOB_PRIORITY, self._HandleChangeJobPriority),
      (luxi.REQ_ARCHIVE_JOB, self._HandleArchiveJob),
      (luxi.REQ_AUTO_ARCHIVE_JOBS, self._HandleAutoArchiveJobs),
      (luxi.REQ_WAIT_FOR_JOB_CHANGE, self._HandleWaitForJobChange),
      (luxi.REQ_QUERY, self._HandleQuery),
      (luxi.REQ_QUERY_FIELDS, self._HandleQueryFields),
      (luxi.REQ_QUERY_JOBS, self._HandleQueryJobs),
      (luxi.REQ_QUERY_INSTANCES, self._HandleQueryInstances),
      (luxi.REQ_QUERY_NODES, self._HandleQueryNodes),
      (luxi.REQ_QUERY_GROUPS, self._HandleQueryGroups),
      (luxi.REQ_QUERY_EXPORTS, self._HandleQueryExports),
      (luxi.REQ_QUERY_CONFIG_VALUES, self._HandleQueryConfigValues),
      (luxi.REQ_QUERY_CLUSTER_INFO, self._HandleQueryClusterInfo),
      (luxi.REQ_QUERY_TAGS, self._HandleQueryTags),
      (luxi.REQ_SET_DRAIN_FLAG, self._HandleSetDrainFlag),
      (luxi.REQ_SET_WATCHER_PAUSE, self._HandleSetWatcherPause),
      ]

    for (name, handler) in handlers:
      if method == name:
        return handler(context, queue, args)

    logging.info("Received invalid request '%s'", method)
    raise ValueError("Invalid operation '%s'" % method)

  # All handlers below share the signature (context, queue, args), even when
  # they don't use every argument.
  # pylint: disable=W0613

  def _HandleSubmitJob(self, context, queue, args):
    """Submits a single job given as a list of serialized opcodes."""
    logging.info("Receiving new job")
    (job_def, ) = args
    ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
    job_id = queue.SubmitJob(ops)
    _LogNewJob(True, job_id, ops)
    return job_id

  def _HandleSubmitManyJobs(self, context, queue, args):
    """Submits several jobs at once; returns the per-job results."""
    logging.info("Receiving multiple jobs")
    (job_defs, ) = args
    jobs = [[opcodes.OpCode.LoadOpCode(state) for state in job_ops]
            for job_ops in job_defs]
    job_ids = queue.SubmitManyJobs(jobs)
    for ((status, job_id), ops) in zip(job_ids, jobs):
      _LogNewJob(status, job_id, ops)
    return job_ids

  def _HandleCancelJob(self, context, queue, args):
    (job_id, ) = args
    logging.info("Received job cancel request for %s", job_id)
    return queue.CancelJob(job_id)

  def _HandleChangeJobPriority(self, context, queue, args):
    (job_id, priority) = args
    logging.info("Received request to change priority for job %s to %s",
                 job_id, priority)
    return queue.ChangeJobPriority(job_id, priority)

  def _HandleArchiveJob(self, context, queue, args):
    (job_id, ) = args
    logging.info("Received job archive request for %s", job_id)
    return queue.ArchiveJob(job_id)

  def _HandleAutoArchiveJobs(self, context, queue, args):
    (age, timeout) = args
    logging.info("Received job autoarchive request for age %s, timeout %s",
                 age, timeout)
    return queue.AutoArchiveJobs(age, timeout)

  def _HandleWaitForJobChange(self, context, queue, args):
    (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
    logging.info("Received job poll request for %s", job_id)
    return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                   prev_log_serial, timeout)

  def _HandleQuery(self, context, queue, args):
    """Answers a generic query, routed by the resource type C{what}."""
    (what, fields, qfilter) = args

    if what in constants.QR_VIA_OP:
      return self._Query(opcodes.OpQuery(what=what, fields=fields,
                                         qfilter=qfilter))

    if what == constants.QR_LOCK:
      if qfilter is not None:
        raise errors.OpPrereqError("Lock queries can't be filtered",
                                   errors.ECODE_INVAL)
      return context.glm.QueryLocks(fields)

    if what == constants.QR_JOB:
      return queue.QueryJobs(fields, qfilter)

    if what in constants.QR_VIA_LUXI:
      raise NotImplementedError

    raise errors.OpPrereqError("Resource type '%s' unknown" % what,
                               errors.ECODE_INVAL)

  def _HandleQueryFields(self, context, queue, args):
    (what, fields) = args
    req = objects.QueryFieldsRequest(what=what, fields=fields)

    try:
      fielddefs = query.ALL_FIELDS[req.what]
    except KeyError:
      raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
                                 errors.ECODE_INVAL)

    return query.QueryFields(fielddefs, req.fields)

  def _HandleQueryJobs(self, context, queue, args):
    (job_ids, fields) = args
    if isinstance(job_ids, (tuple, list)) and job_ids:
      msg = utils.CommaJoin(job_ids)
    else:
      msg = str(job_ids)
    logging.info("Received job query request for %s", msg)
    return queue.OldStyleQueryJobs(job_ids, fields)

  def _HandleQueryInstances(self, context, queue, args):
    (names, fields, use_locking) = args
    logging.info("Received instance query request for %s", names)
    self._RejectLockingQuery(use_locking)
    op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
                                 use_locking=use_locking)
    return self._Query(op)

  def _HandleQueryNodes(self, context, queue, args):
    (names, fields, use_locking) = args
    logging.info("Received node query request for %s", names)
    self._RejectLockingQuery(use_locking)
    op = opcodes.OpNodeQuery(names=names, output_fields=fields,
                             use_locking=use_locking)
    return self._Query(op)

  def _HandleQueryGroups(self, context, queue, args):
    (names, fields, use_locking) = args
    logging.info("Received group query request for %s", names)
    self._RejectLockingQuery(use_locking)
    op = opcodes.OpGroupQuery(names=names, output_fields=fields)
    return self._Query(op)

  def _HandleQueryExports(self, context, queue, args):
    (nodes, use_locking) = args
    self._RejectLockingQuery(use_locking)
    logging.info("Received exports query request")
    op = opcodes.OpBackupQuery(nodes=nodes, use_locking=use_locking)
    return self._Query(op)

  def _HandleQueryConfigValues(self, context, queue, args):
    (fields, ) = args
    logging.info("Received config values query request for %s", fields)
    op = opcodes.OpClusterConfigQuery(output_fields=fields)
    return self._Query(op)

  def _HandleQueryClusterInfo(self, context, queue, args):
    logging.info("Received cluster info query request")
    op = opcodes.OpClusterQuery()
    return self._Query(op)

  def _HandleQueryTags(self, context, queue, args):
    (kind, name) = args
    logging.info("Received tags query request")
    op = opcodes.OpTagsGet(kind=kind, name=name, use_locking=False)
    return self._Query(op)

  def _HandleSetDrainFlag(self, context, queue, args):
    (drain_flag, ) = args
    logging.info("Received queue drain flag change request to %s",
                 drain_flag)
    return queue.SetDrainFlag(drain_flag)

  def _HandleSetWatcherPause(self, context, queue, args):
    """Pauses the watcher until a timestamp, or unpauses it for C{None}."""
    (until, ) = args

    if until is None:
      logging.info("Received request to no longer pause the watcher")
    else:
      if not isinstance(until, (int, float)):
        raise TypeError("Duration must be an integer or float")

      if until < time.time():
        raise errors.GenericError("Unable to set pause end time in the past")

      logging.info("Received request to pause the watcher until %s", until)

    return _SetWatcherPause(until)

  # pylint: enable=W0613

  @staticmethod
  def _RejectLockingQuery(use_locking):
    """Refuses queries asking for locks, as L{_Query} runs without them."""
    if use_locking:
      raise errors.OpPrereqError("Sync queries are not allowed",
                                 errors.ECODE_INVAL)

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    # Queries don't have a job id
    proc = mcpu.Processor(self.server.context, None, enable_locks=False)

    # TODO: Executing an opcode using locks will acquire them in blocking mode.
    # Consider using a timeout for retries.
    return proc.ExecOpCode(op, None)
class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads. Once
  constructed it becomes the class-wide singleton and its attributes can no
  longer be set.

  """
  # pylint: disable=W0212
  # we do want to ensure a singleton here
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only a GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    cls = self.__class__
    assert cls._instance is None, "double GanetiContext instance"

    # Global configuration object
    cfg = config.ConfigWriter()
    self.cfg = cfg

    # Lock manager, seeded with the entities known to the configuration
    self.glm = locking.GanetiLockManager(cfg.GetNodeList(),
                                         cfg.GetNodeGroupList(),
                                         cfg.GetInstanceList(),
                                         cfg.GetNetworkList())

    cfg.SetContext(self)

    # RPC runner, given the lock manager's AddToLockMonitor callback
    self.rpc = rpc.RpcRunner(cfg, self.glm.AddToLockMonitor)

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # Registering the singleton also locks the instance against
    # attribute modifications (see __setattr__)
    cls._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    super(GanetiContext, self).__setattr__(name, value)

  def AddNode(self, node, ec_id):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node, ec_id)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Make the new node known to the Ganeti Lock Manager
    for level in (locking.LEVEL_NODE, locking.LEVEL_NODE_RES):
      self.glm.add(level, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Drop the node from the Ganeti Lock Manager
    for level in (locking.LEVEL_NODE, locking.LEVEL_NODE_RES):
      self.glm.remove(level, name)
def _SetWatcherPause(until):
  """Creates or removes the watcher pause file.

  @type until: None or int
  @param until: Unix timestamp saying until when the watcher shouldn't run;
      C{None} removes the pause file instead
  @return: C{until}, unchanged

  """
  pause_file = pathutils.WATCHER_PAUSEFILE
  if until is not None:
    utils.WriteFile(pause_file, data="%d\n" % (until, ))
  else:
    utils.RemoveFile(pause_file)

  return until
@rpc.RunWithRPC
def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most of the cases we are the master,
  we'll use our own config file for getting the node list. In the
  future we could collect the current node list from our (possibly
  obsolete) known nodes.

  In order to account for cold-start of all nodes, the vote is retried
  (six attempts, ten seconds apart) until a real node is the top-voted
  one. If the nodes are more out of sync than that, the master has to be
  started manually.

  Note that for a cluster with an even number of nodes, we need at least
  half of the nodes (besides ourselves) to vote for us. This creates a
  problem on two-node clusters, since in this case we require the
  other node to be up too to confirm our status.

  @rtype: bool
  @return: True if this node may start as master, False otherwise

  """
  max_attempts = 6
  retry_delay = 10

  myself = netutils.Hostname.GetSysName()
  # temporary config writer, used only to get the node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg

  for _ in range(max_attempts):
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is not None:
      # a real node is at the top of the list
      break
    time.sleep(retry_delay)
  else:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    logging.critical("Use the --no-voting option if you understand what"
                     " effects it has on the cluster state")
    return False

  all_votes = sum(item[1] for item in votes)
  (top_node, top_votes) = votes[0]
  votes_against = all_votes - top_votes

  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
    return False

  if top_votes < votes_against:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, votes_against)
    return False

  return True
@rpc.RunWithRPC
def ActivateMasterIP():
  """Asks the configured master node to activate the master IP address.

  A failure reported in the RPC result is only logged, not raised.

  """
  cfg = config.ConfigWriter()
  params = cfg.GetMasterNetworkParameters()
  use_external_script = cfg.GetUseExternalMipScript()
  result = rpc.BootstrapRunner().call_node_activate_master_ip(
    params.name, params, use_external_script)

  fail_msg = result.fail_msg
  if fail_msg:
    logging.error("Can't activate master IP address: %s", fail_msg)
def _AbortMasterdStartup(msg):
  """Writes C{msg} plus a newline to stderr and exits with EXIT_FAILURE.

  """
  sys.stderr.write("%s\n" % msg)
  sys.exit(constants.EXIT_FAILURE)


def CheckMasterd(options, args):
  """Initial checks whether to run or exit with a failure.

  Validates the command line, stores the daemon's uid/gid in C{options},
  makes sure the configuration can be loaded, then either asks for
  confirmation of C{--no-voting} or verifies with the other nodes that
  this node is the master. Finally the master IP is activated; a failure
  there does not stop the startup.

  """
  if args: # masterd doesn't take any arguments
    _AbortMasterdStartup("Usage: %s [-f] [-d]" % sys.argv[0])

  ssconf.CheckMaster(options.debug)

  try:
    options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
    options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
  except KeyError:
    _AbortMasterdStartup("User or group not existing on system: %s:%s" %
                         (constants.MASTERD_USER, constants.DAEMONS_GROUP))

  # Determine static runtime architecture information
  runtime.InitArchInfo()

  # Check the configuration is sane before anything else
  try:
    config.ConfigWriter()
  except errors.ConfigVersionMismatch as err:
    v1 = "%s.%s.%s" % constants.SplitVersion(err.args[0])
    v2 = "%s.%s.%s" % constants.SplitVersion(err.args[1])
    _AbortMasterdStartup(
      "Configuration version mismatch. The current Ganeti software"
      " expects version %s, but the on-disk configuration file has"
      " version %s. This is likely the result of upgrading the"
      " software without running the upgrade procedure. Please contact"
      " your cluster administrator or complete the upgrade using the"
      " cfgupgrade utility, after reading the upgrade notes." %
      (v1, v2))
  except errors.ConfigurationError as err:
    _AbortMasterdStartup(
      "Configuration error while opening the configuration file: %s\n"
      "This might be caused by an incomplete software upgrade or"
      " by a corrupted configuration file. Until the problem is fixed"
      " the master daemon cannot start." % str(err))

  # If CheckMaster didn't fail we believe we are the master, but we have to
  # confirm with the other nodes.
  if not options.no_voting:
    # CheckAgreement uses RPC and threads, hence it needs to be run in
    # a separate process before we call utils.Daemonize in the current
    # process.
    if not utils.RunInSeparateProcess(CheckAgreement):
      sys.exit(constants.EXIT_FAILURE)
  elif not options.yes_do_it:
    sys.stdout.write("The 'no voting' option has been selected.\n")
    sys.stdout.write("This is dangerous, please confirm by"
                     " typing uppercase 'yes': ")
    sys.stdout.flush()

    confirmation = sys.stdin.readline().strip()
    if confirmation != "YES":
      _AbortMasterdStartup("Aborting.")

  # ActivateMasterIP also uses RPC/threads, so we run it again via a
  # separate process.

  # TODO: decide whether failure to activate the master IP is a fatal error
  utils.RunInSeparateProcess(ActivateMasterIP)
def PrepMasterd(options, _):
  """Prep master daemon function, executed with the PID file held.

  @return: C{(mainloop, master)}, the same tuple L{ExecMasterd} unpacks
      from its C{prep_data} argument

  """
  socket_path = pathutils.MASTER_SOCKET

  # Removing a leftover socket is safe: the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(socket_path)

  mainloop = daemon.Mainloop()
  master = MasterServer(socket_path, options.uid, options.gid)
  return (mainloop, master)
def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
  """Main master daemon function, executed with the PID file held.

  @param prep_data: the C{(mainloop, master)} tuple built by L{PrepMasterd}

  """
  (mainloop, master) = prep_data
  try:
    _RunMasterWithRpc(mainloop, master)
  finally:
    # The master socket is removed however the daemon terminated
    utils.RemoveFile(pathutils.MASTER_SOCKET)

  logging.info("Clean master daemon shutdown")


def _RunMasterWithRpc(mainloop, master):
  """Runs the master's main loop between rpc.Init() and rpc.Shutdown().

  The job queue is set up first; the server is always cleaned up after the
  main loop ends, and RPC is always shut down afterwards.

  """
  rpc.Init()
  try:
    master.setup_queue()
    try:
      mainloop.Run(shutdown_wait_fn=master.WaitForShutdown)
    finally:
      master.server_cleanup()
  finally:
    rpc.Shutdown()
def Main():
  """Main function.

  Parses the masterd command line and hands control to
  L{daemon.GenericMain} with the check/prepare/exec callbacks above.

  """
  version = "%%prog (ganeti) %s" % constants.RELEASE_VERSION
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version=version)

  parser.add_option("--no-voting", dest="no_voting",
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally",
                    default=False, action="store_true")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    help="Override interactive check for --no-voting",
                    default=False, action="store_true")

  daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
                     ExecMasterd, multithreaded=True)