Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-masterd @ 90b54c26

History | View | Annotate | Download (16 kB)

1
#!/usr/bin/python -u
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Master daemon program.
23

    
24
Some classes deviate from the standard style guide since the
25
inheritance from parent classes requires it.
26

    
27
"""
28

    
29

    
30
import os
31
import errno
32
import sys
33
import SocketServer
34
import time
35
import collections
36
import Queue
37
import random
38
import signal
39
import logging
40

    
41
from cStringIO import StringIO
42
from optparse import OptionParser
43

    
44
from ganeti import config
45
from ganeti import constants
46
from ganeti import mcpu
47
from ganeti import opcodes
48
from ganeti import jqueue
49
from ganeti import locking
50
from ganeti import luxi
51
from ganeti import utils
52
from ganeti import errors
53
from ganeti import ssconf
54
from ganeti import workerpool
55
from ganeti import rpc
56
from ganeti import bootstrap
57
from ganeti import serializer
58

    
59

    
60
# Passed to workerpool.WorkerPool in IOServer.setup_queue together with
# ClientRequestWorker (presumably the number of workers -- confirm against
# ganeti.workerpool)
CLIENT_REQUEST_WORKERS = 16
61

    
# Local aliases for the equally named values in ganeti.constants
62
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
63
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
64

    
65

    
66
class ClientRequestWorker(workerpool.BaseWorker):
  """Worker class handed to the request worker pool by IOServer.

  Each task is the (server, request, client_address) triple that
  IOServer.process_request adds to the pool.

  """
  def RunTask(self, server, request, client_address):
    """Serve one client connection and close it afterwards.

    The body follows the request processing code of
    SocketServer.ThreadingMixIn.

    @param server: the server object that accepted the connection
    @param request: the request (connection) object to serve
    @param client_address: address of the client, passed through to the
        server callbacks

    """
    try:
      server.finish_request(request, client_address)
      server.close_request(request)
    except:
      # Catch-all on purpose, as in ThreadingMixIn: any failure is
      # reported through the server's error hook and the connection is
      # still closed.
      server.handle_error(request, client_address)
      server.close_request(request)
79

    
80

    
81
class IOServer(SocketServer.UnixStreamServer):
  """Unix stream server accepting the master daemon's client connections.

  Apart from accepting connections, this class creates the shared
  context and the pool of request workers, installs the signal handlers
  used to stop serving, and performs the cleanup at shutdown.

  """
  def __init__(self, address, rqhandler):
    """Bind the server; context and workers are created later.

    @param address: the address to bind this IOServer to
    @param rqhandler: RequestHandler type object

    """
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)

    # Both are filled in by setup_queue(); threads are only started once
    # the process has forked.
    self.request_workers = None
    self.context = None

  def setup_queue(self):
    """Create the GanetiContext and the pool of request workers.

    """
    self.context = GanetiContext()
    pool = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS, ClientRequestWorker)
    self.request_workers = pool

  def process_request(self, request, client_address):
    """Hand the accepted request over to the worker pool.

    @param request: the request (connection) object
    @param client_address: address of the connecting client

    """
    self.request_workers.AddTask(self, request, client_address)

  def serve_forever(self):
    """Handle requests one by one until the signal handler was called.

    A utils.SignalHandler for SIGINT and SIGTERM is installed for the
    duration of the loop and reset afterwards.

    """
    stop_signals = [signal.SIGINT, signal.SIGTERM]
    handler = utils.SignalHandler(stop_signals)
    try:
      while not handler.called:
        self.handle_request()
    finally:
      handler.Reset()

  def server_cleanup(self):
    """Close the master socket and stop workers and job queue.

    The workers and the job queue are shut down even if closing the
    socket fails; either is skipped when setup_queue() never ran.

    """
    try:
      self.server_close()
    finally:
      workers = self.request_workers
      if workers:
        workers.TerminateWorkers()
      ctx = self.context
      if ctx:
        ctx.jobqueue.Shutdown()
136

    
137

    
138
class ClientRqHandler(SocketServer.BaseRequestHandler):
  """Request handler speaking the EOM-delimited JSON client protocol."""
  # Marker terminating every message on the wire
  EOM = '\3'
  # Maximum number of bytes requested from the socket per recv() call
  READ_SIZE = 4096

  def setup(self):
    """Initialise the receive buffer, message queue and operations object.

    """
    self._buffer = ""
    self._msgs = collections.deque()
    self._ops = ClientOps(self.server)

  def handle(self):
    """Serve requests on this connection until it is closed or invalid.

    Every message is decoded from JSON, dispatched via the ClientOps
    object, and answered with a JSON dictionary holding the success flag
    and the result. The loop ends when the client closes the connection,
    when the request is not a dictionary, or when it lacks method or
    arguments.

    """
    while True:
      raw = self.read_message()
      if raw is None:
        logging.debug("client closed connection")
        return

      request = serializer.LoadJson(raw)
      logging.debug("request: %s", request)
      if not isinstance(request, dict):
        logging.error("wrong request received: %s", raw)
        return

      method = request.get(luxi.KEY_METHOD, None)
      args = request.get(luxi.KEY_ARGS, None)
      if method is None or args is None:
        logging.error("no method or args in request")
        return

      try:
        result = self._ops.handle_request(method, args)
        success = True
      except errors.GenericError:
        # Ganeti errors are reported back as (class name, arguments)
        err = sys.exc_info()[1]
        success = False
        result = (err.__class__.__name__, err.args)
      except:
        # Anything else is logged with traceback and reported as text
        logging.error("Unexpected exception", exc_info=True)
        success = False
        result = "Caught exception: %s" % str(sys.exc_info()[1])

      response = {
        luxi.KEY_SUCCESS: success,
        luxi.KEY_RESULT: result,
        }
      logging.debug("response: %s", response)
      self.send_message(serializer.DumpJson(response))

  def read_message(self):
    """Return the next complete message, reading from the socket if needed.

    @return: the message without its EOM terminator, or None if the peer
        closed the connection before another complete message arrived

    """
    while not self._msgs:
      chunk = self.request.recv(self.READ_SIZE)
      if not chunk:
        return None
      pieces = (self._buffer + chunk).split(self.EOM)
      # The last piece is the (possibly empty) start of an unfinished
      # message; it stays buffered until its terminator arrives.
      self._buffer = pieces[-1]
      self._msgs.extend(pieces[:-1])
    return self._msgs.popleft()

  def send_message(self, msg):
    """Send one message followed by the EOM terminator.

    @param msg: the already serialized message

    """
    self.request.sendall(msg + self.EOM)
199

    
200

    
201
class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    """Remember the server whose context is used to serve requests.

    @param server: object providing the C{context} attribute read by
        handle_request and _Query

    """
    self.server = server

  @staticmethod
  def _FormatJobIds(job_ids):
    """Render the job IDs of a query request for the log message.

    @param job_ids: the job IDs as received from the client
    @return: the comma-separated IDs for a non-empty list or tuple, the
        plain string representation of the value otherwise

    """
    if isinstance(job_ids, (tuple, list)) and job_ids:
      # IDs may arrive as numbers; formatting each one avoids the
      # TypeError a bare join raises on non-string items
      return ", ".join(["%s" % (job_id,) for job_id in job_ids])
    return str(job_ids)

  def handle_request(self, method, args):
    """Execute a single client request.

    @param method: the request name, compared against the luxi.REQ_*
        constants
    @param args: the request arguments; their layout depends on the
        method, as unpacked in the respective branch
    @return: the value returned by the job queue method or by the query
        opcode that serves the request
    @raise errors.OpPrereqError: if an instance, node or export query
        asks for locking
    @raise ValueError: if the method is not known

    """
    queue = self.server.context.jobqueue

    # TODO: Parameter validation

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Received new job")
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
      return queue.SubmitJob(ops)

    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Received multiple jobs")
      jobs = []
      for ops in args:
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
      return queue.SubmitManyJobs(jobs)

    elif method == luxi.REQ_CANCEL_JOB:
      job_id = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_ARCHIVE_JOB:
      job_id = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      msg = self._FormatJobIds(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.QueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed")
      op = opcodes.OpQueryInstances(names=names, output_fields=fields,
                                    use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed")
      op = opcodes.OpQueryNodes(names=names, output_fields=fields,
                                use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      nodes, use_locking = args
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed")
      logging.info("Received exports query request")
      op = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      fields = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpQueryConfigValues(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpQueryClusterInfo()
      return self._Query(op)

    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
      drain_flag = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    else:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

  def _DummyLog(self, *args):
    """Discard all arguments; passed to the processor by _Query.

    """
    pass

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    @param op: the opcode object to execute
    @return: the result of mcpu.Processor.ExecOpCode for the opcode

    """
    proc = mcpu.Processor(self.server.context)
    # TODO: Where should log messages go?
    return proc.ExecOpCode(op, self._DummyLog, None)
