Export job dependencies through lock monitor
[ganeti-local] / lib / server / masterd.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Master daemon program.
23
24 Some classes deviates from the standard style guide since the
25 inheritance from parent classes requires it.
26
27 """
28
29 # pylint: disable-msg=C0103
30 # C0103: Invalid name ganeti-masterd
31
32 import grp
33 import os
34 import pwd
35 import sys
36 import socket
37 import time
38 import tempfile
39 import logging
40
41 from optparse import OptionParser
42
43 from ganeti import config
44 from ganeti import constants
45 from ganeti import daemon
46 from ganeti import mcpu
47 from ganeti import opcodes
48 from ganeti import jqueue
49 from ganeti import locking
50 from ganeti import luxi
51 from ganeti import utils
52 from ganeti import errors
53 from ganeti import ssconf
54 from ganeti import workerpool
55 from ganeti import rpc
56 from ganeti import bootstrap
57 from ganeti import netutils
58 from ganeti import objects
59 from ganeti import query
60
61
62 CLIENT_REQUEST_WORKERS = 16
63
64 EXIT_NOTMASTER = constants.EXIT_NOTMASTER
65 EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
66
67
68 class ClientRequestWorker(workerpool.BaseWorker):
69   # pylint: disable-msg=W0221
70   def RunTask(self, server, message, client):
71     """Process the request.
72
73     """
74     client_ops = ClientOps(server)
75
76     try:
77       (method, args, version) = luxi.ParseRequest(message)
78     except luxi.ProtocolError, err:
79       logging.error("Protocol Error: %s", err)
80       client.close_log()
81       return
82
83     success = False
84     try:
85       # Verify client's version if there was one in the request
86       if version is not None and version != constants.LUXI_VERSION:
87         raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
88                                (constants.LUXI_VERSION, version))
89
90       result = client_ops.handle_request(method, args)
91       success = True
92     except errors.GenericError, err:
93       logging.exception("Unexpected exception")
94       success = False
95       result = errors.EncodeException(err)
96     except:
97       logging.exception("Unexpected exception")
98       err = sys.exc_info()
99       result = "Caught exception: %s" % str(err[1])
100
101     try:
102       reply = luxi.FormatResponse(success, result)
103       client.send_message(reply)
104       # awake the main thread so that it can write out the data.
105       server.awaker.signal()
106     except: # pylint: disable-msg=W0702
107       logging.exception("Send error")
108       client.close_log()
109
110
111 class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
112   """Handler for master peers.
113
114   """
115   _MAX_UNHANDLED = 1
116   def __init__(self, server, connected_socket, client_address, family):
117     daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
118                                                  client_address,
119                                                  constants.LUXI_EOM,
120                                                  family, self._MAX_UNHANDLED)
121     self.server = server
122
123   def handle_message(self, message, _):
124     self.server.request_workers.AddTask((self.server, message, self))
125
126
127 class MasterServer(daemon.AsyncStreamServer):
128   """Master Server.
129
130   This is the main asynchronous master server. It handles connections to the
131   master socket.
132
133   """
134   family = socket.AF_UNIX
135
136   def __init__(self, mainloop, address, uid, gid):
137     """MasterServer constructor
138
139     @type mainloop: ganeti.daemon.Mainloop
140     @param mainloop: Mainloop used to poll for I/O events
141     @param address: the unix socket address to bind the MasterServer to
142     @param uid: The uid of the owner of the socket
143     @param gid: The gid of the owner of the socket
144
145     """
146     temp_name = tempfile.mktemp(dir=os.path.dirname(address))
147     daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
148     os.chmod(temp_name, 0770)
149     os.chown(temp_name, uid, gid)
150     os.rename(temp_name, address)
151
152     self.mainloop = mainloop
153     self.awaker = daemon.AsyncAwaker()
154
155     # We'll only start threads once we've forked.
156     self.context = None
157     self.request_workers = None
158
159   def handle_connection(self, connected_socket, client_address):
160     # TODO: add connection count and limit the number of open connections to a
161     # maximum number to avoid breaking for lack of file descriptors or memory.
162     MasterClientHandler(self, connected_socket, client_address, self.family)
163
164   def setup_queue(self):
165     self.context = GanetiContext()
166     self.request_workers = workerpool.WorkerPool("ClientReq",
167                                                  CLIENT_REQUEST_WORKERS,
168                                                  ClientRequestWorker)
169
170   def server_cleanup(self):
171     """Cleanup the server.
172
173     This involves shutting down the processor threads and the master
174     socket.
175
176     """
177     try:
178       self.close()
179     finally:
180       if self.request_workers:
181         self.request_workers.TerminateWorkers()
182       if self.context:
183         self.context.jobqueue.Shutdown()
184
185
186 class ClientOps:
187   """Class holding high-level client operations."""
188   def __init__(self, server):
189     self.server = server
190
191   def handle_request(self, method, args): # pylint: disable-msg=R0911
192     queue = self.server.context.jobqueue
193
194     # TODO: Parameter validation
195
196     # TODO: Rewrite to not exit in each 'if/elif' branch
197
198     if method == luxi.REQ_SUBMIT_JOB:
199       logging.info("Received new job")
200       ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
201       return queue.SubmitJob(ops)
202
203     if method == luxi.REQ_SUBMIT_MANY_JOBS:
204       logging.info("Received multiple jobs")
205       jobs = []
206       for ops in args:
207         jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
208       return queue.SubmitManyJobs(jobs)
209
210     elif method == luxi.REQ_CANCEL_JOB:
211       job_id = args
212       logging.info("Received job cancel request for %s", job_id)
213       return queue.CancelJob(job_id)
214
215     elif method == luxi.REQ_ARCHIVE_JOB:
216       job_id = args
217       logging.info("Received job archive request for %s", job_id)
218       return queue.ArchiveJob(job_id)
219
220     elif method == luxi.REQ_AUTOARCHIVE_JOBS:
221       (age, timeout) = args
222       logging.info("Received job autoarchive request for age %s, timeout %s",
223                    age, timeout)
224       return queue.AutoArchiveJobs(age, timeout)
225
226     elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
227       (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
228       logging.info("Received job poll request for %s", job_id)
229       return queue.WaitForJobChanges(job_id, fields, prev_job_info,
230                                      prev_log_serial, timeout)
231
232     elif method == luxi.REQ_QUERY:
233       req = objects.QueryRequest.FromDict(args)
234
235       if req.what in constants.QR_VIA_OP:
236         result = self._Query(opcodes.OpQuery(what=req.what, fields=req.fields,
237                                              filter=req.filter))
238       elif req.what == constants.QR_LOCK:
239         if req.filter is not None:
240           raise errors.OpPrereqError("Lock queries can't be filtered")
241         return self.server.context.glm.QueryLocks(req.fields)
242       elif req.what in constants.QR_VIA_LUXI:
243         raise NotImplementedError
244       else:
245         raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
246                                    errors.ECODE_INVAL)
247
248       return result
249
250     elif method == luxi.REQ_QUERY_FIELDS:
251       req = objects.QueryFieldsRequest.FromDict(args)
252
253       try:
254         fielddefs = query.ALL_FIELDS[req.what]
255       except KeyError:
256         raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
257                                    errors.ECODE_INVAL)
258
259       return query.QueryFields(fielddefs, req.fields)
260
261     elif method == luxi.REQ_QUERY_JOBS:
262       (job_ids, fields) = args
263       if isinstance(job_ids, (tuple, list)) and job_ids:
264         msg = utils.CommaJoin(job_ids)
265       else:
266         msg = str(job_ids)
267       logging.info("Received job query request for %s", msg)
268       return queue.QueryJobs(job_ids, fields)
269
270     elif method == luxi.REQ_QUERY_INSTANCES:
271       (names, fields, use_locking) = args
272       logging.info("Received instance query request for %s", names)
273       if use_locking:
274         raise errors.OpPrereqError("Sync queries are not allowed",
275                                    errors.ECODE_INVAL)
276       op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
277                                    use_locking=use_locking)
278       return self._Query(op)
279
280     elif method == luxi.REQ_QUERY_NODES:
281       (names, fields, use_locking) = args
282       logging.info("Received node query request for %s", names)
283       if use_locking:
284         raise errors.OpPrereqError("Sync queries are not allowed",
285                                    errors.ECODE_INVAL)
286       op = opcodes.OpNodeQuery(names=names, output_fields=fields,
287                                use_locking=use_locking)
288       return self._Query(op)
289
290     elif method == luxi.REQ_QUERY_GROUPS:
291       (names, fields, use_locking) = args
292       logging.info("Received group query request for %s", names)
293       if use_locking:
294         raise errors.OpPrereqError("Sync queries are not allowed",
295                                    errors.ECODE_INVAL)
296       op = opcodes.OpGroupQuery(names=names, output_fields=fields)
297       return self._Query(op)
298
299     elif method == luxi.REQ_QUERY_EXPORTS:
300       nodes, use_locking = args
301       if use_locking:
302         raise errors.OpPrereqError("Sync queries are not allowed",
303                                    errors.ECODE_INVAL)
304       logging.info("Received exports query request")
305       op = opcodes.OpBackupQuery(nodes=nodes, use_locking=use_locking)
306       return self._Query(op)
307
308     elif method == luxi.REQ_QUERY_CONFIG_VALUES:
309       fields = args
310       logging.info("Received config values query request for %s", fields)
311       op = opcodes.OpClusterConfigQuery(output_fields=fields)
312       return self._Query(op)
313
314     elif method == luxi.REQ_QUERY_CLUSTER_INFO:
315       logging.info("Received cluster info query request")
316       op = opcodes.OpClusterQuery()
317       return self._Query(op)
318
319     elif method == luxi.REQ_QUERY_TAGS:
320       kind, name = args
321       logging.info("Received tags query request")
322       op = opcodes.OpTagsGet(kind=kind, name=name)
323       return self._Query(op)
324
325     elif method == luxi.REQ_QUERY_LOCKS:
326       (fields, sync) = args
327       logging.info("Received locks query request")
328       if sync:
329         raise NotImplementedError("Synchronous queries are not implemented")
330       return self.server.context.glm.OldStyleQueryLocks(fields)
331
332     elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
333       drain_flag = args
334       logging.info("Received queue drain flag change request to %s",
335                    drain_flag)
336       return queue.SetDrainFlag(drain_flag)
337
338     elif method == luxi.REQ_SET_WATCHER_PAUSE:
339       (until, ) = args
340
341       if until is None:
342         logging.info("Received request to no longer pause the watcher")
343       else:
344         if not isinstance(until, (int, float)):
345           raise TypeError("Duration must be an integer or float")
346
347         if until < time.time():
348           raise errors.GenericError("Unable to set pause end time in the past")
349
350         logging.info("Received request to pause the watcher until %s", until)
351
352       return _SetWatcherPause(until)
353
354     else:
355       logging.info("Received invalid request '%s'", method)
356       raise ValueError("Invalid operation '%s'" % method)
357
358   def _Query(self, op):
359     """Runs the specified opcode and returns the result.
360
361     """
362     # Queries don't have a job id
363     proc = mcpu.Processor(self.server.context, None)
364
365     # TODO: Executing an opcode using locks will acquire them in blocking mode.
366     # Consider using a timeout for retries.
367     return proc.ExecOpCode(op, None)
368
369
370 class GanetiContext(object):
371   """Context common to all ganeti threads.
372
373   This class creates and holds common objects shared by all threads.
374
375   """
376   # pylint: disable-msg=W0212
377   # we do want to ensure a singleton here
378   _instance = None
379
380   def __init__(self):
381     """Constructs a new GanetiContext object.
382
383     There should be only a GanetiContext object at any time, so this
384     function raises an error if this is not the case.
385
386     """
387     assert self.__class__._instance is None, "double GanetiContext instance"
388
389     # Create global configuration object
390     self.cfg = config.ConfigWriter()
391
392     # Locking manager
393     self.glm = locking.GanetiLockManager(
394                 self.cfg.GetNodeList(),
395                 self.cfg.GetNodeGroupList(),
396                 self.cfg.GetInstanceList())
397
398     # Job queue
399     self.jobqueue = jqueue.JobQueue(self)
400
401     # setting this also locks the class against attribute modifications
402     self.__class__._instance = self
403
404   def __setattr__(self, name, value):
405     """Setting GanetiContext attributes is forbidden after initialization.
406
407     """
408     assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
409     object.__setattr__(self, name, value)
410
411   def AddNode(self, node, ec_id):
412     """Adds a node to the configuration and lock manager.
413
414     """
415     # Add it to the configuration
416     self.cfg.AddNode(node, ec_id)
417
418     # If preseeding fails it'll not be added
419     self.jobqueue.AddNode(node)
420
421     # Add the new node to the Ganeti Lock Manager
422     self.glm.add(locking.LEVEL_NODE, node.name)
423
424   def ReaddNode(self, node):
425     """Updates a node that's already in the configuration
426
427     """
428     # Synchronize the queue again
429     self.jobqueue.AddNode(node)
430
431   def RemoveNode(self, name):
432     """Removes a node from the configuration and lock manager.
433
434     """
435     # Remove node from configuration
436     self.cfg.RemoveNode(name)
437
438     # Notify job queue
439     self.jobqueue.RemoveNode(name)
440
441     # Remove the node from the Ganeti Lock Manager
442     self.glm.remove(locking.LEVEL_NODE, name)
443
444
445 def _SetWatcherPause(until):
446   """Creates or removes the watcher pause file.
447
448   @type until: None or int
449   @param until: Unix timestamp saying until when the watcher shouldn't run
450
451   """
452   if until is None:
453     utils.RemoveFile(constants.WATCHER_PAUSEFILE)
454   else:
455     utils.WriteFile(constants.WATCHER_PAUSEFILE,
456                     data="%d\n" % (until, ))
457
458   return until
459
460
461 @rpc.RunWithRPC
462 def CheckAgreement():
463   """Check the agreement on who is the master.
464
465   The function uses a very simple algorithm: we must get more positive
466   than negative answers. Since in most of the cases we are the master,
467   we'll use our own config file for getting the node list. In the
468   future we could collect the current node list from our (possibly
469   obsolete) known nodes.
470
471   In order to account for cold-start of all nodes, we retry for up to
472   a minute until we get a real answer as the top-voted one. If the
473   nodes are more out-of-sync, for now manual startup of the master
474   should be attempted.
475
476   Note that for a even number of nodes cluster, we need at least half
477   of the nodes (beside ourselves) to vote for us. This creates a
478   problem on two-node clusters, since in this case we require the
479   other node to be up too to confirm our status.
480
481   """
482   myself = netutils.Hostname.GetSysName()
483   #temp instantiation of a config writer, used only to get the node list
484   cfg = config.ConfigWriter()
485   node_list = cfg.GetNodeList()
486   del cfg
487   retries = 6
488   while retries > 0:
489     votes = bootstrap.GatherMasterVotes(node_list)
490     if not votes:
491       # empty node list, this is a one node cluster
492       return True
493     if votes[0][0] is None:
494       retries -= 1
495       time.sleep(10)
496       continue
497     break
498   if retries == 0:
499     logging.critical("Cluster inconsistent, most of the nodes didn't answer"
500                      " after multiple retries. Aborting startup")
501     logging.critical("Use the --no-voting option if you understand what"
502                      " effects it has on the cluster state")
503     return False
504   # here a real node is at the top of the list
505   all_votes = sum(item[1] for item in votes)
506   top_node, top_votes = votes[0]
507
508   result = False
509   if top_node != myself:
510     logging.critical("It seems we are not the master (top-voted node"
511                      " is %s with %d out of %d votes)", top_node, top_votes,
512                      all_votes)
513   elif top_votes < all_votes - top_votes:
514     logging.critical("It seems we are not the master (%d votes for,"
515                      " %d votes against)", top_votes, all_votes - top_votes)
516   else:
517     result = True
518
519   return result
520
521
522 @rpc.RunWithRPC
523 def ActivateMasterIP():
524   # activate ip
525   master_node = ssconf.SimpleStore().GetMasterNode()
526   result = rpc.RpcRunner.call_node_start_master(master_node, False, False)
527   msg = result.fail_msg
528   if msg:
529     logging.error("Can't activate master IP address: %s", msg)
530
531
532 def CheckMasterd(options, args):
533   """Initial checks whether to run or exit with a failure.
534
535   """
536   if args: # masterd doesn't take any arguments
537     print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
538     sys.exit(constants.EXIT_FAILURE)
539
540   ssconf.CheckMaster(options.debug)
541
542   try:
543     options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
544     options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
545   except KeyError:
546     print >> sys.stderr, ("User or group not existing on system: %s:%s" %
547                           (constants.MASTERD_USER, constants.DAEMONS_GROUP))
548     sys.exit(constants.EXIT_FAILURE)
549
550   # Check the configuration is sane before anything else
551   try:
552     config.ConfigWriter()
553   except errors.ConfigVersionMismatch, err:
554     v1 = "%s.%s.%s" % constants.SplitVersion(err.args[0])
555     v2 = "%s.%s.%s" % constants.SplitVersion(err.args[1])
556     print >> sys.stderr,  \
557         ("Configuration version mismatch. The current Ganeti software"
558          " expects version %s, but the on-disk configuration file has"
559          " version %s. This is likely the result of upgrading the"
560          " software without running the upgrade procedure. Please contact"
561          " your cluster administrator or complete the upgrade using the"
562          " cfgupgrade utility, after reading the upgrade notes." %
563          (v1, v2))
564     sys.exit(constants.EXIT_FAILURE)
565   except errors.ConfigurationError, err:
566     print >> sys.stderr, \
567         ("Configuration error while opening the configuration file: %s\n"
568          "This might be caused by an incomplete software upgrade or"
569          " by a corrupted configuration file. Until the problem is fixed"
570          " the master daemon cannot start." % str(err))
571     sys.exit(constants.EXIT_FAILURE)
572
573   # If CheckMaster didn't fail we believe we are the master, but we have to
574   # confirm with the other nodes.
575   if options.no_voting:
576     if not options.yes_do_it:
577       sys.stdout.write("The 'no voting' option has been selected.\n")
578       sys.stdout.write("This is dangerous, please confirm by"
579                        " typing uppercase 'yes': ")
580       sys.stdout.flush()
581
582       confirmation = sys.stdin.readline().strip()
583       if confirmation != "YES":
584         print >> sys.stderr, "Aborting."
585         sys.exit(constants.EXIT_FAILURE)
586
587   else:
588     # CheckAgreement uses RPC and threads, hence it needs to be run in
589     # a separate process before we call utils.Daemonize in the current
590     # process.
591     if not utils.RunInSeparateProcess(CheckAgreement):
592       sys.exit(constants.EXIT_FAILURE)
593
594   # ActivateMasterIP also uses RPC/threads, so we run it again via a
595   # separate process.
596
597   # TODO: decide whether failure to activate the master IP is a fatal error
598   utils.RunInSeparateProcess(ActivateMasterIP)
599
600
601 def PrepMasterd(options, _):
602   """Prep master daemon function, executed with the PID file held.
603
604   """
605   # This is safe to do as the pid file guarantees against
606   # concurrent execution.
607   utils.RemoveFile(constants.MASTER_SOCKET)
608
609   mainloop = daemon.Mainloop()
610   master = MasterServer(mainloop, constants.MASTER_SOCKET,
611                         options.uid, options.gid)
612   return (mainloop, master)
613
614
615 def ExecMasterd(options, args, prep_data): # pylint: disable-msg=W0613
616   """Main master daemon function, executed with the PID file held.
617
618   """
619   (mainloop, master) = prep_data
620   try:
621     rpc.Init()
622     try:
623       master.setup_queue()
624       try:
625         mainloop.Run()
626       finally:
627         master.server_cleanup()
628     finally:
629       rpc.Shutdown()
630   finally:
631     utils.RemoveFile(constants.MASTER_SOCKET)
632
633
634 def Main():
635   """Main function"""
636   parser = OptionParser(description="Ganeti master daemon",
637                         usage="%prog [-f] [-d]",
638                         version="%%prog (ganeti) %s" %
639                         constants.RELEASE_VERSION)
640   parser.add_option("--no-voting", dest="no_voting",
641                     help="Do not check that the nodes agree on this node"
642                     " being the master and start the daemon unconditionally",
643                     default=False, action="store_true")
644   parser.add_option("--yes-do-it", dest="yes_do_it",
645                     help="Override interactive check for --no-voting",
646                     default=False, action="store_true")
647   daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
648                      ExecMasterd, multithreaded=True)