Statistics
| Branch: | Tag: | Revision:

root / lib / server / masterd.py @ 4c91d2ad

History | View | Annotate | Download (24.1 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Master daemon program.
23

24
Some classes deviates from the standard style guide since the
25
inheritance from parent classes requires it.
26

27
"""
28

    
29
# pylint: disable=C0103
30
# C0103: Invalid name ganeti-masterd
31

    
32
import grp
33
import os
34
import pwd
35
import sys
36
import socket
37
import time
38
import tempfile
39
import logging
40

    
41
from optparse import OptionParser
42

    
43
from ganeti import config
44
from ganeti import constants
45
from ganeti import daemon
46
from ganeti import mcpu
47
from ganeti import opcodes
48
from ganeti import jqueue
49
from ganeti import locking
50
from ganeti import luxi
51
from ganeti import utils
52
from ganeti import errors
53
from ganeti import ssconf
54
from ganeti import workerpool
55
from ganeti import rpc
56
from ganeti import bootstrap
57
from ganeti import netutils
58
from ganeti import objects
59
from ganeti import query
60
from ganeti import runtime
61

    
62

    
63
CLIENT_REQUEST_WORKERS = 16
64

    
65
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
66
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
67

    
68

    
69
def _LogNewJob(status, info, ops):
  """Log information about a recently submitted job.

  @param status: whether the job submission succeeded
  @param info: the job ID on success, otherwise the failure reason
  @param ops: list of opcodes the job consists of

  """
  # Build the opcode summary once; both branches log the same summary
  summary = utils.CommaJoin(op.Summary() for op in ops)

  if status:
    logging.info("New job with id %s, summary: %s", info, summary)
  else:
    logging.info("Failed to submit job, reason: '%s', summary: %s",
                 info, summary)
79

    
80

    
81
class ClientRequestWorker(workerpool.BaseWorker):
  """Worker processing a single LUXI client request."""

  # pylint: disable=W0221
  def RunTask(self, server, message, client):
    """Process the request.

    Parses the LUXI message, dispatches it through L{ClientOps} and sends
    the (success or error) response back to the client.

    @param server: the L{MasterServer} owning this worker pool
    @param message: raw LUXI request string as read from the socket
    @param client: the per-connection handler used to send the reply

    """
    client_ops = ClientOps(server)

    try:
      (method, args, version) = luxi.ParseRequest(message)
    except luxi.ProtocolError, err:
      # Malformed request; no structured reply can be produced, so the
      # connection is closed instead
      logging.error("Protocol Error: %s", err)
      client.close_log()
      return

    success = False
    try:
      # Verify client's version if there was one in the request
      if version is not None and version != constants.LUXI_VERSION:
        raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
                               (constants.LUXI_VERSION, version))

      result = client_ops.handle_request(method, args)
      success = True
    except errors.GenericError, err:
      # Ganeti-defined errors are encoded so the client can re-raise them
      logging.exception("Unexpected exception")
      success = False
      result = errors.EncodeException(err)
    except:
      # Any other exception is reported as a plain string so the client at
      # least receives a reply instead of a dropped connection
      logging.exception("Unexpected exception")
      err = sys.exc_info()
      result = "Caught exception: %s" % str(err[1])

    try:
      reply = luxi.FormatResponse(success, result)
      client.send_message(reply)
      # awake the main thread so that it can write out the data.
      server.awaker.signal()
    except: # pylint: disable=W0702
      logging.exception("Send error")
      client.close_log()
122

    
123

    
124
class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
  """Handler for master peers.

  """
  # Passed to the base class as the limit on messages read from the socket
  # but not yet dispatched; keeps the per-client backlog bounded
  _MAX_UNHANDLED = 1

  def __init__(self, server, connected_socket, client_address, family):
    """Initializes this class.

    @param server: the owning L{MasterServer}
    @param connected_socket: the already-accepted client socket
    @param client_address: address of the connected peer
    @param family: socket address family (e.g. C{socket.AF_UNIX})

    """
    daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
                                                 client_address,
                                                 constants.LUXI_EOM,
                                                 family, self._MAX_UNHANDLED)
    self.server = server

  def handle_message(self, message, _):
    """Queues a complete incoming message for the worker pool.

    The reply is sent asynchronously by L{ClientRequestWorker}.

    """
    self.server.request_workers.AddTask((self.server, message, self))
139

    
140

    
141
class _MasterShutdownCheck:
142
  """Logic for master daemon shutdown.
143

144
  """
145
  #: How long to wait between checks
146
  _CHECK_INTERVAL = 5.0
147

    
148
  #: How long to wait after all jobs are done (e.g. to give clients time to
149
  #: retrieve the job status)
150
  _SHUTDOWN_LINGER = 5.0
151

    
152
  def __init__(self):
153
    """Initializes this class.
154

155
    """
156
    self._had_active_jobs = None
157
    self._linger_timeout = None
158

    
159
  def __call__(self, jq_prepare_result):
160
    """Determines if master daemon is ready for shutdown.
161

162
    @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown}
