Statistics
| Branch: | Tag: | Revision:

root / lib / server / masterd.py @ d9d1e541

History | View | Annotate | Download (26 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Master daemon program.
23

24
Some classes deviate from the standard style guide since the
25
inheritance from parent classes requires it.
26

27
"""
28

    
29
# pylint: disable=C0103
30
# C0103: Invalid name ganeti-masterd
31

    
32
import grp
33
import os
34
import pwd
35
import sys
36
import socket
37
import time
38
import tempfile
39
import logging
40

    
41
from optparse import OptionParser
42

    
43
from ganeti import config
44
from ganeti import constants
45
from ganeti import daemon
46
from ganeti import mcpu
47
from ganeti import opcodes
48
from ganeti import jqueue
49
from ganeti import locking
50
from ganeti import luxi
51
from ganeti import utils
52
from ganeti import errors
53
from ganeti import ssconf
54
from ganeti import workerpool
55
from ganeti import rpc
56
from ganeti import bootstrap
57
from ganeti import netutils
58
from ganeti import objects
59
from ganeti import query
60
from ganeti import runtime
61
from ganeti import pathutils
62
from ganeti import ht
63

    
64
from ganeti.utils import version
65

    
66

    
67
# Passed to workerpool.WorkerPool when the "ClientReq" pool is created
# (presumably the number of worker threads -- confirm against workerpool)
CLIENT_REQUEST_WORKERS = 16
68

    
69
# Module-level alias for constants.EXIT_NOTMASTER
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
70
# Module-level alias for constants.EXIT_NODESETUP_ERROR
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
71

    
72

    
73
def _LogNewJob(status, info, ops):
  """Write a log entry describing the outcome of a job submission.

  @param status: true value if the job was accepted, false value otherwise
  @param info: logged as the job id if C{status} is true, as the failure
      reason otherwise
  @param ops: iterable of objects providing a C{Summary()} method

  """
  summary = utils.CommaJoin(op.Summary() for op in ops)

  if not status:
    logging.info("Failed to submit job, reason: '%s', summary: %s",
                 info, summary)
    return

  logging.info("New job with id %s, summary: %s", info, summary)
84

    
85

    
86
class ClientRequestWorker(workerpool.BaseWorker):
87
  # pylint: disable=W0221
88
  def RunTask(self, server, message, client):
89
    """Process the request.
90

91
    """
92
    client_ops = ClientOps(server)
93

    
94
    try:
95
      (method, args, ver) = luxi.ParseRequest(message)
96
    except luxi.ProtocolError, err:
97
      logging.error("Protocol Error: %s", err)
98
      client.close_log()
99
      return
100

    
101
    success = False
102
    try:
103
      # Verify client's version if there was one in the request
104
      if ver is not None and ver != constants.LUXI_VERSION:
105
        raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
106
                               (constants.LUXI_VERSION, ver))
107

    
108
      result = client_ops.handle_request(method, args)
109
      success = True
110
    except errors.GenericError, err:
111
      logging.exception("Unexpected exception")
112
      success = False
113
      result = errors.EncodeException(err)
114
    except:
115
      logging.exception("Unexpected exception")
116
      err = sys.exc_info()
117
      result = "Caught exception: %s" % str(err[1])
118

    
119
    try:
120
      reply = luxi.FormatResponse(success, result)
121
      client.send_message(reply)
122
      # awake the main thread so that it can write out the data.
123
      server.awaker.signal()
124
    except: # pylint: disable=W0702
125
      logging.exception("Send error")
126
      client.close_log()
127

    
128

    
129
class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
  """Handler for a single connection to the master socket.

  Received messages are not processed here; they are queued on the server's
  request worker pool.

  """
  _MAX_UNHANDLED = 1

  def __init__(self, server, connected_socket, client_address, family):
    """Initializes the handler.

    @param server: server owning the connection; its C{request_workers}
        attribute is used to queue received messages
    @param connected_socket: passed on to the parent class
    @param client_address: passed on to the parent class
    @param family: passed on to the parent class

    """
    parent = daemon.AsyncTerminatedMessageStream
    parent.__init__(self, connected_socket, client_address,
                    constants.LUXI_EOM, family, self._MAX_UNHANDLED)
    self.server = server

  def handle_message(self, message, _):
    """Queues a received message as a task for the request workers.

    @param message: the received message

    """
    task = (self.server, message, self)
    self.server.request_workers.AddTask(task)
144

    
145

    
146
class _MasterShutdownCheck:
147
  """Logic for master daemon shutdown.
148

149
  """
150
  #: How long to wait between checks
151
  _CHECK_INTERVAL = 5.0
152

    
153
  #: How long to wait after all jobs are done (e.g. to give clients time to
154
  #: retrieve the job status)
155
  _SHUTDOWN_LINGER = 5.0
156

    
157
  def __init__(self):
158
    """Initializes this class.
159

160
    """
161
    self._had_active_jobs = None
162
    self._linger_timeout = None
163

    
164
  def __call__(self, jq_prepare_result):
165
    """Determines if master daemon is ready for shutdown.
166

167
    @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown}
