Statistics
| Branch: | Tag: | Revision:

root / lib / masterd / instance.py @ b82d7182

History | View | Annotate | Download (46.4 kB)

1
#
2
#
3

    
4
# Copyright (C) 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Instance-related functions and classes for masterd.
23

24
"""
25

    
26
import logging
27
import time
28
import OpenSSL
29

    
30
from ganeti import constants
31
from ganeti import errors
32
from ganeti import compat
33
from ganeti import utils
34
from ganeti import objects
35
from ganeti import netutils
36
from ganeti import pathutils
37

    
38

    
39
class _ImportExportError(Exception):
40
  """Local exception to report import/export errors.
41

42
  """
43

    
44

    
45
class ImportExportTimeouts(object):
46
  #: Time until daemon starts writing status file
47
  DEFAULT_READY_TIMEOUT = 10
48

    
49
  #: Length of time until errors cause hard failure
50
  DEFAULT_ERROR_TIMEOUT = 10
51

    
52
  #: Time after which daemon must be listening
53
  DEFAULT_LISTEN_TIMEOUT = 10
54

    
55
  #: Progress update interval
56
  DEFAULT_PROGRESS_INTERVAL = 60
57

    
58
  __slots__ = [
59
    "error",
60
    "ready",
61
    "listen",
62
    "connect",
63
    "progress",
64
    ]
65

    
66
  def __init__(self, connect,
67
               listen=DEFAULT_LISTEN_TIMEOUT,
68
               error=DEFAULT_ERROR_TIMEOUT,
69
               ready=DEFAULT_READY_TIMEOUT,
70
               progress=DEFAULT_PROGRESS_INTERVAL):
71
    """Initializes this class.
72

73
    @type connect: number
74
    @param connect: Timeout for establishing connection
75
    @type listen: number
76
    @param listen: Timeout for starting to listen for connections
77
    @type error: number
78
    @param error: Length of time until errors cause hard failure
79
    @type ready: number
80
    @param ready: Timeout for daemon to become ready
81
    @type progress: number
82
    @param progress: Progress update interval
83

84
    """
85
    self.error = error
86
    self.ready = ready
87
    self.listen = listen
88
    self.connect = connect
89
    self.progress = progress
90

    
91

    
92
class ImportExportCbBase(object):
93
  """Callbacks for disk import/export.
94

95
  """
96
  def ReportListening(self, ie, private, component):
97
    """Called when daemon started listening.
98

99
    @type ie: Subclass of L{_DiskImportExportBase}
100
    @param ie: Import/export object
101
    @param private: Private data passed to import/export object
102
    @param component: transfer component name
103

104
    """
105

    
106
  def ReportConnected(self, ie, private):
107
    """Called when a connection has been established.
108

109
    @type ie: Subclass of L{_DiskImportExportBase}
110
    @param ie: Import/export object
111
    @param private: Private data passed to import/export object
112

113
    """
114

    
115
  def ReportProgress(self, ie, private):
116
    """Called when new progress information should be reported.
117

118
    @type ie: Subclass of L{_DiskImportExportBase}
119
    @param ie: Import/export object
120
    @param private: Private data passed to import/export object
121

122
    """
123

    
124
  def ReportFinished(self, ie, private):
125
    """Called when a transfer has finished.
126

127
    @type ie: Subclass of L{_DiskImportExportBase}
128
    @param ie: Import/export object
129
    @param private: Private data passed to import/export object
130

131
    """
132

    
133

    
134
class _DiskImportExportBase(object):
135
  MODE_TEXT = None
136

    
137
  def __init__(self, lu, node_uuid, opts,
138
               instance, component, timeouts, cbs, private=None):
139
    """Initializes this class.
140

141
    @param lu: Logical unit instance
142
    @type node_uuid: string
143
    @param node_uuid: Node UUID for import
144
    @type opts: L{objects.ImportExportOptions}
145
    @param opts: Import/export daemon options
146
    @type instance: L{objects.Instance}
147
    @param instance: Instance object
148
    @type component: string
149
    @param component: which part of the instance is being imported
150
    @type timeouts: L{ImportExportTimeouts}
151
    @param timeouts: Timeouts for this import
152
    @type cbs: L{ImportExportCbBase}
153
    @param cbs: Callbacks
154
    @param private: Private data for callback functions
155

156
    """
157
    assert self.MODE_TEXT
158

    
159
    self._lu = lu
160
    self.node_uuid = node_uuid
161
    self.node_name = lu.cfg.GetNodeName(node_uuid)
162
    self._opts = opts.Copy()
163
    self._instance = instance
164
    self._component = component
165
    self._timeouts = timeouts
166
    self._cbs = cbs
167
    self._private = private
168

    
169
    # Set master daemon's timeout in options for import/export daemon
170
    assert self._opts.connect_timeout is None
171
    self._opts.connect_timeout = timeouts.connect
172

    
173
    # Parent loop
174
    self._loop = None
175

    
176
    # Timestamps
177
    self._ts_begin = None
178
    self._ts_connected = None
179
    self._ts_finished = None
180
    self._ts_cleanup = None
181
    self._ts_last_progress = None
182
    self._ts_last_error = None
183

    
184
    # Transfer status
185
    self.success = None
186
    self.final_message = None
187

    
188
    # Daemon status
189
    self._daemon_name = None
190
    self._daemon = None
191

    
192
  @property
193
  def recent_output(self):
194
    """Returns the most recent output from the daemon.
195

196
    """
197
    if self._daemon:
198
      return "\n".join(self._daemon.recent_output)
199

    
200
    return None
201

    
202
  @property
203
  def progress(self):
204
    """Returns transfer progress information.
205

206
    """
207
    if not self._daemon:
208
      return None
209

    
210
    return (self._daemon.progress_mbytes,
211
            self._daemon.progress_throughput,
212
            self._daemon.progress_percent,
213
            self._daemon.progress_eta)
214

    
215
  @property
216
  def magic(self):
217
    """Returns the magic value for this import/export.
218

219
    """
220
    return self._opts.magic
221

    
222
  @property
223
  def active(self):
224
    """Determines whether this transport is still active.
225

226
    """
227
    return self.success is None
228

    
229
  @property
230
  def loop(self):
231
    """Returns parent loop.
232

233
    @rtype: L{ImportExportLoop}
234

235
    """
236
    return self._loop
237

    
238
  def SetLoop(self, loop):
239
    """Sets the parent loop.
240

241
    @type loop: L{ImportExportLoop}
242

243
    """
244
    if self._loop:
245
      raise errors.ProgrammerError("Loop can only be set once")
246

    
247
    self._loop = loop
248

    
249
  def _StartDaemon(self):
250
    """Starts the import/export daemon.
251

252
    """
253
    raise NotImplementedError()
254

    
255
  def CheckDaemon(self):
256
    """Checks whether daemon has been started and if not, starts it.
