4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Master daemon program.
24 Some classes deviate from the standard style guide since the
25 inheritance from parent classes requires it.
29 # pylint: disable-msg=C0103
30 # C0103: Invalid name ganeti-masterd
40 from optparse import OptionParser
42 from ganeti import config
43 from ganeti import constants
44 from ganeti import daemon
45 from ganeti import mcpu
46 from ganeti import opcodes
47 from ganeti import jqueue
48 from ganeti import locking
49 from ganeti import luxi
50 from ganeti import utils
51 from ganeti import errors
52 from ganeti import ssconf
53 from ganeti import workerpool
54 from ganeti import rpc
55 from ganeti import bootstrap
56 from ganeti import serializer
# Number of pool workers used to serve luxi client requests concurrently
# (consumed by IOServer.setup_queue when creating the WorkerPool).
59 CLIENT_REQUEST_WORKERS = 16
# Re-exported exit codes, so users of this module need not import
# ganeti.constants directly.
61 EXIT_NOTMASTER = constants.EXIT_NOTMASTER
62 EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
class ClientRequestWorker(workerpool.BaseWorker):
  # pylint: disable-msg=W0221
  def RunTask(self, server, request, client_address):
    """Serve one client request on behalf of the worker pool.

    This mirrors the dispatch logic of SocketServer.ThreadingMixIn:
    the request is handed to the server's handler machinery, any
    failure is reported through the server's handle_error() hook
    rather than propagated, and the connection is always closed.

    @param server: the IOServer owning this request
    @param request: the connected request socket
    @param client_address: address of the connecting client

    """
    try:
      server.finish_request(request, client_address)
    except: # pylint: disable-msg=W0702
      server.handle_error(request, client_address)
    finally:
      # Close exactly once on both the success and the error path
      server.close_request(request)
