4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Master daemon program.
24 Some classes deviate from the standard style guide since the
25 inheritance from parent classes requires it.
41 from cStringIO import StringIO
42 from optparse import OptionParser
44 from ganeti import constants
45 from ganeti import mcpu
46 from ganeti import opcodes
47 from ganeti import jqueue
48 from ganeti import luxi
49 from ganeti import utils
50 from ganeti import errors
51 from ganeti import ssconf
54 EXIT_NOTMASTER = constants.EXIT_NOTMASTER
55 EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
58 class IOServer(SocketServer.UnixStreamServer):
61 This class takes care of initializing the other threads, setting
62 signal handlers (which are processed only in this thread), and doing
66 QUEUE_PROCESSOR_SIZE = 1
68 def __init__(self, address, rqhandler):
69 SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
71 self.queue = jqueue.QueueManager()
73 signal.signal(signal.SIGINT, self.handle_quit_signals)
74 signal.signal(signal.SIGTERM, self.handle_quit_signals)
76 def setup_processors(self):
77 """Spawn the processor threads.
79 This initializes the queue and the thread processors. It is done
80 separately from the constructor because we want the clone()
81 syscalls to happen after the daemonize part.
84 for i in range(self.QUEUE_PROCESSOR_SIZE):
85 self.processors.append(threading.Thread(target=PoolWorker,
86 args=(i, self.queue.new_queue)))
87 for t in self.processors:
90 def process_request_thread(self, request, client_address):
91 """Process the request.
93 This is copied from the code in ThreadingMixIn.
97 self.finish_request(request, client_address)
98 self.close_request(request)
100 self.handle_error(request, client_address)
101 self.close_request(request)
103 def process_request(self, request, client_address):
104 """Start a new thread to process the request.
106 This is copied from the code in ThreadingMixIn.
109 t = threading.Thread(target=self.process_request_thread,
110 args=(request, client_address))
113 def handle_quit_signals(self, signum, frame):
114 print "received %s in %s" % (signum, frame)
def serve_forever(self):
    """Serve requests one after another until the quit flag is set.

    self.do_quit is tested before each request is handled. After every
    handled request a status line showing the flag's current value is
    printed.

    """
    while True:
        if self.do_quit:
            break
        self.handle_request()
        status_line = "served request, quit=%s" % self.do_quit
        print(status_line)
123 def server_cleanup(self):
124 """Cleanup the server.
126 This involves shutting down the processor threads and the master
131 utils.RemoveFile(constants.MASTER_SOCKET)
132 for i in range(self.QUEUE_PROCESSOR_SIZE):
133 self.queue.new_queue.put(None)
134 for idx, t in enumerate(self.processors):
135 print "waiting for processor thread %s..." % idx
140 class ClientRqHandler(SocketServer.BaseRequestHandler):
147 self._msgs = collections.deque()
148 self._ops = ClientOps(self.server)
152 msg = self.read_message()
154 print "client closed connection"
156 request = simplejson.loads(msg)
157 if not isinstance(request, dict):
158 print "wrong request received: %s" % msg
160 method = request.get('request', None)
161 data = request.get('data', None)
162 if method is None or data is None:
163 print "no method or data in request"
165 print "request:", method, data
166 result = self._ops.handle_request(method, data)
167 print "result:", result
168 self.send_message(simplejson.dumps({'success': True, 'result': result}))
170 def read_message(self):
171 while not self._msgs:
172 data = self.request.recv(self.READ_SIZE)
175 new_msgs = (self._buffer + data).split(self.EOM)
176 self._buffer = new_msgs.pop()
177 self._msgs.extend(new_msgs)
178 return self._msgs.popleft()
def send_message(self, msg):
    """Send one message to the peer, terminated by the EOM marker.

    The marker (self.EOM) is appended to msg and the result is written to
    self.request in a single sendall() call.

    """
    framed = msg + self.EOM
    self.request.sendall(framed)
