4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Master daemon program.
24 Some classes deviate from the standard style guide since the
25 inheritance from parent classes requires it.
29 # pylint: disable-msg=C0103
30 # C0103: Invalid name ganeti-masterd
40 from optparse import OptionParser
42 from ganeti import config
43 from ganeti import constants
44 from ganeti import daemon
45 from ganeti import mcpu
46 from ganeti import opcodes
47 from ganeti import jqueue
48 from ganeti import locking
49 from ganeti import luxi
50 from ganeti import utils
51 from ganeti import errors
52 from ganeti import ssconf
53 from ganeti import workerpool
54 from ganeti import rpc
55 from ganeti import bootstrap
56 from ganeti import serializer
# Number of worker threads in the pool that processes client requests.
CLIENT_REQUEST_WORKERS = 16

# Exit codes, re-exported from ganeti.constants under module-level names.
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
class ClientRequestWorker(workerpool.BaseWorker):
  """Pool worker that serves one accepted client connection per task."""
  # pylint: disable-msg=W0221
  def RunTask(self, server, request, client_address):
    """Serves a single request on behalf of the server.

    The sequence of server hooks is the one used by ThreadingMixIn in
    its per-request thread.

    @param server: server object providing finish_request(),
        close_request() and handle_error()
    @param request: the accepted connection
    @param client_address: peer address, handed through to the server hooks

    """
    try:
      server.finish_request(request, client_address)
      server.close_request(request)
    except: # pylint: disable-msg=W0702
      # Intentionally catches everything: the failure is reported through
      # the server's error hook and the connection is closed regardless.
      server.handle_error(request, client_address)
      server.close_request(request)
class IOServer(SocketServer.UnixStreamServer):
  """Unix-socket server running in the main (IO) thread.

  Besides accepting connections, this class creates the shared context
  and the pool of request workers, reacts to the termination signals
  (which are only processed in this thread) and tears everything down
  again at shutdown.

  """
  def __init__(self, address, rqhandler):
    """Binds the server socket; no threads are started here.

    @param address: the address to bind this IOServer to
    @param rqhandler: RequestHandler type object

    """
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)

    # Threads may only be started once we have forked, so both attributes
    # stay unset until setup_queue() is called.
    self.context = None
    self.request_workers = None

  def setup_queue(self):
    """Creates the shared context and the pool of request workers.

    """
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def process_request(self, request, client_address):
    """Queues the request for processing by a pool worker.

    @param request: the accepted connection
    @param client_address: peer address of the connection

    """
    self.request_workers.AddTask(self, request, client_address)

  @utils.SignalHandled([signal.SIGINT, signal.SIGTERM])
  def serve_forever(self, signal_handlers=None): # pylint: disable-msg=W0221
    """Handles one request at a time until a handled signal arrives.

    @param signal_handlers: dictionary of signal handlers, expected to be
        filled in by the SignalHandled decorator

    """
    assert isinstance(signal_handlers, dict) and signal_handlers, \
           "Broken SignalHandled decorator"
    # SignalHandled is applied only once, so every signal maps to the same
    # handler object; looking at the first one is sufficient.
    handler = signal_handlers.values()[0]
    while not handler.called:
      self.handle_request()

  def server_cleanup(self):
    """Shuts down the master socket, the request workers and the job queue.

    The workers and the job queue are stopped even if closing the socket
    fails.

    """
    try:
      self.server_close()
    finally:
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.context:
        self.context.jobqueue.Shutdown()
