Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib / backup.py @ fbeb41e6

History | View | Annotate | Download (17 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Logical units dealing with backup operations."""
23

    
24
import OpenSSL
25
import logging
26

    
27
from ganeti import compat
28
from ganeti import constants
29
from ganeti import errors
30
from ganeti import locking
31
from ganeti import masterd
32
from ganeti import query
33
from ganeti import utils
34

    
35
from ganeti.cmdlib.base import QueryBase, NoHooksLU, LogicalUnit
36
from ganeti.cmdlib.common import CheckNodeOnline, \
37
  ExpandNodeUuidAndName
38
from ganeti.cmdlib.instance_storage import StartInstanceDisks, \
39
  ShutdownInstanceDisks
40
from ganeti.cmdlib.instance_utils import GetClusterDomainSecret, \
41
  BuildInstanceHookEnvByObject, CheckNodeNotDrained, RemoveInstance
42

    
43

    
44
class ExportQuery(QueryBase):
45
  FIELDS = query.EXPORT_FIELDS
46

    
47
  #: The node name is not a unique key for this query
48
  SORT_FIELD = "node"
49

    
50
  def ExpandNames(self, lu):
51
    raise NotImplementedError
52

    
53
  def DeclareLocks(self, lu, level):
54
    pass
55

    
56
  def _GetQueryData(self, lu):
57
    raise NotImplementedError
58

    
59

    
60
class LUBackupQuery(NoHooksLU):
61
  """Query the exports list
62

63
  """
64
  REQ_BGL = False
65

    
66
  def CheckArguments(self):
67
    raise NotImplementedError
68

    
69
  def ExpandNames(self):
70
    raise NotImplementedError
71

    
72
  def DeclareLocks(self, level):
73
    raise NotImplementedError
74

    
75
  def Exec(self, feedback_fn):
76
    raise NotImplementedError
77

    
78

    
79
class LUBackupPrepare(NoHooksLU):
80
  """Prepares an instance for an export and returns useful information.
81

82
  """
83
  REQ_BGL = False
84

    
85
  def ExpandNames(self):
86
    self._ExpandAndLockInstance()
87

    
88
  def CheckPrereq(self):
89
    """Check prerequisites.
90

91
    """
92
    self.instance = self.cfg.GetInstanceInfoByName(self.op.instance_name)
93
    assert self.instance is not None, \
94
          "Cannot retrieve locked instance %s" % self.op.instance_name
95
    CheckNodeOnline(self, self.instance.primary_node)
96

    
97
    self._cds = GetClusterDomainSecret()
98

    
99
  def Exec(self, feedback_fn):
100
    """Prepares an instance for an export.
101

102
    """
103
    if self.op.mode == constants.EXPORT_MODE_REMOTE:
104
      salt = utils.GenerateSecret(8)
105

    
106
      feedback_fn("Generating X509 certificate on %s" %
107
                  self.cfg.GetNodeName(self.instance.primary_node))
108
      result = self.rpc.call_x509_cert_create(self.instance.primary_node,
109
                                              constants.RIE_CERT_VALIDITY)
110
      result.Raise("Can't create X509 key and certificate on %s" %
111
                   self.cfg.GetNodeName(result.node))
112

    
113
      (name, cert_pem) = result.payload
114

    
115
      cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
116
                                             cert_pem)
117

    
118
      return {
119
        "handshake": masterd.instance.ComputeRemoteExportHandshake(self._cds),
120
        "x509_key_name": (name, utils.Sha1Hmac(self._cds, name, salt=salt),
121
                          salt),
122
        "x509_ca": utils.SignX509Certificate(cert, self._cds, salt),
123
        }
124

    
125
    return None
126

    
127

    
128
class LUBackupExport(LogicalUnit):
129
  """Export an instance to an image in the cluster.
130

131
  """
132
  HPATH = "instance-export"
133
  HTYPE = constants.HTYPE_INSTANCE
134
  REQ_BGL = False
135

    
136
  def CheckArguments(self):
137
    """Check the arguments.
138

139
    """
140
    self.x509_key_name = self.op.x509_key_name
141
    self.dest_x509_ca_pem = self.op.destination_x509_ca
142

    
143
    if self.op.mode == constants.EXPORT_MODE_REMOTE:
144
      if not self.x509_key_name:
145
        raise errors.OpPrereqError("Missing X509 key name for encryption",
146
                                   errors.ECODE_INVAL)
147

    
148
      if not self.dest_x509_ca_pem:
149
        raise errors.OpPrereqError("Missing destination X509 CA",
150
                                   errors.ECODE_INVAL)
151

    
152
  def ExpandNames(self):
153
    self._ExpandAndLockInstance()
154

    
155
    # Lock all nodes for local exports
156
    if self.op.mode == constants.EXPORT_MODE_LOCAL:
157
      (self.op.target_node_uuid, self.op.target_node) = \
158
        ExpandNodeUuidAndName(self.cfg, self.op.target_node_uuid,
159
                              self.op.target_node)
160
      # FIXME: lock only instance primary and destination node
161
      #
162
      # Sad but true, for now we have do lock all nodes, as we don't know where
163
      # the previous export might be, and in this LU we search for it and
164
      # remove it from its current node. In the future we could fix this by:
165
      #  - making a tasklet to search (share-lock all), then create the
166
      #    new one, then one to remove, after
167
      #  - removing the removal operation altogether
168
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
169

    
170
      # Allocations should be stopped while this LU runs with node locks, but
171
      # it doesn't have to be exclusive
172
      self.share_locks[locking.LEVEL_NODE_ALLOC] = 1
173
      self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
174

    
175
  def DeclareLocks(self, level):
176
    """Last minute lock declaration."""
177
    # All nodes are locked anyway, so nothing to do here.
178

    
179
  def BuildHooksEnv(self):
180
    """Build hooks env.
181

182
    This will run on the master, primary node and target node.
183

184
    """
185
    env = {
186
      "EXPORT_MODE": self.op.mode,
187
      "EXPORT_NODE": self.op.target_node,
188
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
189
      "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
190
      # TODO: Generic function for boolean env variables
191
      "REMOVE_INSTANCE": str(bool(self.op.remove_instance)),
192
      }
193

    
194
    env.update(BuildInstanceHookEnvByObject(self, self.instance))
195

    
196
    return env
197

    
198
  def BuildHooksNodes(self):
199
    """Build hooks nodes.
200

201
    """
202
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node]
203

    
204
    if self.op.mode == constants.EXPORT_MODE_LOCAL:
205
      nl.append(self.op.target_node_uuid)
206

    
207
    return (nl, nl)
208

    
209
  def CheckPrereq(self):
210
    """Check prerequisites.
211

212
    This checks that the instance and node names are valid.
213

214
    """
215
    self.instance = self.cfg.GetInstanceInfoByName(self.op.instance_name)
216
    assert self.instance is not None, \
217
          "Cannot retrieve locked instance %s" % self.op.instance_name
218
    CheckNodeOnline(self, self.instance.primary_node)
219

    
220
    if (self.op.remove_instance and
221
        self.instance.admin_state == constants.ADMINST_UP and
222
        not self.op.shutdown):
223
      raise errors.OpPrereqError("Can not remove instance without shutting it"
224
                                 " down before", errors.ECODE_STATE)
225

    
226
    if self.op.mode == constants.EXPORT_MODE_LOCAL:
227
      self.dst_node = self.cfg.GetNodeInfo(self.op.target_node_uuid)
228
      assert self.dst_node is not None
229

    
230
      CheckNodeOnline(self, self.dst_node.uuid)
231
      CheckNodeNotDrained(self, self.dst_node.uuid)
232

    
233
      self._cds = None
234
      self.dest_disk_info = None
235
      self.dest_x509_ca = None
236

    
237
    elif self.op.mode == constants.EXPORT_MODE_REMOTE:
238
      self.dst_node = None
239

    
240
      if len(self.op.target_node) != len(self.instance.disks):
241
        raise errors.OpPrereqError(("Received destination information for %s"
242
                                    " disks, but instance %s has %s disks") %
243
                                   (len(self.op.target_node),
244
                                    self.op.instance_name,
245
                                    len(self.instance.disks)),
246
                                   errors.ECODE_INVAL)
247

    
248
      cds = GetClusterDomainSecret()
249

    
250
      # Check X509 key name
251
      try:
252
        (key_name, hmac_digest, hmac_salt) = self.x509_key_name
253
      except (TypeError, ValueError), err:
254
        raise errors.OpPrereqError("Invalid data for X509 key name: %s" % err,
255
                                   errors.ECODE_INVAL)
256

    
257
      if not utils.VerifySha1Hmac(cds, key_name, hmac_digest, salt=hmac_salt):
258
        raise errors.OpPrereqError("HMAC for X509 key name is wrong",
259
                                   errors.ECODE_INVAL)
260

    
261
      # Load and verify CA
262
      try:
263
        (cert, _) = utils.LoadSignedX509Certificate(self.dest_x509_ca_pem, cds)
264
      except OpenSSL.crypto.Error, err:
265
        raise errors.OpPrereqError("Unable to load destination X509 CA (%s)" %
266
                                   (err, ), errors.ECODE_INVAL)
267

    
268
      (errcode, msg) = utils.VerifyX509Certificate(cert, None, None)
269
      if errcode is not None:
270
        raise errors.OpPrereqError("Invalid destination X509 CA (%s)" %
271
                                   (msg, ), errors.ECODE_INVAL)
272

    
273
      self.dest_x509_ca = cert
274

    
275
      # Verify target information
276
      disk_info = []
277
      for idx, disk_data in enumerate(self.op.target_node):
278
        try:
279
          (host, port, magic) = \
280
            masterd.instance.CheckRemoteExportDiskInfo(cds, idx, disk_data)
281
        except errors.GenericError, err:
282
          raise errors.OpPrereqError("Target info for disk %s: %s" %
283
                                     (idx, err), errors.ECODE_INVAL)
284

    
285
        disk_info.append((host, port, magic))
286

    
287
      assert len(disk_info) == len(self.op.target_node)
288
      self.dest_disk_info = disk_info
289

    
290
    else:
291
      raise errors.ProgrammerError("Unhandled export mode %r" %
292
                                   self.op.mode)
293

    
294
    # instance disk type verification
295
    # TODO: Implement export support for file-based disks
296
    for disk in self.instance.disks:
297
      if disk.dev_type in constants.DTS_FILEBASED:
298
        raise errors.OpPrereqError("Export not supported for instances with"
299
                                   " file-based disks", errors.ECODE_INVAL)
300

    
301
  def _CleanupExports(self, feedback_fn):
302
    """Removes exports of current instance from all other nodes.
303

304
    If an instance in a cluster with nodes A..D was exported to node C, its
305
    exports will be removed from the nodes A, B and D.
306

307
    """
308
    assert self.op.mode != constants.EXPORT_MODE_REMOTE
309

    
310
    node_uuids = self.cfg.GetNodeList()
311
    node_uuids.remove(self.dst_node.uuid)
312

    
313
    # on one-node clusters nodelist will be empty after the removal
314
    # if we proceed the backup would be removed because OpBackupQuery
315
    # substitutes an empty list with the full cluster node list.
316
    iname = self.instance.name
317
    if node_uuids:
318
      feedback_fn("Removing old exports for instance %s" % iname)
319
      exportlist = self.rpc.call_export_list(node_uuids)
320
      for node_uuid in exportlist:
321
        if exportlist[node_uuid].fail_msg:
322
          continue
323
        if iname in exportlist[node_uuid].payload:
324
          msg = self.rpc.call_export_remove(node_uuid, iname).fail_msg
325
          if msg:
326
            self.LogWarning("Could not remove older export for instance %s"
327
                            " on node %s: %s", iname,
328
                            self.cfg.GetNodeName(node_uuid), msg)
329

    
330
  def Exec(self, feedback_fn):
331
    """Export an instance to an image in the cluster.
332

333
    """
334
    assert self.op.mode in constants.EXPORT_MODES
335

    
336
    src_node_uuid = self.instance.primary_node
337

    
338
    if self.op.shutdown:
339
      # shutdown the instance, but not the disks
340
      feedback_fn("Shutting down instance %s" % self.instance.name)
341
      result = self.rpc.call_instance_shutdown(src_node_uuid, self.instance,
342
                                               self.op.shutdown_timeout,
343
                                               self.op.reason)
344
      # TODO: Maybe ignore failures if ignore_remove_failures is set
345
      result.Raise("Could not shutdown instance %s on"
346
                   " node %s" % (self.instance.name,
347
                                 self.cfg.GetNodeName(src_node_uuid)))
348

    
349
    activate_disks = not self.instance.disks_active
350

    
351
    if activate_disks:
352
      # Activate the instance disks if we're exporting a stopped instance
353
      feedback_fn("Activating disks for %s" % self.instance.name)
354
      StartInstanceDisks(self, self.instance, None)
355

    
356
    try:
357
      helper = masterd.instance.ExportInstanceHelper(self, feedback_fn,
358
                                                     self.instance)
359

    
360
      helper.CreateSnapshots()
361
      try:
362
        if (self.op.shutdown and
363
            self.instance.admin_state == constants.ADMINST_UP and
364
            not self.op.remove_instance):
365
          assert not activate_disks
366
          feedback_fn("Starting instance %s" % self.instance.name)
367
          result = self.rpc.call_instance_start(src_node_uuid,
368
                                                (self.instance, None, None),
369
                                                False, self.op.reason)
370
          msg = result.fail_msg
371
          if msg:
372
            feedback_fn("Failed to start instance: %s" % msg)
373
            ShutdownInstanceDisks(self, self.instance)
374
            raise errors.OpExecError("Could not start instance: %s" % msg)
375

    
376
        if self.op.mode == constants.EXPORT_MODE_LOCAL:
377
          (fin_resu, dresults) = helper.LocalExport(self.dst_node,
378
                                                    self.op.compress)
379
        elif self.op.mode == constants.EXPORT_MODE_REMOTE:
380
          connect_timeout = constants.RIE_CONNECT_TIMEOUT
381
          timeouts = masterd.instance.ImportExportTimeouts(connect_timeout)
382

    
383
          (key_name, _, _) = self.x509_key_name
384

    
385
          dest_ca_pem = \
386
            OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
387
                                            self.dest_x509_ca)
388

    
389
          (fin_resu, dresults) = helper.RemoteExport(self.dest_disk_info,
390
                                                     key_name, dest_ca_pem,
391
                                                     self.op.compress,
392
                                                     timeouts)
393
      finally:
394
        helper.Cleanup()
395

    
396
      # Check for backwards compatibility
397
      assert len(dresults) == len(self.instance.disks)
398
      assert compat.all(isinstance(i, bool) for i in dresults), \
399
             "Not all results are boolean: %r" % dresults
400

    
401
    finally:
402
      if activate_disks:
403
        feedback_fn("Deactivating disks for %s" % self.instance.name)
404
        ShutdownInstanceDisks(self, self.instance)
405

    
406
    if not (compat.all(dresults) and fin_resu):
407
      failures = []
408
      if not fin_resu:
409
        failures.append("export finalization")
410
      if not compat.all(dresults):
411
        fdsk = utils.CommaJoin(idx for (idx, dsk) in enumerate(dresults)
412
                               if not dsk)
413
        failures.append("disk export: disk(s) %s" % fdsk)
414

    
415
      raise errors.OpExecError("Export failed, errors in %s" %
416
                               utils.CommaJoin(failures))
417

    
418
    # At this point, the export was successful, we can cleanup/finish
419

    
420
    # Remove instance if requested
421
    if self.op.remove_instance:
422
      feedback_fn("Removing instance %s" % self.instance.name)
423
      RemoveInstance(self, feedback_fn, self.instance,
424
                     self.op.ignore_remove_failures)
425

    
426
    if self.op.mode == constants.EXPORT_MODE_LOCAL:
427
      self._CleanupExports(feedback_fn)
428

    
429
    return fin_resu, dresults
430

    
431

    
432
class LUBackupRemove(NoHooksLU):
433
  """Remove exports related to the named instance.
434

435
  """
436
  REQ_BGL = False
437

    
438
  def ExpandNames(self):
439
    self.needed_locks = {
440
      # We need all nodes to be locked in order for RemoveExport to work, but
441
      # we don't need to lock the instance itself, as nothing will happen to it
442
      # (and we can remove exports also for a removed instance)
443
      locking.LEVEL_NODE: locking.ALL_SET,
444

    
445
      # Removing backups is quick, so blocking allocations is justified
446
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
447
      }
448

    
449
    # Allocations should be stopped while this LU runs with node locks, but it
450
    # doesn't have to be exclusive
451
    self.share_locks[locking.LEVEL_NODE_ALLOC] = 1
452

    
453
  def Exec(self, feedback_fn):
454
    """Remove any export.
455

456
    """
457
    (_, inst_name) = self.cfg.ExpandInstanceName(self.op.instance_name)
458
    # If the instance was not found we'll try with the name that was passed in.
459
    # This will only work if it was an FQDN, though.
460
    fqdn_warn = False
461
    if not inst_name:
462
      fqdn_warn = True
463
      inst_name = self.op.instance_name
464

    
465
    locked_nodes = self.owned_locks(locking.LEVEL_NODE)
466
    exportlist = self.rpc.call_export_list(locked_nodes)
467
    found = False
468
    for node_uuid in exportlist:
469
      msg = exportlist[node_uuid].fail_msg
470
      if msg:
471
        self.LogWarning("Failed to query node %s (continuing): %s",
472
                        self.cfg.GetNodeName(node_uuid), msg)
473
        continue
474
      if inst_name in exportlist[node_uuid].payload:
475
        found = True
476
        result = self.rpc.call_export_remove(node_uuid, inst_name)
477
        msg = result.fail_msg
478
        if msg:
479
          logging.error("Could not remove export for instance %s"
480
                        " on node %s: %s", inst_name,
481
                        self.cfg.GetNodeName(node_uuid), msg)
482

    
483
    if fqdn_warn and not found:
484
      feedback_fn("Export not found. If trying to remove an export belonging"
485
                  " to a deleted instance please use its Fully Qualified"
486
                  " Domain Name.")