
daemons/ganeti-masterd @ 3cebe102


#!/usr/bin/python -u
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Master daemon program.

Some classes deviate from the standard style guide since the
inheritance from parent classes requires it.

"""


import os
import sys
import SocketServer
import time
import collections
import signal
import logging

from optparse import OptionParser

from ganeti import config
from ganeti import constants
from ganeti import daemon
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import workerpool
from ganeti import rpc
from ganeti import bootstrap
from ganeti import serializer
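

# Number of threads in the pool that serves client requests; incoming
# requests are queued to this pool by IOServer.process_request()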
CLIENT_REQUEST_WORKERS = 16

EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR


class ClientRequestWorker(workerpool.BaseWorker):
  def RunTask(self, server, request, client_address):
    """Process the request.

    This is copied from the code in ThreadingMixIn.

    """
    try:
      server.finish_request(request, client_address)
      server.close_request(request)
    except:
      server.handle_error(request, client_address)
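      # handle_error reports the failure; the request is still closed
      # so the client connection is never leaked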
      server.close_request(request)


class IOServer(SocketServer.UnixStreamServer):
  """IO thread class.

  This class takes care of initializing the other threads, setting
  signal handlers (which are processed only in this thread), and doing
  cleanup at shutdown.

  """
  def __init__(self, address, rqhandler):
    """IOServer constructor

    @param address: the address to bind this IOServer to
    @param rqhandler: RequestHandler type object

    """
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

  def setup_queue(self):
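    # Create the shared context and the request worker pool.  This is
    # only called once the daemon has forked, since the threads would
    # not survive the fork (see the comment in __init__ above).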
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def process_request(self, request, client_address):
    """Add task to workerpool to process request.

    """
    self.request_workers.AddTask(self, request, client_address)

  @utils.SignalHandled([signal.SIGINT, signal.SIGTERM])
  def serve_forever(self, signal_handlers=None):
    """Handle one request at a time until told to quit."""
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"
    # Since we use SignalHandled only once, the resulting dict will map all
    # signals to the same handler. We'll just use the first one.
    sighandler = signal_handlers.values()[0]
    while not sighandler.called:
      self.handle_request()

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.server_close()
    finally:
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.context:
        self.context.jobqueue.Shutdown()


class ClientRqHandler(SocketServer.BaseRequestHandler):
  """Client handler"""
  EOM = '\3'
  READ_SIZE = 4096
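
  # Wire framing: each message is a single JSON document terminated by
  # the EOM byte above.  As an illustration only (the literal key names
  # are the assumed values of the luxi.KEY_* constants, which are
  # defined elsewhere), an exchange looks roughly like:
  #   client: '{"method": "...", "args": ...}\x03'
  #   server: '{"success": true, "result": ...}\x03'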

  def setup(self):
    self._buffer = ""
    self._msgs = collections.deque()
    self._ops = ClientOps(self.server)

  def handle(self):
    while True:
      msg = self.read_message()
      if msg is None:
        logging.debug("client closed connection")
        break

      request = serializer.LoadJson(msg)
      logging.debug("request: %s", request)
      if not isinstance(request, dict):
        logging.error("wrong request received: %s", msg)
        break

      method = request.get(luxi.KEY_METHOD, None)
      args = request.get(luxi.KEY_ARGS, None)
      if method is None or args is None:
        logging.error("no method or args in request")
        break

      success = False
      try:
        result = self._ops.handle_request(method, args)
        success = True
      except errors.GenericError, err:
        success = False
        result = errors.EncodeException(err)
      except:
        logging.error("Unexpected exception", exc_info=True)
        err = sys.exc_info()
        result = "Caught exception: %s" % str(err[1])

      response = {
        luxi.KEY_SUCCESS: success,
        luxi.KEY_RESULT: result,
        }
      logging.debug("response: %s", response)
      self.send_message(serializer.DumpJson(response))

  def read_message(self):
    while not self._msgs:
      data = self.request.recv(self.READ_SIZE)
      if not data:
        return None
      new_msgs = (self._buffer + data).split(self.EOM)
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def send_message(self, msg):
    # socket.sendall either transmits the whole message (plus the EOM
    # terminator) or raises an exception, so no partial sends can occur
    self.request.sendall(msg + self.EOM)


class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server

  def handle_request(self, method, args):
    queue = self.server.context.jobqueue

    # TODO: Parameter validation

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Received new job")
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
      return queue.SubmitJob(ops)

    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Received multiple jobs")
      jobs = []
      for ops in args:
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
      return queue.SubmitManyJobs(jobs)

    elif method == luxi.REQ_CANCEL_JOB:
      job_id = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_ARCHIVE_JOB:
      job_id = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = ", ".join(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.QueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed")
      op = opcodes.OpQueryInstances(names=names, output_fields=fields,
                                    use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed")
      op = opcodes.OpQueryNodes(names=names, output_fields=fields,
                                use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      nodes, use_locking = args
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed")
      logging.info("Received exports query request")
      op = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      fields = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpQueryConfigValues(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpQueryClusterInfo()
      return self._Query(op)

    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
      drain_flag = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    elif method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      if until is None:
        logging.info("Received request to no longer pause the watcher")
      else:
        if not isinstance(until, (int, float)):
          raise TypeError("Duration must be an integer or float")

        if until < time.time():
          raise errors.GenericError("Unable to set pause end time in the past")

        logging.info("Received request to pause the watcher until %s", until)

      return _SetWatcherPause(until)

    else:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    proc = mcpu.Processor(self.server.context)
    return proc.ExecOpCode(op, None)


class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only one GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    self.glm = locking.GanetiLockManager(
                self.cfg.GetNodeList(),
                self.cfg.GetInstanceList())

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration.

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)


def _SetWatcherPause(until):
  """Creates or removes the watcher pause file.

  @type until: None or int
  @param until: Unix timestamp saying until when the watcher shouldn't run

  """
  if until is None:
    utils.RemoveFile(constants.WATCHER_PAUSEFILE)
  else:
    utils.WriteFile(constants.WATCHER_PAUSEFILE,
                    data="%d\n" % (until, ))

  return until


def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most of the cases we are the master,
  we'll use our own config file for getting the node list. In the
  future we could collect the current node list from our (possibly
  obsolete) known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are more out of sync than that, manual startup of the master
  should be attempted for now.

  Note that on clusters with an even number of nodes, we need at least
  half of the nodes (besides ourselves) to vote for us. This creates a
  problem on two-node clusters, since in this case we require the
  other node to be up too to confirm our status.

  """
  myself = utils.HostInfo().name
  # temp instantiation of a config writer, used only to get the node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg
  retries = 6
  while retries > 0:
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is None:
      retries -= 1
      time.sleep(10)
      continue
    break
  if retries == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    return False
  # here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]
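
  # Illustrative example: on a five-node cluster where three nodes name
  # us as master and two name another node, top_votes is 3 and all_votes
  # is 5; since 3 >= 5 - 3 we win the vote.  With a 2-2-1 split we would
  # lose, as 2 < 5 - 2.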

  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result


def CheckAgreementWithRpc():
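  """Wrapper that runs CheckAgreement with RPC initialized and shut down.

  """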
  rpc.Init()
  try:
    return CheckAgreement()
  finally:
    rpc.Shutdown()


def _RunInSeparateProcess(fn):
  """Runs a function in a separate process.

  Note: Only boolean return values are supported.

  @type fn: callable
  @param fn: Function to be called
  @rtype: bool

  """
  pid = os.fork()
  if pid == 0:
    # Child process
    try:
      # Call function
      result = int(bool(fn()))
      assert result in (0, 1)
    except:
      logging.exception("Error while calling function in separate process")
      # 0 and 1 are reserved for the return value
      result = 33

    os._exit(result)

  # Parent process

  # Avoid zombies and check exit code
  (_, status) = os.waitpid(pid, 0)

  if os.WIFSIGNALED(status):
    signum = os.WTERMSIG(status)
    exitcode = None
  else:
    signum = None
    exitcode = os.WEXITSTATUS(status)
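
  # A child that raised an exception exits with code 33 (see above);
  # that deliberately fails the (0, 1) check below and aborts startup.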
  if not (exitcode in (0, 1) and signum is None):
    logging.error("Child program failed (code=%s, signal=%s)",
                  exitcode, signum)
    sys.exit(constants.EXIT_FAILURE)

  return bool(exitcode)


def CheckMasterd(options, args):
  """Initial checks whether to run or exit with a failure.

  """
  ssconf.CheckMaster(options.debug)

  # If CheckMaster didn't fail we believe we are the master, but we have to
  # confirm with the other nodes.
  if options.no_voting:
    if options.yes_do_it:
      return

    sys.stdout.write("The 'no voting' option has been selected.\n")
    sys.stdout.write("This is dangerous, please confirm by"
                     " typing uppercase 'yes': ")
    sys.stdout.flush()

    confirmation = sys.stdin.readline().strip()
    if confirmation != "YES":
      print >>sys.stderr, "Aborting."
      sys.exit(constants.EXIT_FAILURE)

    return

  # CheckAgreement uses RPC and threads, hence it needs to be run in a separate
  # process before we call utils.Daemonize in the current process.
  if not _RunInSeparateProcess(CheckAgreementWithRpc):
    sys.exit(constants.EXIT_FAILURE)


def ExecMasterd(options, args):
  """Main master daemon function, executed with the PID file held.

  """
  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(constants.MASTER_SOCKET)

  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
  try:
    rpc.Init()
    try:
      # activate ip
      master_node = ssconf.SimpleStore().GetMasterNode()
      result = rpc.RpcRunner.call_node_start_master(master_node, False, False)
      msg = result.fail_msg
      if msg:
        logging.error("Can't activate master IP address: %s", msg)

      master.setup_queue()
      try:
        master.serve_forever()
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemoveFile(constants.MASTER_SOCKET)


def main():
  """Main function"""
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  parser.add_option("--no-voting", dest="no_voting",
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally",
                    default=False, action="store_true")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    help="Override interactive check for --no-voting",
                    default=False, action="store_true")
  dirs = [(constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE),
          (constants.SOCKET_DIR, constants.SOCKET_DIR_MODE),
         ]
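  # the (directory, mode) pairs above are handed to daemon.GenericMain,
  # which is assumed to create them before starting the daemon
  # (GenericMain is defined elsewhere, in the ganeti.daemon module)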
  daemon.GenericMain(constants.MASTERD, parser, dirs,
                     CheckMasterd, ExecMasterd)


if __name__ == "__main__":
  main()