# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Master daemon program.

Some classes deviate from the standard style guide since the
inheritance from parent classes requires it.

"""
import collections
import logging
import signal
import sys
import SocketServer

from cStringIO import StringIO
from optparse import OptionParser

import simplejson

from ganeti import config
from ganeti import constants
from ganeti import errors
from ganeti import jqueue
from ganeti import locking
from ganeti import logger
from ganeti import luxi
from ganeti import mcpu
from ganeti import opcodes
from ganeti import ssconf
from ganeti import utils
from ganeti import workerpool
# Number of workers in the pool that processes client requests received
# on the master socket (passed to workerpool.WorkerPool in IOServer).
CLIENT_REQUEST_WORKERS = 16

# Exit codes used by CheckMaster, taken from the constants module.
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
class ClientRequestWorker(workerpool.BaseWorker):
    """Pool worker that runs one queued client request.

    Tasks are queued by IOServer.process_request with the arguments
    (server, request, client_address).

    """

    def RunTask(self, server, request, client_address):
        """Process the request.

        This is adapted from the code in SocketServer.ThreadingMixIn: the
        request is passed to the server's request handler, any error is
        reported through server.handle_error, and the request is closed
        afterwards in every case.

        Args:
          server: the server that accepted the connection
          request: the accepted client connection
          client_address: the client's address as returned by accept()

        """
        try:
            try:
                server.finish_request(request, client_address)
            except:
                # Deliberately catch everything, as ThreadingMixIn does, so
                # that errors are reported through the server instead of
                # propagating into the worker.
                server.handle_error(request, client_address)
        finally:
            # Close in a finally clause so the connection is released exactly
            # once, even if handle_error itself raises.
            server.close_request(request)
class IOServer(SocketServer.UnixStreamServer):
    """IO thread class.

    This class takes care of initializing the other threads, setting
    signal handlers (which are processed only in this thread), and doing
    cleanup at shutdown.

    """

    def __init__(self, address, rqhandler, context):
        """IOServer constructor

        Args:
          address: the address to bind this IOServer to
          rqhandler: RequestHandler type object
          context: Context Object common to all worker threads

        """
        SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
        self.context = context

        # We'll only start threads once we've forked; until setup_queue() is
        # called neither the job queue nor the worker pool exist.
        self.jobqueue = None
        self.request_workers = None

    def setup_queue(self):
        """Create the job queue and the client request worker pool."""
        self.jobqueue = jqueue.JobQueue(self.context)
        self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
                                                     ClientRequestWorker)

    def process_request(self, request, client_address):
        """Add task to workerpool to process request.

        Overrides the SocketServer hook so that accepted connections are
        handled by the worker pool rather than in the accepting thread.
        setup_queue() must have been called first.

        """
        self.request_workers.AddTask(self, request, client_address)

    def serve_forever(self):
        """Handle one request at a time until told to quit.

        The loop ends once the handler installed for SIGINT/SIGTERM reports
        that one of those signals was received.

        """
        sighandler = utils.SignalHandler([signal.SIGINT, signal.SIGTERM])
        while not sighandler.called:
            self.handle_request()

    def server_cleanup(self):
        """Cleanup the server.

        This involves shutting down the processor threads and the master
        socket. The worker pool and job queue are shut down even if closing
        or removing the socket fails, and are skipped if setup_queue() was
        never called.

        """
        try:
            self.server_close()
            utils.RemoveFile(constants.MASTER_SOCKET)
        finally:
            if self.request_workers:
                self.request_workers.TerminateWorkers()
            if self.jobqueue:
                self.jobqueue.Shutdown()
class ClientRqHandler(SocketServer.BaseRequestHandler):
    """Client handler.

    Reads EOM-terminated JSON requests from one client connection,
    dispatches them through ClientOps and sends back EOM-terminated JSON
    responses.

    """
    # NOTE(review): the values of these two attributes are not visible in
    # the reviewed listing; EOM must match the terminator luxi clients use.
    EOM = '\3'
    READ_SIZE = 4096

    def setup(self):
        """Initialize per-connection state (BaseRequestHandler hook)."""
        self._buffer = ""
        self._msgs = collections.deque()
        self._ops = ClientOps(self.server)

    def handle(self):
        """Serve requests until the client disconnects or misbehaves."""
        while True:
            msg = self.read_message()
            if msg is None:
                logging.info("client closed connection")
                break

            # The message comes straight from the socket; undecodable input
            # ends the session instead of escaping as an uncaught exception.
            try:
                request = simplejson.loads(msg)
            except ValueError:
                logging.error("wrong request received: %s", msg)
                break
            logging.debug("request: %s", request)
            if not isinstance(request, dict):
                logging.error("wrong request received: %s", msg)
                break

            method = request.get(luxi.KEY_METHOD, None)
            args = request.get(luxi.KEY_ARGS, None)
            if method is None or args is None:
                logging.error("no method or args in request")
                break

            success = False
            try:
                result = self._ops.handle_request(method, args)
                success = True
            except Exception:
                # Report the failure to the client rather than dropping the
                # connection.
                logging.error("Unexpected exception", exc_info=True)
                err = sys.exc_info()
                result = "Caught exception: %s" % str(err[1])

            response = {
                luxi.KEY_SUCCESS: success,
                luxi.KEY_RESULT: result,
            }
            logging.debug("response: %s", response)
            self.send_message(simplejson.dumps(response))

    def read_message(self):
        """Return the next complete message, or None once the peer closed.

        Data is read in READ_SIZE chunks and split on EOM; a trailing,
        not yet terminated fragment is kept in self._buffer until the rest
        of it arrives.

        """
        while not self._msgs:
            data = self.request.recv(self.READ_SIZE)
            if not data:
                return None
            new_msgs = (self._buffer + data).split(self.EOM)
            self._buffer = new_msgs.pop()
            self._msgs.extend(new_msgs)
        return self._msgs.popleft()

    def send_message(self, msg):
        """Send msg to the client followed by the EOM terminator."""
        self.request.sendall(msg + self.EOM)
class ClientOps(object):
    """Class holding high-level client operations."""

    def __init__(self, server):
        # The server gives access to the job queue and the shared context.
        self.server = server

    def handle_request(self, method, args):
        """Execute one client request against the server's job queue.

        Args:
          method: the requested operation, one of the luxi.REQ_* values
          args: the operation's arguments as decoded from the request

        Returns:
          the result of the corresponding job queue call

        Raises:
          ValueError: if method is not a known operation; unpacking errors
            also propagate when args does not have the expected shape

        """
        queue = self.server.jobqueue

        # TODO: Parameter validation

        if method == luxi.REQ_SUBMIT_JOB:
            ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
            # we need to compute the node list here, since from now on all
            # operations require locks on the queue or the storage, and we
            # shouldn't get another lock
            node_list = self.server.context.cfg.GetNodeList()
            return queue.SubmitJob(ops, node_list)

        elif method == luxi.REQ_CANCEL_JOB:
            (job_id, ) = args
            return queue.CancelJob(job_id)

        elif method == luxi.REQ_ARCHIVE_JOB:
            (job_id, ) = args
            return queue.ArchiveJob(job_id)

        elif method == luxi.REQ_QUERY_JOBS:
            (job_ids, fields) = args
            return queue.QueryJobs(job_ids, fields)

        else:
            raise ValueError("Invalid operation")
class GanetiContext(object):
    """Context common to all ganeti threads.

    This class creates and holds common objects shared by all threads.

    """
    # The live instance, set at the end of __init__; once set it also makes
    # __setattr__ refuse further attribute changes.
    _instance = None

    def __init__(self):
        """Constructs a new GanetiContext object.

        There should be only a GanetiContext object at any time, so this
        function raises an AssertionError if this is not the case (the
        check is an assert, so it is skipped when running with -O).

        """
        assert self.__class__._instance is None, "double GanetiContext instance"

        # Create a ConfigWriter...
        self.cfg = config.ConfigWriter()
        # And a GanetiLockingManager...
        self.glm = locking.GanetiLockManager(
            self.cfg.GetNodeList(),
            self.cfg.GetInstanceList())

        # setting this also locks the class against attribute modifications
        # (assigning a class attribute does not go through __setattr__)
        self.__class__._instance = self

    def __setattr__(self, name, value):
        """Setting GanetiContext attributes is forbidden after initialization.

        Enforced with an assert, so the check is skipped under -O.

        """
        assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
        object.__setattr__(self, name, value)
268 def CheckMaster(debug):
269 """Checks the node setup.
271 If this is the master, the function will return. Otherwise it will
272 exit with an exit code based on the node status.
276 ss = ssconf.SimpleStore()
277 master_name = ss.GetMasterNode()
278 except errors.ConfigurationError, err:
279 print "Cluster configuration incomplete: '%s'" % str(err)
280 sys.exit(EXIT_NODESETUP_ERROR)
283 myself = utils.HostInfo()
284 except errors.ResolverError, err:
285 sys.stderr.write("Cannot resolve my own name (%s)\n" % err.args[0])
286 sys.exit(EXIT_NODESETUP_ERROR)
288 if myself.name != master_name:
290 sys.stderr.write("Not master, exiting.\n")
291 sys.exit(EXIT_NOTMASTER)
def ParseOptions():
    """Parse the command line options.

    Returns:
      (options, args) as from OptionParser.parse_args(); options.fork is
      False when -f/--foreground is given, options.debug is True with -d

    """
    parser = OptionParser(description="Ganeti master daemon",
                          usage="%prog [-f] [-d]",
                          version="%%prog (ganeti) %s" %
                          constants.RELEASE_VERSION)

    parser.add_option("-f", "--foreground", dest="fork",
                      help="Don't detach from the current terminal",
                      default=True, action="store_false")
    parser.add_option("-d", "--debug", dest="debug",
                      help="Enable some debug messages",
                      default=False, action="store_true")
    options, args = parser.parse_args()
    return options, args
def main():
    """Main function: verify this is the master, daemonize and serve."""
    options, _ = ParseOptions()
    utils.debug = options.debug

    CheckMaster(options.debug)

    master = IOServer(constants.MASTER_SOCKET, ClientRqHandler, GanetiContext())

    # Detach unless -f/--foreground was given; the master socket already
    # exists at this point, so its descriptor must survive daemonization.
    if options.fork:
        utils.Daemonize(logfile=constants.LOG_MASTERDAEMON,
                        noclose_fds=[master.fileno()])

    logger.SetupDaemon(constants.LOG_MASTERDAEMON, debug=options.debug,
                       stderr_logging=not options.fork)

    logging.info("ganeti master daemon startup")

    # Worker threads are started only now, after the (optional) fork.
    master.setup_queue()
    try:
        master.serve_forever()
    finally:
        master.server_cleanup()
344 if __name__ == "__main__":