#
#

# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Master daemon program.

Some classes deviate from the standard style guide because inheritance
from parent classes requires it.

"""

# pylint: disable-msg=C0103
# C0103: Invalid name ganeti-masterd

import grp
import os
import pwd
import sys
import socket
import time
import tempfile
import logging

from optparse import OptionParser

from ganeti import config
from ganeti import constants
from ganeti import daemon
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import workerpool
from ganeti import rpc
from ganeti import bootstrap
from ganeti import netutils
from ganeti import objects
from ganeti import query


CLIENT_REQUEST_WORKERS = 16

EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR

class ClientRequestWorker(workerpool.BaseWorker):
  # pylint: disable-msg=W0221
  def RunTask(self, server, message, client):
    """Process the request.

    """
    client_ops = ClientOps(server)

    try:
      (method, args, version) = luxi.ParseRequest(message)
    except luxi.ProtocolError, err:
      logging.error("Protocol Error: %s", err)
      client.close_log()
      return

    success = False
    try:
      # Verify client's version if there was one in the request
      if version is not None and version != constants.LUXI_VERSION:
        raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
                               (constants.LUXI_VERSION, version))

      result = client_ops.handle_request(method, args)
      success = True
    except errors.GenericError, err:
      logging.exception("Unexpected exception")
      success = False
      result = errors.EncodeException(err)
    except:
      logging.exception("Unexpected exception")
      err = sys.exc_info()
      result = "Caught exception: %s" % str(err[1])

    try:
      reply = luxi.FormatResponse(success, result)
      client.send_message(reply)
      # awake the main thread so that it can write out the data.
      server.awaker.signal()
    except: # pylint: disable-msg=W0702
      logging.exception("Send error")
      client.close_log()
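

# A LUXI exchange as handled by RunTask above, sketched for reference;
# the exact encoding, field names and message terminator are defined by
# luxi.ParseRequest/luxi.FormatResponse and constants.LUXI_EOM:
#
#   request:  {"method": "QueryClusterInfo", "args": [], "version": <LUXI_VERSION>}
#   response: {"success": true, "result": {...}}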


class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
  """Handler for master peers.

  """
  _MAX_UNHANDLED = 1

  def __init__(self, server, connected_socket, client_address, family):
    daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
                                                 client_address,
                                                 constants.LUXI_EOM,
                                                 family, self._MAX_UNHANDLED)
    self.server = server

  def handle_message(self, message, _):
    self.server.request_workers.AddTask((self.server, message, self))


class MasterServer(daemon.AsyncStreamServer):
  """Master Server.

  This is the main asynchronous master server. It handles connections to the
  master socket.

  """
  family = socket.AF_UNIX

  def __init__(self, mainloop, address, uid, gid):
    """MasterServer constructor

    @type mainloop: ganeti.daemon.Mainloop
    @param mainloop: Mainloop used to poll for I/O events
    @param address: the unix socket address to bind the MasterServer to
    @param uid: The uid of the owner of the socket
    @param gid: The gid of the owner of the socket

    """
    temp_name = tempfile.mktemp(dir=os.path.dirname(address))
    daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
    os.chmod(temp_name, 0770)
    os.chown(temp_name, uid, gid)
    os.rename(temp_name, address)

    self.mainloop = mainloop
    self.awaker = daemon.AsyncAwaker()

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

  def handle_connection(self, connected_socket, client_address):
    # TODO: add connection count and limit the number of open connections to a
    # maximum number to avoid breaking for lack of file descriptors or memory.
    MasterClientHandler(self, connected_socket, client_address, self.family)

  def setup_queue(self):
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool("ClientReq",
                                                 CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.close()
    finally:
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.context:
        self.context.jobqueue.Shutdown()


class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server

  def handle_request(self, method, args): # pylint: disable-msg=R0911
    queue = self.server.context.jobqueue

    # TODO: Parameter validation

    # TODO: Rewrite to not exit in each 'if/elif' branch

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Received new job")
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
      return queue.SubmitJob(ops)

    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Received multiple jobs")
      jobs = []
      for ops in args:
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
      return queue.SubmitManyJobs(jobs)

    elif method == luxi.REQ_CANCEL_JOB:
      job_id = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_ARCHIVE_JOB:
      job_id = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY:
      req = objects.QueryRequest.FromDict(args)

      if req.what in constants.QR_VIA_OP:
        result = self._Query(opcodes.OpQuery(what=req.what, fields=req.fields,
                                             filter=req.filter))
      elif req.what == constants.QR_LOCK:
        if req.filter is not None:
          raise errors.OpPrereqError("Lock queries can't be filtered")
        return self.server.context.glm.QueryLocks(req.fields)
      elif req.what in constants.QR_VIA_LUXI:
        raise NotImplementedError
      else:
        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
                                   errors.ECODE_INVAL)

      return result

    elif method == luxi.REQ_QUERY_FIELDS:
      req = objects.QueryFieldsRequest.FromDict(args)

      try:
        fielddefs = query.ALL_FIELDS[req.what]
      except KeyError:
        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
                                   errors.ECODE_INVAL)

      return query.QueryFields(fielddefs, req.fields)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = utils.CommaJoin(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.QueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
                                   use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpNodeQuery(names=names, output_fields=fields,
                               use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_GROUPS:
      (names, fields, use_locking) = args
      logging.info("Received group query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpGroupQuery(names=names, output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      nodes, use_locking = args
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      logging.info("Received exports query request")
      op = opcodes.OpBackupQuery(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      fields = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpClusterConfigQuery(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpClusterQuery()
      return self._Query(op)

    elif method == luxi.REQ_QUERY_TAGS:
      kind, name = args
      logging.info("Received tags query request")
      op = opcodes.OpTagsGet(kind=kind, name=name)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_LOCKS:
      (fields, sync) = args
      logging.info("Received locks query request")
      if sync:
        raise NotImplementedError("Synchronous queries are not implemented")
      return self.server.context.glm.OldStyleQueryLocks(fields)

    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
      drain_flag = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    elif method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      if until is None:
        logging.info("Received request to no longer pause the watcher")
      else:
        if not isinstance(until, (int, float)):
          raise TypeError("Duration must be an integer or float")

        if until < time.time():
          raise errors.GenericError("Unable to set pause end time in the past")

        logging.info("Received request to pause the watcher until %s", until)

      return _SetWatcherPause(until)

    else:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    # Queries don't have a job id
    proc = mcpu.Processor(self.server.context, None)

    # TODO: Executing an opcode using locks will acquire them in blocking mode.
    # Consider using a timeout for retries.
    return proc.ExecOpCode(op, None)
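

# A client-side counterpart to the dispatch in handle_request, sketched
# under the assumption that the luxi.Client wrapper from ganeti/luxi.py
# is used (its method names map to the REQ_* constants handled above):
#
#   from ganeti import luxi, opcodes
#   cl = luxi.Client(address=constants.MASTER_SOCKET)
#   job_id = cl.SubmitJob([opcodes.OpClusterQuery()])   # REQ_SUBMIT_JOB
#   cl.QueryJobs([job_id], ["status"])                  # REQ_QUERY_JOBS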


class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # pylint: disable-msg=W0212
  # we do want to ensure a singleton here
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only one GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    self.glm = locking.GanetiLockManager(
                self.cfg.GetNodeList(),
                self.cfg.GetNodeGroupList(),
                self.cfg.GetInstanceList())

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node, ec_id):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node, ec_id)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration.

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)
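

# Note on the singleton enforcement above: while the first __init__ runs,
# the class attribute _instance is still None, so every attribute
# assignment passes the assert in __setattr__. The closing assignment to
# self.__class__._instance happens on the class, not the instance, so it
# bypasses the hook; afterwards any second GanetiContext() or attribute
# modification trips an assertion.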


def _SetWatcherPause(until):
  """Creates or removes the watcher pause file.

  @type until: None or int
  @param until: Unix timestamp saying until when the watcher shouldn't run

  """
  if until is None:
    utils.RemoveFile(constants.WATCHER_PAUSEFILE)
  else:
    utils.WriteFile(constants.WATCHER_PAUSEFILE,
                    data="%d\n" % (until, ))

  return until
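

# The pause file written above contains only the expiry timestamp and a
# newline, e.g. "1325376000\n"; ganeti-watcher is expected to read it and
# skip its run while time.time() is below that value.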


@rpc.RunWithRPC
def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most of the cases we are the master,
  we'll use our own config file for getting the node list. In the
  future we could collect the current node list from our (possibly
  obsolete) known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are further out of sync, manual startup of the master should be
  attempted for now.

  Note that for a cluster with an even number of nodes, we need at
  least half of the nodes (besides ourselves) to vote for us. This
  creates a problem on two-node clusters, since in this case we require
  the other node to be up too to confirm our status.

  """
  myself = netutils.Hostname.GetSysName()
  # Temporary instantiation of a config writer, used only to get the node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg
  retries = 6
  while retries > 0:
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is None:
      retries -= 1
      time.sleep(10)
      continue
    break
  if retries == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    logging.critical("Use the --no-voting option if you understand what"
                     " effects it has on the cluster state")
    return False
  # here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]

  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result
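

# Worked example of the final check above: in a four-node cluster the
# three other nodes are polled, so with all of them answering all_votes
# is 3, and we keep mastership only if top_node is ourselves and
# top_votes >= all_votes - top_votes, i.e. at least two of the three
# votes name us. On a two-node cluster the single remote vote must name
# us, which is the limitation the docstring points out.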


@rpc.RunWithRPC
def ActivateMasterIP():
  # Activate the master IP address
  master_node = ssconf.SimpleStore().GetMasterNode()
  result = rpc.RpcRunner.call_node_start_master(master_node, False, False)
  msg = result.fail_msg
  if msg:
    logging.error("Can't activate master IP address: %s", msg)


def CheckMasterd(options, args):
  """Initial checks whether to run or exit with a failure.

  """
  if args: # masterd doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)

  ssconf.CheckMaster(options.debug)

  try:
    options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
    options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
  except KeyError:
    print >> sys.stderr, ("User or group does not exist on system: %s:%s" %
                          (constants.MASTERD_USER, constants.DAEMONS_GROUP))
    sys.exit(constants.EXIT_FAILURE)

  # Check the configuration is sane before anything else
  try:
    config.ConfigWriter()
  except errors.ConfigVersionMismatch, err:
    v1 = "%s.%s.%s" % constants.SplitVersion(err.args[0])
    v2 = "%s.%s.%s" % constants.SplitVersion(err.args[1])
    print >> sys.stderr, \
        ("Configuration version mismatch. The current Ganeti software"
         " expects version %s, but the on-disk configuration file has"
         " version %s. This is likely the result of upgrading the"
         " software without running the upgrade procedure. Please contact"
         " your cluster administrator or complete the upgrade using the"
         " cfgupgrade utility, after reading the upgrade notes." %
         (v1, v2))
    sys.exit(constants.EXIT_FAILURE)
  except errors.ConfigurationError, err:
    print >> sys.stderr, \
        ("Configuration error while opening the configuration file: %s\n"
         "This might be caused by an incomplete software upgrade or"
         " by a corrupted configuration file. Until the problem is fixed"
         " the master daemon cannot start." % str(err))
    sys.exit(constants.EXIT_FAILURE)

  # If CheckMaster didn't fail we believe we are the master, but we have to
  # confirm with the other nodes.
  if options.no_voting:
    if options.yes_do_it:
      return

    sys.stdout.write("The 'no voting' option has been selected.\n")
    sys.stdout.write("This is dangerous, please confirm by"
                     " typing uppercase 'yes': ")
    sys.stdout.flush()

    confirmation = sys.stdin.readline().strip()
    if confirmation != "YES":
      print >> sys.stderr, "Aborting."
      sys.exit(constants.EXIT_FAILURE)

    return

  # CheckAgreement uses RPC and threads, hence it needs to be run in a separate
  # process before we call utils.Daemonize in the current process.
  if not utils.RunInSeparateProcess(CheckAgreement):
    sys.exit(constants.EXIT_FAILURE)

  # ActivateMasterIP also uses RPC/threads, so we likewise run it in a
  # separate process.

  # TODO: decide whether failure to activate the master IP is a fatal error
  utils.RunInSeparateProcess(ActivateMasterIP)


def PrepMasterd(options, _):
  """Prep master daemon function, executed with the PID file held.

  """
  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(constants.MASTER_SOCKET)

  mainloop = daemon.Mainloop()
  master = MasterServer(mainloop, constants.MASTER_SOCKET,
                        options.uid, options.gid)
  return (mainloop, master)


def ExecMasterd(options, args, prep_data): # pylint: disable-msg=W0613
  """Main master daemon function, executed with the PID file held.

  """
  (mainloop, master) = prep_data
  try:
    rpc.Init()
    try:
      master.setup_queue()
      try:
        mainloop.Run()
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemoveFile(constants.MASTER_SOCKET)


def Main():
  """Main function"""
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  parser.add_option("--no-voting", dest="no_voting",
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally",
                    default=False, action="store_true")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    help="Override interactive check for --no-voting",
                    default=False, action="store_true")
  daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
                     ExecMasterd, multithreaded=True)
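

# Typical invocations, as a usage sketch (the -f/-d flags are the standard
# foreground/debug options handled by daemon.GenericMain, per the usage
# string above):
#
#   ganeti-masterd                            # check, vote, then daemonize
#   ganeti-masterd -f -d                      # stay in foreground, debug logging
#   ganeti-masterd --no-voting --yes-do-it    # skip the master vote (dangerous)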