Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-masterd @ 3d8548c4

History | View | Annotate | Download (12.7 kB)

1
#!/usr/bin/python -u
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Master daemon program.
23

    
24
Some classes deviates from the standard style guide since the
25
inheritance from parent classes requires it.
26

    
27
"""
28

    
29

    
30
import sys
31
import SocketServer
32
import threading
33
import time
34
import collections
35
import Queue
36
import random
37
import signal
38
import simplejson
39
import logging
40

    
41
from cStringIO import StringIO
42
from optparse import OptionParser
43

    
44
from ganeti import config
45
from ganeti import constants
46
from ganeti import mcpu
47
from ganeti import opcodes
48
from ganeti import jqueue
49
from ganeti import locking
50
from ganeti import luxi
51
from ganeti import utils
52
from ganeti import errors
53
from ganeti import ssconf
54
from ganeti import logger
55

    
56

    
57
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
58
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
59

    
60

    
61
class IOServer(SocketServer.UnixStreamServer):
62
  """IO thread class.
63

    
64
  This class takes care of initializing the other threads, setting
65
  signal handlers (which are processed only in this thread), and doing
66
  cleanup at shutdown.
67

    
68
  """
69
  QUEUE_PROCESSOR_SIZE = 5
70

    
71
  def __init__(self, address, rqhandler, context):
72
    """IOServer constructor
73

    
74
    Args:
75
      address: the address to bind this IOServer to
76
      rqhandler: RequestHandler type object
77
      context: Context Object common to all worker threads
78

    
79
    """
80
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
81
    self.do_quit = False
82
    self.queue = jqueue.QueueManager()
83
    self.context = context
84
    self.processors = []
85

    
86
    # We'll only start threads once we've forked.
87
    self.jobqueue = None
88

    
89
    signal.signal(signal.SIGINT, self.handle_quit_signals)
90
    signal.signal(signal.SIGTERM, self.handle_quit_signals)
91

    
92
  def setup_queue(self):
93
    self.jobqueue = jqueue.JobQueue(self.context)
94

    
95
  def setup_processors(self):
96
    """Spawn the processors threads.
97

    
98
    This initializes the queue and the thread processors. It is done
99
    separately from the constructor because we want the clone()
100
    syscalls to happen after the daemonize part.
101

    
102
    """
103
    for i in range(self.QUEUE_PROCESSOR_SIZE):
104
      self.processors.append(threading.Thread(target=PoolWorker,
105
                                              args=(i, self.queue.new_queue,
106
                                                    self.context)))
107
    for t in self.processors:
108
      t.start()
109

    
110
  def process_request_thread(self, request, client_address):
111
    """Process the request.
112

    
113
    This is copied from the code in ThreadingMixIn.
114

    
115
    """
116
    try:
117
      self.finish_request(request, client_address)
118
      self.close_request(request)
119
    except:
120
      self.handle_error(request, client_address)
121
      self.close_request(request)
122

    
123
  def process_request(self, request, client_address):
124
    """Start a new thread to process the request.
125

    
126
    This is copied from the coode in ThreadingMixIn.
127

    
128
    """
129
    t = threading.Thread(target=self.process_request_thread,
130
                         args=(request, client_address))
131
    t.start()
132

    
133
  def handle_quit_signals(self, signum, frame):
134
    print "received %s in %s" % (signum, frame)
135
    self.do_quit = True
136

    
137
  def serve_forever(self):
138
    """Handle one request at a time until told to quit."""
139
    while not self.do_quit:
140
      self.handle_request()
141
      print "served request, quit=%s" % (self.do_quit)
142

    
143
  def server_cleanup(self):
144
    """Cleanup the server.
145

    
146
    This involves shutting down the processor threads and the master
147
    socket.
