Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-masterd @ 50a3fbb2

History | View | Annotate | Download (12.3 kB)

1
#!/usr/bin/python -u
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Master daemon program.
23

    
24
Some classes deviate from the standard style guide since the
25
inheritance from parent classes requires it.
26

    
27
"""
28

    
29

    
30
import sys
31
import SocketServer
32
import threading
33
import time
34
import collections
35
import Queue
36
import random
37
import signal
38
import simplejson
39
import logging
40

    
41
from cStringIO import StringIO
42
from optparse import OptionParser
43

    
44
from ganeti import config
45
from ganeti import constants
46
from ganeti import mcpu
47
from ganeti import opcodes
48
from ganeti import jqueue
49
from ganeti import locking
50
from ganeti import luxi
51
from ganeti import utils
52
from ganeti import errors
53
from ganeti import ssconf
54
from ganeti import logger
55

    
56

    
57
# Local aliases for the exit codes defined in ganeti.constants; CheckMaster()
# passes them to sys.exit() when this node must not run the master daemon.
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
58
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
59

    
60

    
61
class IOServer(SocketServer.UnixStreamServer):
62
  """IO thread class.
63

    
64
  This class takes care of initializing the other threads, setting
65
  signal handlers (which are processed only in this thread), and doing
66
  cleanup at shutdown.
67

    
68
  """
69
  QUEUE_PROCESSOR_SIZE = 5
70

    
71
  def __init__(self, address, rqhandler, context):
72
    """IOServer constructor
73

    
74
    Args:
75
      address: the address to bind this IOServer to
76
      rqhandler: RequestHandler type object
77
      context: Context Object common to all worker threads
78

    
79
    """
80
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
81
    self.do_quit = False
82
    self.queue = jqueue.QueueManager()
83
    self.context = context
84
    self.processors = []
85

    
86
    # We'll only start threads once we've forked.
87
    self.jobqueue = None
88

    
89
    signal.signal(signal.SIGINT, self.handle_quit_signals)
90
    signal.signal(signal.SIGTERM, self.handle_quit_signals)
91

    
92
  def setup_queue(self):
93
    self.jobqueue = jqueue.JobQueue(self.context)
94

    
95
  def setup_processors(self):
96
    """Spawn the processors threads.
97

    
98
    This initializes the queue and the thread processors. It is done
99
    separately from the constructor because we want the clone()
100
    syscalls to happen after the daemonize part.
101

    
102
    """
103
    for i in range(self.QUEUE_PROCESSOR_SIZE):
104
      self.processors.append(threading.Thread(target=PoolWorker,
105
                                              args=(i, self.queue.new_queue,
106
                                                    self.context)))
107
    for t in self.processors:
108
      t.start()
109

    
110
  def process_request_thread(self, request, client_address):
111
    """Process the request.
112

    
113
    This is copied from the code in ThreadingMixIn.
114

    
115
    """
116
    try:
117
      self.finish_request(request, client_address)
118
      self.close_request(request)
119
    except:
120
      self.handle_error(request, client_address)
121
      self.close_request(request)
122

    
123
  def process_request(self, request, client_address):
124
    """Start a new thread to process the request.
125

    
126
    This is copied from the coode in ThreadingMixIn.
127

    
128
    """
129
    t = threading.Thread(target=self.process_request_thread,
130
                         args=(request, client_address))
131
    t.start()
132

    
133
  def handle_quit_signals(self, signum, frame):
134
    print "received %s in %s" % (signum, frame)
135
    self.do_quit = True
136

    
137
  def serve_forever(self):
138
    """Handle one request at a time until told to quit."""
139
    while not self.do_quit:
140
      self.handle_request()
141
      print "served request, quit=%s" % (self.do_quit)
142

    
143
  def server_cleanup(self):
144
    """Cleanup the server.
145

    
146
    This involves shutting down the processor threads and the master
147
    socket.
