Merge branch 'devel-2.1' into stable-2.1
[ganeti-local] / daemons / ganeti-masterd
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Master daemon program.
23
24 Some classes deviates from the standard style guide since the
25 inheritance from parent classes requires it.
26
27 """
28
29 # pylint: disable-msg=C0103
30 # C0103: Invalid name ganeti-masterd
31
32 import os
33 import sys
34 import SocketServer
35 import time
36 import collections
37 import signal
38 import logging
39
40 from optparse import OptionParser
41
42 from ganeti import config
43 from ganeti import constants
44 from ganeti import daemon
45 from ganeti import mcpu
46 from ganeti import opcodes
47 from ganeti import jqueue
48 from ganeti import locking
49 from ganeti import luxi
50 from ganeti import utils
51 from ganeti import errors
52 from ganeti import ssconf
53 from ganeti import workerpool
54 from ganeti import rpc
55 from ganeti import bootstrap
56 from ganeti import serializer
57
58
59 CLIENT_REQUEST_WORKERS = 16
60
61 EXIT_NOTMASTER = constants.EXIT_NOTMASTER
62 EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
63
64
65 class ClientRequestWorker(workerpool.BaseWorker):
66    # pylint: disable-msg=W0221
67   def RunTask(self, server, request, client_address):
68     """Process the request.
69
70     This is copied from the code in ThreadingMixIn.
71
72     """
73     try:
74       server.finish_request(request, client_address)
75       server.close_request(request)
76     except: # pylint: disable-msg=W0702
77       server.handle_error(request, client_address)
78       server.close_request(request)
79
80
81 class IOServer(SocketServer.UnixStreamServer):
82   """IO thread class.
83
84   This class takes care of initializing the other threads, setting
85   signal handlers (which are processed only in this thread), and doing
86   cleanup at shutdown.
87
88   """
89   def __init__(self, address, rqhandler):
90     """IOServer constructor
91
92     @param address: the address to bind this IOServer to
93     @param rqhandler: RequestHandler type object
94
95     """
96     SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
97
98     # We'll only start threads once we've forked.
99     self.context = None
100     self.request_workers = None
101
102   def setup_queue(self):
103     self.context = GanetiContext()
104     self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
105                                                  ClientRequestWorker)
106
107   def process_request(self, request, client_address):
108     """Add task to workerpool to process request.
109
110     """
111     self.request_workers.AddTask(self, request, client_address)
112
113   @utils.SignalHandled([signal.SIGINT, signal.SIGTERM])
114   def serve_forever(self, signal_handlers=None): # pylint: disable-msg=W0221
115     """Handle one request at a time until told to quit."""
116     assert isinstance(signal_handlers, dict) and \
117            len(signal_handlers) > 0, \
118            "Broken SignalHandled decorator"
119     # Since we use SignalHandled only once, the resulting dict will map all
120     # signals to the same handler. We'll just use the first one.
121     sighandler = signal_handlers.values()[0]
122     while not sighandler.called:
123       self.handle_request()
124
125   def server_cleanup(self):
126     """Cleanup the server.
127
128     This involves shutting down the processor threads and the master
129     socket.
130
131     """
132     try:
133       self.server_close()
134     finally:
135       if self.request_workers:
136         self.request_workers.TerminateWorkers()
137       if self.context:
138         self.context.jobqueue.Shutdown()
139
140
141 class ClientRqHandler(SocketServer.BaseRequestHandler):
142   """Client handler"""
143   EOM = '\3'
144   READ_SIZE = 4096
145
146   def setup(self):
147     # pylint: disable-msg=W0201
148     # setup() is the api for initialising for this class
149     self._buffer = ""
150     self._msgs = collections.deque()
151     self._ops = ClientOps(self.server)
152
153   def handle(self):
154     while True:
155       msg = self.read_message()
156       if msg is None:
157         logging.debug("client closed connection")
158         break
159
160       request = serializer.LoadJson(msg)
161       logging.debug("request: %s", request)
162       if not isinstance(request, dict):
163         logging.error("wrong request received: %s", msg)
164         break
165
166       method = request.get(luxi.KEY_METHOD, None)
167       args = request.get(luxi.KEY_ARGS, None)
168       if method is None or args is None:
169         logging.error("no method or args in request")
170         break
171
172       success = False
173       try:
174         result = self._ops.handle_request(method, args)
175         success = True
176       except errors.GenericError, err:
177         success = False
178         result = errors.EncodeException(err)
179       except:
180         logging.error("Unexpected exception", exc_info=True)
181         err = sys.exc_info()
182         result = "Caught exception: %s" % str(err[1])
183
184       response = {
185         luxi.KEY_SUCCESS: success,
186         luxi.KEY_RESULT: result,
187         }
188       logging.debug("response: %s", response)
189       self.send_message(serializer.DumpJson(response))
190
191   def read_message(self):
192     while not self._msgs:
193       data = self.request.recv(self.READ_SIZE)
194       if not data:
195         return None
196       new_msgs = (self._buffer + data).split(self.EOM)
197       self._buffer = new_msgs.pop()
198       self._msgs.extend(new_msgs)
199     return self._msgs.popleft()
200
201   def send_message(self, msg):
202     #print "sending", msg
203     # TODO: sendall is not guaranteed to send everything
204     self.request.sendall(msg + self.EOM)
205
206
207 class ClientOps:
208   """Class holding high-level client operations."""
209   def __init__(self, server):
210     self.server = server
211
212   def handle_request(self, method, args): # pylint: disable-msg=R0911
213     queue = self.server.context.jobqueue
214
215     # TODO: Parameter validation
216
217     # TODO: Rewrite to not exit in each 'if/elif' branch
218
219     if method == luxi.REQ_SUBMIT_JOB:
220       logging.info("Received new job")
221       ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
222       return queue.SubmitJob(ops)
223
224     if method == luxi.REQ_SUBMIT_MANY_JOBS:
225       logging.info("Received multiple jobs")
226       jobs = []
227       for ops in args:
228         jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
229       return queue.SubmitManyJobs(jobs)
230
231     elif method == luxi.REQ_CANCEL_JOB:
232       job_id = args
233       logging.info("Received job cancel request for %s", job_id)
234       return queue.CancelJob(job_id)
235
236     elif method == luxi.REQ_ARCHIVE_JOB:
237       job_id = args
238       logging.info("Received job archive request for %s", job_id)
239       return queue.ArchiveJob(job_id)
240
241     elif method == luxi.REQ_AUTOARCHIVE_JOBS:
242       (age, timeout) = args
243       logging.info("Received job autoarchive request for age %s, timeout %s",
244                    age, timeout)
245       return queue.AutoArchiveJobs(age, timeout)
246
247     elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
248       (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
249       logging.info("Received job poll request for %s", job_id)
250       return queue.WaitForJobChanges(job_id, fields, prev_job_info,
251                                      prev_log_serial, timeout)
252
253     elif method == luxi.REQ_QUERY_JOBS:
254       (job_ids, fields) = args
255       if isinstance(job_ids, (tuple, list)) and job_ids:
256         msg = utils.CommaJoin(job_ids)
257       else:
258         msg = str(job_ids)
259       logging.info("Received job query request for %s", msg)
260       return queue.QueryJobs(job_ids, fields)
261
262     elif method == luxi.REQ_QUERY_INSTANCES:
263       (names, fields, use_locking) = args
264       logging.info("Received instance query request for %s", names)
265       if use_locking:
266         raise errors.OpPrereqError("Sync queries are not allowed",
267                                    errors.ECODE_INVAL)
268       op = opcodes.OpQueryInstances(names=names, output_fields=fields,
269                                     use_locking=use_locking)
270       return self._Query(op)
271
272     elif method == luxi.REQ_QUERY_NODES:
273       (names, fields, use_locking) = args
274       logging.info("Received node query request for %s", names)
275       if use_locking:
276         raise errors.OpPrereqError("Sync queries are not allowed",
277                                    errors.ECODE_INVAL)
278       op = opcodes.OpQueryNodes(names=names, output_fields=fields,
279                                 use_locking=use_locking)
280       return self._Query(op)
281
282     elif method == luxi.REQ_QUERY_EXPORTS:
283       nodes, use_locking = args
284       if use_locking:
285         raise errors.OpPrereqError("Sync queries are not allowed",
286                                    errors.ECODE_INVAL)
287       logging.info("Received exports query request")
288       op = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
289       return self._Query(op)
290
291     elif method == luxi.REQ_QUERY_CONFIG_VALUES:
292       fields = args
293       logging.info("Received config values query request for %s", fields)
294       op = opcodes.OpQueryConfigValues(output_fields=fields)
295       return self._Query(op)
296
297     elif method == luxi.REQ_QUERY_CLUSTER_INFO:
298       logging.info("Received cluster info query request")
299       op = opcodes.OpQueryClusterInfo()
300       return self._Query(op)
301
302     elif method == luxi.REQ_QUERY_TAGS:
303       kind, name = args
304       logging.info("Received tags query request")
305       op = opcodes.OpGetTags(kind=kind, name=name)
306       return self._Query(op)
307
308     elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
309       drain_flag = args
310       logging.info("Received queue drain flag change request to %s",
311                    drain_flag)
312       return queue.SetDrainFlag(drain_flag)
313
314     elif method == luxi.REQ_SET_WATCHER_PAUSE:
315       (until, ) = args
316
317       if until is None:
318         logging.info("Received request to no longer pause the watcher")
319       else:
320         if not isinstance(until, (int, float)):
321           raise TypeError("Duration must be an integer or float")
322
323         if until < time.time():
324           raise errors.GenericError("Unable to set pause end time in the past")
325
326         logging.info("Received request to pause the watcher until %s", until)
327
328       return _SetWatcherPause(until)
329
330     else:
331       logging.info("Received invalid request '%s'", method)
332       raise ValueError("Invalid operation '%s'" % method)
333
334   def _Query(self, op):
335     """Runs the specified opcode and returns the result.
336
337     """
338     # Queries don't have a job id
339     proc = mcpu.Processor(self.server.context, None)
340     return proc.ExecOpCode(op, None)
341
342
343 class GanetiContext(object):
344   """Context common to all ganeti threads.
345
346   This class creates and holds common objects shared by all threads.
347
348   """
349   # pylint: disable-msg=W0212
350   # we do want to ensure a singleton here
351   _instance = None
352
353   def __init__(self):
354     """Constructs a new GanetiContext object.
355
356     There should be only a GanetiContext object at any time, so this
357     function raises an error if this is not the case.
358
359     """
360     assert self.__class__._instance is None, "double GanetiContext instance"
361
362     # Create global configuration object
363     self.cfg = config.ConfigWriter()
364
365     # Locking manager
366     self.glm = locking.GanetiLockManager(
367                 self.cfg.GetNodeList(),
368                 self.cfg.GetInstanceList())
369
370     # Job queue
371     self.jobqueue = jqueue.JobQueue(self)
372
373     # setting this also locks the class against attribute modifications
374     self.__class__._instance = self
375
376   def __setattr__(self, name, value):
377     """Setting GanetiContext attributes is forbidden after initialization.
378
379     """
380     assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
381     object.__setattr__(self, name, value)
382
383   def AddNode(self, node, ec_id):
384     """Adds a node to the configuration and lock manager.
385
386     """
387     # Add it to the configuration
388     self.cfg.AddNode(node, ec_id)
389
390     # If preseeding fails it'll not be added
391     self.jobqueue.AddNode(node)
392
393     # Add the new node to the Ganeti Lock Manager
394     self.glm.add(locking.LEVEL_NODE, node.name)
395
396   def ReaddNode(self, node):
397     """Updates a node that's already in the configuration
398
399     """
400     # Synchronize the queue again
401     self.jobqueue.AddNode(node)
402
403   def RemoveNode(self, name):
404     """Removes a node from the configuration and lock manager.
405
406     """
407     # Remove node from configuration
408     self.cfg.RemoveNode(name)
409
410     # Notify job queue
411     self.jobqueue.RemoveNode(name)
412
413     # Remove the node from the Ganeti Lock Manager
414     self.glm.remove(locking.LEVEL_NODE, name)
415
416
417 def _SetWatcherPause(until):
418   """Creates or removes the watcher pause file.
419
420   @type until: None or int
421   @param until: Unix timestamp saying until when the watcher shouldn't run
422
423   """
424   if until is None:
425     utils.RemoveFile(constants.WATCHER_PAUSEFILE)
426   else:
427     utils.WriteFile(constants.WATCHER_PAUSEFILE,
428                     data="%d\n" % (until, ))
429
430   return until
431
432
433 def CheckAgreement():
434   """Check the agreement on who is the master.
435
436   The function uses a very simple algorithm: we must get more positive
437   than negative answers. Since in most of the cases we are the master,
438   we'll use our own config file for getting the node list. In the
439   future we could collect the current node list from our (possibly
440   obsolete) known nodes.
441
442   In order to account for cold-start of all nodes, we retry for up to
443   a minute until we get a real answer as the top-voted one. If the
444   nodes are more out-of-sync, for now manual startup of the master
445   should be attempted.
446
447   Note that for a even number of nodes cluster, we need at least half
448   of the nodes (beside ourselves) to vote for us. This creates a
449   problem on two-node clusters, since in this case we require the
450   other node to be up too to confirm our status.
451
452   """
453   myself = utils.HostInfo().name
454   #temp instantiation of a config writer, used only to get the node list
455   cfg = config.ConfigWriter()
456   node_list = cfg.GetNodeList()
457   del cfg
458   retries = 6
459   while retries > 0:
460     votes = bootstrap.GatherMasterVotes(node_list)
461     if not votes:
462       # empty node list, this is a one node cluster
463       return True
464     if votes[0][0] is None:
465       retries -= 1
466       time.sleep(10)
467       continue
468     break
469   if retries == 0:
470     logging.critical("Cluster inconsistent, most of the nodes didn't answer"
471                      " after multiple retries. Aborting startup")
472     return False
473   # here a real node is at the top of the list
474   all_votes = sum(item[1] for item in votes)
475   top_node, top_votes = votes[0]
476
477   result = False
478   if top_node != myself:
479     logging.critical("It seems we are not the master (top-voted node"
480                      " is %s with %d out of %d votes)", top_node, top_votes,
481                      all_votes)
482   elif top_votes < all_votes - top_votes:
483     logging.critical("It seems we are not the master (%d votes for,"
484                      " %d votes against)", top_votes, all_votes - top_votes)
485   else:
486     result = True
487
488   return result
489
490
491 def CheckAgreementWithRpc():
492   rpc.Init()
493   try:
494     return CheckAgreement()
495   finally:
496     rpc.Shutdown()
497
498
499 def _RunInSeparateProcess(fn):
500   """Runs a function in a separate process.
501
502   Note: Only boolean return values are supported.
503
504   @type fn: callable
505   @param fn: Function to be called
506   @rtype: bool
507
508   """
509   pid = os.fork()
510   if pid == 0:
511     # Child process
512     try:
513       # Call function
514       result = int(bool(fn()))
515       assert result in (0, 1)
516     except: # pylint: disable-msg=W0702
517       logging.exception("Error while calling function in separate process")
518       # 0 and 1 are reserved for the return value
519       result = 33
520
521     os._exit(result) # pylint: disable-msg=W0212
522
523   # Parent process
524
525   # Avoid zombies and check exit code
526   (_, status) = os.waitpid(pid, 0)
527
528   if os.WIFSIGNALED(status):
529     signum = os.WTERMSIG(status)
530     exitcode = None
531   else:
532     signum = None
533     exitcode = os.WEXITSTATUS(status)
534
535   if not (exitcode in (0, 1) and signum is None):
536     logging.error("Child program failed (code=%s, signal=%s)",
537                   exitcode, signum)
538     sys.exit(constants.EXIT_FAILURE)
539
540   return bool(exitcode)
541
542
543 def CheckMasterd(options, args):
544   """Initial checks whether to run or exit with a failure.
545
546   """
547   if args: # masterd doesn't take any arguments
548     print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
549     sys.exit(constants.EXIT_FAILURE)
550
551   ssconf.CheckMaster(options.debug)
552
553   # If CheckMaster didn't fail we believe we are the master, but we have to
554   # confirm with the other nodes.
555   if options.no_voting:
556     if options.yes_do_it:
557       return
558
559     sys.stdout.write("The 'no voting' option has been selected.\n")
560     sys.stdout.write("This is dangerous, please confirm by"
561                      " typing uppercase 'yes': ")
562     sys.stdout.flush()
563
564     confirmation = sys.stdin.readline().strip()
565     if confirmation != "YES":
566       print >> sys.stderr, "Aborting."
567       sys.exit(constants.EXIT_FAILURE)
568
569     return
570
571   # CheckAgreement uses RPC and threads, hence it needs to be run in a separate
572   # process before we call utils.Daemonize in the current process.
573   if not _RunInSeparateProcess(CheckAgreementWithRpc):
574     sys.exit(constants.EXIT_FAILURE)
575
576
577 def ExecMasterd (options, args): # pylint: disable-msg=W0613
578   """Main master daemon function, executed with the PID file held.
579
580   """
581   # This is safe to do as the pid file guarantees against
582   # concurrent execution.
583   utils.RemoveFile(constants.MASTER_SOCKET)
584
585   master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
586   try:
587     rpc.Init()
588     try:
589       # activate ip
590       master_node = ssconf.SimpleStore().GetMasterNode()
591       result = rpc.RpcRunner.call_node_start_master(master_node, False, False)
592       msg = result.fail_msg
593       if msg:
594         logging.error("Can't activate master IP address: %s", msg)
595
596       master.setup_queue()
597       try:
598         master.serve_forever()
599       finally:
600         master.server_cleanup()
601     finally:
602       rpc.Shutdown()
603   finally:
604     utils.RemoveFile(constants.MASTER_SOCKET)
605
606
607 def main():
608   """Main function"""
609   parser = OptionParser(description="Ganeti master daemon",
610                         usage="%prog [-f] [-d]",
611                         version="%%prog (ganeti) %s" %
612                         constants.RELEASE_VERSION)
613   parser.add_option("--no-voting", dest="no_voting",
614                     help="Do not check that the nodes agree on this node"
615                     " being the master and start the daemon unconditionally",
616                     default=False, action="store_true")
617   parser.add_option("--yes-do-it", dest="yes_do_it",
618                     help="Override interactive check for --no-voting",
619                     default=False, action="store_true")
620   dirs = [(constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE),
621           (constants.SOCKET_DIR, constants.SOCKET_DIR_MODE),
622          ]
623   daemon.GenericMain(constants.MASTERD, parser, dirs,
624                      CheckMasterd, ExecMasterd)
625
626
627 if __name__ == "__main__":
628   main()