4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Master daemon program.
24 Some classes deviates from the standard style guide since the
25 inheritance from parent classes requires it.
40 from cStringIO import StringIO
41 from optparse import OptionParser
43 from ganeti import config
44 from ganeti import constants
45 from ganeti import mcpu
46 from ganeti import opcodes
47 from ganeti import jqueue
48 from ganeti import locking
49 from ganeti import luxi
50 from ganeti import utils
51 from ganeti import errors
52 from ganeti import ssconf
53 from ganeti import workerpool
54 from ganeti import rpc
55 from ganeti import bootstrap
# Size of the worker pool that serves client connections (passed to
# workerpool.WorkerPool when the request workers are created).
CLIENT_REQUEST_WORKERS = 16

# Exit codes, re-exported from the constants module under local names.
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
class ClientRequestWorker(workerpool.BaseWorker):
    """Worker that runs one queued client request to completion."""

    def RunTask(self, server, request, client_address):
        """Process the request.

        This is copied from the code in ThreadingMixIn.

        Args:
          server: the server the task was queued by; its finish_request,
            handle_error and close_request hooks are called here
          request: the request object to process
          client_address: the address passed along with the request

        """
        # The request is closed on both paths. The catch-all handler is kept
        # bare because the docstring says this mirrors ThreadingMixIn.
        # NOTE(review): the try/except lines were rebuilt from a damaged
        # listing - confirm the exact except clause against the original.
        try:
            server.finish_request(request, client_address)
            server.close_request(request)
        except:
            server.handle_error(request, client_address)
            server.close_request(request)
class IOServer(SocketServer.UnixStreamServer):
    """Unix stream server that hands client requests to a worker pool.

    This class takes care of initializing the other threads, setting
    signal handlers (which are processed only in this thread), and doing
    the cleanup when the server stops (see server_cleanup).

    """
    def __init__(self, address, rqhandler):
        """IOServer constructor

        Args:
          address: the address to bind this IOServer to
          rqhandler: RequestHandler type object

        """
        SocketServer.UnixStreamServer.__init__(self, address, rqhandler)

        # We'll only start threads once we've forked.
        # Both attributes are filled in by setup_queue(); they start out as
        # None so that server_cleanup() is safe to call before that.
        self.context = None
        self.request_workers = None

    def setup_queue(self):
        """Create the shared GanetiContext and the request worker pool."""
        self.context = GanetiContext()
        # NOTE(review): the worker-class argument was cut off in the damaged
        # listing; ClientRequestWorker is the only worker class in view.
        self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
                                                     ClientRequestWorker)

    def process_request(self, request, client_address):
        """Add task to workerpool to process request.

        Args:
          request: the request object to hand to a worker
          client_address: the address that came with the request

        """
        self.request_workers.AddTask(self, request, client_address)

    def serve_forever(self):
        """Handle one request at a time until told to quit."""
        sighandler = utils.SignalHandler([signal.SIGINT, signal.SIGTERM])
        # NOTE(review): the damaged listing is missing one line before this
        # loop and two after it. If they wrapped the loop in try/finally to
        # restore the previous signal handlers, that still has to be put back.
        while not sighandler.called:
            self.handle_request()

    def server_cleanup(self):
        """Cleanup the server.

        This involves shutting down the processor threads and the master
        socket.

        """
        # Close the listening socket first, but make sure the worker pool and
        # the job queue are shut down even if closing fails.
        try:
            self.server_close()
        finally:
            if self.request_workers:
                self.request_workers.TerminateWorkers()
            # The context only exists once setup_queue() has run.
            if self.context is not None:
                self.context.jobqueue.Shutdown()
137 class ClientRqHandler(SocketServer.BaseRequestHandler):
144 self._msgs = collections.deque()
145 self._ops = ClientOps(self.server)
149 msg = self.read_message()
151 logging.info("client closed connection")
154 request = simplejson.loads(msg)
155 logging.debug("request: %s", request)
156 if not isinstance(request, dict):
157 logging.error("wrong request received: %s", msg)
160 method = request.get(luxi.KEY_METHOD, None)
161 args = request.get(luxi.KEY_ARGS, None)
162 if method is None or args is None:
163 logging.error("no method or args in request")
168 result = self._ops.handle_request(method, args)
170 except errors.GenericError, err:
172 result = (err.__class__.__name__, err.args)
174 logging.error("Unexpected exception", exc_info=True)
176 result = "Caught exception: %s" % str(err[1])
179 luxi.KEY_SUCCESS: success,
180 luxi.KEY_RESULT: result,
182 logging.debug("response: %s", response)
183 self.send_message(simplejson.dumps(response))
def read_message(self):
    """Return the next complete message from the client, or None on EOF.

    Data is read from the connection in READ_SIZE chunks and split on the
    EOM terminator. Complete messages are queued in self._msgs, and a
    trailing partial message is kept in self._buffer until more data
    arrives.

    Returns:
      the oldest queued message, without its terminator, or None if the
      connection yields no more data while no complete message is queued

    """
    while not self._msgs:
        data = self.request.recv(self.READ_SIZE)
        if not data:
            # recv() returns an empty string once the peer has closed the
            # connection; without this check the loop would never end.
            return None
        new_msgs = (self._buffer + data).split(self.EOM)
        # The last piece is whatever followed the final terminator: an
        # incomplete message, or the empty string.
        self._buffer = new_msgs.pop()
        self._msgs.extend(new_msgs)
    return self._msgs.popleft()