148

    
149
    """
150
    try:
151
      self.server_close()
152
      utils.RemoveFile(constants.MASTER_SOCKET)
153
      for i in range(self.QUEUE_PROCESSOR_SIZE):
154
        self.queue.new_queue.put(None)
155
      for idx, t in enumerate(self.processors):
156
        logging.debug("waiting for processor thread %s...", idx)
157
        t.join()
158
      logging.debug("threads done")
159
    finally:
160
      if self.jobqueue:
161
        self.jobqueue.Shutdown()
162

    
163

    
164
class ClientRqHandler(SocketServer.BaseRequestHandler):
165
  """Client handler"""
166
  EOM = '\3'
167
  READ_SIZE = 4096
168

    
169
  def setup(self):
170
    self._buffer = ""
171
    self._msgs = collections.deque()
172
    self._ops = ClientOps(self.server)
173

    
174
  def handle(self):
175
    while True:
176
      msg = self.read_message()
177
      if msg is None:
178
        print "client closed connection"
179
        break
180
      request = simplejson.loads(msg)
181
      if not isinstance(request, dict):
182
        print "wrong request received: %s" % msg
183
        break
184
      method = request.get('request', None)
185
      data = request.get('data', None)
186
      if method is None or data is None:
187
        print "no method or data in request"
188
        break
189
      print "request:", method, data
190
      result = self._ops.handle_request(method, data)
191
      print "result:", result
192
      self.send_message(simplejson.dumps({'success': True, 'result': result}))
193

    
194
  def read_message(self):
195
    while not self._msgs:
196
      data = self.request.recv(self.READ_SIZE)
197
      if not data:
198
        return None
199
      new_msgs = (self._buffer + data).split(self.EOM)
200
      self._buffer = new_msgs.pop()
201
      self._msgs.extend(new_msgs)
202
    return self._msgs.popleft()
203

    
204
  def send_message(self, msg):
205
    #print "sending", msg
206
    self.request.sendall(msg + self.EOM)
207

    
208

    
209
class ClientOps:
210
  """Class holding high-level client operations."""
211
  def __init__(self, server):
212
    self.server = server
213
    self._cpu = None
214

    
215
  def _getcpu(self):
216
    if self._cpu is None:
217
      self._cpu = mcpu.Processor(lambda x: None)
218
    return self._cpu
219

    
220
  def handle_request(self, operation, args):
221
    print operation, args
222
    if operation == "submit":
223
      return self.put(args)
224
    elif operation == "query":
225
      return self.query(args)
226
    else:
227
      raise ValueError("Invalid operation")
228

    
229
  def put(self, args):
230
    job = luxi.UnserializeJob(args)
231
    rid = self.server.queue.put(job)
232
    return rid
233

    
234
  def query(self, args):
235
    path = args["object"]
236
    fields = args["fields"]
237
    names = args["names"]
238
    if path == "instances":
239
      opclass = opcodes.OpQueryInstances
240
    elif path == "jobs":
241
      # early exit because job query-ing is special (not via opcodes)
242
      return self.query_jobs(fields, names)
243
    else:
244
      raise ValueError("Invalid object %s" % path)
245

    
246
    op = opclass(output_fields = fields, names=names)
247
    cpu = self._getcpu()
248
    result = cpu.ExecOpCode(op)
249
    return result
250

    
251
  def query_jobs(self, fields, names):
252
    return self.server.queue.query_jobs(fields, names)
253

    
254

    
255
def JobRunner(proc, job, context):
256
  """Job executor.
257

    
258
  This functions processes a single job in the context of given
259
  processor instance.
260

    
261
  Args:
262
    proc: Ganeti Processor to run the job on
263
    job: The job to run (unserialized format)
264
    context: Ganeti shared context
265

    
266
  """
267
  job.SetStatus(opcodes.Job.STATUS_RUNNING)
268
  fail = False
269
  for idx, op in enumerate(job.data.op_list):
270
    job.data.op_status[idx] = opcodes.Job.STATUS_RUNNING
271
    try:
272
      job.data.op_result[idx] = proc.ExecOpCode(op)
273
      job.data.op_status[idx] = opcodes.Job.STATUS_SUCCESS
274
    except (errors.OpPrereqError, errors.OpExecError), err:
275
      fail = True
276
      job.data.op_result[idx] = str(err)
277
      job.data.op_status[idx] = opcodes.Job.STATUS_FAIL
278
  if fail:
279
    job.SetStatus(opcodes.Job.STATUS_FAIL)
280
  else:
281
    job.SetStatus(opcodes.Job.STATUS_SUCCESS)
282

    
283

    
284
def PoolWorker(worker_id, incoming_queue, context):
285
  """A worker thread function.
286

    
287
  This is the actual processor of a single thread of Job execution.
288

    
289
  Args:
290
    worker_id: the unique id for this worker
291
    incoming_queue: a queue to get jobs from
292
    context: the common server context, containing all shared data and
293
             synchronization structures.