163
    @rtype: None or number
164
    @return: None if master daemon is ready, timeout if the check must be
165
             repeated
166

167
    """
168
    if jq_prepare_result:
169
      # Check again shortly
170
      logging.info("Job queue has been notified for shutdown but is still"
171
                   " busy; next check in %s seconds", self._CHECK_INTERVAL)
172
      self._had_active_jobs = True
173
      return self._CHECK_INTERVAL
174

    
175
    if not self._had_active_jobs:
176
      # Can shut down as there were no active jobs on the first check
177
      return None
178

    
179
    # No jobs are running anymore, but maybe some clients want to collect some
180
    # information. Give them a short amount of time.
181
    if self._linger_timeout is None:
182
      self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True)
183

    
184
    remaining = self._linger_timeout.Remaining()
185

    
186
    logging.info("Job queue no longer busy; shutting down master daemon"
187
                 " in %s seconds", remaining)
188

    
189
    # TODO: Should the master daemon socket be closed at this point? Doing so
190
    # wouldn't affect existing connections.
191

    
192
    if remaining < 0:
193
      return None
194
    else:
195
      return remaining
196

    
197

    
198
class MasterServer(daemon.AsyncStreamServer):
  """Master Server.

  This is the main asynchronous master server. It handles connections to the
  master socket.

  """
  # Unix domain socket; the master socket is local-only
  family = socket.AF_UNIX

  def __init__(self, address, uid, gid):
    """MasterServer constructor

    @param address: the unix socket address to bind the MasterServer to
    @param uid: The uid of the owner of the socket
    @param gid: The gid of the owner of the socket

    """
    # Bind to a temporary name first, then rename into place: this way the
    # socket never appears at its final path with wrong mode/ownership
    temp_name = tempfile.mktemp(dir=os.path.dirname(address))
    daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
    os.chmod(temp_name, 0770)
    os.chown(temp_name, uid, gid)
    os.rename(temp_name, address)

    self.awaker = daemon.AsyncAwaker()

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

    # Lazily created by WaitForShutdown()
    self._shutdown_check = None

  def handle_connection(self, connected_socket, client_address):
    """Creates a handler for each accepted connection."""
    # TODO: add connection count and limit the number of open connections to a
    # maximum number to avoid breaking for lack of file descriptors or memory.
    MasterClientHandler(self, connected_socket, client_address, self.family)

  def setup_queue(self):
    """Creates the shared context and the client request worker pool."""
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool("ClientReq",
                                                 CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def WaitForShutdown(self):
    """Prepares server for shutdown.

    @return: C{None} when shutdown may proceed, otherwise the number of
        seconds to wait before checking again (see L{_MasterShutdownCheck})

    """
    if self._shutdown_check is None:
      self._shutdown_check = _MasterShutdownCheck()

    return self._shutdown_check(self.context.jobqueue.PrepareShutdown())

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.close()
    finally:
      # Workers/queue may be None if setup_queue() was never called
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.context:
        self.context.jobqueue.Shutdown()
263

    
264

    
265
class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    """Initializes this class.

    @param server: the L{MasterServer} providing the shared context

    """
    self.server = server

  def handle_request(self, method, args): # pylint: disable=R0911
    """Dispatches a single LUXI request.

    @param method: LUXI method name (one of the C{luxi.REQ_*} constants)
    @param args: request arguments; must be a tuple or list
    @return: method-specific result, suitable for the LUXI reply
    @raise ValueError: if C{args} has the wrong type or C{method} is unknown

    """
    context = self.server.context
    queue = context.jobqueue

    # TODO: Parameter validation
    if not isinstance(args, (tuple, list)):
      logging.info("Received invalid arguments of type '%s'", type(args))
      raise ValueError("Invalid arguments type '%s'" % type(args))

    # TODO: Rewrite to not exit in each 'if/elif' branch

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Receiving new job")
      (job_def, ) = args
      ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
      job_id = queue.SubmitJob(ops)
      _LogNewJob(True, job_id, ops)
      return job_id

    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Receiving multiple jobs")
      (job_defs, ) = args
      jobs = []
      for ops in job_defs:
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
      job_ids = queue.SubmitManyJobs(jobs)
      # job_ids is a list of (status, job_id-or-error) pairs, one per job
      for ((status, job_id), ops) in zip(job_ids, jobs):
        _LogNewJob(status, job_id, ops)
      return job_ids

    elif method == luxi.REQ_CANCEL_JOB:
      (job_id, ) = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_ARCHIVE_JOB:
      (job_id, ) = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTO_ARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY:
      (what, fields, qfilter) = args

      # Depending on the resource type the query is answered by an opcode,
      # the lock manager, the job queue, or rejected outright
      if what in constants.QR_VIA_OP:
        result = self._Query(opcodes.OpQuery(what=what, fields=fields,
                                             qfilter=qfilter))
      elif what == constants.QR_LOCK:
        if qfilter is not None:
          raise errors.OpPrereqError("Lock queries can't be filtered")
        return context.glm.QueryLocks(fields)
      elif what == constants.QR_JOB:
        return queue.QueryJobs(fields, qfilter)
      elif what in constants.QR_VIA_LUXI:
        raise NotImplementedError
      else:
        raise errors.OpPrereqError("Resource type '%s' unknown" % what,
                                   errors.ECODE_INVAL)

      return result

    elif method == luxi.REQ_QUERY_FIELDS:
      (what, fields) = args
      req = objects.QueryFieldsRequest(what=what, fields=fields)

      try:
        fielddefs = query.ALL_FIELDS[req.what]
      except KeyError:
        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
                                   errors.ECODE_INVAL)

      return query.QueryFields(fielddefs, req.fields)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      # Only for logging: show the requested IDs, or the raw value if it is
      # not a non-empty list/tuple
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = utils.CommaJoin(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.OldStyleQueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      # Locking (synchronous) queries would block the master; refuse them
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
                                   use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpNodeQuery(names=names, output_fields=fields,
                               use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_GROUPS:
      (names, fields, use_locking) = args
      logging.info("Received group query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpGroupQuery(names=names, output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      (nodes, use_locking) = args
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      logging.info("Received exports query request")
      op = opcodes.OpBackupQuery(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      (fields, ) = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpClusterConfigQuery(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpClusterQuery()
      return self._Query(op)

    elif method == luxi.REQ_QUERY_TAGS:
      (kind, name) = args
      logging.info("Received tags query request")
      op = opcodes.OpTagsGet(kind=kind, name=name, use_locking=False)
      return self._Query(op)

    elif method == luxi.REQ_SET_DRAIN_FLAG:
      (drain_flag, ) = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    elif method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      if until is None:
        logging.info("Received request to no longer pause the watcher")
      else:
        if not isinstance(until, (int, float)):
          raise TypeError("Duration must be an integer or float")

        # Pause end must lie in the future
        if until < time.time():
          raise errors.GenericError("Unable to set pause end time in the past")

        logging.info("Received request to pause the watcher until %s", until)

      return _SetWatcherPause(until)

    else:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    @param op: the opcode instance to execute
    @return: the opcode's execution result

    """
    # Queries don't have a job id
    proc = mcpu.Processor(self.server.context, None, enable_locks=False)

    # TODO: Executing an opcode using locks will acquire them in blocking mode.
    # Consider using a timeout for retries.
    return proc.ExecOpCode(op, None)
