Add new “daemon-util” script to start/stop Ganeti daemons
[ganeti-local] / daemons / ganeti-masterd
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Master daemon program.
23
24 Some classes deviates from the standard style guide since the
25 inheritance from parent classes requires it.
26
27 """
28
29
30 import os
31 import sys
32 import SocketServer
33 import time
34 import collections
35 import signal
36 import logging
37
38 from optparse import OptionParser
39
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import daemon
43 from ganeti import mcpu
44 from ganeti import opcodes
45 from ganeti import jqueue
46 from ganeti import locking
47 from ganeti import luxi
48 from ganeti import utils
49 from ganeti import errors
50 from ganeti import ssconf
51 from ganeti import workerpool
52 from ganeti import rpc
53 from ganeti import bootstrap
54 from ganeti import serializer
55
56
57 CLIENT_REQUEST_WORKERS = 16
58
59 EXIT_NOTMASTER = constants.EXIT_NOTMASTER
60 EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
61
62
63 class ClientRequestWorker(workerpool.BaseWorker):
64   def RunTask(self, server, request, client_address):
65     """Process the request.
66
67     This is copied from the code in ThreadingMixIn.
68
69     """
70     try:
71       server.finish_request(request, client_address)
72       server.close_request(request)
73     except:
74       server.handle_error(request, client_address)
75       server.close_request(request)
76
77
78 class IOServer(SocketServer.UnixStreamServer):
79   """IO thread class.
80
81   This class takes care of initializing the other threads, setting
82   signal handlers (which are processed only in this thread), and doing
83   cleanup at shutdown.
84
85   """
86   def __init__(self, address, rqhandler):
87     """IOServer constructor
88
89     @param address: the address to bind this IOServer to
90     @param rqhandler: RequestHandler type object
91
92     """
93     SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
94
95     # We'll only start threads once we've forked.
96     self.context = None
97     self.request_workers = None
98
99   def setup_queue(self):
100     self.context = GanetiContext()
101     self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
102                                                  ClientRequestWorker)
103
104   def process_request(self, request, client_address):
105     """Add task to workerpool to process request.
106
107     """
108     self.request_workers.AddTask(self, request, client_address)
109
110   @utils.SignalHandled([signal.SIGINT, signal.SIGTERM])
111   def serve_forever(self, signal_handlers=None):
112     """Handle one request at a time until told to quit."""
113     assert isinstance(signal_handlers, dict) and \
114            len(signal_handlers) > 0, \
115            "Broken SignalHandled decorator"
116     # Since we use SignalHandled only once, the resulting dict will map all
117     # signals to the same handler. We'll just use the first one.
118     sighandler = signal_handlers.values()[0]
119     while not sighandler.called:
120       self.handle_request()
121
122   def server_cleanup(self):
123     """Cleanup the server.
124
125     This involves shutting down the processor threads and the master
126     socket.
127
128     """
129     try:
130       self.server_close()
131     finally:
132       if self.request_workers:
133         self.request_workers.TerminateWorkers()
134       if self.context:
135         self.context.jobqueue.Shutdown()
136
137
138 class ClientRqHandler(SocketServer.BaseRequestHandler):
139   """Client handler"""
140   EOM = '\3'
141   READ_SIZE = 4096
142
143   def setup(self):
144     self._buffer = ""
145     self._msgs = collections.deque()
146     self._ops = ClientOps(self.server)
147
148   def handle(self):
149     while True:
150       msg = self.read_message()
151       if msg is None:
152         logging.debug("client closed connection")
153         break
154
155       request = serializer.LoadJson(msg)
156       logging.debug("request: %s", request)
157       if not isinstance(request, dict):
158         logging.error("wrong request received: %s", msg)
159         break
160
161       method = request.get(luxi.KEY_METHOD, None)
162       args = request.get(luxi.KEY_ARGS, None)
163       if method is None or args is None:
164         logging.error("no method or args in request")
165         break
166
167       success = False
168       try:
169         result = self._ops.handle_request(method, args)
170         success = True
171       except errors.GenericError, err:
172         success = False
173         result = errors.EncodeException(err)
174       except:
175         logging.error("Unexpected exception", exc_info=True)
176         err = sys.exc_info()
177         result = "Caught exception: %s" % str(err[1])
178
179       response = {
180         luxi.KEY_SUCCESS: success,
181         luxi.KEY_RESULT: result,
182         }
183       logging.debug("response: %s", response)
184       self.send_message(serializer.DumpJson(response))
185
186   def read_message(self):
187     while not self._msgs:
188       data = self.request.recv(self.READ_SIZE)
189       if not data:
190         return None
191       new_msgs = (self._buffer + data).split(self.EOM)
192       self._buffer = new_msgs.pop()
193       self._msgs.extend(new_msgs)
194     return self._msgs.popleft()
195
196   def send_message(self, msg):
197     #print "sending", msg
198     # TODO: sendall is not guaranteed to send everything
199     self.request.sendall(msg + self.EOM)
200
201
202 class ClientOps:
203   """Class holding high-level client operations."""
204   def __init__(self, server):
205     self.server = server
206
207   def handle_request(self, method, args):
208     queue = self.server.context.jobqueue
209
210     # TODO: Parameter validation
211
212     if method == luxi.REQ_SUBMIT_JOB:
213       logging.info("Received new job")
214       ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
215       return queue.SubmitJob(ops)
216
217     if method == luxi.REQ_SUBMIT_MANY_JOBS:
218       logging.info("Received multiple jobs")
219       jobs = []
220       for ops in args:
221         jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
222       return queue.SubmitManyJobs(jobs)
223
224     elif method == luxi.REQ_CANCEL_JOB:
225       job_id = args
226       logging.info("Received job cancel request for %s", job_id)
227       return queue.CancelJob(job_id)
228
229     elif method == luxi.REQ_ARCHIVE_JOB:
230       job_id = args
231       logging.info("Received job archive request for %s", job_id)
232       return queue.ArchiveJob(job_id)
233
234     elif method == luxi.REQ_AUTOARCHIVE_JOBS:
235       (age, timeout) = args
236       logging.info("Received job autoarchive request for age %s, timeout %s",
237                    age, timeout)
238       return queue.AutoArchiveJobs(age, timeout)
239
240     elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
241       (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
242       logging.info("Received job poll request for %s", job_id)
243       return queue.WaitForJobChanges(job_id, fields, prev_job_info,
244                                      prev_log_serial, timeout)
245
246     elif method == luxi.REQ_QUERY_JOBS:
247       (job_ids, fields) = args
248       if isinstance(job_ids, (tuple, list)) and job_ids:
249         msg = ", ".join(job_ids)
250       else:
251         msg = str(job_ids)
252       logging.info("Received job query request for %s", msg)
253       return queue.QueryJobs(job_ids, fields)
254
255     elif method == luxi.REQ_QUERY_INSTANCES:
256       (names, fields, use_locking) = args
257       logging.info("Received instance query request for %s", names)
258       if use_locking:
259         raise errors.OpPrereqError("Sync queries are not allowed",
260                                    errors.ECODE_INVAL)
261       op = opcodes.OpQueryInstances(names=names, output_fields=fields,
262                                     use_locking=use_locking)
263       return self._Query(op)
264
265     elif method == luxi.REQ_QUERY_NODES:
266       (names, fields, use_locking) = args
267       logging.info("Received node query request for %s", names)
268       if use_locking:
269         raise errors.OpPrereqError("Sync queries are not allowed",
270                                    errors.ECODE_INVAL)
271       op = opcodes.OpQueryNodes(names=names, output_fields=fields,
272                                 use_locking=use_locking)
273       return self._Query(op)
274
275     elif method == luxi.REQ_QUERY_EXPORTS:
276       nodes, use_locking = args
277       if use_locking:
278         raise errors.OpPrereqError("Sync queries are not allowed",
279                                    errors.ECODE_INVAL)
280       logging.info("Received exports query request")
281       op = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
282       return self._Query(op)
283
284     elif method == luxi.REQ_QUERY_CONFIG_VALUES:
285       fields = args
286       logging.info("Received config values query request for %s", fields)
287       op = opcodes.OpQueryConfigValues(output_fields=fields)
288       return self._Query(op)
289
290     elif method == luxi.REQ_QUERY_CLUSTER_INFO:
291       logging.info("Received cluster info query request")
292       op = opcodes.OpQueryClusterInfo()
293       return self._Query(op)
294
295     elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
296       drain_flag = args
297       logging.info("Received queue drain flag change request to %s",
298                    drain_flag)
299       return queue.SetDrainFlag(drain_flag)
300
301     elif method == luxi.REQ_SET_WATCHER_PAUSE:
302       (until, ) = args
303
304       if until is None:
305         logging.info("Received request to no longer pause the watcher")
306       else:
307         if not isinstance(until, (int, float)):
308           raise TypeError("Duration must be an integer or float")
309
310         if until < time.time():
311           raise errors.GenericError("Unable to set pause end time in the past")
312
313         logging.info("Received request to pause the watcher until %s", until)
314
315       return _SetWatcherPause(until)
316
317     else:
318       logging.info("Received invalid request '%s'", method)
319       raise ValueError("Invalid operation '%s'" % method)
320
321   def _Query(self, op):
322     """Runs the specified opcode and returns the result.
323
324     """
325     proc = mcpu.Processor(self.server.context)
326     return proc.ExecOpCode(op, None)
327
328
329 class GanetiContext(object):
330   """Context common to all ganeti threads.
331
332   This class creates and holds common objects shared by all threads.
333
334   """
335   _instance = None
336
337   def __init__(self):
338     """Constructs a new GanetiContext object.
339
340     There should be only a GanetiContext object at any time, so this
341     function raises an error if this is not the case.
342
343     """
344     assert self.__class__._instance is None, "double GanetiContext instance"
345
346     # Create global configuration object
347     self.cfg = config.ConfigWriter()
348
349     # Locking manager
350     self.glm = locking.GanetiLockManager(
351                 self.cfg.GetNodeList(),
352                 self.cfg.GetInstanceList())
353
354     # Job queue
355     self.jobqueue = jqueue.JobQueue(self)
356
357     # setting this also locks the class against attribute modifications
358     self.__class__._instance = self
359
360   def __setattr__(self, name, value):
361     """Setting GanetiContext attributes is forbidden after initialization.
362
363     """
364     assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
365     object.__setattr__(self, name, value)
366
367   def AddNode(self, node):
368     """Adds a node to the configuration and lock manager.
369
370     """
371     # Add it to the configuration
372     self.cfg.AddNode(node)
373
374     # If preseeding fails it'll not be added
375     self.jobqueue.AddNode(node)
376
377     # Add the new node to the Ganeti Lock Manager
378     self.glm.add(locking.LEVEL_NODE, node.name)
379
380   def ReaddNode(self, node):
381     """Updates a node that's already in the configuration
382
383     """
384     # Synchronize the queue again
385     self.jobqueue.AddNode(node)
386
387   def RemoveNode(self, name):
388     """Removes a node from the configuration and lock manager.
389
390     """
391     # Remove node from configuration
392     self.cfg.RemoveNode(name)
393
394     # Notify job queue
395     self.jobqueue.RemoveNode(name)
396
397     # Remove the node from the Ganeti Lock Manager
398     self.glm.remove(locking.LEVEL_NODE, name)
399
400
401 def _SetWatcherPause(until):
402   """Creates or removes the watcher pause file.
403
404   @type until: None or int
405   @param until: Unix timestamp saying until when the watcher shouldn't run
406
407   """
408   if until is None:
409     utils.RemoveFile(constants.WATCHER_PAUSEFILE)
410   else:
411     utils.WriteFile(constants.WATCHER_PAUSEFILE,
412                     data="%d\n" % (until, ))
413
414   return until
415
416
417 def CheckAgreement():
418   """Check the agreement on who is the master.
419
420   The function uses a very simple algorithm: we must get more positive
421   than negative answers. Since in most of the cases we are the master,
422   we'll use our own config file for getting the node list. In the
423   future we could collect the current node list from our (possibly
424   obsolete) known nodes.
425
426   In order to account for cold-start of all nodes, we retry for up to
427   a minute until we get a real answer as the top-voted one. If the
428   nodes are more out-of-sync, for now manual startup of the master
429   should be attempted.
430
431   Note that for a even number of nodes cluster, we need at least half
432   of the nodes (beside ourselves) to vote for us. This creates a
433   problem on two-node clusters, since in this case we require the
434   other node to be up too to confirm our status.
435
436   """
437   myself = utils.HostInfo().name
438   #temp instantiation of a config writer, used only to get the node list
439   cfg = config.ConfigWriter()
440   node_list = cfg.GetNodeList()
441   del cfg
442   retries = 6
443   while retries > 0:
444     votes = bootstrap.GatherMasterVotes(node_list)
445     if not votes:
446       # empty node list, this is a one node cluster
447       return True
448     if votes[0][0] is None:
449       retries -= 1
450       time.sleep(10)
451       continue
452     break
453   if retries == 0:
454     logging.critical("Cluster inconsistent, most of the nodes didn't answer"
455                      " after multiple retries. Aborting startup")
456     return False
457   # here a real node is at the top of the list
458   all_votes = sum(item[1] for item in votes)
459   top_node, top_votes = votes[0]
460
461   result = False
462   if top_node != myself:
463     logging.critical("It seems we are not the master (top-voted node"
464                      " is %s with %d out of %d votes)", top_node, top_votes,
465                      all_votes)
466   elif top_votes < all_votes - top_votes:
467     logging.critical("It seems we are not the master (%d votes for,"
468                      " %d votes against)", top_votes, all_votes - top_votes)
469   else:
470     result = True
471
472   return result
473
474
475 def CheckAgreementWithRpc():
476   rpc.Init()
477   try:
478     return CheckAgreement()
479   finally:
480     rpc.Shutdown()
481
482
483 def _RunInSeparateProcess(fn):
484   """Runs a function in a separate process.
485
486   Note: Only boolean return values are supported.
487
488   @type fn: callable
489   @param fn: Function to be called
490   @rtype: bool
491
492   """
493   pid = os.fork()
494   if pid == 0:
495     # Child process
496     try:
497       # Call function
498       result = int(bool(fn()))
499       assert result in (0, 1)
500     except:
501       logging.exception("Error while calling function in separate process")
502       # 0 and 1 are reserved for the return value
503       result = 33
504
505     os._exit(result)
506
507   # Parent process
508
509   # Avoid zombies and check exit code
510   (_, status) = os.waitpid(pid, 0)
511
512   if os.WIFSIGNALED(status):
513     signum = os.WTERMSIG(status)
514     exitcode = None
515   else:
516     signum = None
517     exitcode = os.WEXITSTATUS(status)
518
519   if not (exitcode in (0, 1) and signum is None):
520     logging.error("Child program failed (code=%s, signal=%s)",
521                   exitcode, signum)
522     sys.exit(constants.EXIT_FAILURE)
523
524   return bool(exitcode)
525
526
527 def CheckMasterd(options, args):
528   """Initial checks whether to run or exit with a failure.
529
530   """
531   ssconf.CheckMaster(options.debug)
532
533   # If CheckMaster didn't fail we believe we are the master, but we have to
534   # confirm with the other nodes.
535   if options.no_voting:
536     if options.yes_do_it:
537       return
538
539     sys.stdout.write("The 'no voting' option has been selected.\n")
540     sys.stdout.write("This is dangerous, please confirm by"
541                      " typing uppercase 'yes': ")
542     sys.stdout.flush()
543
544     confirmation = sys.stdin.readline().strip()
545     if confirmation != "YES":
546       print >>sys.stderr, "Aborting."
547       sys.exit(constants.EXIT_FAILURE)
548
549     return
550
551   # CheckAgreement uses RPC and threads, hence it needs to be run in a separate
552   # process before we call utils.Daemonize in the current process.
553   if not _RunInSeparateProcess(CheckAgreementWithRpc):
554     sys.exit(constants.EXIT_FAILURE)
555
556
557 def ExecMasterd (options, args):
558   """Main master daemon function, executed with the PID file held.
559
560   """
561   # This is safe to do as the pid file guarantees against
562   # concurrent execution.
563   utils.RemoveFile(constants.MASTER_SOCKET)
564
565   master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
566   try:
567     rpc.Init()
568     try:
569       # activate ip
570       master_node = ssconf.SimpleStore().GetMasterNode()
571       result = rpc.RpcRunner.call_node_start_master(master_node, False, False)
572       msg = result.fail_msg
573       if msg:
574         logging.error("Can't activate master IP address: %s", msg)
575
576       master.setup_queue()
577       try:
578         master.serve_forever()
579       finally:
580         master.server_cleanup()
581     finally:
582       rpc.Shutdown()
583   finally:
584     utils.RemoveFile(constants.MASTER_SOCKET)
585
586
587 def main():
588   """Main function"""
589   parser = OptionParser(description="Ganeti master daemon",
590                         usage="%prog [-f] [-d]",
591                         version="%%prog (ganeti) %s" %
592                         constants.RELEASE_VERSION)
593   parser.add_option("--no-voting", dest="no_voting",
594                     help="Do not check that the nodes agree on this node"
595                     " being the master and start the daemon unconditionally",
596                     default=False, action="store_true")
597   parser.add_option("--yes-do-it", dest="yes_do_it",
598                     help="Override interactive check for --no-voting",
599                     default=False, action="store_true")
600   dirs = [(constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE),
601           (constants.SOCKET_DIR, constants.SOCKET_DIR_MODE),
602          ]
603   daemon.GenericMain(constants.MASTERD, parser, dirs,
604                      CheckMasterd, ExecMasterd)
605
606
607 if __name__ == "__main__":
608   main()