#
#

# Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Master daemon program.

Some classes deviate from the standard style guide since the
inheritance from parent classes requires it.

"""

# pylint: disable=C0103
# C0103: Invalid name ganeti-masterd

import grp
import os
import pwd
import sys
import socket
import time
import tempfile
import logging

from optparse import OptionParser

from ganeti import config
from ganeti import constants
from ganeti import daemon
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import workerpool
from ganeti import rpc
from ganeti import bootstrap
from ganeti import netutils
from ganeti import objects
from ganeti import query
from ganeti import runtime
from ganeti import pathutils
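

# Size of the worker pool used to serve LUXI client requests (see
# MasterServer.setup_queue below).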
CLIENT_REQUEST_WORKERS = 16

EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR


def _LogNewJob(status, info, ops):
  """Log information about a recently submitted job.

  """
  op_summary = utils.CommaJoin(op.Summary() for op in ops)

  if status:
    logging.info("New job with id %s, summary: %s", info, op_summary)
  else:
    logging.info("Failed to submit job, reason: '%s', summary: %s",
                 info, op_summary)
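

# Every task handed to this worker pool is a (server, message, client) tuple,
# queued by MasterClientHandler.handle_message below.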
class ClientRequestWorker(workerpool.BaseWorker):
  # pylint: disable=W0221
  def RunTask(self, server, message, client):
    """Process the request.

    """
    client_ops = ClientOps(server)

    try:
      (method, args, version) = luxi.ParseRequest(message)
    except luxi.ProtocolError, err:
      logging.error("Protocol Error: %s", err)
      client.close_log()
      return

    success = False
    try:
      # Verify client's version if there was one in the request
      if version is not None and version != constants.LUXI_VERSION:
        raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
                               (constants.LUXI_VERSION, version))

      result = client_ops.handle_request(method, args)
      success = True
    except errors.GenericError, err:
      logging.exception("Unexpected exception")
      success = False
      result = errors.EncodeException(err)
    except:
      logging.exception("Unexpected exception")
      err = sys.exc_info()
      result = "Caught exception: %s" % str(err[1])

    try:
      reply = luxi.FormatResponse(success, result)
      client.send_message(reply)
      # awake the main thread so that it can write out the data.
      server.awaker.signal()
    except: # pylint: disable=W0702
      logging.exception("Send error")
      client.close_log()


class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
  """Handler for master peers.

  """
  _MAX_UNHANDLED = 1

  def __init__(self, server, connected_socket, client_address, family):
    daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
                                                 client_address,
                                                 constants.LUXI_EOM,
                                                 family, self._MAX_UNHANDLED)
    self.server = server

  def handle_message(self, message, _):
    self.server.request_workers.AddTask((self.server, message, self))
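

# Shutdown happens in stages: while the job queue still reports activity the
# check below asks to be invoked again after _CHECK_INTERVAL seconds; once the
# queue is idle it lingers for _SHUTDOWN_LINGER seconds so clients can still
# fetch final job status, and only then does it report that masterd may exit.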
class _MasterShutdownCheck:
  """Logic for master daemon shutdown.

  """
  #: How long to wait between checks
  _CHECK_INTERVAL = 5.0

  #: How long to wait after all jobs are done (e.g. to give clients time to
  #: retrieve the job status)
  _SHUTDOWN_LINGER = 5.0

  def __init__(self):
    """Initializes this class.

    """
    self._had_active_jobs = None
    self._linger_timeout = None

  def __call__(self, jq_prepare_result):
    """Determines if master daemon is ready for shutdown.

    @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown}
    @rtype: None or number
    @return: None if master daemon is ready, timeout if the check must be
             repeated

    """
    if jq_prepare_result:
      # Check again shortly
      logging.info("Job queue has been notified for shutdown but is still"
                   " busy; next check in %s seconds", self._CHECK_INTERVAL)
      self._had_active_jobs = True
      return self._CHECK_INTERVAL

    if not self._had_active_jobs:
      # Can shut down as there were no active jobs on the first check
      return None

    # No jobs are running anymore, but maybe some clients want to collect some
    # information. Give them a short amount of time.
    if self._linger_timeout is None:
      self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True)

    remaining = self._linger_timeout.Remaining()

    logging.info("Job queue no longer busy; shutting down master daemon"
                 " in %s seconds", remaining)

    # TODO: Should the master daemon socket be closed at this point? Doing so
    # wouldn't affect existing connections.

    if remaining < 0:
      return None
    else:
      return remaining


class MasterServer(daemon.AsyncStreamServer):
  """Master Server.

  This is the main asynchronous master server. It handles connections to the
  master socket.

  """
  family = socket.AF_UNIX

  def __init__(self, address, uid, gid):
    """MasterServer constructor

    @param address: the unix socket address to bind the MasterServer to
    @param uid: The uid of the owner of the socket
    @param gid: The gid of the owner of the socket

    """
    temp_name = tempfile.mktemp(dir=os.path.dirname(address))
    daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
    os.chmod(temp_name, 0770)
    os.chown(temp_name, uid, gid)
    os.rename(temp_name, address)

    self.awaker = daemon.AsyncAwaker()

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

    self._shutdown_check = None

  def handle_connection(self, connected_socket, client_address):
    # TODO: add connection count and limit the number of open connections to a
    # maximum number to avoid breaking for lack of file descriptors or memory.
    MasterClientHandler(self, connected_socket, client_address, self.family)

  def setup_queue(self):
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool("ClientReq",
                                                 CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def WaitForShutdown(self):
    """Prepares server for shutdown.

    """
    if self._shutdown_check is None:
      self._shutdown_check = _MasterShutdownCheck()

    return self._shutdown_check(self.context.jobqueue.PrepareShutdown())

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.close()
    finally:
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.context:
        self.context.jobqueue.Shutdown()


class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server

  def handle_request(self, method, args): # pylint: disable=R0911
    context = self.server.context
    queue = context.jobqueue

    # TODO: Parameter validation
    if not isinstance(args, (tuple, list)):
      logging.info("Received invalid arguments of type '%s'", type(args))
      raise ValueError("Invalid arguments type '%s'" % type(args))

    # TODO: Rewrite to not exit in each 'if/elif' branch
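
    # Dispatch on the LUXI method name: job submission and queue management
    # requests go straight to the job queue, while read-only queries are
    # executed synchronously through _Query below.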
    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Receiving new job")
      (job_def, ) = args
      ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
      job_id = queue.SubmitJob(ops)
      _LogNewJob(True, job_id, ops)
      return job_id

    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Receiving multiple jobs")
      (job_defs, ) = args
      jobs = []
      for ops in job_defs:
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
      job_ids = queue.SubmitManyJobs(jobs)
      for ((status, job_id), ops) in zip(job_ids, jobs):
        _LogNewJob(status, job_id, ops)
      return job_ids

    elif method == luxi.REQ_CANCEL_JOB:
      (job_id, ) = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_CHANGE_JOB_PRIORITY:
      (job_id, priority) = args
      logging.info("Received request to change priority for job %s to %s",
                   job_id, priority)
      return queue.ChangeJobPriority(job_id, priority)

    elif method == luxi.REQ_ARCHIVE_JOB:
      (job_id, ) = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTO_ARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)
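
    # For generic queries, resource types listed in constants.QR_VIA_OP are
    # answered via an OpQuery opcode; lock and job queries are answered
    # directly from the lock manager and the job queue.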
    elif method == luxi.REQ_QUERY:
      (what, fields, qfilter) = args

      if what in constants.QR_VIA_OP:
        result = self._Query(opcodes.OpQuery(what=what, fields=fields,
                                             qfilter=qfilter))
      elif what == constants.QR_LOCK:
        if qfilter is not None:
          raise errors.OpPrereqError("Lock queries can't be filtered",
                                     errors.ECODE_INVAL)
        return context.glm.QueryLocks(fields)
      elif what == constants.QR_JOB:
        return queue.QueryJobs(fields, qfilter)
      elif what in constants.QR_VIA_LUXI:
        raise NotImplementedError
      else:
        raise errors.OpPrereqError("Resource type '%s' unknown" % what,
                                   errors.ECODE_INVAL)

      return result

    elif method == luxi.REQ_QUERY_FIELDS:
      (what, fields) = args
      req = objects.QueryFieldsRequest(what=what, fields=fields)

      try:
        fielddefs = query.ALL_FIELDS[req.what]
      except KeyError:
        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
                                   errors.ECODE_INVAL)

      return query.QueryFields(fielddefs, req.fields)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = utils.CommaJoin(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.OldStyleQueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
                                   use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpNodeQuery(names=names, output_fields=fields,
                               use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_GROUPS:
      (names, fields, use_locking) = args
      logging.info("Received group query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpGroupQuery(names=names, output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NETWORKS:
      (names, fields, use_locking) = args
      logging.info("Received network query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpNetworkQuery(names=names, output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      (nodes, use_locking) = args
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      logging.info("Received exports query request")
      op = opcodes.OpBackupQuery(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      (fields, ) = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpClusterConfigQuery(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpClusterQuery()
      return self._Query(op)

    elif method == luxi.REQ_QUERY_TAGS:
      (kind, name) = args
      logging.info("Received tags query request")
      op = opcodes.OpTagsGet(kind=kind, name=name, use_locking=False)
      return self._Query(op)

    elif method == luxi.REQ_SET_DRAIN_FLAG:
      (drain_flag, ) = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    elif method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      if until is None:
        logging.info("Received request to no longer pause the watcher")
      else:
        if not isinstance(until, (int, float)):
          raise TypeError("Duration must be an integer or float")

        if until < time.time():
          raise errors.GenericError("Unable to set pause end time in the past")

        logging.info("Received request to pause the watcher until %s", until)

      return _SetWatcherPause(until)

    else:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    # Queries don't have a job id
    proc = mcpu.Processor(self.server.context, None, enable_locks=False)

    # TODO: Executing an opcode using locks will acquire them in blocking mode.
    # Consider using a timeout for retries.
    return proc.ExecOpCode(op, None)


class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # pylint: disable=W0212
  # we do want to ensure a singleton here
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only one GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    self.glm = locking.GanetiLockManager(
      self.cfg.GetNodeList(),
      self.cfg.GetNodeGroupList(),
      self.cfg.GetInstanceList(),
      self.cfg.GetNetworkList())

    self.cfg.SetContext(self)

    # RPC runner
    self.rpc = rpc.RpcRunner(self.cfg, self.glm.AddToLockMonitor)

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node, ec_id):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node, ec_id)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)
    self.glm.add(locking.LEVEL_NODE_RES, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration.

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)
    self.glm.remove(locking.LEVEL_NODE_RES, name)


def _SetWatcherPause(until):
  """Creates or removes the watcher pause file.

  @type until: None or int
  @param until: Unix timestamp saying until when the watcher shouldn't run

  """
  if until is None:
    utils.RemoveFile(pathutils.WATCHER_PAUSEFILE)
  else:
    utils.WriteFile(pathutils.WATCHER_PAUSEFILE,
                    data="%d\n" % (until, ))

  return until


@rpc.RunWithRPC
def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most of the cases we are the master,
  we'll use our own config file for getting the node list. In the
  future we could collect the current node list from our (possibly
  obsolete) known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are more out of sync, manual startup of the master should be
  attempted for now.

  Note that for a cluster with an even number of nodes, we need at
  least half of the nodes (besides ourselves) to vote for us. This
  creates a problem on two-node clusters, since in this case we
  require the other node to be up too to confirm our status.

  """
  myself = netutils.Hostname.GetSysName()
  # temp instantiation of a config writer, used only to get the node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg
  retries = 6
  while retries > 0:
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is None:
      retries -= 1
      time.sleep(10)
      continue
    break
  if retries == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    logging.critical("Use the --no-voting option if you understand what"
                     " effects it has on the cluster state")
    return False
  # here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]
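
  # Illustrative example: with five votes in total, three of which name this
  # node, top_votes (3) is not smaller than all_votes - top_votes (2), so we
  # conclude we are the master; with only two out of five votes we would not.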
  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result


@rpc.RunWithRPC
def ActivateMasterIP():
  # Activate the cluster's master IP address on the master node
  cfg = config.ConfigWriter()
  master_params = cfg.GetMasterNetworkParameters()
  ems = cfg.GetUseExternalMipScript()
  runner = rpc.BootstrapRunner()
  result = runner.call_node_activate_master_ip(master_params.name,
                                               master_params, ems)

  msg = result.fail_msg
  if msg:
    logging.error("Can't activate master IP address: %s", msg)


def CheckMasterd(options, args):
  """Initial checks whether to run or exit with a failure.

  """
  if args: # masterd doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)

  ssconf.CheckMaster(options.debug)

  try:
    options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
    options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
  except KeyError:
    print >> sys.stderr, ("User or group not existing on system: %s:%s" %
                          (constants.MASTERD_USER, constants.DAEMONS_GROUP))
    sys.exit(constants.EXIT_FAILURE)

  # Determine static runtime architecture information
  runtime.InitArchInfo()

  # Check the configuration is sane before anything else
  try:
    config.ConfigWriter()
  except errors.ConfigVersionMismatch, err:
    v1 = "%s.%s.%s" % constants.SplitVersion(err.args[0])
    v2 = "%s.%s.%s" % constants.SplitVersion(err.args[1])
    print >> sys.stderr,  \
        ("Configuration version mismatch. The current Ganeti software"
         " expects version %s, but the on-disk configuration file has"
         " version %s. This is likely the result of upgrading the"
         " software without running the upgrade procedure. Please contact"
         " your cluster administrator or complete the upgrade using the"
         " cfgupgrade utility, after reading the upgrade notes." %
         (v1, v2))
    sys.exit(constants.EXIT_FAILURE)
  except errors.ConfigurationError, err:
    print >> sys.stderr, \
        ("Configuration error while opening the configuration file: %s\n"
         "This might be caused by an incomplete software upgrade or"
         " by a corrupted configuration file. Until the problem is fixed"
         " the master daemon cannot start." % str(err))
    sys.exit(constants.EXIT_FAILURE)

  # If CheckMaster didn't fail we believe we are the master, but we have to
  # confirm with the other nodes.
  if options.no_voting:
    if not options.yes_do_it:
      sys.stdout.write("The 'no voting' option has been selected.\n")
      sys.stdout.write("This is dangerous, please confirm by"
                       " typing uppercase 'yes': ")
      sys.stdout.flush()

      confirmation = sys.stdin.readline().strip()
      if confirmation != "YES":
        print >> sys.stderr, "Aborting."
        sys.exit(constants.EXIT_FAILURE)

  else:
    # CheckAgreement uses RPC and threads, hence it needs to be run in
    # a separate process before we call utils.Daemonize in the current
    # process.
    if not utils.RunInSeparateProcess(CheckAgreement):
      sys.exit(constants.EXIT_FAILURE)

  # ActivateMasterIP also uses RPC/threads, so we run it again via a
  # separate process.

  # TODO: decide whether failure to activate the master IP is a fatal error
  utils.RunInSeparateProcess(ActivateMasterIP)


def PrepMasterd(options, _):
  """Prep master daemon function, executed with the PID file held.

  """
  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(pathutils.MASTER_SOCKET)

  mainloop = daemon.Mainloop()
  master = MasterServer(pathutils.MASTER_SOCKET, options.uid, options.gid)
  return (mainloop, master)
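

# Teardown in ExecMasterd mirrors setup in reverse: once the mainloop exits,
# server_cleanup stops the worker pool and job queue, RPC is shut down, and
# the master socket file is removed last.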
def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
  """Main master daemon function, executed with the PID file held.

  """
  (mainloop, master) = prep_data
  try:
    rpc.Init()
    try:
      master.setup_queue()
      try:
        mainloop.Run(shutdown_wait_fn=master.WaitForShutdown)
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemoveFile(pathutils.MASTER_SOCKET)

  logging.info("Clean master daemon shutdown")


def Main():
  """Main function"""
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  parser.add_option("--no-voting", dest="no_voting",
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally",
                    default=False, action="store_true")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    help="Override interactive check for --no-voting",
                    default=False, action="store_true")
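  # GenericMain handles the common daemon startup (option parsing,
  # daemonizing, PID file handling and logging) and then runs the check,
  # prep and exec callbacks passed below.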
  daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
                     ExecMasterd, multithreaded=True)