#!/usr/bin/python
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Master daemon program.

Some classes deviate from the standard style guide since the
inheritance from parent classes requires it.

"""

# pylint: disable-msg=C0103
# C0103: Invalid name ganeti-masterd

import os
import sys
import SocketServer
import time
import collections
import signal
import logging

from optparse import OptionParser

from ganeti import config
from ganeti import constants
from ganeti import daemon
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import workerpool
from ganeti import rpc
from ganeti import bootstrap
from ganeti import serializer


CLIENT_REQUEST_WORKERS = 16

EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR


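# Threading model: the main thread owns the listening socket and runs the
# accept loop (see IOServer.serve_forever below); every accepted client
# connection is handed off as a task to a pool of CLIENT_REQUEST_WORKERS
# worker threads, so up to 16 luxi clients can be served concurrently.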
class ClientRequestWorker(workerpool.BaseWorker):
  # pylint: disable-msg=W0221
  def RunTask(self, server, request, client_address):
    """Process the request.

    This is copied from the code in ThreadingMixIn.

    """
    try:
      server.finish_request(request, client_address)
      server.close_request(request)
    except: # pylint: disable-msg=W0702
      server.handle_error(request, client_address)
      server.close_request(request)


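# The worker pool calls RunTask with the same positional arguments that were
# passed to AddTask; for this pool that is (server, request, client_address).
# Illustrative flow (not executed here):
#   pool = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS, ClientRequestWorker)
#   pool.AddTask(server, conn, addr)  # a free worker runs RunTask(server,
#                                     # conn, addr)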
class IOServer(SocketServer.UnixStreamServer):
  """IO thread class.

  This class takes care of initializing the other threads, setting
  signal handlers (which are processed only in this thread), and doing
  cleanup at shutdown.

  """
  def __init__(self, address, rqhandler):
    """IOServer constructor

    @param address: the address to bind this IOServer to
    @param rqhandler: RequestHandler type object

    """
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

  def setup_queue(self):
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def process_request(self, request, client_address):
    """Add task to workerpool to process request.

    """
    self.request_workers.AddTask(self, request, client_address)

  @utils.SignalHandled([signal.SIGINT, signal.SIGTERM])
  def serve_forever(self, signal_handlers=None): # pylint: disable-msg=W0221
    """Handle one request at a time until told to quit."""
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"
    # Since we use SignalHandled only once, the resulting dict will map all
    # signals to the same handler. We'll just use the first one.
    sighandler = signal_handlers.values()[0]
    while not sighandler.called:
      self.handle_request()

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.server_close()
    finally:
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.context:
        self.context.jobqueue.Shutdown()


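# Wire protocol, as implemented by ClientRqHandler below: each request and
# response is a string produced by the luxi module, terminated by the EOM
# byte '\3'. A single recv() may deliver several complete messages plus a
# partial one, so read_message() keeps the trailing fragment in self._buffer
# and queues complete messages in a deque.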
class ClientRqHandler(SocketServer.BaseRequestHandler):
  """Client handler"""
  EOM = '\3'
  READ_SIZE = 4096

  def setup(self):
    # pylint: disable-msg=W0201
    # setup() is the API for initialising this class
    self._buffer = ""
    self._msgs = collections.deque()
    self._ops = ClientOps(self.server)

  def handle(self):
    while True:
      msg = self.read_message()
      if msg is None:
        logging.debug("client closed connection")
        break

      (method, args) = luxi.ParseRequest(msg)

      success = False
      try:
        result = self._ops.handle_request(method, args)
        success = True
      except errors.GenericError, err:
        result = errors.EncodeException(err)
      except:
        logging.error("Unexpected exception", exc_info=True)
        result = "Caught exception: %s" % str(sys.exc_info()[1])

      self.send_message(luxi.FormatResponse(success, result))

  def read_message(self):
    while not self._msgs:
      data = self.request.recv(self.READ_SIZE)
      if not data:
        return None
      new_msgs = (self._buffer + data).split(self.EOM)
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def send_message(self, msg):
    # TODO: sendall is not guaranteed to send everything
    self.request.sendall(msg + self.EOM)


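# Dispatch logic: ClientOps.handle_request maps each luxi.REQ_* method to
# either the job queue (mutating operations become queued jobs, executed
# asynchronously) or, for read-only queries, to a synchronous opcode run
# via _Query. Example (illustrative): a REQ_SUBMIT_JOB payload is a list of
# serialized opcode states, each revived with opcodes.OpCode.LoadOpCode and
# submitted as one job whose id is returned to the client.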
class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server

  def handle_request(self, method, args): # pylint: disable-msg=R0911
    queue = self.server.context.jobqueue

    # TODO: Parameter validation

    # TODO: Rewrite to not exit in each 'if/elif' branch

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Received new job")
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
      return queue.SubmitJob(ops)

    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Received multiple jobs")
      jobs = []
      for ops in args:
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
      return queue.SubmitManyJobs(jobs)

    elif method == luxi.REQ_CANCEL_JOB:
      job_id = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_ARCHIVE_JOB:
      job_id = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = utils.CommaJoin(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.QueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpQueryInstances(names=names, output_fields=fields,
                                    use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpQueryNodes(names=names, output_fields=fields,
                                use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      nodes, use_locking = args
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      logging.info("Received exports query request")
      op = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      fields = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpQueryConfigValues(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpQueryClusterInfo()
      return self._Query(op)

    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
      drain_flag = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    elif method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      if until is None:
        logging.info("Received request to no longer pause the watcher")
      else:
        if not isinstance(until, (int, float)):
          raise TypeError("Duration must be an integer or float")

        if until < time.time():
          raise errors.GenericError("Unable to set pause end time in the past")

        logging.info("Received request to pause the watcher until %s", until)

      return _SetWatcherPause(until)

    else:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    # Queries don't have a job id
    proc = mcpu.Processor(self.server.context, None)
    return proc.ExecOpCode(op, None)


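# Singleton enforcement: the first GanetiContext stores itself in the class
# attribute _instance, and __setattr__ asserts that _instance is still None,
# so any later attribute write (including a second __init__) trips an
# assertion. This protects the shared config, lock manager and job queue
# from accidental replacement.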
class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # pylint: disable-msg=W0212
  # we do want to ensure a singleton here
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only one GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    self.glm = locking.GanetiLockManager(
                self.cfg.GetNodeList(),
                self.cfg.GetInstanceList())

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node, ec_id):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node, ec_id)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration.

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)


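# The pause file written by _SetWatcherPause holds a single line with the
# decimal Unix timestamp until which ganeti-watcher should not run; for
# example (illustrative), a pause until timestamp 1300000000 is stored as
# the text "1300000000\n".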
def _SetWatcherPause(until):
  """Creates or removes the watcher pause file.

  @type until: None or int
  @param until: Unix timestamp saying until when the watcher shouldn't run

  """
  if until is None:
    utils.RemoveFile(constants.WATCHER_PAUSEFILE)
  else:
    utils.WriteFile(constants.WATCHER_PAUSEFILE,
                    data="%d\n" % (until, ))

  return until


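# Worked example for the voting rule in CheckAgreement below: on a five-node
# cluster where every node answers and three nodes vote for us, top_votes is
# 3 and all_votes - top_votes is 2, so the check 3 < 2 fails and we keep the
# master role; with only two votes in our favour, 2 < 3 holds and startup is
# aborted.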
def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most of the cases we are the master,
  we'll use our own config file for getting the node list. In the
  future we could collect the current node list from our (possibly
  obsolete) known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are more out of sync, manual startup of the master should be
  attempted for now.

  Note that for a cluster with an even number of nodes, we need at
  least half of the nodes (besides ourselves) to vote for us. This
  creates a problem on two-node clusters, since in this case we
  require the other node to be up too to confirm our status.

  """
  myself = utils.HostInfo().name
  # temporary instantiation of a config writer, used only to get the node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg
  retries = 6
  while retries > 0:
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is None:
      retries -= 1
      time.sleep(10)
      continue
    break
  if retries == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    return False
  # here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]

  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result


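# The RPC layer is brought up just for the duration of the vote and torn
# down again; CheckMasterd below runs this in a separate process because
# CheckAgreement uses RPC and threads, which would not survive the later
# daemonization of the current process.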
def CheckAgreementWithRpc():
  rpc.Init()
  try:
    return CheckAgreement()
  finally:
    rpc.Shutdown()


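# Exit-code protocol used by _RunInSeparateProcess below: the child exits
# with 0 or 1 to encode bool(fn()), and with 33 (an arbitrary value outside
# 0/1) if fn() raised; any other exit code, or death by signal, makes the
# parent abort with EXIT_FAILURE.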
def _RunInSeparateProcess(fn):
  """Runs a function in a separate process.

  Note: Only boolean return values are supported.

  @type fn: callable
  @param fn: Function to be called
  @rtype: bool

  """
  pid = os.fork()
  if pid == 0:
    # Child process
    try:
      # Call function
      result = int(bool(fn()))
      assert result in (0, 1)
    except: # pylint: disable-msg=W0702
      logging.exception("Error while calling function in separate process")
      # 0 and 1 are reserved for the return value
      result = 33

    os._exit(result) # pylint: disable-msg=W0212

  # Parent process

  # Avoid zombies and check exit code
  (_, status) = os.waitpid(pid, 0)

  if os.WIFSIGNALED(status):
    signum = os.WTERMSIG(status)
    exitcode = None
  else:
    signum = None
    exitcode = os.WEXITSTATUS(status)

  if not (exitcode in (0, 1) and signum is None):
    logging.error("Child program failed (code=%s, signal=%s)",
                  exitcode, signum)
    sys.exit(constants.EXIT_FAILURE)

  return bool(exitcode)


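# With --no-voting the inter-node vote is skipped entirely; unless
# --yes-do-it is also given, the operator must type the literal uppercase
# string "YES" on stdin before masterd will start.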
def CheckMasterd(options, args):
  """Initial checks whether to run or exit with a failure.

  """
  if args: # masterd doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)

  ssconf.CheckMaster(options.debug)

  # If CheckMaster didn't fail we believe we are the master, but we have to
  # confirm with the other nodes.
  if options.no_voting:
    if options.yes_do_it:
      return

    sys.stdout.write("The 'no voting' option has been selected.\n")
    sys.stdout.write("This is dangerous, please confirm by"
                     " typing uppercase 'yes': ")
    sys.stdout.flush()

    confirmation = sys.stdin.readline().strip()
    if confirmation != "YES":
      print >> sys.stderr, "Aborting."
      sys.exit(constants.EXIT_FAILURE)

    return

  # CheckAgreement uses RPC and threads, hence it needs to be run in a separate
  # process before we call utils.Daemonize in the current process.
  if not _RunInSeparateProcess(CheckAgreementWithRpc):
    sys.exit(constants.EXIT_FAILURE)


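# Startup sequence in ExecMasterd below: remove any stale master socket,
# initialize RPC, ask the master node to activate the cluster IP via
# node_start_master (both boolean flags are False here, presumably the
# start-daemons and no-voting switches), then start the job queue and serve
# luxi requests until SIGINT/SIGTERM arrives.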
def ExecMasterd(options, args): # pylint: disable-msg=W0613
  """Main master daemon function, executed with the PID file held.

  """
  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(constants.MASTER_SOCKET)

  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
  try:
    rpc.Init()
    try:
      # activate ip
      master_node = ssconf.SimpleStore().GetMasterNode()
      result = rpc.RpcRunner.call_node_start_master(master_node, False, False)
      msg = result.fail_msg
      if msg:
        logging.error("Can't activate master IP address: %s", msg)

      master.setup_queue()
      try:
        master.serve_forever()
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemoveFile(constants.MASTER_SOCKET)


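# daemon.GenericMain handles the generic daemon plumbing (option parsing,
# creation of the run/socket directories with the given modes, daemonization
# and PID file handling) before invoking CheckMasterd and ExecMasterd above.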
def main():
  """Main function"""
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  parser.add_option("--no-voting", dest="no_voting",
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally",
                    default=False, action="store_true")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    help="Override interactive check for --no-voting",
                    default=False, action="store_true")
  dirs = [(constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE),
          (constants.SOCKET_DIR, constants.SOCKET_DIR_MODE),
         ]
  daemon.GenericMain(constants.MASTERD, parser, dirs,
                     CheckMasterd, ExecMasterd,
                     multithreaded=True)


if __name__ == "__main__":
  main()