4 # Copyright (C) 2006, 2007, 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Master daemon program.
Some classes deviate from the standard style guide since the
25 inheritance from parent classes requires it.
29 # pylint: disable-msg=C0103
30 # C0103: Invalid name ganeti-masterd
41 from optparse import OptionParser
43 from ganeti import config
44 from ganeti import constants
45 from ganeti import daemon
46 from ganeti import mcpu
47 from ganeti import opcodes
48 from ganeti import jqueue
49 from ganeti import locking
50 from ganeti import luxi
51 from ganeti import utils
52 from ganeti import errors
53 from ganeti import ssconf
54 from ganeti import workerpool
55 from ganeti import rpc
56 from ganeti import bootstrap
57 from ganeti import netutils
# Number of worker threads serving LUXI client requests
CLIENT_REQUEST_WORKERS = 16

# Exit codes re-exported as module-level aliases
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
class ClientRequestWorker(workerpool.BaseWorker):
  """Worker processing one queued LUXI client request at a time."""

  # pylint: disable-msg=W0221
  def RunTask(self, server, message, client):
    """Process the request.

    @param server: the MasterServer the request arrived on
    @param message: raw serialized LUXI request
    @param client: transport handler used to send the response back

    """
    # NOTE(review): several lines of this method appear to be missing
    # from this excerpt (e.g. the "try:" lines matching the except
    # clauses below, and the "success = ..." assignments); compare with
    # version control before editing.
    client_ops = ClientOps(server)

      # Parse the incoming message; a malformed request is logged and
      # the connection is given up on.
      (method, args, version) = luxi.ParseRequest(message)
    except luxi.ProtocolError, err:
      logging.error("Protocol Error: %s", err)

      # Verify client's version if there was one in the request
      if version is not None and version != constants.LUXI_VERSION:
        raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
                               (constants.LUXI_VERSION, version))

      result = client_ops.handle_request(method, args)

    except errors.GenericError, err:
      # Known Ganeti errors are encoded so the client can re-raise them
      logging.exception("Unexpected exception")
      result = errors.EncodeException(err)
      logging.exception("Unexpected exception")
      # NOTE(review): "err" is indexed here, so it presumably holds a
      # sys.exc_info() tuple at this point -- confirm against VCS
      result = "Caught exception: %s" % str(err[1])

      reply = luxi.FormatResponse(success, result)
      client.send_message(reply)
      # awake the main thread so that it can write out the data.
      server.awaker.signal()
    except: # pylint: disable-msg=W0702
      # Best-effort reply: a send failure must not kill the worker
      logging.exception("Send error")
class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
  """Handler for master peers.

  """
  # NOTE(review): the _MAX_UNHANDLED class attribute used below and the
  # assignment backing "self.server" appear to be missing from this
  # excerpt -- restore from version control.
  def __init__(self, server, connected_socket, client_address, family):
    # Bound number of unhandled messages per connection
    # (NOTE(review): the client_address argument of this call also looks
    # truncated here)
    daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
                                                 family, self._MAX_UNHANDLED)

  def handle_message(self, message, _):
    # The (server, message, handler) tuple matches the signature of
    # ClientRequestWorker.RunTask
    self.server.request_workers.AddTask((self.server, message, self))