def send_message(self, msg):
    """Send one message to the client, followed by the EOM terminator.

    Args:
      msg: the message text, already serialized by the caller

    """
    self.request.sendall(msg + self.EOM)
class ClientOps(object):
    """Class holding high-level client operations."""

    def __init__(self, server):
        """Remember the server whose shared context the requests work on.

        Args:
          server: object exposing the shared context as its 'context'
            attribute

        """
        self.server = server

    def handle_request(self, method, args):
        """Execute one client request and return its result.

        Args:
          method: the request identifier, compared against the luxi.REQ_*
            constants
          args: the request arguments; their shape depends on the method

        Returns:
          the value returned by the job queue call or by the query opcode
          that the method maps to

        Raises:
          ValueError: if the method is not one of the handled requests

        """
        queue = self.server.context.jobqueue

        # TODO: Parameter validation

        # NOTE(review): the damaged listing lost the line binding the
        # argument of each single-argument request (job_id, age, nodes,
        # fields, drain_flag). They are taken to be 'args' itself here;
        # confirm against what the luxi client actually sends.

        if method == luxi.REQ_SUBMIT_JOB:
            ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
            return queue.SubmitJob(ops)

        elif method == luxi.REQ_CANCEL_JOB:
            job_id = args
            return queue.CancelJob(job_id)

        elif method == luxi.REQ_ARCHIVE_JOB:
            job_id = args
            return queue.ArchiveJob(job_id)

        elif method == luxi.REQ_AUTOARCHIVE_JOBS:
            age = args
            return queue.AutoArchiveJobs(age)

        elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
            (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
            return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                           prev_log_serial, timeout)

        elif method == luxi.REQ_QUERY_JOBS:
            (job_ids, fields) = args
            return queue.QueryJobs(job_ids, fields)

        elif method == luxi.REQ_QUERY_INSTANCES:
            (names, fields) = args
            op = opcodes.OpQueryInstances(names=names, output_fields=fields)
            return self._Query(op)

        elif method == luxi.REQ_QUERY_NODES:
            (names, fields) = args
            op = opcodes.OpQueryNodes(names=names, output_fields=fields)
            return self._Query(op)

        elif method == luxi.REQ_QUERY_EXPORTS:
            nodes = args
            op = opcodes.OpQueryExports(nodes=nodes)
            return self._Query(op)

        elif method == luxi.REQ_QUERY_CONFIG_VALUES:
            fields = args
            op = opcodes.OpQueryConfigValues(output_fields=fields)
            return self._Query(op)

        elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
            drain_flag = args
            return queue.SetDrainFlag(drain_flag)

        else:
            raise ValueError("Invalid operation")

    def _DummyLog(self, *args):
        """Ignore a log message; passed as the log callback by _Query."""
        pass

    def _Query(self, op):
        """Runs the specified opcode and returns the result.

        Args:
          op: the opcode object to execute

        Returns:
          the value returned by the processor's ExecOpCode call

        """
        proc = mcpu.Processor(self.server.context)
        # TODO: Where should log messages go?
        return proc.ExecOpCode(op, self._DummyLog, None)
