Statistics
| Branch: | Tag: | Revision:

root / lib / server / masterd.py @ e75d9a84

History | View | Annotate | Download (24 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Master daemon program.
23

24
Some classes deviate from the standard style guide since the
25
inheritance from parent classes requires it.
26

27
"""
28

    
29
# pylint: disable=C0103
30
# C0103: Invalid name ganeti-masterd
31

    
32
import grp
33
import os
34
import pwd
35
import sys
36
import socket
37
import time
38
import tempfile
39
import logging
40

    
41
from optparse import OptionParser
42

    
43
from ganeti import config
44
from ganeti import constants
45
from ganeti import daemon
46
from ganeti import mcpu
47
from ganeti import opcodes
48
from ganeti import jqueue
49
from ganeti import locking
50
from ganeti import luxi
51
from ganeti import utils
52
from ganeti import errors
53
from ganeti import ssconf
54
from ganeti import workerpool
55
from ganeti import rpc
56
from ganeti import bootstrap
57
from ganeti import netutils
58
from ganeti import objects
59
from ganeti import query
60
from ganeti import runtime
61
from ganeti import pathutils
62

    
63

    
64
# Value handed to the "ClientReq" worker pool when it is created (presumably
# the number of worker threads; confirm against workerpool.WorkerPool)
CLIENT_REQUEST_WORKERS = 16

# Exit codes taken over from the constants module under module-local names
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
68

    
69

    
70
class ClientRequestWorker(workerpool.BaseWorker):
  """Worker answering a single client request.

  """
  # pylint: disable=W0221
  def RunTask(self, server, message, client):
    """Parses a client message, executes it and sends back the reply.

    @param server: master server object; used to build the L{ClientOps}
        dispatcher and to wake up the main thread after replying
    @param message: request as received from the client
    @param client: client connection the reply is sent on

    """
    dispatcher = ClientOps(server)

    # Unparseable requests are logged and the connection is closed
    try:
      (method, args, version) = luxi.ParseRequest(message)
    except luxi.ProtocolError:
      parse_error = sys.exc_info()[1]
      logging.error("Protocol Error: %s", parse_error)
      client.close_log()
      return

    success = False
    try:
      # A request without version is accepted as-is; otherwise it must match
      version_matches = (version is None or
                         version == constants.LUXI_VERSION)
      if not version_matches:
        raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
                               (constants.LUXI_VERSION, version))

      result = dispatcher.handle_request(method, args)
      success = True
    except errors.GenericError:
      failure = sys.exc_info()[1]
      logging.exception("Unexpected exception")
      success = False
      result = errors.EncodeException(failure)
    except: # pylint: disable=W0702
      # "success" is still False here, the text is sent as error result
      logging.exception("Unexpected exception")
      result = "Caught exception: %s" % str(sys.exc_info()[1])

    try:
      client.send_message(luxi.FormatResponse(success, result))
      # awake the main thread so that it can write out the data.
      server.awaker.signal()
    except: # pylint: disable=W0702
      logging.exception("Send error")
      client.close_log()
111

    
112

    
113
class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
  """Handler for a single client connected to the master socket.

  """
  #: Passed as last argument to the parent class constructor
  _MAX_UNHANDLED = 1

  def __init__(self, server, connected_socket, client_address, family):
    """Sets up the message stream and remembers the owning server.

    @param server: server object whose worker pool processes the messages
    @param connected_socket: socket of the accepted connection
    @param client_address: address of the connected client
    @param family: socket family

    """
    parent = daemon.AsyncTerminatedMessageStream
    parent.__init__(self, connected_socket, client_address,
                    constants.LUXI_EOM, family, self._MAX_UNHANDLED)
    self.server = server

  def handle_message(self, message, _):
    """Queues a received message on the server's request worker pool.

    """
    task = (self.server, message, self)
    self.server.request_workers.AddTask(task)
128

    
129

    
130
class _MasterShutdownCheck:
131
  """Logic for master daemon shutdown.
132

133
  """
134
  #: How long to wait between checks
135
  _CHECK_INTERVAL = 5.0
136

    
137
  #: How long to wait after all jobs are done (e.g. to give clients time to
138
  #: retrieve the job status)
139
  _SHUTDOWN_LINGER = 5.0
140

    
141
  def __init__(self):
142
    """Initializes this class.
143

144
    """
145
    self._had_active_jobs = None
146
    self._linger_timeout = None
147

    
148
  def __call__(self, jq_prepare_result):
149
    """Determines if master daemon is ready for shutdown.
150

151
    @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown}
