# Source: lib/cmdlib/backup.py (revision a57e502a)

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with backup operations."""

import logging

import OpenSSL

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import locking
from ganeti import masterd
from ganeti import qlang
from ganeti import query
from ganeti import utils

from ganeti.cmdlib.base import QueryBase, NoHooksLU, LogicalUnit
from ganeti.cmdlib.common import GetWantedNodes, ShareAll, CheckNodeOnline, \
  ExpandNodeUuidAndName
from ganeti.cmdlib.instance_storage import StartInstanceDisks, \
  ShutdownInstanceDisks
from ganeti.cmdlib.instance_utils import GetClusterDomainSecret, \
  BuildInstanceHookEnvByObject, CheckNodeNotDrained, RemoveInstance


class ExportQuery(QueryBase):
  """Query implementation listing instance exports per node.

  """
  FIELDS = query.EXPORT_FIELDS

  #: The node name is not a unique key for this query
  SORT_FIELD = "node"

  def ExpandNames(self, lu):
    lu.needed_locks = {}

    # The following variables interact with _QueryBase._GetNames
    if self.names:
      (self.wanted, _) = GetWantedNodes(lu, self.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = self.use_locking

    if not self.do_locking:
      return

    lu.share_locks = ShareAll()
    lu.needed_locks = {
      locking.LEVEL_NODE: self.wanted,
      }

    if not self.names:
      lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

  def DeclareLocks(self, lu, level):
    # Nothing to declare at any level
    pass

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    # Locking is not used
    # TODO
    assert not (compat.any(lu.glm.is_owned(level)
                           for level in locking.LEVELS
                           if level != locking.LEVEL_CLUSTER) or
                self.do_locking or self.use_locking)

    node_uuids = self._GetNames(lu, lu.cfg.GetNodeList(), locking.LEVEL_NODE)

    exports = []
    for (node_uuid, nres) in lu.rpc.call_export_list(node_uuids).items():
      if nres.fail_msg:
        # RPC to this node failed; record it with a None export name
        exports.append((node_uuid, None))
      else:
        for export_name in nres.payload:
          exports.append((node_uuid, export_name))

    return exports


class LUBackupQuery(NoHooksLU):
  """Query the exports list

  """
  REQ_BGL = False

  def CheckArguments(self):
    # Build the query object translating the node filter into query language
    self.expq = ExportQuery(qlang.MakeSimpleFilter("node", self.op.nodes),
                            ["node", "export"], self.op.use_locking)

  def ExpandNames(self):
    self.expq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.expq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    """Returns a mapping of node name to export names (or False on failure).

    """
    exports_by_node = {}

    for (node, export_name) in self.expq.OldStyleQuery(self):
      if export_name is None:
        # Listing exports on this node failed
        exports_by_node[node] = False
      else:
        exports_by_node.setdefault(node, []).append(export_name)

    return exports_by_node


class LUBackupPrepare(NoHooksLU):
  """Prepares an instance for an export and returns useful information.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    self.instance = self.cfg.GetInstanceInfoByName(self.op.instance_name)
    assert self.instance is not None, \
          "Cannot retrieve locked instance %s" % self.op.instance_name
    CheckNodeOnline(self, self.instance.primary_node)

    self._cds = GetClusterDomainSecret()

  def Exec(self, feedback_fn):
    """Prepares an instance for an export.

    """
    if self.op.mode != constants.EXPORT_MODE_REMOTE:
      # Only remote exports need preparation
      return None

    salt = utils.GenerateSecret(8)

    feedback_fn("Generating X509 certificate on %s" %
                self.cfg.GetNodeName(self.instance.primary_node))
    result = self.rpc.call_x509_cert_create(self.instance.primary_node,
                                            constants.RIE_CERT_VALIDITY)
    result.Raise("Can't create X509 key and certificate on %s" %
                 self.cfg.GetNodeName(result.node))

    (name, cert_pem) = result.payload

    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           cert_pem)

    return {
      "handshake": masterd.instance.ComputeRemoteExportHandshake(self._cds),
      "x509_key_name": (name, utils.Sha1Hmac(self._cds, name, salt=salt),
                        salt),
      "x509_ca": utils.SignX509Certificate(cert, self._cds, salt),
      }


