4 # Copyright (C) 2006, 2007, 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Master daemon program.
Some classes deviate from the standard style guide since the
inheritance from parent classes requires it.
29 # pylint: disable-msg=C0103
30 # C0103: Invalid name ganeti-masterd
import grp
import logging
import os
import pwd
import socket
import sys
import tempfile
import time

from optparse import OptionParser

from ganeti import config
from ganeti import constants
from ganeti import daemon
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import workerpool
from ganeti import rpc
from ganeti import bootstrap
from ganeti import netutils
# Number of worker threads processing incoming LUXI client requests
CLIENT_REQUEST_WORKERS = 16

# Exit codes re-exported from constants for convenience within this module
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
66 class ClientRequestWorker(workerpool.BaseWorker):
67 # pylint: disable-msg=W0221
68 def RunTask(self, server, message, client):
69 """Process the request.
72 client_ops = ClientOps(server)
75 (method, args) = luxi.ParseRequest(message)
76 except luxi.ProtocolError, err:
77 logging.error("Protocol Error: %s", err)
83 result = client_ops.handle_request(method, args)
85 except errors.GenericError, err:
86 logging.exception("Unexpected exception")
88 result = errors.EncodeException(err)
90 logging.exception("Unexpected exception")
92 result = "Caught exception: %s" % str(err[1])
95 reply = luxi.FormatResponse(success, result)
96 client.send_message(reply)
97 # awake the main thread so that it can write out the data.
98 server.awaker.signal()
99 except: # pylint: disable-msg=W0702
100 logging.exception("Send error")
class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
  """Handler for master peers.

  One instance of this class exists per connected LUXI client; each
  incoming message is queued onto the server's worker pool for
  asynchronous processing.

  """
  # Maximum number of messages read from the socket but not yet handed
  # over to the worker pool
  _MAX_UNHANDLED = 1

  def __init__(self, server, connected_socket, client_address, family):
    daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
                                                 client_address,
                                                 family, self._MAX_UNHANDLED)
    self.server = server

  def handle_message(self, message, _):
    # The reply is sent asynchronously from ClientRequestWorker.RunTask
    self.server.request_workers.AddTask((self.server, message, self))
120 class MasterServer(daemon.AsyncStreamServer):
123 This is the main asynchronous master server. It handles connections to the
127 family = socket.AF_UNIX
129 def __init__(self, mainloop, address, uid, gid):
130 """MasterServer constructor
132 @type mainloop: ganeti.daemon.Mainloop
133 @param mainloop: Mainloop used to poll for I/O events
134 @param address: the unix socket address to bind the MasterServer to
135 @param uid: The uid of the owner of the socket
136 @param gid: The gid of the owner of the socket
139 temp_name = tempfile.mktemp(dir=os.path.dirname(address))
140 daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
141 os.chmod(temp_name, 0770)
142 os.chown(temp_name, uid, gid)
143 os.rename(temp_name, address)
145 self.mainloop = mainloop
146 self.awaker = daemon.AsyncAwaker()
148 # We'll only start threads once we've forked.
150 self.request_workers = None
152 def handle_connection(self, connected_socket, client_address):
153 # TODO: add connection count and limit the number of open connections to a
154 # maximum number to avoid breaking for lack of file descriptors or memory.
155 MasterClientHandler(self, connected_socket, client_address, self.family)
157 def setup_queue(self):
158 self.context = GanetiContext()
159 self.request_workers = workerpool.WorkerPool("ClientReq",
160 CLIENT_REQUEST_WORKERS,
163 def server_cleanup(self):
164 """Cleanup the server.
166 This involves shutting down the processor threads and the master
173 if self.request_workers:
174 self.request_workers.TerminateWorkers()
176 self.context.jobqueue.Shutdown()
180 """Class holding high-level client operations."""
181 def __init__(self, server):
184 def handle_request(self, method, args): # pylint: disable-msg=R0911
185 queue = self.server.context.jobqueue
187 # TODO: Parameter validation
189 # TODO: Rewrite to not exit in each 'if/elif' branch
191 if method == luxi.REQ_SUBMIT_JOB:
192 logging.info("Received new job")
193 ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
194 return queue.SubmitJob(ops)
196 if method == luxi.REQ_SUBMIT_MANY_JOBS:
197 logging.info("Received multiple jobs")
200 jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
201 return queue.SubmitManyJobs(jobs)
203 elif method == luxi.REQ_CANCEL_JOB:
205 logging.info("Received job cancel request for %s", job_id)
206 return queue.CancelJob(job_id)
208 elif method == luxi.REQ_ARCHIVE_JOB:
210 logging.info("Received job archive request for %s", job_id)
211 return queue.ArchiveJob(job_id)
213 elif method == luxi.REQ_AUTOARCHIVE_JOBS:
214 (age, timeout) = args
215 logging.info("Received job autoarchive request for age %s, timeout %s",
217 return queue.AutoArchiveJobs(age, timeout)
219 elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
220 (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
221 logging.info("Received job poll request for %s", job_id)
222 return queue.WaitForJobChanges(job_id, fields, prev_job_info,
223 prev_log_serial, timeout)
225 elif method == luxi.REQ_QUERY_JOBS:
226 (job_ids, fields) = args
227 if isinstance(job_ids, (tuple, list)) and job_ids:
228 msg = utils.CommaJoin(job_ids)
231 logging.info("Received job query request for %s", msg)
232 return queue.QueryJobs(job_ids, fields)
234 elif method == luxi.REQ_QUERY_INSTANCES:
235 (names, fields, use_locking) = args
236 logging.info("Received instance query request for %s", names)
238 raise errors.OpPrereqError("Sync queries are not allowed",
240 op = opcodes.OpQueryInstances(names=names, output_fields=fields,
241 use_locking=use_locking)
242 return self._Query(op)
244 elif method == luxi.REQ_QUERY_NODES:
245 (names, fields, use_locking) = args
246 logging.info("Received node query request for %s", names)
248 raise errors.OpPrereqError("Sync queries are not allowed",
250 op = opcodes.OpQueryNodes(names=names, output_fields=fields,
251 use_locking=use_locking)
252 return self._Query(op)
254 elif method == luxi.REQ_QUERY_EXPORTS:
255 nodes, use_locking = args
257 raise errors.OpPrereqError("Sync queries are not allowed",
259 logging.info("Received exports query request")
260 op = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
261 return self._Query(op)
263 elif method == luxi.REQ_QUERY_CONFIG_VALUES:
265 logging.info("Received config values query request for %s", fields)
266 op = opcodes.OpQueryConfigValues(output_fields=fields)
267 return self._Query(op)
269 elif method == luxi.REQ_QUERY_CLUSTER_INFO:
270 logging.info("Received cluster info query request")
271 op = opcodes.OpQueryClusterInfo()
272 return self._Query(op)
274 elif method == luxi.REQ_QUERY_TAGS:
276 logging.info("Received tags query request")
277 op = opcodes.OpGetTags(kind=kind, name=name)
278 return self._Query(op)
280 elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
282 logging.info("Received queue drain flag change request to %s",
284 return queue.SetDrainFlag(drain_flag)
286 elif method == luxi.REQ_SET_WATCHER_PAUSE:
290 logging.info("Received request to no longer pause the watcher")
292 if not isinstance(until, (int, float)):
293 raise TypeError("Duration must be an integer or float")
295 if until < time.time():
296 raise errors.GenericError("Unable to set pause end time in the past")
298 logging.info("Received request to pause the watcher until %s", until)
300 return _SetWatcherPause(until)
303 logging.info("Received invalid request '%s'", method)
304 raise ValueError("Invalid operation '%s'" % method)
  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    @param op: the opcode instance to execute synchronously
    @return: the opcode's execution result

    """
    # Queries don't have a job id
    proc = mcpu.Processor(self.server.context, None)
    return proc.ExecOpCode(op, None)
class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # pylint: disable-msg=W0212
  # we do want to ensure a singleton here
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only a GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager, seeded with the current node/instance lists
    self.glm = locking.GanetiLockManager(
                self.cfg.GetNodeList(),
                self.cfg.GetInstanceList())

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node, ec_id):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node, ec_id)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)
def _SetWatcherPause(until):
  """Creates or removes the watcher pause file.

  @type until: None or int
  @param until: Unix timestamp saying until when the watcher shouldn't run

  """
  if until is None:
    # No end time means the watcher should run again: remove the pause file
    utils.RemoveFile(constants.WATCHER_PAUSEFILE)
  else:
    utils.WriteFile(constants.WATCHER_PAUSEFILE,
                    data="%d\n" % (until, ))

  # The caller (handle_request) returns this value to the client
  return until
