# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Master daemon program.

Some classes deviate from the standard style guide since the
inheritance from parent classes requires it.

"""
import collections
import errno
import logging
import os
import signal
import SocketServer
import sys
import time
from cStringIO import StringIO
from optparse import OptionParser

import simplejson

from ganeti import config
from ganeti import constants
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import workerpool
from ganeti import rpc
from ganeti import bootstrap
# Number of worker threads in the pool that serves client connections.
CLIENT_REQUEST_WORKERS = 16

# Local aliases for exit codes defined in ganeti.constants.
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
class ClientRequestWorker(workerpool.BaseWorker):
  """Worker that serves one accepted client connection per task."""

  def RunTask(self, server, request, client_address):
    """Process the request.

    This is copied from the code in ThreadingMixIn: the request is
    handled and then closed; if handling fails, the error is reported
    via the server's C{handle_error} and the request is still closed.

    @param server: the server that accepted the connection
    @param request: the accepted client connection
    @param client_address: the client address, as returned by accept()

    """
    try:
      server.finish_request(request, client_address)
      server.close_request(request)
    except:
      # Catch everything, as ThreadingMixIn does, so the error is reported
      # and the client connection is always closed instead of the
      # exception propagating into the worker pool.
      server.handle_error(request, client_address)
      server.close_request(request)
class IOServer(SocketServer.UnixStreamServer):
  """IO thread class.

  This class takes care of initializing the other threads, setting
  signal handlers (which are processed only in this thread), and doing
  cleanup at shutdown.

  """
  def __init__(self, address, rqhandler):
    """IOServer constructor

    @param address: the address to bind this IOServer to
    @param rqhandler: RequestHandler type object

    """
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)

    # We'll only start threads once we've forked, so both of these are
    # filled in later by setup_queue(). Initialising context here lets
    # server_cleanup() run safely even if setup_queue() never did.
    self.context = None
    self.request_workers = None

  def setup_queue(self):
    """Create the Ganeti context and the client request worker pool.

    Kept out of the constructor because threads must only be started
    after the daemon has forked.

    """
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def process_request(self, request, client_address):
    """Add task to workerpool to process request.

    Replaces SocketServer's synchronous handling: the connection is
    handed to a ClientRequestWorker instead of being served inline.

    """
    self.request_workers.AddTask(self, request, client_address)

  def serve_forever(self):
    """Handle one request at a time until told to quit.

    The loop stops once SIGINT or SIGTERM has been received.

    """
    sighandler = utils.SignalHandler([signal.SIGINT, signal.SIGTERM])
    try:
      while not sighandler.called:
        self.handle_request()
    finally:
      sighandler.Reset()

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket. Parts that were never set up (see setup_queue) are skipped.

    """
    try:
      self.server_close()
    finally:
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.context:
        self.context.jobqueue.Shutdown()
