Statistics
| Branch: | Tag: | Revision:

root / lib / masterd / instance.py @ 06c2fb4a

History | View | Annotate | Download (46.1 kB)

1
#
2
#
3

    
4
# Copyright (C) 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Instance-related functions and classes for masterd.
23

24
"""
25

    
26
import logging
27
import time
28
import OpenSSL
29

    
30
from ganeti import constants
31
from ganeti import errors
32
from ganeti import compat
33
from ganeti import utils
34
from ganeti import objects
35
from ganeti import netutils
36
from ganeti import pathutils
37

    
38

    
39
class _ImportExportError(Exception):
40
  """Local exception to report import/export errors.
41

42
  """
43

    
44

    
45
class ImportExportTimeouts(object):
  """Bundle of timeout settings for one import/export daemon.

  """
  #: Time until daemon starts writing status file
  DEFAULT_READY_TIMEOUT = 10

  #: Length of time until errors cause hard failure
  DEFAULT_ERROR_TIMEOUT = 10

  #: Time after which daemon must be listening
  DEFAULT_LISTEN_TIMEOUT = 10

  #: Progress update interval
  DEFAULT_PROGRESS_INTERVAL = 60

  __slots__ = [
    "error",
    "ready",
    "listen",
    "connect",
    "progress",
    ]

  def __init__(self, connect,
               listen=DEFAULT_LISTEN_TIMEOUT,
               error=DEFAULT_ERROR_TIMEOUT,
               ready=DEFAULT_READY_TIMEOUT,
               progress=DEFAULT_PROGRESS_INTERVAL):
    """Initializes this class.

    @type connect: number
    @param connect: Timeout for establishing connection
    @type listen: number
    @param listen: Timeout for starting to listen for connections
    @type error: number
    @param error: Length of time until errors cause hard failure
    @type ready: number
    @param ready: Timeout for daemon to become ready
    @type progress: number
    @param progress: Progress update interval

    """
    # Stored in the order the daemon goes through its lifecycle
    self.connect = connect
    self.listen = listen
    self.ready = ready
    self.error = error
    self.progress = progress
90

    
91

    
92
class ImportExportCbBase(object):
  """Callback interface for disk import/export progress reporting.

  All hooks are no-ops by default; subclasses override the ones they need.

  """
  def ReportListening(self, ie, private, component):
    """Invoked once the daemon has started listening.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object
    @param component: transfer component name

    """

  def ReportConnected(self, ie, private):
    """Invoked once a connection has been established.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportProgress(self, ie, private):
    """Invoked when fresh progress information is available.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportFinished(self, ie, private):
    """Invoked once the transfer has finished.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """
132

    
133

    
134
class _DiskImportExportBase(object):
  """Base class tracking one remote disk import or export daemon.

  Drives the daemon through its lifecycle (start, become ready, listen,
  connect, finish, cleanup) and reports state changes via the
  L{ImportExportCbBase} callbacks given at construction time.  Subclasses
  must set L{MODE_TEXT} and implement L{_StartDaemon}, L{CheckListening}
  and L{_GetConnectedCheckEpoch}.

  """
  #: Human-readable mode name; set to "import"/"export" by subclasses
  MODE_TEXT = None

  def __init__(self, lu, node_name, opts,
               instance, component, timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type component: string
    @param component: which part of the instance is being imported
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    assert self.MODE_TEXT

    self._lu = lu
    self.node_name = node_name
    # Work on a copy so setting connect_timeout below doesn't mutate the
    # caller's options object
    self._opts = opts.Copy()
    self._instance = instance
    self._component = component
    self._timeouts = timeouts
    self._cbs = cbs
    self._private = private

    # Set master daemon's timeout in options for import/export daemon
    assert self._opts.connect_timeout is None
    self._opts.connect_timeout = timeouts.connect

    # Parent loop
    self._loop = None

    # Timestamps (all None until the corresponding event happens)
    self._ts_begin = None
    self._ts_connected = None
    self._ts_finished = None
    self._ts_cleanup = None
    self._ts_last_progress = None
    self._ts_last_error = None

    # Transfer status (None while the transfer is still active)
    self.success = None
    self.final_message = None

    # Daemon status
    self._daemon_name = None
    self._daemon = None

  @property
  def recent_output(self):
    """Returns the most recent output from the daemon.

    """
    if self._daemon:
      return "\n".join(self._daemon.recent_output)

    return None

  @property
  def progress(self):
    """Returns transfer progress information.

    """
    if not self._daemon:
      return None

    return (self._daemon.progress_mbytes,
            self._daemon.progress_throughput,
            self._daemon.progress_percent,
            self._daemon.progress_eta)

  @property
  def magic(self):
    """Returns the magic value for this import/export.

    """
    return self._opts.magic

  @property
  def active(self):
    """Determines whether this transport is still active.

    """
    # success stays None until _ReportFinished is called
    return self.success is None

  @property
  def loop(self):
    """Returns parent loop.

    @rtype: L{ImportExportLoop}

    """
    return self._loop

  def SetLoop(self, loop):
    """Sets the parent loop.

    @type loop: L{ImportExportLoop}

    """
    if self._loop:
      raise errors.ProgrammerError("Loop can only be set once")

    self._loop = loop

  def _StartDaemon(self):
    """Starts the import/export daemon.

    """
    raise NotImplementedError()

  def CheckDaemon(self):
    """Checks whether daemon has been started and if not, starts it.

    @rtype: string
    @return: Daemon name

    """
    assert self._ts_cleanup is None

    if self._daemon_name is None:
      assert self._ts_begin is None

      result = self._StartDaemon()
      if result.fail_msg:
        raise _ImportExportError("Failed to start %s on %s: %s" %
                                 (self.MODE_TEXT, self.node_name,
                                  result.fail_msg))

      daemon_name = result.payload

      logging.info("Started %s '%s' on %s", self.MODE_TEXT, daemon_name,
                   self.node_name)

      self._ts_begin = time.time()
      self._daemon_name = daemon_name

    return self._daemon_name

  def GetDaemonName(self):
    """Returns the daemon name.

    """
    assert self._daemon_name, "Daemon has not been started"
    assert self._ts_cleanup is None
    return self._daemon_name

  def Abort(self):
    """Sends SIGTERM to import/export daemon (if still active).

    Returns False only when the abort RPC itself failed; returns True
    when no daemon was running or the abort was delivered.

    """
    if self._daemon_name:
      self._lu.LogWarning("Aborting %s '%s' on %s",
                          self.MODE_TEXT, self._daemon_name, self.node_name)
      result = self._lu.rpc.call_impexp_abort(self.node_name, self._daemon_name)
      if result.fail_msg:
        self._lu.LogWarning("Failed to abort %s '%s' on %s: %s",
                            self.MODE_TEXT, self._daemon_name,
                            self.node_name, result.fail_msg)
        return False

    return True

  def _SetDaemonData(self, data):
    """Internal function for updating status daemon data.

    @type data: L{objects.ImportExportStatus}
    @param data: Daemon status data

    """
    assert self._ts_begin is not None

    if not data:
      # No status file yet; fail hard once the ready timeout has elapsed
      if utils.TimeoutExpired(self._ts_begin, self._timeouts.ready):
        raise _ImportExportError("Didn't become ready after %s seconds" %
                                 self._timeouts.ready)

      return False

    self._daemon = data

    return True

  def SetDaemonData(self, success, data):
    """Updates daemon status data.

    @type success: bool
    @param success: Whether fetching data was successful or not
    @type data: L{objects.ImportExportStatus}
    @param data: Daemon status data

    """
    if not success:
      # Track when errors started; only raise once they've persisted for
      # longer than the error timeout
      if self._ts_last_error is None:
        self._ts_last_error = time.time()

      elif utils.TimeoutExpired(self._ts_last_error, self._timeouts.error):
        raise _ImportExportError("Too many errors while updating data")

      return False

    # A successful update resets the error clock
    self._ts_last_error = None

    return self._SetDaemonData(data)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    """
    raise NotImplementedError()

  def _GetConnectedCheckEpoch(self):
    """Returns timeout to calculate connect timeout.

    """
    raise NotImplementedError()

  def CheckConnected(self):
    """Checks whether the daemon is connected.

    @rtype: bool
    @return: Whether the daemon is connected

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_connected is not None:
      return True

    if self._daemon.connected:
      self._ts_connected = time.time()

      # TODO: Log remote peer
      logging.debug("%s '%s' on %s is now connected",
                    self.MODE_TEXT, self._daemon_name, self.node_name)

      self._cbs.ReportConnected(self, self._private)

      return True

    # Connect timeout is measured from a subclass-specific epoch (e.g. when
    # an import started listening, or when an export daemon started)
    if utils.TimeoutExpired(self._GetConnectedCheckEpoch(),
                            self._timeouts.connect):
      raise _ImportExportError("Not connected after %s seconds" %
                               self._timeouts.connect)

    return False

  def _CheckProgress(self):
    """Checks whether a progress update should be reported.

    """
    # Report only when the progress interval has elapsed and the daemon has
    # actual numbers to report
    if ((self._ts_last_progress is None or
        utils.TimeoutExpired(self._ts_last_progress,
                             self._timeouts.progress)) and
        self._daemon and
        self._daemon.progress_mbytes is not None and
        self._daemon.progress_throughput is not None):
      self._cbs.ReportProgress(self, self._private)
      self._ts_last_progress = time.time()

  def CheckFinished(self):
    """Checks whether the daemon exited.

    @rtype: bool
    @return: Whether the transfer is finished

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_finished:
      return True

    if self._daemon.exit_status is None:
      # TODO: Adjust delay for ETA expiring soon
      self._CheckProgress()
      return False

    self._ts_finished = time.time()

    self._ReportFinished(self._daemon.exit_status == 0,
                         self._daemon.error_message)

    return True

  def _ReportFinished(self, success, message):
    """Transfer is finished or daemon exited.

    @type success: bool
    @param success: Whether the transfer was successful
    @type message: string
    @param message: Error message

    """
    assert self.success is None

    self.success = success
    self.final_message = message

    if success:
      logging.info("%s '%s' on %s succeeded", self.MODE_TEXT,
                   self._daemon_name, self.node_name)
    elif self._daemon_name:
      self._lu.LogWarning("%s '%s' on %s failed: %s",
                          self.MODE_TEXT, self._daemon_name, self.node_name,
                          message)
    else:
      # Failure before the daemon was even started
      self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
                          self.node_name, message)

    self._cbs.ReportFinished(self, self._private)

  def _Finalize(self):
    """Makes the RPC call to finalize this import/export.

    """
    return self._lu.rpc.call_impexp_cleanup(self.node_name, self._daemon_name)

  def Finalize(self, error=None):
    """Finalizes this import/export.

    @param error: if given, the transfer is marked as failed with this message

    """
    if self._daemon_name:
      logging.info("Finalizing %s '%s' on %s",
                   self.MODE_TEXT, self._daemon_name, self.node_name)

      result = self._Finalize()
      if result.fail_msg:
        self._lu.LogWarning("Failed to finalize %s '%s' on %s: %s",
                            self.MODE_TEXT, self._daemon_name,
                            self.node_name, result.fail_msg)
        return False

      # Daemon is no longer running
      self._daemon_name = None
      self._ts_cleanup = time.time()

    if error:
      self._ReportFinished(False, error)

    return True
