constants: Rename QR_OP_*, add QR_VIA_RAPI
[ganeti-local] / lib / server / masterd.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Master daemon program.
23
24 Some classes deviates from the standard style guide since the
25 inheritance from parent classes requires it.
26
27 """
28
29 # pylint: disable-msg=C0103
30 # C0103: Invalid name ganeti-masterd
31
32 import grp
33 import os
34 import pwd
35 import sys
36 import socket
37 import time
38 import tempfile
39 import logging
40
41 from optparse import OptionParser
42
43 from ganeti import config
44 from ganeti import constants
45 from ganeti import daemon
46 from ganeti import mcpu
47 from ganeti import opcodes
48 from ganeti import jqueue
49 from ganeti import locking
50 from ganeti import luxi
51 from ganeti import utils
52 from ganeti import errors
53 from ganeti import ssconf
54 from ganeti import workerpool
55 from ganeti import rpc
56 from ganeti import bootstrap
57 from ganeti import netutils
58 from ganeti import objects
59 from ganeti import query
60
61
62 CLIENT_REQUEST_WORKERS = 16
63
64 EXIT_NOTMASTER = constants.EXIT_NOTMASTER
65 EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
66
67
68 class ClientRequestWorker(workerpool.BaseWorker):
69   # pylint: disable-msg=W0221
70   def RunTask(self, server, message, client):
71     """Process the request.
72
73     """
74     client_ops = ClientOps(server)
75
76     try:
77       (method, args, version) = luxi.ParseRequest(message)
78     except luxi.ProtocolError, err:
79       logging.error("Protocol Error: %s", err)
80       client.close_log()
81       return
82
83     success = False
84     try:
85       # Verify client's version if there was one in the request
86       if version is not None and version != constants.LUXI_VERSION:
87         raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
88                                (constants.LUXI_VERSION, version))
89
90       result = client_ops.handle_request(method, args)
91       success = True
92     except errors.GenericError, err:
93       logging.exception("Unexpected exception")
94       success = False
95       result = errors.EncodeException(err)
96     except:
97       logging.exception("Unexpected exception")
98       err = sys.exc_info()
99       result = "Caught exception: %s" % str(err[1])
100
101     try:
102       reply = luxi.FormatResponse(success, result)
103       client.send_message(reply)
104       # awake the main thread so that it can write out the data.
105       server.awaker.signal()
106     except: # pylint: disable-msg=W0702
107       logging.exception("Send error")
108       client.close_log()
109
110
111 class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
112   """Handler for master peers.
113
114   """
115   _MAX_UNHANDLED = 1
116   def __init__(self, server, connected_socket, client_address, family):
117     daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
118                                                  client_address,
119                                                  constants.LUXI_EOM,
120                                                  family, self._MAX_UNHANDLED)
121     self.server = server
122
123   def handle_message(self, message, _):
124     self.server.request_workers.AddTask((self.server, message, self))
125
126
127 class MasterServer(daemon.AsyncStreamServer):
128   """Master Server.
129
130   This is the main asynchronous master server. It handles connections to the
131   master socket.
132
133   """
134   family = socket.AF_UNIX
135
136   def __init__(self, mainloop, address, uid, gid):
137     """MasterServer constructor
138
139     @type mainloop: ganeti.daemon.Mainloop
140     @param mainloop: Mainloop used to poll for I/O events
141     @param address: the unix socket address to bind the MasterServer to
142     @param uid: The uid of the owner of the socket
143     @param gid: The gid of the owner of the socket
144
145     """
146     temp_name = tempfile.mktemp(dir=os.path.dirname(address))
147     daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
148     os.chmod(temp_name, 0770)
149     os.chown(temp_name, uid, gid)
150     os.rename(temp_name, address)
151
152     self.mainloop = mainloop
153     self.awaker = daemon.AsyncAwaker()
154
155     # We'll only start threads once we've forked.
156     self.context = None
157     self.request_workers = None
158
159   def handle_connection(self, connected_socket, client_address):
160     # TODO: add connection count and limit the number of open connections to a
161     # maximum number to avoid breaking for lack of file descriptors or memory.
162     MasterClientHandler(self, connected_socket, client_address, self.family)
163
164   def setup_queue(self):
165     self.context = GanetiContext()
166     self.request_workers = workerpool.WorkerPool("ClientReq",
167                                                  CLIENT_REQUEST_WORKERS,
168                                                  ClientRequestWorker)
169
170   def server_cleanup(self):
171     """Cleanup the server.
172
173     This involves shutting down the processor threads and the master
174     socket.
175
176     """
177     try:
178       self.close()
179     finally:
180       if self.request_workers:
181         self.request_workers.TerminateWorkers()
182       if self.context:
183         self.context.jobqueue.Shutdown()
184
185
186 class ClientOps:
187   """Class holding high-level client operations."""
188   def __init__(self, server):
189     self.server = server
190
191   def handle_request(self, method, args): # pylint: disable-msg=R0911
192     queue = self.server.context.jobqueue
193
194     # TODO: Parameter validation
195
196     # TODO: Rewrite to not exit in each 'if/elif' branch
197
198     if method == luxi.REQ_SUBMIT_JOB:
199       logging.info("Received new job")
200       ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
201       return queue.SubmitJob(ops)
202
203     if method == luxi.REQ_SUBMIT_MANY_JOBS:
204       logging.info("Received multiple jobs")
205       jobs = []
206       for ops in args:
207         jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
208       return queue.SubmitManyJobs(jobs)
209
210     elif method == luxi.REQ_CANCEL_JOB:
211       job_id = args
212       logging.info("Received job cancel request for %s", job_id)
213       return queue.CancelJob(job_id)
214
215     elif method == luxi.REQ_ARCHIVE_JOB:
216       job_id = args
217       logging.info("Received job archive request for %s", job_id)
218       return queue.ArchiveJob(job_id)
219
220     elif method == luxi.REQ_AUTOARCHIVE_JOBS:
221       (age, timeout) = args
222       logging.info("Received job autoarchive request for age %s, timeout %s",
223                    age, timeout)
224       return queue.AutoArchiveJobs(age, timeout)
225
226     elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
227       (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
228       logging.info("Received job poll request for %s", job_id)
229       return queue.WaitForJobChanges(job_id, fields, prev_job_info,
230                                      prev_log_serial, timeout)
231
232     elif method == luxi.REQ_QUERY:
233       req = objects.QueryRequest.FromDict(args)
234
235       if req.what in constants.QR_VIA_OP:
236         result = self._Query(opcodes.OpQuery(what=req.what, fields=req.fields,
237                                              filter=req.filter))
238       elif req.what == constants.QR_LOCK:
239         if req.filter is not None:
240           raise errors.OpPrereqError("Lock queries can't be filtered")
241         return self.server.context.glm.QueryLocks(req.fields)
242       elif req.what in constants.QR_VIA_LUXI:
243         raise NotImplementedError
244       else:
245         raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
246                                    errors.ECODE_INVAL)
247
248       return result
249
250     elif method == luxi.REQ_QUERY_FIELDS:
251       req = objects.QueryFieldsRequest.FromDict(args)
252
253       if req.what in constants.QR_VIA_OP:
254         result = self._Query(opcodes.OpQueryFields(what=req.what,
255                                                    fields=req.fields))
256       elif req.what == constants.QR_LOCK:
257         return query.QueryFields(query.LOCK_FIELDS, req.fields)
258       elif req.what in constants.QR_VIA_LUXI:
259         raise NotImplementedError
260       else:
261         raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
262                                    errors.ECODE_INVAL)
263
264       return result
265
266     elif method == luxi.REQ_QUERY_JOBS:
267       (job_ids, fields) = args
268       if isinstance(job_ids, (tuple, list)) and job_ids:
269         msg = utils.CommaJoin(job_ids)
270       else:
271         msg = str(job_ids)
272       logging.info("Received job query request for %s", msg)
273       return queue.QueryJobs(job_ids, fields)
274
275     elif method == luxi.REQ_QUERY_INSTANCES:
276       (names, fields, use_locking) = args
277       logging.info("Received instance query request for %s", names)
278       if use_locking:
279         raise errors.OpPrereqError("Sync queries are not allowed",
280                                    errors.ECODE_INVAL)
281       op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
282                                    use_locking=use_locking)
283       return self._Query(op)
284
285     elif method == luxi.REQ_QUERY_NODES:
286       (names, fields, use_locking) = args
287       logging.info("Received node query request for %s", names)
288       if use_locking:
289         raise errors.OpPrereqError("Sync queries are not allowed",
290                                    errors.ECODE_INVAL)
291       op = opcodes.OpNodeQuery(names=names, output_fields=fields,
292                                use_locking=use_locking)
293       return self._Query(op)
294
295     elif method == luxi.REQ_QUERY_GROUPS:
296       (names, fields, use_locking) = args
297       logging.info("Received group query request for %s", names)
298       if use_locking:
299         raise errors.OpPrereqError("Sync queries are not allowed",
300                                    errors.ECODE_INVAL)
301       op = opcodes.OpGroupQuery(names=names, output_fields=fields)
302       return self._Query(op)
303
304     elif method == luxi.REQ_QUERY_EXPORTS:
305       nodes, use_locking = args
306       if use_locking:
307         raise errors.OpPrereqError("Sync queries are not allowed",
308                                    errors.ECODE_INVAL)
309       logging.info("Received exports query request")
310       op = opcodes.OpBackupQuery(nodes=nodes, use_locking=use_locking)
311       return self._Query(op)
312
313     elif method == luxi.REQ_QUERY_CONFIG_VALUES:
314       fields = args
315       logging.info("Received config values query request for %s", fields)
316       op = opcodes.OpClusterConfigQuery(output_fields=fields)
317       return self._Query(op)
318
319     elif method == luxi.REQ_QUERY_CLUSTER_INFO:
320       logging.info("Received cluster info query request")
321       op = opcodes.OpClusterQuery()
322       return self._Query(op)
323
324     elif method == luxi.REQ_QUERY_TAGS:
325       kind, name = args
326       logging.info("Received tags query request")
327       op = opcodes.OpTagsGet(kind=kind, name=name)
328       return self._Query(op)
329
330     elif method == luxi.REQ_QUERY_LOCKS:
331       (fields, sync) = args
332       logging.info("Received locks query request")
333       if sync:
334         raise NotImplementedError("Synchronous queries are not implemented")
335       return self.server.context.glm.OldStyleQueryLocks(fields)
336
337     elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
338       drain_flag = args
339       logging.info("Received queue drain flag change request to %s",
340                    drain_flag)
341       return queue.SetDrainFlag(drain_flag)
342
343     elif method == luxi.REQ_SET_WATCHER_PAUSE:
344       (until, ) = args
345
346       if until is None:
347         logging.info("Received request to no longer pause the watcher")
348       else:
349         if not isinstance(until, (int, float)):
350           raise TypeError("Duration must be an integer or float")
351
352         if until < time.time():
353           raise errors.GenericError("Unable to set pause end time in the past")
354
355         logging.info("Received request to pause the watcher until %s", until)
356
357       return _SetWatcherPause(until)
358
359     else:
360       logging.info("Received invalid request '%s'", method)
361       raise ValueError("Invalid operation '%s'" % method)
362
363   def _Query(self, op):
364     """Runs the specified opcode and returns the result.
365
366     """
367     # Queries don't have a job id
368     proc = mcpu.Processor(self.server.context, None)
369
370     # TODO: Executing an opcode using locks will acquire them in blocking mode.
371     # Consider using a timeout for retries.
372     return proc.ExecOpCode(op, None)
373
374
375 class GanetiContext(object):
376   """Context common to all ganeti threads.
377
378   This class creates and holds common objects shared by all threads.
379
380   """
381   # pylint: disable-msg=W0212
382   # we do want to ensure a singleton here
383   _instance = None
384
385   def __init__(self):
386     """Constructs a new GanetiContext object.
387
388     There should be only a GanetiContext object at any time, so this
389     function raises an error if this is not the case.
390
391     """
392     assert self.__class__._instance is None, "double GanetiContext instance"
393
394     # Create global configuration object
395     self.cfg = config.ConfigWriter()
396
397     # Locking manager
398     self.glm = locking.GanetiLockManager(
399                 self.cfg.GetNodeList(),
400                 self.cfg.GetNodeGroupList(),
401                 self.cfg.GetInstanceList())
402
403     # Job queue
404     self.jobqueue = jqueue.JobQueue(self)
405
406     # setting this also locks the class against attribute modifications
407     self.__class__._instance = self
408
409   def __setattr__(self, name, value):
410     """Setting GanetiContext attributes is forbidden after initialization.
411
412     """
413     assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
414     object.__setattr__(self, name, value)
415
416   def AddNode(self, node, ec_id):
417     """Adds a node to the configuration and lock manager.
418
419     """
420     # Add it to the configuration
421     self.cfg.AddNode(node, ec_id)
422
423     # If preseeding fails it'll not be added
424     self.jobqueue.AddNode(node)
425
426     # Add the new node to the Ganeti Lock Manager
427     self.glm.add(locking.LEVEL_NODE, node.name)
428
429   def ReaddNode(self, node):
430     """Updates a node that's already in the configuration
431
432     """
433     # Synchronize the queue again
434     self.jobqueue.AddNode(node)
435
436   def RemoveNode(self, name):
437     """Removes a node from the configuration and lock manager.
438
439     """
440     # Remove node from configuration
441     self.cfg.RemoveNode(name)
442
443     # Notify job queue
444     self.jobqueue.RemoveNode(name)
445
446     # Remove the node from the Ganeti Lock Manager
447     self.glm.remove(locking.LEVEL_NODE, name)
448
449
450 def _SetWatcherPause(until):
451   """Creates or removes the watcher pause file.
452
453   @type until: None or int
454   @param until: Unix timestamp saying until when the watcher shouldn't run
455
456   """
457   if until is None:
458     utils.RemoveFile(constants.WATCHER_PAUSEFILE)
459   else:
460     utils.WriteFile(constants.WATCHER_PAUSEFILE,
461                     data="%d\n" % (until, ))
462
463   return until
464
465
466 @rpc.RunWithRPC
467 def CheckAgreement():
468   """Check the agreement on who is the master.
469
470   The function uses a very simple algorithm: we must get more positive
471   than negative answers. Since in most of the cases we are the master,
472   we'll use our own config file for getting the node list. In the
473   future we could collect the current node list from our (possibly
474   obsolete) known nodes.
475
476   In order to account for cold-start of all nodes, we retry for up to
477   a minute until we get a real answer as the top-voted one. If the
478   nodes are more out-of-sync, for now manual startup of the master
479   should be attempted.
480
481   Note that for a even number of nodes cluster, we need at least half
482   of the nodes (beside ourselves) to vote for us. This creates a
483   problem on two-node clusters, since in this case we require the
484   other node to be up too to confirm our status.
485
486   """
487   myself = netutils.Hostname.GetSysName()
488   #temp instantiation of a config writer, used only to get the node list
489   cfg = config.ConfigWriter()
490   node_list = cfg.GetNodeList()
491   del cfg
492   retries = 6
493   while retries > 0:
494     votes = bootstrap.GatherMasterVotes(node_list)
495     if not votes:
496       # empty node list, this is a one node cluster
497       return True
498     if votes[0][0] is None:
499       retries -= 1
500       time.sleep(10)
501       continue
502     break
503   if retries == 0:
504     logging.critical("Cluster inconsistent, most of the nodes didn't answer"
505                      " after multiple retries. Aborting startup")
506     logging.critical("Use the --no-voting option if you understand what"
507                      " effects it has on the cluster state")
508     return False
509   # here a real node is at the top of the list
510   all_votes = sum(item[1] for item in votes)
511   top_node, top_votes = votes[0]
512
513   result = False
514   if top_node != myself:
515     logging.critical("It seems we are not the master (top-voted node"
516                      " is %s with %d out of %d votes)", top_node, top_votes,
517                      all_votes)
518   elif top_votes < all_votes - top_votes:
519     logging.critical("It seems we are not the master (%d votes for,"
520                      " %d votes against)", top_votes, all_votes - top_votes)
521   else:
522     result = True
523
524   return result
525
526
527 @rpc.RunWithRPC
528 def ActivateMasterIP():
529   # activate ip
530   master_node = ssconf.SimpleStore().GetMasterNode()
531   result = rpc.RpcRunner.call_node_start_master(master_node, False, False)
532   msg = result.fail_msg
533   if msg:
534     logging.error("Can't activate master IP address: %s", msg)
535
536
537 def CheckMasterd(options, args):
538   """Initial checks whether to run or exit with a failure.
539
540   """
541   if args: # masterd doesn't take any arguments
542     print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
543     sys.exit(constants.EXIT_FAILURE)
544
545   ssconf.CheckMaster(options.debug)
546
547   try:
548     options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
549     options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
550   except KeyError:
551     print >> sys.stderr, ("User or group not existing on system: %s:%s" %
552                           (constants.MASTERD_USER, constants.DAEMONS_GROUP))
553     sys.exit(constants.EXIT_FAILURE)
554
555   # Check the configuration is sane before anything else
556   try:
557     config.ConfigWriter()
558   except errors.ConfigVersionMismatch, err:
559     v1 = "%s.%s.%s" % constants.SplitVersion(err.args[0])
560     v2 = "%s.%s.%s" % constants.SplitVersion(err.args[1])
561     print >> sys.stderr,  \
562         ("Configuration version mismatch. The current Ganeti software"
563          " expects version %s, but the on-disk configuration file has"
564          " version %s. This is likely the result of upgrading the"
565          " software without running the upgrade procedure. Please contact"
566          " your cluster administrator or complete the upgrade using the"
567          " cfgupgrade utility, after reading the upgrade notes." %
568          (v1, v2))
569     sys.exit(constants.EXIT_FAILURE)
570   except errors.ConfigurationError, err:
571     print >> sys.stderr, \
572         ("Configuration error while opening the configuration file: %s\n"
573          "This might be caused by an incomplete software upgrade or"
574          " by a corrupted configuration file. Until the problem is fixed"
575          " the master daemon cannot start." % str(err))
576     sys.exit(constants.EXIT_FAILURE)
577
578   # If CheckMaster didn't fail we believe we are the master, but we have to
579   # confirm with the other nodes.
580   if options.no_voting:
581     if options.yes_do_it:
582       return
583
584     sys.stdout.write("The 'no voting' option has been selected.\n")
585     sys.stdout.write("This is dangerous, please confirm by"
586                      " typing uppercase 'yes': ")
587     sys.stdout.flush()
588
589     confirmation = sys.stdin.readline().strip()
590     if confirmation != "YES":
591       print >> sys.stderr, "Aborting."
592       sys.exit(constants.EXIT_FAILURE)
593
594     return
595
596   # CheckAgreement uses RPC and threads, hence it needs to be run in a separate
597   # process before we call utils.Daemonize in the current process.
598   if not utils.RunInSeparateProcess(CheckAgreement):
599     sys.exit(constants.EXIT_FAILURE)
600
601   # ActivateMasterIP also uses RPC/threads, so we run it again via a
602   # separate process.
603
604   # TODO: decide whether failure to activate the master IP is a fatal error
605   utils.RunInSeparateProcess(ActivateMasterIP)
606
607
608 def PrepMasterd(options, _):
609   """Prep master daemon function, executed with the PID file held.
610
611   """
612   # This is safe to do as the pid file guarantees against
613   # concurrent execution.
614   utils.RemoveFile(constants.MASTER_SOCKET)
615
616   mainloop = daemon.Mainloop()
617   master = MasterServer(mainloop, constants.MASTER_SOCKET,
618                         options.uid, options.gid)
619   return (mainloop, master)
620
621
622 def ExecMasterd(options, args, prep_data): # pylint: disable-msg=W0613
623   """Main master daemon function, executed with the PID file held.
624
625   """
626   (mainloop, master) = prep_data
627   try:
628     rpc.Init()
629     try:
630       master.setup_queue()
631       try:
632         mainloop.Run()
633       finally:
634         master.server_cleanup()
635     finally:
636       rpc.Shutdown()
637   finally:
638     utils.RemoveFile(constants.MASTER_SOCKET)
639
640
641 def Main():
642   """Main function"""
643   parser = OptionParser(description="Ganeti master daemon",
644                         usage="%prog [-f] [-d]",
645                         version="%%prog (ganeti) %s" %
646                         constants.RELEASE_VERSION)
647   parser.add_option("--no-voting", dest="no_voting",
648                     help="Do not check that the nodes agree on this node"
649                     " being the master and start the daemon unconditionally",
650                     default=False, action="store_true")
651   parser.add_option("--yes-do-it", dest="yes_do_it",
652                     help="Override interactive check for --no-voting",
653                     default=False, action="store_true")
654   daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
655                      ExecMasterd, multithreaded=True)