186 """Class holding high-level client operations."""
187 def __init__(self, server):
192 if self._cpu is None:
193 self._cpu = mcpu.Processor(lambda x: None)
def handle_request(self, operation, args):
    """Dispatch a client request to the matching operation.

    Args:
      operation: the request name; "submit" and "query" are recognised
      args: the request payload, handed unchanged to the chosen handler

    Returns:
      the value returned by self.put (for "submit") or self.query
      (for "query")

    Raises:
      ValueError: if operation is not one of the recognised names

    """
    print("%s %s" % (operation, args))
    if operation == "submit":
        return self.put(args)
    if operation == "query":
        return self.query(args)
    # Name the rejected operation so the failure can be diagnosed from the
    # message alone; query() reports unknown objects the same way.
    raise ValueError("Invalid operation %s" % (operation,))
206 job = luxi.UnserializeJob(args)
207 rid = self.server.queue.put(job)
210 def query(self, args):
211 path = args["object"]
212 fields = args["fields"]
213 names = args["names"]
214 if path == "instances":
215 opclass = opcodes.OpQueryInstances
217 # early exit because job query-ing is special (not via opcodes)
218 return self.query_jobs(fields, names)
220 raise ValueError("Invalid object %s" % path)
222 op = opclass(output_fields = fields, names=names)
224 result = cpu.ExecOpCode(op)
def query_jobs(self, fields, names):
    """Query jobs through the server's queue.

    Delegates to the query_jobs method of self.server.queue, passing
    fields and names through, and returns its result unchanged.

    """
    job_queue = self.server.queue
    return job_queue.query_jobs(fields, names)
231 def JobRunner(proc, job):
234 This function processes a single job in the context of given
238 job.SetStatus(opcodes.Job.STATUS_RUNNING)
239 for op in job.data.op_list:
241 job.SetStatus(opcodes.Job.STATUS_FINISHED, result=opcodes.Job.RESULT_OK)
244 def PoolWorker(worker_id, incoming_queue):
245 """A worker thread function.
247 This is the actual processor of a single thread of Job execution.
251 print "worker %s sleeping" % worker_id
252 item = incoming_queue.get(True)
255 print "worker %s processing job %s" % (worker_id, item.data.job_id)
258 proc = mcpu.Processor(feedback=lambda x: None)
260 JobRunner(proc, item)
261 except errors.GenericError, err:
262 print "ganeti exception %s" % err
266 print "worker %s finish job %s" % (worker_id, item.data.job_id)
267 print "worker %s exiting" % worker_id
270 def CheckMaster(debug):
271 """Checks the node setup.
273 If this is the master, the function will return. Otherwise it will
274 exit with an exit code based on the node status.
278 ss = ssconf.SimpleStore()
279 master_name = ss.GetMasterNode()
280 except errors.ConfigurationError, err:
281 print "Cluster configuration incomplete: '%s'" % str(err)
282 sys.exit(EXIT_NODESETUP_ERROR)
285 myself = utils.HostInfo()
286 except errors.ResolverError, err:
287 sys.stderr.write("Cannot resolve my own name (%s)\n" % err.args[0])
288 sys.exit(EXIT_NODESETUP_ERROR)
290 if myself.name != master_name:
292 sys.stderr.write("Not master, exiting.\n")
293 sys.exit(EXIT_NOTMASTER)
def ParseOptions():
    """Parse the command line options.

    Returns:
      (options, args) as from OptionParser.parse_args()

    """
    # NOTE(review): the "def" line and the final return were not present in
    # the reviewed text; both are taken from the caller, which unpacks
    # "options, args = ParseOptions()", and from the Returns note above.
    version_string = "%%prog (ganeti) %s" % constants.RELEASE_VERSION
    parser = OptionParser(description="Ganeti master daemon",
                          usage="%prog [-f] [-d]",
                          version=version_string)

    # -f stores False into options.fork, whose default is True.
    parser.add_option("-f", "--foreground", dest="fork",
                      action="store_false", default=True,
                      help="Don't detach from the current terminal")
    # -d stores True into options.debug, whose default is False.
    parser.add_option("-d", "--debug", dest="debug",
                      action="store_true", default=False,
                      help="Enable some debug messages")

    parsed_options, positional = parser.parse_args()
    return parsed_options, positional
321 options, args = ParseOptions()
322 utils.debug = options.debug
324 CheckMaster(options.debug)
326 master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
330 utils.Daemonize(logfile=constants.LOG_MASTERDAEMON,
331 noclose_fds=[master.fileno()])
333 master.setup_processors()
335 master.serve_forever()
337 master.server_cleanup()
339 if __name__ == "__main__":