483

    
484

    
485
class DiskImport(_DiskImportExportBase):
  MODE_TEXT = "import"

  def __init__(self, lu, node_name, opts, instance, component,
               dest, dest_args, timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type component: string
    @param component: which part of the instance is being imported
    @param dest: I/O destination
    @param dest_args: I/O arguments
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    _DiskImportExportBase.__init__(self, lu, node_name, opts, instance,
                                   component, timeouts, cbs, private)

    self._dest = dest
    self._dest_args = dest_args

    # Timestamp of when the daemon was first seen listening
    self._ts_listening = None

  @property
  def listen_port(self):
    """Returns the port the daemon is listening on.

    """
    daemon = self._daemon
    if not daemon:
      return None

    return daemon.listen_port

  def _StartDaemon(self):
    """Starts the import daemon.

    """
    return self._lu.rpc.call_import_start(self.node_name, self._opts,
                                          self._instance, self._component,
                                          (self._dest, self._dest_args))

  def CheckListening(self):
    """Checks whether the daemon is listening.

    @rtype: bool
    @return: Whether the daemon is listening

    """
    assert self._daemon, "Daemon status missing"

    # Already seen it listening earlier
    if self._ts_listening is not None:
      return True

    bound_port = self._daemon.listen_port
    if bound_port is None:
      # Not listening yet; enforce the listen timeout
      if utils.TimeoutExpired(self._ts_begin, self._timeouts.listen):
        raise _ImportExportError("Not listening after %s seconds" %
                                 self._timeouts.listen)

      return False

    self._ts_listening = time.time()

    logging.debug("Import '%s' on %s is now listening on port %s",
                  self._daemon_name, self.node_name, bound_port)

    self._cbs.ReportListening(self, self._private, self._component)

    return True

  def _GetConnectedCheckEpoch(self):
    """Returns the time since we started listening.

    """
    assert self._ts_listening is not None, \
           ("Checking whether an import is connected is only useful"
            " once it's been listening")

    return self._ts_listening
574

    
575

    
576
class DiskExport(_DiskImportExportBase):
  MODE_TEXT = "export"

  def __init__(self, lu, node_name, opts, dest_host, dest_port,
               instance, component, source, source_args,
               timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type dest_host: string
    @param dest_host: Destination host name or IP address
    @type dest_port: number
    @param dest_port: Destination port number
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type component: string
    @param component: which part of the instance is being imported
    @param source: I/O source
    @param source_args: I/O source
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    _DiskImportExportBase.__init__(self, lu, node_name, opts, instance,
                                   component, timeouts, cbs, private)

    # Remote endpoint the export daemon connects to
    self._dest_host = dest_host
    self._dest_port = dest_port

    # Local data source
    self._source = source
    self._source_args = source_args

  def _StartDaemon(self):
    """Starts the export daemon.

    """
    return self._lu.rpc.call_export_start(self.node_name, self._opts,
                                          self._dest_host, self._dest_port,
                                          self._instance, self._component,
                                          (self._source, self._source_args))

  def CheckListening(self):
    """Checks whether the daemon is listening.

    """
    # Exports connect out to the import side; only an import can be listening
    return True

  def _GetConnectedCheckEpoch(self):
    """Returns the time since the daemon started.

    """
    assert self._ts_begin is not None

    return self._ts_begin
636

    
637

    
638
def FormatProgress(progress):
  """Formats progress information for user consumption

  """
  (mbytes, throughput, percent, eta) = progress

  parts = [utils.FormatUnit(mbytes, "h")]

  # Not using FormatUnit as it doesn't support kilobytes
  parts.append("%0.1f MiB/s" % throughput)

  if percent is not None:
    parts.append("%d%%" % percent)

  if eta is not None:
    parts.append("ETA %s" % utils.FormatSeconds(eta))

  return utils.CommaJoin(parts)
658

    
659

    
660
class ImportExportLoop:
  """Main polling loop driving a set of disk import/export operations.

  Objects are added via L{Add}, then L{Run} polls all daemons until every
  transfer has finished, sleeping an adaptive delay between rounds.

  """
  #: Bounds for the adaptive sleep between polling rounds, in seconds
  MIN_DELAY = 1.0
  MAX_DELAY = 20.0

  def __init__(self, lu):
    """Initializes this class.

    """
    self._lu = lu
    self._queue = []
    self._pending_add = []

  def Add(self, diskie):
    """Adds an import/export object to the loop.

    @type diskie: Subclass of L{_DiskImportExportBase}
    @param diskie: Import/export object

    """
    assert diskie not in self._pending_add
    assert diskie.loop is None

    diskie.SetLoop(self)

    # Adding new objects to a staging list is necessary, otherwise the main
    # loop gets confused if callbacks modify the queue while the main loop is
    # iterating over it.
    self._pending_add.append(diskie)

  @staticmethod
  def _CollectDaemonStatus(lu, daemons):
    """Collects the status for all import/export daemons.

    """
    daemon_status = {}

    for node_name, names in daemons.iteritems():
      # One RPC per node covers all of that node's daemons
      result = lu.rpc.call_impexp_status(node_name, names)
      if result.fail_msg:
        lu.LogWarning("Failed to get daemon status on %s: %s",
                      node_name, result.fail_msg)
        continue

      assert len(names) == len(result.payload)

      daemon_status[node_name] = dict(zip(names, result.payload))

    return daemon_status

  @staticmethod
  def _GetActiveDaemonNames(queue):
    """Gets the names of all active daemons.

    """
    result = {}
    for diskie in queue:
      if not diskie.active:
        continue

      try:
        # Start daemon if necessary
        daemon_name = diskie.CheckDaemon()
      except _ImportExportError, err:
        logging.exception("%s failed", diskie.MODE_TEXT)
        diskie.Finalize(error=str(err))
        continue

      result.setdefault(diskie.node_name, []).append(daemon_name)

    assert len(queue) >= len(result)
    assert len(queue) >= sum([len(names) for names in result.itervalues()])

    logging.debug("daemons=%r", result)

    return result

  def _AddPendingToQueue(self):
    """Adds all pending import/export objects to the internal queue.

    """
    assert compat.all(diskie not in self._queue and diskie.loop == self
                      for diskie in self._pending_add)

    self._queue.extend(self._pending_add)

    del self._pending_add[:]

  def Run(self):
    """Utility main loop.

    """
    while True:
      self._AddPendingToQueue()

      # Collect all active daemon names
      daemons = self._GetActiveDaemonNames(self._queue)
      if not daemons:
        break

      # Collection daemon status data
      data = self._CollectDaemonStatus(self._lu, daemons)

      # Use data
      delay = self.MAX_DELAY
      for diskie in self._queue:
        if not diskie.active:
          continue

        try:
          try:
            all_daemon_data = data[diskie.node_name]
          except KeyError:
            # Status collection failed for this node
            result = diskie.SetDaemonData(False, None)
          else:
            result = \
              diskie.SetDaemonData(True,
                                   all_daemon_data[diskie.GetDaemonName()])

          if not result:
            # Daemon not yet ready, retry soon
            delay = min(3.0, delay)
            continue

          if diskie.CheckFinished():
            # Transfer finished
            diskie.Finalize()
            continue

          # Normal case: check again in 5 seconds
          delay = min(5.0, delay)

          if not diskie.CheckListening():
            # Not yet listening, retry soon
            delay = min(1.0, delay)
            continue

          if not diskie.CheckConnected():
            # Not yet connected, retry soon
            delay = min(1.0, delay)
            continue

        except _ImportExportError, err:
          logging.exception("%s failed", diskie.MODE_TEXT)
          diskie.Finalize(error=str(err))

      if not compat.any(diskie.active for diskie in self._queue):
        break

      # Wait a bit
      delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
      logging.debug("Waiting for %ss", delay)
      time.sleep(delay)

  def FinalizeAll(self):
    """Finalizes all pending transfers.

    """
    success = True

    for diskie in self._queue:
      # Keep finalizing the rest even if one fails
      success = diskie.Finalize() and success

    return success
823

    
824

    
825
class _TransferInstCbBase(ImportExportCbBase):
  def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs,
               dest_node, dest_ip):
    """Initializes this class.

    """
    ImportExportCbBase.__init__(self)

    # Shared context for instance-transfer callbacks
    self.lu = lu
    self.feedback_fn = feedback_fn
    self.instance = instance
    self.timeouts = timeouts

    # Source and destination endpoints
    self.src_node = src_node
    self.src_cbs = src_cbs
    self.dest_node = dest_node
    self.dest_ip = dest_ip
841

    
842

    
843
class _TransferInstSourceCb(_TransferInstCbBase):
  def ReportConnected(self, ie, dtp):
    """Called when a connection has been established.

    """
    assert self.src_cbs is None
    assert dtp.src_export == ie
    assert dtp.dest_import

    self.feedback_fn("%s is sending data on %s" %
                     (dtp.data.name, ie.node_name))

  def ReportProgress(self, ie, dtp):
    """Called when new progress information should be reported.

    """
    progress = ie.progress
    if progress:
      self.feedback_fn("%s sent %s" % (dtp.data.name, FormatProgress(progress)))

  def ReportFinished(self, ie, dtp):
    """Called when a transfer has finished.

    """
    assert self.src_cbs is None
    assert dtp.src_export == ie
    assert dtp.dest_import

    if ie.success:
      self.feedback_fn("%s finished sending data" % dtp.data.name)
    else:
      self.feedback_fn("%s failed to send data: %s (recent output: %s)" %
                       (dtp.data.name, ie.final_message, ie.recent_output))

    dtp.RecordResult(ie.success)

    finished_cb = dtp.data.finished_fn
    if finished_cb:
      finished_cb()

    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
    # give the daemon a moment to sort things out
    if dtp.dest_import and not ie.success:
      dtp.dest_import.Abort()
889

    
890

    
891
class _TransferInstDestCb(_TransferInstCbBase):
  def ReportListening(self, ie, dtp, component):
    """Called when daemon started listening.

    """
    assert self.src_cbs
    assert dtp.src_export is None
    assert dtp.dest_import
    assert dtp.export_opts

    self.feedback_fn("%s is now listening, starting export" % dtp.data.name)

    # The import side is ready; fire up the matching export on the source node
    exporter = DiskExport(self.lu, self.src_node, dtp.export_opts,
                          self.dest_ip, ie.listen_port, self.instance,
                          component, dtp.data.src_io, dtp.data.src_ioargs,
                          self.timeouts, self.src_cbs, private=dtp)
    ie.loop.Add(exporter)

    dtp.src_export = exporter

  def ReportConnected(self, ie, dtp):
    """Called when a connection has been established.

    """
    self.feedback_fn("%s is receiving data on %s" %
                     (dtp.data.name, self.dest_node))

  def ReportFinished(self, ie, dtp):
    """Called when a transfer has finished.

    """
    if ie.success:
      self.feedback_fn("%s finished receiving data" % dtp.data.name)
    else:
      self.feedback_fn("%s failed to receive data: %s (recent output: %s)" %
                       (dtp.data.name, ie.final_message, ie.recent_output))

    dtp.RecordResult(ie.success)

    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
    # give the daemon a moment to sort things out
    if dtp.src_export and not ie.success:
      dtp.src_export.Abort()
935

    
936

    
937
class DiskTransfer(object):
  def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
               finished_fn):
    """Describes a single disk transfer.

    @type name: string
    @param name: User-visible name for this transfer (e.g. "disk/0")
    @param src_io: Source I/O type
    @param src_ioargs: Source I/O arguments
    @param dest_io: Destination I/O type
    @param dest_ioargs: Destination I/O arguments
    @type finished_fn: callable
    @param finished_fn: Function called once transfer has finished

    """
    self.name = name
    self.src_io = src_io
    self.src_ioargs = src_ioargs
    self.dest_io = dest_io
    self.dest_ioargs = dest_ioargs
    self.finished_fn = finished_fn
