4 # Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Master daemon program.
24 Some classes deviate from the standard style guide since the
25 inheritance from parent classes requires it.
29 # pylint: disable-msg=C0103
30 # C0103: Invalid name ganeti-masterd
41 from optparse import OptionParser
43 from ganeti import config
44 from ganeti import constants
45 from ganeti import daemon
46 from ganeti import mcpu
47 from ganeti import opcodes
48 from ganeti import jqueue
49 from ganeti import locking
50 from ganeti import luxi
51 from ganeti import utils
52 from ganeti import errors
53 from ganeti import ssconf
54 from ganeti import workerpool
55 from ganeti import rpc
56 from ganeti import bootstrap
57 from ganeti import netutils
58 from ganeti import objects
59 from ganeti import query
62 CLIENT_REQUEST_WORKERS = 16
64 EXIT_NOTMASTER = constants.EXIT_NOTMASTER
65 EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
68 class ClientRequestWorker(workerpool.BaseWorker):
69 # pylint: disable-msg=W0221
70 def RunTask(self, server, message, client):
71 """Process the request.
74 client_ops = ClientOps(server)
77 (method, args, version) = luxi.ParseRequest(message)
78 except luxi.ProtocolError, err:
79 logging.error("Protocol Error: %s", err)
85 # Verify client's version if there was one in the request
86 if version is not None and version != constants.LUXI_VERSION:
87 raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
88 (constants.LUXI_VERSION, version))
90 result = client_ops.handle_request(method, args)
92 except errors.GenericError, err:
93 logging.exception("Unexpected exception")
95 result = errors.EncodeException(err)
97 logging.exception("Unexpected exception")
99 result = "Caught exception: %s" % str(err[1])
102 reply = luxi.FormatResponse(success, result)
103 client.send_message(reply)
104 # awake the main thread so that it can write out the data.
105 server.awaker.signal()
106 except: # pylint: disable-msg=W0702
107 logging.exception("Send error")
111 class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
112 """Handler for master peers.
116 def __init__(self, server, connected_socket, client_address, family):
117 daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
120 family, self._MAX_UNHANDLED)
123 def handle_message(self, message, _):
124 self.server.request_workers.AddTask((self.server, message, self))
127 class MasterServer(daemon.AsyncStreamServer):
130 This is the main asynchronous master server. It handles connections to the
134 family = socket.AF_UNIX
136 def __init__(self, mainloop, address, uid, gid):
137 """MasterServer constructor
139 @type mainloop: ganeti.daemon.Mainloop
140 @param mainloop: Mainloop used to poll for I/O events
141 @param address: the unix socket address to bind the MasterServer to
142 @param uid: The uid of the owner of the socket
143 @param gid: The gid of the owner of the socket
146 temp_name = tempfile.mktemp(dir=os.path.dirname(address))
147 daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
148 os.chmod(temp_name, 0770)
149 os.chown(temp_name, uid, gid)
150 os.rename(temp_name, address)
152 self.mainloop = mainloop
153 self.awaker = daemon.AsyncAwaker()
155 # We'll only start threads once we've forked.
157 self.request_workers = None
159 def handle_connection(self, connected_socket, client_address):
160 # TODO: add connection count and limit the number of open connections to a
161 # maximum number to avoid breaking for lack of file descriptors or memory.
162 MasterClientHandler(self, connected_socket, client_address, self.family)
164 def setup_queue(self):
165 self.context = GanetiContext()
166 self.request_workers = workerpool.WorkerPool("ClientReq",
167 CLIENT_REQUEST_WORKERS,
170 def server_cleanup(self):
171 """Cleanup the server.
173 This involves shutting down the processor threads and the master
180 if self.request_workers:
181 self.request_workers.TerminateWorkers()
183 self.context.jobqueue.Shutdown()
187 """Class holding high-level client operations."""
188 def __init__(self, server):
191 def handle_request(self, method, args): # pylint: disable-msg=R0911
192 queue = self.server.context.jobqueue
194 # TODO: Parameter validation
196 # TODO: Rewrite to not exit in each 'if/elif' branch
198 if method == luxi.REQ_SUBMIT_JOB:
199 logging.info("Received new job")
200 ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
201 return queue.SubmitJob(ops)
203 if method == luxi.REQ_SUBMIT_MANY_JOBS:
204 logging.info("Received multiple jobs")
207 jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
208 return queue.SubmitManyJobs(jobs)
210 elif method == luxi.REQ_CANCEL_JOB:
212 logging.info("Received job cancel request for %s", job_id)
213 return queue.CancelJob(job_id)
215 elif method == luxi.REQ_ARCHIVE_JOB:
217 logging.info("Received job archive request for %s", job_id)
218 return queue.ArchiveJob(job_id)
220 elif method == luxi.REQ_AUTOARCHIVE_JOBS:
221 (age, timeout) = args
222 logging.info("Received job autoarchive request for age %s, timeout %s",
224 return queue.AutoArchiveJobs(age, timeout)
226 elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
227 (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
228 logging.info("Received job poll request for %s", job_id)
229 return queue.WaitForJobChanges(job_id, fields, prev_job_info,
230 prev_log_serial, timeout)
232 elif method == luxi.REQ_QUERY:
233 req = objects.QueryRequest.FromDict(args)
235 if req.what in constants.QR_VIA_OP:
236 result = self._Query(opcodes.OpQuery(what=req.what, fields=req.fields,
238 elif req.what == constants.QR_LOCK:
239 if req.filter is not None:
240 raise errors.OpPrereqError("Lock queries can't be filtered")
241 return self.server.context.glm.QueryLocks(req.fields)
242 elif req.what in constants.QR_VIA_LUXI:
243 raise NotImplementedError
245 raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
250 elif method == luxi.REQ_QUERY_FIELDS:
251 req = objects.QueryFieldsRequest.FromDict(args)
253 if req.what in constants.QR_VIA_OP:
254 result = self._Query(opcodes.OpQueryFields(what=req.what,
256 elif req.what == constants.QR_LOCK:
257 return query.QueryFields(query.LOCK_FIELDS, req.fields)
258 elif req.what in constants.QR_VIA_LUXI:
259 raise NotImplementedError
261 raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
266 elif method == luxi.REQ_QUERY_JOBS:
267 (job_ids, fields) = args
268 if isinstance(job_ids, (tuple, list)) and job_ids:
269 msg = utils.CommaJoin(job_ids)
272 logging.info("Received job query request for %s", msg)
273 return queue.QueryJobs(job_ids, fields)
275 elif method == luxi.REQ_QUERY_INSTANCES:
276 (names, fields, use_locking) = args
277 logging.info("Received instance query request for %s", names)
279 raise errors.OpPrereqError("Sync queries are not allowed",
281 op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
282 use_locking=use_locking)
283 return self._Query(op)
285 elif method == luxi.REQ_QUERY_NODES:
286 (names, fields, use_locking) = args
287 logging.info("Received node query request for %s", names)
289 raise errors.OpPrereqError("Sync queries are not allowed",
291 op = opcodes.OpNodeQuery(names=names, output_fields=fields,
292 use_locking=use_locking)
293 return self._Query(op)
295 elif method == luxi.REQ_QUERY_GROUPS:
296 (names, fields, use_locking) = args
297 logging.info("Received group query request for %s", names)
299 raise errors.OpPrereqError("Sync queries are not allowed",
301 op = opcodes.OpGroupQuery(names=names, output_fields=fields)
302 return self._Query(op)
304 elif method == luxi.REQ_QUERY_EXPORTS:
305 nodes, use_locking = args
307 raise errors.OpPrereqError("Sync queries are not allowed",
309 logging.info("Received exports query request")
310 op = opcodes.OpBackupQuery(nodes=nodes, use_locking=use_locking)
311 return self._Query(op)
313 elif method == luxi.REQ_QUERY_CONFIG_VALUES:
315 logging.info("Received config values query request for %s", fields)
316 op = opcodes.OpClusterConfigQuery(output_fields=fields)
317 return self._Query(op)
319 elif method == luxi.REQ_QUERY_CLUSTER_INFO:
320 logging.info("Received cluster info query request")
321 op = opcodes.OpClusterQuery()
322 return self._Query(op)
324 elif method == luxi.REQ_QUERY_TAGS:
326 logging.info("Received tags query request")
327 op = opcodes.OpTagsGet(kind=kind, name=name)
328 return self._Query(op)
330 elif method == luxi.REQ_QUERY_LOCKS:
331 (fields, sync) = args
332 logging.info("Received locks query request")
334 raise NotImplementedError("Synchronous queries are not implemented")
335 return self.server.context.glm.OldStyleQueryLocks(fields)
337 elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
339 logging.info("Received queue drain flag change request to %s",
341 return queue.SetDrainFlag(drain_flag)
343 elif method == luxi.REQ_SET_WATCHER_PAUSE:
347 logging.info("Received request to no longer pause the watcher")
349 if not isinstance(until, (int, float)):
350 raise TypeError("Duration must be an integer or float")
352 if until < time.time():
353 raise errors.GenericError("Unable to set pause end time in the past")
355 logging.info("Received request to pause the watcher until %s", until)
357 return _SetWatcherPause(until)
360 logging.info("Received invalid request '%s'", method)
361 raise ValueError("Invalid operation '%s'" % method)
363 def _Query(self, op):
364 """Runs the specified opcode and returns the result.
367 # Queries don't have a job id
368 proc = mcpu.Processor(self.server.context, None)
370 # TODO: Executing an opcode using locks will acquire them in blocking mode.
371 # Consider using a timeout for retries.
372 return proc.ExecOpCode(op, None)
375 class GanetiContext(object):
376 """Context common to all ganeti threads.
378 This class creates and holds common objects shared by all threads.
381 # pylint: disable-msg=W0212
382 # we do want to ensure a singleton here
386 """Constructs a new GanetiContext object.
388 There should be only a GanetiContext object at any time, so this
389 function raises an error if this is not the case.
392 assert self.__class__._instance is None, "double GanetiContext instance"
394 # Create global configuration object
395 self.cfg = config.ConfigWriter()
398 self.glm = locking.GanetiLockManager(
399 self.cfg.GetNodeList(),
400 self.cfg.GetNodeGroupList(),
401 self.cfg.GetInstanceList())
404 self.jobqueue = jqueue.JobQueue(self)
406 # setting this also locks the class against attribute modifications
407 self.__class__._instance = self
409 def __setattr__(self, name, value):
410 """Setting GanetiContext attributes is forbidden after initialization.
413 assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
414 object.__setattr__(self, name, value)
416 def AddNode(self, node, ec_id):
417 """Adds a node to the configuration and lock manager.
420 # Add it to the configuration
421 self.cfg.AddNode(node, ec_id)
423 # If preseeding fails it'll not be added
424 self.jobqueue.AddNode(node)
426 # Add the new node to the Ganeti Lock Manager
427 self.glm.add(locking.LEVEL_NODE, node.name)
429 def ReaddNode(self, node):
430 """Updates a node that's already in the configuration
433 # Synchronize the queue again
434 self.jobqueue.AddNode(node)
436 def RemoveNode(self, name):
437 """Removes a node from the configuration and lock manager.
440 # Remove node from configuration
441 self.cfg.RemoveNode(name)
444 self.jobqueue.RemoveNode(name)
446 # Remove the node from the Ganeti Lock Manager
447 self.glm.remove(locking.LEVEL_NODE, name)
450 def _SetWatcherPause(until):
451 """Creates or removes the watcher pause file.
453 @type until: None or int
454 @param until: Unix timestamp saying until when the watcher shouldn't run
458 utils.RemoveFile(constants.WATCHER_PAUSEFILE)
460 utils.WriteFile(constants.WATCHER_PAUSEFILE,
461 data="%d\n" % (until, ))
467 def CheckAgreement():
468 """Check the agreement on who is the master.
470 The function uses a very simple algorithm: we must get more positive
471 than negative answers. Since in most of the cases we are the master,
472 we'll use our own config file for getting the node list. In the
473 future we could collect the current node list from our (possibly
474 obsolete) known nodes.
476 In order to account for cold-start of all nodes, we retry for up to
477 a minute until we get a real answer as the top-voted one. If the
478 nodes are more out-of-sync, for now manual startup of the master
481 Note that for a even number of nodes cluster, we need at least half
482 of the nodes (beside ourselves) to vote for us. This creates a
483 problem on two-node clusters, since in this case we require the
484 other node to be up too to confirm our status.
487 myself = netutils.Hostname.GetSysName()
488 #temp instantiation of a config writer, used only to get the node list
489 cfg = config.ConfigWriter()
490 node_list = cfg.GetNodeList()
494 votes = bootstrap.GatherMasterVotes(node_list)
496 # empty node list, this is a one node cluster
498 if votes[0][0] is None:
504 logging.critical("Cluster inconsistent, most of the nodes didn't answer"
505 " after multiple retries. Aborting startup")
506 logging.critical("Use the --no-voting option if you understand what"
507 " effects it has on the cluster state")
509 # here a real node is at the top of the list
510 all_votes = sum(item[1] for item in votes)
511 top_node, top_votes = votes[0]
514 if top_node != myself:
515 logging.critical("It seems we are not the master (top-voted node"
516 " is %s with %d out of %d votes)", top_node, top_votes,
518 elif top_votes < all_votes - top_votes:
519 logging.critical("It seems we are not the master (%d votes for,"
520 " %d votes against)", top_votes, all_votes - top_votes)
528 def ActivateMasterIP():
530 master_node = ssconf.SimpleStore().GetMasterNode()
531 result = rpc.RpcRunner.call_node_start_master(master_node, False, False)
532 msg = result.fail_msg
534 logging.error("Can't activate master IP address: %s", msg)
537 def CheckMasterd(options, args):
538 """Initial checks whether to run or exit with a failure.
541 if args: # masterd doesn't take any arguments
542 print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
543 sys.exit(constants.EXIT_FAILURE)
545 ssconf.CheckMaster(options.debug)
548 options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
549 options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
551 print >> sys.stderr, ("User or group not existing on system: %s:%s" %
552 (constants.MASTERD_USER, constants.DAEMONS_GROUP))
553 sys.exit(constants.EXIT_FAILURE)
555 # Check the configuration is sane before anything else
557 config.ConfigWriter()
558 except errors.ConfigVersionMismatch, err:
559 v1 = "%s.%s.%s" % constants.SplitVersion(err.args[0])
560 v2 = "%s.%s.%s" % constants.SplitVersion(err.args[1])
561 print >> sys.stderr, \
562 ("Configuration version mismatch. The current Ganeti software"
563 " expects version %s, but the on-disk configuration file has"
564 " version %s. This is likely the result of upgrading the"
565 " software without running the upgrade procedure. Please contact"
566 " your cluster administrator or complete the upgrade using the"
567 " cfgupgrade utility, after reading the upgrade notes." %
569 sys.exit(constants.EXIT_FAILURE)
570 except errors.ConfigurationError, err:
571 print >> sys.stderr, \
572 ("Configuration error while opening the configuration file: %s\n"
573 "This might be caused by an incomplete software upgrade or"
574 " by a corrupted configuration file. Until the problem is fixed"
575 " the master daemon cannot start." % str(err))
576 sys.exit(constants.EXIT_FAILURE)
578 # If CheckMaster didn't fail we believe we are the master, but we have to
579 # confirm with the other nodes.
580 if options.no_voting:
581 if options.yes_do_it:
584 sys.stdout.write("The 'no voting' option has been selected.\n")
585 sys.stdout.write("This is dangerous, please confirm by"
586 " typing uppercase 'yes': ")
589 confirmation = sys.stdin.readline().strip()
590 if confirmation != "YES":
591 print >> sys.stderr, "Aborting."
592 sys.exit(constants.EXIT_FAILURE)
596 # CheckAgreement uses RPC and threads, hence it needs to be run in a separate
597 # process before we call utils.Daemonize in the current process.
598 if not utils.RunInSeparateProcess(CheckAgreement):
599 sys.exit(constants.EXIT_FAILURE)
601 # ActivateMasterIP also uses RPC/threads, so we run it again via a
604 # TODO: decide whether failure to activate the master IP is a fatal error
605 utils.RunInSeparateProcess(ActivateMasterIP)
608 def PrepMasterd(options, _):
609 """Prep master daemon function, executed with the PID file held.
612 # This is safe to do as the pid file guarantees against
613 # concurrent execution.
614 utils.RemoveFile(constants.MASTER_SOCKET)
616 mainloop = daemon.Mainloop()
617 master = MasterServer(mainloop, constants.MASTER_SOCKET,
618 options.uid, options.gid)
619 return (mainloop, master)
622 def ExecMasterd(options, args, prep_data): # pylint: disable-msg=W0613
623 """Main master daemon function, executed with the PID file held.
626 (mainloop, master) = prep_data
634 master.server_cleanup()
638 utils.RemoveFile(constants.MASTER_SOCKET)
643 parser = OptionParser(description="Ganeti master daemon",
644 usage="%prog [-f] [-d]",
645 version="%%prog (ganeti) %s" %
646 constants.RELEASE_VERSION)
647 parser.add_option("--no-voting", dest="no_voting",
648 help="Do not check that the nodes agree on this node"
649 " being the master and start the daemon unconditionally",
650 default=False, action="store_true")
651 parser.add_option("--yes-do-it", dest="yes_do_it",
652 help="Override interactive check for --no-voting",
653 default=False, action="store_true")
654 daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
655 ExecMasterd, multithreaded=True)