class MasterServer(daemon.AsyncStreamServer):
  """Master Server.

  This is the main asynchronous master server. It handles connections to the
  master unix socket and dispatches request messages to worker threads.

  """
  # Clients connect over a local unix socket only
  family = socket.AF_UNIX

  def __init__(self, mainloop, address, uid, gid):
    """MasterServer constructor

    @type mainloop: ganeti.daemon.Mainloop
    @param mainloop: Mainloop used to poll for I/O events
    @param address: the unix socket address to bind the MasterServer to
    @param uid: The uid of the owner of the socket
    @param gid: The gid of the owner of the socket

    """
    # Bind under a temporary name, fix mode/owner, then atomically
    # rename into place so clients never see a half-configured socket.
    temp_name = tempfile.mktemp(dir=os.path.dirname(address))
    daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
    os.chmod(temp_name, 0770)
    os.chown(temp_name, uid, gid)
    os.rename(temp_name, address)

    self.mainloop = mainloop
    self.awaker = daemon.AsyncAwaker()

    # We'll only start threads once we've forked.
    self.request_workers = None

  def handle_connection(self, connected_socket, client_address):
    # TODO: add connection count and limit the number of open connections to a
    # maximum number to avoid breaking for lack of file descriptors or memory.
    MasterClientHandler(self, connected_socket, client_address, self.family)

  def setup_queue(self):
    # Build the shared context and the client-request worker pool;
    # called only after daemonizing (see __init__ note above).
    self.context = GanetiContext()
    # NOTE(review): the worker-class argument and closing parenthesis of
    # this call appear to be missing from this excerpt.
    self.request_workers = workerpool.WorkerPool("ClientReq",
                                                 CLIENT_REQUEST_WORKERS,

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket itself.

    """
    # request_workers may still be None if setup_queue never ran
    if self.request_workers:
      self.request_workers.TerminateWorkers()
    self.context.jobqueue.Shutdown()
  """Class holding high-level client operations."""
  # NOTE(review): the enclosing "class ClientOps" statement and the
  # __init__ body (presumably "self.server = server", which
  # handle_request relies on) appear to be missing from this excerpt.
  def __init__(self, server):
  def handle_request(self, method, args): # pylint: disable-msg=R0911
    """Dispatch one LUXI request to the job queue or the query code.

    @param method: LUXI method name (a luxi.REQ_* constant)
    @param args: method-specific, already deserialized arguments
    @return: method-specific result, serializable over LUXI
    @raise ValueError: when the method name is unknown

    """
    # NOTE(review): several argument-unpacking lines (e.g.
    # "job_id = args", "fields = args", "(kind, name) = args",
    # "drain_flag = args") and some call continuation lines appear to be
    # missing from this excerpt; compare with version control.
    queue = self.server.context.jobqueue

    # TODO: Parameter validation

    # TODO: Rewrite to not exit in each 'if/elif' branch

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Received new job")
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
      return queue.SubmitJob(ops)

    if method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Received multiple jobs")
      # NOTE(review): the "jobs = []" initialization and the loop over
      # args are missing here
      jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
      return queue.SubmitManyJobs(jobs)

    elif method == luxi.REQ_CANCEL_JOB:
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_ARCHIVE_JOB:
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      # Build a readable description for the log line
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = utils.CommaJoin(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.QueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      # Synchronous (locking) queries over LUXI are rejected
      raise errors.OpPrereqError("Sync queries are not allowed",
      op = opcodes.OpQueryInstances(names=names, output_fields=fields,
                                    use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      raise errors.OpPrereqError("Sync queries are not allowed",
      op = opcodes.OpQueryNodes(names=names, output_fields=fields,
                                use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      nodes, use_locking = args
      raise errors.OpPrereqError("Sync queries are not allowed",
      logging.info("Received exports query request")
      op = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpQueryConfigValues(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpQueryClusterInfo()
      return self._Query(op)

    elif method == luxi.REQ_QUERY_TAGS:
      logging.info("Received tags query request")
      op = opcodes.OpGetTags(kind=kind, name=name)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_LOCKS:
      (fields, sync) = args
      logging.info("Received locks query request")
      # Lock queries bypass the job queue and go to the lock manager
      return self.server.context.glm.QueryLocks(fields, sync)

    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
      logging.info("Received queue drain flag change request to %s",
      return queue.SetDrainFlag(drain_flag)

    elif method == luxi.REQ_SET_WATCHER_PAUSE:
      # NOTE(review): the unpacking of "until" and the "if until is
      # None:" branching appear to be missing from this excerpt
      logging.info("Received request to no longer pause the watcher")
      if not isinstance(until, (int, float)):
        raise TypeError("Duration must be an integer or float")

      if until < time.time():
        raise errors.GenericError("Unable to set pause end time in the past")

      logging.info("Received request to pause the watcher until %s", until)

      return _SetWatcherPause(until)

    # Fallthrough for unknown methods (NOTE(review): the "else:" line
    # appears to be missing from this excerpt)
    logging.info("Received invalid request '%s'", method)
    raise ValueError("Invalid operation '%s'" % method)
316 def _Query(self, op):
317 """Runs the specified opcode and returns the result.
320 # Queries don't have a job id
321 proc = mcpu.Processor(self.server.context, None)
323 # TODO: Executing an opcode using locks will acquire them in blocking mode.
324 # Consider using a timeout for retries.
325 return proc.ExecOpCode(op, None)
class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # pylint: disable-msg=W0212
  # we do want to ensure a singleton here
  # NOTE(review): the "_instance = None" class attribute asserted on
  # below and the "def __init__(self):" line appear to be missing from
  # this excerpt.
    """Constructs a new GanetiContext object.

    There should be only a GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Lock manager seeded with the current node and instance lists
    self.glm = locking.GanetiLockManager(
      self.cfg.GetNodeList(),
      self.cfg.GetInstanceList())

    # Job queue (needs the context for its own processors)
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node, ec_id):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node, ec_id)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration.

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify the job queue so it stops replicating to that node
    self.jobqueue.RemoveNode(name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)
def _SetWatcherPause(until):
  """Creates or removes the watcher pause file.

  @type until: None or int
  @param until: Unix timestamp saying until when the watcher shouldn't run

  """
  # As shown in the excerpt the file was removed and then immediately
  # rewritten unconditionally; per the documented contract, None means
  # "unpause" (remove the file) and an int means "pause until then".
  if until is None:
    utils.RemoveFile(constants.WATCHER_PAUSEFILE)
  else:
    utils.WriteFile(constants.WATCHER_PAUSEFILE,
                    data="%d\n" % (until, ))
def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most of the cases we are the master,
  we'll use our own config file for getting the node list. In the
  future we could collect the current node list from our (possibly
  obsolete) known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are more out-of-sync, for now manual startup of the master
  is needed.

  Note that for a even number of nodes cluster, we need at least half
  of the nodes (beside ourselves) to vote for us. This creates a
  problem on two-node clusters, since in this case we require the
  other node to be up too to confirm our status.

  """
  myself = netutils.Hostname.GetSysName()
  # Temporary instantiation of a config writer, used only to get the
  # node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()

  # NOTE(review): the retry loop around GatherMasterVotes and the
  # early/final "return" statements appear to be missing from this
  # excerpt; compare with version control before editing.
  votes = bootstrap.GatherMasterVotes(node_list)

  # empty node list, this is a one node cluster
  if votes[0][0] is None:
    # Top vote is "no answer": the cluster could not be polled reliably
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    logging.critical("Use the --no-voting option if you understand what"
                     " effects it has on the cluster state")

  # here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]

  if top_node != myself:
    # Someone else collected the most votes
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
  elif top_votes < all_votes - top_votes:
    # We are top-voted but lack a strict majority
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
def ActivateMasterIP():
  """Activates the master IP on the master node.

  Asks the master node via RPC to start the master role; any failure is
  logged but deliberately not fatal (best effort).

  """
  master_node = ssconf.SimpleStore().GetMasterNode()
  result = rpc.RpcRunner.call_node_start_master(master_node, False, False)
  msg = result.fail_msg
  # Only log on an actual failure; as shown, the error was logged
  # unconditionally even when fail_msg was empty (i.e. on success).
  if msg:
    logging.error("Can't activate master IP address: %s", msg)
def CheckMasterd(options, args):
  """Initial checks whether to run or exit with a failure.

  """
  # NOTE(review): several lines appear to be missing from this excerpt
  # (the try/except around the uid/gid lookups, the "try:" before the
  # ConfigWriter probe, the "(v1, v2))" continuation, and some early
  # returns / non-master exit paths); compare with version control.
  if args: # masterd doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)

  # Verify this node believes itself to be the master
  ssconf.CheckMaster(options.debug)

    options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
    options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
    print >> sys.stderr, ("User or group not existing on system: %s:%s" %
                          (constants.MASTERD_USER, constants.DAEMONS_GROUP))
    sys.exit(constants.EXIT_FAILURE)

  # Check the configuration is sane before anything else
    config.ConfigWriter()
  except errors.ConfigVersionMismatch, err:
    v1 = "%s.%s.%s" % constants.SplitVersion(err.args[0])
    v2 = "%s.%s.%s" % constants.SplitVersion(err.args[1])
    print >> sys.stderr, \
      ("Configuration version mismatch. The current Ganeti software"
       " expects version %s, but the on-disk configuration file has"
       " version %s. This is likely the result of upgrading the"
       " software without running the upgrade procedure. Please contact"
       " your cluster administrator or complete the upgrade using the"
       " cfgupgrade utility, after reading the upgrade notes." %
    sys.exit(constants.EXIT_FAILURE)
  except errors.ConfigurationError, err:
    print >> sys.stderr, \
      ("Configuration error while opening the configuration file: %s\n"
       "This might be caused by an incomplete software upgrade or"
       " by a corrupted configuration file. Until the problem is fixed"
       " the master daemon cannot start." % str(err))
    sys.exit(constants.EXIT_FAILURE)

  # If CheckMaster didn't fail we believe we are the master, but we have to
  # confirm with the other nodes.
  if options.no_voting:
    if options.yes_do_it:
    # Interactive confirmation is required unless --yes-do-it was given
    sys.stdout.write("The 'no voting' option has been selected.\n")
    sys.stdout.write("This is dangerous, please confirm by"
                     " typing uppercase 'yes': ")
    confirmation = sys.stdin.readline().strip()
    if confirmation != "YES":
      print >> sys.stderr, "Aborting."
      sys.exit(constants.EXIT_FAILURE)

  # CheckAgreement uses RPC and threads, hence it needs to be run in a separate
  # process before we call utils.Daemonize in the current process.
  if not utils.RunInSeparateProcess(CheckAgreement):
    sys.exit(constants.EXIT_FAILURE)

  # ActivateMasterIP also uses RPC/threads, so we run it again via a
  # separate process.
  # TODO: decide whether failure to activate the master IP is a fatal error
  utils.RunInSeparateProcess(ActivateMasterIP)
def PrepMasterd(options, _):
  """Prep master daemon function, executed with the PID file held.

  Holding the PID file guarantees against concurrent execution, so
  removing a stale master socket here is safe.

  @return: (mainloop, master) tuple consumed by ExecMasterd

  """
  utils.RemoveFile(constants.MASTER_SOCKET)

  loop = daemon.Mainloop()
  srv = MasterServer(loop, constants.MASTER_SOCKET,
                     options.uid, options.gid)
  return (loop, srv)
def ExecMasterd(options, args, prep_data): # pylint: disable-msg=W0613
  """Main master daemon function, executed with the PID file held.

  """
  # NOTE(review): the try/finally structure that actually runs the
  # mainloop (and RPC init/shutdown) appears to be missing from this
  # excerpt; as shown, only the cleanup path remains.
  (mainloop, master) = prep_data
  master.server_cleanup()
  utils.RemoveFile(constants.MASTER_SOCKET)
  # NOTE(review): the enclosing "def Main():" line and its docstring
  # appear to be missing from this excerpt.
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  parser.add_option("--no-voting", dest="no_voting",
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally",
                    default=False, action="store_true")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    help="Override interactive check for --no-voting",
                    default=False, action="store_true")
  # Generic daemon driver: parses options, daemonizes, then calls the
  # check/prep/exec callbacks defined above
  daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
                     ExecMasterd, multithreaded=True)