#!/usr/bin/python -u
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

"""Master daemon program.

Some classes deviate from the standard style guide since the
inheritance from parent classes requires it.

"""

import os
import sys
import SocketServer
import time
import collections
import signal
import logging

from cStringIO import StringIO
from optparse import OptionParser

from ganeti import config
from ganeti import constants
from ganeti import daemon
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import workerpool
from ganeti import rpc
from ganeti import bootstrap
from ganeti import serializer


CLIENT_REQUEST_WORKERS = 16

EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR


class ClientRequestWorker(workerpool.BaseWorker):
  def RunTask(self, server, request, client_address):
    """Process the request.

    This is copied from the code in ThreadingMixIn.

    """
    try:
      server.finish_request(request, client_address)
      server.close_request(request)
    except:
      server.handle_error(request, client_address)
      server.close_request(request)


class IOServer(SocketServer.UnixStreamServer):
  """IO thread class.

  This class takes care of initializing the other threads, setting
  signal handlers (which are processed only in this thread), and doing
  cleanup at shutdown.

  """
  def __init__(self, address, rqhandler):
    """IOServer constructor

    @param address: the address to bind this IOServer to
    @param rqhandler: RequestHandler type object

    """
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

  def setup_queue(self):
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def process_request(self, request, client_address):
    """Add task to workerpool to process request.

    """
    self.request_workers.AddTask(self, request, client_address)

  @utils.SignalHandled([signal.SIGINT, signal.SIGTERM])
  def serve_forever(self, signal_handlers=None):
    """Handle one request at a time until told to quit."""
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"
    # Since we use SignalHandled only once, the resulting dict will map all
    # signals to the same handler. We'll just use the first one.
    sighandler = signal_handlers.values()[0]
    while not sighandler.called:
      self.handle_request()

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.server_close()
    finally:
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.context:
        self.context.jobqueue.Shutdown()
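

# A minimal usage sketch of IOServer, mirroring what ExecMasterd() does
# below: a request travels serve_forever() -> handle_request() ->
# process_request() -> WorkerPool.AddTask(), and a ClientRequestWorker
# thread then runs RunTask(), which hands the connection over to the
# registered ClientRqHandler.
#
#   master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
#   master.setup_queue()     # starts CLIENT_REQUEST_WORKERS worker threads
#   master.serve_forever()   # loops until SIGINT/SIGTERM is caught
#   master.server_cleanup()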


class ClientRqHandler(SocketServer.BaseRequestHandler):
  """Client handler"""
  EOM = '\3'
  READ_SIZE = 4096

  def setup(self):
    self._buffer = ""
    self._msgs = collections.deque()
    self._ops = ClientOps(self.server)

  def handle(self):
    while True:
      msg = self.read_message()
      if msg is None:
        logging.debug("client closed connection")
        break

      request = serializer.LoadJson(msg)
      logging.debug("request: %s", request)
      if not isinstance(request, dict):
        logging.error("wrong request received: %s", msg)
        break

      method = request.get(luxi.KEY_METHOD, None)
      args = request.get(luxi.KEY_ARGS, None)
      if method is None or args is None:
        logging.error("no method or args in request")
        break

      success = False
      try:
        result = self._ops.handle_request(method, args)
        success = True
      except errors.GenericError, err:
        success = False
        result = errors.EncodeException(err)
      except:
        logging.error("Unexpected exception", exc_info=True)
        err = sys.exc_info()
        result = "Caught exception: %s" % str(err[1])

      response = {
        luxi.KEY_SUCCESS: success,
        luxi.KEY_RESULT: result,
        }
      logging.debug("response: %s", response)
      self.send_message(serializer.DumpJson(response))

  def read_message(self):
    while not self._msgs:
      data = self.request.recv(self.READ_SIZE)
      if not data:
        return None
      new_msgs = (self._buffer + data).split(self.EOM)
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def send_message(self, msg):
    # sendall() loops internally until all data has been sent or an
    # error occurs
    self.request.sendall(msg + self.EOM)
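

# A sketch of the wire framing implemented by read_message()/send_message():
# each message is a JSON document terminated by the EOM byte '\3'. Given a
# hypothetical receive buffer holding two complete messages and one partial
# message (key names assumed to match luxi.KEY_METHOD/luxi.KEY_ARGS):
#
#   data = '{"method": "X", "args": null}\3{"method": "Y", "args": null}\3{"me'
#   parts = data.split(ClientRqHandler.EOM)
#   # parts[:-1] -> the two complete messages, queued in self._msgs
#   # parts[-1]  -> '{"me', kept in self._buffer until more data arrives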


class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server

  def handle_request(self, method, args):
    queue = self.server.context.jobqueue

    # TODO: Parameter validation

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Received new job")
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
      return queue.SubmitJob(ops)

    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Received multiple jobs")
      jobs = []
      for ops in args:
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
      return queue.SubmitManyJobs(jobs)

    elif method == luxi.REQ_CANCEL_JOB:
      job_id = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_ARCHIVE_JOB:
      job_id = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = ", ".join(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.QueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed")
      op = opcodes.OpQueryInstances(names=names, output_fields=fields,
                                    use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed")
      op = opcodes.OpQueryNodes(names=names, output_fields=fields,
                                use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      nodes, use_locking = args
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed")
      logging.info("Received exports query request")
      op = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      fields = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpQueryConfigValues(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpQueryClusterInfo()
      return self._Query(op)

    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
      drain_flag = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    elif method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      if until is None:
        logging.info("Received request to no longer pause the watcher")
      else:
        if not isinstance(until, (int, float)):
          raise TypeError("Duration must be an integer or float")

        if until < time.time():
          raise errors.GenericError("Unable to set pause end time in the past")

        logging.info("Received request to pause the watcher until %s", until)

      return _SetWatcherPause(until)

    else:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

  def _DummyLog(self, *args):
    pass

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    proc = mcpu.Processor(self.server.context)
    # TODO: Where should log messages go?
    return proc.ExecOpCode(op, self._DummyLog, None)
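

# An illustrative luxi exchange as seen by ClientRqHandler.handle(); the
# method string and the "method"/"args"/"success"/"result" key names are
# assumed to match the constants in the luxi module:
#
#   request:  {"method": "CancelJob", "args": "42"}
#   response: {"success": true, "result": <whatever queue.CancelJob returned>}
#
# A request that raises errors.GenericError instead yields:
#
#   response: {"success": false, "result": <errors.EncodeException(err)>}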


class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only one GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    self.glm = locking.GanetiLockManager(
                self.cfg.GetNodeList(),
                self.cfg.GetInstanceList())

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration.

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)
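

# GanetiContext is effectively a write-once singleton: the assertions in
# __init__ and __setattr__ make both of the following fail once the first
# instance exists.
#
#   ctx = GanetiContext()
#   GanetiContext()     # AssertionError: double GanetiContext instance
#   ctx.cfg = None      # AssertionError: Attempt to modify Ganeti Context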


def _SetWatcherPause(until):
  """Creates or removes the watcher pause file.

  @type until: None or int
  @param until: Unix timestamp saying until when the watcher shouldn't run

  """
  if until is None:
    utils.RemoveFile(constants.WATCHER_PAUSEFILE)
  else:
    utils.WriteFile(constants.WATCHER_PAUSEFILE,
                    data="%d\n" % (until, ))

  return until
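

# For example, pausing the watcher for five minutes writes one timestamp
# line to the pause file, and passing None removes it again (the timestamp
# value below is illustrative):
#
#   _SetWatcherPause(time.time() + 300)
#   # constants.WATCHER_PAUSEFILE now contains a line like "1234567890\n"
#   _SetWatcherPause(None)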


def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most cases we are the master, we'll
  use our own config file for getting the node list. In the future we
  could collect the current node list from our (possibly obsolete)
  known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are further out of sync, manual startup of the master should
  be attempted for now.

  Note that for a cluster with an even number of nodes, we need at
  least half of the nodes (besides ourselves) to vote for us. This
  creates a problem on two-node clusters, since in this case we
  require the other node to be up too to confirm our status.

  """
  myself = utils.HostInfo().name
  # temporary instantiation of a config writer, used only to get the
  # node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg
  retries = 6
  while retries > 0:
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is None:
      retries -= 1
      time.sleep(10)
      continue
    break
  if retries == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    return False
  # here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]

  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result
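

# A worked example of the vote arithmetic above (node names hypothetical):
# on a five-node cluster where GatherMasterVotes returns
# [("node1", 3), ("node2", 2)] and we are node1, all_votes == 5 and
# top_votes == 3; since top_node == myself and 3 >= 5 - 3, the function
# returns True. Were the tally [("node2", 3), ("node1", 2)] instead, the
# first branch would log that node2 is the top-voted node and return False.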


def CheckAgreementWithRpc():
  """Runs CheckAgreement with the RPC layer initialized.

  """
  rpc.Init()
  try:
    return CheckAgreement()
  finally:
    rpc.Shutdown()


def _RunInSeparateProcess(fn):
  """Runs a function in a separate process.

  Note: Only boolean return values are supported.

  @type fn: callable
  @param fn: Function to be called
  @rtype: bool

  """
  pid = os.fork()
  if pid == 0:
    # Child process
    try:
      # Call function
      result = int(bool(fn()))
      assert result in (0, 1)
    except:
      logging.exception("Error while calling function in separate process")
      # 0 and 1 are reserved for the return value
      result = 33

    os._exit(result)

  # Parent process

  # Avoid zombies and check exit code
  (_, status) = os.waitpid(pid, 0)

  if os.WIFSIGNALED(status):
    signum = os.WTERMSIG(status)
    exitcode = None
  else:
    signum = None
    exitcode = os.WEXITSTATUS(status)

  if not (exitcode in (0, 1) and signum is None):
    logging.error("Child program failed (code=%s, signal=%s)",
                  exitcode, signum)
    sys.exit(constants.EXIT_FAILURE)

  return bool(exitcode)
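

# The parent/child exit-status protocol used above, summarized:
#
#   exit code 0        -> fn() returned False
#   exit code 1        -> fn() returned True
#   exit code 33       -> fn() raised; the parent logs an error and exits
#   killed by a signal -> likewise treated as a fatal error in the parent
#
# CheckMasterd() below relies on this:
#
#   if not _RunInSeparateProcess(CheckAgreementWithRpc):
#     sys.exit(constants.EXIT_FAILURE)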


def CheckMasterd(options, args):
  """Initial checks whether to run or exit with a failure.

  """
  ssconf.CheckMaster(options.debug)

  # If CheckMaster didn't fail we believe we are the master, but we have to
  # confirm with the other nodes.
  if options.no_voting:
    if options.yes_do_it:
      return

    sys.stdout.write("The 'no voting' option has been selected.\n")
    sys.stdout.write("This is dangerous, please confirm by"
                     " typing uppercase 'yes': ")
    sys.stdout.flush()

    confirmation = sys.stdin.readline().strip()
    if confirmation != "YES":
      print >>sys.stderr, "Aborting."
      sys.exit(constants.EXIT_FAILURE)

    return

  # CheckAgreement uses RPC and threads, hence it needs to be run in a
  # separate process before we call utils.Daemonize in the current process.
  if not _RunInSeparateProcess(CheckAgreementWithRpc):
    sys.exit(constants.EXIT_FAILURE)


def ExecMasterd(options, args):
  """Main master daemon function, executed with the PID file held.

  """
  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(constants.MASTER_SOCKET)

  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
  try:
    rpc.Init()
    try:
      # activate ip
      master_node = ssconf.SimpleStore().GetMasterNode()
      result = rpc.RpcRunner.call_node_start_master(master_node, False, False)
      msg = result.RemoteFailMsg()
      if msg:
        logging.error("Can't activate master IP address: %s", msg)

      master.setup_queue()
      try:
        master.serve_forever()
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemoveFile(constants.MASTER_SOCKET)


def main():
  """Main function"""
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  parser.add_option("--no-voting", dest="no_voting",
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally",
                    default=False, action="store_true")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    help="Override interactive check for --no-voting",
                    default=False, action="store_true")
  dirs = [(constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE),
          (constants.SOCKET_DIR, constants.SOCKET_DIR_MODE),
         ]
  daemon.GenericMain(constants.MASTERD, parser, dirs,
                     CheckMasterd, ExecMasterd)


if __name__ == "__main__":
  main()