311

    
312

    
313
class GanetiContext(object):
  """Holder of the objects shared by all ganeti threads.

  An instance owns the configuration writer, the lock manager and the
  job queue.

  """
  # The one live instance, or None while no context has been created
  _instance = None

  def __init__(self):
    """Create the configuration, lock manager and job queue objects.

    Only one GanetiContext may exist at any time; creating a second one
    trips an assertion.

    """
    cls = self.__class__
    assert cls._instance is None, "double GanetiContext instance"

    # Global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager, seeded with the currently known nodes and instances
    node_names = self.cfg.GetNodeList()
    instance_names = self.cfg.GetInstanceList()
    self.glm = locking.GanetiLockManager(node_names, instance_names)

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # From here on __setattr__ refuses any further attribute changes
    cls._instance = self

  def __setattr__(self, name, value):
    """Set an attribute, which is only allowed during initialization.

    Once the class-level instance reference has been set, every
    assignment trips an assertion.

    """
    cls = self.__class__
    assert cls._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node):
    """Register a new node with configuration, job queue and lock manager.

    @param node: the node object; its C{name} is used for the lock

    """
    # Configuration first
    self.cfg.AddNode(node)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Finally make the node known to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)

  def ReaddNode(self, node):
    """Re-synchronize the job queue for a node already in the configuration.

    @param node: the node object

    """
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Drop a node from configuration, job queue and lock manager.

    @param name: the name of the node to remove

    """
    # Configuration first
    self.cfg.RemoveNode(name)

    # Then tell the job queue
    self.jobqueue.RemoveNode(name)

    # Finally remove the node's lock from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)
383

    
384

    
385
def ParseOptions():
  """Build the option parser and parse the command line.

  Recognized options are -f/--foreground (stored as C{fork}, default
  True), -d/--debug and --no-voting.

  @return: (options, args) as from OptionParser.parse_args()

  """
  version_string = "%%prog (ganeti) %s" % constants.RELEASE_VERSION
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version=version_string)

  parser.add_option("-f", "--foreground", action="store_false",
                    dest="fork", default=True,
                    help="Don't detach from the current terminal")
  parser.add_option("-d", "--debug", action="store_true",
                    dest="debug", default=False,
                    help="Enable some debug messages")
  parser.add_option("--no-voting", action="store_true",
                    dest="no_voting", default=False,
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally")
  return parser.parse_args()
408

    
409

    
410
def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get at least as
  many positive as negative answers. Since in most of the cases we are
  the master, we'll use our own config file for getting the node list.
  In the future we could collect the current node list from our
  (possibly obsolete) known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are more out-of-sync, for now manual startup of the master
  should be attempted.

  Note that for an even number of nodes cluster, we need at least half
  of the nodes (beside ourselves) to vote for us. This creates a
  problem on two-node clusters, since in this case we require the
  other node to be up too to confirm our status.

  @return: True if no votes were gathered (one node cluster) or if this
      host is the top-voted node with at least as many votes for as
      against it; False otherwise, including when no real answer was
      obtained after all retries

  """
  myself = utils.HostInfo().name
  # temporary instantiation of a config writer, used only to get the
  # node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg
  retries = 6
  while retries > 0:
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is None:
      retries -= 1
      if retries > 0:
        # wait only when another attempt follows; sleeping after the
        # last one would just delay the failure report
        time.sleep(10)
      continue
    break
  if retries == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    return False
  # here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]
  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result
