# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Master daemon program.

Some classes deviate from the standard style guide because the
inheritance from parent classes requires it.
40 from cStringIO import StringIO
41 from optparse import OptionParser
43 from ganeti import config
44 from ganeti import constants
45 from ganeti import mcpu
46 from ganeti import opcodes
47 from ganeti import jqueue
48 from ganeti import locking
49 from ganeti import luxi
50 from ganeti import utils
51 from ganeti import errors
52 from ganeti import ssconf
53 from ganeti import logger
54 from ganeti import workerpool
55 from ganeti import rpc
# Size of the worker pool that serves client (luxi) connections.
CLIENT_REQUEST_WORKERS = 16

# Exit codes, re-exported under short module-level names.
(EXIT_NOTMASTER,
 EXIT_NODESETUP_ERROR) = (constants.EXIT_NOTMASTER,
                          constants.EXIT_NODESETUP_ERROR)
class ClientRequestWorker(workerpool.BaseWorker):
  """Pool worker that serves one accepted client connection per task."""

  def RunTask(self, server, request, client_address):
    """Process the request.

    Adapted from SocketServer.ThreadingMixIn: the connection is passed to
    the server's request handler, failures are reported through
    server.handle_error, and the connection is closed exactly once, even
    if error reporting itself fails.

    Args:
      server: the server that accepted the connection
      request: the accepted connection object
      client_address: the peer address of the connection

    """
    try:
      try:
        server.finish_request(request, client_address)
      except:
        # Deliberate catch-all: a misbehaving client must not take the
        # worker down; the server decides how to report the error.
        server.handle_error(request, client_address)
    finally:
      # Always release the connection, and only once.
      server.close_request(request)
