4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Master daemon program.
Some classes deviate from the standard style guide, since the
inheritance from parent classes requires it.
41 from cStringIO import StringIO
42 from optparse import OptionParser
44 from ganeti import constants
45 from ganeti import mcpu
46 from ganeti import opcodes
47 from ganeti import jqueue
48 from ganeti import luxi
49 from ganeti import utils
50 from ganeti import errors
51 from ganeti import ssconf
# Shorthand aliases for the exit codes used by this daemon
# (see ganeti.constants for their values).
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
class IOServer(SocketServer.UnixStreamServer):
  """Unix-socket server for the master daemon's client requests.

  This class takes care of initializing the other threads, setting
  signal handlers (which are processed only in this thread), and doing
  cleanup at shutdown.

  """
  # how many worker threads to spawn for processing the job queue
  QUEUE_PROCESSOR_SIZE = 1

  def __init__(self, address, rqhandler):
    """IOServer constructor

    Args:
      address: the address to bind this IOServer to
      rqhandler: RequestHandler type object

    """
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
    # job queue shared with the PoolWorker threads
    self.queue = jqueue.QueueManager()
    # NOTE(review): initialization of self.do_quit and self.processors
    # is not visible in this excerpt, but serve_forever() and
    # setup_processors() below require it -- confirm against full file.
    signal.signal(signal.SIGINT, self.handle_quit_signals)
    signal.signal(signal.SIGTERM, self.handle_quit_signals)

  def setup_processors(self):
    """Spawn the processors threads.

    This initializes the queue and the thread processors. It is done
    separately from the constructor because we want the clone()
    syscalls to happen after the daemonize part.

    """
    for i in range(self.QUEUE_PROCESSOR_SIZE):
      self.processors.append(threading.Thread(target=PoolWorker,
                                              args=(i, self.queue.new_queue)))
    for t in self.processors:
      # NOTE(review): the loop body (presumably t.start()) is missing
      # from this excerpt.

  def process_request_thread(self, request, client_address):
    """Process the request.

    This is copied from the code in ThreadingMixIn.

    """
    # NOTE(review): ThreadingMixIn wraps the first two calls in a
    # try/except whose header lines are missing from this excerpt;
    # handle_error/close_request below form the except branch.
    self.finish_request(request, client_address)
    self.close_request(request)
    self.handle_error(request, client_address)
    self.close_request(request)

  def process_request(self, request, client_address):
    """Start a new thread to process the request.

    This is copied from the code in ThreadingMixIn.

    """
    t = threading.Thread(target=self.process_request_thread,
                         args=(request, client_address))
    # NOTE(review): t.start() is presumably next, missing from excerpt.

  def handle_quit_signals(self, signum, frame):
    # Runs in the main thread only (signals are delivered there);
    # presumably also sets self.do_quit so serve_forever() stops --
    # the assignment is missing from this excerpt.
    print "received %s in %s" % (signum, frame)

  def serve_forever(self):
    """Handle one request at a time until told to quit."""
    while not self.do_quit:
      self.handle_request()
      print "served request, quit=%s" % (self.do_quit)

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket (presumably -- the docstring tail is missing from excerpt).

    """
    utils.RemoveFile(constants.MASTER_SOCKET)
    # one sentinel per worker tells each PoolWorker thread to exit
    for i in range(self.QUEUE_PROCESSOR_SIZE):
      self.queue.new_queue.put(None)
    for idx, t in enumerate(self.processors):
      print "waiting for processor thread %s..." % idx
      # NOTE(review): t.join() is presumably next, missing from excerpt.
class ClientRqHandler(SocketServer.BaseRequestHandler):
  # NOTE(review): this class is truncated in this excerpt: the class
  # docstring, the READ_SIZE/EOM constants and the setup()/handle()
  # method headers are missing.  The bare statements below are the
  # surviving bodies of those methods.

  # -- setup() fragment: per-connection state -----------------------
  self._msgs = collections.deque()    # complete messages not yet consumed
  self._ops = ClientOps(self.server)  # high-level operation dispatcher

  # -- handle() fragment: request/response loop ---------------------
  msg = self.read_message()
  # read_message returning nothing signals a closed connection
  print "client closed connection"
  # wire protocol: a JSON dict with 'request' and 'data' keys
  request = simplejson.loads(msg)
  if not isinstance(request, dict):
    print "wrong request received: %s" % msg
  method = request.get('request', None)
  data = request.get('data', None)
  if method is None or data is None:
    print "no method or data in request"
  print "request:", method, data
  result = self._ops.handle_request(method, data)
  print "result:", result
  self.send_message(simplejson.dumps({'success': True, 'result': result}))

  def read_message(self):
    """Return the next EOM-delimited message read from the socket."""
    while not self._msgs:
      data = self.request.recv(self.READ_SIZE)
      # NOTE(review): the empty-read (peer closed) check is missing
      # from this excerpt.
      new_msgs = (self._buffer + data).split(self.EOM)
      self._buffer = new_msgs.pop()  # keep the trailing partial message
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def send_message(self, msg):
    """Send one message to the client, terminated by the EOM marker."""
    #print "sending", msg
    self.request.sendall(msg + self.EOM)
  """Class holding high-level client operations."""
  # NOTE(review): the `class ClientOps(...):` header line is missing
  # from this excerpt.

  def __init__(self, server):
    # NOTE(review): the constructor body is missing; the two lines
    # below look like the body of a lazy processor accessor whose own
    # `def` line was lost -- confirm against the full file.
    if self._cpu is None:
      # processor with a no-op feedback function
      self._cpu = mcpu.Processor(lambda x: None)

  def handle_request(self, operation, args):
    """Dispatch one client request to put() or query()."""
    print operation, args
    if operation == "submit":
      return self.put(args)
    elif operation == "query":
      return self.query(args)
    # any other operation name is a protocol error
    raise ValueError("Invalid operation")

  # -- put() fragment: the `def put(self, args):` header is missing --
  job = luxi.UnserializeJob(args)
  rid = self.server.queue.put(job)
  # presumably `return rid` followed in the original

  def query(self, args):
    """Run a query over the given object type and return the result."""
    path = args["object"]
    fields = args["fields"]
    names = args["names"]
    if path == "instances":
      opclass = opcodes.OpQueryInstances
    # NOTE(review): branches for other object types are missing here;
    # the indented return below belongs to an elided "jobs" branch.
      # early exit because job query-ing is special (not via opcodes)
      return self.query_jobs(fields, names)
    raise ValueError("Invalid object %s" % path)
    op = opclass(output_fields = fields, names=names)
    result = cpu.ExecOpCode(op)
    # NOTE(review): `cpu` is not defined in the visible code (probably
    # obtained via the lazy accessor above); the return is missing too.

  def query_jobs(self, fields, names):
    """Return the job queue's data for the selected jobs and fields."""
    return self.server.queue.query_jobs(fields, names)
