lib/server/masterd.py @ db2203e0

#
#

# Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Master daemon program.

Some classes deviate from the standard style guide since the
inheritance from parent classes requires it.

"""

# pylint: disable=C0103
# C0103: Invalid name ganeti-masterd

import grp
import os
import pwd
import sys
import socket
import time
import tempfile
import logging

from optparse import OptionParser

from ganeti import config
from ganeti import constants
from ganeti import daemon
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import workerpool
from ganeti import rpc
from ganeti import bootstrap
from ganeti import netutils
from ganeti import objects
from ganeti import query
from ganeti import runtime
from ganeti import pathutils


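#: Number of worker threads used to handle client (LUXI) requests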
CLIENT_REQUEST_WORKERS = 16

EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR


def _LogNewJob(status, info, ops):
  """Log information about a recently submitted job.

  """
  if status:
    logging.info("New job with id %s, summary: %s",
                 info, utils.CommaJoin(op.Summary() for op in ops))
  else:
    logging.info("Failed to submit job, reason: '%s', summary: %s",
                 info, utils.CommaJoin(op.Summary() for op in ops))


class ClientRequestWorker(workerpool.BaseWorker):
  # pylint: disable=W0221
  def RunTask(self, server, message, client):
    """Process the request.

    """
    client_ops = ClientOps(server)

    try:
      (method, args, version) = luxi.ParseRequest(message)
    except luxi.ProtocolError, err:
      logging.error("Protocol Error: %s", err)
      client.close_log()
      return

    success = False
    try:
      # Verify client's version if there was one in the request
      if version is not None and version != constants.LUXI_VERSION:
        raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
                               (constants.LUXI_VERSION, version))

      result = client_ops.handle_request(method, args)
      success = True
    except errors.GenericError, err:
      logging.exception("Unexpected exception")
      success = False
      result = errors.EncodeException(err)
    except:
      logging.exception("Unexpected exception")
      err = sys.exc_info()
      result = "Caught exception: %s" % str(err[1])

    try:
      reply = luxi.FormatResponse(success, result)
      client.send_message(reply)
      # wake the main thread so that it can write out the data.
      server.awaker.signal()
    except: # pylint: disable=W0702
      logging.exception("Send error")
      client.close_log()


class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
  """Handler for master peers.

  """
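  #: Maximum number of received messages per peer that may be awaiting
  #: handling at any one time (passed on to the underlying message stream)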
  _MAX_UNHANDLED = 1

  def __init__(self, server, connected_socket, client_address, family):
    daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
                                                 client_address,
                                                 constants.LUXI_EOM,
                                                 family, self._MAX_UNHANDLED)
    self.server = server

  def handle_message(self, message, _):
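    # Hand the raw LUXI message over to the worker pool; parsing and
    # execution happen in ClientRequestWorker.RunTask.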
    self.server.request_workers.AddTask((self.server, message, self))


class _MasterShutdownCheck:
  """Logic for master daemon shutdown.

  """
  #: How long to wait between checks
  _CHECK_INTERVAL = 5.0

  #: How long to wait after all jobs are done (e.g. to give clients time to
  #: retrieve the job status)
  _SHUTDOWN_LINGER = 5.0

  def __init__(self):
    """Initializes this class.

    """
    self._had_active_jobs = None
    self._linger_timeout = None

  def __call__(self, jq_prepare_result):
    """Determines if master daemon is ready for shutdown.

    @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown}
    @rtype: None or number
    @return: None if master daemon is ready, timeout if the check must be
             repeated

    """
    if jq_prepare_result:
      # Check again shortly
      logging.info("Job queue has been notified for shutdown but is still"
                   " busy; next check in %s seconds", self._CHECK_INTERVAL)
      self._had_active_jobs = True
      return self._CHECK_INTERVAL

    if not self._had_active_jobs:
      # Can shut down as there were no active jobs on the first check
      return None

    # No jobs are running anymore, but maybe some clients want to collect some
    # information. Give them a short amount of time.
    if self._linger_timeout is None:
      self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True)

    remaining = self._linger_timeout.Remaining()

    logging.info("Job queue no longer busy; shutting down master daemon"
                 " in %s seconds", remaining)

    # TODO: Should the master daemon socket be closed at this point? Doing so
    # wouldn't affect existing connections.

    if remaining < 0:
      return None
    else:
      return remaining


class MasterServer(daemon.AsyncStreamServer):
  """Master Server.

  This is the main asynchronous master server. It handles connections to the
  master socket.

  """
  family = socket.AF_UNIX

  def __init__(self, address, uid, gid):
    """MasterServer constructor

    @param address: the unix socket address to bind the MasterServer to
    @param uid: The uid of the owner of the socket
    @param gid: The gid of the owner of the socket

    """
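    # Create the socket under a temporary name with the desired mode and
    # ownership, then rename it into place, so clients never see a socket
    # with the wrong permissions.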
    temp_name = tempfile.mktemp(dir=os.path.dirname(address))
    daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
    os.chmod(temp_name, 0770)
    os.chown(temp_name, uid, gid)
    os.rename(temp_name, address)

    self.awaker = daemon.AsyncAwaker()

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

    self._shutdown_check = None

  def handle_connection(self, connected_socket, client_address):
    # TODO: add connection count and limit the number of open connections to a
    # maximum number to avoid breaking for lack of file descriptors or memory.
    MasterClientHandler(self, connected_socket, client_address, self.family)

  def setup_queue(self):
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool("ClientReq",
                                                 CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def WaitForShutdown(self):
    """Prepares server for shutdown.

    """
    if self._shutdown_check is None:
      self._shutdown_check = _MasterShutdownCheck()

    return self._shutdown_check(self.context.jobqueue.PrepareShutdown())

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.close()
    finally:
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.context:
        self.context.jobqueue.Shutdown()


class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server

  def handle_request(self, method, args): # pylint: disable=R0911
    context = self.server.context
    queue = context.jobqueue

    # TODO: Parameter validation
    if not isinstance(args, (tuple, list)):
      logging.info("Received invalid arguments of type '%s'", type(args))
      raise ValueError("Invalid arguments type '%s'" % type(args))

    # TODO: Rewrite to not exit in each 'if/elif' branch

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Receiving new job")
      (job_def, ) = args
      ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
      job_id = queue.SubmitJob(ops)
      _LogNewJob(True, job_id, ops)
      return job_id

    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Receiving multiple jobs")
      (job_defs, ) = args
      jobs = []
      for ops in job_defs:
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
      job_ids = queue.SubmitManyJobs(jobs)
      for ((status, job_id), ops) in zip(job_ids, jobs):
        _LogNewJob(status, job_id, ops)
      return job_ids

    elif method == luxi.REQ_CANCEL_JOB:
      (job_id, ) = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_CHANGE_JOB_PRIORITY:
      (job_id, priority) = args
      logging.info("Received request to change priority for job %s to %s",
                   job_id, priority)
      return queue.ChangeJobPriority(job_id, priority)

    elif method == luxi.REQ_ARCHIVE_JOB:
      (job_id, ) = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTO_ARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY:
      (what, fields, qfilter) = args

      if what in constants.QR_VIA_OP:
        result = self._Query(opcodes.OpQuery(what=what, fields=fields,
                                             qfilter=qfilter))
      elif what == constants.QR_LOCK:
        if qfilter is not None:
          raise errors.OpPrereqError("Lock queries can't be filtered",
                                     errors.ECODE_INVAL)
        return context.glm.QueryLocks(fields)
      elif what == constants.QR_JOB:
        return queue.QueryJobs(fields, qfilter)
      elif what in constants.QR_VIA_LUXI:
        raise NotImplementedError
      else:
        raise errors.OpPrereqError("Resource type '%s' unknown" % what,
                                   errors.ECODE_INVAL)

      return result

    elif method == luxi.REQ_QUERY_FIELDS:
      (what, fields) = args
      req = objects.QueryFieldsRequest(what=what, fields=fields)

      try:
        fielddefs = query.ALL_FIELDS[req.what]
      except KeyError:
        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
                                   errors.ECODE_INVAL)

      return query.QueryFields(fielddefs, req.fields)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = utils.CommaJoin(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.OldStyleQueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
                                   use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpNodeQuery(names=names, output_fields=fields,
                               use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_GROUPS:
      (names, fields, use_locking) = args
      logging.info("Received group query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpGroupQuery(names=names, output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      (nodes, use_locking) = args
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      logging.info("Received exports query request")
      op = opcodes.OpBackupQuery(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      (fields, ) = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpClusterConfigQuery(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpClusterQuery()
      return self._Query(op)

    elif method == luxi.REQ_QUERY_TAGS:
      (kind, name) = args
      logging.info("Received tags query request")
      op = opcodes.OpTagsGet(kind=kind, name=name, use_locking=False)
      return self._Query(op)

    elif method == luxi.REQ_SET_DRAIN_FLAG:
      (drain_flag, ) = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    elif method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      if until is None:
        logging.info("Received request to no longer pause the watcher")
      else:
        if not isinstance(until, (int, float)):
          raise TypeError("Duration must be an integer or float")

        if until < time.time():
          raise errors.GenericError("Unable to set pause end time in the past")

        logging.info("Received request to pause the watcher until %s", until)

      return _SetWatcherPause(until)

    else:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    # Queries don't have a job id
    proc = mcpu.Processor(self.server.context, None, enable_locks=False)

    # TODO: Executing an opcode using locks will acquire them in blocking mode.
    # Consider using a timeout for retries.
    return proc.ExecOpCode(op, None)


class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # pylint: disable=W0212
  # we do want to ensure a singleton here
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only one GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    self.glm = locking.GanetiLockManager(
      self.cfg.GetNodeList(),
      self.cfg.GetNodeGroupList(),
      self.cfg.GetInstanceList())

    self.cfg.SetContext(self)

    # RPC runner
    self.rpc = rpc.RpcRunner(self.cfg, self.glm.AddToLockMonitor)

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node, ec_id):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node, ec_id)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)
    self.glm.add(locking.LEVEL_NODE_RES, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration.

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)
    self.glm.remove(locking.LEVEL_NODE_RES, name)


def _SetWatcherPause(until):
  """Creates or removes the watcher pause file.

  @type until: None or int
  @param until: Unix timestamp until which the watcher shouldn't run

  """
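  # Illustrative example (not in the original source): calling
  # _SetWatcherPause(None) lifts the pause, while
  # _SetWatcherPause(time.time() + 1800) keeps the watcher paused for the
  # next 30 minutes.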
  if until is None:
    utils.RemoveFile(pathutils.WATCHER_PAUSEFILE)
  else:
    utils.WriteFile(pathutils.WATCHER_PAUSEFILE,
                    data="%d\n" % (until, ))

  return until


@rpc.RunWithRPC
def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most cases we are the master, we'll
  use our own config file for getting the node list. In the future we
  could collect the current node list from our (possibly obsolete)
  known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are further out of sync, manual startup of the master should
  be attempted for now.

  Note that for a cluster with an even number of nodes, we need at
  least half of the nodes (besides ourselves) to vote for us. This
  creates a problem on two-node clusters, since in this case we
  require the other node to be up too to confirm our status.

  """
  myself = netutils.Hostname.GetSysName()
  # Temporary config writer instantiation, used only to get the node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg
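  # Retry for roughly a minute in total: up to six attempts, ten seconds
  # apart, as long as the top-voted answer is still "unknown".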
  retries = 6
  while retries > 0:
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is None:
      retries -= 1
      time.sleep(10)
      continue
    break
  if retries == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    logging.critical("Use the --no-voting option if you understand what"
                     " effects it has on the cluster state")
    return False
  # here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]

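  # We win only if we are the top-voted node and received at least as many
  # votes as all the other answers combined.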
  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result


@rpc.RunWithRPC
def ActivateMasterIP():
  # Activate the cluster's master IP address
  cfg = config.ConfigWriter()
  master_params = cfg.GetMasterNetworkParameters()
  ems = cfg.GetUseExternalMipScript()
  runner = rpc.BootstrapRunner()
  result = runner.call_node_activate_master_ip(master_params.name,
                                               master_params, ems)

  msg = result.fail_msg
  if msg:
    logging.error("Can't activate master IP address: %s", msg)


def CheckMasterd(options, args):
  """Initial checks to decide whether to run or exit with a failure.

  """
  if args: # masterd doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)

  ssconf.CheckMaster(options.debug)

  try:
    options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
    options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
  except KeyError:
    print >> sys.stderr, ("User or group not existing on system: %s:%s" %
                          (constants.MASTERD_USER, constants.DAEMONS_GROUP))
    sys.exit(constants.EXIT_FAILURE)

  # Determine static runtime architecture information
  runtime.InitArchInfo()

  # Check that the configuration is sane before anything else
  try:
    config.ConfigWriter()
  except errors.ConfigVersionMismatch, err:
    v1 = "%s.%s.%s" % constants.SplitVersion(err.args[0])
    v2 = "%s.%s.%s" % constants.SplitVersion(err.args[1])
    print >> sys.stderr,  \
        ("Configuration version mismatch. The current Ganeti software"
         " expects version %s, but the on-disk configuration file has"
         " version %s. This is likely the result of upgrading the"
         " software without running the upgrade procedure. Please contact"
         " your cluster administrator or complete the upgrade using the"
         " cfgupgrade utility, after reading the upgrade notes." %
         (v1, v2))
    sys.exit(constants.EXIT_FAILURE)
  except errors.ConfigurationError, err:
    print >> sys.stderr, \
        ("Configuration error while opening the configuration file: %s\n"
         "This might be caused by an incomplete software upgrade or"
         " by a corrupted configuration file. Until the problem is fixed"
         " the master daemon cannot start." % str(err))
    sys.exit(constants.EXIT_FAILURE)

  # If CheckMaster didn't fail we believe we are the master, but we have to
  # confirm with the other nodes.
  if options.no_voting:
    if not options.yes_do_it:
      sys.stdout.write("The 'no voting' option has been selected.\n")
      sys.stdout.write("This is dangerous, please confirm by"
                       " typing uppercase 'yes': ")
      sys.stdout.flush()

      confirmation = sys.stdin.readline().strip()
      if confirmation != "YES":
        print >> sys.stderr, "Aborting."
        sys.exit(constants.EXIT_FAILURE)

  else:
    # CheckAgreement uses RPC and threads, hence it needs to be run in
    # a separate process before we call utils.Daemonize in the current
    # process.
    if not utils.RunInSeparateProcess(CheckAgreement):
      sys.exit(constants.EXIT_FAILURE)

  # ActivateMasterIP also uses RPC/threads, so we run it in a separate
  # process as well.

  # TODO: decide whether failure to activate the master IP is a fatal error
  utils.RunInSeparateProcess(ActivateMasterIP)


def PrepMasterd(options, _):
  """Prep master daemon function, executed with the PID file held.

  """
  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(pathutils.MASTER_SOCKET)

  mainloop = daemon.Mainloop()
  master = MasterServer(pathutils.MASTER_SOCKET, options.uid, options.gid)
  return (mainloop, master)


def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
  """Main master daemon function, executed with the PID file held.

  """
  (mainloop, master) = prep_data
  try:
    rpc.Init()
    try:
      master.setup_queue()
      try:
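        # The shutdown_wait_fn is consulted when the daemon is asked to stop;
        # see _MasterShutdownCheck for the return value semantics (None means
        # ready to stop, a number means "check again in that many seconds").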
        mainloop.Run(shutdown_wait_fn=master.WaitForShutdown)
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemoveFile(pathutils.MASTER_SOCKET)

  logging.info("Clean master daemon shutdown")


def Main():
  """Main function"""
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  parser.add_option("--no-voting", dest="no_voting",
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally",
                    default=False, action="store_true")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    help="Override interactive check for --no-voting",
                    default=False, action="store_true")
  daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
                     ExecMasterd, multithreaded=True)