Statistics
| Branch: | Tag: | Revision:

root / lib / server / masterd.py @ 909064a1

History | View | Annotate | Download (24 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Master daemon program.
23

24
Some classes deviates from the standard style guide since the
25
inheritance from parent classes requires it.
26

27
"""
28

    
29
# pylint: disable=C0103
30
# C0103: Invalid name ganeti-masterd
31

    
32
import grp
33
import os
34
import pwd
35
import sys
36
import socket
37
import time
38
import tempfile
39
import logging
40

    
41
from optparse import OptionParser
42

    
43
from ganeti import config
44
from ganeti import constants
45
from ganeti import daemon
46
from ganeti import mcpu
47
from ganeti import opcodes
48
from ganeti import jqueue
49
from ganeti import locking
50
from ganeti import luxi
51
from ganeti import utils
52
from ganeti import errors
53
from ganeti import ssconf
54
from ganeti import workerpool
55
from ganeti import rpc
56
from ganeti import bootstrap
57
from ganeti import netutils
58
from ganeti import objects
59
from ganeti import query
60
from ganeti import runtime
61

    
62

    
63
CLIENT_REQUEST_WORKERS = 16
64

    
65
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
66
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
67

    
68

    
69
class ClientRequestWorker(workerpool.BaseWorker):
70
  # pylint: disable=W0221
71
  def RunTask(self, server, message, client):
72
    """Process the request.
73

74
    """
75
    client_ops = ClientOps(server)
76

    
77
    try:
78
      (method, args, version) = luxi.ParseRequest(message)
79
    except luxi.ProtocolError, err:
80
      logging.error("Protocol Error: %s", err)
81
      client.close_log()
82
      return
83

    
84
    success = False
85
    try:
86
      # Verify client's version if there was one in the request
87
      if version is not None and version != constants.LUXI_VERSION:
88
        raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
89
                               (constants.LUXI_VERSION, version))
90

    
91
      result = client_ops.handle_request(method, args)
92
      success = True
93
    except errors.GenericError, err:
94
      logging.exception("Unexpected exception")
95
      success = False
96
      result = errors.EncodeException(err)
97
    except:
98
      logging.exception("Unexpected exception")
99
      err = sys.exc_info()
100
      result = "Caught exception: %s" % str(err[1])
101

    
102
    try:
103
      reply = luxi.FormatResponse(success, result)
104
      client.send_message(reply)
105
      # awake the main thread so that it can write out the data.
106
      server.awaker.signal()
107
    except: # pylint: disable=W0702
108
      logging.exception("Send error")
109
      client.close_log()
110

    
111

    
112
class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
113
  """Handler for master peers.
114

115
  """
116
  _MAX_UNHANDLED = 1
117

    
118
  def __init__(self, server, connected_socket, client_address, family):
119
    daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
120
                                                 client_address,
121
                                                 constants.LUXI_EOM,
122
                                                 family, self._MAX_UNHANDLED)
123
    self.server = server
124

    
125
  def handle_message(self, message, _):
126
    self.server.request_workers.AddTask((self.server, message, self))
127

    
128

    
129
class _MasterShutdownCheck:
130
  """Logic for master daemon shutdown.
131

132
  """
133
  #: How long to wait between checks
134
  _CHECK_INTERVAL = 5.0
135

    
136
  #: How long to wait after all jobs are done (e.g. to give clients time to
137
  #: retrieve the job status)
138
  _SHUTDOWN_LINGER = 5.0
139

    
140
  def __init__(self):
141
    """Initializes this class.
142

143
    """
144
    self._had_active_jobs = None
145
    self._linger_timeout = None
146

    
147
  def __call__(self, jq_prepare_result):
148
    """Determines if master daemon is ready for shutdown.
149

150
    @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown}
151
    @rtype: None or number
152
    @return: None if master daemon is ready, timeout if the check must be
153
             repeated
154