class IOServer(SocketServer.UnixStreamServer):
  """Master daemon server listening on a Unix stream socket.

  This class takes care of initializing the other threads, setting
  signal handlers (which are processed only in this thread), and doing
  cleanup at shutdown.

  """
  def __init__(self, address, rqhandler):
    """IOServer constructor

    Args:
      address: the address to bind this IOServer to
      rqhandler: RequestHandler type object

    """
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)

    # We'll only start threads once we've forked; until setup_queue() runs
    # neither the shared context nor the worker pool exists.
    self.context = None
    self.request_workers = None

  def setup_queue(self):
    """Create the shared GanetiContext and the client request worker pool."""
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def process_request(self, request, client_address):
    """Add task to workerpool to process request.

    Overrides the SocketServer hook so that each accepted connection is
    handled by a pool worker instead of by this thread.

    """
    self.request_workers.AddTask(self, request, client_address)

  def serve_forever(self):
    """Handle one request at a time until told to quit.

    The loop ends once SIGINT or SIGTERM has been received; the signal
    handlers installed here are reset on the way out.

    """
    sighandler = utils.SignalHandler([signal.SIGINT, signal.SIGTERM])
    try:
      while not sighandler.called:
        self.handle_request()
    finally:
      sighandler.Reset()

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket. Every step is attempted even if an earlier one fails, and the
    steps for objects that setup_queue() never created are skipped.

    """
    try:
      self.server_close()
    finally:
      try:
        if self.request_workers:
          self.request_workers.TerminateWorkers()
      finally:
        if self.context:
          self.context.jobqueue.Shutdown()
class ClientRqHandler(SocketServer.BaseRequestHandler):
  """Client handler.

  Each message on the connection is a JSON document terminated by EOM.
  Every request is answered with a JSON object holding a success flag and
  the result (or an error description).

  """
  EOM = '\3'
  READ_SIZE = 4096

  def setup(self):
    """Initialise the per-connection state."""
    self._buffer = ""
    self._msgs = collections.deque()
    self._ops = ClientOps(self.server)

  def handle(self):
    """Serve requests until the peer closes or sends an invalid request."""
    while True:
      msg = self.read_message()
      if msg is None:
        logging.info("client closed connection")
        break

      try:
        request = simplejson.loads(msg)
      except ValueError:
        # Malformed JSON is a client error: log it and drop the connection
        # instead of letting it escape as a server-side failure.
        logging.error("wrong request received: %s", msg)
        break
      logging.debug("request: %s", request)
      if not isinstance(request, dict):
        logging.error("wrong request received: %s", msg)
        break

      method = request.get(luxi.KEY_METHOD, None)
      args = request.get(luxi.KEY_ARGS, None)
      if method is None or args is None:
        logging.error("no method or args in request")
        break

      try:
        result = self._ops.handle_request(method, args)
        success = True
      except:
        # Deliberate catch-all: any failure becomes an error reply.
        logging.error("Unexpected exception", exc_info=True)
        success = False
        err = sys.exc_info()
        result = "Caught exception: %s" % str(err[1])

      response = {
        luxi.KEY_SUCCESS: success,
        luxi.KEY_RESULT: result,
        }
      logging.debug("response: %s", response)
      self.send_message(simplejson.dumps(response))

  def read_message(self):
    """Return the next complete message, or None once the peer has closed.

    Data is read in READ_SIZE chunks and split on EOM; the trailing,
    possibly incomplete, fragment stays in self._buffer for the next read.

    """
    while not self._msgs:
      data = self.request.recv(self.READ_SIZE)
      if not data:
        return None
      new_msgs = (self._buffer + data).split(self.EOM)
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def send_message(self, msg):
    """Send one message to the client, terminated by EOM."""
    self.request.sendall(msg + self.EOM)
class ClientOps(object):
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server

  def handle_request(self, method, args):
    """Dispatch one client request to the job queue or a query opcode.

    Args:
      method: the requested operation, compared against the luxi.REQ_*
        names
      args: the request arguments; their shape depends on the method

    Returns:
      the result of the job queue call or of the query opcode

    Raises:
      ValueError: if the method is not a supported operation

    """
    queue = self.server.context.jobqueue

    # TODO: Parameter validation

    if method == luxi.REQ_SUBMIT_JOB:
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
      return queue.SubmitJob(ops)

    elif method == luxi.REQ_CANCEL_JOB:
      job_id = args
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_ARCHIVE_JOB:
      job_id = args
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
      age = args
      return queue.AutoArchiveJobs(age)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      return queue.QueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields) = args
      op = opcodes.OpQueryInstances(names=names, output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields) = args
      op = opcodes.OpQueryNodes(names=names, output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      nodes = args
      op = opcodes.OpQueryExports(nodes=nodes)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      fields = args
      op = opcodes.OpQueryConfigValues(output_fields=fields)
      return self._Query(op)

    else:
      # Name the rejected method so the client-side error is actionable.
      raise ValueError("Invalid operation '%s'" % (method, ))

  def _DummyLog(self, *args):
    """Discard log messages emitted while a query opcode runs."""
    pass

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    proc = mcpu.Processor(self.server.context)
    # TODO: Where should log messages go?
    return proc.ExecOpCode(op, self._DummyLog, None)
class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # The single fully-constructed instance; once set, instances refuse any
  # further attribute assignment (see __setattr__).
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only a GanetiContext object at any time, so this
    function raises an error if this is not the case.

    Raises:
      errors.ProgrammerError: if an instance has already been built

    """
    # Explicit check instead of assert, so it also holds under python -O.
    if self.__class__._instance is not None:
      raise errors.ProgrammerError("double GanetiContext instance")

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    self.glm = locking.GanetiLockManager(
                self.cfg.GetNodeList(),
                self.cfg.GetInstanceList())

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    Raises:
      errors.ProgrammerError: if a GanetiContext has already been built

    """
    # Explicit check instead of assert, so the lock survives python -O.
    if self.__class__._instance is not None:
      raise errors.ProgrammerError("Attempt to modify Ganeti Context")
    object.__setattr__(self, name, value)

  def AddNode(self, node):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node.name)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node.name)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)
def ParseOptions():
  """Build the master daemon's command line parser and run it.

  Returns:
    the (options, args) pair produced by OptionParser.parse_args()

  """
  version_text = "%%prog (ganeti) %s" % constants.RELEASE_VERSION
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version=version_text)

  # -f clears "fork" (stay attached to the terminal); -d sets "debug".
  parser.add_option("-f", "--foreground", dest="fork", default=True,
                    action="store_false",
                    help="Don't detach from the current terminal")
  parser.add_option("-d", "--debug", dest="debug", default=False,
                    action="store_true",
                    help="Enable some debug messages")

  return parser.parse_args()
def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most of the cases we are the master,
  we'll use our own config file for getting the node list. In the
  future we could collect the current node list from our (possibly
  obsolete) known nodes.

  Returns:
    True if startup may proceed (the other nodes agree, or there is no
    other node to ask), False otherwise

  """
  myself = utils.HostInfo().name
  # temporary config writer, used only to get the node list
  node_list = config.ConfigWriter().GetNodeList()
  # list.remove() reports a missing element with ValueError
  try:
    node_list.remove(myself)
  except ValueError:
    pass
  if not node_list:
    # either single node cluster, or a misconfiguration, but I won't
    # break any other node, so I can proceed
    return True

  results = rpc.RpcRunner.call_master_info(node_list)
  if not isinstance(results, dict):
    # this should not happen (unless internal error in rpc)
    logging.critical("Can't complete rpc call, aborting master startup")
    return False

  positive = negative = 0
  # votes per node name that some other node believes is the master
  other_masters = {}
  for node, node_result in results.items():
    if not isinstance(node_result, (tuple, list)) or len(node_result) < 3:
      logging.warning("Can't contact node %s", node)
      continue
    master_node = node_result[2]
    if master_node == myself:
      positive += 1
    else:
      negative += 1
      other_masters[master_node] = other_masters.get(master_node, 0) + 1

  if positive <= negative:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", positive, negative)
    if len(other_masters) > 1:
      logging.critical("The other nodes do not agree on a single master")
    elif other_masters:
      # TODO: resync my files from the master
      logging.critical("It seems the real master is %s",
                       list(other_masters)[0])
    else:
      logging.critical("Can't contact any node for data, aborting startup")
    return False

  return True
