Implement checking for the master role in rapi
[ganeti-local] / daemons / ganeti-masterd
1 #!/usr/bin/python -u
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Master daemon program.
23
24 Some classes deviates from the standard style guide since the
25 inheritance from parent classes requires it.
26
27 """
28
29
30 import sys
31 import SocketServer
32 import time
33 import collections
34 import Queue
35 import random
36 import signal
37 import simplejson
38 import logging
39
40 from cStringIO import StringIO
41 from optparse import OptionParser
42
43 from ganeti import config
44 from ganeti import constants
45 from ganeti import mcpu
46 from ganeti import opcodes
47 from ganeti import jqueue
48 from ganeti import locking
49 from ganeti import luxi
50 from ganeti import utils
51 from ganeti import errors
52 from ganeti import ssconf
53 from ganeti import logger
54 from ganeti import workerpool
55
56
57 CLIENT_REQUEST_WORKERS = 16
58
59 EXIT_NOTMASTER = constants.EXIT_NOTMASTER
60 EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
61
62
63 class ClientRequestWorker(workerpool.BaseWorker):
64   def RunTask(self, server, request, client_address):
65     """Process the request.
66
67     This is copied from the code in ThreadingMixIn.
68
69     """
70     try:
71       server.finish_request(request, client_address)
72       server.close_request(request)
73     except:
74       server.handle_error(request, client_address)
75       server.close_request(request)
76
77
78 class IOServer(SocketServer.UnixStreamServer):
79   """IO thread class.
80
81   This class takes care of initializing the other threads, setting
82   signal handlers (which are processed only in this thread), and doing
83   cleanup at shutdown.
84
85   """
86   def __init__(self, address, rqhandler, context):
87     """IOServer constructor
88
89     Args:
90       address: the address to bind this IOServer to
91       rqhandler: RequestHandler type object
92       context: Context Object common to all worker threads
93
94     """
95     SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
96     self.context = context
97
98     # We'll only start threads once we've forked.
99     self.jobqueue = None
100     self.request_workers = None
101
102   def setup_queue(self):
103     self.jobqueue = jqueue.JobQueue(self.context)
104     self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
105                                                  ClientRequestWorker)
106
107   def process_request(self, request, client_address):
108     """Add task to workerpool to process request.
109
110     """
111     self.request_workers.AddTask(self, request, client_address)
112
113   def serve_forever(self):
114     """Handle one request at a time until told to quit."""
115     sighandler = utils.SignalHandler([signal.SIGINT, signal.SIGTERM])
116     try:
117       while not sighandler.called:
118         self.handle_request()
119     finally:
120       sighandler.Reset()
121
122   def server_cleanup(self):
123     """Cleanup the server.
124
125     This involves shutting down the processor threads and the master
126     socket.
127
128     """
129     try:
130       self.server_close()
131       utils.RemoveFile(constants.MASTER_SOCKET)
132     finally:
133       if self.request_workers:
134         self.request_workers.TerminateWorkers()
135       if self.jobqueue:
136         self.jobqueue.Shutdown()
137
138
139 class ClientRqHandler(SocketServer.BaseRequestHandler):
140   """Client handler"""
141   EOM = '\3'
142   READ_SIZE = 4096
143
144   def setup(self):
145     self._buffer = ""
146     self._msgs = collections.deque()
147     self._ops = ClientOps(self.server)
148
149   def handle(self):
150     while True:
151       msg = self.read_message()
152       if msg is None:
153         logging.info("client closed connection")
154         break
155
156       request = simplejson.loads(msg)
157       logging.debug("request: %s", request)
158       if not isinstance(request, dict):
159         logging.error("wrong request received: %s", msg)
160         break
161
162       method = request.get(luxi.KEY_METHOD, None)
163       args = request.get(luxi.KEY_ARGS, None)
164       if method is None or args is None:
165         logging.error("no method or args in request")
166         break
167
168       success = False
169       try:
170         result = self._ops.handle_request(method, args)
171         success = True
172       except:
173         logging.error("Unexpected exception", exc_info=True)
174         err = sys.exc_info()
175         result = "Caught exception: %s" % str(err[1])
176
177       response = {
178         luxi.KEY_SUCCESS: success,
179         luxi.KEY_RESULT: result,
180         }
181       logging.debug("response: %s", response)
182       self.send_message(simplejson.dumps(response))
183
184   def read_message(self):
185     while not self._msgs:
186       data = self.request.recv(self.READ_SIZE)
187       if not data:
188         return None
189       new_msgs = (self._buffer + data).split(self.EOM)
190       self._buffer = new_msgs.pop()
191       self._msgs.extend(new_msgs)
192     return self._msgs.popleft()
193
194   def send_message(self, msg):
195     #print "sending", msg
196     self.request.sendall(msg + self.EOM)
197
198
199 class ClientOps:
200   """Class holding high-level client operations."""
201   def __init__(self, server):
202     self.server = server
203
204   def handle_request(self, method, args):
205     queue = self.server.jobqueue
206
207     # TODO: Parameter validation
208
209     if method == luxi.REQ_SUBMIT_JOB:
210       ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
211       # we need to compute the node list here, since from now on all
212       # operations require locks on the queue or the storage, and we
213       # shouldn't get another lock
214       node_list = self.server.context.cfg.GetNodeList()
215       return queue.SubmitJob(ops, node_list)
216
217     elif method == luxi.REQ_CANCEL_JOB:
218       job_id = args
219       return queue.CancelJob(job_id)
220
221     elif method == luxi.REQ_ARCHIVE_JOB:
222       job_id = args
223       return queue.ArchiveJob(job_id)
224
225     elif method == luxi.REQ_QUERY_JOBS:
226       (job_ids, fields) = args
227       return queue.QueryJobs(job_ids, fields)
228
229     else:
230       raise ValueError("Invalid operation")
231
232
233 class GanetiContext(object):
234   """Context common to all ganeti threads.
235
236   This class creates and holds common objects shared by all threads.
237
238   """
239   _instance = None
240
241   def __init__(self):
242     """Constructs a new GanetiContext object.
243
244     There should be only a GanetiContext object at any time, so this
245     function raises an error if this is not the case.
246
247     """
248     assert self.__class__._instance is None, "double GanetiContext instance"
249
250     # Create a ConfigWriter...
251     self.cfg = config.ConfigWriter()
252     # And a GanetiLockingManager...
253     self.glm = locking.GanetiLockManager(
254                 self.cfg.GetNodeList(),
255                 self.cfg.GetInstanceList())
256
257     # setting this also locks the class against attribute modifications
258     self.__class__._instance = self
259
260   def __setattr__(self, name, value):
261     """Setting GanetiContext attributes is forbidden after initialization.
262
263     """
264     assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
265     object.__setattr__(self, name, value)
266
267
268 def ParseOptions():
269   """Parse the command line options.
270
271   Returns:
272     (options, args) as from OptionParser.parse_args()
273
274   """
275   parser = OptionParser(description="Ganeti master daemon",
276                         usage="%prog [-f] [-d]",
277                         version="%%prog (ganeti) %s" %
278                         constants.RELEASE_VERSION)
279
280   parser.add_option("-f", "--foreground", dest="fork",
281                     help="Don't detach from the current terminal",
282                     default=True, action="store_false")
283   parser.add_option("-d", "--debug", dest="debug",
284                     help="Enable some debug messages",
285                     default=False, action="store_true")
286   options, args = parser.parse_args()
287   return options, args
288
289
290 def main():
291   """Main function"""
292
293   options, args = ParseOptions()
294   utils.debug = options.debug
295   utils.no_fork = True
296
297   ssconf.CheckMaster(options.debug)
298
299   master = IOServer(constants.MASTER_SOCKET, ClientRqHandler, GanetiContext())
300
301   # become a daemon
302   if options.fork:
303     utils.Daemonize(logfile=constants.LOG_MASTERDAEMON,
304                     noclose_fds=[master.fileno()])
305
306   utils.WritePidFile(constants.MASTERD_PID)
307
308   logger.SetupDaemon(constants.LOG_MASTERDAEMON, debug=options.debug,
309                      stderr_logging=not options.fork)
310
311   logging.info("ganeti master daemon startup")
312
313   master.setup_queue()
314   try:
315     master.serve_forever()
316   finally:
317     master.server_cleanup()
318     utils.RemovePidFile(constants.MASTERD_PID)
319
320
321 if __name__ == "__main__":
322   main()