155
    """
156
    if jq_prepare_result:
157
      # Check again shortly
158
      logging.info("Job queue has been notified for shutdown but is still"
159
                   " busy; next check in %s seconds", self._CHECK_INTERVAL)
160
      self._had_active_jobs = True
161
      return self._CHECK_INTERVAL
162

    
163
    if not self._had_active_jobs:
164
      # Can shut down as there were no active jobs on the first check
165
      return None
166

    
167
    # No jobs are running anymore, but maybe some clients want to collect some
168
    # information. Give them a short amount of time.
169
    if self._linger_timeout is None:
170
      self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True)
171

    
172
    remaining = self._linger_timeout.Remaining()
173

    
174
    logging.info("Job queue no longer busy; shutting down master daemon"
175
                 " in %s seconds", remaining)
176

    
177
    # TODO: Should the master daemon socket be closed at this point? Doing so
178
    # wouldn't affect existing connections.
179

    
180
    if remaining < 0:
181
      return None
182
    else:
183
      return remaining
184

    
185

    
186
class MasterServer(daemon.AsyncStreamServer):
187
  """Master Server.
188

189
  This is the main asynchronous master server. It handles connections to the
190
  master socket.
191

192
  """
193
  family = socket.AF_UNIX
194

    
195
  def __init__(self, address, uid, gid):
196
    """MasterServer constructor
197

198
    @param address: the unix socket address to bind the MasterServer to
199
    @param uid: The uid of the owner of the socket
200
    @param gid: The gid of the owner of the socket
201

202
    """
203
    temp_name = tempfile.mktemp(dir=os.path.dirname(address))
204
    daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
205
    os.chmod(temp_name, 0770)
206
    os.chown(temp_name, uid, gid)
207
    os.rename(temp_name, address)
208

    
209
    self.awaker = daemon.AsyncAwaker()
210

    
211
    # We'll only start threads once we've forked.
212
    self.context = None
213
    self.request_workers = None
214

    
215
    self._shutdown_check = None
216

    
217
  def handle_connection(self, connected_socket, client_address):
218
    # TODO: add connection count and limit the number of open connections to a
219
    # maximum number to avoid breaking for lack of file descriptors or memory.
220
    MasterClientHandler(self, connected_socket, client_address, self.family)
221

    
222
  def setup_queue(self):
223
    self.context = GanetiContext()
224
    self.request_workers = workerpool.WorkerPool("ClientReq",
225
                                                 CLIENT_REQUEST_WORKERS,
226
                                                 ClientRequestWorker)
227

    
228
  def WaitForShutdown(self):
229
    """Prepares server for shutdown.
230

231
    """
232
    if self._shutdown_check is None:
233
      self._shutdown_check = _MasterShutdownCheck()
234

    
235
    return self._shutdown_check(self.context.jobqueue.PrepareShutdown())
236

    
237
  def server_cleanup(self):
238
    """Cleanup the server.
239

240
    This involves shutting down the processor threads and the master
241
    socket.
242

