lib/server/masterd.py @ 912b2278
#
#

# Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Master daemon program.
23

24
Some classes deviates from the standard style guide since the
25
inheritance from parent classes requires it.
26

27
"""

# pylint: disable=C0103
# C0103: Invalid name ganeti-masterd

import grp
import os
import pwd
import sys
import socket
import time
import tempfile
import logging

from optparse import OptionParser

from ganeti import config
from ganeti import constants
from ganeti import daemon
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import workerpool
import ganeti.rpc.node as rpc
import ganeti.rpc.client as rpccl
from ganeti import bootstrap
from ganeti import netutils
from ganeti import objects
from ganeti import query
from ganeti import runtime
from ganeti import pathutils
from ganeti import ht

from ganeti.utils import version


CLIENT_REQUEST_WORKERS = 16

EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR


def _LogNewJob(status, info, ops):
  """Log information about a recently submitted job.

  """
  op_summary = utils.CommaJoin(op.Summary() for op in ops)

  if status:
    logging.info("New job with id %s, summary: %s", info, op_summary)
  else:
    logging.info("Failed to submit job, reason: '%s', summary: %s",
                 info, op_summary)


class ClientRequestWorker(workerpool.BaseWorker):
  # pylint: disable=W0221
  def RunTask(self, server, message, client):
    """Process the request.

    """
    client_ops = ClientOps(server)

    try:
      (method, args, ver) = rpccl.ParseRequest(message)
    except luxi.ProtocolError, err:
      logging.error("Protocol Error: %s", err)
      client.close_log()
      return

    success = False
    try:
      # Verify client's version if there was one in the request
      if ver is not None and ver != constants.LUXI_VERSION:
        raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
                               (constants.LUXI_VERSION, ver))

      result = client_ops.handle_request(method, args)
      success = True
    except errors.GenericError, err:
      logging.exception("Unexpected exception")
      success = False
      result = errors.EncodeException(err)
    except:
      logging.exception("Unexpected exception")
      err = sys.exc_info()
      result = "Caught exception: %s" % str(err[1])

    try:
      reply = rpccl.FormatResponse(success, result)
      client.send_message(reply)
      # wake the main thread so that it can write out the data.
      server.awaker.signal()
    except: # pylint: disable=W0702
      logging.exception("Send error")
      client.close_log()
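
# Added note (not in the original file): LUXI messages are JSON documents.
# rpccl.ParseRequest above extracts the "method", "args" and "version"
# fields of a request shaped roughly like
#   {"method": "SubmitJob", "args": [...], "version": <LUXI_VERSION>}
# and rpccl.FormatResponse builds the matching reply,
#   {"success": true, "result": <value>}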


class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
  """Handler for master peers.

  """
  _MAX_UNHANDLED = 1

  def __init__(self, server, connected_socket, client_address, family):
    daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
                                                 client_address,
                                                 constants.LUXI_EOM,
                                                 family, self._MAX_UNHANDLED)
    self.server = server

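  # Added note: this callback fires for each complete LUXI message; it only
  # enqueues the request, the actual processing happens in a
  # ClientRequestWorker thread (see RunTask above).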
  def handle_message(self, message, _):
    self.server.request_workers.AddTask((self.server, message, self))


class _MasterShutdownCheck:
  """Logic for master daemon shutdown.

  """
  #: How long to wait between checks
  _CHECK_INTERVAL = 5.0

  #: How long to wait after all jobs are done (e.g. to give clients time to
  #: retrieve the job status)
  _SHUTDOWN_LINGER = 5.0

  def __init__(self):
    """Initializes this class.

    """
    self._had_active_jobs = None
    self._linger_timeout = None

  def __call__(self, jq_prepare_result):
    """Determines if master daemon is ready for shutdown.

    @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown}
    @rtype: None or number
    @return: None if master daemon is ready, timeout if the check must be
             repeated

    """
    if jq_prepare_result:
      # Check again shortly
      logging.info("Job queue has been notified for shutdown but is still"
                   " busy; next check in %s seconds", self._CHECK_INTERVAL)
      self._had_active_jobs = True
      return self._CHECK_INTERVAL

    if not self._had_active_jobs:
      # Can shut down as there were no active jobs on the first check
      return None

    # No jobs are running anymore, but maybe some clients want to collect some
    # information. Give them a short amount of time.
    if self._linger_timeout is None:
      self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True)

    remaining = self._linger_timeout.Remaining()

    logging.info("Job queue no longer busy; shutting down master daemon"
                 " in %s seconds", remaining)

    # TODO: Should the master daemon socket be closed at this point? Doing so
    # wouldn't affect existing connections.

    if remaining < 0:
      return None
    else:
      return remaining
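
# Added note: taken together, the checks above form a small state machine:
# "queue busy" (reschedule the check), then "linger" (give clients a moment
# to fetch final job status), then "ready" (return None). The check is called
# repeatedly via the shutdown_wait_fn hook in ExecMasterd below until it
# returns None.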


class MasterServer(daemon.AsyncStreamServer):
  """Master Server.

  This is the main asynchronous master server. It handles connections to the
  master socket.

  """
  family = socket.AF_UNIX

  def __init__(self, address, uid, gid):
    """MasterServer constructor

    @param address: the unix socket address to bind the MasterServer to
    @param uid: The uid of the owner of the socket
    @param gid: The gid of the owner of the socket

    """
    temp_name = tempfile.mktemp(dir=os.path.dirname(address))
    daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
    os.chmod(temp_name, 0770)
    os.chown(temp_name, uid, gid)
    os.rename(temp_name, address)

    self.awaker = daemon.AsyncAwaker()

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

    self._shutdown_check = None

  def handle_connection(self, connected_socket, client_address):
    # TODO: add connection count and limit the number of open connections to a
    # maximum number to avoid breaking for lack of file descriptors or memory.
    MasterClientHandler(self, connected_socket, client_address, self.family)

  def setup_queue(self):
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool("ClientReq",
                                                 CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def WaitForShutdown(self):
    """Prepares server for shutdown.

    """
    if self._shutdown_check is None:
      self._shutdown_check = _MasterShutdownCheck()

    return self._shutdown_check(self.context.jobqueue.PrepareShutdown())

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.close()
    finally:
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.context:
        self.context.jobqueue.Shutdown()


class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server

  def handle_request(self, method, args): # pylint: disable=R0911
    context = self.server.context
    queue = context.jobqueue

    # TODO: Parameter validation
    if not isinstance(args, (tuple, list)):
      logging.info("Received invalid arguments of type '%s'", type(args))
      raise ValueError("Invalid arguments type '%s'" % type(args))

    if method not in luxi.REQ_ALL:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

    # TODO: Rewrite to not exit in each 'if/elif' branch
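    # Added overview: the LUXI methods below fall into three rough groups:
    # job submission/manipulation (REQ_SUBMIT_JOB, REQ_SUBMIT_MANY_JOBS,
    # REQ_SUBMIT_JOB_TO_DRAINED_QUEUE, REQ_PICKUP_JOB, REQ_CANCEL_JOB,
    # REQ_CHANGE_JOB_PRIORITY, REQ_ARCHIVE_JOB, REQ_AUTO_ARCHIVE_JOBS),
    # queries (REQ_QUERY*, REQ_WAIT_FOR_JOB_CHANGE) and cluster-wide flags
    # (REQ_SET_DRAIN_FLAG, REQ_SET_WATCHER_PAUSE).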

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Receiving new job")
      (job_def, ) = args
      ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
      job_id = queue.SubmitJob(ops)
      _LogNewJob(True, job_id, ops)
      return job_id

    elif method == luxi.REQ_PICKUP_JOB:
      logging.info("Picking up new job from queue")
      (job_id, ) = args
      queue.PickupJob(job_id)

    elif method == luxi.REQ_SUBMIT_JOB_TO_DRAINED_QUEUE:
      logging.info("Forcefully receiving new job")
      (job_def, ) = args
      ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
      job_id = queue.SubmitJobToDrainedQueue(ops)
      _LogNewJob(True, job_id, ops)
      return job_id

    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Receiving multiple jobs")
      (job_defs, ) = args
      jobs = []
      for ops in job_defs:
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
      job_ids = queue.SubmitManyJobs(jobs)
      for ((status, job_id), ops) in zip(job_ids, jobs):
        _LogNewJob(status, job_id, ops)
      return job_ids

    elif method == luxi.REQ_CANCEL_JOB:
      (job_id, ) = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_CHANGE_JOB_PRIORITY:
      (job_id, priority) = args
      logging.info("Received request to change priority for job %s to %s",
                   job_id, priority)
      return queue.ChangeJobPriority(job_id, priority)

    elif method == luxi.REQ_ARCHIVE_JOB:
      (job_id, ) = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTO_ARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY:
      (what, fields, qfilter) = args

      if what in constants.QR_VIA_OP:
        result = self._Query(opcodes.OpQuery(what=what, fields=fields,
                                             qfilter=qfilter))
      elif what == constants.QR_LOCK:
        if qfilter is not None:
          raise errors.OpPrereqError("Lock queries can't be filtered",
                                     errors.ECODE_INVAL)
        return context.glm.QueryLocks(fields)
      elif what == constants.QR_JOB:
        return queue.QueryJobs(fields, qfilter)
      elif what in constants.QR_VIA_LUXI:
        luxi_client = runtime.GetClient(query=True)
        result = luxi_client.Query(what, fields, qfilter).ToDict()
      else:
        raise errors.OpPrereqError("Resource type '%s' unknown" % what,
                                   errors.ECODE_INVAL)

      return result

    elif method == luxi.REQ_QUERY_FIELDS:
      (what, fields) = args
      req = objects.QueryFieldsRequest(what=what, fields=fields)

      try:
        fielddefs = query.ALL_FIELDS[req.what]
      except KeyError:
        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
                                   errors.ECODE_INVAL)

      return query.QueryFields(fielddefs, req.fields)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = utils.CommaJoin(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.OldStyleQueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      (fields, ) = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpClusterConfigQuery(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpClusterQuery()
      return self._Query(op)

    elif method == luxi.REQ_QUERY_TAGS:
      (kind, name) = args
      logging.info("Received tags query request")
      op = opcodes.OpTagsGet(kind=kind, name=name, use_locking=False)
      return self._Query(op)

    elif method == luxi.REQ_SET_DRAIN_FLAG:
      (drain_flag, ) = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    elif method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      return _SetWatcherPause(context, until)

    else:
      logging.critical("Request '%s' in luxi.REQ_ALL, but not known", method)
      raise errors.ProgrammerError("Operation '%s' in luxi.REQ_ALL,"
                                   " but not implemented" % method)

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    # Queries don't have a job id
    proc = mcpu.Processor(self.server.context, None, enable_locks=False)

    # TODO: Executing an opcode using locks will acquire them in blocking mode.
    # Consider using a timeout for retries.
    return proc.ExecOpCode(op, None)


class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # pylint: disable=W0212
  # we do want to ensure a singleton here
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only one GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    self.glm = locking.GanetiLockManager(
      self.cfg.GetNodeList(),
      self.cfg.GetNodeGroupList(),
      [inst.name for inst in self.cfg.GetAllInstancesInfo().values()],
      self.cfg.GetNetworkList())

    self.cfg.SetContext(self)

    # RPC runner
    self.rpc = rpc.RpcRunner(self.cfg, self.glm.AddToLockMonitor)

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node, ec_id):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node, ec_id)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.uuid)
    self.glm.add(locking.LEVEL_NODE_RES, node.uuid)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration.

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, node):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(node.uuid)

    # Notify job queue
    self.jobqueue.RemoveNode(node.name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, node.uuid)
    self.glm.remove(locking.LEVEL_NODE_RES, node.uuid)


def _SetWatcherPause(context, until):
  """Creates or removes the watcher pause file.

  @type context: L{GanetiContext}
  @param context: Global Ganeti context
  @type until: None or int
  @param until: Unix timestamp saying until when the watcher shouldn't run

  """
  node_names = context.cfg.GetNodeList()

  if until is None:
    logging.info("Received request to no longer pause watcher")
  else:
    if not ht.TNumber(until):
      raise TypeError("Duration must be numeric")

    if until < time.time():
      raise errors.GenericError("Unable to set pause end time in the past")

    logging.info("Received request to pause watcher until %s", until)

  result = context.rpc.call_set_watcher_pause(node_names, until)

  errmsg = utils.CommaJoin("%s (%s)" % (node_name, nres.fail_msg)
                           for (node_name, nres) in result.items()
                           if nres.fail_msg and not nres.offline)
  if errmsg:
    raise errors.OpExecError("Watcher pause was set where possible, but failed"
                             " on the following node(s): %s" % errmsg)

  return until


@rpc.RunWithRPC
def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most of the cases we are the master,
  we'll use our own config file for getting the node list. In the
  future we could collect the current node list from our (possibly
  obsolete) known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are further out of sync, manual startup of the master should
  be attempted for now.

  Note that for a cluster with an even number of nodes, we need at
  least half of the nodes (besides ourselves) to vote for us. This
  creates a problem on two-node clusters, since in this case we
  require the other node to be up too to confirm our status.

  """
  myself = netutils.Hostname.GetSysName()
  # temporary instantiation of a config writer, used only to get the node list
  cfg = config.ConfigWriter()
  node_names = cfg.GetNodeNames(cfg.GetNodeList())
  del cfg
  retries = 6
  while retries > 0:
    votes = bootstrap.GatherMasterVotes(node_names)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is None:
      retries -= 1
      time.sleep(10)
      continue
    break
  if retries == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    logging.critical("Use the --no-voting option if you understand what"
                     " effects it has on the cluster state")
    return False
  # here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]

  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result
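
# Added illustration: in a five-node cluster the other four nodes vote (as
# the docstring above notes, votes are counted besides ourselves). If three
# of them name this node, top_votes = 3 and all_votes - top_votes = 1, so the
# checks pass and startup continues; if three name some other node, that node
# tops the list and we refuse to start as master.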


@rpc.RunWithRPC
def ActivateMasterIP():
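  """Activates the master IP address (docstring added for clarity).

  Failures are only logged here; whether they should be fatal is still an
  open TODO in CheckMasterd below.

  """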
  # activate ip
  cfg = config.ConfigWriter()
  master_params = cfg.GetMasterNetworkParameters()
  ems = cfg.GetUseExternalMipScript()
  runner = rpc.BootstrapRunner()
  # we use the node name, as the configuration is only available locally yet
  result = runner.call_node_activate_master_ip(
             cfg.GetNodeName(master_params.uuid), master_params, ems)

  msg = result.fail_msg
  if msg:
    logging.error("Can't activate master IP address: %s", msg)


def CheckMasterd(options, args):
  """Initial checks to decide whether to run or exit with a failure.

  """
  if args: # masterd doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)

  ssconf.CheckMaster(options.debug)

  try:
    options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
    options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
  except KeyError:
    print >> sys.stderr, ("User or group not existing on system: %s:%s" %
                          (constants.MASTERD_USER, constants.DAEMONS_GROUP))
    sys.exit(constants.EXIT_FAILURE)

  # Determine static runtime architecture information
  runtime.InitArchInfo()

  # Check the configuration is sane before anything else
  try:
    config.ConfigWriter()
  except errors.ConfigVersionMismatch, err:
    v1 = "%s.%s.%s" % version.SplitVersion(err.args[0])
    v2 = "%s.%s.%s" % version.SplitVersion(err.args[1])
    print >> sys.stderr, \
        ("Configuration version mismatch. The current Ganeti software"
         " expects version %s, but the on-disk configuration file has"
         " version %s. This is likely the result of upgrading the"
         " software without running the upgrade procedure. Please contact"
         " your cluster administrator or complete the upgrade using the"
         " cfgupgrade utility, after reading the upgrade notes." %
         (v1, v2))
    sys.exit(constants.EXIT_FAILURE)
  except errors.ConfigurationError, err:
    print >> sys.stderr, \
        ("Configuration error while opening the configuration file: %s\n"
         "This might be caused by an incomplete software upgrade or"
         " by a corrupted configuration file. Until the problem is fixed"
         " the master daemon cannot start." % str(err))
    sys.exit(constants.EXIT_FAILURE)

  # If CheckMaster didn't fail we believe we are the master, but we have to
  # confirm with the other nodes.
  if options.no_voting:
    if not options.yes_do_it:
      sys.stdout.write("The 'no voting' option has been selected.\n")
      sys.stdout.write("This is dangerous, please confirm by"
                       " typing uppercase 'yes': ")
      sys.stdout.flush()

      confirmation = sys.stdin.readline().strip()
      if confirmation != "YES":
        print >> sys.stderr, "Aborting."
        sys.exit(constants.EXIT_FAILURE)

  else:
    # CheckAgreement uses RPC and threads, hence it needs to be run in
    # a separate process before we call utils.Daemonize in the current
    # process.
    if not utils.RunInSeparateProcess(CheckAgreement):
      sys.exit(constants.EXIT_FAILURE)

  # ActivateMasterIP also uses RPC/threads, so we likewise run it in a
  # separate process.

  # TODO: decide whether failure to activate the master IP is a fatal error
  utils.RunInSeparateProcess(ActivateMasterIP)


def PrepMasterd(options, _):
  """Prep master daemon function, executed with the PID file held.

  """
  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(pathutils.MASTER_SOCKET)

  mainloop = daemon.Mainloop()
  master = MasterServer(pathutils.MASTER_SOCKET, options.uid, options.gid)
  return (mainloop, master)


def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
  """Main master daemon function, executed with the PID file held.

  """
  (mainloop, master) = prep_data
  try:
    rpc.Init()
    try:
      master.setup_queue()
      try:
        mainloop.Run(shutdown_wait_fn=master.WaitForShutdown)
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemoveFile(pathutils.MASTER_SOCKET)

  logging.info("Clean master daemon shutdown")


def Main():
  """Main function"""
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  parser.add_option("--no-voting", dest="no_voting",
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally",
                    default=False, action="store_true")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    help="Override interactive check for --no-voting",
                    default=False, action="store_true")
  daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
                     ExecMasterd, multithreaded=True)