lib/server/masterd.py @ 7c4bd156

#
#

# Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Master daemon program.

Some classes deviate from the standard style guide since inheritance
from parent classes requires it.

"""

# pylint: disable=C0103
# C0103: Invalid name ganeti-masterd

import grp
import os
import pwd
import sys
import socket
import time
import tempfile
import logging

from optparse import OptionParser

from ganeti import config
from ganeti import constants
from ganeti import daemon
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import workerpool
from ganeti import rpc
from ganeti import bootstrap
from ganeti import netutils
from ganeti import objects
from ganeti import query
from ganeti import runtime
from ganeti import pathutils
from ganeti import ht


CLIENT_REQUEST_WORKERS = 16

EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR


def _LogNewJob(status, info, ops):
  """Log information about a recently submitted job.

  """
  op_summary = utils.CommaJoin(op.Summary() for op in ops)

  if status:
    logging.info("New job with id %s, summary: %s", info, op_summary)
  else:
    logging.info("Failed to submit job, reason: '%s', summary: %s",
                 info, op_summary)


class ClientRequestWorker(workerpool.BaseWorker):
  # pylint: disable=W0221
  def RunTask(self, server, message, client):
    """Process the request.

    """
    client_ops = ClientOps(server)

    try:
      (method, args, version) = luxi.ParseRequest(message)
    except luxi.ProtocolError, err:
      logging.error("Protocol Error: %s", err)
      client.close_log()
      return

    success = False
    try:
      # Verify client's version if there was one in the request
      if version is not None and version != constants.LUXI_VERSION:
        raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
                               (constants.LUXI_VERSION, version))

      result = client_ops.handle_request(method, args)
      success = True
    except errors.GenericError, err:
      logging.exception("Unexpected exception")
      success = False
      result = errors.EncodeException(err)
    except:
      logging.exception("Unexpected exception")
      err = sys.exc_info()
      result = "Caught exception: %s" % str(err[1])

    try:
      reply = luxi.FormatResponse(success, result)
      client.send_message(reply)
      # awake the main thread so that it can write out the data.
      server.awaker.signal()
    except: # pylint: disable=W0702
      logging.exception("Send error")
      client.close_log()


class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
  """Handler for master peers.

  """
  _MAX_UNHANDLED = 1

  def __init__(self, server, connected_socket, client_address, family):
    daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
                                                 client_address,
                                                 constants.LUXI_EOM,
                                                 family, self._MAX_UNHANDLED)
    self.server = server

  def handle_message(self, message, _):
    self.server.request_workers.AddTask((self.server, message, self))


class _MasterShutdownCheck:
  """Logic for master daemon shutdown.

  """
  #: How long to wait between checks
  _CHECK_INTERVAL = 5.0

  #: How long to wait after all jobs are done (e.g. to give clients time to
  #: retrieve the job status)
  _SHUTDOWN_LINGER = 5.0

  def __init__(self):
    """Initializes this class.

    """
    self._had_active_jobs = None
    self._linger_timeout = None

  def __call__(self, jq_prepare_result):
    """Determines if master daemon is ready for shutdown.

    @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown}
    @rtype: None or number
    @return: None if master daemon is ready, timeout if the check must be
             repeated

    """
    if jq_prepare_result:
      # Check again shortly
      logging.info("Job queue has been notified for shutdown but is still"
                   " busy; next check in %s seconds", self._CHECK_INTERVAL)
      self._had_active_jobs = True
      return self._CHECK_INTERVAL

    if not self._had_active_jobs:
      # Can shut down as there were no active jobs on the first check
      return None

    # No jobs are running anymore, but maybe some clients want to collect some
    # information. Give them a short amount of time.
    if self._linger_timeout is None:
      self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True)

    remaining = self._linger_timeout.Remaining()

    logging.info("Job queue no longer busy; shutting down master daemon"
                 " in %s seconds", remaining)

    # TODO: Should the master daemon socket be closed at this point? Doing so
    # wouldn't affect existing connections.

    if remaining < 0:
      return None
    else:
      return remaining
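
# Editor's illustrative sketch, not part of the original module: the mainloop
# is expected to call MasterServer.WaitForShutdown (and therefore the check
# above) repeatedly.  With the default intervals, a shutdown requested while
# a job is still running would roughly see this sequence of return values:
#
#   5.0   # job queue still busy; check again in _CHECK_INTERVAL seconds
#   5.0   # still busy
#   5.0   # queue drained; linger for _SHUTDOWN_LINGER so clients can still
#         # fetch the final job status
#   None  # linger time elapsed, the master daemon may shut down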


class MasterServer(daemon.AsyncStreamServer):
  """Master Server.

  This is the main asynchronous master server. It handles connections to the
  master socket.

  """
  family = socket.AF_UNIX

  def __init__(self, address, uid, gid):
    """MasterServer constructor

    @param address: the unix socket address to bind the MasterServer to
    @param uid: The uid of the owner of the socket
    @param gid: The gid of the owner of the socket

    """
    temp_name = tempfile.mktemp(dir=os.path.dirname(address))
    daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
    os.chmod(temp_name, 0770)
    os.chown(temp_name, uid, gid)
    os.rename(temp_name, address)
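    # Editor's note, not part of the original module: the socket is created
    # under a temporary name and only renamed to its final address after mode
    # and ownership have been set, so clients connecting to the well-known
    # address never see it with the wrong permissions.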

    self.awaker = daemon.AsyncAwaker()

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

    self._shutdown_check = None

  def handle_connection(self, connected_socket, client_address):
    # TODO: add connection count and limit the number of open connections to a
    # maximum number to avoid breaking for lack of file descriptors or memory.
    MasterClientHandler(self, connected_socket, client_address, self.family)

  def setup_queue(self):
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool("ClientReq",
                                                 CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def WaitForShutdown(self):
    """Prepares server for shutdown.

    """
    if self._shutdown_check is None:
      self._shutdown_check = _MasterShutdownCheck()

    return self._shutdown_check(self.context.jobqueue.PrepareShutdown())

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.close()
    finally:
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.context:
        self.context.jobqueue.Shutdown()


class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server

  def handle_request(self, method, args): # pylint: disable=R0911
    context = self.server.context
    queue = context.jobqueue

    # TODO: Parameter validation
    if not isinstance(args, (tuple, list)):
      logging.info("Received invalid arguments of type '%s'", type(args))
      raise ValueError("Invalid arguments type '%s'" % type(args))

    # TODO: Rewrite to not exit in each 'if/elif' branch
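
    # Editor's illustrative sketch, not part of the original module: by this
    # point luxi.ParseRequest (see ClientRequestWorker.RunTask) has reduced a
    # request to a method name plus a positional argument list, roughly e.g.
    #   method = luxi.REQ_QUERY_CLUSTER_INFO, args = []
    # or, for a job submission,
    #   method = luxi.REQ_SUBMIT_JOB, args = [[op_state, ...]]
    # where each op_state is a serialized opcode accepted by
    # opcodes.OpCode.LoadOpCode below.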

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Receiving new job")
      (job_def, ) = args
      ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
      job_id = queue.SubmitJob(ops)
      _LogNewJob(True, job_id, ops)
      return job_id

    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Receiving multiple jobs")
      (job_defs, ) = args
      jobs = []
      for ops in job_defs:
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
      job_ids = queue.SubmitManyJobs(jobs)
      for ((status, job_id), ops) in zip(job_ids, jobs):
        _LogNewJob(status, job_id, ops)
      return job_ids

    elif method == luxi.REQ_CANCEL_JOB:
      (job_id, ) = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_CHANGE_JOB_PRIORITY:
      (job_id, priority) = args
      logging.info("Received request to change priority for job %s to %s",
                   job_id, priority)
      return queue.ChangeJobPriority(job_id, priority)

    elif method == luxi.REQ_ARCHIVE_JOB:
      (job_id, ) = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTO_ARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY:
      (what, fields, qfilter) = args

      if what in constants.QR_VIA_OP:
        result = self._Query(opcodes.OpQuery(what=what, fields=fields,
                                             qfilter=qfilter))
      elif what == constants.QR_LOCK:
        if qfilter is not None:
          raise errors.OpPrereqError("Lock queries can't be filtered",
                                     errors.ECODE_INVAL)
        return context.glm.QueryLocks(fields)
      elif what == constants.QR_JOB:
        return queue.QueryJobs(fields, qfilter)
      elif what in constants.QR_VIA_LUXI:
        raise NotImplementedError
      else:
        raise errors.OpPrereqError("Resource type '%s' unknown" % what,
                                   errors.ECODE_INVAL)

      return result

    elif method == luxi.REQ_QUERY_FIELDS:
      (what, fields) = args
      req = objects.QueryFieldsRequest(what=what, fields=fields)

      try:
        fielddefs = query.ALL_FIELDS[req.what]
      except KeyError:
        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
                                   errors.ECODE_INVAL)

      return query.QueryFields(fielddefs, req.fields)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = utils.CommaJoin(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.OldStyleQueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
                                   use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpNodeQuery(names=names, output_fields=fields,
                               use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_GROUPS:
      (names, fields, use_locking) = args
      logging.info("Received group query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpGroupQuery(names=names, output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NETWORKS:
      (names, fields, use_locking) = args
      logging.info("Received network query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpNetworkQuery(names=names, output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      (nodes, use_locking) = args
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      logging.info("Received exports query request")
      op = opcodes.OpBackupQuery(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      (fields, ) = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpClusterConfigQuery(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpClusterQuery()
      return self._Query(op)

    elif method == luxi.REQ_QUERY_TAGS:
      (kind, name) = args
      logging.info("Received tags query request")
      op = opcodes.OpTagsGet(kind=kind, name=name, use_locking=False)
      return self._Query(op)

    elif method == luxi.REQ_SET_DRAIN_FLAG:
      (drain_flag, ) = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    elif method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      return _SetWatcherPause(context, until)

    else:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    # Queries don't have a job id
    proc = mcpu.Processor(self.server.context, None, enable_locks=False)

    # TODO: Executing an opcode using locks will acquire them in blocking mode.
    # Consider using a timeout for retries.
    return proc.ExecOpCode(op, None)


class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # pylint: disable=W0212
  # we do want to ensure a singleton here
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only one GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    self.glm = locking.GanetiLockManager(
      self.cfg.GetNodeList(),
      self.cfg.GetNodeGroupList(),
      self.cfg.GetInstanceList(),
      self.cfg.GetNetworkList())

    self.cfg.SetContext(self)

    # RPC runner
    self.rpc = rpc.RpcRunner(self.cfg, self.glm.AddToLockMonitor)

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)
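
  # Editor's illustrative note, not part of the original module: once __init__
  # has stored the instance in _instance, any later attribute assignment such
  # as
  #   context.cfg = config.ConfigWriter()
  # trips the assertion above, keeping the shared context effectively
  # read-only for the rest of the daemon's lifetime.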

  def AddNode(self, node, ec_id):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node, ec_id)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)
    self.glm.add(locking.LEVEL_NODE_RES, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration.

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)
    self.glm.remove(locking.LEVEL_NODE_RES, name)


def _SetWatcherPause(context, until):
  """Creates or removes the watcher pause file.

  @type context: L{GanetiContext}
  @param context: Global Ganeti context
  @type until: None or int
  @param until: Unix timestamp saying until when the watcher shouldn't run

  """
  node_names = context.cfg.GetNodeList()

  if until is None:
    logging.info("Received request to no longer pause watcher")
  else:
    if not ht.TNumber(until):
      raise TypeError("Duration must be numeric")

    if until < time.time():
      raise errors.GenericError("Unable to set pause end time in the past")

    logging.info("Received request to pause watcher until %s", until)

  result = context.rpc.call_set_watcher_pause(node_names, until)

  errmsg = utils.CommaJoin("%s (%s)" % (node_name, nres.fail_msg)
                           for (node_name, nres) in result.items()
                           if nres.fail_msg and not nres.offline)
  if errmsg:
    raise errors.OpExecError("Watcher pause was set where possible, but failed"
                             " on the following node(s): %s" % errmsg)

  return until


@rpc.RunWithRPC
def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most cases we are the master, we'll
  use our own config file for getting the node list. In the future we
  could collect the current node list from our (possibly obsolete)
  known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are too far out of sync, manual startup of the master should
  be attempted for now.

  Note that for a cluster with an even number of nodes, we need at
  least half of the nodes (besides ourselves) to vote for us. This
  creates a problem on two-node clusters, since in this case we
  require the other node to be up too to confirm our status.

  """
  myself = netutils.Hostname.GetSysName()
  # temp instantiation of a config writer, used only to get the node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg
  retries = 6
  while retries > 0:
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is None:
      retries -= 1
      time.sleep(10)
      continue
    break
  if retries == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    logging.critical("Use the --no-voting option if you understand what"
                     " effects it has on the cluster state")
    return False
  # here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]

  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result
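
# Editor's illustrative sketch, not part of the original module: how the vote
# arithmetic above decides mastership.  Assuming a five-node cluster where
# GatherMasterVotes returned
#   votes = [("node1", 3), ("node2", 1), (None, 1)]
# then all_votes is 5, top_node is "node1" and top_votes is 3.  Because
# 3 >= 5 - 3, the check succeeds if and only if we ("myself") are "node1".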


@rpc.RunWithRPC
def ActivateMasterIP():
  # activate ip
  cfg = config.ConfigWriter()
  master_params = cfg.GetMasterNetworkParameters()
  ems = cfg.GetUseExternalMipScript()
  runner = rpc.BootstrapRunner()
  result = runner.call_node_activate_master_ip(master_params.name,
                                               master_params, ems)

  msg = result.fail_msg
  if msg:
    logging.error("Can't activate master IP address: %s", msg)


def CheckMasterd(options, args):
  """Initial checks whether to run or exit with a failure.

  """
  if args: # masterd doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)

  ssconf.CheckMaster(options.debug)

  try:
    options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
    options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
  except KeyError:
    print >> sys.stderr, ("User or group not existing on system: %s:%s" %
                          (constants.MASTERD_USER, constants.DAEMONS_GROUP))
    sys.exit(constants.EXIT_FAILURE)

  # Determine static runtime architecture information
  runtime.InitArchInfo()

  # Check the configuration is sane before anything else
  try:
    config.ConfigWriter()
  except errors.ConfigVersionMismatch, err:
    v1 = "%s.%s.%s" % constants.SplitVersion(err.args[0])
    v2 = "%s.%s.%s" % constants.SplitVersion(err.args[1])
    print >> sys.stderr, \
        ("Configuration version mismatch. The current Ganeti software"
         " expects version %s, but the on-disk configuration file has"
         " version %s. This is likely the result of upgrading the"
         " software without running the upgrade procedure. Please contact"
         " your cluster administrator or complete the upgrade using the"
         " cfgupgrade utility, after reading the upgrade notes." %
         (v1, v2))
    sys.exit(constants.EXIT_FAILURE)
  except errors.ConfigurationError, err:
    print >> sys.stderr, \
        ("Configuration error while opening the configuration file: %s\n"
         "This might be caused by an incomplete software upgrade or"
         " by a corrupted configuration file. Until the problem is fixed"
         " the master daemon cannot start." % str(err))
    sys.exit(constants.EXIT_FAILURE)

  # If CheckMaster didn't fail we believe we are the master, but we have to
  # confirm with the other nodes.
  if options.no_voting:
    if not options.yes_do_it:
      sys.stdout.write("The 'no voting' option has been selected.\n")
      sys.stdout.write("This is dangerous, please confirm by"
                       " typing uppercase 'yes': ")
      sys.stdout.flush()

      confirmation = sys.stdin.readline().strip()
      if confirmation != "YES":
        print >> sys.stderr, "Aborting."
        sys.exit(constants.EXIT_FAILURE)

  else:
    # CheckAgreement uses RPC and threads, hence it needs to be run in
    # a separate process before we call utils.Daemonize in the current
    # process.
    if not utils.RunInSeparateProcess(CheckAgreement):
      sys.exit(constants.EXIT_FAILURE)

  # ActivateMasterIP also uses RPC/threads, so we run it again via a
  # separate process.

  # TODO: decide whether failure to activate the master IP is a fatal error
  utils.RunInSeparateProcess(ActivateMasterIP)


def PrepMasterd(options, _):
  """Prep master daemon function, executed with the PID file held.

  """
  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(pathutils.MASTER_SOCKET)

  mainloop = daemon.Mainloop()
  master = MasterServer(pathutils.MASTER_SOCKET, options.uid, options.gid)
  return (mainloop, master)


def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
  """Main master daemon function, executed with the PID file held.

  """
  (mainloop, master) = prep_data
  try:
    rpc.Init()
    try:
      master.setup_queue()
      try:
        mainloop.Run(shutdown_wait_fn=master.WaitForShutdown)
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemoveFile(pathutils.MASTER_SOCKET)

  logging.info("Clean master daemon shutdown")


def Main():
  """Main function"""
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  parser.add_option("--no-voting", dest="no_voting",
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally",
                    default=False, action="store_true")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    help="Override interactive check for --no-voting",
                    default=False, action="store_true")
  daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
                     ExecMasterd, multithreaded=True)
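

# Editor's illustrative note, not part of the original module: under
# daemon.GenericMain the callbacks above are expected to run in order:
# CheckMasterd for the pre-daemonization sanity checks, PrepMasterd once the
# PID file is held (socket and mainloop creation), and ExecMasterd as the
# long-running main loop.  This ordering is inferred from the function
# docstrings and the GenericMain call; see ganeti.daemon for the
# authoritative contract.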