4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Master daemon program.
24 Some classes deviate from the standard style guide since the
25 inheritance from parent classes requires it.
41 from cStringIO import StringIO
42 from optparse import OptionParser
44 from ganeti import constants
45 from ganeti import mcpu
46 from ganeti import opcodes
47 from ganeti import jqueue
48 from ganeti import luxi
49 from ganeti import utils
50 from ganeti import errors
51 from ganeti import ssconf
54 EXIT_NOTMASTER = constants.EXIT_NOTMASTER
55 EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
# IOServer: master daemon server over a Unix stream socket.  One thread per
# client request (ThreadingMixIn-style copies below), plus
# QUEUE_PROCESSOR_SIZE worker threads running PoolWorker over the job queue.
# NOTE(review): several lines of this class are elided in this excerpt
# (try/except pairings, thread start/join calls, the do_quit initializer).
58 class IOServer(SocketServer.UnixStreamServer):
61 This class takes care of initializing the other threads, setting
62 signal handlers (which are processed only in this thread), and doing
# Number of PoolWorker threads to spawn for job processing.
66 QUEUE_PROCESSOR_SIZE = 1
# Constructor: bind the Unix socket via the parent class, create the job
# queue manager, and install SIGINT/SIGTERM handlers that request
# shutdown (signals are processed only in this, the main, thread).
68 def __init__(self, address, rqhandler):
69 SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
71 self.queue = jqueue.QueueManager()
73 signal.signal(signal.SIGINT, self.handle_quit_signals)
74 signal.signal(signal.SIGTERM, self.handle_quit_signals)
76 def setup_processors(self):
77 """Spawn the processors threads.
79 This initializes the queue and the thread processors. It is done
80 separately from the constructor because we want the clone()
81 syscalls to happen after the daemonize part.
# Each worker runs PoolWorker(worker_id, queue); the Thread objects are
# collected in self.processors so server_cleanup() can wait on them.
84 for i in range(self.QUEUE_PROCESSOR_SIZE):
85 self.processors.append(threading.Thread(target=PoolWorker,
86 args=(i, self.queue.new_queue)))
87 for t in self.processors:
90 def process_request_thread(self, request, client_address):
91 """Process the request.
93 This is copied from the code in ThreadingMixIn.
# Success path: run the handler, then close the connection.
97 self.finish_request(request, client_address)
98 self.close_request(request)
# Error path: report the failure, but still close the connection.
100 self.handle_error(request, client_address)
101 self.close_request(request)
103 def process_request(self, request, client_address):
104 """Start a new thread to process the request.
106 This is copied from the code in ThreadingMixIn.
109 t = threading.Thread(target=self.process_request_thread,
110 args=(request, client_address))
# Signal handler for SIGINT/SIGTERM: logs the signal; presumably it also
# sets self.do_quit so serve_forever() exits -- TODO confirm (that line
# is not visible in this excerpt).
113 def handle_quit_signals(self, signum, frame):
114 print "received %s in %s" % (signum, frame)
117 def serve_forever(self):
118 """Handle one request at a time until told to quit."""
119 while not self.do_quit:
120 self.handle_request()
121 print "served request, quit=%s" % (self.do_quit)
123 def server_cleanup(self):
124 """Cleanup the server.
126 This involves shutting down the processor threads and the master
# Remove the master socket file, push one None sentinel per worker so
# each PoolWorker loop terminates, then wait for the worker threads.
131 utils.RemoveFile(constants.MASTER_SOCKET)
132 for i in range(self.QUEUE_PROCESSOR_SIZE):
133 self.queue.new_queue.put(None)
134 for idx, t in enumerate(self.processors):
135 print "waiting for processor thread %s..." % idx
# ClientRqHandler: handles one client connection.  Reads EOM-delimited
# JSON messages from the socket, dispatches them through ClientOps, and
# writes back a JSON reply per message.
# NOTE(review): the setup()/handle() def lines and the READ_SIZE/EOM
# class constants are elided in this excerpt.
140 class ClientRqHandler(SocketServer.BaseRequestHandler):
# Per-connection state: FIFO of already-parsed messages plus the
# high-level operations dispatcher bound to this server.
147 self._msgs = collections.deque()
148 self._ops = ClientOps(self.server)
# Request loop body: read one message; a falsy result means the client
# closed the connection.
152 msg = self.read_message()
154 print "client closed connection"
# Each request must be a JSON object carrying 'request' (the method
# name) and 'data' (its arguments); anything else is logged and skipped.
156 request = simplejson.loads(msg)
157 if not isinstance(request, dict):
158 print "wrong request received: %s" % msg
160 method = request.get('request', None)
161 data = request.get('data', None)
162 if method is None or data is None:
163 print "no method or data in request"
165 print "request:", method, data
166 result = self._ops.handle_request(method, data)
167 print "result:", result
168 self.send_message(simplejson.dumps({'success': True, 'result': result}))
# Read one complete message from the socket.  Partial trailing data is
# buffered in self._buffer; complete messages (split on self.EOM) queue
# up in self._msgs until consumed.
170 def read_message(self):
171 while not self._msgs:
172 data = self.request.recv(self.READ_SIZE)
175 new_msgs = (self._buffer + data).split(self.EOM)
176 self._buffer = new_msgs.pop()
177 self._msgs.extend(new_msgs)
178 return self._msgs.popleft()
# Send one message to the client, terminated by the EOM marker.
180 def send_message(self, msg):
181 #print "sending", msg
182 self.request.sendall(msg + self.EOM)
186 """Class holding high-level client operations."""
187 def __init__(self, server):
192 if self._cpu is None:
193 self._cpu = mcpu.Processor(lambda x: None)
196 def handle_request(self, operation, args):
197 print operation, args
198 if operation == "submit":
199 return self.put(args)
200 elif operation == "query":
201 return self.query(args)
203 raise ValueError("Invalid operation")
206 job = luxi.UnserializeJob(args)
207 rid = self.server.queue.put(job)
210 def query(self, args):
211 path = args["object"]
212 fields = args["fields"]
213 names = args["names"]
214 if path == "instances":
215 opclass = opcodes.OpQueryInstances
217 # early exit because job query-ing is special (not via opcodes)
218 return self.query_jobs(fields, names)
220 raise ValueError("Invalid object %s" % path)
222 op = opclass(output_fields = fields, names=names)
224 result = cpu.ExecOpCode(op)
227 def query_jobs(self, fields, names):
228 return self.server.queue.query_jobs(fields, names)
# Run every opcode of a single job through the given processor, tracking
# per-opcode status and result; the job ends STATUS_SUCCESS only if all
# opcodes succeed, STATUS_FAIL as soon as one raises an opcode error.
231 def JobRunner(proc, job):
234 This function processes a single job in the context of given
238 job.SetStatus(opcodes.Job.STATUS_RUNNING)
240 for idx, op in enumerate(job.data.op_list):
241 job.data.op_status[idx] = opcodes.Job.STATUS_RUNNING
243 job.data.op_result[idx] = proc.ExecOpCode(op)
244 job.data.op_status[idx] = opcodes.Job.STATUS_SUCCESS
# Expected opcode failures: record the message as the opcode's result
# and mark both the opcode and the whole job as failed.
245 except (errors.OpPrereqError, errors.OpExecError), err:
247 job.data.op_result[idx] = str(err)
248 job.data.op_status[idx] = opcodes.Job.STATUS_FAIL
250 job.SetStatus(opcodes.Job.STATUS_FAIL)
252 job.SetStatus(opcodes.Job.STATUS_SUCCESS)
# Worker-thread main loop: block on the incoming queue and run each job
# through JobRunner with a fresh processor.  A None item is the shutdown
# sentinel pushed by IOServer.server_cleanup().
255 def PoolWorker(worker_id, incoming_queue):
256 """A worker thread function.
258 This is the actual processor of a single thread of Job execution.
262 print "worker %s sleeping" % worker_id
# Blocking get: waits until a job (or the None sentinel) arrives.
263 item = incoming_queue.get(True)
266 print "worker %s processing job %s" % (worker_id, item.data.job_id)
# A fresh Processor per job; feedback messages are discarded.
269 proc = mcpu.Processor(feedback=lambda x: None)
271 JobRunner(proc, item)
# Ganeti-level errors are logged but never kill the worker thread.
272 except errors.GenericError, err:
273 print "ganeti exception %s" % err
278 print "worker %s finish job %s" % (worker_id, item.data.job_id)
279 print "worker %s exiting" % worker_id
282 def CheckMaster(debug):
283 """Checks the node setup.
285 If this is the master, the function will return. Otherwise it will
286 exit with an exit code based on the node status.
# Read the master node's name from the cluster's simple store; a missing
# or incomplete configuration is a node-setup error.
290 ss = ssconf.SimpleStore()
291 master_name = ss.GetMasterNode()
292 except errors.ConfigurationError, err:
293 print "Cluster configuration incomplete: '%s'" % str(err)
294 sys.exit(EXIT_NODESETUP_ERROR)
# Resolve our own hostname; failure to resolve is also a setup error.
297 myself = utils.HostInfo()
298 except errors.ResolverError, err:
299 sys.stderr.write("Cannot resolve my own name (%s)\n" % err.args[0])
300 sys.exit(EXIT_NODESETUP_ERROR)
# Refuse to run the master daemon on a node that is not the master.
302 if myself.name != master_name:
304 sys.stderr.write("Not master, exiting.\n")
305 sys.exit(EXIT_NOTMASTER)
309 """Parse the command line options.
312 (options, args) as from OptionParser.parse_args()
315 parser = OptionParser(description="Ganeti master daemon",
316 usage="%prog [-f] [-d]",
317 version="%%prog (ganeti) %s" %
318 constants.RELEASE_VERSION)
320 parser.add_option("-f", "--foreground", dest="fork",
321 help="Don't detach from the current terminal",
322 default=True, action="store_false")
323 parser.add_option("-d", "--debug", dest="debug",
324 help="Enable some debug messages",
325 default=False, action="store_true")
326 options, args = parser.parse_args()
# Daemon entry point body: parse options, verify this node is the
# master, create the server, daemonize (keeping the listening socket
# open), start the workers, serve until told to quit, then clean up.
333 options, args = ParseOptions()
334 utils.debug = options.debug
336 CheckMaster(options.debug)
338 master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
# Keep the server's listening fd open across the daemonize fork.
342 utils.Daemonize(logfile=constants.LOG_MASTERDAEMON,
343 noclose_fds=[master.fileno()])
345 master.setup_processors()
347 master.serve_forever()
349 master.server_cleanup()
351 if __name__ == "__main__":