257

258
    @rtype: string
259
    @return: Daemon name
260

261
    """
262
    assert self._ts_cleanup is None
263

    
264
    if self._daemon_name is None:
265
      assert self._ts_begin is None
266

    
267
      result = self._StartDaemon()
268
      if result.fail_msg:
269
        raise _ImportExportError("Failed to start %s on %s: %s" %
270
                                 (self.MODE_TEXT, self.node_name,
271
                                  result.fail_msg))
272

    
273
      daemon_name = result.payload
274

    
275
      logging.info("Started %s '%s' on %s", self.MODE_TEXT, daemon_name,
276
                   self.node_name)
277

    
278
      self._ts_begin = time.time()
279
      self._daemon_name = daemon_name
280

    
281
    return self._daemon_name
282

    
283
  def GetDaemonName(self):
284
    """Returns the daemon name.
285

286
    """
287
    assert self._daemon_name, "Daemon has not been started"
288
    assert self._ts_cleanup is None
289
    return self._daemon_name
290

    
291
  def Abort(self):
292
    """Sends SIGTERM to import/export daemon (if still active).
293

294
    """
295
    if self._daemon_name:
296
      self._lu.LogWarning("Aborting %s '%s' on %s",
297
                          self.MODE_TEXT, self._daemon_name, self.node_uuid)
298
      result = self._lu.rpc.call_impexp_abort(self.node_uuid, self._daemon_name)
299
      if result.fail_msg:
300
        self._lu.LogWarning("Failed to abort %s '%s' on %s: %s",
301
                            self.MODE_TEXT, self._daemon_name,
302
                            self.node_uuid, result.fail_msg)
303
        return False
304

    
305
    return True
306

    
307
  def _SetDaemonData(self, data):
308
    """Internal function for updating status daemon data.
309

310
    @type data: L{objects.ImportExportStatus}
311
    @param data: Daemon status data
312

313
    """
314
    assert self._ts_begin is not None
315

    
316
    if not data:
317
      if utils.TimeoutExpired(self._ts_begin, self._timeouts.ready):
318
        raise _ImportExportError("Didn't become ready after %s seconds" %
319
                                 self._timeouts.ready)
320

    
321
      return False
322

    
323
    self._daemon = data
324

    
325
    return True
326

    
327
  def SetDaemonData(self, success, data):
328
    """Updates daemon status data.
329

330
    @type success: bool
331
    @param success: Whether fetching data was successful or not
332
    @type data: L{objects.ImportExportStatus}
333
    @param data: Daemon status data
334

335
    """
336
    if not success:
337
      if self._ts_last_error is None:
338
        self._ts_last_error = time.time()
339

    
340
      elif utils.TimeoutExpired(self._ts_last_error, self._timeouts.error):
341
        raise _ImportExportError("Too many errors while updating data")
342

    
343
      return False
344

    
345
    self._ts_last_error = None
346

    
347
    return self._SetDaemonData(data)
348

    
349
  def CheckListening(self):
350
    """Checks whether the daemon is listening.
351

352
    """
353
    raise NotImplementedError()
354

    
355
  def _GetConnectedCheckEpoch(self):
356
    """Returns timeout to calculate connect timeout.
357

358
    """
359
    raise NotImplementedError()
360

    
361
  def CheckConnected(self):
362
    """Checks whether the daemon is connected.
363

364
    @rtype: bool
365
    @return: Whether the daemon is connected
366

367
    """
368
    assert self._daemon, "Daemon status missing"
369

    
370
    if self._ts_connected is not None:
371
      return True
372

    
373
    if self._daemon.connected:
374
      self._ts_connected = time.time()
375

    
376
      # TODO: Log remote peer
377
      logging.debug("%s '%s' on %s is now connected",
378
                    self.MODE_TEXT, self._daemon_name, self.node_uuid)
379

    
380
      self._cbs.ReportConnected(self, self._private)
381

    
382
      return True
383

    
384
    if utils.TimeoutExpired(self._GetConnectedCheckEpoch(),
385
                            self._timeouts.connect):
386
      raise _ImportExportError("Not connected after %s seconds" %
387
                               self._timeouts.connect)
388

    
389
    return False
390

    
391
  def _CheckProgress(self):
392
    """Checks whether a progress update should be reported.
393

394
    """
395
    if ((self._ts_last_progress is None or
396
        utils.TimeoutExpired(self._ts_last_progress,
397
                             self._timeouts.progress)) and
398
        self._daemon and
399
        self._daemon.progress_mbytes is not None and
400
        self._daemon.progress_throughput is not None):
401
      self._cbs.ReportProgress(self, self._private)
402
      self._ts_last_progress = time.time()
403

    
404
  def CheckFinished(self):
405
    """Checks whether the daemon exited.
406

407
    @rtype: bool
408
    @return: Whether the transfer is finished
409

410
    """
411
    assert self._daemon, "Daemon status missing"
412

    
413
    if self._ts_finished:
414
      return True
415

    
416
    if self._daemon.exit_status is None:
417
      # TODO: Adjust delay for ETA expiring soon
418
      self._CheckProgress()
419
      return False
420

    
421
    self._ts_finished = time.time()
422

    
423
    self._ReportFinished(self._daemon.exit_status == 0,
424
                         self._daemon.error_message)
425

    
426
    return True
427

    
428
  def _ReportFinished(self, success, message):
429
    """Transfer is finished or daemon exited.
430

431
    @type success: bool
432
    @param success: Whether the transfer was successful
433
    @type message: string
434
    @param message: Error message
435

436
    """
437
    assert self.success is None
438

    
439
    self.success = success
440
    self.final_message = message
441

    
442
    if success:
443
      logging.info("%s '%s' on %s succeeded", self.MODE_TEXT,
444
                   self._daemon_name, self.node_uuid)
445
    elif self._daemon_name:
446
      self._lu.LogWarning("%s '%s' on %s failed: %s",
447
                          self.MODE_TEXT, self._daemon_name,
448
                          self._lu.cfg.GetNodeName(self.node_uuid),
449
                          message)
450
    else:
451
      self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
452
                          self._lu.cfg.GetNodeName(self.node_uuid), message)
453

    
454
    self._cbs.ReportFinished(self, self._private)
455

    
456
  def _Finalize(self):
457
    """Makes the RPC call to finalize this import/export.
458

459
    """
460
    return self._lu.rpc.call_impexp_cleanup(self.node_uuid, self._daemon_name)
461

    
462
  def Finalize(self, error=None):
463
    """Finalizes this import/export.
464

