Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-masterd @ 0bbe448c

History | View | Annotate | Download (12.2 kB)

1
#!/usr/bin/python -u
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Master daemon program.
23

    
24
Some classes deviates from the standard style guide since the
25
inheritance from parent classes requires it.
26

    
27
"""
28

    
29

    
30
import sys
31
import SocketServer
32
import threading
33
import time
34
import collections
35
import Queue
36
import random
37
import signal
38
import simplejson
39
import logging
40

    
41
from cStringIO import StringIO
42
from optparse import OptionParser
43

    
44
from ganeti import config
45
from ganeti import constants
46
from ganeti import mcpu
47
from ganeti import opcodes
48
from ganeti import jqueue
49
from ganeti import locking
50
from ganeti import luxi
51
from ganeti import utils
52
from ganeti import errors
53
from ganeti import ssconf
54
from ganeti import logger
55

    
56

    
57
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
58
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
59

    
60

    
61
class IOServer(SocketServer.UnixStreamServer):
  """IO thread class.

  This class takes care of initializing the other threads, setting
  signal handlers (which are processed only in this thread), and doing
  cleanup at shutdown.

  """
  # Number of PoolWorker threads spawned by setup_processors().
  QUEUE_PROCESSOR_SIZE = 5

  def __init__(self, address, rqhandler, context):
    """IOServer constructor

    Args:
      address: the address to bind this IOServer to
      rqhandler: RequestHandler type object
      context: Context Object common to all worker threads

    """
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
    # Shutdown flag: set by handle_quit_signals, polled by serve_forever.
    self.do_quit = False
    self.queue = jqueue.QueueManager()
    self.context = context
    # Worker threads, populated by setup_processors().
    self.processors = []

    # We'll only start threads once we've forked.
    self.jobqueue = None

    # Signals only request shutdown; the serve loop exits after the
    # request it is currently handling completes.
    signal.signal(signal.SIGINT, self.handle_quit_signals)
    signal.signal(signal.SIGTERM, self.handle_quit_signals)

  def setup_queue(self):
    # Deferred from __init__ so it happens after daemonizing.
    self.jobqueue = jqueue.JobQueue(self.context)

  def setup_processors(self):
    """Spawn the processors threads.

    This initializes the thread processors. It is done separately from
    the constructor because we want the clone() syscalls to happen
    after the daemonize part.

    """
    for i in range(self.QUEUE_PROCESSOR_SIZE):
      self.processors.append(threading.Thread(target=PoolWorker,
                                              args=(i, self.queue.new_queue,
                                                    self.context)))
    for t in self.processors:
      t.start()

  def process_request_thread(self, request, client_address):
    """Process the request.

    This is copied from the code in ThreadingMixIn.

    """
    try:
      self.finish_request(request, client_address)
      self.close_request(request)
    except:
      # Mirror of ThreadingMixIn: report the error, always close.
      self.handle_error(request, client_address)
      self.close_request(request)

  def process_request(self, request, client_address):
    """Start a new thread to process the request.

    This is copied from the code in ThreadingMixIn.

    """
    t = threading.Thread(target=self.process_request_thread,
                         args=(request, client_address))
    t.start()

  def handle_quit_signals(self, signum, frame):
    # Runs in the main thread only (signals are set up there).
    print "received %s in %s" % (signum, frame)
    self.do_quit = True

  def serve_forever(self):
    """Handle one request at a time until told to quit."""
    while not self.do_quit:
      self.handle_request()
      print "served request, quit=%s" % (self.do_quit)

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.server_close()
      utils.RemoveFile(constants.MASTER_SOCKET)
      # One None per worker acts as a poison pill (see PoolWorker).
      for i in range(self.QUEUE_PROCESSOR_SIZE):
        self.queue.new_queue.put(None)
      for idx, t in enumerate(self.processors):
        logging.debug("waiting for processor thread %s...", idx)
        t.join()
      logging.debug("threads done")
    finally:
      # jobqueue is None when setup_queue() was never reached.
      if self.jobqueue:
        self.jobqueue.Shutdown()
162

    
163

    
164
class ClientRqHandler(SocketServer.BaseRequestHandler):
  """Client handler"""
  # Wire protocol: JSON messages separated by an ETX byte.
  EOM = '\3'
  READ_SIZE = 4096

  def setup(self):
    # _buffer holds a partial message not yet terminated by EOM;
    # _msgs holds complete, not-yet-consumed messages.
    self._buffer = ""
    self._msgs = collections.deque()
    self._ops = ClientOps(self.server)

  def handle(self):
    # Serve requests from one client until it disconnects or sends
    # something malformed.
    while True:
      msg = self.read_message()
      if msg is None:
        logging.info("client closed connection")
        break

      request = simplejson.loads(msg)
      logging.debug("request: %s", request)
      if not isinstance(request, dict):
        logging.error("wrong request received: %s", msg)
        break

      method = request.get(luxi.KEY_METHOD, None)
      args = request.get(luxi.KEY_ARGS, None)
      if method is None or args is None:
        logging.error("no method or args in request")
        break

      success = False
      try:
        result = self._ops.handle_request(method, args)
        success = True
      except:
        # Any failure is reported back to the client instead of
        # killing the connection.
        logging.error("Unexpected exception", exc_info=True)
        err = sys.exc_info()
        result = "Caught exception: %s" % str(err[1])

      response = {
        luxi.KEY_SUCCESS: success,
        luxi.KEY_RESULT: result,
        }
      logging.debug("response: %s", response)
      self.send_message(simplejson.dumps(response))

  def read_message(self):
    # Returns the next complete message, or None on EOF.
    while not self._msgs:
      data = self.request.recv(self.READ_SIZE)
      if not data:
        return None
      # split() leaves the (possibly empty) unterminated tail as the
      # last element; keep it buffered for the next recv.
      new_msgs = (self._buffer + data).split(self.EOM)
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def send_message(self, msg):
    # Sends one EOM-terminated message.
    #print "sending", msg
    self.request.sendall(msg + self.EOM)
222

    
223

    
224
class ClientOps:
  """High-level operations exposed to luxi clients."""

  def __init__(self, server):
    self.server = server

  def handle_request(self, method, args):
    """Dispatch a single client request to the job queue.

    Unknown methods raise ValueError.

    """
    queue = self.server.jobqueue

    # TODO: Parameter validation

    if method == luxi.REQ_SUBMIT_JOB:
      ops = []
      for state in args:
        ops.append(opcodes.OpCode.LoadOpCode(state))
      return queue.SubmitJob(ops)

    if method == luxi.REQ_CANCEL_JOB:
      (job_id, ) = args
      return queue.CancelJob(job_id)

    if method == luxi.REQ_ARCHIVE_JOB:
      (job_id, ) = args
      return queue.ArchiveJob(job_id)

    if method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      return queue.QueryJobs(job_ids, fields)

    raise ValueError("Invalid operation")
252

    
253

    
254
def JobRunner(proc, job, context):
255
  """Job executor.
