Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-masterd @ a478cd7e

History | View | Annotate | Download (11.9 kB)

1
#!/usr/bin/python -u
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Master daemon program.
23

    
24
Some classes deviates from the standard style guide since the
25
inheritance from parent classes requires it.
26

    
27
"""
28

    
29

    
30
import sys
31
import SocketServer
32
import threading
33
import time
34
import collections
35
import Queue
36
import random
37
import signal
38
import simplejson
39

    
40

    
41
from cStringIO import StringIO
42
from optparse import OptionParser
43

    
44
from ganeti import config
45
from ganeti import constants
46
from ganeti import mcpu
47
from ganeti import opcodes
48
from ganeti import jqueue
49
from ganeti import locking
50
from ganeti import luxi
51
from ganeti import utils
52
from ganeti import errors
53
from ganeti import ssconf
54

    
55

    
56
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
57
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
58

    
59

    
60
class IOServer(SocketServer.UnixStreamServer):
  """IO thread class.

  This class takes care of initializing the other threads, setting
  signal handlers (which are processed only in this thread), and doing
  cleanup at shutdown.

  """
  # number of worker threads consuming the job queue
  QUEUE_PROCESSOR_SIZE = 5

  def __init__(self, address, rqhandler, context):
    """IOServer constructor

    Args:
      address: the address to bind this IOServer to
      rqhandler: RequestHandler type object
      context: Context Object common to all worker threads

    """
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
    # flag polled by serve_forever(); set from the signal handlers
    self.do_quit = False
    self.queue = jqueue.QueueManager()
    self.context = context
    # worker threads; populated later by setup_processors()
    self.processors = []
    # signal handlers are processed in this (main) thread only
    signal.signal(signal.SIGINT, self.handle_quit_signals)
    signal.signal(signal.SIGTERM, self.handle_quit_signals)

  def setup_processors(self):
    """Spawn the processors threads.

    This initializes the queue and the thread processors. It is done
    separately from the constructor because we want the clone()
    syscalls to happen after the daemonize part.

    """
    for i in range(self.QUEUE_PROCESSOR_SIZE):
      self.processors.append(threading.Thread(target=PoolWorker,
                                              args=(i, self.queue.new_queue,
                                                    self.context)))
    for t in self.processors:
      t.start()

  def process_request_thread(self, request, client_address):
    """Process the request.

    This is copied from the code in ThreadingMixIn.

    """
    try:
      self.finish_request(request, client_address)
      self.close_request(request)
    except:
      # bare except mirrors the stdlib ThreadingMixIn implementation;
      # handle_error() reports the exception and the request is still closed
      self.handle_error(request, client_address)
      self.close_request(request)

  def process_request(self, request, client_address):
    """Start a new thread to process the request.

    This is copied from the code in ThreadingMixIn.

    """
    t = threading.Thread(target=self.process_request_thread,
                         args=(request, client_address))
    t.start()

  def handle_quit_signals(self, signum, frame):
    # SIGINT/SIGTERM handler: request serve_forever() to exit once the
    # currently blocking handle_request() call returns
    print "received %s in %s" % (signum, frame)
    self.do_quit = True

  def serve_forever(self):
    """Handle one request at a time until told to quit."""
    while not self.do_quit:
      self.handle_request()
      print "served request, quit=%s" % (self.do_quit)

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    self.server_close()
    utils.RemoveFile(constants.MASTER_SOCKET)
    # one None sentinel per worker makes every PoolWorker exit its loop
    for i in range(self.QUEUE_PROCESSOR_SIZE):
      self.queue.new_queue.put(None)
    for idx, t in enumerate(self.processors):
      print "waiting for processor thread %s..." % idx
      t.join()
    print "done threads"
150

    
151

    
152
class ClientRqHandler(SocketServer.BaseRequestHandler):
153
  """Client handler"""
154
  EOM = '\3'
155
  READ_SIZE = 4096
156

    
157
  def setup(self):
158
    self._buffer = ""
159
    self._msgs = collections.deque()
160
    self._ops = ClientOps(self.server)
161

    
162
  def handle(self):
163
    while True:
164
      msg = self.read_message()
165
      if msg is None:
166
        print "client closed connection"
167
        break
168
      request = simplejson.loads(msg)
169
      if not isinstance(request, dict):
170
        print "wrong request received: %s" % msg
171
        break
172
      method = request.get('request', None)
173
      data = request.get('data', None)
174
      if method is None or data is None:
175
        print "no method or data in request"
176
        break
177
      print "request:", method, data
178
      result = self._ops.handle_request(method, data)
179
      print "result:", result
180
      self.send_message(simplejson.dumps({'success': True, 'result': result}))
181

    
182
  def read_message(self):
183
    while not self._msgs:
184
      data = self.request.recv(self.READ_SIZE)
185
      if not data:
186
        return None
187
      new_msgs = (self._buffer + data).split(self.EOM)
188
      self._buffer = new_msgs.pop()
189
      self._msgs.extend(new_msgs)
190
    return self._msgs.popleft()
191

    
192
  def send_message(self, msg):
193
    #print "sending", msg
194
    self.request.sendall(msg + self.EOM)
195

    
196

    
197
class ClientOps:
198
  """Class holding high-level client operations."""
199
  def __init__(self, server):
200
    self.server = server
201
    self._cpu = None
202

    
203
  def _getcpu(self):
204
    if self._cpu is None:
205
      self._cpu = mcpu.Processor(lambda x: None)
206
    return self._cpu
207

    
208
  def handle_request(self, operation, args):
209
    print operation, args
210
    if operation == "submit":
211
      return self.put(args)
212
    elif operation == "query":
213
      return self.query(args)
214
    else:
215
      raise ValueError("Invalid operation")
216

    
217
  def put(self, args):
218
    job = luxi.UnserializeJob(args)
219
    rid = self.server.queue.put(job)
220
    return rid
221

    
222
  def query(self, args):
223
    path = args["object"]
224
    fields = args["fields"]
225
    names = args["names"]
226
    if path == "instances":
227
      opclass = opcodes.OpQueryInstances
228
    elif path == "jobs":
229
      # early exit because job query-ing is special (not via opcodes)
230
      return self.query_jobs(fields, names)
231
    else:
232
      raise ValueError("Invalid object %s" % path)
233

    
234
    op = opclass(output_fields = fields, names=names)
235
    cpu = self._getcpu()
236
    result = cpu.ExecOpCode(op)
237
    return result
238

    
239
  def query_jobs(self, fields, names):
240
    return self.server.queue.query_jobs(fields, names)
241

    
242

    
243
def JobRunner(proc, job, context):
244
  """Job executor.