961

    
962

    
963
class _DiskTransferPrivate(object):
964
  def __init__(self, data, success, export_opts):
965
    """Initializes this class.
966

967
    @type data: L{DiskTransfer}
968
    @type success: bool
969

970
    """
971
    self.data = data
972
    self.success = success
973
    self.export_opts = export_opts
974

    
975
    self.src_export = None
976
    self.dest_import = None
977

    
978
  def RecordResult(self, success):
979
    """Updates the status.
980

981
    One failed part will cause the whole transfer to fail.
982

983
    """
984
    self.success = self.success and success
985

    
986

    
987
def _GetInstDiskMagic(base, instance_name, index):
  """Computes the magic value for a disk export or import.

  @type base: string
  @param base: Random seed value (can be the same for all disks of a transfer)
  @type instance_name: string
  @param instance_name: Name of instance
  @type index: number
  @param index: Disk index

  """
  digest = compat.sha1_hash()

  # Feeding the parts sequentially is equivalent to hashing their
  # concatenation
  for part in (str(constants.RIE_VERSION), base, instance_name, str(index)):
    digest.update(part)

  return digest.hexdigest()
1004

    
1005

    
1006
def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
                         instance, all_transfers):
  """Transfers an instance's data from one node to another.

  @param lu: Logical unit instance
  @param feedback_fn: Feedback function
  @type src_node: string
  @param src_node: Source node name
  @type dest_node: string
  @param dest_node: Destination node name
  @type dest_ip: string
  @param dest_ip: IP address of destination node
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type all_transfers: list of L{DiskTransfer} instances
  @param all_transfers: List of all disk transfers to be made
  @rtype: list
  @return: List with a boolean (True=successful, False=failed) for success for
           each transfer

  """
  # These moves stay within one cluster, so compression is not worthwhile
  compress = constants.IEC_NONE

  logging.debug("Source node %s, destination node %s, compression '%s'",
                src_node, dest_node, compress)

  timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
  src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
                                  src_node, None, dest_node, dest_ip)
  dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
                                 src_node, src_cbs, dest_node, dest_ip)

  all_dtp = []

  base_magic = utils.GenerateSecret(6)

  ieloop = ImportExportLoop(lu)
  try:
    for index, xfer in enumerate(all_transfers):
      if not xfer:
        # Nothing to transfer for this disk
        all_dtp.append(_DiskTransferPrivate(None, False, None))
        continue

      feedback_fn("Exporting %s from %s to %s" %
                  (xfer.name, src_node, dest_node))

      magic = _GetInstDiskMagic(base_magic, instance.name, index)
      opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
                                         compress=compress, magic=magic)

      dtp = _DiskTransferPrivate(xfer, True, opts)

      # Only the import daemon is created here; the matching export is
      # started by the destination callbacks once the import is listening
      dimport = DiskImport(lu, dest_node, opts, instance, "disk%d" % index,
                           xfer.dest_io, xfer.dest_ioargs,
                           timeouts, dest_cbs, private=dtp)
      ieloop.Add(dimport)

      dtp.dest_import = dimport

      all_dtp.append(dtp)

    ieloop.Run()
  finally:
    ieloop.FinalizeAll()

  assert len(all_dtp) == len(all_transfers)
  assert compat.all((dtp.src_export is None or
                      dtp.src_export.success is not None) and
                     (dtp.dest_import is None or
                      dtp.dest_import.success is not None)
                     for dtp in all_dtp), \
         "Not all imports/exports are finalized"

  return [bool(dtp.success) for dtp in all_dtp]
