gnt-cluster watcher: Show more information
[ganeti-local] / daemons / ganeti-masterd
1 #!/usr/bin/python -u
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Master daemon program.
23
24 Some classes deviates from the standard style guide since the
25 inheritance from parent classes requires it.
26
27 """
28
29
30 import os
31 import sys
32 import SocketServer
33 import time
34 import collections
35 import signal
36 import logging
37
38 from cStringIO import StringIO
39 from optparse import OptionParser
40
41 from ganeti import config
42 from ganeti import constants
43 from ganeti import daemon
44 from ganeti import mcpu
45 from ganeti import opcodes
46 from ganeti import jqueue
47 from ganeti import locking
48 from ganeti import luxi
49 from ganeti import utils
50 from ganeti import errors
51 from ganeti import ssconf
52 from ganeti import workerpool
53 from ganeti import rpc
54 from ganeti import bootstrap
55 from ganeti import serializer
56
57
58 CLIENT_REQUEST_WORKERS = 16
59
60 EXIT_NOTMASTER = constants.EXIT_NOTMASTER
61 EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
62
63
64 class ClientRequestWorker(workerpool.BaseWorker):
65   def RunTask(self, server, request, client_address):
66     """Process the request.
67
68     This is copied from the code in ThreadingMixIn.
69
70     """
71     try:
72       server.finish_request(request, client_address)
73       server.close_request(request)
74     except:
75       server.handle_error(request, client_address)
76       server.close_request(request)
77
78
79 class IOServer(SocketServer.UnixStreamServer):
80   """IO thread class.
81
82   This class takes care of initializing the other threads, setting
83   signal handlers (which are processed only in this thread), and doing
84   cleanup at shutdown.
85
86   """
87   def __init__(self, address, rqhandler):
88     """IOServer constructor
89
90     @param address: the address to bind this IOServer to
91     @param rqhandler: RequestHandler type object
92
93     """
94     SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
95
96     # We'll only start threads once we've forked.
97     self.context = None
98     self.request_workers = None
99
100   def setup_queue(self):
101     self.context = GanetiContext()
102     self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
103                                                  ClientRequestWorker)
104
105   def process_request(self, request, client_address):
106     """Add task to workerpool to process request.
107
108     """
109     self.request_workers.AddTask(self, request, client_address)
110
111   @utils.SignalHandled([signal.SIGINT, signal.SIGTERM])
112   def serve_forever(self, signal_handlers=None):
113     """Handle one request at a time until told to quit."""
114     assert isinstance(signal_handlers, dict) and \
115            len(signal_handlers) > 0, \
116            "Broken SignalHandled decorator"
117     # Since we use SignalHandled only once, the resulting dict will map all
118     # signals to the same handler. We'll just use the first one.
119     sighandler = signal_handlers.values()[0]
120     while not sighandler.called:
121       self.handle_request()
122
123   def server_cleanup(self):
124     """Cleanup the server.
125
126     This involves shutting down the processor threads and the master
127     socket.
128
129     """
130     try:
131       self.server_close()
132     finally:
133       if self.request_workers:
134         self.request_workers.TerminateWorkers()
135       if self.context:
136         self.context.jobqueue.Shutdown()
137
138
139 class ClientRqHandler(SocketServer.BaseRequestHandler):
140   """Client handler"""
141   EOM = '\3'
142   READ_SIZE = 4096
143
144   def setup(self):
145     self._buffer = ""
146     self._msgs = collections.deque()
147     self._ops = ClientOps(self.server)
148
149   def handle(self):
150     while True:
151       msg = self.read_message()
152       if msg is None:
153         logging.debug("client closed connection")
154         break
155
156       request = serializer.LoadJson(msg)
157       logging.debug("request: %s", request)
158       if not isinstance(request, dict):
159         logging.error("wrong request received: %s", msg)
160         break
161
162       method = request.get(luxi.KEY_METHOD, None)
163       args = request.get(luxi.KEY_ARGS, None)
164       if method is None or args is None:
165         logging.error("no method or args in request")
166         break
167
168       success = False
169       try:
170         result = self._ops.handle_request(method, args)
171         success = True
172       except errors.GenericError, err:
173         success = False
174         result = errors.EncodeException(err)
175       except:
176         logging.error("Unexpected exception", exc_info=True)
177         err = sys.exc_info()
178         result = "Caught exception: %s" % str(err[1])
179
180       response = {
181         luxi.KEY_SUCCESS: success,
182         luxi.KEY_RESULT: result,
183         }
184       logging.debug("response: %s", response)
185       self.send_message(serializer.DumpJson(response))
186
187   def read_message(self):
188     while not self._msgs:
189       data = self.request.recv(self.READ_SIZE)
190       if not data:
191         return None
192       new_msgs = (self._buffer + data).split(self.EOM)
193       self._buffer = new_msgs.pop()
194       self._msgs.extend(new_msgs)
195     return self._msgs.popleft()
196
197   def send_message(self, msg):
198     #print "sending", msg
199     # TODO: sendall is not guaranteed to send everything
200     self.request.sendall(msg + self.EOM)
201
202
203 class ClientOps:
204   """Class holding high-level client operations."""
205   def __init__(self, server):
206     self.server = server
207
208   def handle_request(self, method, args):
209     queue = self.server.context.jobqueue
210
211     # TODO: Parameter validation
212
213     if method == luxi.REQ_SUBMIT_JOB:
214       logging.info("Received new job")
215       ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
216       return queue.SubmitJob(ops)
217
218     if method == luxi.REQ_SUBMIT_MANY_JOBS:
219       logging.info("Received multiple jobs")
220       jobs = []
221       for ops in args:
222         jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
223       return queue.SubmitManyJobs(jobs)
224
225     elif method == luxi.REQ_CANCEL_JOB:
226       job_id = args
227       logging.info("Received job cancel request for %s", job_id)
228       return queue.CancelJob(job_id)
229
230     elif method == luxi.REQ_ARCHIVE_JOB:
231       job_id = args
232       logging.info("Received job archive request for %s", job_id)
233       return queue.ArchiveJob(job_id)
234
235     elif method == luxi.REQ_AUTOARCHIVE_JOBS:
236       (age, timeout) = args
237       logging.info("Received job autoarchive request for age %s, timeout %s",
238                    age, timeout)
239       return queue.AutoArchiveJobs(age, timeout)
240
241     elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
242       (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
243       logging.info("Received job poll request for %s", job_id)
244       return queue.WaitForJobChanges(job_id, fields, prev_job_info,
245                                      prev_log_serial, timeout)
246
247     elif method == luxi.REQ_QUERY_JOBS:
248       (job_ids, fields) = args
249       if isinstance(job_ids, (tuple, list)) and job_ids:
250         msg = ", ".join(job_ids)
251       else:
252         msg = str(job_ids)
253       logging.info("Received job query request for %s", msg)
254       return queue.QueryJobs(job_ids, fields)
255
256     elif method == luxi.REQ_QUERY_INSTANCES:
257       (names, fields, use_locking) = args
258       logging.info("Received instance query request for %s", names)
259       if use_locking:
260         raise errors.OpPrereqError("Sync queries are not allowed")
261       op = opcodes.OpQueryInstances(names=names, output_fields=fields,
262                                     use_locking=use_locking)
263       return self._Query(op)
264
265     elif method == luxi.REQ_QUERY_NODES:
266       (names, fields, use_locking) = args
267       logging.info("Received node query request for %s", names)
268       if use_locking:
269         raise errors.OpPrereqError("Sync queries are not allowed")
270       op = opcodes.OpQueryNodes(names=names, output_fields=fields,
271                                 use_locking=use_locking)
272       return self._Query(op)
273
274     elif method == luxi.REQ_QUERY_EXPORTS:
275       nodes, use_locking = args
276       if use_locking:
277         raise errors.OpPrereqError("Sync queries are not allowed")
278       logging.info("Received exports query request")
279       op = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
280       return self._Query(op)
281
282     elif method == luxi.REQ_QUERY_CONFIG_VALUES:
283       fields = args
284       logging.info("Received config values query request for %s", fields)
285       op = opcodes.OpQueryConfigValues(output_fields=fields)
286       return self._Query(op)
287
288     elif method == luxi.REQ_QUERY_CLUSTER_INFO:
289       logging.info("Received cluster info query request")
290       op = opcodes.OpQueryClusterInfo()
291       return self._Query(op)
292
293     elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
294       drain_flag = args
295       logging.info("Received queue drain flag change request to %s",
296                    drain_flag)
297       return queue.SetDrainFlag(drain_flag)
298
299     elif method == luxi.REQ_SET_WATCHER_PAUSE:
300       (until, ) = args
301
302       if until is None:
303         logging.info("Received request to no longer pause the watcher")
304       else:
305         if not isinstance(until, (int, float)):
306           raise TypeError("Duration must be an integer or float")
307
308         if until < time.time():
309           raise errors.GenericError("Unable to set pause end time in the past")
310
311         logging.info("Received request to pause the watcher until %s", until)
312
313       return _SetWatcherPause(until)
314
315     else:
316       logging.info("Received invalid request '%s'", method)
317       raise ValueError("Invalid operation '%s'" % method)
318
319   def _DummyLog(self, *args):
320     pass
321
322   def _Query(self, op):
323     """Runs the specified opcode and returns the result.
324
325     """
326     proc = mcpu.Processor(self.server.context)
327     # TODO: Where should log messages go?
328     return proc.ExecOpCode(op, self._DummyLog, None)
329
330
331 class GanetiContext(object):
332   """Context common to all ganeti threads.
333
334   This class creates and holds common objects shared by all threads.
335
336   """
337   _instance = None
338
339   def __init__(self):
340     """Constructs a new GanetiContext object.
341
342     There should be only a GanetiContext object at any time, so this
343     function raises an error if this is not the case.
344
345     """
346     assert self.__class__._instance is None, "double GanetiContext instance"
347
348     # Create global configuration object
349     self.cfg = config.ConfigWriter()
350
351     # Locking manager
352     self.glm = locking.GanetiLockManager(
353                 self.cfg.GetNodeList(),
354                 self.cfg.GetInstanceList())
355
356     # Job queue
357     self.jobqueue = jqueue.JobQueue(self)
358
359     # setting this also locks the class against attribute modifications
360     self.__class__._instance = self
361
362   def __setattr__(self, name, value):
363     """Setting GanetiContext attributes is forbidden after initialization.
364
365     """
366     assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
367     object.__setattr__(self, name, value)
368
369   def AddNode(self, node):
370     """Adds a node to the configuration and lock manager.
371
372     """
373     # Add it to the configuration
374     self.cfg.AddNode(node)
375
376     # If preseeding fails it'll not be added
377     self.jobqueue.AddNode(node)
378
379     # Add the new node to the Ganeti Lock Manager
380     self.glm.add(locking.LEVEL_NODE, node.name)
381
382   def ReaddNode(self, node):
383     """Updates a node that's already in the configuration
384
385     """
386     # Synchronize the queue again
387     self.jobqueue.AddNode(node)
388
389   def RemoveNode(self, name):
390     """Removes a node from the configuration and lock manager.
391
392     """
393     # Remove node from configuration
394     self.cfg.RemoveNode(name)
395
396     # Notify job queue
397     self.jobqueue.RemoveNode(name)
398
399     # Remove the node from the Ganeti Lock Manager
400     self.glm.remove(locking.LEVEL_NODE, name)
401
402
403 def _SetWatcherPause(until):
404   """Creates or removes the watcher pause file.
405
406   @type until: None or int
407   @param until: Unix timestamp saying until when the watcher shouldn't run
408
409   """
410   if until is None:
411     utils.RemoveFile(constants.WATCHER_PAUSEFILE)
412   else:
413     utils.WriteFile(constants.WATCHER_PAUSEFILE,
414                     data="%d\n" % (until, ))
415
416   return until
417
418
419 def CheckAgreement():
420   """Check the agreement on who is the master.
421
422   The function uses a very simple algorithm: we must get more positive
423   than negative answers. Since in most of the cases we are the master,
424   we'll use our own config file for getting the node list. In the
425   future we could collect the current node list from our (possibly
426   obsolete) known nodes.
427
428   In order to account for cold-start of all nodes, we retry for up to
429   a minute until we get a real answer as the top-voted one. If the
430   nodes are more out-of-sync, for now manual startup of the master
431   should be attempted.
432
433   Note that for a even number of nodes cluster, we need at least half
434   of the nodes (beside ourselves) to vote for us. This creates a
435   problem on two-node clusters, since in this case we require the
436   other node to be up too to confirm our status.
437
438   """
439   myself = utils.HostInfo().name
440   #temp instantiation of a config writer, used only to get the node list
441   cfg = config.ConfigWriter()
442   node_list = cfg.GetNodeList()
443   del cfg
444   retries = 6
445   while retries > 0:
446     votes = bootstrap.GatherMasterVotes(node_list)
447     if not votes:
448       # empty node list, this is a one node cluster
449       return True
450     if votes[0][0] is None:
451       retries -= 1
452       time.sleep(10)
453       continue
454     break
455   if retries == 0:
456     logging.critical("Cluster inconsistent, most of the nodes didn't answer"
457                      " after multiple retries. Aborting startup")
458     return False
459   # here a real node is at the top of the list
460   all_votes = sum(item[1] for item in votes)
461   top_node, top_votes = votes[0]
462
463   result = False
464   if top_node != myself:
465     logging.critical("It seems we are not the master (top-voted node"
466                      " is %s with %d out of %d votes)", top_node, top_votes,
467                      all_votes)
468   elif top_votes < all_votes - top_votes:
469     logging.critical("It seems we are not the master (%d votes for,"
470                      " %d votes against)", top_votes, all_votes - top_votes)
471   else:
472     result = True
473
474   return result
475
476
477 def CheckAgreementWithRpc():
478   rpc.Init()
479   try:
480     return CheckAgreement()
481   finally:
482     rpc.Shutdown()
483
484
485 def _RunInSeparateProcess(fn):
486   """Runs a function in a separate process.
487
488   Note: Only boolean return values are supported.
489
490   @type fn: callable
491   @param fn: Function to be called
492   @rtype: bool
493
494   """
495   pid = os.fork()
496   if pid == 0:
497     # Child process
498     try:
499       # Call function
500       result = int(bool(fn()))
501       assert result in (0, 1)
502     except:
503       logging.exception("Error while calling function in separate process")
504       # 0 and 1 are reserved for the return value
505       result = 33
506
507     os._exit(result)
508
509   # Parent process
510
511   # Avoid zombies and check exit code
512   (_, status) = os.waitpid(pid, 0)
513
514   if os.WIFSIGNALED(status):
515     signum = os.WTERMSIG(status)
516     exitcode = None
517   else:
518     signum = None
519     exitcode = os.WEXITSTATUS(status)
520
521   if not (exitcode in (0, 1) and signum is None):
522     logging.error("Child program failed (code=%s, signal=%s)",
523                   exitcode, signum)
524     sys.exit(constants.EXIT_FAILURE)
525
526   return bool(exitcode)
527
528
529 def CheckMasterd(options, args):
530   """Initial checks whether to run or exit with a failure.
531
532   """
533   ssconf.CheckMaster(options.debug)
534
535   # If CheckMaster didn't fail we believe we are the master, but we have to
536   # confirm with the other nodes.
537   if options.no_voting:
538     if options.yes_do_it:
539       return
540
541     sys.stdout.write("The 'no voting' option has been selected.\n")
542     sys.stdout.write("This is dangerous, please confirm by"
543                      " typing uppercase 'yes': ")
544     sys.stdout.flush()
545
546     confirmation = sys.stdin.readline().strip()
547     if confirmation != "YES":
548       print >>sys.stderr, "Aborting."
549       sys.exit(constants.EXIT_FAILURE)
550
551     return
552
553   # CheckAgreement uses RPC and threads, hence it needs to be run in a separate
554   # process before we call utils.Daemonize in the current process.
555   if not _RunInSeparateProcess(CheckAgreementWithRpc):
556     sys.exit(constants.EXIT_FAILURE)
557
558
559 def ExecMasterd (options, args):
560   """Main master daemon function, executed with the PID file held.
561
562   """
563   # This is safe to do as the pid file guarantees against
564   # concurrent execution.
565   utils.RemoveFile(constants.MASTER_SOCKET)
566
567   master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
568   try:
569     rpc.Init()
570     try:
571       # activate ip
572       master_node = ssconf.SimpleStore().GetMasterNode()
573       result = rpc.RpcRunner.call_node_start_master(master_node, False, False)
574       msg = result.RemoteFailMsg()
575       if msg:
576         logging.error("Can't activate master IP address: %s", msg)
577
578       master.setup_queue()
579       try:
580         master.serve_forever()
581       finally:
582         master.server_cleanup()
583     finally:
584       rpc.Shutdown()
585   finally:
586     utils.RemoveFile(constants.MASTER_SOCKET)
587
588
589 def main():
590   """Main function"""
591   parser = OptionParser(description="Ganeti master daemon",
592                         usage="%prog [-f] [-d]",
593                         version="%%prog (ganeti) %s" %
594                         constants.RELEASE_VERSION)
595   parser.add_option("--no-voting", dest="no_voting",
596                     help="Do not check that the nodes agree on this node"
597                     " being the master and start the daemon unconditionally",
598                     default=False, action="store_true")
599   parser.add_option("--yes-do-it", dest="yes_do_it",
600                     help="Override interactive check for --no-voting",
601                     default=False, action="store_true")
602   dirs = [(constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE),
603           (constants.SOCKET_DIR, constants.SOCKET_DIR_MODE),
604          ]
605   daemon.GenericMain(constants.MASTERD, parser, dirs,
606                      CheckMasterd, ExecMasterd)
607
608
609 if __name__ == "__main__":
610   main()