lib/cmdlib/backup.py @ 1c3231aa


#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with backup operations."""

import OpenSSL
import logging

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import locking
from ganeti import masterd
from ganeti import qlang
from ganeti import query
from ganeti import utils

from ganeti.cmdlib.base import QueryBase, NoHooksLU, LogicalUnit
from ganeti.cmdlib.common import GetWantedNodes, ShareAll, CheckNodeOnline, \
  ExpandNodeUuidAndName
from ganeti.cmdlib.instance_storage import StartInstanceDisks, \
  ShutdownInstanceDisks
from ganeti.cmdlib.instance_utils import GetClusterDomainSecret, \
  BuildInstanceHookEnvByObject, CheckNodeNotDrained, RemoveInstance


class ExportQuery(QueryBase):
  FIELDS = query.EXPORT_FIELDS

  #: The node name is not a unique key for this query
  SORT_FIELD = "node"

  def ExpandNames(self, lu):
    lu.needed_locks = {}

    # The following variables interact with _QueryBase._GetNames
    if self.names:
      (self.wanted, _) = GetWantedNodes(lu, self.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = self.use_locking

    if self.do_locking:
      lu.share_locks = ShareAll()
      lu.needed_locks = {
        locking.LEVEL_NODE: self.wanted,
        }

      if not self.names:
        lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

  def DeclareLocks(self, lu, level):
    pass

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    # Locking is not used
    # TODO
    assert not (compat.any(lu.glm.is_owned(level)
                           for level in locking.LEVELS
                           if level != locking.LEVEL_CLUSTER) or
                self.do_locking or self.use_locking)

    node_uuids = self._GetNames(lu, lu.cfg.GetNodeList(), locking.LEVEL_NODE)

    result = []
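    # One entry per export found: (node_uuid, export_name); a node whose
    # export-list RPC failed contributes a single (node_uuid, None) marker,
    # which LUBackupQuery.Exec below maps to False for that node.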

    for (node_uuid, nres) in lu.rpc.call_export_list(node_uuids).items():
      if nres.fail_msg:
        result.append((node_uuid, None))
      else:
        result.extend((node_uuid, expname) for expname in nres.payload)

    return result


class LUBackupQuery(NoHooksLU):
  """Query the exports list

  """
  REQ_BGL = False

  def CheckArguments(self):
    self.expq = ExportQuery(qlang.MakeSimpleFilter("node", self.op.nodes),
                            ["node", "export"], self.op.use_locking)

  def ExpandNames(self):
    self.expq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.expq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    result = {}
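    # Maps node name to False when the export list could not be fetched from
    # that node, otherwise to the list of export names found there, e.g.
    # (hypothetical names):
    #   {"node1.example.com": ["inst1.example.com"], "node2.example.com": False}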

    for (node, expname) in self.expq.OldStyleQuery(self):
      if expname is None:
        result[node] = False
      else:
        result.setdefault(node, []).append(expname)

    return result


class LUBackupPrepare(NoHooksLU):
  """Prepares an instance for an export and returns useful information.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    instance_name = self.op.instance_name

    self.instance = self.cfg.GetInstanceInfo(instance_name)
    assert self.instance is not None, \
          "Cannot retrieve locked instance %s" % self.op.instance_name
    CheckNodeOnline(self, self.instance.primary_node)

    self._cds = GetClusterDomainSecret()

  def Exec(self, feedback_fn):
    """Prepares an instance for an export.

    """
    instance = self.instance

    if self.op.mode == constants.EXPORT_MODE_REMOTE:
      salt = utils.GenerateSecret(8)

      feedback_fn("Generating X509 certificate on %s" %
                  self.cfg.GetNodeName(instance.primary_node))
      result = self.rpc.call_x509_cert_create(instance.primary_node,
                                              constants.RIE_CERT_VALIDITY)
      result.Raise("Can't create X509 key and certificate on %s" %
                   self.cfg.GetNodeName(result.node))

      (name, cert_pem) = result.payload

      cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                             cert_pem)
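
      # Returned payload for remote exports: the inter-cluster handshake
      # (derived from the cluster domain secret), the certificate name
      # HMAC-signed with that same secret (re-verified in
      # LUBackupExport.CheckPrereq), and the new certificate HMAC-signed so
      # its origin can be checked with utils.LoadSignedX509Certificate.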
168

    
169
      return {
170
        "handshake": masterd.instance.ComputeRemoteExportHandshake(self._cds),
171
        "x509_key_name": (name, utils.Sha1Hmac(self._cds, name, salt=salt),
172
                          salt),
173
        "x509_ca": utils.SignX509Certificate(cert, self._cds, salt),
174
        }
175

    
176
    return None


class LUBackupExport(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def CheckArguments(self):
    """Check the arguments.

    """
    self.x509_key_name = self.op.x509_key_name
    self.dest_x509_ca_pem = self.op.destination_x509_ca

    if self.op.mode == constants.EXPORT_MODE_REMOTE:
      if not self.x509_key_name:
        raise errors.OpPrereqError("Missing X509 key name for encryption",
                                   errors.ECODE_INVAL)

      if not self.dest_x509_ca_pem:
        raise errors.OpPrereqError("Missing destination X509 CA",
                                   errors.ECODE_INVAL)

  def ExpandNames(self):
    self._ExpandAndLockInstance()

    # Lock all nodes for local exports
    if self.op.mode == constants.EXPORT_MODE_LOCAL:
      (self.op.target_node_uuid, self.op.target_node) = \
        ExpandNodeUuidAndName(self.cfg, self.op.target_node_uuid,
                              self.op.target_node)
      # FIXME: lock only instance primary and destination node
      #
      # Sad but true, for now we have to lock all nodes, as we don't know where
      # the previous export might be, and in this LU we search for it and
      # remove it from its current node. In the future we could fix this by:
      #  - making a tasklet to search (share-lock all), then create the
      #    new one, then one to remove it afterwards
      #  - removing the removal operation altogether
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

      # Allocations should be stopped while this LU runs with node locks, but
      # it doesn't have to be exclusive
      self.share_locks[locking.LEVEL_NODE_ALLOC] = 1
      self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

  def DeclareLocks(self, level):
    """Last minute lock declaration."""
    # All nodes are locked anyway, so nothing to do here.

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_MODE": self.op.mode,
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
      # TODO: Generic function for boolean env variables
      "REMOVE_INSTANCE": str(bool(self.op.remove_instance)),
      }

    env.update(BuildInstanceHookEnvByObject(self, self.instance))

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node]

    if self.op.mode == constants.EXPORT_MODE_LOCAL:
      nl.append(self.op.target_node_uuid)

    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.op.instance_name

    self.instance = self.cfg.GetInstanceInfo(instance_name)
    assert self.instance is not None, \
          "Cannot retrieve locked instance %s" % self.op.instance_name
    CheckNodeOnline(self, self.instance.primary_node)

    if (self.op.remove_instance and
        self.instance.admin_state == constants.ADMINST_UP and
        not self.op.shutdown):
      raise errors.OpPrereqError("Cannot remove instance without shutting it"
                                 " down first", errors.ECODE_STATE)

    if self.op.mode == constants.EXPORT_MODE_LOCAL:
      self.dst_node = self.cfg.GetNodeInfo(self.op.target_node_uuid)
      assert self.dst_node is not None

      CheckNodeOnline(self, self.dst_node.uuid)
      CheckNodeNotDrained(self, self.dst_node.uuid)

      self._cds = None
      self.dest_disk_info = None
      self.dest_x509_ca = None

    elif self.op.mode == constants.EXPORT_MODE_REMOTE:
      self.dst_node = None

      if len(self.op.target_node) != len(self.instance.disks):
        raise errors.OpPrereqError(("Received destination information for %s"
                                    " disks, but instance %s has %s disks") %
                                   (len(self.op.target_node), instance_name,
                                    len(self.instance.disks)),
                                   errors.ECODE_INVAL)
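
      # Note: for remote exports the opcode's target_node field does not name
      # a node; it carries per-disk connection information from the
      # destination cluster, validated one entry per instance disk in the
      # loop below.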

      cds = GetClusterDomainSecret()

      # Check X509 key name
      try:
        (key_name, hmac_digest, hmac_salt) = self.x509_key_name
      except (TypeError, ValueError), err:
        raise errors.OpPrereqError("Invalid data for X509 key name: %s" % err,
                                   errors.ECODE_INVAL)

      if not utils.VerifySha1Hmac(cds, key_name, hmac_digest, salt=hmac_salt):
        raise errors.OpPrereqError("HMAC for X509 key name is wrong",
                                   errors.ECODE_INVAL)
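
      # A matching HMAC proves the (name, digest, salt) triple was produced
      # by this cluster's LUBackupPrepare with the same cluster domain
      # secret, rather than made up by the caller.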

      # Load and verify CA
      try:
        (cert, _) = utils.LoadSignedX509Certificate(self.dest_x509_ca_pem, cds)
      except OpenSSL.crypto.Error, err:
        raise errors.OpPrereqError("Unable to load destination X509 CA (%s)" %
                                   (err, ), errors.ECODE_INVAL)

      (errcode, msg) = utils.VerifyX509Certificate(cert, None, None)
      if errcode is not None:
        raise errors.OpPrereqError("Invalid destination X509 CA (%s)" %
                                   (msg, ), errors.ECODE_INVAL)

      self.dest_x509_ca = cert

      # Verify target information
      disk_info = []
      for idx, disk_data in enumerate(self.op.target_node):
        try:
          (host, port, magic) = \
            masterd.instance.CheckRemoteExportDiskInfo(cds, idx, disk_data)
        except errors.GenericError, err:
          raise errors.OpPrereqError("Target info for disk %s: %s" %
                                     (idx, err), errors.ECODE_INVAL)

        disk_info.append((host, port, magic))

      assert len(disk_info) == len(self.op.target_node)
      self.dest_disk_info = disk_info

    else:
      raise errors.ProgrammerError("Unhandled export mode %r" %
                                   self.op.mode)

    # instance disk type verification
    # TODO: Implement export support for file-based disks
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks", errors.ECODE_INVAL)

  def _CleanupExports(self, feedback_fn):
    """Removes exports of current instance from all other nodes.

    If an instance in a cluster with nodes A..D was exported to node C, its
    exports will be removed from the nodes A, B and D.

    """
    assert self.op.mode != constants.EXPORT_MODE_REMOTE

    node_uuids = self.cfg.GetNodeList()
    node_uuids.remove(self.dst_node.uuid)

    # On one-node clusters the nodelist will be empty after the removal;
    # if we proceeded, the backup would be removed because OpBackupQuery
    # substitutes an empty list with the full cluster node list.
    iname = self.instance.name
    if node_uuids:
      feedback_fn("Removing old exports for instance %s" % iname)
      exportlist = self.rpc.call_export_list(node_uuids)
      for node_uuid in exportlist:
        if exportlist[node_uuid].fail_msg:
          continue
        if iname in exportlist[node_uuid].payload:
          msg = self.rpc.call_export_remove(node_uuid, iname).fail_msg
          if msg:
            self.LogWarning("Could not remove older export for instance %s"
                            " on node %s: %s", iname,
                            self.cfg.GetNodeName(node_uuid), msg)

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    assert self.op.mode in constants.EXPORT_MODES

    instance = self.instance
    src_node_uuid = instance.primary_node

    if self.op.shutdown:
      # shut down the instance, but not the disks
      feedback_fn("Shutting down instance %s" % instance.name)
      result = self.rpc.call_instance_shutdown(src_node_uuid, instance,
                                               self.op.shutdown_timeout,
                                               self.op.reason)
      # TODO: Maybe ignore failures if ignore_remove_failures is set
      result.Raise("Could not shut down instance %s on"
                   " node %s" % (instance.name,
                                 self.cfg.GetNodeName(src_node_uuid)))

    # set the disks ID correctly since call_instance_start needs the
    # correct drbd minor to create the symlinks
    for disk in instance.disks:
      self.cfg.SetDiskID(disk, src_node_uuid)

    activate_disks = not instance.disks_active

    if activate_disks:
      # Activate the instance disks if we're exporting a stopped instance
      feedback_fn("Activating disks for %s" % instance.name)
      StartInstanceDisks(self, instance, None)

    try:
      helper = masterd.instance.ExportInstanceHelper(self, feedback_fn,
                                                     instance)

      helper.CreateSnapshots()
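      # The snapshots let the instance be started again below while the
      # export itself reads from the snapshot devices.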
      try:
        if (self.op.shutdown and
            instance.admin_state == constants.ADMINST_UP and
            not self.op.remove_instance):
          assert not activate_disks
          feedback_fn("Starting instance %s" % instance.name)
          result = self.rpc.call_instance_start(src_node_uuid,
                                                (instance, None, None), False,
                                                self.op.reason)
          msg = result.fail_msg
          if msg:
            feedback_fn("Failed to start instance: %s" % msg)
            ShutdownInstanceDisks(self, instance)
            raise errors.OpExecError("Could not start instance: %s" % msg)

        if self.op.mode == constants.EXPORT_MODE_LOCAL:
          (fin_resu, dresults) = helper.LocalExport(self.dst_node)
        elif self.op.mode == constants.EXPORT_MODE_REMOTE:
          connect_timeout = constants.RIE_CONNECT_TIMEOUT
          timeouts = masterd.instance.ImportExportTimeouts(connect_timeout)

          (key_name, _, _) = self.x509_key_name

          dest_ca_pem = \
            OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                            self.dest_x509_ca)

          (fin_resu, dresults) = helper.RemoteExport(self.dest_disk_info,
                                                     key_name, dest_ca_pem,
                                                     timeouts)
      finally:
        helper.Cleanup()

      # Check for backwards compatibility
      assert len(dresults) == len(instance.disks)
      assert compat.all(isinstance(i, bool) for i in dresults), \
             "Not all results are boolean: %r" % dresults

    finally:
      if activate_disks:
        feedback_fn("Deactivating disks for %s" % instance.name)
        ShutdownInstanceDisks(self, instance)

    if not (compat.all(dresults) and fin_resu):
      failures = []
      if not fin_resu:
        failures.append("export finalization")
      if not compat.all(dresults):
        fdsk = utils.CommaJoin(idx for (idx, dsk) in enumerate(dresults)
                               if not dsk)
        failures.append("disk export: disk(s) %s" % fdsk)

      raise errors.OpExecError("Export failed, errors in %s" %
                               utils.CommaJoin(failures))

    # At this point the export was successful; we can clean up and finish

    # Remove instance if requested
    if self.op.remove_instance:
      feedback_fn("Removing instance %s" % instance.name)
      RemoveInstance(self, feedback_fn, instance,
                     self.op.ignore_remove_failures)

    if self.op.mode == constants.EXPORT_MODE_LOCAL:
      self._CleanupExports(feedback_fn)
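
    # fin_resu is the overall finalization result; dresults holds one boolean
    # per instance disk (shapes checked by the assertions above).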
    return fin_resu, dresults


class LUBackupRemove(NoHooksLU):
  """Remove exports related to the named instance.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      # We need all nodes to be locked in order for RemoveExport to work, but
      # we don't need to lock the instance itself, as nothing will happen to it
      # (and we can also remove exports for a removed instance)
      locking.LEVEL_NODE: locking.ALL_SET,

      # Removing backups is quick, so blocking allocations is justified
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }

    # Allocations should be stopped while this LU runs with node locks, but it
    # doesn't have to be exclusive
    self.share_locks[locking.LEVEL_NODE_ALLOC] = 1

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    locked_nodes = self.owned_locks(locking.LEVEL_NODE)
    exportlist = self.rpc.call_export_list(locked_nodes)
    found = False
    for node_uuid in exportlist:
      msg = exportlist[node_uuid].fail_msg
      if msg:
        self.LogWarning("Failed to query node %s (continuing): %s",
                        self.cfg.GetNodeName(node_uuid), msg)
        continue
      if instance_name in exportlist[node_uuid].payload:
        found = True
        result = self.rpc.call_export_remove(node_uuid, instance_name)
        msg = result.fail_msg
        if msg:
          logging.error("Could not remove export for instance %s"
                        " on node %s: %s", instance_name,
                        self.cfg.GetNodeName(node_uuid), msg)

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")