294

    
295
  """
296
  while True:
297
    logging.debug("worker %s sleeping", worker_id)
298
    item = incoming_queue.get(True)
299
    if item is None:
300
      break
301
    logging.debug("worker %s processing job %s", worker_id, item.data.job_id)
302
    proc = mcpu.Processor(context, feedback=lambda x: None)
303
    try:
304
      JobRunner(proc, item, context)
305
    except errors.GenericError, err:
306
      msg = "ganeti exception"
307
      logging.error(msg, exc_info=err)
308
      item.SetStatus(opcodes.Job.STATUS_FAIL, result=[msg])
309
    except Exception, err:
310
      msg = "unhandled exception"
311
      logging.error(msg, exc_info=err)
312
      item.SetStatus(opcodes.Job.STATUS_FAIL, result=[msg])
313
    except:
314
      msg = "unhandled unknown exception"
315
      logging.error(msg, exc_info=True)
316
      item.SetStatus(opcodes.Job.STATUS_FAIL, result=[msg])
317
    logging.debug("worker %s finish job %s", worker_id, item.data.job_id)
318
  logging.debug("worker %s exiting", worker_id)
319

    
320

    
321
class GanetiContext(object):
  """Holder of the objects shared by all ganeti threads.

  The context carries the configuration writer ('cfg') and the lock
  manager ('glm'). Only one instance may exist, and its attributes can
  no longer be set once construction has finished.

  """
  # The one existing instance, or None while there is none.
  _instance = None

  def __init__(self):
    """Create the shared objects and register the instance.

    An assertion fails if a GanetiContext has been registered already.

    """
    cls = self.__class__
    assert cls._instance is None, "double GanetiContext instance"

    # The configuration comes first as the lock manager is set up from it
    self.cfg = config.ConfigWriter()
    node_names = self.cfg.GetNodeList()
    instance_names = self.cfg.GetInstanceList()
    self.glm = locking.GanetiLockManager(node_names, instance_names)

    # From here on __setattr__ refuses any further attribute assignment
    cls._instance = self

  def __setattr__(self, name, value):
    """Set an attribute, which is only allowed during initialization.

    An assertion fails once an instance has been registered.

    """
    locked = self.__class__._instance is not None
    assert not locked, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)
354

    
355

    
356
def CheckMaster(debug):
357
  """Checks the node setup.
358

    
359
  If this is the master, the function will return. Otherwise it will
360
  exit with an exit code based on the node status.
361

    
362
  """
363
  try:
364
    ss = ssconf.SimpleStore()
365
    master_name = ss.GetMasterNode()
366
  except errors.ConfigurationError, err:
367
    print "Cluster configuration incomplete: '%s'" % str(err)
368
    sys.exit(EXIT_NODESETUP_ERROR)
369

    
370
  try:
371
    myself = utils.HostInfo()
372
  except errors.ResolverError, err:
373
    sys.stderr.write("Cannot resolve my own name (%s)\n" % err.args[0])
374
    sys.exit(EXIT_NODESETUP_ERROR)
375

    
376
  if myself.name != master_name:
377
    if debug:
378
      sys.stderr.write("Not master, exiting.\n")
379
    sys.exit(EXIT_NOTMASTER)
380

    
381

    
382
def ParseOptions():
  """Parse the command line of the master daemon.

  Two flags are known: -f/--foreground, which clears 'fork' (True by
  default), and -d/--debug, which sets 'debug' (False by default).

  Returns:
    (options, args) as from OptionParser.parse_args()

  """
  version_text = "%%prog (ganeti) %s" % constants.RELEASE_VERSION
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version=version_text)

  parser.add_option("-f", "--foreground",
                    action="store_false", dest="fork", default=True,
                    help="Don't detach from the current terminal")
  parser.add_option("-d", "--debug",
                    action="store_true", dest="debug", default=False,
                    help="Enable some debug messages")
  return parser.parse_args()
402

    
403

    
404
def main():
405
  """Main function"""
406

    
407
  options, args = ParseOptions()
408
  utils.debug = options.debug
409
  utils.no_fork = True
410

    
411
  CheckMaster(options.debug)
412

    
413
  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler, GanetiContext())
414

    
415
  # become a daemon
416
  if options.fork:
417
    utils.Daemonize(logfile=constants.LOG_MASTERDAEMON,
418
                    noclose_fds=[master.fileno()])
419

    
420
  logger.SetupDaemon(constants.LOG_MASTERDAEMON, debug=options.debug)
421

    
422
  logger.Info("ganeti master daemon startup")
423

    
424
  try:
425
    utils.Lock('cmd', debug=options.debug)
426
  except errors.LockError, err:
427
    print >> sys.stderr, str(err)
428
    master.server_cleanup()
429
    return
430

    
431
  try:
432
    master.setup_processors()
433
    master.setup_queue()
434
    try:
435
      master.serve_forever()
436
    finally:
437
      master.server_cleanup()
438
  finally:
439
    utils.Unlock('cmd')
440
    utils.LockCleanup()
441

    
442

    
443
# Run the daemon only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
444
  main()