#!/usr/bin/python
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Master daemon program.

Some classes deviate from the standard style guide since the
inheritance from parent classes requires it.

"""

# pylint: disable-msg=C0103
# C0103: Invalid name ganeti-masterd

import grp
import os
import pwd
import sys
import socket
import time
import tempfile
import logging

from optparse import OptionParser

from ganeti import config
from ganeti import constants
from ganeti import daemon
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import workerpool
from ganeti import rpc
from ganeti import bootstrap
from ganeti import netutils


CLIENT_REQUEST_WORKERS = 16

EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR


class ClientRequestWorker(workerpool.BaseWorker):
  # pylint: disable-msg=W0221
  def RunTask(self, server, message, client):
    """Process the request.

    """
    client_ops = ClientOps(server)

    try:
      (method, args) = luxi.ParseRequest(message)
    except luxi.ProtocolError, err:
      logging.error("Protocol Error: %s", err)
      client.close_log()
      return

    success = False
    try:
      result = client_ops.handle_request(method, args)
      success = True
    except errors.GenericError, err:
      logging.exception("Unexpected exception")
      success = False
      result = errors.EncodeException(err)
    except:
      logging.exception("Unexpected exception")
      err = sys.exc_info()
      result = "Caught exception: %s" % str(err[1])

    try:
      reply = luxi.FormatResponse(success, result)
      client.send_message(reply)
      # wake the main thread so that it can write out the data.
      server.awaker.signal()
    except: # pylint: disable-msg=W0702
      logging.exception("Send error")
      client.close_log()
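

# Sketch of the LUXI exchange handled above (assuming the JSON encoding used
# by ganeti.serializer; ganeti/luxi.py is the authoritative definition):
#   request:  {"method": "SubmitJob", "args": [...]}  + constants.LUXI_EOM
#   response: {"success": true, "result": ...}        + constants.LUXI_EOM
# luxi.ParseRequest and luxi.FormatResponse encapsulate this encoding.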


class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
  """Handler for master peers.

  """
  _MAX_UNHANDLED = 1

  def __init__(self, server, connected_socket, client_address, family):
    daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
                                                 client_address,
                                                 constants.LUXI_EOM,
                                                 family, self._MAX_UNHANDLED)
    self.server = server

  def handle_message(self, message, _):
    self.server.request_workers.AddTask((self.server, message, self))


class MasterServer(daemon.AsyncStreamServer):
  """Master Server.

  This is the main asynchronous master server. It handles connections to the
  master socket.

  """
  family = socket.AF_UNIX

  def __init__(self, mainloop, address, uid, gid):
    """MasterServer constructor.

    @type mainloop: ganeti.daemon.Mainloop
    @param mainloop: Mainloop used to poll for I/O events
    @param address: the unix socket address to bind the MasterServer to
    @param uid: The uid of the owner of the socket
    @param gid: The gid of the owner of the socket

    """
    temp_name = tempfile.mktemp(dir=os.path.dirname(address))
    daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
    os.chmod(temp_name, 0770)
    os.chown(temp_name, uid, gid)
    os.rename(temp_name, address)
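    # The socket is created under a temporary name and only renamed into
    # place once its mode and ownership are set; since rename() is atomic
    # within a filesystem, clients never see a half-configured socket.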

    self.mainloop = mainloop
    self.awaker = daemon.AsyncAwaker()

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

  def handle_connection(self, connected_socket, client_address):
    # TODO: add connection count and limit the number of open connections to a
    # maximum number to avoid breaking for lack of file descriptors or memory.
    MasterClientHandler(self, connected_socket, client_address, self.family)

  def setup_queue(self):
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool("ClientReq",
                                                 CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def server_cleanup(self):
    """Clean up the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.close()
    finally:
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.context:
        self.context.jobqueue.Shutdown()


class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server

  def handle_request(self, method, args): # pylint: disable-msg=R0911
    queue = self.server.context.jobqueue

    # TODO: Parameter validation

    # TODO: Rewrite to not exit in each 'if/elif' branch

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Received new job")
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
      return queue.SubmitJob(ops)

    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Received multiple jobs")
      jobs = []
      for ops in args:
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
      return queue.SubmitManyJobs(jobs)

    elif method == luxi.REQ_CANCEL_JOB:
      job_id = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_ARCHIVE_JOB:
      job_id = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = utils.CommaJoin(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.QueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpQueryInstances(names=names, output_fields=fields,
                                    use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpQueryNodes(names=names, output_fields=fields,
                                use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      nodes, use_locking = args
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      logging.info("Received exports query request")
      op = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      fields = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpQueryConfigValues(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpQueryClusterInfo()
      return self._Query(op)

    elif method == luxi.REQ_QUERY_TAGS:
      kind, name = args
      logging.info("Received tags query request")
      op = opcodes.OpGetTags(kind=kind, name=name)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_LOCKS:
      (fields, sync) = args
      logging.info("Received locks query request")
      return self.server.context.glm.QueryLocks(fields, sync)

    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
      drain_flag = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    elif method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      if until is None:
        logging.info("Received request to no longer pause the watcher")
      else:
        if not isinstance(until, (int, float)):
          raise TypeError("Duration must be an integer or float")

        if until < time.time():
          raise errors.GenericError("Unable to set pause end time in the past")

        logging.info("Received request to pause the watcher until %s", until)

      return _SetWatcherPause(until)

    else:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    # Queries don't have a job id
    proc = mcpu.Processor(self.server.context, None)
    return proc.ExecOpCode(op, None)
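

# For reference (hypothetical client-side sketch, not used by this daemon):
# the dispatch in ClientOps.handle_request is driven by ganeti.luxi clients,
# along these lines (method names follow ganeti/luxi.py, but the exact
# signatures may differ between Ganeti versions):
#   cl = luxi.Client(address=constants.MASTER_SOCKET)
#   job_id = cl.SubmitJob(ops)    # arrives here as luxi.REQ_SUBMIT_JOB
#   cl.WaitForJobChange(job_id, ["status"], None, None)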


class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # pylint: disable-msg=W0212
  # we do want to ensure a singleton here
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only one GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    self.glm = locking.GanetiLockManager(
                self.cfg.GetNodeList(),
                self.cfg.GetInstanceList())

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node, ec_id):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node, ec_id)

    # If preseeding fails it will not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration.

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)
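
  # Illustration (not executed): once the instance exists, the assertions
  # above make further attribute assignment and a second instantiation fail:
  #   ctx = GanetiContext()
  #   ctx.cfg = None    # AssertionError: Attempt to modify Ganeti Context
  #   GanetiContext()   # AssertionError: double GanetiContext instance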


def _SetWatcherPause(until):
  """Creates or removes the watcher pause file.

  @type until: None or int
  @param until: Unix timestamp saying until when the watcher shouldn't run

  """
  if until is None:
    utils.RemoveFile(constants.WATCHER_PAUSEFILE)
  else:
    utils.WriteFile(constants.WATCHER_PAUSEFILE,
                    data="%d\n" % (until, ))

  return until
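

# Illustrative usage: _SetWatcherPause(int(time.time()) + 3600) writes the
# absolute deadline (e.g. "1317037445\n") to constants.WATCHER_PAUSEFILE,
# pausing the watcher for an hour; _SetWatcherPause(None) removes the file
# and thereby unpauses it.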


@rpc.RunWithRPC
def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most cases we are the master, we'll
  use our own config file for getting the node list. In the future we
  could collect the current node list from our (possibly obsolete)
  known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are badly out of sync, manual startup of the master should be
  attempted for now.

  Note that for a cluster with an even number of nodes, we need at
  least half of the nodes (besides ourselves) to vote for us. This
  creates a problem on two-node clusters, since in this case we
  require the other node to be up too to confirm our status.

  """
  myself = netutils.HostInfo().name
  # Temporary instantiation of a config writer, used only to get the node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg
  retries = 6
  while retries > 0:
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one-node cluster
      return True
    if votes[0][0] is None:
      retries -= 1
      time.sleep(10)
      continue
    break
  if retries == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    logging.critical("Use the --no-voting option if you understand what"
                     " effects it has on the cluster state")
    return False
  # here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]

  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result
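

# Worked example (illustrative; assumes, as the docstring implies, that
# bootstrap.GatherMasterVotes polls every node except ourselves): on a
# five-node cluster, seen from "node1", a result such as
#   votes = [("node1", 3), (None, 1)]   # three peers agree, one unreachable
# gives top_votes == 3 and all_votes == 4; since 3 >= 4 - 3, we win the vote.
# With votes == [("node2", 3), ("node1", 1)] the top-voted node is not us and
# startup is aborted. On a two-node cluster the single peer must thus be up
# and agree for the check to pass.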


@rpc.RunWithRPC
def ActivateMasterIP():
  # Activate the master IP address
  master_node = ssconf.SimpleStore().GetMasterNode()
  result = rpc.RpcRunner.call_node_start_master(master_node, False, False)
  msg = result.fail_msg
  if msg:
    logging.error("Can't activate master IP address: %s", msg)


def CheckMasterd(options, args):
  """Initial checks whether to run or exit with a failure.

  """
  if args: # masterd doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)

  ssconf.CheckMaster(options.debug)

  try:
    options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
    options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
  except KeyError:
    print >> sys.stderr, ("User or group does not exist on system: %s:%s" %
                          (constants.MASTERD_USER, constants.DAEMONS_GROUP))
    sys.exit(constants.EXIT_FAILURE)

  # If CheckMaster didn't fail we believe we are the master, but we have to
  # confirm with the other nodes.
  if options.no_voting:
    if options.yes_do_it:
      return

    sys.stdout.write("The 'no voting' option has been selected.\n")
    sys.stdout.write("This is dangerous, please confirm by"
                     " typing uppercase 'yes': ")
    sys.stdout.flush()

    confirmation = sys.stdin.readline().strip()
    if confirmation != "YES":
      print >> sys.stderr, "Aborting."
      sys.exit(constants.EXIT_FAILURE)

    return

  # CheckAgreement uses RPC and threads, hence it needs to be run in a separate
  # process before we call utils.Daemonize in the current process.
  if not utils.RunInSeparateProcess(CheckAgreement):
    sys.exit(constants.EXIT_FAILURE)

  # ActivateMasterIP also uses RPC/threads, so we run it in a separate
  # process as well.

  # TODO: decide whether failure to activate the master IP is a fatal error
  utils.RunInSeparateProcess(ActivateMasterIP)


def ExecMasterd(options, args): # pylint: disable-msg=W0613
  """Main master daemon function, executed with the PID file held.

  """
  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(constants.MASTER_SOCKET)

  mainloop = daemon.Mainloop()
  master = MasterServer(mainloop, constants.MASTER_SOCKET,
                        options.uid, options.gid)
  try:
    rpc.Init()
    try:
      master.setup_queue()
      try:
        mainloop.Run()
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemoveFile(constants.MASTER_SOCKET)


def main():
  """Main function"""
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  parser.add_option("--no-voting", dest="no_voting",
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally",
                    default=False, action="store_true")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    help="Override interactive check for --no-voting",
                    default=False, action="store_true")
  dirs = [(constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE),
          (constants.SOCKET_DIR, constants.SOCKET_DIR_MODE),
         ]
  daemon.GenericMain(constants.MASTERD, parser, dirs,
                     CheckMasterd, ExecMasterd,
                     multithreaded=True)


if __name__ == "__main__":
  main()