#
#

# Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Master daemon program.

Some classes deviate from the standard style guide since the
inheritance from parent classes requires it.

"""

# pylint: disable=C0103
# C0103: Invalid name ganeti-masterd

import grp
import os
import pwd
import sys
import socket
import time
import tempfile
import logging

from optparse import OptionParser

from ganeti import config
from ganeti import constants
from ganeti import daemon
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import workerpool
from ganeti import rpc
from ganeti import bootstrap
from ganeti import netutils
from ganeti import objects
from ganeti import query
from ganeti import runtime
from ganeti import pathutils


CLIENT_REQUEST_WORKERS = 16

EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR


def _LogNewJob(status, info, ops):
  """Log information about a recently submitted job.

  """
  if status:
    logging.info("New job with id %s, summary: %s",
                 info, utils.CommaJoin(op.Summary() for op in ops))
  else:
    logging.info("Failed to submit job, reason: '%s', summary: %s",
                 info, utils.CommaJoin(op.Summary() for op in ops))


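# Editor's sketch (illustrative, not part of the original module): the two log
# lines _LogNewJob can produce. The job id, failure reason and opcode below
# are made-up values for the example.
def _ExampleLogNewJob():
  """Illustrative only: log one successful and one failed submission."""
  ops = [opcodes.OpClusterQuery()]
  _LogNewJob(True, 42, ops)                # "New job with id 42, summary: ..."
  _LogNewJob(False, "queue drained", ops)  # "Failed to submit job, reason: ..."

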
class ClientRequestWorker(workerpool.BaseWorker):
  # pylint: disable=W0221
  def RunTask(self, server, message, client):
    """Process the request.

    """
    client_ops = ClientOps(server)

    try:
      (method, args, version) = luxi.ParseRequest(message)
    except luxi.ProtocolError, err:
      logging.error("Protocol Error: %s", err)
      client.close_log()
      return

    success = False
    try:
      # Verify client's version if there was one in the request
      if version is not None and version != constants.LUXI_VERSION:
        raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
                               (constants.LUXI_VERSION, version))

      result = client_ops.handle_request(method, args)
      success = True
    except errors.GenericError, err:
      logging.exception("Unexpected exception")
      success = False
      result = errors.EncodeException(err)
    except:
      logging.exception("Unexpected exception")
      err = sys.exc_info()
      result = "Caught exception: %s" % str(err[1])

    try:
      reply = luxi.FormatResponse(success, result)
      client.send_message(reply)
      # wake up the main thread so that it can write out the data.
      server.awaker.signal()
    except: # pylint: disable=W0702
      logging.exception("Send error")
      client.close_log()


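# Editor's sketch (illustrative, not part of the original module): building a
# LUXI reply the same way RunTask does. The result payload is a made-up
# example; the exact wire encoding is owned by luxi.FormatResponse and the
# message stream layer, not by this sketch.
def _ExampleLuxiReply():
  """Illustrative only: serialize a successful response."""
  result = {"software_version": constants.RELEASE_VERSION}
  reply = luxi.FormatResponse(True, result)
  # 'reply' is what client.send_message() would write back; the terminator
  # (constants.LUXI_EOM) is appended by the message stream.
  return reply

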
class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
  """Handler for master peers.

  """
  _MAX_UNHANDLED = 1

  def __init__(self, server, connected_socket, client_address, family):
    daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
                                                 client_address,
                                                 constants.LUXI_EOM,
                                                 family, self._MAX_UNHANDLED)
    self.server = server

  def handle_message(self, message, _):
    self.server.request_workers.AddTask((self.server, message, self))


class _MasterShutdownCheck:
  """Logic for master daemon shutdown.

  """
  #: How long to wait between checks
  _CHECK_INTERVAL = 5.0

  #: How long to wait after all jobs are done (e.g. to give clients time to
  #: retrieve the job status)
  _SHUTDOWN_LINGER = 5.0

  def __init__(self):
    """Initializes this class.

    """
    self._had_active_jobs = None
    self._linger_timeout = None

  def __call__(self, jq_prepare_result):
    """Determines if master daemon is ready for shutdown.

    @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown}
    @rtype: None or number
    @return: None if master daemon is ready, timeout if the check must be
             repeated

    """
    if jq_prepare_result:
      # Check again shortly
      logging.info("Job queue has been notified for shutdown but is still"
                   " busy; next check in %s seconds", self._CHECK_INTERVAL)
      self._had_active_jobs = True
      return self._CHECK_INTERVAL

    if not self._had_active_jobs:
      # Can shut down as there were no active jobs on the first check
      return None

    # No jobs are running anymore, but maybe some clients want to collect some
    # information. Give them a short amount of time.
    if self._linger_timeout is None:
      self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True)

    remaining = self._linger_timeout.Remaining()

    logging.info("Job queue no longer busy; shutting down master daemon"
                 " in %s seconds", remaining)

    # TODO: Should the master daemon socket be closed at this point? Doing so
    # wouldn't affect existing connections.

    if remaining < 0:
      return None
    else:
      return remaining


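# Editor's sketch (illustrative, not part of the original module): how the
# shutdown check above is meant to be driven. The caller keeps invoking it
# with the job queue's PrepareShutdown() result until it returns None; the
# real mainloop waits on its own timers rather than sleeping.
def _ExampleShutdownPolling(jobqueue):
  """Illustrative only: poll until the check allows shutdown."""
  check = _MasterShutdownCheck()
  while True:
    timeout = check(jobqueue.PrepareShutdown())
    if timeout is None:
      break              # no jobs were running, or the linger period expired
    time.sleep(timeout)  # wait and ask again

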
class MasterServer(daemon.AsyncStreamServer):
  """Master Server.

  This is the main asynchronous master server. It handles connections to the
  master socket.

  """
  family = socket.AF_UNIX

  def __init__(self, address, uid, gid):
    """MasterServer constructor

    @param address: the unix socket address to bind the MasterServer to
    @param uid: The uid of the owner of the socket
    @param gid: The gid of the owner of the socket

    """
    temp_name = tempfile.mktemp(dir=os.path.dirname(address))
    daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
    os.chmod(temp_name, 0770)
    os.chown(temp_name, uid, gid)
    os.rename(temp_name, address)

    self.awaker = daemon.AsyncAwaker()

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

    self._shutdown_check = None

  def handle_connection(self, connected_socket, client_address):
    # TODO: add connection count and limit the number of open connections to a
    # maximum number to avoid breaking for lack of file descriptors or memory.
    MasterClientHandler(self, connected_socket, client_address, self.family)

  def setup_queue(self):
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool("ClientReq",
                                                 CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def WaitForShutdown(self):
    """Prepares server for shutdown.

    """
    if self._shutdown_check is None:
      self._shutdown_check = _MasterShutdownCheck()

    return self._shutdown_check(self.context.jobqueue.PrepareShutdown())

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.close()
    finally:
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.context:
        self.context.jobqueue.Shutdown()


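# Editor's sketch (illustrative, not part of the original module): opening a
# raw connection to the socket served by MasterServer, assuming the standard
# master socket path. Real clients normally go through the LUXI client
# library rather than talking to the socket directly. Note that the
# constructor above creates the socket under a temporary name, fixes its mode
# and owner, and only then renames it into place, so clients never see a
# half-prepared socket.
def _ExampleConnectToMaster():
  """Illustrative only: connect to the master's UNIX socket."""
  sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
  sock.connect(pathutils.MASTER_SOCKET)
  return sock

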
class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server

  def handle_request(self, method, args): # pylint: disable=R0911
    context = self.server.context
    queue = context.jobqueue

    # TODO: Parameter validation
    if not isinstance(args, (tuple, list)):
      logging.info("Received invalid arguments of type '%s'", type(args))
      raise ValueError("Invalid arguments type '%s'" % type(args))

    # TODO: Rewrite to not exit in each 'if/elif' branch

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Receiving new job")
      (job_def, ) = args
      ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
      job_id = queue.SubmitJob(ops)
      _LogNewJob(True, job_id, ops)
      return job_id

    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Receiving multiple jobs")
      (job_defs, ) = args
      jobs = []
      for ops in job_defs:
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
      job_ids = queue.SubmitManyJobs(jobs)
      for ((status, job_id), ops) in zip(job_ids, jobs):
        _LogNewJob(status, job_id, ops)
      return job_ids

    elif method == luxi.REQ_CANCEL_JOB:
      (job_id, ) = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_CHANGE_JOB_PRIORITY:
      (job_id, priority) = args
      logging.info("Received request to change priority for job %s to %s",
                   job_id, priority)
      return queue.ChangeJobPriority(job_id, priority)

    elif method == luxi.REQ_ARCHIVE_JOB:
      (job_id, ) = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTO_ARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY:
      (what, fields, qfilter) = args

      if what in constants.QR_VIA_OP:
        result = self._Query(opcodes.OpQuery(what=what, fields=fields,
                                             qfilter=qfilter))
      elif what == constants.QR_LOCK:
        if qfilter is not None:
          raise errors.OpPrereqError("Lock queries can't be filtered",
                                     errors.ECODE_INVAL)
        return context.glm.QueryLocks(fields)
      elif what == constants.QR_JOB:
        return queue.QueryJobs(fields, qfilter)
      elif what in constants.QR_VIA_LUXI:
        raise NotImplementedError
      else:
        raise errors.OpPrereqError("Resource type '%s' unknown" % what,
                                   errors.ECODE_INVAL)

      return result

    elif method == luxi.REQ_QUERY_FIELDS:
      (what, fields) = args
      req = objects.QueryFieldsRequest(what=what, fields=fields)

      try:
        fielddefs = query.ALL_FIELDS[req.what]
      except KeyError:
        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
                                   errors.ECODE_INVAL)

      return query.QueryFields(fielddefs, req.fields)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = utils.CommaJoin(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.OldStyleQueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
                                   use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpNodeQuery(names=names, output_fields=fields,
                               use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_GROUPS:
      (names, fields, use_locking) = args
      logging.info("Received group query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpGroupQuery(names=names, output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NETWORKS:
      (names, fields, use_locking) = args
      logging.info("Received network query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpNetworkQuery(names=names, output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      (nodes, use_locking) = args
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      logging.info("Received exports query request")
      op = opcodes.OpBackupQuery(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      (fields, ) = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpClusterConfigQuery(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpClusterQuery()
      return self._Query(op)

    elif method == luxi.REQ_QUERY_TAGS:
      (kind, name) = args
      logging.info("Received tags query request")
      op = opcodes.OpTagsGet(kind=kind, name=name, use_locking=False)
      return self._Query(op)

    elif method == luxi.REQ_SET_DRAIN_FLAG:
      (drain_flag, ) = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    elif method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      if until is None:
        logging.info("Received request to no longer pause the watcher")
      else:
        if not isinstance(until, (int, float)):
          raise TypeError("Duration must be an integer or float")

        if until < time.time():
          raise errors.GenericError("Unable to set pause end time in the past")

        logging.info("Received request to pause the watcher until %s", until)

      return _SetWatcherPause(until)

    else:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    # Queries don't have a job id
    proc = mcpu.Processor(self.server.context, None, enable_locks=False)

    # TODO: Executing an opcode using locks will acquire them in blocking mode.
    # Consider using a timeout for retries.
    return proc.ExecOpCode(op, None)


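# Editor's sketch (illustrative, not part of the original module): dispatching
# a request through ClientOps once a MasterServer has a populated context.
# The 'server' argument is assumed to be such a server instance.
def _ExampleClusterInfoRequest(server):
  """Illustrative only: run a cluster info query via the dispatcher."""
  ops = ClientOps(server)
  # The empty argument list satisfies the "args must be a tuple or list"
  # check in handle_request; REQ_QUERY_CLUSTER_INFO takes no arguments.
  return ops.handle_request(luxi.REQ_QUERY_CLUSTER_INFO, [])

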
class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # pylint: disable=W0212
  # we do want to ensure a singleton here
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only one GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    self.glm = locking.GanetiLockManager(
      self.cfg.GetNodeList(),
      self.cfg.GetNodeGroupList(),
      self.cfg.GetInstanceList(),
      self.cfg.GetNetworkList())

    self.cfg.SetContext(self)

    # RPC runner
    self.rpc = rpc.RpcRunner(self.cfg, self.glm.AddToLockMonitor)

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node, ec_id):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node, ec_id)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)
    self.glm.add(locking.LEVEL_NODE_RES, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration.

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)
    self.glm.remove(locking.LEVEL_NODE_RES, name)


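# Editor's sketch (illustrative, not part of the original module): the
# singleton and write-protection behaviour enforced above. Both guards are
# plain assertions, so they only trigger when assertions are enabled.
def _ExampleContextIsSingleton(context):
  """Illustrative only: a second context or an attribute write fails."""
  try:
    GanetiContext()      # second instance trips "double GanetiContext instance"
  except AssertionError:
    pass
  try:
    context.cfg = None   # attribute writes are forbidden after initialization
  except AssertionError:
    pass

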
def _SetWatcherPause(until):
  """Creates or removes the watcher pause file.

  @type until: None or int
  @param until: Unix timestamp saying until when the watcher shouldn't run

  """
  if until is None:
    utils.RemoveFile(pathutils.WATCHER_PAUSEFILE)
  else:
    utils.WriteFile(pathutils.WATCHER_PAUSEFILE,
                    data="%d\n" % (until, ))

  return until


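# Editor's sketch (illustrative, not part of the original module): pausing the
# watcher for an hour and then lifting the pause again, as the LUXI
# REQ_SET_WATCHER_PAUSE handler above would do.
def _ExampleWatcherPause():
  """Illustrative only: write and then remove the watcher pause file."""
  _SetWatcherPause(int(time.time()) + 3600)  # pause until an hour from now
  _SetWatcherPause(None)                     # remove the pause file again

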
@rpc.RunWithRPC
def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most of the cases we are the master,
  we'll use our own config file for getting the node list. In the
  future we could collect the current node list from our (possibly
  obsolete) known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are more out-of-sync, for now manual startup of the master
  should be attempted.

  Note that for a cluster with an even number of nodes, we need at
  least half of the nodes (besides ourselves) to vote for us. This
  creates a problem on two-node clusters, since in this case we
  require the other node to be up too to confirm our status.

  """
  myself = netutils.Hostname.GetSysName()
  # temp instantiation of a config writer, used only to get the node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg
  retries = 6
  while retries > 0:
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is None:
      retries -= 1
      time.sleep(10)
      continue
    break
  if retries == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    logging.critical("Use the --no-voting option if you understand what"
                     " effects it has on the cluster state")
    return False
  # here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]

  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result


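# Editor's sketch (illustrative, not part of the original module): the
# majority rule applied above to the (node, votes) pairs returned by
# bootstrap.GatherMasterVotes. The sample votes are made up for the example.
def _ExampleVoteCount():
  """Illustrative only: reproduce the "more for than against" check."""
  votes = [("node1.example.com", 3), ("node2.example.com", 1)]
  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]
  # node1 is confirmed: 3 votes for, 1 against, so it is not the case that
  # top_votes < all_votes - top_votes.
  return top_node, top_votes >= all_votes - top_votes

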
@rpc.RunWithRPC
def ActivateMasterIP():
  # activate ip
  cfg = config.ConfigWriter()
  master_params = cfg.GetMasterNetworkParameters()
  ems = cfg.GetUseExternalMipScript()
  runner = rpc.BootstrapRunner()
  result = runner.call_node_activate_master_ip(master_params.name,
                                               master_params, ems)

  msg = result.fail_msg
  if msg:
    logging.error("Can't activate master IP address: %s", msg)


def CheckMasterd(options, args):
  """Initial checks whether to run or exit with a failure.

  """
  if args: # masterd doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)

  ssconf.CheckMaster(options.debug)

  try:
    options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
    options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
  except KeyError:
    print >> sys.stderr, ("User or group not existing on system: %s:%s" %
                          (constants.MASTERD_USER, constants.DAEMONS_GROUP))
    sys.exit(constants.EXIT_FAILURE)

  # Determine static runtime architecture information
  runtime.InitArchInfo()

  # Check the configuration is sane before anything else
  try:
    config.ConfigWriter()
  except errors.ConfigVersionMismatch, err:
    v1 = "%s.%s.%s" % constants.SplitVersion(err.args[0])
    v2 = "%s.%s.%s" % constants.SplitVersion(err.args[1])
    print >> sys.stderr, \
        ("Configuration version mismatch. The current Ganeti software"
         " expects version %s, but the on-disk configuration file has"
         " version %s. This is likely the result of upgrading the"
         " software without running the upgrade procedure. Please contact"
         " your cluster administrator or complete the upgrade using the"
         " cfgupgrade utility, after reading the upgrade notes." %
         (v1, v2))
    sys.exit(constants.EXIT_FAILURE)
  except errors.ConfigurationError, err:
    print >> sys.stderr, \
        ("Configuration error while opening the configuration file: %s\n"
         "This might be caused by an incomplete software upgrade or"
         " by a corrupted configuration file. Until the problem is fixed"
         " the master daemon cannot start." % str(err))
    sys.exit(constants.EXIT_FAILURE)

  # If CheckMaster didn't fail we believe we are the master, but we have to
  # confirm with the other nodes.
  if options.no_voting:
    if not options.yes_do_it:
      sys.stdout.write("The 'no voting' option has been selected.\n")
      sys.stdout.write("This is dangerous, please confirm by"
                       " typing uppercase 'yes': ")
      sys.stdout.flush()

      confirmation = sys.stdin.readline().strip()
      if confirmation != "YES":
        print >> sys.stderr, "Aborting."
        sys.exit(constants.EXIT_FAILURE)

  else:
    # CheckAgreement uses RPC and threads, hence it needs to be run in
    # a separate process before we call utils.Daemonize in the current
    # process.
    if not utils.RunInSeparateProcess(CheckAgreement):
      sys.exit(constants.EXIT_FAILURE)

  # ActivateMasterIP also uses RPC/threads, so we run it again via a
  # separate process.

  # TODO: decide whether failure to activate the master IP is a fatal error
  utils.RunInSeparateProcess(ActivateMasterIP)


def PrepMasterd(options, _):
  """Prep master daemon function, executed with the PID file held.

  """
  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(pathutils.MASTER_SOCKET)

  mainloop = daemon.Mainloop()
  master = MasterServer(pathutils.MASTER_SOCKET, options.uid, options.gid)
  return (mainloop, master)


def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
  """Main master daemon function, executed with the PID file held.

  """
  (mainloop, master) = prep_data
  try:
    rpc.Init()
    try:
      master.setup_queue()
      try:
        mainloop.Run(shutdown_wait_fn=master.WaitForShutdown)
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemoveFile(pathutils.MASTER_SOCKET)

  logging.info("Clean master daemon shutdown")


def Main():
  """Main function"""
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  parser.add_option("--no-voting", dest="no_voting",
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally",
                    default=False, action="store_true")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    help="Override interactive check for --no-voting",
                    default=False, action="store_true")
  daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
                     ExecMasterd, multithreaded=True)