4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Master daemon program.
Some classes deviate from the standard style guide since the
inheritance from parent classes requires it.
40 from cStringIO import StringIO
41 from optparse import OptionParser
43 from ganeti import config
44 from ganeti import constants
45 from ganeti import mcpu
46 from ganeti import opcodes
47 from ganeti import jqueue
48 from ganeti import locking
49 from ganeti import luxi
50 from ganeti import utils
51 from ganeti import errors
52 from ganeti import ssconf
53 from ganeti import logger
54 from ganeti import workerpool
55 from ganeti import rpc
# Number of threads in the pool that serves client requests (see IOServer).
CLIENT_REQUEST_WORKERS = 16

# Local aliases for the daemon exit codes defined in the constants module.
EXIT_NOTMASTER, EXIT_NODESETUP_ERROR = (constants.EXIT_NOTMASTER,
                                        constants.EXIT_NODESETUP_ERROR)
class ClientRequestWorker(workerpool.BaseWorker):
    """Pool worker that serves one accepted client connection per task."""

    def RunTask(self, server, request, client_address):
        """Process the request.

        This mirrors the per-request code of SocketServer.ThreadingMixIn:
        the server's request handler processes the connection, which is
        then closed. If anything raises, the server's error hook is called
        and the connection is closed as well.
        """
        try:
            server.finish_request(request, client_address)
            server.close_request(request)
        except:
            # Deliberately catch everything, as ThreadingMixIn does, so a
            # failing request is reported through handle_error instead of
            # propagating out of RunTask.
            server.handle_error(request, client_address)
            server.close_request(request)
class IOServer(SocketServer.UnixStreamServer):
    """Unix-socket server of the master daemon.

    This class takes care of initializing the other threads, setting
    signal handlers (which are processed only in this thread), and doing
    cleanup at shutdown.
    """

    def __init__(self, address, rqhandler):
        """IOServer constructor

        Args:
          address: the address to bind this IOServer to
          rqhandler: RequestHandler type object
        """
        SocketServer.UnixStreamServer.__init__(self, address, rqhandler)

        # We'll only start threads once we've forked; until setup_queue()
        # runs, neither the context nor the worker pool exists.
        self.context = None
        self.request_workers = None

    def setup_queue(self):
        """Create the shared context and the client request worker pool.

        process_request() hands work to this pool, so this must run before
        any request is served.
        """
        self.context = GanetiContext()
        self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
                                                     ClientRequestWorker)

    def process_request(self, request, client_address):
        """Add task to workerpool to process request."""
        self.request_workers.AddTask(self, request, client_address)

    def serve_forever(self):
        """Handle one request at a time until told to quit."""
        sighandler = utils.SignalHandler([signal.SIGINT, signal.SIGTERM])
        try:
            # SIGINT/SIGTERM only set the flag; the loop then exits after
            # the current handle_request() returns.
            while not sighandler.called:
                self.handle_request()
        finally:
            sighandler.Reset()

    def server_cleanup(self):
        """Cleanup the server.

        This involves shutting down the processor threads and the master
        socket. Safe to call even if setup_queue() never ran: both the
        worker pool and the context are only touched when they exist.
        """
        try:
            self.server_close()
        finally:
            if self.request_workers:
                self.request_workers.TerminateWorkers()
            # Guarded like request_workers: both are created by setup_queue().
            if self.context:
                self.context.jobqueue.Shutdown()
