Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.jqueue_unittest.py @ 8f5c488d

History | View | Annotate | Download (10.7 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing ganeti.jqueue"""
23

    
24
import os
25
import sys
26
import unittest
27
import tempfile
28
import shutil
29
import errno
30

    
31
from ganeti import constants
32
from ganeti import utils
33
from ganeti import errors
34
from ganeti import jqueue
35
from ganeti import opcodes
36
from ganeti import compat
37

    
38
import testutils
39

    
40

    
41
class _FakeJob:
42
  def __init__(self, job_id, status):
43
    self.id = job_id
44
    self._status = status
45
    self._log = []
46

    
47
  def SetStatus(self, status):
48
    self._status = status
49

    
50
  def AddLogEntry(self, msg):
51
    self._log.append((len(self._log), msg))
52

    
53
  def CalcStatus(self):
54
    return self._status
55

    
56
  def GetInfo(self, fields):
57
    result = []
58

    
59
    for name in fields:
60
      if name == "status":
61
        result.append(self._status)
62
      else:
63
        raise Exception("Unknown field")
64

    
65
    return result
66

    
67
  def GetLogEntries(self, newer_than):
68
    assert newer_than is None or newer_than >= 0
69

    
70
    if newer_than is None:
71
      return self._log
72

    
73
    return self._log[newer_than:]
74

    
75

    
76
class TestJobChangesChecker(unittest.TestCase):
77
  def testStatus(self):
78
    job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
79
    checker = jqueue._JobChangesChecker(["status"], None, None)
80
    self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], []))
81

    
82
    job.SetStatus(constants.JOB_STATUS_RUNNING)
83
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
84

    
85
    job.SetStatus(constants.JOB_STATUS_SUCCESS)
86
    self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], []))
87

    
88
    # job.id is used by checker
89
    self.assertEqual(job.id, 9094)
90

    
91
  def testStatusWithPrev(self):
92
    job = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
93
    checker = jqueue._JobChangesChecker(["status"],
94
                                        [constants.JOB_STATUS_QUEUED], None)
95
    self.assert_(checker(job) is None)
96

    
97
    job.SetStatus(constants.JOB_STATUS_RUNNING)
98
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
99

    
100
  def testFinalStatus(self):
101
    for status in constants.JOBS_FINALIZED:
102
      job = _FakeJob(2178711, status)
103
      checker = jqueue._JobChangesChecker(["status"], [status], None)
104
      # There won't be any changes in this status, hence it should signal
105
      # a change immediately
106
      self.assertEqual(checker(job), ([status], []))
107

    
108
  def testLog(self):
109
    job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
110
    checker = jqueue._JobChangesChecker(["status"], None, None)
111
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
112

    
113
    job.AddLogEntry("Hello World")
114
    (job_info, log_entries) = checker(job)
115
    self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING])
116
    self.assertEqual(log_entries, [[0, "Hello World"]])
117

    
118
    checker2 = jqueue._JobChangesChecker(["status"], job_info, len(log_entries))
119
    self.assert_(checker2(job) is None)
120

    
121
    job.AddLogEntry("Foo Bar")
122
    job.SetStatus(constants.JOB_STATUS_ERROR)
123

    
124
    (job_info, log_entries) = checker2(job)
125
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
126
    self.assertEqual(log_entries, [[1, "Foo Bar"]])
127

    
128
    checker3 = jqueue._JobChangesChecker(["status"], None, None)
129
    (job_info, log_entries) = checker3(job)
130
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
131
    self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]])
132

    
133

    
134
class TestJobChangesWaiter(unittest.TestCase):
135
  def setUp(self):
136
    self.tmpdir = tempfile.mkdtemp()
137
    self.filename = utils.PathJoin(self.tmpdir, "job-1")
138
    utils.WriteFile(self.filename, data="")
139

    
140
  def tearDown(self):
141
    shutil.rmtree(self.tmpdir)
142

    
143
  def _EnsureNotifierClosed(self, notifier):
144
    try:
145
      os.fstat(notifier._fd)
146
    except EnvironmentError, err:
147
      self.assertEqual(err.errno, errno.EBADF)
148
    else:
149
      self.fail("File descriptor wasn't closed")
150

    
151
  def testClose(self):
152
    for wait in [False, True]:
153
      waiter = jqueue._JobFileChangesWaiter(self.filename)
154
      try:
155
        if wait:
156
          waiter.Wait(0.001)
157
      finally:
158
        waiter.Close()
159

    
160
      # Ensure file descriptor was closed
161
      self._EnsureNotifierClosed(waiter._notifier)
162

    
163
  def testChangingFile(self):
164
    waiter = jqueue._JobFileChangesWaiter(self.filename)
165
    try:
166
      self.assertFalse(waiter.Wait(0.1))
167
      utils.WriteFile(self.filename, data="changed")
168
      self.assert_(waiter.Wait(60))
169
    finally:
170
      waiter.Close()
171

    
172
    self._EnsureNotifierClosed(waiter._notifier)
173

    
174
  def testChangingFile2(self):
175
    waiter = jqueue._JobChangesWaiter(self.filename)
176
    try:
177
      self.assertFalse(waiter._filewaiter)
178
      self.assert_(waiter.Wait(0.1))
179
      self.assert_(waiter._filewaiter)
180

    
181
      # File waiter is now used, but there have been no changes
182
      self.assertFalse(waiter.Wait(0.1))
183
      utils.WriteFile(self.filename, data="changed")
184
      self.assert_(waiter.Wait(60))
185
    finally:
186
      waiter.Close()
187

    
188
    self._EnsureNotifierClosed(waiter._filewaiter._notifier)
189

    
190

    
191
class TestWaitForJobChangesHelper(unittest.TestCase):
192
  def setUp(self):
193
    self.tmpdir = tempfile.mkdtemp()
194
    self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
195
    utils.WriteFile(self.filename, data="")
196

    
197
  def tearDown(self):
198
    shutil.rmtree(self.tmpdir)
199

    
200
  def _LoadWaitingJob(self):
201
    return _FakeJob(2614226563, constants.JOB_STATUS_WAITLOCK)
202

    
203
  def _LoadLostJob(self):
204
    return None
205

    
206
  def testNoChanges(self):
207
    wfjc = jqueue._WaitForJobChangesHelper()
208

    
209
    # No change
210
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
211
                          [constants.JOB_STATUS_WAITLOCK], None, 0.1),
212
                     constants.JOB_NOTCHANGED)
213

    
214
    # No previous information
215
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
216
                          ["status"], None, None, 1.0),
217
                     ([constants.JOB_STATUS_WAITLOCK], []))
218

    
219
  def testLostJob(self):
220
    wfjc = jqueue._WaitForJobChangesHelper()
221
    self.assert_(wfjc(self.filename, self._LoadLostJob,
222
                      ["status"], None, None, 1.0) is None)
223

    
224

    
225
class TestEncodeOpError(unittest.TestCase):
226
  def test(self):
227
    encerr = jqueue._EncodeOpError(errors.LockError("Test 1"))
228
    self.assert_(isinstance(encerr, tuple))
229
    self.assertRaises(errors.LockError, errors.MaybeRaise, encerr)
230

    
231
    encerr = jqueue._EncodeOpError(errors.GenericError("Test 2"))
232
    self.assert_(isinstance(encerr, tuple))
233
    self.assertRaises(errors.GenericError, errors.MaybeRaise, encerr)
234

    
235
    encerr = jqueue._EncodeOpError(NotImplementedError("Foo"))
236
    self.assert_(isinstance(encerr, tuple))
237
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
238

    
239
    encerr = jqueue._EncodeOpError("Hello World")
240
    self.assert_(isinstance(encerr, tuple))
241
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
242

    
243

    
244
class TestQueuedOpCode(unittest.TestCase):
245
  def testDefaults(self):
246
    def _Check(op):
247
      self.assertFalse(hasattr(op.input, "dry_run"))
248
      self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
249
      self.assertFalse(op.log)
250
      self.assert_(op.start_timestamp is None)
251
      self.assert_(op.exec_timestamp is None)
252
      self.assert_(op.end_timestamp is None)
253
      self.assert_(op.result is None)
254
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
255

    
256
    op1 = jqueue._QueuedOpCode(opcodes.OpTestDelay())
257
    _Check(op1)
258
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
259
    _Check(op2)
260
    self.assertEqual(op1.Serialize(), op2.Serialize())
261

    
262
  def testPriority(self):
263
    def _Check(op):
264
      assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
265
             "Default priority equals high priority; test can't work"
266
      self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
267
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
268

    
269
    inpop = opcodes.OpGetTags(priority=constants.OP_PRIO_HIGH)
270
    op1 = jqueue._QueuedOpCode(inpop)
271
    _Check(op1)
272
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
273
    _Check(op2)
274
    self.assertEqual(op1.Serialize(), op2.Serialize())
275

    
276

    
277
class TestQueuedJob(unittest.TestCase):
278
  def testDefaults(self):
279
    job_id = 4260
280
    ops = [
281
      opcodes.OpGetTags(),
282
      opcodes.OpTestDelay(),
283
      ]
284

    
285
    def _Check(job):
286
      self.assertEqual(job.id, job_id)
287
      self.assertEqual(job.log_serial, 0)
288
      self.assert_(job.received_timestamp)
289
      self.assert_(job.start_timestamp is None)
290
      self.assert_(job.end_timestamp is None)
291
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
292
      self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
293
      self.assert_(repr(job).startswith("<"))
294
      self.assertEqual(len(job.ops), len(ops))
295
      self.assert_(compat.all(inp.__getstate__() == op.input.__getstate__()
296
                              for (inp, op) in zip(ops, job.ops)))
297

    
298
    job1 = jqueue._QueuedJob(None, job_id, ops)
299
    _Check(job1)
300
    job2 = jqueue._QueuedJob.Restore(None, job1.Serialize())
301
    _Check(job2)
302
    self.assertEqual(job1.Serialize(), job2.Serialize())
303

    
304
  def testPriority(self):
305
    job_id = 4283
306
    ops = [
307
      opcodes.OpGetTags(priority=constants.OP_PRIO_DEFAULT),
308
      opcodes.OpTestDelay(),
309
      ]
310

    
311
    def _Check(job):
312
      self.assertEqual(job.id, job_id)
313
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
314
      self.assert_(repr(job).startswith("<"))
315

    
316
    job = jqueue._QueuedJob(None, job_id, ops)
317
    _Check(job)
318
    self.assert_(compat.all(op.priority == constants.OP_PRIO_DEFAULT
319
                            for op in job.ops))
320
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
321

    
322
    # Increase first
323
    job.ops[0].priority -= 1
324
    _Check(job)
325
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)
326

    
327
    # Mark opcode as finished
328
    job.ops[0].status = constants.OP_STATUS_SUCCESS
329
    _Check(job)
330
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
331

    
332
    # Increase second
333
    job.ops[1].priority -= 10
334
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)
335

    
336
    # Test increasing first
337
    job.ops[0].status = constants.OP_STATUS_RUNNING
338
    job.ops[0].priority -= 19
339
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)
340

    
341

    
342
if __name__ == "__main__":
343
  testutils.GanetiTestProgram()