465
    """
466
    if self._daemon_name:
467
      logging.info("Finalizing %s '%s' on %s",
468
                   self.MODE_TEXT, self._daemon_name, self.node_uuid)
469

    
470
      result = self._Finalize()
471
      if result.fail_msg:
472
        self._lu.LogWarning("Failed to finalize %s '%s' on %s: %s",
473
                            self.MODE_TEXT, self._daemon_name,
474
                            self.node_uuid, result.fail_msg)
475
        return False
476

    
477
      # Daemon is no longer running
478
      self._daemon_name = None
479
      self._ts_cleanup = time.time()
480

    
481
    if error:
482
      self._ReportFinished(False, error)
483

    
484
    return True
485

    
486

    
487
class DiskImport(_DiskImportExportBase):
488
  MODE_TEXT = "import"
489

    
490
  def __init__(self, lu, node_uuid, opts, instance, component,
491
               dest, dest_args, timeouts, cbs, private=None):
492
    """Initializes this class.
493

494
    @param lu: Logical unit instance
495
    @type node_uuid: string
496
    @param node_uuid: Node name for import
497
    @type opts: L{objects.ImportExportOptions}
498
    @param opts: Import/export daemon options
499
    @type instance: L{objects.Instance}
500
    @param instance: Instance object
501
    @type component: string
502
    @param component: which part of the instance is being imported
503
    @param dest: I/O destination
504
    @param dest_args: I/O arguments
505
    @type timeouts: L{ImportExportTimeouts}
506
    @param timeouts: Timeouts for this import
507
    @type cbs: L{ImportExportCbBase}
508
    @param cbs: Callbacks
509
    @param private: Private data for callback functions
510

511
    """
512
    _DiskImportExportBase.__init__(self, lu, node_uuid, opts, instance,
513
                                   component, timeouts, cbs, private)
514
    self._dest = dest
515
    self._dest_args = dest_args
516

    
517
    # Timestamps
518
    self._ts_listening = None
519

    
520
  @property
521
  def listen_port(self):
522
    """Returns the port the daemon is listening on.
523

524
    """
525
    if self._daemon:
526
      return self._daemon.listen_port
527

    
528
    return None
529

    
530
  def _StartDaemon(self):
531
    """Starts the import daemon.
532

533
    """
534
    return self._lu.rpc.call_import_start(self.node_uuid, self._opts,
535
                                          self._instance, self._component,
536
                                          (self._dest, self._dest_args))
537

    
538
  def CheckListening(self):
539
    """Checks whether the daemon is listening.
540

541
    @rtype: bool
542
    @return: Whether the daemon is listening
543

544
    """
545
    assert self._daemon, "Daemon status missing"
546

    
547
    if self._ts_listening is not None:
548
      return True
549

    
550
    port = self._daemon.listen_port
551
    if port is not None:
552
      self._ts_listening = time.time()
553

    
554
      logging.debug("Import '%s' on %s is now listening on port %s",
555
                    self._daemon_name, self.node_uuid, port)
556

    
557
      self._cbs.ReportListening(self, self._private, self._component)
558

    
559
      return True
560

    
561
    if utils.TimeoutExpired(self._ts_begin, self._timeouts.listen):
562
      raise _ImportExportError("Not listening after %s seconds" %
563
                               self._timeouts.listen)
564

    
565
    return False
566

    
567
  def _GetConnectedCheckEpoch(self):
568
    """Returns the time since we started listening.
569

570
    """
571
    assert self._ts_listening is not None, \
572
           ("Checking whether an import is connected is only useful"
573
            " once it's been listening")
574

    
575
    return self._ts_listening
576

    
577

    
578
class DiskExport(_DiskImportExportBase):
579
  MODE_TEXT = "export"
580

    
581
  def __init__(self, lu, node_uuid, opts, dest_host, dest_port,
582
               instance, component, source, source_args,
583
               timeouts, cbs, private=None):
584
    """Initializes this class.
585

586
    @param lu: Logical unit instance
587
    @type node_uuid: string
588
    @param node_uuid: Node UUID for import
589
    @type opts: L{objects.ImportExportOptions}
590
    @param opts: Import/export daemon options
591
    @type dest_host: string
592
    @param dest_host: Destination host name or IP address
593
    @type dest_port: number
594
    @param dest_port: Destination port number
595
    @type instance: L{objects.Instance}
596
    @param instance: Instance object
597
    @type component: string
598
    @param component: which part of the instance is being imported
599
    @param source: I/O source
600
    @param source_args: I/O source
601
    @type timeouts: L{ImportExportTimeouts}
602
    @param timeouts: Timeouts for this import
603
    @type cbs: L{ImportExportCbBase}
604
    @param cbs: Callbacks
605
    @param private: Private data for callback functions
606

607
    """
608
    _DiskImportExportBase.__init__(self, lu, node_uuid, opts, instance,
609
                                   component, timeouts, cbs, private)
610
    self._dest_host = dest_host
611
    self._dest_port = dest_port
612
    self._source = source
613
    self._source_args = source_args
614

    
615
  def _StartDaemon(self):
616
    """Starts the export daemon.
617

618
    """
619
    return self._lu.rpc.call_export_start(self.node_uuid, self._opts,
620
                                          self._dest_host, self._dest_port,
621
                                          self._instance, self._component,
622
                                          (self._source, self._source_args))
623

    
624
  def CheckListening(self):
625
    """Checks whether the daemon is listening.
626

627
    """
628
    # Only an import can be listening
629
    return True
630

    
631
  def _GetConnectedCheckEpoch(self):
632
    """Returns the time since the daemon started.
633

634
    """
635
    assert self._ts_begin is not None
636

    
637
    return self._ts_begin
638

    
639

    
640
def FormatProgress(progress):
641
  """Formats progress information for user consumption
642

643
  """
644
  (mbytes, throughput, percent, eta) = progress
645

    
646
  parts = [
647
    utils.FormatUnit(mbytes, "h"),
648

    
649
    # Not using FormatUnit as it doesn't support kilobytes
650
    "%0.1f MiB/s" % throughput,
651
    ]
652

    
653
  if percent is not None:
654
    parts.append("%d%%" % percent)
655

    
656
  if eta is not None:
657
    parts.append("ETA %s" % utils.FormatSeconds(eta))
658

    
659
  return utils.CommaJoin(parts)
660

    
661

    
662
class ImportExportLoop:
663
  MIN_DELAY = 1.0
664
  MAX_DELAY = 20.0
665

    
666
  def __init__(self, lu):
667
    """Initializes this class.
668

669
    """
670
    self._lu = lu
671
    self._queue = []
672
    self._pending_add = []
673

    
674
  def Add(self, diskie):
675
    """Adds an import/export object to the loop.
676

677
    @type diskie: Subclass of L{_DiskImportExportBase}
678
    @param diskie: Import/export object
679