class ClientRqHandler(SocketServer.BaseRequestHandler):
    """Client handler.

    Requests and responses on a connection are JSON documents terminated
    by EOM; one JSON response is sent back for every request handled.
    """
    EOM = '\3'
    READ_SIZE = 4096

    def setup(self):
        """Initialise the per-connection state."""
        # Data read from the socket after the last EOM (incomplete message).
        self._buffer = ""
        # Complete messages received but not handled yet.
        self._msgs = collections.deque()
        self._ops = ClientOps(self.server)

    def handle(self):
        """Serve requests until the client disconnects or misbehaves."""
        while True:
            msg = self.read_message()
            if msg is None:
                logging.info("client closed connection")
                break

            try:
                request = simplejson.loads(msg)
            except ValueError:
                # Previously a garbled message escaped handle() as an
                # exception; treat it like the other malformed requests.
                logging.error("wrong request received: %s", msg)
                break
            logging.debug("request: %s", request)
            if not isinstance(request, dict):
                logging.error("wrong request received: %s", msg)
                break

            method = request.get(luxi.KEY_METHOD, None)
            args = request.get(luxi.KEY_ARGS, None)
            if method is None or args is None:
                logging.error("no method or args in request")
                break

            success = False
            try:
                result = self._ops.handle_request(method, args)
                success = True
            except:
                # Any failure is reported back to the client as the result
                # instead of tearing down the connection.
                logging.error("Unexpected exception", exc_info=True)
                err = sys.exc_info()
                result = "Caught exception: %s" % str(err[1])

            response = {
                luxi.KEY_SUCCESS: success,
                luxi.KEY_RESULT: result,
            }
            logging.debug("response: %s", response)
            self.send_message(simplejson.dumps(response))

    def read_message(self):
        """Return the next complete message, or None once the peer closes."""
        while not self._msgs:
            data = self.request.recv(self.READ_SIZE)
            if not data:
                return None
            new_msgs = (self._buffer + data).split(self.EOM)
            # The last piece has no EOM after it yet; keep it for later.
            self._buffer = new_msgs.pop()
            self._msgs.extend(new_msgs)
        return self._msgs.popleft()

    def send_message(self, msg):
        """Send one message followed by the EOM terminator."""
        self.request.sendall(msg + self.EOM)
