#
#

# Copyright (C) 2006, 2007, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Master daemon program.

Some classes deviate from the standard style guide since the
inheritance from parent classes requires it.

"""

# pylint: disable-msg=C0103
# C0103: Invalid name ganeti-masterd

import grp
import os
import pwd
import sys
import socket
import time
import tempfile
import logging

from optparse import OptionParser

from ganeti import config
from ganeti import constants
from ganeti import daemon
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import workerpool
from ganeti import rpc
from ganeti import bootstrap
from ganeti import netutils


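# Number of worker threads serving LUXI client requests (the pool is
# created in MasterServer.setup_queue below)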
CLIENT_REQUEST_WORKERS = 16

EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR


class ClientRequestWorker(workerpool.BaseWorker):
  # pylint: disable-msg=W0221
  def RunTask(self, server, message, client):
    """Process the request.

    """
    client_ops = ClientOps(server)

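    # The message decodes to a (method, args, version) triple; e.g. the
    # method for a job submission is the string "SubmitJob" (illustrative
    # only; see luxi.ParseRequest for the actual wire format).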
    try:
      (method, args, version) = luxi.ParseRequest(message)
    except luxi.ProtocolError, err:
      logging.error("Protocol Error: %s", err)
      client.close_log()
      return

    success = False
    try:
      # Verify the client's version if there was one in the request
      if version is not None and version != constants.LUXI_VERSION:
        raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
                               (constants.LUXI_VERSION, version))

      result = client_ops.handle_request(method, args)
      success = True
    except errors.GenericError, err:
      logging.exception("Unexpected exception")
      success = False
      result = errors.EncodeException(err)
    except:
      logging.exception("Unexpected exception")
      err = sys.exc_info()
      result = "Caught exception: %s" % str(err[1])

    try:
      reply = luxi.FormatResponse(success, result)
      client.send_message(reply)
      # Wake up the main thread so that it can write out the data.
      server.awaker.signal()
    except: # pylint: disable-msg=W0702
      logging.exception("Send error")
      client.close_log()


class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
  """Handler for master peers.

  """
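  # Bound on received-but-unhandled messages per client; the stream
  # stops reading from the socket until the pending message has been
  # processed (behaviour provided by daemon.AsyncTerminatedMessageStream).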
  _MAX_UNHANDLED = 1

  def __init__(self, server, connected_socket, client_address, family):
    daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
                                                 client_address,
                                                 constants.LUXI_EOM,
                                                 family, self._MAX_UNHANDLED)
    self.server = server

  def handle_message(self, message, _):
    self.server.request_workers.AddTask((self.server, message, self))


class MasterServer(daemon.AsyncStreamServer):
  """Master Server.

  This is the main asynchronous master server. It handles connections to the
  master socket.

  """
  family = socket.AF_UNIX

  def __init__(self, mainloop, address, uid, gid):
    """MasterServer constructor

    @type mainloop: ganeti.daemon.Mainloop
    @param mainloop: Mainloop used to poll for I/O events
    @param address: the unix socket address to bind the MasterServer to
    @param uid: The uid of the owner of the socket
    @param gid: The gid of the owner of the socket

    """
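    # Create the socket under a temporary name, fix its ownership and
    # permissions, then atomically rename it into place so clients never
    # observe the socket with the wrong mode.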
    temp_name = tempfile.mktemp(dir=os.path.dirname(address))
    daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
    os.chmod(temp_name, 0770)
    os.chown(temp_name, uid, gid)
    os.rename(temp_name, address)

    self.mainloop = mainloop
    self.awaker = daemon.AsyncAwaker()

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

  def handle_connection(self, connected_socket, client_address):
    # TODO: add connection count and limit the number of open connections to a
    # maximum number to avoid breaking for lack of file descriptors or memory.
    MasterClientHandler(self, connected_socket, client_address, self.family)

  def setup_queue(self):
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool("ClientReq",
                                                 CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.close()
    finally:
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.context:
        self.context.jobqueue.Shutdown()


class ClientOps:
  """Class holding high-level client operations."""

  def __init__(self, server):
    self.server = server

  def handle_request(self, method, args): # pylint: disable-msg=R0911
    queue = self.server.context.jobqueue

    # TODO: Parameter validation

    # TODO: Rewrite to not exit in each 'if/elif' branch

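    # Dispatch on the LUXI method name; each branch returns its result
    # directly and the calling worker wraps it in a LUXI response.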
    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Received new job")
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
      return queue.SubmitJob(ops)

    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Received multiple jobs")
      jobs = []
      for ops in args:
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
      return queue.SubmitManyJobs(jobs)

    elif method == luxi.REQ_CANCEL_JOB:
      job_id = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_ARCHIVE_JOB:
      job_id = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = utils.CommaJoin(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.QueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpQueryInstances(names=names, output_fields=fields,
                                    use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpQueryNodes(names=names, output_fields=fields,
                                use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_GROUPS:
      (names, fields, use_locking) = args
      logging.info("Received group query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpQueryGroups(names=names, output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      (nodes, use_locking) = args
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      logging.info("Received exports query request")
      op = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      fields = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpQueryConfigValues(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpQueryClusterInfo()
      return self._Query(op)

    elif method == luxi.REQ_QUERY_TAGS:
      (kind, name) = args
      logging.info("Received tags query request")
      op = opcodes.OpGetTags(kind=kind, name=name)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_LOCKS:
      (fields, sync) = args
      logging.info("Received locks query request")
      return self.server.context.glm.QueryLocks(fields, sync)

    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
      drain_flag = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    elif method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      if until is None:
        logging.info("Received request to no longer pause the watcher")
      else:
        if not isinstance(until, (int, float)):
          raise TypeError("Duration must be an integer or float")

        if until < time.time():
          raise errors.GenericError("Unable to set pause end time in the past")

        logging.info("Received request to pause the watcher until %s", until)

      return _SetWatcherPause(until)

    else:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    # Queries don't have a job id
    proc = mcpu.Processor(self.server.context, None)

    # TODO: Executing an opcode using locks will acquire them in blocking mode.
    # Consider using a timeout for retries.
    return proc.ExecOpCode(op, None)


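# Illustrative sketch (not part of this module): how a LUXI client would
# reach the handlers above. luxi.Client is the standard client-side entry
# point; the exact constructor arguments may differ between versions.
#
#   from ganeti import luxi
#   cli = luxi.Client()            # connects to the master socket
#   info = cli.QueryClusterInfo()  # served via REQ_QUERY_CLUSTER_INFO

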
class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # pylint: disable-msg=W0212
  # We do want to ensure a singleton here
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only one GanetiContext object at any time, so this
    function raises an error if that is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create the global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    self.glm = locking.GanetiLockManager(
                self.cfg.GetNodeList(),
                self.cfg.GetNodeGroupList(),
                self.cfg.GetInstanceList())

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # Setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node, ec_id):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node, ec_id)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration.

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)


def _SetWatcherPause(until):
  """Creates or removes the watcher pause file.

  @type until: None or int
  @param until: Unix timestamp saying until when the watcher shouldn't run

  """
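  # ganeti-watcher consults this file and skips its runs while the
  # stored timestamp lies in the future (enforced by the watcher itself,
  # not here).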
  if until is None:
    utils.RemoveFile(constants.WATCHER_PAUSEFILE)
  else:
    utils.WriteFile(constants.WATCHER_PAUSEFILE,
                    data="%d\n" % (until, ))

  return until


@rpc.RunWithRPC
def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most cases we are the master, we use
  our own configuration file to get the node list. In the future we
  could collect the current node list from our (possibly obsolete)
  known nodes.

  In order to account for a cold start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are badly out of sync, manual startup of the master should be
  attempted for now.

  Note that for a cluster with an even number of nodes, we need at
  least half of the nodes (besides ourselves) to vote for us. This
  creates a problem on two-node clusters, since in this case we require
  the other node to be up too in order to confirm our status.

  """
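  # Worked example: if we are top-voted with 2 of 5 votes (the other 3
  # split among other nodes), then top_votes (2) < all_votes - top_votes
  # (3) below, and we refuse to start.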
  myself = netutils.Hostname.GetSysName()
  # Temporary instantiation of a config writer, used only to get the node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg
  retries = 6
  while retries > 0:
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # Empty node list, this is a one-node cluster
      return True
    if votes[0][0] is None:
      retries -= 1
      time.sleep(10)
      continue
    break
  if retries == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    logging.critical("Use the --no-voting option if you understand what"
                     " effects it has on the cluster state")
    return False
  # Here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
  (top_node, top_votes) = votes[0]

  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result


@rpc.RunWithRPC
def ActivateMasterIP():
  """Activates the master IP address on the master node.

  """
  master_node = ssconf.SimpleStore().GetMasterNode()
  result = rpc.RpcRunner.call_node_start_master(master_node, False, False)
  msg = result.fail_msg
  if msg:
    logging.error("Can't activate master IP address: %s", msg)


def CheckMasterd(options, args):
  """Initial checks whether to run or exit with a failure.

  """
  if args: # masterd doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)

  ssconf.CheckMaster(options.debug)

  try:
    options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
    options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
  except KeyError:
    print >> sys.stderr, ("User or group does not exist on this system: %s:%s" %
                          (constants.MASTERD_USER, constants.DAEMONS_GROUP))
    sys.exit(constants.EXIT_FAILURE)

  # Check that the configuration is sane before anything else
  try:
    config.ConfigWriter()
  except errors.ConfigVersionMismatch, err:
    v1 = "%s.%s.%s" % constants.SplitVersion(err.args[0])
    v2 = "%s.%s.%s" % constants.SplitVersion(err.args[1])
    print >> sys.stderr, \
        ("Configuration version mismatch. The current Ganeti software"
         " expects version %s, but the on-disk configuration file has"
         " version %s. This is likely the result of upgrading the"
         " software without running the upgrade procedure. Please contact"
         " your cluster administrator or complete the upgrade using the"
         " cfgupgrade utility, after reading the upgrade notes." %
         (v1, v2))
    sys.exit(constants.EXIT_FAILURE)
  except errors.ConfigurationError, err:
    print >> sys.stderr, \
        ("Configuration error while opening the configuration file: %s\n"
         "This might be caused by an incomplete software upgrade or"
         " by a corrupted configuration file. Until the problem is fixed"
         " the master daemon cannot start." % str(err))
    sys.exit(constants.EXIT_FAILURE)

  # If CheckMaster didn't fail we believe we are the master, but we have to
  # confirm with the other nodes.
  if options.no_voting:
    if options.yes_do_it:
      return

    sys.stdout.write("The 'no voting' option has been selected.\n")
    sys.stdout.write("This is dangerous, please confirm by"
                     " typing uppercase 'YES': ")
    sys.stdout.flush()

    confirmation = sys.stdin.readline().strip()
    if confirmation != "YES":
      print >> sys.stderr, "Aborting."
      sys.exit(constants.EXIT_FAILURE)

    return

  # CheckAgreement uses RPC and threads, hence it needs to be run in a separate
  # process before we call utils.Daemonize in the current process.
  if not utils.RunInSeparateProcess(CheckAgreement):
    sys.exit(constants.EXIT_FAILURE)

  # ActivateMasterIP also uses RPC/threads, so we run it again via a
  # separate process.

  # TODO: decide whether failure to activate the master IP is a fatal error
  utils.RunInSeparateProcess(ActivateMasterIP)


def PrepMasterd(options, _):
  """Prep master daemon function, executed with the PID file held.

  """
  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(constants.MASTER_SOCKET)

  mainloop = daemon.Mainloop()
  master = MasterServer(mainloop, constants.MASTER_SOCKET,
                        options.uid, options.gid)
  return (mainloop, master)


def ExecMasterd(options, args, prep_data): # pylint: disable-msg=W0613
  """Main master daemon function, executed with the PID file held.

  """
  (mainloop, master) = prep_data
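  # The nested try/finally blocks below tear everything down in reverse
  # order of setup: worker pool and job queue first, then RPC, then the
  # master socket file.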
  try:
    rpc.Init()
    try:
      master.setup_queue()
      try:
        mainloop.Run()
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemoveFile(constants.MASTER_SOCKET)


def Main():
  """Main function"""
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  parser.add_option("--no-voting", dest="no_voting",
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally",
                    default=False, action="store_true")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    help="Override interactive check for --no-voting",
                    default=False, action="store_true")
  daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
                     ExecMasterd, multithreaded=True)