152
    @rtype: None or number
153
    @return: None if master daemon is ready, timeout if the check must be
154
             repeated
155

156
    """
157
    if jq_prepare_result:
158
      # Check again shortly
159
      logging.info("Job queue has been notified for shutdown but is still"
160
                   " busy; next check in %s seconds", self._CHECK_INTERVAL)
161
      self._had_active_jobs = True
162
      return self._CHECK_INTERVAL
163

    
164
    if not self._had_active_jobs:
165
      # Can shut down as there were no active jobs on the first check
166
      return None
167

    
168
    # No jobs are running anymore, but maybe some clients want to collect some
169
    # information. Give them a short amount of time.
170
    if self._linger_timeout is None:
171
      self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True)
172

    
173
    remaining = self._linger_timeout.Remaining()
174

    
175
    logging.info("Job queue no longer busy; shutting down master daemon"
176
                 " in %s seconds", remaining)
177

    
178
    # TODO: Should the master daemon socket be closed at this point? Doing so
179
    # wouldn't affect existing connections.
180

    
181
    if remaining < 0:
182
      return None
183
    else:
184
      return remaining
185

    
186

    
187
class MasterServer(daemon.AsyncStreamServer):
188
  """Master Server.
189

190
  This is the main asynchronous master server. It handles connections to the
191
  master socket.
192

193
  """
194
  family = socket.AF_UNIX
195

    
196
  def __init__(self, address, uid, gid):
197
    """MasterServer constructor
198

199
    @param address: the unix socket address to bind the MasterServer to
200
    @param uid: The uid of the owner of the socket
201
    @param gid: The gid of the owner of the socket
202

203
    """
204
    temp_name = tempfile.mktemp(dir=os.path.dirname(address))
205
    daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
206
    os.chmod(temp_name, 0770)
207
    os.chown(temp_name, uid, gid)
208
    os.rename(temp_name, address)
209

    
210
    self.awaker = daemon.AsyncAwaker()
211

    
212
    # We'll only start threads once we've forked.
213
    self.context = None
214
    self.request_workers = None
215

    
216
    self._shutdown_check = None
217

    
218
  def handle_connection(self, connected_socket, client_address):
219
    # TODO: add connection count and limit the number of open connections to a
220
    # maximum number to avoid breaking for lack of file descriptors or memory.
221
    MasterClientHandler(self, connected_socket, client_address, self.family)
222

    
223
  def setup_queue(self):
224
    self.context = GanetiContext()
225
    self.request_workers = workerpool.WorkerPool("ClientReq",
226
                                                 CLIENT_REQUEST_WORKERS,
227
                                                 ClientRequestWorker)
228

    
229
  def WaitForShutdown(self):
230
    """Prepares server for shutdown.
231

232
    """
233
    if self._shutdown_check is None:
234
      self._shutdown_check = _MasterShutdownCheck()
235

    
236
    return self._shutdown_check(self.context.jobqueue.PrepareShutdown())
237

    
238
  def server_cleanup(self):
239
    """Cleanup the server.
240

241
    This involves shutting down the processor threads and the master
242
    socket.
243