168
    @rtype: None or number
169
    @return: None if master daemon is ready, timeout if the check must be
170
             repeated
171

172
    """
173
    if jq_prepare_result:
174
      # Check again shortly
175
      logging.info("Job queue has been notified for shutdown but is still"
176
                   " busy; next check in %s seconds", self._CHECK_INTERVAL)
177
      self._had_active_jobs = True
178
      return self._CHECK_INTERVAL
179

    
180
    if not self._had_active_jobs:
181
      # Can shut down as there were no active jobs on the first check
182
      return None
183

    
184
    # No jobs are running anymore, but maybe some clients want to collect some
185
    # information. Give them a short amount of time.
186
    if self._linger_timeout is None:
187
      self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True)
188

    
189
    remaining = self._linger_timeout.Remaining()
190

    
191
    logging.info("Job queue no longer busy; shutting down master daemon"
192
                 " in %s seconds", remaining)
193

    
194
    # TODO: Should the master daemon socket be closed at this point? Doing so
195
    # wouldn't affect existing connections.
196

    
197
    if remaining < 0:
198
      return None
199
    else:
200
      return remaining
201

    
202

    
203
class MasterServer(daemon.AsyncStreamServer):
204
  """Master Server.
205

206
  This is the main asynchronous master server. It handles connections to the
207
  master socket.
208

209
  """
210
  family = socket.AF_UNIX
211

    
212
  def __init__(self, address, uid, gid):
213
    """MasterServer constructor
214

215
    @param address: the unix socket address to bind the MasterServer to
216
    @param uid: The uid of the owner of the socket
217
    @param gid: The gid of the owner of the socket
218

219
    """
220
    temp_name = tempfile.mktemp(dir=os.path.dirname(address))
221
    daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
222
    os.chmod(temp_name, 0770)
223
    os.chown(temp_name, uid, gid)
224
    os.rename(temp_name, address)
225

    
226
    self.awaker = daemon.AsyncAwaker()
227

    
228
    # We'll only start threads once we've forked.
229
    self.context = None
230
    self.request_workers = None
231

    
232
    self._shutdown_check = None
233

    
234
  def handle_connection(self, connected_socket, client_address):
235
    # TODO: add connection count and limit the number of open connections to a
236
    # maximum number to avoid breaking for lack of file descriptors or memory.
237
    MasterClientHandler(self, connected_socket, client_address, self.family)
238

    
239
  def setup_queue(self):
240
    self.context = GanetiContext()
241
    self.request_workers = workerpool.WorkerPool("ClientReq",
242
                                                 CLIENT_REQUEST_WORKERS,
243
                                                 ClientRequestWorker)
244

    
245
  def WaitForShutdown(self):
246
    """Prepares server for shutdown.
247

248
    """
249
    if self._shutdown_check is None:
250
      self._shutdown_check = _MasterShutdownCheck()
251

    
252
    return self._shutdown_check(self.context.jobqueue.PrepareShutdown())
253

    
254
  def server_cleanup(self):
255
    """Cleanup the server.
256

257
    This involves shutting down the processor threads and the master
258
    socket.
259