198 """Class holding high-level client operations."""
199 def __init__(self, server):
202 def handle_request(self, method, args):
203 queue = self.server.context.jobqueue
205 # TODO: Parameter validation
207 if method == luxi.REQ_SUBMIT_JOB:
208 ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
209 return queue.SubmitJob(ops)
211 elif method == luxi.REQ_CANCEL_JOB:
213 return queue.CancelJob(job_id)
215 elif method == luxi.REQ_ARCHIVE_JOB:
217 return queue.ArchiveJob(job_id)
219 elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
220 (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
221 return queue.WaitForJobChanges(job_id, fields, prev_job_info,
222 prev_log_serial, timeout)
224 elif method == luxi.REQ_QUERY_JOBS:
225 (job_ids, fields) = args
226 return queue.QueryJobs(job_ids, fields)
228 elif method == luxi.REQ_QUERY_INSTANCES:
229 (names, fields) = args
230 op = opcodes.OpQueryInstances(names=names, output_fields=fields)
231 return self._Query(op)
233 elif method == luxi.REQ_QUERY_NODES:
234 (names, fields) = args
235 op = opcodes.OpQueryNodes(names=names, output_fields=fields)
236 return self._Query(op)
238 elif method == luxi.REQ_QUERY_EXPORTS:
240 op = opcodes.OpQueryExports(nodes=nodes)
241 return self._Query(op)
244 raise ValueError("Invalid operation")
246 def _DummyLog(self, *args):
249 def _Query(self, op):
250 """Runs the specified opcode and returns the result.
253 proc = mcpu.Processor(self.server.context)
254 # TODO: Where should log messages go?
255 return proc.ExecOpCode(op, self._DummyLog)
class GanetiContext(object):
    """Context common to all ganeti threads.

    This class creates and holds common objects shared by all threads:
    the configuration writer, the lock manager and the job queue. Only one
    instance may be built, and it is frozen once construction finishes.
    """
    # The single constructed instance; None until __init__ completes.
    _instance = None

    def __init__(self):
        """Constructs a new GanetiContext object.

        There should be only a GanetiContext object at any time, so this
        function raises an error if this is not the case.
        """
        klass = type(self)
        assert klass._instance is None, "double GanetiContext instance"

        # Global configuration object; the lock manager below is seeded
        # with its node and instance lists.
        self.cfg = config.ConfigWriter()
        node_names = self.cfg.GetNodeList()
        instance_names = self.cfg.GetInstanceList()
        self.glm = locking.GanetiLockManager(node_names, instance_names)

        self.jobqueue = jqueue.JobQueue(self)

        # Publishing the instance also locks it against attribute
        # modifications (see __setattr__).
        klass._instance = self

    def __setattr__(self, name, value):
        """Setting GanetiContext attributes is forbidden after initialization."""
        assert type(self)._instance is None, "Attempt to modify Ganeti Context"
        object.__setattr__(self, name, value)

    def AddNode(self, node):
        """Adds a node to the configuration and lock manager."""
        self.cfg.AddNode(node)
        # If preseeding fails it'll not be added
        self.jobqueue.AddNode(node.name)
        self.glm.add(locking.LEVEL_NODE, node.name)

    def ReaddNode(self, node):
        """Updates a node that's already in the configuration."""
        # Only the job queue needs to be synchronized again.
        self.jobqueue.AddNode(node.name)

    def RemoveNode(self, name):
        """Removes a node from the configuration and lock manager."""
        self.cfg.RemoveNode(name)
        self.jobqueue.RemoveNode(name)
        self.glm.remove(locking.LEVEL_NODE, name)
def ParseOptions():
    """Parse the command line options.

    Returns:
      (options, args) as from OptionParser.parse_args()
    """
    version_text = "%%prog (ganeti) %s" % constants.RELEASE_VERSION
    parser = OptionParser(description="Ganeti master daemon",
                          usage="%prog [-f] [-d]",
                          version=version_text)

    # -f stores False into options.fork (default True): stay attached.
    parser.add_option("-f", "--foreground", dest="fork",
                      help="Don't detach from the current terminal",
                      default=True, action="store_false")
    parser.add_option("-d", "--debug", dest="debug",
                      help="Enable some debug messages",
                      default=False, action="store_true")

    parsed_options, leftover_args = parser.parse_args()
    return parsed_options, leftover_args
def CheckAgreement():
    """Check the agreement on who is the master.

    The function uses a very simple algorithm: we must get more positive
    than negative answers. Since in most of the cases we are the master,
    we'll use our own config file for getting the node list. In the
    future we could collect the current node list from our (possibly
    obsolete) known nodes.

    Returns:
      True if startup may proceed (no other nodes to ask, or more nodes
      name us as master than name someone else), False otherwise.
    """
    myself = utils.HostInfo().name
    # temporary config writer, used only to get the node list
    node_list = config.ConfigWriter().GetNodeList()
    # node_list is presumably a list: list.remove() raises ValueError for a
    # missing item, so test membership instead of relying on an exception.
    if myself in node_list:
        node_list.remove(myself)
    if not node_list:
        # either single node cluster, or a misconfiguration, but I won't
        # break any other node, so I can proceed
        return True

    results = rpc.call_master_info(node_list)
    if not isinstance(results, dict):
        # this should not happen (unless internal error in rpc)
        logging.critical("Can't complete rpc call, aborting master startup")
        return False

    positive = negative = 0
    # votes per node name that another node claims is the master
    other_masters = {}
    for node, answer in results.items():
        # element 2 of a valid answer is the master node name it reports
        if not isinstance(answer, (tuple, list)) or len(answer) < 3:
            logging.warning("Can't contact node %s", node)
            continue
        master_node = answer[2]
        if master_node == myself:
            positive += 1
        else:
            negative += 1
            other_masters[master_node] = other_masters.get(master_node, 0) + 1

    if positive <= negative:
        # bad!
        logging.critical("It seems we are not the master (%d votes for,"
                         " %d votes against)", positive, negative)
        if len(other_masters) > 1:
            logging.critical("The other nodes do not agree on a single master")
        elif other_masters:
            # TODO: resync my files from the master
            # list() keeps this working where keys() is a view (Python 3)
            logging.critical("It seems the real master is %s",
                             list(other_masters)[0])
        else:
            logging.critical("Can't contact any node for data, aborting startup")
        return False

    return True
def main():
    """Main function: start and run the Ganeti master daemon."""
    options, _ = ParseOptions()
    utils.debug = options.debug

    ssconf.CheckMaster(options.debug)

    # we believe we are the master, let's ask the other nodes...
    if not CheckAgreement():
        return

    master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)

    # become a daemon, keeping the already-bound master socket open
    if options.fork:
        utils.Daemonize(logfile=constants.LOG_MASTERDAEMON,
                        noclose_fds=[master.fileno()])

    utils.WritePidFile(constants.MASTERD_PID)
    try:
        logger.SetupLogging(constants.LOG_MASTERDAEMON, debug=options.debug,
                            stderr_logging=not options.fork)

        logging.info("ganeti master daemon startup")

        # activate ip
        master_node = ssconf.SimpleStore().GetMasterNode()
        if not rpc.call_node_start_master(master_node, False):
            logging.error("Can't activate master IP address")

        # worker threads are only started now, after daemonizing
        master.setup_queue()
        try:
            master.serve_forever()
        finally:
            master.server_cleanup()
    finally:
        # Remove the pid file even if startup or cleanup raised, so no
        # stale pid file is left behind.
        utils.RemovePidFile(constants.MASTERD_PID)
450 if __name__ == "__main__":