Convert cli.SubmitOpCode to use the master
[ganeti-local] / daemons / ganeti-masterd
1 #!/usr/bin/python -u
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Master daemon program.
23
24 Some classes deviates from the standard style guide since the
25 inheritance from parent classes requires it.
26
27 """
28
29
30 import sys
31 import SocketServer
32 import threading
33 import time
34 import collections
35 import Queue
36 import random
37 import signal
38 import simplejson
39
40
41 from cStringIO import StringIO
42 from optparse import OptionParser
43
44 from ganeti import constants
45 from ganeti import mcpu
46 from ganeti import opcodes
47 from ganeti import jqueue
48 from ganeti import luxi
49 from ganeti import utils
50 from ganeti import errors
51 from ganeti import ssconf
52
53
54 EXIT_NOTMASTER = constants.EXIT_NOTMASTER
55 EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
56
57
58 class IOServer(SocketServer.UnixStreamServer):
59   """IO thread class.
60
61   This class takes care of initializing the other threads, setting
62   signal handlers (which are processed only in this thread), and doing
63   cleanup at shutdown.
64
65   """
66   QUEUE_PROCESSOR_SIZE = 1
67
68   def __init__(self, address, rqhandler):
69     SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
70     self.do_quit = False
71     self.queue = jqueue.QueueManager()
72     self.processors = []
73     signal.signal(signal.SIGINT, self.handle_quit_signals)
74     signal.signal(signal.SIGTERM, self.handle_quit_signals)
75
76   def setup_processors(self):
77     """Spawn the processors threads.
78
79     This initializes the queue and the thread processors. It is done
80     separately from the constructor because we want the clone()
81     syscalls to happen after the daemonize part.
82
83     """
84     for i in range(self.QUEUE_PROCESSOR_SIZE):
85       self.processors.append(threading.Thread(target=PoolWorker,
86                                               args=(i, self.queue.new_queue)))
87     for t in self.processors:
88       t.start()
89
90   def process_request_thread(self, request, client_address):
91     """Process the request.
92
93     This is copied from the code in ThreadingMixIn.
94
95     """
96     try:
97       self.finish_request(request, client_address)
98       self.close_request(request)
99     except:
100       self.handle_error(request, client_address)
101       self.close_request(request)
102
103   def process_request(self, request, client_address):
104     """Start a new thread to process the request.
105
106     This is copied from the coode in ThreadingMixIn.
107
108     """
109     t = threading.Thread(target=self.process_request_thread,
110                          args=(request, client_address))
111     t.start()
112
113   def handle_quit_signals(self, signum, frame):
114     print "received %s in %s" % (signum, frame)
115     self.do_quit = True
116
117   def serve_forever(self):
118     """Handle one request at a time until told to quit."""
119     while not self.do_quit:
120       self.handle_request()
121       print "served request, quit=%s" % (self.do_quit)
122
123   def server_cleanup(self):
124     """Cleanup the server.
125
126     This involves shutting down the processor threads and the master
127     socket.
128
129     """
130     self.server_close()
131     utils.RemoveFile(constants.MASTER_SOCKET)
132     for i in range(self.QUEUE_PROCESSOR_SIZE):
133       self.queue.new_queue.put(None)
134     for idx, t in enumerate(self.processors):
135       print "waiting for processor thread %s..." % idx
136       t.join()
137     print "done threads"
138
139
140 class ClientRqHandler(SocketServer.BaseRequestHandler):
141   """Client handler"""
142   EOM = '\3'
143   READ_SIZE = 4096
144
145   def setup(self):
146     self._buffer = ""
147     self._msgs = collections.deque()
148     self._ops = ClientOps(self.server)
149
150   def handle(self):
151     while True:
152       msg = self.read_message()
153       if msg is None:
154         print "client closed connection"
155         break
156       request = simplejson.loads(msg)
157       if not isinstance(request, dict):
158         print "wrong request received: %s" % msg
159         break
160       method = request.get('request', None)
161       data = request.get('data', None)
162       if method is None or data is None:
163         print "no method or data in request"
164         break
165       print "request:", method, data
166       result = self._ops.handle_request(method, data)
167       print "result:", result
168       self.send_message(simplejson.dumps({'success': True, 'result': result}))
169
170   def read_message(self):
171     while not self._msgs:
172       data = self.request.recv(self.READ_SIZE)
173       if not data:
174         return None
175       new_msgs = (self._buffer + data).split(self.EOM)
176       self._buffer = new_msgs.pop()
177       self._msgs.extend(new_msgs)
178     return self._msgs.popleft()
179
180   def send_message(self, msg):
181     #print "sending", msg
182     self.request.sendall(msg + self.EOM)
183
184
185 class ClientOps:
186   """Class holding high-level client operations."""
187   def __init__(self, server):
188     self.server = server
189     self._cpu = None
190
191   def _getcpu(self):
192     if self._cpu is None:
193       self._cpu = mcpu.Processor(lambda x: None)
194     return self._cpu
195
196   def handle_request(self, operation, args):
197     print operation, args
198     if operation == "submit":
199       return self.put(args)
200     elif operation == "query":
201       return self.query(args)
202     else:
203       raise ValueError("Invalid operation")
204
205   def put(self, args):
206     job = luxi.UnserializeJob(args)
207     rid = self.server.queue.put(job)
208     return rid
209
210   def query(self, args):
211     path = args["object"]
212     fields = args["fields"]
213     names = args["names"]
214     if path == "instances":
215       opclass = opcodes.OpQueryInstances
216     elif path == "jobs":
217       # early exit because job query-ing is special (not via opcodes)
218       return self.query_jobs(fields, names)
219     else:
220       raise ValueError("Invalid object %s" % path)
221
222     op = opclass(output_fields = fields, names=names)
223     cpu = self._getcpu()
224     result = cpu.ExecOpCode(op)
225     return result
226
227   def query_jobs(self, fields, names):
228     return self.server.queue.query_jobs(fields, names)
229
230
231 def JobRunner(proc, job):
232   """Job executor.
233
234   This functions processes a single job in the context of given
235   processor instance.
236
237   """
238   job.SetStatus(opcodes.Job.STATUS_RUNNING)
239   fail = False
240   for idx, op in enumerate(job.data.op_list):
241     job.data.op_status[idx] = opcodes.Job.STATUS_RUNNING
242     try:
243       job.data.op_result[idx] = proc.ExecOpCode(op)
244       job.data.op_status[idx] = opcodes.Job.STATUS_SUCCESS
245     except (errors.OpPrereqError, errors.OpExecError), err:
246       fail = True
247       job.data.op_result[idx] = str(err)
248       job.data.op_status[idx] = opcodes.Job.STATUS_FAIL
249   if fail:
250     job.SetStatus(opcodes.Job.STATUS_FAIL)
251   else:
252     job.SetStatus(opcodes.Job.STATUS_SUCCESS)
253
254
255 def PoolWorker(worker_id, incoming_queue):
256   """A worker thread function.
257
258   This is the actual processor of a single thread of Job execution.
259
260   """
261   while True:
262     print "worker %s sleeping" % worker_id
263     item = incoming_queue.get(True)
264     if item is None:
265       break
266     print "worker %s processing job %s" % (worker_id, item.data.job_id)
267     #utils.Lock('cmd')
268     try:
269       proc = mcpu.Processor(feedback=lambda x: None)
270       try:
271         JobRunner(proc, item)
272       except errors.GenericError, err:
273         print "ganeti exception %s" % err
274     finally:
275       #utils.Unlock('cmd')
276       #utils.LockCleanup()
277       pass
278     print "worker %s finish job %s" % (worker_id, item.data.job_id)
279   print "worker %s exiting" % worker_id
280
281
282 def CheckMaster(debug):
283   """Checks the node setup.
284
285   If this is the master, the function will return. Otherwise it will
286   exit with an exit code based on the node status.
287
288   """
289   try:
290     ss = ssconf.SimpleStore()
291     master_name = ss.GetMasterNode()
292   except errors.ConfigurationError, err:
293     print "Cluster configuration incomplete: '%s'" % str(err)
294     sys.exit(EXIT_NODESETUP_ERROR)
295
296   try:
297     myself = utils.HostInfo()
298   except errors.ResolverError, err:
299     sys.stderr.write("Cannot resolve my own name (%s)\n" % err.args[0])
300     sys.exit(EXIT_NODESETUP_ERROR)
301
302   if myself.name != master_name:
303     if debug:
304       sys.stderr.write("Not master, exiting.\n")
305     sys.exit(EXIT_NOTMASTER)
306
307
308 def ParseOptions():
309   """Parse the command line options.
310
311   Returns:
312     (options, args) as from OptionParser.parse_args()
313
314   """
315   parser = OptionParser(description="Ganeti master daemon",
316                         usage="%prog [-f] [-d]",
317                         version="%%prog (ganeti) %s" %
318                         constants.RELEASE_VERSION)
319
320   parser.add_option("-f", "--foreground", dest="fork",
321                     help="Don't detach from the current terminal",
322                     default=True, action="store_false")
323   parser.add_option("-d", "--debug", dest="debug",
324                     help="Enable some debug messages",
325                     default=False, action="store_true")
326   options, args = parser.parse_args()
327   return options, args
328
329
330 def main():
331   """Main function"""
332
333   options, args = ParseOptions()
334   utils.debug = options.debug
335
336   CheckMaster(options.debug)
337
338   master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
339
340   # become a daemon
341   if options.fork:
342     utils.Daemonize(logfile=constants.LOG_MASTERDAEMON,
343                     noclose_fds=[master.fileno()])
344
345   master.setup_processors()
346   try:
347     master.serve_forever()
348   finally:
349     master.server_cleanup()
350
351 if __name__ == "__main__":
352   main()