class _RemoteExportCb(ImportExportCbBase):
  """Callbacks used while exporting an instance to a remote cluster.

  """
  def __init__(self, feedback_fn, disk_count):
    """Initializes this class.

    @param feedback_fn: Feedback function
    @type disk_count: number
    @param disk_count: Number of disks being exported

    """
    ImportExportCbBase.__init__(self)
    self._feedback_fn = feedback_fn
    # One entry per disk; set to a boolean once the transfer finishes
    self._dresults = [None] * disk_count

  @property
  def disk_results(self):
    """Returns per-disk results.

    """
    return self._dresults

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    """
    (disk_idx, _) = private
    self._feedback_fn("Disk %s is now sending data" % disk_idx)

  def ReportProgress(self, ie, private):
    """Called when new progress information should be reported.

    """
    (disk_idx, _) = private

    progress = ie.progress
    if progress:
      self._feedback_fn("Disk %s sent %s" % (disk_idx,
                                             FormatProgress(progress)))

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    """
    (disk_idx, finished_fn) = private

    if not ie.success:
      self._feedback_fn("Disk %s failed to send data: %s (recent output: %s)" %
                        (disk_idx, ie.final_message, ie.recent_output))
    else:
      self._feedback_fn("Disk %s finished sending data" % disk_idx)

    self._dresults[disk_idx] = bool(ie.success)

    if finished_fn:
      finished_fn()
