#!/usr/bin/python
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

    
"""Master daemon program.

Some classes deviate from the standard style guide since the
inheritance from parent classes requires it.

"""

    
# pylint: disable-msg=C0103
# C0103: Invalid name ganeti-masterd

import os
import sys
import SocketServer
import time
import collections
import signal
import logging

from optparse import OptionParser

from ganeti import config
from ganeti import constants
from ganeti import daemon
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import workerpool
from ganeti import rpc
from ganeti import bootstrap


CLIENT_REQUEST_WORKERS = 16

EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR

    
class ClientRequestWorker(workerpool.BaseWorker):
  # pylint: disable-msg=W0221
  def RunTask(self, server, request, client_address):
    """Process the request.

    This is copied from the code in ThreadingMixIn.

    """
    try:
      server.finish_request(request, client_address)
      server.close_request(request)
    except: # pylint: disable-msg=W0702
      server.handle_error(request, client_address)
      server.close_request(request)

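# Each accepted connection is processed by one of CLIENT_REQUEST_WORKERS
# pre-started pool workers; RunTask() mirrors what ThreadingMixIn does per
# request, but without spawning a new thread for every connection.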

    
class IOServer(SocketServer.UnixStreamServer):
  """IO thread class.

  This class takes care of initializing the other threads, setting
  signal handlers (which are processed only in this thread), and doing
  cleanup at shutdown.

  """
  def __init__(self, address, rqhandler):
    """IOServer constructor

    @param address: the address to bind this IOServer to
    @param rqhandler: RequestHandler type object

    """
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

  def setup_queue(self):
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def process_request(self, request, client_address):
    """Add task to workerpool to process request.

    """
    self.request_workers.AddTask(self, request, client_address)

  @utils.SignalHandled([signal.SIGINT, signal.SIGTERM])
  def serve_forever(self, signal_handlers=None): # pylint: disable-msg=W0221
    """Handle one request at a time until told to quit."""
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"
    # Since we use SignalHandled only once, the resulting dict will map all
    # signals to the same handler. We'll just use the first one.
    sighandler = signal_handlers.values()[0]
    while not sighandler.called:
      self.handle_request()

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.server_close()
    finally:
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.context:
        self.context.jobqueue.Shutdown()

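# Note: utils.SignalHandled installs handlers for SIGINT/SIGTERM around the
# serve_forever() call; the loop above exits once either signal has been
# caught, after which server_cleanup() shuts down the workers and job queue.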

    
class ClientRqHandler(SocketServer.BaseRequestHandler):
  """Client handler"""
  EOM = '\3'
  READ_SIZE = 4096

  def setup(self):
    # pylint: disable-msg=W0201
    # setup() is the API for initialising this class
    self._buffer = ""
    self._msgs = collections.deque()
    self._ops = ClientOps(self.server)

  def handle(self):
    while True:
      msg = self.read_message()
      if msg is None:
        logging.debug("client closed connection")
        break

      (method, args) = luxi.ParseRequest(msg)

      success = False
      try:
        result = self._ops.handle_request(method, args)
        success = True
      except errors.GenericError, err:
        result = errors.EncodeException(err)
      except:
        logging.error("Unexpected exception", exc_info=True)
        result = "Caught exception: %s" % str(sys.exc_info()[1])

      self.send_message(luxi.FormatResponse(success, result))

  def read_message(self):
    while not self._msgs:
      data = self.request.recv(self.READ_SIZE)
      if not data:
        return None
      new_msgs = (self._buffer + data).split(self.EOM)
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def send_message(self, msg):
    # socket.sendall() either sends all data or raises an exception
    self.request.sendall(msg + self.EOM)

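# Wire format: each LUXI message is a single serialized string (formatted
# and parsed by the luxi module) terminated by the EOM byte '\x03'.
# read_message() buffers partial reads, so a single recv() may complete
# zero, one or several messages; trailing unterminated data stays in
# self._buffer.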

    
class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server

  def handle_request(self, method, args): # pylint: disable-msg=R0911
    queue = self.server.context.jobqueue

    # TODO: Parameter validation

    # TODO: Rewrite to not exit in each 'if/elif' branch

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Received new job")
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
      return queue.SubmitJob(ops)

    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Received multiple jobs")
      jobs = []
      for ops in args:
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
      return queue.SubmitManyJobs(jobs)

    elif method == luxi.REQ_CANCEL_JOB:
      job_id = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_ARCHIVE_JOB:
      job_id = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = utils.CommaJoin(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.QueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpQueryInstances(names=names, output_fields=fields,
                                    use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpQueryNodes(names=names, output_fields=fields,
                                use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      nodes, use_locking = args
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      logging.info("Received exports query request")
      op = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      fields = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpQueryConfigValues(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpQueryClusterInfo()
      return self._Query(op)

    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
      drain_flag = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    elif method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      if until is None:
        logging.info("Received request to no longer pause the watcher")
      else:
        if not isinstance(until, (int, float)):
          raise TypeError("Duration must be an integer or float")

        if until < time.time():
          raise errors.GenericError("Unable to set pause end time in the past")

        logging.info("Received request to pause the watcher until %s", until)

      return _SetWatcherPause(until)

    else:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    # Queries don't have a job id
    proc = mcpu.Processor(self.server.context, None)
    return proc.ExecOpCode(op, None)

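# For reference, clients reach handle_request() through the LUXI client
# library over the master socket; roughly (a hypothetical sketch, the
# authoritative client API lives in the luxi module):
#
#   from ganeti import luxi
#   client = luxi.Client(address=constants.MASTER_SOCKET)
#   job_id = client.SubmitJob(ops)  # ops: a list of opcodes.OpCode objects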

    
class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # pylint: disable-msg=W0212
  # we do want to ensure a singleton here
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only one GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    self.glm = locking.GanetiLockManager(
                self.cfg.GetNodeList(),
                self.cfg.GetInstanceList())

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node, ec_id):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node, ec_id)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration.

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)

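# Note: once __init__ has set _instance, the __setattr__ override above makes
# any further attribute assignment on the context fail its assertion,
# effectively freezing the singleton after construction.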

    
def _SetWatcherPause(until):
  """Creates or removes the watcher pause file.

  @type until: None or int
  @param until: Unix timestamp up to which the watcher shouldn't run

  """
  if until is None:
    utils.RemoveFile(constants.WATCHER_PAUSEFILE)
  else:
    utils.WriteFile(constants.WATCHER_PAUSEFILE,
                    data="%d\n" % (until, ))

  return until

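# The pause file holds the end-of-pause Unix timestamp as a decimal number
# followed by a newline; removing the file re-enables the watcher.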

    
def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most cases we are the master, we'll
  use our own config file for getting the node list. In the future we
  could collect the current node list from our (possibly obsolete)
  known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are more out of sync, manual startup of the master should be
  attempted for now.

  Note that for a cluster with an even number of nodes, we need at
  least half of the nodes (besides ourselves) to vote for us. This
  creates a problem on two-node clusters, since in this case we
  require the other node to be up too to confirm our status.

  """
  myself = utils.HostInfo().name
  # temporary instantiation of a config writer, used only to get the node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg
  retries = 6
  while retries > 0:
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is None:
      retries -= 1
      time.sleep(10)
      continue
    break
  if retries == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    return False
  # here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]

  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result

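# Note on the vote data: bootstrap.GatherMasterVotes() returns (node, votes)
# pairs sorted by decreasing vote count, with a None node name aggregating
# nodes that gave no usable answer; None at the top therefore means most
# nodes could not be queried.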

    
def CheckAgreementWithRpc():
  """Runs CheckAgreement with the RPC layer initialized."""
  rpc.Init()
  try:
    return CheckAgreement()
  finally:
    rpc.Shutdown()


    
def _RunInSeparateProcess(fn):
  """Runs a function in a separate process.

  Note: Only boolean return values are supported.

  @type fn: callable
  @param fn: Function to be called
  @rtype: bool

  """
  pid = os.fork()
  if pid == 0:
    # Child process
    try:
      # Call function
      result = int(bool(fn()))
      assert result in (0, 1)
    except: # pylint: disable-msg=W0702
      logging.exception("Error while calling function in separate process")
      # 0 and 1 are reserved for the return value
      result = 33

    os._exit(result) # pylint: disable-msg=W0212

  # Parent process

  # Avoid zombies and check exit code
  (_, status) = os.waitpid(pid, 0)

  if os.WIFSIGNALED(status):
    signum = os.WTERMSIG(status)
    exitcode = None
  else:
    signum = None
    exitcode = os.WEXITSTATUS(status)

  if not (exitcode in (0, 1) and signum is None):
    logging.error("Child program failed (code=%s, signal=%s)",
                  exitcode, signum)
    sys.exit(constants.EXIT_FAILURE)

  return bool(exitcode)

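# Exit-code protocol used above: the child encodes the boolean result as
# exit status 0 or 1 and uses 33 when fn() raised an exception; the parent
# treats any other status, or death by signal, as a fatal error.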

    
def CheckMasterd(options, args):
  """Initial checks to decide whether to run or exit with a failure.

  """
  if args: # masterd doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)

  ssconf.CheckMaster(options.debug)

  # If CheckMaster didn't fail we believe we are the master, but we have to
  # confirm with the other nodes.
  if options.no_voting:
    if options.yes_do_it:
      return

    sys.stdout.write("The 'no voting' option has been selected.\n")
    sys.stdout.write("This is dangerous, please confirm by"
                     " typing uppercase 'yes': ")
    sys.stdout.flush()

    confirmation = sys.stdin.readline().strip()
    if confirmation != "YES":
      print >> sys.stderr, "Aborting."
      sys.exit(constants.EXIT_FAILURE)

    return

  # CheckAgreement uses RPC and threads, hence it needs to be run in a
  # separate process before we call utils.Daemonize in the current process.
  if not _RunInSeparateProcess(CheckAgreementWithRpc):
    sys.exit(constants.EXIT_FAILURE)


    
def ExecMasterd(options, args): # pylint: disable-msg=W0613
  """Main master daemon function, executed with the PID file held.

  """
  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(constants.MASTER_SOCKET)

  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
  try:
    rpc.Init()
    try:
      # Activate the master IP address
      master_node = ssconf.SimpleStore().GetMasterNode()
      result = rpc.RpcRunner.call_node_start_master(master_node, False, False)
      msg = result.fail_msg
      if msg:
        logging.error("Can't activate master IP address: %s", msg)

      master.setup_queue()
      try:
        master.serve_forever()
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemoveFile(constants.MASTER_SOCKET)


    
def main():
  """Main function"""
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  parser.add_option("--no-voting", dest="no_voting",
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally",
                    default=False, action="store_true")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    help="Override interactive check for --no-voting",
                    default=False, action="store_true")
  dirs = [(constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE),
          (constants.SOCKET_DIR, constants.SOCKET_DIR_MODE),
         ]
  daemon.GenericMain(constants.MASTERD, parser, dirs,
                     CheckMasterd, ExecMasterd,
                     multithreaded=True)


if __name__ == "__main__":
  main()
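
# Note: the -f (foreground) and -d (debug) switches mentioned in the usage
# string are presumably provided by daemon.GenericMain, which also takes
# care of daemonizing, PID-file handling and logging setup.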