def main():
  """Main function of the master daemon.

  Exits with EXIT_NOTMASTER when the other nodes do not confirm that this
  node is the master; otherwise serves client requests until signalled.

  """
  import sys

  options, _ = ParseOptions()
  utils.debug = options.debug
  # NOTE(review): presumably forbids fork-based helpers inside this
  # multi-threaded daemon -- confirm against utils.
  utils.no_fork = True

  ssconf.CheckMaster(options.debug)

  # we believe we are the master, let's ask the other nodes...
  if not CheckAgreement():
    # Refusing to start is a failure; report it through the exit status.
    sys.exit(EXIT_NOTMASTER)

  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)

  # become a daemon unless running in the foreground (-f)
  if options.fork:
    utils.Daemonize(logfile=constants.LOG_MASTERDAEMON,
                    noclose_fds=[master.fileno()])

  utils.WritePidFile(constants.MASTERD_PID)

  logger.SetupLogging(constants.LOG_MASTERDAEMON, debug=options.debug,
                      stderr_logging=not options.fork)

  logging.info("ganeti master daemon startup")

  # activate the master IP address
  master_node = ssconf.SimpleConfigReader().GetMasterNode()
  if not rpc.RpcRunner.call_node_start_master(master_node, False):
    logging.error("Can't activate master IP address")

  master.setup_queue()
  try:
    master.serve_forever()
  finally:
    try:
      master.server_cleanup()
    finally:
      # Remove the pid file even if cleanup fails.
      utils.RemovePidFile(constants.MASTERD_PID)


if __name__ == "__main__":
  main()