Move ganeti-masterd to ganeti.server.masterd
[ganeti-local] / lib / server / masterd.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Master daemon program.
23
24 Some classes deviates from the standard style guide since the
25 inheritance from parent classes requires it.
26
27 """
28
29 # pylint: disable-msg=C0103
30 # C0103: Invalid name ganeti-masterd
31
32 import grp
33 import os
34 import pwd
35 import sys
36 import socket
37 import time
38 import tempfile
39 import logging
40
41 from optparse import OptionParser
42
43 from ganeti import config
44 from ganeti import constants
45 from ganeti import daemon
46 from ganeti import mcpu
47 from ganeti import opcodes
48 from ganeti import jqueue
49 from ganeti import locking
50 from ganeti import luxi
51 from ganeti import utils
52 from ganeti import errors
53 from ganeti import ssconf
54 from ganeti import workerpool
55 from ganeti import rpc
56 from ganeti import bootstrap
57 from ganeti import netutils
58
59
60 CLIENT_REQUEST_WORKERS = 16
61
62 EXIT_NOTMASTER = constants.EXIT_NOTMASTER
63 EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
64
65
66 class ClientRequestWorker(workerpool.BaseWorker):
67   # pylint: disable-msg=W0221
68   def RunTask(self, server, message, client):
69     """Process the request.
70
71     """
72     client_ops = ClientOps(server)
73
74     try:
75       (method, args, version) = luxi.ParseRequest(message)
76     except luxi.ProtocolError, err:
77       logging.error("Protocol Error: %s", err)
78       client.close_log()
79       return
80
81     success = False
82     try:
83       # Verify client's version if there was one in the request
84       if version is not None and version != constants.LUXI_VERSION:
85         raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
86                                (constants.LUXI_VERSION, version))
87
88       result = client_ops.handle_request(method, args)
89       success = True
90     except errors.GenericError, err:
91       logging.exception("Unexpected exception")
92       success = False
93       result = errors.EncodeException(err)
94     except:
95       logging.exception("Unexpected exception")
96       err = sys.exc_info()
97       result = "Caught exception: %s" % str(err[1])
98
99     try:
100       reply = luxi.FormatResponse(success, result)
101       client.send_message(reply)
102       # awake the main thread so that it can write out the data.
103       server.awaker.signal()
104     except: # pylint: disable-msg=W0702
105       logging.exception("Send error")
106       client.close_log()
107
108
109 class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
110   """Handler for master peers.
111
112   """
113   _MAX_UNHANDLED = 1
114   def __init__(self, server, connected_socket, client_address, family):
115     daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
116                                                  client_address,
117                                                  constants.LUXI_EOM,
118                                                  family, self._MAX_UNHANDLED)
119     self.server = server
120
121   def handle_message(self, message, _):
122     self.server.request_workers.AddTask((self.server, message, self))
123
124
125 class MasterServer(daemon.AsyncStreamServer):
126   """Master Server.
127
128   This is the main asynchronous master server. It handles connections to the
129   master socket.
130
131   """
132   family = socket.AF_UNIX
133
134   def __init__(self, mainloop, address, uid, gid):
135     """MasterServer constructor
136
137     @type mainloop: ganeti.daemon.Mainloop
138     @param mainloop: Mainloop used to poll for I/O events
139     @param address: the unix socket address to bind the MasterServer to
140     @param uid: The uid of the owner of the socket
141     @param gid: The gid of the owner of the socket
142
143     """
144     temp_name = tempfile.mktemp(dir=os.path.dirname(address))
145     daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
146     os.chmod(temp_name, 0770)
147     os.chown(temp_name, uid, gid)
148     os.rename(temp_name, address)
149
150     self.mainloop = mainloop
151     self.awaker = daemon.AsyncAwaker()
152
153     # We'll only start threads once we've forked.
154     self.context = None
155     self.request_workers = None
156
157   def handle_connection(self, connected_socket, client_address):
158     # TODO: add connection count and limit the number of open connections to a
159     # maximum number to avoid breaking for lack of file descriptors or memory.
160     MasterClientHandler(self, connected_socket, client_address, self.family)
161
162   def setup_queue(self):
163     self.context = GanetiContext()
164     self.request_workers = workerpool.WorkerPool("ClientReq",
165                                                  CLIENT_REQUEST_WORKERS,
166                                                  ClientRequestWorker)
167
168   def server_cleanup(self):
169     """Cleanup the server.
170
171     This involves shutting down the processor threads and the master
172     socket.
173
174     """
175     try:
176       self.close()
177     finally:
178       if self.request_workers:
179         self.request_workers.TerminateWorkers()
180       if self.context:
181         self.context.jobqueue.Shutdown()
182
183
184 class ClientOps:
185   """Class holding high-level client operations."""
186   def __init__(self, server):
187     self.server = server
188
189   def handle_request(self, method, args): # pylint: disable-msg=R0911
190     queue = self.server.context.jobqueue
191
192     # TODO: Parameter validation
193
194     # TODO: Rewrite to not exit in each 'if/elif' branch
195
196     if method == luxi.REQ_SUBMIT_JOB:
197       logging.info("Received new job")
198       ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
199       return queue.SubmitJob(ops)
200
201     if method == luxi.REQ_SUBMIT_MANY_JOBS:
202       logging.info("Received multiple jobs")
203       jobs = []
204       for ops in args:
205         jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
206       return queue.SubmitManyJobs(jobs)
207
208     elif method == luxi.REQ_CANCEL_JOB:
209       job_id = args
210       logging.info("Received job cancel request for %s", job_id)
211       return queue.CancelJob(job_id)
212
213     elif method == luxi.REQ_ARCHIVE_JOB:
214       job_id = args
215       logging.info("Received job archive request for %s", job_id)
216       return queue.ArchiveJob(job_id)
217
218     elif method == luxi.REQ_AUTOARCHIVE_JOBS:
219       (age, timeout) = args
220       logging.info("Received job autoarchive request for age %s, timeout %s",
221                    age, timeout)
222       return queue.AutoArchiveJobs(age, timeout)
223
224     elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
225       (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
226       logging.info("Received job poll request for %s", job_id)
227       return queue.WaitForJobChanges(job_id, fields, prev_job_info,
228                                      prev_log_serial, timeout)
229
230     elif method == luxi.REQ_QUERY_JOBS:
231       (job_ids, fields) = args
232       if isinstance(job_ids, (tuple, list)) and job_ids:
233         msg = utils.CommaJoin(job_ids)
234       else:
235         msg = str(job_ids)
236       logging.info("Received job query request for %s", msg)
237       return queue.QueryJobs(job_ids, fields)
238
239     elif method == luxi.REQ_QUERY_INSTANCES:
240       (names, fields, use_locking) = args
241       logging.info("Received instance query request for %s", names)
242       if use_locking:
243         raise errors.OpPrereqError("Sync queries are not allowed",
244                                    errors.ECODE_INVAL)
245       op = opcodes.OpQueryInstances(names=names, output_fields=fields,
246                                     use_locking=use_locking)
247       return self._Query(op)
248
249     elif method == luxi.REQ_QUERY_NODES:
250       (names, fields, use_locking) = args
251       logging.info("Received node query request for %s", names)
252       if use_locking:
253         raise errors.OpPrereqError("Sync queries are not allowed",
254                                    errors.ECODE_INVAL)
255       op = opcodes.OpQueryNodes(names=names, output_fields=fields,
256                                 use_locking=use_locking)
257       return self._Query(op)
258
259     elif method == luxi.REQ_QUERY_EXPORTS:
260       nodes, use_locking = args
261       if use_locking:
262         raise errors.OpPrereqError("Sync queries are not allowed",
263                                    errors.ECODE_INVAL)
264       logging.info("Received exports query request")
265       op = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
266       return self._Query(op)
267
268     elif method == luxi.REQ_QUERY_CONFIG_VALUES:
269       fields = args
270       logging.info("Received config values query request for %s", fields)
271       op = opcodes.OpQueryConfigValues(output_fields=fields)
272       return self._Query(op)
273
274     elif method == luxi.REQ_QUERY_CLUSTER_INFO:
275       logging.info("Received cluster info query request")
276       op = opcodes.OpQueryClusterInfo()
277       return self._Query(op)
278
279     elif method == luxi.REQ_QUERY_TAGS:
280       kind, name = args
281       logging.info("Received tags query request")
282       op = opcodes.OpGetTags(kind=kind, name=name)
283       return self._Query(op)
284
285     elif method == luxi.REQ_QUERY_LOCKS:
286       (fields, sync) = args
287       logging.info("Received locks query request")
288       return self.server.context.glm.QueryLocks(fields, sync)
289
290     elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
291       drain_flag = args
292       logging.info("Received queue drain flag change request to %s",
293                    drain_flag)
294       return queue.SetDrainFlag(drain_flag)
295
296     elif method == luxi.REQ_SET_WATCHER_PAUSE:
297       (until, ) = args
298
299       if until is None:
300         logging.info("Received request to no longer pause the watcher")
301       else:
302         if not isinstance(until, (int, float)):
303           raise TypeError("Duration must be an integer or float")
304
305         if until < time.time():
306           raise errors.GenericError("Unable to set pause end time in the past")
307
308         logging.info("Received request to pause the watcher until %s", until)
309
310       return _SetWatcherPause(until)
311
312     else:
313       logging.info("Received invalid request '%s'", method)
314       raise ValueError("Invalid operation '%s'" % method)
315
316   def _Query(self, op):
317     """Runs the specified opcode and returns the result.
318
319     """
320     # Queries don't have a job id
321     proc = mcpu.Processor(self.server.context, None)
322
323     # TODO: Executing an opcode using locks will acquire them in blocking mode.
324     # Consider using a timeout for retries.
325     return proc.ExecOpCode(op, None)
326
327
328 class GanetiContext(object):
329   """Context common to all ganeti threads.
330
331   This class creates and holds common objects shared by all threads.
332
333   """
334   # pylint: disable-msg=W0212
335   # we do want to ensure a singleton here
336   _instance = None
337
338   def __init__(self):
339     """Constructs a new GanetiContext object.
340
341     There should be only a GanetiContext object at any time, so this
342     function raises an error if this is not the case.
343
344     """
345     assert self.__class__._instance is None, "double GanetiContext instance"
346
347     # Create global configuration object
348     self.cfg = config.ConfigWriter()
349
350     # Locking manager
351     self.glm = locking.GanetiLockManager(
352                 self.cfg.GetNodeList(),
353                 self.cfg.GetInstanceList())
354
355     # Job queue
356     self.jobqueue = jqueue.JobQueue(self)
357
358     # setting this also locks the class against attribute modifications
359     self.__class__._instance = self
360
361   def __setattr__(self, name, value):
362     """Setting GanetiContext attributes is forbidden after initialization.
363
364     """
365     assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
366     object.__setattr__(self, name, value)
367
368   def AddNode(self, node, ec_id):
369     """Adds a node to the configuration and lock manager.
370
371     """
372     # Add it to the configuration
373     self.cfg.AddNode(node, ec_id)
374
375     # If preseeding fails it'll not be added
376     self.jobqueue.AddNode(node)
377
378     # Add the new node to the Ganeti Lock Manager
379     self.glm.add(locking.LEVEL_NODE, node.name)
380
381   def ReaddNode(self, node):
382     """Updates a node that's already in the configuration
383
384     """
385     # Synchronize the queue again
386     self.jobqueue.AddNode(node)
387
388   def RemoveNode(self, name):
389     """Removes a node from the configuration and lock manager.
390
391     """
392     # Remove node from configuration
393     self.cfg.RemoveNode(name)
394
395     # Notify job queue
396     self.jobqueue.RemoveNode(name)
397
398     # Remove the node from the Ganeti Lock Manager
399     self.glm.remove(locking.LEVEL_NODE, name)
400
401
402 def _SetWatcherPause(until):
403   """Creates or removes the watcher pause file.
404
405   @type until: None or int
406   @param until: Unix timestamp saying until when the watcher shouldn't run
407
408   """
409   if until is None:
410     utils.RemoveFile(constants.WATCHER_PAUSEFILE)
411   else:
412     utils.WriteFile(constants.WATCHER_PAUSEFILE,
413                     data="%d\n" % (until, ))
414
415   return until
416
417
418 @rpc.RunWithRPC
419 def CheckAgreement():
420   """Check the agreement on who is the master.
421
422   The function uses a very simple algorithm: we must get more positive
423   than negative answers. Since in most of the cases we are the master,
424   we'll use our own config file for getting the node list. In the
425   future we could collect the current node list from our (possibly
426   obsolete) known nodes.
427
428   In order to account for cold-start of all nodes, we retry for up to
429   a minute until we get a real answer as the top-voted one. If the
430   nodes are more out-of-sync, for now manual startup of the master
431   should be attempted.
432
433   Note that for a even number of nodes cluster, we need at least half
434   of the nodes (beside ourselves) to vote for us. This creates a
435   problem on two-node clusters, since in this case we require the
436   other node to be up too to confirm our status.
437
438   """
439   myself = netutils.Hostname.GetSysName()
440   #temp instantiation of a config writer, used only to get the node list
441   cfg = config.ConfigWriter()
442   node_list = cfg.GetNodeList()
443   del cfg
444   retries = 6
445   while retries > 0:
446     votes = bootstrap.GatherMasterVotes(node_list)
447     if not votes:
448       # empty node list, this is a one node cluster
449       return True
450     if votes[0][0] is None:
451       retries -= 1
452       time.sleep(10)
453       continue
454     break
455   if retries == 0:
456     logging.critical("Cluster inconsistent, most of the nodes didn't answer"
457                      " after multiple retries. Aborting startup")
458     logging.critical("Use the --no-voting option if you understand what"
459                      " effects it has on the cluster state")
460     return False
461   # here a real node is at the top of the list
462   all_votes = sum(item[1] for item in votes)
463   top_node, top_votes = votes[0]
464
465   result = False
466   if top_node != myself:
467     logging.critical("It seems we are not the master (top-voted node"
468                      " is %s with %d out of %d votes)", top_node, top_votes,
469                      all_votes)
470   elif top_votes < all_votes - top_votes:
471     logging.critical("It seems we are not the master (%d votes for,"
472                      " %d votes against)", top_votes, all_votes - top_votes)
473   else:
474     result = True
475
476   return result
477
478
479 @rpc.RunWithRPC
480 def ActivateMasterIP():
481   # activate ip
482   master_node = ssconf.SimpleStore().GetMasterNode()
483   result = rpc.RpcRunner.call_node_start_master(master_node, False, False)
484   msg = result.fail_msg
485   if msg:
486     logging.error("Can't activate master IP address: %s", msg)
487
488
489 def CheckMasterd(options, args):
490   """Initial checks whether to run or exit with a failure.
491
492   """
493   if args: # masterd doesn't take any arguments
494     print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
495     sys.exit(constants.EXIT_FAILURE)
496
497   ssconf.CheckMaster(options.debug)
498
499   try:
500     options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
501     options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
502   except KeyError:
503     print >> sys.stderr, ("User or group not existing on system: %s:%s" %
504                           (constants.MASTERD_USER, constants.DAEMONS_GROUP))
505     sys.exit(constants.EXIT_FAILURE)
506
507   # Check the configuration is sane before anything else
508   try:
509     config.ConfigWriter()
510   except errors.ConfigVersionMismatch, err:
511     v1 = "%s.%s.%s" % constants.SplitVersion(err.args[0])
512     v2 = "%s.%s.%s" % constants.SplitVersion(err.args[1])
513     print >> sys.stderr,  \
514         ("Configuration version mismatch. The current Ganeti software"
515          " expects version %s, but the on-disk configuration file has"
516          " version %s. This is likely the result of upgrading the"
517          " software without running the upgrade procedure. Please contact"
518          " your cluster administrator or complete the upgrade using the"
519          " cfgupgrade utility, after reading the upgrade notes." %
520          (v1, v2))
521     sys.exit(constants.EXIT_FAILURE)
522   except errors.ConfigurationError, err:
523     print >> sys.stderr, \
524         ("Configuration error while opening the configuration file: %s\n"
525          "This might be caused by an incomplete software upgrade or"
526          " by a corrupted configuration file. Until the problem is fixed"
527          " the master daemon cannot start." % str(err))
528     sys.exit(constants.EXIT_FAILURE)
529
530   # If CheckMaster didn't fail we believe we are the master, but we have to
531   # confirm with the other nodes.
532   if options.no_voting:
533     if options.yes_do_it:
534       return
535
536     sys.stdout.write("The 'no voting' option has been selected.\n")
537     sys.stdout.write("This is dangerous, please confirm by"
538                      " typing uppercase 'yes': ")
539     sys.stdout.flush()
540
541     confirmation = sys.stdin.readline().strip()
542     if confirmation != "YES":
543       print >> sys.stderr, "Aborting."
544       sys.exit(constants.EXIT_FAILURE)
545
546     return
547
548   # CheckAgreement uses RPC and threads, hence it needs to be run in a separate
549   # process before we call utils.Daemonize in the current process.
550   if not utils.RunInSeparateProcess(CheckAgreement):
551     sys.exit(constants.EXIT_FAILURE)
552
553   # ActivateMasterIP also uses RPC/threads, so we run it again via a
554   # separate process.
555
556   # TODO: decide whether failure to activate the master IP is a fatal error
557   utils.RunInSeparateProcess(ActivateMasterIP)
558
559
560 def PrepMasterd(options, _):
561   """Prep master daemon function, executed with the PID file held.
562
563   """
564   # This is safe to do as the pid file guarantees against
565   # concurrent execution.
566   utils.RemoveFile(constants.MASTER_SOCKET)
567
568   mainloop = daemon.Mainloop()
569   master = MasterServer(mainloop, constants.MASTER_SOCKET,
570                         options.uid, options.gid)
571   return (mainloop, master)
572
573
574 def ExecMasterd(options, args, prep_data): # pylint: disable-msg=W0613
575   """Main master daemon function, executed with the PID file held.
576
577   """
578   (mainloop, master) = prep_data
579   try:
580     rpc.Init()
581     try:
582       master.setup_queue()
583       try:
584         mainloop.Run()
585       finally:
586         master.server_cleanup()
587     finally:
588       rpc.Shutdown()
589   finally:
590     utils.RemoveFile(constants.MASTER_SOCKET)
591
592
593 def Main():
594   """Main function"""
595   parser = OptionParser(description="Ganeti master daemon",
596                         usage="%prog [-f] [-d]",
597                         version="%%prog (ganeti) %s" %
598                         constants.RELEASE_VERSION)
599   parser.add_option("--no-voting", dest="no_voting",
600                     help="Do not check that the nodes agree on this node"
601                     " being the master and start the daemon unconditionally",
602                     default=False, action="store_true")
603   parser.add_option("--yes-do-it", dest="yes_do_it",
604                     help="Override interactive check for --no-voting",
605                     default=False, action="store_true")
606   daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
607                      ExecMasterd, multithreaded=True)