# lib/server/masterd.py @ effc1b86
#
#

# Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Master daemon program.

Some classes deviate from the standard style guide since inheritance
from parent classes requires it.

"""

# pylint: disable=C0103
# C0103: Invalid name ganeti-masterd

import grp
import os
import pwd
import sys
import socket
import time
import tempfile
import logging

from optparse import OptionParser

from ganeti import config
from ganeti import constants
from ganeti import daemon
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import workerpool
from ganeti import rpc
from ganeti import bootstrap
from ganeti import netutils
from ganeti import objects
from ganeti import query
from ganeti import runtime
from ganeti import pathutils
from ganeti import ht

from ganeti.utils import version


CLIENT_REQUEST_WORKERS = 16

EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR


def _LogNewJob(status, info, ops):
  """Log information about a recently submitted job.

  """
  op_summary = utils.CommaJoin(op.Summary() for op in ops)

  if status:
    logging.info("New job with id %s, summary: %s", info, op_summary)
  else:
    logging.info("Failed to submit job, reason: '%s', summary: %s",
                 info, op_summary)


class ClientRequestWorker(workerpool.BaseWorker):
  # pylint: disable=W0221
  def RunTask(self, server, message, client):
    """Process the request.

    """
    client_ops = ClientOps(server)

    try:
      (method, args, ver) = luxi.ParseRequest(message)
    except luxi.ProtocolError, err:
      logging.error("Protocol Error: %s", err)
      client.close_log()
      return

    success = False
    try:
      # Verify client's version if there was one in the request
      if ver is not None and ver != constants.LUXI_VERSION:
        raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
                               (constants.LUXI_VERSION, ver))

      result = client_ops.handle_request(method, args)
      success = True
    except errors.GenericError, err:
      logging.exception("Unexpected exception")
      success = False
      result = errors.EncodeException(err)
    except:
      logging.exception("Unexpected exception")
      err = sys.exc_info()
      result = "Caught exception: %s" % str(err[1])

    try:
      reply = luxi.FormatResponse(success, result)
      client.send_message(reply)
      # wake up the main thread so that it can write out the data.
      server.awaker.signal()
    except: # pylint: disable=W0702
      logging.exception("Send error")
      client.close_log()


class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
  """Handler for master peers.

  """
  _MAX_UNHANDLED = 1

  def __init__(self, server, connected_socket, client_address, family):
    daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
                                                 client_address,
                                                 constants.LUXI_EOM,
                                                 family, self._MAX_UNHANDLED)
    self.server = server

  def handle_message(self, message, _):
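    """Handle an incoming LUXI message.

    The complete message is queued as a task for the client request worker
    pool; the main loop thread itself does not process requests.

    """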
    self.server.request_workers.AddTask((self.server, message, self))


class _MasterShutdownCheck:
  """Logic for master daemon shutdown.

  """
  #: How long to wait between checks
  _CHECK_INTERVAL = 5.0

  #: How long to wait after all jobs are done (e.g. to give clients time to
  #: retrieve the job status)
  _SHUTDOWN_LINGER = 5.0

  def __init__(self):
    """Initializes this class.

    """
    self._had_active_jobs = None
    self._linger_timeout = None

  def __call__(self, jq_prepare_result):
    """Determines if master daemon is ready for shutdown.

    @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown}
    @rtype: None or number
    @return: None if master daemon is ready, timeout if the check must be
             repeated

    """
    if jq_prepare_result:
      # Check again shortly
      logging.info("Job queue has been notified for shutdown but is still"
                   " busy; next check in %s seconds", self._CHECK_INTERVAL)
      self._had_active_jobs = True
      return self._CHECK_INTERVAL

    if not self._had_active_jobs:
      # Can shut down as there were no active jobs on the first check
      return None

    # No jobs are running anymore, but maybe some clients want to collect some
    # information. Give them a short amount of time.
    if self._linger_timeout is None:
      self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True)

    remaining = self._linger_timeout.Remaining()

    logging.info("Job queue no longer busy; shutting down master daemon"
                 " in %s seconds", remaining)

    # TODO: Should the master daemon socket be closed at this point? Doing so
    # wouldn't affect existing connections.

    if remaining < 0:
      return None
    else:
      return remaining


class MasterServer(daemon.AsyncStreamServer):
  """Master Server.

  This is the main asynchronous master server. It handles connections to the
  master socket.

  """
  family = socket.AF_UNIX

  def __init__(self, address, uid, gid):
    """MasterServer constructor

    @param address: the unix socket address to bind the MasterServer to
    @param uid: The uid of the owner of the socket
    @param gid: The gid of the owner of the socket

    """
    temp_name = tempfile.mktemp(dir=os.path.dirname(address))
    daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
    os.chmod(temp_name, 0770)
    os.chown(temp_name, uid, gid)
    os.rename(temp_name, address)

    self.awaker = daemon.AsyncAwaker()

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

    self._shutdown_check = None

  def handle_connection(self, connected_socket, client_address):
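    """Handle a new connection on the master socket.

    Each accepted connection is wrapped in a L{MasterClientHandler}, which
    reads LUXI requests from it.

    """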
    # TODO: add connection count and limit the number of open connections to a
    # maximum number to avoid breaking for lack of file descriptors or memory.
    MasterClientHandler(self, connected_socket, client_address, self.family)

  def setup_queue(self):
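    """Create the job queue context and the client request worker pool.

    This starts threads, so it must only be called after the daemon has
    forked (see L{ExecMasterd}).

    """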
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool("ClientReq",
                                                 CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def WaitForShutdown(self):
    """Prepares server for shutdown.

    """
    if self._shutdown_check is None:
      self._shutdown_check = _MasterShutdownCheck()

    return self._shutdown_check(self.context.jobqueue.PrepareShutdown())

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.close()
    finally:
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.context:
        self.context.jobqueue.Shutdown()


class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server
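
  # Illustrative sketch (not part of the original file): a client typically
  # reaches this class over the LUXI master socket, along the lines of
  #
  #   from ganeti import luxi, opcodes, pathutils
  #   cl = luxi.Client(address=pathutils.MASTER_SOCKET)
  #   job_id = cl.SubmitJob([opcodes.OpClusterQuery()])
  #
  # The request is read by MasterClientHandler, run by a ClientRequestWorker
  # and finally dispatched by handle_request() below.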

  def handle_request(self, method, args): # pylint: disable=R0911
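    """Dispatch a single LUXI request.

    Job requests are forwarded to the job queue, while queries are either
    converted into opcodes and executed directly (see L{_Query}) or answered
    from the job queue and lock manager.

    """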
    context = self.server.context
    queue = context.jobqueue

    # TODO: Parameter validation
    if not isinstance(args, (tuple, list)):
      logging.info("Received invalid arguments of type '%s'", type(args))
      raise ValueError("Invalid arguments type '%s'" % type(args))

    if method not in luxi.REQ_ALL:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

    # TODO: Rewrite to not exit in each 'if/elif' branch

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Receiving new job")
      (job_def, ) = args
      ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
      job_id = queue.SubmitJob(ops)
      _LogNewJob(True, job_id, ops)
      return job_id

    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Receiving multiple jobs")
      (job_defs, ) = args
      jobs = []
      for ops in job_defs:
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
      job_ids = queue.SubmitManyJobs(jobs)
      for ((status, job_id), ops) in zip(job_ids, jobs):
        _LogNewJob(status, job_id, ops)
      return job_ids

    elif method == luxi.REQ_CANCEL_JOB:
      (job_id, ) = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_CHANGE_JOB_PRIORITY:
      (job_id, priority) = args
      logging.info("Received request to change priority for job %s to %s",
                   job_id, priority)
      return queue.ChangeJobPriority(job_id, priority)

    elif method == luxi.REQ_ARCHIVE_JOB:
      (job_id, ) = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTO_ARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY:
      (what, fields, qfilter) = args

      if what in constants.QR_VIA_OP:
        result = self._Query(opcodes.OpQuery(what=what, fields=fields,
                                             qfilter=qfilter))
      elif what == constants.QR_LOCK:
        if qfilter is not None:
          raise errors.OpPrereqError("Lock queries can't be filtered",
                                     errors.ECODE_INVAL)
        return context.glm.QueryLocks(fields)
      elif what == constants.QR_JOB:
        return queue.QueryJobs(fields, qfilter)
      elif what in constants.QR_VIA_LUXI:
        raise NotImplementedError
      else:
        raise errors.OpPrereqError("Resource type '%s' unknown" % what,
                                   errors.ECODE_INVAL)

      return result

    elif method == luxi.REQ_QUERY_FIELDS:
      (what, fields) = args
      req = objects.QueryFieldsRequest(what=what, fields=fields)

      try:
        fielddefs = query.ALL_FIELDS[req.what]
      except KeyError:
        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
                                   errors.ECODE_INVAL)

      return query.QueryFields(fielddefs, req.fields)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = utils.CommaJoin(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.OldStyleQueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
                                   use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpNodeQuery(names=names, output_fields=fields,
                               use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_GROUPS:
      (names, fields, use_locking) = args
      logging.info("Received group query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpGroupQuery(names=names, output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NETWORKS:
      (names, fields, use_locking) = args
      logging.info("Received network query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpNetworkQuery(names=names, output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      (nodes, use_locking) = args
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      logging.info("Received exports query request")
      op = opcodes.OpBackupQuery(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      (fields, ) = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpClusterConfigQuery(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpClusterQuery()
      return self._Query(op)

    elif method == luxi.REQ_QUERY_TAGS:
      (kind, name) = args
      logging.info("Received tags query request")
      op = opcodes.OpTagsGet(kind=kind, name=name, use_locking=False)
      return self._Query(op)

    elif method == luxi.REQ_SET_DRAIN_FLAG:
      (drain_flag, ) = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    elif method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      return _SetWatcherPause(context, until)

    else:
      logging.critical("Request '%s' in luxi.REQ_ALL, but not known", method)
      raise errors.ProgrammerError("Operation '%s' in luxi.REQ_ALL,"
                                   " but not implemented" % method)

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    # Queries don't have a job id
    proc = mcpu.Processor(self.server.context, None, enable_locks=False)

    # TODO: Executing an opcode using locks will acquire them in blocking mode.
    # Consider using a timeout for retries.
    return proc.ExecOpCode(op, None)


class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # pylint: disable=W0212
  # we do want to ensure a singleton here
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only one GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    self.glm = locking.GanetiLockManager(
      self.cfg.GetNodeList(),
      self.cfg.GetNodeGroupList(),
      [inst.name for inst in self.cfg.GetAllInstancesInfo().values()],
      self.cfg.GetNetworkList())

    self.cfg.SetContext(self)

    # RPC runner
    self.rpc = rpc.RpcRunner(self.cfg, self.glm.AddToLockMonitor)

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node, ec_id):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node, ec_id)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.uuid)
    self.glm.add(locking.LEVEL_NODE_RES, node.uuid)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration.

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, node):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(node.uuid)

    # Notify job queue
    self.jobqueue.RemoveNode(node.name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, node.uuid)
    self.glm.remove(locking.LEVEL_NODE_RES, node.uuid)


def _SetWatcherPause(context, until):
  """Creates or removes the watcher pause file on all nodes.

  @type context: L{GanetiContext}
  @param context: Global Ganeti context
  @type until: None or int
  @param until: Unix timestamp saying until when the watcher shouldn't run

  """
  node_names = context.cfg.GetNodeList()

  if until is None:
    logging.info("Received request to no longer pause watcher")
  else:
    if not ht.TNumber(until):
      raise TypeError("Duration must be numeric")

    if until < time.time():
      raise errors.GenericError("Unable to set pause end time in the past")

    logging.info("Received request to pause watcher until %s", until)

  result = context.rpc.call_set_watcher_pause(node_names, until)

  errmsg = utils.CommaJoin("%s (%s)" % (node_name, nres.fail_msg)
                           for (node_name, nres) in result.items()
                           if nres.fail_msg and not nres.offline)
  if errmsg:
    raise errors.OpExecError("Watcher pause was set where possible, but failed"
                             " on the following node(s): %s" % errmsg)

  return until


@rpc.RunWithRPC
def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most of the cases we are the master,
  we'll use our own config file for getting the node list. In the
  future we could collect the current node list from our (possibly
  obsolete) known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are further out of sync, manual startup of the master should
  be attempted for now.

  Note that for a cluster with an even number of nodes, we need at
  least half of the nodes (besides ourselves) to vote for us. This
  creates a problem on two-node clusters, since in this case we
  require the other node to be up too to confirm our status.

  """
  myself = netutils.Hostname.GetSysName()
  # temporary instantiation of a config writer, used only to get the node list
  cfg = config.ConfigWriter()
  node_names = cfg.GetNodeNames(cfg.GetNodeList())
  del cfg
  retries = 6
  while retries > 0:
    votes = bootstrap.GatherMasterVotes(node_names)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is None:
      retries -= 1
      time.sleep(10)
      continue
    break
  if retries == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    logging.critical("Use the --no-voting option if you understand what"
                     " effects it has on the cluster state")
    return False
  # here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]
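  # Worked example for the check below (illustration only): with
  # all_votes == 5, a top-voted node with 3 votes passes (3 >= 5 - 3), while
  # one with only 2 votes fails (2 < 5 - 2); the winner needs at least half
  # of all collected votes.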

  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result


@rpc.RunWithRPC
def ActivateMasterIP():
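  """Activates the master IP address on the current node.

  Failures are only logged here; see the TODO in L{CheckMasterd} about
  whether they should be treated as fatal.

  """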
  # activate ip
  cfg = config.ConfigWriter()
  master_params = cfg.GetMasterNetworkParameters()
  ems = cfg.GetUseExternalMipScript()
  runner = rpc.BootstrapRunner()
  # we use the node name, as the configuration is so far only available here
  result = runner.call_node_activate_master_ip(
             cfg.GetNodeName(master_params.uuid), master_params, ems)

  msg = result.fail_msg
  if msg:
    logging.error("Can't activate master IP address: %s", msg)


def CheckMasterd(options, args):
  """Initial checks whether to run or exit with a failure.

  """
  if args: # masterd doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)

  ssconf.CheckMaster(options.debug)

  try:
    options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
    options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
  except KeyError:
    print >> sys.stderr, ("User or group not existing on system: %s:%s" %
                          (constants.MASTERD_USER, constants.DAEMONS_GROUP))
    sys.exit(constants.EXIT_FAILURE)

  # Determine static runtime architecture information
  runtime.InitArchInfo()

  # Check the configuration is sane before anything else
  try:
    config.ConfigWriter()
  except errors.ConfigVersionMismatch, err:
    v1 = "%s.%s.%s" % version.SplitVersion(err.args[0])
    v2 = "%s.%s.%s" % version.SplitVersion(err.args[1])
    print >> sys.stderr,  \
        ("Configuration version mismatch. The current Ganeti software"
         " expects version %s, but the on-disk configuration file has"
         " version %s. This is likely the result of upgrading the"
         " software without running the upgrade procedure. Please contact"
         " your cluster administrator or complete the upgrade using the"
         " cfgupgrade utility, after reading the upgrade notes." %
         (v1, v2))
    sys.exit(constants.EXIT_FAILURE)
  except errors.ConfigurationError, err:
    print >> sys.stderr, \
        ("Configuration error while opening the configuration file: %s\n"
         "This might be caused by an incomplete software upgrade or"
         " by a corrupted configuration file. Until the problem is fixed"
         " the master daemon cannot start." % str(err))
    sys.exit(constants.EXIT_FAILURE)

  # If CheckMaster didn't fail we believe we are the master, but we have to
  # confirm with the other nodes.
  if options.no_voting:
    if not options.yes_do_it:
      sys.stdout.write("The 'no voting' option has been selected.\n")
      sys.stdout.write("This is dangerous, please confirm by"
                       " typing uppercase 'yes': ")
      sys.stdout.flush()

      confirmation = sys.stdin.readline().strip()
      if confirmation != "YES":
        print >> sys.stderr, "Aborting."
        sys.exit(constants.EXIT_FAILURE)

  else:
    # CheckAgreement uses RPC and threads, hence it needs to be run in
    # a separate process before we call utils.Daemonize in the current
    # process.
    if not utils.RunInSeparateProcess(CheckAgreement):
      sys.exit(constants.EXIT_FAILURE)

  # ActivateMasterIP also uses RPC/threads, so we run it again via a
  # separate process.

  # TODO: decide whether failure to activate the master IP is a fatal error
  utils.RunInSeparateProcess(ActivateMasterIP)


def PrepMasterd(options, _):
  """Prep master daemon function, executed with the PID file held.

  """
  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(pathutils.MASTER_SOCKET)

  mainloop = daemon.Mainloop()
  master = MasterServer(pathutils.MASTER_SOCKET, options.uid, options.gid)
  return (mainloop, master)


def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
  """Main master daemon function, executed with the PID file held.

  """
  (mainloop, master) = prep_data
  try:
    rpc.Init()
    try:
      master.setup_queue()
      try:
        mainloop.Run(shutdown_wait_fn=master.WaitForShutdown)
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemoveFile(pathutils.MASTER_SOCKET)

  logging.info("Clean master daemon shutdown")


def Main():
  """Main function.

  Builds the option parser and hands control to L{daemon.GenericMain},
  which runs CheckMasterd, PrepMasterd and ExecMasterd.

  """
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  parser.add_option("--no-voting", dest="no_voting",
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally",
                    default=False, action="store_true")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    help="Override interactive check for --no-voting",
                    default=False, action="store_true")
  daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
                     ExecMasterd, multithreaded=True)