4 # Copyright (C) 2014 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
22 """Performance testing QA tests.
32 from ganeti import constants
36 from qa_instance_utils import GetGenericAddParameters
# Upper bound, in seconds, on how long submitting a single job may take;
# _ExecuteJobSubmittingCmd raises qa_error.Error when it is exceeded.
42 MAX_JOB_SUBMISSION_DURATION = 15.0
# _JobQueueDriver keeps one _JobEntry per job id in self._jobs and regroups
# the entries by the status reported by qa_job_utils.GetJobStatuses on every
# refresh. A refresh raises qa_error.Error when any job is in
# JOB_STATUS_ERROR, calls a job's running_fn the first time the job is seen in
# running or success state, and for successful jobs calls success_fn and
# deletes the job from self._jobs.
# NOTE(review): the embedded line numbers of this listing skip a number of
# short statements (for example, the lines that create self._jobs,
# new_statuses and ret are used below but never shown) and all indentation is
# lost, so nesting and exact control flow must be confirmed against the full
# file before changing this class.
45 class _JobQueueDriver(object):
46 """This class handles polling of jobs and reacting on status changes.
48 Jobs are added via the L{AddJob} method, and can have callback functions
49 assigned to them. Those are called as soon as the job enters the appropriate
50 state. Callback functions can add new jobs to the driver as needed.
52 A call to L{WaitForCompletion} finally polls Ganeti until all jobs have
57 _UNKNOWN_STATUS = "unknown"
59 class _JobEntry(object):
60 """Internal class representing a job entry.
63 def __init__(self, job_id, running_fn, success_fn):
65 self.running_fn = running_fn
66 self.success_fn = success_fn
69 return str(self.job_id)
73 self._running_notified = set()
74 self._jobs_per_status = {}
75 self._lock = threading.RLock()
77 def AddJob(self, job_id, running_fn=None, success_fn=None):
78 """Add a job to the driver.
81 @param job_id: job id to add to the driver
82 @type running_fn: function taking a L{_JobQueueDriver} and an int
83 @param running_fn: function called once when a job changes to running state
84 (or success state, if the running state was too short)
85 @type success_fn: function taking a L{_JobQueueDriver} and an int
86 @param success_fn: function called for each successful job id
90 self._jobs[job_id] = _JobQueueDriver._JobEntry(job_id,
93 # the status will be updated on the next call to _FetchJobStatuses
94 self._jobs_per_status.setdefault(self._UNKNOWN_STATUS, []).append(job_id)
96 def _FetchJobStatuses(self):
97 """Retrieves status information of the given jobs.
100 job_statuses = qa_job_utils.GetJobStatuses(self._GetJobIds())
103 for job_id, status in job_statuses.items():
104 new_statuses.setdefault(status, []).append(self._jobs[int(job_id)])
105 self._jobs_per_status = new_statuses
107 def _GetJobIds(self):
108 return list(self._jobs.keys())
110 def _GetJobsInStatuses(self, statuses):
111 """Returns a list of L{_JobEntry} of all jobs in the given statuses.
113 @type statuses: iterable of strings
114 @param statuses: jobs in those statuses are returned
115 @rtype: list of L{_JobEntry}
116 @return: list of job entries in the requested statuses
120 for state in statuses:
121 ret.extend(self._jobs_per_status.get(state, []))
124 def _UpdateJobStatuses(self):
125 """Retrieves job statuses from the cluster and updates internal state.
128 self._FetchJobStatuses()
129 error_jobs = self._GetJobsInStatuses([constants.JOB_STATUS_ERROR])
131 raise qa_error.Error(
132 "Jobs %s are in error state!" % [job.job_id for job in error_jobs])
134 for job in self._GetJobsInStatuses([constants.JOB_STATUS_RUNNING,
135 constants.JOB_STATUS_SUCCESS]):
136 if job.job_id not in self._running_notified:
137 if job.running_fn is not None:
138 job.running_fn(self, job.job_id)
139 self._running_notified.add(job.job_id)
141 for job in self._GetJobsInStatuses([constants.JOB_STATUS_SUCCESS]):
142 if job.success_fn is not None:
143 job.success_fn(self, job.job_id)
145 # we're done with this job
146 del self._jobs[job.job_id]
148 def _HasPendingJobs(self):
149 """Checks if there are still jobs pending.
152 @return: C{True} if there are still jobs which have not succeeded
156 self._UpdateJobStatuses()
157 uncompleted_jobs = self._GetJobsInStatuses(
158 constants.JOB_STATUS_ALL - constants.JOBS_FINALIZED)
159 unknown_jobs = self._GetJobsInStatuses([self._UNKNOWN_STATUS])
160 return len(uncompleted_jobs) > 0 or len(unknown_jobs) > 0
162 def WaitForCompletion(self):
163 """Wait for the completion of all registered jobs.
166 while self._HasPendingJobs():
171 raise qa_error.Error(
172 "Jobs %s didn't finish in success state!" % self._GetJobIds())
# Generator helper (per its docstring) built around
# qa_config.AcquireInstance(); the qa_error.OutOfInstancesError handler
# presumably ends the iteration once no instance is left.
# NOTE(review): the try/loop/yield lines and the handler body are not part of
# this listing (embedded numbers jump 176 -> 181 -> 183) -- confirm against
# the full file before changing anything here.
175 def _AcquireAllInstances():
176 """Generator for acquiring all instances in the QA config.
181 instance = qa_config.AcquireInstance()
183 except qa_error.OutOfInstancesError:
# Generator helper (per its docstring) built around
# qa_config.AcquireNode(exclude=exclude); the qa_error.OutOfNodesError handler
# presumably ends the iteration once no node is left.
# NOTE(review): the lines that define and update `exclude`, the try/loop/yield
# lines and the handler body are not part of this listing (embedded numbers
# jump 188 -> 194 -> 197) -- confirm against the full file.
187 def _AcquireAllNodes():
188 """Generator for acquiring all nodes in the QA config.
194 node = qa_config.AcquireNode(exclude=exclude)
197 except qa_error.OutOfNodesError:
# Times the call to qa_job_utils.ExecuteJobProducingCommand(cmd) and raises
# qa_error.Error when it took longer than MAX_JOB_SUBMISSION_DURATION seconds.
# NOTE(review): the return statement is not visible in this listing; the
# docstring says the resulting job ID is returned -- presumably `result`,
# to be confirmed against the full file.
201 def _ExecuteJobSubmittingCmd(cmd):
202 """Executes a job submitting command and returns the resulting job ID.
204 This will fail if submitting the job takes longer than
205 L{MAX_JOB_SUBMISSION_DURATION}.
207 @type cmd: list of string or string
208 @param cmd: the job producing command to execute on the cluster
213 start = datetime.datetime.now()
214 result = qa_job_utils.ExecuteJobProducingCommand(cmd)
215 duration = qa_utils.TimedeltaToTotalSeconds(datetime.datetime.now() - start)
216 if duration > MAX_JOB_SUBMISSION_DURATION:
217 raise qa_error.Error(
218 "Executing '%s' took %f seconds, only %f are allowed" %
219 (cmd, duration, MAX_JOB_SUBMISSION_DURATION))
# Builds a "gnt-instance add --submit" command for the instance (falling back
# to qa_config.GetDefaultDiskTemplate() when disk_template is None), records
# the template on the instance via SetDiskTemplate and submits the command
# through _ExecuteJobSubmittingCmd.
# NOTE(review): the embedded numbering skips line 237 and lines 247-251; some
# of these may be statements (e.g. error handling) that this listing does not
# show -- confirm against the full file.
223 def _SubmitInstanceCreationJob(instance, disk_template=None):
224 """Submit an instance creation job.
226 @type instance: L{qa_config._QaInstance}
227 @param instance: instance to submit a create command for
228 @type disk_template: string
229 @param disk_template: disk template for the new instance or C{None} which
230 causes the default disk template to be used
232 @return: job id of the submitted creation job
235 if disk_template is None:
236 disk_template = qa_config.GetDefaultDiskTemplate()
238 cmd = (["gnt-instance", "add", "--submit",
239 "--os-type=%s" % qa_config.get("os"),
240 "--disk-template=%s" % disk_template] +
241 GetGenericAddParameters(instance, disk_template))
242 cmd.append(instance.name)
244 instance.SetDiskTemplate(disk_template)
246 return _ExecuteJobSubmittingCmd(cmd)
# Builds "gnt-instance remove --submit -f <instance name>" and submits it
# through _ExecuteJobSubmittingCmd, returning that call's result.
# NOTE(review): the embedded numbering skips line 261 and lines 266-269; some
# of these may be statements that this listing does not show -- confirm
# against the full file.
252 def _SubmitInstanceRemoveJob(instance):
253 """Submit an instance remove job.
255 @type instance: L{qa_config._QaInstance}
256 @param instance: the instance to remove
258 @return: job id of the submitted remove job
262 cmd = (["gnt-instance", "remove", "--submit", "-f"])
263 cmd.append(instance.name)
265 return _ExecuteJobSubmittingCmd(cmd)
# Submits one creation job per acquired instance (capped with
# itertools.islice when max_instances is given); each creation job is
# registered with a success callback that submits the matching removal job.
# The driver is only waited on here when no custom_job_driver was passed in.
# NOTE(review): two short lines are missing from this listing -- a parameter
# between max_instances and custom_job_driver (presumably disk_template, which
# the docstring documents and the body uses) and the call that receives
# "job_id, success_fn=..." (presumably job_driver.AddJob). Confirm both
# against the full file.
270 def _TestParallelInstanceCreationAndRemoval(max_instances=None,
272 custom_job_driver=None):
273 """Tests parallel creation and immediate removal of instances.
275 @type max_instances: int
276 @param max_instances: maximum number of instances to create
277 @type disk_template: string
278 @param disk_template: disk template for the new instances or C{None} which
279 causes the default disk template to be used
280 @type custom_job_driver: _JobQueueDriver
281 @param custom_job_driver: a custom L{_JobQueueDriver} to use if not L{None}.
282 If one is specified, C{WaitForCompletion} is _not_
286 job_driver = custom_job_driver or _JobQueueDriver()
288 def _CreateSuccessFn(instance, job_driver, _):
289 job_id = _SubmitInstanceRemoveJob(instance)
290 job_driver.AddJob(job_id)
292 instance_generator = _AcquireAllInstances()
293 if max_instances is not None:
294 instance_generator = itertools.islice(instance_generator, max_instances)
296 for instance in instance_generator:
297 job_id = _SubmitInstanceCreationJob(instance, disk_template=disk_template)
299 job_id, success_fn=functools.partial(_CreateSuccessFn, instance))
301 if custom_job_driver is None:
302 job_driver.WaitForCompletion()
def TestParallelMaxInstanceCreationPerformance():
  """PERFORMANCE: Parallel instance creation (instance count = max).

  Runs L{_TestParallelInstanceCreationAndRemoval} without a limit on the
  number of instances.

  """
  # None is the helper's default and means "no cap on the instance count"
  _TestParallelInstanceCreationAndRemoval(max_instances=None)
def TestParallelNodeCountInstanceCreationPerformance():
  """PERFORMANCE: Parallel instance creation (instance count = node count).

  Acquires all nodes the QA config hands out, runs the parallel
  creation/removal test with at most one instance per acquired node and
  releases the nodes again afterwards.

  """
  nodes = list(_AcquireAllNodes())
  try:
    _TestParallelInstanceCreationAndRemoval(max_instances=len(nodes))
  finally:
    # release the nodes even if the test raises, so that a failure here does
    # not leak the acquired nodes
    qa_config.ReleaseManyNodes(nodes)
# Submits a creation job for every instance yielded by _AcquireAllInstances,
# registers the jobs with a fresh _JobQueueDriver and waits for all of them.
# NOTE(review): the return statement is not visible in this listing (embedded
# numbers jump 334 -> 338); the docstring promises the list of created
# instances -- presumably `instances`, to be confirmed against the full file.
321 def CreateAllInstances():
322 """Create all instances configured in QA config in the cluster.
324 @rtype: list of L{qa_config._QaInstance}
325 @return: list of instances created in the cluster
328 job_driver = _JobQueueDriver()
329 instances = list(_AcquireAllInstances())
330 for instance in instances:
331 job_id = _SubmitInstanceCreationJob(instance)
332 job_driver.AddJob(job_id)
334 job_driver.WaitForCompletion()
def RemoveAllInstances(instances):
  """Removes all given instances from the cluster.

  One removal job is submitted per instance; the function then waits until
  the job driver reports all of them as completed.

  @type instances: list of L{qa_config._QaInstance}
  @param instances: instances for which a removal job is submitted

  """
  driver = _JobQueueDriver()
  for inst in instances:
    driver.AddJob(_SubmitInstanceRemoveJob(inst))

  driver.WaitForCompletion()
def TestParallelModify(instances):
  """PERFORMANCE: Parallel instance modify.

  For every instance three C{gnt-instance modify} jobs are submitted (backend
  parameter only, OS parameter only, both combined); afterwards the function
  waits for all submitted jobs.

  @type instances: list of L{qa_config._QaInstance}
  @param instances: list of instances to issue modify commands against

  """
  driver = _JobQueueDriver()
  # set min mem to same value as max mem
  min_mem_arg = "%s=%s" % (constants.BE_MINMEM,
                           qa_config.get(constants.BE_MAXMEM))
  os_param_arg = "fake_os_param=fake_value"
  # submitted in exactly this order for each instance
  option_sets = [
    ["-B", min_mem_arg],
    ["-O", os_param_arg],
    ["-O", os_param_arg, "-B", min_mem_arg],
    ]

  for inst in instances:
    for options in option_sets:
      cmd = ["gnt-instance", "modify", "--submit"] + options + [inst.name]
      driver.AddJob(_ExecuteJobSubmittingCmd(cmd))

  driver.WaitForCompletion()
# Every instance walks through OPS starting at a different offset
# (start % len(OPS)); op_idx = (start + idx) % len(OPS) selects the operation
# and next_fn, registered as running_fn, submits the following one. A
# "reinstall" on a diskless instance is skipped by calling next_fn directly;
# otherwise a reinstall is preceded by a separately submitted shutdown job.
# NOTE(review): the embedded numbering skips lines 397-398 (before op_idx),
# 407, 412, 416-417 (the rest of the AddJob call started on 413-415) and 420
# (the body of the second "reinstall" check), and all indentation is lost --
# nesting and the termination condition must be confirmed against the full
# file.
383 def TestParallelInstanceOSOperations(instances):
384 """PERFORMANCE: Parallel instance OS operations.
386 Note: This test leaves the instances either running or stopped, there's no
387 guarantee on the actual status.
389 @type instances: list of L{qa_config._QaInstance}
390 @param instances: list of instances to issue lifecycle commands against
393 OPS = ["start", "shutdown", "reboot", "reinstall"]
394 job_driver = _JobQueueDriver()
396 def _SubmitNextOperation(instance, start, idx, job_driver, _):
399 op_idx = (start + idx) % len(OPS)
401 next_fn = functools.partial(_SubmitNextOperation, instance, start, idx + 1)
403 if OPS[op_idx] == "reinstall" and \
404 instance.disk_template == constants.DT_DISKLESS:
405 # no reinstall possible with diskless instances
406 next_fn(job_driver, None)
408 elif OPS[op_idx] == "reinstall":
409 # the instance has to be shut down for reinstall to work
410 shutdown_cmd = ["gnt-instance", "shutdown", "--submit", instance.name]
411 cmd = ["gnt-instance", "reinstall", "--submit", "-f", instance.name]
413 job_driver.AddJob(_ExecuteJobSubmittingCmd(shutdown_cmd),
414 running_fn=lambda _, __: job_driver.AddJob(
415 _ExecuteJobSubmittingCmd(cmd),
418 cmd = ["gnt-instance", OPS[op_idx], "--submit"]
419 if OPS[op_idx] == "reinstall":
421 cmd.append(instance.name)
423 job_id = _ExecuteJobSubmittingCmd(cmd)
424 job_driver.AddJob(job_id, running_fn=next_fn)
426 for start, instance in enumerate(instances):
427 _SubmitNextOperation(instance, start % len(OPS), 0, job_driver, None)
429 job_driver.WaitForCompletion()
432 # TODO(thomasth): move to qa_job_utils.py once stable-2.10 is merged to master
# _QAThreadGroup collects threads in self._threads: Start appends the given
# thread to that list and JoinAndReraise iterates over it.
# NOTE(review): the constructor, the statement that actually starts the thread
# in Start and the loop body of JoinAndReraise are not part of this listing
# (embedded numbers jump 434 -> 440, 444 -> 448 and stop at 454) -- confirm
# against the full file.
433 class _QAThreadGroup(object):
434 """This class manages a list of QAThreads.
440 def Start(self, thread):
441 """Starts the given thread and adds it to this group.
443 @type thread: qa_job_utils.QAThread
444 @param thread: the thread to start and to add to this group.
448 self._threads.append(thread)
450 def JoinAndReraise(self):
451 """Joins all threads in this group and calls their C{reraise} method.
454 for thread in self._threads:
# Runs "gnt-instance info <name>" and "gnt-instance list" through
# qa_utils.AssertCommand on qa_job_utils.QAThread threads that are started via
# a _QAThreadGroup, then calls JoinAndReraise on the group.
# NOTE(review): indentation is lost in this listing, so it cannot be told from
# here whether the "gnt-instance list" thread is started once per instance or
# only once after the loop -- confirm against the full file.
459 def TestParallelInstanceQueries(instances):
460 """PERFORMANCE: Parallel instance queries.
462 @type instances: list of L{qa_config._QaInstance}
463 @param instances: list of instances to issue queries against
466 threads = _QAThreadGroup()
467 for instance in instances:
468 cmd = ["gnt-instance", "info", instance.name]
469 info_thread = qa_job_utils.QAThread(qa_utils.AssertCommand, [cmd], {})
470 threads.Start(info_thread)
472 cmd = ["gnt-instance", "list"]
473 list_thread = qa_job_utils.QAThread(qa_utils.AssertCommand, [cmd], {})
474 threads.Start(list_thread)
476 threads.JoinAndReraise()
# Submits "gnt-debug delay --submit 0.1" jobs from qa_job_utils.QAThread
# worker(s) running _SubmitDelayJob(20). Once at least 10 durations have been
# recorded, a submission slower than 1.5 times the average of the previously
# recorded durations only prints a warning (it does not raise). After joining
# the threads, "gnt-cluster info" is run with
# max_seconds=MAX_CLUSTER_INFO_SECONDS and the driver waits for all jobs.
# NOTE(review): the embedded numbering skips line 509 (presumably the start of
# the duration_seconds assignment continued on 510) and line 516 (directly
# before the thread is created, possibly a loop header) -- how many worker
# threads are started cannot be told from this listing.
479 def TestJobQueueSubmissionPerformance():
480 """PERFORMANCE: Job queue submission performance.
482 This test exercises the job queue and verifies that the job submission time
483 does not increase as more jobs are added.
486 MAX_CLUSTER_INFO_SECONDS = 15.0
487 job_driver = _JobQueueDriver()
488 submission_durations = []
490 def _VerifySubmissionDuration(duration_seconds):
491 # only start to verify the submission duration once we got data from the
492 # first 10 job submissions
493 if len(submission_durations) >= 10:
494 avg_duration = sum(submission_durations) / len(submission_durations)
495 max_duration = avg_duration * 1.5
496 if duration_seconds > max_duration:
497 print(qa_logging.FormatWarning(
498 "Submitting a delay job took %f seconds, max %f expected" %
499 (duration_seconds, max_duration)))
501 submission_durations.append(duration_seconds)
503 def _SubmitDelayJob(count):
504 for _ in range(count):
505 cmd = ["gnt-debug", "delay", "--submit", "0.1"]
507 start = datetime.datetime.now()
508 job_id = _ExecuteJobSubmittingCmd(cmd)
510 qa_utils.TimedeltaToTotalSeconds(datetime.datetime.now() - start)
511 _VerifySubmissionDuration(duration_seconds)
513 job_driver.AddJob(job_id)
515 threads = _QAThreadGroup()
517 thread = qa_job_utils.QAThread(_SubmitDelayJob, [20], {})
518 threads.Start(thread)
520 threads.JoinAndReraise()
522 qa_utils.AssertCommand(["gnt-cluster", "info"],
523 max_seconds=MAX_CLUSTER_INFO_SECONDS)
525 job_driver.WaitForCompletion()
def TestParallelDRBDInstanceCreationPerformance():
  """PERFORMANCE: Parallel DRBD backed instance creation.

  Asserts that the DRBD8 disk template is supported, then runs the parallel
  creation/removal test with at most two instances per acquired node and
  releases the nodes again afterwards.

  """
  assert qa_config.IsTemplateSupported(constants.DT_DRBD8)

  nodes = list(_AcquireAllNodes())
  try:
    _TestParallelInstanceCreationAndRemoval(max_instances=len(nodes) * 2,
                                            disk_template=constants.DT_DRBD8)
  finally:
    # release the nodes even if the test raises, so that a failure here does
    # not leak the acquired nodes
    qa_config.ReleaseManyNodes(nodes)
def TestParallelPlainInstanceCreationPerformance():
  """PERFORMANCE: Parallel plain backed instance creation.

  Asserts that the plain disk template is supported, then runs the parallel
  creation/removal test with at most two instances per acquired node and
  releases the nodes again afterwards.

  """
  assert qa_config.IsTemplateSupported(constants.DT_PLAIN)

  nodes = list(_AcquireAllNodes())
  try:
    _TestParallelInstanceCreationAndRemoval(max_instances=len(nodes) * 2,
                                            disk_template=constants.DT_PLAIN)
  finally:
    # release the nodes even if the test raises, so that a failure here does
    # not leak the acquired nodes
    qa_config.ReleaseManyNodes(nodes)
# Chains the given commands: _SubmitNextCommand submits cmds[cmd_idx] through
# _ExecuteJobSubmittingCmd and registers a running_fn that submits the next
# command. The same job driver is then handed to
# _TestParallelInstanceCreationAndRemoval (max_instances=1, DRBD8 template) as
# custom_job_driver and finally waited on here.
# NOTE(review): the embedded numbering skips line 563 (the body of the
# "cmd_idx >= len(cmds)" check, presumably a return), line 565 (the call that
# receives "job_id, running_fn=...", presumably job_driver.AddJob) and lines
# 569-570 -- confirm against the full file.
552 def _TestInstanceOperationInParallelToInstanceCreation(*cmds):
553 """Run the given test command in parallel to an instance creation.
555 @type cmds: list of list of strings
556 @param cmds: commands to execute in parallel to an instance creation. Each
557 command in the list is executed once the previous job starts
561 def _SubmitNextCommand(cmd_idx, job_driver, _):
562 if cmd_idx >= len(cmds):
564 job_id = _ExecuteJobSubmittingCmd(cmds[cmd_idx])
566 job_id, running_fn=functools.partial(_SubmitNextCommand, cmd_idx + 1))
568 assert qa_config.IsTemplateSupported(constants.DT_DRBD8)
571 job_driver = _JobQueueDriver()
572 _SubmitNextCommand(0, job_driver, None)
574 _TestParallelInstanceCreationAndRemoval(max_instances=1,
575 disk_template=constants.DT_DRBD8,
576 custom_job_driver=job_driver)
578 job_driver.WaitForCompletion()
# Passes a "gnt-instance failover --submit -f --shutdown-timeout=0 ..."
# command to _TestInstanceOperationInParallelToInstanceCreation.
# NOTE(review): the end of the command list (embedded line 587) is not part of
# this listing; presumably it names the instance -- confirm against the full
# file.
581 def TestParallelInstanceFailover(instance):
582 """PERFORMANCE: Instance failover with parallel instance creation.
585 _TestInstanceOperationInParallelToInstanceCreation(
586 ["gnt-instance", "failover", "--submit", "-f", "--shutdown-timeout=0",
def TestParallelInstanceMigration(instance):
  """PERFORMANCE: Instance migration with parallel instance creation.

  @param instance: instance to migrate; only its C{name} is used here

  """
  migrate_cmd = ["gnt-instance", "migrate", "--submit", "-f", instance.name]
  _TestInstanceOperationInParallelToInstanceCreation(migrate_cmd)
# Passes a "gnt-instance replace-disks --submit --early-release -p ..."
# command to _TestInstanceOperationInParallelToInstanceCreation.
# NOTE(review): the end of the command list (embedded line 604) is not part of
# this listing; presumably it names the instance -- confirm against the full
# file.
598 def TestParallelInstanceReplaceDisks(instance):
599 """PERFORMANCE: Instance replace-disks with parallel instance creation.
602 _TestInstanceOperationInParallelToInstanceCreation(
603 ["gnt-instance", "replace-disks", "--submit", "--early-release", "-p",
def TestParallelInstanceReboot(instance):
  """PERFORMANCE: Instance reboot with parallel instance creation.

  @param instance: instance to reboot; only its C{name} is used here

  """
  reboot_cmd = ["gnt-instance", "reboot", "--submit", instance.name]
  _TestInstanceOperationInParallelToInstanceCreation(reboot_cmd)
def TestParallelInstanceReinstall(instance):
  """PERFORMANCE: Instance reinstall with parallel instance creation.

  The instance is stopped first, reinstalled while an instance creation is
  running and started again afterwards.

  @param instance: instance to reinstall; only its C{name} is used here

  """
  # instance reinstall requires the instance to be down
  stop_cmd = ["gnt-instance", "stop", instance.name]
  qa_utils.AssertCommand(stop_cmd)

  reinstall_cmd = ["gnt-instance", "reinstall", "--submit", "-f",
                   instance.name]
  _TestInstanceOperationInParallelToInstanceCreation(reinstall_cmd)

  start_cmd = ["gnt-instance", "start", instance.name]
  qa_utils.AssertCommand(start_cmd)
def TestParallelInstanceRename(instance):
  """PERFORMANCE: Instance rename with parallel instance creation.

  The instance is stopped, renamed to the name of a freshly acquired QA
  instance and back again while an instance creation is running, and finally
  started again.

  @param instance: instance to rename; only its C{name} is used here

  """
  # instance rename requires the instance to be down
  qa_utils.AssertCommand(["gnt-instance", "stop", instance.name])

  new_instance = qa_config.AcquireInstance()
  try:
    _TestInstanceOperationInParallelToInstanceCreation(
      ["gnt-instance", "rename", "--submit", instance.name, new_instance.name],
      ["gnt-instance", "rename", "--submit", new_instance.name, instance.name])
  finally:
    # hand the borrowed instance back even if the rename jobs fail, so it is
    # not leaked from the QA config
    new_instance.Release()

  qa_utils.AssertCommand(["gnt-instance", "start", instance.name])