260
    """
261
    try:
262
      self.close()
263
    finally:
264
      if self.request_workers:
265
        self.request_workers.TerminateWorkers()
266
      if self.context:
267
        self.context.jobqueue.Shutdown()
268

    
269

    
270
class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    """Initializes this class.

    @param server: server object whose C{context} attribute is used for
        handling requests

    """
    self.server = server

  def _LoadOps(self, job_def): # pylint: disable=R0201
    """Converts a list of serialized opcodes into opcode objects.

    """
    return [opcodes.OpCode.LoadOpCode(state) for state in job_def]

  def _RejectLocking(self, use_locking): # pylint: disable=R0201
    """Raises an error if a query asks for locking.

    @raise errors.OpPrereqError: if C{use_locking} is true

    """
    if use_locking:
      raise errors.OpPrereqError("Sync queries are not allowed",
                                 errors.ECODE_INVAL)

  def handle_request(self, method, args): # pylint: disable=R0911
    """Executes a single LUXI request.

    @param method: request name, one of C{luxi.REQ_ALL}
    @type args: tuple or list
    @param args: request arguments; their number depends on C{method}
    @return: result of the operation; None for job pickup requests
    @raise ValueError: if C{args} is not a tuple or list, or if C{method} is
        not in C{luxi.REQ_ALL}
    @raise errors.ProgrammerError: if C{method} is in C{luxi.REQ_ALL} but not
        handled here

    """
    context = self.server.context
    queue = context.jobqueue

    # TODO: Parameter validation
    if not isinstance(args, (tuple, list)):
      logging.info("Received invalid arguments of type '%s'", type(args))
      raise ValueError("Invalid arguments type '%s'" % type(args))

    if method not in luxi.REQ_ALL:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

    # Every handled request leaves the function from within its own block

    # Job submission and pickup
    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Receiving new job")
      (job_def, ) = args
      ops = self._LoadOps(job_def)
      job_id = queue.SubmitJob(ops)
      _LogNewJob(True, job_id, ops)
      return job_id

    if method == luxi.REQ_PICKUP_JOB:
      logging.info("Picking up new job from queue")
      (job_id, ) = args
      queue.PickupJob(job_id)
      return None

    if method == luxi.REQ_SUBMIT_JOB_TO_DRAINED_QUEUE:
      logging.info("Forcefully receiving new job")
      (job_def, ) = args
      ops = self._LoadOps(job_def)
      job_id = queue.SubmitJobToDrainedQueue(ops)
      _LogNewJob(True, job_id, ops)
      return job_id

    if method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Receiving multiple jobs")
      (job_defs, ) = args
      jobs = [self._LoadOps(job_def) for job_def in job_defs]
      job_ids = queue.SubmitManyJobs(jobs)
      for ((status, job_id), ops) in zip(job_ids, jobs):
        _LogNewJob(status, job_id, ops)
      return job_ids

    # Operations on existing jobs
    if method == luxi.REQ_CANCEL_JOB:
      (job_id, ) = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    if method == luxi.REQ_CHANGE_JOB_PRIORITY:
      (job_id, priority) = args
      logging.info("Received request to change priority for job %s to %s",
                   job_id, priority)
      return queue.ChangeJobPriority(job_id, priority)

    if method == luxi.REQ_ARCHIVE_JOB:
      (job_id, ) = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    if method == luxi.REQ_AUTO_ARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    if method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    # Generic queries
    if method == luxi.REQ_QUERY:
      (what, fields, qfilter) = args

      if what in constants.QR_VIA_OP:
        return self._Query(opcodes.OpQuery(what=what, fields=fields,
                                           qfilter=qfilter))

      if what == constants.QR_LOCK:
        if qfilter is not None:
          raise errors.OpPrereqError("Lock queries can't be filtered",
                                     errors.ECODE_INVAL)
        return context.glm.QueryLocks(fields)

      if what == constants.QR_JOB:
        return queue.QueryJobs(fields, qfilter)

      if what in constants.QR_VIA_LUXI:
        raise NotImplementedError

      raise errors.OpPrereqError("Resource type '%s' unknown" % what,
                                 errors.ECODE_INVAL)

    if method == luxi.REQ_QUERY_FIELDS:
      (what, fields) = args
      req = objects.QueryFieldsRequest(what=what, fields=fields)

      try:
        fielddefs = query.ALL_FIELDS[req.what]
      except KeyError:
        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
                                   errors.ECODE_INVAL)

      return query.QueryFields(fielddefs, req.fields)

    # Old-style queries
    if method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = utils.CommaJoin(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.OldStyleQueryJobs(job_ids, fields)

    if method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      self._RejectLocking(use_locking)
      return self._Query(opcodes.OpInstanceQuery(names=names,
                                                 output_fields=fields,
                                                 use_locking=use_locking))

    if method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      self._RejectLocking(use_locking)
      return self._Query(opcodes.OpNodeQuery(names=names,
                                             output_fields=fields,
                                             use_locking=use_locking))

    if method == luxi.REQ_QUERY_GROUPS:
      (names, fields, use_locking) = args
      logging.info("Received group query request for %s", names)
      self._RejectLocking(use_locking)
      return self._Query(opcodes.OpGroupQuery(names=names,
                                              output_fields=fields))

    if method == luxi.REQ_QUERY_NETWORKS:
      (names, fields, use_locking) = args
      logging.info("Received network query request for %s", names)
      self._RejectLocking(use_locking)
      return self._Query(opcodes.OpNetworkQuery(names=names,
                                                output_fields=fields))

    if method == luxi.REQ_QUERY_EXPORTS:
      (nodes, use_locking) = args
      # Unlike the other queries this one is only logged if it is allowed
      self._RejectLocking(use_locking)
      logging.info("Received exports query request")
      return self._Query(opcodes.OpBackupQuery(nodes=nodes,
                                               use_locking=use_locking))

    if method == luxi.REQ_QUERY_CONFIG_VALUES:
      (fields, ) = args
      logging.info("Received config values query request for %s", fields)
      return self._Query(opcodes.OpClusterConfigQuery(output_fields=fields))

    if method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      return self._Query(opcodes.OpClusterQuery())

    if method == luxi.REQ_QUERY_TAGS:
      (kind, name) = args
      logging.info("Received tags query request")
      return self._Query(opcodes.OpTagsGet(kind=kind, name=name,
                                           use_locking=False))

    # Queue and watcher settings
    if method == luxi.REQ_SET_DRAIN_FLAG:
      (drain_flag, ) = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    if method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      return _SetWatcherPause(context, until)

    logging.critical("Request '%s' in luxi.REQ_ALL, but not known", method)
    raise errors.ProgrammerError("Operation '%s' in luxi.REQ_ALL,"
                                 " but not implemented" % method)

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    @param op: opcode to execute
    @return: whatever the processor returns for the opcode

    """
    # Queries don't have a job id
    processor = mcpu.Processor(self.server.context, None, enable_locks=False)

    # TODO: Executing an opcode using locks will acquire them in blocking mode.
    # Consider using a timeout for retries.
    return processor.ExecOpCode(op, None)