680
    """
681
    assert diskie not in self._pending_add
682
    assert diskie.loop is None
683

    
684
    diskie.SetLoop(self)
685

    
686
    # Adding new objects to a staging list is necessary, otherwise the main
687
    # loop gets confused if callbacks modify the queue while the main loop is
688
    # iterating over it.
689
    self._pending_add.append(diskie)
690

    
691
  @staticmethod
692
  def _CollectDaemonStatus(lu, daemons):
693
    """Collects the status for all import/export daemons.
694

695
    """
696
    daemon_status = {}
697

    
698
    for node_name, names in daemons.iteritems():
699
      result = lu.rpc.call_impexp_status(node_name, names)
700
      if result.fail_msg:
701
        lu.LogWarning("Failed to get daemon status on %s: %s",
702
                      node_name, result.fail_msg)
703
        continue
704

    
705
      assert len(names) == len(result.payload)
706

    
707
      daemon_status[node_name] = dict(zip(names, result.payload))
708

    
709
    return daemon_status
710

    
711
  @staticmethod
712
  def _GetActiveDaemonNames(queue):
713
    """Gets the names of all active daemons.
714

715
    """
716
    result = {}
717
    for diskie in queue:
718
      if not diskie.active:
719
        continue
720

    
721
      try:
722
        # Start daemon if necessary
723
        daemon_name = diskie.CheckDaemon()
724
      except _ImportExportError, err:
725
        logging.exception("%s failed", diskie.MODE_TEXT)
726
        diskie.Finalize(error=str(err))
727
        continue
728

    
729
      result.setdefault(diskie.node_name, []).append(daemon_name)
730

    
731
    assert len(queue) >= len(result)
732
    assert len(queue) >= sum([len(names) for names in result.itervalues()])
733

    
734
    logging.debug("daemons=%r", result)
735

    
736
    return result
737

    
738
  def _AddPendingToQueue(self):
739
    """Adds all pending import/export objects to the internal queue.
740

741
    """
742
    assert compat.all(diskie not in self._queue and diskie.loop == self
743
                      for diskie in self._pending_add)
744

    
745
    self._queue.extend(self._pending_add)
746

    
747
    del self._pending_add[:]
748

    
749
  def Run(self):
750
    """Utility main loop.
751

752
    """
753
    while True:
754
      self._AddPendingToQueue()
755

    
756
      # Collect all active daemon names
757
      daemons = self._GetActiveDaemonNames(self._queue)
758
      if not daemons:
759
        break
760

    
761
      # Collection daemon status data
762
      data = self._CollectDaemonStatus(self._lu, daemons)
763

    
764
      # Use data
765
      delay = self.MAX_DELAY
766
      for diskie in self._queue:
767
        if not diskie.active:
768
          continue
769

    
770
        try:
771
          try:
772
            all_daemon_data = data[diskie.node_name]
773
          except KeyError:
774
            result = diskie.SetDaemonData(False, None)
775
          else:
776
            result = \
777
              diskie.SetDaemonData(True,
778
                                   all_daemon_data[diskie.GetDaemonName()])
779

    
780
          if not result:
781
            # Daemon not yet ready, retry soon
782
            delay = min(3.0, delay)
783
            continue
784

    
785
          if diskie.CheckFinished():
786
            # Transfer finished
787
            diskie.Finalize()
788
            continue
789

    
790
          # Normal case: check again in 5 seconds
791
          delay = min(5.0, delay)
792

    
793
          if not diskie.CheckListening():
794
            # Not yet listening, retry soon
795
            delay = min(1.0, delay)
796
            continue
797

    
798
          if not diskie.CheckConnected():
799
            # Not yet connected, retry soon
800
            delay = min(1.0, delay)
801
            continue
802

    
803
        except _ImportExportError, err:
804
          logging.exception("%s failed", diskie.MODE_TEXT)
805
          diskie.Finalize(error=str(err))
806

    
807
      if not compat.any(diskie.active for diskie in self._queue):
808
        break
809

    
810
      # Wait a bit
811
      delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
812
      logging.debug("Waiting for %ss", delay)
813
      time.sleep(delay)
814

    
815
  def FinalizeAll(self):
816
    """Finalizes all pending transfers.
817

818
    """
819
    success = True
820

    
821
    for diskie in self._queue:
822
      success = diskie.Finalize() and success
823

    
824
    return success
825

    
826

    
827
class _TransferInstCbBase(ImportExportCbBase):
828
  def __init__(self, lu, feedback_fn, instance, timeouts, src_node_uuid,
829
               src_cbs, dest_node_uuid, dest_ip):
830
    """Initializes this class.
831

832
    """
833
    ImportExportCbBase.__init__(self)
834

    
835
    self.lu = lu
836
    self.feedback_fn = feedback_fn
837
    self.instance = instance
838
    self.timeouts = timeouts
839
    self.src_node_uuid = src_node_uuid
840
    self.src_cbs = src_cbs
841
    self.dest_node_uuid = dest_node_uuid
842
    self.dest_ip = dest_ip
843

    
844

    
845
class _TransferInstSourceCb(_TransferInstCbBase):
846
  def ReportConnected(self, ie, dtp):
847
    """Called when a connection has been established.
848

849
    """
850
    assert self.src_cbs is None
851
    assert dtp.src_export == ie
852
    assert dtp.dest_import
853

    
854
    self.feedback_fn("%s is sending data on %s" %
855
                     (dtp.data.name, ie.node_name))
856

    
857
  def ReportProgress(self, ie, dtp):
858
    """Called when new progress information should be reported.
859

860
    """
861
    progress = ie.progress
862
    if not progress:
863
      return
864

    
865
    self.feedback_fn("%s sent %s" % (dtp.data.name, FormatProgress(progress)))
866

    
867
  def ReportFinished(self, ie, dtp):
868
    """Called when a transfer has finished.
869

870
    """
871
    assert self.src_cbs is None
872
    assert dtp.src_export == ie
873
    assert dtp.dest_import
874

    
875
    if ie.success:
876
      self.feedback_fn("%s finished sending data" % dtp.data.name)
877
    else:
878
      self.feedback_fn("%s failed to send data: %s (recent output: %s)" %
879
                       (dtp.data.name, ie.final_message, ie.recent_output))
880

    
881
    dtp.RecordResult(ie.success)
882

    
883
    cb = dtp.data.finished_fn
884
    if cb:
885
      cb()
886

    
887
    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
888
    # give the daemon a moment to sort things out
889
    if dtp.dest_import and not ie.success:
890
      dtp.dest_import.Abort()
891

    
892

    
893
class _TransferInstDestCb(_TransferInstCbBase):
894
  def ReportListening(self, ie, dtp, component):
895
    """Called when daemon started listening.
896

