ganeti-masterd: Remove PID file at the end
[ganeti-local] / daemons / ganeti-masterd
1 #!/usr/bin/python -u
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Master daemon program.
23
24 Some classes deviates from the standard style guide since the
25 inheritance from parent classes requires it.
26
27 """
28
29
30 import sys
31 import SocketServer
32 import time
33 import collections
34 import Queue
35 import random
36 import signal
37 import simplejson
38 import logging
39
40 from cStringIO import StringIO
41 from optparse import OptionParser
42
43 from ganeti import config
44 from ganeti import constants
45 from ganeti import mcpu
46 from ganeti import opcodes
47 from ganeti import jqueue
48 from ganeti import locking
49 from ganeti import luxi
50 from ganeti import utils
51 from ganeti import errors
52 from ganeti import ssconf
53 from ganeti import workerpool
54 from ganeti import rpc
55 from ganeti import bootstrap
56
57
58 CLIENT_REQUEST_WORKERS = 16
59
60 EXIT_NOTMASTER = constants.EXIT_NOTMASTER
61 EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
62
63
64 class ClientRequestWorker(workerpool.BaseWorker):
65   def RunTask(self, server, request, client_address):
66     """Process the request.
67
68     This is copied from the code in ThreadingMixIn.
69
70     """
71     try:
72       server.finish_request(request, client_address)
73       server.close_request(request)
74     except:
75       server.handle_error(request, client_address)
76       server.close_request(request)
77
78
79 class IOServer(SocketServer.UnixStreamServer):
80   """IO thread class.
81
82   This class takes care of initializing the other threads, setting
83   signal handlers (which are processed only in this thread), and doing
84   cleanup at shutdown.
85
86   """
87   def __init__(self, address, rqhandler):
88     """IOServer constructor
89
90     Args:
91       address: the address to bind this IOServer to
92       rqhandler: RequestHandler type object
93
94     """
95     SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
96
97     # We'll only start threads once we've forked.
98     self.context = None
99     self.request_workers = None
100
101   def setup_queue(self):
102     self.context = GanetiContext()
103     self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
104                                                  ClientRequestWorker)
105
106   def process_request(self, request, client_address):
107     """Add task to workerpool to process request.
108
109     """
110     self.request_workers.AddTask(self, request, client_address)
111
112   def serve_forever(self):
113     """Handle one request at a time until told to quit."""
114     sighandler = utils.SignalHandler([signal.SIGINT, signal.SIGTERM])
115     try:
116       while not sighandler.called:
117         self.handle_request()
118     finally:
119       sighandler.Reset()
120
121   def server_cleanup(self):
122     """Cleanup the server.
123
124     This involves shutting down the processor threads and the master
125     socket.
126
127     """
128     try:
129       self.server_close()
130     finally:
131       if self.request_workers:
132         self.request_workers.TerminateWorkers()
133       if self.context:
134         self.context.jobqueue.Shutdown()
135
136
137 class ClientRqHandler(SocketServer.BaseRequestHandler):
138   """Client handler"""
139   EOM = '\3'
140   READ_SIZE = 4096
141
142   def setup(self):
143     self._buffer = ""
144     self._msgs = collections.deque()
145     self._ops = ClientOps(self.server)
146
147   def handle(self):
148     while True:
149       msg = self.read_message()
150       if msg is None:
151         logging.info("client closed connection")
152         break
153
154       request = simplejson.loads(msg)
155       logging.debug("request: %s", request)
156       if not isinstance(request, dict):
157         logging.error("wrong request received: %s", msg)
158         break
159
160       method = request.get(luxi.KEY_METHOD, None)
161       args = request.get(luxi.KEY_ARGS, None)
162       if method is None or args is None:
163         logging.error("no method or args in request")
164         break
165
166       success = False
167       try:
168         result = self._ops.handle_request(method, args)
169         success = True
170       except errors.GenericError, err:
171         success = False
172         result = (err.__class__.__name__, err.args)
173       except:
174         logging.error("Unexpected exception", exc_info=True)
175         err = sys.exc_info()
176         result = "Caught exception: %s" % str(err[1])
177
178       response = {
179         luxi.KEY_SUCCESS: success,
180         luxi.KEY_RESULT: result,
181         }
182       logging.debug("response: %s", response)
183       self.send_message(simplejson.dumps(response))
184
185   def read_message(self):
186     while not self._msgs:
187       data = self.request.recv(self.READ_SIZE)
188       if not data:
189         return None
190       new_msgs = (self._buffer + data).split(self.EOM)
191       self._buffer = new_msgs.pop()
192       self._msgs.extend(new_msgs)
193     return self._msgs.popleft()
194
195   def send_message(self, msg):
196     #print "sending", msg
197     self.request.sendall(msg + self.EOM)
198
199
200 class ClientOps:
201   """Class holding high-level client operations."""
202   def __init__(self, server):
203     self.server = server
204
205   def handle_request(self, method, args):
206     queue = self.server.context.jobqueue
207
208     # TODO: Parameter validation
209
210     if method == luxi.REQ_SUBMIT_JOB:
211       ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
212       return queue.SubmitJob(ops)
213
214     elif method == luxi.REQ_CANCEL_JOB:
215       job_id = args
216       return queue.CancelJob(job_id)
217
218     elif method == luxi.REQ_ARCHIVE_JOB:
219       job_id = args
220       return queue.ArchiveJob(job_id)
221
222     elif method == luxi.REQ_AUTOARCHIVE_JOBS:
223       age = args
224       return queue.AutoArchiveJobs(age)
225
226     elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
227       (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
228       return queue.WaitForJobChanges(job_id, fields, prev_job_info,
229                                      prev_log_serial, timeout)
230
231     elif method == luxi.REQ_QUERY_JOBS:
232       (job_ids, fields) = args
233       return queue.QueryJobs(job_ids, fields)
234
235     elif method == luxi.REQ_QUERY_INSTANCES:
236       (names, fields) = args
237       op = opcodes.OpQueryInstances(names=names, output_fields=fields)
238       return self._Query(op)
239
240     elif method == luxi.REQ_QUERY_NODES:
241       (names, fields) = args
242       op = opcodes.OpQueryNodes(names=names, output_fields=fields)
243       return self._Query(op)
244
245     elif method == luxi.REQ_QUERY_EXPORTS:
246       nodes = args
247       op = opcodes.OpQueryExports(nodes=nodes)
248       return self._Query(op)
249
250     elif method == luxi.REQ_QUERY_CONFIG_VALUES:
251       fields = args
252       op = opcodes.OpQueryConfigValues(output_fields=fields)
253       return self._Query(op)
254
255     elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
256       drain_flag = args
257       return queue.SetDrainFlag(drain_flag)
258
259     else:
260       raise ValueError("Invalid operation")
261
262   def _DummyLog(self, *args):
263     pass
264
265   def _Query(self, op):
266     """Runs the specified opcode and returns the result.
267
268     """
269     proc = mcpu.Processor(self.server.context)
270     # TODO: Where should log messages go?
271     return proc.ExecOpCode(op, self._DummyLog, None)
272
273
274 class GanetiContext(object):
275   """Context common to all ganeti threads.
276
277   This class creates and holds common objects shared by all threads.
278
279   """
280   _instance = None
281
282   def __init__(self):
283     """Constructs a new GanetiContext object.
284
285     There should be only a GanetiContext object at any time, so this
286     function raises an error if this is not the case.
287
288     """
289     assert self.__class__._instance is None, "double GanetiContext instance"
290
291     # Create global configuration object
292     self.cfg = config.ConfigWriter()
293
294     # Locking manager
295     self.glm = locking.GanetiLockManager(
296                 self.cfg.GetNodeList(),
297                 self.cfg.GetInstanceList())
298
299     # Job queue
300     self.jobqueue = jqueue.JobQueue(self)
301
302     # setting this also locks the class against attribute modifications
303     self.__class__._instance = self
304
305   def __setattr__(self, name, value):
306     """Setting GanetiContext attributes is forbidden after initialization.
307
308     """
309     assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
310     object.__setattr__(self, name, value)
311
312   def AddNode(self, node):
313     """Adds a node to the configuration and lock manager.
314
315     """
316     # Add it to the configuration
317     self.cfg.AddNode(node)
318
319     # If preseeding fails it'll not be added
320     self.jobqueue.AddNode(node)
321
322     # Add the new node to the Ganeti Lock Manager
323     self.glm.add(locking.LEVEL_NODE, node.name)
324
325   def ReaddNode(self, node):
326     """Updates a node that's already in the configuration
327
328     """
329     # Synchronize the queue again
330     self.jobqueue.AddNode(node)
331
332   def RemoveNode(self, name):
333     """Removes a node from the configuration and lock manager.
334
335     """
336     # Remove node from configuration
337     self.cfg.RemoveNode(name)
338
339     # Notify job queue
340     self.jobqueue.RemoveNode(name)
341
342     # Remove the node from the Ganeti Lock Manager
343     self.glm.remove(locking.LEVEL_NODE, name)
344
345
346 def ParseOptions():
347   """Parse the command line options.
348
349   Returns:
350     (options, args) as from OptionParser.parse_args()
351
352   """
353   parser = OptionParser(description="Ganeti master daemon",
354                         usage="%prog [-f] [-d]",
355                         version="%%prog (ganeti) %s" %
356                         constants.RELEASE_VERSION)
357
358   parser.add_option("-f", "--foreground", dest="fork",
359                     help="Don't detach from the current terminal",
360                     default=True, action="store_false")
361   parser.add_option("-d", "--debug", dest="debug",
362                     help="Enable some debug messages",
363                     default=False, action="store_true")
364   options, args = parser.parse_args()
365   return options, args
366
367
368 def CheckAgreement():
369   """Check the agreement on who is the master.
370
371   The function uses a very simple algorithm: we must get more positive
372   than negative answers. Since in most of the cases we are the master,
373   we'll use our own config file for getting the node list. In the
374   future we could collect the current node list from our (possibly
375   obsolete) known nodes.
376
377   In order to account for cold-start of all nodes, we retry for up to
378   a minute until we get a real answer as the top-voted one. If the
379   nodes are more out-of-sync, for now manual startup of the master
380   should be attempted.
381
382   Note that for a even number of nodes cluster, we need at least half
383   of the nodes (beside ourselves) to vote for us. This creates a
384   problem on two-node clusters, since in this case we require the
385   other node to be up too to confirm our status.
386
387   """
388   myself = utils.HostInfo().name
389   #temp instantiation of a config writer, used only to get the node list
390   cfg = config.ConfigWriter()
391   node_list = cfg.GetNodeList()
392   del cfg
393   retries = 6
394   while retries > 0:
395     votes = bootstrap.GatherMasterVotes(node_list)
396     if not votes:
397       # empty node list, this is a one node cluster
398       return True
399     if votes[0][0] is None:
400       retries -= 1
401       time.sleep(10)
402       continue
403     break
404   if retries == 0:
405       logging.critical("Cluster inconsistent, most of the nodes didn't answer"
406                        " after multiple retries. Aborting startup")
407       return False
408   # here a real node is at the top of the list
409   all_votes = sum(item[1] for item in votes)
410   top_node, top_votes = votes[0]
411   result = False
412   if top_node != myself:
413     logging.critical("It seems we are not the master (top-voted node"
414                      " is %s)", top_node)
415   elif top_votes < all_votes - top_votes:
416     logging.critical("It seems we are not the master (%d votes for,"
417                      " %d votes against)", top_votes, all_votes - top_votes)
418   else:
419     result = True
420
421   return result
422
423
424 def main():
425   """Main function"""
426
427   options, args = ParseOptions()
428   utils.debug = options.debug
429   utils.no_fork = True
430
431   rpc.Init()
432   try:
433     ssconf.CheckMaster(options.debug)
434
435     # we believe we are the master, let's ask the other nodes...
436     if not CheckAgreement():
437       return
438
439     master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
440   finally:
441     rpc.Shutdown()
442
443   # become a daemon
444   if options.fork:
445     utils.Daemonize(logfile=constants.LOG_MASTERDAEMON,
446                     noclose_fds=[master.fileno()])
447
448   utils.WritePidFile(constants.MASTERD_PID)
449   try:
450     utils.SetupLogging(constants.LOG_MASTERDAEMON, debug=options.debug,
451                        stderr_logging=not options.fork)
452
453     logging.info("Ganeti master daemon startup")
454
455     rpc.Init()
456     try:
457       # activate ip
458       master_node = ssconf.SimpleConfigReader().GetMasterNode()
459       if not rpc.RpcRunner.call_node_start_master(master_node, False):
460         logging.error("Can't activate master IP address")
461
462       master.setup_queue()
463       try:
464         master.serve_forever()
465       finally:
466         master.server_cleanup()
467     finally:
468       rpc.Shutdown()
469   finally:
470     utils.RemovePidFile(constants.MASTERD_PID)
471
472
473 if __name__ == "__main__":
474   main()