243
    """
244
    try:
245
      self.close()
246
    finally:
247
      if self.request_workers:
248
        self.request_workers.TerminateWorkers()
249
      if self.context:
250
        self.context.jobqueue.Shutdown()
251

    
252

    
253
class ClientOps:
254
  """Class holding high-level client operations."""
255
  def __init__(self, server):
256
    self.server = server
257

    
258
  def handle_request(self, method, args): # pylint: disable=R0911
259
    context = self.server.context
260
    queue = context.jobqueue
261

    
262
    # TODO: Parameter validation
263
    if not isinstance(args, (tuple, list)):
264
      logging.info("Received invalid arguments of type '%s'", type(args))
265
      raise ValueError("Invalid arguments type '%s'" % type(args))
266

    
267
    # TODO: Rewrite to not exit in each 'if/elif' branch
268

    
269
    if method == luxi.REQ_SUBMIT_JOB:
270
      logging.info("Received new job")
271
      (job_def, ) = args
272
      ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
273
      return queue.SubmitJob(ops)
274

    
275
    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
276
      logging.info("Received multiple jobs")
277
      (job_defs, ) = args
278
      jobs = []
279
      for ops in job_defs:
280
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
281
      return queue.SubmitManyJobs(jobs)
282

    
283
    elif method == luxi.REQ_CANCEL_JOB:
284
      (job_id, ) = args
285
      logging.info("Received job cancel request for %s", job_id)
286
      return queue.CancelJob(job_id)
287

    
288
    elif method == luxi.REQ_ARCHIVE_JOB:
289
      (job_id, ) = args
290
      logging.info("Received job archive request for %s", job_id)
291
      return queue.ArchiveJob(job_id)
292

    
293
    elif method == luxi.REQ_AUTO_ARCHIVE_JOBS:
294
      (age, timeout) = args
295
      logging.info("Received job autoarchive request for age %s, timeout %s",
296
                   age, timeout)
297
      return queue.AutoArchiveJobs(age, timeout)
298

    
299
    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
300
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
301
      logging.info("Received job poll request for %s", job_id)
302
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
303
                                     prev_log_serial, timeout)
304

    
305
    elif method == luxi.REQ_QUERY:
306
      (what, fields, qfilter) = args
307

    
308
      if what in constants.QR_VIA_OP:
309
        result = self._Query(opcodes.OpQuery(what=what, fields=fields,
310
                                             qfilter=qfilter))
311
      elif what == constants.QR_LOCK:
312
        if qfilter is not None:
313
          raise errors.OpPrereqError("Lock queries can't be filtered")
314
        return context.glm.QueryLocks(fields)
315
      elif what == constants.QR_JOB:
316
        return queue.QueryJobs(fields, qfilter)
317
      elif what in constants.QR_VIA_LUXI:
318
        raise NotImplementedError
319
      else:
320
        raise errors.OpPrereqError("Resource type '%s' unknown" % what,
321
                                   errors.ECODE_INVAL)
322

    
323
      return result
324

    
325
    elif method == luxi.REQ_QUERY_FIELDS:
326
      (what, fields) = args
327
      req = objects.QueryFieldsRequest(what=what, fields=fields)
328

    
329
      try:
330
        fielddefs = query.ALL_FIELDS[req.what]
331
      except KeyError:
332
        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
333
                                   errors.ECODE_INVAL)
334

    
335
      return query.QueryFields(fielddefs, req.fields)
336

    
337
    elif method == luxi.REQ_QUERY_JOBS:
338
      (job_ids, fields) = args
339
      if isinstance(job_ids, (tuple, list)) and job_ids:
340
        msg = utils.CommaJoin(job_ids)
341
      else:
342
        msg = str(job_ids)
343
      logging.info("Received job query request for %s", msg)
344
      return queue.OldStyleQueryJobs(job_ids, fields)
345

    
346
    elif method == luxi.REQ_QUERY_INSTANCES:
347
      (names, fields, use_locking) = args
348
      logging.info("Received instance query request for %s", names)
349
      if use_locking:
350
        raise errors.OpPrereqError("Sync queries are not allowed",
351
                                   errors.ECODE_INVAL)
352
      op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
353
                                   use_locking=use_locking)
354
      return self._Query(op)
355

    
356
    elif method == luxi.REQ_QUERY_NODES:
357
      (names, fields, use_locking) = args
358
      logging.info("Received node query request for %s", names)
359
      if use_locking:
360
        raise errors.OpPrereqError("Sync queries are not allowed",
361
                                   errors.ECODE_INVAL)
362
      op = opcodes.OpNodeQuery(names=names, output_fields=fields,
363
                               use_locking=use_locking)
364
      return self._Query(op)
365

    
366
    elif method == luxi.REQ_QUERY_GROUPS:
367
      (names, fields, use_locking) = args
368
      logging.info("Received group query request for %s", names)
369
      if use_locking:
370
        raise errors.OpPrereqError("Sync queries are not allowed",
371
                                   errors.ECODE_INVAL)
372
      op = opcodes.OpGroupQuery(names=names, output_fields=fields)
373
      return self._Query(op)
374

    
375
    elif method == luxi.REQ_QUERY_NETWORKS:
376
      (names, fields, use_locking) = args
377
      logging.info("Received network query request for %s", names)
378
      if use_locking:
379
        raise errors.OpPrereqError("Sync queries are not allowed",
380
                                   errors.ECODE_INVAL)
381
      op = opcodes.OpNetworkQuery(names=names, output_fields=fields)
382
      return self._Query(op)
383

    
384
    elif method == luxi.REQ_QUERY_EXPORTS:
385
      (nodes, use_locking) = args
386
      if use_locking:
387
        raise errors.OpPrereqError("Sync queries are not allowed",
388
                                   errors.ECODE_INVAL)
389
      logging.info("Received exports query request")
390
      op = opcodes.OpBackupQuery(nodes=nodes, use_locking=use_locking)
391
      return self._Query(op)
392

    
393
    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
394
      (fields, ) = args
395
      logging.info("Received config values query request for %s", fields)
396
      op = opcodes.OpClusterConfigQuery(output_fields=fields)
397
      return self._Query(op)
398

    
399
    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
400
      logging.info("Received cluster info query request")
401
      op = opcodes.OpClusterQuery()
402
      return self._Query(op)
403

    
404
    elif method == luxi.REQ_QUERY_TAGS:
405
      (kind, name) = args
406
      logging.info("Received tags query request")
407
      op = opcodes.OpTagsGet(kind=kind, name=name, use_locking=False)
408
      return self._Query(op)
409

    
410
    elif method == luxi.REQ_SET_DRAIN_FLAG:
411
      (drain_flag, ) = args
412
      logging.info("Received queue drain flag change request to %s",
413
                   drain_flag)
414
      return queue.SetDrainFlag(drain_flag)
415

    
416
    elif method == luxi.REQ_SET_WATCHER_PAUSE:
417
      (until, ) = args
418

    
419
      if until is None:
420
        logging.info("Received request to no longer pause the watcher")
421
      else:
422
        if not isinstance(until, (int, float)):
423
          raise TypeError("Duration must be an integer or float")
424

    
425
        if until < time.time():
426
          raise errors.GenericError("Unable to set pause end time in the past")
427

    
428
        logging.info("Received request to pause the watcher until %s", until)
429

    
430
      return _SetWatcherPause(until)
431

    
432
    else:
433
      logging.info("Received invalid request '%s'", method)
434
      raise ValueError("Invalid operation '%s'" % method)
435

    
436
  def _Query(self, op):
437
    """Runs the specified opcode and returns the result.