897
    """
898
    assert self.src_cbs
899
    assert dtp.src_export is None
900
    assert dtp.dest_import
901
    assert dtp.export_opts
902

    
903
    self.feedback_fn("%s is now listening, starting export" % dtp.data.name)
904

    
905
    # Start export on source node
906
    de = DiskExport(self.lu, self.src_node_uuid, dtp.export_opts,
907
                    self.dest_ip, ie.listen_port, self.instance,
908
                    component, dtp.data.src_io, dtp.data.src_ioargs,
909
                    self.timeouts, self.src_cbs, private=dtp)
910
    ie.loop.Add(de)
911

    
912
    dtp.src_export = de
913

    
914
  def ReportConnected(self, ie, dtp):
915
    """Called when a connection has been established.
916

917
    """
918
    self.feedback_fn("%s is receiving data on %s" %
919
                     (dtp.data.name,
920
                      self.lu.cfg.GetNodeName(self.dest_node_uuid)))
921

    
922
  def ReportFinished(self, ie, dtp):
923
    """Called when a transfer has finished.
924

925
    """
926
    if ie.success:
927
      self.feedback_fn("%s finished receiving data" % dtp.data.name)
928
    else:
929
      self.feedback_fn("%s failed to receive data: %s (recent output: %s)" %
930
                       (dtp.data.name, ie.final_message, ie.recent_output))
931

    
932
    dtp.RecordResult(ie.success)
933

    
934
    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
935
    # give the daemon a moment to sort things out
936
    if dtp.src_export and not ie.success:
937
      dtp.src_export.Abort()
938

    
939

    
940
class DiskTransfer(object):
941
  def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
942
               finished_fn):
943
    """Initializes this class.
944

945
    @type name: string
946
    @param name: User-visible name for this transfer (e.g. "disk/0")
947
    @param src_io: Source I/O type
948
    @param src_ioargs: Source I/O arguments
949
    @param dest_io: Destination I/O type
950
    @param dest_ioargs: Destination I/O arguments
951
    @type finished_fn: callable
952
    @param finished_fn: Function called once transfer has finished
953

954
    """
955
    self.name = name
956

    
957
    self.src_io = src_io
958
    self.src_ioargs = src_ioargs
959

    
960
    self.dest_io = dest_io
961
    self.dest_ioargs = dest_ioargs
962

    
963
    self.finished_fn = finished_fn
964

    
965

    
966
class _DiskTransferPrivate(object):
967
  def __init__(self, data, success, export_opts):
968
    """Initializes this class.
969

970
    @type data: L{DiskTransfer}
971
    @type success: bool
972

973
    """
974
    self.data = data
975
    self.success = success
976
    self.export_opts = export_opts
977

    
978
    self.src_export = None
979
    self.dest_import = None
980

    
981
  def RecordResult(self, success):
982
    """Updates the status.
983

984
    One failed part will cause the whole transfer to fail.
985

986
    """
987
    self.success = self.success and success
988

    
989

    
990
def _GetInstDiskMagic(base, instance_name, index):
991
  """Computes the magic value for a disk export or import.
992

993
  @type base: string
994
  @param base: Random seed value (can be the same for all disks of a transfer)
995
  @type instance_name: string
996
  @param instance_name: Name of instance
997
  @type index: number
998
  @param index: Disk index
999

1000
  """
1001
  h = compat.sha1_hash()
1002
  h.update(str(constants.RIE_VERSION))
1003
  h.update(base)
1004
  h.update(instance_name)
1005
  h.update(str(index))
1006
  return h.hexdigest()
1007

    
1008

    
1009
def TransferInstanceData(lu, feedback_fn, src_node_uuid, dest_node_uuid,
1010
                         dest_ip, instance, all_transfers):
1011
  """Transfers an instance's data from one node to another.
1012

1013
  @param lu: Logical unit instance
1014
  @param feedback_fn: Feedback function
1015
  @type src_node_uuid: string
1016
  @param src_node_uuid: Source node UUID
1017
  @type dest_node_uuid: string
1018
  @param dest_node_uuid: Destination node UUID
1019
  @type dest_ip: string
1020
  @param dest_ip: IP address of destination node
1021
  @type instance: L{objects.Instance}
1022
  @param instance: Instance object
1023
  @type all_transfers: list of L{DiskTransfer} instances
1024
  @param all_transfers: List of all disk transfers to be made
1025
  @rtype: list
1026
  @return: List with a boolean (True=successful, False=failed) for success for
1027
           each transfer
1028

1029
  """
1030
  # Disable compression for all moves as these are all within the same cluster
1031
  compress = constants.IEC_NONE
1032

    
1033
  src_node_name = lu.cfg.GetNodeName(src_node_uuid)
1034
  dest_node_name = lu.cfg.GetNodeName(dest_node_uuid)
1035

    
1036
  logging.debug("Source node %s, destination node %s, compression '%s'",
1037
                src_node_name, dest_node_name, compress)
1038

    
1039
  timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
1040
  src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
1041
                                  src_node_uuid, None, dest_node_uuid, dest_ip)
1042
  dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
1043
                                 src_node_uuid, src_cbs, dest_node_uuid,
1044
                                 dest_ip)
1045

    
1046
  all_dtp = []
1047

    
1048
  base_magic = utils.GenerateSecret(6)
1049

    
1050
  ieloop = ImportExportLoop(lu)
1051
  try:
1052
    for idx, transfer in enumerate(all_transfers):
1053
      if transfer:
1054
        feedback_fn("Exporting %s from %s to %s" %
1055
                    (transfer.name, src_node_name, dest_node_name))
1056

    
1057
        magic = _GetInstDiskMagic(base_magic, instance.name, idx)
1058
        opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
1059
                                           compress=compress, magic=magic)
1060

    
1061
        dtp = _DiskTransferPrivate(transfer, True, opts)
1062

    
1063
        di = DiskImport(lu, dest_node_uuid, opts, instance, "disk%d" % idx,
1064
                        transfer.dest_io, transfer.dest_ioargs,
1065
                        timeouts, dest_cbs, private=dtp)
1066
        ieloop.Add(di)
1067

    
1068
        dtp.dest_import = di
1069
      else:
1070
        dtp = _DiskTransferPrivate(None, False, None)
1071

    
1072
      all_dtp.append(dtp)
1073

    
1074
    ieloop.Run()
1075
  finally:
1076
    ieloop.FinalizeAll()
1077

    
1078
  assert len(all_dtp) == len(all_transfers)
1079
  assert compat.all((dtp.src_export is None or
1080
                      dtp.src_export.success is not None) and
1081
                     (dtp.dest_import is None or
1082
                      dtp.dest_import.success is not None)
1083
                     for dtp in all_dtp), \
1084
         "Not all imports/exports are finalized"
1085

    
1086
  return [bool(dtp.success) for dtp in all_dtp]
1087

    
1088

    
1089
class _RemoteExportCb(ImportExportCbBase):
1090
  def __init__(self, feedback_fn, disk_count):
1091
    """Initializes this class.
