Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib / backup.py @ 1d4a4b26

History | View | Annotate | Download (18 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Logical units dealing with backup operations."""
23

    
24
import OpenSSL
25
import logging
26

    
27
from ganeti import compat
28
from ganeti import constants
29
from ganeti import errors
30
from ganeti import locking
31
from ganeti import masterd
32
from ganeti import qlang
33
from ganeti import query
34
from ganeti import utils
35

    
36
from ganeti.cmdlib.base import QueryBase, NoHooksLU, LogicalUnit
37
from ganeti.cmdlib.common import GetWantedNodes, ShareAll, CheckNodeOnline, \
38
  ExpandNodeName
39
from ganeti.cmdlib.instance_storage import StartInstanceDisks, \
40
  ShutdownInstanceDisks
41
from ganeti.cmdlib.instance_utils import GetClusterDomainSecret, \
42
  BuildInstanceHookEnvByObject, CheckNodeNotDrained, RemoveInstance
43

    
44

    
45
class ExportQuery(QueryBase):
  FIELDS = query.EXPORT_FIELDS

  #: Several exports can exist on one node, so the node name is not a
  #: unique key for this query
  SORT_FIELD = "node"

  def ExpandNames(self, lu):
    lu.needed_locks = {}

    # These attributes are consumed by _QueryBase._GetNames
    if self.names:
      self.wanted = GetWantedNodes(lu, self.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = self.use_locking
    if not self.do_locking:
      return

    lu.share_locks = ShareAll()
    lu.needed_locks = {
      locking.LEVEL_NODE: self.wanted,
      }

    if not self.names:
      lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

  def DeclareLocks(self, lu, level):
    # No per-level lock declarations needed for this query
    pass

  def _GetQueryData(self, lu):
    """Collects the list of (node, export name) pairs.

    A node whose export list could not be retrieved is reported as a
    single C{(node, None)} pair.

    """
    # Locking is not used
    # TODO
    assert not (self.do_locking or self.use_locking or
                compat.any(lu.glm.is_owned(lvl)
                           for lvl in locking.LEVELS
                           if lvl != locking.LEVEL_CLUSTER))

    node_names = self._GetNames(lu, lu.cfg.GetNodeList(), locking.LEVEL_NODE)

    data = []
    for (node_name, nres) in lu.rpc.call_export_list(node_names).items():
      if nres.fail_msg:
        # RPC failure: record the node with no export name
        data.append((node_name, None))
      else:
        for export_name in nres.payload:
          data.append((node_name, export_name))

    return data
96

    
97

    
98
class LUBackupQuery(NoHooksLU):
  """Query the list of exports known to the cluster.

  """
  REQ_BGL = False

  def CheckArguments(self):
    # Delegate all real work to an ExportQuery instance
    self.expq = ExportQuery(qlang.MakeSimpleFilter("node", self.op.nodes),
                            ["node", "export"], self.op.use_locking)

  def ExpandNames(self):
    self.expq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.expq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    """Returns a mapping of node name to export names (or C{False}).

    """
    exports_by_node = {}

    for (node_name, export_name) in self.expq.OldStyleQuery(self):
      if export_name is None:
        # The query failed for this node
        exports_by_node[node_name] = False
      else:
        node_exports = exports_by_node.setdefault(node_name, [])
        node_exports.append(export_name)

    return exports_by_node
124

    
125

    
126
class LUBackupPrepare(NoHooksLU):
  """Prepares an instance for an export and returns useful information.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
          "Cannot retrieve locked instance %s" % self.op.instance_name
    CheckNodeOnline(self, self.instance.primary_node)

    # Cluster domain secret, used to sign/verify handshake data
    self._cds = GetClusterDomainSecret()

  def Exec(self, feedback_fn):
    """Prepares an instance for an export.

    For remote exports, returns the handshake data, signed key name and
    signed CA certificate; for any other mode returns C{None}.

    """
    if self.op.mode != constants.EXPORT_MODE_REMOTE:
      return None

    inst = self.instance
    salt = utils.GenerateSecret(8)

    feedback_fn("Generating X509 certificate on %s" % inst.primary_node)
    result = self.rpc.call_x509_cert_create(inst.primary_node,
                                            constants.RIE_CERT_VALIDITY)
    result.Raise("Can't create X509 key and certificate on %s" % result.node)

    (key_name, cert_pem) = result.payload
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           cert_pem)

    return {
      "handshake": masterd.instance.ComputeRemoteExportHandshake(self._cds),
      "x509_key_name": (key_name,
                        utils.Sha1Hmac(self._cds, key_name, salt=salt),
                        salt),
      "x509_ca": utils.SignX509Certificate(cert, self._cds, salt),
      }