class ClientRqHandler(SocketServer.BaseRequestHandler):
  """Per-connection handler speaking the JSON request/response protocol.

  Messages on the socket are separated by the C{EOM} marker.

  """
  # NOTE(review): both attributes are used by the methods below, but their
  # definitions were not visible in the reviewed listing; confirm the values.
  EOM = '\3'
  READ_SIZE = 4096

  def setup(self):
    """Initialises the receive buffer, message queue and operations object.

    """
    # pylint: disable-msg=W0201
    # setup() is the api for initialising for this class
    self._buffer = ""
    self._msgs = collections.deque()
    self._ops = ClientOps(self.server)

  def handle(self):
    """Serves requests until the client disconnects or sends garbage.

    Each request must be a JSON dictionary carrying a method and its
    arguments. The reply is a dictionary with a success flag and either
    the result or the description of the error.

    """
    while True:
      raw = self.read_message()
      if raw is None:
        logging.debug("client closed connection")
        return

      request = serializer.LoadJson(raw)
      logging.debug("request: %s", request)
      if not isinstance(request, dict):
        logging.error("wrong request received: %s", raw)
        return

      method = request.get(luxi.KEY_METHOD, None)
      args = request.get(luxi.KEY_ARGS, None)
      if method is None or args is None:
        logging.error("no method or args in request")
        return

      try:
        result = self._ops.handle_request(method, args)
        success = True
      except errors.GenericError:
        # Ganeti errors are sent to the client in encoded form
        result = errors.EncodeException(sys.exc_info()[1])
        success = False
      except: # pylint: disable-msg=W0702
        # Anything else is logged with its traceback and reported as text
        logging.error("Unexpected exception", exc_info=True)
        result = "Caught exception: %s" % str(sys.exc_info()[1])
        success = False

      response = {
        luxi.KEY_SUCCESS: success,
        luxi.KEY_RESULT: result,
        }
      logging.debug("response: %s", response)
      self.send_message(serializer.DumpJson(response))

  def read_message(self):
    """Returns the next complete message from the client.

    Data is read from the socket until at least one C{EOM}-terminated
    message is available; a trailing partial message is kept in the
    buffer for the next call.

    @return: the message without its terminator, or None if the peer
        closed the connection

    """
    while not self._msgs:
      chunk = self.request.recv(self.READ_SIZE)
      if not chunk:
        return None
      pieces = (self._buffer + chunk).split(self.EOM)
      # The last piece is whatever followed the final terminator (possibly
      # the empty string) and therefore not a complete message yet.
      self._buffer = pieces.pop()
      self._msgs.extend(pieces)
    return self._msgs.popleft()

  def send_message(self, msg):
    """Sends one message, followed by the terminator, to the client.

    @param msg: the serialized message

    """
    # TODO: sendall is not guaranteed to send everything
    self.request.sendall(msg + self.EOM)