1092

1093
    """
1094
    ImportExportCbBase.__init__(self)
1095
    self._feedback_fn = feedback_fn
1096
    self._dresults = [None] * disk_count
1097

    
1098
  @property
1099
  def disk_results(self):
1100
    """Returns per-disk results.
1101

1102
    """
1103
    return self._dresults
1104

    
1105
  def ReportConnected(self, ie, private):
1106
    """Called when a connection has been established.
1107

1108
    """
1109
    (idx, _) = private
1110

    
1111
    self._feedback_fn("Disk %s is now sending data" % idx)
1112

    
1113
  def ReportProgress(self, ie, private):
1114
    """Called when new progress information should be reported.
1115

1116
    """
1117
    (idx, _) = private
1118

    
1119
    progress = ie.progress
1120
    if not progress:
1121
      return
1122

    
1123
    self._feedback_fn("Disk %s sent %s" % (idx, FormatProgress(progress)))
1124

    
1125
  def ReportFinished(self, ie, private):
1126
    """Called when a transfer has finished.
1127

1128
    """
1129
    (idx, finished_fn) = private
1130

    
1131
    if ie.success:
1132
      self._feedback_fn("Disk %s finished sending data" % idx)
1133
    else:
1134
      self._feedback_fn("Disk %s failed to send data: %s (recent output: %s)" %
1135
                        (idx, ie.final_message, ie.recent_output))
1136

    
1137
    self._dresults[idx] = bool(ie.success)
1138

    
1139
    if finished_fn:
1140
      finished_fn()
1141

    
1142

    
1143
class ExportInstanceHelper:
1144
  def __init__(self, lu, feedback_fn, instance):
1145
    """Initializes this class.
1146

1147
    @param lu: Logical unit instance
1148
    @param feedback_fn: Feedback function
1149
    @type instance: L{objects.Instance}
1150
    @param instance: Instance object
1151

1152
    """
1153
    self._lu = lu
1154
    self._feedback_fn = feedback_fn
1155
    self._instance = instance
1156

    
1157
    self._snap_disks = []
1158
    self._removed_snaps = [False] * len(instance.disks)
1159

    
1160
  def CreateSnapshots(self):
1161
    """Creates an LVM snapshot for every disk of the instance.
1162

1163
    """
1164
    assert not self._snap_disks
1165

    
1166
    instance = self._instance
1167
    src_node = instance.primary_node
1168

    
1169
    for idx, disk in enumerate(instance.disks):
1170
      self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
1171
                        (idx, src_node))
1172

    
1173
      # result.payload will be a snapshot of an lvm leaf of the one we
1174
      # passed
1175
      result = self._lu.rpc.call_blockdev_snapshot(src_node, (disk, instance))
1176
      new_dev = False
1177
      msg = result.fail_msg
1178
      if msg:
1179
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
1180
                            idx, src_node, msg)
1181
      elif (not isinstance(result.payload, (tuple, list)) or
1182
            len(result.payload) != 2):
1183
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: invalid"
1184
                            " result '%s'", idx, src_node, result.payload)
1185
      else:
1186
        disk_id = tuple(result.payload)
1187
        disk_params = constants.DISK_LD_DEFAULTS[constants.LD_LV].copy()
1188
        new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
1189
                               logical_id=disk_id, physical_id=disk_id,
1190
                               iv_name=disk.iv_name,
1191
                               params=disk_params)
1192

    
1193
      self._snap_disks.append(new_dev)
1194

    
1195
    assert len(self._snap_disks) == len(instance.disks)
1196
    assert len(self._removed_snaps) == len(instance.disks)
1197

    
1198
  def _RemoveSnapshot(self, disk_index):
1199
    """Removes an LVM snapshot.
1200

1201
    @type disk_index: number
1202
    @param disk_index: Index of the snapshot to be removed
1203

1204
    """
1205
    disk = self._snap_disks[disk_index]
1206
    if disk and not self._removed_snaps[disk_index]:
1207
      src_node = self._instance.primary_node
1208

    
1209
      self._feedback_fn("Removing snapshot of disk/%s on node %s" %
1210
                        (disk_index, src_node))
1211

    
1212
      result = self._lu.rpc.call_blockdev_remove(src_node, disk)
1213
      if result.fail_msg:
1214
        self._lu.LogWarning("Could not remove snapshot for disk/%d from node"
1215
                            " %s: %s", disk_index, src_node, result.fail_msg)
1216
      else:
1217
        self._removed_snaps[disk_index] = True
1218

    
1219
  def LocalExport(self, dest_node):
1220
    """Intra-cluster instance export.
1221

1222
    @type dest_node: L{objects.Node}
1223
    @param dest_node: Destination node
1224

1225
    """
1226
    instance = self._instance
1227
    src_node_uuid = instance.primary_node
1228

    
1229
    assert len(self._snap_disks) == len(instance.disks)
1230

    
1231
    transfers = []
1232

    
1233
    for idx, dev in enumerate(self._snap_disks):
1234
      if not dev:
1235
        transfers.append(None)
1236
        continue
1237

    
1238
      path = utils.PathJoin(pathutils.EXPORT_DIR, "%s.new" % instance.name,
1239
                            dev.physical_id[1])
1240

    
1241
      finished_fn = compat.partial(self._TransferFinished, idx)
1242

    
1243
      # FIXME: pass debug option from opcode to backend
1244
      dt = DiskTransfer("snapshot/%s" % idx,
1245
                        constants.IEIO_SCRIPT, (dev, idx),
1246
                        constants.IEIO_FILE, (path, ),
1247
                        finished_fn)
1248
      transfers.append(dt)
1249

    
1250
    # Actually export data
1251
    dresults = TransferInstanceData(self._lu, self._feedback_fn,
1252
                                    src_node_uuid, dest_node.uuid,
1253
                                    dest_node.secondary_ip,
1254
                                    instance, transfers)
1255

    
1256
    assert len(dresults) == len(instance.disks)
1257

    
1258
    self._feedback_fn("Finalizing export on %s" % dest_node.name)
1259
    result = self._lu.rpc.call_finalize_export(dest_node.uuid, instance,
1260
                                               self._snap_disks)
1261
    msg = result.fail_msg
1262
    fin_resu = not msg
1263
    if msg:
1264
      self._lu.LogWarning("Could not finalize export for instance %s"
1265
                          " on node %s: %s", instance.name, dest_node.name, msg)
1266

    
1267
    return (fin_resu, dresults)
1268

    
1269
  def RemoteExport(self, disk_info, key_name, dest_ca_pem, timeouts):
1270
    """Inter-cluster instance export.
1271

1272
    @type disk_info: list
1273
    @param disk_info: Per-disk destination information
1274
    @type key_name: string
1275
    @param key_name: Name of X509 key to use
