4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Master daemon program.
Some classes deviate from the standard style guide since the
inheritance from parent classes requires it.
import collections
import logging
import signal
import sys
import time
import SocketServer

from cStringIO import StringIO
from optparse import OptionParser

from ganeti import config
from ganeti import constants
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import workerpool
from ganeti import rpc
from ganeti import bootstrap
from ganeti import serializer
# Number of worker threads that process client (luxi) requests in parallel.
CLIENT_REQUEST_WORKERS = 16

# Exit codes re-exported from constants for the convenience of callers.
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
class ClientRequestWorker(workerpool.BaseWorker):
  """Worker processing one client request at a time."""

  def RunTask(self, server, request, client_address):
    """Process the request.

    This is copied from the code in ThreadingMixIn.

    @param server: the IOServer which accepted the connection
    @param request: the connected socket object
    @param client_address: address of the connecting client

    """
    try:
      server.finish_request(request, client_address)
      server.close_request(request)
    except: # pylint: disable-msg=W0702
      # A failure while handling one client must not kill the worker
      # thread; report it via the server and close the connection.
      server.handle_error(request, client_address)
      server.close_request(request)
class IOServer(SocketServer.UnixStreamServer):
  """IO thread class.

  This class takes care of initializing the other threads, setting
  signal handlers (which are processed only in this thread), and doing
  cleanup at shutdown.

  """
  def __init__(self, address, rqhandler):
    """IOServer constructor

    @param address: the address to bind this IOServer to
    @param rqhandler: RequestHandler type object

    """
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

  def setup_queue(self):
    """Create the shared Ganeti context and the request worker pool."""
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def process_request(self, request, client_address):
    """Add task to workerpool to process request.

    """
    self.request_workers.AddTask(self, request, client_address)

  def serve_forever(self):
    """Handle one request at a time until told to quit."""
    sighandler = utils.SignalHandler([signal.SIGINT, signal.SIGTERM])
    try:
      while not sighandler.called:
        self.handle_request()
    finally:
      # Always restore the previous signal handlers, even on error.
      sighandler.Reset()

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.server_close()
    finally:
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.context:
        self.context.jobqueue.Shutdown()
class ClientRqHandler(SocketServer.BaseRequestHandler):
  """Client handler"""
  # End-of-message marker delimiting requests on the stream socket.
  EOM = '\3'
  # Number of bytes read from the socket per recv() call.
  READ_SIZE = 4096

  def setup(self):
    """Per-connection initialization (called by BaseRequestHandler)."""
    # Data received but not yet terminated by EOM.
    self._buffer = ""
    # Complete messages awaiting processing.
    self._msgs = collections.deque()
    self._ops = ClientOps(self.server)

  def handle(self):
    """Dispatch requests from this client until it disconnects."""
    while True:
      msg = self.read_message()
      if msg is None:
        logging.debug("client closed connection")
        break

      request = serializer.LoadJson(msg)
      logging.debug("request: %s", request)
      if not isinstance(request, dict):
        logging.error("wrong request received: %s", msg)
        break

      method = request.get(luxi.KEY_METHOD, None)
      args = request.get(luxi.KEY_ARGS, None)
      if method is None or args is None:
        logging.error("no method or args in request")
        break

      success = False
      try:
        result = self._ops.handle_request(method, args)
        success = True
      except errors.GenericError as err:
        # Known errors are reported to the client by class name and args.
        success = False
        result = (err.__class__.__name__, err.args)
      except:
        # Anything else is logged server-side and summarized for the client.
        logging.error("Unexpected exception", exc_info=True)
        err = sys.exc_info()
        result = "Caught exception: %s" % str(err[1])

      response = {
        luxi.KEY_SUCCESS: success,
        luxi.KEY_RESULT: result,
        }
      logging.debug("response: %s", response)
      self.send_message(serializer.DumpJson(response))

  def read_message(self):
    """Return the next complete message, or None if the client went away."""
    while not self._msgs:
      data = self.request.recv(self.READ_SIZE)
      if not data:
        # Connection closed by the peer.
        return None
      new_msgs = (self._buffer + data).split(self.EOM)
      # The final element is either an incomplete message or empty;
      # keep it buffered until more data arrives.
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def send_message(self, msg):
    """Send one message to the client, terminated by the EOM marker."""
    self.request.sendall(msg + self.EOM)
