Distribute the queue serial file after each update
[ganeti-local] / daemons / ganeti-masterd
1 #!/usr/bin/python -u
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Master daemon program.
23
24 Some classes deviates from the standard style guide since the
25 inheritance from parent classes requires it.
26
27 """
28
29
30 import sys
31 import SocketServer
32 import time
33 import collections
34 import Queue
35 import random
36 import signal
37 import simplejson
38 import logging
39
40 from cStringIO import StringIO
41 from optparse import OptionParser
42
43 from ganeti import config
44 from ganeti import constants
45 from ganeti import mcpu
46 from ganeti import opcodes
47 from ganeti import jqueue
48 from ganeti import locking
49 from ganeti import luxi
50 from ganeti import utils
51 from ganeti import errors
52 from ganeti import ssconf
53 from ganeti import logger
54 from ganeti import workerpool
55
56
57 CLIENT_REQUEST_WORKERS = 16
58
59 EXIT_NOTMASTER = constants.EXIT_NOTMASTER
60 EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
61
62
63 class ClientRequestWorker(workerpool.BaseWorker):
64   def RunTask(self, server, request, client_address):
65     """Process the request.
66
67     This is copied from the code in ThreadingMixIn.
68
69     """
70     try:
71       server.finish_request(request, client_address)
72       server.close_request(request)
73     except:
74       server.handle_error(request, client_address)
75       server.close_request(request)
76
77
78 class IOServer(SocketServer.UnixStreamServer):
79   """IO thread class.
80
81   This class takes care of initializing the other threads, setting
82   signal handlers (which are processed only in this thread), and doing
83   cleanup at shutdown.
84
85   """
86   def __init__(self, address, rqhandler, context):
87     """IOServer constructor
88
89     Args:
90       address: the address to bind this IOServer to
91       rqhandler: RequestHandler type object
92       context: Context Object common to all worker threads
93
94     """
95     SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
96     self.context = context
97
98     # We'll only start threads once we've forked.
99     self.jobqueue = None
100     self.request_workers = None
101
102   def setup_queue(self):
103     self.jobqueue = jqueue.JobQueue(self.context)
104     self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
105                                                  ClientRequestWorker)
106
107   def process_request(self, request, client_address):
108     """Add task to workerpool to process request.
109
110     """
111     self.request_workers.AddTask(self, request, client_address)
112
113   def serve_forever(self):
114     """Handle one request at a time until told to quit."""
115     sighandler = utils.SignalHandler([signal.SIGINT, signal.SIGTERM])
116     try:
117       while not sighandler.called:
118         self.handle_request()
119     finally:
120       sighandler.Reset()
121
122   def server_cleanup(self):
123     """Cleanup the server.
124
125     This involves shutting down the processor threads and the master
126     socket.
127
128     """
129     try:
130       self.server_close()
131       utils.RemoveFile(constants.MASTER_SOCKET)
132     finally:
133       if self.request_workers:
134         self.request_workers.TerminateWorkers()
135       if self.jobqueue:
136         self.jobqueue.Shutdown()
137
138
139 class ClientRqHandler(SocketServer.BaseRequestHandler):
140   """Client handler"""
141   EOM = '\3'
142   READ_SIZE = 4096
143
144   def setup(self):
145     self._buffer = ""
146     self._msgs = collections.deque()
147     self._ops = ClientOps(self.server)
148
149   def handle(self):
150     while True:
151       msg = self.read_message()
152       if msg is None:
153         logging.info("client closed connection")
154         break
155
156       request = simplejson.loads(msg)
157       logging.debug("request: %s", request)
158       if not isinstance(request, dict):
159         logging.error("wrong request received: %s", msg)
160         break
161
162       method = request.get(luxi.KEY_METHOD, None)
163       args = request.get(luxi.KEY_ARGS, None)
164       if method is None or args is None:
165         logging.error("no method or args in request")
166         break
167
168       success = False
169       try:
170         result = self._ops.handle_request(method, args)
171         success = True
172       except:
173         logging.error("Unexpected exception", exc_info=True)
174         err = sys.exc_info()
175         result = "Caught exception: %s" % str(err[1])
176
177       response = {
178         luxi.KEY_SUCCESS: success,
179         luxi.KEY_RESULT: result,
180         }
181       logging.debug("response: %s", response)
182       self.send_message(simplejson.dumps(response))
183
184   def read_message(self):
185     while not self._msgs:
186       data = self.request.recv(self.READ_SIZE)
187       if not data:
188         return None
189       new_msgs = (self._buffer + data).split(self.EOM)
190       self._buffer = new_msgs.pop()
191       self._msgs.extend(new_msgs)
192     return self._msgs.popleft()
193
194   def send_message(self, msg):
195     #print "sending", msg
196     self.request.sendall(msg + self.EOM)
197
198
199 class ClientOps:
200   """Class holding high-level client operations."""
201   def __init__(self, server):
202     self.server = server
203
204   def handle_request(self, method, args):
205     queue = self.server.jobqueue
206
207     # TODO: Parameter validation
208
209     if method == luxi.REQ_SUBMIT_JOB:
210       ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
211       # we need to compute the node list here, since from now on all
212       # operations require locks on the queue or the storage, and we
213       # shouldn't get another lock
214       node_list = self.server.context.cfg.GetNodeList()
215       return queue.SubmitJob(ops, node_list)
216
217     elif method == luxi.REQ_CANCEL_JOB:
218       (job_id, ) = args
219       return queue.CancelJob(job_id)
220
221     elif method == luxi.REQ_ARCHIVE_JOB:
222       (job_id, ) = args
223       return queue.ArchiveJob(job_id)
224
225     elif method == luxi.REQ_QUERY_JOBS:
226       (job_ids, fields) = args
227       return queue.QueryJobs(job_ids, fields)
228
229     else:
230       raise ValueError("Invalid operation")
231
232
233 class GanetiContext(object):
234   """Context common to all ganeti threads.
235
236   This class creates and holds common objects shared by all threads.
237
238   """
239   _instance = None
240
241   def __init__(self):
242     """Constructs a new GanetiContext object.
243
244     There should be only a GanetiContext object at any time, so this
245     function raises an error if this is not the case.
246
247     """
248     assert self.__class__._instance is None, "double GanetiContext instance"
249
250     # Create a ConfigWriter...
251     self.cfg = config.ConfigWriter()
252     # And a GanetiLockingManager...
253     self.glm = locking.GanetiLockManager(
254                 self.cfg.GetNodeList(),
255                 self.cfg.GetInstanceList())
256
257     # setting this also locks the class against attribute modifications
258     self.__class__._instance = self
259
260   def __setattr__(self, name, value):
261     """Setting GanetiContext attributes is forbidden after initialization.
262
263     """
264     assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
265     object.__setattr__(self, name, value)
266
267
268 def CheckMaster(debug):
269   """Checks the node setup.
270
271   If this is the master, the function will return. Otherwise it will
272   exit with an exit code based on the node status.
273
274   """
275   try:
276     ss = ssconf.SimpleStore()
277     master_name = ss.GetMasterNode()
278   except errors.ConfigurationError, err:
279     print "Cluster configuration incomplete: '%s'" % str(err)
280     sys.exit(EXIT_NODESETUP_ERROR)
281
282   try:
283     myself = utils.HostInfo()
284   except errors.ResolverError, err:
285     sys.stderr.write("Cannot resolve my own name (%s)\n" % err.args[0])
286     sys.exit(EXIT_NODESETUP_ERROR)
287
288   if myself.name != master_name:
289     if debug:
290       sys.stderr.write("Not master, exiting.\n")
291     sys.exit(EXIT_NOTMASTER)
292
293
294 def ParseOptions():
295   """Parse the command line options.
296
297   Returns:
298     (options, args) as from OptionParser.parse_args()
299
300   """
301   parser = OptionParser(description="Ganeti master daemon",
302                         usage="%prog [-f] [-d]",
303                         version="%%prog (ganeti) %s" %
304                         constants.RELEASE_VERSION)
305
306   parser.add_option("-f", "--foreground", dest="fork",
307                     help="Don't detach from the current terminal",
308                     default=True, action="store_false")
309   parser.add_option("-d", "--debug", dest="debug",
310                     help="Enable some debug messages",
311                     default=False, action="store_true")
312   options, args = parser.parse_args()
313   return options, args
314
315
316 def main():
317   """Main function"""
318
319   options, args = ParseOptions()
320   utils.debug = options.debug
321   utils.no_fork = True
322
323   CheckMaster(options.debug)
324
325   master = IOServer(constants.MASTER_SOCKET, ClientRqHandler, GanetiContext())
326
327   # become a daemon
328   if options.fork:
329     utils.Daemonize(logfile=constants.LOG_MASTERDAEMON,
330                     noclose_fds=[master.fileno()])
331
332   logger.SetupDaemon(constants.LOG_MASTERDAEMON, debug=options.debug,
333                      stderr_logging=not options.fork)
334
335   logging.info("ganeti master daemon startup")
336
337   master.setup_queue()
338   try:
339     master.serve_forever()
340   finally:
341     master.server_cleanup()
342
343
344 if __name__ == "__main__":
345   main()