138 class ClientRqHandler(SocketServer.BaseRequestHandler):
145 self._msgs = collections.deque()
146 self._ops = ClientOps(self.server)
150 msg = self.read_message()
152 logging.info("client closed connection")
155 request = simplejson.loads(msg)
156 logging.debug("request: %s", request)
157 if not isinstance(request, dict):
158 logging.error("wrong request received: %s", msg)
161 method = request.get(luxi.KEY_METHOD, None)
162 args = request.get(luxi.KEY_ARGS, None)
163 if method is None or args is None:
164 logging.error("no method or args in request")
169 result = self._ops.handle_request(method, args)
171 except errors.GenericError, err:
173 result = (err.__class__.__name__, err.args)
175 logging.error("Unexpected exception", exc_info=True)
177 result = "Caught exception: %s" % str(err[1])
180 luxi.KEY_SUCCESS: success,
181 luxi.KEY_RESULT: result,
183 logging.debug("response: %s", response)
184 self.send_message(simplejson.dumps(response))
186 def read_message(self):
187 while not self._msgs:
188 data = self.request.recv(self.READ_SIZE)
191 new_msgs = (self._buffer + data).split(self.EOM)
192 self._buffer = new_msgs.pop()
193 self._msgs.extend(new_msgs)
194 return self._msgs.popleft()
196 def send_message(self, msg):
197 #print "sending", msg
198 self.request.sendall(msg + self.EOM)
class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    """Remember the server, whose context holds the job queue."""
    self.server = server

  def handle_request(self, method, args):
    """Execute one client request.

    @param method: the luxi.REQ_* name of the requested operation
    @param args: the request arguments; their layout depends on method
    @return: the operation's result, which is sent back to the client
    @raise ValueError: if method is not a known operation

    """
    queue = self.server.context.jobqueue

    # TODO: Parameter validation

    if method == luxi.REQ_SUBMIT_JOB:
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
      return queue.SubmitJob(ops)

    elif method == luxi.REQ_CANCEL_JOB:
      job_id = args
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_ARCHIVE_JOB:
      job_id = args
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
      age = args
      return queue.AutoArchiveJobs(age)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      return queue.QueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields) = args
      op = opcodes.OpQueryInstances(names=names, output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields) = args
      op = opcodes.OpQueryNodes(names=names, output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      nodes = args
      op = opcodes.OpQueryExports(nodes=nodes)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      fields = args
      op = opcodes.OpQueryConfigValues(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
      drain_flag = args
      return queue.SetDrainFlag(drain_flag)

    else:
      raise ValueError("Invalid operation")

  def _DummyLog(self, *args):
    """Log callback for query opcodes; discards every message."""
    pass

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    The opcode is executed synchronously in the calling thread rather
    than being submitted to the job queue.

    """
    proc = mcpu.Processor(self.server.context)
    # TODO: Where should log messages go?
    return proc.ExecOpCode(op, self._DummyLog, None)
class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # The single live instance; once set, attribute assignment is refused
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only a GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    # NOTE(review): this check and the one in __setattr__ are asserts and
    # therefore disappear under "python -O".
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager, seeded with the currently configured nodes and
    # instances
    self.glm = locking.GanetiLockManager(
                self.cfg.GetNodeList(),
                self.cfg.GetInstanceList())

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node):
    """Adds a node to the configuration and lock manager.

    @param node: the node object; its C{name} is used as the lock name

    """
    # Add it to the configuration
    self.cfg.AddNode(node)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration

    Only the job queue is resynchronised; the configuration and the
    lock manager already know about the node.

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    @param name: the name of the node to remove

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)
def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  # "%%prog" survives the % formatting as "%prog", which optparse
  # expands to the program name.
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  # options.fork defaults to True; -f clears it to stay in the foreground
  parser.add_option("-f", "--foreground", dest="fork",
                    help="Don't detach from the current terminal",
                    default=True, action="store_false")
  parser.add_option("-d", "--debug", dest="debug",
                    help="Enable some debug messages",
                    default=False, action="store_true")
  options, args = parser.parse_args()
  return options, args
def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most of the cases we are the master,
  we'll use our own config file for getting the node list. In the
  future we could collect the current node list from our (possibly
  obsolete) known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are more out-of-sync, for now manual startup of the master
  should be attempted.

  Note that for an even number of nodes cluster, we need at least half
  of the nodes (besides ourselves) to vote for us. This creates a
  problem on two-node clusters, since in this case we require the
  other node to be up too to confirm our status.

  @rtype: boolean
  @return: True if we may start as master, False otherwise

  """
  myself = utils.HostInfo().name
  # Temporary config writer, used only to get the node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg

  # Voting rounds and the pause between them; together they give the
  # cluster roughly a minute to come up after a cold start.
  attempts = 6
  retry_delay = 10
  while True:
    # votes: (node name, vote count) pairs, top-voted first; a None name
    # at the top means most nodes did not answer.
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is not None:
      # here a real node is at the top of the list
      break
    attempts -= 1
    if attempts <= 0:
      # Give up right away instead of sleeping once more for nothing
      logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                       " after multiple retries. Aborting startup")
      return False
    time.sleep(retry_delay)

  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]

  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
    return False

  if top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
    return False

  return True
428 options, args = ParseOptions()
429 utils.debug = options.debug
434 ssconf.CheckMaster(options.debug)
436 # we believe we are the master, let's ask the other nodes...
437 if not CheckAgreement():
440 dirs = [(constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE),
441 (constants.SOCKET_DIR, constants.SOCKET_DIR_MODE),
443 for dir, mode in dirs:
446 except EnvironmentError, err:
447 if err.errno != errno.EEXIST:
448 raise errors.GenericError("Cannot create needed directory"
449 " '%s': %s" % (constants.SOCKET_DIR, err))
450 if not os.path.isdir(dir):
451 raise errors.GenericError("%s is not a directory" % dir)
453 # This is safe to do as the pid file guarantees against
454 # concurrent execution.
455 utils.RemoveFile(constants.MASTER_SOCKET)
457 master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
463 utils.Daemonize(logfile=constants.LOG_MASTERDAEMON,
464 noclose_fds=[master.fileno()])
466 utils.WritePidFile(constants.MASTERD_PID)
468 utils.SetupLogging(constants.LOG_MASTERDAEMON, debug=options.debug,
469 stderr_logging=not options.fork)
471 logging.info("Ganeti master daemon startup")
476 master_node = ssconf.SimpleConfigReader().GetMasterNode()
477 if not rpc.RpcRunner.call_node_start_master(master_node, False):
478 logging.error("Can't activate master IP address")
482 master.serve_forever()
484 master.server_cleanup()
488 utils.RemovePidFile(constants.MASTERD_PID)
489 utils.RemoveFile(constants.MASTER_SOCKET)
if __name__ == "__main__":
  main()