class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    """Remembers the server whose context is used to serve requests.

    @param server: server object; its C{context} attribute is accessed
        when requests are handled

    """
    self.server = server

  def handle_request(self, method, args): # pylint: disable-msg=R0911
    """Executes one client request and returns its result.

    Job-related requests are forwarded to the job queue, queries are run
    directly as opcodes (see L{_Query}).

    NOTE(review): the argument unpacking of the cancel, archive,
    config-values, drain-flag and watcher-pause requests, the fallback
    C{str(job_ids)} of the job query and the C{errors.ECODE_INVAL}
    argument of the locking errors were not visible in the reviewed
    listing; confirm them against the client side of the protocol.

    @param method: request name, compared against the luxi.REQ_* constants
    @param args: request arguments; the expected shape depends on the method
    @return: the value returned by the job queue, the executed query or
        the watcher pause helper
    @raise errors.OpPrereqError: if a query asks for locking
    @raise TypeError: if the watcher pause end time is not a number
    @raise errors.GenericError: if the watcher pause end time is in the past
    @raise ValueError: if the method is unknown

    """
    queue = self.server.context.jobqueue

    # TODO: Parameter validation

    # TODO: Rewrite to not exit in each 'if/elif' branch

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Received new job")
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
      return queue.SubmitJob(ops)

    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Received multiple jobs")
      jobs = [[opcodes.OpCode.LoadOpCode(state) for state in ops]
              for ops in args]
      return queue.SubmitManyJobs(jobs)

    elif method == luxi.REQ_CANCEL_JOB:
      job_id = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_ARCHIVE_JOB:
      job_id = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      # A non-empty sequence of IDs is logged as a comma-separated list
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = utils.CommaJoin(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.QueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      opcode = opcodes.OpQueryInstances(names=names, output_fields=fields,
                                        use_locking=use_locking)
      return self._Query(opcode)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      opcode = opcodes.OpQueryNodes(names=names, output_fields=fields,
                                    use_locking=use_locking)
      return self._Query(opcode)

    elif method == luxi.REQ_QUERY_EXPORTS:
      nodes, use_locking = args
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      logging.info("Received exports query request")
      opcode = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
      return self._Query(opcode)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      fields = args
      logging.info("Received config values query request for %s", fields)
      opcode = opcodes.OpQueryConfigValues(output_fields=fields)
      return self._Query(opcode)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      opcode = opcodes.OpQueryClusterInfo()
      return self._Query(opcode)

    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
      drain_flag = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    elif method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      if until is None:
        logging.info("Received request to no longer pause the watcher")
      else:
        if not isinstance(until, (int, float)):
          raise TypeError("Duration must be an integer or float")

        if until < time.time():
          raise errors.GenericError("Unable to set pause end time in the past")

        logging.info("Received request to pause the watcher until %s", until)

      return _SetWatcherPause(until)

    else:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    @param op: the query opcode to execute
    @return: the value returned by the processor for this opcode

    """
    # Queries don't have a job id
    return mcpu.Processor(self.server.context, None).ExecOpCode(op, None)
class GanetiContext(object):
  """Context common to all ganeti threads.

  The context owns the objects shared by all threads: the configuration,
  the lock manager and the job queue. Once constructed, its attributes
  can no longer be (re)assigned.

  """
  # pylint: disable-msg=W0212
  # we do want to ensure a singleton here
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    Only one GanetiContext may exist at any time; creating a second one
    trips an assertion.

    """
    cls = self.__class__
    assert cls._instance is None, "double GanetiContext instance"

    # Global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager, seeded with the currently known nodes and instances
    self.glm = locking.GanetiLockManager(self.cfg.GetNodeList(),
                                         self.cfg.GetInstanceList())

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # Registering the instance has to come last: from here on __setattr__
    # refuses any further attribute assignment.
    cls._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    @param name: attribute name
    @param value: attribute value

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node, ec_id):
    """Adds a node to the configuration, job queue and lock manager.

    @param node: the node object to add; its C{name} is registered with
        the lock manager
    @param ec_id: passed through to the configuration's AddNode

    """
    # Configuration first
    self.cfg.AddNode(node, ec_id)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Finally make the node known to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration.

    @param node: the node object to synchronize the job queue with

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration, job queue and lock manager.

    @param name: the name of the node to remove

    """
    # Configuration first
    self.cfg.RemoveNode(name)

    # Notify the job queue
    self.jobqueue.RemoveNode(name)

    # Finally drop the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)
def _SetWatcherPause(until):
  """Creates or removes the watcher pause file.

  @type until: None or int
  @param until: Unix timestamp saying until when the watcher shouldn't run;
      None removes the pause file
  @return: the value passed in as C{until}

  """
  if until is not None:
    utils.WriteFile(constants.WATCHER_PAUSEFILE,
                    data="%d\n" % (until, ))
  else:
    utils.RemoveFile(constants.WATCHER_PAUSEFILE)

  # NOTE(review): the return statement was not visible in the reviewed
  # listing; the caller returns this function's result to the client.
  return until
def CheckAgreement():
  """Check the agreement on who is the master.

  The algorithm is deliberately simple: we must be the top-voted node
  and the votes for us must not be outnumbered by the other votes. The
  node list is taken from our own configuration file, since in most
  cases we are indeed the master.

  To cope with a cold start of all nodes, the vote is repeated (for up
  to about a minute) while the top-voted answer is not a real node. If
  no real answer is obtained by then, startup is refused.

  Note that on a cluster with an even number of nodes at least half of
  the nodes (besides ourselves) have to vote for us; on a two-node
  cluster this means that the other node must be up to confirm our
  status.

  @rtype: bool
  @return: True if this node may act as the master, False otherwise

  """
  myself = utils.HostInfo().name
  # The config writer is needed only to obtain the node list and is not
  # kept around
  node_list = config.ConfigWriter().GetNodeList()

  # NOTE(review): the number of attempts and the delay between them were
  # not visible in the reviewed listing; the values below add up to the
  # minute mentioned in the original documentation. Confirm.
  for _ in range(6):
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is not None:
      break
    time.sleep(10)
  else:
    # Every attempt ended without a real node as the top-voted answer
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    return False

  # here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]

  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
    return False

  votes_against = all_votes - top_votes
  if top_votes < votes_against:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, votes_against)
    return False

  return True
def CheckAgreementWithRpc():
  """Runs L{CheckAgreement} with the RPC layer set up around it.

  NOTE(review): only the call to CheckAgreement was visible in the
  reviewed listing; the RPC initialisation and shutdown around it are
  assumed from the function name and must be confirmed.

  @rtype: bool
  @return: the result of L{CheckAgreement}

  """
  rpc.Init()
  try:
    return CheckAgreement()
  finally:
    rpc.Shutdown()
def _RunInSeparateProcess(fn):
  """Runs a function in a separate process.

  The function's result is transported through the child's exit code,
  hence only boolean return values are supported. If the child dies
  from a signal or exits with an unexpected code, the calling process
  is terminated with constants.EXIT_FAILURE.

  @type fn: callable
  @param fn: Function to be called
  @rtype: bool
  @return: the truth value of what C{fn} returned in the child

  """
  pid = os.fork()
  if pid == 0:
    # Child process: never returns from this block
    try:
      result = int(bool(fn()))
      assert result in (0, 1)
    except: # pylint: disable-msg=W0702
      logging.exception("Error while calling function in separate process")
      # 0 and 1 are reserved for the return value
      # NOTE(review): the error code itself was not visible in the reviewed
      # listing; any value other than 0 and 1 has the same effect here.
      result = 33

    os._exit(result) # pylint: disable-msg=W0212

  # Parent process: wait for the child (this also avoids a zombie) and
  # decode how it ended
  (_, status) = os.waitpid(pid, 0)

  signum = None
  exitcode = None
  if os.WIFSIGNALED(status):
    signum = os.WTERMSIG(status)
  else:
    exitcode = os.WEXITSTATUS(status)

  if signum is not None or exitcode not in (0, 1):
    logging.error("Child program failed (code=%s, signal=%s)",
                  exitcode, signum)
    sys.exit(constants.EXIT_FAILURE)

  return bool(exitcode)
def CheckMasterd(options, args):
  """Initial checks whether to run or exit with a failure.

  The process exits with constants.EXIT_FAILURE if arguments were
  given, if the operator does not confirm the "no voting" mode or if
  the other nodes do not agree that this node is the master.

  @param options: parsed command line options; the C{debug},
      C{no_voting} and C{yes_do_it} attributes are used
  @param args: remaining command line arguments, which must be empty

  """
  if args: # masterd doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)

  ssconf.CheckMaster(options.debug)

  # If CheckMaster didn't fail we believe we are the master, but we have to
  # confirm with the other nodes.
  if not options.no_voting:
    # CheckAgreement uses RPC and threads, hence it needs to be run in a
    # separate process before we call utils.Daemonize in the current process.
    if not _RunInSeparateProcess(CheckAgreementWithRpc):
      sys.exit(constants.EXIT_FAILURE)
    return

  # Voting was disabled on the command line
  if options.yes_do_it:
    return

  sys.stdout.write("The 'no voting' option has been selected.\n")
  sys.stdout.write("This is dangerous, please confirm by"
                   " typing uppercase 'yes': ")
  # NOTE(review): the flush was not visible in the reviewed listing; it is
  # assumed because the prompt does not end with a newline.
  sys.stdout.flush()

  confirmation = sys.stdin.readline().strip()
  if confirmation != "YES":
    print >> sys.stderr, "Aborting."
    sys.exit(constants.EXIT_FAILURE)
def ExecMasterd(options, args): # pylint: disable-msg=W0613
  """Main master daemon function, executed with the PID file held.

  Creates the server on the master socket, tries to activate the master
  IP address, serves requests until told to stop and finally cleans up
  the server and the socket file.

  NOTE(review): the try/finally nesting, the RPC initialisation and
  shutdown and the call to setup_queue() were not visible in the
  reviewed listing and are assumed here; confirm.

  @param options: parsed command line options (unused)
  @param args: remaining command line arguments (unused)

  """
  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(constants.MASTER_SOCKET)

  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
  try:
    rpc.Init()
    try:
      # Activate the master IP; a failure is logged but is not fatal
      master_node = ssconf.SimpleStore().GetMasterNode()
      result = rpc.RpcRunner.call_node_start_master(master_node, False, False)
      fail_msg = result.fail_msg
      if fail_msg:
        logging.error("Can't activate master IP address: %s", fail_msg)

      master.setup_queue()
      try:
        master.serve_forever()
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemoveFile(constants.MASTER_SOCKET)
# NOTE(review): the header of this function and the call under the
# __main__ guard were not visible in the reviewed listing; the name "main"
# is assumed.
def main():
  """Parses the command line and hands over to the generic daemon code.

  """
  version = "%%prog (ganeti) %s" % constants.RELEASE_VERSION
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version=version)
  parser.add_option("--no-voting", dest="no_voting",
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally",
                    default=False, action="store_true")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    help="Override interactive check for --no-voting",
                    default=False, action="store_true")

  # Directories handed to the generic daemon code, each with its mode
  dirs = [
    (constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE),
    (constants.SOCKET_DIR, constants.SOCKET_DIR_MODE),
    ]
  daemon.GenericMain(constants.MASTERD, parser, dirs,
                     CheckMasterd, ExecMasterd)


if __name__ == "__main__":
  main()