438

439
    """
440
    # Queries don't have a job id
441
    proc = mcpu.Processor(self.server.context, None, enable_locks=False)
442

    
443
    # TODO: Executing an opcode using locks will acquire them in blocking mode.
444
    # Consider using a timeout for retries.
445
    return proc.ExecOpCode(op, None)
446

    
447

    
448
class GanetiContext(object):
449
  """Context common to all ganeti threads.
450

451
  This class creates and holds common objects shared by all threads.
452

453
  """
454
  # pylint: disable=W0212
455
  # we do want to ensure a singleton here
456
  _instance = None
457

    
458
  def __init__(self):
459
    """Constructs a new GanetiContext object.
460

461
    There should be only a GanetiContext object at any time, so this
462
    function raises an error if this is not the case.
463

464
    """
465
    assert self.__class__._instance is None, "double GanetiContext instance"
466

    
467
    # Create global configuration object
468
    self.cfg = config.ConfigWriter()
469

    
470
    # Locking manager
471
    self.glm = locking.GanetiLockManager(
472
                self.cfg.GetNodeList(),
473
                self.cfg.GetNodeGroupList(),
474
                self.cfg.GetInstanceList(),
475
                self.cfg.GetNetworkList())
476

    
477
    self.cfg.SetContext(self)
478

    
479
    # RPC runner
480
    self.rpc = rpc.RpcRunner(self.cfg, self.glm.AddToLockMonitor)
481

    
482
    # Job queue
483
    self.jobqueue = jqueue.JobQueue(self)
484

    
485
    # setting this also locks the class against attribute modifications
486
    self.__class__._instance = self
487

    
488
  def __setattr__(self, name, value):
489
    """Setting GanetiContext attributes is forbidden after initialization.
490

491
    """
492
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
493
    object.__setattr__(self, name, value)
494

    
495
  def AddNode(self, node, ec_id):
496
    """Adds a node to the configuration and lock manager.
497

498
    """
499
    # Add it to the configuration
500
    self.cfg.AddNode(node, ec_id)
501

    
502
    # If preseeding fails it'll not be added
503
    self.jobqueue.AddNode(node)
504

    
505
    # Add the new node to the Ganeti Lock Manager
506
    self.glm.add(locking.LEVEL_NODE, node.name)
507
    self.glm.add(locking.LEVEL_NODE_RES, node.name)
508

    
509
  def ReaddNode(self, node):
510
    """Updates a node that's already in the configuration
511

512
    """
513
    # Synchronize the queue again
514
    self.jobqueue.AddNode(node)
515

    
516
  def RemoveNode(self, name):
517
    """Removes a node from the configuration and lock manager.