244
    """
245
    try:
246
      self.close()
247
    finally:
248
      if self.request_workers:
249
        self.request_workers.TerminateWorkers()
250
      if self.context:
251
        self.context.jobqueue.Shutdown()
252

    
253

    
254
class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    """Remembers the server whose context is used to serve requests.

    """
    self.server = server

  def _RefuseLocking(self, use_locking): # pylint: disable=R0201
    """Rejects queries which ask for locking.

    @param use_locking: locking flag as received from the client
    @raise errors.OpPrereqError: if the flag evaluates to true

    """
    if use_locking:
      raise errors.OpPrereqError("Sync queries are not allowed",
                                 errors.ECODE_INVAL)

  def handle_request(self, method, args): # pylint: disable=R0911
    """Executes a single client request.

    @param method: request type, compared against the C{luxi.REQ_*} values
    @param args: request arguments, must be a tuple or list
    @return: result of the requested operation
    @raise ValueError: if the arguments aren't a tuple or list, or if the
        request type is not known

    """
    ctx = self.server.context
    jq = ctx.jobqueue

    # TODO: Parameter validation
    if not isinstance(args, (tuple, list)):
      logging.info("Received invalid arguments of type '%s'", type(args))
      raise ValueError("Invalid arguments type '%s'" % type(args))

    # Every branch below either returns or raises

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Received new job")
      (job_def, ) = args
      return jq.SubmitJob([opcodes.OpCode.LoadOpCode(state)
                           for state in job_def])

    if method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Received multiple jobs")
      (job_defs, ) = args
      return jq.SubmitManyJobs([[opcodes.OpCode.LoadOpCode(state)
                                 for state in job_ops]
                                for job_ops in job_defs])

    if method == luxi.REQ_CANCEL_JOB:
      (job_id, ) = args
      logging.info("Received job cancel request for %s", job_id)
      return jq.CancelJob(job_id)

    if method == luxi.REQ_ARCHIVE_JOB:
      (job_id, ) = args
      logging.info("Received job archive request for %s", job_id)
      return jq.ArchiveJob(job_id)

    if method == luxi.REQ_AUTO_ARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return jq.AutoArchiveJobs(age, timeout)

    if method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return jq.WaitForJobChanges(job_id, fields, prev_job_info,
                                  prev_log_serial, timeout)

    if method == luxi.REQ_QUERY:
      (what, fields, qfilter) = args

      if what in constants.QR_VIA_OP:
        return self._Query(opcodes.OpQuery(what=what, fields=fields,
                                           qfilter=qfilter))

      if what == constants.QR_LOCK:
        if qfilter is not None:
          raise errors.OpPrereqError("Lock queries can't be filtered",
                                     errors.ECODE_INVAL)
        return ctx.glm.QueryLocks(fields)

      if what == constants.QR_JOB:
        return jq.QueryJobs(fields, qfilter)

      if what in constants.QR_VIA_LUXI:
        raise NotImplementedError

      raise errors.OpPrereqError("Resource type '%s' unknown" % what,
                                 errors.ECODE_INVAL)

    if method == luxi.REQ_QUERY_FIELDS:
      (what, fields) = args
      req = objects.QueryFieldsRequest(what=what, fields=fields)

      try:
        fielddefs = query.ALL_FIELDS[req.what]
      except KeyError:
        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
                                   errors.ECODE_INVAL)

      return query.QueryFields(fielddefs, req.fields)

    if method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      # Only non-empty sequences are joined for the log message
      if not (isinstance(job_ids, (tuple, list)) and job_ids):
        msg = str(job_ids)
      else:
        msg = utils.CommaJoin(job_ids)
      logging.info("Received job query request for %s", msg)
      return jq.OldStyleQueryJobs(job_ids, fields)

    if method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      self._RefuseLocking(use_locking)
      return self._Query(opcodes.OpInstanceQuery(names=names,
                                                 output_fields=fields,
                                                 use_locking=use_locking))

    if method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      self._RefuseLocking(use_locking)
      return self._Query(opcodes.OpNodeQuery(names=names,
                                             output_fields=fields,
                                             use_locking=use_locking))

    if method == luxi.REQ_QUERY_GROUPS:
      (names, fields, use_locking) = args
      logging.info("Received group query request for %s", names)
      self._RefuseLocking(use_locking)
      return self._Query(opcodes.OpGroupQuery(names=names,
                                              output_fields=fields))

    if method == luxi.REQ_QUERY_NETWORKS:
      (names, fields, use_locking) = args
      logging.info("Received network query request for %s", names)
      self._RefuseLocking(use_locking)
      return self._Query(opcodes.OpNetworkQuery(names=names,
                                                output_fields=fields))

    if method == luxi.REQ_QUERY_EXPORTS:
      (nodes, use_locking) = args
      # Unlike the other queries this one is checked before being logged
      self._RefuseLocking(use_locking)
      logging.info("Received exports query request")
      return self._Query(opcodes.OpBackupQuery(nodes=nodes,
                                               use_locking=use_locking))

    if method == luxi.REQ_QUERY_CONFIG_VALUES:
      (fields, ) = args
      logging.info("Received config values query request for %s", fields)
      return self._Query(opcodes.OpClusterConfigQuery(output_fields=fields))

    if method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      return self._Query(opcodes.OpClusterQuery())

    if method == luxi.REQ_QUERY_TAGS:
      (kind, name) = args
      logging.info("Received tags query request")
      return self._Query(opcodes.OpTagsGet(kind=kind, name=name,
                                           use_locking=False))

    if method == luxi.REQ_SET_DRAIN_FLAG:
      (drain_flag, ) = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return jq.SetDrainFlag(drain_flag)

    if method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      if until is not None:
        if not isinstance(until, (int, float)):
          raise TypeError("Duration must be an integer or float")

        if until < time.time():
          raise errors.GenericError("Unable to set pause end time in the past")

        logging.info("Received request to pause the watcher until %s", until)
      else:
        logging.info("Received request to no longer pause the watcher")

      return _SetWatcherPause(until)

    logging.info("Received invalid request '%s'", method)
    raise ValueError("Invalid operation '%s'" % method)

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    # Queries don't have a job id
    processor = mcpu.Processor(self.server.context, None, enable_locks=False)

    # TODO: Executing an opcode using locks will acquire them in blocking mode.
    # Consider using a timeout for retries.
    return processor.ExecOpCode(op, None)
448

    
449

    
450
class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # pylint: disable=W0212
  # we do want to ensure a singleton here
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only a GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    klass = self.__class__
    assert klass._instance is None, "double GanetiContext instance"

    # Global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager, created from the names known to the configuration
    node_names = self.cfg.GetNodeList()
    group_names = self.cfg.GetNodeGroupList()
    instance_names = self.cfg.GetInstanceList()
    network_names = self.cfg.GetNetworkList()
    self.glm = locking.GanetiLockManager(node_names, group_names,
                                         instance_names, network_names)

    self.cfg.SetContext(self)

    # RPC runner
    self.rpc = rpc.RpcRunner(self.cfg, self.glm.AddToLockMonitor)

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # Has to be last: once the class attribute is set, __setattr__ refuses
    # any further attribute assignment on instances
    klass._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    registered = self.__class__._instance
    assert registered is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node, ec_id):
    """Adds a node to the configuration and lock manager.

    """
    # Configuration first
    self.cfg.AddNode(node, ec_id)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Finally register the node's locks with the Ganeti Lock Manager
    for level in (locking.LEVEL_NODE, locking.LEVEL_NODE_RES):
      self.glm.add(level, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    """
    # Configuration first
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Finally drop the node's locks from the Ganeti Lock Manager
    for level in (locking.LEVEL_NODE, locking.LEVEL_NODE_RES):
      self.glm.remove(level, name)