482

    
483

    
484
class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # pylint: disable=W0212
  # we do want to ensure a singleton here
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only a GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    cls = self.__class__
    assert cls._instance is None, "double GanetiContext instance"

    # Create global configuration object
    cfg = config.ConfigWriter()
    self.cfg = cfg

    # Locking manager, seeded with the names currently in the configuration
    node_list = cfg.GetNodeList()
    group_list = cfg.GetNodeGroupList()
    instance_names = [inst.name
                      for inst in cfg.GetAllInstancesInfo().values()]
    network_list = cfg.GetNetworkList()
    self.glm = locking.GanetiLockManager(node_list, group_list,
                                         instance_names, network_list)

    cfg.SetContext(self)

    # RPC runner
    self.rpc = rpc.RpcRunner(cfg, self.glm.AddToLockMonitor)

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    cls._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node, ec_id):
    """Adds a node to the configuration and lock manager.

    @param node: node object; its C{uuid} is used as lock name
    @param ec_id: passed on to the configuration along with the node

    """
    # Add it to the configuration
    self.cfg.AddNode(node, ec_id)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    for level in (locking.LEVEL_NODE, locking.LEVEL_NODE_RES):
      self.glm.add(level, node.uuid)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration

    @param node: node object handed to the job queue

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, node):
    """Removes a node from the configuration and lock manager.

    @param node: node object; configuration and lock manager use its
        C{uuid}, the job queue its C{name}

    """
    # Remove node from configuration
    self.cfg.RemoveNode(node.uuid)

    # Notify job queue
    self.jobqueue.RemoveNode(node.name)

    # Remove the node from the Ganeti Lock Manager
    for level in (locking.LEVEL_NODE, locking.LEVEL_NODE_RES):
      self.glm.remove(level, node.uuid)