class ExportInstanceHelper:
  """Helper for exporting an instance's disks.

  Handles LVM snapshot bookkeeping for intra-cluster exports
  (L{LocalExport}) and drives the export daemons for inter-cluster
  exports (L{RemoteExport}); L{Cleanup} removes any remaining snapshots.

  """
  def __init__(self, lu, feedback_fn, instance):
    """Initializes this class.

    @param lu: Logical unit instance
    @param feedback_fn: Feedback function
    @type instance: L{objects.Instance}
    @param instance: Instance object

    """
    self._lu = lu
    self._feedback_fn = feedback_fn
    self._instance = instance

    # Snapshot disk objects, one entry per instance disk after
    # L{CreateSnapshots} ran (C{False} for disks whose snapshot failed)
    self._snap_disks = []
    # Flags marking which snapshots have already been removed
    self._removed_snaps = [False] * len(instance.disks)

  def CreateSnapshots(self):
    """Creates an LVM snapshot for every disk of the instance.

    Appends one entry per disk to C{self._snap_disks}; entries are
    C{False} for disks whose snapshot could not be created.

    """
    assert not self._snap_disks

    instance = self._instance
    src_node = instance.primary_node

    for idx, disk in enumerate(instance.disks):
      self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
                        (idx, src_node))

      # result.payload will be a snapshot of an lvm leaf of the one we
      # passed
      result = self._lu.rpc.call_blockdev_snapshot(src_node,
                                                   (disk, instance),
                                                   None)
      new_dev = False
      msg = result.fail_msg
      if msg:
        # RPC failure: warn and leave the entry as False, continue with
        # the remaining disks
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
                            idx, src_node, msg)
      elif (not isinstance(result.payload, (tuple, list)) or
            len(result.payload) != 2):
        # Sanity check on the payload shape; presumably a (vg, lv) pair --
        # TODO(review): confirm against the backend implementation
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: invalid"
                            " result '%s'", idx, src_node, result.payload)
      else:
        # Build a disk object describing the snapshot LV
        disk_id = tuple(result.payload)
        disk_params = constants.DISK_LD_DEFAULTS[constants.LD_LV].copy()
        new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                               logical_id=disk_id, physical_id=disk_id,
                               iv_name=disk.iv_name,
                               params=disk_params)

      self._snap_disks.append(new_dev)

    assert len(self._snap_disks) == len(instance.disks)
    assert len(self._removed_snaps) == len(instance.disks)

  def _RemoveSnapshot(self, disk_index):
    """Removes an LVM snapshot.

    Safe to call more than once for the same index; already-removed or
    failed snapshots are skipped.

    @type disk_index: number
    @param disk_index: Index of the snapshot to be removed

    """
    disk = self._snap_disks[disk_index]
    if disk and not self._removed_snaps[disk_index]:
      src_node = self._instance.primary_node

      self._feedback_fn("Removing snapshot of disk/%s on node %s" %
                        (disk_index, src_node))

      result = self._lu.rpc.call_blockdev_remove(src_node, disk)
      if result.fail_msg:
        # Removal failed; the flag stays False so a later Cleanup retries
        self._lu.LogWarning("Could not remove snapshot for disk/%d from node"
                            " %s: %s", disk_index, src_node, result.fail_msg)
      else:
        self._removed_snaps[disk_index] = True

  def LocalExport(self, dest_node):
    """Intra-cluster instance export.

    Requires L{CreateSnapshots} to have been called first; the snapshot
    of each disk is copied to the export directory on the destination
    node and the export is finalized there.

    @type dest_node: L{objects.Node}
    @param dest_node: Destination node

    """
    instance = self._instance
    src_node = instance.primary_node

    assert len(self._snap_disks) == len(instance.disks)

    transfers = []

    for idx, dev in enumerate(self._snap_disks):
      if not dev:
        # Snapshot creation failed for this disk; skip it
        transfers.append(None)
        continue

      # Destination file inside the (not yet renamed) export directory
      path = utils.PathJoin(pathutils.EXPORT_DIR, "%s.new" % instance.name,
                            dev.physical_id[1])

      # Remove the snapshot as soon as its transfer is done
      finished_fn = compat.partial(self._TransferFinished, idx)

      # FIXME: pass debug option from opcode to backend
      dt = DiskTransfer("snapshot/%s" % idx,
                        constants.IEIO_SCRIPT, (dev, idx),
                        constants.IEIO_FILE, (path, ),
                        finished_fn)
      transfers.append(dt)

    # Actually export data
    dresults = TransferInstanceData(self._lu, self._feedback_fn,
                                    src_node, dest_node.name,
                                    dest_node.secondary_ip,
                                    instance, transfers)

    assert len(dresults) == len(instance.disks)

    self._feedback_fn("Finalizing export on %s" % dest_node.name)
    result = self._lu.rpc.call_finalize_export(dest_node.name, instance,
                                               self._snap_disks)
    msg = result.fail_msg
    fin_resu = not msg
    if msg:
      self._lu.LogWarning("Could not finalize export for instance %s"
                          " on node %s: %s", instance.name, dest_node.name, msg)

    return (fin_resu, dresults)

  def RemoteExport(self, disk_info, key_name, dest_ca_pem, timeouts):
    """Inter-cluster instance export.

    Data is read directly from the instance's disks (not from snapshots)
    and sent to the per-disk destinations given in C{disk_info}.

    @type disk_info: list
    @param disk_info: Per-disk destination information
    @type key_name: string
    @param key_name: Name of X509 key to use
    @type dest_ca_pem: string
    @param dest_ca_pem: Destination X509 CA in PEM format
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import

    """
    instance = self._instance

    assert len(disk_info) == len(instance.disks)

    cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))

    ieloop = ImportExportLoop(self._lu)
    try:
      for idx, (dev, (host, port, magic)) in enumerate(zip(instance.disks,
                                                           disk_info)):
        # Decide whether to use IPv6
        ipv6 = netutils.IP6Address.IsValid(host)

        opts = objects.ImportExportOptions(key_name=key_name,
                                           ca_pem=dest_ca_pem,
                                           magic=magic, ipv6=ipv6)

        self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
        finished_fn = compat.partial(self._TransferFinished, idx)
        ieloop.Add(DiskExport(self._lu, instance.primary_node,
                              opts, host, port, instance, "disk%d" % idx,
                              constants.IEIO_SCRIPT, (dev, idx),
                              timeouts, cbs, private=(idx, finished_fn)))

      ieloop.Run()
    finally:
      ieloop.FinalizeAll()

    return (True, cbs.disk_results)

  def _TransferFinished(self, idx):
    """Called once a transfer has finished.

    Removes the snapshot belonging to the finished disk, if any.

    @type idx: number
    @param idx: Disk index

    """
    logging.debug("Transfer %s finished", idx)
    self._RemoveSnapshot(idx)

  def Cleanup(self):
    """Remove all snapshots.

    Retries snapshots whose earlier removal attempt failed.

    """
    assert len(self._removed_snaps) == len(self._instance.disks)
    for idx in range(len(self._instance.disks)):
      self._RemoveSnapshot(idx)
