Modify gnt-node add to call external script
[ganeti-local] / daemons / ganeti-masterd
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Master daemon program.
23
24 Some classes deviates from the standard style guide since the
25 inheritance from parent classes requires it.
26
27 """
28
29 # pylint: disable-msg=C0103
30 # C0103: Invalid name ganeti-masterd
31
32 import grp
33 import os
34 import pwd
35 import sys
36 import socket
37 import time
38 import tempfile
39 import logging
40
41 from optparse import OptionParser
42
43 from ganeti import config
44 from ganeti import constants
45 from ganeti import daemon
46 from ganeti import mcpu
47 from ganeti import opcodes
48 from ganeti import jqueue
49 from ganeti import locking
50 from ganeti import luxi
51 from ganeti import utils
52 from ganeti import errors
53 from ganeti import ssconf
54 from ganeti import workerpool
55 from ganeti import rpc
56 from ganeti import bootstrap
57 from ganeti import netutils
58
59
60 CLIENT_REQUEST_WORKERS = 16
61
62 EXIT_NOTMASTER = constants.EXIT_NOTMASTER
63 EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
64
65
66 class ClientRequestWorker(workerpool.BaseWorker):
67   # pylint: disable-msg=W0221
68   def RunTask(self, server, message, client):
69     """Process the request.
70
71     """
72     client_ops = ClientOps(server)
73
74     try:
75       (method, args) = luxi.ParseRequest(message)
76     except luxi.ProtocolError, err:
77       logging.error("Protocol Error: %s", err)
78       client.close_log()
79       return
80
81     success = False
82     try:
83       result = client_ops.handle_request(method, args)
84       success = True
85     except errors.GenericError, err:
86       logging.exception("Unexpected exception")
87       success = False
88       result = errors.EncodeException(err)
89     except:
90       logging.exception("Unexpected exception")
91       err = sys.exc_info()
92       result = "Caught exception: %s" % str(err[1])
93
94     try:
95       reply = luxi.FormatResponse(success, result)
96       client.send_message(reply)
97       # awake the main thread so that it can write out the data.
98       server.awaker.signal()
99     except: # pylint: disable-msg=W0702
100       logging.exception("Send error")
101       client.close_log()
102
103
104 class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
105   """Handler for master peers.
106
107   """
108   _MAX_UNHANDLED = 1
109   def __init__(self, server, connected_socket, client_address, family):
110     daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
111                                                  client_address,
112                                                  constants.LUXI_EOM,
113                                                  family, self._MAX_UNHANDLED)
114     self.server = server
115
116   def handle_message(self, message, _):
117     self.server.request_workers.AddTask((self.server, message, self))
118
119
120 class MasterServer(daemon.AsyncStreamServer):
121   """Master Server.
122
123   This is the main asynchronous master server. It handles connections to the
124   master socket.
125
126   """
127   family = socket.AF_UNIX
128
129   def __init__(self, mainloop, address, uid, gid):
130     """MasterServer constructor
131
132     @type mainloop: ganeti.daemon.Mainloop
133     @param mainloop: Mainloop used to poll for I/O events
134     @param address: the unix socket address to bind the MasterServer to
135     @param uid: The uid of the owner of the socket
136     @param gid: The gid of the owner of the socket
137
138     """
139     temp_name = tempfile.mktemp(dir=os.path.dirname(address))
140     daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
141     os.chmod(temp_name, 0770)
142     os.chown(temp_name, uid, gid)
143     os.rename(temp_name, address)
144
145     self.mainloop = mainloop
146     self.awaker = daemon.AsyncAwaker()
147
148     # We'll only start threads once we've forked.
149     self.context = None
150     self.request_workers = None
151
152   def handle_connection(self, connected_socket, client_address):
153     # TODO: add connection count and limit the number of open connections to a
154     # maximum number to avoid breaking for lack of file descriptors or memory.
155     MasterClientHandler(self, connected_socket, client_address, self.family)
156
157   def setup_queue(self):
158     self.context = GanetiContext()
159     self.request_workers = workerpool.WorkerPool("ClientReq",
160                                                  CLIENT_REQUEST_WORKERS,
161                                                  ClientRequestWorker)
162
163   def server_cleanup(self):
164     """Cleanup the server.
165
166     This involves shutting down the processor threads and the master
167     socket.
168
169     """
170     try:
171       self.close()
172     finally:
173       if self.request_workers:
174         self.request_workers.TerminateWorkers()
175       if self.context:
176         self.context.jobqueue.Shutdown()
177
178
179 class ClientOps:
180   """Class holding high-level client operations."""
181   def __init__(self, server):
182     self.server = server
183
184   def handle_request(self, method, args): # pylint: disable-msg=R0911
185     queue = self.server.context.jobqueue
186
187     # TODO: Parameter validation
188
189     # TODO: Rewrite to not exit in each 'if/elif' branch
190
191     if method == luxi.REQ_SUBMIT_JOB:
192       logging.info("Received new job")
193       ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
194       return queue.SubmitJob(ops)
195
196     if method == luxi.REQ_SUBMIT_MANY_JOBS:
197       logging.info("Received multiple jobs")
198       jobs = []
199       for ops in args:
200         jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
201       return queue.SubmitManyJobs(jobs)
202
203     elif method == luxi.REQ_CANCEL_JOB:
204       job_id = args
205       logging.info("Received job cancel request for %s", job_id)
206       return queue.CancelJob(job_id)
207
208     elif method == luxi.REQ_ARCHIVE_JOB:
209       job_id = args
210       logging.info("Received job archive request for %s", job_id)
211       return queue.ArchiveJob(job_id)
212
213     elif method == luxi.REQ_AUTOARCHIVE_JOBS:
214       (age, timeout) = args
215       logging.info("Received job autoarchive request for age %s, timeout %s",
216                    age, timeout)
217       return queue.AutoArchiveJobs(age, timeout)
218
219     elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
220       (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
221       logging.info("Received job poll request for %s", job_id)
222       return queue.WaitForJobChanges(job_id, fields, prev_job_info,
223                                      prev_log_serial, timeout)
224
225     elif method == luxi.REQ_QUERY_JOBS:
226       (job_ids, fields) = args
227       if isinstance(job_ids, (tuple, list)) and job_ids:
228         msg = utils.CommaJoin(job_ids)
229       else:
230         msg = str(job_ids)
231       logging.info("Received job query request for %s", msg)
232       return queue.QueryJobs(job_ids, fields)
233
234     elif method == luxi.REQ_QUERY_INSTANCES:
235       (names, fields, use_locking) = args
236       logging.info("Received instance query request for %s", names)
237       if use_locking:
238         raise errors.OpPrereqError("Sync queries are not allowed",
239                                    errors.ECODE_INVAL)
240       op = opcodes.OpQueryInstances(names=names, output_fields=fields,
241                                     use_locking=use_locking)
242       return self._Query(op)
243
244     elif method == luxi.REQ_QUERY_NODES:
245       (names, fields, use_locking) = args
246       logging.info("Received node query request for %s", names)
247       if use_locking:
248         raise errors.OpPrereqError("Sync queries are not allowed",
249                                    errors.ECODE_INVAL)
250       op = opcodes.OpQueryNodes(names=names, output_fields=fields,
251                                 use_locking=use_locking)
252       return self._Query(op)
253
254     elif method == luxi.REQ_QUERY_EXPORTS:
255       nodes, use_locking = args
256       if use_locking:
257         raise errors.OpPrereqError("Sync queries are not allowed",
258                                    errors.ECODE_INVAL)
259       logging.info("Received exports query request")
260       op = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
261       return self._Query(op)
262
263     elif method == luxi.REQ_QUERY_CONFIG_VALUES:
264       fields = args
265       logging.info("Received config values query request for %s", fields)
266       op = opcodes.OpQueryConfigValues(output_fields=fields)
267       return self._Query(op)
268
269     elif method == luxi.REQ_QUERY_CLUSTER_INFO:
270       logging.info("Received cluster info query request")
271       op = opcodes.OpQueryClusterInfo()
272       return self._Query(op)
273
274     elif method == luxi.REQ_QUERY_TAGS:
275       kind, name = args
276       logging.info("Received tags query request")
277       op = opcodes.OpGetTags(kind=kind, name=name)
278       return self._Query(op)
279
280     elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
281       drain_flag = args
282       logging.info("Received queue drain flag change request to %s",
283                    drain_flag)
284       return queue.SetDrainFlag(drain_flag)
285
286     elif method == luxi.REQ_SET_WATCHER_PAUSE:
287       (until, ) = args
288
289       if until is None:
290         logging.info("Received request to no longer pause the watcher")
291       else:
292         if not isinstance(until, (int, float)):
293           raise TypeError("Duration must be an integer or float")
294
295         if until < time.time():
296           raise errors.GenericError("Unable to set pause end time in the past")
297
298         logging.info("Received request to pause the watcher until %s", until)
299
300       return _SetWatcherPause(until)
301
302     else:
303       logging.info("Received invalid request '%s'", method)
304       raise ValueError("Invalid operation '%s'" % method)
305
306   def _Query(self, op):
307     """Runs the specified opcode and returns the result.
308
309     """
310     # Queries don't have a job id
311     proc = mcpu.Processor(self.server.context, None)
312     return proc.ExecOpCode(op, None)
313
314
315 class GanetiContext(object):
316   """Context common to all ganeti threads.
317
318   This class creates and holds common objects shared by all threads.
319
320   """
321   # pylint: disable-msg=W0212
322   # we do want to ensure a singleton here
323   _instance = None
324
325   def __init__(self):
326     """Constructs a new GanetiContext object.
327
328     There should be only a GanetiContext object at any time, so this
329     function raises an error if this is not the case.
330
331     """
332     assert self.__class__._instance is None, "double GanetiContext instance"
333
334     # Create global configuration object
335     self.cfg = config.ConfigWriter()
336
337     # Locking manager
338     self.glm = locking.GanetiLockManager(
339                 self.cfg.GetNodeList(),
340                 self.cfg.GetInstanceList())
341
342     # Job queue
343     self.jobqueue = jqueue.JobQueue(self)
344
345     # setting this also locks the class against attribute modifications
346     self.__class__._instance = self
347
348   def __setattr__(self, name, value):
349     """Setting GanetiContext attributes is forbidden after initialization.
350
351     """
352     assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
353     object.__setattr__(self, name, value)
354
355   def AddNode(self, node, ec_id):
356     """Adds a node to the configuration and lock manager.
357
358     """
359     # Add it to the configuration
360     self.cfg.AddNode(node, ec_id)
361
362     # If preseeding fails it'll not be added
363     self.jobqueue.AddNode(node)
364
365     # Add the new node to the Ganeti Lock Manager
366     self.glm.add(locking.LEVEL_NODE, node.name)
367
368   def ReaddNode(self, node):
369     """Updates a node that's already in the configuration
370
371     """
372     # Synchronize the queue again
373     self.jobqueue.AddNode(node)
374
375   def RemoveNode(self, name):
376     """Removes a node from the configuration and lock manager.
377
378     """
379     # Remove node from configuration
380     self.cfg.RemoveNode(name)
381
382     # Notify job queue
383     self.jobqueue.RemoveNode(name)
384
385     # Remove the node from the Ganeti Lock Manager
386     self.glm.remove(locking.LEVEL_NODE, name)
387
388
389 def _SetWatcherPause(until):
390   """Creates or removes the watcher pause file.
391
392   @type until: None or int
393   @param until: Unix timestamp saying until when the watcher shouldn't run
394
395   """
396   if until is None:
397     utils.RemoveFile(constants.WATCHER_PAUSEFILE)
398   else:
399     utils.WriteFile(constants.WATCHER_PAUSEFILE,
400                     data="%d\n" % (until, ))
401
402   return until
403
404
405 @rpc.RunWithRPC
406 def CheckAgreement():
407   """Check the agreement on who is the master.
408
409   The function uses a very simple algorithm: we must get more positive
410   than negative answers. Since in most of the cases we are the master,
411   we'll use our own config file for getting the node list. In the
412   future we could collect the current node list from our (possibly
413   obsolete) known nodes.
414
415   In order to account for cold-start of all nodes, we retry for up to
416   a minute until we get a real answer as the top-voted one. If the
417   nodes are more out-of-sync, for now manual startup of the master
418   should be attempted.
419
420   Note that for a even number of nodes cluster, we need at least half
421   of the nodes (beside ourselves) to vote for us. This creates a
422   problem on two-node clusters, since in this case we require the
423   other node to be up too to confirm our status.
424
425   """
426   myself = netutils.Hostname.GetSysName()
427   #temp instantiation of a config writer, used only to get the node list
428   cfg = config.ConfigWriter()
429   node_list = cfg.GetNodeList()
430   del cfg
431   retries = 6
432   while retries > 0:
433     votes = bootstrap.GatherMasterVotes(node_list)
434     if not votes:
435       # empty node list, this is a one node cluster
436       return True
437     if votes[0][0] is None:
438       retries -= 1
439       time.sleep(10)
440       continue
441     break
442   if retries == 0:
443     logging.critical("Cluster inconsistent, most of the nodes didn't answer"
444                      " after multiple retries. Aborting startup")
445     logging.critical("Use the --no-voting option if you understand what"
446                      " effects it has on the cluster state")
447     return False
448   # here a real node is at the top of the list
449   all_votes = sum(item[1] for item in votes)
450   top_node, top_votes = votes[0]
451
452   result = False
453   if top_node != myself:
454     logging.critical("It seems we are not the master (top-voted node"
455                      " is %s with %d out of %d votes)", top_node, top_votes,
456                      all_votes)
457   elif top_votes < all_votes - top_votes:
458     logging.critical("It seems we are not the master (%d votes for,"
459                      " %d votes against)", top_votes, all_votes - top_votes)
460   else:
461     result = True
462
463   return result
464
465
466 @rpc.RunWithRPC
467 def ActivateMasterIP():
468   # activate ip
469   master_node = ssconf.SimpleStore().GetMasterNode()
470   result = rpc.RpcRunner.call_node_start_master(master_node, False, False)
471   msg = result.fail_msg
472   if msg:
473     logging.error("Can't activate master IP address: %s", msg)
474
475
476 def CheckMasterd(options, args):
477   """Initial checks whether to run or exit with a failure.
478
479   """
480   if args: # masterd doesn't take any arguments
481     print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
482     sys.exit(constants.EXIT_FAILURE)
483
484   ssconf.CheckMaster(options.debug)
485
486   try:
487     options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
488     options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
489   except KeyError:
490     print >> sys.stderr, ("User or group not existing on system: %s:%s" %
491                           (constants.MASTERD_USER, constants.DAEMONS_GROUP))
492     sys.exit(constants.EXIT_FAILURE)
493
494
495   # If CheckMaster didn't fail we believe we are the master, but we have to
496   # confirm with the other nodes.
497   if options.no_voting:
498     if options.yes_do_it:
499       return
500
501     sys.stdout.write("The 'no voting' option has been selected.\n")
502     sys.stdout.write("This is dangerous, please confirm by"
503                      " typing uppercase 'yes': ")
504     sys.stdout.flush()
505
506     confirmation = sys.stdin.readline().strip()
507     if confirmation != "YES":
508       print >> sys.stderr, "Aborting."
509       sys.exit(constants.EXIT_FAILURE)
510
511     return
512
513   # CheckAgreement uses RPC and threads, hence it needs to be run in a separate
514   # process before we call utils.Daemonize in the current process.
515   if not utils.RunInSeparateProcess(CheckAgreement):
516     sys.exit(constants.EXIT_FAILURE)
517
518   # ActivateMasterIP also uses RPC/threads, so we run it again via a
519   # separate process.
520
521   # TODO: decide whether failure to activate the master IP is a fatal error
522   utils.RunInSeparateProcess(ActivateMasterIP)
523
524
525 def ExecMasterd(options, args): # pylint: disable-msg=W0613
526   """Main master daemon function, executed with the PID file held.
527
528   """
529   # This is safe to do as the pid file guarantees against
530   # concurrent execution.
531   utils.RemoveFile(constants.MASTER_SOCKET)
532
533   mainloop = daemon.Mainloop()
534   master = MasterServer(mainloop, constants.MASTER_SOCKET,
535                         options.uid, options.gid)
536   try:
537     rpc.Init()
538     try:
539       master.setup_queue()
540       try:
541         mainloop.Run()
542       finally:
543         master.server_cleanup()
544     finally:
545       rpc.Shutdown()
546   finally:
547     utils.RemoveFile(constants.MASTER_SOCKET)
548
549
550 def main():
551   """Main function"""
552   parser = OptionParser(description="Ganeti master daemon",
553                         usage="%prog [-f] [-d]",
554                         version="%%prog (ganeti) %s" %
555                         constants.RELEASE_VERSION)
556   parser.add_option("--no-voting", dest="no_voting",
557                     help="Do not check that the nodes agree on this node"
558                     " being the master and start the daemon unconditionally",
559                     default=False, action="store_true")
560   parser.add_option("--yes-do-it", dest="yes_do_it",
561                     help="Override interactive check for --no-voting",
562                     default=False, action="store_true")
563   dirs = [(constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE),
564           (constants.SOCKET_DIR, constants.SOCKET_DIR_MODE),
565          ]
566   daemon.GenericMain(constants.MASTERD, parser, dirs,
567                      CheckMasterd, ExecMasterd,
568                      multithreaded=True)
569
570
571 if __name__ == "__main__":
572   main()