565

    
566

    
567
def _SetWatcherPause(context, until):
  """Creates or removes the watcher pause file.

  @type context: L{GanetiContext}
  @param context: Global Ganeti context
  @type until: None or int
  @param until: Unix timestamp saying until when the watcher shouldn't run
  @return: the C{until} value that was passed in
  @raise TypeError: if C{until} is neither None nor a number
  @raise errors.GenericError: if C{until} lies in the past
  @raise errors.OpExecError: if the request failed on a node which is not
      marked offline

  """
  node_names = context.cfg.GetNodeList()

  if until is not None:
    if not ht.TNumber(until):
      raise TypeError("Duration must be numeric")

    if until < time.time():
      raise errors.GenericError("Unable to set pause end time in the past")

    logging.info("Received request to pause watcher until %s", until)
  else:
    logging.info("Received request to no longer pause watcher")

  result = context.rpc.call_set_watcher_pause(node_names, until)

  # Failures reported for offline nodes are ignored
  failures = ["%s (%s)" % (node_name, nres.fail_msg)
              for (node_name, nres) in result.items()
              if nres.fail_msg and not nres.offline]
  errmsg = utils.CommaJoin(failures)
  if errmsg:
    raise errors.OpExecError("Watcher pause was set where possible, but failed"
                             " on the following node(s): %s" % errmsg)

  return until
599

    
600

    
601
@rpc.RunWithRPC
def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most of the cases we are the master,
  we'll use our own config file for getting the node list. In the
  future we could collect the current node list from our (possibly
  obsolete) known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are more out-of-sync, for now manual startup of the master
  should be attempted.

  Note that for an even number of nodes cluster, we need at least half
  of the nodes (beside ourselves) to vote for us. This creates a
  problem on two-node clusters, since in this case we require the
  other node to be up too to confirm our status.

  @rtype: boolean
  @return: True if this node may act as master, False otherwise

  """
  myself = netutils.Hostname.GetSysName()
  # Temporary config writer, needed only to obtain the node list
  cfg = config.ConfigWriter()
  node_names = cfg.GetNodeNames(cfg.GetNodeList())
  del cfg

  # Six attempts, each unsuccessful one followed by a ten second pause
  for _ in range(6):
    votes = bootstrap.GatherMasterVotes(node_names)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is not None:
      break
    time.sleep(10)
  else:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    logging.critical("Use the --no-voting option if you understand what"
                     " effects it has on the cluster state")
    return False

  # here a real node is at the top of the list
  (top_node, top_votes) = votes[0]
  all_votes = sum(item[1] for item in votes)

  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
    return False

  votes_against = all_votes - top_votes
  if top_votes < votes_against:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, votes_against)
    return False

  return True
660

    
661

    
662
@rpc.RunWithRPC
def ActivateMasterIP():
  """Requests activation of the master IP address.

  The request goes to the node named by the master network parameters of the
  configuration. A failure is logged, nothing is returned or raised for it.

  """
  cfg = config.ConfigWriter()
  net_params = cfg.GetMasterNetworkParameters()
  use_external_script = cfg.GetUseExternalMipScript()
  runner = rpc.BootstrapRunner()
  # we use the node name, as the configuration is only available here yet
  master_name = cfg.GetNodeName(net_params.uuid)
  result = runner.call_node_activate_master_ip(master_name, net_params,
                                               use_external_script)

  failure = result.fail_msg
  if failure:
    logging.error("Can't activate master IP address: %s", failure)
676

    
677

    
678
def CheckMasterd(options, args):
679
  """Initial checks whether to run or exit with a failure.
680