def JobRunner(proc, job):
  """Job executor.

  This function processes a single job in the context of the given
  processor instance.

  Args:
    proc: Ganeti Processor to run the job on
    job: The job to run (unserialized format)

  """
  job.SetStatus(opcodes.Job.STATUS_RUNNING)
  for idx, op in enumerate(job.data.op_list):
    job.data.op_status[idx] = opcodes.Job.STATUS_RUNNING
    # NOTE(review): the matching `try:` line is missing from this
    # excerpt; the except clause below is orphaned as shown.
    job.data.op_result[idx] = proc.ExecOpCode(op)
    job.data.op_status[idx] = opcodes.Job.STATUS_SUCCESS
    except (errors.OpPrereqError, errors.OpExecError), err:
      job.data.op_result[idx] = str(err)
      job.data.op_status[idx] = opcodes.Job.STATUS_FAIL
      # NOTE(review): the two SetStatus calls below were presumably
      # guarded by an if/else on an accumulated failure flag; those
      # guard lines are missing from this excerpt.
      job.SetStatus(opcodes.Job.STATUS_FAIL)
  job.SetStatus(opcodes.Job.STATUS_SUCCESS)
def PoolWorker(worker_id, incoming_queue):
  """A worker thread function.

  This is the actual processor of a single thread of Job execution.

  Args:
    worker_id: the unique id for this worker
    incoming_queue: a queue to get jobs from

  """
  # NOTE(review): the enclosing `while True:` loop and the sentinel
  # termination check are missing from this excerpt (server_cleanup()
  # puts one None per worker on the queue to stop it).
  print "worker %s sleeping" % worker_id
  item = incoming_queue.get(True)   # blocking get
  print "worker %s processing job %s" % (worker_id, item.data.job_id)
  # fresh processor per job, with a no-op feedback function
  proc = mcpu.Processor(feedback=lambda x: None)
  # NOTE(review): the `try:` line is missing; the except clauses below
  # are orphaned as shown.
  JobRunner(proc, item)
  except errors.GenericError, err:
    msg = "ganeti exception %s" % err
    item.SetStatus(opcodes.Job.STATUS_FAIL, result=[msg])
  except Exception, err:
    msg = "unhandled exception %s" % err
    item.SetStatus(opcodes.Job.STATUS_FAIL, result=[msg])
    # NOTE(review): a bare `except:` header is presumably missing
    # before the two lines below (catch-all for non-Exception raises).
    msg = "unhandled unknown exception"
    item.SetStatus(opcodes.Job.STATUS_FAIL, result=[msg])
  print "worker %s finish job %s" % (worker_id, item.data.job_id)
  print "worker %s exiting" % worker_id