class _RemoteImportCb(ImportExportCbBase):
  """Callbacks used while importing an instance from a remote cluster.

  """
  def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
               external_address):
    """Initializes this class.

    @param feedback_fn: Feedback function
    @type cds: string
    @param cds: Cluster domain secret
    @type x509_cert_pem: string
    @param x509_cert_pem: CA used for signing import key
    @type disk_count: number
    @param disk_count: Number of disks
    @type external_address: string
    @param external_address: External address of destination node

    """
    ImportExportCbBase.__init__(self)
    self._feedback_fn = feedback_fn
    self._cds = cds
    self._x509_cert_pem = x509_cert_pem
    self._disk_count = disk_count
    self._external_address = external_address

    # Per-disk results and, while a daemon is listening, its (port, magic)
    self._dresults = [None] * disk_count
    self._daemon_port = [None] * disk_count

    # Salt used when signing the per-disk connection information
    self._salt = utils.GenerateSecret(8)

  @property
  def disk_results(self):
    """Returns per-disk results.

    """
    return self._dresults

  def _CheckAllListening(self):
    """Checks whether all daemons are listening.

    If all daemons are listening, the information is sent to the client.

    """
    if not compat.all(dp is not None for dp in self._daemon_port):
      # At least one daemon has not reported in yet
      return

    host = self._external_address

    disks = [ComputeRemoteImportDiskInfo(self._cds, self._salt,
                                         disk_idx, host, port, magic)
             for disk_idx, (port, magic) in enumerate(self._daemon_port)]

    assert len(disks) == self._disk_count

    self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
      "disks": disks,
      "x509_ca": self._x509_cert_pem,
      })

  def ReportListening(self, ie, private, _):
    """Called when daemon started listening.

    """
    (disk_idx, ) = private

    self._feedback_fn("Disk %s is now listening" % disk_idx)

    assert self._daemon_port[disk_idx] is None

    self._daemon_port[disk_idx] = (ie.listen_port, ie.magic)

    self._CheckAllListening()

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    """
    (disk_idx, ) = private
    self._feedback_fn("Disk %s is now receiving data" % disk_idx)

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    """
    (disk_idx, ) = private

    # Daemon is certainly no longer listening
    self._daemon_port[disk_idx] = None

    if not ie.success:
      self._feedback_fn(("Disk %s failed to receive data: %s"
                         " (recent output: %s)") %
                        (disk_idx, ie.final_message, ie.recent_output))
    else:
      self._feedback_fn("Disk %s finished receiving data" % disk_idx)

    self._dresults[disk_idx] = bool(ie.success)