148

    
149
    """
150
    try:
151
      self.server_close()
152
      utils.RemoveFile(constants.MASTER_SOCKET)
153
      for i in range(self.QUEUE_PROCESSOR_SIZE):
154
        self.queue.new_queue.put(None)
155
      for idx, t in enumerate(self.processors):
156
        logging.debug("waiting for processor thread %s...", idx)
157
        t.join()
158
      logging.debug("threads done")
159
    finally:
160
      if self.jobqueue:
161
        self.jobqueue.Shutdown()
162

    
163

    
164
class ClientRqHandler(SocketServer.BaseRequestHandler):
165
  """Client handler"""
166
  EOM = '\3'
167
  READ_SIZE = 4096
168

    
169
  def setup(self):
170
    self._buffer = ""
171
    self._msgs = collections.deque()
172
    self._ops = ClientOps(self.server)
173

    
174
  def handle(self):
175
    while True:
176
      msg = self.read_message()
177
      if msg is None:
178
        logging.info("client closed connection")
179
        break
180

    
181
      request = simplejson.loads(msg)
182
      logging.debug("request: %s", request)
183
      if not isinstance(request, dict):
184
        logging.error("wrong request received: %s", msg)
185
        break
186

    
187
      method = request.get(luxi.KEY_METHOD, None)
188
      args = request.get(luxi.KEY_ARGS, None)
189
      if method is None or args is None:
190
        logging.error("no method or args in request")
191
        break
192

    
193
      success = False
194
      try:
195
        result = self._ops.handle_request(method, args)
196
        success = True
197
      except:
198
        logging.error("Unexpected exception", exc_info=True)
199
        err = sys.exc_info()
200
        result = "Caught exception: %s" % str(err[1])
201

    
202
      response = {
203
        luxi.KEY_SUCCESS: success,
204
        luxi.KEY_RESULT: result,
205
        }
206
      logging.debug("response: %s", response)
207
      self.send_message(simplejson.dumps(response))
208

    
209
  def read_message(self):
210
    while not self._msgs:
211
      data = self.request.recv(self.READ_SIZE)
212
      if not data:
213
        return None
214
      new_msgs = (self._buffer + data).split(self.EOM)
215
      self._buffer = new_msgs.pop()
216
      self._msgs.extend(new_msgs)
217
    return self._msgs.popleft()
218

    
219
  def send_message(self, msg):
220
    #print "sending", msg
221
    self.request.sendall(msg + self.EOM)
222

    
223

    
224
class ClientOps:
225
  """Class holding high-level client operations."""
226
  def __init__(self, server):
227
    self.server = server
228
    self._cpu = None
229

    
230
  def _getcpu(self):
231
    if self._cpu is None:
232
      self._cpu = mcpu.Processor(lambda x: None)
233
    return self._cpu
234

    
235
  def handle_request(self, operation, args):
236
    print operation, args
237
    if operation == "submit":
238
      return self.put(args)
239
    elif operation == "query":
240
      return self.query(args)
241
    else:
242
      raise ValueError("Invalid operation")
243

    
244
  def put(self, args):
245
    job = luxi.UnserializeJob(args)
246
    rid = self.server.queue.put(job)
247
    return rid
248

    
249
  def query(self, args):
250
    path = args["object"]
251
    fields = args["fields"]
252
    names = args["names"]
253
    if path == "instances":
254
      opclass = opcodes.OpQueryInstances
255
    elif path == "jobs":
256
      # early exit because job query-ing is special (not via opcodes)
257
      return self.query_jobs(fields, names)
258
    else:
259
      raise ValueError("Invalid object %s" % path)
260

    
261
    op = opclass(output_fields = fields, names=names)
262
    cpu = self._getcpu()
263
    result = cpu.ExecOpCode(op)
264
    return result
265

    
266
  def query_jobs(self, fields, names):
267
    return self.server.queue.query_jobs(fields, names)
268

    
269

    
270
def JobRunner(proc, job, context):
271
  """Job executor.
272

    
273
  This functions processes a single job in the context of given
274
  processor instance.
275

    
276
  Args:
277
    proc: Ganeti Processor to run the job on
278
    job: The job to run (unserialized format)
279
    context: Ganeti shared context
280

    
281
  """
282
  job.SetStatus(opcodes.Job.STATUS_RUNNING)
283
  fail = False
284
  for idx, op in enumerate(job.data.op_list):
285
    job.data.op_status[idx] = opcodes.Job.STATUS_RUNNING
286
    try:
287
      job.data.op_result[idx] = proc.ExecOpCode(op)
288
      job.data.op_status[idx] = opcodes.Job.STATUS_SUCCESS
289
    except (errors.OpPrereqError, errors.OpExecError), err:
290
      fail = True
291
      job.data.op_result[idx] = str(err)
292
      job.data.op_status[idx] = opcodes.Job.STATUS_FAIL
293
  if fail:
294
    job.SetStatus(opcodes.Job.STATUS_FAIL)
295
  else:
296
    job.SetStatus(opcodes.Job.STATUS_SUCCESS)
297

    
298

    
299
def PoolWorker(worker_id, incoming_queue, context):
300
  """A worker thread function.
301

    
302
  This is the actual processor of a single thread of Job execution.
303

    
304
  Args:
305
    worker_id: the unique id for this worker
306
    incoming_queue: a queue to get jobs from
307
    context: the common server context, containing all shared data and
308
             synchronization structures.
