Further pylint disables, mostly for Unused args
[ganeti-local] / daemons / ganeti-masterd
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Master daemon program.
23
24 Some classes deviates from the standard style guide since the
25 inheritance from parent classes requires it.
26
27 """
28
29 # pylint: disable-msg=C0103
30 # C0103: Invalid name ganeti-masterd
31
32 import os
33 import sys
34 import SocketServer
35 import time
36 import collections
37 import signal
38 import logging
39
40 from optparse import OptionParser
41
42 from ganeti import config
43 from ganeti import constants
44 from ganeti import daemon
45 from ganeti import mcpu
46 from ganeti import opcodes
47 from ganeti import jqueue
48 from ganeti import locking
49 from ganeti import luxi
50 from ganeti import utils
51 from ganeti import errors
52 from ganeti import ssconf
53 from ganeti import workerpool
54 from ganeti import rpc
55 from ganeti import bootstrap
56 from ganeti import serializer
57
58
59 CLIENT_REQUEST_WORKERS = 16
60
61 EXIT_NOTMASTER = constants.EXIT_NOTMASTER
62 EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
63
64
65 class ClientRequestWorker(workerpool.BaseWorker):
66    # pylint: disable-msg=W0221
67   def RunTask(self, server, request, client_address):
68     """Process the request.
69
70     This is copied from the code in ThreadingMixIn.
71
72     """
73     try:
74       server.finish_request(request, client_address)
75       server.close_request(request)
76     except: # pylint: disable-msg=W0702
77       server.handle_error(request, client_address)
78       server.close_request(request)
79
80
81 class IOServer(SocketServer.UnixStreamServer):
82   """IO thread class.
83
84   This class takes care of initializing the other threads, setting
85   signal handlers (which are processed only in this thread), and doing
86   cleanup at shutdown.
87
88   """
89   def __init__(self, address, rqhandler):
90     """IOServer constructor
91
92     @param address: the address to bind this IOServer to
93     @param rqhandler: RequestHandler type object
94
95     """
96     SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
97
98     # We'll only start threads once we've forked.
99     self.context = None
100     self.request_workers = None
101
102   def setup_queue(self):
103     self.context = GanetiContext()
104     self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
105                                                  ClientRequestWorker)
106
107   def process_request(self, request, client_address):
108     """Add task to workerpool to process request.
109
110     """
111     self.request_workers.AddTask(self, request, client_address)
112
113   @utils.SignalHandled([signal.SIGINT, signal.SIGTERM])
114   def serve_forever(self, signal_handlers=None): # pylint: disable-msg=W0221
115     """Handle one request at a time until told to quit."""
116     assert isinstance(signal_handlers, dict) and \
117            len(signal_handlers) > 0, \
118            "Broken SignalHandled decorator"
119     # Since we use SignalHandled only once, the resulting dict will map all
120     # signals to the same handler. We'll just use the first one.
121     sighandler = signal_handlers.values()[0]
122     while not sighandler.called:
123       self.handle_request()
124
125   def server_cleanup(self):
126     """Cleanup the server.
127
128     This involves shutting down the processor threads and the master
129     socket.
130
131     """
132     try:
133       self.server_close()
134     finally:
135       if self.request_workers:
136         self.request_workers.TerminateWorkers()
137       if self.context:
138         self.context.jobqueue.Shutdown()
139
140
141 class ClientRqHandler(SocketServer.BaseRequestHandler):
142   """Client handler"""
143   EOM = '\3'
144   READ_SIZE = 4096
145
146   def setup(self):
147     # pylint: disable-msg=W0201
148     # setup() is the api for initialising for this class
149     self._buffer = ""
150     self._msgs = collections.deque()
151     self._ops = ClientOps(self.server)
152
153   def handle(self):
154     while True:
155       msg = self.read_message()
156       if msg is None:
157         logging.debug("client closed connection")
158         break
159
160       request = serializer.LoadJson(msg)
161       logging.debug("request: %s", request)
162       if not isinstance(request, dict):
163         logging.error("wrong request received: %s", msg)
164         break
165
166       method = request.get(luxi.KEY_METHOD, None)
167       args = request.get(luxi.KEY_ARGS, None)
168       if method is None or args is None:
169         logging.error("no method or args in request")
170         break
171
172       success = False
173       try:
174         result = self._ops.handle_request(method, args)
175         success = True
176       except errors.GenericError, err:
177         success = False
178         result = errors.EncodeException(err)
179       except:
180         logging.error("Unexpected exception", exc_info=True)
181         err = sys.exc_info()
182         result = "Caught exception: %s" % str(err[1])
183
184       response = {
185         luxi.KEY_SUCCESS: success,
186         luxi.KEY_RESULT: result,
187         }
188       logging.debug("response: %s", response)
189       self.send_message(serializer.DumpJson(response))
190
191   def read_message(self):
192     while not self._msgs:
193       data = self.request.recv(self.READ_SIZE)
194       if not data:
195         return None
196       new_msgs = (self._buffer + data).split(self.EOM)
197       self._buffer = new_msgs.pop()
198       self._msgs.extend(new_msgs)
199     return self._msgs.popleft()
200
201   def send_message(self, msg):
202     #print "sending", msg
203     # TODO: sendall is not guaranteed to send everything
204     self.request.sendall(msg + self.EOM)
205
206
207 class ClientOps:
208   """Class holding high-level client operations."""
209   def __init__(self, server):
210     self.server = server
211
212   def handle_request(self, method, args): # pylint: disable-msg=R0911
213     queue = self.server.context.jobqueue
214
215     # TODO: Parameter validation
216
217     # TODO: Rewrite to not exit in each 'if/elif' branch
218
219     if method == luxi.REQ_SUBMIT_JOB:
220       logging.info("Received new job")
221       ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
222       return queue.SubmitJob(ops)
223
224     if method == luxi.REQ_SUBMIT_MANY_JOBS:
225       logging.info("Received multiple jobs")
226       jobs = []
227       for ops in args:
228         jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
229       return queue.SubmitManyJobs(jobs)
230
231     elif method == luxi.REQ_CANCEL_JOB:
232       job_id = args
233       logging.info("Received job cancel request for %s", job_id)
234       return queue.CancelJob(job_id)
235
236     elif method == luxi.REQ_ARCHIVE_JOB:
237       job_id = args
238       logging.info("Received job archive request for %s", job_id)
239       return queue.ArchiveJob(job_id)
240
241     elif method == luxi.REQ_AUTOARCHIVE_JOBS:
242       (age, timeout) = args
243       logging.info("Received job autoarchive request for age %s, timeout %s",
244                    age, timeout)
245       return queue.AutoArchiveJobs(age, timeout)
246
247     elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
248       (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
249       logging.info("Received job poll request for %s", job_id)
250       return queue.WaitForJobChanges(job_id, fields, prev_job_info,
251                                      prev_log_serial, timeout)
252
253     elif method == luxi.REQ_QUERY_JOBS:
254       (job_ids, fields) = args
255       if isinstance(job_ids, (tuple, list)) and job_ids:
256         msg = utils.CommaJoin(job_ids)
257       else:
258         msg = str(job_ids)
259       logging.info("Received job query request for %s", msg)
260       return queue.QueryJobs(job_ids, fields)
261
262     elif method == luxi.REQ_QUERY_INSTANCES:
263       (names, fields, use_locking) = args
264       logging.info("Received instance query request for %s", names)
265       if use_locking:
266         raise errors.OpPrereqError("Sync queries are not allowed",
267                                    errors.ECODE_INVAL)
268       op = opcodes.OpQueryInstances(names=names, output_fields=fields,
269                                     use_locking=use_locking)
270       return self._Query(op)
271
272     elif method == luxi.REQ_QUERY_NODES:
273       (names, fields, use_locking) = args
274       logging.info("Received node query request for %s", names)
275       if use_locking:
276         raise errors.OpPrereqError("Sync queries are not allowed",
277                                    errors.ECODE_INVAL)
278       op = opcodes.OpQueryNodes(names=names, output_fields=fields,
279                                 use_locking=use_locking)
280       return self._Query(op)
281
282     elif method == luxi.REQ_QUERY_EXPORTS:
283       nodes, use_locking = args
284       if use_locking:
285         raise errors.OpPrereqError("Sync queries are not allowed",
286                                    errors.ECODE_INVAL)
287       logging.info("Received exports query request")
288       op = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
289       return self._Query(op)
290
291     elif method == luxi.REQ_QUERY_CONFIG_VALUES:
292       fields = args
293       logging.info("Received config values query request for %s", fields)
294       op = opcodes.OpQueryConfigValues(output_fields=fields)
295       return self._Query(op)
296
297     elif method == luxi.REQ_QUERY_CLUSTER_INFO:
298       logging.info("Received cluster info query request")
299       op = opcodes.OpQueryClusterInfo()
300       return self._Query(op)
301
302     elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
303       drain_flag = args
304       logging.info("Received queue drain flag change request to %s",
305                    drain_flag)
306       return queue.SetDrainFlag(drain_flag)
307
308     elif method == luxi.REQ_SET_WATCHER_PAUSE:
309       (until, ) = args
310
311       if until is None:
312         logging.info("Received request to no longer pause the watcher")
313       else:
314         if not isinstance(until, (int, float)):
315           raise TypeError("Duration must be an integer or float")
316
317         if until < time.time():
318           raise errors.GenericError("Unable to set pause end time in the past")
319
320         logging.info("Received request to pause the watcher until %s", until)
321
322       return _SetWatcherPause(until)
323
324     else:
325       logging.info("Received invalid request '%s'", method)
326       raise ValueError("Invalid operation '%s'" % method)
327
328   def _Query(self, op):
329     """Runs the specified opcode and returns the result.
330
331     """
332     # Queries don't have a job id
333     proc = mcpu.Processor(self.server.context, None)
334     return proc.ExecOpCode(op, None)
335
336
337 class GanetiContext(object):
338   """Context common to all ganeti threads.
339
340   This class creates and holds common objects shared by all threads.
341
342   """
343   # pylint: disable-msg=W0212
344   # we do want to ensure a singleton here
345   _instance = None
346
347   def __init__(self):
348     """Constructs a new GanetiContext object.
349
350     There should be only a GanetiContext object at any time, so this
351     function raises an error if this is not the case.
352
353     """
354     assert self.__class__._instance is None, "double GanetiContext instance"
355
356     # Create global configuration object
357     self.cfg = config.ConfigWriter()
358
359     # Locking manager
360     self.glm = locking.GanetiLockManager(
361                 self.cfg.GetNodeList(),
362                 self.cfg.GetInstanceList())
363
364     # Job queue
365     self.jobqueue = jqueue.JobQueue(self)
366
367     # setting this also locks the class against attribute modifications
368     self.__class__._instance = self
369
370   def __setattr__(self, name, value):
371     """Setting GanetiContext attributes is forbidden after initialization.
372
373     """
374     assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
375     object.__setattr__(self, name, value)
376
377   def AddNode(self, node, ec_id):
378     """Adds a node to the configuration and lock manager.
379
380     """
381     # Add it to the configuration
382     self.cfg.AddNode(node, ec_id)
383
384     # If preseeding fails it'll not be added
385     self.jobqueue.AddNode(node)
386
387     # Add the new node to the Ganeti Lock Manager
388     self.glm.add(locking.LEVEL_NODE, node.name)
389
390   def ReaddNode(self, node):
391     """Updates a node that's already in the configuration
392
393     """
394     # Synchronize the queue again
395     self.jobqueue.AddNode(node)
396
397   def RemoveNode(self, name):
398     """Removes a node from the configuration and lock manager.
399
400     """
401     # Remove node from configuration
402     self.cfg.RemoveNode(name)
403
404     # Notify job queue
405     self.jobqueue.RemoveNode(name)
406
407     # Remove the node from the Ganeti Lock Manager
408     self.glm.remove(locking.LEVEL_NODE, name)
409
410
411 def _SetWatcherPause(until):
412   """Creates or removes the watcher pause file.
413
414   @type until: None or int
415   @param until: Unix timestamp saying until when the watcher shouldn't run
416
417   """
418   if until is None:
419     utils.RemoveFile(constants.WATCHER_PAUSEFILE)
420   else:
421     utils.WriteFile(constants.WATCHER_PAUSEFILE,
422                     data="%d\n" % (until, ))
423
424   return until
425
426
427 def CheckAgreement():
428   """Check the agreement on who is the master.
429
430   The function uses a very simple algorithm: we must get more positive
431   than negative answers. Since in most of the cases we are the master,
432   we'll use our own config file for getting the node list. In the
433   future we could collect the current node list from our (possibly
434   obsolete) known nodes.
435
436   In order to account for cold-start of all nodes, we retry for up to
437   a minute until we get a real answer as the top-voted one. If the
438   nodes are more out-of-sync, for now manual startup of the master
439   should be attempted.
440
441   Note that for a even number of nodes cluster, we need at least half
442   of the nodes (beside ourselves) to vote for us. This creates a
443   problem on two-node clusters, since in this case we require the
444   other node to be up too to confirm our status.
445
446   """
447   myself = utils.HostInfo().name
448   #temp instantiation of a config writer, used only to get the node list
449   cfg = config.ConfigWriter()
450   node_list = cfg.GetNodeList()
451   del cfg
452   retries = 6
453   while retries > 0:
454     votes = bootstrap.GatherMasterVotes(node_list)
455     if not votes:
456       # empty node list, this is a one node cluster
457       return True
458     if votes[0][0] is None:
459       retries -= 1
460       time.sleep(10)
461       continue
462     break
463   if retries == 0:
464     logging.critical("Cluster inconsistent, most of the nodes didn't answer"
465                      " after multiple retries. Aborting startup")
466     return False
467   # here a real node is at the top of the list
468   all_votes = sum(item[1] for item in votes)
469   top_node, top_votes = votes[0]
470
471   result = False
472   if top_node != myself:
473     logging.critical("It seems we are not the master (top-voted node"
474                      " is %s with %d out of %d votes)", top_node, top_votes,
475                      all_votes)
476   elif top_votes < all_votes - top_votes:
477     logging.critical("It seems we are not the master (%d votes for,"
478                      " %d votes against)", top_votes, all_votes - top_votes)
479   else:
480     result = True
481
482   return result
483
484
485 def CheckAgreementWithRpc():
486   rpc.Init()
487   try:
488     return CheckAgreement()
489   finally:
490     rpc.Shutdown()
491
492
493 def _RunInSeparateProcess(fn):
494   """Runs a function in a separate process.
495
496   Note: Only boolean return values are supported.
497
498   @type fn: callable
499   @param fn: Function to be called
500   @rtype: bool
501
502   """
503   pid = os.fork()
504   if pid == 0:
505     # Child process
506     try:
507       # Call function
508       result = int(bool(fn()))
509       assert result in (0, 1)
510     except: # pylint: disable-msg=W0702
511       logging.exception("Error while calling function in separate process")
512       # 0 and 1 are reserved for the return value
513       result = 33
514
515     os._exit(result) # pylint: disable-msg=W0212
516
517   # Parent process
518
519   # Avoid zombies and check exit code
520   (_, status) = os.waitpid(pid, 0)
521
522   if os.WIFSIGNALED(status):
523     signum = os.WTERMSIG(status)
524     exitcode = None
525   else:
526     signum = None
527     exitcode = os.WEXITSTATUS(status)
528
529   if not (exitcode in (0, 1) and signum is None):
530     logging.error("Child program failed (code=%s, signal=%s)",
531                   exitcode, signum)
532     sys.exit(constants.EXIT_FAILURE)
533
534   return bool(exitcode)
535
536
537 def CheckMasterd(options, args):
538   """Initial checks whether to run or exit with a failure.
539
540   """
541   if args: # masterd doesn't take any arguments
542     print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
543     sys.exit(constants.EXIT_FAILURE)
544
545   ssconf.CheckMaster(options.debug)
546
547   # If CheckMaster didn't fail we believe we are the master, but we have to
548   # confirm with the other nodes.
549   if options.no_voting:
550     if options.yes_do_it:
551       return
552
553     sys.stdout.write("The 'no voting' option has been selected.\n")
554     sys.stdout.write("This is dangerous, please confirm by"
555                      " typing uppercase 'yes': ")
556     sys.stdout.flush()
557
558     confirmation = sys.stdin.readline().strip()
559     if confirmation != "YES":
560       print >> sys.stderr, "Aborting."
561       sys.exit(constants.EXIT_FAILURE)
562
563     return
564
565   # CheckAgreement uses RPC and threads, hence it needs to be run in a separate
566   # process before we call utils.Daemonize in the current process.
567   if not _RunInSeparateProcess(CheckAgreementWithRpc):
568     sys.exit(constants.EXIT_FAILURE)
569
570
571 def ExecMasterd (options, args): # pylint: disable-msg=W0613
572   """Main master daemon function, executed with the PID file held.
573
574   """
575   # This is safe to do as the pid file guarantees against
576   # concurrent execution.
577   utils.RemoveFile(constants.MASTER_SOCKET)
578
579   master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
580   try:
581     rpc.Init()
582     try:
583       # activate ip
584       master_node = ssconf.SimpleStore().GetMasterNode()
585       result = rpc.RpcRunner.call_node_start_master(master_node, False, False)
586       msg = result.fail_msg
587       if msg:
588         logging.error("Can't activate master IP address: %s", msg)
589
590       master.setup_queue()
591       try:
592         master.serve_forever()
593       finally:
594         master.server_cleanup()
595     finally:
596       rpc.Shutdown()
597   finally:
598     utils.RemoveFile(constants.MASTER_SOCKET)
599
600
601 def main():
602   """Main function"""
603   parser = OptionParser(description="Ganeti master daemon",
604                         usage="%prog [-f] [-d]",
605                         version="%%prog (ganeti) %s" %
606                         constants.RELEASE_VERSION)
607   parser.add_option("--no-voting", dest="no_voting",
608                     help="Do not check that the nodes agree on this node"
609                     " being the master and start the daemon unconditionally",
610                     default=False, action="store_true")
611   parser.add_option("--yes-do-it", dest="yes_do_it",
612                     help="Override interactive check for --no-voting",
613                     default=False, action="store_true")
614   dirs = [(constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE),
615           (constants.SOCKET_DIR, constants.SOCKET_DIR_MODE),
616          ]
617   daemon.GenericMain(constants.MASTERD, parser, dirs,
618                      CheckMasterd, ExecMasterd)
619
620
621 if __name__ == "__main__":
622   main()