#!/usr/bin/python
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Master daemon program.

Some classes deviate from the standard style guide since the
inheritance from parent classes requires it.

"""

# pylint: disable-msg=C0103
# C0103: Invalid name ganeti-masterd
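
# masterd is driven over the LUXI protocol on a unix socket.  As a rough
# sketch (assuming the luxi.Client API available at this revision), a
# client-side interaction would look like:
#
#   from ganeti import constants, luxi
#   client = luxi.Client(address=constants.MASTER_SOCKET)
#   print client.QueryClusterInfo()  # handled below as REQ_QUERY_CLUSTER_INFO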

import grp
import os
import pwd
import sys
import socket
import time
import tempfile
import logging

from optparse import OptionParser

from ganeti import config
from ganeti import constants
from ganeti import daemon
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import workerpool
from ganeti import rpc
from ganeti import bootstrap
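

# Number of worker threads serving client (LUXI) requests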
CLIENT_REQUEST_WORKERS = 16

EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR


class ClientRequestWorker(workerpool.BaseWorker):
  # pylint: disable-msg=W0221
  def RunTask(self, server, message, client):
    """Process the request.

    """
    client_ops = ClientOps(server)

    try:
      (method, args) = luxi.ParseRequest(message)
    except luxi.ProtocolError, err:
      logging.error("Protocol Error: %s", err)
      client.close_log()
      return
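
    # Dispatch the request.  Known Ganeti errors are encoded so the client
    # can re-raise them; any other exception is reported as a plain string.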
    success = False
    try:
      result = client_ops.handle_request(method, args)
      success = True
    except errors.GenericError, err:
      logging.exception("Unexpected exception")
      success = False
      result = errors.EncodeException(err)
    except:
      logging.exception("Unexpected exception")
      err = sys.exc_info()
      result = "Caught exception: %s" % str(err[1])

    try:
      reply = luxi.FormatResponse(success, result)
      client.send_message(reply)
      # wake up the main thread so that it can write out the data.
      server.awaker.signal()
    except:
      logging.exception("Send error")
      client.close_log()


class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
  """Handler for master peers.

  """
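  # Maximum number of received but not yet processed messages per
  # connection; past this limit the stream stops reading from the socket
  # until the workers catch up.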
  _MAX_UNHANDLED = 1

  def __init__(self, server, connected_socket, client_address, family):
    daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
                                                 client_address,
                                                 constants.LUXI_EOM,
                                                 family, self._MAX_UNHANDLED)
    self.server = server

  def handle_message(self, message, _):
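    # Hand the message over to the worker pool; the reply is sent back
    # asynchronously by the worker once the request has been processed.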
    self.server.request_workers.AddTask(self.server, message, self)


class MasterServer(daemon.AsyncStreamServer):
  """Master Server.

  This is the main asynchronous master server. It handles connections to the
  master socket.

  """
  family = socket.AF_UNIX

  def __init__(self, mainloop, address, uid, gid):
    """MasterServer constructor.

    @type mainloop: ganeti.daemon.Mainloop
    @param mainloop: Mainloop used to poll for I/O events
    @param address: the unix socket address to bind the MasterServer to
    @param uid: The uid of the owner of the socket
    @param gid: The gid of the owner of the socket

    """
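    # Create the socket under a temporary name first; it is renamed into
    # place only after its mode and ownership have been restricted, so
    # clients never see a socket with overly permissive settings.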
    temp_name = tempfile.mktemp(dir=os.path.dirname(address))
    daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
    os.chmod(temp_name, 0770)
    os.chown(temp_name, uid, gid)
    os.rename(temp_name, address)

    self.mainloop = mainloop
    self.awaker = daemon.AsyncAwaker()

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

  def handle_connection(self, connected_socket, client_address):
    # TODO: add connection count and limit the number of open connections to a
    # maximum number to avoid breaking for lack of file descriptors or memory.
    MasterClientHandler(self, connected_socket, client_address, self.family)

  def setup_queue(self):
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool("ClientReq",
                                                 CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.close()
    finally:
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.context:
        self.context.jobqueue.Shutdown()


class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server

  def handle_request(self, method, args): # pylint: disable-msg=R0911
    queue = self.server.context.jobqueue

    # TODO: Parameter validation

    # TODO: Rewrite to not exit in each 'if/elif' branch

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Received new job")
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
      return queue.SubmitJob(ops)

    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Received multiple jobs")
      jobs = []
      for ops in args:
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
      return queue.SubmitManyJobs(jobs)

    elif method == luxi.REQ_CANCEL_JOB:
      job_id = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_ARCHIVE_JOB:
      job_id = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = utils.CommaJoin(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.QueryJobs(job_ids, fields)
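
    # The queries below are executed directly in this worker thread, outside
    # the job queue, so variants that would acquire locks ("sync" queries)
    # are rejected.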
    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpQueryInstances(names=names, output_fields=fields,
                                    use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpQueryNodes(names=names, output_fields=fields,
                                use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      nodes, use_locking = args
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      logging.info("Received exports query request")
      op = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      fields = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpQueryConfigValues(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpQueryClusterInfo()
      return self._Query(op)

    elif method == luxi.REQ_QUERY_TAGS:
      kind, name = args
      logging.info("Received tags query request")
      op = opcodes.OpGetTags(kind=kind, name=name)
      return self._Query(op)

    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
      drain_flag = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    elif method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      if until is None:
        logging.info("Received request to no longer pause the watcher")
      else:
        if not isinstance(until, (int, float)):
          raise TypeError("Duration must be an integer or float")

        if until < time.time():
          raise errors.GenericError("Unable to set pause end time in the past")

        logging.info("Received request to pause the watcher until %s", until)

      return _SetWatcherPause(until)

    else:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    # Queries don't have a job id
    proc = mcpu.Processor(self.server.context, None)
    return proc.ExecOpCode(op, None)


class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # pylint: disable-msg=W0212
  # we do want to ensure a singleton here
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only one GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    self.glm = locking.GanetiLockManager(
                self.cfg.GetNodeList(),
                self.cfg.GetInstanceList())

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node, ec_id):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node, ec_id)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration.

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)


def _SetWatcherPause(until):
  """Creates or removes the watcher pause file.

  @type until: None or int
  @param until: Unix timestamp saying until when the watcher shouldn't run

  """
  if until is None:
    utils.RemoveFile(constants.WATCHER_PAUSEFILE)
  else:
    utils.WriteFile(constants.WATCHER_PAUSEFILE,
                    data="%d\n" % (until, ))

  return until


def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most cases we are the master, we'll
  use our own config file for getting the node list. In the future we
  could collect the current node list from our (possibly obsolete)
  known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are further out of sync, manual startup of the master should
  be attempted for now.

  Note that for a cluster with an even number of nodes, we need at
  least half of the nodes (besides ourselves) to vote for us. This
  creates a problem on two-node clusters, since in this case we
  require the other node to be up too in order to confirm our status.

  """
  myself = utils.HostInfo().name
  # Temporary instantiation of a config writer, used only to get the node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg
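  # Ask the cluster for master votes; a top vote of None means too few nodes
  # have answered yet, so sleep and retry.  Six rounds of ten seconds match
  # the "up to a minute" retry window described in the docstring above.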
  retries = 6
  while retries > 0:
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one-node cluster
      return True
    if votes[0][0] is None:
      retries -= 1
      time.sleep(10)
      continue
    break
  if retries == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    logging.critical("Use the --no-voting option if you understand what"
                     " effects it has on the cluster state")
    return False
  # here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]

  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result


def CheckAgreementWithRpc():
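  """Runs CheckAgreement() with the RPC layer brought up and torn down."""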
  rpc.Init()
  try:
    return CheckAgreement()
  finally:
    rpc.Shutdown()


def CheckMasterd(options, args):
  """Initial checks to decide whether to run or to exit with a failure.

  """
  if args: # masterd doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)

  ssconf.CheckMaster(options.debug)

  try:
    options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
    options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
  except KeyError:
    print >> sys.stderr, ("User or group not existing on system: %s:%s" %
                          (constants.MASTERD_USER, constants.DAEMONS_GROUP))
    sys.exit(constants.EXIT_FAILURE)

  # If CheckMaster didn't fail we believe we are the master, but we have to
  # confirm with the other nodes.
  if options.no_voting:
    if options.yes_do_it:
      return

    sys.stdout.write("The 'no voting' option has been selected.\n")
    sys.stdout.write("This is dangerous, please confirm by"
                     " typing uppercase 'yes': ")
    sys.stdout.flush()

    confirmation = sys.stdin.readline().strip()
    if confirmation != "YES":
      print >> sys.stderr, "Aborting."
      sys.exit(constants.EXIT_FAILURE)

    return

  # CheckAgreement uses RPC and threads, hence it needs to be run in a separate
  # process before we call utils.Daemonize in the current process.
  if not utils.RunInSeparateProcess(CheckAgreementWithRpc):
    sys.exit(constants.EXIT_FAILURE)


def ExecMasterd(options, args): # pylint: disable-msg=W0613
  """Main master daemon function, executed with the PID file held.

  """
  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(constants.MASTER_SOCKET)

  mainloop = daemon.Mainloop()
  master = MasterServer(mainloop, constants.MASTER_SOCKET,
                        options.uid, options.gid)
  try:
    rpc.Init()
    try:
      # Activate the master IP address
      master_node = ssconf.SimpleStore().GetMasterNode()
      result = rpc.RpcRunner.call_node_start_master(master_node, False, False)
      msg = result.fail_msg
      if msg:
        logging.error("Can't activate master IP address: %s", msg)
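
      # Only now, after daemonizing and bringing RPC up, start the job queue
      # and the client request worker threads.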
      master.setup_queue()
      try:
        mainloop.Run()
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemoveFile(constants.MASTER_SOCKET)


def main():
  """Main function"""
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  parser.add_option("--no-voting", dest="no_voting",
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally",
                    default=False, action="store_true")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    help="Override interactive check for --no-voting",
                    default=False, action="store_true")
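  # Runtime directories, with their modes, that the generic daemon startup
  # code ensures exist before the daemon body runs.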
  dirs = [(constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE),
          (constants.SOCKET_DIR, constants.SOCKET_DIR_MODE),
         ]
  daemon.GenericMain(constants.MASTERD, parser, dirs,
                     CheckMasterd, ExecMasterd,
                     multithreaded=True)


if __name__ == "__main__":
  main()