518

519
    """
520
    # Remove node from configuration
521
    self.cfg.RemoveNode(name)
522

    
523
    # Notify job queue
524
    self.jobqueue.RemoveNode(name)
525

    
526
    # Remove the node from the Ganeti Lock Manager
527
    self.glm.remove(locking.LEVEL_NODE, name)
528
    self.glm.remove(locking.LEVEL_NODE_RES, name)
529

    
530

    
531
def _SetWatcherPause(until):
532
  """Creates or removes the watcher pause file.
533

534
  @type until: None or int
535
  @param until: Unix timestamp saying until when the watcher shouldn't run
536

537
  """
538
  if until is None:
539
    utils.RemoveFile(constants.WATCHER_PAUSEFILE)
540
  else:
541
    utils.WriteFile(constants.WATCHER_PAUSEFILE,
542
                    data="%d\n" % (until, ))
543

    
544
  return until
545

    
546

    
547
@rpc.RunWithRPC
548
def CheckAgreement():
549
  """Check the agreement on who is the master.
550

551
  The function uses a very simple algorithm: we must get more positive
552
  than negative answers. Since in most of the cases we are the master,
553
  we'll use our own config file for getting the node list. In the
554
  future we could collect the current node list from our (possibly
555
  obsolete) known nodes.
556

557
  In order to account for cold-start of all nodes, we retry for up to
558
  a minute until we get a real answer as the top-voted one. If the
559
  nodes are more out-of-sync, for now manual startup of the master
560
  should be attempted.
561

562
  Note that for a even number of nodes cluster, we need at least half
563
  of the nodes (beside ourselves) to vote for us. This creates a
564
  problem on two-node clusters, since in this case we require the
565
  other node to be up too to confirm our status.
566

567
  """
568
  myself = netutils.Hostname.GetSysName()
569
  #temp instantiation of a config writer, used only to get the node list
570
  cfg = config.ConfigWriter()
571
  node_list = cfg.GetNodeList()
572
  del cfg
573
  retries = 6
574
  while retries > 0:
575
    votes = bootstrap.GatherMasterVotes(node_list)
576
    if not votes:
577
      # empty node list, this is a one node cluster
578
      return True
579
    if votes[0][0] is None:
580
      retries -= 1
581
      time.sleep(10)
582
      continue
583
    break
584
  if retries == 0:
585
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
586
                     " after multiple retries. Aborting startup")
587
    logging.critical("Use the --no-voting option if you understand what"
588
                     " effects it has on the cluster state")
589
    return False
590
  # here a real node is at the top of the list
591
  all_votes = sum(item[1] for item in votes)
592
  top_node, top_votes = votes[0]
593

    
594
  result = False
595
  if top_node != myself:
596
    logging.critical("It seems we are not the master (top-voted node"
597
                     " is %s with %d out of %d votes)", top_node, top_votes,
598
                     all_votes)
599
  elif top_votes < all_votes - top_votes:
600
    logging.critical("It seems we are not the master (%d votes for,"
601
                     " %d votes against)", top_votes, all_votes - top_votes)
602
  else:
603
    result = True
604

    
605
  return result
606

    
607

    
608
@rpc.RunWithRPC
609
def ActivateMasterIP():
610
  # activate ip
611
  cfg = config.ConfigWriter()
612
  master_params = cfg.GetMasterNetworkParameters()
613
  ems = cfg.GetUseExternalMipScript()
614
  runner = rpc.BootstrapRunner()
615
  result = runner.call_node_activate_master_ip(master_params.name,
616
                                               master_params, ems)
617

    
618
  msg = result.fail_msg
619
  if msg:
620
    logging.error("Can't activate master IP address: %s", msg)
621

    
622

    
623
def CheckMasterd(options, args):
624
  """Initial checks whether to run or exit with a failure.
625

