Handle any exception in ganeti-masterd
[ganeti-local] / daemons / ganeti-masterd
1 #!/usr/bin/python -u
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Master daemon program.
23
24 Some classes deviates from the standard style guide since the
25 inheritance from parent classes requires it.
26
27 """
28
29
30 import sys
31 import SocketServer
32 import threading
33 import time
34 import collections
35 import Queue
36 import random
37 import signal
38 import simplejson
39
40
41 from cStringIO import StringIO
42 from optparse import OptionParser
43
44 from ganeti import constants
45 from ganeti import mcpu
46 from ganeti import opcodes
47 from ganeti import jqueue
48 from ganeti import luxi
49 from ganeti import utils
50 from ganeti import errors
51 from ganeti import ssconf
52
53
54 EXIT_NOTMASTER = constants.EXIT_NOTMASTER
55 EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
56
57
58 class IOServer(SocketServer.UnixStreamServer):
59   """IO thread class.
60
61   This class takes care of initializing the other threads, setting
62   signal handlers (which are processed only in this thread), and doing
63   cleanup at shutdown.
64
65   """
66   QUEUE_PROCESSOR_SIZE = 1
67
68   def __init__(self, address, rqhandler):
69     """IOServer constructor
70
71     Args:
72       address: the address to bind this IOServer to
73       rqhandler: RequestHandler type object
74
75     """
76     SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
77     self.do_quit = False
78     self.queue = jqueue.QueueManager()
79     self.processors = []
80     signal.signal(signal.SIGINT, self.handle_quit_signals)
81     signal.signal(signal.SIGTERM, self.handle_quit_signals)
82
83   def setup_processors(self):
84     """Spawn the processors threads.
85
86     This initializes the queue and the thread processors. It is done
87     separately from the constructor because we want the clone()
88     syscalls to happen after the daemonize part.
89
90     """
91     for i in range(self.QUEUE_PROCESSOR_SIZE):
92       self.processors.append(threading.Thread(target=PoolWorker,
93                                               args=(i, self.queue.new_queue)))
94     for t in self.processors:
95       t.start()
96
97   def process_request_thread(self, request, client_address):
98     """Process the request.
99
100     This is copied from the code in ThreadingMixIn.
101
102     """
103     try:
104       self.finish_request(request, client_address)
105       self.close_request(request)
106     except:
107       self.handle_error(request, client_address)
108       self.close_request(request)
109
110   def process_request(self, request, client_address):
111     """Start a new thread to process the request.
112
113     This is copied from the coode in ThreadingMixIn.
114
115     """
116     t = threading.Thread(target=self.process_request_thread,
117                          args=(request, client_address))
118     t.start()
119
120   def handle_quit_signals(self, signum, frame):
121     print "received %s in %s" % (signum, frame)
122     self.do_quit = True
123
124   def serve_forever(self):
125     """Handle one request at a time until told to quit."""
126     while not self.do_quit:
127       self.handle_request()
128       print "served request, quit=%s" % (self.do_quit)
129
130   def server_cleanup(self):
131     """Cleanup the server.
132
133     This involves shutting down the processor threads and the master
134     socket.
135
136     """
137     self.server_close()
138     utils.RemoveFile(constants.MASTER_SOCKET)
139     for i in range(self.QUEUE_PROCESSOR_SIZE):
140       self.queue.new_queue.put(None)
141     for idx, t in enumerate(self.processors):
142       print "waiting for processor thread %s..." % idx
143       t.join()
144     print "done threads"
145
146
147 class ClientRqHandler(SocketServer.BaseRequestHandler):
148   """Client handler"""
149   EOM = '\3'
150   READ_SIZE = 4096
151
152   def setup(self):
153     self._buffer = ""
154     self._msgs = collections.deque()
155     self._ops = ClientOps(self.server)
156
157   def handle(self):
158     while True:
159       msg = self.read_message()
160       if msg is None:
161         print "client closed connection"
162         break
163       request = simplejson.loads(msg)
164       if not isinstance(request, dict):
165         print "wrong request received: %s" % msg
166         break
167       method = request.get('request', None)
168       data = request.get('data', None)
169       if method is None or data is None:
170         print "no method or data in request"
171         break
172       print "request:", method, data
173       result = self._ops.handle_request(method, data)
174       print "result:", result
175       self.send_message(simplejson.dumps({'success': True, 'result': result}))
176
177   def read_message(self):
178     while not self._msgs:
179       data = self.request.recv(self.READ_SIZE)
180       if not data:
181         return None
182       new_msgs = (self._buffer + data).split(self.EOM)
183       self._buffer = new_msgs.pop()
184       self._msgs.extend(new_msgs)
185     return self._msgs.popleft()
186
187   def send_message(self, msg):
188     #print "sending", msg
189     self.request.sendall(msg + self.EOM)
190
191
192 class ClientOps:
193   """Class holding high-level client operations."""
194   def __init__(self, server):
195     self.server = server
196     self._cpu = None
197
198   def _getcpu(self):
199     if self._cpu is None:
200       self._cpu = mcpu.Processor(lambda x: None)
201     return self._cpu
202
203   def handle_request(self, operation, args):
204     print operation, args
205     if operation == "submit":
206       return self.put(args)
207     elif operation == "query":
208       return self.query(args)
209     else:
210       raise ValueError("Invalid operation")
211
212   def put(self, args):
213     job = luxi.UnserializeJob(args)
214     rid = self.server.queue.put(job)
215     return rid
216
217   def query(self, args):
218     path = args["object"]
219     fields = args["fields"]
220     names = args["names"]
221     if path == "instances":
222       opclass = opcodes.OpQueryInstances
223     elif path == "jobs":
224       # early exit because job query-ing is special (not via opcodes)
225       return self.query_jobs(fields, names)
226     else:
227       raise ValueError("Invalid object %s" % path)
228
229     op = opclass(output_fields = fields, names=names)
230     cpu = self._getcpu()
231     result = cpu.ExecOpCode(op)
232     return result
233
234   def query_jobs(self, fields, names):
235     return self.server.queue.query_jobs(fields, names)
236
237
238 def JobRunner(proc, job):
239   """Job executor.
240
241   This functions processes a single job in the context of given
242   processor instance.
243
244   Args:
245     proc: Ganeti Processor to run the job on
246     job: The job to run (unserialized format)
247
248   """
249   job.SetStatus(opcodes.Job.STATUS_RUNNING)
250   fail = False
251   for idx, op in enumerate(job.data.op_list):
252     job.data.op_status[idx] = opcodes.Job.STATUS_RUNNING
253     try:
254       job.data.op_result[idx] = proc.ExecOpCode(op)
255       job.data.op_status[idx] = opcodes.Job.STATUS_SUCCESS
256     except (errors.OpPrereqError, errors.OpExecError), err:
257       fail = True
258       job.data.op_result[idx] = str(err)
259       job.data.op_status[idx] = opcodes.Job.STATUS_FAIL
260   if fail:
261     job.SetStatus(opcodes.Job.STATUS_FAIL)
262   else:
263     job.SetStatus(opcodes.Job.STATUS_SUCCESS)
264
265
266 def PoolWorker(worker_id, incoming_queue):
267   """A worker thread function.
268
269   This is the actual processor of a single thread of Job execution.
270
271   Args:
272     worker_id: the unique id for this worker
273     incoming_queue: a queue to get jobs from
274
275   """
276   while True:
277     print "worker %s sleeping" % worker_id
278     item = incoming_queue.get(True)
279     if item is None:
280       break
281     print "worker %s processing job %s" % (worker_id, item.data.job_id)
282     #utils.Lock('cmd')
283     try:
284       proc = mcpu.Processor(feedback=lambda x: None)
285       try:
286         JobRunner(proc, item)
287       except errors.GenericError, err:
288         msg = "ganeti exception %s" % err
289         item.SetStatus(opcodes.Job.STATUS_FAIL, result=[msg])
290         print msg
291       except Exception, err:
292         msg = "unhandled exception %s" % err
293         item.SetStatus(opcodes.Job.STATUS_FAIL, result=[msg])
294         print msg
295       except:
296         msg = "unhandled unknown exception"
297         item.SetStatus(opcodes.Job.STATUS_FAIL, result=[msg])
298         print msg
299
300     finally:
301       #utils.Unlock('cmd')
302       #utils.LockCleanup()
303       pass
304     print "worker %s finish job %s" % (worker_id, item.data.job_id)
305   print "worker %s exiting" % worker_id
306
307
308 def CheckMaster(debug):
309   """Checks the node setup.
310
311   If this is the master, the function will return. Otherwise it will
312   exit with an exit code based on the node status.
313
314   """
315   try:
316     ss = ssconf.SimpleStore()
317     master_name = ss.GetMasterNode()
318   except errors.ConfigurationError, err:
319     print "Cluster configuration incomplete: '%s'" % str(err)
320     sys.exit(EXIT_NODESETUP_ERROR)
321
322   try:
323     myself = utils.HostInfo()
324   except errors.ResolverError, err:
325     sys.stderr.write("Cannot resolve my own name (%s)\n" % err.args[0])
326     sys.exit(EXIT_NODESETUP_ERROR)
327
328   if myself.name != master_name:
329     if debug:
330       sys.stderr.write("Not master, exiting.\n")
331     sys.exit(EXIT_NOTMASTER)
332
333
334 def ParseOptions():
335   """Parse the command line options.
336
337   Returns:
338     (options, args) as from OptionParser.parse_args()
339
340   """
341   parser = OptionParser(description="Ganeti master daemon",
342                         usage="%prog [-f] [-d]",
343                         version="%%prog (ganeti) %s" %
344                         constants.RELEASE_VERSION)
345
346   parser.add_option("-f", "--foreground", dest="fork",
347                     help="Don't detach from the current terminal",
348                     default=True, action="store_false")
349   parser.add_option("-d", "--debug", dest="debug",
350                     help="Enable some debug messages",
351                     default=False, action="store_true")
352   options, args = parser.parse_args()
353   return options, args
354
355
356 def main():
357   """Main function"""
358
359   options, args = ParseOptions()
360   utils.debug = options.debug
361   utils.no_fork = True
362
363   CheckMaster(options.debug)
364
365   master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
366
367   # become a daemon
368   if options.fork:
369     utils.Daemonize(logfile=constants.LOG_MASTERDAEMON,
370                     noclose_fds=[master.fileno()])
371
372   try:
373     utils.Lock('cmd', debug=options.debug)
374   except errors.LockError, err:
375     print >> sys.stderr, str(err)
376     master.server_cleanup()
377     return
378
379   try:
380     master.setup_processors()
381     try:
382       master.serve_forever()
383     finally:
384       master.server_cleanup()
385   finally:
386     utils.Unlock('cmd')
387     utils.LockCleanup()
388
389
390 if __name__ == "__main__":
391   main()