Revision 2467e0d3

b/daemons/ganeti-masterd
66 66
  cleanup at shutdown.
67 67

  
68 68
  """
69
  QUEUE_PROCESSOR_SIZE = 5
70

  
71 69
  def __init__(self, address, rqhandler, context):
72 70
    """IOServer constructor
73 71

  
......
79 77
    """
80 78
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
81 79
    self.do_quit = False
82
    self.queue = jqueue.QueueManager()
83 80
    self.context = context
84
    self.processors = []
85 81

  
86 82
    # We'll only start threads once we've forked.
87 83
    self.jobqueue = None
......
92 88
  def setup_queue(self):
93 89
    self.jobqueue = jqueue.JobQueue(self.context)
94 90

  
95
  def setup_processors(self):
96
    """Spawn the processors threads.
97

  
98
    This initializes the queue and the thread processors. It is done
99
    separately from the constructor because we want the clone()
100
    syscalls to happen after the daemonize part.
101

  
102
    """
103
    for i in range(self.QUEUE_PROCESSOR_SIZE):
104
      self.processors.append(threading.Thread(target=PoolWorker,
105
                                              args=(i, self.queue.new_queue,
106
                                                    self.context)))
107
    for t in self.processors:
108
      t.start()
109

  
110 91
  def process_request_thread(self, request, client_address):
111 92
    """Process the request.
112 93

  
......
150 131
    try:
151 132
      self.server_close()
152 133
      utils.RemoveFile(constants.MASTER_SOCKET)
153
      for i in range(self.QUEUE_PROCESSOR_SIZE):
154
        self.queue.new_queue.put(None)
155
      for idx, t in enumerate(self.processors):
156
        logging.debug("waiting for processor thread %s...", idx)
157
        t.join()
158
      logging.debug("threads done")
159 134
    finally:
160 135
      if self.jobqueue:
161 136
        self.jobqueue.Shutdown()
......
428 403
    return
429 404

  
430 405
  try:
431
    master.setup_processors()
432 406
    master.setup_queue()
433 407
    try:
434 408
      master.serve_forever()
b/lib/jqueue.py
22 22
"""Module implementing the job queue handling."""
23 23

  
24 24
import logging
25
import Queue
26 25
import threading
27 26

  
28 27
from ganeti import constants
29 28
from ganeti import workerpool
30
from ganeti import opcodes
31 29
from ganeti import errors
32 30
from ganeti import mcpu
33 31

  
......
35 33
JOBQUEUE_THREADS = 5
36 34

  
37 35

  
38
class JobObject:
39
  """In-memory job representation.
40

  
41
  This is what we use to track the user-submitted jobs (which are of
42
  class opcodes.Job).
43

  
44
  """
45
  def __init__(self, jid, jdesc):
46
    self.data = jdesc
47
    jdesc.status = opcodes.Job.STATUS_PENDING
48
    jdesc.job_id = jid
49
    jdesc.op_status = [opcodes.Job.STATUS_PENDING for i in jdesc.op_list]
50
    jdesc.op_result = [None for i in jdesc.op_list]
51
    self.lock = threading.Lock()
52

  
53
  def SetStatus(self, status, result=None):
54
    self.lock.acquire()
55
    self.data.status = status
56
    if result is not None:
57
      self.data.op_result = result
58
    self.lock.release()
59

  
60
  def GetData(self):
61
    self.lock.acquire()
62
    #FIXME(iustin): make a deep copy of result
63
    result = self.data
64
    self.lock.release()
65
    return result
66

  
67

  
68
class QueueManager:
69
  """Example queue implementation.
70

  
71
  """
72
  def __init__(self):
73
    self.job_queue = {}
74
    self.jid = 1
75
    self.lock = threading.Lock()
76
    self.new_queue = Queue.Queue()
77

  
78
  def put(self, item):
79
    """Add a new job to the queue.
80

  
81
    This enters the job into our job queue and also puts it on the new
82
    queue, in order for it to be picked up by the queue processors.
83

  
84
    """
85
    self.lock.acquire()
86
    try:
