Split starting and stopping master IP and daemons
[ganeti-local] / lib / server / masterd.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Master daemon program.
23
24 Some classes deviates from the standard style guide since the
25 inheritance from parent classes requires it.
26
27 """
28
29 # pylint: disable=C0103
30 # C0103: Invalid name ganeti-masterd
31
32 import grp
33 import os
34 import pwd
35 import sys
36 import socket
37 import time
38 import tempfile
39 import logging
40
41 from optparse import OptionParser
42
43 from ganeti import config
44 from ganeti import constants
45 from ganeti import daemon
46 from ganeti import mcpu
47 from ganeti import opcodes
48 from ganeti import jqueue
49 from ganeti import locking
50 from ganeti import luxi
51 from ganeti import utils
52 from ganeti import errors
53 from ganeti import ssconf
54 from ganeti import workerpool
55 from ganeti import rpc
56 from ganeti import bootstrap
57 from ganeti import netutils
58 from ganeti import objects
59 from ganeti import query
60
61
62 CLIENT_REQUEST_WORKERS = 16
63
64 EXIT_NOTMASTER = constants.EXIT_NOTMASTER
65 EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
66
67
68 class ClientRequestWorker(workerpool.BaseWorker):
69   # pylint: disable=W0221
70   def RunTask(self, server, message, client):
71     """Process the request.
72
73     """
74     client_ops = ClientOps(server)
75
76     try:
77       (method, args, version) = luxi.ParseRequest(message)
78     except luxi.ProtocolError, err:
79       logging.error("Protocol Error: %s", err)
80       client.close_log()
81       return
82
83     success = False
84     try:
85       # Verify client's version if there was one in the request
86       if version is not None and version != constants.LUXI_VERSION:
87         raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
88                                (constants.LUXI_VERSION, version))
89
90       result = client_ops.handle_request(method, args)
91       success = True
92     except errors.GenericError, err:
93       logging.exception("Unexpected exception")
94       success = False
95       result = errors.EncodeException(err)
96     except:
97       logging.exception("Unexpected exception")
98       err = sys.exc_info()
99       result = "Caught exception: %s" % str(err[1])
100
101     try:
102       reply = luxi.FormatResponse(success, result)
103       client.send_message(reply)
104       # awake the main thread so that it can write out the data.
105       server.awaker.signal()
106     except: # pylint: disable=W0702
107       logging.exception("Send error")
108       client.close_log()
109
110
111 class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
112   """Handler for master peers.
113
114   """
115   _MAX_UNHANDLED = 1
116
117   def __init__(self, server, connected_socket, client_address, family):
118     daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
119                                                  client_address,
120                                                  constants.LUXI_EOM,
121                                                  family, self._MAX_UNHANDLED)
122     self.server = server
123
124   def handle_message(self, message, _):
125     self.server.request_workers.AddTask((self.server, message, self))
126
127
128 class MasterServer(daemon.AsyncStreamServer):
129   """Master Server.
130
131   This is the main asynchronous master server. It handles connections to the
132   master socket.
133
134   """
135   family = socket.AF_UNIX
136
137   def __init__(self, mainloop, address, uid, gid):
138     """MasterServer constructor
139
140     @type mainloop: ganeti.daemon.Mainloop
141     @param mainloop: Mainloop used to poll for I/O events
142     @param address: the unix socket address to bind the MasterServer to
143     @param uid: The uid of the owner of the socket
144     @param gid: The gid of the owner of the socket
145
146     """
147     temp_name = tempfile.mktemp(dir=os.path.dirname(address))
148     daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
149     os.chmod(temp_name, 0770)
150     os.chown(temp_name, uid, gid)
151     os.rename(temp_name, address)
152
153     self.mainloop = mainloop
154     self.awaker = daemon.AsyncAwaker()
155
156     # We'll only start threads once we've forked.
157     self.context = None
158     self.request_workers = None
159
160   def handle_connection(self, connected_socket, client_address):
161     # TODO: add connection count and limit the number of open connections to a
162     # maximum number to avoid breaking for lack of file descriptors or memory.
163     MasterClientHandler(self, connected_socket, client_address, self.family)
164
165   def setup_queue(self):
166     self.context = GanetiContext()
167     self.request_workers = workerpool.WorkerPool("ClientReq",
168                                                  CLIENT_REQUEST_WORKERS,
169                                                  ClientRequestWorker)
170
171   def server_cleanup(self):
172     """Cleanup the server.
173
174     This involves shutting down the processor threads and the master
175     socket.
176
177     """
178     try:
179       self.close()
180     finally:
181       if self.request_workers:
182         self.request_workers.TerminateWorkers()
183       if self.context:
184         self.context.jobqueue.Shutdown()
185
186
187 class ClientOps:
188   """Class holding high-level client operations."""
189   def __init__(self, server):
190     self.server = server
191
192   def handle_request(self, method, args): # pylint: disable=R0911
193     queue = self.server.context.jobqueue
194
195     # TODO: Parameter validation
196
197     # TODO: Rewrite to not exit in each 'if/elif' branch
198
199     if method == luxi.REQ_SUBMIT_JOB:
200       logging.info("Received new job")
201       ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
202       return queue.SubmitJob(ops)
203
204     if method == luxi.REQ_SUBMIT_MANY_JOBS:
205       logging.info("Received multiple jobs")
206       jobs = []
207       for ops in args:
208         jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
209       return queue.SubmitManyJobs(jobs)
210
211     elif method == luxi.REQ_CANCEL_JOB:
212       job_id = args
213       logging.info("Received job cancel request for %s", job_id)
214       return queue.CancelJob(job_id)
215
216     elif method == luxi.REQ_ARCHIVE_JOB:
217       job_id = args
218       logging.info("Received job archive request for %s", job_id)
219       return queue.ArchiveJob(job_id)
220
221     elif method == luxi.REQ_AUTOARCHIVE_JOBS:
222       (age, timeout) = args
223       logging.info("Received job autoarchive request for age %s, timeout %s",
224                    age, timeout)
225       return queue.AutoArchiveJobs(age, timeout)
226
227     elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
228       (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
229       logging.info("Received job poll request for %s", job_id)
230       return queue.WaitForJobChanges(job_id, fields, prev_job_info,
231                                      prev_log_serial, timeout)
232
233     elif method == luxi.REQ_QUERY:
234       req = objects.QueryRequest.FromDict(args)
235
236       if req.what in constants.QR_VIA_OP:
237         result = self._Query(opcodes.OpQuery(what=req.what, fields=req.fields,
238                                              filter=req.filter))
239       elif req.what == constants.QR_LOCK:
240         if req.filter is not None:
241           raise errors.OpPrereqError("Lock queries can't be filtered")
242         return self.server.context.glm.QueryLocks(req.fields)
243       elif req.what in constants.QR_VIA_LUXI:
244         raise NotImplementedError
245       else:
246         raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
247                                    errors.ECODE_INVAL)
248
249       return result
250
251     elif method == luxi.REQ_QUERY_FIELDS:
252       req = objects.QueryFieldsRequest.FromDict(args)
253
254       try:
255         fielddefs = query.ALL_FIELDS[req.what]
256       except KeyError:
257         raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
258                                    errors.ECODE_INVAL)
259
260       return query.QueryFields(fielddefs, req.fields)
261
262     elif method == luxi.REQ_QUERY_JOBS:
263       (job_ids, fields) = args
264       if isinstance(job_ids, (tuple, list)) and job_ids:
265         msg = utils.CommaJoin(job_ids)
266       else:
267         msg = str(job_ids)
268       logging.info("Received job query request for %s", msg)
269       return queue.QueryJobs(job_ids, fields)
270
271     elif method == luxi.REQ_QUERY_INSTANCES:
272       (names, fields, use_locking) = args
273       logging.info("Received instance query request for %s", names)
274       if use_locking:
275         raise errors.OpPrereqError("Sync queries are not allowed",
276                                    errors.ECODE_INVAL)
277       op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
278                                    use_locking=use_locking)
279       return self._Query(op)
280
281     elif method == luxi.REQ_QUERY_NODES:
282       (names, fields, use_locking) = args
283       logging.info("Received node query request for %s", names)
284       if use_locking:
285         raise errors.OpPrereqError("Sync queries are not allowed",
286                                    errors.ECODE_INVAL)
287       op = opcodes.OpNodeQuery(names=names, output_fields=fields,
288                                use_locking=use_locking)
289       return self._Query(op)
290
291     elif method == luxi.REQ_QUERY_GROUPS:
292       (names, fields, use_locking) = args
293       logging.info("Received group query request for %s", names)
294       if use_locking:
295         raise errors.OpPrereqError("Sync queries are not allowed",
296                                    errors.ECODE_INVAL)
297       op = opcodes.OpGroupQuery(names=names, output_fields=fields)
298       return self._Query(op)
299
300     elif method == luxi.REQ_QUERY_EXPORTS:
301       nodes, use_locking = args
302       if use_locking:
303         raise errors.OpPrereqError("Sync queries are not allowed",
304                                    errors.ECODE_INVAL)
305       logging.info("Received exports query request")
306       op = opcodes.OpBackupQuery(nodes=nodes, use_locking=use_locking)
307       return self._Query(op)
308
309     elif method == luxi.REQ_QUERY_CONFIG_VALUES:
310       fields = args
311       logging.info("Received config values query request for %s", fields)
312       op = opcodes.OpClusterConfigQuery(output_fields=fields)
313       return self._Query(op)
314
315     elif method == luxi.REQ_QUERY_CLUSTER_INFO:
316       logging.info("Received cluster info query request")
317       op = opcodes.OpClusterQuery()
318       return self._Query(op)
319
320     elif method == luxi.REQ_QUERY_TAGS:
321       kind, name = args
322       logging.info("Received tags query request")
323       op = opcodes.OpTagsGet(kind=kind, name=name)
324       return self._Query(op)
325
326     elif method == luxi.REQ_QUERY_LOCKS:
327       (fields, sync) = args
328       logging.info("Received locks query request")
329       if sync:
330         raise NotImplementedError("Synchronous queries are not implemented")
331       return self.server.context.glm.OldStyleQueryLocks(fields)
332
333     elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
334       drain_flag = args
335       logging.info("Received queue drain flag change request to %s",
336                    drain_flag)
337       return queue.SetDrainFlag(drain_flag)
338
339     elif method == luxi.REQ_SET_WATCHER_PAUSE:
340       (until, ) = args
341
342       if until is None:
343         logging.info("Received request to no longer pause the watcher")
344       else:
345         if not isinstance(until, (int, float)):
346           raise TypeError("Duration must be an integer or float")
347
348         if until < time.time():
349           raise errors.GenericError("Unable to set pause end time in the past")
350
351         logging.info("Received request to pause the watcher until %s", until)
352
353       return _SetWatcherPause(until)
354
355     else:
356       logging.info("Received invalid request '%s'", method)
357       raise ValueError("Invalid operation '%s'" % method)
358
359   def _Query(self, op):
360     """Runs the specified opcode and returns the result.
361
362     """
363     # Queries don't have a job id
364     proc = mcpu.Processor(self.server.context, None)
365
366     # TODO: Executing an opcode using locks will acquire them in blocking mode.
367     # Consider using a timeout for retries.
368     return proc.ExecOpCode(op, None)
369
370
371 class GanetiContext(object):
372   """Context common to all ganeti threads.
373
374   This class creates and holds common objects shared by all threads.
375
376   """
377   # pylint: disable=W0212
378   # we do want to ensure a singleton here
379   _instance = None
380
381   def __init__(self):
382     """Constructs a new GanetiContext object.
383
384     There should be only a GanetiContext object at any time, so this
385     function raises an error if this is not the case.
386
387     """
388     assert self.__class__._instance is None, "double GanetiContext instance"
389
390     # Create global configuration object
391     self.cfg = config.ConfigWriter()
392
393     # Locking manager
394     self.glm = locking.GanetiLockManager(
395                 self.cfg.GetNodeList(),
396                 self.cfg.GetNodeGroupList(),
397                 self.cfg.GetInstanceList())
398
399     # Job queue
400     self.jobqueue = jqueue.JobQueue(self)
401
402     # RPC runner
403     self.rpc = rpc.RpcRunner(self)
404
405     # setting this also locks the class against attribute modifications
406     self.__class__._instance = self
407
408   def __setattr__(self, name, value):
409     """Setting GanetiContext attributes is forbidden after initialization.
410
411     """
412     assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
413     object.__setattr__(self, name, value)
414
415   def AddNode(self, node, ec_id):
416     """Adds a node to the configuration and lock manager.
417
418     """
419     # Add it to the configuration
420     self.cfg.AddNode(node, ec_id)
421
422     # If preseeding fails it'll not be added
423     self.jobqueue.AddNode(node)
424
425     # Add the new node to the Ganeti Lock Manager
426     self.glm.add(locking.LEVEL_NODE, node.name)
427
428   def ReaddNode(self, node):
429     """Updates a node that's already in the configuration
430
431     """
432     # Synchronize the queue again
433     self.jobqueue.AddNode(node)
434
435   def RemoveNode(self, name):
436     """Removes a node from the configuration and lock manager.
437
438     """
439     # Remove node from configuration
440     self.cfg.RemoveNode(name)
441
442     # Notify job queue
443     self.jobqueue.RemoveNode(name)
444
445     # Remove the node from the Ganeti Lock Manager
446     self.glm.remove(locking.LEVEL_NODE, name)
447
448
449 def _SetWatcherPause(until):
450   """Creates or removes the watcher pause file.
451
452   @type until: None or int
453   @param until: Unix timestamp saying until when the watcher shouldn't run
454
455   """
456   if until is None:
457     utils.RemoveFile(constants.WATCHER_PAUSEFILE)
458   else:
459     utils.WriteFile(constants.WATCHER_PAUSEFILE,
460                     data="%d\n" % (until, ))
461
462   return until
463
464
465 @rpc.RunWithRPC
466 def CheckAgreement():
467   """Check the agreement on who is the master.
468
469   The function uses a very simple algorithm: we must get more positive
470   than negative answers. Since in most of the cases we are the master,
471   we'll use our own config file for getting the node list. In the
472   future we could collect the current node list from our (possibly
473   obsolete) known nodes.
474
475   In order to account for cold-start of all nodes, we retry for up to
476   a minute until we get a real answer as the top-voted one. If the
477   nodes are more out-of-sync, for now manual startup of the master
478   should be attempted.
479
480   Note that for a even number of nodes cluster, we need at least half
481   of the nodes (beside ourselves) to vote for us. This creates a
482   problem on two-node clusters, since in this case we require the
483   other node to be up too to confirm our status.
484
485   """
486   myself = netutils.Hostname.GetSysName()
487   #temp instantiation of a config writer, used only to get the node list
488   cfg = config.ConfigWriter()
489   node_list = cfg.GetNodeList()
490   del cfg
491   retries = 6
492   while retries > 0:
493     votes = bootstrap.GatherMasterVotes(node_list)
494     if not votes:
495       # empty node list, this is a one node cluster
496       return True
497     if votes[0][0] is None:
498       retries -= 1
499       time.sleep(10)
500       continue
501     break
502   if retries == 0:
503     logging.critical("Cluster inconsistent, most of the nodes didn't answer"
504                      " after multiple retries. Aborting startup")
505     logging.critical("Use the --no-voting option if you understand what"
506                      " effects it has on the cluster state")
507     return False
508   # here a real node is at the top of the list
509   all_votes = sum(item[1] for item in votes)
510   top_node, top_votes = votes[0]
511
512   result = False
513   if top_node != myself:
514     logging.critical("It seems we are not the master (top-voted node"
515                      " is %s with %d out of %d votes)", top_node, top_votes,
516                      all_votes)
517   elif top_votes < all_votes - top_votes:
518     logging.critical("It seems we are not the master (%d votes for,"
519                      " %d votes against)", top_votes, all_votes - top_votes)
520   else:
521     result = True
522
523   return result
524
525
526 @rpc.RunWithRPC
527 def ActivateMasterIP():
528   # activate ip
529   master_node = ssconf.SimpleStore().GetMasterNode()
530   result = rpc.RpcRunner.call_node_activate_master_ip(master_node)
531   msg = result.fail_msg
532   if msg:
533     logging.error("Can't activate master IP address: %s", msg)
534
535
536 def CheckMasterd(options, args):
537   """Initial checks whether to run or exit with a failure.
538
539   """
540   if args: # masterd doesn't take any arguments
541     print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
542     sys.exit(constants.EXIT_FAILURE)
543
544   ssconf.CheckMaster(options.debug)
545
546   try:
547     options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
548     options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
549   except KeyError:
550     print >> sys.stderr, ("User or group not existing on system: %s:%s" %
551                           (constants.MASTERD_USER, constants.DAEMONS_GROUP))
552     sys.exit(constants.EXIT_FAILURE)
553
554   # Check the configuration is sane before anything else
555   try:
556     config.ConfigWriter()
557   except errors.ConfigVersionMismatch, err:
558     v1 = "%s.%s.%s" % constants.SplitVersion(err.args[0])
559     v2 = "%s.%s.%s" % constants.SplitVersion(err.args[1])
560     print >> sys.stderr,  \
561         ("Configuration version mismatch. The current Ganeti software"
562          " expects version %s, but the on-disk configuration file has"
563          " version %s. This is likely the result of upgrading the"
564          " software without running the upgrade procedure. Please contact"
565          " your cluster administrator or complete the upgrade using the"
566          " cfgupgrade utility, after reading the upgrade notes." %
567          (v1, v2))
568     sys.exit(constants.EXIT_FAILURE)
569   except errors.ConfigurationError, err:
570     print >> sys.stderr, \
571         ("Configuration error while opening the configuration file: %s\n"
572          "This might be caused by an incomplete software upgrade or"
573          " by a corrupted configuration file. Until the problem is fixed"
574          " the master daemon cannot start." % str(err))
575     sys.exit(constants.EXIT_FAILURE)
576
577   # If CheckMaster didn't fail we believe we are the master, but we have to
578   # confirm with the other nodes.
579   if options.no_voting:
580     if not options.yes_do_it:
581       sys.stdout.write("The 'no voting' option has been selected.\n")
582       sys.stdout.write("This is dangerous, please confirm by"
583                        " typing uppercase 'yes': ")
584       sys.stdout.flush()
585
586       confirmation = sys.stdin.readline().strip()
587       if confirmation != "YES":
588         print >> sys.stderr, "Aborting."
589         sys.exit(constants.EXIT_FAILURE)
590
591   else:
592     # CheckAgreement uses RPC and threads, hence it needs to be run in
593     # a separate process before we call utils.Daemonize in the current
594     # process.
595     if not utils.RunInSeparateProcess(CheckAgreement):
596       sys.exit(constants.EXIT_FAILURE)
597
598   # ActivateMasterIP also uses RPC/threads, so we run it again via a
599   # separate process.
600
601   # TODO: decide whether failure to activate the master IP is a fatal error
602   utils.RunInSeparateProcess(ActivateMasterIP)
603
604
605 def PrepMasterd(options, _):
606   """Prep master daemon function, executed with the PID file held.
607
608   """
609   # This is safe to do as the pid file guarantees against
610   # concurrent execution.
611   utils.RemoveFile(constants.MASTER_SOCKET)
612
613   mainloop = daemon.Mainloop()
614   master = MasterServer(mainloop, constants.MASTER_SOCKET,
615                         options.uid, options.gid)
616   return (mainloop, master)
617
618
619 def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
620   """Main master daemon function, executed with the PID file held.
621
622   """
623   (mainloop, master) = prep_data
624   try:
625     rpc.Init()
626     try:
627       master.setup_queue()
628       try:
629         mainloop.Run()
630       finally:
631         master.server_cleanup()
632     finally:
633       rpc.Shutdown()
634   finally:
635     utils.RemoveFile(constants.MASTER_SOCKET)
636
637
638 def Main():
639   """Main function"""
640   parser = OptionParser(description="Ganeti master daemon",
641                         usage="%prog [-f] [-d]",
642                         version="%%prog (ganeti) %s" %
643                         constants.RELEASE_VERSION)
644   parser.add_option("--no-voting", dest="no_voting",
645                     help="Do not check that the nodes agree on this node"
646                     " being the master and start the daemon unconditionally",
647                     default=False, action="store_true")
648   parser.add_option("--yes-do-it", dest="yes_do_it",
649                     help="Override interactive check for --no-voting",
650                     default=False, action="store_true")
651   daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
652                      ExecMasterd, multithreaded=True)