Revision 2467e0d3 lib/jqueue.py

b/lib/jqueue.py
22 22
"""Module implementing the job queue handling."""
23 23

  
24 24
import logging
25
import Queue
26 25
import threading
27 26

  
28 27
from ganeti import constants
29 28
from ganeti import workerpool
30
from ganeti import opcodes
31 29
from ganeti import errors
32 30
from ganeti import mcpu
33 31

  
......
35 33
JOBQUEUE_THREADS = 5
36 34

  
37 35

  
38
class JobObject:
39
  """In-memory job representation.
40

  
41
  This is what we use to track the user-submitted jobs (which are of
42
  class opcodes.Job).
43

  
44
  """
45
  def __init__(self, jid, jdesc):
46
    self.data = jdesc
47
    jdesc.status = opcodes.Job.STATUS_PENDING
48
    jdesc.job_id = jid
49
    jdesc.op_status = [opcodes.Job.STATUS_PENDING for i in jdesc.op_list]
50
    jdesc.op_result = [None for i in jdesc.op_list]
51
    self.lock = threading.Lock()
52

  
53
  def SetStatus(self, status, result=None):
54
    self.lock.acquire()
55
    self.data.status = status
56
    if result is not None:
57
      self.data.op_result = result
58
    self.lock.release()
59

  
60
  def GetData(self):
61
    self.lock.acquire()
62
    #FIXME(iustin): make a deep copy of result
63
    result = self.data
64
    self.lock.release()
65
    return result
66

  
67

  
68
class QueueManager:
69
  """Example queue implementation.
70

  
71
  """
72
  def __init__(self):
73
    self.job_queue = {}
74
    self.jid = 1
75
    self.lock = threading.Lock()
76
    self.new_queue = Queue.Queue()
77

  
78
  def put(self, item):
79
    """Add a new job to the queue.
80

  
81
    This enters the job into our job queue and also puts it on the new
82
    queue, in order for it to be picked up by the queue processors.
83

  
84
    """
85
    self.lock.acquire()
86
    try:
87
      rid = self.jid
88
      self.jid += 1
89
      job = JobObject(rid, item)
90
      self.job_queue[rid] = job
91
    finally:
92
      self.lock.release()
93
    self.new_queue.put(job)
94
    return rid
95

  
96
  def query(self, rid):
97
    """Query a given job ID.
98

  
99
    """
100
    self.lock.acquire()
101
    result = self.job_queue.get(rid, None)
102
    self.lock.release()
103
    return result
104

  
105
  def query_jobs(self, fields, names):
106
    """Query all jobs.
107

  
108
    The fields and names parameters are similar to the ones passed to
109
    the OpQueryInstances.
110

  
111
    """
112
    result = []
113
    self.lock.acquire()
114
    if names:
115
      values = [self.job_queue[j_id] for j_id in names]
116
    else:
117
      values = self.job_queue.itervalues()
118
    try:
119
      for jobj in values:
120
        row = []
121
        jdata = jobj.data
122
        for fname in fields:
123
          if fname == "id":
124
            row.append(jdata.job_id)
125
          elif fname == "status":
126
            row.append(jdata.status)
127
          elif fname == "op_list":
128
            row.append([op.__getstate__() for op in jdata.op_list])
129
          elif fname == "op_status":
130
            row.append(jdata.op_status)
131
          elif fname == "op_result":
132
            row.append(jdata.op_result)
133
          else:
134
            raise errors.OpExecError("Invalid job query field '%s'" %
135
                                           fname)
136
        result.append(row)
137
    finally:
138
      self.lock.release()
139
    return result
140

  
141

  
142 36
class _QueuedOpCode(object):
143 37
  """Encasulates an opcode object.
144 38

  

Also available in: Unified diff