#!/usr/bin/python
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Master daemon program.

Some classes deviate from the standard style guide since the
inheritance from parent classes requires it.

"""

# pylint: disable-msg=C0103
# C0103: Invalid name ganeti-masterd

import os
import sys
import SocketServer
import time
import collections
import signal
import logging

from optparse import OptionParser

from ganeti import config
from ganeti import constants
from ganeti import daemon
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import workerpool
from ganeti import rpc
from ganeti import bootstrap
from ganeti import serializer


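# Size of the thread pool serving client requests (see IOServer.setup_queue).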
CLIENT_REQUEST_WORKERS = 16

EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR


class ClientRequestWorker(workerpool.BaseWorker):
  # pylint: disable-msg=W0221
  def RunTask(self, server, request, client_address):
    """Process the request.

    This is copied from the code in ThreadingMixIn.

    """
    try:
      server.finish_request(request, client_address)
      server.close_request(request)
    except: # pylint: disable-msg=W0702
      server.handle_error(request, client_address)
      server.close_request(request)


class IOServer(SocketServer.UnixStreamServer):
  """IO thread class.

  This class takes care of initializing the other threads, setting
  signal handlers (which are processed only in this thread), and doing
  cleanup at shutdown.

  """
  def __init__(self, address, rqhandler):
    """IOServer constructor

    @param address: the address to bind this IOServer to
    @param rqhandler: RequestHandler type object

    """
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

  def setup_queue(self):
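    # Called only after the daemon has forked: create the shared context and
    # the pool of worker threads that serve client requests.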
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def process_request(self, request, client_address):
    """Add task to workerpool to process request.

    """
    self.request_workers.AddTask(self, request, client_address)

  @utils.SignalHandled([signal.SIGINT, signal.SIGTERM])
  def serve_forever(self, signal_handlers=None): # pylint: disable-msg=W0221
    """Handle one request at a time until told to quit."""
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"
    # Since we use SignalHandled only once, the resulting dict will map all
    # signals to the same handler. We'll just use the first one.
    sighandler = signal_handlers.values()[0]
    while not sighandler.called:
      self.handle_request()

  def server_cleanup(self):
    """Clean up the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.server_close()
    finally:
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.context:
        self.context.jobqueue.Shutdown()


class ClientRqHandler(SocketServer.BaseRequestHandler):
  """Client handler"""
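  # Requests and responses are JSON documents terminated by the EOM byte;
  # read_message() buffers partial reads and splits complete messages on it.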
  EOM = '\3'
  READ_SIZE = 4096

  def setup(self):
    # pylint: disable-msg=W0201
    # setup() is the API for initialising this class
    self._buffer = ""
    self._msgs = collections.deque()
    self._ops = ClientOps(self.server)

  def handle(self):
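    # Per-connection loop: decode each incoming JSON request, dispatch it
    # through ClientOps and send back a {success, result} response.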
    while True:
      msg = self.read_message()
      if msg is None:
        logging.debug("client closed connection")
        break

      request = serializer.LoadJson(msg)
      logging.debug("request: %s", request)
      if not isinstance(request, dict):
        logging.error("wrong request received: %s", msg)
        break

      method = request.get(luxi.KEY_METHOD, None)
      args = request.get(luxi.KEY_ARGS, None)
      if method is None or args is None:
        logging.error("no method or args in request")
        break

      success = False
      try:
        result = self._ops.handle_request(method, args)
        success = True
      except errors.GenericError, err:
        success = False
        result = errors.EncodeException(err)
      except:
        logging.error("Unexpected exception", exc_info=True)
        err = sys.exc_info()
        result = "Caught exception: %s" % str(err[1])

      response = {
        luxi.KEY_SUCCESS: success,
        luxi.KEY_RESULT: result,
        }
      logging.debug("response: %s", response)
      self.send_message(serializer.DumpJson(response))

  def read_message(self):
    while not self._msgs:
      data = self.request.recv(self.READ_SIZE)
      if not data:
        return None
      new_msgs = (self._buffer + data).split(self.EOM)
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def send_message(self, msg):
    #print "sending", msg
    # TODO: sendall is not guaranteed to send everything
    self.request.sendall(msg + self.EOM)


class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server

  def handle_request(self, method, args): # pylint: disable-msg=R0911
    queue = self.server.context.jobqueue

    # TODO: Parameter validation

    # TODO: Rewrite to not exit in each 'if/elif' branch

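    # Each luxi method maps either to a job queue operation or, for queries,
    # to an opcode executed synchronously via _Query() below.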
    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Received new job")
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
      return queue.SubmitJob(ops)

    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Received multiple jobs")
      jobs = []
      for ops in args:
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
      return queue.SubmitManyJobs(jobs)

    elif method == luxi.REQ_CANCEL_JOB:
      job_id = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_ARCHIVE_JOB:
      job_id = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = utils.CommaJoin(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.QueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpQueryInstances(names=names, output_fields=fields,
                                    use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpQueryNodes(names=names, output_fields=fields,
                                use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      nodes, use_locking = args
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      logging.info("Received exports query request")
      op = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      fields = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpQueryConfigValues(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpQueryClusterInfo()
      return self._Query(op)

    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
      drain_flag = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    elif method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      if until is None:
        logging.info("Received request to no longer pause the watcher")
      else:
        if not isinstance(until, (int, float)):
          raise TypeError("Duration must be an integer or float")

        if until < time.time():
          raise errors.GenericError("Unable to set pause end time in the past")

        logging.info("Received request to pause the watcher until %s", until)

      return _SetWatcherPause(until)

    else:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    # Queries don't have a job id
    proc = mcpu.Processor(self.server.context, None)
    return proc.ExecOpCode(op, None)


class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # pylint: disable-msg=W0212
  # we do want to ensure a singleton here
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only one GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    self.glm = locking.GanetiLockManager(
                self.cfg.GetNodeList(),
                self.cfg.GetInstanceList())

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node, ec_id):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node, ec_id)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration.

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)


def _SetWatcherPause(until):
  """Creates or removes the watcher pause file.

  @type until: None or int
  @param until: Unix timestamp saying until when the watcher shouldn't run

  """
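  # Removing the file un-pauses the watcher; when pausing, the file holds
  # the pause deadline as a Unix timestamp.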
  if until is None:
    utils.RemoveFile(constants.WATCHER_PAUSEFILE)
  else:
    utils.WriteFile(constants.WATCHER_PAUSEFILE,
                    data="%d\n" % (until, ))

  return until


def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most cases we are the master, we'll
  use our own config file for getting the node list. In the future we
  could collect the current node list from our (possibly obsolete)
  known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are more out-of-sync, for now manual startup of the master
  should be attempted.

  Note that for a cluster with an even number of nodes, we need at
  least half of the nodes (besides ourselves) to vote for us. This
  creates a problem on two-node clusters, since in this case we
  require the other node to be up too to confirm our status.

  """
  myself = utils.HostInfo().name
  # temp instantiation of a config writer, used only to get the node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg
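  # Up to 6 attempts, 10 seconds apart: roughly the minute of retrying
  # mentioned in the docstring above.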
  retries = 6
  while retries > 0:
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is None:
      retries -= 1
      time.sleep(10)
      continue
    break
  if retries == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    return False
  # here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]

  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result


def CheckAgreementWithRpc():
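  """Runs CheckAgreement() with RPC initialized and shut down afterwards."""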
  rpc.Init()
  try:
    return CheckAgreement()
  finally:
    rpc.Shutdown()


def _RunInSeparateProcess(fn):
  """Runs a function in a separate process.

  Note: Only boolean return values are supported.

  @type fn: callable
  @param fn: Function to be called
  @rtype: bool

  """
  pid = os.fork()
  if pid == 0:
    # Child process
    try:
      # Call function
      result = int(bool(fn()))
      assert result in (0, 1)
    except: # pylint: disable-msg=W0702
      logging.exception("Error while calling function in separate process")
      # 0 and 1 are reserved for the return value
      result = 33

    os._exit(result) # pylint: disable-msg=W0212

  # Parent process

  # Avoid zombies and check exit code
  (_, status) = os.waitpid(pid, 0)

  if os.WIFSIGNALED(status):
    signum = os.WTERMSIG(status)
    exitcode = None
  else:
    signum = None
    exitcode = os.WEXITSTATUS(status)

  if not (exitcode in (0, 1) and signum is None):
    logging.error("Child program failed (code=%s, signal=%s)",
                  exitcode, signum)
    sys.exit(constants.EXIT_FAILURE)

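  # A child exit status of 1 maps to True and 0 to False; anything else was
  # rejected above.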
  return bool(exitcode)


def CheckMasterd(options, args):
  """Initial checks whether to run or exit with a failure.

  """
  ssconf.CheckMaster(options.debug)

  # If CheckMaster didn't fail we believe we are the master, but we have to
  # confirm with the other nodes.
  if options.no_voting:
    if options.yes_do_it:
      return

    sys.stdout.write("The 'no voting' option has been selected.\n")
    sys.stdout.write("This is dangerous, please confirm by"
                     " typing uppercase 'yes': ")
    sys.stdout.flush()

    confirmation = sys.stdin.readline().strip()
    if confirmation != "YES":
      print >> sys.stderr, "Aborting."
      sys.exit(constants.EXIT_FAILURE)

    return

  # CheckAgreement uses RPC and threads, hence it needs to be run in a separate
  # process before we call utils.Daemonize in the current process.
  if not _RunInSeparateProcess(CheckAgreementWithRpc):
    sys.exit(constants.EXIT_FAILURE)


def ExecMasterd(options, args):
  """Main master daemon function, executed with the PID file held.

  """
  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(constants.MASTER_SOCKET)

  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
  try:
    rpc.Init()
    try:
      # activate ip
      master_node = ssconf.SimpleStore().GetMasterNode()
      result = rpc.RpcRunner.call_node_start_master(master_node, False, False)
      msg = result.fail_msg
      if msg:
        logging.error("Can't activate master IP address: %s", msg)

      master.setup_queue()
      try:
        master.serve_forever()
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemoveFile(constants.MASTER_SOCKET)


def main():
  """Main function"""
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  parser.add_option("--no-voting", dest="no_voting",
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally",
                    default=False, action="store_true")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    help="Override interactive check for --no-voting",
                    default=False, action="store_true")
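  # Runtime directories (path, mode) handed to the generic daemon startup code.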
  dirs = [(constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE),
          (constants.SOCKET_DIR, constants.SOCKET_DIR_MODE),
         ]
  daemon.GenericMain(constants.MASTERD, parser, dirs,
                     CheckMasterd, ExecMasterd)


if __name__ == "__main__":
  main()