175

    
176

    
177
class LUBackupExport(LogicalUnit):
  """Export an instance to an image in the cluster.

  Supports local exports (to another node of this cluster) and remote
  exports, which are secured via X509 certificates.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def CheckArguments(self):
    """Check the arguments.

    Remote exports need both an X509 key name and a destination CA.

    """
    self.x509_key_name = self.op.x509_key_name
    self.dest_x509_ca_pem = self.op.destination_x509_ca

    if self.op.mode == constants.EXPORT_MODE_REMOTE:
      if not self.x509_key_name:
        raise errors.OpPrereqError("Missing X509 key name for encryption",
                                   errors.ECODE_INVAL)

      if not self.dest_x509_ca_pem:
        raise errors.OpPrereqError("Missing destination X509 CA",
                                   errors.ECODE_INVAL)

  def ExpandNames(self):
    """Expands and acquires the needed locks.

    """
    self._ExpandAndLockInstance()

    # Lock all nodes for local exports
    if self.op.mode == constants.EXPORT_MODE_LOCAL:
      # FIXME: lock only instance primary and destination node
      #
      # Sad but true, for now we have to lock all nodes, as we don't know
      # where the previous export might be, and in this LU we search for it
      # and remove it from its current node. In the future we could fix this
      # by:
      #  - making a tasklet to search (share-lock all), then create the
      #    new one, then one to remove, after
      #  - removing the removal operation altogether
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

      # Allocations should be stopped while this LU runs with node locks, but
      # it doesn't have to be exclusive
      self.share_locks[locking.LEVEL_NODE_ALLOC] = 1
      self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

  def DeclareLocks(self, level):
    """Last minute lock declaration."""
    # All nodes are locked anyway, so nothing to do here.

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_MODE": self.op.mode,
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
      # TODO: Generic function for boolean env variables
      "REMOVE_INSTANCE": str(bool(self.op.remove_instance)),
      }

    env.update(BuildInstanceHookEnvByObject(self, self.instance))

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    Hooks run on the master and the instance's primary node; for local
    exports additionally on the destination node.

    """
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node]

    if self.op.mode == constants.EXPORT_MODE_LOCAL:
      nl.append(self.op.target_node)

    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.op.instance_name

    self.instance = self.cfg.GetInstanceInfo(instance_name)
    assert self.instance is not None, \
          "Cannot retrieve locked instance %s" % self.op.instance_name
    CheckNodeOnline(self, self.instance.primary_node)

    # Removing a running instance without shutting it down first would lose
    # data, so refuse that combination
    if (self.op.remove_instance and
        self.instance.admin_state == constants.ADMINST_UP and
        not self.op.shutdown):
      raise errors.OpPrereqError("Can not remove instance without shutting it"
                                 " down before", errors.ECODE_STATE)

    if self.op.mode == constants.EXPORT_MODE_LOCAL:
      self.op.target_node = ExpandNodeName(self.cfg, self.op.target_node)
      self.dst_node = self.cfg.GetNodeInfo(self.op.target_node)
      assert self.dst_node is not None

      CheckNodeOnline(self, self.dst_node.name)
      CheckNodeNotDrained(self, self.dst_node.name)

      # The remote-export-only attributes stay unset for local exports
      self._cds = None
      self.dest_disk_info = None
      self.dest_x509_ca = None

    elif self.op.mode == constants.EXPORT_MODE_REMOTE:
      self.dst_node = None

      # For remote exports, target_node carries per-disk destination
      # information, one entry per instance disk
      if len(self.op.target_node) != len(self.instance.disks):
        raise errors.OpPrereqError(("Received destination information for %s"
                                    " disks, but instance %s has %s disks") %
                                   (len(self.op.target_node), instance_name,
                                    len(self.instance.disks)),
                                   errors.ECODE_INVAL)

      cds = GetClusterDomainSecret()

      # Check X509 key name
      try:
        (key_name, hmac_digest, hmac_salt) = self.x509_key_name
      except (TypeError, ValueError), err:
        raise errors.OpPrereqError("Invalid data for X509 key name: %s" % err,
                                   errors.ECODE_INVAL)

      if not utils.VerifySha1Hmac(cds, key_name, hmac_digest, salt=hmac_salt):
        raise errors.OpPrereqError("HMAC for X509 key name is wrong",
                                   errors.ECODE_INVAL)

      # Load and verify CA
      try:
        (cert, _) = utils.LoadSignedX509Certificate(self.dest_x509_ca_pem, cds)
      except OpenSSL.crypto.Error, err:
        raise errors.OpPrereqError("Unable to load destination X509 CA (%s)" %
                                   (err, ), errors.ECODE_INVAL)

      (errcode, msg) = utils.VerifyX509Certificate(cert, None, None)
      if errcode is not None:
        raise errors.OpPrereqError("Invalid destination X509 CA (%s)" %
                                   (msg, ), errors.ECODE_INVAL)

      self.dest_x509_ca = cert

      # Verify target information
      disk_info = []
      for idx, disk_data in enumerate(self.op.target_node):
        try:
          (host, port, magic) = \
            masterd.instance.CheckRemoteExportDiskInfo(cds, idx, disk_data)
        except errors.GenericError, err:
          raise errors.OpPrereqError("Target info for disk %s: %s" %
                                     (idx, err), errors.ECODE_INVAL)

        disk_info.append((host, port, magic))

      assert len(disk_info) == len(self.op.target_node)
      self.dest_disk_info = disk_info

    else:
      raise errors.ProgrammerError("Unhandled export mode %r" %
                                   self.op.mode)

    # instance disk type verification
    # TODO: Implement export support for file-based disks
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks", errors.ECODE_INVAL)

  def _CleanupExports(self, feedback_fn):
    """Removes exports of current instance from all other nodes.

    If an instance in a cluster with nodes A..D was exported to node C, its
    exports will be removed from the nodes A, B and D.

    """
    assert self.op.mode != constants.EXPORT_MODE_REMOTE

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(self.dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpBackupQuery
    # substitutes an empty list with the full cluster node list.
    iname = self.instance.name
    if nodelist:
      feedback_fn("Removing old exports for instance %s" % iname)
      exportlist = self.rpc.call_export_list(nodelist)
      for node in exportlist:
        if exportlist[node].fail_msg:
          # Best-effort cleanup: skip nodes we could not query
          continue
        if iname in exportlist[node].payload:
          msg = self.rpc.call_export_remove(node, iname).fail_msg
          if msg:
            self.LogWarning("Could not remove older export for instance %s"
                            " on node %s: %s", iname, node, msg)

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    Returns a C{(fin_resu, dresults)} tuple as produced by the export
    helper: overall finalization status plus one boolean per disk.

    """
    assert self.op.mode in constants.EXPORT_MODES

    instance = self.instance
    src_node = instance.primary_node

    if self.op.shutdown:
      # shutdown the instance, but not the disks
      feedback_fn("Shutting down instance %s" % instance.name)
      result = self.rpc.call_instance_shutdown(src_node, instance,
                                               self.op.shutdown_timeout,
                                               self.op.reason)
      # TODO: Maybe ignore failures if ignore_remove_failures is set
      result.Raise("Could not shutdown instance %s on"
                   " node %s" % (instance.name, src_node))

    # set the disks ID correctly since call_instance_start needs the
    # correct drbd minor to create the symlinks
    for disk in instance.disks:
      self.cfg.SetDiskID(disk, src_node)

    activate_disks = not instance.disks_active

    if activate_disks:
      # Activate the instance disks if we're exporting a stopped instance
      feedback_fn("Activating disks for %s" % instance.name)
      StartInstanceDisks(self, instance, None)

    try:
      helper = masterd.instance.ExportInstanceHelper(self, feedback_fn,
                                                     instance)

      helper.CreateSnapshots()
      try:
        # Restart the instance as soon as snapshots exist, if it was running
        # before the shutdown above and is not going to be removed
        if (self.op.shutdown and
            instance.admin_state == constants.ADMINST_UP and
            not self.op.remove_instance):
          assert not activate_disks
          feedback_fn("Starting instance %s" % instance.name)
          result = self.rpc.call_instance_start(src_node,
                                                (instance, None, None), False,
                                                 self.op.reason)
          msg = result.fail_msg
          if msg:
            feedback_fn("Failed to start instance: %s" % msg)
            ShutdownInstanceDisks(self, instance)
            raise errors.OpExecError("Could not start instance: %s" % msg)

        if self.op.mode == constants.EXPORT_MODE_LOCAL:
          (fin_resu, dresults) = helper.LocalExport(self.dst_node)
        elif self.op.mode == constants.EXPORT_MODE_REMOTE:
          connect_timeout = constants.RIE_CONNECT_TIMEOUT
          timeouts = masterd.instance.ImportExportTimeouts(connect_timeout)

          (key_name, _, _) = self.x509_key_name

          dest_ca_pem = \
            OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                            self.dest_x509_ca)

          (fin_resu, dresults) = helper.RemoteExport(self.dest_disk_info,
                                                     key_name, dest_ca_pem,
                                                     timeouts)
      finally:
        helper.Cleanup()

      # Check for backwards compatibility
      assert len(dresults) == len(instance.disks)
      assert compat.all(isinstance(i, bool) for i in dresults), \
             "Not all results are boolean: %r" % dresults

    finally:
      if activate_disks:
        feedback_fn("Deactivating disks for %s" % instance.name)
        ShutdownInstanceDisks(self, instance)

    # If the export itself raised, the code below is never reached, so
    # fin_resu/dresults are always bound here
    if not (compat.all(dresults) and fin_resu):
      failures = []
      if not fin_resu:
        failures.append("export finalization")
      if not compat.all(dresults):
        fdsk = utils.CommaJoin(idx for (idx, dsk) in enumerate(dresults)
                               if not dsk)
        failures.append("disk export: disk(s) %s" % fdsk)

      raise errors.OpExecError("Export failed, errors in %s" %
                               utils.CommaJoin(failures))

    # At this point, the export was successful, we can cleanup/finish

    # Remove instance if requested
    if self.op.remove_instance:
      feedback_fn("Removing instance %s" % instance.name)
      RemoveInstance(self, feedback_fn, instance,
                     self.op.ignore_remove_failures)

    if self.op.mode == constants.EXPORT_MODE_LOCAL:
      self._CleanupExports(feedback_fn)

    return fin_resu, dresults
480

    
481

    
482
class LUBackupRemove(NoHooksLU):
  """Remove exports related to the named instance.

  """
  REQ_BGL = False

  def ExpandNames(self):
    # RemoveExport needs every node locked, but the instance itself does not
    # need a lock: nothing happens to it, and exports of already-removed
    # instances can be cleaned up as well
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      # Removing backups is quick, so blocking allocations is justified
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }

    # Allocations should be stopped while this LU runs with node locks, but
    # it doesn't have to be exclusive
    self.share_locks[locking.LEVEL_NODE_ALLOC] = 1

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    inst_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed
    # in; that only works if it was an FQDN, hence the warning at the end
    warn_about_fqdn = not inst_name
    if warn_about_fqdn:
      inst_name = self.op.instance_name

    locked_nodes = self.owned_locks(locking.LEVEL_NODE)
    removed_any = False
    for (node, nres) in self.rpc.call_export_list(locked_nodes).items():
      if nres.fail_msg:
        self.LogWarning("Failed to query node %s (continuing): %s",
                        node, nres.fail_msg)
        continue
      if inst_name not in nres.payload:
        continue
      removed_any = True
      msg = self.rpc.call_export_remove(node, inst_name).fail_msg
      if msg:
        logging.error("Could not remove export for instance %s"
                      " on node %s: %s", inst_name, node, msg)

    if warn_about_fqdn and not removed_any:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")