lib/server/masterd.py @ b40c0e97

#
#

# Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Master daemon program.

Some classes deviate from the standard style guide since the
inheritance from parent classes requires it.

"""

# pylint: disable=C0103
# C0103: Invalid name ganeti-masterd

import grp
import os
import pwd
import sys
import socket
import time
import tempfile
import logging

from optparse import OptionParser

from ganeti import config
from ganeti import constants
from ganeti import daemon
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import workerpool
from ganeti import rpc
from ganeti import bootstrap
from ganeti import netutils
from ganeti import objects
from ganeti import query
from ganeti import runtime
from ganeti import pathutils
from ganeti import ht

from ganeti.utils import version


CLIENT_REQUEST_WORKERS = 16
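# (Used below in MasterServer.setup_queue as the size of the LUXI client
# request worker pool.)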

EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR


def _LogNewJob(status, info, ops):
  """Log information about a recently submitted job.

  """
  op_summary = utils.CommaJoin(op.Summary() for op in ops)

  if status:
    logging.info("New job with id %s, summary: %s", info, op_summary)
  else:
    logging.info("Failed to submit job, reason: '%s', summary: %s",
                 info, op_summary)


class ClientRequestWorker(workerpool.BaseWorker):
  # pylint: disable=W0221
  def RunTask(self, server, message, client):
    """Process the request.

    """
    client_ops = ClientOps(server)

    try:
      (method, args, ver) = luxi.ParseRequest(message)
    except luxi.ProtocolError, err:
      logging.error("Protocol Error: %s", err)
      client.close_log()
      return

    success = False
    try:
      # Verify client's version if there was one in the request
      if ver is not None and ver != constants.LUXI_VERSION:
        raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
                               (constants.LUXI_VERSION, ver))

      result = client_ops.handle_request(method, args)
      success = True
    except errors.GenericError, err:
      logging.exception("Unexpected exception")
      success = False
      result = errors.EncodeException(err)
    except:
      logging.exception("Unexpected exception")
      err = sys.exc_info()
      result = "Caught exception: %s" % str(err[1])

    try:
      reply = luxi.FormatResponse(success, result)
      client.send_message(reply)
      # wake up the main thread so that it can write out the data.
      server.awaker.signal()
    except: # pylint: disable=W0702
      logging.exception("Send error")
      client.close_log()


class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
  """Handler for master peers.

  """
  _MAX_UNHANDLED = 1

  def __init__(self, server, connected_socket, client_address, family):
    daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
                                                 client_address,
                                                 constants.LUXI_EOM,
                                                 family, self._MAX_UNHANDLED)
    self.server = server

  def handle_message(self, message, _):
    self.server.request_workers.AddTask((self.server, message, self))


class _MasterShutdownCheck:
  """Logic for master daemon shutdown.

  """
  #: How long to wait between checks
  _CHECK_INTERVAL = 5.0

  #: How long to wait after all jobs are done (e.g. to give clients time to
  #: retrieve the job status)
  _SHUTDOWN_LINGER = 5.0

  def __init__(self):
    """Initializes this class.

    """
    self._had_active_jobs = None
    self._linger_timeout = None

  def __call__(self, jq_prepare_result):
    """Determines if master daemon is ready for shutdown.

    @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown}
    @rtype: None or number
    @return: None if master daemon is ready, timeout if the check must be
             repeated

    """
    if jq_prepare_result:
      # Check again shortly
      logging.info("Job queue has been notified for shutdown but is still"
                   " busy; next check in %s seconds", self._CHECK_INTERVAL)
      self._had_active_jobs = True
      return self._CHECK_INTERVAL

    if not self._had_active_jobs:
      # Can shut down as there were no active jobs on the first check
      return None

    # No jobs are running anymore, but maybe some clients want to collect some
    # information. Give them a short amount of time.
    if self._linger_timeout is None:
      self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True)
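      # Passing True as the second argument allows Remaining() to go negative
      # once the linger period has elapsed, which the check below relies on.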

    remaining = self._linger_timeout.Remaining()

    logging.info("Job queue no longer busy; shutting down master daemon"
                 " in %s seconds", remaining)

    # TODO: Should the master daemon socket be closed at this point? Doing so
    # wouldn't affect existing connections.

    if remaining < 0:
      return None
    else:
      return remaining


class MasterServer(daemon.AsyncStreamServer):
  """Master Server.

  This is the main asynchronous master server. It handles connections to the
  master socket.

  """
  family = socket.AF_UNIX

  def __init__(self, address, uid, gid):
    """MasterServer constructor

    @param address: the unix socket address to bind the MasterServer to
    @param uid: The uid of the owner of the socket
    @param gid: The gid of the owner of the socket

    """
    temp_name = tempfile.mktemp(dir=os.path.dirname(address))
    daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
    os.chmod(temp_name, 0770)
    os.chown(temp_name, uid, gid)
    os.rename(temp_name, address)
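    # Binding to a temporary name and renaming it into place means the socket
    # only becomes visible at its final address once ownership and permissions
    # have already been set.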

    self.awaker = daemon.AsyncAwaker()

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

    self._shutdown_check = None

  def handle_connection(self, connected_socket, client_address):
    # TODO: add connection count and limit the number of open connections to a
    # maximum number to avoid breaking for lack of file descriptors or memory.
    MasterClientHandler(self, connected_socket, client_address, self.family)

  def setup_queue(self):
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool("ClientReq",
                                                 CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def WaitForShutdown(self):
    """Prepares server for shutdown.

    """
    if self._shutdown_check is None:
      self._shutdown_check = _MasterShutdownCheck()

    return self._shutdown_check(self.context.jobqueue.PrepareShutdown())

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.close()
    finally:
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.context:
        self.context.jobqueue.Shutdown()


class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server

  def handle_request(self, method, args): # pylint: disable=R0911
    context = self.server.context
    queue = context.jobqueue

    # TODO: Parameter validation
    if not isinstance(args, (tuple, list)):
      logging.info("Received invalid arguments of type '%s'", type(args))
      raise ValueError("Invalid arguments type '%s'" % type(args))

    if method not in luxi.REQ_ALL:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

    # TODO: Rewrite to not exit in each 'if/elif' branch

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Receiving new job")
      (job_def, ) = args
      ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
      job_id = queue.SubmitJob(ops)
      _LogNewJob(True, job_id, ops)
      return job_id

    elif method == luxi.REQ_PICKUP_JOB:
      logging.info("Picking up new job from queue")
      (job_id, ) = args
      queue.PickupJob(job_id)

    elif method == luxi.REQ_SUBMIT_JOB_TO_DRAINED_QUEUE:
      logging.info("Forcefully receiving new job")
      (job_def, ) = args
      ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
      job_id = queue.SubmitJobToDrainedQueue(ops)
      _LogNewJob(True, job_id, ops)
      return job_id

    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Receiving multiple jobs")
      (job_defs, ) = args
      jobs = []
      for ops in job_defs:
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
      job_ids = queue.SubmitManyJobs(jobs)
      for ((status, job_id), ops) in zip(job_ids, jobs):
        _LogNewJob(status, job_id, ops)
      return job_ids

    elif method == luxi.REQ_CANCEL_JOB:
      (job_id, ) = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_CHANGE_JOB_PRIORITY:
      (job_id, priority) = args
      logging.info("Received request to change priority for job %s to %s",
                   job_id, priority)
      return queue.ChangeJobPriority(job_id, priority)

    elif method == luxi.REQ_ARCHIVE_JOB:
      (job_id, ) = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTO_ARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY:
      (what, fields, qfilter) = args

      if what in constants.QR_VIA_OP:
        result = self._Query(opcodes.OpQuery(what=what, fields=fields,
                                             qfilter=qfilter))
      elif what == constants.QR_LOCK:
        if qfilter is not None:
          raise errors.OpPrereqError("Lock queries can't be filtered",
                                     errors.ECODE_INVAL)
        return context.glm.QueryLocks(fields)
      elif what == constants.QR_JOB:
        return queue.QueryJobs(fields, qfilter)
      elif what in constants.QR_VIA_LUXI:
        luxi_client = runtime.GetClient(query=True)
        result = luxi_client.Query(what, fields, qfilter).ToDict()
      else:
        raise errors.OpPrereqError("Resource type '%s' unknown" % what,
                                   errors.ECODE_INVAL)

      return result

    elif method == luxi.REQ_QUERY_FIELDS:
      (what, fields) = args
      req = objects.QueryFieldsRequest(what=what, fields=fields)

      try:
        fielddefs = query.ALL_FIELDS[req.what]
      except KeyError:
        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
                                   errors.ECODE_INVAL)

      return query.QueryFields(fielddefs, req.fields)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = utils.CommaJoin(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.OldStyleQueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
                                   use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      (fields, ) = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpClusterConfigQuery(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpClusterQuery()
      return self._Query(op)

    elif method == luxi.REQ_QUERY_TAGS:
      (kind, name) = args
      logging.info("Received tags query request")
      op = opcodes.OpTagsGet(kind=kind, name=name, use_locking=False)
      return self._Query(op)

    elif method == luxi.REQ_SET_DRAIN_FLAG:
      (drain_flag, ) = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    elif method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      return _SetWatcherPause(context, until)

    else:
      logging.critical("Request '%s' in luxi.REQ_ALL, but not known", method)
      raise errors.ProgrammerError("Operation '%s' in luxi.REQ_ALL,"
                                   " but not implemented" % method)

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    # Queries don't have a job id
    proc = mcpu.Processor(self.server.context, None, enable_locks=False)

    # TODO: Executing an opcode using locks will acquire them in blocking mode.
    # Consider using a timeout for retries.
    return proc.ExecOpCode(op, None)


class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # pylint: disable=W0212
  # we do want to ensure a singleton here
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only one GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    self.glm = locking.GanetiLockManager(
      self.cfg.GetNodeList(),
      self.cfg.GetNodeGroupList(),
      [inst.name for inst in self.cfg.GetAllInstancesInfo().values()],
      self.cfg.GetNetworkList())

    self.cfg.SetContext(self)

    # RPC runner
    self.rpc = rpc.RpcRunner(self.cfg, self.glm.AddToLockMonitor)

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node, ec_id):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node, ec_id)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.uuid)
    self.glm.add(locking.LEVEL_NODE_RES, node.uuid)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration.

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, node):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(node.uuid)

    # Notify job queue
    self.jobqueue.RemoveNode(node.name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, node.uuid)
    self.glm.remove(locking.LEVEL_NODE_RES, node.uuid)


def _SetWatcherPause(context, until):
  """Creates or removes the watcher pause file.

  @type context: L{GanetiContext}
  @param context: Global Ganeti context
  @type until: None or int
  @param until: Unix timestamp saying until when the watcher shouldn't run

  """
  node_names = context.cfg.GetNodeList()

  if until is None:
    logging.info("Received request to no longer pause watcher")
  else:
    if not ht.TNumber(until):
      raise TypeError("Duration must be numeric")

    if until < time.time():
      raise errors.GenericError("Unable to set pause end time in the past")

    logging.info("Received request to pause watcher until %s", until)

  result = context.rpc.call_set_watcher_pause(node_names, until)

  errmsg = utils.CommaJoin("%s (%s)" % (node_name, nres.fail_msg)
                           for (node_name, nres) in result.items()
                           if nres.fail_msg and not nres.offline)
  if errmsg:
    raise errors.OpExecError("Watcher pause was set where possible, but failed"
                             " on the following node(s): %s" % errmsg)

  return until


@rpc.RunWithRPC
def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most of the cases we are the master,
  we'll use our own config file for getting the node list. In the
  future we could collect the current node list from our (possibly
  obsolete) known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are further out of sync, manual startup of the master should
  be attempted for now.

  Note that for a cluster with an even number of nodes, we need at
  least half of the nodes (besides ourselves) to vote for us. This
  creates a problem on two-node clusters, since in this case we require
  the other node to be up too to confirm our status.

  """
  myself = netutils.Hostname.GetSysName()
  # temporary instantiation of a config writer, used only to get the node list
  cfg = config.ConfigWriter()
  node_names = cfg.GetNodeNames(cfg.GetNodeList())
  del cfg
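  # Six attempts with a 10 second sleep below add up to the one-minute retry
  # window mentioned in the docstring.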
  retries = 6
  while retries > 0:
    votes = bootstrap.GatherMasterVotes(node_names)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is None:
      retries -= 1
      time.sleep(10)
      continue
    break
  if retries == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    logging.critical("Use the --no-voting option if you understand what"
                     " effects it has on the cluster state")
    return False
  # here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]

  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result


@rpc.RunWithRPC
def ActivateMasterIP():
  # activate ip
  cfg = config.ConfigWriter()
  master_params = cfg.GetMasterNetworkParameters()
  ems = cfg.GetUseExternalMipScript()
  runner = rpc.BootstrapRunner()
  # we use the node name, as the configuration is only available here for now
  result = runner.call_node_activate_master_ip(
             cfg.GetNodeName(master_params.uuid), master_params, ems)

  msg = result.fail_msg
  if msg:
    logging.error("Can't activate master IP address: %s", msg)

def CheckMasterd(options, args):
  """Initial checks whether to run or exit with a failure.

  """
  if args: # masterd doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)

  ssconf.CheckMaster(options.debug)

  try:
    options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
    options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
  except KeyError:
    print >> sys.stderr, ("User or group not existing on system: %s:%s" %
                          (constants.MASTERD_USER, constants.DAEMONS_GROUP))
    sys.exit(constants.EXIT_FAILURE)

  # Determine static runtime architecture information
  runtime.InitArchInfo()

  # Check the configuration is sane before anything else
  try:
    config.ConfigWriter()
  except errors.ConfigVersionMismatch, err:
    v1 = "%s.%s.%s" % version.SplitVersion(err.args[0])
    v2 = "%s.%s.%s" % version.SplitVersion(err.args[1])
    print >> sys.stderr, \
        ("Configuration version mismatch. The current Ganeti software"
         " expects version %s, but the on-disk configuration file has"
         " version %s. This is likely the result of upgrading the"
         " software without running the upgrade procedure. Please contact"
         " your cluster administrator or complete the upgrade using the"
         " cfgupgrade utility, after reading the upgrade notes." %
         (v1, v2))
    sys.exit(constants.EXIT_FAILURE)
  except errors.ConfigurationError, err:
    print >> sys.stderr, \
        ("Configuration error while opening the configuration file: %s\n"
         "This might be caused by an incomplete software upgrade or"
         " by a corrupted configuration file. Until the problem is fixed"
         " the master daemon cannot start." % str(err))
    sys.exit(constants.EXIT_FAILURE)

  # If CheckMaster didn't fail we believe we are the master, but we have to
  # confirm with the other nodes.
  if options.no_voting:
    if not options.yes_do_it:
      sys.stdout.write("The 'no voting' option has been selected.\n")
      sys.stdout.write("This is dangerous, please confirm by"
                       " typing uppercase 'yes': ")
      sys.stdout.flush()

      confirmation = sys.stdin.readline().strip()
      if confirmation != "YES":
        print >> sys.stderr, "Aborting."
        sys.exit(constants.EXIT_FAILURE)

  else:
    # CheckAgreement uses RPC and threads, hence it needs to be run in
    # a separate process before we call utils.Daemonize in the current
    # process.
    if not utils.RunInSeparateProcess(CheckAgreement):
      sys.exit(constants.EXIT_FAILURE)

  # ActivateMasterIP also uses RPC/threads, so we run it again via a
  # separate process.

  # TODO: decide whether failure to activate the master IP is a fatal error
  utils.RunInSeparateProcess(ActivateMasterIP)


def PrepMasterd(options, _):
  """Prep master daemon function, executed with the PID file held.

  """
  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(pathutils.MASTER_SOCKET)

  mainloop = daemon.Mainloop()
  master = MasterServer(pathutils.MASTER_SOCKET, options.uid, options.gid)
  return (mainloop, master)


def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
  """Main master daemon function, executed with the PID file held.

  """
  (mainloop, master) = prep_data
  try:
    rpc.Init()
    try:
      master.setup_queue()
      try:
        mainloop.Run(shutdown_wait_fn=master.WaitForShutdown)
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemoveFile(pathutils.MASTER_SOCKET)

  logging.info("Clean master daemon shutdown")


def Main():
  """Main function"""
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  parser.add_option("--no-voting", dest="no_voting",
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally",
                    default=False, action="store_true")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    help="Override interactive check for --no-voting",
                    default=False, action="store_true")
  daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
                     ExecMasterd, multithreaded=True)