root / lib / server / masterd.py @ 6c0a75db
#
#

# Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Master daemon program.

Some classes deviate from the standard style guide since the
inheritance from parent classes requires it.

"""

# pylint: disable=C0103
# C0103: Invalid name ganeti-masterd

import grp
import os
import pwd
import sys
import socket
import time
import tempfile
import logging

from optparse import OptionParser

from ganeti import config
from ganeti import constants
from ganeti import daemon
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import workerpool
from ganeti import rpc
from ganeti import bootstrap
from ganeti import netutils
from ganeti import objects
from ganeti import query
from ganeti import runtime
from ganeti import pathutils


CLIENT_REQUEST_WORKERS = 16

EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR


def _LogNewJob(status, info, ops):
  """Log information about a recently submitted job.

  """
  if status:
    logging.info("New job with id %s, summary: %s",
                 info, utils.CommaJoin(op.Summary() for op in ops))
  else:
    logging.info("Failed to submit job, reason: '%s', summary: %s",
                 info, utils.CommaJoin(op.Summary() for op in ops))


class ClientRequestWorker(workerpool.BaseWorker):
  # pylint: disable=W0221
  def RunTask(self, server, message, client):
    """Process the request.

    """
    client_ops = ClientOps(server)

    try:
      (method, args, version) = luxi.ParseRequest(message)
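      # Editorial note (assumption, not in the original source): the LUXI
      # message parsed here is, roughly, a serialised mapping such as
      #   {"method": "SubmitJob", "args": [...], "version": <LUXI_VERSION>}
      # which ParseRequest unpacks into the (method, args, version) tuple.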
    except luxi.ProtocolError, err:
      logging.error("Protocol Error: %s", err)
      client.close_log()
      return

    success = False
    try:
      # Verify client's version if there was one in the request
      if version is not None and version != constants.LUXI_VERSION:
        raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
                               (constants.LUXI_VERSION, version))

      result = client_ops.handle_request(method, args)
      success = True
    except errors.GenericError, err:
      logging.exception("Unexpected exception")
      success = False
      result = errors.EncodeException(err)
    except:
      logging.exception("Unexpected exception")
      err = sys.exc_info()
      result = "Caught exception: %s" % str(err[1])

    try:
      reply = luxi.FormatResponse(success, result)
      client.send_message(reply)
      # awake the main thread so that it can write out the data.
      server.awaker.signal()
    except: # pylint: disable=W0702
      logging.exception("Send error")
      client.close_log()


class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
  """Handler for master peers.

  """
  _MAX_UNHANDLED = 1

  def __init__(self, server, connected_socket, client_address, family):
    daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
                                                 client_address,
                                                 constants.LUXI_EOM,
                                                 family, self._MAX_UNHANDLED)
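    # Editorial note (assumption, not in the original source): the parent
    # class splits the incoming byte stream into messages on the
    # constants.LUXI_EOM terminator and keeps at most _MAX_UNHANDLED
    # messages pending before it stops reading from the socket.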
    self.server = server

  def handle_message(self, message, _):
    self.server.request_workers.AddTask((self.server, message, self))


class _MasterShutdownCheck:
  """Logic for master daemon shutdown.

  """
  #: How long to wait between checks
  _CHECK_INTERVAL = 5.0

  #: How long to wait after all jobs are done (e.g. to give clients time to
  #: retrieve the job status)
  _SHUTDOWN_LINGER = 5.0
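  # Illustrative timeline (editorial note, not in the original source): with
  # the defaults above, a shutdown requested while jobs are running is
  # re-checked every 5 seconds until the queue drains, after which the daemon
  # lingers for another 5 seconds before __call__ returns None and the
  # mainloop proceeds with the actual shutdown.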

  def __init__(self):
    """Initializes this class.

    """
    self._had_active_jobs = None
    self._linger_timeout = None

  def __call__(self, jq_prepare_result):
    """Determines if master daemon is ready for shutdown.

    @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown}
    @rtype: None or number
    @return: None if master daemon is ready, timeout if the check must be
             repeated

    """
    if jq_prepare_result:
      # Check again shortly
      logging.info("Job queue has been notified for shutdown but is still"
                   " busy; next check in %s seconds", self._CHECK_INTERVAL)
      self._had_active_jobs = True
      return self._CHECK_INTERVAL

    if not self._had_active_jobs:
      # Can shut down as there were no active jobs on the first check
      return None

    # No jobs are running anymore, but maybe some clients want to collect some
    # information. Give them a short amount of time.
    if self._linger_timeout is None:
      self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True)

    remaining = self._linger_timeout.Remaining()

    logging.info("Job queue no longer busy; shutting down master daemon"
                 " in %s seconds", remaining)

    # TODO: Should the master daemon socket be closed at this point? Doing so
    # wouldn't affect existing connections.

    if remaining < 0:
      return None
    else:
      return remaining


class MasterServer(daemon.AsyncStreamServer):
  """Master Server.

  This is the main asynchronous master server. It handles connections to the
  master socket.

  """
  family = socket.AF_UNIX

  def __init__(self, address, uid, gid):
    """MasterServer constructor

    @param address: the unix socket address to bind the MasterServer to
    @param uid: The uid of the owner of the socket
    @param gid: The gid of the owner of the socket

    """
    temp_name = tempfile.mktemp(dir=os.path.dirname(address))
    daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
    os.chmod(temp_name, 0770)
    os.chown(temp_name, uid, gid)
    os.rename(temp_name, address)

    self.awaker = daemon.AsyncAwaker()

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

    self._shutdown_check = None

  def handle_connection(self, connected_socket, client_address):
    # TODO: add connection count and limit the number of open connections to a
    # maximum number to avoid breaking for lack of file descriptors or memory.
    MasterClientHandler(self, connected_socket, client_address, self.family)

  def setup_queue(self):
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool("ClientReq",
                                                 CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def WaitForShutdown(self):
    """Prepares server for shutdown.

    """
    if self._shutdown_check is None:
      self._shutdown_check = _MasterShutdownCheck()

    return self._shutdown_check(self.context.jobqueue.PrepareShutdown())

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.close()
    finally:
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.context:
        self.context.jobqueue.Shutdown()


class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server

  def handle_request(self, method, args): # pylint: disable=R0911
    context = self.server.context
    queue = context.jobqueue

    # TODO: Parameter validation
    if not isinstance(args, (tuple, list)):
      logging.info("Received invalid arguments of type '%s'", type(args))
      raise ValueError("Invalid arguments type '%s'" % type(args))

    # TODO: Rewrite to not exit in each 'if/elif' branch

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Receiving new job")
      (job_def, ) = args
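      # Editorial note (assumption, not in the original source): job_def is
      # a list of serialised opcodes, i.e. dicts carrying an "OP_ID"
      # discriminator plus the opcode parameters, which LoadOpCode turns
      # back into opcodes.OpCode instances below.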
      ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
      job_id = queue.SubmitJob(ops)
      _LogNewJob(True, job_id, ops)
      return job_id

    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Receiving multiple jobs")
      (job_defs, ) = args
      jobs = []
      for ops in job_defs:
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
      job_ids = queue.SubmitManyJobs(jobs)
      for ((status, job_id), ops) in zip(job_ids, jobs):
        _LogNewJob(status, job_id, ops)
      return job_ids

    elif method == luxi.REQ_CANCEL_JOB:
      (job_id, ) = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_CHANGE_JOB_PRIORITY:
      (job_id, priority) = args
      logging.info("Received request to change priority for job %s to %s",
                   job_id, priority)
      return queue.ChangeJobPriority(job_id, priority)

    elif method == luxi.REQ_ARCHIVE_JOB:
      (job_id, ) = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTO_ARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY:
      (what, fields, qfilter) = args

      if what in constants.QR_VIA_OP:
        result = self._Query(opcodes.OpQuery(what=what, fields=fields,
                                             qfilter=qfilter))
      elif what == constants.QR_LOCK:
        if qfilter is not None:
          raise errors.OpPrereqError("Lock queries can't be filtered",
                                     errors.ECODE_INVAL)
        return context.glm.QueryLocks(fields)
      elif what == constants.QR_JOB:
        return queue.QueryJobs(fields, qfilter)
      elif what in constants.QR_VIA_LUXI:
        raise NotImplementedError
      else:
        raise errors.OpPrereqError("Resource type '%s' unknown" % what,
                                   errors.ECODE_INVAL)

      return result

    elif method == luxi.REQ_QUERY_FIELDS:
      (what, fields) = args
      req = objects.QueryFieldsRequest(what=what, fields=fields)

      try:
        fielddefs = query.ALL_FIELDS[req.what]
      except KeyError:
        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
                                   errors.ECODE_INVAL)

      return query.QueryFields(fielddefs, req.fields)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = utils.CommaJoin(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.OldStyleQueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
                                   use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpNodeQuery(names=names, output_fields=fields,
                               use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_GROUPS:
      (names, fields, use_locking) = args
      logging.info("Received group query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpGroupQuery(names=names, output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      (nodes, use_locking) = args
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      logging.info("Received exports query request")
      op = opcodes.OpBackupQuery(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      (fields, ) = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpClusterConfigQuery(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpClusterQuery()
      return self._Query(op)

    elif method == luxi.REQ_QUERY_TAGS:
      (kind, name) = args
      logging.info("Received tags query request")
      op = opcodes.OpTagsGet(kind=kind, name=name, use_locking=False)
      return self._Query(op)

    elif method == luxi.REQ_SET_DRAIN_FLAG:
      (drain_flag, ) = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    elif method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      if until is None:
        logging.info("Received request to no longer pause the watcher")
      else:
        if not isinstance(until, (int, float)):
          raise TypeError("Duration must be an integer or float")

        if until < time.time():
          raise errors.GenericError("Unable to set pause end time in the past")

        logging.info("Received request to pause the watcher until %s", until)

      return _SetWatcherPause(until)

    else:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    # Queries don't have a job id
    proc = mcpu.Processor(self.server.context, None, enable_locks=False)

    # TODO: Executing an opcode using locks will acquire them in blocking mode.
    # Consider using a timeout for retries.
    return proc.ExecOpCode(op, None)


class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # pylint: disable=W0212
  # we do want to ensure a singleton here
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only a GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    self.glm = locking.GanetiLockManager(
      self.cfg.GetNodeList(),
      self.cfg.GetNodeGroupList(),
      self.cfg.GetInstanceList(),
      self.cfg.GetNetworkList())

    self.cfg.SetContext(self)

    # RPC runner
    self.rpc = rpc.RpcRunner(self.cfg, self.glm.AddToLockMonitor)

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node, ec_id):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node, ec_id)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)
    self.glm.add(locking.LEVEL_NODE_RES, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)
    self.glm.remove(locking.LEVEL_NODE_RES, name)


def _SetWatcherPause(until):
  """Creates or removes the watcher pause file.

  @type until: None or int
  @param until: Unix timestamp saying until when the watcher shouldn't run

  """
  if until is None:
    utils.RemoveFile(pathutils.WATCHER_PAUSEFILE)
  else:
    utils.WriteFile(pathutils.WATCHER_PAUSEFILE,
                    data="%d\n" % (until, ))

  return until


@rpc.RunWithRPC
def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most of the cases we are the master,
  we'll use our own config file for getting the node list. In the
  future we could collect the current node list from our (possibly
  obsolete) known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are more out-of-sync, for now manual startup of the master
  should be attempted.

  Note that for a cluster with an even number of nodes, we need at
  least half of the nodes (besides ourselves) to vote for us. This
  creates a problem on two-node clusters, since in this case we require
  the other node to be up too to confirm our status.

  """
  myself = netutils.Hostname.GetSysName()
  # temp instantiation of a config writer, used only to get the node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg
  retries = 6
  while retries > 0:
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is None:
      retries -= 1
      time.sleep(10)
      continue
    break
  if retries == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    logging.critical("Use the --no-voting option if you understand what"
                     " effects it has on the cluster state")
    return False
  # here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]

  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result


@rpc.RunWithRPC
def ActivateMasterIP():
  # activate ip
  cfg = config.ConfigWriter()
  master_params = cfg.GetMasterNetworkParameters()
  ems = cfg.GetUseExternalMipScript()
  runner = rpc.BootstrapRunner()
  result = runner.call_node_activate_master_ip(master_params.name,
                                               master_params, ems)

  msg = result.fail_msg
  if msg:
    logging.error("Can't activate master IP address: %s", msg)


def CheckMasterd(options, args):
  """Initial checks whether to run or exit with a failure.

  """
  if args: # masterd doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)

  ssconf.CheckMaster(options.debug)

  try:
    options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
    options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
  except KeyError:
    print >> sys.stderr, ("User or group not existing on system: %s:%s" %
                          (constants.MASTERD_USER, constants.DAEMONS_GROUP))
    sys.exit(constants.EXIT_FAILURE)

  # Determine static runtime architecture information
  runtime.InitArchInfo()

  # Check the configuration is sane before anything else
  try:
    config.ConfigWriter()
  except errors.ConfigVersionMismatch, err:
    v1 = "%s.%s.%s" % constants.SplitVersion(err.args[0])
    v2 = "%s.%s.%s" % constants.SplitVersion(err.args[1])
    print >> sys.stderr,  \
        ("Configuration version mismatch. The current Ganeti software"
         " expects version %s, but the on-disk configuration file has"
         " version %s. This is likely the result of upgrading the"
         " software without running the upgrade procedure. Please contact"
         " your cluster administrator or complete the upgrade using the"
         " cfgupgrade utility, after reading the upgrade notes." %
         (v1, v2))
    sys.exit(constants.EXIT_FAILURE)
  except errors.ConfigurationError, err:
    print >> sys.stderr, \
        ("Configuration error while opening the configuration file: %s\n"
         "This might be caused by an incomplete software upgrade or"
         " by a corrupted configuration file. Until the problem is fixed"
         " the master daemon cannot start." % str(err))
    sys.exit(constants.EXIT_FAILURE)

  # If CheckMaster didn't fail we believe we are the master, but we have to
  # confirm with the other nodes.
  if options.no_voting:
    if not options.yes_do_it:
      sys.stdout.write("The 'no voting' option has been selected.\n")
      sys.stdout.write("This is dangerous, please confirm by"
                       " typing uppercase 'yes': ")
      sys.stdout.flush()

      confirmation = sys.stdin.readline().strip()
      if confirmation != "YES":
        print >> sys.stderr, "Aborting."
        sys.exit(constants.EXIT_FAILURE)

  else:
    # CheckAgreement uses RPC and threads, hence it needs to be run in
    # a separate process before we call utils.Daemonize in the current
    # process.
    if not utils.RunInSeparateProcess(CheckAgreement):
      sys.exit(constants.EXIT_FAILURE)

  # ActivateMasterIP also uses RPC/threads, so we run it again via a
  # separate process.

  # TODO: decide whether failure to activate the master IP is a fatal error
  utils.RunInSeparateProcess(ActivateMasterIP)


def PrepMasterd(options, _):
  """Prep master daemon function, executed with the PID file held.

  """
  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(pathutils.MASTER_SOCKET)

  mainloop = daemon.Mainloop()
  master = MasterServer(pathutils.MASTER_SOCKET, options.uid, options.gid)
  return (mainloop, master)


def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
  """Main master daemon function, executed with the PID file held.

  """
  (mainloop, master) = prep_data
  try:
    rpc.Init()
    try:
      master.setup_queue()
      try:
        mainloop.Run(shutdown_wait_fn=master.WaitForShutdown)
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemoveFile(pathutils.MASTER_SOCKET)

  logging.info("Clean master daemon shutdown")


def Main():
  """Main function"""
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  parser.add_option("--no-voting", dest="no_voting",
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally",
                    default=False, action="store_true")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    help="Override interactive check for --no-voting",
                    default=False, action="store_true")
  daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
                     ExecMasterd, multithreaded=True)