class LUBackupExport(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def CheckArguments(self):
    """Check the arguments.

    """
    self.x509_key_name = self.op.x509_key_name
    self.dest_x509_ca_pem = self.op.destination_x509_ca

    if self.op.mode == constants.EXPORT_MODE_REMOTE:
      if not self.x509_key_name:
        raise errors.OpPrereqError("Missing X509 key name for encryption",
                                   errors.ECODE_INVAL)

      if not self.dest_x509_ca_pem:
        raise errors.OpPrereqError("Missing destination X509 CA",
                                   errors.ECODE_INVAL)

  def ExpandNames(self):
    self._ExpandAndLockInstance()

    # Lock all nodes for local exports
    if self.op.mode == constants.EXPORT_MODE_LOCAL:
      (self.op.target_node_uuid, self.op.target_node) = \
        ExpandNodeUuidAndName(self.cfg, self.op.target_node_uuid,
                              self.op.target_node)
      # FIXME: lock only instance primary and destination node
      #
      # Sad but true, for now we have do lock all nodes, as we don't know where
      # the previous export might be, and in this LU we search for it and
      # remove it from its current node. In the future we could fix this by:
      #  - making a tasklet to search (share-lock all), then create the
      #    new one, then one to remove, after
      #  - removing the removal operation altogether
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

      # Allocations should be stopped while this LU runs with node locks, but
      # it doesn't have to be exclusive
      self.share_locks[locking.LEVEL_NODE_ALLOC] = 1
      self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

  def DeclareLocks(self, level):
    """Last minute lock declaration."""
    # All nodes are locked anyway, so nothing to do here.

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_MODE": self.op.mode,
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
      # TODO: Generic function for boolean env variables
      "REMOVE_INSTANCE": str(bool(self.op.remove_instance)),
      }

    env.update(BuildInstanceHookEnvByObject(self, self.instance))

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node]

    if self.op.mode == constants.EXPORT_MODE_LOCAL:
      nl.append(self.op.target_node_uuid)

    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    self.instance = self.cfg.GetInstanceInfoByName(self.op.instance_name)
    assert self.instance is not None, \
          "Cannot retrieve locked instance %s" % self.op.instance_name
    CheckNodeOnline(self, self.instance.primary_node)

    if (self.op.remove_instance and
        self.instance.admin_state == constants.ADMINST_UP and
        not self.op.shutdown):
      raise errors.OpPrereqError("Can not remove instance without shutting it"
                                 " down before", errors.ECODE_STATE)

    if self.op.mode == constants.EXPORT_MODE_LOCAL:
      self.dst_node = self.cfg.GetNodeInfo(self.op.target_node_uuid)
      assert self.dst_node is not None

      CheckNodeOnline(self, self.dst_node.uuid)
      CheckNodeNotDrained(self, self.dst_node.uuid)

      self._cds = None
      self.dest_disk_info = None
      self.dest_x509_ca = None

    elif self.op.mode == constants.EXPORT_MODE_REMOTE:
      self.dst_node = None

      # For remote exports, op.target_node carries per-disk destination info
      if len(self.op.target_node) != len(self.instance.disks):
        raise errors.OpPrereqError(("Received destination information for %s"
                                    " disks, but instance %s has %s disks") %
                                   (len(self.op.target_node),
                                    self.op.instance_name,
                                    len(self.instance.disks)),
                                   errors.ECODE_INVAL)

      cds = GetClusterDomainSecret()

      # Check X509 key name
      try:
        (key_name, hmac_digest, hmac_salt) = self.x509_key_name
      except (TypeError, ValueError) as err:
        raise errors.OpPrereqError("Invalid data for X509 key name: %s" % err,
                                   errors.ECODE_INVAL)

      if not utils.VerifySha1Hmac(cds, key_name, hmac_digest, salt=hmac_salt):
        raise errors.OpPrereqError("HMAC for X509 key name is wrong",
                                   errors.ECODE_INVAL)

      # Load and verify CA
      try:
        (cert, _) = utils.LoadSignedX509Certificate(self.dest_x509_ca_pem, cds)
      except OpenSSL.crypto.Error as err:
        raise errors.OpPrereqError("Unable to load destination X509 CA (%s)" %
                                   (err, ), errors.ECODE_INVAL)

      (errcode, msg) = utils.VerifyX509Certificate(cert, None, None)
      if errcode is not None:
        raise errors.OpPrereqError("Invalid destination X509 CA (%s)" %
                                   (msg, ), errors.ECODE_INVAL)

      self.dest_x509_ca = cert

      # Verify target information
      disk_info = []
      for idx, disk_data in enumerate(self.op.target_node):
        try:
          (host, port, magic) = \
            masterd.instance.CheckRemoteExportDiskInfo(cds, idx, disk_data)
        except errors.GenericError as err:
          raise errors.OpPrereqError("Target info for disk %s: %s" %
                                     (idx, err), errors.ECODE_INVAL)

        disk_info.append((host, port, magic))

      assert len(disk_info) == len(self.op.target_node)
      self.dest_disk_info = disk_info

    else:
      raise errors.ProgrammerError("Unhandled export mode %r" %
                                   self.op.mode)

    # instance disk type verification
    # TODO: Implement export support for file-based disks
    for disk in self.instance.disks:
      if disk.dev_type in [constants.DT_FILE, constants.DT_SHARED_FILE]:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks", errors.ECODE_INVAL)

  def _CleanupExports(self, feedback_fn):
    """Removes exports of current instance from all other nodes.

    If an instance in a cluster with nodes A..D was exported to node C, its
    exports will be removed from the nodes A, B and D.

    """
    assert self.op.mode != constants.EXPORT_MODE_REMOTE

    node_uuids = self.cfg.GetNodeList()
    node_uuids.remove(self.dst_node.uuid)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpBackupQuery
    # substitutes an empty list with the full cluster node list.
    iname = self.instance.name
    if node_uuids:
      feedback_fn("Removing old exports for instance %s" % iname)
      exportlist = self.rpc.call_export_list(node_uuids)
      for node_uuid in exportlist:
        if exportlist[node_uuid].fail_msg:
          continue
        if iname in exportlist[node_uuid].payload:
          msg = self.rpc.call_export_remove(node_uuid, iname).fail_msg
          if msg:
            self.LogWarning("Could not remove older export for instance %s"
                            " on node %s: %s", iname,
                            self.cfg.GetNodeName(node_uuid), msg)

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    assert self.op.mode in constants.EXPORT_MODES

    src_node_uuid = self.instance.primary_node

    if self.op.shutdown:
      # shutdown the instance, but not the disks
      feedback_fn("Shutting down instance %s" % self.instance.name)
      result = self.rpc.call_instance_shutdown(src_node_uuid, self.instance,
                                               self.op.shutdown_timeout,
                                               self.op.reason)
      # TODO: Maybe ignore failures if ignore_remove_failures is set
      result.Raise("Could not shutdown instance %s on"
                   " node %s" % (self.instance.name,
                                 self.cfg.GetNodeName(src_node_uuid)))

    activate_disks = not self.instance.disks_active

    if activate_disks:
      # Activate the instance disks if we're exporting a stopped instance
      feedback_fn("Activating disks for %s" % self.instance.name)
      StartInstanceDisks(self, self.instance, None)

    try:
      helper = masterd.instance.ExportInstanceHelper(self, feedback_fn,
                                                     self.instance)

      helper.CreateSnapshots()
      try:
        # Restart a previously-running instance as soon as snapshots exist,
        # unless it's about to be removed anyway
        if (self.op.shutdown and
            self.instance.admin_state == constants.ADMINST_UP and
            not self.op.remove_instance):
          assert not activate_disks
          feedback_fn("Starting instance %s" % self.instance.name)
          result = self.rpc.call_instance_start(src_node_uuid,
                                                (self.instance, None, None),
                                                False, self.op.reason)
          msg = result.fail_msg
          if msg:
            feedback_fn("Failed to start instance: %s" % msg)
            ShutdownInstanceDisks(self, self.instance)
            raise errors.OpExecError("Could not start instance: %s" % msg)

        if self.op.mode == constants.EXPORT_MODE_LOCAL:
          (fin_resu, dresults) = helper.LocalExport(self.dst_node)
        elif self.op.mode == constants.EXPORT_MODE_REMOTE:
          connect_timeout = constants.RIE_CONNECT_TIMEOUT
          timeouts = masterd.instance.ImportExportTimeouts(connect_timeout)

          (key_name, _, _) = self.x509_key_name

          dest_ca_pem = \
            OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                            self.dest_x509_ca)

          (fin_resu, dresults) = helper.RemoteExport(self.dest_disk_info,
                                                     key_name, dest_ca_pem,
                                                     timeouts)
      finally:
        helper.Cleanup()

      # Check for backwards compatibility
      assert len(dresults) == len(self.instance.disks)
      assert compat.all(isinstance(i, bool) for i in dresults), \
             "Not all results are boolean: %r" % dresults

    finally:
      if activate_disks:
        feedback_fn("Deactivating disks for %s" % self.instance.name)
        ShutdownInstanceDisks(self, self.instance)

    if not (compat.all(dresults) and fin_resu):
      failures = []
      if not fin_resu:
        failures.append("export finalization")
      if not compat.all(dresults):
        fdsk = utils.CommaJoin(idx for (idx, dsk) in enumerate(dresults)
                               if not dsk)
        failures.append("disk export: disk(s) %s" % fdsk)

      raise errors.OpExecError("Export failed, errors in %s" %
                               utils.CommaJoin(failures))

    # At this point, the export was successful, we can cleanup/finish

    # Remove instance if requested
    if self.op.remove_instance:
      feedback_fn("Removing instance %s" % self.instance.name)
      RemoveInstance(self, feedback_fn, self.instance,
                     self.op.ignore_remove_failures)

    if self.op.mode == constants.EXPORT_MODE_LOCAL:
      self._CleanupExports(feedback_fn)

    return fin_resu, dresults