def RemoteImport(lu, feedback_fn, instance, pnode, source_x509_ca,
                 cds, timeouts):
  """Imports an instance from another cluster.

  @param lu: Logical unit instance
  @param feedback_fn: Feedback function
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type pnode: L{objects.Node}
  @param pnode: Primary node of instance as an object
  @type source_x509_ca: OpenSSL.crypto.X509
  @param source_x509_ca: Import source's X509 CA
  @type cds: string
  @param cds: Cluster domain secret
  @type timeouts: L{ImportExportTimeouts}
  @param timeouts: Timeouts for this import

  """
  source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                                  source_x509_ca)

  magic_base = utils.GenerateSecret(6)

  # IPv6 is used when the destination's primary address is IPv6
  ipv6 = netutils.IP6Address.IsValid(pnode.primary_ip)

  # Create a temporary key/certificate pair on the destination node
  result = lu.rpc.call_x509_cert_create(instance.primary_node,
                                        constants.RIE_CERT_VALIDITY)
  result.Raise("Can't create X509 key and certificate on %s" % result.node)

  (x509_key_name, x509_cert_pem) = result.payload
  try:
    # Load the certificate and sign it with the cluster domain secret so
    # the source cluster can verify its origin
    x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                                x509_cert_pem)
    signed_x509_cert_pem = \
      utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))

    cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
                          len(instance.disks), pnode.primary_ip)

    ieloop = ImportExportLoop(lu)
    try:
      for disk_idx, disk in enumerate(instance.disks):
        magic = _GetInstDiskMagic(magic_base, instance.name, disk_idx)

        # Import daemon options
        opts = objects.ImportExportOptions(key_name=x509_key_name,
                                           ca_pem=source_ca_pem,
                                           magic=magic, ipv6=ipv6)

        ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
                              "disk%d" % disk_idx,
                              constants.IEIO_SCRIPT, (disk, disk_idx),
                              timeouts, cbs, private=(disk_idx, )))

      ieloop.Run()
    finally:
      ieloop.FinalizeAll()
  finally:
    # Remove crypto key and certificate
    result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
    result.Raise("Can't remove X509 key and certificate on %s" % result.node)

  return cbs.disk_results