81 class IOServer(SocketServer.UnixStreamServer):
# Unix-domain-socket server accepting luxi client connections; each
# accepted request is queued to a worker pool instead of being handled
# inline.  NOTE(review): several original lines (docstring delimiters,
# parts of setup_queue and server_cleanup) are missing from this excerpt.
84 This class takes care of initializing the other threads, setting
85 signal handlers (which are processed only in this thread), and doing
89 def __init__(self, address, rqhandler):
90 """IOServer constructor
92 @param address: the address to bind this IOServer to
93 @param rqhandler: RequestHandler type object
96 SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
98 # We'll only start threads once we've forked.
100 self.request_workers = None
# Called after daemonization: builds the shared GanetiContext and the
# worker pool.  The WorkerPool call's trailing arguments (presumably the
# worker class, ClientRequestWorker) are missing here -- TODO confirm
# against upstream.
102 def setup_queue(self):
103 self.context = GanetiContext()
104 self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
# Overrides the synchronous SocketServer.process_request: the triple
# (self, request, client_address) matches ClientRequestWorker.RunTask's
# parameters.
107 def process_request(self, request, client_address):
108 """Add task to workerpool to process request.
111 self.request_workers.AddTask(self, request, client_address)
113 @utils.SignalHandled([signal.SIGINT, signal.SIGTERM])
114 def serve_forever(self, signal_handlers=None): # pylint: disable-msg=W0221
115 """Handle one request at a time until told to quit."""
116 assert isinstance(signal_handlers, dict) and \
117 len(signal_handlers) > 0, \
118 "Broken SignalHandled decorator"
119 # Since we use SignalHandled only once, the resulting dict will map all
120 # signals to the same handler. We'll just use the first one.
121 sighandler = signal_handlers.values()[0]
# Loop until SIGINT/SIGTERM flips the handler's 'called' flag; each
# iteration accepts (and queues) at most one connection.
122 while not sighandler.called:
123 self.handle_request()
125 def server_cleanup(self):
126 """Cleanup the server.
128 This involves shutting down the processor threads and the master
# NOTE(review): the surrounding try/finally structure and the socket
# close call appear to be missing lines in this excerpt.
135 if self.request_workers:
136 self.request_workers.TerminateWorkers()
138 self.context.jobqueue.Shutdown()
141 class ClientRqHandler(SocketServer.BaseRequestHandler):
# Per-connection luxi request handler.  NOTE(review): the class
# docstring, the READ_SIZE/EOM class attributes, and the setup()/handle()
# 'def' lines are missing from this excerpt; the remaining lines below
# belong to setup(), handle(), read_message() and send_message().
147 # pylint: disable-msg=W0201
148 # setup() is the api for initialising for this class
150 self._msgs = collections.deque()
151 self._ops = ClientOps(self.server)
# handle(): loop reading EOM-delimited JSON messages; an empty read
# (client closed) terminates the loop.
155 msg = self.read_message()
157 logging.debug("client closed connection")
160 request = serializer.LoadJson(msg)
161 logging.debug("request: %s", request)
162 if not isinstance(request, dict):
163 logging.error("wrong request received: %s", msg)
166 method = request.get(luxi.KEY_METHOD, None)
167 args = request.get(luxi.KEY_ARGS, None)
168 if method is None or args is None:
169 logging.error("no method or args in request")
# Dispatch to ClientOps; domain errors are encoded for the client,
# unexpected ones are logged and turned into a string result.  The
# 'success' flag assignments and the 'try:'/'except' opener lines are
# missing from this excerpt.
174 result = self._ops.handle_request(method, args)
176 except errors.GenericError, err:
178 result = errors.EncodeException(err)
180 logging.error("Unexpected exception", exc_info=True)
182 result = "Caught exception: %s" % str(err[1])
# Response dict construction (opening brace line missing here).
185 luxi.KEY_SUCCESS: success,
186 luxi.KEY_RESULT: result,
188 logging.debug("response: %s", response)
189 self.send_message(serializer.DumpJson(response))
191 def read_message(self):
# Accumulate raw recv() data in self._buffer, split on the EOM marker
# and keep the trailing partial message buffered; returns one complete
# message per call.  NOTE(review): the empty-data (connection closed)
# handling lines are missing from this excerpt.
192 while not self._msgs:
193 data = self.request.recv(self.READ_SIZE)
196 new_msgs = (self._buffer + data).split(self.EOM)
197 self._buffer = new_msgs.pop()
198 self._msgs.extend(new_msgs)
199 return self._msgs.popleft()
201 def send_message(self, msg):
# Appends the EOM terminator so the peer can split the stream back
# into messages.
202 #print "sending", msg
203 # TODO: sendall is not guaranteed to send everything
204 self.request.sendall(msg + self.EOM)
208 """Class holding high-level client operations."""
# NOTE(review): the 'class ClientOps' header line and the __init__ body
# (presumably 'self.server = server'; handle_request reads
# self.server.context) are missing from this excerpt -- confirm against
# upstream.
209 def __init__(self, server):
212 def handle_request(self, method, args): # pylint: disable-msg=R0911
# Dispatch a single luxi request: 'method' selects the operation and
# 'args' carries the method-specific payload.  Job-queue methods act on
# the shared jobqueue; query methods build an opcode and execute it
# synchronously via _Query.  NOTE(review): several argument-unpacking
# lines (e.g. 'job_id = args', '(kind, name) = args',
# 'drain_flag = args', '(until, ) = args') are missing from this
# excerpt.
213 queue = self.server.context.jobqueue
215 # TODO: Parameter validation
217 # TODO: Rewrite to not exit in each 'if/elif' branch
219 if method == luxi.REQ_SUBMIT_JOB:
220 logging.info("Received new job")
221 ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
222 return queue.SubmitJob(ops)
# NOTE(review): the 'jobs = []' initializer and the 'for ops in args:'
# loop header appear to be missing before the append below.
224 if method == luxi.REQ_SUBMIT_MANY_JOBS:
225 logging.info("Received multiple jobs")
228 jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
229 return queue.SubmitManyJobs(jobs)
231 elif method == luxi.REQ_CANCEL_JOB:
233 logging.info("Received job cancel request for %s", job_id)
234 return queue.CancelJob(job_id)
236 elif method == luxi.REQ_ARCHIVE_JOB:
238 logging.info("Received job archive request for %s", job_id)
239 return queue.ArchiveJob(job_id)
241 elif method == luxi.REQ_AUTOARCHIVE_JOBS:
242 (age, timeout) = args
243 logging.info("Received job autoarchive request for age %s, timeout %s",
245 return queue.AutoArchiveJobs(age, timeout)
247 elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
248 (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
249 logging.info("Received job poll request for %s", job_id)
250 return queue.WaitForJobChanges(job_id, fields, prev_job_info,
251 prev_log_serial, timeout)
253 elif method == luxi.REQ_QUERY_JOBS:
254 (job_ids, fields) = args
# Summarize the requested job ids for the log; the 'else' branch that
# sets 'msg' for the all-jobs case is missing from this excerpt.
255 if isinstance(job_ids, (tuple, list)) and job_ids:
256 msg = utils.CommaJoin(job_ids)
259 logging.info("Received job query request for %s", msg)
260 return queue.QueryJobs(job_ids, fields)
# For the three resource queries below, the 'if use_locking:' guard line
# preceding each OpPrereqError (sync queries are rejected) is missing
# from this excerpt, as are the errors.ECODE_* trailing arguments.
262 elif method == luxi.REQ_QUERY_INSTANCES:
263 (names, fields, use_locking) = args
264 logging.info("Received instance query request for %s", names)
266 raise errors.OpPrereqError("Sync queries are not allowed",
268 op = opcodes.OpQueryInstances(names=names, output_fields=fields,
269 use_locking=use_locking)
270 return self._Query(op)
272 elif method == luxi.REQ_QUERY_NODES:
273 (names, fields, use_locking) = args
274 logging.info("Received node query request for %s", names)
276 raise errors.OpPrereqError("Sync queries are not allowed",
278 op = opcodes.OpQueryNodes(names=names, output_fields=fields,
279 use_locking=use_locking)
280 return self._Query(op)
282 elif method == luxi.REQ_QUERY_EXPORTS:
283 nodes, use_locking = args
285 raise errors.OpPrereqError("Sync queries are not allowed",
287 logging.info("Received exports query request")
288 op = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
289 return self._Query(op)
291 elif method == luxi.REQ_QUERY_CONFIG_VALUES:
293 logging.info("Received config values query request for %s", fields)
294 op = opcodes.OpQueryConfigValues(output_fields=fields)
295 return self._Query(op)
297 elif method == luxi.REQ_QUERY_CLUSTER_INFO:
298 logging.info("Received cluster info query request")
299 op = opcodes.OpQueryClusterInfo()
300 return self._Query(op)
302 elif method == luxi.REQ_QUERY_TAGS:
304 logging.info("Received tags query request")
305 op = opcodes.OpGetTags(kind=kind, name=name)
306 return self._Query(op)
308 elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
310 logging.info("Received queue drain flag change request to %s",
312 return queue.SetDrainFlag(drain_flag)
# Watcher pause: 'until' of None clears the pause, otherwise it must be
# a future Unix timestamp.  The branch lines ('if until is None:' /
# 'else:') are missing from this excerpt.
314 elif method == luxi.REQ_SET_WATCHER_PAUSE:
318 logging.info("Received request to no longer pause the watcher")
320 if not isinstance(until, (int, float)):
321 raise TypeError("Duration must be an integer or float")
323 if until < time.time():
324 raise errors.GenericError("Unable to set pause end time in the past")
326 logging.info("Received request to pause the watcher until %s", until)
328 return _SetWatcherPause(until)
# Fallback for unknown methods (the 'else:' line is missing from this
# excerpt).
331 logging.info("Received invalid request '%s'", method)
332 raise ValueError("Invalid operation '%s'" % method)
334 def _Query(self, op):
335 """Runs the specified opcode and returns the result.
# Executes synchronously in the calling thread, bypassing the job queue.
338 # Queries don't have a job id
339 proc = mcpu.Processor(self.server.context, None)
# Second argument is presumably the feedback/log callback (none needed
# for queries) -- TODO confirm against mcpu.Processor.ExecOpCode.
340 return proc.ExecOpCode(op, None)
343 class GanetiContext(object):
344 """Context common to all ganeti threads.
346 This class creates and holds common objects shared by all threads.
349 # pylint: disable-msg=W0212
350 # we do want to ensure a singleton here
# NOTE(review): the class-level '_instance = None' attribute and the
# 'def __init__(self):' line are missing from this excerpt; the asserts
# below reference that attribute.
354 """Constructs a new GanetiContext object.
356 There should be only a GanetiContext object at any time, so this
357 function raises an error if this is not the case.
360 assert self.__class__._instance is None, "double GanetiContext instance"
362 # Create global configuration object
363 self.cfg = config.ConfigWriter()
# Global lock manager, seeded with the current node and instance lists.
366 self.glm = locking.GanetiLockManager(
367 self.cfg.GetNodeList(),
368 self.cfg.GetInstanceList())
# Job queue; gets a back-reference to this context.
371 self.jobqueue = jqueue.JobQueue(self)
373 # setting this also locks the class against attribute modifications
374 self.__class__._instance = self
376 def __setattr__(self, name, value):
377 """Setting GanetiContext attributes is forbidden after initialization.
# Once _instance is set, any attribute assignment trips this assert,
# effectively freezing the singleton.
380 assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
381 object.__setattr__(self, name, value)
383 def AddNode(self, node, ec_id):
384 """Adds a node to the configuration and lock manager.
387 # Add it to the configuration
388 self.cfg.AddNode(node, ec_id)
390 # If preseeding fails it'll not be added
391 self.jobqueue.AddNode(node)
393 # Add the new node to the Ganeti Lock Manager
394 self.glm.add(locking.LEVEL_NODE, node.name)
396 def ReaddNode(self, node):
397 """Updates a node that's already in the configuration
400 # Synchronize the queue again
401 self.jobqueue.AddNode(node)
403 def RemoveNode(self, name):
404 """Removes a node from the configuration and lock manager.
407 # Remove node from configuration
408 self.cfg.RemoveNode(name)
# Notify the job queue so it stops replicating to the removed node.
411 self.jobqueue.RemoveNode(name)
413 # Remove the node from the Ganeti Lock Manager
414 self.glm.remove(locking.LEVEL_NODE, name)
417 def _SetWatcherPause(until):
418 """Creates or removes the watcher pause file.
420 @type until: None or int
421 @param until: Unix timestamp saying until when the watcher shouldn't run
# NOTE(review): the branch lines are missing from this excerpt --
# presumably 'if until is None:' selects RemoveFile and 'else:' selects
# WriteFile; a trailing return may also be missing.  Confirm upstream.
425 utils.RemoveFile(constants.WATCHER_PAUSEFILE)
427 utils.WriteFile(constants.WATCHER_PAUSEFILE,
428 data="%d\n" % (until, ))
433 def CheckAgreement():
434 """Check the agreement on who is the master.
436 The function uses a very simple algorithm: we must get more positive
437 than negative answers. Since in most of the cases we are the master,
438 we'll use our own config file for getting the node list. In the
439 future we could collect the current node list from our (possibly
440 obsolete) known nodes.
442 In order to account for cold-start of all nodes, we retry for up to
443 a minute until we get a real answer as the top-voted one. If the
444 nodes are more out-of-sync, for now manual startup of the master
447 Note that for a even number of nodes cluster, we need at least half
448 of the nodes (beside ourselves) to vote for us. This creates a
449 problem on two-node clusters, since in this case we require the
450 other node to be up too to confirm our status.
453 myself = utils.HostInfo().name
454 #temp instantiation of a config writer, used only to get the node list
455 cfg = config.ConfigWriter()
456 node_list = cfg.GetNodeList()
# NOTE(review): the retry loop around GatherMasterVotes (retry counter,
# sleep between attempts) and the early-return lines for the
# single-node / no-answer cases are missing from this excerpt.
460 votes = bootstrap.GatherMasterVotes(node_list)
462 # empty node list, this is a one node cluster
# votes[0][0] is None when the majority of nodes did not answer.
464 if votes[0][0] is None:
470 logging.critical("Cluster inconsistent, most of the nodes didn't answer"
471 " after multiple retries. Aborting startup")
473 # here a real node is at the top of the list
474 all_votes = sum(item[1] for item in votes)
475 top_node, top_votes = votes[0]
# We win only if we are the top-voted node AND hold a strict majority;
# the 'result' assignments and final return are missing from this
# excerpt.
478 if top_node != myself:
479 logging.critical("It seems we are not the master (top-voted node"
480 " is %s with %d out of %d votes)", top_node, top_votes,
482 elif top_votes < all_votes - top_votes:
483 logging.critical("It seems we are not the master (%d votes for,"
484 " %d votes against)", top_votes, all_votes - top_votes)
491 def CheckAgreementWithRpc():
# Thin wrapper around CheckAgreement(); the intervening original lines
# are missing from this excerpt -- presumably they initialize the RPC
# layer before voting (and shut it down afterwards).  TODO confirm
# against upstream.
494 return CheckAgreement()
499 def _RunInSeparateProcess(fn):
500 """Runs a function in a separate process.
502 Note: Only boolean return values are supported.
505 @param fn: Function to be called
# NOTE(review): the 'pid = os.fork()' call and the child-branch guard
# ('if pid == 0:'), plus the fallback exit code used when the child
# raises, are missing from this excerpt.
514 result = int(bool(fn()))
515 assert result in (0, 1)
516 except: # pylint: disable-msg=W0702
517 logging.exception("Error while calling function in separate process")
518 # 0 and 1 are reserved for the return value
# Child must use os._exit to avoid running parent cleanup handlers.
521 os._exit(result) # pylint: disable-msg=W0212
525 # Avoid zombies and check exit code
526 (_, status) = os.waitpid(pid, 0)
# signum is only meaningful when the child died from a signal; the
# 'else: signum = None' branch appears to be missing here.
528 if os.WIFSIGNALED(status):
529 signum = os.WTERMSIG(status)
533 exitcode = os.WEXITSTATUS(status)
# Anything other than a clean 0/1 exit means the child failed.
535 if not (exitcode in (0, 1) and signum is None):
536 logging.error("Child program failed (code=%s, signal=%s)",
538 sys.exit(constants.EXIT_FAILURE)
540 return bool(exitcode)
543 def CheckMasterd(options, args):
544 """Initial checks whether to run or exit with a failure.
547 if args: # masterd doesn't take any arguments
548 print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
549 sys.exit(constants.EXIT_FAILURE)
# Verifies this node is configured as master; exits on failure.
551 ssconf.CheckMaster(options.debug)
553 # If CheckMaster didn't fail we believe we are the master, but we have to
554 # confirm with the other nodes.
# NOTE(review): branch lines are missing from this excerpt -- with
# --no-voting and --yes-do-it the confirmation is presumably skipped;
# without --yes-do-it the interactive prompt below runs, and the voting
# path at the bottom runs when --no-voting was not given.
555 if options.no_voting:
556 if options.yes_do_it:
559 sys.stdout.write("The 'no voting' option has been selected.\n")
560 sys.stdout.write("This is dangerous, please confirm by"
561 " typing uppercase 'yes': ")
# Flush line missing from this excerpt; the prompt has no newline.
564 confirmation = sys.stdin.readline().strip()
565 if confirmation != "YES":
566 print >> sys.stderr, "Aborting."
567 sys.exit(constants.EXIT_FAILURE)
571 # CheckAgreement uses RPC and threads, hence it needs to be run in a separate
572 # process before we call utils.Daemonize in the current process.
573 if not _RunInSeparateProcess(CheckAgreementWithRpc):
574 sys.exit(constants.EXIT_FAILURE)
577 def ExecMasterd (options, args): # pylint: disable-msg=W0613
578 """Main master daemon function, executed with the PID file held.
581 # This is safe to do as the pid file guarantees against
582 # concurrent execution.
583 utils.RemoveFile(constants.MASTER_SOCKET)
585 master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
# NOTE(review): the try/finally scaffolding, RPC initialization and the
# master.setup_queue() call are missing lines in this excerpt.
590 master_node = ssconf.SimpleStore().GetMasterNode()
# Activate the master IP on this node; failure is logged but not fatal.
591 result = rpc.RpcRunner.call_node_start_master(master_node, False, False)
592 msg = result.fail_msg
594 logging.error("Can't activate master IP address: %s", msg)
# Serve requests until a termination signal arrives, then clean up.
598 master.serve_forever()
600 master.server_cleanup()
# Remove the luxi socket on shutdown (presumably in a 'finally' block).
604 utils.RemoveFile(constants.MASTER_SOCKET)
# NOTE(review): the enclosing 'def main():' header is missing from this
# excerpt; these lines build the option parser, the required runtime
# directories, and hand control to the generic daemon startup driver.
609 parser = OptionParser(description="Ganeti master daemon",
610 usage="%prog [-f] [-d]",
611 version="%%prog (ganeti) %s" %
612 constants.RELEASE_VERSION)
613 parser.add_option("--no-voting", dest="no_voting",
614 help="Do not check that the nodes agree on this node"
615 " being the master and start the daemon unconditionally",
616 default=False, action="store_true")
617 parser.add_option("--yes-do-it", dest="yes_do_it",
618 help="Override interactive check for --no-voting",
619 default=False, action="store_true")
# (dir, mode) pairs that must exist before the daemon starts; the list's
# closing bracket line is missing from this excerpt.
620 dirs = [(constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE),
621 (constants.SOCKET_DIR, constants.SOCKET_DIR_MODE),
# GenericMain parses options, daemonizes, runs CheckMasterd as the
# pre-fork check and ExecMasterd as the daemon body.
623 daemon.GenericMain(constants.MASTERD, parser, dirs,
624 CheckMasterd, ExecMasterd)
# Script entry-point guard; the guarded body (presumably a call to
# main()) is missing from this excerpt.
627 if __name__ == "__main__":