#
#

# Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Master daemon program.

Some classes deviate from the standard style guide since the
inheritance from parent classes requires it.

"""

# pylint: disable=C0103
# C0103: Invalid name ganeti-masterd

import grp
import os
import pwd
import sys
import socket
import time
import tempfile
import logging

from optparse import OptionParser

from ganeti import config
from ganeti import constants
from ganeti import daemon
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
import ganeti.rpc.errors as rpcerr
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import workerpool
import ganeti.rpc.node as rpc
import ganeti.rpc.client as rpccl
from ganeti import bootstrap
from ganeti import netutils
from ganeti import objects
from ganeti import query
from ganeti import runtime
from ganeti import pathutils
from ganeti import ht

from ganeti.utils import version


CLIENT_REQUEST_WORKERS = 16

EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR


def _LogNewJob(status, info, ops):
  """Log information about a recently submitted job.

  """
  op_summary = utils.CommaJoin(op.Summary() for op in ops)

  if status:
    logging.info("New job with id %s, summary: %s", info, op_summary)
  else:
    logging.info("Failed to submit job, reason: '%s', summary: %s",
                 info, op_summary)


class ClientRequestWorker(workerpool.BaseWorker):
  # pylint: disable=W0221
  def RunTask(self, server, message, client):
    """Process the request.

    """
    client_ops = ClientOps(server)

    try:
      (method, args, ver) = rpccl.ParseRequest(message)
    except rpcerr.ProtocolError, err:
      logging.error("Protocol Error: %s", err)
      client.close_log()
      return

    success = False
    try:
      # Verify client's version if there was one in the request
      if ver is not None and ver != constants.LUXI_VERSION:
        raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
                               (constants.LUXI_VERSION, ver))

      result = client_ops.handle_request(method, args)
      success = True
    except errors.GenericError, err:
      logging.exception("Unexpected exception")
      success = False
      result = errors.EncodeException(err)
    except:
      logging.exception("Unexpected exception")
      err = sys.exc_info()
      result = "Caught exception: %s" % str(err[1])

    try:
      reply = rpccl.FormatResponse(success, result)
      client.send_message(reply)
      # Wake the main thread so that it can write out the data.
      server.awaker.signal()
    except: # pylint: disable=W0702
      logging.exception("Send error")
      client.close_log()
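
    # Request handling in a nutshell: ParseRequest() and FormatResponse() from
    # ganeti.rpc.client take care of the LUXI serialization. A parsed request
    # is the (method, args, version) triple seen above; the reply pairs the
    # success flag with either the handler's result or the encoded exception.
    # Errors while sending the reply only close this client's connection, so a
    # misbehaving client cannot take the worker down.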


class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
  """Handler for master peers.

  """
  _MAX_UNHANDLED = 1

  def __init__(self, server, connected_socket, client_address, family):
    daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
                                                 client_address,
                                                 constants.LUXI_EOM,
                                                 family, self._MAX_UNHANDLED)
    self.server = server

  def handle_message(self, message, _):
    self.server.request_workers.AddTask((self.server, message, self))


class _MasterShutdownCheck:
  """Logic for master daemon shutdown.

  """
  #: How long to wait between checks
  _CHECK_INTERVAL = 5.0

  #: How long to wait after all jobs are done (e.g. to give clients time to
  #: retrieve the job status)
  _SHUTDOWN_LINGER = 5.0

  def __init__(self):
    """Initializes this class.

    """
    self._had_active_jobs = None
    self._linger_timeout = None

  def __call__(self, jq_prepare_result):
    """Determines if master daemon is ready for shutdown.

    @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown}
    @rtype: None or number
    @return: None if master daemon is ready, timeout if the check must be
             repeated

    """
    if jq_prepare_result:
      # Check again shortly
      logging.info("Job queue has been notified for shutdown but is still"
                   " busy; next check in %s seconds", self._CHECK_INTERVAL)
      self._had_active_jobs = True
      return self._CHECK_INTERVAL

    if not self._had_active_jobs:
      # Can shut down as there were no active jobs on the first check
      return None

    # No jobs are running anymore, but maybe some clients want to collect some
    # information. Give them a short amount of time.
    if self._linger_timeout is None:
      self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True)

    remaining = self._linger_timeout.Remaining()

    logging.info("Job queue no longer busy; shutting down master daemon"
                 " in %s seconds", remaining)

    # TODO: Should the master daemon socket be closed at this point? Doing so
    # wouldn't affect existing connections.

    if remaining < 0:
      return None
    else:
      return remaining
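
    # Contract used by MasterServer.WaitForShutdown() and the main loop's
    # shutdown_wait_fn: a return value of None means it is safe to shut down
    # now, while a number is the delay in seconds after which the check should
    # be repeated.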


class MasterServer(daemon.AsyncStreamServer):
  """Master Server.

  This is the main asynchronous master server. It handles connections to the
  master socket.

  """
  family = socket.AF_UNIX

  def __init__(self, address, uid, gid):
    """MasterServer constructor.

    @param address: the unix socket address to bind the MasterServer to
    @param uid: The uid of the owner of the socket
    @param gid: The gid of the owner of the socket

    """
    temp_name = tempfile.mktemp(dir=os.path.dirname(address))
    daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
    os.chmod(temp_name, 0770)
    os.chown(temp_name, uid, gid)
    os.rename(temp_name, address)
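
    # The socket is created under a temporary name and only renamed to its
    # final address once mode and ownership are set, so clients can never
    # connect to a half-configured socket; the rename also atomically
    # replaces any stale socket left behind by a previous master.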

    self.awaker = daemon.AsyncAwaker()

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

    self._shutdown_check = None

  def handle_connection(self, connected_socket, client_address):
    # TODO: add connection count and limit the number of open connections to a
    # maximum number to avoid breaking for lack of file descriptors or memory.
    MasterClientHandler(self, connected_socket, client_address, self.family)

  def setup_context(self):
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool("ClientReq",
                                                 CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def WaitForShutdown(self):
    """Prepares server for shutdown.

    """
    if self._shutdown_check is None:
      self._shutdown_check = _MasterShutdownCheck()

    return self._shutdown_check(self.context.jobqueue.PrepareShutdown())

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.close()
    finally:
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.context:
        self.context.jobqueue.Shutdown()
        self.context.livelock.close()


class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server

  def handle_request(self, method, args): # pylint: disable=R0911
    context = self.server.context
    queue = context.jobqueue

    # TODO: Parameter validation
    if not isinstance(args, (tuple, list)):
      logging.info("Received invalid arguments of type '%s'", type(args))
      raise ValueError("Invalid arguments type '%s'" % type(args))

    if method not in luxi.REQ_ALL:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

    # TODO: Rewrite to not exit in each 'if/elif' branch
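    #
    # Each branch below corresponds to one REQ_* constant from ganeti.luxi.
    # For the job-submission requests the single argument is a job definition,
    # i.e. a list of serialized opcodes; illustratively something like
    # [{"OP_ID": "OP_CLUSTER_VERIFY", ...}], which LoadOpCode() below turns
    # back into opcode objects.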

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Receiving new job")
      (job_def, ) = args
      ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
      job_id = queue.SubmitJob(ops)
      _LogNewJob(True, job_id, ops)
      return job_id

    elif method == luxi.REQ_PICKUP_JOB:
      logging.info("Picking up new job from queue")
      (job_id, ) = args
      queue.PickupJob(job_id)

    elif method == luxi.REQ_SUBMIT_JOB_TO_DRAINED_QUEUE:
      logging.info("Forcefully receiving new job")
      (job_def, ) = args
      ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
      job_id = queue.SubmitJobToDrainedQueue(ops)
      _LogNewJob(True, job_id, ops)
      return job_id

    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Receiving multiple jobs")
      (job_defs, ) = args
      jobs = []
      for ops in job_defs:
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
      job_ids = queue.SubmitManyJobs(jobs)
      for ((status, job_id), ops) in zip(job_ids, jobs):
        _LogNewJob(status, job_id, ops)
      return job_ids

    elif method == luxi.REQ_CANCEL_JOB:
      (job_id, ) = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_CHANGE_JOB_PRIORITY:
      (job_id, priority) = args
      logging.info("Received request to change priority for job %s to %s",
                   job_id, priority)
      return queue.ChangeJobPriority(job_id, priority)

    elif method == luxi.REQ_ARCHIVE_JOB:
      (job_id, ) = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTO_ARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY:
      (what, fields, qfilter) = args

      if what in constants.QR_VIA_OP:
        result = self._Query(opcodes.OpQuery(what=what, fields=fields,
                                             qfilter=qfilter))
      elif what == constants.QR_LOCK:
        if qfilter is not None:
          raise errors.OpPrereqError("Lock queries can't be filtered",
                                     errors.ECODE_INVAL)
        return context.glm.QueryLocks(fields)
      elif what == constants.QR_JOB:
        return queue.QueryJobs(fields, qfilter)
      elif what in constants.QR_VIA_LUXI:
        luxi_client = runtime.GetClient()
        result = luxi_client.Query(what, fields, qfilter).ToDict()
      else:
        raise errors.OpPrereqError("Resource type '%s' unknown" % what,
                                   errors.ECODE_INVAL)

      return result

    elif method == luxi.REQ_QUERY_FIELDS:
      (what, fields) = args
      req = objects.QueryFieldsRequest(what=what, fields=fields)

      try:
        fielddefs = query.ALL_FIELDS[req.what]
      except KeyError:
        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
                                   errors.ECODE_INVAL)

      return query.QueryFields(fielddefs, req.fields)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = utils.CommaJoin(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.OldStyleQueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      (fields, ) = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpClusterConfigQuery(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpClusterQuery()
      return self._Query(op)

    elif method == luxi.REQ_QUERY_TAGS:
      (kind, name) = args
      logging.info("Received tags query request")
      op = opcodes.OpTagsGet(kind=kind, name=name, use_locking=False)
      return self._Query(op)

    elif method == luxi.REQ_SET_DRAIN_FLAG:
      (drain_flag, ) = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    elif method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      # FIXME!
      ec_id = None
      return _SetWatcherPause(context, ec_id, until)

    else:
      logging.critical("Request '%s' in luxi.REQ_ALL, but not known", method)
      raise errors.ProgrammerError("Operation '%s' in luxi.REQ_ALL,"
                                   " but not implemented" % method)

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    # Queries don't have a job id
    proc = mcpu.Processor(self.server.context, None, enable_locks=False)

    # TODO: Executing an opcode using locks will acquire them in blocking mode.
    # Consider using a timeout for retries.
    return proc.ExecOpCode(op, None)
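
    # Queries handled here bypass the job queue entirely: the opcode runs
    # synchronously in the request worker's thread, which is why no job id is
    # passed to the processor and why locking is disabled.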


class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # pylint: disable=W0212
  # we do want to ensure a singleton here
  _instance = None

  def __init__(self, livelock=None):
    """Constructs a new GanetiContext object.

    There should be only one GanetiContext object at any time, so this
    function raises an error if that is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create a livelock file
    if livelock is None:
      self.livelock = utils.livelock.LiveLock("masterd")
    else:
      self.livelock = livelock

    # Locking manager
    cfg = self.GetConfig(None)
    self.glm = locking.GanetiLockManager(
      cfg.GetNodeList(),
      cfg.GetNodeGroupList(),
      [inst.name for inst in cfg.GetAllInstancesInfo().values()],
      cfg.GetNetworkList())

    # Job queue
    logging.debug("Creating the job queue")
    self.jobqueue = jqueue.JobQueue(self, cfg)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)
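
  # Together with the assertion above, assigning _instance at the end of
  # __init__ freezes the context: once the singleton exists, any further
  # attribute assignment trips the assertion.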

  def GetWConfdContext(self, ec_id):
    return config.GetWConfdContext(ec_id, self.livelock)

  def GetConfig(self, ec_id):
    return config.GetConfig(ec_id, self.livelock)

  def GetRpc(self, cfg):
    return rpc.RpcRunner(cfg, self.glm.AddToLockMonitor)

  def AddNode(self, cfg, node, ec_id):
    """Adds a node to the configuration.

    """
    # Add it to the configuration
    cfg.AddNode(node, ec_id)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration.

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, cfg, node):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    cfg.RemoveNode(node.uuid)

    # Notify job queue
    self.jobqueue.RemoveNode(node.name)


def _SetWatcherPause(context, ec_id, until):
  """Creates or removes the watcher pause file.

  @type context: L{GanetiContext}
  @param context: Global Ganeti context
  @type until: None or int
  @param until: Unix timestamp saying until when the watcher shouldn't run

  """
  node_names = context.GetConfig(ec_id).GetNodeList()

  if until is None:
    logging.info("Received request to no longer pause watcher")
  else:
    if not ht.TNumber(until):
      raise TypeError("Duration must be numeric")

    if until < time.time():
      raise errors.GenericError("Unable to set pause end time in the past")

    logging.info("Received request to pause watcher until %s", until)

  result = context.rpc.call_set_watcher_pause(node_names, until)

  errmsg = utils.CommaJoin("%s (%s)" % (node_name, nres.fail_msg)
                           for (node_name, nres) in result.items()
                           if nres.fail_msg and not nres.offline)
  if errmsg:
    raise errors.OpExecError("Watcher pause was set where possible, but failed"
                             " on the following node(s): %s" % errmsg)

  return until
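
  # Offline nodes are deliberately skipped when collecting failures above, so
  # the pause is applied on a best-effort basis; a failure on any online node
  # is reported only after the RPC has run everywhere it could.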


@rpc.RunWithRPC
def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most of the cases we are the master,
  we'll use our own config file for getting the node list. In the
  future we could collect the current node list from our (possibly
  obsolete) known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are more out-of-sync, for now manual startup of the master
  should be attempted.

  Note that for a cluster with an even number of nodes, we need at least
  half of the nodes (besides ourselves) to vote for us. This creates a
  problem on two-node clusters, since in this case we require the
  other node to be up too to confirm our status.

  """
  myself = netutils.Hostname.GetSysName()
  # Create a livelock file
  livelock = utils.livelock.LiveLock("masterd_check_agreement")
  # Temporary instantiation of a config writer, used only to get the node list
  cfg = config.GetConfig(None, livelock)
  node_names = cfg.GetNodeNames(cfg.GetNodeList())
  del cfg
  retries = 6
  while retries > 0:
    votes = bootstrap.GatherMasterVotes(node_names)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is None:
      retries -= 1
      time.sleep(10)
      continue
    break
  if retries == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    logging.critical("Use the --no-voting option if you understand what"
                     " effects it has on the cluster state")
    return False
  # here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]
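
  # Acceptance rule illustrated: with votes from five nodes, three of them
  # naming us, we win below (3 >= 5 - 3); with four voters an exact 2-2 split
  # is still accepted, which is the "at least half" case from the docstring.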

  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result


@rpc.RunWithRPC
def ActivateMasterIP():
  # activate ip
  # Create a livelock file
  livelock = utils.livelock.LiveLock("masterd_activate_ip")
  cfg = config.GetConfig(None, livelock)
  master_params = cfg.GetMasterNetworkParameters()
  ems = cfg.GetUseExternalMipScript()
  runner = rpc.BootstrapRunner()
  # We use the node name, as the configuration is only available here at this
  # point.
  result = runner.call_node_activate_master_ip(
             cfg.GetNodeName(master_params.uuid), master_params, ems)

  msg = result.fail_msg
  if msg:
    logging.error("Can't activate master IP address: %s", msg)
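
  # A failure here is only logged: CheckMasterd() runs this in a separate
  # process and currently treats master IP activation as non-fatal (see the
  # TODO there).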


def CheckMasterd(options, args):
  """Initial checks whether to run or exit with a failure.

  """
  if args: # masterd doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)

  ssconf.CheckMaster(options.debug)

  try:
    options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
    options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
  except KeyError:
    print >> sys.stderr, ("User or group not existing on system: %s:%s" %
                          (constants.MASTERD_USER, constants.DAEMONS_GROUP))
    sys.exit(constants.EXIT_FAILURE)

  # Determine static runtime architecture information
  runtime.InitArchInfo()

  # Check the configuration is sane before anything else
  try:
    livelock = utils.livelock.LiveLock("masterd_check")
    config.GetConfig(None, livelock)
  except errors.ConfigVersionMismatch, err:
    v1 = "%s.%s.%s" % version.SplitVersion(err.args[0])
    v2 = "%s.%s.%s" % version.SplitVersion(err.args[1])
    print >> sys.stderr,  \
        ("Configuration version mismatch. The current Ganeti software"
         " expects version %s, but the on-disk configuration file has"
         " version %s. This is likely the result of upgrading the"
         " software without running the upgrade procedure. Please contact"
         " your cluster administrator or complete the upgrade using the"
         " cfgupgrade utility, after reading the upgrade notes." %
         (v1, v2))
    sys.exit(constants.EXIT_FAILURE)
  except errors.ConfigurationError, err:
    print >> sys.stderr, \
        ("Configuration error while opening the configuration file: %s\n"
         "This might be caused by an incomplete software upgrade or"
         " by a corrupted configuration file. Until the problem is fixed"
         " the master daemon cannot start." % str(err))
    sys.exit(constants.EXIT_FAILURE)

  # If CheckMaster didn't fail we believe we are the master, but we have to
  # confirm with the other nodes.
  if options.no_voting:
    if not options.yes_do_it:
      sys.stdout.write("The 'no voting' option has been selected.\n")
      sys.stdout.write("This is dangerous, please confirm by"
                       " typing uppercase 'yes': ")
      sys.stdout.flush()

      confirmation = sys.stdin.readline().strip()
      if confirmation != "YES":
        print >> sys.stderr, "Aborting."
        sys.exit(constants.EXIT_FAILURE)

  else:
    # CheckAgreement uses RPC and threads, hence it needs to be run in
    # a separate process before we call utils.Daemonize in the current
    # process.
    if not utils.RunInSeparateProcess(CheckAgreement):
      sys.exit(constants.EXIT_FAILURE)

  # ActivateMasterIP also uses RPC/threads, so we run it again via a
  # separate process.

  # TODO: decide whether failure to activate the master IP is a fatal error
  utils.RunInSeparateProcess(ActivateMasterIP)


def PrepMasterd(options, _):
  """Prep master daemon function, executed with the PID file held.

  """
  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(pathutils.MASTER_SOCKET)

  mainloop = daemon.Mainloop()
  master = MasterServer(pathutils.MASTER_SOCKET, options.uid, options.gid)
  return (mainloop, master)


def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
  """Main master daemon function, executed with the PID file held.

  """
  (mainloop, master) = prep_data
  try:
    rpc.Init()
    try:
      master.setup_context()
      try:
        mainloop.Run(shutdown_wait_fn=master.WaitForShutdown)
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemoveFile(pathutils.MASTER_SOCKET)
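
  # The nested try/finally blocks guarantee teardown in reverse order of
  # setup: worker pool and job queue first (server_cleanup), then the RPC
  # layer, and finally the socket file is removed even if startup failed
  # halfway through.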

  logging.info("Clean master daemon shutdown")


def Main():
  """Main function"""
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  parser.add_option("--no-voting", dest="no_voting",
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally",
                    default=False, action="store_true")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    help="Override interactive check for --no-voting",
                    default=False, action="store_true")
  daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
                     ExecMasterd, multithreaded=True)