309

    
310
  """
311
  while True:
312
    logging.debug("worker %s sleeping", worker_id)
313
    item = incoming_queue.get(True)
314
    if item is None:
315
      break
316
    logging.debug("worker %s processing job %s", worker_id, item.data.job_id)
317
    proc = mcpu.Processor(context, feedback=lambda x: None)
318
    try:
319
      JobRunner(proc, item, context)
320
    except errors.GenericError, err:
321
      msg = "ganeti exception"
322
      logging.error(msg, exc_info=err)
323
      item.SetStatus(opcodes.Job.STATUS_FAIL, result=[msg])
324
    except Exception, err:
325
      msg = "unhandled exception"
326
      logging.error(msg, exc_info=err)
327
      item.SetStatus(opcodes.Job.STATUS_FAIL, result=[msg])
328
    except:
329
      msg = "unhandled unknown exception"
330
      logging.error(msg, exc_info=True)
331
      item.SetStatus(opcodes.Job.STATUS_FAIL, result=[msg])
332
    logging.debug("worker %s finish job %s", worker_id, item.data.job_id)
333
  logging.debug("worker %s exiting", worker_id)
334

    
335

    
336
class GanetiContext(object):
337
  """Context common to all ganeti threads.
338

    
339
  This class creates and holds common objects shared by all threads.
340

    
341
  """
342
  _instance = None
343

    
344
  def __init__(self):
345
    """Constructs a new GanetiContext object.
346

    
347
    There should be only a GanetiContext object at any time, so this
348
    function raises an error if this is not the case.
349

    
350
    """
351
    assert self.__class__._instance is None, "double GanetiContext instance"
352

    
353
    # Create a ConfigWriter...
354
    self.cfg = config.ConfigWriter()
355
    # And a GanetiLockingManager...
356
    self.glm = locking.GanetiLockManager(
357
                self.cfg.GetNodeList(),
358
                self.cfg.GetInstanceList())
359

    
360
    # setting this also locks the class against attribute modifications
361
    self.__class__._instance = self
362

    
363
  def __setattr__(self, name, value):
364
    """Setting GanetiContext attributes is forbidden after initialization.
365

    
366
    """
367
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
368
    object.__setattr__(self, name, value)
369

    
370

    
371
def CheckMaster(debug):
372
  """Checks the node setup.
373

    
374
  If this is the master, the function will return. Otherwise it will
375
  exit with an exit code based on the node status.
376

    
377
  """
378
  try:
379
    ss = ssconf.SimpleStore()
380
    master_name = ss.GetMasterNode()
381
  except errors.ConfigurationError, err:
382
    print "Cluster configuration incomplete: '%s'" % str(err)
383
    sys.exit(EXIT_NODESETUP_ERROR)
384

    
385
  try:
386
    myself = utils.HostInfo()
387
  except errors.ResolverError, err:
388
    sys.stderr.write("Cannot resolve my own name (%s)\n" % err.args[0])
389
    sys.exit(EXIT_NODESETUP_ERROR)
390

    
391
  if myself.name != master_name:
392
    if debug:
393
      sys.stderr.write("Not master, exiting.\n")
394
    sys.exit(EXIT_NOTMASTER)
395

    
396

    
397
def ParseOptions():
398
  """Parse the command line options.
399

    
400
  Returns:
401
    (options, args) as from OptionParser.parse_args()
402

    
403
  """
404
  parser = OptionParser(description="Ganeti master daemon",
405
                        usage="%prog [-f] [-d]",
406
                        version="%%prog (ganeti) %s" %
407
                        constants.RELEASE_VERSION)
408

    
409
  parser.add_option("-f", "--foreground", dest="fork",
410
                    help="Don't detach from the current terminal",
411
                    default=True, action="store_false")
412
  parser.add_option("-d", "--debug", dest="debug",
413
                    help="Enable some debug messages",
414
                    default=False, action="store_true")
415
  options, args = parser.parse_args()
416
  return options, args
417

    
418

    
419
def main():
420
  """Main function"""
421

    
422
  options, args = ParseOptions()
423
  utils.debug = options.debug
424
  utils.no_fork = True
425

    
426
  CheckMaster(options.debug)
427

    
428
  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler, GanetiContext())
429

    
430
  # become a daemon
431
  if options.fork:
432
    utils.Daemonize(logfile=constants.LOG_MASTERDAEMON,
433
                    noclose_fds=[master.fileno()])
434

    
435
  logger.SetupDaemon(constants.LOG_MASTERDAEMON, debug=options.debug)
436

    
437
  logger.Info("ganeti master daemon startup")
438

    
439
  try:
440
    utils.Lock('cmd', debug=options.debug)
441
  except errors.LockError, err:
442
    print >> sys.stderr, str(err)
443
    master.server_cleanup()
444
    return
445

    
446
  try:
447
    master.setup_processors()
448
    master.setup_queue()
449
    try:
450
      master.serve_forever()
451
    finally:
452
      master.server_cleanup()
453
  finally:
454
    utils.Unlock('cmd')
455
    utils.LockCleanup()
456

    
457

    
458
if __name__ == "__main__":
459
  main()