308 def CheckMaster(debug):
309 """Checks the node setup.
311 If this is the master, the function will return. Otherwise it will
312 exit with an exit code based on the node status.
316 ss = ssconf.SimpleStore()
317 master_name = ss.GetMasterNode()
318 except errors.ConfigurationError, err:
319 print "Cluster configuration incomplete: '%s'" % str(err)
320 sys.exit(EXIT_NODESETUP_ERROR)
323 myself = utils.HostInfo()
324 except errors.ResolverError, err:
325 sys.stderr.write("Cannot resolve my own name (%s)\n" % err.args[0])
326 sys.exit(EXIT_NODESETUP_ERROR)
328 if myself.name != master_name:
330 sys.stderr.write("Not master, exiting.\n")
331 sys.exit(EXIT_NOTMASTER)
def ParseOptions(args=None):
  """Parse the command line options.

  Args:
    args: optional list of arguments to parse instead of sys.argv[1:]
      (backward-compatible addition; the default behavior is unchanged)

  Returns:
    (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  # -f keeps the daemon in the foreground (dest 'fork' defaults to True)
  parser.add_option("-f", "--foreground", dest="fork",
                    help="Don't detach from the current terminal",
                    default=True, action="store_false")
  parser.add_option("-d", "--debug", dest="debug",
                    help="Enable some debug messages",
                    default=False, action="store_true")
  options, args = parser.parse_args(args)
  return options, args
# NOTE(review): the `def main():` header is missing from this excerpt;
# the statements below form the daemon's entry point.
options, args = ParseOptions()
utils.debug = options.debug  # propagate the debug flag to the utils module

CheckMaster(options.debug)   # refuse to run on non-master nodes

master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)

# daemonize before spawning threads, keeping the bound socket open
utils.Daemonize(logfile=constants.LOG_MASTERDAEMON,
                noclose_fds=[master.fileno()])

# NOTE(review): a `try:` line is missing before the Lock call; the
# except clause below is orphaned as shown.
utils.Lock('cmd', debug=options.debug)
except errors.LockError, err:
  print >> sys.stderr, str(err)
  master.server_cleanup()
  # presumably followed by an error exit in the original

master.setup_processors()
master.serve_forever()
master.server_cleanup()


if __name__ == "__main__":
  # NOTE(review): the guarded call (presumably main()) is missing.