245

    
246
  This functions processes a single job in the context of given
247
  processor instance.
248

    
249
  Args:
250
    proc: Ganeti Processor to run the job on
251
    job: The job to run (unserialized format)
252
    context: Ganeti shared context
253

    
254
  """
255
  job.SetStatus(opcodes.Job.STATUS_RUNNING)
256
  fail = False
257
  for idx, op in enumerate(job.data.op_list):
258
    job.data.op_status[idx] = opcodes.Job.STATUS_RUNNING
259
    try:
260
      job.data.op_result[idx] = proc.ExecOpCode(op)
261
      job.data.op_status[idx] = opcodes.Job.STATUS_SUCCESS
262
    except (errors.OpPrereqError, errors.OpExecError), err:
263
      fail = True
264
      job.data.op_result[idx] = str(err)
265
      job.data.op_status[idx] = opcodes.Job.STATUS_FAIL
266
  if fail:
267
    job.SetStatus(opcodes.Job.STATUS_FAIL)
268
  else:
269
    job.SetStatus(opcodes.Job.STATUS_SUCCESS)
270

    
271

    
272
def PoolWorker(worker_id, incoming_queue, context):
  """A worker thread function.

  This is the actual processor of a single thread of Job execution.

  Args:
    worker_id: the unique id for this worker
    incoming_queue: a queue to get jobs from
    context: the common server context, containing all shared data and
             synchronization structures.

  """
  while True:
    print "worker %s sleeping" % worker_id
    # blocking get; a None item is the shutdown sentinel posted by
    # IOServer.server_cleanup()
    item = incoming_queue.get(True)
    if item is None:
      break
    print "worker %s processing job %s" % (worker_id, item.data.job_id)
    #utils.Lock('cmd')
    try:
      # fresh processor per job; feedback messages are discarded
      proc = mcpu.Processor(context, feedback=lambda x: None)
      try:
        JobRunner(proc, item, context)
      except errors.GenericError, err:
        msg = "ganeti exception %s" % err
        item.SetStatus(opcodes.Job.STATUS_FAIL, result=[msg])
        print msg
      except Exception, err:
        msg = "unhandled exception %s" % err
        item.SetStatus(opcodes.Job.STATUS_FAIL, result=[msg])
        print msg
      except:
        # catches exceptions not derived from Exception, which is
        # possible in Python 2 (e.g. string exceptions)
        msg = "unhandled unknown exception"
        item.SetStatus(opcodes.Job.STATUS_FAIL, result=[msg])
        print msg

    finally:
      #utils.Unlock('cmd')
      #utils.LockCleanup()
      pass
    print "worker %s finish job %s" % (worker_id, item.data.job_id)
  print "worker %s exiting" % worker_id
314

    
315

    
316
class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # singleton holder; also used by __setattr__ to freeze instances
  # once initialization has finished
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only a GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create a ConfigWriter...
    self.cfg = config.ConfigWriter()
    # And a GanetiLockingManager...
    self.GLM = locking.GanetiLockManager(
                self.cfg.GetNodeList(),
                self.cfg.GetInstanceList())

    # setting this also locks the class against attribute modifications
    # (see __setattr__, which asserts _instance is still None)
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    # this works during __init__ because _instance is set only at its end
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)
349

    
350

    
351
def CheckMaster(debug):
352
  """Checks the node setup.
