Fix a node name vs. UUID bug in instance import
[ganeti-local] / lib / cmdlib / backup.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Logical units dealing with backup operations."""
23
24 import OpenSSL
25 import logging
26
27 from ganeti import compat
28 from ganeti import constants
29 from ganeti import errors
30 from ganeti import locking
31 from ganeti import masterd
32 from ganeti import qlang
33 from ganeti import query
34 from ganeti import utils
35
36 from ganeti.cmdlib.base import QueryBase, NoHooksLU, LogicalUnit
37 from ganeti.cmdlib.common import GetWantedNodes, ShareAll, CheckNodeOnline, \
38   ExpandNodeUuidAndName
39 from ganeti.cmdlib.instance_storage import StartInstanceDisks, \
40   ShutdownInstanceDisks
41 from ganeti.cmdlib.instance_utils import GetClusterDomainSecret, \
42   BuildInstanceHookEnvByObject, CheckNodeNotDrained, RemoveInstance
43
44
45 class ExportQuery(QueryBase):
46   FIELDS = query.EXPORT_FIELDS
47
48   #: The node name is not a unique key for this query
49   SORT_FIELD = "node"
50
51   def ExpandNames(self, lu):
52     lu.needed_locks = {}
53
54     # The following variables interact with _QueryBase._GetNames
55     if self.names:
56       (self.wanted, _) = GetWantedNodes(lu, self.names)
57     else:
58       self.wanted = locking.ALL_SET
59
60     self.do_locking = self.use_locking
61
62     if self.do_locking:
63       lu.share_locks = ShareAll()
64       lu.needed_locks = {
65         locking.LEVEL_NODE: self.wanted,
66         }
67
68       if not self.names:
69         lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
70
71   def DeclareLocks(self, lu, level):
72     pass
73
74   def _GetQueryData(self, lu):
75     """Computes the list of nodes and their attributes.
76
77     """
78     # Locking is not used
79     # TODO
80     assert not (compat.any(lu.glm.is_owned(level)
81                            for level in locking.LEVELS
82                            if level != locking.LEVEL_CLUSTER) or
83                 self.do_locking or self.use_locking)
84
85     node_uuids = self._GetNames(lu, lu.cfg.GetNodeList(), locking.LEVEL_NODE)
86
87     result = []
88
89     for (node_uuid, nres) in lu.rpc.call_export_list(node_uuids).items():
90       if nres.fail_msg:
91         result.append((node_uuid, None))
92       else:
93         result.extend((node_uuid, expname) for expname in nres.payload)
94
95     return result
96
97
98 class LUBackupQuery(NoHooksLU):
99   """Query the exports list
100
101   """
102   REQ_BGL = False
103
104   def CheckArguments(self):
105     self.expq = ExportQuery(qlang.MakeSimpleFilter("node", self.op.nodes),
106                             ["node", "export"], self.op.use_locking)
107
108   def ExpandNames(self):
109     self.expq.ExpandNames(self)
110
111   def DeclareLocks(self, level):
112     self.expq.DeclareLocks(self, level)
113
114   def Exec(self, feedback_fn):
115     result = {}
116
117     for (node, expname) in self.expq.OldStyleQuery(self):
118       if expname is None:
119         result[node] = False
120       else:
121         result.setdefault(node, []).append(expname)
122
123     return result
124
125
126 class LUBackupPrepare(NoHooksLU):
127   """Prepares an instance for an export and returns useful information.
128
129   """
130   REQ_BGL = False
131
132   def ExpandNames(self):
133     self._ExpandAndLockInstance()
134
135   def CheckPrereq(self):
136     """Check prerequisites.
137
138     """
139     self.instance = self.cfg.GetInstanceInfoByName(self.op.instance_name)
140     assert self.instance is not None, \
141           "Cannot retrieve locked instance %s" % self.op.instance_name
142     CheckNodeOnline(self, self.instance.primary_node)
143
144     self._cds = GetClusterDomainSecret()
145
146   def Exec(self, feedback_fn):
147     """Prepares an instance for an export.
148
149     """
150     if self.op.mode == constants.EXPORT_MODE_REMOTE:
151       salt = utils.GenerateSecret(8)
152
153       feedback_fn("Generating X509 certificate on %s" %
154                   self.cfg.GetNodeName(self.instance.primary_node))
155       result = self.rpc.call_x509_cert_create(self.instance.primary_node,
156                                               constants.RIE_CERT_VALIDITY)
157       result.Raise("Can't create X509 key and certificate on %s" %
158                    self.cfg.GetNodeName(result.node))
159
160       (name, cert_pem) = result.payload
161
162       cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
163                                              cert_pem)
164
165       return {
166         "handshake": masterd.instance.ComputeRemoteExportHandshake(self._cds),
167         "x509_key_name": (name, utils.Sha1Hmac(self._cds, name, salt=salt),
168                           salt),
169         "x509_ca": utils.SignX509Certificate(cert, self._cds, salt),
170         }
171
172     return None
173
174
175 class LUBackupExport(LogicalUnit):
176   """Export an instance to an image in the cluster.
177
178   """
179   HPATH = "instance-export"
180   HTYPE = constants.HTYPE_INSTANCE
181   REQ_BGL = False
182
183   def CheckArguments(self):
184     """Check the arguments.
185
186     """
187     self.x509_key_name = self.op.x509_key_name
188     self.dest_x509_ca_pem = self.op.destination_x509_ca
189
190     if self.op.mode == constants.EXPORT_MODE_REMOTE:
191       if not self.x509_key_name:
192         raise errors.OpPrereqError("Missing X509 key name for encryption",
193                                    errors.ECODE_INVAL)
194
195       if not self.dest_x509_ca_pem:
196         raise errors.OpPrereqError("Missing destination X509 CA",
197                                    errors.ECODE_INVAL)
198
199   def ExpandNames(self):
200     self._ExpandAndLockInstance()
201
202     # Lock all nodes for local exports
203     if self.op.mode == constants.EXPORT_MODE_LOCAL:
204       (self.op.target_node_uuid, self.op.target_node) = \
205         ExpandNodeUuidAndName(self.cfg, self.op.target_node_uuid,
206                               self.op.target_node)
207       # FIXME: lock only instance primary and destination node
208       #
209       # Sad but true, for now we have do lock all nodes, as we don't know where
210       # the previous export might be, and in this LU we search for it and
211       # remove it from its current node. In the future we could fix this by:
212       #  - making a tasklet to search (share-lock all), then create the
213       #    new one, then one to remove, after
214       #  - removing the removal operation altogether
215       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
216
217       # Allocations should be stopped while this LU runs with node locks, but
218       # it doesn't have to be exclusive
219       self.share_locks[locking.LEVEL_NODE_ALLOC] = 1
220       self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
221
222   def DeclareLocks(self, level):
223     """Last minute lock declaration."""
224     # All nodes are locked anyway, so nothing to do here.
225
226   def BuildHooksEnv(self):
227     """Build hooks env.
228
229     This will run on the master, primary node and target node.
230
231     """
232     env = {
233       "EXPORT_MODE": self.op.mode,
234       "EXPORT_NODE": self.op.target_node,
235       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
236       "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
237       # TODO: Generic function for boolean env variables
238       "REMOVE_INSTANCE": str(bool(self.op.remove_instance)),
239       }
240
241     env.update(BuildInstanceHookEnvByObject(self, self.instance))
242
243     return env
244
245   def BuildHooksNodes(self):
246     """Build hooks nodes.
247
248     """
249     nl = [self.cfg.GetMasterNode(), self.instance.primary_node]
250
251     if self.op.mode == constants.EXPORT_MODE_LOCAL:
252       nl.append(self.op.target_node_uuid)
253
254     return (nl, nl)
255
256   def CheckPrereq(self):
257     """Check prerequisites.
258
259     This checks that the instance and node names are valid.
260
261     """
262     self.instance = self.cfg.GetInstanceInfoByName(self.op.instance_name)
263     assert self.instance is not None, \
264           "Cannot retrieve locked instance %s" % self.op.instance_name
265     CheckNodeOnline(self, self.instance.primary_node)
266
267     if (self.op.remove_instance and
268         self.instance.admin_state == constants.ADMINST_UP and
269         not self.op.shutdown):
270       raise errors.OpPrereqError("Can not remove instance without shutting it"
271                                  " down before", errors.ECODE_STATE)
272
273     if self.op.mode == constants.EXPORT_MODE_LOCAL:
274       self.dst_node = self.cfg.GetNodeInfo(self.op.target_node_uuid)
275       assert self.dst_node is not None
276
277       CheckNodeOnline(self, self.dst_node.uuid)
278       CheckNodeNotDrained(self, self.dst_node.uuid)
279
280       self._cds = None
281       self.dest_disk_info = None
282       self.dest_x509_ca = None
283
284     elif self.op.mode == constants.EXPORT_MODE_REMOTE:
285       self.dst_node = None
286
287       if len(self.op.target_node) != len(self.instance.disks):
288         raise errors.OpPrereqError(("Received destination information for %s"
289                                     " disks, but instance %s has %s disks") %
290                                    (len(self.op.target_node),
291                                     self.op.instance_name,
292                                     len(self.instance.disks)),
293                                    errors.ECODE_INVAL)
294
295       cds = GetClusterDomainSecret()
296
297       # Check X509 key name
298       try:
299         (key_name, hmac_digest, hmac_salt) = self.x509_key_name
300       except (TypeError, ValueError), err:
301         raise errors.OpPrereqError("Invalid data for X509 key name: %s" % err,
302                                    errors.ECODE_INVAL)
303
304       if not utils.VerifySha1Hmac(cds, key_name, hmac_digest, salt=hmac_salt):
305         raise errors.OpPrereqError("HMAC for X509 key name is wrong",
306                                    errors.ECODE_INVAL)
307
308       # Load and verify CA
309       try:
310         (cert, _) = utils.LoadSignedX509Certificate(self.dest_x509_ca_pem, cds)
311       except OpenSSL.crypto.Error, err:
312         raise errors.OpPrereqError("Unable to load destination X509 CA (%s)" %
313                                    (err, ), errors.ECODE_INVAL)
314
315       (errcode, msg) = utils.VerifyX509Certificate(cert, None, None)
316       if errcode is not None:
317         raise errors.OpPrereqError("Invalid destination X509 CA (%s)" %
318                                    (msg, ), errors.ECODE_INVAL)
319
320       self.dest_x509_ca = cert
321
322       # Verify target information
323       disk_info = []
324       for idx, disk_data in enumerate(self.op.target_node):
325         try:
326           (host, port, magic) = \
327             masterd.instance.CheckRemoteExportDiskInfo(cds, idx, disk_data)
328         except errors.GenericError, err:
329           raise errors.OpPrereqError("Target info for disk %s: %s" %
330                                      (idx, err), errors.ECODE_INVAL)
331
332         disk_info.append((host, port, magic))
333
334       assert len(disk_info) == len(self.op.target_node)
335       self.dest_disk_info = disk_info
336
337     else:
338       raise errors.ProgrammerError("Unhandled export mode %r" %
339                                    self.op.mode)
340
341     # instance disk type verification
342     # TODO: Implement export support for file-based disks
343     for disk in self.instance.disks:
344       if disk.dev_type == constants.LD_FILE:
345         raise errors.OpPrereqError("Export not supported for instances with"
346                                    " file-based disks", errors.ECODE_INVAL)
347
348   def _CleanupExports(self, feedback_fn):
349     """Removes exports of current instance from all other nodes.
350
351     If an instance in a cluster with nodes A..D was exported to node C, its
352     exports will be removed from the nodes A, B and D.
353
354     """
355     assert self.op.mode != constants.EXPORT_MODE_REMOTE
356
357     node_uuids = self.cfg.GetNodeList()
358     node_uuids.remove(self.dst_node.uuid)
359
360     # on one-node clusters nodelist will be empty after the removal
361     # if we proceed the backup would be removed because OpBackupQuery
362     # substitutes an empty list with the full cluster node list.
363     iname = self.instance.name
364     if node_uuids:
365       feedback_fn("Removing old exports for instance %s" % iname)
366       exportlist = self.rpc.call_export_list(node_uuids)
367       for node_uuid in exportlist:
368         if exportlist[node_uuid].fail_msg:
369           continue
370         if iname in exportlist[node_uuid].payload:
371           msg = self.rpc.call_export_remove(node_uuid, iname).fail_msg
372           if msg:
373             self.LogWarning("Could not remove older export for instance %s"
374                             " on node %s: %s", iname,
375                             self.cfg.GetNodeName(node_uuid), msg)
376
377   def Exec(self, feedback_fn):
378     """Export an instance to an image in the cluster.
379
380     """
381     assert self.op.mode in constants.EXPORT_MODES
382
383     src_node_uuid = self.instance.primary_node
384
385     if self.op.shutdown:
386       # shutdown the instance, but not the disks
387       feedback_fn("Shutting down instance %s" % self.instance.name)
388       result = self.rpc.call_instance_shutdown(src_node_uuid, self.instance,
389                                                self.op.shutdown_timeout,
390                                                self.op.reason)
391       # TODO: Maybe ignore failures if ignore_remove_failures is set
392       result.Raise("Could not shutdown instance %s on"
393                    " node %s" % (self.instance.name,
394                                  self.cfg.GetNodeName(src_node_uuid)))
395
396     # set the disks ID correctly since call_instance_start needs the
397     # correct drbd minor to create the symlinks
398     for disk in self.instance.disks:
399       self.cfg.SetDiskID(disk, src_node_uuid)
400
401     activate_disks = not self.instance.disks_active
402
403     if activate_disks:
404       # Activate the instance disks if we'exporting a stopped instance
405       feedback_fn("Activating disks for %s" % self.instance.name)
406       StartInstanceDisks(self, self.instance, None)
407
408     try:
409       helper = masterd.instance.ExportInstanceHelper(self, feedback_fn,
410                                                      self.instance)
411
412       helper.CreateSnapshots()
413       try:
414         if (self.op.shutdown and
415             self.instance.admin_state == constants.ADMINST_UP and
416             not self.op.remove_instance):
417           assert not activate_disks
418           feedback_fn("Starting instance %s" % self.instance.name)
419           result = self.rpc.call_instance_start(src_node_uuid,
420                                                 (self.instance, None, None),
421                                                 False, self.op.reason)
422           msg = result.fail_msg
423           if msg:
424             feedback_fn("Failed to start instance: %s" % msg)
425             ShutdownInstanceDisks(self, self.instance)
426             raise errors.OpExecError("Could not start instance: %s" % msg)
427
428         if self.op.mode == constants.EXPORT_MODE_LOCAL:
429           (fin_resu, dresults) = helper.LocalExport(self.dst_node)
430         elif self.op.mode == constants.EXPORT_MODE_REMOTE:
431           connect_timeout = constants.RIE_CONNECT_TIMEOUT
432           timeouts = masterd.instance.ImportExportTimeouts(connect_timeout)
433
434           (key_name, _, _) = self.x509_key_name
435
436           dest_ca_pem = \
437             OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
438                                             self.dest_x509_ca)
439
440           (fin_resu, dresults) = helper.RemoteExport(self.dest_disk_info,
441                                                      key_name, dest_ca_pem,
442                                                      timeouts)
443       finally:
444         helper.Cleanup()
445
446       # Check for backwards compatibility
447       assert len(dresults) == len(self.instance.disks)
448       assert compat.all(isinstance(i, bool) for i in dresults), \
449              "Not all results are boolean: %r" % dresults
450
451     finally:
452       if activate_disks:
453         feedback_fn("Deactivating disks for %s" % self.instance.name)
454         ShutdownInstanceDisks(self, self.instance)
455
456     if not (compat.all(dresults) and fin_resu):
457       failures = []
458       if not fin_resu:
459         failures.append("export finalization")
460       if not compat.all(dresults):
461         fdsk = utils.CommaJoin(idx for (idx, dsk) in enumerate(dresults)
462                                if not dsk)
463         failures.append("disk export: disk(s) %s" % fdsk)
464
465       raise errors.OpExecError("Export failed, errors in %s" %
466                                utils.CommaJoin(failures))
467
468     # At this point, the export was successful, we can cleanup/finish
469
470     # Remove instance if requested
471     if self.op.remove_instance:
472       feedback_fn("Removing instance %s" % self.instance.name)
473       RemoveInstance(self, feedback_fn, self.instance,
474                      self.op.ignore_remove_failures)
475
476     if self.op.mode == constants.EXPORT_MODE_LOCAL:
477       self._CleanupExports(feedback_fn)
478
479     return fin_resu, dresults
480
481
482 class LUBackupRemove(NoHooksLU):
483   """Remove exports related to the named instance.
484
485   """
486   REQ_BGL = False
487
488   def ExpandNames(self):
489     self.needed_locks = {
490       # We need all nodes to be locked in order for RemoveExport to work, but
491       # we don't need to lock the instance itself, as nothing will happen to it
492       # (and we can remove exports also for a removed instance)
493       locking.LEVEL_NODE: locking.ALL_SET,
494
495       # Removing backups is quick, so blocking allocations is justified
496       locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
497       }
498
499     # Allocations should be stopped while this LU runs with node locks, but it
500     # doesn't have to be exclusive
501     self.share_locks[locking.LEVEL_NODE_ALLOC] = 1
502
503   def Exec(self, feedback_fn):
504     """Remove any export.
505
506     """
507     (_, inst_name) = self.cfg.ExpandInstanceName(self.op.instance_name)
508     # If the instance was not found we'll try with the name that was passed in.
509     # This will only work if it was an FQDN, though.
510     fqdn_warn = False
511     if not inst_name:
512       fqdn_warn = True
513       inst_name = self.op.instance_name
514
515     locked_nodes = self.owned_locks(locking.LEVEL_NODE)
516     exportlist = self.rpc.call_export_list(locked_nodes)
517     found = False
518     for node_uuid in exportlist:
519       msg = exportlist[node_uuid].fail_msg
520       if msg:
521         self.LogWarning("Failed to query node %s (continuing): %s",
522                         self.cfg.GetNodeName(node_uuid), msg)
523         continue
524       if inst_name in exportlist[node_uuid].payload:
525         found = True
526         result = self.rpc.call_export_remove(node_uuid, inst_name)
527         msg = result.fail_msg
528         if msg:
529           logging.error("Could not remove export for instance %s"
530                         " on node %s: %s", inst_name,
531                         self.cfg.GetNodeName(node_uuid), msg)
532
533     if fqdn_warn and not found:
534       feedback_fn("Export not found. If trying to remove an export belonging"
535                   " to a deleted instance please use its Fully Qualified"
536                   " Domain Name.")