lib/server/masterd.py @ f2af0bec

#
#

# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Master daemon program.

Some classes deviate from the standard style guide since the
inheritance from parent classes requires it.

"""

# pylint: disable-msg=C0103
# C0103: Invalid name ganeti-masterd

import grp
import os
import pwd
import sys
import socket
import time
import tempfile
import logging

from optparse import OptionParser

from ganeti import config
from ganeti import constants
from ganeti import daemon
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import workerpool
from ganeti import rpc
from ganeti import bootstrap
from ganeti import netutils
from ganeti import objects
from ganeti import query


CLIENT_REQUEST_WORKERS = 16

EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR


class ClientRequestWorker(workerpool.BaseWorker):
  # pylint: disable-msg=W0221
  def RunTask(self, server, message, client):
    """Process the request.

    """
    client_ops = ClientOps(server)
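
    # A LUXI request is, roughly, a JSON document along the lines of
    #   {"method": "...", "args": ..., "version": ...}
    # and the reply built by luxi.FormatResponse below mirrors it with
    # "success" and "result" fields (illustrative sketch only; ganeti.luxi
    # is the authoritative definition of the wire format).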

    try:
      (method, args, version) = luxi.ParseRequest(message)
    except luxi.ProtocolError, err:
      logging.error("Protocol error: %s", err)
      client.close_log()
      return

    success = False
    try:
      # Verify client's version if there was one in the request
      if version is not None and version != constants.LUXI_VERSION:
        raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
                               (constants.LUXI_VERSION, version))

      result = client_ops.handle_request(method, args)
      success = True
    except errors.GenericError, err:
      logging.exception("Unexpected exception")
      success = False
      result = errors.EncodeException(err)
    except:
      logging.exception("Unexpected exception")
      err = sys.exc_info()
      result = "Caught exception: %s" % str(err[1])

    try:
      reply = luxi.FormatResponse(success, result)
      client.send_message(reply)
      # wake up the main thread so that it can write out the data.
      server.awaker.signal()
    except: # pylint: disable-msg=W0702
      logging.exception("Send error")
      client.close_log()


class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
  """Handler for master peers.

  """
  _MAX_UNHANDLED = 1

  def __init__(self, server, connected_socket, client_address, family):
    daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
                                                 client_address,
                                                 constants.LUXI_EOM,
                                                 family, self._MAX_UNHANDLED)
    self.server = server

  def handle_message(self, message, _):
    self.server.request_workers.AddTask((self.server, message, self))


class MasterServer(daemon.AsyncStreamServer):
  """Master Server.

  This is the main asynchronous master server. It handles connections to the
  master socket.

  """
  family = socket.AF_UNIX

  def __init__(self, mainloop, address, uid, gid):
    """MasterServer constructor

    @type mainloop: ganeti.daemon.Mainloop
    @param mainloop: Mainloop used to poll for I/O events
    @param address: the unix socket address to bind the MasterServer to
    @param uid: The uid of the owner of the socket
    @param gid: The gid of the owner of the socket

    """
    temp_name = tempfile.mktemp(dir=os.path.dirname(address))
    daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
    os.chmod(temp_name, 0770)
    os.chown(temp_name, uid, gid)
    os.rename(temp_name, address)

    self.mainloop = mainloop
    self.awaker = daemon.AsyncAwaker()

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

  def handle_connection(self, connected_socket, client_address):
    # TODO: add connection count and limit the number of open connections to a
    # maximum number to avoid breaking for lack of file descriptors or memory.
    MasterClientHandler(self, connected_socket, client_address, self.family)

  def setup_queue(self):
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool("ClientReq",
                                                 CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def server_cleanup(self):
    """Clean up the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.close()
    finally:
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.context:
        self.context.jobqueue.Shutdown()


class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server

  def handle_request(self, method, args): # pylint: disable-msg=R0911
    queue = self.server.context.jobqueue

    # TODO: Parameter validation

    # TODO: Rewrite to not exit in each 'if/elif' branch
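    # The shape of "args" depends on the method; for REQ_SUBMIT_JOB, for
    # example, it is a list of serialized opcodes, roughly
    #   [{"OP_ID": "OP_CLUSTER_VERIFY", ...}, ...]
    # (illustrative values only; opcodes.OpCode.LoadOpCode below does the
    # actual decoding).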

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Received new job")
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
      return queue.SubmitJob(ops)

    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Received multiple jobs")
      jobs = []
      for ops in args:
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
      return queue.SubmitManyJobs(jobs)

    elif method == luxi.REQ_CANCEL_JOB:
      job_id = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_ARCHIVE_JOB:
      job_id = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY:
      req = objects.QueryRequest.FromDict(args)

      if req.what in constants.QR_OP_QUERY:
        result = self._Query(opcodes.OpQuery(what=req.what, fields=req.fields,
                                             filter=req.filter))
      elif req.what == constants.QR_LOCK:
        if req.filter is not None:
          raise errors.OpPrereqError("Lock queries can't be filtered")
        return self.server.context.glm.QueryLocks(req.fields)
      elif req.what in constants.QR_OP_LUXI:
        raise NotImplementedError
      else:
        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
                                   errors.ECODE_INVAL)

      return result

    elif method == luxi.REQ_QUERY_FIELDS:
      req = objects.QueryFieldsRequest.FromDict(args)

      if req.what in constants.QR_OP_QUERY:
        result = self._Query(opcodes.OpQueryFields(what=req.what,
                                                   fields=req.fields))
      elif req.what == constants.QR_LOCK:
        return query.QueryFields(query.LOCK_FIELDS, req.fields)
      elif req.what in constants.QR_OP_LUXI:
        raise NotImplementedError
      else:
        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
                                   errors.ECODE_INVAL)

      return result

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = utils.CommaJoin(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.QueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
                                   use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpQueryNodes(names=names, output_fields=fields,
                                use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_GROUPS:
      (names, fields, use_locking) = args
      logging.info("Received group query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpGroupQuery(names=names, output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      nodes, use_locking = args
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      logging.info("Received exports query request")
      op = opcodes.OpBackupQuery(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      fields = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpClusterConfigQuery(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpClusterQuery()
      return self._Query(op)

    elif method == luxi.REQ_QUERY_TAGS:
      kind, name = args
      logging.info("Received tags query request")
      op = opcodes.OpGetTags(kind=kind, name=name)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_LOCKS:
      (fields, sync) = args
      logging.info("Received locks query request")
      if sync:
        raise NotImplementedError("Synchronous queries are not implemented")
      return self.server.context.glm.OldStyleQueryLocks(fields)

    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
      drain_flag = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    elif method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      if until is None:
        logging.info("Received request to no longer pause the watcher")
      else:
        if not isinstance(until, (int, float)):
          raise TypeError("Duration must be an integer or float")

        if until < time.time():
          raise errors.GenericError("Unable to set pause end time in the past")

        logging.info("Received request to pause the watcher until %s", until)

      return _SetWatcherPause(until)

    else:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    # Queries don't have a job id
    proc = mcpu.Processor(self.server.context, None)

    # TODO: Executing an opcode using locks will acquire them in blocking mode.
    # Consider using a timeout for retries.
    return proc.ExecOpCode(op, None)


class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # pylint: disable-msg=W0212
  # we do want to ensure a singleton here
  _instance = None
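  # Illustrative usage sketch (not from this module): the first and only
  # instantiation succeeds, while any further instantiation or later
  # attribute assignment trips the assertions below:
  #   context = GanetiContext()
  #   GanetiContext()          # would fail the assert in __init__
  #   context.cfg = object()   # would fail the assert in __setattr__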

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only one GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    self.glm = locking.GanetiLockManager(
                self.cfg.GetNodeList(),
                self.cfg.GetNodeGroupList(),
                self.cfg.GetInstanceList())

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node, ec_id):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node, ec_id)

    # If preseeding fails it won't be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration.

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)


def _SetWatcherPause(until):
  """Creates or removes the watcher pause file.

  @type until: None or int
  @param until: Unix timestamp saying until when the watcher shouldn't run

  """
  if until is None:
    utils.RemoveFile(constants.WATCHER_PAUSEFILE)
  else:
    utils.WriteFile(constants.WATCHER_PAUSEFILE,
                    data="%d\n" % (until, ))

  return until


@rpc.RunWithRPC
def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most cases we are the master, we'll
  use our own config file for getting the node list. In the future we
  could collect the current node list from our (possibly obsolete)
  known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are too far out of sync, manual startup of the master should
  be attempted for now.

  Note that for a cluster with an even number of nodes, we need at
  least half of the nodes (besides ourselves) to vote for us. This
  creates a problem on two-node clusters, since in this case we
  require the other node to be up too to confirm our status.

  """
  myself = netutils.Hostname.GetSysName()
  # temp instantiation of a config writer, used only to get the node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg
  retries = 6
  while retries > 0:
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is None:
      retries -= 1
      time.sleep(10)
      continue
    break
  if retries == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    logging.critical("Use the --no-voting option if you understand what"
                     " effects it has on the cluster state")
    return False
  # here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]
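  # Worked example (illustrative): with votes from five nodes of which three
  # name us, top_votes is 3 and all_votes is 5; 3 >= 5 - 3, so we consider
  # ourselves the master. With only two out of five votes we would give up.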

  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result


@rpc.RunWithRPC
def ActivateMasterIP():
  # activate ip
  master_node = ssconf.SimpleStore().GetMasterNode()
  result = rpc.RpcRunner.call_node_start_master(master_node, False, False)
  msg = result.fail_msg
  if msg:
    logging.error("Can't activate master IP address: %s", msg)


def CheckMasterd(options, args):
  """Initial checks to decide whether to run or exit with a failure.

  """
  if args: # masterd doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)

  ssconf.CheckMaster(options.debug)

  try:
    options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
    options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
  except KeyError:
    print >> sys.stderr, ("User or group does not exist on system: %s:%s" %
                          (constants.MASTERD_USER, constants.DAEMONS_GROUP))
    sys.exit(constants.EXIT_FAILURE)

  # Check the configuration is sane before anything else
  try:
    config.ConfigWriter()
  except errors.ConfigVersionMismatch, err:
    v1 = "%s.%s.%s" % constants.SplitVersion(err.args[0])
    v2 = "%s.%s.%s" % constants.SplitVersion(err.args[1])
    print >> sys.stderr, \
        ("Configuration version mismatch. The current Ganeti software"
         " expects version %s, but the on-disk configuration file has"
         " version %s. This is likely the result of upgrading the"
         " software without running the upgrade procedure. Please contact"
         " your cluster administrator or complete the upgrade using the"
         " cfgupgrade utility, after reading the upgrade notes." %
         (v1, v2))
    sys.exit(constants.EXIT_FAILURE)
  except errors.ConfigurationError, err:
    print >> sys.stderr, \
        ("Configuration error while opening the configuration file: %s\n"
         "This might be caused by an incomplete software upgrade or"
         " by a corrupted configuration file. Until the problem is fixed"
         " the master daemon cannot start." % str(err))
    sys.exit(constants.EXIT_FAILURE)

  # If CheckMaster didn't fail we believe we are the master, but we have to
  # confirm with the other nodes.
  if options.no_voting:
    if options.yes_do_it:
      return

    sys.stdout.write("The 'no voting' option has been selected.\n")
    sys.stdout.write("This is dangerous, please confirm by"
                     " typing uppercase 'yes': ")
    sys.stdout.flush()

    confirmation = sys.stdin.readline().strip()
    if confirmation != "YES":
      print >> sys.stderr, "Aborting."
      sys.exit(constants.EXIT_FAILURE)

    return

  # CheckAgreement uses RPC and threads, hence it needs to be run in a separate
  # process before we call utils.Daemonize in the current process.
  if not utils.RunInSeparateProcess(CheckAgreement):
    sys.exit(constants.EXIT_FAILURE)

  # ActivateMasterIP also uses RPC/threads, so it is run in a separate
  # process as well.

  # TODO: decide whether failure to activate the master IP is a fatal error
  utils.RunInSeparateProcess(ActivateMasterIP)


def PrepMasterd(options, _):
  """Prep master daemon function, executed with the PID file held.

  """
  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(constants.MASTER_SOCKET)

  mainloop = daemon.Mainloop()
  master = MasterServer(mainloop, constants.MASTER_SOCKET,
                        options.uid, options.gid)
  return (mainloop, master)


def ExecMasterd(options, args, prep_data): # pylint: disable-msg=W0613
  """Main master daemon function, executed with the PID file held.

  """
  (mainloop, master) = prep_data
  try:
    rpc.Init()
    try:
      master.setup_queue()
      try:
        mainloop.Run()
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemoveFile(constants.MASTER_SOCKET)


def Main():
  """Main function"""
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  parser.add_option("--no-voting", dest="no_voting",
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally",
                    default=False, action="store_true")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    help="Override interactive check for --no-voting",
                    default=False, action="store_true")
  daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
                     ExecMasterd, multithreaded=True)