1276
    @type dest_ca_pem: string
1277
    @param dest_ca_pem: Destination X509 CA in PEM format
1278
    @type timeouts: L{ImportExportTimeouts}
1279
    @param timeouts: Timeouts for this import
1280

1281
    """
1282
    instance = self._instance
1283

    
1284
    assert len(disk_info) == len(instance.disks)
1285

    
1286
    cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))
1287

    
1288
    ieloop = ImportExportLoop(self._lu)
1289
    try:
1290
      for idx, (dev, (host, port, magic)) in enumerate(zip(instance.disks,
1291
                                                           disk_info)):
1292
        # Decide whether to use IPv6
1293
        ipv6 = netutils.IP6Address.IsValid(host)
1294

    
1295
        opts = objects.ImportExportOptions(key_name=key_name,
1296
                                           ca_pem=dest_ca_pem,
1297
                                           magic=magic, ipv6=ipv6)
1298

    
1299
        self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
1300
        finished_fn = compat.partial(self._TransferFinished, idx)
1301
        ieloop.Add(DiskExport(self._lu, instance.primary_node,
1302
                              opts, host, port, instance, "disk%d" % idx,
1303
                              constants.IEIO_SCRIPT, (dev, idx),
1304
                              timeouts, cbs, private=(idx, finished_fn)))
1305

    
1306
      ieloop.Run()
1307
    finally:
1308
      ieloop.FinalizeAll()
1309

    
1310
    return (True, cbs.disk_results)
1311

    
1312
  def _TransferFinished(self, idx):
1313
    """Called once a transfer has finished.
1314

1315
    @type idx: number
1316
    @param idx: Disk index
1317

1318
    """
1319
    logging.debug("Transfer %s finished", idx)
1320
    self._RemoveSnapshot(idx)
1321

    
1322
  def Cleanup(self):
1323
    """Remove all snapshots.
1324

1325
    """
1326
    assert len(self._removed_snaps) == len(self._instance.disks)
1327
    for idx in range(len(self._instance.disks)):
1328
      self._RemoveSnapshot(idx)
1329

    
1330

    
1331
class _RemoteImportCb(ImportExportCbBase):
1332
  def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
1333
               external_address):
1334
    """Initializes this class.
1335

1336
    @type cds: string
1337
    @param cds: Cluster domain secret
1338
    @type x509_cert_pem: string
1339
    @param x509_cert_pem: CA used for signing import key
1340
    @type disk_count: number
1341
    @param disk_count: Number of disks
1342
    @type external_address: string
1343
    @param external_address: External address of destination node
1344

1345
    """
1346
    ImportExportCbBase.__init__(self)
1347
    self._feedback_fn = feedback_fn
1348
    self._cds = cds
1349
    self._x509_cert_pem = x509_cert_pem
1350
    self._disk_count = disk_count
1351
    self._external_address = external_address
1352

    
1353
    self._dresults = [None] * disk_count
1354
    self._daemon_port = [None] * disk_count
1355

    
1356
    self._salt = utils.GenerateSecret(8)
1357

    
1358
  @property
1359
  def disk_results(self):
1360
    """Returns per-disk results.
1361

1362
    """
1363
    return self._dresults
1364

    
1365
  def _CheckAllListening(self):
1366
    """Checks whether all daemons are listening.
1367

1368
    If all daemons are listening, the information is sent to the client.
1369

1370
    """
1371
    if not compat.all(dp is not None for dp in self._daemon_port):
1372
      return
1373

    
1374
    host = self._external_address
1375

    
1376
    disks = []
1377
    for idx, (port, magic) in enumerate(self._daemon_port):
1378
      disks.append(ComputeRemoteImportDiskInfo(self._cds, self._salt,
1379
                                               idx, host, port, magic))
1380

    
1381
    assert len(disks) == self._disk_count
1382

    
1383
    self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
1384
      "disks": disks,
1385
      "x509_ca": self._x509_cert_pem,
1386
      })
1387

    
1388
  def ReportListening(self, ie, private, _):
1389
    """Called when daemon started listening.
1390

1391
    """
1392
    (idx, ) = private
1393

    
1394
    self._feedback_fn("Disk %s is now listening" % idx)
1395

    
1396
    assert self._daemon_port[idx] is None
1397

    
1398
    self._daemon_port[idx] = (ie.listen_port, ie.magic)
1399

    
1400
    self._CheckAllListening()
1401

    
1402
  def ReportConnected(self, ie, private):
1403
    """Called when a connection has been established.
1404

1405
    """
1406
    (idx, ) = private
1407

    
1408
    self._feedback_fn("Disk %s is now receiving data" % idx)
1409

    
1410
  def ReportFinished(self, ie, private):
1411
    """Called when a transfer has finished.
1412

1413
    """
1414
    (idx, ) = private
1415

    
1416
    # Daemon is certainly no longer listening
1417
    self._daemon_port[idx] = None
1418

    
1419
    if ie.success:
1420
      self._feedback_fn("Disk %s finished receiving data" % idx)
1421
    else:
1422
      self._feedback_fn(("Disk %s failed to receive data: %s"
1423
                         " (recent output: %s)") %
1424
                        (idx, ie.final_message, ie.recent_output))
1425

    
1426
    self._dresults[idx] = bool(ie.success)
1427

    
1428

    
1429
def RemoteImport(lu, feedback_fn, instance, pnode, source_x509_ca,
1430
                 cds, timeouts):
1431
  """Imports an instance from another cluster.
1432

1433
  @param lu: Logical unit instance
1434
  @param feedback_fn: Feedback function
1435
  @type instance: L{objects.Instance}
1436
  @param instance: Instance object
1437
  @type pnode: L{objects.Node}
1438
  @param pnode: Primary node of instance as an object
1439
  @type source_x509_ca: OpenSSL.crypto.X509
1440
  @param source_x509_ca: Import source's X509 CA
1441
  @type cds: string
1442
  @param cds: Cluster domain secret
1443
  @type timeouts: L{ImportExportTimeouts}
1444
  @param timeouts: Timeouts for this import
1445

