Show RPC calls from config in lock monitor
[ganeti-local] / lib / server / masterd.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Master daemon program.
23
24 Some classes deviates from the standard style guide since the
25 inheritance from parent classes requires it.
26
27 """
28
29 # pylint: disable=C0103
30 # C0103: Invalid name ganeti-masterd
31
32 import grp
33 import os
34 import pwd
35 import sys
36 import socket
37 import time
38 import tempfile
39 import logging
40
41 from optparse import OptionParser
42
43 from ganeti import config
44 from ganeti import constants
45 from ganeti import daemon
46 from ganeti import mcpu
47 from ganeti import opcodes
48 from ganeti import jqueue
49 from ganeti import locking
50 from ganeti import luxi
51 from ganeti import utils
52 from ganeti import errors
53 from ganeti import ssconf
54 from ganeti import workerpool
55 from ganeti import rpc
56 from ganeti import bootstrap
57 from ganeti import netutils
58 from ganeti import objects
59 from ganeti import query
60
61
62 CLIENT_REQUEST_WORKERS = 16
63
64 EXIT_NOTMASTER = constants.EXIT_NOTMASTER
65 EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
66
67
68 class ClientRequestWorker(workerpool.BaseWorker):
69   # pylint: disable=W0221
70   def RunTask(self, server, message, client):
71     """Process the request.
72
73     """
74     client_ops = ClientOps(server)
75
76     try:
77       (method, args, version) = luxi.ParseRequest(message)
78     except luxi.ProtocolError, err:
79       logging.error("Protocol Error: %s", err)
80       client.close_log()
81       return
82
83     success = False
84     try:
85       # Verify client's version if there was one in the request
86       if version is not None and version != constants.LUXI_VERSION:
87         raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
88                                (constants.LUXI_VERSION, version))
89
90       result = client_ops.handle_request(method, args)
91       success = True
92     except errors.GenericError, err:
93       logging.exception("Unexpected exception")
94       success = False
95       result = errors.EncodeException(err)
96     except:
97       logging.exception("Unexpected exception")
98       err = sys.exc_info()
99       result = "Caught exception: %s" % str(err[1])
100
101     try:
102       reply = luxi.FormatResponse(success, result)
103       client.send_message(reply)
104       # awake the main thread so that it can write out the data.
105       server.awaker.signal()
106     except: # pylint: disable=W0702
107       logging.exception("Send error")
108       client.close_log()
109
110
111 class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
112   """Handler for master peers.
113
114   """
115   _MAX_UNHANDLED = 1
116
117   def __init__(self, server, connected_socket, client_address, family):
118     daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
119                                                  client_address,
120                                                  constants.LUXI_EOM,
121                                                  family, self._MAX_UNHANDLED)
122     self.server = server
123
124   def handle_message(self, message, _):
125     self.server.request_workers.AddTask((self.server, message, self))
126
127
128 class MasterServer(daemon.AsyncStreamServer):
129   """Master Server.
130
131   This is the main asynchronous master server. It handles connections to the
132   master socket.
133
134   """
135   family = socket.AF_UNIX
136
137   def __init__(self, mainloop, address, uid, gid):
138     """MasterServer constructor
139
140     @type mainloop: ganeti.daemon.Mainloop
141     @param mainloop: Mainloop used to poll for I/O events
142     @param address: the unix socket address to bind the MasterServer to
143     @param uid: The uid of the owner of the socket
144     @param gid: The gid of the owner of the socket
145
146     """
147     temp_name = tempfile.mktemp(dir=os.path.dirname(address))
148     daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
149     os.chmod(temp_name, 0770)
150     os.chown(temp_name, uid, gid)
151     os.rename(temp_name, address)
152
153     self.mainloop = mainloop
154     self.awaker = daemon.AsyncAwaker()
155
156     # We'll only start threads once we've forked.
157     self.context = None
158     self.request_workers = None
159
160   def handle_connection(self, connected_socket, client_address):
161     # TODO: add connection count and limit the number of open connections to a
162     # maximum number to avoid breaking for lack of file descriptors or memory.
163     MasterClientHandler(self, connected_socket, client_address, self.family)
164
165   def setup_queue(self):
166     self.context = GanetiContext()
167     self.request_workers = workerpool.WorkerPool("ClientReq",
168                                                  CLIENT_REQUEST_WORKERS,
169                                                  ClientRequestWorker)
170
171   def server_cleanup(self):
172     """Cleanup the server.
173
174     This involves shutting down the processor threads and the master
175     socket.
176
177     """
178     try:
179       self.close()
180     finally:
181       if self.request_workers:
182         self.request_workers.TerminateWorkers()
183       if self.context:
184         self.context.jobqueue.Shutdown()
185
186
187 class ClientOps:
188   """Class holding high-level client operations."""
189   def __init__(self, server):
190     self.server = server
191
192   def handle_request(self, method, args): # pylint: disable=R0911
193     queue = self.server.context.jobqueue
194
195     # TODO: Parameter validation
196     if not isinstance(args, (tuple, list)):
197       logging.info("Received invalid arguments of type '%s'", type(args))
198       raise ValueError("Invalid arguments type '%s'" % type(args))
199
200     # TODO: Rewrite to not exit in each 'if/elif' branch
201
202     if method == luxi.REQ_SUBMIT_JOB:
203       logging.info("Received new job")
204       ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
205       return queue.SubmitJob(ops)
206
207     if method == luxi.REQ_SUBMIT_MANY_JOBS:
208       logging.info("Received multiple jobs")
209       jobs = []
210       for ops in args:
211         jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
212       return queue.SubmitManyJobs(jobs)
213
214     elif method == luxi.REQ_CANCEL_JOB:
215       (job_id, ) = args
216       logging.info("Received job cancel request for %s", job_id)
217       return queue.CancelJob(job_id)
218
219     elif method == luxi.REQ_ARCHIVE_JOB:
220       (job_id, ) = args
221       logging.info("Received job archive request for %s", job_id)
222       return queue.ArchiveJob(job_id)
223
224     elif method == luxi.REQ_AUTOARCHIVE_JOBS:
225       (age, timeout) = args
226       logging.info("Received job autoarchive request for age %s, timeout %s",
227                    age, timeout)
228       return queue.AutoArchiveJobs(age, timeout)
229
230     elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
231       (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
232       logging.info("Received job poll request for %s", job_id)
233       return queue.WaitForJobChanges(job_id, fields, prev_job_info,
234                                      prev_log_serial, timeout)
235
236     elif method == luxi.REQ_QUERY:
237       (what, fields, qfilter) = args
238       req = objects.QueryRequest(what=what, fields=fields, qfilter=qfilter)
239
240       if req.what in constants.QR_VIA_OP:
241         result = self._Query(opcodes.OpQuery(what=req.what, fields=req.fields,
242                                              qfilter=req.qfilter))
243       elif req.what == constants.QR_LOCK:
244         if req.qfilter is not None:
245           raise errors.OpPrereqError("Lock queries can't be filtered")
246         return self.server.context.glm.QueryLocks(req.fields)
247       elif req.what in constants.QR_VIA_LUXI:
248         raise NotImplementedError
249       else:
250         raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
251                                    errors.ECODE_INVAL)
252
253       return result
254
255     elif method == luxi.REQ_QUERY_FIELDS:
256       (what, fields) = args
257       req = objects.QueryFieldsRequest(what=what, fields=fields)
258
259       try:
260         fielddefs = query.ALL_FIELDS[req.what]
261       except KeyError:
262         raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
263                                    errors.ECODE_INVAL)
264
265       return query.QueryFields(fielddefs, req.fields)
266
267     elif method == luxi.REQ_QUERY_JOBS:
268       (job_ids, fields) = args
269       if isinstance(job_ids, (tuple, list)) and job_ids:
270         msg = utils.CommaJoin(job_ids)
271       else:
272         msg = str(job_ids)
273       logging.info("Received job query request for %s", msg)
274       return queue.QueryJobs(job_ids, fields)
275
276     elif method == luxi.REQ_QUERY_INSTANCES:
277       (names, fields, use_locking) = args
278       logging.info("Received instance query request for %s", names)
279       if use_locking:
280         raise errors.OpPrereqError("Sync queries are not allowed",
281                                    errors.ECODE_INVAL)
282       op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
283                                    use_locking=use_locking)
284       return self._Query(op)
285
286     elif method == luxi.REQ_QUERY_NODES:
287       (names, fields, use_locking) = args
288       logging.info("Received node query request for %s", names)
289       if use_locking:
290         raise errors.OpPrereqError("Sync queries are not allowed",
291                                    errors.ECODE_INVAL)
292       op = opcodes.OpNodeQuery(names=names, output_fields=fields,
293                                use_locking=use_locking)
294       return self._Query(op)
295
296     elif method == luxi.REQ_QUERY_GROUPS:
297       (names, fields, use_locking) = args
298       logging.info("Received group query request for %s", names)
299       if use_locking:
300         raise errors.OpPrereqError("Sync queries are not allowed",
301                                    errors.ECODE_INVAL)
302       op = opcodes.OpGroupQuery(names=names, output_fields=fields)
303       return self._Query(op)
304
305     elif method == luxi.REQ_QUERY_EXPORTS:
306       (nodes, use_locking) = args
307       if use_locking:
308         raise errors.OpPrereqError("Sync queries are not allowed",
309                                    errors.ECODE_INVAL)
310       logging.info("Received exports query request")
311       op = opcodes.OpBackupQuery(nodes=nodes, use_locking=use_locking)
312       return self._Query(op)
313
314     elif method == luxi.REQ_QUERY_CONFIG_VALUES:
315       (fields, ) = args
316       logging.info("Received config values query request for %s", fields)
317       op = opcodes.OpClusterConfigQuery(output_fields=fields)
318       return self._Query(op)
319
320     elif method == luxi.REQ_QUERY_CLUSTER_INFO:
321       logging.info("Received cluster info query request")
322       op = opcodes.OpClusterQuery()
323       return self._Query(op)
324
325     elif method == luxi.REQ_QUERY_TAGS:
326       (kind, name) = args
327       logging.info("Received tags query request")
328       op = opcodes.OpTagsGet(kind=kind, name=name)
329       return self._Query(op)
330
331     elif method == luxi.REQ_QUERY_LOCKS:
332       (fields, sync) = args
333       logging.info("Received locks query request")
334       if sync:
335         raise NotImplementedError("Synchronous queries are not implemented")
336       return self.server.context.glm.OldStyleQueryLocks(fields)
337
338     elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
339       (drain_flag, ) = args
340       logging.info("Received queue drain flag change request to %s",
341                    drain_flag)
342       return queue.SetDrainFlag(drain_flag)
343
344     elif method == luxi.REQ_SET_WATCHER_PAUSE:
345       (until, ) = args
346
347       if until is None:
348         logging.info("Received request to no longer pause the watcher")
349       else:
350         if not isinstance(until, (int, float)):
351           raise TypeError("Duration must be an integer or float")
352
353         if until < time.time():
354           raise errors.GenericError("Unable to set pause end time in the past")
355
356         logging.info("Received request to pause the watcher until %s", until)
357
358       return _SetWatcherPause(until)
359
360     else:
361       logging.info("Received invalid request '%s'", method)
362       raise ValueError("Invalid operation '%s'" % method)
363
364   def _Query(self, op):
365     """Runs the specified opcode and returns the result.
366
367     """
368     # Queries don't have a job id
369     proc = mcpu.Processor(self.server.context, None)
370
371     # TODO: Executing an opcode using locks will acquire them in blocking mode.
372     # Consider using a timeout for retries.
373     return proc.ExecOpCode(op, None)
374
375
376 class GanetiContext(object):
377   """Context common to all ganeti threads.
378
379   This class creates and holds common objects shared by all threads.
380
381   """
382   # pylint: disable=W0212
383   # we do want to ensure a singleton here
384   _instance = None
385
386   def __init__(self):
387     """Constructs a new GanetiContext object.
388
389     There should be only a GanetiContext object at any time, so this
390     function raises an error if this is not the case.
391
392     """
393     assert self.__class__._instance is None, "double GanetiContext instance"
394
395     # Create global configuration object
396     self.cfg = config.ConfigWriter()
397
398     # Locking manager
399     self.glm = locking.GanetiLockManager(
400                 self.cfg.GetNodeList(),
401                 self.cfg.GetNodeGroupList(),
402                 self.cfg.GetInstanceList())
403
404     self.cfg.SetContext(self)
405
406     # Job queue
407     self.jobqueue = jqueue.JobQueue(self)
408
409     # RPC runner
410     self.rpc = rpc.RpcRunner(self)
411
412     # setting this also locks the class against attribute modifications
413     self.__class__._instance = self
414
415   def __setattr__(self, name, value):
416     """Setting GanetiContext attributes is forbidden after initialization.
417
418     """
419     assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
420     object.__setattr__(self, name, value)
421
422   def AddNode(self, node, ec_id):
423     """Adds a node to the configuration and lock manager.
424
425     """
426     # Add it to the configuration
427     self.cfg.AddNode(node, ec_id)
428
429     # If preseeding fails it'll not be added
430     self.jobqueue.AddNode(node)
431
432     # Add the new node to the Ganeti Lock Manager
433     self.glm.add(locking.LEVEL_NODE, node.name)
434     self.glm.add(locking.LEVEL_NODE_RES, node.name)
435
436   def ReaddNode(self, node):
437     """Updates a node that's already in the configuration
438
439     """
440     # Synchronize the queue again
441     self.jobqueue.AddNode(node)
442
443   def RemoveNode(self, name):
444     """Removes a node from the configuration and lock manager.
445
446     """
447     # Remove node from configuration
448     self.cfg.RemoveNode(name)
449
450     # Notify job queue
451     self.jobqueue.RemoveNode(name)
452
453     # Remove the node from the Ganeti Lock Manager
454     self.glm.remove(locking.LEVEL_NODE, name)
455     self.glm.remove(locking.LEVEL_NODE_RES, name)
456
457
458 def _SetWatcherPause(until):
459   """Creates or removes the watcher pause file.
460
461   @type until: None or int
462   @param until: Unix timestamp saying until when the watcher shouldn't run
463
464   """
465   if until is None:
466     utils.RemoveFile(constants.WATCHER_PAUSEFILE)
467   else:
468     utils.WriteFile(constants.WATCHER_PAUSEFILE,
469                     data="%d\n" % (until, ))
470
471   return until
472
473
474 @rpc.RunWithRPC
475 def CheckAgreement():
476   """Check the agreement on who is the master.
477
478   The function uses a very simple algorithm: we must get more positive
479   than negative answers. Since in most of the cases we are the master,
480   we'll use our own config file for getting the node list. In the
481   future we could collect the current node list from our (possibly
482   obsolete) known nodes.
483
484   In order to account for cold-start of all nodes, we retry for up to
485   a minute until we get a real answer as the top-voted one. If the
486   nodes are more out-of-sync, for now manual startup of the master
487   should be attempted.
488
489   Note that for a even number of nodes cluster, we need at least half
490   of the nodes (beside ourselves) to vote for us. This creates a
491   problem on two-node clusters, since in this case we require the
492   other node to be up too to confirm our status.
493
494   """
495   myself = netutils.Hostname.GetSysName()
496   #temp instantiation of a config writer, used only to get the node list
497   cfg = config.ConfigWriter()
498   node_list = cfg.GetNodeList()
499   del cfg
500   retries = 6
501   while retries > 0:
502     votes = bootstrap.GatherMasterVotes(node_list)
503     if not votes:
504       # empty node list, this is a one node cluster
505       return True
506     if votes[0][0] is None:
507       retries -= 1
508       time.sleep(10)
509       continue
510     break
511   if retries == 0:
512     logging.critical("Cluster inconsistent, most of the nodes didn't answer"
513                      " after multiple retries. Aborting startup")
514     logging.critical("Use the --no-voting option if you understand what"
515                      " effects it has on the cluster state")
516     return False
517   # here a real node is at the top of the list
518   all_votes = sum(item[1] for item in votes)
519   top_node, top_votes = votes[0]
520
521   result = False
522   if top_node != myself:
523     logging.critical("It seems we are not the master (top-voted node"
524                      " is %s with %d out of %d votes)", top_node, top_votes,
525                      all_votes)
526   elif top_votes < all_votes - top_votes:
527     logging.critical("It seems we are not the master (%d votes for,"
528                      " %d votes against)", top_votes, all_votes - top_votes)
529   else:
530     result = True
531
532   return result
533
534
535 @rpc.RunWithRPC
536 def ActivateMasterIP():
537   # activate ip
538   cfg = config.ConfigWriter()
539   master_params = cfg.GetMasterNetworkParameters()
540   runner = rpc.BootstrapRunner()
541   result = runner.call_node_activate_master_ip(master_params.name,
542                                                master_params)
543
544   msg = result.fail_msg
545   if msg:
546     logging.error("Can't activate master IP address: %s", msg)
547
548
549 def CheckMasterd(options, args):
550   """Initial checks whether to run or exit with a failure.
551
552   """
553   if args: # masterd doesn't take any arguments
554     print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
555     sys.exit(constants.EXIT_FAILURE)
556
557   ssconf.CheckMaster(options.debug)
558
559   try:
560     options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
561     options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
562   except KeyError:
563     print >> sys.stderr, ("User or group not existing on system: %s:%s" %
564                           (constants.MASTERD_USER, constants.DAEMONS_GROUP))
565     sys.exit(constants.EXIT_FAILURE)
566
567   # Check the configuration is sane before anything else
568   try:
569     config.ConfigWriter()
570   except errors.ConfigVersionMismatch, err:
571     v1 = "%s.%s.%s" % constants.SplitVersion(err.args[0])
572     v2 = "%s.%s.%s" % constants.SplitVersion(err.args[1])
573     print >> sys.stderr,  \
574         ("Configuration version mismatch. The current Ganeti software"
575          " expects version %s, but the on-disk configuration file has"
576          " version %s. This is likely the result of upgrading the"
577          " software without running the upgrade procedure. Please contact"
578          " your cluster administrator or complete the upgrade using the"
579          " cfgupgrade utility, after reading the upgrade notes." %
580          (v1, v2))
581     sys.exit(constants.EXIT_FAILURE)
582   except errors.ConfigurationError, err:
583     print >> sys.stderr, \
584         ("Configuration error while opening the configuration file: %s\n"
585          "This might be caused by an incomplete software upgrade or"
586          " by a corrupted configuration file. Until the problem is fixed"
587          " the master daemon cannot start." % str(err))
588     sys.exit(constants.EXIT_FAILURE)
589
590   # If CheckMaster didn't fail we believe we are the master, but we have to
591   # confirm with the other nodes.
592   if options.no_voting:
593     if not options.yes_do_it:
594       sys.stdout.write("The 'no voting' option has been selected.\n")
595       sys.stdout.write("This is dangerous, please confirm by"
596                        " typing uppercase 'yes': ")
597       sys.stdout.flush()
598
599       confirmation = sys.stdin.readline().strip()
600       if confirmation != "YES":
601         print >> sys.stderr, "Aborting."
602         sys.exit(constants.EXIT_FAILURE)
603
604   else:
605     # CheckAgreement uses RPC and threads, hence it needs to be run in
606     # a separate process before we call utils.Daemonize in the current
607     # process.
608     if not utils.RunInSeparateProcess(CheckAgreement):
609       sys.exit(constants.EXIT_FAILURE)
610
611   # ActivateMasterIP also uses RPC/threads, so we run it again via a
612   # separate process.
613
614   # TODO: decide whether failure to activate the master IP is a fatal error
615   utils.RunInSeparateProcess(ActivateMasterIP)
616
617
618 def PrepMasterd(options, _):
619   """Prep master daemon function, executed with the PID file held.
620
621   """
622   # This is safe to do as the pid file guarantees against
623   # concurrent execution.
624   utils.RemoveFile(constants.MASTER_SOCKET)
625
626   mainloop = daemon.Mainloop()
627   master = MasterServer(mainloop, constants.MASTER_SOCKET,
628                         options.uid, options.gid)
629   return (mainloop, master)
630
631
632 def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
633   """Main master daemon function, executed with the PID file held.
634
635   """
636   (mainloop, master) = prep_data
637   try:
638     rpc.Init()
639     try:
640       master.setup_queue()
641       try:
642         mainloop.Run()
643       finally:
644         master.server_cleanup()
645     finally:
646       rpc.Shutdown()
647   finally:
648     utils.RemoveFile(constants.MASTER_SOCKET)
649
650
651 def Main():
652   """Main function"""
653   parser = OptionParser(description="Ganeti master daemon",
654                         usage="%prog [-f] [-d]",
655                         version="%%prog (ganeti) %s" %
656                         constants.RELEASE_VERSION)
657   parser.add_option("--no-voting", dest="no_voting",
658                     help="Do not check that the nodes agree on this node"
659                     " being the master and start the daemon unconditionally",
660                     default=False, action="store_true")
661   parser.add_option("--yes-do-it", dest="yes_do_it",
662                     help="Override interactive check for --no-voting",
663                     default=False, action="store_true")
664   daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
665                      ExecMasterd, multithreaded=True)