def _GetImportExportHandshakeMessage(version):
  """Builds the cleartext handshake message for a RIE protocol version.

  @type version: number

  """
  parts = (version, constants.RIE_HANDSHAKE)
  return "%s:%s" % parts
def ComputeRemoteExportHandshake(cds):
  """Computes the remote import/export handshake.

  @type cds: string
  @param cds: Cluster domain secret

  """
  hmac_salt = utils.GenerateSecret(8)
  message = _GetImportExportHandshakeMessage(constants.RIE_VERSION)
  digest = utils.Sha1Hmac(cds, message, salt=hmac_salt)
  return (constants.RIE_VERSION, digest, hmac_salt)
def CheckRemoteExportHandshake(cds, handshake):
  """Checks the handshake of a remote import/export.

  @type cds: string
  @param cds: Cluster domain secret
  @type handshake: sequence
  @param handshake: Handshake sent by remote peer
  @rtype: string or None
  @return: Error description, or C{None} if the handshake is valid

  """
  try:
    (version, hmac_digest, hmac_salt) = handshake
  # "except E as err" is valid since Python 2.6 and required for Python 3;
  # the old "except E, err" form was removed in Python 3
  except (TypeError, ValueError) as err:
    return "Invalid data: %s" % err

  if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version),
                              hmac_digest, salt=hmac_salt):
    return "Hash didn't match, clusters don't share the same domain secret"

  if version != constants.RIE_VERSION:
    return ("Clusters don't have the same remote import/export protocol"
            " (local=%s, remote=%s)" %
            (constants.RIE_VERSION, version))

  return None
def _GetRieDiskInfoMessage(disk_index, host, port, magic):
  """Returns the hashed text for import/export disk information.

  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type host: string
  @param host: Hostname
  @type port: number
  @param port: Daemon port
  @type magic: string
  @param magic: Magic value

  """
  values = (disk_index, host, port, magic)
  return "%s:%s:%s:%s" % values
def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
  """Verifies received disk information for an export.

  @type cds: string
  @param cds: Cluster domain secret
  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type disk_info: sequence
  @param disk_info: Disk information sent by remote peer
  @rtype: tuple
  @return: Destination host, validated service port and magic value
  @raise errors.GenericError: If the disk information is malformed or the
    HMAC does not verify

  """
  try:
    (host, port, magic, hmac_digest, hmac_salt) = disk_info
  # "except E as err" is valid since Python 2.6 and required for Python 3;
  # the old "except E, err" form was removed in Python 3
  except (TypeError, ValueError) as err:
    raise errors.GenericError("Invalid data: %s" % err)

  if not (host and port and magic):
    raise errors.GenericError("Missing destination host, port or magic")

  msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)

  if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
    raise errors.GenericError("HMAC is wrong")

  # Literal IP addresses are used as-is; hostnames are normalized
  if netutils.IP6Address.IsValid(host) or netutils.IP4Address.IsValid(host):
    destination = host
  else:
    destination = netutils.Hostname.GetNormalizedName(host)

  return (destination,
          utils.ValidateServiceName(port),
          magic)
def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port, magic):
  """Computes the signed disk information for a remote import.

  @type cds: string
  @param cds: Cluster domain secret
  @type salt: string
  @param salt: HMAC salt
  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type host: string
  @param host: Hostname
  @type port: number
  @param port: Daemon port
  @type magic: string
  @param magic: Magic value

  """
  digest = utils.Sha1Hmac(cds,
                          _GetRieDiskInfoMessage(disk_index, host, port,
                                                 magic),
                          salt=salt)
  return (host, port, magic, digest, salt)
def CalculateGroupIPolicy(cluster, group):
  """Calculate instance policy for group.

  @param cluster: Cluster object providing C{SimpleFillIPolicy}
  @param group: Node group whose C{ipolicy} is filled in

  """
  group_policy = group.ipolicy
  return cluster.SimpleFillIPolicy(group_policy)
def ComputeDiskSize(disk_template, disks):
  """Compute disk size requirements according to disk template

  @param disk_template: Disk template name
  @param disks: Sequence of disk definitions with a size entry each
  @raise errors.ProgrammerError: If the disk template is unknown

  """
  # Plain sum of the configured disk sizes; computed eagerly (like every
  # entry below) so malformed disk entries fail regardless of template
  plain_size = sum(d[constants.IDISK_SIZE] for d in disks)
  # 128 MB are added for drbd metadata for each disk
  drbd_size = sum(d[constants.IDISK_SIZE] + constants.DRBD_META_SIZE
                  for d in disks)

  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: 0,
    constants.DT_PLAIN: plain_size,
    constants.DT_DRBD8: drbd_size,
    constants.DT_FILE: plain_size,
    constants.DT_SHARED_FILE: plain_size,
    constants.DT_BLOCK: 0,
    constants.DT_RBD: plain_size,
    constants.DT_EXT: plain_size,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]