lib/server/masterd.py @ 1de2b387

#
#

# Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Master daemon program.

Some classes deviate from the standard style guide since the
inheritance from parent classes requires it.

"""

# pylint: disable=C0103
# C0103: Invalid name ganeti-masterd

import grp
import os
import pwd
import sys
import socket
import time
import tempfile
import logging

from optparse import OptionParser

from ganeti import config
from ganeti import constants
from ganeti import daemon
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
import ganeti.rpc.errors as rpcerr
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import workerpool
import ganeti.rpc.node as rpc
import ganeti.rpc.client as rpccl
from ganeti import bootstrap
from ganeti import netutils
from ganeti import objects
from ganeti import query
from ganeti import runtime
from ganeti import pathutils
from ganeti import ht

from ganeti.utils import version


CLIENT_REQUEST_WORKERS = 16

EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR


def _LogNewJob(status, info, ops):
  """Log information about a recently submitted job.

  """
  op_summary = utils.CommaJoin(op.Summary() for op in ops)

  if status:
    logging.info("New job with id %s, summary: %s", info, op_summary)
  else:
    logging.info("Failed to submit job, reason: '%s', summary: %s",
                 info, op_summary)


class ClientRequestWorker(workerpool.BaseWorker):
  # pylint: disable=W0221
  def RunTask(self, server, message, client):
    """Process the request.

    """
    client_ops = ClientOps(server)

    try:
      (method, args, ver) = rpccl.ParseRequest(message)
    except rpcerr.ProtocolError, err:
      logging.error("Protocol Error: %s", err)
      client.close_log()
      return

    success = False
    try:
      # Verify client's version if there was one in the request
      if ver is not None and ver != constants.LUXI_VERSION:
        raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
                               (constants.LUXI_VERSION, ver))

      result = client_ops.handle_request(method, args)
      success = True
    except errors.GenericError, err:
      logging.exception("Unexpected exception")
      success = False
      result = errors.EncodeException(err)
    except:
      logging.exception("Unexpected exception")
      err = sys.exc_info()
      result = "Caught exception: %s" % str(err[1])

    try:
      reply = rpccl.FormatResponse(success, result)
      client.send_message(reply)
      # wake up the main thread so that it can write out the data.
      server.awaker.signal()
    except: # pylint: disable=W0702
      logging.exception("Send error")
      client.close_log()
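
# Illustrative note (added, not part of the upstream module): the message
# parsed by rpccl.ParseRequest above is, as far as we can tell, a JSON object
# of the shape used by the LUXI protocol; a request and its reply would look
# roughly like:
#
#   {"method": "SubmitJob", "args": [[<opcode dicts>]], "version": <LUXI_VERSION>}
#   {"success": true, "result": <job id>}
#
# The exact key and method names are assumptions based on ganeti.luxi, not a
# verbatim copy of the wire format.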


class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
  """Handler for master peers.

  """
  _MAX_UNHANDLED = 1

  def __init__(self, server, connected_socket, client_address, family):
    daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
                                                 client_address,
                                                 constants.LUXI_EOM,
                                                 family, self._MAX_UNHANDLED)
    self.server = server

  def handle_message(self, message, _):
    self.server.request_workers.AddTask((self.server, message, self))


class _MasterShutdownCheck:
  """Logic for master daemon shutdown.

  """
  #: How long to wait between checks
  _CHECK_INTERVAL = 5.0

  #: How long to wait after all jobs are done (e.g. to give clients time to
  #: retrieve the job status)
  _SHUTDOWN_LINGER = 5.0

  def __init__(self):
    """Initializes this class.

    """
    self._had_active_jobs = None
    self._linger_timeout = None

  def __call__(self, jq_prepare_result):
    """Determines if master daemon is ready for shutdown.

    @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown}
    @rtype: None or number
    @return: None if master daemon is ready, timeout if the check must be
             repeated

    """
    if jq_prepare_result:
      # Check again shortly
      logging.info("Job queue has been notified for shutdown but is still"
                   " busy; next check in %s seconds", self._CHECK_INTERVAL)
      self._had_active_jobs = True
      return self._CHECK_INTERVAL

    if not self._had_active_jobs:
      # Can shut down as there were no active jobs on the first check
      return None

    # No jobs are running anymore, but maybe some clients want to collect some
    # information. Give them a short amount of time.
    if self._linger_timeout is None:
      self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True)

    remaining = self._linger_timeout.Remaining()

    logging.info("Job queue no longer busy; shutting down master daemon"
                 " in %s seconds", remaining)

    # TODO: Should the master daemon socket be closed at this point? Doing so
    # wouldn't affect existing connections.

    if remaining < 0:
      return None
    else:
      return remaining
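
# Rough call sequence for the check above (illustrative note, added): while
# jobs are still running, every call returns _CHECK_INTERVAL and the caller
# retries; once PrepareShutdown() reports the queue as idle, the check either
# returns None right away (nothing was ever running) or keeps returning the
# remainder of the _SHUTDOWN_LINGER window so clients can still fetch job
# status, and only then signals readiness with None.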


class MasterServer(daemon.AsyncStreamServer):
  """Master Server.

  This is the main asynchronous master server. It handles connections to the
  master socket.

  """
  family = socket.AF_UNIX

  def __init__(self, address, uid, gid):
    """MasterServer constructor

    @param address: the unix socket address to bind the MasterServer to
    @param uid: The uid of the owner of the socket
    @param gid: The gid of the owner of the socket

    """
    temp_name = tempfile.mktemp(dir=os.path.dirname(address))
    daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
    os.chmod(temp_name, 0770)
    os.chown(temp_name, uid, gid)
    os.rename(temp_name, address)
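    # Note (added comment): the socket is bound to a temporary name in the
    # same directory, its mode and ownership are fixed, and only then is it
    # renamed over the final address; clients therefore never see a
    # half-prepared socket, and the rename within one directory is atomic.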

    self.awaker = daemon.AsyncAwaker()

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

    self._shutdown_check = None

  def handle_connection(self, connected_socket, client_address):
    # TODO: add connection count and limit the number of open connections to a
    # maximum number to avoid breaking for lack of file descriptors or memory.
    MasterClientHandler(self, connected_socket, client_address, self.family)

  def setup_context(self):
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool("ClientReq",
                                                 CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def WaitForShutdown(self):
    """Prepares server for shutdown.

    """
    if self._shutdown_check is None:
      self._shutdown_check = _MasterShutdownCheck()

    return self._shutdown_check(self.context.jobqueue.PrepareShutdown())

  def server_cleanup(self):
    """Clean up the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.close()
    finally:
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.context:
        self.context.jobqueue.Shutdown()


class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server

  def handle_request(self, method, args): # pylint: disable=R0911
    context = self.server.context
    queue = context.jobqueue

    # TODO: Parameter validation
    if not isinstance(args, (tuple, list)):
      logging.info("Received invalid arguments of type '%s'", type(args))
      raise ValueError("Invalid arguments type '%s'" % type(args))

    if method not in luxi.REQ_ALL:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

    # TODO: Rewrite to not exit in each 'if/elif' branch

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Receiving new job")
      (job_def, ) = args
      ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
      job_id = queue.SubmitJob(ops)
      _LogNewJob(True, job_id, ops)
      return job_id

    elif method == luxi.REQ_PICKUP_JOB:
      logging.info("Picking up new job from queue")
      (job_id, ) = args
      queue.PickupJob(job_id)

    elif method == luxi.REQ_SUBMIT_JOB_TO_DRAINED_QUEUE:
      logging.info("Forcefully receiving new job")
      (job_def, ) = args
      ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
      job_id = queue.SubmitJobToDrainedQueue(ops)
      _LogNewJob(True, job_id, ops)
      return job_id

    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Receiving multiple jobs")
      (job_defs, ) = args
      jobs = []
      for ops in job_defs:
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
      job_ids = queue.SubmitManyJobs(jobs)
      for ((status, job_id), ops) in zip(job_ids, jobs):
        _LogNewJob(status, job_id, ops)
      return job_ids

    elif method == luxi.REQ_CANCEL_JOB:
      (job_id, ) = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_CHANGE_JOB_PRIORITY:
      (job_id, priority) = args
      logging.info("Received request to change priority for job %s to %s",
                   job_id, priority)
      return queue.ChangeJobPriority(job_id, priority)

    elif method == luxi.REQ_ARCHIVE_JOB:
      (job_id, ) = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTO_ARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY:
      (what, fields, qfilter) = args

      if what in constants.QR_VIA_OP:
        result = self._Query(opcodes.OpQuery(what=what, fields=fields,
                                             qfilter=qfilter))
      elif what == constants.QR_LOCK:
        if qfilter is not None:
          raise errors.OpPrereqError("Lock queries can't be filtered",
                                     errors.ECODE_INVAL)
        return context.glm.QueryLocks(fields)
      elif what == constants.QR_JOB:
        return queue.QueryJobs(fields, qfilter)
      elif what in constants.QR_VIA_LUXI:
        luxi_client = runtime.GetClient()
        result = luxi_client.Query(what, fields, qfilter).ToDict()
      else:
        raise errors.OpPrereqError("Resource type '%s' unknown" % what,
                                   errors.ECODE_INVAL)

      return result

    elif method == luxi.REQ_QUERY_FIELDS:
      (what, fields) = args
      req = objects.QueryFieldsRequest(what=what, fields=fields)

      try:
        fielddefs = query.ALL_FIELDS[req.what]
      except KeyError:
        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
                                   errors.ECODE_INVAL)

      return query.QueryFields(fielddefs, req.fields)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = utils.CommaJoin(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.OldStyleQueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      (fields, ) = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpClusterConfigQuery(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpClusterQuery()
      return self._Query(op)

    elif method == luxi.REQ_QUERY_TAGS:
      (kind, name) = args
      logging.info("Received tags query request")
      op = opcodes.OpTagsGet(kind=kind, name=name, use_locking=False)
      return self._Query(op)

    elif method == luxi.REQ_SET_DRAIN_FLAG:
      (drain_flag, ) = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    elif method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      return _SetWatcherPause(context, until)

    else:
      logging.critical("Request '%s' in luxi.REQ_ALL, but not known", method)
      raise errors.ProgrammerError("Operation '%s' in luxi.REQ_ALL,"
                                   " but not implemented" % method)

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    # Queries don't have a job id
    proc = mcpu.Processor(self.server.context, None, enable_locks=False)

    # TODO: Executing an opcode using locks will acquire them in blocking mode.
    # Consider using a timeout for retries.
    return proc.ExecOpCode(op, None)
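
# Illustrative sketch (added, not part of this module): the dispatcher above
# is what ultimately serves the client-side LUXI API, used roughly like:
#
#   from ganeti import luxi, opcodes
#   cl = luxi.Client()            # assumes the default master socket path
#   job_id = cl.SubmitJob([opcodes.OpClusterQuery()])
#   cl.WaitForJobChange(job_id, ["status"], None, None)
#
# The client method names and signatures here are assumptions inferred from
# the REQ_* handling above, not a verbatim copy of the client library.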


class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # pylint: disable=W0212
  # we do want to ensure a singleton here
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only one GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    self.glm = locking.GanetiLockManager(
      self.cfg.GetNodeList(),
      self.cfg.GetNodeGroupList(),
      [inst.name for inst in self.cfg.GetAllInstancesInfo().values()],
      self.cfg.GetNetworkList())

    self.cfg.SetContext(self)

    # RPC runner
    self.rpc = rpc.RpcRunner(self.cfg, self.glm.AddToLockMonitor)

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node, ec_id):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node, ec_id)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.uuid)
    self.glm.add(locking.LEVEL_NODE_RES, node.uuid)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration.

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, node):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(node.uuid)

    # Notify job queue
    self.jobqueue.RemoveNode(node.name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, node.uuid)
    self.glm.remove(locking.LEVEL_NODE_RES, node.uuid)
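
# Note (added comment): because __setattr__ above asserts that _instance is
# still None, the context is effectively frozen once construction finishes;
# a later assignment such as `context.cfg = something` would trip the
# assertion (when assertions are enabled) instead of silently replacing the
# shared ConfigWriter.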


def _SetWatcherPause(context, until):
  """Creates or removes the watcher pause file.

  @type context: L{GanetiContext}
  @param context: Global Ganeti context
  @type until: None or int
  @param until: Unix timestamp saying until when the watcher shouldn't run

  """
  node_names = context.cfg.GetNodeList()

  if until is None:
    logging.info("Received request to no longer pause watcher")
  else:
    if not ht.TNumber(until):
      raise TypeError("Duration must be numeric")

    if until < time.time():
      raise errors.GenericError("Unable to set pause end time in the past")

    logging.info("Received request to pause watcher until %s", until)

  result = context.rpc.call_set_watcher_pause(node_names, until)

  errmsg = utils.CommaJoin("%s (%s)" % (node_name, nres.fail_msg)
                           for (node_name, nres) in result.items()
                           if nres.fail_msg and not nres.offline)
  if errmsg:
    raise errors.OpExecError("Watcher pause was set where possible, but failed"
                             " on the following node(s): %s" % errmsg)

  return until
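
# Illustrative example (added comment): callers pass an absolute Unix
# timestamp, e.g. `time.time() + 1800` to pause the watcher for roughly half
# an hour, or None to lift the pause again.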


@rpc.RunWithRPC
def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most cases we are the master, we'll
  use our own config file for getting the node list. In the future we
  could collect the current node list from our (possibly obsolete)
  known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are further out of sync, manual startup of the master should
  be attempted for now.

  Note that for a cluster with an even number of nodes, we need at
  least half of the nodes (besides ourselves) to vote for us. This
  creates a problem on two-node clusters, since in this case we
  require the other node to be up too to confirm our status.

  """
  myself = netutils.Hostname.GetSysName()
  # temporary instantiation of a config writer, used only to get the node list
  cfg = config.ConfigWriter()
  node_names = cfg.GetNodeNames(cfg.GetNodeList())
  del cfg
  retries = 6
  while retries > 0:
    votes = bootstrap.GatherMasterVotes(node_names)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is None:
      retries -= 1
      time.sleep(10)
      continue
    break
  if retries == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    logging.critical("Use the --no-voting option if you understand what"
                     " effects it has on the cluster state")
    return False
  # here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]

  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result
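
# Worked example for the condition above (illustrative only): if five nodes
# answer and three of them name us while the other two name someone else,
# then all_votes == 5, top_votes == 3 and 3 >= 5 - 3, so we accept
# mastership; if the answers were split two for us and one each for three
# other nodes, 2 < 5 - 2 and we would refuse to start.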


@rpc.RunWithRPC
def ActivateMasterIP():
  # activate ip
  cfg = config.ConfigWriter()
  master_params = cfg.GetMasterNetworkParameters()
  ems = cfg.GetUseExternalMipScript()
  runner = rpc.BootstrapRunner()
  # we use the node name, as the configuration is only available here yet
  result = runner.call_node_activate_master_ip(
             cfg.GetNodeName(master_params.uuid), master_params, ems)

  msg = result.fail_msg
  if msg:
    logging.error("Can't activate master IP address: %s", msg)


def CheckMasterd(options, args):
  """Initial checks whether to run or exit with a failure.

  """
  if args: # masterd doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)

  ssconf.CheckMaster(options.debug)

  try:
    options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
    options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
  except KeyError:
    print >> sys.stderr, ("User or group not existing on system: %s:%s" %
                          (constants.MASTERD_USER, constants.DAEMONS_GROUP))
    sys.exit(constants.EXIT_FAILURE)

  # Determine static runtime architecture information
  runtime.InitArchInfo()

  # Check the configuration is sane before anything else
  try:
    config.ConfigWriter()
  except errors.ConfigVersionMismatch, err:
    v1 = "%s.%s.%s" % version.SplitVersion(err.args[0])
    v2 = "%s.%s.%s" % version.SplitVersion(err.args[1])
    print >> sys.stderr, \
        ("Configuration version mismatch. The current Ganeti software"
         " expects version %s, but the on-disk configuration file has"
         " version %s. This is likely the result of upgrading the"
         " software without running the upgrade procedure. Please contact"
         " your cluster administrator or complete the upgrade using the"
         " cfgupgrade utility, after reading the upgrade notes." %
         (v1, v2))
    sys.exit(constants.EXIT_FAILURE)
  except errors.ConfigurationError, err:
    print >> sys.stderr, \
        ("Configuration error while opening the configuration file: %s\n"
         "This might be caused by an incomplete software upgrade or"
         " by a corrupted configuration file. Until the problem is fixed"
         " the master daemon cannot start." % str(err))
    sys.exit(constants.EXIT_FAILURE)

  # If CheckMaster didn't fail we believe we are the master, but we have to
  # confirm with the other nodes.
  if options.no_voting:
    if not options.yes_do_it:
      sys.stdout.write("The 'no voting' option has been selected.\n")
      sys.stdout.write("This is dangerous, please confirm by"
                       " typing uppercase 'yes': ")
      sys.stdout.flush()

      confirmation = sys.stdin.readline().strip()
      if confirmation != "YES":
        print >> sys.stderr, "Aborting."
        sys.exit(constants.EXIT_FAILURE)

  else:
    # CheckAgreement uses RPC and threads, hence it needs to be run in
    # a separate process before we call utils.Daemonize in the current
    # process.
    if not utils.RunInSeparateProcess(CheckAgreement):
      sys.exit(constants.EXIT_FAILURE)

  # ActivateMasterIP also uses RPC/threads, so we run it again via a
  # separate process.

  # TODO: decide whether failure to activate the master IP is a fatal error
  utils.RunInSeparateProcess(ActivateMasterIP)
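
# Note (added comment): with the options defined in Main() below, starting the
# daemon as e.g. `ganeti-masterd --no-voting --yes-do-it` skips both the
# voting step handled here and the interactive "YES" confirmation, while a
# plain --no-voting still prompts on stdin before proceeding.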


def PrepMasterd(options, _):
  """Prep master daemon function, executed with the PID file held.

  """
  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(pathutils.MASTER_SOCKET)

  mainloop = daemon.Mainloop()
  master = MasterServer(pathutils.MASTER_SOCKET, options.uid, options.gid)
  return (mainloop, master)


def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
  """Main master daemon function, executed with the PID file held.

  """
  (mainloop, master) = prep_data
  try:
    rpc.Init()
    try:
      master.setup_context()
      try:
        mainloop.Run(shutdown_wait_fn=master.WaitForShutdown)
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemoveFile(pathutils.MASTER_SOCKET)

  logging.info("Clean master daemon shutdown")
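
# Note (added comment): the nested try/finally blocks above guarantee the
# teardown order mainloop -> master.server_cleanup() -> rpc.Shutdown(), with
# the master socket file removed last, even if one of the earlier steps
# raises.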


def Main():
  """Main function"""
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  parser.add_option("--no-voting", dest="no_voting",
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally",
                    default=False, action="store_true")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    help="Override interactive check for --no-voting",
                    default=False, action="store_true")
  daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
                     ExecMasterd, multithreaded=True)