202 """Class holding high-level client operations."""
203 def __init__(self, server):
206 def handle_request(self, method, args):
207 queue = self.server.context.jobqueue
209 # TODO: Parameter validation
211 if method == luxi.REQ_SUBMIT_JOB:
212 logging.info("Received new job")
213 ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
214 return queue.SubmitJob(ops)
216 if method == luxi.REQ_SUBMIT_MANY_JOBS:
217 logging.info("Received multiple jobs")
220 jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
221 return queue.SubmitManyJobs(jobs)
223 elif method == luxi.REQ_CANCEL_JOB:
225 logging.info("Received job cancel request for %s", job_id)
226 return queue.CancelJob(job_id)
228 elif method == luxi.REQ_ARCHIVE_JOB:
230 logging.info("Received job archive request for %s", job_id)
231 return queue.ArchiveJob(job_id)
233 elif method == luxi.REQ_AUTOARCHIVE_JOBS:
234 (age, timeout) = args
235 logging.info("Received job autoarchive request for age %s, timeout %s",
237 return queue.AutoArchiveJobs(age, timeout)
239 elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
240 (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
241 logging.info("Received job poll request for %s", job_id)
242 return queue.WaitForJobChanges(job_id, fields, prev_job_info,
243 prev_log_serial, timeout)
245 elif method == luxi.REQ_QUERY_JOBS:
246 (job_ids, fields) = args
247 if isinstance(job_ids, (tuple, list)) and job_ids:
248 msg = ", ".join(job_ids)
251 logging.info("Received job query request for %s", msg)
252 return queue.QueryJobs(job_ids, fields)
254 elif method == luxi.REQ_QUERY_INSTANCES:
255 (names, fields, use_locking) = args
256 logging.info("Received instance query request for %s", names)
258 raise errors.OpPrereqError("Sync queries are not allowed")
259 op = opcodes.OpQueryInstances(names=names, output_fields=fields,
260 use_locking=use_locking)
261 return self._Query(op)
263 elif method == luxi.REQ_QUERY_NODES:
264 (names, fields, use_locking) = args
265 logging.info("Received node query request for %s", names)
267 raise errors.OpPrereqError("Sync queries are not allowed")
268 op = opcodes.OpQueryNodes(names=names, output_fields=fields,
269 use_locking=use_locking)
270 return self._Query(op)
272 elif method == luxi.REQ_QUERY_EXPORTS:
273 nodes, use_locking = args
275 raise errors.OpPrereqError("Sync queries are not allowed")
276 logging.info("Received exports query request")
277 op = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
278 return self._Query(op)
280 elif method == luxi.REQ_QUERY_CONFIG_VALUES:
282 logging.info("Received config values query request for %s", fields)
283 op = opcodes.OpQueryConfigValues(output_fields=fields)
284 return self._Query(op)
286 elif method == luxi.REQ_QUERY_CLUSTER_INFO:
287 logging.info("Received cluster info query request")
288 op = opcodes.OpQueryClusterInfo()
289 return self._Query(op)
291 elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
293 logging.info("Received queue drain flag change request to %s",
295 return queue.SetDrainFlag(drain_flag)
298 logging.info("Received invalid request '%s'", method)
299 raise ValueError("Invalid operation '%s'" % method)
301 def _DummyLog(self, *args):
  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    The opcode is executed synchronously through an mcpu.Processor
    built on the server's shared context; log output is discarded.

    @param op: the opcode instance to execute
    @return: the opcode's execution result

    """
    proc = mcpu.Processor(self.server.context)
    # TODO: Where should log messages go?
    return proc.ExecOpCode(op, self._DummyLog, None)
class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # Singleton instance; doubles as the "initialized" flag checked by
  # __setattr__ below.
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only a GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    self.glm = locking.GanetiLockManager(
      self.cfg.GetNodeList(),
      self.cfg.GetInstanceList())

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)
def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option("-f", "--foreground", dest="fork",
                    help="Don't detach from the current terminal",
                    default=True, action="store_false")
  parser.add_option("-d", "--debug", dest="debug",
                    help="Enable some debug messages",
                    default=False, action="store_true")
  parser.add_option("--no-voting", dest="no_voting",
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally",
                    default=False, action="store_true")
  options, args = parser.parse_args()
  return options, args
def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most of the cases we are the master,
  we'll use our own config file for getting the node list. In the
  future we could collect the current node list from our (possibly
  obsolete) known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are more out-of-sync, for now manual startup of the master
  should be attempted.

  Note that for a even number of nodes cluster, we need at least half
  of the nodes (beside ourselves) to vote for us. This creates a
  problem on two-node clusters, since in this case we require the
  other node to be up too to confirm our status.

  @return: True if we believe we are the master, False otherwise

  """
  myself = utils.HostInfo().name
  #temp instantiation of a config writer, used only to get the node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg
  retries = 6
  while retries > 0:
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is None:
      # no real answer yet; the other nodes may still be booting
      retries -= 1
      time.sleep(10)
      continue
    break
  if retries == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    return False
  # here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]
  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result
def main():
  """Main function: startup, voting check, daemonization and serving."""
  options, args = ParseOptions()
  utils.debug = options.debug

  ssconf.CheckMaster(options.debug)

  # we believe we are the master, let's ask the other nodes...
  if options.no_voting:
    sys.stdout.write("The 'no voting' option has been selected.\n")
    sys.stdout.write("This is dangerous, please confirm by"
                     " typing uppercase 'yes': ")
    sys.stdout.flush()
    confirmation = sys.stdin.readline().strip()
    if confirmation != "YES":
      sys.stderr.write("Aborting.\n")
      return
  else:
    if not CheckAgreement():
      return

  dirs = [(constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE),
          (constants.SOCKET_DIR, constants.SOCKET_DIR_MODE),
         ]
  utils.EnsureDirs(dirs)

  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(constants.MASTER_SOCKET)

  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)

  # become a daemon
  if options.fork:
    utils.Daemonize(logfile=constants.LOG_MASTERDAEMON)

  utils.WritePidFile(constants.MASTERD_PID)
  try:
    utils.SetupLogging(constants.LOG_MASTERDAEMON, debug=options.debug,
                       stderr_logging=not options.fork, multithreaded=True)

    logging.info("Ganeti master daemon startup")

    # activate ip
    master_node = ssconf.SimpleConfigReader().GetMasterNode()
    if not rpc.RpcRunner.call_node_start_master(master_node, False):
      logging.error("Can't activate master IP address")

    master.setup_queue()
    try:
      master.serve_forever()
    finally:
      master.server_cleanup()
  finally:
    # Never leave a stale pid file or master socket behind.
    utils.RemovePidFile(constants.MASTERD_PID)
    utils.RemoveFile(constants.MASTER_SOCKET)
538 if __name__ == "__main__":