531

    
532

    
533
def _SetWatcherPause(until):
  """Creates or removes the watcher pause file.

  @type until: None or int
  @param until: Unix timestamp saying until when the watcher shouldn't run
  @return: the value passed in as C{until}

  """
  pause_file = pathutils.WATCHER_PAUSEFILE

  if until is not None:
    # The file holds the timestamp formatted as integer, followed by a newline
    utils.WriteFile(pause_file, data="%d\n" % (until, ))
  else:
    utils.RemoveFile(pause_file)

  return until
547

    
548

    
549
@rpc.RunWithRPC
def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most of the cases we are the master,
  we'll use our own config file for getting the node list. In the
  future we could collect the current node list from our (possibly
  obsolete) known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are more out-of-sync, for now manual startup of the master
  should be attempted.

  Note that for a cluster with an even number of nodes, we need at
  least half of the nodes (beside ourselves) to vote for us. This
  creates a problem on two-node clusters, since in this case we require
  the other node to be up too to confirm our status.

  @rtype: boolean
  @return: True if this node may act as master, False otherwise

  """
  myself = netutils.Hostname.GetSysName()
  # The config writer is only needed for getting the node list and is not
  # kept around afterwards
  node_list = config.ConfigWriter().GetNodeList()

  # Six attempts with a ten second pause after each inconclusive one
  for _ in range(6):
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is not None:
      # a real node is at the top of the list
      break
    time.sleep(10)
  else:
    # No attempt produced a real node as top-voted entry
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    logging.critical("Use the --no-voting option if you understand what"
                     " effects it has on the cluster state")
    return False

  (top_node, top_votes) = votes[0]
  all_votes = sum(entry[1] for entry in votes)

  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
    return False

  if top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
    return False

  return True
608

    
609

    
610
@rpc.RunWithRPC
def ActivateMasterIP():
  """Requests activation of the master IP address via RPC.

  The master network parameters and the "use external master IP script"
  setting are read from the configuration. A failure reported by the RPC
  call is only logged.

  """
  cfg = config.ConfigWriter()
  master_params = cfg.GetMasterNetworkParameters()
  use_external_script = cfg.GetUseExternalMipScript()

  result = rpc.BootstrapRunner().call_node_activate_master_ip(
    master_params.name, master_params, use_external_script)

  failure = result.fail_msg
  if failure:
    logging.error("Can't activate master IP address: %s", failure)
623

    
624

    
625
def CheckMasterd(options, args):
  """Initial checks whether to run or exit with a failure.

  On success C{options.uid} and C{options.gid} are set to the IDs of the
  master daemon user and the daemons group.

  @param options: parsed command line options
  @param args: remaining command line arguments, must be empty

  """
  def _Abort(message):
    """Writes a message to stderr and exits with the failure exit code.

    """
    sys.stderr.write(message)
    sys.stderr.write("\n")
    sys.exit(constants.EXIT_FAILURE)

  if args: # masterd doesn't take any arguments
    _Abort("Usage: %s [-f] [-d]" % sys.argv[0])

  ssconf.CheckMaster(options.debug)

  try:
    options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
    options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
  except KeyError:
    _Abort("User or group not existing on system: %s:%s" %
           (constants.MASTERD_USER, constants.DAEMONS_GROUP))

  # Determine static runtime architecture information
  runtime.InitArchInfo()

  # Check the configuration is sane before anything else
  try:
    config.ConfigWriter()
  except errors.ConfigVersionMismatch:
    mismatch = sys.exc_info()[1]
    v1 = "%s.%s.%s" % constants.SplitVersion(mismatch.args[0])
    v2 = "%s.%s.%s" % constants.SplitVersion(mismatch.args[1])
    _Abort("Configuration version mismatch. The current Ganeti software"
           " expects version %s, but the on-disk configuration file has"
           " version %s. This is likely the result of upgrading the"
           " software without running the upgrade procedure. Please contact"
           " your cluster administrator or complete the upgrade using the"
           " cfgupgrade utility, after reading the upgrade notes." %
           (v1, v2))
  except errors.ConfigurationError:
    _Abort("Configuration error while opening the configuration file: %s\n"
           "This might be caused by an incomplete software upgrade or"
           " by a corrupted configuration file. Until the problem is fixed"
           " the master daemon cannot start." % str(sys.exc_info()[1]))

  # If CheckMaster didn't fail we believe we are the master, but we have to
  # confirm with the other nodes.
  if not options.no_voting:
    # CheckAgreement uses RPC and threads, hence it needs to be run in
    # a separate process before we call utils.Daemonize in the current
    # process.
    if not utils.RunInSeparateProcess(CheckAgreement):
      sys.exit(constants.EXIT_FAILURE)
  elif not options.yes_do_it:
    # Voting was disabled without --yes-do-it, ask for confirmation
    sys.stdout.write("The 'no voting' option has been selected.\n")
    sys.stdout.write("This is dangerous, please confirm by"
                     " typing uppercase 'yes': ")
    sys.stdout.flush()

    if sys.stdin.readline().strip() != "YES":
      _Abort("Aborting.")

  # ActivateMasterIP also uses RPC/threads, so we run it again via a
  # separate process.

  # TODO: decide whether failure to activate the master IP is a fatal error
  utils.RunInSeparateProcess(ActivateMasterIP)
695

    
696

    
697
def PrepMasterd(options, _):
  """Prep master daemon function, executed with the PID file held.

  @param options: parsed options; C{uid} and C{gid} are passed on to the
      L{MasterServer} constructor
  @rtype: tuple
  @return: (mainloop, master server)

  """
  socket_path = pathutils.MASTER_SOCKET

  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(socket_path)

  mainloop = daemon.Mainloop()
  server = MasterServer(socket_path, options.uid, options.gid)
  return (mainloop, server)
708

    
709

    
710
def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
  """Main master daemon function, executed with the PID file held.

  @param options: unused
  @param args: unused
  @param prep_data: (mainloop, master server) tuple as returned by
      L{PrepMasterd}

  """
  (loop, server) = prep_data

  # Each step is undone in reverse order, no matter how the step following
  # it ended
  try:
    rpc.Init()
    try:
      server.setup_queue()
      try:
        loop.Run(shutdown_wait_fn=server.WaitForShutdown)
      finally:
        server.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemoveFile(pathutils.MASTER_SOCKET)

  # Not reached if any of the steps above raised an exception
  logging.info("Clean master daemon shutdown")
729

    
730

    
731
def Main():
  """Main function"""
  version_text = "%%prog (ganeti) %s" % constants.RELEASE_VERSION
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version=version_text)

  # Both options are plain flags which are off by default
  parser.add_option("--no-voting", dest="no_voting",
                    default=False, action="store_true",
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    default=False, action="store_true",
                    help="Override interactive check for --no-voting")

  daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
                     ExecMasterd, multithreaded=True)