Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.jqueue_unittest.py @ 5f6b0b71

History | View | Annotate | Download (11 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing ganeti.jqueue"""
23

    
24
import os
25
import sys
26
import unittest
27
import tempfile
28
import shutil
29
import errno
30

    
31
from ganeti import constants
32
from ganeti import utils
33
from ganeti import errors
34
from ganeti import jqueue
35
from ganeti import opcodes
36
from ganeti import compat
37

    
38
import testutils
39

    
40

    
41
class _FakeJob:
42
  def __init__(self, job_id, status):
43
    self.id = job_id
44
    self._status = status
45
    self._log = []
46

    
47
  def SetStatus(self, status):
48
    self._status = status
49

    
50
  def AddLogEntry(self, msg):
51
    self._log.append((len(self._log), msg))
52

    
53
  def CalcStatus(self):
54
    return self._status
55

    
56
  def GetInfo(self, fields):
57
    result = []
58

    
59
    for name in fields:
60
      if name == "status":
61
        result.append(self._status)
62
      else:
63
        raise Exception("Unknown field")
64

    
65
    return result
66

    
67
  def GetLogEntries(self, newer_than):
68
    assert newer_than is None or newer_than >= 0
69

    
70
    if newer_than is None:
71
      return self._log
72

    
73
    return self._log[newer_than:]
74

    
75

    
76
class TestJobChangesChecker(unittest.TestCase):
77
  def testStatus(self):
78
    job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
79
    checker = jqueue._JobChangesChecker(["status"], None, None)
80
    self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], []))
81

    
82
    job.SetStatus(constants.JOB_STATUS_RUNNING)
83
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
84

    
85
    job.SetStatus(constants.JOB_STATUS_SUCCESS)
86
    self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], []))
87

    
88
    # job.id is used by checker
89
    self.assertEqual(job.id, 9094)
90

    
91
  def testStatusWithPrev(self):
92
    job = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
93
    checker = jqueue._JobChangesChecker(["status"],
94
                                        [constants.JOB_STATUS_QUEUED], None)
95
    self.assert_(checker(job) is None)
96

    
97
    job.SetStatus(constants.JOB_STATUS_RUNNING)
98
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
99

    
100
  def testFinalStatus(self):
101
    for status in constants.JOBS_FINALIZED:
102
      job = _FakeJob(2178711, status)
103
      checker = jqueue._JobChangesChecker(["status"], [status], None)
104
      # There won't be any changes in this status, hence it should signal
105
      # a change immediately
106
      self.assertEqual(checker(job), ([status], []))
107

    
108
  def testLog(self):
109
    job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
110
    checker = jqueue._JobChangesChecker(["status"], None, None)
111
    self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
112

    
113
    job.AddLogEntry("Hello World")
114
    (job_info, log_entries) = checker(job)
115
    self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING])
116
    self.assertEqual(log_entries, [[0, "Hello World"]])
117

    
118
    checker2 = jqueue._JobChangesChecker(["status"], job_info, len(log_entries))
119
    self.assert_(checker2(job) is None)
120

    
121
    job.AddLogEntry("Foo Bar")
122
    job.SetStatus(constants.JOB_STATUS_ERROR)
123

    
124
    (job_info, log_entries) = checker2(job)
125
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
126
    self.assertEqual(log_entries, [[1, "Foo Bar"]])
127

    
128
    checker3 = jqueue._JobChangesChecker(["status"], None, None)
129
    (job_info, log_entries) = checker3(job)
130
    self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
131
    self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]])
132

    
133

    
134
class TestJobChangesWaiter(unittest.TestCase):
135
  def setUp(self):
136
    self.tmpdir = tempfile.mkdtemp()
137
    self.filename = utils.PathJoin(self.tmpdir, "job-1")
138
    utils.WriteFile(self.filename, data="")
139

    
140
  def tearDown(self):
141
    shutil.rmtree(self.tmpdir)
142

    
143
  def _EnsureNotifierClosed(self, notifier):
144
    try:
145
      os.fstat(notifier._fd)
146
    except EnvironmentError, err:
147
      self.assertEqual(err.errno, errno.EBADF)
148
    else:
149
      self.fail("File descriptor wasn't closed")
150

    
151
  def testClose(self):
152
    for wait in [False, True]:
153
      waiter = jqueue._JobFileChangesWaiter(self.filename)
154
      try:
155
        if wait:
156
          waiter.Wait(0.001)
157
      finally:
158
        waiter.Close()
159

    
160
      # Ensure file descriptor was closed
161
      self._EnsureNotifierClosed(waiter._notifier)
162

    
163
  def testChangingFile(self):
164
    waiter = jqueue._JobFileChangesWaiter(self.filename)
165
    try:
166
      self.assertFalse(waiter.Wait(0.1))
167
      utils.WriteFile(self.filename, data="changed")
168
      self.assert_(waiter.Wait(60))
169
    finally:
170
      waiter.Close()
171

    
172
    self._EnsureNotifierClosed(waiter._notifier)
173

    
174
  def testChangingFile2(self):
175
    waiter = jqueue._JobChangesWaiter(self.filename)
176
    try:
177
      self.assertFalse(waiter._filewaiter)
178
      self.assert_(waiter.Wait(0.1))
179
      self.assert_(waiter._filewaiter)
180

    
181
      # File waiter is now used, but there have been no changes
182
      self.assertFalse(waiter.Wait(0.1))
183
      utils.WriteFile(self.filename, data="changed")
184
      self.assert_(waiter.Wait(60))
185
    finally:
186
      waiter.Close()
187

    
188
    self._EnsureNotifierClosed(waiter._filewaiter._notifier)
189

    
190

    
191
class TestWaitForJobChangesHelper(unittest.TestCase):
192
  def setUp(self):
193
    self.tmpdir = tempfile.mkdtemp()
194
    self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
195
    utils.WriteFile(self.filename, data="")
196

    
197
  def tearDown(self):
198
    shutil.rmtree(self.tmpdir)
199

    
200
  def _LoadWaitingJob(self):
201
    return _FakeJob(2614226563, constants.JOB_STATUS_WAITLOCK)
202

    
203
  def _LoadLostJob(self):
204
    return None
205

    
206
  def testNoChanges(self):
207
    wfjc = jqueue._WaitForJobChangesHelper()
208

    
209
    # No change
210
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
211
                          [constants.JOB_STATUS_WAITLOCK], None, 0.1),
212
                     constants.JOB_NOTCHANGED)
213

    
214
    # No previous information
215
    self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
216
                          ["status"], None, None, 1.0),
217
                     ([constants.JOB_STATUS_WAITLOCK], []))
218

    
219
  def testLostJob(self):
220
    wfjc = jqueue._WaitForJobChangesHelper()
221
    self.assert_(wfjc(self.filename, self._LoadLostJob,
222
                      ["status"], None, None, 1.0) is None)
223

    
224

    
225
class TestEncodeOpError(unittest.TestCase):
226
  def test(self):
227
    encerr = jqueue._EncodeOpError(errors.LockError("Test 1"))
228
    self.assert_(isinstance(encerr, tuple))
229
    self.assertRaises(errors.LockError, errors.MaybeRaise, encerr)
230

    
231
    encerr = jqueue._EncodeOpError(errors.GenericError("Test 2"))
232
    self.assert_(isinstance(encerr, tuple))
233
    self.assertRaises(errors.GenericError, errors.MaybeRaise, encerr)
234

    
235
    encerr = jqueue._EncodeOpError(NotImplementedError("Foo"))
236
    self.assert_(isinstance(encerr, tuple))
237
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
238

    
239
    encerr = jqueue._EncodeOpError("Hello World")
240
    self.assert_(isinstance(encerr, tuple))
241
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
242

    
243

    
244
class TestQueuedOpCode(unittest.TestCase):
245
  def testDefaults(self):
246
    def _Check(op):
247
      self.assertFalse(hasattr(op.input, "dry_run"))
248
      self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
249
      self.assertFalse(op.log)
250
      self.assert_(op.start_timestamp is None)
251
      self.assert_(op.exec_timestamp is None)
252
      self.assert_(op.end_timestamp is None)
253
      self.assert_(op.result is None)
254
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
255

    
256
    op1 = jqueue._QueuedOpCode(opcodes.OpTestDelay())
257
    _Check(op1)
258
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
259
    _Check(op2)
260
    self.assertEqual(op1.Serialize(), op2.Serialize())
261

    
262
  def testPriority(self):
263
    def _Check(op):
264
      assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
265
             "Default priority equals high priority; test can't work"
266
      self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
267
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
268

    
269
    inpop = opcodes.OpGetTags(priority=constants.OP_PRIO_HIGH)
270
    op1 = jqueue._QueuedOpCode(inpop)
271
    _Check(op1)
272
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
273
    _Check(op2)
274
    self.assertEqual(op1.Serialize(), op2.Serialize())
275

    
276

    
277
class TestQueuedJob(unittest.TestCase):
278
  def test(self):
279
    self.assertRaises(errors.GenericError, jqueue._QueuedJob,
280
                      None, 1, [])
281

    
282
  def testDefaults(self):
283
    job_id = 4260
284
    ops = [
285
      opcodes.OpGetTags(),
286
      opcodes.OpTestDelay(),
287
      ]
288

    
289
    def _Check(job):
290
      self.assertEqual(job.id, job_id)
291
      self.assertEqual(job.log_serial, 0)
292
      self.assert_(job.received_timestamp)
293
      self.assert_(job.start_timestamp is None)
294
      self.assert_(job.end_timestamp is None)
295
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
296
      self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
297
      self.assert_(repr(job).startswith("<"))
298
      self.assertEqual(len(job.ops), len(ops))
299
      self.assert_(compat.all(inp.__getstate__() == op.input.__getstate__()
300
                              for (inp, op) in zip(ops, job.ops)))
301
      self.assertRaises(errors.OpExecError, job.GetInfo,
302
                        ["unknown-field"])
303
      self.assertEqual(job.GetInfo(["summary"]),
304
                       [[op.input.Summary() for op in job.ops]])
305

    
306
    job1 = jqueue._QueuedJob(None, job_id, ops)
307
    _Check(job1)
308
    job2 = jqueue._QueuedJob.Restore(None, job1.Serialize())
309
    _Check(job2)
310
    self.assertEqual(job1.Serialize(), job2.Serialize())
311

    
312
  def testPriority(self):
313
    job_id = 4283
314
    ops = [
315
      opcodes.OpGetTags(priority=constants.OP_PRIO_DEFAULT),
316
      opcodes.OpTestDelay(),
317
      ]
318

    
319
    def _Check(job):
320
      self.assertEqual(job.id, job_id)
321
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
322
      self.assert_(repr(job).startswith("<"))
323

    
324
    job = jqueue._QueuedJob(None, job_id, ops)
325
    _Check(job)
326
    self.assert_(compat.all(op.priority == constants.OP_PRIO_DEFAULT
327
                            for op in job.ops))
328
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
329

    
330
    # Increase first
331
    job.ops[0].priority -= 1
332
    _Check(job)
333
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)
334

    
335
    # Mark opcode as finished
336
    job.ops[0].status = constants.OP_STATUS_SUCCESS
337
    _Check(job)
338
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
339

    
340
    # Increase second
341
    job.ops[1].priority -= 10
342
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)
343

    
344
    # Test increasing first
345
    job.ops[0].status = constants.OP_STATUS_RUNNING
346
    job.ops[0].priority -= 19
347
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)
348

    
349

    
350
if __name__ == "__main__":
351
  testutils.GanetiTestProgram()