# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Master daemon program.

Some classes deviate from the standard style guide since the
inheritance from parent classes requires it.

"""
import os
import sys
import SocketServer
import time
import collections
import signal
import logging

from cStringIO import StringIO
from optparse import OptionParser

from ganeti import config
from ganeti import constants
from ganeti import daemon
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import workerpool
from ganeti import rpc
from ganeti import bootstrap
from ganeti import serializer
# Size of the worker pool serving luxi client requests
CLIENT_REQUEST_WORKERS = 16

# Exit codes re-exported at module level for convenience of callers
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
class ClientRequestWorker(workerpool.BaseWorker):
  def RunTask(self, server, request, client_address):
    """Process the request.

    This is copied from the code in ThreadingMixIn.

    @param server: the server object dispatching this request
    @param request: the connected request socket
    @param client_address: the client's address

    """
    try:
      server.finish_request(request, client_address)
      server.close_request(request)
    # pylint: disable-msg=W0702
    # a bare except is intentional here (as in ThreadingMixIn): any
    # failure in one request must not take down the worker thread
    except:
      server.handle_error(request, client_address)
      server.close_request(request)
class IOServer(SocketServer.UnixStreamServer):
  """IO thread class.

  This class takes care of initializing the other threads, setting
  signal handlers (which are processed only in this thread), and doing
  cleanup at shutdown.

  """
  def __init__(self, address, rqhandler):
    """IOServer constructor

    @param address: the address to bind this IOServer to
    @param rqhandler: RequestHandler type object

    """
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

  def setup_queue(self):
    """Creates the shared context and the client worker pool."""
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def process_request(self, request, client_address):
    """Add task to workerpool to process request.

    """
    self.request_workers.AddTask(self, request, client_address)

  @utils.SignalHandled([signal.SIGINT, signal.SIGTERM])
  def serve_forever(self, signal_handlers=None):
    """Handle one request at a time until told to quit."""
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"
    # Since we use SignalHandled only once, the resulting dict will map all
    # signals to the same handler. We'll just use the first one.
    sighandler = signal_handlers.values()[0]
    while not sighandler.called:
      self.handle_request()

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.server_close()
    finally:
      # terminate workers first, so no new jobs are picked up while the
      # job queue is being shut down
      if self.request_workers:
        self.request_workers.TerminateWorkers()

      self.context.jobqueue.Shutdown()
139 class ClientRqHandler(SocketServer.BaseRequestHandler):
146 self._msgs = collections.deque()
147 self._ops = ClientOps(self.server)
151 msg = self.read_message()
153 logging.debug("client closed connection")
156 request = serializer.LoadJson(msg)
157 logging.debug("request: %s", request)
158 if not isinstance(request, dict):
159 logging.error("wrong request received: %s", msg)
162 method = request.get(luxi.KEY_METHOD, None)
163 args = request.get(luxi.KEY_ARGS, None)
164 if method is None or args is None:
165 logging.error("no method or args in request")
170 result = self._ops.handle_request(method, args)
172 except errors.GenericError, err:
174 result = errors.EncodeException(err)
176 logging.error("Unexpected exception", exc_info=True)
178 result = "Caught exception: %s" % str(err[1])
181 luxi.KEY_SUCCESS: success,
182 luxi.KEY_RESULT: result,
184 logging.debug("response: %s", response)
185 self.send_message(serializer.DumpJson(response))
187 def read_message(self):
188 while not self._msgs:
189 data = self.request.recv(self.READ_SIZE)
192 new_msgs = (self._buffer + data).split(self.EOM)
193 self._buffer = new_msgs.pop()
194 self._msgs.extend(new_msgs)
195 return self._msgs.popleft()
197 def send_message(self, msg):
198 #print "sending", msg
199 # TODO: sendall is not guaranteed to send everything
200 self.request.sendall(msg + self.EOM)
class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server

  def handle_request(self, method, args):
    """Dispatches one luxi request to the job queue or an opcode query.

    @param method: one of the luxi.REQ_* constants
    @param args: method-specific arguments, as sent by the client
    @raise ValueError: if the method is unknown

    """
    queue = self.server.context.jobqueue

    # TODO: Parameter validation

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Received new job")
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
      return queue.SubmitJob(ops)

    if method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Received multiple jobs")
      jobs = []
      for ops in args:
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
      return queue.SubmitManyJobs(jobs)

    elif method == luxi.REQ_CANCEL_JOB:
      job_id = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_ARCHIVE_JOB:
      job_id = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = ", ".join(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.QueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed")
      op = opcodes.OpQueryInstances(names=names, output_fields=fields,
                                    use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed")
      op = opcodes.OpQueryNodes(names=names, output_fields=fields,
                                use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      nodes, use_locking = args
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed")
      logging.info("Received exports query request")
      op = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      fields = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpQueryConfigValues(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpQueryClusterInfo()
      return self._Query(op)

    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
      drain_flag = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    elif method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      if until is None:
        logging.info("Received request to no longer pause the watcher")
      else:
        if not isinstance(until, (int, float)):
          raise TypeError("Duration must be an integer or float")

        if until < time.time():
          raise errors.GenericError("Unable to set pause end time in the past")

        logging.info("Received request to pause the watcher until %s", until)

      return _SetWatcherPause(until)

    else:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

  def _DummyLog(self, *args):
    # Discards feedback messages from opcode execution
    pass

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    proc = mcpu.Processor(self.server.context)
    # TODO: Where should log messages go?
    return proc.ExecOpCode(op, self._DummyLog, None)
class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # Singleton instance; also used by __setattr__ to lock the object
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only a GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    self.glm = locking.GanetiLockManager(
                self.cfg.GetNodeList(),
                self.cfg.GetInstanceList())

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)
def _SetWatcherPause(until):
  """Creates or removes the watcher pause file.

  @type until: None or int
  @param until: Unix timestamp saying until when the watcher shouldn't run

  """
  if until is None:
    utils.RemoveFile(constants.WATCHER_PAUSEFILE)
  else:
    utils.WriteFile(constants.WATCHER_PAUSEFILE,
                    data="%d\n" % (until, ))

  # NOTE(review): the handler returns this function's value to the client;
  # returning the new pause end time mirrors that contract
  return until
def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most of the cases we are the master,
  we'll use our own config file for getting the node list. In the
  future we could collect the current node list from our (possibly
  obsolete) known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are more out-of-sync, for now manual startup of the master
  should be attempted.

  Note that for a even number of nodes cluster, we need at least half
  of the nodes (beside ourselves) to vote for us. This creates a
  problem on two-node clusters, since in this case we require the
  other node to be up too to confirm our status.

  @rtype: bool
  @return: True if we should continue as the master, False otherwise

  """
  myself = utils.HostInfo().name
  #temp instantiation of a config writer, used only to get the node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg

  # NOTE(review): retry scaffolding reconstructed; confirm the retry
  # count/delay against the project history
  retries = 6
  while retries > 0:
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is None:
      # no real answer yet, the cluster might still be cold-starting
      retries -= 1
      time.sleep(10)
      continue
    break
  if retries == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    return False
  # here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]

  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result