626
  """
627
  if args: # masterd doesn't take any arguments
628
    print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
629
    sys.exit(constants.EXIT_FAILURE)
630

    
631
  ssconf.CheckMaster(options.debug)
632

    
633
  try:
634
    options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
635
    options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
636
  except KeyError:
637
    print >> sys.stderr, ("User or group not existing on system: %s:%s" %
638
                          (constants.MASTERD_USER, constants.DAEMONS_GROUP))
639
    sys.exit(constants.EXIT_FAILURE)
640

    
641
  # Determine static runtime architecture information
642
  runtime.InitArchInfo()
643

    
644
  # Check the configuration is sane before anything else
645
  try:
646
    config.ConfigWriter()
647
  except errors.ConfigVersionMismatch, err:
648
    v1 = "%s.%s.%s" % constants.SplitVersion(err.args[0])
649
    v2 = "%s.%s.%s" % constants.SplitVersion(err.args[1])
650
    print >> sys.stderr,  \
651
        ("Configuration version mismatch. The current Ganeti software"
652
         " expects version %s, but the on-disk configuration file has"
653
         " version %s. This is likely the result of upgrading the"
654
         " software without running the upgrade procedure. Please contact"
655
         " your cluster administrator or complete the upgrade using the"
656
         " cfgupgrade utility, after reading the upgrade notes." %
657
         (v1, v2))
658
    sys.exit(constants.EXIT_FAILURE)
659
  except errors.ConfigurationError, err:
660
    print >> sys.stderr, \
661
        ("Configuration error while opening the configuration file: %s\n"
662
         "This might be caused by an incomplete software upgrade or"
663
         " by a corrupted configuration file. Until the problem is fixed"
664
         " the master daemon cannot start." % str(err))
665
    sys.exit(constants.EXIT_FAILURE)
666

    
667
  # If CheckMaster didn't fail we believe we are the master, but we have to
668
  # confirm with the other nodes.
669
  if options.no_voting:
670
    if not options.yes_do_it:
671
      sys.stdout.write("The 'no voting' option has been selected.\n")
672
      sys.stdout.write("This is dangerous, please confirm by"
673
                       " typing uppercase 'yes': ")
674
      sys.stdout.flush()
675

    
676
      confirmation = sys.stdin.readline().strip()
677
      if confirmation != "YES":
678
        print >> sys.stderr, "Aborting."
679
        sys.exit(constants.EXIT_FAILURE)
680

    
681
  else:
682
    # CheckAgreement uses RPC and threads, hence it needs to be run in
683
    # a separate process before we call utils.Daemonize in the current
684
    # process.
685
    if not utils.RunInSeparateProcess(CheckAgreement):
686
      sys.exit(constants.EXIT_FAILURE)
687

    
688
  # ActivateMasterIP also uses RPC/threads, so we run it again via a
689
  # separate process.
690

    
691
  # TODO: decide whether failure to activate the master IP is a fatal error
692
  utils.RunInSeparateProcess(ActivateMasterIP)
693

    
694

    
695
def PrepMasterd(options, _):
696
  """Prep master daemon function, executed with the PID file held.
697

698
  """
699
  # This is safe to do as the pid file guarantees against
700
  # concurrent execution.
701
  utils.RemoveFile(constants.MASTER_SOCKET)
702

    
703
  mainloop = daemon.Mainloop()
704
  master = MasterServer(constants.MASTER_SOCKET, options.uid, options.gid)
705
  return (mainloop, master)
706

    
707

    
708
def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
709
  """Main master daemon function, executed with the PID file held.
710

711
  """
712
  (mainloop, master) = prep_data
713
  try:
714
    rpc.Init()
715
    try:
716
      master.setup_queue()
717
      try:
718
        mainloop.Run(shutdown_wait_fn=master.WaitForShutdown)
719
      finally:
720
        master.server_cleanup()
721
    finally:
722
      rpc.Shutdown()
723
  finally:
724
    utils.RemoveFile(constants.MASTER_SOCKET)
725

    
726
  logging.info("Clean master daemon shutdown")
727

    
728

    
729
def Main():
730
  """Main function"""
731
  parser = OptionParser(description="Ganeti master daemon",
732
                        usage="%prog [-f] [-d]",
733
                        version="%%prog (ganeti) %s" %
734
                        constants.RELEASE_VERSION)
735
  parser.add_option("--no-voting", dest="no_voting",
736
                    help="Do not check that the nodes agree on this node"
737
                    " being the master and start the daemon unconditionally",
738
                    default=False, action="store_true")
739
  parser.add_option("--yes-do-it", dest="yes_do_it",
740
                    help="Override interactive check for --no-voting",
741
                    default=False, action="store_true")
742
  daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
743
                     ExecMasterd, multithreaded=True)