Move python test files to test/py
[ganeti-local] / test / py / ganeti.client.gnt_job_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for testing ganeti.client.gnt_job"""
23
24 import unittest
25 import optparse
26
27 from ganeti.client import gnt_job
28 from ganeti import utils
29 from ganeti import errors
30 from ganeti import query
31 from ganeti import qlang
32 from ganeti import objects
33 from ganeti import compat
34 from ganeti import constants
35
36 import testutils
37
38
39 class _ClientForCancelJob:
40   def __init__(self, cancel_cb, query_cb):
41     self.cancelled = []
42     self._cancel_cb = cancel_cb
43     self._query_cb = query_cb
44
45   def CancelJob(self, job_id):
46     self.cancelled.append(job_id)
47     return self._cancel_cb(job_id)
48
49   def Query(self, kind, selected, qfilter):
50     assert kind == constants.QR_JOB
51     assert selected == ["id", "status", "summary"]
52
53     fields = query.GetAllFields(query._GetQueryFields(query.JOB_FIELDS,
54                                                       selected))
55
56     return objects.QueryResponse(data=self._query_cb(qfilter),
57                                  fields=fields)
58
59
60 class TestCancelJob(unittest.TestCase):
61   def setUp(self):
62     unittest.TestCase.setUp(self)
63     self.stdout = []
64
65   def _ToStdout(self, line):
66     self.stdout.append(line)
67
68   def _Ask(self, answer, question):
69     self.assertTrue(question.endswith("?"))
70     return answer
71
72   def testStatusFilterAndArguments(self):
73     opts = optparse.Values(dict(status_filter=frozenset()))
74     try:
75       gnt_job.CancelJobs(opts, ["a"], cl=NotImplemented,
76                          _stdout_fn=NotImplemented, _ask_fn=NotImplemented)
77     except errors.OpPrereqError, err:
78       self.assertEqual(err.args[1], errors.ECODE_INVAL)
79     else:
80       self.fail("Did not raise exception")
81
82   def _TestArguments(self, force):
83     opts = optparse.Values(dict(status_filter=None, force=force))
84
85     def _CancelCb(job_id):
86       self.assertTrue(job_id in ("24185", "3252"))
87       return (True, "%s will be cancelled" % job_id)
88
89     cl = _ClientForCancelJob(_CancelCb, NotImplemented)
90     self.assertEqual(gnt_job.CancelJobs(opts, ["24185", "3252"], cl=cl,
91                                         _stdout_fn=self._ToStdout,
92                                         _ask_fn=NotImplemented),
93                      constants.EXIT_SUCCESS)
94     self.assertEqual(cl.cancelled, ["24185", "3252"])
95     self.assertEqual(self.stdout, [
96       "24185 will be cancelled",
97       "3252 will be cancelled",
98       ])
99
100   def testArgumentsWithForce(self):
101     self._TestArguments(True)
102
103   def testArgumentsNoForce(self):
104     self._TestArguments(False)
105
106   def testArgumentsWithError(self):
107     opts = optparse.Values(dict(status_filter=None, force=True))
108
109     def _CancelCb(job_id):
110       if job_id == "10788":
111         return (False, "error %s" % job_id)
112       else:
113         return (True, "%s will be cancelled" % job_id)
114
115     cl = _ClientForCancelJob(_CancelCb, NotImplemented)
116     self.assertEqual(gnt_job.CancelJobs(opts, ["203", "10788", "30801"], cl=cl,
117                                         _stdout_fn=self._ToStdout,
118                                         _ask_fn=NotImplemented),
119                      constants.EXIT_FAILURE)
120     self.assertEqual(cl.cancelled, ["203", "10788", "30801"])
121     self.assertEqual(self.stdout, [
122       "203 will be cancelled",
123       "error 10788",
124       "30801 will be cancelled",
125       ])
126
127   def testFilterPending(self):
128     opts = optparse.Values(dict(status_filter=constants.JOBS_PENDING,
129                                 force=False))
130
131     def _Query(qfilter):
132       # Need to sort as constants.JOBS_PENDING has no stable order
133       assert isinstance(constants.JOBS_PENDING, frozenset)
134       self.assertEqual(sorted(qfilter),
135                        sorted(qlang.MakeSimpleFilter("status",
136                                                      constants.JOBS_PENDING)))
137
138       return [
139         [(constants.RS_UNAVAIL, None),
140          (constants.RS_UNAVAIL, None),
141          (constants.RS_UNAVAIL, None)],
142         [(constants.RS_NORMAL, 32532),
143          (constants.RS_NORMAL, constants.JOB_STATUS_QUEUED),
144          (constants.RS_NORMAL, ["op1", "op2", "op3"])],
145         ]
146
147     cl = _ClientForCancelJob(NotImplemented, _Query)
148
149     result = gnt_job.CancelJobs(opts, [], cl=cl,
150                                 _stdout_fn=self._ToStdout,
151                                 _ask_fn=compat.partial(self._Ask, False))
152     self.assertEqual(result, constants.EXIT_CONFIRMATION)
153
154
155 if __name__ == "__main__":
156   testutils.GanetiTestProgram()