465

    
466

    
467
def main():
  """Start the master daemon and serve clients until it is stopped.

  The startup sequence is: parse the options, verify that this node is
  the master (by voting, or by manual confirmation with --no-voting),
  prepare directories and the master socket, optionally daemonize, then
  activate the master IP, set up the queue and serve requests. The pid
  file and the master socket are removed on the way out.

  """

  (options, _) = ParseOptions()
  utils.debug = options.debug
  utils.no_fork = True

  if options.fork:
    utils.CloseFDs()

  rpc.Init()
  try:
    ssconf.CheckMaster(options.debug)

    # we believe we are the master, let's ask the other nodes...
    if not options.no_voting:
      if not CheckAgreement():
        return
    else:
      sys.stdout.write("The 'no voting' option has been selected.\n")
      sys.stdout.write("This is dangerous, please confirm by"
                       " typing uppercase 'yes': ")
      sys.stdout.flush()
      answer = sys.stdin.readline().strip()
      if answer != "YES":
        sys.stdout.write("Aborting.\n")
        return

    needed_dirs = [
      (constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE),
      (constants.SOCKET_DIR, constants.SOCKET_DIR_MODE),
      ]
    utils.EnsureDirs(needed_dirs)

    # This is safe to do as the pid file guarantees against
    # concurrent execution.
    utils.RemoveFile(constants.MASTER_SOCKET)

    master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
  finally:
    rpc.Shutdown()

  # become a daemon
  if options.fork:
    utils.Daemonize(logfile=constants.LOG_MASTERDAEMON)

  utils.WritePidFile(constants.MASTERD_PID)
  try:
    utils.SetupLogging(constants.LOG_MASTERDAEMON, debug=options.debug,
                       stderr_logging=not options.fork, multithreaded=True)

    logging.info("Ganeti master daemon startup")

    rpc.Init()
    try:
      # activate ip
      master_node = ssconf.SimpleConfigReader().GetMasterNode()
      start_result = rpc.RpcRunner.call_node_start_master(master_node, False)
      fail_msg = start_result.RemoteFailMsg()
      if fail_msg:
        # A failed IP activation is only logged; startup continues
        logging.error("Can't activate master IP address: %s", fail_msg)

      master.setup_queue()
      try:
        master.serve_forever()
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemovePidFile(constants.MASTERD_PID)
    utils.RemoveFile(constants.MASTER_SOCKET)
538

    
539

    
540
# Run the daemon only when this file is executed as a script
if __name__ == "__main__":
541
  main()