class LUBackupRemove(NoHooksLU):
  """Remove exports related to the named instance.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      # We need all nodes to be locked in order for RemoveExport to work, but
      # we don't need to lock the instance itself, as nothing will happen to it
      # (and we can remove exports also for a removed instance)
      locking.LEVEL_NODE: locking.ALL_SET,

      # Removing backups is quick, so blocking allocations is justified
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }

    # Allocations should be stopped while this LU runs with node locks, but it
    # doesn't have to be exclusive
    self.share_locks[locking.LEVEL_NODE_ALLOC] = 1

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    (_, inst_name) = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = not inst_name
    if fqdn_warn:
      inst_name = self.op.instance_name

    locked_nodes = self.owned_locks(locking.LEVEL_NODE)
    exportlist = self.rpc.call_export_list(locked_nodes)
    found = False
    for node_uuid in exportlist:
      node_result = exportlist[node_uuid]
      if node_result.fail_msg:
        # Could not list exports on this node; warn and keep going
        self.LogWarning("Failed to query node %s (continuing): %s",
                        self.cfg.GetNodeName(node_uuid), node_result.fail_msg)
        continue
      if inst_name not in node_result.payload:
        continue
      found = True
      remove_msg = self.rpc.call_export_remove(node_uuid, inst_name).fail_msg
      if remove_msg:
        logging.error("Could not remove export for instance %s"
                      " on node %s: %s", inst_name,
                      self.cfg.GetNodeName(node_uuid), remove_msg)

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")