353

    
354
  If this is the master, the function will return. Otherwise it will
355
  exit with an exit code based on the node status.
356

    
357
  """
358
  try:
359
    ss = ssconf.SimpleStore()
360
    master_name = ss.GetMasterNode()
361
  except errors.ConfigurationError, err:
362
    print "Cluster configuration incomplete: '%s'" % str(err)
363
    sys.exit(EXIT_NODESETUP_ERROR)
364

    
365
  try:
366
    myself = utils.HostInfo()
367
  except errors.ResolverError, err:
368
    sys.stderr.write("Cannot resolve my own name (%s)\n" % err.args[0])
369
    sys.exit(EXIT_NODESETUP_ERROR)
370

    
371
  if myself.name != master_name:
372
    if debug:
373
      sys.stderr.write("Not master, exiting.\n")
374
    sys.exit(EXIT_NOTMASTER)
375

    
376

    
377
def ParseOptions():
  """Parse the command line options.

  Returns:
    (options, args) as from OptionParser.parse_args()

  """
  version_str = "%%prog (ganeti) %s" % constants.RELEASE_VERSION
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version=version_str)
  parser.add_option("-f", "--foreground", dest="fork",
                    action="store_false", default=True,
                    help="Don't detach from the current terminal")
  parser.add_option("-d", "--debug", dest="debug",
                    action="store_true", default=False,
                    help="Enable some debug messages")
  return parser.parse_args()
397

    
398

    
399
def main():
  """Main function"""

  options, args = ParseOptions()
  utils.debug = options.debug
  # NOTE(review): forking is disabled here unconditionally, regardless
  # of options.fork — presumably intentional for the worker threads;
  # confirm against utils.Daemonize/RunCmd usage
  utils.no_fork = True

  # exits the process if this node is not the cluster master
  CheckMaster(options.debug)

  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler, GanetiContext())

  # become a daemon
  if options.fork:
    utils.Daemonize(logfile=constants.LOG_MASTERDAEMON,
                    noclose_fds=[master.fileno()])

  # acquire the cluster-wide command lock; bail out early on failure
  try:
    utils.Lock('cmd', debug=options.debug)
  except errors.LockError, err:
    print >> sys.stderr, str(err)
    master.server_cleanup()
    return

  try:
    master.setup_processors()
    try:
      master.serve_forever()
    finally:
      # stop the worker threads and remove the master socket
      master.server_cleanup()
  finally:
    utils.Unlock('cmd')
    utils.LockCleanup()
431

    
432

    
433
# script entry point
if __name__ == "__main__":
  main()