Extract GetJobStatuses and use a unified version
[ganeti-local] / qa / qa_performance.py
1 #
2 #
3
4 # Copyright (C) 2014 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Performance testing QA tests.
23
24 """
25
26 import datetime
27 import functools
28 import itertools
29 import threading
30 import time
31
32 from ganeti import constants
33
34 import qa_config
35 import qa_error
36 from qa_instance_utils import GetGenericAddParameters
37 import qa_job_utils
38 import qa_logging
39 import qa_utils
40
41
# Maximum number of seconds the submission of a single job may take; when it
# takes longer, _ExecuteJobSubmittingCmd raises a qa_error.Error
MAX_JOB_SUBMISSION_DURATION = 15.0
43
44
45 class _JobQueueDriver(object):
46   """This class handles polling of jobs and reacting on status changes.
47
48   Jobs are added via the L{AddJob} method, and can have callback functions
49   assigned to them. Those are called as soon as the job enters the appropriate
50   state. Callback functions can add new jobs to the driver as needed.
51
52   A call to L{WaitForCompletion} finally polls Ganeti until all jobs have
53   succeeded.
54
55   """
56
57   _UNKNOWN_STATUS = "unknown"
58
59   class _JobEntry(object):
60     """Internal class representing a job entry.
61
62     """
63     def __init__(self, job_id, running_fn, success_fn):
64       self.job_id = job_id
65       self.running_fn = running_fn
66       self.success_fn = success_fn
67
68     def __str__(self):
69       return str(self.job_id)
70
71   def __init__(self):
72     self._jobs = {}
73     self._running_notified = set()
74     self._jobs_per_status = {}
75     self._lock = threading.RLock()
76
77   def AddJob(self, job_id, running_fn=None, success_fn=None):
78     """Add a job to the driver.
79
80     @type job_id: of ints
81     @param job_id: job id to add to the driver
82     @type running_fn: function taking a L{_JobQueueDriver} and an int
83     @param running_fn: function called once when a job changes to running state
84                        (or success state, if the running state was too short)
85     @type success_fn: function taking a L{_JobQueueDriver} and an int
86     @param success_fn: function called for each successful job id
87
88     """
89     with self._lock:
90       self._jobs[job_id] = _JobQueueDriver._JobEntry(job_id,
91                                                      running_fn,
92                                                      success_fn)
93       # the status will be updated on the next call to _FetchJobStatuses
94       self._jobs_per_status.setdefault(self._UNKNOWN_STATUS, []).append(job_id)
95
96   def _FetchJobStatuses(self):
97     """Retrieves status information of the given jobs.
98
99     """
100     job_statuses = qa_job_utils.GetJobStatuses(self._GetJobIds())
101
102     new_statuses = {}
103     for job_id, status in job_statuses.items():
104       new_statuses.setdefault(status, []).append(self._jobs[int(job_id)])
105     self._jobs_per_status = new_statuses
106
107   def _GetJobIds(self):
108     return list(self._jobs.keys())
109
110   def _GetJobsInStatuses(self, statuses):
111     """Returns a list of L{_JobEntry} of all jobs in the given statuses.
112
113     @type statuses: iterable of strings
114     @param statuses: jobs in those statuses are returned
115     @rtype: list of L{_JobEntry}
116     @return: list of job entries in the requested statuses
117
118     """
119     ret = []
120     for state in statuses:
121       ret.extend(self._jobs_per_status.get(state, []))
122     return ret
123
124   def _UpdateJobStatuses(self):
125     """Retrieves job statuses from the cluster and updates internal state.
126
127     """
128     self._FetchJobStatuses()
129     error_jobs = self._GetJobsInStatuses([constants.JOB_STATUS_ERROR])
130     if error_jobs:
131       raise qa_error.Error(
132         "Jobs %s are in error state!" % [job.job_id for job in error_jobs])
133
134     for job in self._GetJobsInStatuses([constants.JOB_STATUS_RUNNING,
135                                         constants.JOB_STATUS_SUCCESS]):
136       if job.job_id not in self._running_notified:
137         if job.running_fn is not None:
138           job.running_fn(self, job.job_id)
139         self._running_notified.add(job.job_id)
140
141     for job in self._GetJobsInStatuses([constants.JOB_STATUS_SUCCESS]):
142       if job.success_fn is not None:
143         job.success_fn(self, job.job_id)
144
145       # we're done with this job
146       del self._jobs[job.job_id]
147
148   def _HasPendingJobs(self):
149     """Checks if there are still jobs pending.
150
151     @rtype: bool
152     @return: C{True} if there are still jobs which have not succeeded
153
154     """
155     with self._lock:
156       self._UpdateJobStatuses()
157       uncompleted_jobs = self._GetJobsInStatuses(
158         constants.JOB_STATUS_ALL - constants.JOBS_FINALIZED)
159       unknown_jobs = self._GetJobsInStatuses([self._UNKNOWN_STATUS])
160       return len(uncompleted_jobs) > 0 or len(unknown_jobs) > 0
161
162   def WaitForCompletion(self):
163     """Wait for the completion of all registered jobs.
164
165     """
166     while self._HasPendingJobs():
167       time.sleep(2)
168
169     with self._lock:
170       if self._jobs:
171         raise qa_error.Error(
172           "Jobs %s didn't finish in success state!" % self._GetJobIds())
173
174
def _AcquireAllInstances():
  """Generator yielding instances acquired from the QA config.

  Instances are acquired one by one until the QA config reports that it ran
  out of instances, which ends the generator.

  """
  while True:
    try:
      acquired = qa_config.AcquireInstance()
    except qa_error.OutOfInstancesError:
      # no more instances available, we're done
      return
    yield acquired
185
186
def _AcquireAllNodes():
  """Generator yielding nodes acquired from the QA config.

  Every node acquired so far is passed as exclusion to the next acquisition.
  The generator ends once the QA config reports that it ran out of nodes.

  """
  already_acquired = []
  while True:
    try:
      acquired = qa_config.AcquireNode(exclude=already_acquired)
    except qa_error.OutOfNodesError:
      # no more nodes available, we're done
      return
    already_acquired.append(acquired)
    yield acquired
199
200
def _ExecuteJobSubmittingCmd(cmd):
  """Runs a job submitting command and hands back the resulting job ID.

  The time needed for the submission is measured; the function fails if it
  exceeds L{MAX_JOB_SUBMISSION_DURATION}.

  @type cmd: list of string or string
  @param cmd: the job producing command to execute on the cluster
  @rtype: int
  @return: job-id
  @raise qa_error.Error: if the submission took too long

  """
  started_at = datetime.datetime.now()
  job_id = qa_job_utils.ExecuteJobProducingCommand(cmd)
  elapsed = datetime.datetime.now() - started_at

  seconds = qa_utils.TimedeltaToTotalSeconds(elapsed)
  if seconds > MAX_JOB_SUBMISSION_DURATION:
    message = ("Executing '%s' took %f seconds, only %f are allowed" %
               (cmd, seconds, MAX_JOB_SUBMISSION_DURATION))
    raise qa_error.Error(message)

  return job_id
221
222
def _SubmitInstanceCreationJob(instance, disk_template=None):
  """Submits a job creating the given instance.

  The instance is released again if anything goes wrong while the job is
  being submitted.

  @type instance: L{qa_config._QaInstance}
  @param instance: instance to submit a create command for
  @type disk_template: string
  @param disk_template: disk template for the new instance or C{None} which
                        causes the default disk template to be used
  @rtype: int
  @return: job id of the submitted creation job

  """
  template = disk_template
  if template is None:
    template = qa_config.GetDefaultDiskTemplate()

  try:
    fixed_args = [
      "gnt-instance", "add", "--submit",
      "--os-type=%s" % qa_config.get("os"),
      "--disk-template=%s" % template,
      ]
    add_cmd = fixed_args + GetGenericAddParameters(instance, template)
    add_cmd.append(instance.name)

    instance.SetDiskTemplate(template)

    job_id = _ExecuteJobSubmittingCmd(add_cmd)
  except:
    # whatever went wrong, don't keep the instance acquired
    instance.Release()
    raise

  return job_id
250
251
def _SubmitInstanceRemoveJob(instance):
  """Submits a job removing the given instance.

  The instance is released in any case, no matter whether the submission
  worked or not.

  @type instance: L{qa_config._QaInstance}
  @param instance: the instance to remove
  @rtype: int
  @return: job id of the submitted remove job

  """
  try:
    remove_cmd = ["gnt-instance", "remove", "--submit", "-f", instance.name]
    return _ExecuteJobSubmittingCmd(remove_cmd)
  finally:
    instance.Release()
268
269
def _TestParallelInstanceCreationAndRemoval(max_instances=None,
                                            disk_template=None,
                                            custom_job_driver=None):
  """Creates instances in parallel and removes each one right afterwards.

  A creation job is submitted for every acquired instance; as soon as such a
  job succeeded, the removal job of the instance is submitted.

  @type max_instances: int
  @param max_instances: maximum number of instances to create
  @type disk_template: string
  @param disk_template: disk template for the new instances or C{None} which
                        causes the default disk template to be used
  @type custom_job_driver: _JobQueueDriver
  @param custom_job_driver: a custom L{_JobQueueDriver} to use if not L{None}.
                            If one is specified, C{WaitForCompletion} is _not_
                            called on it.

  """
  if custom_job_driver:
    job_driver = custom_job_driver
  else:
    job_driver = _JobQueueDriver()

  def _RemoveAfterCreation(created_instance, driver, _):
    # success callback of a creation job: get rid of the instance again
    driver.AddJob(_SubmitInstanceRemoveJob(created_instance))

  candidates = _AcquireAllInstances()
  if max_instances is not None:
    candidates = itertools.islice(candidates, max_instances)

  for instance in candidates:
    creation_job_id = _SubmitInstanceCreationJob(instance,
                                                 disk_template=disk_template)
    on_success = functools.partial(_RemoveAfterCreation, instance)
    job_driver.AddJob(creation_job_id, success_fn=on_success)

  # a custom driver is waited for by whoever passed it in
  if custom_job_driver is None:
    job_driver.WaitForCompletion()
303
304
def TestParallelMaxInstanceCreationPerformance():
  """PERFORMANCE: Parallel instance creation (instance count = max).

  No limit is put on the instance count, so every instance which can be
  acquired is created and removed again.

  """
  _TestParallelInstanceCreationAndRemoval(max_instances=None)
310
311
def TestParallelNodeCountInstanceCreationPerformance():
  """PERFORMANCE: Parallel instance creation (instance count = node count).

  All nodes are acquired to find out how many there are, and that number is
  used as upper limit for the instances to create. The nodes are released
  again even if the test fails.

  """
  nodes = list(_AcquireAllNodes())
  try:
    _TestParallelInstanceCreationAndRemoval(max_instances=len(nodes))
  finally:
    # don't leave the nodes acquired when the test raises
    qa_config.ReleaseManyNodes(nodes)
319
320
def CreateAllInstances():
  """Creates every instance of the QA config in the cluster.

  The creation jobs are submitted one after the other and then waited for
  together.

  @rtype: list of L{qa_config._QaInstance}
  @return: list of instances created in the cluster

  """
  job_driver = _JobQueueDriver()
  acquired_instances = list(_AcquireAllInstances())

  for instance in acquired_instances:
    job_driver.AddJob(_SubmitInstanceCreationJob(instance))
  job_driver.WaitForCompletion()

  return acquired_instances
336
337
def RemoveAllInstances(instances):
  """Removes all given instances from the cluster.

  The removal jobs are submitted one after the other and then waited for
  together.

  @type instances: list of L{qa_config._QaInstance}
  @param instances: the instances to remove

  """
  job_driver = _JobQueueDriver()

  for instance in instances:
    job_driver.AddJob(_SubmitInstanceRemoveJob(instance))
  job_driver.WaitForCompletion()
351
352
def TestParallelModify(instances):
  """PERFORMANCE: Parallel instance modify.

  Three modify jobs are submitted per instance: one changing a backend
  parameter, one changing an OS parameter, and one changing both.

  @type instances: list of L{qa_config._QaInstance}
  @param instances: list of instances to issue modify commands against

  """
  job_driver = _JobQueueDriver()
  # set min mem to same value as max mem
  new_min_mem = qa_config.get(constants.BE_MAXMEM)
  min_mem_arg = "%s=%s" % (constants.BE_MINMEM, new_min_mem)
  os_param_arg = "fake_os_param=fake_value"

  # option sets of the modify commands, in the order they are submitted
  modifications = [
    ["-B", min_mem_arg],
    ["-O", os_param_arg],
    ["-O", os_param_arg, "-B", min_mem_arg],
    ]

  for instance in instances:
    for options in modifications:
      cmd = ["gnt-instance", "modify", "--submit"] + options
      cmd.append(instance.name)
      job_driver.AddJob(_ExecuteJobSubmittingCmd(cmd))

  job_driver.WaitForCompletion()
381
382
def TestParallelInstanceOSOperations(instances):
  """PERFORMANCE: Parallel instance OS operations.

  Every instance goes through all operations once; the operation an instance
  begins with depends on its position in the list. The next operation of an
  instance is submitted as soon as the job of the previous one starts to run.

  Note: This test leaves the instances either running or stopped, there's no
  guarantee on the actual status.

  @type instances: list of L{qa_config._QaInstance}
  @param instances: list of instances to issue lifecycle commands against

  """
  OPS = ["start", "shutdown", "reboot", "reinstall"]
  job_driver = _JobQueueDriver()

  def _SubmitNextOperation(instance, start, idx, job_driver, _):
    # idx counts the operations already handled for this instance
    if idx == len(OPS):
      return
    operation = OPS[(start + idx) % len(OPS)]

    next_fn = functools.partial(_SubmitNextOperation, instance, start, idx + 1)

    if operation == "reinstall":
      if instance.disk_template == constants.DT_DISKLESS:
        # no reinstall possible with diskless instances
        next_fn(job_driver, None)
        return

      # the instance has to be shut down for reinstall to work
      shutdown_cmd = ["gnt-instance", "shutdown", "--submit", instance.name]
      reinstall_cmd = ["gnt-instance", "reinstall", "--submit", "-f",
                       instance.name]

      def _SubmitReinstall(driver, _):
        # called once the shutdown job is running
        driver.AddJob(_ExecuteJobSubmittingCmd(reinstall_cmd),
                      running_fn=next_fn)

      job_driver.AddJob(_ExecuteJobSubmittingCmd(shutdown_cmd),
                        running_fn=_SubmitReinstall)
    else:
      cmd = ["gnt-instance", operation, "--submit", instance.name]
      job_driver.AddJob(_ExecuteJobSubmittingCmd(cmd), running_fn=next_fn)

  for start, instance in enumerate(instances):
    _SubmitNextOperation(instance, start % len(OPS), 0, job_driver, None)

  job_driver.WaitForCompletion()
430
431
432 # TODO(thomasth): move to qa_job_utils.py once stable-2.10 is merged to master
433 class _QAThreadGroup(object):
434   """This class manages a list of QAThreads.
435
436   """
437   def __init__(self):
438     self._threads = []
439
440   def Start(self, thread):
441     """Starts the given thread and adds it to this group.
442
443     @type thread: qa_job_utils.QAThread
444     @param thread: the thread to start and to add to this group.
445
446     """
447     thread.start()
448     self._threads.append(thread)
449
450   def JoinAndReraise(self):
451     """Joins all threads in this group and calls their C{reraise} method.
452
453     """
454     for thread in self._threads:
455       thread.join()
456       thread.reraise()
457
458
def TestParallelInstanceQueries(instances):
  """PERFORMANCE: Parallel instance queries.

  For every instance, one thread querying the instance info and one thread
  listing the instances are started. All of them are joined at the end.

  @type instances: list of L{qa_config._QaInstance}
  @param instances: list of instances to issue queries against

  """
  threads = _QAThreadGroup()

  for instance in instances:
    queries = (
      ["gnt-instance", "info", instance.name],
      ["gnt-instance", "list"],
      )
    for cmd in queries:
      threads.Start(qa_job_utils.QAThread(qa_utils.AssertCommand, [cmd], {}))

  threads.JoinAndReraise()
477
478
def TestJobQueueSubmissionPerformance():
  """PERFORMANCE: Job queue submission performance.

  This test exercises the job queue and verifies that the job submission time
  does not increase as more jobs are added. Delay jobs are submitted from
  several threads at once; afterwards a timed C{gnt-cluster info} is run and
  all submitted jobs are waited for.

  """
  MAX_CLUSTER_INFO_SECONDS = 15.0
  job_driver = _JobQueueDriver()
  submission_durations = []

  def _VerifySubmissionDuration(duration_seconds):
    # the first 10 job submissions only provide the data to compare against
    if len(submission_durations) < 10:
      submission_durations.append(duration_seconds)
      return

    avg_duration = sum(submission_durations) / len(submission_durations)
    max_duration = avg_duration * 1.5
    if duration_seconds > max_duration:
      # a slow submission only causes a warning, not a failure
      print(qa_logging.FormatWarning(
        "Submitting a delay job took %f seconds, max %f expected" %
        (duration_seconds, max_duration)))

  def _SubmitDelayJob(count):
    for _ in range(count):
      cmd = ["gnt-debug", "delay", "--submit", "0.1"]

      started_at = datetime.datetime.now()
      job_id = _ExecuteJobSubmittingCmd(cmd)
      elapsed = datetime.datetime.now() - started_at
      _VerifySubmissionDuration(qa_utils.TimedeltaToTotalSeconds(elapsed))

      job_driver.AddJob(job_id)

  threads = _QAThreadGroup()
  for _ in range(10):
    threads.Start(qa_job_utils.QAThread(_SubmitDelayJob, [20], {}))

  threads.JoinAndReraise()

  qa_utils.AssertCommand(["gnt-cluster", "info"],
                         max_seconds=MAX_CLUSTER_INFO_SECONDS)

  job_driver.WaitForCompletion()
526
527
def TestParallelDRBDInstanceCreationPerformance():
  """PERFORMANCE: Parallel DRBD backed instance creation.

  Up to twice as many instances as there are nodes are created with the DRBD
  disk template. The nodes acquired for counting are released again even if
  the test fails.

  """
  assert qa_config.IsTemplateSupported(constants.DT_DRBD8)

  nodes = list(_AcquireAllNodes())
  try:
    _TestParallelInstanceCreationAndRemoval(max_instances=len(nodes) * 2,
                                            disk_template=constants.DT_DRBD8)
  finally:
    # don't leave the nodes acquired when the test raises
    qa_config.ReleaseManyNodes(nodes)
538
539
def TestParallelPlainInstanceCreationPerformance():
  """PERFORMANCE: Parallel plain backed instance creation.

  Up to twice as many instances as there are nodes are created with the plain
  disk template. The nodes acquired for counting are released again even if
  the test fails.

  """
  assert qa_config.IsTemplateSupported(constants.DT_PLAIN)

  nodes = list(_AcquireAllNodes())
  try:
    _TestParallelInstanceCreationAndRemoval(max_instances=len(nodes) * 2,
                                            disk_template=constants.DT_PLAIN)
  finally:
    # don't leave the nodes acquired when the test raises
    qa_config.ReleaseManyNodes(nodes)
550
551
def _TestInstanceOperationInParallelToInstanceCreation(*cmds):
  """Runs the given commands while an instance is created and removed.

  The first command is submitted right away, then the creation (and later
  removal) of one DRBD instance is driven through the same job driver.

  @type cmds: list of list of strings
  @param cmds: commands to execute in parallel to an instance creation. Each
               command in the list is executed once the previous job starts
               to run.

  """
  def _SubmitNextCommand(cmd_idx, job_driver, _):
    # nothing left to submit once all commands were handled
    if cmd_idx < len(cmds):
      follow_up = functools.partial(_SubmitNextCommand, cmd_idx + 1)
      submitted_job_id = _ExecuteJobSubmittingCmd(cmds[cmd_idx])
      job_driver.AddJob(submitted_job_id, running_fn=follow_up)

  assert qa_config.IsTemplateSupported(constants.DT_DRBD8)
  assert len(cmds) > 0

  job_driver = _JobQueueDriver()
  _SubmitNextCommand(0, job_driver, None)

  # the driver is passed in, so waiting for the jobs is up to this function
  _TestParallelInstanceCreationAndRemoval(custom_job_driver=job_driver,
                                          disk_template=constants.DT_DRBD8,
                                          max_instances=1)

  job_driver.WaitForCompletion()
579
580
def TestParallelInstanceFailover(instance):
  """PERFORMANCE: Instance failover with parallel instance creation.

  @param instance: the instance to fail over; its C{name} is put on the
                   command line

  """
  failover_cmd = [
    "gnt-instance", "failover", "--submit", "-f", "--shutdown-timeout=0",
    instance.name,
    ]
  _TestInstanceOperationInParallelToInstanceCreation(failover_cmd)
588
589
def TestParallelInstanceMigration(instance):
  """PERFORMANCE: Instance migration with parallel instance creation.

  @param instance: the instance to migrate; its C{name} is put on the command
                   line

  """
  migrate_cmd = ["gnt-instance", "migrate", "--submit", "-f", instance.name]
  _TestInstanceOperationInParallelToInstanceCreation(migrate_cmd)
596
597
def TestParallelInstanceReplaceDisks(instance):
  """PERFORMANCE: Instance replace-disks with parallel instance creation.

  @param instance: the instance whose disks get replaced; its C{name} is put
                   on the command line

  """
  replace_cmd = [
    "gnt-instance", "replace-disks", "--submit", "--early-release", "-p",
    instance.name,
    ]
  _TestInstanceOperationInParallelToInstanceCreation(replace_cmd)
605
606
def TestParallelInstanceReboot(instance):
  """PERFORMANCE: Instance reboot with parallel instance creation.

  @param instance: the instance to reboot; its C{name} is put on the command
                   line

  """
  reboot_cmd = ["gnt-instance", "reboot", "--submit", instance.name]
  _TestInstanceOperationInParallelToInstanceCreation(reboot_cmd)
613
614
def TestParallelInstanceReinstall(instance):
  """PERFORMANCE: Instance reinstall with parallel instance creation.

  The instance is stopped before the reinstall and started again afterwards.

  @param instance: the instance to reinstall; its C{name} is put on the
                   command lines

  """
  stop_cmd = ["gnt-instance", "stop", instance.name]
  reinstall_cmd = ["gnt-instance", "reinstall", "--submit", "-f",
                   instance.name]
  start_cmd = ["gnt-instance", "start", instance.name]

  # instance reinstall requires the instance to be down
  qa_utils.AssertCommand(stop_cmd)

  _TestInstanceOperationInParallelToInstanceCreation(reinstall_cmd)

  qa_utils.AssertCommand(start_cmd)
626
627
def TestParallelInstanceRename(instance):
  """PERFORMANCE: Instance rename with parallel instance creation.

  The instance is stopped, renamed to the name of a second, temporarily
  acquired instance and back again, and finally started.

  @param instance: the instance to rename; its C{name} is put on the command
                   lines

  """
  # instance rename requires the instance to be down
  qa_utils.AssertCommand(["gnt-instance", "stop", instance.name])

  new_instance = qa_config.AcquireInstance()
  try:
    rename_away_cmd = ["gnt-instance", "rename", "--submit",
                       instance.name, new_instance.name]
    rename_back_cmd = ["gnt-instance", "rename", "--submit",
                       new_instance.name, instance.name]
    _TestInstanceOperationInParallelToInstanceCreation(rename_away_cmd,
                                                       rename_back_cmd)
  finally:
    # the second instance was only needed for its name
    new_instance.Release()

  qa_utils.AssertCommand(["gnt-instance", "start", instance.name])