1446
  """
1447
  source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
1448
                                                  source_x509_ca)
1449

    
1450
  magic_base = utils.GenerateSecret(6)
1451

    
1452
  # Decide whether to use IPv6
1453
  ipv6 = netutils.IP6Address.IsValid(pnode.primary_ip)
1454

    
1455
  # Create crypto key
1456
  result = lu.rpc.call_x509_cert_create(instance.primary_node,
1457
                                        constants.RIE_CERT_VALIDITY)
1458
  result.Raise("Can't create X509 key and certificate on %s" % result.node)
1459

    
1460
  (x509_key_name, x509_cert_pem) = result.payload
1461
  try:
1462
    # Load certificate
1463
    x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1464
                                                x509_cert_pem)
1465

    
1466
    # Sign certificate
1467
    signed_x509_cert_pem = \
1468
      utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))
1469

    
1470
    cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
1471
                          len(instance.disks), pnode.primary_ip)
1472

    
1473
    ieloop = ImportExportLoop(lu)
1474
    try:
1475
      for idx, dev in enumerate(instance.disks):
1476
        magic = _GetInstDiskMagic(magic_base, instance.name, idx)
1477

    
1478
        # Import daemon options
1479
        opts = objects.ImportExportOptions(key_name=x509_key_name,
1480
                                           ca_pem=source_ca_pem,
1481
                                           magic=magic, ipv6=ipv6)
1482

    
1483
        ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
1484
                              "disk%d" % idx,
1485
                              constants.IEIO_SCRIPT, (dev, idx),
1486
                              timeouts, cbs, private=(idx, )))
1487

    
1488
      ieloop.Run()
1489
    finally:
1490
      ieloop.FinalizeAll()
1491
  finally:
1492
    # Remove crypto key and certificate
1493
    result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
1494
    result.Raise("Can't remove X509 key and certificate on %s" % result.node)
1495

    
1496
  return cbs.disk_results
1497

    
1498

    
1499
def _GetImportExportHandshakeMessage(version):
1500
  """Returns the handshake message for a RIE protocol version.
1501

1502
  @type version: number
1503

1504
  """
1505
  return "%s:%s" % (version, constants.RIE_HANDSHAKE)
1506

    
1507

    
1508
def ComputeRemoteExportHandshake(cds):
1509
  """Computes the remote import/export handshake.
1510

1511
  @type cds: string
1512
  @param cds: Cluster domain secret
1513

1514
  """
1515
  salt = utils.GenerateSecret(8)
1516
  msg = _GetImportExportHandshakeMessage(constants.RIE_VERSION)
1517
  return (constants.RIE_VERSION, utils.Sha1Hmac(cds, msg, salt=salt), salt)
1518

    
1519

    
1520
def CheckRemoteExportHandshake(cds, handshake):
1521
  """Checks the handshake of a remote import/export.
1522

1523
  @type cds: string
1524
  @param cds: Cluster domain secret
1525
  @type handshake: sequence
1526
  @param handshake: Handshake sent by remote peer
1527

1528
  """
1529
  try:
1530
    (version, hmac_digest, hmac_salt) = handshake
1531
  except (TypeError, ValueError), err:
1532
    return "Invalid data: %s" % err
1533

    
1534
  if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version),
1535
                              hmac_digest, salt=hmac_salt):
1536
    return "Hash didn't match, clusters don't share the same domain secret"
1537

    
1538
  if version != constants.RIE_VERSION:
1539
    return ("Clusters don't have the same remote import/export protocol"
1540
            " (local=%s, remote=%s)" %
1541
            (constants.RIE_VERSION, version))
1542

    
1543
  return None
1544

    
1545

    
1546
def _GetRieDiskInfoMessage(disk_index, host, port, magic):
1547
  """Returns the hashed text for import/export disk information.
1548

1549
  @type disk_index: number
1550
  @param disk_index: Index of disk (included in hash)
1551
  @type host: string
1552
  @param host: Hostname
1553
  @type port: number
1554
  @param port: Daemon port
1555
  @type magic: string
1556
  @param magic: Magic value
1557

1558
  """
1559
  return "%s:%s:%s:%s" % (disk_index, host, port, magic)
1560

    
1561

    
1562
def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
1563
  """Verifies received disk information for an export.
1564

1565
  @type cds: string
1566
  @param cds: Cluster domain secret
1567
  @type disk_index: number
1568
  @param disk_index: Index of disk (included in hash)
1569
  @type disk_info: sequence
1570
  @param disk_info: Disk information sent by remote peer
1571

1572
  """
1573
  try:
1574
    (host, port, magic, hmac_digest, hmac_salt) = disk_info
1575
  except (TypeError, ValueError), err:
1576
    raise errors.GenericError("Invalid data: %s" % err)
1577

    
1578
  if not (host and port and magic):
1579
    raise errors.GenericError("Missing destination host, port or magic")
1580

    
1581
  msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
1582

    
1583
  if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
1584
    raise errors.GenericError("HMAC is wrong")
1585

    
1586
  if netutils.IP6Address.IsValid(host) or netutils.IP4Address.IsValid(host):
1587
    destination = host
1588
  else:
1589
    destination = netutils.Hostname.GetNormalizedName(host)
1590

    
1591
  return (destination,
1592
          utils.ValidateServiceName(port),
1593
          magic)
1594

    
1595

    
1596
def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port, magic):
1597
  """Computes the signed disk information for a remote import.
1598

1599
  @type cds: string
1600
  @param cds: Cluster domain secret
1601
  @type salt: string
1602
  @param salt: HMAC salt
1603
  @type disk_index: number
1604
  @param disk_index: Index of disk (included in hash)
1605
  @type host: string
1606
  @param host: Hostname
1607
  @type port: number
1608
  @param port: Daemon port
1609
  @type magic: string
1610
  @param magic: Magic value
1611

1612
  """
1613
  msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
1614
  hmac_digest = utils.Sha1Hmac(cds, msg, salt=salt)
1615
  return (host, port, magic, hmac_digest, salt)
1616

    
1617

    
1618
def CalculateGroupIPolicy(cluster, group):
1619
  """Calculate instance policy for group.
1620

1621
  """
1622
  return cluster.SimpleFillIPolicy(group.ipolicy)
1623

    
1624

    
1625
def ComputeDiskSize(disk_template, disks):
1626
  """Compute disk size requirements according to disk template
1627

1628
  """
1629
  # Required free disk space as a function of disk and swap space
1630
  req_size_dict = {
1631
    constants.DT_DISKLESS: 0,
1632
    constants.DT_PLAIN: sum(d[constants.IDISK_SIZE] for d in disks),
1633
    # 128 MB are added for drbd metadata for each disk
1634
    constants.DT_DRBD8:
1635
      sum(d[constants.IDISK_SIZE] + constants.DRBD_META_SIZE for d in disks),
1636
    constants.DT_FILE: sum(d[constants.IDISK_SIZE] for d in disks),
1637
    constants.DT_SHARED_FILE: sum(d[constants.IDISK_SIZE] for d in disks),
1638
    constants.DT_BLOCK: 0,
1639
    constants.DT_RBD: sum(d[constants.IDISK_SIZE] for d in disks),
1640
    constants.DT_EXT: sum(d[constants.IDISK_SIZE] for d in disks),
1641
  }
1642

    
1643
  if disk_template not in req_size_dict:
1644
    raise errors.ProgrammerError("Disk template '%s' size requirement"
1645
                                 " is unknown" % disk_template)
1646

    
1647
  return req_size_dict[disk_template]