grow-disk: wait until resync is completed
[ganeti-local] / tools / burnin
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Burnin program
23
24 """
25
26 import os
27 import sys
28 import optparse
29 import time
30 from itertools import izip, islice, cycle
31 from cStringIO import StringIO
32
33 from ganeti import opcodes
34 from ganeti import mcpu
35 from ganeti import constants
36 from ganeti import cli
37 from ganeti import logger
38 from ganeti import errors
39 from ganeti import utils
40
41
# One-line usage synopsis; printed by Usage() and embedded in the
# optparse help header (see Burner.ParseOptions).
USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...")
43
44
def Usage():
  """Print the usage synopsis to stderr and exit with status 2."""
  sys.stderr.write("Usage:\n")
  sys.stderr.write(USAGE + "\n")
  sys.exit(2)
51
52
def Log(msg):
  """Write a message to stdout, flushing immediately.

  """
  sys.stdout.write("%s\n" % (msg,))
  sys.stdout.flush()
59
60
class Burner(object):
  """Burner class.

  Drives an intensive test ("burnin") of a Ganeti cluster: creates the
  requested instances, exercises disk replacement, disk growth,
  failover, export/import, stop/start and rename on them, and removes
  them again during cleanup.

  """

  def __init__(self):
    """Constructor.

    Sets up logging and the cluster client, then parses the command
    line and queries the cluster for the node and OS lists.

    """
    logger.SetupLogging(constants.LOG_BURNIN, debug=False, stderr_logging=True)
    # buffer accumulating opcode feedback; dumped when an error occurs
    self._feed_buf = StringIO()
    self.nodes = []
    self.instances = []
    # instances that the final cleanup (Remove) must delete
    self.to_rem = []
    self.opts = None
    self.cl = cli.GetClient()
    self.ParseOptions()
    self.GetState()

  def ClearFeedbackBuf(self):
    """Clear the feedback buffer."""
    self._feed_buf.truncate(0)

  def GetFeedbackBuf(self):
    """Return the contents of the feedback buffer."""
    return self._feed_buf.getvalue()

  def Feedback(self, msg):
    """Accumulate feedback in our buffer.

    The message is expected to be a (timestamp, type, text) tuple as
    delivered by the job feedback machinery; only the timestamp
    (rendered via time.ctime) and the text are stored.

    """
    self._feed_buf.write("%s %s\n" % (time.ctime(utils.MergeTime(msg[0])),
                                      msg[2]))
    if self.opts.verbose:
      Log(msg)

  def ExecOp(self, op):
    """Execute an opcode and manage the exec buffer.

    The feedback buffer is cleared first, so that after a failure it
    contains only the output of the failing opcode.

    """
    self.ClearFeedbackBuf()
    return cli.SubmitOpCode(op, feedback_fn=self.Feedback, cl=self.cl)

  def ExecJobSet(self, jobs):
    """Execute a set of jobs and return once all are done.

    The method will return the list of results, if all jobs are
    successful. Otherwise, OpExecError will be raised from within
    cli.py.

    """
    self.ClearFeedbackBuf()
    # submit everything up front, then poll, so the jobs run in parallel
    job_ids = [cli.SendJob(job, cl=self.cl) for job in jobs]
    Log("- Submitted job IDs %s" % ", ".join(job_ids))
    results = []
    for jid in job_ids:
      Log("- Waiting for job %s" % jid)
      results.append(cli.PollJob(jid, cl=self.cl, feedback_fn=self.Feedback))

    return results

  def ParseOptions(self):
    """Parses the command line options.

    In case of command line errors, it will show the usage and exit the
    program.

    """

    parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                   version="%%prog (ganeti) %s" %
                                   constants.RELEASE_VERSION,
                                   option_class=cli.CliOption)

    parser.add_option("-o", "--os", dest="os", default=None,
                      help="OS to use during burnin",
                      metavar="<OS>")
    parser.add_option("--os-size", dest="os_size", help="Disk size",
                      default=4 * 1024, type="unit", metavar="<size>")
    # note: the dest names sda_growth/sdb_growth are looked up
    # dynamically by GrowDisks via getattr
    parser.add_option("--os-growth", dest="sda_growth", help="Disk growth",
                      default=1024, type="unit", metavar="<size>")
    parser.add_option("--swap-size", dest="swap_size", help="Swap size",
                      default=4 * 1024, type="unit", metavar="<size>")
    parser.add_option("--swap-growth", dest="sdb_growth", help="Swap growth",
                      default=1024, type="unit", metavar="<size>")
    parser.add_option("--mem-size", dest="mem_size", help="Memory size",
                      default=128, type="unit", metavar="<size>")
    parser.add_option("-v", "--verbose",
                      action="store_true", dest="verbose", default=False,
                      help="print command execution messages to stdout")
    parser.add_option("--no-replace1", dest="do_replace1",
                      help="Skip disk replacement with the same secondary",
                      action="store_false", default=True)
    parser.add_option("--no-replace2", dest="do_replace2",
                      help="Skip disk replacement with a different secondary",
                      action="store_false", default=True)
    parser.add_option("--no-failover", dest="do_failover",
                      help="Skip instance failovers", action="store_false",
                      default=True)
    parser.add_option("--no-importexport", dest="do_importexport",
                      help="Skip instance export/import", action="store_false",
                      default=True)
    parser.add_option("--no-startstop", dest="do_startstop",
                      help="Skip instance stop/start", action="store_false",
                      default=True)
    parser.add_option("--rename", dest="rename", default=None,
                      help="Give one unused instance name which is taken"
                           " to start the renaming sequence",
                      metavar="<instance_name>")
    parser.add_option("-t", "--disk-template", dest="disk_template",
                      choices=("diskless", "file", "plain", "drbd"),
                      default="drbd",
                      help="Disk template (diskless, file, plain or drbd)"
                            " [drbd]")
    parser.add_option("-n", "--nodes", dest="nodes", default="",
                      help="Comma separated list of nodes to perform"
                      " the burnin on (defaults to all nodes)")
    parser.add_option("--iallocator", dest="iallocator",
                      default=None, type="string",
                      help="Perform the allocation using an iallocator"
                      " instead of fixed node spread (node restrictions no"
                      " longer apply, therefore -n/--nodes must not be used")
    parser.add_option("-p", "--parallel", default=False, action="store_true",
                      dest="parallel",
                      help="Enable parallelization of some operations in"
                      " order to speed burnin or to test granular locking")

    options, args = parser.parse_args()
    # at least one instance name and an OS are mandatory
    if len(args) < 1 or options.os is None:
      Usage()

    supported_disk_templates = (constants.DT_DISKLESS,
                                constants.DT_FILE,
                                constants.DT_PLAIN,
                                constants.DT_DRBD8)
    if options.disk_template not in supported_disk_templates:
      Log("Unknown disk template '%s'" % options.disk_template)
      sys.exit(1)

    # explicit node placement and iallocator-based placement are
    # mutually exclusive
    if options.nodes and options.iallocator:
      Log("Give either the nodes option or the iallocator option, not both")
      sys.exit(1)

    self.opts = options
    self.instances = args
    # backend parameters passed to every instance creation
    self.bep = {
      constants.BE_MEMORY: options.mem_size,
      constants.BE_VCPUS: 1,
      }
    self.hvp = {}

  def GetState(self):
    """Read the cluster state from the config.

    Fills self.nodes with the node list (all nodes, or only those
    given via -n) and validates that the requested OS exists and is
    valid, exiting the program on any error.

    """
    if self.opts.nodes:
      names = self.opts.nodes.split(",")
    else:
      names = []
    try:
      op = opcodes.OpQueryNodes(output_fields=["name"], names=names)
      result = self.ExecOp(op)
    except errors.GenericError, err:
      err_code, msg = cli.FormatError(err)
      Log(msg)
      sys.exit(err_code)
    self.nodes = [data[0] for data in result]

    result = self.ExecOp(opcodes.OpDiagnoseOS(output_fields=["name", "valid"],
                                              names=[]))

    if not result:
      Log("Can't get the OS list")
      sys.exit(1)

    # filter non-valid OS-es
    os_set = [val[0] for val in result if val[1]]

    if self.opts.os not in os_set:
      Log("OS '%s' not found" % self.opts.os)
      sys.exit(1)

  def CreateInstances(self):
    """Create the given instances.

    Each instance is paired with a (pnode, snode) tuple by cycling
    over the node list, the secondary being the node following the
    primary; with --iallocator or non-mirrored templates the node
    choice is delegated or dropped accordingly.

    """
    self.to_rem = []
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 self.instances)
    jobset = []

    for pnode, snode, instance in mytor:
      if self.opts.iallocator:
        # node placement is delegated to the iallocator
        pnode = snode = None
        Log("- Add instance %s (iallocator: %s)" %
              (instance, self.opts.iallocator))
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        # non-mirrored templates have no secondary node
        snode = None
        Log("- Add instance %s on node %s" % (instance, pnode))
      else:
        Log("- Add instance %s on nodes %s/%s" % (instance, pnode, snode))

      op = opcodes.OpCreateInstance(instance_name=instance,
                                    disk_size=self.opts.os_size,
                                    swap_size=self.opts.swap_size,
                                    disk_template=self.opts.disk_template,
                                    mode=constants.INSTANCE_CREATE,
                                    os_type=self.opts.os,
                                    pnode=pnode,
                                    snode=snode,
                                    start=True,
                                    ip_check=True,
                                    wait_for_sync=True,
                                    mac="auto",
                                    file_driver="loop",
                                    file_storage_dir=None,
                                    iallocator=self.opts.iallocator,
                                    beparams=self.bep,
                                    hvparams=self.hvp,
                                    )

      if self.opts.parallel:
        jobset.append([op])
        # FIXME: here we should not append to to_rem uncoditionally,
        # but only when the job is successful
        self.to_rem.append(instance)
      else:
        self.ExecOp(op)
        self.to_rem.append(instance)
    if self.opts.parallel:
      self.ExecJobSet(jobset)

  def GrowDisks(self):
    """Grow both the os and the swap disks by the requested amount, if any.

    Growth amounts come from the sda_growth/sdb_growth options; each
    grow waits for resync so later operations start on synced disks.

    """
    for instance in self.instances:
      for disk in ['sda', 'sdb']:
        # option dest names are '<disk>_growth', see ParseOptions
        growth = getattr(self.opts, '%s_growth' % disk)
        if growth > 0:
          op = opcodes.OpGrowDisk(instance_name=instance, disk=disk,
                                  amount=growth, wait_for_sync=True)
          Log("- Increase %s's %s disk by %s MB" % (instance, disk, growth))
          self.ExecOp(op)

  def ReplaceDisks1D8(self):
    """Replace disks on primary and secondary for drbd8.

    Runs a secondary-side then a primary-side replacement for both
    disks of every instance, keeping the same nodes.

    """
    for instance in self.instances:
      for mode in constants.REPLACE_DISK_SEC, constants.REPLACE_DISK_PRI:
        op = opcodes.OpReplaceDisks(instance_name=instance,
                                    mode=mode,
                                    disks=["sda", "sdb"])
        Log("- Replace disks (%s) for instance %s" % (mode, instance))
        self.ExecOp(op)

  def ReplaceDisks2(self):
    """Replace secondary node.

    Moves each instance's secondary to a new node; the offset-by-two
    cycle pairs each instance with a node beyond the pnode/snode pair
    chosen in CreateInstances. With --iallocator the target node
    choice is delegated instead.

    """
    mode = constants.REPLACE_DISK_SEC

    mytor = izip(islice(cycle(self.nodes), 2, None),
                 self.instances)
    for tnode, instance in mytor:
      if self.opts.iallocator:
        tnode = None
      op = opcodes.OpReplaceDisks(instance_name=instance,
                                  mode=mode,
                                  remote_node=tnode,
                                  iallocator=self.opts.iallocator,
                                  disks=["sda", "sdb"])
      Log("- Replace secondary (%s) for instance %s" % (mode, instance))
      self.ExecOp(op)

  def Failover(self):
    """Failover the instances."""

    for instance in self.instances:
      op = opcodes.OpFailoverInstance(instance_name=instance,
                                      ignore_consistency=False)

      Log("- Failover instance %s" % (instance))
      self.ExecOp(op)

  def ImportExport(self):
    """Export the instance, delete it, and import it back.

    For each instance: export to a third node (enode), remove the
    instance, re-create it from the export, then delete the export.
    The to_rem list is kept in sync so cleanup only removes instances
    that currently exist.

    """

    # pnode/snode are the nodes for the re-created instance, enode the
    # export target; all three cycle over the node list with offsets
    mytor = izip(cycle(self.nodes),
                 islice(cycle(self.nodes), 1, None),
                 islice(cycle(self.nodes), 2, None),
                 self.instances)

    for pnode, snode, enode, instance in mytor:

      if self.opts.iallocator:
        pnode = snode = None
        import_log_msg = ("- Import instance %s from node %s (iallocator: %s)" %
                          (instance, enode, self.opts.iallocator))
      elif self.opts.disk_template not in constants.DTS_NET_MIRROR:
        snode = None
        import_log_msg = ("- Import instance %s from node %s to node %s" %
                          (instance, enode, pnode))
      else:
        import_log_msg = ("- Import instance %s from node %s to nodes %s/%s" %
                          (instance, enode, pnode, snode))

      exp_op = opcodes.OpExportInstance(instance_name=instance,
                                           target_node=enode,
                                           shutdown=True)
      rem_op = opcodes.OpRemoveInstance(instance_name=instance,
                                        ignore_failures=True)
      nam_op = opcodes.OpQueryInstances(output_fields=["name"],
                                           names=[instance])
      # the export directory is named after the instance's full name,
      # which may differ from the name given on the command line
      full_name = self.ExecOp(nam_op)[0][0]
      imp_dir = os.path.join(constants.EXPORT_DIR, full_name)
      imp_op = opcodes.OpCreateInstance(instance_name=instance,
                                        disk_size=self.opts.os_size,
                                        swap_size=self.opts.swap_size,
                                        disk_template=self.opts.disk_template,
                                        mode=constants.INSTANCE_IMPORT,
                                        src_node=enode,
                                        src_path=imp_dir,
                                        pnode=pnode,
                                        snode=snode,
                                        start=True,
                                        ip_check=True,
                                        wait_for_sync=True,
                                        mac="auto",
                                        file_storage_dir=None,
                                        file_driver=None,
                                        iallocator=self.opts.iallocator,
                                        beparams=self.bep,
                                        hvparams=self.hvp,
                                        )

      erem_op = opcodes.OpRemoveExport(instance_name=instance)

      Log("- Export instance %s to node %s" % (instance, enode))
      self.ExecOp(exp_op)
      Log("- Remove instance %s" % (instance))
      self.ExecOp(rem_op)
      # instance is gone until the import succeeds
      self.to_rem.remove(instance)
      Log(import_log_msg)
      self.ExecOp(imp_op)
      Log("- Remove export of instance %s" % (instance))
      self.ExecOp(erem_op)

      self.to_rem.append(instance)

  def StopInstance(self, instance):
    """Stop given instance."""
    op = opcodes.OpShutdownInstance(instance_name=instance)
    Log("- Shutdown instance %s" % instance)
    self.ExecOp(op)

  def StartInstance(self, instance):
    """Start given instance."""
    op = opcodes.OpStartupInstance(instance_name=instance, force=False)
    Log("- Start instance %s" % instance)
    self.ExecOp(op)

  def RenameInstance(self, instance, instance_new):
    """Rename instance."""
    op = opcodes.OpRenameInstance(instance_name=instance,
                                  new_name=instance_new)
    Log("- Rename instance %s to %s" % (instance, instance_new))
    self.ExecOp(op)

  def StopStart(self):
    """Stop/start the instances."""
    for instance in self.instances:
      self.StopInstance(instance)
      self.StartInstance(instance)

  def Remove(self):
    """Remove the instances.

    Only instances recorded in to_rem are removed; failures are
    ignored so cleanup proceeds as far as possible.

    """
    for instance in self.to_rem:
      op = opcodes.OpRemoveInstance(instance_name=instance,
                                    ignore_failures=True)
      Log("- Remove instance %s" % instance)
      self.ExecOp(op)


  def Rename(self):
    """Rename the instances.

    Each instance is renamed to the spare name given via --rename and
    back again, so the original names are restored at the end.

    """
    rename = self.opts.rename
    for instance in self.instances:
      self.StopInstance(instance)
      self.RenameInstance(instance, rename)
      self.StartInstance(rename)
      self.StopInstance(rename)
      self.RenameInstance(rename, instance)
      self.StartInstance(instance)

  def BurninCluster(self):
    """Test a cluster intensively.

    This will create instances and then start/stop/failover them.
    It is safe for existing instances but could impact performance.

    """

    opts = self.opts

    Log("- Testing global parameters")

    if (len(self.nodes) == 1 and
        opts.disk_template not in (constants.DT_DISKLESS, constants.DT_PLAIN,
                                   constants.DT_FILE)):
      Log("When one node is available/selected the disk template must"
          " be 'diskless', 'file' or 'plain'")
      sys.exit(1)

    # assume failure until the whole sequence completes, so the
    # finally clause knows whether to dump the feedback buffer
    has_err = True
    try:
      self.CreateInstances()
      if opts.do_replace1 and opts.disk_template in constants.DTS_NET_MIRROR:
        self.ReplaceDisks1D8()
      if (opts.do_replace2 and len(self.nodes) > 2 and
          opts.disk_template in constants.DTS_NET_MIRROR) :
        self.ReplaceDisks2()

      if opts.disk_template != constants.DT_DISKLESS:
        self.GrowDisks()

      if opts.do_failover and opts.disk_template in constants.DTS_NET_MIRROR:
        self.Failover()

      if opts.do_importexport:
        self.ImportExport()

      if opts.do_startstop:
        self.StopStart()

      if opts.rename:
        self.Rename()

      has_err = False
    finally:
      if has_err:
        Log("Error detected: opcode buffer follows:\n\n")
        Log(self.GetFeedbackBuf())
        Log("\n\n")
      # always remove the instances we created, even after an error
      self.Remove()

    return 0
496
497
def main():
  """Main function.

  Builds the Burner (which parses options and reads cluster state)
  and runs the full burnin sequence, returning its exit code.

  """
  return Burner().BurninCluster()
503
504
if __name__ == "__main__":
  # Propagate main()'s return code (0 on success) as the process exit
  # status; previously the return value was silently discarded.
  sys.exit(main())