# Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


22 """Master daemon program.
24 Some classes deviates from the standard style guide since the
25 inheritance from parent classes requires it.

# pylint: disable=C0103
# C0103: Invalid name ganeti-masterd

import grp
import os
import pwd
import sys
import socket
import time
import tempfile
import logging

from optparse import OptionParser

from ganeti import config
from ganeti import constants
from ganeti import daemon
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import workerpool
from ganeti import rpc
from ganeti import bootstrap
from ganeti import netutils
from ganeti import objects
from ganeti import query
from ganeti import runtime
from ganeti import pathutils

CLIENT_REQUEST_WORKERS = 16

EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR


def _LogNewJob(status, info, ops):
  """Log information about a recently submitted job.

  @type status: boolean
  @param status: Whether the job was submitted successfully
  @type info: string
  @param info: Job ID (on success) or error message
  @type ops: list of L{opcodes.OpCode}
  @param ops: The opcodes making up the job

  """
  if status:
    logging.info("New job with id %s, summary: %s",
                 info, utils.CommaJoin(op.Summary() for op in ops))
  else:
    logging.info("Failed to submit job, reason: '%s', summary: %s",
                 info, utils.CommaJoin(op.Summary() for op in ops))


class ClientRequestWorker(workerpool.BaseWorker):
  # pylint: disable=W0221
  def RunTask(self, server, message, client):
    """Process the request.

    """
    client_ops = ClientOps(server)

    try:
      (method, args, version) = luxi.ParseRequest(message)
    except luxi.ProtocolError, err:
      logging.error("Protocol Error: %s", err)
      client.close_log()
      return

    success = False
    try:
      # Verify client's version if there was one in the request
      if version is not None and version != constants.LUXI_VERSION:
        raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
                               (constants.LUXI_VERSION, version))

      result = client_ops.handle_request(method, args)
      success = True
    except errors.GenericError, err:
      logging.exception("Unexpected exception")
      success = False
      result = errors.EncodeException(err)
    except:
      logging.exception("Unexpected exception")
      err = sys.exc_info()
      result = "Caught exception: %s" % str(err[1])

    try:
      reply = luxi.FormatResponse(success, result)
      client.send_message(reply)
      # awake the main thread so that it can write out the data.
      server.awaker.signal()
    except: # pylint: disable=W0702
      logging.exception("Send error")
      client.close_log()


class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
  """Handler for master peers.

  """
  _MAX_UNHANDLED = 1

  def __init__(self, server, connected_socket, client_address, family):
    daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
                                                 client_address,
                                                 constants.LUXI_EOM,
                                                 family, self._MAX_UNHANDLED)
    self.server = server

  def handle_message(self, message, _):
    self.server.request_workers.AddTask((self.server, message, self))


class _MasterShutdownCheck:
  """Logic for master daemon shutdown.

  """
  #: How long to wait between checks
  _CHECK_INTERVAL = 5.0

  #: How long to wait after all jobs are done (e.g. to give clients time to
  #: retrieve the job status)
  _SHUTDOWN_LINGER = 5.0

  def __init__(self):
    """Initializes this class.

    """
    self._had_active_jobs = None
    self._linger_timeout = None

  def __call__(self, jq_prepare_result):
    """Determines if master daemon is ready for shutdown.

    @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown}
    @rtype: None or number
    @return: None if master daemon is ready, timeout if the check must be
             repeated

    """
    if jq_prepare_result:
      # Check again shortly
      logging.info("Job queue has been notified for shutdown but is still"
                   " busy; next check in %s seconds", self._CHECK_INTERVAL)
      self._had_active_jobs = True
      return self._CHECK_INTERVAL

    if not self._had_active_jobs:
      # Can shut down as there were no active jobs on the first check
      return None

    # No jobs are running anymore, but maybe some clients want to collect some
    # information. Give them a short amount of time.
    if self._linger_timeout is None:
      self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True)

    remaining = self._linger_timeout.Remaining()

    logging.info("Job queue no longer busy; shutting down master daemon"
                 " in %s seconds", remaining)

    # TODO: Should the master daemon socket be closed at this point? Doing so
    # wouldn't affect existing connections.

    return remaining
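

# Illustrative flow for the check above (explanatory, not from the original
# source): if the queue reports busy, each call returns _CHECK_INTERVAL so
# the mainloop polls again; if the queue was never busy, the first call
# returns None and shutdown proceeds immediately; otherwise a
# _SHUTDOWN_LINGER window is started and its remaining time is returned,
# giving clients a last chance to retrieve final job status.

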
class MasterServer(daemon.AsyncStreamServer):
  """Master Server.

  This is the main asynchronous master server. It handles connections to the
  master socket.

  """
  family = socket.AF_UNIX

  def __init__(self, address, uid, gid):
    """MasterServer constructor

    @param address: the unix socket address to bind the MasterServer to
    @param uid: The uid of the owner of the socket
    @param gid: The gid of the owner of the socket

    """
    temp_name = tempfile.mktemp(dir=os.path.dirname(address))
    daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
    os.chmod(temp_name, 0770)
    os.chown(temp_name, uid, gid)
    os.rename(temp_name, address)
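    # Note (explanatory, not in the original): binding to a temporary name
    # and renaming into place means the socket only becomes reachable at its
    # public address after the chmod/chown above have run, so no client can
    # connect while the permissions are still the defaults.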

    self.awaker = daemon.AsyncAwaker()

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

    self._shutdown_check = None

  def handle_connection(self, connected_socket, client_address):
    # TODO: add connection count and limit the number of open connections to a
    # maximum number to avoid breaking for lack of file descriptors or memory.
    MasterClientHandler(self, connected_socket, client_address, self.family)

  def setup_queue(self):
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool("ClientReq",
                                                 CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def WaitForShutdown(self):
    """Prepares server for shutdown.

    """
    if self._shutdown_check is None:
      self._shutdown_check = _MasterShutdownCheck()

    return self._shutdown_check(self.context.jobqueue.PrepareShutdown())

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.close()
    finally:
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.context:
        self.context.jobqueue.Shutdown()


267 """Class holding high-level client operations."""
268 def __init__(self, server):
271 def handle_request(self, method, args): # pylint: disable=R0911
272 context = self.server.context
273 queue = context.jobqueue
    # TODO: Parameter validation
    if not isinstance(args, (tuple, list)):
      logging.info("Received invalid arguments of type '%s'", type(args))
      raise ValueError("Invalid arguments type '%s'" % type(args))

    # TODO: Rewrite to not exit in each 'if/elif' branch

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Receiving new job")
      (job_def, ) = args
      ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
      job_id = queue.SubmitJob(ops)
      _LogNewJob(True, job_id, ops)
      return job_id

    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Receiving multiple jobs")
      (job_defs, ) = args
      jobs = []
      for ops in job_defs:
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
      job_ids = queue.SubmitManyJobs(jobs)
      for ((status, job_id), ops) in zip(job_ids, jobs):
        _LogNewJob(status, job_id, ops)
      return job_ids

    elif method == luxi.REQ_CANCEL_JOB:
      (job_id, ) = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_CHANGE_JOB_PRIORITY:
      (job_id, priority) = args
      logging.info("Received request to change priority for job %s to %s",
                   job_id, priority)
      return queue.ChangeJobPriority(job_id, priority)

    elif method == luxi.REQ_ARCHIVE_JOB:
      (job_id, ) = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTO_ARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY:
      (what, fields, qfilter) = args

      if what in constants.QR_VIA_OP:
        result = self._Query(opcodes.OpQuery(what=what, fields=fields,
                                             qfilter=qfilter))
      elif what == constants.QR_LOCK:
        if qfilter is not None:
          raise errors.OpPrereqError("Lock queries can't be filtered",
                                     errors.ECODE_INVAL)
        return context.glm.QueryLocks(fields)
      elif what == constants.QR_JOB:
        return queue.QueryJobs(fields, qfilter)
      elif what in constants.QR_VIA_LUXI:
        raise NotImplementedError
      else:
        raise errors.OpPrereqError("Resource type '%s' unknown" % what,
                                   errors.ECODE_INVAL)

      return result

    elif method == luxi.REQ_QUERY_FIELDS:
      (what, fields) = args
      req = objects.QueryFieldsRequest(what=what, fields=fields)

      try:
        fielddefs = query.ALL_FIELDS[req.what]
      except KeyError:
        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
                                   errors.ECODE_INVAL)

      return query.QueryFields(fielddefs, req.fields)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = utils.CommaJoin(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.OldStyleQueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
                                   use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpNodeQuery(names=names, output_fields=fields,
                               use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_GROUPS:
      (names, fields, use_locking) = args
      logging.info("Received group query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpGroupQuery(names=names, output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NETWORKS:
      (names, fields, use_locking) = args
      logging.info("Received network query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpNetworkQuery(names=names, output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      (nodes, use_locking) = args
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      logging.info("Received exports query request")
      op = opcodes.OpBackupQuery(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      (fields, ) = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpClusterConfigQuery(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpClusterQuery()
      return self._Query(op)

    elif method == luxi.REQ_QUERY_TAGS:
      (kind, name) = args
      logging.info("Received tags query request")
      op = opcodes.OpTagsGet(kind=kind, name=name, use_locking=False)
      return self._Query(op)

    elif method == luxi.REQ_SET_DRAIN_FLAG:
      (drain_flag, ) = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    elif method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      if until is None:
        logging.info("Received request to no longer pause the watcher")
      else:
        if not isinstance(until, (int, float)):
          raise TypeError("Duration must be an integer or float")

        if until < time.time():
          raise errors.GenericError("Unable to set pause end time in the past")

        logging.info("Received request to pause the watcher until %s", until)

      return _SetWatcherPause(until)

    else:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    # Queries don't have a job id
    proc = mcpu.Processor(self.server.context, None, enable_locks=False)

    # TODO: Executing an opcode using locks will acquire them in blocking mode.
    # Consider using a timeout for retries.
    return proc.ExecOpCode(op, None)
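

# For context (an assumption mirroring ganeti/luxi.py usage, not part of
# this module): clients reach the handlers above through luxi.Client, e.g.
#
#   cl = luxi.Client(address=pathutils.MASTER_SOCKET)
#   job_id = cl.SubmitJob(ops)   # ops: list of opcodes.OpCode instances
#
# which arrives in ClientOps.handle_request() as luxi.REQ_SUBMIT_JOB.

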
class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # pylint: disable=W0212
  # we do want to ensure a singleton here
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only a GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    self.glm = locking.GanetiLockManager(
      self.cfg.GetNodeList(),
      self.cfg.GetNodeGroupList(),
      self.cfg.GetInstanceList(),
      self.cfg.GetNetworkList())

    self.cfg.SetContext(self)

    # RPC runner
    self.rpc = rpc.RpcRunner(self.cfg, self.glm.AddToLockMonitor)

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)
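
  # Note (explanatory, not in the original): every attribute assignment on
  # the context, including those in __init__, funnels through __setattr__
  # above. While __init__ runs, _instance is still None and the assertion
  # passes; once __init__ finally sets _instance, any later assignment on
  # the shared context trips the assertion.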

  def AddNode(self, node, ec_id):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node, ec_id)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)
    self.glm.add(locking.LEVEL_NODE_RES, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)
    self.glm.remove(locking.LEVEL_NODE_RES, name)


def _SetWatcherPause(until):
  """Creates or removes the watcher pause file.

  @type until: None or int
  @param until: Unix timestamp saying until when the watcher shouldn't run

  """
  if until is None:
    utils.RemoveFile(pathutils.WATCHER_PAUSEFILE)
  else:
    utils.WriteFile(pathutils.WATCHER_PAUSEFILE,
                    data="%d\n" % (until, ))

  return until


@rpc.RunWithRPC
def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most of the cases we are the master,
  we'll use our own config file for getting the node list. In the
  future we could collect the current node list from our (possibly
  obsolete) known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are more out-of-sync, for now manual startup of the master
  should be attempted.

  Note that for a cluster with an even number of nodes, we need at least
  half of the nodes (beside ourselves) to vote for us. This creates a
  problem on two-node clusters, since in this case we require the
  other node to be up too to confirm our status.

  """
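  # Worked example (illustrative, not from the original source): on a
  # 4-node cluster, the 3 other nodes are polled. If two vote for us and
  # one for another node, top_votes (2) is not smaller than
  # all_votes - top_votes (1), so the agreement check passes. On a 2-node
  # cluster the single peer must be up and must vote for us.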
  myself = netutils.Hostname.GetSysName()
  #temp instantiation of a config writer, used only to get the node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg
  retries = 6
  while retries > 0:
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is None:
      retries -= 1
      time.sleep(10)
      continue
    break
  if retries == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    logging.critical("Use the --no-voting option if you understand what"
                     " effects it has on the cluster state")
    return False
  # here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]

  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result


@rpc.RunWithRPC
def ActivateMasterIP():
  # activate ip
  cfg = config.ConfigWriter()
  master_params = cfg.GetMasterNetworkParameters()
  ems = cfg.GetUseExternalMipScript()
  runner = rpc.BootstrapRunner()
  result = runner.call_node_activate_master_ip(master_params.name,
                                               master_params, ems)

  msg = result.fail_msg
  if msg:
    logging.error("Can't activate master IP address: %s", msg)


def CheckMasterd(options, args):
  """Initial checks whether to run or exit with a failure.

  """
  if args: # masterd doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)

  ssconf.CheckMaster(options.debug)

  try:
    options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
    options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
  except KeyError:
    print >> sys.stderr, ("User or group not existing on system: %s:%s" %
                          (constants.MASTERD_USER, constants.DAEMONS_GROUP))
    sys.exit(constants.EXIT_FAILURE)

  # Determine static runtime architecture information
  runtime.InitArchInfo()

  # Check the configuration is sane before anything else
  try:
    config.ConfigWriter()
  except errors.ConfigVersionMismatch, err:
    v1 = "%s.%s.%s" % constants.SplitVersion(err.args[0])
    v2 = "%s.%s.%s" % constants.SplitVersion(err.args[1])
    print >> sys.stderr, \
      ("Configuration version mismatch. The current Ganeti software"
       " expects version %s, but the on-disk configuration file has"
       " version %s. This is likely the result of upgrading the"
       " software without running the upgrade procedure. Please contact"
       " your cluster administrator or complete the upgrade using the"
       " cfgupgrade utility, after reading the upgrade notes." %
       (v1, v2))
    sys.exit(constants.EXIT_FAILURE)
  except errors.ConfigurationError, err:
    print >> sys.stderr, \
      ("Configuration error while opening the configuration file: %s\n"
       "This might be caused by an incomplete software upgrade or"
       " by a corrupted configuration file. Until the problem is fixed"
       " the master daemon cannot start." % str(err))
    sys.exit(constants.EXIT_FAILURE)

  # If CheckMaster didn't fail we believe we are the master, but we have to
  # confirm with the other nodes.
  if options.no_voting:
    if not options.yes_do_it:
      sys.stdout.write("The 'no voting' option has been selected.\n")
      sys.stdout.write("This is dangerous, please confirm by"
                       " typing uppercase 'yes': ")
      sys.stdout.flush()

      confirmation = sys.stdin.readline().strip()
      if confirmation != "YES":
        print >> sys.stderr, "Aborting."
        sys.exit(constants.EXIT_FAILURE)

  else:
    # CheckAgreement uses RPC and threads, hence it needs to be run in
    # a separate process before we call utils.Daemonize in the current
    # process.
    if not utils.RunInSeparateProcess(CheckAgreement):
      sys.exit(constants.EXIT_FAILURE)

  # ActivateMasterIP also uses RPC/threads, so we run it again via a
  # separate process.

  # TODO: decide whether failure to activate the master IP is a fatal error
  utils.RunInSeparateProcess(ActivateMasterIP)


def PrepMasterd(options, _):
  """Prep master daemon function, executed with the PID file held.

  """
  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(pathutils.MASTER_SOCKET)

  mainloop = daemon.Mainloop()
  master = MasterServer(pathutils.MASTER_SOCKET, options.uid, options.gid)
  return (mainloop, master)


def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
  """Main master daemon function, executed with the PID file held.

  """
  (mainloop, master) = prep_data
  try:
    rpc.Init()
    try:
      master.setup_queue()
      try:
        mainloop.Run(shutdown_wait_fn=master.WaitForShutdown)
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemoveFile(pathutils.MASTER_SOCKET)

  logging.info("Clean master daemon shutdown")


def Main():
  """Main function"""
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  parser.add_option("--no-voting", dest="no_voting",
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally",
                    default=False, action="store_true")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    help="Override interactive check for --no-voting",
                    default=False, action="store_true")
  daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
                     ExecMasterd, multithreaded=True)