Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ 7a1ecaed

History | View | Annotate | Download (3.1 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling."""
23

    
24
import threading
25
import Queue
26

    
27
from ganeti import opcodes
28
from ganeti import errors
29

    
30
class JobObject:
31
  """In-memory job representation.
32

33
  This is what we use to track the user-submitted jobs (which are of
34
  class opcodes.Job).
35

36
  """
37
  def __init__(self, jid, jdesc):
38
    self.data = jdesc
39
    jdesc.status = opcodes.Job.STATUS_PENDING
40
    jdesc.job_id = jid
41
    self.lock = threading.Lock()
42

    
43
  def SetStatus(self, status, result=None):
44
    self.lock.acquire()
45
    self.data.status = status
46
    if result is not None:
47
      self.data.result = result
48
    self.lock.release()
49

    
50
  def GetData(self):
51
    self.lock.acquire()
52
    #FIXME(iustin): make a deep copy of result
53
    result = self.data
54
    self.lock.release()
55
    return result
56

    
57

    
58
class QueueManager:
59
  """Example queue implementation.
60

61
  """
62
  def __init__(self):
63
    self.job_queue = {}
64
    self.jid = 1
65
    self.lock = threading.Lock()
66
    self.new_queue = Queue.Queue()
67

    
68
  def put(self, item):
69
    """Add a new job to the queue.
70

71
    This enters the job into our job queue and also puts it on the new
72
    queue, in order for it to be picked up by the queue processors.
73

74
    """
75
    self.lock.acquire()
76
    try:
77
      rid = self.jid
78
      self.jid += 1
79
      job = JobObject(rid, item)
80
      self.job_queue[rid] = job
81
    finally:
82
      self.lock.release()
83
    self.new_queue.put(job)
84
    return rid
85

    
86
  def query(self, rid):
87
    """Query a given job ID.
88

89
    """
90
    self.lock.acquire()
91
    result = self.job_queue.get(rid, None)
92
    self.lock.release()
93
    return result
94

    
95
  def query_jobs(self, fields, names):
96
    """Query all jobs.
97

98
    The fields and names parameters are similar to the ones passed to
99
    the OpQueryInstances.
100

101
    """
102
    result = []
103
    self.lock.acquire()
104
    try:
105
      for jobj in self.job_queue.itervalues():
106
        row = []
107
        jdata = jobj.data
108
        for fname in fields:
109
          if fname == "id":
110
            row.append(jdata.job_id)
111
          elif fname == "status":
112
            row.append(jdata.status)
113
          elif fname == "opcodes":
114
            row.append(",".join([op.OP_ID for op in jdata.op_list]))
115
          else:
116
            raise errors.OpExecError("Invalid job query field '%s'" %
117
                                           fname)
118
        result.append(row)
119
    finally:
120
      self.lock.release()
121
    return result