def CheckAgreementWithRpc():
  """Wrapper around L{CheckAgreement} that sets up and tears down RPC."""
  rpc.Init()
  try:
    return CheckAgreement()
  finally:
    rpc.Shutdown()
def _RunInSeparateProcess(fn):
  """Runs a function in a separate process.

  Note: Only boolean return values are supported.

  @type fn: callable
  @param fn: Function to be called
  @rtype: bool
  @return: the function's boolean result
  @raise SystemExit: if the child died from a signal or returned an
      unexpected exit code

  """
  pid = os.fork()
  if pid == 0:
    # Child process
    try:
      # Call function
      result = int(bool(fn()))
      assert result in (0, 1)
    except:
      logging.exception("Error while calling function in separate process")
      # 0 and 1 are reserved for the return value
      result = 33

    # _exit, not sys.exit: skip the parent's cleanup handlers/buffers
    os._exit(result)

  # Parent process

  # Avoid zombies and check exit code
  (_, status) = os.waitpid(pid, 0)

  if os.WIFSIGNALED(status):
    signum = os.WTERMSIG(status)
    exitcode = None
  else:
    signum = None
    exitcode = os.WEXITSTATUS(status)

  if not (exitcode in (0, 1) and signum is None):
    logging.error("Child program failed (code=%s, signal=%s)",
                  exitcode, signum)
    sys.exit(constants.EXIT_FAILURE)

  return bool(exitcode)
def CheckMasterd(options, args):
  """Initial checks whether to run or exit with a failure.

  @param options: parsed command line options
  @param args: remaining positional arguments (unused)

  """
  ssconf.CheckMaster(options.debug)

  # If CheckMaster didn't fail we believe we are the master, but we have to
  # confirm with the other nodes.
  if options.no_voting:
    if options.yes_do_it:
      return

    sys.stdout.write("The 'no voting' option has been selected.\n")
    sys.stdout.write("This is dangerous, please confirm by"
                     " typing uppercase 'yes': ")
    sys.stdout.flush()

    confirmation = sys.stdin.readline().strip()
    if confirmation != "YES":
      print >>sys.stderr, "Aborting."
      sys.exit(constants.EXIT_FAILURE)

    return

  # CheckAgreement uses RPC and threads, hence it needs to be run in a separate
  # process before we call utils.Daemonize in the current process.
  if not _RunInSeparateProcess(CheckAgreementWithRpc):
    sys.exit(constants.EXIT_FAILURE)
def ExecMasterd(options, args):
  """Main master daemon function, executed with the PID file held.

  @param options: parsed command line options
  @param args: remaining positional arguments (unused)

  """
  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(constants.MASTER_SOCKET)

  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
  try:
    rpc.Init()
    try:
      # activate ip
      master_node = ssconf.SimpleStore().GetMasterNode()
      result = rpc.RpcRunner.call_node_start_master(master_node, False, False)
      msg = result.RemoteFailMsg()
      if msg:
        logging.error("Can't activate master IP address: %s", msg)

      master.setup_queue()
      try:
        master.serve_forever()
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemoveFile(constants.MASTER_SOCKET)
def main():
  """Main function"""
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  parser.add_option("--no-voting", dest="no_voting",
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally",
                    default=False, action="store_true")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    help="Override interactive check for --no-voting",
                    default=False, action="store_true")
  # directories that must exist (with their modes) before daemonizing
  dirs = [(constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE),
          (constants.SOCKET_DIR, constants.SOCKET_DIR_MODE),
         ]
  daemon.GenericMain(constants.MASTERD, parser, dirs,
                     CheckMasterd, ExecMasterd)


if __name__ == "__main__":
  main()