256

    
257
  This functions processes a single job in the context of given
258
  processor instance.
259

    
260
  Args:
261
    proc: Ganeti Processor to run the job on
262
    job: The job to run (unserialized format)
263
    context: Ganeti shared context
264

    
265
  """
266
  job.SetStatus(opcodes.Job.STATUS_RUNNING)
267
  fail = False
268
  for idx, op in enumerate(job.data.op_list):
269
    job.data.op_status[idx] = opcodes.Job.STATUS_RUNNING
270
    try:
271
      job.data.op_result[idx] = proc.ExecOpCode(op)
272
      job.data.op_status[idx] = opcodes.Job.STATUS_SUCCESS
273
    except (errors.OpPrereqError, errors.OpExecError), err:
274
      fail = True
275
      job.data.op_result[idx] = str(err)
276
      job.data.op_status[idx] = opcodes.Job.STATUS_FAIL
277
  if fail:
278
    job.SetStatus(opcodes.Job.STATUS_FAIL)
279
  else:
280
    job.SetStatus(opcodes.Job.STATUS_SUCCESS)
281

    
282

    
283
def PoolWorker(worker_id, incoming_queue, context):
284
  """A worker thread function.
285

    
286
  This is the actual processor of a single thread of Job execution.
287

    
288
  Args:
289
    worker_id: the unique id for this worker
290
    incoming_queue: a queue to get jobs from
291
    context: the common server context, containing all shared data and
292
             synchronization structures.