681
  """
682
  if args: # masterd doesn't take any arguments
683
    print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
684
    sys.exit(constants.EXIT_FAILURE)
685

    
686
  ssconf.CheckMaster(options.debug)
687

    
688
  try:
689
    options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
690
    options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
691
  except KeyError:
692
    print >> sys.stderr, ("User or group not existing on system: %s:%s" %
693
                          (constants.MASTERD_USER, constants.DAEMONS_GROUP))
694
    sys.exit(constants.EXIT_FAILURE)
695

    
696
  # Determine static runtime architecture information
697
  runtime.InitArchInfo()
698

    
699
  # Check the configuration is sane before anything else
700
  try:
701
    config.ConfigWriter()
702
  except errors.ConfigVersionMismatch, err:
703
    v1 = "%s.%s.%s" % version.SplitVersion(err.args[0])
704
    v2 = "%s.%s.%s" % version.SplitVersion(err.args[1])
705
    print >> sys.stderr,  \
706
        ("Configuration version mismatch. The current Ganeti software"
707
         " expects version %s, but the on-disk configuration file has"
708
         " version %s. This is likely the result of upgrading the"
709
         " software without running the upgrade procedure. Please contact"
710
         " your cluster administrator or complete the upgrade using the"
711
         " cfgupgrade utility, after reading the upgrade notes." %
712
         (v1, v2))
713
    sys.exit(constants.EXIT_FAILURE)
714
  except errors.ConfigurationError, err:
715
    print >> sys.stderr, \
716
        ("Configuration error while opening the configuration file: %s\n"
717
         "This might be caused by an incomplete software upgrade or"
718
         " by a corrupted configuration file. Until the problem is fixed"
719
         " the master daemon cannot start." % str(err))
720
    sys.exit(constants.EXIT_FAILURE)
721

    
722
  # If CheckMaster didn't fail we believe we are the master, but we have to
723
  # confirm with the other nodes.
724
  if options.no_voting:
725
    if not options.yes_do_it:
726
      sys.stdout.write("The 'no voting' option has been selected.\n")
727
      sys.stdout.write("This is dangerous, please confirm by"
728
                       " typing uppercase 'yes': ")
729
      sys.stdout.flush()
730

    
731
      confirmation = sys.stdin.readline().strip()
732
      if confirmation != "YES":
733
        print >> sys.stderr, "Aborting."
734
        sys.exit(constants.EXIT_FAILURE)
735

    
736
  else:
737
    # CheckAgreement uses RPC and threads, hence it needs to be run in
738
    # a separate process before we call utils.Daemonize in the current
739
    # process.
740
    if not utils.RunInSeparateProcess(CheckAgreement):
741
      sys.exit(constants.EXIT_FAILURE)
742

    
743
  # ActivateMasterIP also uses RPC/threads, so we run it again via a
744
  # separate process.
745

    
746
  # TODO: decide whether failure to activate the master IP is a fatal error
747
  utils.RunInSeparateProcess(ActivateMasterIP)
748

    
749

    
750
def PrepMasterd(options, _):
  """Prep master daemon function, executed with the PID file held.

  @param options: parsed command line options; C{uid} and C{gid} are used
      for the ownership of the master socket
  @rtype: tuple
  @return: (mainloop, master server)

  """
  socket_path = pathutils.MASTER_SOCKET

  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(socket_path)

  loop = daemon.Mainloop()
  server = MasterServer(socket_path, options.uid, options.gid)
  return (loop, server)
761

    
762

    
763
def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
  """Main master daemon function, executed with the PID file held.

  @param options: unused
  @param args: unused
  @param prep_data: (mainloop, master server) tuple as returned by
      L{PrepMasterd}

  """
  (loop, server) = prep_data

  # Each layer is torn down in reverse order of its setup, even on errors
  try:
    rpc.Init()
    try:
      server.setup_queue()
      try:
        loop.Run(shutdown_wait_fn=server.WaitForShutdown)
      finally:
        server.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemoveFile(pathutils.MASTER_SOCKET)

  logging.info("Clean master daemon shutdown")
782

    
783

    
784
def Main():
  """Main function

  Builds the option parser for the master daemon and hands control over to
  L{daemon.GenericMain}.

  """
  version_text = "%%prog (ganeti) %s" % constants.RELEASE_VERSION
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version=version_text)

  no_voting_help = ("Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally")
  parser.add_option("--no-voting", dest="no_voting", help=no_voting_help,
                    default=False, action="store_true")

  yes_do_it_help = "Override interactive check for --no-voting"
  parser.add_option("--yes-do-it", dest="yes_do_it", help=yes_do_it_help,
                    default=False, action="store_true")

  daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
                     ExecMasterd, multithreaded=True)