Explicitly pass params to activate_master_ip
[ganeti-local] / lib / server / masterd.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Master daemon program.
23
24 Some classes deviates from the standard style guide since the
25 inheritance from parent classes requires it.
26
27 """
28
29 # pylint: disable=C0103
30 # C0103: Invalid name ganeti-masterd
31
32 import grp
33 import os
34 import pwd
35 import sys
36 import socket
37 import time
38 import tempfile
39 import logging
40
41 from optparse import OptionParser
42
43 from ganeti import config
44 from ganeti import constants
45 from ganeti import daemon
46 from ganeti import mcpu
47 from ganeti import opcodes
48 from ganeti import jqueue
49 from ganeti import locking
50 from ganeti import luxi
51 from ganeti import utils
52 from ganeti import errors
53 from ganeti import ssconf
54 from ganeti import workerpool
55 from ganeti import rpc
56 from ganeti import bootstrap
57 from ganeti import netutils
58 from ganeti import objects
59 from ganeti import query
60
61
62 CLIENT_REQUEST_WORKERS = 16
63
64 EXIT_NOTMASTER = constants.EXIT_NOTMASTER
65 EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
66
67
68 class ClientRequestWorker(workerpool.BaseWorker):
69   # pylint: disable=W0221
70   def RunTask(self, server, message, client):
71     """Process the request.
72
73     """
74     client_ops = ClientOps(server)
75
76     try:
77       (method, args, version) = luxi.ParseRequest(message)
78     except luxi.ProtocolError, err:
79       logging.error("Protocol Error: %s", err)
80       client.close_log()
81       return
82
83     success = False
84     try:
85       # Verify client's version if there was one in the request
86       if version is not None and version != constants.LUXI_VERSION:
87         raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
88                                (constants.LUXI_VERSION, version))
89
90       result = client_ops.handle_request(method, args)
91       success = True
92     except errors.GenericError, err:
93       logging.exception("Unexpected exception")
94       success = False
95       result = errors.EncodeException(err)
96     except:
97       logging.exception("Unexpected exception")
98       err = sys.exc_info()
99       result = "Caught exception: %s" % str(err[1])
100
101     try:
102       reply = luxi.FormatResponse(success, result)
103       client.send_message(reply)
104       # awake the main thread so that it can write out the data.
105       server.awaker.signal()
106     except: # pylint: disable=W0702
107       logging.exception("Send error")
108       client.close_log()
109
110
111 class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
112   """Handler for master peers.
113
114   """
115   _MAX_UNHANDLED = 1
116
117   def __init__(self, server, connected_socket, client_address, family):
118     daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
119                                                  client_address,
120                                                  constants.LUXI_EOM,
121                                                  family, self._MAX_UNHANDLED)
122     self.server = server
123
124   def handle_message(self, message, _):
125     self.server.request_workers.AddTask((self.server, message, self))
126
127
128 class MasterServer(daemon.AsyncStreamServer):
129   """Master Server.
130
131   This is the main asynchronous master server. It handles connections to the
132   master socket.
133
134   """
135   family = socket.AF_UNIX
136
137   def __init__(self, mainloop, address, uid, gid):
138     """MasterServer constructor
139
140     @type mainloop: ganeti.daemon.Mainloop
141     @param mainloop: Mainloop used to poll for I/O events
142     @param address: the unix socket address to bind the MasterServer to
143     @param uid: The uid of the owner of the socket
144     @param gid: The gid of the owner of the socket
145
146     """
147     temp_name = tempfile.mktemp(dir=os.path.dirname(address))
148     daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
149     os.chmod(temp_name, 0770)
150     os.chown(temp_name, uid, gid)
151     os.rename(temp_name, address)
152
153     self.mainloop = mainloop
154     self.awaker = daemon.AsyncAwaker()
155
156     # We'll only start threads once we've forked.
157     self.context = None
158     self.request_workers = None
159
160   def handle_connection(self, connected_socket, client_address):
161     # TODO: add connection count and limit the number of open connections to a
162     # maximum number to avoid breaking for lack of file descriptors or memory.
163     MasterClientHandler(self, connected_socket, client_address, self.family)
164
165   def setup_queue(self):
166     self.context = GanetiContext()
167     self.request_workers = workerpool.WorkerPool("ClientReq",
168                                                  CLIENT_REQUEST_WORKERS,
169                                                  ClientRequestWorker)
170
171   def server_cleanup(self):
172     """Cleanup the server.
173
174     This involves shutting down the processor threads and the master
175     socket.
176
177     """
178     try:
179       self.close()
180     finally:
181       if self.request_workers:
182         self.request_workers.TerminateWorkers()
183       if self.context:
184         self.context.jobqueue.Shutdown()
185
186
187 class ClientOps:
188   """Class holding high-level client operations."""
189   def __init__(self, server):
190     self.server = server
191
192   def handle_request(self, method, args): # pylint: disable=R0911
193     queue = self.server.context.jobqueue
194
195     # TODO: Parameter validation
196     if not isinstance(args, (tuple, list)):
197       logging.info("Received invalid arguments of type '%s'", type(args))
198       raise ValueError("Invalid arguments type '%s'" % type(args))
199
200     # TODO: Rewrite to not exit in each 'if/elif' branch
201
202     if method == luxi.REQ_SUBMIT_JOB:
203       logging.info("Received new job")
204       ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
205       return queue.SubmitJob(ops)
206
207     if method == luxi.REQ_SUBMIT_MANY_JOBS:
208       logging.info("Received multiple jobs")
209       jobs = []
210       for ops in args:
211         jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
212       return queue.SubmitManyJobs(jobs)
213
214     elif method == luxi.REQ_CANCEL_JOB:
215       (job_id, ) = args
216       logging.info("Received job cancel request for %s", job_id)
217       return queue.CancelJob(job_id)
218
219     elif method == luxi.REQ_ARCHIVE_JOB:
220       (job_id, ) = args
221       logging.info("Received job archive request for %s", job_id)
222       return queue.ArchiveJob(job_id)
223
224     elif method == luxi.REQ_AUTOARCHIVE_JOBS:
225       (age, timeout) = args
226       logging.info("Received job autoarchive request for age %s, timeout %s",
227                    age, timeout)
228       return queue.AutoArchiveJobs(age, timeout)
229
230     elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
231       (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
232       logging.info("Received job poll request for %s", job_id)
233       return queue.WaitForJobChanges(job_id, fields, prev_job_info,
234                                      prev_log_serial, timeout)
235
236     elif method == luxi.REQ_QUERY:
237       (what, fields, qfilter) = args
238       req = objects.QueryRequest(what=what, fields=fields, qfilter=qfilter)
239
240       if req.what in constants.QR_VIA_OP:
241         result = self._Query(opcodes.OpQuery(what=req.what, fields=req.fields,
242                                              qfilter=req.qfilter))
243       elif req.what == constants.QR_LOCK:
244         if req.qfilter is not None:
245           raise errors.OpPrereqError("Lock queries can't be filtered")
246         return self.server.context.glm.QueryLocks(req.fields)
247       elif req.what in constants.QR_VIA_LUXI:
248         raise NotImplementedError
249       else:
250         raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
251                                    errors.ECODE_INVAL)
252
253       return result
254
255     elif method == luxi.REQ_QUERY_FIELDS:
256       (what, fields) = args
257       req = objects.QueryFieldsRequest(what=what, fields=fields)
258
259       try:
260         fielddefs = query.ALL_FIELDS[req.what]
261       except KeyError:
262         raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
263                                    errors.ECODE_INVAL)
264
265       return query.QueryFields(fielddefs, req.fields)
266
267     elif method == luxi.REQ_QUERY_JOBS:
268       (job_ids, fields) = args
269       if isinstance(job_ids, (tuple, list)) and job_ids:
270         msg = utils.CommaJoin(job_ids)
271       else:
272         msg = str(job_ids)
273       logging.info("Received job query request for %s", msg)
274       return queue.QueryJobs(job_ids, fields)
275
276     elif method == luxi.REQ_QUERY_INSTANCES:
277       (names, fields, use_locking) = args
278       logging.info("Received instance query request for %s", names)
279       if use_locking:
280         raise errors.OpPrereqError("Sync queries are not allowed",
281                                    errors.ECODE_INVAL)
282       op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
283                                    use_locking=use_locking)
284       return self._Query(op)
285
286     elif method == luxi.REQ_QUERY_NODES:
287       (names, fields, use_locking) = args
288       logging.info("Received node query request for %s", names)
289       if use_locking:
290         raise errors.OpPrereqError("Sync queries are not allowed",
291                                    errors.ECODE_INVAL)
292       op = opcodes.OpNodeQuery(names=names, output_fields=fields,
293                                use_locking=use_locking)
294       return self._Query(op)
295
296     elif method == luxi.REQ_QUERY_GROUPS:
297       (names, fields, use_locking) = args
298       logging.info("Received group query request for %s", names)
299       if use_locking:
300         raise errors.OpPrereqError("Sync queries are not allowed",
301                                    errors.ECODE_INVAL)
302       op = opcodes.OpGroupQuery(names=names, output_fields=fields)
303       return self._Query(op)
304
305     elif method == luxi.REQ_QUERY_EXPORTS:
306       (nodes, use_locking) = args
307       if use_locking:
308         raise errors.OpPrereqError("Sync queries are not allowed",
309                                    errors.ECODE_INVAL)
310       logging.info("Received exports query request")
311       op = opcodes.OpBackupQuery(nodes=nodes, use_locking=use_locking)
312       return self._Query(op)
313
314     elif method == luxi.REQ_QUERY_CONFIG_VALUES:
315       (fields, ) = args
316       logging.info("Received config values query request for %s", fields)
317       op = opcodes.OpClusterConfigQuery(output_fields=fields)
318       return self._Query(op)
319
320     elif method == luxi.REQ_QUERY_CLUSTER_INFO:
321       logging.info("Received cluster info query request")
322       op = opcodes.OpClusterQuery()
323       return self._Query(op)
324
325     elif method == luxi.REQ_QUERY_TAGS:
326       (kind, name) = args
327       logging.info("Received tags query request")
328       op = opcodes.OpTagsGet(kind=kind, name=name)
329       return self._Query(op)
330
331     elif method == luxi.REQ_QUERY_LOCKS:
332       (fields, sync) = args
333       logging.info("Received locks query request")
334       if sync:
335         raise NotImplementedError("Synchronous queries are not implemented")
336       return self.server.context.glm.OldStyleQueryLocks(fields)
337
338     elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
339       (drain_flag, ) = args
340       logging.info("Received queue drain flag change request to %s",
341                    drain_flag)
342       return queue.SetDrainFlag(drain_flag)
343
344     elif method == luxi.REQ_SET_WATCHER_PAUSE:
345       (until, ) = args
346
347       if until is None:
348         logging.info("Received request to no longer pause the watcher")
349       else:
350         if not isinstance(until, (int, float)):
351           raise TypeError("Duration must be an integer or float")
352
353         if until < time.time():
354           raise errors.GenericError("Unable to set pause end time in the past")
355
356         logging.info("Received request to pause the watcher until %s", until)
357
358       return _SetWatcherPause(until)
359
360     else:
361       logging.info("Received invalid request '%s'", method)
362       raise ValueError("Invalid operation '%s'" % method)
363
364   def _Query(self, op):
365     """Runs the specified opcode and returns the result.
366
367     """
368     # Queries don't have a job id
369     proc = mcpu.Processor(self.server.context, None)
370
371     # TODO: Executing an opcode using locks will acquire them in blocking mode.
372     # Consider using a timeout for retries.
373     return proc.ExecOpCode(op, None)
374
375
376 class GanetiContext(object):
377   """Context common to all ganeti threads.
378
379   This class creates and holds common objects shared by all threads.
380
381   """
382   # pylint: disable=W0212
383   # we do want to ensure a singleton here
384   _instance = None
385
386   def __init__(self):
387     """Constructs a new GanetiContext object.
388
389     There should be only a GanetiContext object at any time, so this
390     function raises an error if this is not the case.
391
392     """
393     assert self.__class__._instance is None, "double GanetiContext instance"
394
395     # Create global configuration object
396     self.cfg = config.ConfigWriter()
397
398     # Locking manager
399     self.glm = locking.GanetiLockManager(
400                 self.cfg.GetNodeList(),
401                 self.cfg.GetNodeGroupList(),
402                 self.cfg.GetInstanceList())
403
404     # Job queue
405     self.jobqueue = jqueue.JobQueue(self)
406
407     # RPC runner
408     self.rpc = rpc.RpcRunner(self)
409
410     # setting this also locks the class against attribute modifications
411     self.__class__._instance = self
412
413   def __setattr__(self, name, value):
414     """Setting GanetiContext attributes is forbidden after initialization.
415
416     """
417     assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
418     object.__setattr__(self, name, value)
419
420   def AddNode(self, node, ec_id):
421     """Adds a node to the configuration and lock manager.
422
423     """
424     # Add it to the configuration
425     self.cfg.AddNode(node, ec_id)
426
427     # If preseeding fails it'll not be added
428     self.jobqueue.AddNode(node)
429
430     # Add the new node to the Ganeti Lock Manager
431     self.glm.add(locking.LEVEL_NODE, node.name)
432     self.glm.add(locking.LEVEL_NODE_RES, node.name)
433
434   def ReaddNode(self, node):
435     """Updates a node that's already in the configuration
436
437     """
438     # Synchronize the queue again
439     self.jobqueue.AddNode(node)
440
441   def RemoveNode(self, name):
442     """Removes a node from the configuration and lock manager.
443
444     """
445     # Remove node from configuration
446     self.cfg.RemoveNode(name)
447
448     # Notify job queue
449     self.jobqueue.RemoveNode(name)
450
451     # Remove the node from the Ganeti Lock Manager
452     self.glm.remove(locking.LEVEL_NODE, name)
453     self.glm.remove(locking.LEVEL_NODE_RES, name)
454
455
456 def _SetWatcherPause(until):
457   """Creates or removes the watcher pause file.
458
459   @type until: None or int
460   @param until: Unix timestamp saying until when the watcher shouldn't run
461
462   """
463   if until is None:
464     utils.RemoveFile(constants.WATCHER_PAUSEFILE)
465   else:
466     utils.WriteFile(constants.WATCHER_PAUSEFILE,
467                     data="%d\n" % (until, ))
468
469   return until
470
471
472 @rpc.RunWithRPC
473 def CheckAgreement():
474   """Check the agreement on who is the master.
475
476   The function uses a very simple algorithm: we must get more positive
477   than negative answers. Since in most of the cases we are the master,
478   we'll use our own config file for getting the node list. In the
479   future we could collect the current node list from our (possibly
480   obsolete) known nodes.
481
482   In order to account for cold-start of all nodes, we retry for up to
483   a minute until we get a real answer as the top-voted one. If the
484   nodes are more out-of-sync, for now manual startup of the master
485   should be attempted.
486
487   Note that for a even number of nodes cluster, we need at least half
488   of the nodes (beside ourselves) to vote for us. This creates a
489   problem on two-node clusters, since in this case we require the
490   other node to be up too to confirm our status.
491
492   """
493   myself = netutils.Hostname.GetSysName()
494   #temp instantiation of a config writer, used only to get the node list
495   cfg = config.ConfigWriter()
496   node_list = cfg.GetNodeList()
497   del cfg
498   retries = 6
499   while retries > 0:
500     votes = bootstrap.GatherMasterVotes(node_list)
501     if not votes:
502       # empty node list, this is a one node cluster
503       return True
504     if votes[0][0] is None:
505       retries -= 1
506       time.sleep(10)
507       continue
508     break
509   if retries == 0:
510     logging.critical("Cluster inconsistent, most of the nodes didn't answer"
511                      " after multiple retries. Aborting startup")
512     logging.critical("Use the --no-voting option if you understand what"
513                      " effects it has on the cluster state")
514     return False
515   # here a real node is at the top of the list
516   all_votes = sum(item[1] for item in votes)
517   top_node, top_votes = votes[0]
518
519   result = False
520   if top_node != myself:
521     logging.critical("It seems we are not the master (top-voted node"
522                      " is %s with %d out of %d votes)", top_node, top_votes,
523                      all_votes)
524   elif top_votes < all_votes - top_votes:
525     logging.critical("It seems we are not the master (%d votes for,"
526                      " %d votes against)", top_votes, all_votes - top_votes)
527   else:
528     result = True
529
530   return result
531
532
533 @rpc.RunWithRPC
534 def ActivateMasterIP():
535   # activate ip
536   cfg = config.ConfigWriter()
537   (master, ip, dev, netmask, family) = cfg.GetMasterNetworkParameters()
538   runner = rpc.BootstrapRunner()
539   result = runner.call_node_activate_master_ip(master, ip, netmask, dev, family)
540
541   msg = result.fail_msg
542   if msg:
543     logging.error("Can't activate master IP address: %s", msg)
544
545
546 def CheckMasterd(options, args):
547   """Initial checks whether to run or exit with a failure.
548
549   """
550   if args: # masterd doesn't take any arguments
551     print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
552     sys.exit(constants.EXIT_FAILURE)
553
554   ssconf.CheckMaster(options.debug)
555
556   try:
557     options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
558     options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
559   except KeyError:
560     print >> sys.stderr, ("User or group not existing on system: %s:%s" %
561                           (constants.MASTERD_USER, constants.DAEMONS_GROUP))
562     sys.exit(constants.EXIT_FAILURE)
563
564   # Check the configuration is sane before anything else
565   try:
566     config.ConfigWriter()
567   except errors.ConfigVersionMismatch, err:
568     v1 = "%s.%s.%s" % constants.SplitVersion(err.args[0])
569     v2 = "%s.%s.%s" % constants.SplitVersion(err.args[1])
570     print >> sys.stderr,  \
571         ("Configuration version mismatch. The current Ganeti software"
572          " expects version %s, but the on-disk configuration file has"
573          " version %s. This is likely the result of upgrading the"
574          " software without running the upgrade procedure. Please contact"
575          " your cluster administrator or complete the upgrade using the"
576          " cfgupgrade utility, after reading the upgrade notes." %
577          (v1, v2))
578     sys.exit(constants.EXIT_FAILURE)
579   except errors.ConfigurationError, err:
580     print >> sys.stderr, \
581         ("Configuration error while opening the configuration file: %s\n"
582          "This might be caused by an incomplete software upgrade or"
583          " by a corrupted configuration file. Until the problem is fixed"
584          " the master daemon cannot start." % str(err))
585     sys.exit(constants.EXIT_FAILURE)
586
587   # If CheckMaster didn't fail we believe we are the master, but we have to
588   # confirm with the other nodes.
589   if options.no_voting:
590     if not options.yes_do_it:
591       sys.stdout.write("The 'no voting' option has been selected.\n")
592       sys.stdout.write("This is dangerous, please confirm by"
593                        " typing uppercase 'yes': ")
594       sys.stdout.flush()
595
596       confirmation = sys.stdin.readline().strip()
597       if confirmation != "YES":
598         print >> sys.stderr, "Aborting."
599         sys.exit(constants.EXIT_FAILURE)
600
601   else:
602     # CheckAgreement uses RPC and threads, hence it needs to be run in
603     # a separate process before we call utils.Daemonize in the current
604     # process.
605     if not utils.RunInSeparateProcess(CheckAgreement):
606       sys.exit(constants.EXIT_FAILURE)
607
608   # ActivateMasterIP also uses RPC/threads, so we run it again via a
609   # separate process.
610
611   # TODO: decide whether failure to activate the master IP is a fatal error
612   utils.RunInSeparateProcess(ActivateMasterIP)
613
614
615 def PrepMasterd(options, _):
616   """Prep master daemon function, executed with the PID file held.
617
618   """
619   # This is safe to do as the pid file guarantees against
620   # concurrent execution.
621   utils.RemoveFile(constants.MASTER_SOCKET)
622
623   mainloop = daemon.Mainloop()
624   master = MasterServer(mainloop, constants.MASTER_SOCKET,
625                         options.uid, options.gid)
626   return (mainloop, master)
627
628
629 def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
630   """Main master daemon function, executed with the PID file held.
631
632   """
633   (mainloop, master) = prep_data
634   try:
635     rpc.Init()
636     try:
637       master.setup_queue()
638       try:
639         mainloop.Run()
640       finally:
641         master.server_cleanup()
642     finally:
643       rpc.Shutdown()
644   finally:
645     utils.RemoveFile(constants.MASTER_SOCKET)
646
647
648 def Main():
649   """Main function"""
650   parser = OptionParser(description="Ganeti master daemon",
651                         usage="%prog [-f] [-d]",
652                         version="%%prog (ganeti) %s" %
653                         constants.RELEASE_VERSION)
654   parser.add_option("--no-voting", dest="no_voting",
655                     help="Do not check that the nodes agree on this node"
656                     " being the master and start the daemon unconditionally",
657                     default=False, action="store_true")
658   parser.add_option("--yes-do-it", dest="yes_do_it",
659                     help="Override interactive check for --no-voting",
660                     default=False, action="store_true")
661   daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
662                      ExecMasterd, multithreaded=True)