Implement forking/master role checking in masterd
[ganeti-local] / daemons / ganeti-masterd
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Master daemon program.
23
24 Some classes deviates from the standard style guide since the
25 inheritance from parent classes requires it.
26
27 """
28
29
30 import sys
31 import SocketServer
32 import threading
33 import time
34 import collections
35 import Queue
36 import random
37 import signal
38 import simplejson
39
40
41 from cStringIO import StringIO
42 from optparse import OptionParser
43
44 from ganeti import constants
45 from ganeti import mcpu
46 from ganeti import opcodes
47 from ganeti import jqueue
48 from ganeti import luxi
49 from ganeti import utils
50 from ganeti import errors
51 from ganeti import ssconf
52
53
54 EXIT_NOTMASTER = constants.EXIT_NOTMASTER
55 EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
56
57
58 class IOServer(SocketServer.UnixStreamServer):
59   """IO thread class.
60
61   This class takes care of initializing the other threads, setting
62   signal handlers (which are processed only in this thread), and doing
63   cleanup at shutdown.
64
65   """
66   QUEUE_PROCESSOR_SIZE = 1
67
68   def __init__(self, address, rqhandler):
69     SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
70     self.do_quit = False
71     self.queue = jqueue.QueueManager()
72     self.processors = []
73     signal.signal(signal.SIGINT, self.handle_quit_signals)
74     signal.signal(signal.SIGTERM, self.handle_quit_signals)
75
76   def setup_processors(self):
77     """Spawn the processors threads.
78
79     This initializes the queue and the thread processors. It is done
80     separately from the constructor because we want the clone()
81     syscalls to happen after the daemonize part.
82
83     """
84     for i in range(self.QUEUE_PROCESSOR_SIZE):
85       self.processors.append(threading.Thread(target=PoolWorker,
86                                               args=(i, self.queue.new_queue)))
87     for t in self.processors:
88       t.start()
89
90   def process_request_thread(self, request, client_address):
91     """Process the request.
92
93     This is copied from the code in ThreadingMixIn.
94
95     """
96     try:
97       self.finish_request(request, client_address)
98       self.close_request(request)
99     except:
100       self.handle_error(request, client_address)
101       self.close_request(request)
102
103   def process_request(self, request, client_address):
104     """Start a new thread to process the request.
105
106     This is copied from the coode in ThreadingMixIn.
107
108     """
109     t = threading.Thread(target=self.process_request_thread,
110                          args=(request, client_address))
111     t.start()
112
113   def handle_quit_signals(self, signum, frame):
114     print "received %s in %s" % (signum, frame)
115     self.do_quit = True
116
117   def serve_forever(self):
118     """Handle one request at a time until told to quit."""
119     while not self.do_quit:
120       self.handle_request()
121       print "served request, quit=%s" % (self.do_quit)
122
123   def server_cleanup(self):
124     """Cleanup the server.
125
126     This involves shutting down the processor threads and the master
127     socket.
128
129     """
130     self.server_close()
131     utils.RemoveFile(constants.MASTER_SOCKET)
132     for i in range(self.QUEUE_PROCESSOR_SIZE):
133       self.queue.new_queue.put(None)
134     for idx, t in enumerate(self.processors):
135       print "waiting for processor thread %s..." % idx
136       t.join()
137     print "done threads"
138
139
140 class ClientRqHandler(SocketServer.BaseRequestHandler):
141   """Client handler"""
142   EOM = '\3'
143   READ_SIZE = 4096
144
145   def setup(self):
146     self._buffer = ""
147     self._msgs = collections.deque()
148     self._ops = ClientOps(self.server)
149
150   def handle(self):
151     while True:
152       msg = self.read_message()
153       if msg is None:
154         print "client closed connection"
155         break
156       request = simplejson.loads(msg)
157       if not isinstance(request, dict):
158         print "wrong request received: %s" % msg
159         break
160       method = request.get('request', None)
161       data = request.get('data', None)
162       if method is None or data is None:
163         print "no method or data in request"
164         break
165       print "request:", method, data
166       result = self._ops.handle_request(method, data)
167       print "result:", result
168       self.send_message(simplejson.dumps({'success': True, 'result': result}))
169
170   def read_message(self):
171     while not self._msgs:
172       data = self.request.recv(self.READ_SIZE)
173       if not data:
174         return None
175       new_msgs = (self._buffer + data).split(self.EOM)
176       self._buffer = new_msgs.pop()
177       self._msgs.extend(new_msgs)
178     return self._msgs.popleft()
179
180   def send_message(self, msg):
181     #print "sending", msg
182     self.request.sendall(msg + self.EOM)
183
184
185 class ClientOps:
186   """Class holding high-level client operations."""
187   def __init__(self, server):
188     self.server = server
189     self._cpu = None
190
191   def _getcpu(self):
192     if self._cpu is None:
193       self._cpu = mcpu.Processor(lambda x: None)
194     return self._cpu
195
196   def handle_request(self, operation, args):
197     print operation, args
198     if operation == "submit":
199       return self.put(args)
200     elif operation == "query":
201       return self.query(args)
202     else:
203       raise ValueError("Invalid operation")
204
205   def put(self, args):
206     job = luxi.UnserializeJob(args)
207     rid = self.server.queue.put(job)
208     return rid
209
210   def query(self, args):
211     path = args["object"]
212     fields = args["fields"]
213     names = args["names"]
214     if path == "instances":
215       opclass = opcodes.OpQueryInstances
216     elif path == "jobs":
217       # early exit because job query-ing is special (not via opcodes)
218       return self.query_jobs(fields, names)
219     else:
220       raise ValueError("Invalid object %s" % path)
221
222     op = opclass(output_fields = fields, names=names)
223     cpu = self._getcpu()
224     result = cpu.ExecOpCode(op)
225     return result
226
227   def query_jobs(self, fields, names):
228     return self.server.queue.query_jobs(fields, names)
229
230
231 def JobRunner(proc, job):
232   """Job executor.
233
234   This functions processes a single job in the context of given
235   processor instance.
236
237   """
238   job.SetStatus(opcodes.Job.STATUS_RUNNING)
239   for op in job.data.op_list:
240     proc.ExecOpCode(op)
241   job.SetStatus(opcodes.Job.STATUS_FINISHED, result=opcodes.Job.RESULT_OK)
242
243
244 def PoolWorker(worker_id, incoming_queue):
245   """A worker thread function.
246
247   This is the actual processor of a single thread of Job execution.
248
249   """
250   while True:
251     print "worker %s sleeping" % worker_id
252     item = incoming_queue.get(True)
253     if item is None:
254       break
255     print "worker %s processing job %s" % (worker_id, item.data.job_id)
256     utils.Lock('cmd')
257     try:
258       proc = mcpu.Processor(feedback=lambda x: None)
259       try:
260         JobRunner(proc, item)
261       except errors.GenericError, err:
262         print "ganeti exception %s" % err
263     finally:
264       utils.Unlock('cmd')
265       utils.LockCleanup()
266     print "worker %s finish job %s" % (worker_id, item.data.job_id)
267   print "worker %s exiting" % worker_id
268
269
270 def CheckMaster(debug):
271   """Checks the node setup.
272
273   If this is the master, the function will return. Otherwise it will
274   exit with an exit code based on the node status.
275
276   """
277   try:
278     ss = ssconf.SimpleStore()
279     master_name = ss.GetMasterNode()
280   except errors.ConfigurationError, err:
281     print "Cluster configuration incomplete: '%s'" % str(err)
282     sys.exit(EXIT_NODESETUP_ERROR)
283
284   try:
285     myself = utils.HostInfo()
286   except errors.ResolverError, err:
287     sys.stderr.write("Cannot resolve my own name (%s)\n" % err.args[0])
288     sys.exit(EXIT_NODESETUP_ERROR)
289
290   if myself.name != master_name:
291     if debug:
292       sys.stderr.write("Not master, exiting.\n")
293     sys.exit(EXIT_NOTMASTER)
294
295
296 def ParseOptions():
297   """Parse the command line options.
298
299   Returns:
300     (options, args) as from OptionParser.parse_args()
301
302   """
303   parser = OptionParser(description="Ganeti master daemon",
304                         usage="%prog [-f] [-d]",
305                         version="%%prog (ganeti) %s" %
306                         constants.RELEASE_VERSION)
307
308   parser.add_option("-f", "--foreground", dest="fork",
309                     help="Don't detach from the current terminal",
310                     default=True, action="store_false")
311   parser.add_option("-d", "--debug", dest="debug",
312                     help="Enable some debug messages",
313                     default=False, action="store_true")
314   options, args = parser.parse_args()
315   return options, args
316
317
318 def main():
319   """Main function"""
320
321   options, args = ParseOptions()
322   utils.debug = options.debug
323
324   CheckMaster(options.debug)
325
326   master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
327
328   # become a daemon
329   if options.fork:
330     utils.Daemonize(logfile=constants.LOG_MASTERDAEMON,
331                     noclose_fds=[master.fileno()])
332
333   master.setup_processors()
334   try:
335     master.serve_forever()
336   finally:
337     master.server_cleanup()
338
339 if __name__ == "__main__":
340   main()