87
      rid = self.jid
88
      self.jid += 1
89
      job = JobObject(rid, item)
90
      self.job_queue[rid] = job
91
    finally:
92
      self.lock.release()
93
    self.new_queue.put(job)
94
    return rid
95

  
96
  def query(self, rid):
97
    """Query a given job ID.
98

  
99
    """
100
    self.lock.acquire()
101
    result = self.job_queue.get(rid, None)
102
    self.lock.release()
103
    return result
104

  
105
  def query_jobs(self, fields, names):
106
    """Query all jobs.
107

  
108
    The fields and names parameters are similar to the ones passed to
109
    the OpQueryInstances.
110

  
111
    """
112
    result = []
113
    self.lock.acquire()
114
    if names:
115
      values = [self.job_queue[j_id] for j_id in names]
116
    else:
117
      values = self.job_queue.itervalues()
118
    try:
119
      for jobj in values:
120
        row = []
121
        jdata = jobj.data
122
        for fname in fields:
123
          if fname == "id":
124
            row.append(jdata.job_id)
125
          elif fname == "status":
126
            row.append(jdata.status)
127
          elif fname == "op_list":
128
            row.append([op.__getstate__() for op in jdata.op_list])
129
          elif fname == "op_status":
130
            row.append(jdata.op_status)
131
          elif fname == "op_result":
132
            row.append(jdata.op_result)
133
          else:
134
            raise errors.OpExecError("Invalid job query field '%s'" %
135
                                           fname)
136
        result.append(row)
137
    finally:
138
      self.lock.release()
139
    return result
140

  
141

  
142 36
class _QueuedOpCode(object):
143 37
  """Encasulates an opcode object.
144 38

  
b/lib/luxi.py
96 96
  """
97 97

  
98 98

  
99
def SerializeJob(job):
100
  """Convert a job description to a string format.
101

  
102
  """
103
  return simplejson.dumps(job.__getstate__())
104

  
105

  
106
def UnserializeJob(data):
107
  """Load a job from a string format"""
108
  try:
109
    new_data = simplejson.loads(data)
110
  except Exception, err:
111
    raise DecodingError("Error while unserializing: %s" % str(err))
112
  job = opcodes.Job()
113
  job.__setstate__(new_data)
114
  return job
115

  
116

  
117 99
class Transport:
118 100
  """Low-level transport class.
119 101

  
b/lib/opcodes.py
73 73
      setattr(self, name, state[name])
74 74

  
75 75

  
76
class Job(BaseJO):
77
  """Job definition structure
78

  
79
  The Job definitions has two sets of parameters:
80
    - the parameters of the job itself (all filled by server):
81
      - job_id,
82
      - status: pending, running, successfull, failed, aborted
83
    - opcode parameters:
84
      - op_list, list of opcodes, clients creates this
85
      - op_status, status for each opcode, server fills in
86
      - op_result, result for each opcode, server fills in
87

  
88
  """
89
  STATUS_PENDING = 1
90
  STATUS_RUNNING = 2
91
  STATUS_SUCCESS = 3
92
  STATUS_FAIL = 4
93
  STATUS_ABORT = 5
94

  
95
  __slots__ = [
96
    "job_id",
97
    "status",
98
    "op_list",
99
    "op_status",
100
    "op_result",
101
    ]
102

  
103
  def __getstate__(self):
104
    """Specialized getstate for jobs
105

  
106
    """
107
    data = BaseJO.__getstate__(self)
108
    if "op_list" in data:
109
      data["op_list"] = [op.__getstate__() for op in data["op_list"]]
110
    return data
111

  
112
  def __setstate__(self, state):
113
    """Specialized setstate for jobs
114

  
115
    """
116
    BaseJO.__setstate__(self, state)
117
    if "op_list" in state:
118
      self.op_list = [OpCode.LoadOpCode(op) for op in state["op_list"]]
119

  
120

  
121 76
class OpCode(BaseJO):
122 77
  """Abstract OpCode"""
123 78
  OP_ID = "OP_ABSTRACT"

Also available in: Unified diff