Add correct locking of master node to gnt-debug delay
[ganeti-local] / lib / cmdlib / backup.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Logical units dealing with backup operations."""
23
24 import OpenSSL
25 import logging
26
27 from ganeti import compat
28 from ganeti import constants
29 from ganeti import errors
30 from ganeti import locking
31 from ganeti import masterd
32 from ganeti import qlang
33 from ganeti import query
34 from ganeti import utils
35
36 from ganeti.cmdlib.base import QueryBase, NoHooksLU, LogicalUnit
37 from ganeti.cmdlib.common import GetWantedNodes, ShareAll, CheckNodeOnline, \
38   ExpandNodeName
39 from ganeti.cmdlib.instance_storage import StartInstanceDisks, \
40   ShutdownInstanceDisks
41 from ganeti.cmdlib.instance_utils import GetClusterDomainSecret, \
42   BuildInstanceHookEnvByObject, CheckNodeNotDrained, RemoveInstance
43
44
45 class ExportQuery(QueryBase):
46   FIELDS = query.EXPORT_FIELDS
47
48   #: The node name is not a unique key for this query
49   SORT_FIELD = "node"
50
51   def ExpandNames(self, lu):
52     lu.needed_locks = {}
53
54     # The following variables interact with _QueryBase._GetNames
55     if self.names:
56       self.wanted = GetWantedNodes(lu, self.names)
57     else:
58       self.wanted = locking.ALL_SET
59
60     self.do_locking = self.use_locking
61
62     if self.do_locking:
63       lu.share_locks = ShareAll()
64       lu.needed_locks = {
65         locking.LEVEL_NODE: self.wanted,
66         }
67
68       if not self.names:
69         lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
70
71   def DeclareLocks(self, lu, level):
72     pass
73
74   def _GetQueryData(self, lu):
75     """Computes the list of nodes and their attributes.
76
77     """
78     # Locking is not used
79     # TODO
80     assert not (compat.any(lu.glm.is_owned(level)
81                            for level in locking.LEVELS
82                            if level != locking.LEVEL_CLUSTER) or
83                 self.do_locking or self.use_locking)
84
85     nodes = self._GetNames(lu, lu.cfg.GetNodeList(), locking.LEVEL_NODE)
86
87     result = []
88
89     for (node, nres) in lu.rpc.call_export_list(nodes).items():
90       if nres.fail_msg:
91         result.append((node, None))
92       else:
93         result.extend((node, expname) for expname in nres.payload)
94
95     return result
96
97
98 class LUBackupQuery(NoHooksLU):
99   """Query the exports list
100
101   """
102   REQ_BGL = False
103
104   def CheckArguments(self):
105     self.expq = ExportQuery(qlang.MakeSimpleFilter("node", self.op.nodes),
106                             ["node", "export"], self.op.use_locking)
107
108   def ExpandNames(self):
109     self.expq.ExpandNames(self)
110
111   def DeclareLocks(self, level):
112     self.expq.DeclareLocks(self, level)
113
114   def Exec(self, feedback_fn):
115     result = {}
116
117     for (node, expname) in self.expq.OldStyleQuery(self):
118       if expname is None:
119         result[node] = False
120       else:
121         result.setdefault(node, []).append(expname)
122
123     return result
124
125
126 class LUBackupPrepare(NoHooksLU):
127   """Prepares an instance for an export and returns useful information.
128
129   """
130   REQ_BGL = False
131
132   def ExpandNames(self):
133     self._ExpandAndLockInstance()
134
135   def CheckPrereq(self):
136     """Check prerequisites.
137
138     """
139     instance_name = self.op.instance_name
140
141     self.instance = self.cfg.GetInstanceInfo(instance_name)
142     assert self.instance is not None, \
143           "Cannot retrieve locked instance %s" % self.op.instance_name
144     CheckNodeOnline(self, self.instance.primary_node)
145
146     self._cds = GetClusterDomainSecret()
147
148   def Exec(self, feedback_fn):
149     """Prepares an instance for an export.
150
151     """
152     instance = self.instance
153
154     if self.op.mode == constants.EXPORT_MODE_REMOTE:
155       salt = utils.GenerateSecret(8)
156
157       feedback_fn("Generating X509 certificate on %s" % instance.primary_node)
158       result = self.rpc.call_x509_cert_create(instance.primary_node,
159                                               constants.RIE_CERT_VALIDITY)
160       result.Raise("Can't create X509 key and certificate on %s" % result.node)
161
162       (name, cert_pem) = result.payload
163
164       cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
165                                              cert_pem)
166
167       return {
168         "handshake": masterd.instance.ComputeRemoteExportHandshake(self._cds),
169         "x509_key_name": (name, utils.Sha1Hmac(self._cds, name, salt=salt),
170                           salt),
171         "x509_ca": utils.SignX509Certificate(cert, self._cds, salt),
172         }
173
174     return None
175
176
177 class LUBackupExport(LogicalUnit):
178   """Export an instance to an image in the cluster.
179
180   """
181   HPATH = "instance-export"
182   HTYPE = constants.HTYPE_INSTANCE
183   REQ_BGL = False
184
185   def CheckArguments(self):
186     """Check the arguments.
187
188     """
189     self.x509_key_name = self.op.x509_key_name
190     self.dest_x509_ca_pem = self.op.destination_x509_ca
191
192     if self.op.mode == constants.EXPORT_MODE_REMOTE:
193       if not self.x509_key_name:
194         raise errors.OpPrereqError("Missing X509 key name for encryption",
195                                    errors.ECODE_INVAL)
196
197       if not self.dest_x509_ca_pem:
198         raise errors.OpPrereqError("Missing destination X509 CA",
199                                    errors.ECODE_INVAL)
200
201   def ExpandNames(self):
202     self._ExpandAndLockInstance()
203
204     # Lock all nodes for local exports
205     if self.op.mode == constants.EXPORT_MODE_LOCAL:
206       # FIXME: lock only instance primary and destination node
207       #
208       # Sad but true, for now we have do lock all nodes, as we don't know where
209       # the previous export might be, and in this LU we search for it and
210       # remove it from its current node. In the future we could fix this by:
211       #  - making a tasklet to search (share-lock all), then create the
212       #    new one, then one to remove, after
213       #  - removing the removal operation altogether
214       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
215
216       # Allocations should be stopped while this LU runs with node locks, but
217       # it doesn't have to be exclusive
218       self.share_locks[locking.LEVEL_NODE_ALLOC] = 1
219       self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
220
221   def DeclareLocks(self, level):
222     """Last minute lock declaration."""
223     # All nodes are locked anyway, so nothing to do here.
224
225   def BuildHooksEnv(self):
226     """Build hooks env.
227
228     This will run on the master, primary node and target node.
229
230     """
231     env = {
232       "EXPORT_MODE": self.op.mode,
233       "EXPORT_NODE": self.op.target_node,
234       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
235       "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
236       # TODO: Generic function for boolean env variables
237       "REMOVE_INSTANCE": str(bool(self.op.remove_instance)),
238       }
239
240     env.update(BuildInstanceHookEnvByObject(self, self.instance))
241
242     return env
243
244   def BuildHooksNodes(self):
245     """Build hooks nodes.
246
247     """
248     nl = [self.cfg.GetMasterNode(), self.instance.primary_node]
249
250     if self.op.mode == constants.EXPORT_MODE_LOCAL:
251       nl.append(self.op.target_node)
252
253     return (nl, nl)
254
255   def CheckPrereq(self):
256     """Check prerequisites.
257
258     This checks that the instance and node names are valid.
259
260     """
261     instance_name = self.op.instance_name
262
263     self.instance = self.cfg.GetInstanceInfo(instance_name)
264     assert self.instance is not None, \
265           "Cannot retrieve locked instance %s" % self.op.instance_name
266     CheckNodeOnline(self, self.instance.primary_node)
267
268     if (self.op.remove_instance and
269         self.instance.admin_state == constants.ADMINST_UP and
270         not self.op.shutdown):
271       raise errors.OpPrereqError("Can not remove instance without shutting it"
272                                  " down before", errors.ECODE_STATE)
273
274     if self.op.mode == constants.EXPORT_MODE_LOCAL:
275       self.op.target_node = ExpandNodeName(self.cfg, self.op.target_node)
276       self.dst_node = self.cfg.GetNodeInfo(self.op.target_node)
277       assert self.dst_node is not None
278
279       CheckNodeOnline(self, self.dst_node.name)
280       CheckNodeNotDrained(self, self.dst_node.name)
281
282       self._cds = None
283       self.dest_disk_info = None
284       self.dest_x509_ca = None
285
286     elif self.op.mode == constants.EXPORT_MODE_REMOTE:
287       self.dst_node = None
288
289       if len(self.op.target_node) != len(self.instance.disks):
290         raise errors.OpPrereqError(("Received destination information for %s"
291                                     " disks, but instance %s has %s disks") %
292                                    (len(self.op.target_node), instance_name,
293                                     len(self.instance.disks)),
294                                    errors.ECODE_INVAL)
295
296       cds = GetClusterDomainSecret()
297
298       # Check X509 key name
299       try:
300         (key_name, hmac_digest, hmac_salt) = self.x509_key_name
301       except (TypeError, ValueError), err:
302         raise errors.OpPrereqError("Invalid data for X509 key name: %s" % err,
303                                    errors.ECODE_INVAL)
304
305       if not utils.VerifySha1Hmac(cds, key_name, hmac_digest, salt=hmac_salt):
306         raise errors.OpPrereqError("HMAC for X509 key name is wrong",
307                                    errors.ECODE_INVAL)
308
309       # Load and verify CA
310       try:
311         (cert, _) = utils.LoadSignedX509Certificate(self.dest_x509_ca_pem, cds)
312       except OpenSSL.crypto.Error, err:
313         raise errors.OpPrereqError("Unable to load destination X509 CA (%s)" %
314                                    (err, ), errors.ECODE_INVAL)
315
316       (errcode, msg) = utils.VerifyX509Certificate(cert, None, None)
317       if errcode is not None:
318         raise errors.OpPrereqError("Invalid destination X509 CA (%s)" %
319                                    (msg, ), errors.ECODE_INVAL)
320
321       self.dest_x509_ca = cert
322
323       # Verify target information
324       disk_info = []
325       for idx, disk_data in enumerate(self.op.target_node):
326         try:
327           (host, port, magic) = \
328             masterd.instance.CheckRemoteExportDiskInfo(cds, idx, disk_data)
329         except errors.GenericError, err:
330           raise errors.OpPrereqError("Target info for disk %s: %s" %
331                                      (idx, err), errors.ECODE_INVAL)
332
333         disk_info.append((host, port, magic))
334
335       assert len(disk_info) == len(self.op.target_node)
336       self.dest_disk_info = disk_info
337
338     else:
339       raise errors.ProgrammerError("Unhandled export mode %r" %
340                                    self.op.mode)
341
342     # instance disk type verification
343     # TODO: Implement export support for file-based disks
344     for disk in self.instance.disks:
345       if disk.dev_type == constants.LD_FILE:
346         raise errors.OpPrereqError("Export not supported for instances with"
347                                    " file-based disks", errors.ECODE_INVAL)
348
349   def _CleanupExports(self, feedback_fn):
350     """Removes exports of current instance from all other nodes.
351
352     If an instance in a cluster with nodes A..D was exported to node C, its
353     exports will be removed from the nodes A, B and D.
354
355     """
356     assert self.op.mode != constants.EXPORT_MODE_REMOTE
357
358     nodelist = self.cfg.GetNodeList()
359     nodelist.remove(self.dst_node.name)
360
361     # on one-node clusters nodelist will be empty after the removal
362     # if we proceed the backup would be removed because OpBackupQuery
363     # substitutes an empty list with the full cluster node list.
364     iname = self.instance.name
365     if nodelist:
366       feedback_fn("Removing old exports for instance %s" % iname)
367       exportlist = self.rpc.call_export_list(nodelist)
368       for node in exportlist:
369         if exportlist[node].fail_msg:
370           continue
371         if iname in exportlist[node].payload:
372           msg = self.rpc.call_export_remove(node, iname).fail_msg
373           if msg:
374             self.LogWarning("Could not remove older export for instance %s"
375                             " on node %s: %s", iname, node, msg)
376
377   def Exec(self, feedback_fn):
378     """Export an instance to an image in the cluster.
379
380     """
381     assert self.op.mode in constants.EXPORT_MODES
382
383     instance = self.instance
384     src_node = instance.primary_node
385
386     if self.op.shutdown:
387       # shutdown the instance, but not the disks
388       feedback_fn("Shutting down instance %s" % instance.name)
389       result = self.rpc.call_instance_shutdown(src_node, instance,
390                                                self.op.shutdown_timeout,
391                                                self.op.reason)
392       # TODO: Maybe ignore failures if ignore_remove_failures is set
393       result.Raise("Could not shutdown instance %s on"
394                    " node %s" % (instance.name, src_node))
395
396     # set the disks ID correctly since call_instance_start needs the
397     # correct drbd minor to create the symlinks
398     for disk in instance.disks:
399       self.cfg.SetDiskID(disk, src_node)
400
401     activate_disks = not instance.disks_active
402
403     if activate_disks:
404       # Activate the instance disks if we'exporting a stopped instance
405       feedback_fn("Activating disks for %s" % instance.name)
406       StartInstanceDisks(self, instance, None)
407
408     try:
409       helper = masterd.instance.ExportInstanceHelper(self, feedback_fn,
410                                                      instance)
411
412       helper.CreateSnapshots()
413       try:
414         if (self.op.shutdown and
415             instance.admin_state == constants.ADMINST_UP and
416             not self.op.remove_instance):
417           assert not activate_disks
418           feedback_fn("Starting instance %s" % instance.name)
419           result = self.rpc.call_instance_start(src_node,
420                                                 (instance, None, None), False,
421                                                  self.op.reason)
422           msg = result.fail_msg
423           if msg:
424             feedback_fn("Failed to start instance: %s" % msg)
425             ShutdownInstanceDisks(self, instance)
426             raise errors.OpExecError("Could not start instance: %s" % msg)
427
428         if self.op.mode == constants.EXPORT_MODE_LOCAL:
429           (fin_resu, dresults) = helper.LocalExport(self.dst_node)
430         elif self.op.mode == constants.EXPORT_MODE_REMOTE:
431           connect_timeout = constants.RIE_CONNECT_TIMEOUT
432           timeouts = masterd.instance.ImportExportTimeouts(connect_timeout)
433
434           (key_name, _, _) = self.x509_key_name
435
436           dest_ca_pem = \
437             OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
438                                             self.dest_x509_ca)
439
440           (fin_resu, dresults) = helper.RemoteExport(self.dest_disk_info,
441                                                      key_name, dest_ca_pem,
442                                                      timeouts)
443       finally:
444         helper.Cleanup()
445
446       # Check for backwards compatibility
447       assert len(dresults) == len(instance.disks)
448       assert compat.all(isinstance(i, bool) for i in dresults), \
449              "Not all results are boolean: %r" % dresults
450
451     finally:
452       if activate_disks:
453         feedback_fn("Deactivating disks for %s" % instance.name)
454         ShutdownInstanceDisks(self, instance)
455
456     if not (compat.all(dresults) and fin_resu):
457       failures = []
458       if not fin_resu:
459         failures.append("export finalization")
460       if not compat.all(dresults):
461         fdsk = utils.CommaJoin(idx for (idx, dsk) in enumerate(dresults)
462                                if not dsk)
463         failures.append("disk export: disk(s) %s" % fdsk)
464
465       raise errors.OpExecError("Export failed, errors in %s" %
466                                utils.CommaJoin(failures))
467
468     # At this point, the export was successful, we can cleanup/finish
469
470     # Remove instance if requested
471     if self.op.remove_instance:
472       feedback_fn("Removing instance %s" % instance.name)
473       RemoveInstance(self, feedback_fn, instance,
474                      self.op.ignore_remove_failures)
475
476     if self.op.mode == constants.EXPORT_MODE_LOCAL:
477       self._CleanupExports(feedback_fn)
478
479     return fin_resu, dresults
480
481
482 class LUBackupRemove(NoHooksLU):
483   """Remove exports related to the named instance.
484
485   """
486   REQ_BGL = False
487
488   def ExpandNames(self):
489     self.needed_locks = {
490       # We need all nodes to be locked in order for RemoveExport to work, but
491       # we don't need to lock the instance itself, as nothing will happen to it
492       # (and we can remove exports also for a removed instance)
493       locking.LEVEL_NODE: locking.ALL_SET,
494
495       # Removing backups is quick, so blocking allocations is justified
496       locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
497       }
498
499     # Allocations should be stopped while this LU runs with node locks, but it
500     # doesn't have to be exclusive
501     self.share_locks[locking.LEVEL_NODE_ALLOC] = 1
502
503   def Exec(self, feedback_fn):
504     """Remove any export.
505
506     """
507     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
508     # If the instance was not found we'll try with the name that was passed in.
509     # This will only work if it was an FQDN, though.
510     fqdn_warn = False
511     if not instance_name:
512       fqdn_warn = True
513       instance_name = self.op.instance_name
514
515     locked_nodes = self.owned_locks(locking.LEVEL_NODE)
516     exportlist = self.rpc.call_export_list(locked_nodes)
517     found = False
518     for node in exportlist:
519       msg = exportlist[node].fail_msg
520       if msg:
521         self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
522         continue
523       if instance_name in exportlist[node].payload:
524         found = True
525         result = self.rpc.call_export_remove(node, instance_name)
526         msg = result.fail_msg
527         if msg:
528           logging.error("Could not remove export for instance %s"
529                         " on node %s: %s", instance_name, node, msg)
530
531     if fqdn_warn and not found:
532       feedback_fn("Export not found. If trying to remove an export belonging"
533                   " to a deleted instance please use its Fully Qualified"
534                   " Domain Name.")