Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ b74159ee

History | View | Annotate | Download (3.4 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling."""
23

    
24
import threading
25
import Queue
26

    
27
from ganeti import opcodes
28
from ganeti import errors
29

    
30
class JobObject:
31
  """In-memory job representation.
32

33
  This is what we use to track the user-submitted jobs (which are of
34
  class opcodes.Job).
35

36
  """
37
  def __init__(self, jid, jdesc):
38
    self.data = jdesc
39
    jdesc.status = opcodes.Job.STATUS_PENDING
40
    jdesc.job_id = jid
41
    jdesc.op_status = [opcodes.Job.STATUS_PENDING for i in jdesc.op_list]
42
    jdesc.op_result = [None for i in jdesc.op_list]
43
    self.lock = threading.Lock()
44

    
45
  def SetStatus(self, status, result=None):
46
    self.lock.acquire()
47
    self.data.status = status
48
    if result is not None:
49
      self.data.result = result
50
    self.lock.release()
51

    
52
  def GetData(self):
53
    self.lock.acquire()
54
    #FIXME(iustin): make a deep copy of result
55
    result = self.data
56
    self.lock.release()
57
    return result
58

    
59

    
60
class QueueManager:
61
  """Example queue implementation.
62

63
  """
64
  def __init__(self):
65
    self.job_queue = {}
66
    self.jid = 1
67
    self.lock = threading.Lock()
68
    self.new_queue = Queue.Queue()
69

    
70
  def put(self, item):
71
    """Add a new job to the queue.
72

73
    This enters the job into our job queue and also puts it on the new
74
    queue, in order for it to be picked up by the queue processors.
75

76
    """
77
    self.lock.acquire()
78
    try:
79
      rid = self.jid
80
      self.jid += 1
81
      job = JobObject(rid, item)
82
      self.job_queue[rid] = job
83
    finally:
84
      self.lock.release()
85
    self.new_queue.put(job)
86
    return rid
87

    
88
  def query(self, rid):
89
    """Query a given job ID.
90

91
    """
92
    self.lock.acquire()
93
    result = self.job_queue.get(rid, None)
94
    self.lock.release()
95
    return result
96

    
97
  def query_jobs(self, fields, names):
98
    """Query all jobs.
99

100
    The fields and names parameters are similar to the ones passed to
101
    the OpQueryInstances.
102

103
    """
104
    result = []
105
    self.lock.acquire()
106
    if names:
107
      values = [self.job_queue[j_id] for j_id in names]
108
    else:
109
      values = self.job_queue.itervalues()
110
    try:
111
      for jobj in values:
112
        row = []
113
        jdata = jobj.data
114
        for fname in fields:
115
          if fname == "id":
116
            row.append(jdata.job_id)
117
          elif fname == "status":
118
            row.append(jdata.status)
119
          elif fname == "op_list":
120
            row.append([op.__getstate__() for op in jdata.op_list])
121
          elif fname == "op_status":
122
            row.append(jdata.op_status)
123
          elif fname == "op_result":
124
            row.append(jdata.op_result)
125
          else:
126
            raise errors.OpExecError("Invalid job query field '%s'" %
127
                                           fname)
128
        result.append(row)
129
    finally:
130
      self.lock.release()
131
    return result