#!/usr/bin/python
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Master daemon program.

Some classes deviate from the standard style guide since
inheritance from parent classes requires it.

"""

# pylint: disable-msg=C0103
# C0103: Invalid name ganeti-masterd

import sys
import socket
import SocketServer
import time
import collections
import logging

from optparse import OptionParser

from ganeti import config
from ganeti import constants
from ganeti import daemon
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import workerpool
from ganeti import rpc
from ganeti import bootstrap


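# Size of the worker pool that serves client connections on the master
# socket (see MasterServer.setup_queue below).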
CLIENT_REQUEST_WORKERS = 16

EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR


class ClientRequestWorker(workerpool.BaseWorker):
  # pylint: disable-msg=W0221
  def RunTask(self, server, request, client_address):
    """Process the request.

    """
    try:
      server.request_handler_class(request, client_address, server)
    finally:
      request.close()


class MasterServer(daemon.AsyncStreamServer):
  """Master Server.

  This is the main asynchronous master server. It handles connections to the
  master socket.

  """
  def __init__(self, mainloop, address, handler_class):
    """MasterServer constructor

    @type mainloop: ganeti.daemon.Mainloop
    @param mainloop: Mainloop used to poll for I/O events
    @param address: the unix socket address to bind the MasterServer to
    @param handler_class: handler class for the connections

    """
    daemon.AsyncStreamServer.__init__(self, socket.AF_UNIX, address)
    self.request_handler_class = handler_class
    self.mainloop = mainloop

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

  def handle_connection(self, connected_socket, client_address):
    self.request_workers.AddTask(self, connected_socket, client_address)

  def setup_queue(self):
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool("ClientReq",
                                                 CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.close()
    finally:
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.context:
        self.context.jobqueue.Shutdown()


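# Each connection accepted by MasterServer is queued to the worker pool;
# a ClientRequestWorker then runs the configured handler class
# (ClientRqHandler) on the connected socket to serve that client.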
class ClientRqHandler(SocketServer.BaseRequestHandler):
  """Client handler"""
  READ_SIZE = 4096

  def setup(self):
    # pylint: disable-msg=W0201
    # setup() is the API for initialising this class
    self._buffer = ""
    self._msgs = collections.deque()
    self._ops = ClientOps(self.server)

  def handle(self):
    while True:
      msg = self.read_message()
      if msg is None:
        logging.debug("client closed connection")
        break

      (method, args) = luxi.ParseRequest(msg)

      success = False
      try:
        result = self._ops.handle_request(method, args)
        success = True
      except errors.GenericError, err:
        logging.exception("Unexpected exception")
        result = errors.EncodeException(err)
      except:
        logging.exception("Unexpected exception")
        result = "Caught exception: %s" % str(sys.exc_info()[1])

      self.send_message(luxi.FormatResponse(success, result))

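  # Messages on the master socket are delimited by constants.LUXI_EOM;
  # read_message() buffers partial reads until at least one complete
  # message is available and then returns messages one at a time.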
  def read_message(self):
    while not self._msgs:
      data = self.request.recv(self.READ_SIZE)
      if not data:
        return None
      new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def send_message(self, msg):
    # TODO: sendall is not guaranteed to send everything
    self.request.sendall(msg + constants.LUXI_EOM)


class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server

  def handle_request(self, method, args): # pylint: disable-msg=R0911
    queue = self.server.context.jobqueue

    # TODO: Parameter validation

    # TODO: Rewrite to not exit in each 'if/elif' branch

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Received new job")
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
      return queue.SubmitJob(ops)

    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Received multiple jobs")
      jobs = []
      for ops in args:
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
      return queue.SubmitManyJobs(jobs)

    elif method == luxi.REQ_CANCEL_JOB:
      job_id = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_ARCHIVE_JOB:
      job_id = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = utils.CommaJoin(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.QueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpQueryInstances(names=names, output_fields=fields,
                                    use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpQueryNodes(names=names, output_fields=fields,
                                use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      nodes, use_locking = args
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      logging.info("Received exports query request")
      op = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      fields = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpQueryConfigValues(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpQueryClusterInfo()
      return self._Query(op)

    elif method == luxi.REQ_QUERY_TAGS:
      kind, name = args
      logging.info("Received tags query request")
      op = opcodes.OpGetTags(kind=kind, name=name)
      return self._Query(op)

    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
      drain_flag = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    elif method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      if until is None:
        logging.info("Received request to no longer pause the watcher")
      else:
        if not isinstance(until, (int, float)):
          raise TypeError("Duration must be an integer or float")

        if until < time.time():
          raise errors.GenericError("Unable to set pause end time in the past")

        logging.info("Received request to pause the watcher until %s", until)

      return _SetWatcherPause(until)

    else:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

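  # Unlike submitted jobs, queries are executed synchronously in the
  # calling worker thread via mcpu.Processor instead of going through
  # the job queue.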
  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    # Queries don't have a job id
    proc = mcpu.Processor(self.server.context, None)
    return proc.ExecOpCode(op, None)


class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # pylint: disable-msg=W0212
  # we do want to ensure a singleton here
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only one GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    self.glm = locking.GanetiLockManager(
                self.cfg.GetNodeList(),
                self.cfg.GetInstanceList())

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node, ec_id):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node, ec_id)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)


def _SetWatcherPause(until):
  """Creates or removes the watcher pause file.

  @type until: None or int
  @param until: Unix timestamp saying until when the watcher shouldn't run

  """
  if until is None:
    utils.RemoveFile(constants.WATCHER_PAUSEFILE)
  else:
    utils.WriteFile(constants.WATCHER_PAUSEFILE,
                    data="%d\n" % (until, ))

  return until


def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most cases we are the master, we'll
  use our own config file for getting the node list. In the future we
  could collect the current node list from our (possibly obsolete)
  known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are further out of sync, manual startup of the master should
  be attempted for now.

  Note that for a cluster with an even number of nodes, we need at
  least half of the nodes (besides ourselves) to vote for us. This
  creates a problem on two-node clusters, since in this case we
  require the other node to be up too to confirm our status.

  """
  myself = utils.HostInfo().name
  # temporary instantiation of a config writer, used only to get the node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg
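  # "Retry for up to a minute" from the docstring: at most 6 voting rounds,
  # with a 10 second sleep after each round that yields no usable answer.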
  retries = 6
  while retries > 0:
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is None:
      retries -= 1
      time.sleep(10)
      continue
    break
  if retries == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    logging.critical("Use the --no-voting option if you understand what"
                     " effects it has on the cluster state")
    return False
  # here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]

  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result


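# Wrapper that brings up and tears down the RPC layer around the voting
# check; it is run in a separate process (see CheckMasterd below).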
def CheckAgreementWithRpc():
  rpc.Init()
  try:
    return CheckAgreement()
  finally:
    rpc.Shutdown()


def CheckMasterd(options, args):
  """Initial checks whether to run or exit with a failure.

  """
  if args: # masterd doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)

  ssconf.CheckMaster(options.debug)

  # If CheckMaster didn't fail we believe we are the master, but we have to
  # confirm with the other nodes.
  if options.no_voting:
    if options.yes_do_it:
      return

    sys.stdout.write("The 'no voting' option has been selected.\n")
    sys.stdout.write("This is dangerous, please confirm by"
                     " typing uppercase 'yes': ")
    sys.stdout.flush()

    confirmation = sys.stdin.readline().strip()
    if confirmation != "YES":
      print >> sys.stderr, "Aborting."
      sys.exit(constants.EXIT_FAILURE)

    return

  # CheckAgreement uses RPC and threads, hence it needs to be run in a separate
  # process before we call utils.Daemonize in the current process.
  if not utils.RunInSeparateProcess(CheckAgreementWithRpc):
    sys.exit(constants.EXIT_FAILURE)


def ExecMasterd(options, args): # pylint: disable-msg=W0613
  """Main master daemon function, executed with the PID file held.

  """
  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(constants.MASTER_SOCKET)

  mainloop = daemon.Mainloop()
  master = MasterServer(mainloop, constants.MASTER_SOCKET, ClientRqHandler)
  try:
    rpc.Init()
    try:
      # activate ip
      master_node = ssconf.SimpleStore().GetMasterNode()
      result = rpc.RpcRunner.call_node_start_master(master_node, False, False)
      msg = result.fail_msg
      if msg:
        logging.error("Can't activate master IP address: %s", msg)

      master.setup_queue()
      try:
        mainloop.Run()
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemoveFile(constants.MASTER_SOCKET)


def main():
  """Main function"""
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  parser.add_option("--no-voting", dest="no_voting",
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally",
                    default=False, action="store_true")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    help="Override interactive check for --no-voting",
                    default=False, action="store_true")
  dirs = [(constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE),
          (constants.SOCKET_DIR, constants.SOCKET_DIR_MODE),
         ]
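  # daemon.GenericMain runs CheckMasterd before daemonizing (see the
  # comment in CheckMasterd above) and then calls ExecMasterd with the
  # PID file held (see its docstring).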
  daemon.GenericMain(constants.MASTERD, parser, dirs,
                     CheckMasterd, ExecMasterd,
                     multithreaded=True)


if __name__ == "__main__":
  main()