293

    
294
  """
295
  while True:
296
    logging.debug("worker %s sleeping", worker_id)
297
    item = incoming_queue.get(True)
298
    if item is None:
299
      break
300
    logging.debug("worker %s processing job %s", worker_id, item.data.job_id)
301
    proc = mcpu.Processor(context, feedback=lambda x: None)
302
    try:
303
      JobRunner(proc, item, context)
304
    except errors.GenericError, err:
305
      msg = "ganeti exception"
306
      logging.error(msg, exc_info=err)
307
      item.SetStatus(opcodes.Job.STATUS_FAIL, result=[msg])
308
    except Exception, err:
309
      msg = "unhandled exception"
310
      logging.error(msg, exc_info=err)
311
      item.SetStatus(opcodes.Job.STATUS_FAIL, result=[msg])
312
    except:
313
      msg = "unhandled unknown exception"
314
      logging.error(msg, exc_info=True)
315
      item.SetStatus(opcodes.Job.STATUS_FAIL, result=[msg])
316
    logging.debug("worker %s finish job %s", worker_id, item.data.job_id)
317
  logging.debug("worker %s exiting", worker_id)
318

    
319

    
320
class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # Singleton guard: set to the one live instance at the end of __init__.
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only a GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create a ConfigWriter...
    self.cfg = config.ConfigWriter()
    # And a GanetiLockingManager...
    self.glm = locking.GanetiLockManager(
                self.cfg.GetNodeList(),
                self.cfg.GetInstanceList())

    # setting this also locks the class against attribute modifications
    # (assigning a *class* attribute bypasses our __setattr__ below,
    # but makes every later *instance* attribute set trip its assert)
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    While _instance is still None (i.e. during __init__), assignments
    pass through to the default object behavior.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)
353

    
354

    
355
def CheckMaster(debug):
356
  """Checks the node setup.
357

    
358
  If this is the master, the function will return. Otherwise it will
359
  exit with an exit code based on the node status.
360

    
361
  """
362
  try:
363
    ss = ssconf.SimpleStore()
364
    master_name = ss.GetMasterNode()
365
  except errors.ConfigurationError, err:
366
    print "Cluster configuration incomplete: '%s'" % str(err)
367
    sys.exit(EXIT_NODESETUP_ERROR)
368

    
369
  try:
370
    myself = utils.HostInfo()
371
  except errors.ResolverError, err:
372
    sys.stderr.write("Cannot resolve my own name (%s)\n" % err.args[0])
373
    sys.exit(EXIT_NODESETUP_ERROR)
374

    
375
  if myself.name != master_name:
376
    if debug:
377
      sys.stderr.write("Not master, exiting.\n")
378
    sys.exit(EXIT_NOTMASTER)
379

    
380

    
381
def ParseOptions():
  """Parse the command line options.

  Returns:
    (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  # -f keeps the daemon in the foreground (fork defaults to True).
  parser.add_option("-f", "--foreground", dest="fork",
                    help="Don't detach from the current terminal",
                    default=True, action="store_false")
  parser.add_option("-d", "--debug", dest="debug",
                    help="Enable some debug messages",
                    default=False, action="store_true")

  return parser.parse_args()
401

    
402

    
403
def main():
  """Main function: set up, run and tear down the master daemon."""

  options, args = ParseOptions()
  utils.debug = options.debug
  utils.no_fork = True

  # Exits the process if this node is not the cluster master.
  CheckMaster(options.debug)

  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler, GanetiContext())

  # become a daemon
  if options.fork:
    utils.Daemonize(logfile=constants.LOG_MASTERDAEMON,
                    noclose_fds=[master.fileno()])

  logger.SetupDaemon(constants.LOG_MASTERDAEMON, debug=options.debug)

  logger.Info("ganeti master daemon startup")

  # The 'cmd' lock guards the command machinery against concurrent
  # users; if we cannot get it, clean up the server and give up.
  try:
    utils.Lock('cmd', debug=options.debug)
  except errors.LockError, err:
    print >> sys.stderr, str(err)
    master.server_cleanup()
    return

  # Threads and the job queue are started only now, after daemonizing,
  # so they belong to the daemon process (see IOServer.setup_processors).
  try:
    master.setup_processors()
    master.setup_queue()
    try:
      master.serve_forever()
    finally:
      # Always shut down threads/socket, even if serving raised.
      master.server_cleanup()
  finally:
    utils.Unlock('cmd')
    utils.LockCleanup()
440

    
441

    
442
# Entry point: run the daemon only when executed directly.
if __name__ == "__main__":
  main()