9b0851fe1b43223606e5ae1d80d0404bd781058a
[ganeti-local] / daemons / ganeti-masterd
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Master daemon program.
23
24 Some classes deviates from the standard style guide since the
25 inheritance from parent classes requires it.
26
27 """
28
29
30 import os
31 import sys
32 import SocketServer
33 import time
34 import collections
35 import signal
36 import logging
37
38 from optparse import OptionParser
39
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import daemon
43 from ganeti import mcpu
44 from ganeti import opcodes
45 from ganeti import jqueue
46 from ganeti import locking
47 from ganeti import luxi
48 from ganeti import utils
49 from ganeti import errors
50 from ganeti import ssconf
51 from ganeti import workerpool
52 from ganeti import rpc
53 from ganeti import bootstrap
54 from ganeti import serializer
55
56
57 CLIENT_REQUEST_WORKERS = 16
58
59 EXIT_NOTMASTER = constants.EXIT_NOTMASTER
60 EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
61
62
63 class ClientRequestWorker(workerpool.BaseWorker):
64   def RunTask(self, server, request, client_address):
65     """Process the request.
66
67     This is copied from the code in ThreadingMixIn.
68
69     """
70     try:
71       server.finish_request(request, client_address)
72       server.close_request(request)
73     except:
74       server.handle_error(request, client_address)
75       server.close_request(request)
76
77
78 class IOServer(SocketServer.UnixStreamServer):
79   """IO thread class.
80
81   This class takes care of initializing the other threads, setting
82   signal handlers (which are processed only in this thread), and doing
83   cleanup at shutdown.
84
85   """
86   def __init__(self, address, rqhandler):
87     """IOServer constructor
88
89     @param address: the address to bind this IOServer to
90     @param rqhandler: RequestHandler type object
91
92     """
93     SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
94
95     # We'll only start threads once we've forked.
96     self.context = None
97     self.request_workers = None
98
99   def setup_queue(self):
100     self.context = GanetiContext()
101     self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
102                                                  ClientRequestWorker)
103
104   def process_request(self, request, client_address):
105     """Add task to workerpool to process request.
106
107     """
108     self.request_workers.AddTask(self, request, client_address)
109
110   @utils.SignalHandled([signal.SIGINT, signal.SIGTERM])
111   def serve_forever(self, signal_handlers=None):
112     """Handle one request at a time until told to quit."""
113     assert isinstance(signal_handlers, dict) and \
114            len(signal_handlers) > 0, \
115            "Broken SignalHandled decorator"
116     # Since we use SignalHandled only once, the resulting dict will map all
117     # signals to the same handler. We'll just use the first one.
118     sighandler = signal_handlers.values()[0]
119     while not sighandler.called:
120       self.handle_request()
121
122   def server_cleanup(self):
123     """Cleanup the server.
124
125     This involves shutting down the processor threads and the master
126     socket.
127
128     """
129     try:
130       self.server_close()
131     finally:
132       if self.request_workers:
133         self.request_workers.TerminateWorkers()
134       if self.context:
135         self.context.jobqueue.Shutdown()
136
137
138 class ClientRqHandler(SocketServer.BaseRequestHandler):
139   """Client handler"""
140   EOM = '\3'
141   READ_SIZE = 4096
142
143   def setup(self):
144     self._buffer = ""
145     self._msgs = collections.deque()
146     self._ops = ClientOps(self.server)
147
148   def handle(self):
149     while True:
150       msg = self.read_message()
151       if msg is None:
152         logging.debug("client closed connection")
153         break
154
155       request = serializer.LoadJson(msg)
156       logging.debug("request: %s", request)
157       if not isinstance(request, dict):
158         logging.error("wrong request received: %s", msg)
159         break
160
161       method = request.get(luxi.KEY_METHOD, None)
162       args = request.get(luxi.KEY_ARGS, None)
163       if method is None or args is None:
164         logging.error("no method or args in request")
165         break
166
167       success = False
168       try:
169         result = self._ops.handle_request(method, args)
170         success = True
171       except errors.GenericError, err:
172         success = False
173         result = errors.EncodeException(err)
174       except:
175         logging.error("Unexpected exception", exc_info=True)
176         err = sys.exc_info()
177         result = "Caught exception: %s" % str(err[1])
178
179       response = {
180         luxi.KEY_SUCCESS: success,
181         luxi.KEY_RESULT: result,
182         }
183       logging.debug("response: %s", response)
184       self.send_message(serializer.DumpJson(response))
185
186   def read_message(self):
187     while not self._msgs:
188       data = self.request.recv(self.READ_SIZE)
189       if not data:
190         return None
191       new_msgs = (self._buffer + data).split(self.EOM)
192       self._buffer = new_msgs.pop()
193       self._msgs.extend(new_msgs)
194     return self._msgs.popleft()
195
196   def send_message(self, msg):
197     #print "sending", msg
198     # TODO: sendall is not guaranteed to send everything
199     self.request.sendall(msg + self.EOM)
200
201
202 class ClientOps:
203   """Class holding high-level client operations."""
204   def __init__(self, server):
205     self.server = server
206
207   def handle_request(self, method, args):
208     queue = self.server.context.jobqueue
209
210     # TODO: Parameter validation
211
212     if method == luxi.REQ_SUBMIT_JOB:
213       logging.info("Received new job")
214       ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
215       return queue.SubmitJob(ops)
216
217     if method == luxi.REQ_SUBMIT_MANY_JOBS:
218       logging.info("Received multiple jobs")
219       jobs = []
220       for ops in args:
221         jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
222       return queue.SubmitManyJobs(jobs)
223
224     elif method == luxi.REQ_CANCEL_JOB:
225       job_id = args
226       logging.info("Received job cancel request for %s", job_id)
227       return queue.CancelJob(job_id)
228
229     elif method == luxi.REQ_ARCHIVE_JOB:
230       job_id = args
231       logging.info("Received job archive request for %s", job_id)
232       return queue.ArchiveJob(job_id)
233
234     elif method == luxi.REQ_AUTOARCHIVE_JOBS:
235       (age, timeout) = args
236       logging.info("Received job autoarchive request for age %s, timeout %s",
237                    age, timeout)
238       return queue.AutoArchiveJobs(age, timeout)
239
240     elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
241       (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
242       logging.info("Received job poll request for %s", job_id)
243       return queue.WaitForJobChanges(job_id, fields, prev_job_info,
244                                      prev_log_serial, timeout)
245
246     elif method == luxi.REQ_QUERY_JOBS:
247       (job_ids, fields) = args
248       if isinstance(job_ids, (tuple, list)) and job_ids:
249         msg = ", ".join(job_ids)
250       else:
251         msg = str(job_ids)
252       logging.info("Received job query request for %s", msg)
253       return queue.QueryJobs(job_ids, fields)
254
255     elif method == luxi.REQ_QUERY_INSTANCES:
256       (names, fields, use_locking) = args
257       logging.info("Received instance query request for %s", names)
258       if use_locking:
259         raise errors.OpPrereqError("Sync queries are not allowed",
260                                    errors.ECODE_INVAL)
261       op = opcodes.OpQueryInstances(names=names, output_fields=fields,
262                                     use_locking=use_locking)
263       return self._Query(op)
264
265     elif method == luxi.REQ_QUERY_NODES:
266       (names, fields, use_locking) = args
267       logging.info("Received node query request for %s", names)
268       if use_locking:
269         raise errors.OpPrereqError("Sync queries are not allowed",
270                                    errors.ECODE_INVAL)
271       op = opcodes.OpQueryNodes(names=names, output_fields=fields,
272                                 use_locking=use_locking)
273       return self._Query(op)
274
275     elif method == luxi.REQ_QUERY_EXPORTS:
276       nodes, use_locking = args
277       if use_locking:
278         raise errors.OpPrereqError("Sync queries are not allowed",
279                                    errors.ECODE_INVAL)
280       logging.info("Received exports query request")
281       op = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
282       return self._Query(op)
283
284     elif method == luxi.REQ_QUERY_CONFIG_VALUES:
285       fields = args
286       logging.info("Received config values query request for %s", fields)
287       op = opcodes.OpQueryConfigValues(output_fields=fields)
288       return self._Query(op)
289
290     elif method == luxi.REQ_QUERY_CLUSTER_INFO:
291       logging.info("Received cluster info query request")
292       op = opcodes.OpQueryClusterInfo()
293       return self._Query(op)
294
295     elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
296       drain_flag = args
297       logging.info("Received queue drain flag change request to %s",
298                    drain_flag)
299       return queue.SetDrainFlag(drain_flag)
300
301     elif method == luxi.REQ_SET_WATCHER_PAUSE:
302       (until, ) = args
303
304       if until is None:
305         logging.info("Received request to no longer pause the watcher")
306       else:
307         if not isinstance(until, (int, float)):
308           raise TypeError("Duration must be an integer or float")
309
310         if until < time.time():
311           raise errors.GenericError("Unable to set pause end time in the past")
312
313         logging.info("Received request to pause the watcher until %s", until)
314
315       return _SetWatcherPause(until)
316
317     else:
318       logging.info("Received invalid request '%s'", method)
319       raise ValueError("Invalid operation '%s'" % method)
320
321   def _Query(self, op):
322     """Runs the specified opcode and returns the result.
323
324     """
325     # Queries don't have a job id
326     proc = mcpu.Processor(self.server.context, None)
327     return proc.ExecOpCode(op, None)
328
329
330 class GanetiContext(object):
331   """Context common to all ganeti threads.
332
333   This class creates and holds common objects shared by all threads.
334
335   """
336   _instance = None
337
338   def __init__(self):
339     """Constructs a new GanetiContext object.
340
341     There should be only a GanetiContext object at any time, so this
342     function raises an error if this is not the case.
343
344     """
345     assert self.__class__._instance is None, "double GanetiContext instance"
346
347     # Create global configuration object
348     self.cfg = config.ConfigWriter()
349
350     # Locking manager
351     self.glm = locking.GanetiLockManager(
352                 self.cfg.GetNodeList(),
353                 self.cfg.GetInstanceList())
354
355     # Job queue
356     self.jobqueue = jqueue.JobQueue(self)
357
358     # setting this also locks the class against attribute modifications
359     self.__class__._instance = self
360
361   def __setattr__(self, name, value):
362     """Setting GanetiContext attributes is forbidden after initialization.
363
364     """
365     assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
366     object.__setattr__(self, name, value)
367
368   def AddNode(self, node, ec_id):
369     """Adds a node to the configuration and lock manager.
370
371     """
372     # Add it to the configuration
373     self.cfg.AddNode(node, ec_id)
374
375     # If preseeding fails it'll not be added
376     self.jobqueue.AddNode(node)
377
378     # Add the new node to the Ganeti Lock Manager
379     self.glm.add(locking.LEVEL_NODE, node.name)
380
381   def ReaddNode(self, node):
382     """Updates a node that's already in the configuration
383
384     """
385     # Synchronize the queue again
386     self.jobqueue.AddNode(node)
387
388   def RemoveNode(self, name):
389     """Removes a node from the configuration and lock manager.
390
391     """
392     # Remove node from configuration
393     self.cfg.RemoveNode(name)
394
395     # Notify job queue
396     self.jobqueue.RemoveNode(name)
397
398     # Remove the node from the Ganeti Lock Manager
399     self.glm.remove(locking.LEVEL_NODE, name)
400
401
402 def _SetWatcherPause(until):
403   """Creates or removes the watcher pause file.
404
405   @type until: None or int
406   @param until: Unix timestamp saying until when the watcher shouldn't run
407
408   """
409   if until is None:
410     utils.RemoveFile(constants.WATCHER_PAUSEFILE)
411   else:
412     utils.WriteFile(constants.WATCHER_PAUSEFILE,
413                     data="%d\n" % (until, ))
414
415   return until
416
417
418 def CheckAgreement():
419   """Check the agreement on who is the master.
420
421   The function uses a very simple algorithm: we must get more positive
422   than negative answers. Since in most of the cases we are the master,
423   we'll use our own config file for getting the node list. In the
424   future we could collect the current node list from our (possibly
425   obsolete) known nodes.
426
427   In order to account for cold-start of all nodes, we retry for up to
428   a minute until we get a real answer as the top-voted one. If the
429   nodes are more out-of-sync, for now manual startup of the master
430   should be attempted.
431
432   Note that for a even number of nodes cluster, we need at least half
433   of the nodes (beside ourselves) to vote for us. This creates a
434   problem on two-node clusters, since in this case we require the
435   other node to be up too to confirm our status.
436
437   """
438   myself = utils.HostInfo().name
439   #temp instantiation of a config writer, used only to get the node list
440   cfg = config.ConfigWriter()
441   node_list = cfg.GetNodeList()
442   del cfg
443   retries = 6
444   while retries > 0:
445     votes = bootstrap.GatherMasterVotes(node_list)
446     if not votes:
447       # empty node list, this is a one node cluster
448       return True
449     if votes[0][0] is None:
450       retries -= 1
451       time.sleep(10)
452       continue
453     break
454   if retries == 0:
455     logging.critical("Cluster inconsistent, most of the nodes didn't answer"
456                      " after multiple retries. Aborting startup")
457     return False
458   # here a real node is at the top of the list
459   all_votes = sum(item[1] for item in votes)
460   top_node, top_votes = votes[0]
461
462   result = False
463   if top_node != myself:
464     logging.critical("It seems we are not the master (top-voted node"
465                      " is %s with %d out of %d votes)", top_node, top_votes,
466                      all_votes)
467   elif top_votes < all_votes - top_votes:
468     logging.critical("It seems we are not the master (%d votes for,"
469                      " %d votes against)", top_votes, all_votes - top_votes)
470   else:
471     result = True
472
473   return result
474
475
476 def CheckAgreementWithRpc():
477   rpc.Init()
478   try:
479     return CheckAgreement()
480   finally:
481     rpc.Shutdown()
482
483
484 def _RunInSeparateProcess(fn):
485   """Runs a function in a separate process.
486
487   Note: Only boolean return values are supported.
488
489   @type fn: callable
490   @param fn: Function to be called
491   @rtype: bool
492
493   """
494   pid = os.fork()
495   if pid == 0:
496     # Child process
497     try:
498       # Call function
499       result = int(bool(fn()))
500       assert result in (0, 1)
501     except:
502       logging.exception("Error while calling function in separate process")
503       # 0 and 1 are reserved for the return value
504       result = 33
505
506     os._exit(result)
507
508   # Parent process
509
510   # Avoid zombies and check exit code
511   (_, status) = os.waitpid(pid, 0)
512
513   if os.WIFSIGNALED(status):
514     signum = os.WTERMSIG(status)
515     exitcode = None
516   else:
517     signum = None
518     exitcode = os.WEXITSTATUS(status)
519
520   if not (exitcode in (0, 1) and signum is None):
521     logging.error("Child program failed (code=%s, signal=%s)",
522                   exitcode, signum)
523     sys.exit(constants.EXIT_FAILURE)
524
525   return bool(exitcode)
526
527
528 def CheckMasterd(options, args):
529   """Initial checks whether to run or exit with a failure.
530
531   """
532   ssconf.CheckMaster(options.debug)
533
534   # If CheckMaster didn't fail we believe we are the master, but we have to
535   # confirm with the other nodes.
536   if options.no_voting:
537     if options.yes_do_it:
538       return
539
540     sys.stdout.write("The 'no voting' option has been selected.\n")
541     sys.stdout.write("This is dangerous, please confirm by"
542                      " typing uppercase 'yes': ")
543     sys.stdout.flush()
544
545     confirmation = sys.stdin.readline().strip()
546     if confirmation != "YES":
547       print >>sys.stderr, "Aborting."
548       sys.exit(constants.EXIT_FAILURE)
549
550     return
551
552   # CheckAgreement uses RPC and threads, hence it needs to be run in a separate
553   # process before we call utils.Daemonize in the current process.
554   if not _RunInSeparateProcess(CheckAgreementWithRpc):
555     sys.exit(constants.EXIT_FAILURE)
556
557
558 def ExecMasterd (options, args):
559   """Main master daemon function, executed with the PID file held.
560
561   """
562   # This is safe to do as the pid file guarantees against
563   # concurrent execution.
564   utils.RemoveFile(constants.MASTER_SOCKET)
565
566   master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
567   try:
568     rpc.Init()
569     try:
570       # activate ip
571       master_node = ssconf.SimpleStore().GetMasterNode()
572       result = rpc.RpcRunner.call_node_start_master(master_node, False, False)
573       msg = result.fail_msg
574       if msg:
575         logging.error("Can't activate master IP address: %s", msg)
576
577       master.setup_queue()
578       try:
579         master.serve_forever()
580       finally:
581         master.server_cleanup()
582     finally:
583       rpc.Shutdown()
584   finally:
585     utils.RemoveFile(constants.MASTER_SOCKET)
586
587
588 def main():
589   """Main function"""
590   parser = OptionParser(description="Ganeti master daemon",
591                         usage="%prog [-f] [-d]",
592                         version="%%prog (ganeti) %s" %
593                         constants.RELEASE_VERSION)
594   parser.add_option("--no-voting", dest="no_voting",
595                     help="Do not check that the nodes agree on this node"
596                     " being the master and start the daemon unconditionally",
597                     default=False, action="store_true")
598   parser.add_option("--yes-do-it", dest="yes_do_it",
599                     help="Override interactive check for --no-voting",
600                     default=False, action="store_true")
601   dirs = [(constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE),
602           (constants.SOCKET_DIR, constants.SOCKET_DIR_MODE),
603          ]
604   daemon.GenericMain(constants.MASTERD, parser, dirs,
605                      CheckMasterd, ExecMasterd)
606
607
608 if __name__ == "__main__":
609   main()