Statistics
| Branch: | Tag: | Revision:

root / lib / server / masterd.py @ 4869595d

History | View | Annotate | Download (24.1 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Master daemon program.
23

24
Some classes deviate from the standard style guide since the
25
inheritance from parent classes requires it.
26

27
"""
28

    
29
# pylint: disable=C0103
30
# C0103: Invalid name ganeti-masterd
31

    
32
import grp
33
import os
34
import pwd
35
import sys
36
import socket
37
import time
38
import tempfile
39
import logging
40

    
41
from optparse import OptionParser
42

    
43
from ganeti import config
44
from ganeti import constants
45
from ganeti import daemon
46
from ganeti import mcpu
47
from ganeti import opcodes
48
from ganeti import jqueue
49
from ganeti import locking
50
from ganeti import luxi
51
from ganeti import utils
52
from ganeti import errors
53
from ganeti import ssconf
54
from ganeti import workerpool
55
import ganeti.rpc.node as rpc
56
from ganeti import bootstrap
57
from ganeti import netutils
58
from ganeti import objects
59
from ganeti import query
60
from ganeti import runtime
61
from ganeti import pathutils
62
from ganeti import ht
63

    
64
from ganeti.utils import version
65

    
66

    
67
#: Number of worker threads in the "ClientReq" pool (see
#: MasterServer.setup_queue)
CLIENT_REQUEST_WORKERS = 16

#: Module-level aliases for exit codes defined in constants
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
71

    
72

    
73
def _LogNewJob(status, info, ops):
  """Writes a log entry describing the outcome of a job submission.

  @param status: Whether the submission succeeded
  @param info: Logged as the job ID on success and as the failure reason
               otherwise
  @param ops: Opcodes of the job; C{Summary()} is called on each of them

  """
  summary = utils.CommaJoin(opcode.Summary() for opcode in ops)

  if not status:
    logging.info("Failed to submit job, reason: '%s', summary: %s",
                 info, summary)
    return

  logging.info("New job with id %s, summary: %s", info, summary)
84

    
85

    
86
class ClientRequestWorker(workerpool.BaseWorker):
  """Worker processing a single client request.

  """
  # pylint: disable=W0221
  def RunTask(self, server, message, client):
    """Process the request.

    The message is parsed, handed to L{ClientOps} and the formatted
    response is queued on the client connection.

    @param server: Server object; passed to L{ClientOps} and its C{awaker}
                   is signalled once the reply has been queued
    @param message: Request as received from the client
    @param client: Client connection; must provide C{send_message} and
                   C{close_log}

    """
    client_ops = ClientOps(server)

    try:
      (method, args, ver) = luxi.ParseRequest(message)
    except luxi.ProtocolError as err:
      # Unparseable request; nothing sensible can be answered
      logging.error("Protocol Error: %s", err)
      client.close_log()
      return

    # Stays False unless the request handler returns normally
    success = False
    try:
      # Verify client's version if there was one in the request
      if ver is not None and ver != constants.LUXI_VERSION:
        raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
                               (constants.LUXI_VERSION, ver))

      result = client_ops.handle_request(method, args)
      success = True
    except errors.GenericError as err:
      logging.exception("Unexpected exception")
      result = errors.EncodeException(err)
    except: # pylint: disable=W0702
      # Deliberately broad: any other failure is still turned into a
      # (failed) response for the client below
      logging.exception("Unexpected exception")
      result = "Caught exception: %s" % str(sys.exc_info()[1])

    try:
      reply = luxi.FormatResponse(success, result)
      client.send_message(reply)
      # awake the main thread so that it can write out the data.
      server.awaker.signal()
    except: # pylint: disable=W0702
      logging.exception("Send error")
      client.close_log()
127

    
128

    
129
class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
  """Handler for master peers.

  """
  #: Passed as the last argument to the base class constructor
  _MAX_UNHANDLED = 1

  def __init__(self, server, connected_socket, client_address, family):
    """Initializes the message stream and remembers the owning server.

    """
    base_init = daemon.AsyncTerminatedMessageStream.__init__
    base_init(self, connected_socket, client_address, constants.LUXI_EOM,
              family, self._MAX_UNHANDLED)
    self.server = server

  def handle_message(self, message, _):
    """Queues a received message on the server's request worker pool.

    The task tuple matches the arguments of
    L{ClientRequestWorker.RunTask}.

    """
    task = (self.server, message, self)
    self.server.request_workers.AddTask(task)
144

    
145

    
146
class _MasterShutdownCheck(object):
  """Logic for master daemon shutdown.

  Instances are callable and keep state between calls, see L{__call__}.

  """
  #: How long to wait between checks
  _CHECK_INTERVAL = 5.0

  #: How long to wait after all jobs are done (e.g. to give clients time to
  #: retrieve the job status)
  _SHUTDOWN_LINGER = 5.0

  def __init__(self):
    """Initializes this class.

    """
    # Set to True once a check has found the job queue still busy
    self._had_active_jobs = None
    # Created lazily on the first check after the queue became idle
    self._linger_timeout = None

  def __call__(self, jq_prepare_result):
    """Determines if master daemon is ready for shutdown.

    @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown}
    @rtype: None or number
    @return: None if master daemon is ready, timeout if the check must be
             repeated

    """
    if jq_prepare_result:
      # Check again shortly
      logging.info("Job queue has been notified for shutdown but is still"
                   " busy; next check in %s seconds", self._CHECK_INTERVAL)
      self._had_active_jobs = True
      return self._CHECK_INTERVAL

    if not self._had_active_jobs:
      # Can shut down as there were no active jobs on the first check
      return None

    # No jobs are running anymore, but maybe some clients want to collect some
    # information. Give them a short amount of time.
    if self._linger_timeout is None:
      self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True)

    remaining = self._linger_timeout.Remaining()

    # TODO: Should the master daemon socket be closed at this point? Doing so
    # wouldn't affect existing connections.

    if remaining < 0:
      # Linger period is over; don't report a negative number of seconds
      logging.info("Job queue no longer busy; shutting down master daemon"
                   " now")
      return None

    logging.info("Job queue no longer busy; shutting down master daemon"
                 " in %s seconds", remaining)
    return remaining
201

    
202

    
203
class MasterServer(daemon.AsyncStreamServer):
  """Master Server.

  This is the main asynchronous master server. It handles connections to the
  master socket.

  """
  family = socket.AF_UNIX

  def __init__(self, address, uid, gid):
    """MasterServer constructor

    @param address: the unix socket address to bind the MasterServer to
    @param uid: The uid of the owner of the socket
    @param gid: The gid of the owner of the socket

    """
    # The server is first set up on a temporary name in the target
    # directory; mode and ownership are adjusted there and only then is
    # the file renamed to its final address.
    # NOTE(review): mktemp only returns a name; presumably the base class
    # creates the socket file when binding -- confirm in
    # daemon.AsyncStreamServer
    temp_name = tempfile.mktemp(dir=os.path.dirname(address))
    daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
    os.chmod(temp_name, 0o770)
    os.chown(temp_name, uid, gid)
    os.rename(temp_name, address)

    self.awaker = daemon.AsyncAwaker()

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

    # Created on the first call to WaitForShutdown
    self._shutdown_check = None

  def handle_connection(self, connected_socket, client_address):
    """Creates a L{MasterClientHandler} for a new connection.

    """
    # TODO: add connection count and limit the number of open connections to a
    # maximum number to avoid breaking for lack of file descriptors or memory.
    MasterClientHandler(self, connected_socket, client_address, self.family)

  def setup_queue(self):
    """Creates the Ganeti context and the client request worker pool.

    """
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool("ClientReq",
                                                 CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def WaitForShutdown(self):
    """Prepares server for shutdown.

    @return: Result of L{_MasterShutdownCheck} for the current state of the
             job queue

    """
    if self._shutdown_check is None:
      self._shutdown_check = _MasterShutdownCheck()

    return self._shutdown_check(self.context.jobqueue.PrepareShutdown())

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.close()
    finally:
      # Both are still None if setup_queue was never run
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.context:
        self.context.jobqueue.Shutdown()
268

    
269

    
270
class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server

  def handle_request(self, method, args): # pylint: disable=R0911
    """Executes the operation selected by a LUXI request.

    @param method: Request name; must be contained in C{luxi.REQ_ALL}
    @type args: tuple or list
    @param args: Request arguments; their number depends on C{method}
    @return: Result of the selected operation (None for
             C{luxi.REQ_PICKUP_JOB})
    @raise ValueError: If C{args} has the wrong type or C{method} is unknown

    """
    ctx = self.server.context
    jobq = ctx.jobqueue

    # TODO: Parameter validation
    if not isinstance(args, (tuple, list)):
      logging.info("Received invalid arguments of type '%s'", type(args))
      raise ValueError("Invalid arguments type '%s'" % type(args))

    if method not in luxi.REQ_ALL:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

    # TODO: Rewrite to not exit in each 'if/elif' branch

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Receiving new job")
      (job_def, ) = args
      opcode_list = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
      new_id = jobq.SubmitJob(opcode_list)
      _LogNewJob(True, new_id, opcode_list)
      return new_id

    if method == luxi.REQ_PICKUP_JOB:
      logging.info("Picking up new job from queue")
      (job_id, ) = args
      jobq.PickupJob(job_id)
      return None

    if method == luxi.REQ_SUBMIT_JOB_TO_DRAINED_QUEUE:
      logging.info("Forcefully receiving new job")
      (job_def, ) = args
      opcode_list = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
      new_id = jobq.SubmitJobToDrainedQueue(opcode_list)
      _LogNewJob(True, new_id, opcode_list)
      return new_id

    if method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Receiving multiple jobs")
      (job_defs, ) = args
      all_jobs = [[opcodes.OpCode.LoadOpCode(state) for state in one_def]
                  for one_def in job_defs]
      submit_results = jobq.SubmitManyJobs(all_jobs)
      # One (status, info) pair is expected per submitted job
      for ((status, info), opcode_list) in zip(submit_results, all_jobs):
        _LogNewJob(status, info, opcode_list)
      return submit_results

    if method == luxi.REQ_CANCEL_JOB:
      (job_id, ) = args
      logging.info("Received job cancel request for %s", job_id)
      return jobq.CancelJob(job_id)

    if method == luxi.REQ_CHANGE_JOB_PRIORITY:
      (job_id, priority) = args
      logging.info("Received request to change priority for job %s to %s",
                   job_id, priority)
      return jobq.ChangeJobPriority(job_id, priority)

    if method == luxi.REQ_ARCHIVE_JOB:
      (job_id, ) = args
      logging.info("Received job archive request for %s", job_id)
      return jobq.ArchiveJob(job_id)

    if method == luxi.REQ_AUTO_ARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return jobq.AutoArchiveJobs(age, timeout)

    if method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return jobq.WaitForJobChanges(job_id, fields, prev_job_info,
                                    prev_log_serial, timeout)

    if method == luxi.REQ_QUERY:
      (what, fields, qfilter) = args

      if what in constants.QR_VIA_OP:
        query_op = opcodes.OpQuery(what=what, fields=fields, qfilter=qfilter)
        return self._Query(query_op)

      if what == constants.QR_LOCK:
        if qfilter is not None:
          raise errors.OpPrereqError("Lock queries can't be filtered",
                                     errors.ECODE_INVAL)
        return ctx.glm.QueryLocks(fields)

      if what == constants.QR_JOB:
        return jobq.QueryJobs(fields, qfilter)

      if what in constants.QR_VIA_LUXI:
        luxi_client = runtime.GetClient(query=True)
        return luxi_client.Query(what, fields, qfilter).ToDict()

      raise errors.OpPrereqError("Resource type '%s' unknown" % what,
                                 errors.ECODE_INVAL)

    if method == luxi.REQ_QUERY_FIELDS:
      (what, fields) = args
      request = objects.QueryFieldsRequest(what=what, fields=fields)

      try:
        field_defs = query.ALL_FIELDS[request.what]
      except KeyError:
        raise errors.OpPrereqError("Resource type '%s' unknown" % request.what,
                                   errors.ECODE_INVAL)

      return query.QueryFields(field_defs, request.fields)

    if method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      # Non-empty sequences are logged as a comma-separated list, anything
      # else through str()
      if isinstance(job_ids, (tuple, list)) and job_ids:
        id_text = utils.CommaJoin(job_ids)
      else:
        id_text = str(job_ids)
      logging.info("Received job query request for %s", id_text)
      return jobq.OldStyleQueryJobs(job_ids, fields)

    if method == luxi.REQ_QUERY_CONFIG_VALUES:
      (fields, ) = args
      logging.info("Received config values query request for %s", fields)
      return self._Query(opcodes.OpClusterConfigQuery(output_fields=fields))

    if method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      return self._Query(opcodes.OpClusterQuery())

    if method == luxi.REQ_QUERY_TAGS:
      (kind, name) = args
      logging.info("Received tags query request")
      tags_op = opcodes.OpTagsGet(kind=kind, name=name, use_locking=False)
      return self._Query(tags_op)

    if method == luxi.REQ_SET_DRAIN_FLAG:
      (drain_flag, ) = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return jobq.SetDrainFlag(drain_flag)

    if method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args
      return _SetWatcherPause(ctx, until)

    # Listed in luxi.REQ_ALL, but no branch above handles it
    logging.critical("Request '%s' in luxi.REQ_ALL, but not known", method)
    raise errors.ProgrammerError("Operation '%s' in luxi.REQ_ALL,"
                                 " but not implemented" % method)

  def _Query(self, op):
    """Executes an opcode outside of a job and returns its result.

    @param op: Opcode to execute

    """
    # Queries don't have a job id
    processor = mcpu.Processor(self.server.context, None, enable_locks=False)

    # TODO: Executing an opcode using locks will acquire them in blocking mode.
    # Consider using a timeout for retries.
    return processor.ExecOpCode(op, None)
436

    
437

    
438
class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # pylint: disable=W0212
  # we do want to ensure a singleton here
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only a GanetiContext object at any time, so this
    function raises an error if this is not the case.

    @raise AssertionError: If an instance has already been registered

    """
    # Raised explicitly instead of using "assert" so the check is not
    # stripped when running with "python -O"; the exception type is the
    # one "assert" would raise
    if self.__class__._instance is not None:
      raise AssertionError("double GanetiContext instance")

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    self.glm = locking.GanetiLockManager(
      self.cfg.GetNodeList(),
      self.cfg.GetNodeGroupList(),
      [inst.name for inst in self.cfg.GetAllInstancesInfo().values()],
      self.cfg.GetNetworkList())

    self.cfg.SetContext(self)

    # RPC runner
    self.rpc = rpc.RpcRunner(self.cfg, self.glm.AddToLockMonitor)

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    @raise AssertionError: If an instance has already been registered

    """
    # Explicit raise so the lock also holds under "python -O"
    if self.__class__._instance is not None:
      raise AssertionError("Attempt to modify Ganeti Context")
    object.__setattr__(self, name, value)

  def AddNode(self, node, ec_id):
    """Adds a node to the configuration and lock manager.

    @param node: Node object; its C{uuid} is used as lock name
    @param ec_id: Passed through to the configuration's C{AddNode}

    """
    # Add it to the configuration
    self.cfg.AddNode(node, ec_id)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.uuid)
    self.glm.add(locking.LEVEL_NODE_RES, node.uuid)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration

    @param node: Node object

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, node):
    """Removes a node from the configuration and lock manager.

    @param node: Node object; the configuration and lock manager are
                 addressed by C{uuid}, the job queue by C{name}

    """
    # Remove node from configuration
    self.cfg.RemoveNode(node.uuid)

    # Notify job queue
    self.jobqueue.RemoveNode(node.name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, node.uuid)
    self.glm.remove(locking.LEVEL_NODE_RES, node.uuid)
519

    
520

    
521
def _SetWatcherPause(context, until):
  """Sets or clears the watcher pause on the nodes of the cluster.

  @type context: L{GanetiContext}
  @param context: Global Ganeti context
  @type until: None or int
  @param until: Unix timestamp saying until when the watcher shouldn't run;
                None requests the pause to be lifted
  @return: The C{until} value as passed in
  @raise TypeError: If C{until} is neither None nor numeric
  @raise errors.GenericError: If C{until} lies in the past
  @raise errors.OpExecError: If the request failed on a node that is not
                             marked offline

  """
  targets = context.cfg.GetNodeList()

  if until is not None:
    if not ht.TNumber(until):
      raise TypeError("Duration must be numeric")

    if until < time.time():
      raise errors.GenericError("Unable to set pause end time in the past")

    logging.info("Received request to pause watcher until %s", until)
  else:
    logging.info("Received request to no longer pause watcher")

  rpc_results = context.rpc.call_set_watcher_pause(targets, until)

  # Failures on offline nodes are not reported
  failures = []
  for (node_name, nres) in rpc_results.items():
    if not nres.fail_msg or nres.offline:
      continue
    failures.append("%s (%s)" % (node_name, nres.fail_msg))

  errmsg = utils.CommaJoin(failures)
  if errmsg:
    raise errors.OpExecError("Watcher pause was set where possible, but failed"
                             " on the following node(s): %s" % errmsg)

  return until
553

    
554

    
555
@rpc.RunWithRPC
def CheckAgreement():
  """Checks whether the other nodes agree that this node is the master.

  The algorithm is very simple: this node must be the top-voted one and
  must not have fewer votes for it than against it. The node list is
  taken from the local configuration file, as in most cases we are the
  master.

  To cope with a cold start of all nodes, votes are gathered up to six
  times with a ten second pause after each attempt whose top-voted entry
  is not a real node. If that is still the case afterwards, startup is
  refused and the master has to be started manually.

  Note that on a cluster with an even number of nodes at least half of
  the nodes (besides ourselves) need to vote for us. On a two-node
  cluster this means the other node must be up to confirm our status.

  @rtype: boolean
  @return: True if this node may act as master, False otherwise

  """
  myself = netutils.Hostname.GetSysName()

  # temporary config writer, only needed to get the node list
  cfg = config.ConfigWriter()
  node_names = cfg.GetNodeNames(cfg.GetNodeList())
  del cfg

  for _ in range(6):
    votes = bootstrap.GatherMasterVotes(node_names)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is not None:
      # a real node is at the top of the list
      break
    time.sleep(10)
  else:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    logging.critical("Use the --no-voting option if you understand what"
                     " effects it has on the cluster state")
    return False

  total = sum(entry[1] for entry in votes)
  (winner, votes_for) = votes[0]
  votes_against = total - votes_for

  if winner != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", winner, votes_for,
                     total)
    return False

  if votes_for < votes_against:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", votes_for, votes_against)
    return False

  return True
614

    
615

    
616
@rpc.RunWithRPC
def ActivateMasterIP():
  """Asks the master node to activate the master IP address.

  A failure is logged as an error only; nothing is returned or raised
  for it here.

  """
  cfg = config.ConfigWriter()
  params = cfg.GetMasterNetworkParameters()
  use_external_script = cfg.GetUseExternalMipScript()
  bootstrap_runner = rpc.BootstrapRunner()

  # we use the node name, as the configuration is only available here yet
  master_name = cfg.GetNodeName(params.uuid)
  result = bootstrap_runner.call_node_activate_master_ip(master_name, params,
                                                         use_external_script)

  if result.fail_msg:
    logging.error("Can't activate master IP address: %s", result.fail_msg)
630

    
631

    
632
def CheckMasterd(options, args):
  """Initial checks whether to run or exit with a failure.

  Exits the process with C{constants.EXIT_FAILURE} if any check fails.

  @param options: Parsed command line options; C{debug}, C{no_voting} and
                  C{yes_do_it} are read, C{uid} and C{gid} are set
  @param args: Remaining command line arguments; must be empty

  """
  if args: # masterd doesn't take any arguments
    sys.stderr.write("Usage: %s [-f] [-d]\n" % sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)

  ssconf.CheckMaster(options.debug)

  try:
    options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
    options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
  except KeyError:
    sys.stderr.write("User or group not existing on system: %s:%s\n" %
                     (constants.MASTERD_USER, constants.DAEMONS_GROUP))
    sys.exit(constants.EXIT_FAILURE)

  # Determine static runtime architecture information
  runtime.InitArchInfo()

  # Check the configuration is sane before anything else
  try:
    config.ConfigWriter()
  except errors.ConfigVersionMismatch as err:
    v1 = "%s.%s.%s" % version.SplitVersion(err.args[0])
    v2 = "%s.%s.%s" % version.SplitVersion(err.args[1])
    sys.stderr.write(
        ("Configuration version mismatch. The current Ganeti software"
         " expects version %s, but the on-disk configuration file has"
         " version %s. This is likely the result of upgrading the"
         " software without running the upgrade procedure. Please contact"
         " your cluster administrator or complete the upgrade using the"
         " cfgupgrade utility, after reading the upgrade notes.\n" %
         (v1, v2)))
    sys.exit(constants.EXIT_FAILURE)
  except errors.ConfigurationError as err:
    sys.stderr.write(
        ("Configuration error while opening the configuration file: %s\n"
         "This might be caused by an incomplete software upgrade or"
         " by a corrupted configuration file. Until the problem is fixed"
         " the master daemon cannot start.\n" % str(err)))
    sys.exit(constants.EXIT_FAILURE)

  # If CheckMaster didn't fail we believe we are the master, but we have to
  # confirm with the other nodes.
  if options.no_voting:
    if not options.yes_do_it:
      sys.stdout.write("The 'no voting' option has been selected.\n")
      sys.stdout.write("This is dangerous, please confirm by"
                       " typing uppercase 'yes': ")
      sys.stdout.flush()

      confirmation = sys.stdin.readline().strip()
      if confirmation != "YES":
        sys.stderr.write("Aborting.\n")
        sys.exit(constants.EXIT_FAILURE)

  else:
    # CheckAgreement uses RPC and threads, hence it needs to be run in
    # a separate process before we call utils.Daemonize in the current
    # process.
    if not utils.RunInSeparateProcess(CheckAgreement):
      sys.exit(constants.EXIT_FAILURE)

  # ActivateMasterIP also uses RPC/threads, so we run it again via a
  # separate process.

  # TODO: decide whether failure to activate the master IP is a fatal error
  utils.RunInSeparateProcess(ActivateMasterIP)
702

    
703

    
704
def PrepMasterd(options, _):
  """Prep master daemon function, executed with the PID file held.

  @param options: Command line options; C{uid} and C{gid} are used as
                  owner of the master socket
  @rtype: tuple
  @return: (mainloop, master server), as expected by L{ExecMasterd}

  """
  socket_path = pathutils.MASTER_SOCKET

  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(socket_path)

  return (daemon.Mainloop(),
          MasterServer(socket_path, options.uid, options.gid))
715

    
716

    
717
def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
  """Main master daemon function, executed with the PID file held.

  @param prep_data: (mainloop, master server) tuple as returned by
                    L{PrepMasterd}

  """
  (mainloop, master) = prep_data

  def _Serve():
    """Runs the main loop; the server is cleaned up once the queue is set up.

    """
    master.setup_queue()
    try:
      mainloop.Run(shutdown_wait_fn=master.WaitForShutdown)
    finally:
      master.server_cleanup()

  def _ServeWithRpc():
    """Wraps L{_Serve} in RPC initialization and shutdown.

    """
    rpc.Init()
    try:
      _Serve()
    finally:
      rpc.Shutdown()

  try:
    _ServeWithRpc()
  finally:
    # The socket file is removed no matter how far startup got
    utils.RemoveFile(pathutils.MASTER_SOCKET)

  logging.info("Clean master daemon shutdown")
736

    
737

    
738
def Main():
  """Builds the option parser and hands control to the generic daemon main.

  """
  version_text = "%%prog (ganeti) %s" % constants.RELEASE_VERSION
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version=version_text)

  # (option string, destination, help text); all are boolean flags
  flags = [
    ("--no-voting", "no_voting",
     "Do not check that the nodes agree on this node"
     " being the master and start the daemon unconditionally"),
    ("--yes-do-it", "yes_do_it",
     "Override interactive check for --no-voting"),
    ]
  for (opt_string, dest, help_text) in flags:
    parser.add_option(opt_string, dest=dest, help=help_text,
                      default=False, action="store_true")

  daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
                     ExecMasterd, multithreaded=True)