def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most of the cases we are the master,
  we'll use our own config file for getting the node list. In the
  future we could collect the current node list from our (possibly
  obsolete) known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are more out-of-sync, for now manual startup of the master
  should be attempted.

  Note that for a even number of nodes cluster, we need at least half
  of the nodes (beside ourselves) to vote for us. This creates a
  problem on two-node clusters, since in this case we require the
  other node to be up too to confirm our status.

  @rtype: bool
  @return: True if we believe we are the master, False otherwise

  """
  myself = netutils.Hostname.GetSysName()
  #temp instantiation of a config writer, used only to get the node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg

  # Retry for about a minute while the cluster is cold-starting
  retries = 6
  while retries > 0:
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is None:
      # the top answer is "no answer"; sleep and retry
      retries -= 1
      time.sleep(10)
      continue
    break

  if retries == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    logging.critical("Use the --no-voting option if you understand what"
                     " effects it has on the cluster state")
    return False

  # here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]

  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result
def ActivateMasterIP():
  """Activates the cluster IP on the master node.

  Failures are logged but not fatal; the caller decides how to treat them.

  """
  # activate ip
  master_node = ssconf.SimpleStore().GetMasterNode()
  result = rpc.RpcRunner.call_node_start_master(master_node, False, False)
  msg = result.fail_msg
  if msg:
    logging.error("Can't activate master IP address: %s", msg)
def CheckMasterd(options, args):
  """Initial checks whether to run or exit with a failure.

  Exits the process on usage errors, missing system accounts, failed
  master checks or a lost master-agreement vote.

  """
  if args: # masterd doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)

  ssconf.CheckMaster(options.debug)

  try:
    options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
    options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
  except KeyError:
    print >> sys.stderr, ("User or group not existing on system: %s:%s" %
                          (constants.MASTERD_USER, constants.DAEMONS_GROUP))
    sys.exit(constants.EXIT_FAILURE)

  # If CheckMaster didn't fail we believe we are the master, but we have to
  # confirm with the other nodes.
  if options.no_voting:
    if options.yes_do_it:
      return

    sys.stdout.write("The 'no voting' option has been selected.\n")
    sys.stdout.write("This is dangerous, please confirm by"
                     " typing uppercase 'yes': ")
    sys.stdout.flush()

    confirmation = sys.stdin.readline().strip()
    if confirmation != "YES":
      print >> sys.stderr, "Aborting."
      sys.exit(constants.EXIT_FAILURE)

    return

  # CheckAgreement uses RPC and threads, hence it needs to be run in a separate
  # process before we call utils.Daemonize in the current process.
  if not utils.RunInSeparateProcess(CheckAgreement):
    sys.exit(constants.EXIT_FAILURE)

  # ActivateMasterIP also uses RPC/threads, so we run it again via a
  # separate process.

  # TODO: decide whether failure to activate the master IP is a fatal error
  utils.RunInSeparateProcess(ActivateMasterIP)
def ExecMasterd(options, args): # pylint: disable-msg=W0613
  """Main master daemon function, executed with the PID file held.

  Runs the main loop and guarantees (via nested try/finally) that the
  worker pool, the RPC layer and the master socket are cleaned up.

  """
  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(constants.MASTER_SOCKET)

  mainloop = daemon.Mainloop()
  master = MasterServer(mainloop, constants.MASTER_SOCKET,
                        options.uid, options.gid)
  try:
    rpc.Init()
    try:
      master.setup_queue()
      try:
        mainloop.Run()
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemoveFile(constants.MASTER_SOCKET)
def main():
  """Main function: parses options and hands over to the generic daemon."""
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  parser.add_option("--no-voting", dest="no_voting",
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally",
                    default=False, action="store_true")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    help="Override interactive check for --no-voting",
                    default=False, action="store_true")
  # Directories (and their modes) which must exist before startup
  dirs = [(constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE),
          (constants.SOCKET_DIR, constants.SOCKET_DIR_MODE),
         ]
  daemon.GenericMain(constants.MASTERD, parser, dirs,
                     CheckMasterd, ExecMasterd,
                     multithreaded=True)


if __name__ == "__main__":
  main()