#!/usr/bin/python -u
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Master daemon program.

Some classes deviate from the standard style guide since the
inheritance from parent classes requires it.

"""


import os
import errno
import sys
import SocketServer
import time
import collections
import Queue
import random
import signal
import logging

from cStringIO import StringIO
from optparse import OptionParser

from ganeti import config
from ganeti import constants
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import workerpool
from ganeti import rpc
from ganeti import bootstrap
from ganeti import serializer


CLIENT_REQUEST_WORKERS = 16

EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR


class ClientRequestWorker(workerpool.BaseWorker):
  def RunTask(self, server, request, client_address):
    """Process the request.

    This is copied from the code in ThreadingMixIn.

    """
    try:
      server.finish_request(request, client_address)
      server.close_request(request)
    except:
      server.handle_error(request, client_address)
      server.close_request(request)
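
# For illustration: a hedged sketch of how the pool of ClientRequestWorker
# threads is driven (it assumes only the ganeti.workerpool calls already used
# in this file, and is shown commented out since it is not part of the daemon):
#
#   pool = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS, ClientRequestWorker)
#   pool.AddTask(server, request, client_address)
#   # a free worker thread eventually picks the task up and calls
#   # RunTask(server, request, client_address) as defined above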


class IOServer(SocketServer.UnixStreamServer):
  """IO thread class.

  This class takes care of initializing the other threads, setting
  signal handlers (which are processed only in this thread), and doing
  cleanup at shutdown.

  """
  def __init__(self, address, rqhandler):
    """IOServer constructor

    @param address: the address to bind this IOServer to
    @param rqhandler: RequestHandler type object

    """
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

  def setup_queue(self):
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def process_request(self, request, client_address):
    """Add task to workerpool to process request.

    """
    self.request_workers.AddTask(self, request, client_address)

  def serve_forever(self):
    """Handle one request at a time until told to quit."""
    sighandler = utils.SignalHandler([signal.SIGINT, signal.SIGTERM])
    try:
      while not sighandler.called:
        self.handle_request()
    finally:
      sighandler.Reset()

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.server_close()
    finally:
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.context:
        self.context.jobqueue.Shutdown()


class ClientRqHandler(SocketServer.BaseRequestHandler):
  """Client handler"""
  EOM = '\3'
  READ_SIZE = 4096

  def setup(self):
    self._buffer = ""
    self._msgs = collections.deque()
    self._ops = ClientOps(self.server)

  def handle(self):
    while True:
      msg = self.read_message()
      if msg is None:
        logging.debug("client closed connection")
        break

      request = serializer.LoadJson(msg)
      logging.debug("request: %s", request)
      if not isinstance(request, dict):
        logging.error("wrong request received: %s", msg)
        break

      method = request.get(luxi.KEY_METHOD, None)
      args = request.get(luxi.KEY_ARGS, None)
      if method is None or args is None:
        logging.error("no method or args in request")
        break

      success = False
      try:
        result = self._ops.handle_request(method, args)
        success = True
      except errors.GenericError, err:
        success = False
        result = (err.__class__.__name__, err.args)
      except:
        logging.error("Unexpected exception", exc_info=True)
        err = sys.exc_info()
        result = "Caught exception: %s" % str(err[1])

      response = {
        luxi.KEY_SUCCESS: success,
        luxi.KEY_RESULT: result,
        }
      logging.debug("response: %s", response)
      self.send_message(serializer.DumpJson(response))

  def read_message(self):
    while not self._msgs:
      data = self.request.recv(self.READ_SIZE)
      if not data:
        return None
      new_msgs = (self._buffer + data).split(self.EOM)
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def send_message(self, msg):
    # sendall() either sends the whole message or raises an exception
    # (possibly after a partial write), so no manual retry loop is needed
    self.request.sendall(msg + self.EOM)
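
# A hedged sketch of the wire protocol implemented above, for illustration
# only (the key names mirror the luxi constants used in handle(); the exact
# string values are an assumption, and this is not part of the daemon):
#
#   import socket
#   sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
#   sock.connect(constants.MASTER_SOCKET)
#   # each request is a JSON object terminated by the EOM byte '\3'
#   sock.sendall('{"method": "QueryClusterInfo", "args": []}' + '\3')
#   # the reply is likewise '\3'-terminated JSON:
#   # {"success": true, "result": ...}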


class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server

  def handle_request(self, method, args):
    queue = self.server.context.jobqueue

    # TODO: Parameter validation

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Received new job")
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
      return queue.SubmitJob(ops)

    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Received multiple jobs")
      jobs = []
      for ops in args:
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
      return queue.SubmitManyJobs(jobs)

    elif method == luxi.REQ_CANCEL_JOB:
      job_id = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_ARCHIVE_JOB:
      job_id = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = ", ".join(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.QueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed")
      op = opcodes.OpQueryInstances(names=names, output_fields=fields,
                                    use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed")
      op = opcodes.OpQueryNodes(names=names, output_fields=fields,
                                use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      nodes, use_locking = args
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed")
      logging.info("Received exports query request")
      op = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      fields = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpQueryConfigValues(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpQueryClusterInfo()
      return self._Query(op)

    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
      drain_flag = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    else:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

  def _DummyLog(self, *args):
    pass

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    proc = mcpu.Processor(self.server.context)
    # TODO: Where should log messages go?
    return proc.ExecOpCode(op, self._DummyLog, None)
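
# A hedged usage sketch of the dispatch above (illustrative only; it assumes
# a fully initialized server context and that opcodes serialize to the state
# dicts expected by LoadOpCode, e.g. via __getstate__()):
#
#   ops = ClientOps(server)
#   info = ops.handle_request(luxi.REQ_QUERY_CLUSTER_INFO, ())
#   job_id = ops.handle_request(luxi.REQ_SUBMIT_JOB,
#                               [op.__getstate__() for op in my_ops])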


class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only one GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    self.glm = locking.GanetiLockManager(
                self.cfg.GetNodeList(),
                self.cfg.GetInstanceList())

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration.

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)
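
# Illustration of the singleton guard enforced by __init__ and __setattr__
# above (would raise if actually executed, hence shown commented out):
#
#   ctx = GanetiContext()   # first instance; locks the class
#   ctx.cfg = None          # AssertionError: Attempt to modify Ganeti Context
#   GanetiContext()         # AssertionError: double GanetiContext instance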


def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option("-f", "--foreground", dest="fork",
                    help="Don't detach from the current terminal",
                    default=True, action="store_false")
  parser.add_option("-d", "--debug", dest="debug",
                    help="Enable some debug messages",
                    default=False, action="store_true")
  parser.add_option("--no-voting", dest="no_voting",
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally",
                    default=False, action="store_true")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    help="Override interactive check for --no-voting",
                    default=False, action="store_true")

  options, args = parser.parse_args()
  return options, args
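
# Typical invocations, for illustration:
#
#   ganeti-masterd                          # fork and run as a daemon
#   ganeti-masterd -f -d                    # foreground, with debug logging
#   ganeti-masterd --no-voting --yes-do-it  # skip the master-agreement check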


def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most cases we are the master, we'll
  use our own config file for getting the node list. In the future we
  could collect the current node list from our (possibly obsolete)
  known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are further out of sync, manual startup of the master should
  be attempted for now.

  Note that for a cluster with an even number of nodes, we need at
  least half of the nodes (besides ourselves) to vote for us. This
  creates a problem on two-node clusters, since in this case we
  require the other node to be up too to confirm our status.

  """
  myself = utils.HostInfo().name
  # Temporary instantiation of a config writer, used only to get the node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg
  retries = 6
  while retries > 0:
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is None:
      retries -= 1
      time.sleep(10)
      continue
    break
  if retries == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    return False
  # here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]
  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result


def main():
  """Main function"""

  options, args = ParseOptions()
  utils.debug = options.debug
  utils.no_fork = True

  if options.fork:
    utils.CloseFDs()

  rpc.Init()
  try:
    ssconf.CheckMaster(options.debug)

    # we believe we are the master, let's ask the other nodes...
    if options.no_voting and not options.yes_do_it:
      sys.stdout.write("The 'no voting' option has been selected.\n")
      sys.stdout.write("This is dangerous, please confirm by"
                       " typing uppercase 'yes': ")
      sys.stdout.flush()
      confirmation = sys.stdin.readline().strip()
      if confirmation != "YES":
        print "Aborting."
        return
    elif not options.no_voting:
      if not CheckAgreement():
        return

    dirs = [(constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE),
            (constants.SOCKET_DIR, constants.SOCKET_DIR_MODE),
           ]
    utils.EnsureDirs(dirs)

    # This is safe to do as the pid file guarantees against
    # concurrent execution.
    utils.RemoveFile(constants.MASTER_SOCKET)

    master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
  finally:
    rpc.Shutdown()

  # become a daemon
  if options.fork:
    utils.Daemonize(logfile=constants.LOG_MASTERDAEMON)

  utils.WritePidFile(constants.MASTERD_PID)
  try:
    utils.SetupLogging(constants.LOG_MASTERDAEMON, debug=options.debug,
                       stderr_logging=not options.fork, multithreaded=True)

    logging.info("Ganeti master daemon startup")

    rpc.Init()
    try:
      # activate ip
      master_node = ssconf.SimpleStore().GetMasterNode()
      if not rpc.RpcRunner.call_node_start_master(master_node, False):
        logging.error("Can't activate master IP address")

      master.setup_queue()
      try:
        master.serve_forever()
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemovePidFile(constants.MASTERD_PID)
    utils.RemoveFile(constants.MASTER_SOCKET)


if __name__ == "__main__":
  main()