Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-masterd @ 04ccf5e9

History | View | Annotate | Download (15.5 kB)

1
#!/usr/bin/python -u
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Master daemon program.
23

    
24
Some classes deviates from the standard style guide since the
25
inheritance from parent classes requires it.
26

    
27
"""
28

    
29

    
30
import os
31
import errno
32
import sys
33
import SocketServer
34
import time
35
import collections
36
import Queue
37
import random
38
import signal
39
import logging
40

    
41
from cStringIO import StringIO
42
from optparse import OptionParser
43

    
44
from ganeti import config
45
from ganeti import constants
46
from ganeti import daemon
47
from ganeti import mcpu
48
from ganeti import opcodes
49
from ganeti import jqueue
50
from ganeti import locking
51
from ganeti import luxi
52
from ganeti import utils
53
from ganeti import errors
54
from ganeti import ssconf
55
from ganeti import workerpool
56
from ganeti import rpc
57
from ganeti import bootstrap
58
from ganeti import serializer
59

    
60

    
61
CLIENT_REQUEST_WORKERS = 16
62

    
63
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
64
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
65

    
66

    
67
class ClientRequestWorker(workerpool.BaseWorker):
  """Pooled worker serving a single client connection."""

  def RunTask(self, server, request, client_address):
    """Process the request.

    This is adapted from the code in ThreadingMixIn, but runs in a
    pooled worker instead of a freshly-spawned thread.

    @param server: the server object (IOServer) owning the connection
    @param request: the connected request socket
    @param client_address: the address of the connecting client

    """
    try:
      server.finish_request(request, client_address)
    except:
      # Deliberately catch everything: an error while talking to one
      # client must never take the worker thread down
      server.handle_error(request, client_address)
    finally:
      # The socket is closed exactly once, even if handle_error itself
      # raises (the original code could leak the request in that case)
      server.close_request(request)
80

    
81

    
82
class IOServer(SocketServer.UnixStreamServer):
  """IO thread class.

  This class takes care of initializing the other threads, setting
  signal handlers (which are processed only in this thread), and doing
  cleanup at shutdown.

  """
  def __init__(self, address, rqhandler):
    """IOServer constructor

    @param address: the address to bind this IOServer to
    @param rqhandler: RequestHandler type object

    """
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

  def setup_queue(self):
    """Creates the shared context and the client request worker pool."""
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def process_request(self, request, client_address):
    """Add task to workerpool to process request.

    """
    self.request_workers.AddTask(self, request, client_address)

  def serve_forever(self):
    """Handle one request at a time until told to quit."""
    sighandler = utils.SignalHandler([signal.SIGINT, signal.SIGTERM])
    try:
      # handle_request() returns after each request, so termination
      # signals are noticed between requests
      while not sighandler.called:
        self.handle_request()
    finally:
      sighandler.Reset()

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.server_close()
    finally:
      try:
        # Terminate the workers even if closing the socket failed ...
        if self.request_workers:
          self.request_workers.TerminateWorkers()
      finally:
        # ... and shut the job queue down even if terminating the
        # workers failed; previously an exception from TerminateWorkers
        # would have left the queue running
        if self.context:
          self.context.jobqueue.Shutdown()
137

    
138

    
139
class ClientRqHandler(SocketServer.BaseRequestHandler):
  """Client handler"""
  # End-of-message marker byte used on the wire
  EOM = '\3'
  # Chunk size for socket reads
  READ_SIZE = 4096

  def setup(self):
    # Bytes received but not yet terminated by EOM
    self._buffer = ""
    # Complete messages waiting to be consumed
    self._msgs = collections.deque()
    self._ops = ClientOps(self.server)

  def handle(self):
    """Serve requests on this connection until the peer disconnects
    or sends something malformed.

    """
    while True:
      msg = self.read_message()
      if msg is None:
        logging.debug("client closed connection")
        break

      request = serializer.LoadJson(msg)
      logging.debug("request: %s", request)
      if not isinstance(request, dict):
        logging.error("wrong request received: %s", msg)
        break

      method = request.get(luxi.KEY_METHOD, None)
      args = request.get(luxi.KEY_ARGS, None)
      if method is None or args is None:
        logging.error("no method or args in request")
        break

      success = False
      try:
        result = self._ops.handle_request(method, args)
        success = True
      except errors.GenericError, err:
        # Known errors are reported back as (class name, args) so the
        # client side can reconstruct them
        success = False
        result = (err.__class__.__name__, err.args)
      except:
        # Anything else is logged server-side and reported only as a
        # string; the connection stays usable
        logging.error("Unexpected exception", exc_info=True)
        err = sys.exc_info()
        result = "Caught exception: %s" % str(err[1])

      response = {
        luxi.KEY_SUCCESS: success,
        luxi.KEY_RESULT: result,
        }
      logging.debug("response: %s", response)
      self.send_message(serializer.DumpJson(response))

  def read_message(self):
    """Returns the next complete message, or None on disconnect."""
    while not self._msgs:
      data = self.request.recv(self.READ_SIZE)
      if not data:
        # Peer closed the connection; any bytes still in self._buffer
        # belong to an unterminated message and are dropped
        return None
      # split() leaves the trailing (possibly empty) partial message in
      # the last element, which becomes the new buffer
      new_msgs = (self._buffer + data).split(self.EOM)
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def send_message(self, msg):
    """Sends one message, appending the EOM terminator."""
    # TODO: sendall is not guaranteed to send everything
    self.request.sendall(msg + self.EOM)
201

    
202

    
203
class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    """Initializes this class.

    @param server: the server object (IOServer) through which the
        shared context, and hence the job queue, is reached

    """
    self.server = server

  def handle_request(self, method, args):
    """Dispatches a single client request.

    @param method: one of the luxi.REQ_* constants
    @param args: method-specific arguments, as sent by the client
    @return: the method-specific result
    @raise errors.OpPrereqError: for queries requesting locking
    @raise ValueError: if the method is not known

    """
    queue = self.server.context.jobqueue

    # TODO: Parameter validation

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Received new job")
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
      return queue.SubmitJob(ops)

    # this was a plain "if" before; harmless only because the branch
    # above always returns, now consistent with the rest of the chain
    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Received multiple jobs")
      jobs = []
      for ops in args:
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
      return queue.SubmitManyJobs(jobs)

    elif method == luxi.REQ_CANCEL_JOB:
      job_id = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_ARCHIVE_JOB:
      job_id = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      # build a readable description of the queried jobs for the log
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = ", ".join(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.QueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed")
      op = opcodes.OpQueryInstances(names=names, output_fields=fields,
                                    use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed")
      op = opcodes.OpQueryNodes(names=names, output_fields=fields,
                                use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      nodes, use_locking = args
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed")
      logging.info("Received exports query request")
      op = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      fields = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpQueryConfigValues(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpQueryClusterInfo()
      return self._Query(op)

    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
      drain_flag = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    else:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

  def _DummyLog(self, *args):
    """No-op feedback function passed to opcode execution."""
    pass

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    proc = mcpu.Processor(self.server.context)
    # TODO: Where should log messages go?
    return proc.ExecOpCode(op, self._DummyLog, None)
313

    
314

    
315
class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # Singleton instance; set at the very end of __init__
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only a GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    self.glm = locking.GanetiLockManager(
                self.cfg.GetNodeList(),
                self.cfg.GetInstanceList())

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    # While _instance is still None (i.e. during __init__) assignments
    # go through; once the instance is registered, any further attribute
    # assignment trips this assertion
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node):
    """Adds a node to the configuration and lock manager.

    @param node: node object to add

    """
    # Add it to the configuration
    self.cfg.AddNode(node)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration

    @param node: node object to re-synchronize

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    @param name: name of the node to remove

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)
385

    
386

    
387
def CheckAgreement():
  """Check the agreement on who is the master.

  We gather votes from all nodes in our (possibly obsolete) node list
  and require strictly more positive than negative answers before
  declaring ourselves master. To tolerate cold-start situations where
  other nodes are still booting, up to six rounds of voting are tried,
  ten seconds apart, until a real node tops the vote list.

  Note that on an even-sized cluster we need at least half of the other
  nodes to vote for us, which on a two-node cluster means the peer must
  be up for the check to succeed.

  @rtype: bool
  @return: True if this node is the agreed-upon master

  """
  myself = utils.HostInfo().name

  # a throw-away config writer, used only to obtain the node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg

  attempts_left = 6
  while attempts_left > 0:
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is not None:
      # a real node leads the vote, stop retrying
      break
    attempts_left -= 1
    time.sleep(10)

  if attempts_left == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    return False

  # here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]

  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
    return False

  if top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
    return False

  return True
442

    
443
def CheckMASTERD(options, args):
  """Initial checks whether to run or exit with a failure

  @param options: parsed command-line options
  @param args: remaining positional arguments (unused here)

  """
  rpc.Init()
  try:
    # verify this node believes itself to be the master; presumably this
    # exits/raises on failure -- behavior defined in ssconf.CheckMaster
    ssconf.CheckMaster(options.debug)

    # we believe we are the master, let's ask the other nodes...
    if options.no_voting and not options.yes_do_it:
      # --no-voting without --yes-do-it requires interactive confirmation
      sys.stdout.write("The 'no voting' option has been selected.\n")
      sys.stdout.write("This is dangerous, please confirm by"
                       " typing uppercase 'yes': ")
      sys.stdout.flush()
      confirmation = sys.stdin.readline().strip()
      if confirmation != "YES":
        print "Aborting."
        # NOTE(review): a plain return here (and below) relies on
        # daemon.GenericMain treating a return from the check function
        # as a signal not to start -- TODO confirm that semantics
        return
    elif not options.no_voting:
      if not CheckAgreement():
        return
  finally:
    # the RPC layer was only needed for the checks above
    rpc.Shutdown()
466

    
467

    
468
def ExecMASTERD(options, args):
  """Main MASTERD function, executed with the pidfile held.

  @param options: parsed command-line options
  @param args: remaining positional arguments (unused here)

  """
  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(constants.MASTER_SOCKET)

  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
  try:
    rpc.Init()
    try:
      # activate ip
      master_node = ssconf.SimpleStore().GetMasterNode()
      result = rpc.RpcRunner.call_node_start_master(master_node, False, False)
      msg = result.RemoteFailMsg()
      if msg:
        # non-fatal: we keep starting even without the master IP
        logging.error("Can't activate master IP address: %s", msg)

      master.setup_queue()
      try:
        master.serve_forever()
      finally:
        # threads and job queue go down before the RPC layer below
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    # remove the socket last, after all serving has stopped
    utils.RemoveFile(constants.MASTER_SOCKET)
496

    
497

    
498
def main():
  """Parse the command line and hand control to the generic daemon code."""
  version_string = "%%prog (ganeti) %s" % constants.RELEASE_VERSION
  opt_parser = OptionParser(description="Ganeti master daemon",
                            usage="%prog [-f] [-d]",
                            version=version_string)
  opt_parser.add_option("--no-voting", dest="no_voting",
                        action="store_true", default=False,
                        help="Do not check that the nodes agree on this node"
                        " being the master and start the daemon"
                        " unconditionally")
  opt_parser.add_option("--yes-do-it", dest="yes_do_it",
                        action="store_true", default=False,
                        help="Override interactive check for --no-voting")
  # directories (and their modes) that must exist before starting up
  needed_dirs = [
    (constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE),
    (constants.SOCKET_DIR, constants.SOCKET_DIR_MODE),
    ]
  daemon.GenericMain(constants.MASTERD, opt_parser, needed_dirs,
                     CheckMASTERD, ExecMASTERD)
516

    
517
# Standard script entry point
if __name__ == "__main__":
  main()