454

    
455

    
456
class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # pylint: disable=W0212
  # we do want to ensure a singleton here
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only a GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    self.glm = locking.GanetiLockManager(
                self.cfg.GetNodeList(),
                self.cfg.GetNodeGroupList(),
                self.cfg.GetInstanceList())

    self.cfg.SetContext(self)

    # RPC runner
    self.rpc = rpc.RpcRunner(self.cfg, self.glm.AddToLockMonitor)

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    # While __init__ runs, _instance is still None, so the assignments above
    # pass; once _instance is set the object becomes effectively read-only
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node, ec_id):
    """Adds a node to the configuration and lock manager.

    @param node: the node object to add
    @param ec_id: execution context id used by the configuration

    """
    # Add it to the configuration
    self.cfg.AddNode(node, ec_id)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)
    self.glm.add(locking.LEVEL_NODE_RES, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration

    @param node: the node object being re-added

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    @param name: name of the node to remove

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)
    self.glm.remove(locking.LEVEL_NODE_RES, name)
536

    
537

    
538
def _SetWatcherPause(until):
  """Creates or removes the watcher pause file.

  @type until: None or int
  @param until: Unix timestamp saying until when the watcher shouldn't run

  """
  if until is not None:
    # Record the pause end time for the watcher to pick up
    utils.WriteFile(constants.WATCHER_PAUSEFILE, data="%d\n" % until)
  else:
    # No timestamp means the pause is lifted
    utils.RemoveFile(constants.WATCHER_PAUSEFILE)

  return until