class GanetiContext(object):
    """Context common to all ganeti threads.

    This class creates and holds common objects shared by all threads.

    """
    # Set to the live instance at the end of __init__. While it is not None,
    # __init__ refuses to build another context and __setattr__ refuses any
    # attribute assignment.
    _instance = None

    def __init__(self):
        """Constructs a new GanetiContext object.

        There should be only a GanetiContext object at any time, so this
        function raises an error if this is not the case.

        """
        assert self.__class__._instance is None, \
            "double GanetiContext instance"

        # Create global configuration object
        self.cfg = config.ConfigWriter()

        # Lock manager, seeded with the nodes and instances currently in
        # the configuration
        self.glm = locking.GanetiLockManager(
            self.cfg.GetNodeList(),
            self.cfg.GetInstanceList())

        # Job queue; it receives this context object
        self.jobqueue = jqueue.JobQueue(self)

        # setting this also locks the class against attribute modifications
        self.__class__._instance = self

    def __setattr__(self, name, value):
        """Setting GanetiContext attributes is forbidden after initialization.

        Args:
          name: the attribute name
          value: the value to store

        """
        assert self.__class__._instance is None, \
            "Attempt to modify Ganeti Context"
        object.__setattr__(self, name, value)

    def AddNode(self, node):
        """Adds a node to the configuration and lock manager.

        Args:
          node: the node object to add; its 'name' attribute is the name
            registered with the lock manager

        """
        # Add it to the configuration
        self.cfg.AddNode(node)

        # If preseeding fails it'll not be added
        self.jobqueue.AddNode(node)

        # Add the new node to the Ganeti Lock Manager
        self.glm.add(locking.LEVEL_NODE, node.name)

    def ReaddNode(self, node):
        """Updates a node that's already in the configuration

        Args:
          node: the node object to synchronize

        """
        # Synchronize the queue again
        self.jobqueue.AddNode(node)

    def RemoveNode(self, name):
        """Removes a node from the configuration and lock manager.

        Args:
          name: the name of the node to remove

        """
        # Remove node from configuration
        self.cfg.RemoveNode(name)

        # Remove it from the job queue as well
        self.jobqueue.RemoveNode(name)

        # Remove the node from the Ganeti Lock Manager
        self.glm.remove(locking.LEVEL_NODE, name)
def ParseOptions():
    """Parse the command line options.

    Returns:
      (options, args) as from OptionParser.parse_args()

    """
    parser = OptionParser(description="Ganeti master daemon",
                          usage="%prog [-f] [-d]",
                          version="%%prog (ganeti) %s" %
                          constants.RELEASE_VERSION)

    # "fork" defaults to True; passing -f clears it.
    parser.add_option("-f", "--foreground", dest="fork",
                      help="Don't detach from the current terminal",
                      default=True, action="store_false")
    parser.add_option("-d", "--debug", dest="debug",
                      help="Enable some debug messages",
                      default=False, action="store_true")
    options, args = parser.parse_args()
    return options, args
368 def CheckAgreement():
369 """Check the agreement on who is the master.
371 The function uses a very simple algorithm: we must get more positive
372 than negative answers. Since in most of the cases we are the master,
373 we'll use our own config file for getting the node list. In the
374 future we could collect the current node list from our (possibly
375 obsolete) known nodes.
377 In order to account for cold-start of all nodes, we retry for up to
378 a minute until we get a real answer as the top-voted one. If the
379 nodes are more out-of-sync, for now manual startup of the master
382 Note that for a even number of nodes cluster, we need at least half
383 of the nodes (beside ourselves) to vote for us. This creates a
384 problem on two-node clusters, since in this case we require the
385 other node to be up too to confirm our status.
388 myself = utils.HostInfo().name
389 #temp instantiation of a config writer, used only to get the node list
390 cfg = config.ConfigWriter()
391 node_list = cfg.GetNodeList()
395 votes = bootstrap.GatherMasterVotes(node_list)
397 # empty node list, this is a one node cluster
399 if votes[0][0] is None:
405 logging.critical("Cluster inconsistent, most of the nodes didn't answer"
406 " after multiple retries. Aborting startup")
408 # here a real node is at the top of the list
409 all_votes = sum(item[1] for item in votes)
410 top_node, top_votes = votes[0]
412 if top_node != myself:
413 logging.critical("It seems we are not the master (top-voted node"
415 elif top_votes < all_votes - top_votes:
416 logging.critical("It seems we are not the master (%d votes for,"
417 " %d votes against)", top_votes, all_votes - top_votes)
427 options, args = ParseOptions()
428 utils.debug = options.debug
433 ssconf.CheckMaster(options.debug)
435 # we believe we are the master, let's ask the other nodes...
436 if not CheckAgreement():
439 master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
445 utils.Daemonize(logfile=constants.LOG_MASTERDAEMON,
446 noclose_fds=[master.fileno()])
448 utils.WritePidFile(constants.MASTERD_PID)
450 utils.SetupLogging(constants.LOG_MASTERDAEMON, debug=options.debug,
451 stderr_logging=not options.fork)
453 logging.info("Ganeti master daemon startup")
458 master_node = ssconf.SimpleConfigReader().GetMasterNode()
459 if not rpc.RpcRunner.call_node_start_master(master_node, False):
460 logging.error("Can't activate master IP address")
464 master.serve_forever()
466 master.server_cleanup()
470 utils.RemovePidFile(constants.MASTERD_PID)
473 if __name__ == "__main__":