552

    
553

    
554
@rpc.RunWithRPC
def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most of the cases we are the master,
  we'll use our own config file for getting the node list. In the
  future we could collect the current node list from our (possibly
  obsolete) known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are more out-of-sync, for now manual startup of the master
  should be attempted.

  Note that for a even number of nodes cluster, we need at least half
  of the nodes (beside ourselves) to vote for us. This creates a
  problem on two-node clusters, since in this case we require the
  other node to be up too to confirm our status.

  @rtype: bool
  @return: True if this node won the master vote, False otherwise

  """
  myself = netutils.Hostname.GetSysName()
  #temp instantiation of a config writer, used only to get the node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg
  # 6 retries with a 10s sleep gives the "up to a minute" mentioned above
  retries = 6
  while retries > 0:
    # votes appears to be a list of (node, vote_count) pairs with the
    # top-voted entry first; a None node means no usable answer yet
    # (see bootstrap.GatherMasterVotes for the exact contract)
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is None:
      retries -= 1
      time.sleep(10)
      continue
    break
  if retries == 0:
    # all retries exhausted without a real answer
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    logging.critical("Use the --no-voting option if you understand what"
                     " effects it has on the cluster state")
    return False
  # here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]

  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    # we won the top spot but lost the absolute majority
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result
613

    
614

    
615
@rpc.RunWithRPC
def ActivateMasterIP():
  """Activates the master IP address on this node via an RPC call.

  Failures are logged but not propagated to the caller.

  """
  # activate ip
  cfg = config.ConfigWriter()
  params = cfg.GetMasterNetworkParameters()
  use_external_script = cfg.GetUseExternalMipScript()
  result = rpc.BootstrapRunner().call_node_activate_master_ip(
    params.name, params, use_external_script)

  msg = result.fail_msg
  if msg:
    logging.error("Can't activate master IP address: %s", msg)
628

    
629

    
630
def CheckMasterd(options, args):
  """Initial checks whether to run or exit with a failure.

  Verifies command line usage, master status, runtime users/groups, the
  on-disk configuration and (unless voting is disabled) the master vote.
  Exits the process on any failed check.

  @param options: parsed command line options; C{options.uid}/C{options.gid}
      are filled in here for later use
  @param args: remaining positional arguments (must be empty)

  """
  if args: # masterd doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)

  # Exits if this node is not the master
  ssconf.CheckMaster(options.debug)

  try:
    options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
    options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
  except KeyError:
    print >> sys.stderr, ("User or group not existing on system: %s:%s" %
                          (constants.MASTERD_USER, constants.DAEMONS_GROUP))
    sys.exit(constants.EXIT_FAILURE)

  # Determine static runtime architecture information
  runtime.InitArchInfo()

  # Check the configuration is sane before anything else
  try:
    config.ConfigWriter()
  except errors.ConfigVersionMismatch, err:
    v1 = "%s.%s.%s" % constants.SplitVersion(err.args[0])
    v2 = "%s.%s.%s" % constants.SplitVersion(err.args[1])
    print >> sys.stderr,  \
        ("Configuration version mismatch. The current Ganeti software"
         " expects version %s, but the on-disk configuration file has"
         " version %s. This is likely the result of upgrading the"
         " software without running the upgrade procedure. Please contact"
         " your cluster administrator or complete the upgrade using the"
         " cfgupgrade utility, after reading the upgrade notes." %
         (v1, v2))
    sys.exit(constants.EXIT_FAILURE)
  except errors.ConfigurationError, err:
    print >> sys.stderr, \
        ("Configuration error while opening the configuration file: %s\n"
         "This might be caused by an incomplete software upgrade or"
         " by a corrupted configuration file. Until the problem is fixed"
         " the master daemon cannot start." % str(err))
    sys.exit(constants.EXIT_FAILURE)

  # If CheckMaster didn't fail we believe we are the master, but we have to
  # confirm with the other nodes.
  if options.no_voting:
    # --no-voting requires an interactive confirmation, unless --yes-do-it
    # was given as well
    if not options.yes_do_it:
      sys.stdout.write("The 'no voting' option has been selected.\n")
      sys.stdout.write("This is dangerous, please confirm by"
                       " typing uppercase 'yes': ")
      sys.stdout.flush()

      confirmation = sys.stdin.readline().strip()
      if confirmation != "YES":
        print >> sys.stderr, "Aborting."
        sys.exit(constants.EXIT_FAILURE)

  else:
    # CheckAgreement uses RPC and threads, hence it needs to be run in
    # a separate process before we call utils.Daemonize in the current
    # process.
    if not utils.RunInSeparateProcess(CheckAgreement):
      sys.exit(constants.EXIT_FAILURE)

  # ActivateMasterIP also uses RPC/threads, so we run it again via a
  # separate process.

  # TODO: decide whether failure to activate the master IP is a fatal error
  utils.RunInSeparateProcess(ActivateMasterIP)
700

    
701

    
702
def PrepMasterd(options, _):
  """Prep master daemon function, executed with the PID file held.

  @param options: parsed command line options (uid/gid resolved by
      L{CheckMasterd})
  @return: a (mainloop, master server) tuple handed to L{ExecMasterd}

  """
  # Holding the PID file guarantees no other masterd is running, so
  # removing a stale socket here cannot race with a live daemon.
  utils.RemoveFile(constants.MASTER_SOCKET)

  return (daemon.Mainloop(),
          MasterServer(constants.MASTER_SOCKET, options.uid, options.gid))
713

    
714

    
715
def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
  """Main master daemon function, executed with the PID file held.

  Runs the main loop and guarantees teardown in reverse order of setup:
  server cleanup, then RPC shutdown, then master socket removal.

  @param prep_data: the (mainloop, master) tuple returned by L{PrepMasterd}

  """
  (mainloop, master) = prep_data
  try:
    rpc.Init()
    try:
      master.setup_queue()
      try:
        mainloop.Run(shutdown_wait_fn=master.WaitForShutdown)
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemoveFile(constants.MASTER_SOCKET)

  logging.info("Clean master daemon shutdown")
734

    
735

    
736
def Main():
  """Main function"""
  version_string = "%%prog (ganeti) %s" % constants.RELEASE_VERSION
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version=version_string)

  no_voting_help = ("Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally")
  parser.add_option("--no-voting", action="store_true", dest="no_voting",
                    default=False, help=no_voting_help)
  parser.add_option("--yes-do-it", action="store_true", dest="yes_do_it",
                    default=False,
                    help="Override interactive check for --no-voting")

  # GenericMain handles daemonization, logging setup and the check/prep/exec
  # callbacks defined above
  daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
                     ExecMasterd, multithreaded=True)