#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Functions used by the node daemon

@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
     the L{UploadFile} function
@var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
     in the L{_CleanDirectory} function

"""

# pylint: disable=E1103

# E1103: %s %r has no %r member (but some types could not be
# inferred), because the _TryOSFromDisk returns either (True, os_obj)
# or (False, "string") which confuses pylint


import os
import os.path
import shutil
import time
import stat
import errno
import re
import random
import logging
import tempfile
import zlib
import base64
import signal

from ganeti import errors
from ganeti import utils
from ganeti import ssh
from ganeti import hypervisor
from ganeti import constants
from ganeti import bdev
from ganeti import objects
from ganeti import ssconf
from ganeti import serializer
from ganeti import netutils
from ganeti import runtime
from ganeti import mcpu
from ganeti import compat
from ganeti import pathutils
from ganeti import vcluster


_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
_ALLOWED_CLEAN_DIRS = frozenset([
  pathutils.DATA_DIR,
  pathutils.JOB_QUEUE_ARCHIVE_DIR,
  pathutils.QUEUE_DIR,
  pathutils.CRYPTO_KEYS_DIR,
  ])
_MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
_X509_KEY_FILE = "key"
_X509_CERT_FILE = "cert"
_IES_STATUS_FILE = "status"
_IES_PID_FILE = "pid"
_IES_CA_FILE = "ca"

#: Valid LVS output line regex
_LVSLINE_REGEX = re.compile(r"^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6,})\|?$")
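
# Illustrative sketch (not part of the module): with "--separator=|", the
# "lvs" output consumed by GetVolumeList below looks like
#   "  xenvg|test1|20.06|-wi-ao----"
# which _LVSLINE_REGEX splits into (vg_name, lv_name, lv_size, lv_attr):
#   _LVSLINE_REGEX.match("  xenvg|test1|20.06|-wi-ao----").groups()
#   == ("xenvg", "test1", "20.06", "-wi-ao----")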

# Actions for the master setup script
_MASTER_START = "start"
_MASTER_STOP = "stop"


class RPCFail(Exception):
  """Class denoting RPC failure.

  Its argument is the error message.

  """


def _Fail(msg, *args, **kwargs):
  """Log an error and then raise an RPCFail exception.

  This exception is then handled specially in the ganeti daemon and
  turned into a 'failed' return type. As such, this function is a
  useful shortcut for logging the error and returning it to the master
  daemon.

  @type msg: string
  @param msg: the text of the exception
  @raise RPCFail

  """
  if args:
    msg = msg % args
  if "log" not in kwargs or kwargs["log"]: # if we should log this error
    if "exc" in kwargs and kwargs["exc"]:
      logging.exception(msg)
    else:
      logging.error(msg)
  raise RPCFail(msg)
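
# Usage sketch (illustrative only): positional arguments are merged into the
# message with "%"-style formatting, and the keyword flags control logging:
#   _Fail("Instance %s is not running", iname)           # log, then raise
#   _Fail("Hypervisor error: %s", err, exc=True)         # also log traceback
#   _Fail("Script failed (%s)", reason, log=False)       # raise silently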


def _GetConfig():
  """Simple wrapper to return a SimpleStore.

  @rtype: L{ssconf.SimpleStore}
  @return: a SimpleStore instance

  """
  return ssconf.SimpleStore()


def _GetSshRunner(cluster_name):
  """Simple wrapper to return an SshRunner.

  @type cluster_name: str
  @param cluster_name: the cluster name, which is needed
      by the SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: an SshRunner instance

  """
  return ssh.SshRunner(cluster_name)


def _Decompress(data):
  """Unpacks data compressed by the RPC client.

  @type data: list or tuple
  @param data: Data sent by RPC client
  @rtype: str
  @return: Decompressed data

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  if encoding == constants.RPC_ENCODING_NONE:
    return content
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    return zlib.decompress(base64.b64decode(content))
  else:
    raise AssertionError("Unknown data encoding")
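
# Round-trip sketch (illustrative only): an RPC client sends either a plain
# payload or a zlib-compressed, base64-encoded one:
#   plain = (constants.RPC_ENCODING_NONE, "some data")
#   packed = (constants.RPC_ENCODING_ZLIB_BASE64,
#             base64.b64encode(zlib.compress("some data")))
#   _Decompress(plain) == _Decompress(packed) == "some data"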


def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults
      to the empty list

  """
  if path not in _ALLOWED_CLEAN_DIRS:
    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
          path)

  if not os.path.isdir(path):
    return
  if exclude is None:
    exclude = []
  else:
    # Normalize excluded paths
    exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = utils.PathJoin(path, rel_name)
    if full_name in exclude:
      continue
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)


def _BuildUploadFileList():
  """Build the list of allowed upload files.

  This is abstracted so that it's built only once at module import time.

  """
  allowed_files = set([
    pathutils.CLUSTER_CONF_FILE,
    pathutils.ETC_HOSTS,
    pathutils.SSH_KNOWN_HOSTS_FILE,
    pathutils.VNC_PASSWORD_FILE,
    pathutils.RAPI_CERT_FILE,
    pathutils.SPICE_CERT_FILE,
    pathutils.SPICE_CACERT_FILE,
    pathutils.RAPI_USERS_FILE,
    pathutils.CONFD_HMAC_KEY,
    pathutils.CLUSTER_DOMAIN_SECRET_FILE,
    ])

  for hv_name in constants.HYPER_TYPES:
    hv_class = hypervisor.GetHypervisorClass(hv_name)
    allowed_files.update(hv_class.GetAncillaryFiles()[0])

  assert pathutils.FILE_STORAGE_PATHS_FILE not in allowed_files, \
    "Allowed file storage paths should never be uploaded via RPC"

  return frozenset(allowed_files)


_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()


def JobQueuePurge():
  """Removes job queue files and archived jobs.

  @rtype: None

  """
  _CleanDirectory(pathutils.QUEUE_DIR, exclude=[pathutils.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(pathutils.JOB_QUEUE_ARCHIVE_DIR)


def GetMasterInfo():
  """Returns master information.

  This is a utility function to compute master information, either
  for consumption here or from the node daemon.

  @rtype: tuple
  @return: master_netdev, master_ip, master_name, primary_ip_family,
    master_netmask
  @raise RPCFail: in case of errors

  """
  try:
    cfg = _GetConfig()
    master_netdev = cfg.GetMasterNetdev()
    master_ip = cfg.GetMasterIP()
    master_netmask = cfg.GetMasterNetmask()
    master_node = cfg.GetMasterNode()
    primary_ip_family = cfg.GetPrimaryIPFamily()
  except errors.ConfigurationError, err:
    _Fail("Cluster configuration incomplete: %s", err, exc=True)
  return (master_netdev, master_ip, master_node, primary_ip_family,
          master_netmask)


def RunLocalHooks(hook_opcode, hooks_path, env_builder_fn):
  """Decorator that runs hooks before and after the decorated function.

  @type hook_opcode: string
  @param hook_opcode: opcode of the hook
  @type hooks_path: string
  @param hooks_path: path of the hooks
  @type env_builder_fn: function
  @param env_builder_fn: function that returns a dictionary containing the
    environment variables for the hooks. Will get all the parameters of the
    decorated function.
  @raise RPCFail: in case of pre-hook failure

  """
  def decorator(fn):
    def wrapper(*args, **kwargs):
      _, myself = ssconf.GetMasterAndMyself()
      nodes = ([myself], [myself])  # these hooks run locally

      env_fn = compat.partial(env_builder_fn, *args, **kwargs)

      cfg = _GetConfig()
      hr = HooksRunner()
      hm = mcpu.HooksMaster(hook_opcode, hooks_path, nodes, hr.RunLocalHooks,
                            None, env_fn, logging.warning, cfg.GetClusterName(),
                            cfg.GetMasterNode())

      hm.RunPhase(constants.HOOKS_PHASE_PRE)
      result = fn(*args, **kwargs)
      hm.RunPhase(constants.HOOKS_PHASE_POST)

      return result
    return wrapper
  return decorator
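
# Usage sketch: the decorator is applied with a fake opcode, a hooks
# directory and an environment builder, exactly as done for
# ActivateMasterIp/DeactivateMasterIp below:
#   @RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
#                  _BuildMasterIpEnv)
#   def ActivateMasterIp(master_params, use_external_mip_script):
#     ...
# Pre-hooks run before the wrapped body (a failure raises RPCFail) and
# post-hooks after it.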


def _BuildMasterIpEnv(master_params, use_external_mip_script=None):
  """Builds environment variables for master IP hooks.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script (unused, but necessary per the implementation of the
    RunLocalHooks decorator)

  """
  # pylint: disable=W0613
  ver = netutils.IPAddress.GetVersionFromAddressFamily(master_params.ip_family)
  env = {
    "MASTER_NETDEV": master_params.netdev,
    "MASTER_IP": master_params.ip,
    "MASTER_NETMASK": str(master_params.netmask),
    "CLUSTER_IP_VERSION": str(ver),
  }

  return env


def _RunMasterSetupScript(master_params, action, use_external_mip_script):
  """Execute the master IP address setup script.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type action: string
  @param action: action to pass to the script. Must be one of
    L{backend._MASTER_START} or L{backend._MASTER_STOP}
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise backend.RPCFail: if there are errors during the execution of the
    script

  """
  env = _BuildMasterIpEnv(master_params)

  if use_external_mip_script:
    setup_script = pathutils.EXTERNAL_MASTER_SETUP_SCRIPT
  else:
    setup_script = pathutils.DEFAULT_MASTER_SETUP_SCRIPT

  result = utils.RunCmd([setup_script, action], env=env, reset_env=True)

  if result.failed:
    _Fail("Failed to %s the master IP. Script return value: %s" %
          (action, result.exit_code), log=True)
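
# Invocation sketch (illustrative values): the script receives the action as
# its only argument and, because of reset_env=True, sees only the variables
# built by _BuildMasterIpEnv, e.g.:
#   utils.RunCmd([pathutils.DEFAULT_MASTER_SETUP_SCRIPT, _MASTER_START],
#                env={"MASTER_NETDEV": "eth0", "MASTER_IP": "192.0.2.1",
#                     "MASTER_NETMASK": "24", "CLUSTER_IP_VERSION": "4"},
#                reset_env=True)
# (the device name and addresses are made-up example values)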


@RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
               _BuildMasterIpEnv)
def ActivateMasterIp(master_params, use_external_mip_script):
  """Activate the IP address of the master daemon.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise RPCFail: in case of errors during the IP startup

  """
  _RunMasterSetupScript(master_params, _MASTER_START,
                        use_external_mip_script)


def StartMasterDaemons(no_voting):
  """Activate local node as master node.

  The function will start the master daemons (ganeti-masterd and ganeti-rapi).

  @type no_voting: boolean
  @param no_voting: whether to start ganeti-masterd without a node vote
      but still non-interactively
  @rtype: None

  """

  if no_voting:
    masterd_args = "--no-voting --yes-do-it"
  else:
    masterd_args = ""

  env = {
    "EXTRA_MASTERD_ARGS": masterd_args,
    }

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "start-master"], env=env)
  if result.failed:
    msg = "Can't start Ganeti master: %s" % result.output
    logging.error(msg)
    _Fail(msg)


@RunLocalHooks(constants.FAKE_OP_MASTER_TURNDOWN, "master-ip-turndown",
               _BuildMasterIpEnv)
def DeactivateMasterIp(master_params, use_external_mip_script):
  """Deactivate the master IP on this node.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise RPCFail: in case of errors during the IP turndown

  """
  _RunMasterSetupScript(master_params, _MASTER_STOP,
                        use_external_mip_script)


def StopMasterDaemons():
  """Stop the master daemons on this node.

  Stop the master daemons (ganeti-masterd and ganeti-rapi) on this node.

  @rtype: None

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop-master"])
  if result.failed:
    logging.error("Could not stop Ganeti master, command %s had exitcode %s"
                  " and error %s",
                  result.cmd, result.exit_code, result.output)


def ChangeMasterNetmask(old_netmask, netmask, master_ip, master_netdev):
  """Change the netmask of the master IP.

  @param old_netmask: the old value of the netmask
  @param netmask: the new value of the netmask
  @param master_ip: the master IP
  @param master_netdev: the master network device

  """
  if old_netmask == netmask:
    return

  if not netutils.IPAddress.Own(master_ip):
    _Fail("The master IP address is not up, not attempting to change its"
          " netmask")

  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
                         "%s/%s" % (master_ip, netmask),
                         "dev", master_netdev, "label",
                         "%s:0" % master_netdev])
  if result.failed:
    _Fail("Could not set the new netmask on the master IP address")

  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
                         "%s/%s" % (master_ip, old_netmask),
                         "dev", master_netdev, "label",
                         "%s:0" % master_netdev])
  if result.failed:
    _Fail("Could not bring down the master IP address with the old netmask")


def EtcHostsModify(mode, host, ip):
  """Modify a host entry in /etc/hosts.

  @param mode: The mode to operate in. Either add or remove entry
  @param host: The host to operate on
  @param ip: The ip associated with the entry

  """
  if mode == constants.ETC_HOSTS_ADD:
    if not ip:
      # Note: the original called RPCFail(...) here, which only instantiated
      # the exception without raising it; _Fail logs and raises as intended
      _Fail("Mode 'add' needs 'ip' parameter, but parameter not"
            " present")
    utils.AddHostToEtcHosts(host, ip)
  elif mode == constants.ETC_HOSTS_REMOVE:
    if ip:
      _Fail("Mode 'remove' does not allow 'ip' parameter, but"
            " parameter is present")
    utils.RemoveHostFromEtcHosts(host)
  else:
    _Fail("Mode not supported")


def LeaveCluster(modify_ssh_setup):
  """Cleans up and removes the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shutdown the node daemon.

  @param modify_ssh_setup: boolean

  """
  _CleanDirectory(pathutils.DATA_DIR)
  _CleanDirectory(pathutils.CRYPTO_KEYS_DIR)
  JobQueuePurge()

  if modify_ssh_setup:
    try:
      priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.SSH_LOGIN_USER)

      utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))

      utils.RemoveFile(priv_key)
      utils.RemoveFile(pub_key)
    except errors.OpExecError:
      logging.exception("Error while processing ssh files")

  try:
    utils.RemoveFile(pathutils.CONFD_HMAC_KEY)
    utils.RemoveFile(pathutils.RAPI_CERT_FILE)
    utils.RemoveFile(pathutils.SPICE_CERT_FILE)
    utils.RemoveFile(pathutils.SPICE_CACERT_FILE)
    utils.RemoveFile(pathutils.NODED_CERT_FILE)
  except: # pylint: disable=W0702
    logging.exception("Error while removing cluster secrets")

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop", constants.CONFD])
  if result.failed:
    logging.error("Command %s failed with exitcode %s and error %s",
                  result.cmd, result.exit_code, result.output)

  # Raise a custom exception (handled in ganeti-noded)
  raise errors.QuitGanetiException(True, "Shutdown scheduled")


def _GetVgInfo(name):
  """Retrieves information about an LVM volume group.

  """
  # TODO: GetVGInfo supports returning information for multiple VGs at once
  vginfo = bdev.LogicalVolume.GetVGInfo([name])
  if vginfo:
    vg_free = int(round(vginfo[0][0], 0))
    vg_size = int(round(vginfo[0][1], 0))
  else:
    vg_free = None
    vg_size = None

  return {
    "name": name,
    "vg_free": vg_free,
    "vg_size": vg_size,
    }


def _GetHvInfo(name):
  """Retrieves node information from a hypervisor.

  The information returned depends on the hypervisor. Common items:

    - vg_size is the size of the configured volume group in MiB
    - vg_free is the free size of the volume group in MiB
    - memory_dom0 is the memory allocated for domain0 in MiB
    - memory_free is the currently available (free) RAM in MiB
    - memory_total is the total amount of RAM in MiB
    - hv_version: the hypervisor version, if available

  """
  return hypervisor.GetHypervisor(name).GetNodeInfo()


def _GetNamedNodeInfo(names, fn):
  """Calls C{fn} for all names in C{names} and returns a list of results.

  @rtype: None or list

  """
  if names is None:
    return None
  else:
    return map(fn, names)


def GetNodeInfo(vg_names, hv_names):
  """Gives back a hash with different information about the node.

  @type vg_names: list of string
  @param vg_names: Names of the volume groups to ask for disk space information
  @type hv_names: list of string
  @param hv_names: Names of the hypervisors to ask for node information
  @rtype: tuple; (string, None/list, None/list)
  @return: Tuple containing boot ID, volume group information and hypervisor
    information

  """
  bootid = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
  vg_info = _GetNamedNodeInfo(vg_names, _GetVgInfo)
  hv_info = _GetNamedNodeInfo(hv_names, _GetHvInfo)

  return (bootid, vg_info, hv_info)


def VerifyNode(what, cluster_name):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  result = {}
  my_name = netutils.Hostname.GetSysName()
  port = netutils.GetDaemonPort(constants.NODED)
  vm_capable = my_name not in what.get(constants.NV_VMNODES, [])

  if constants.NV_HYPERVISOR in what and vm_capable:
    result[constants.NV_HYPERVISOR] = tmp = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
      try:
        val = hypervisor.GetHypervisor(hv_name).Verify()
      except errors.HypervisorError, err:
        val = "Error while checking hypervisor: %s" % str(err)
      tmp[hv_name] = val

  if constants.NV_HVPARAMS in what and vm_capable:
    result[constants.NV_HVPARAMS] = tmp = []
    for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
      try:
        logging.info("Validating hv %s, %s", hv_name, hvparms)
        hypervisor.GetHypervisor(hv_name).ValidateParameters(hvparms)
      except errors.HypervisorError, err:
        tmp.append((source, hv_name, str(err)))

  if constants.NV_FILELIST in what:
    fingerprints = utils.FingerprintFiles(map(vcluster.LocalizeVirtualPath,
                                              what[constants.NV_FILELIST]))
    result[constants.NV_FILELIST] = \
      dict((vcluster.MakeVirtualPath(key), value)
           for (key, value) in fingerprints.items())

  if constants.NV_NODELIST in what:
    (nodes, bynode) = what[constants.NV_NODELIST]

    # Add nodes from other groups (different for each node)
    try:
      nodes.extend(bynode[my_name])
    except KeyError:
      pass

    # Use a random order
    random.shuffle(nodes)

    # Try to contact all nodes
    val = {}
    for node in nodes:
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
      if not success:
        val[node] = message

    result[constants.NV_NODELIST] = val

  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_pip = my_sip = None
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      tmp[my_name] = ("Can't find my own primary/secondary IP"
                      " in the node list")
    else:
      for name, pip, sip in what[constants.NV_NODENETTEST]:
        fail = []
        if not netutils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        if sip != pip:
          if not netutils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          tmp[name] = ("failure using the %s interface(s)" %
                       " and ".join(fail))

  if constants.NV_MASTERIP in what:
    # FIXME: add checks on incoming data structures (here and in the
    # rest of the function)
    master_name, master_ip = what[constants.NV_MASTERIP]
    if master_name == my_name:
      source = constants.IP4_ADDRESS_LOCALHOST
    else:
      source = None
    result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
                                                     source=source)

  if constants.NV_USERSCRIPTS in what:
    result[constants.NV_USERSCRIPTS] = \
      [script for script in what[constants.NV_USERSCRIPTS]
       if not (os.path.exists(script) and os.access(script, os.X_OK))]

  if constants.NV_OOB_PATHS in what:
    result[constants.NV_OOB_PATHS] = tmp = []
    for path in what[constants.NV_OOB_PATHS]:
      try:
        st = os.stat(path)
      except OSError, err:
        tmp.append("error stat()'ing out-of-band helper: %s" % err)
      else:
        if stat.S_ISREG(st.st_mode):
          if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
            tmp.append(None)
          else:
            tmp.append("out of band helper %s is not executable" % path)
        else:
          tmp.append("out of band helper %s is not a file" % path)

  if constants.NV_LVLIST in what and vm_capable:
    try:
      val = GetVolumeList(utils.ListVolumeGroups().keys())
    except RPCFail, err:
      val = str(err)
    result[constants.NV_LVLIST] = val

  if constants.NV_INSTANCELIST in what and vm_capable:
    # GetInstanceList can fail
    try:
      val = GetInstanceList(what[constants.NV_INSTANCELIST])
    except RPCFail, err:
      val = str(err)
    result[constants.NV_INSTANCELIST] = val

  if constants.NV_VGLIST in what and vm_capable:
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()

  if constants.NV_PVLIST in what and vm_capable:
    result[constants.NV_PVLIST] = \
      bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
                                   filter_allocatable=False)

  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
                                    constants.RELEASE_VERSION)

  if constants.NV_HVINFO in what and vm_capable:
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()

  if constants.NV_DRBDLIST in what and vm_capable:
    try:
      used_minors = bdev.DRBD8.GetUsedDevs().keys()
    except errors.BlockDeviceError, err:
      logging.warning("Can't get used minors list", exc_info=True)
      used_minors = str(err)
    result[constants.NV_DRBDLIST] = used_minors

  if constants.NV_DRBDHELPER in what and vm_capable:
    status = True
    try:
      payload = bdev.BaseDRBD.GetUsermodeHelper()
    except errors.BlockDeviceError, err:
      logging.error("Can't get DRBD usermode helper: %s", str(err))
      status = False
      payload = str(err)
    result[constants.NV_DRBDHELPER] = (status, payload)

  if constants.NV_NODESETUP in what:
    result[constants.NV_NODESETUP] = tmpr = []
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
      tmpr.append("The sysfs filesystem doesn't seem to be mounted"
                  " under /sys, missing required directories /sys/block"
                  " and /sys/class/net")
    if (not os.path.isdir("/proc/sys") or
        not os.path.isfile("/proc/sysrq-trigger")):
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
                  " under /proc, missing required directory /proc/sys and"
                  " the file /proc/sysrq-trigger")

  if constants.NV_TIME in what:
    result[constants.NV_TIME] = utils.SplitTime(time.time())

  if constants.NV_OSLIST in what and vm_capable:
    result[constants.NV_OSLIST] = DiagnoseOS()

  if constants.NV_BRIDGES in what and vm_capable:
    result[constants.NV_BRIDGES] = [bridge
                                    for bridge in what[constants.NV_BRIDGES]
                                    if not utils.BridgeExists(bridge)]

  return result
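
# Payload sketch (illustrative, with hypothetical values): the master sends
# a "what" dictionary such as
#   {constants.NV_FILELIST: ["/etc/hosts"],
#    constants.NV_NODELIST: (["node2.example.com"], {}),
#    constants.NV_VERSION: None}
# and gets back a dictionary keyed the same way, e.g. NV_VERSION mapped to
# (constants.PROTOCOL_VERSION, constants.RELEASE_VERSION).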


def GetBlockDevSizes(devices):
  """Return the size of the given block devices.

  @type devices: list
  @param devices: list of block device nodes to query
  @rtype: dict
  @return:
    dictionary of the queried block devices under /dev (key) and their
    size in MiB (value), e.g.

    {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}

  """
  DEV_PREFIX = "/dev/"
  blockdevs = {}

  for devpath in devices:
    if not utils.IsBelowDir(DEV_PREFIX, devpath):
      continue

    try:
      st = os.stat(devpath)
    except EnvironmentError, err:
      logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
      continue

    if stat.S_ISBLK(st.st_mode):
      result = utils.RunCmd(["blockdev", "--getsize64", devpath])
      if result.failed:
        # We don't want to fail, just do not list this device as available
        logging.warning("Cannot get size for block device %s", devpath)
        continue

      size = int(result.stdout) / (1024 * 1024)
      blockdevs[devpath] = size
  return blockdevs


def GetVolumeList(vg_names):
  """Compute list of logical volumes and their size.

  @type vg_names: list
  @param vg_names: the volume groups whose LVs we should list, or
      empty for all volume groups
  @rtype: dict
  @return:
      dictionary of all partitions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'xenvg/test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.

  """
  lvs = {}
  sep = "|"
  if not vg_names:
    vg_names = []
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)

  for line in result.stdout.splitlines():
    line = line.strip()
    match = _LVSLINE_REGEX.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    vg_name, name, size, attr = match.groups()
    inactive = attr[4] == "-"
    online = attr[5] == "o"
    virtual = attr[0] == "v"
    if virtual:
      # we don't want to report such volumes as existing, since they
      # don't really hold data
      continue
    lvs[vg_name + "/" + name] = (size, inactive, online)

  return lvs
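
# Attribute decoding sketch: for an "lv_attr" value such as "-wi-ao----",
# position 0 is the volume type ("v" marks virtual volumes, which are
# skipped), position 4 the state ("-" means inactive) and position 5 the
# device status ("o" means open):
#   attr = "-wi-ao----"
#   (attr[0] == "v", attr[4] == "-", attr[5] == "o") == (False, False, True)
# so the LV above would be reported as (size, inactive, online) =
# ('20.06', False, True).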


def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary with volume group names as keys and their
      size as values

  """
  return utils.ListVolumeGroups()


def NodeVolumes():
  """List all volumes on this node.

  @rtype: list
  @return:
    A list of dictionaries, each having four keys:
      - name: the logical volume name,
      - size: the size of the logical volume
      - dev: the physical device on which the LV lives
      - vg: the volume group to which it belongs

    In case of errors, we return an empty list and log the
    error.

    Note that since a logical volume can live on multiple physical
    volumes, the resulting list might include a logical volume
    multiple times.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s",
          result.output)

  def parse_dev(dev):
    return dev.split("(")[0]

  def handle_dev(dev):
    return [parse_dev(x) for x in dev.split(",")]

  def map_line(line):
    line = [v.strip() for v in line]
    return [{"name": line[0], "size": line[1],
             "dev": dev, "vg": line[3]} for dev in handle_dev(line[2])]

  all_devs = []
  for line in result.stdout.splitlines():
    if line.count("|") >= 3:
      all_devs.extend(map_line(line.split("|")))
    else:
      logging.warning("Strange line in the output from lvs: '%s'", line)
  return all_devs
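
# Parsing sketch: the "devices" column lists segments such as
# "/dev/sda5(0),/dev/sdb1(0)"; parse_dev/handle_dev strip the extent
# offsets, so an LV spread over two PVs yields two entries:
#   handle_dev("/dev/sda5(0),/dev/sdb1(0)") == ["/dev/sda5", "/dev/sdb1"]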


def BridgesExist(bridges_list):
  """Check whether the given bridges exist on the current node.

  @raise RPCFail: if any of the bridges is missing

  """
  missing = []
  for bridge in bridges_list:
    if not utils.BridgeExists(bridge):
      missing.append(bridge)

  if missing:
    _Fail("Missing bridges %s", utils.CommaJoin(missing))


def GetInstanceList(hypervisor_list):
  """Provides a list of instances.

  @type hypervisor_list: list
  @param hypervisor_list: the list of hypervisors to query for instance
      information

  @rtype: list
  @return: a list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com

  """
  results = []
  for hname in hypervisor_list:
    try:
      names = hypervisor.GetHypervisor(hname).ListInstances()
      results.extend(names)
    except errors.HypervisorError, err:
      _Fail("Error enumerating instances (hypervisor %s): %s",
            hname, err, exc=True)

  return results


def GetInstanceInfo(instance, hname):
  """Gives back the information about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus (int)

  """
  output = {}

  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is not None:
    output["memory"] = iinfo[2]
    output["vcpus"] = iinfo[3]
    output["state"] = iinfo[4]
    output["time"] = iinfo[5]

  return output


def GetInstanceMigratable(instance):
  """Checks whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.
  @raise RPCFail: if the instance is not running

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  iname = instance.name
  if iname not in hyper.ListInstances():
    _Fail("Instance %s is not running", iname)

  for idx in range(len(instance.disks)):
    link_name = _GetBlockDevSymlinkPath(iname, idx)
    if not os.path.islink(link_name):
      logging.warning("Instance %s is missing symlink %s for disk %d",
                      iname, link_name, idx)


def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  output = {}

  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
    if iinfo:
      for name, _, memory, vcpus, state, times in iinfo:
        value = {
          "memory": memory,
          "vcpus": vcpus,
          "state": state,
          "time": times,
          }
        if name in output:
          # we only check static parameters, like memory and vcpus,
          # and not state and time which can change between the
          # invocations of the different hypervisors
          for key in "memory", "vcpus":
            if value[key] != output[name][key]:
              _Fail("Instance %s is running twice"
                    " with different parameters", name)
        output[name] = value

  return output


def _InstanceLogName(kind, os_name, instance, component):
  """Compute the OS log filename for a given instance and operation.

  The instance name and os name are passed in as strings since not all
  operations have these as part of an instance object.

  @type kind: string
  @param kind: the operation type (e.g. add, import, etc.)
  @type os_name: string
  @param os_name: the os name
  @type instance: string
  @param instance: the name of the instance being imported/added/etc.
  @type component: string or None
  @param component: the name of the component of the instance being
      transferred

  """
  # TODO: Use tempfile.mkstemp to create unique filename
  if component:
    assert "/" not in component
    c_msg = "-%s" % component
  else:
    c_msg = ""
  base = ("%s-%s-%s%s-%s.log" %
          (kind, os_name, instance, c_msg, utils.TimestampForFilename()))
  return utils.PathJoin(pathutils.LOG_OS_DIR, base)


def InstanceOsAdd(instance, reinstall, debug):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type reinstall: boolean
  @param reinstall: whether this is an instance reinstall
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: None

  """
  inst_os = OSFromDisk(instance.os)

  create_env = OSEnvironment(instance, inst_os, debug)
  if reinstall:
    create_env["INSTANCE_REINSTALL"] = "1"

  logfile = _InstanceLogName("add", instance.os, instance.name, None)

  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile, reset_env=True)
  if result.failed:
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", result.cmd, result.fail_reason, logfile,
                  result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS create script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)


def RunRenameInstance(instance, old_name, debug):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be renamed
  @type old_name: string
  @param old_name: previous instance name
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: boolean
  @return: the success of the operation

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance, inst_os, debug)
  rename_env["OLD_INSTANCE_NAME"] = old_name

  logfile = _InstanceLogName("rename", instance.os,
                             "%s-%s" % (old_name, instance.name), None)

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile, reset_env=True)

  if result.failed:
    logging.error("os rename command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS rename script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)


def _GetBlockDevSymlinkPath(instance_name, idx):
  return utils.PathJoin(pathutils.DISK_LINKS_DIR, "%s%s%d" %
                        (instance_name, constants.DISK_SEPARATOR, idx))
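
# Path sketch (assumes the build-time default of ":" for
# constants.DISK_SEPARATOR):
#   _GetBlockDevSymlinkPath("inst1.example.com", 0)
#   == utils.PathJoin(pathutils.DISK_LINKS_DIR, "inst1.example.com:0")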


def _SymlinkBlockDev(instance_name, device_path, idx):
  """Set up symlinks to an instance's block device.

  This is an auxiliary function run when an instance is started (on the
  primary node) or when an instance is migrated (on the target node).

  @param instance_name: the name of the target instance
  @param device_path: path of the physical block device, on the node
  @param idx: the disk index
  @return: absolute path to the disk's symlink

  """
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
  try:
    os.symlink(device_path, link_name)
  except OSError, err:
    if err.errno == errno.EEXIST:
      if (not os.path.islink(link_name) or
          os.readlink(link_name) != device_path):
        os.remove(link_name)
        os.symlink(device_path, link_name)
    else:
      raise

  return link_name


def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  """
  for idx, _ in enumerate(disks):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if os.path.islink(link_name):
      try:
        os.remove(link_name)
      except OSError:
        logging.exception("Can't remove symlink '%s'", link_name)


def _GatherAndLinkBlockDevs(instance):
  """Set up an instance's block device(s).

  This is run on the primary node at instance startup. The block
  devices must be already assembled.

  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should assemble
  @rtype: list
  @return: list of (disk_object, device_path)

  """
  block_devices = []
  for idx, disk in enumerate(instance.disks):
    device = _RecursiveFindBD(disk)
    if device is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
                                    str(disk))
    device.Open()
    try:
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
    except OSError, e:
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
                                    e.strerror)

    block_devices.append((disk, link_name))

  return block_devices


def StartInstance(instance, startup_paused):
  """Start an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type startup_paused: bool
  @param startup_paused: whether to pause the instance at startup
  @rtype: None

  """
  running_instances = GetInstanceList([instance.hypervisor])

  if instance.name in running_instances:
    logging.info("Instance %s already running, not starting", instance.name)
    return

  try:
    block_devices = _GatherAndLinkBlockDevs(instance)
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    hyper.StartInstance(instance, block_devices, startup_paused)
  except errors.BlockDeviceError, err:
    _Fail("Block device error: %s", err, exc=True)
  except errors.HypervisorError, err:
    _RemoveBlockDevLinks(instance.name, instance.disks)
    _Fail("Hypervisor error: %s", err, exc=True)


def InstanceShutdown(instance, timeout):
  """Shut an instance down.

  @note: this function uses polling with a hardcoded timeout.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type timeout: integer
  @param timeout: maximum timeout for soft shutdown
  @rtype: None

  """
  hv_name = instance.hypervisor
  hyper = hypervisor.GetHypervisor(hv_name)
  iname = instance.name

  if instance.name not in hyper.ListInstances():
    logging.info("Instance %s not running, doing nothing", iname)
    return

  class _TryShutdown:
    def __init__(self):
      self.tried_once = False

    def __call__(self):
      if iname not in hyper.ListInstances():
        return

      try:
        hyper.StopInstance(instance, retry=self.tried_once)
      except errors.HypervisorError, err:
        if iname not in hyper.ListInstances():
          # if the instance is no longer existing, consider this a
          # success and go to cleanup
          return

        _Fail("Failed to stop instance %s: %s", iname, err)

      self.tried_once = True

      raise utils.RetryAgain()

  try:
    utils.Retry(_TryShutdown(), 5, timeout)
  except utils.RetryTimeout:
    # the shutdown did not succeed
    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)

    try:
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      if iname in hyper.ListInstances():
        # only raise an error if the instance still exists, otherwise
        # the error could simply be "instance ... unknown"!
        _Fail("Failed to force stop instance %s: %s", iname, err)

    time.sleep(1)

    if iname in hyper.ListInstances():
      _Fail("Could not shutdown instance %s even by destroy", iname)

  try:
    hyper.CleanupInstance(instance.name)
  except errors.HypervisorError, err:
    logging.warning("Failed to execute post-shutdown cleanup step: %s", err)

  _RemoveBlockDevLinks(iname, instance.disks)
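
# Retry contract sketch: utils.Retry re-invokes _TryShutdown() (roughly every
# 5 seconds, as requested above) for up to "timeout" seconds; the callable
# returns to signal success, raises RPCFail via _Fail on a hard error, and
# raises utils.RetryAgain to request another attempt.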


def InstanceReboot(instance, reboot_type, shutdown_timeout):
  """Reboot an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object to reboot
  @type reboot_type: str
  @param reboot_type: the type of reboot, one of the following
    constants:
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
        instance OS, do not recreate the VM
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
        restart the VM (at the hypervisor level)
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
        not accepted here, since that mode is handled differently, in
        cmdlib, and translates into full stop and start of the
        instance (instead of a call_instance_reboot RPC)
  @type shutdown_timeout: integer
  @param shutdown_timeout: maximum timeout for soft shutdown
  @rtype: None

  """
  running_instances = GetInstanceList([instance.hypervisor])

  if instance.name not in running_instances:
    _Fail("Cannot reboot instance %s that is not running", instance.name)

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
    try:
      hyper.RebootInstance(instance)
    except errors.HypervisorError, err:
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
    try:
      InstanceShutdown(instance, shutdown_timeout)
      return StartInstance(instance, False)
    except errors.HypervisorError, err:
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
  else:
    _Fail("Invalid reboot_type received: %s", reboot_type)


def InstanceBalloonMemory(instance, memory):
  """Resize an instance's memory.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type memory: int
  @param memory: new memory amount in MB
  @rtype: None

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  running = hyper.ListInstances()
  if instance.name not in running:
    logging.info("Instance %s is not running, cannot balloon", instance.name)
    return
  try:
    hyper.BalloonInstanceMemory(instance, memory)
  except errors.HypervisorError, err:
    _Fail("Failed to balloon instance memory: %s", err, exc=True)


def MigrationInfo(instance):
  """Gather information about an instance to be migrated.

  @type instance: L{objects.Instance}
  @param instance: the instance definition

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    info = hyper.MigrationInfo(instance)
  except errors.HypervisorError, err:
    _Fail("Failed to fetch migration information: %s", err, exc=True)
  return info


def AcceptInstance(instance, info, target):
  """Prepare the node to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type target: string
  @param target: target host (usually ip), on this node

  """
  # TODO: why is this required only for DTS_EXT_MIRROR?
  if instance.disk_template in constants.DTS_EXT_MIRROR:
    # Create the symlinks, as the disks are not active
    # in any way
    try:
      _GatherAndLinkBlockDevs(instance)
    except errors.BlockDeviceError, err:
      _Fail("Block device error: %s", err, exc=True)

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.AcceptInstance(instance, info, target)
  except errors.HypervisorError, err:
    if instance.disk_template in constants.DTS_EXT_MIRROR:
      _RemoveBlockDevLinks(instance.name, instance.disks)
    _Fail("Failed to accept instance: %s", err, exc=True)


def FinalizeMigrationDst(instance, info, success):
  """Finalize any preparation to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type success: boolean
  @param success: whether the migration was a success or a failure

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.FinalizeMigrationDst(instance, info, success)
  except errors.HypervisorError, err:
    _Fail("Failed to finalize migration on the target node: %s", err, exc=True)


def MigrateInstance(instance, target, live):
  """Migrates an instance to another node.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type target: string
  @param target: the target node name
  @type live: boolean
  @param live: whether the migration should be done live or not (the
      interpretation of this parameter is left to the hypervisor)
  @raise RPCFail: if migration fails for some reason

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)

  try:
    hyper.MigrateInstance(instance, target, live)
  except errors.HypervisorError, err:
    _Fail("Failed to migrate instance: %s", err, exc=True)


def FinalizeMigrationSource(instance, success, live):
  """Finalize the instance migration on the source node.

  @type instance: L{objects.Instance}
  @param instance: the instance definition of the migrated instance
  @type success: bool
  @param success: whether the migration succeeded or not
  @type live: bool
  @param live: whether the user requested a live migration or not
  @raise RPCFail: If the execution fails for some reason

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)

  try:
    hyper.FinalizeMigrationSource(instance, success, live)
  except Exception, err:  # pylint: disable=W0703
    _Fail("Failed to finalize the migration on the source node: %s", err,
          exc=True)


def GetMigrationStatus(instance):
  """Get the migration status.

  @type instance: L{objects.Instance}
  @param instance: the instance that is being migrated
  @rtype: L{objects.MigrationStatus}
  @return: the status of the current migration (one of
           L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
           progress info that can be retrieved from the hypervisor
  @raise RPCFail: If the migration status cannot be retrieved

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    return hyper.GetMigrationStatus(instance)
  except Exception, err:  # pylint: disable=W0703
    _Fail("Failed to get migration status: %s", err, exc=True)


def BlockdevCreate(disk, size, owner, on_primary, info):
  """Creates a block device for an instance.

  @type disk: L{objects.Disk}
  @param disk: the object describing the disk we should create
  @type size: int
  @param size: the size of the physical underlying device, in MiB
  @type owner: str
  @param owner: the name of the instance for which disk is created,
      used for device cache data
  @type on_primary: boolean
  @param on_primary: indicates if it is the primary node or not
  @type info: string
  @param info: string that will be sent to the physical device
      creation, used for example to set (LVM) tags on LVs

  @return: the new unique_id of the device (this can sometimes be
      computed only after creation), or None. On secondary nodes,
      it's not required to return anything.

  """
  # TODO: remove the obsolete "size" argument
  # pylint: disable=W0613
  clist = []
  if disk.children:
    for child in disk.children:
      try:
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
      except errors.BlockDeviceError, err:
        _Fail("Can't assemble device %s: %s", child, err)
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
        try:
          # pylint: disable=E1103
          crdev.Open()
        except errors.BlockDeviceError, err:
          _Fail("Can't make child '%s' read-write: %s", child, err)
      clist.append(crdev)

  try:
    device = bdev.Create(disk, clist)
  except errors.BlockDeviceError, err:
    _Fail("Can't create block device: %s", err)

  if on_primary or disk.AssembleOnSecondary():
    try:
      device.Assemble()
    except errors.BlockDeviceError, err:
      _Fail("Can't assemble device after creation, unusual event: %s", err)
    if on_primary or disk.OpenOnSecondary():
      try:
        device.Open(force=True)
      except errors.BlockDeviceError, err:
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  device.SetInfo(info)

  return device.unique_id
1595

    
1596

    
1597
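# A minimal usage sketch for the above (illustrative only; the disk object
# is a hypothetical plain LV with no children, and all names are made up):
#
#   disk = objects.Disk(dev_type=constants.LD_LV, size=1024,
#                       logical_id=("xenvg", "inst1.example.com-disk0"),
#                       iv_name="disk/0")
#   unique_id = BlockdevCreate(disk, disk.size, "inst1.example.com",
#                              True, "originstname+inst1.example.com")
#
# On the primary node this creates, assembles and opens the device,
# updates the device cache, and returns its new unique_id.

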
def _WipeDevice(path, offset, size):
  """This function actually wipes the device.

  @param path: The path to the device to wipe
  @param offset: The offset in MiB in the file
  @param size: The size in MiB to write

  """
  # Internal sizes are always in Mebibytes; if the following "dd" command
  # should use a different block size the offset and size given to this
  # function must be adjusted accordingly before being passed to "dd".
  block_size = 1024 * 1024

  cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
         "bs=%s" % block_size, "oflag=direct", "of=%s" % path,
         "count=%d" % size]
  result = utils.RunCmd(cmd)

  if result.failed:
    _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
          result.fail_reason, result.output)


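# For example (a sketch with a hypothetical device path, assuming
# constants.DD_CMD resolves to plain "dd"), calling
# _WipeDevice("/dev/xenvg/disk0", 0, 128) runs the equivalent of:
#
#   dd if=/dev/zero seek=0 bs=1048576 oflag=direct of=/dev/xenvg/disk0 \
#      count=128
#
# i.e. 128 one-MiB blocks, so both seek and count stay in MiB units.

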
def BlockdevWipe(disk, offset, size):
  """Wipes a block device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to wipe
  @type offset: int
  @param offset: The offset in MiB in the file
  @type size: int
  @param size: The size in MiB to write

  """
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError:
    rdev = None

  if not rdev:
    _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)

  # Cross-verify some of the parameters
  if offset < 0:
    _Fail("Negative offset")
  if size < 0:
    _Fail("Negative size")
  if offset > rdev.size:
    _Fail("Offset is bigger than device size")
  if (offset + size) > rdev.size:
    _Fail("The provided offset and size to wipe is bigger than device size")

  _WipeDevice(rdev.dev_path, offset, size)


def BlockdevPauseResumeSync(disks, pause):
  """Pause or resume the sync of the block device.

  @type disks: list of L{objects.Disk}
  @param disks: the disk objects we want to pause/resume
  @type pause: bool
  @param pause: Whether to pause or resume

  """
  success = []
  for disk in disks:
    try:
      rdev = _RecursiveFindBD(disk)
    except errors.BlockDeviceError:
      rdev = None

    if not rdev:
      success.append((False, ("Cannot change sync for device %s:"
                              " device not found" % disk.iv_name)))
      continue

    result = rdev.PauseResumeSync(pause)

    if result:
      success.append((result, None))
    else:
      if pause:
        msg = "Pause"
      else:
        msg = "Resume"
      success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))

  return success


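# The return value is one (status, error) pair per disk, for example
# (an illustrative sketch):
#
#   [(True, None),
#    (False, "Cannot change sync for device disk/1: device not found")]
#
# so callers can report per-disk failures without aborting the whole call.

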
def BlockdevRemove(disk):
  """Remove a block device.

  @note: This is intended to be called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk object we should remove
  @rtype: boolean
  @return: the success of the operation

  """
  msgs = []
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    # probably can't attach
    logging.info("Can't attach to device %s in remove", disk)
    rdev = None
  if rdev is not None:
    r_path = rdev.dev_path
    try:
      rdev.Remove()
    except errors.BlockDeviceError, err:
      msgs.append(str(err))
    if not msgs:
      DevCacheManager.RemoveCache(r_path)

  if disk.children:
    for child in disk.children:
      try:
        BlockdevRemove(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))


def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  @note: this function is called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk we try to assemble
  @type owner: str
  @param owner: the name of the instance which owns the disk
  @type as_primary: boolean
  @param as_primary: if we should make the block device
      read/write

  @return: the assembled device or None (in case no device
      was assembled)
  @raise errors.BlockDeviceError: in case there is an error
      during the activation of the children or the device
      itself

  """
  children = []
  if disk.children:
    mcn = disk.ChildrenNeeded()
    if mcn == -1:
      mcn = 0 # max number of Nones allowed
    else:
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
      try:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        if children.count(None) >= mcn:
          raise
        cdev = None
        logging.error("Error in child activation (but continuing): %s",
                      str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.Assemble(disk, children)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      r_dev.Open()
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)

  else:
    result = True
  return result


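# A worked example of the mcn arithmetic above (a sketch): for a mirrored
# device with two children where ChildrenNeeded() returns 1, mcn becomes
# len(disk.children) - 1 = 1, i.e. at most one child may fail to assemble
# (one None is tolerated) and the device is still built, presumably in
# degraded mode; a second failure re-raises the BlockDeviceError.

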
def BlockdevAssemble(disk, owner, as_primary, idx):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  @rtype: str or boolean
  @return: a C{/dev/...} path for primary nodes, and
      C{True} for secondary nodes

  """
  try:
    result = _RecursiveAssembleBD(disk, owner, as_primary)
    if isinstance(result, bdev.BlockDev):
      # pylint: disable=E1103
      result = result.dev_path
      if as_primary:
        _SymlinkBlockDev(owner, result, idx)
  except errors.BlockDeviceError, err:
    _Fail("Error while assembling disk: %s", err, exc=True)
  except OSError, err:
    _Fail("Error while symlinking disk: %s", err, exc=True)

  return result


def BlockdevShutdown(disk):
  """Shut down a block device.

  First, if the device is assembled (Attach() is successful), then
  the device is shutdown. Then the children of the device are
  shutdown.

  This function is called recursively. Note that we don't cache the
  children or such, as opposed to assemble; shutting down different
  devices doesn't require that the upper device was active.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk we should
      shutdown
  @rtype: None

  """
  msgs = []
  r_dev = _RecursiveFindBD(disk)
  if r_dev is not None:
    r_path = r_dev.dev_path
    try:
      r_dev.Shutdown()
      DevCacheManager.RemoveCache(r_path)
    except errors.BlockDeviceError, err:
      msgs.append(str(err))

  if disk.children:
    for child in disk.children:
      try:
        BlockdevShutdown(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))


def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if new_bdevs.count(None) > 0:
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
  parent_bdev.AddChildren(new_bdevs)


def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
  devs = []
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
    if rpath is None:
      bd = _RecursiveFindBD(disk)
      if bd is None:
        _Fail("Can't find device %s while removing children", disk)
      else:
        devs.append(bd.dev_path)
    else:
      if not utils.IsNormAbsPath(rpath):
        _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
      devs.append(rpath)
  parent_bdev.RemoveChildren(devs)


def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return: list of L{objects.BlockDevStatus}, one for each disk
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  stats = []
  for dsk in disks:
    rbd = _RecursiveFindBD(dsk)
    if rbd is None:
      _Fail("Can't find device %s", dsk)

    stats.append(rbd.CombinedSyncStatus())

  return stats


def BlockdevGetmirrorstatusMulti(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return: list of tuples, (bool, status), one for each disk; bool denotes
    success/failure, status is L{objects.BlockDevStatus} on success, string
    otherwise

  """
  result = []
  for disk in disks:
    try:
      rbd = _RecursiveFindBD(disk)
      if rbd is None:
        result.append((False, "Can't find device %s" % disk))
        continue

      status = rbd.CombinedSyncStatus()
    except errors.BlockDeviceError, err:
      logging.exception("Error while getting disk status")
      result.append((False, str(err)))
    else:
      result.append((True, status))

  assert len(disks) == len(result)

  return result


def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  children = []
  if disk.children:
    for chdisk in disk.children:
      children.append(_RecursiveFindBD(chdisk))

  return bdev.FindDevice(disk, children)


def _OpenRealBD(disk):
  """Opens the underlying block device of a disk.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to open

  """
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    _Fail("Block device '%s' is not set up", disk)

  real_disk.Open()

  return real_disk


def BlockdevFind(disk):
  """Check if a device is activated.

  If it is, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk to find
  @rtype: None or objects.BlockDevStatus
  @return: None if the disk cannot be found, otherwise the current
           information

  """
  try:
    rbd = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    _Fail("Failed to find device: %s", err, exc=True)

  if rbd is None:
    return None

  return rbd.GetSyncStatus()


def BlockdevGetsize(disks):
  """Computes the size of the given disks.

  If a disk is not found, returns None instead.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to compute the size for
  @rtype: list
  @return: list with elements None if the disk cannot be found,
      otherwise the size

  """
  result = []
  for cf in disks:
    try:
      rbd = _RecursiveFindBD(cf)
    except errors.BlockDeviceError:
      result.append(None)
      continue
    if rbd is None:
      result.append(None)
    else:
      result.append(rbd.GetActualSize())
  return result


def BlockdevExport(disk, dest_node, dest_path, cluster_name):
  """Export a block device to a remote node.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type dest_path: str
  @param dest_path: the destination path on the target node
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @rtype: None

  """
  real_disk = _OpenRealBD(disk)

  # the block size on the read dd is 1MiB to match our units
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
                               "dd if=%s bs=1048576 count=%s",
                               real_disk.dev_path, str(disk.size))

  # we set here a smaller block size as, due to ssh buffering, more
  # than 64-128k will mostly be ignored; we use nocreat to fail if the
  # device is not already there or we pass a wrong path; we use
  # notrunc to not attempt truncation on an LV device; we use
  # oflag=dsync to not buffer too much memory; this means that at
  # best, we flush every 64k, which will not be very fast
  destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
                                " oflag=dsync", dest_path)

  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.SSH_LOGIN_USER,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = "|".join([expcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(["bash", "-c", command])

  if result.failed:
    _Fail("Disk copy command '%s' returned error: %s"
          " output: %s", command, result.fail_reason, result.output)


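# The assembled pipeline looks roughly like this (a sketch; paths and the
# node name are hypothetical, and the ssh options are elided):
#
#   set -e; set -o pipefail; dd if=/dev/xenvg/disk0 bs=1048576 count=1024 \
#     | ssh ... node2.example.com 'dd of=/var/tmp/disk0.img \
#         conv=nocreat,notrunc bs=65536 oflag=dsync'
#
# The local read uses 1 MiB blocks to match the MiB-based disk size, while
# the remote write uses 64 KiB blocks to play well with ssh buffering.

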
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: string
  @param uid: the owner of the file
  @type gid: string
  @param gid: the group of the file
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: None

  """
  file_name = vcluster.LocalizeVirtualPath(file_name)

  if not os.path.isabs(file_name):
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)

  if file_name not in _ALLOWED_UPLOAD_FILES:
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
          file_name)

  raw_data = _Decompress(data)

  if not (isinstance(uid, basestring) and isinstance(gid, basestring)):
    _Fail("Invalid username/groupname type")

  getents = runtime.GetEnts()
  uid = getents.LookupUser(uid)
  gid = getents.LookupGroup(gid)

  utils.SafeWriteFile(file_name, None,
                      data=raw_data, mode=mode, uid=uid, gid=gid,
                      atime=atime, mtime=mtime)


def RunOob(oob_program, command, node, timeout):
  """Executes oob_program with given command on given node.

  @param oob_program: The path to the executable oob_program
  @param command: The command to invoke on oob_program
  @param node: The node given as an argument to the program
  @param timeout: Timeout after which we kill the oob program

  @return: stdout
  @raise RPCFail: If execution fails for some reason

  """
  result = utils.RunCmd([oob_program, command, node], timeout=timeout)

  if result.failed:
    _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
          result.fail_reason, result.output)

  return result.stdout


def _OSOndiskAPIVersion(os_dir):
  """Compute and return the API version of a given OS.

  This function will try to read the API version of the OS residing in
  the 'os_dir' directory.

  @type os_dir: str
  @param os_dir: the directory in which we should look for the OS
  @rtype: tuple
  @return: tuple (status, data) with status denoting the validity and
      data holding either the valid versions or an error message

  """
  api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)

  try:
    st = os.stat(api_file)
  except EnvironmentError, err:
    return False, ("Required file '%s' not found under path %s: %s" %
                   (constants.OS_API_FILE, os_dir, utils.ErrnoOrStr(err)))

  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    return False, ("File '%s' in %s is not a regular file" %
                   (constants.OS_API_FILE, os_dir))

  try:
    api_versions = utils.ReadFile(api_file).splitlines()
  except EnvironmentError, err:
    return False, ("Error while reading the API version file at %s: %s" %
                   (api_file, utils.ErrnoOrStr(err)))

  try:
    api_versions = [int(version.strip()) for version in api_versions]
  except (TypeError, ValueError), err:
    return False, ("API version(s) can't be converted to integer: %s" %
                   str(err))

  return True, api_versions


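# As an illustration (a sketch; the OS path is hypothetical), an API file
# with one integer version per line:
#
#   $ cat /srv/ganeti/os/debootstrap/ganeti_api_version
#   20
#
# makes this function return (True, [20]).

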
def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{pathutils.OS_SEARCH_PATH})
  @rtype: list of L{objects.OS}
  @return: a list of tuples (name, path, status, diagnose, variants,
      parameters, api_version) for all (potential) OSes under all
      search paths, where:
          - name is the (potential) OS name
          - path is the full path to the OS
          - status True/False is the validity of the OS
          - diagnose is the error message for an invalid OS, otherwise empty
          - variants is a list of supported OS variants, if any
          - parameters is a list of (name, help) parameters, if any
          - api_version is a list of supported OS API versions

  """
  if top_dirs is None:
    top_dirs = pathutils.OS_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
        break
      for name in f_names:
        os_path = utils.PathJoin(dir_name, name)
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
        if status:
          diagnose = ""
          variants = os_inst.supported_variants
          parameters = os_inst.supported_parameters
          api_versions = os_inst.api_versions
        else:
          diagnose = os_inst
          variants = parameters = api_versions = []
        result.append((name, os_path, status, diagnose, variants,
                       parameters, api_versions))

  return result


def _TryOSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: tuple
  @return: success and either the OS instance if we find a valid one,
      or error message

  """
  if base_dir is None:
    os_dir = utils.FindFile(name, pathutils.OS_SEARCH_PATH, os.path.isdir)
  else:
    os_dir = utils.FindFile(name, [base_dir], os.path.isdir)

  if os_dir is None:
    return False, "Directory for OS %s not found in search path" % name

  status, api_versions = _OSOndiskAPIVersion(os_dir)
  if not status:
    # push the error up
    return status, api_versions

  if not constants.OS_API_VERSIONS.intersection(api_versions):
    return False, ("API version mismatch for path '%s': found %s, want %s." %
                   (os_dir, api_versions, constants.OS_API_VERSIONS))

  # OS Files dictionary, we will populate it with the absolute path
  # names; if the value is True, then it is a required file, otherwise
  # an optional one
  os_files = dict.fromkeys(constants.OS_SCRIPTS, True)

  if max(api_versions) >= constants.OS_API_V15:
    os_files[constants.OS_VARIANTS_FILE] = False

  if max(api_versions) >= constants.OS_API_V20:
    os_files[constants.OS_PARAMETERS_FILE] = True
  else:
    del os_files[constants.OS_SCRIPT_VERIFY]

  for (filename, required) in os_files.items():
    os_files[filename] = utils.PathJoin(os_dir, filename)

    try:
      st = os.stat(os_files[filename])
    except EnvironmentError, err:
      if err.errno == errno.ENOENT and not required:
        del os_files[filename]
        continue
      return False, ("File '%s' under path '%s' is missing (%s)" %
                     (filename, os_dir, utils.ErrnoOrStr(err)))

    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      return False, ("File '%s' under path '%s' is not a regular file" %
                     (filename, os_dir))

    if filename in constants.OS_SCRIPTS:
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
        return False, ("File '%s' under path '%s' is not executable" %
                       (filename, os_dir))

  variants = []
  if constants.OS_VARIANTS_FILE in os_files:
    variants_file = os_files[constants.OS_VARIANTS_FILE]
    try:
      variants = \
        utils.FilterEmptyLinesAndComments(utils.ReadFile(variants_file))
    except EnvironmentError, err:
      # we accept missing files, but not other errors
      if err.errno != errno.ENOENT:
        return False, ("Error while reading the OS variants file at %s: %s" %
                       (variants_file, utils.ErrnoOrStr(err)))

  parameters = []
  if constants.OS_PARAMETERS_FILE in os_files:
    parameters_file = os_files[constants.OS_PARAMETERS_FILE]
    try:
      parameters = utils.ReadFile(parameters_file).splitlines()
    except EnvironmentError, err:
      return False, ("Error while reading the OS parameters file at %s: %s" %
                     (parameters_file, utils.ErrnoOrStr(err)))
    parameters = [v.split(None, 1) for v in parameters]

  os_obj = objects.OS(name=name, path=os_dir,
                      create_script=os_files[constants.OS_SCRIPT_CREATE],
                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
                      verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
                                                 None),
                      supported_variants=variants,
                      supported_parameters=parameters,
                      api_versions=api_versions)
  return True, os_obj


def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{RPCFail} exception, detailing why this is not a valid OS.

  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
  an exception but returns true/false status data.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise RPCFail: if we don't find a valid OS

  """
  name_only = objects.OS.GetName(name)
  status, payload = _TryOSFromDisk(name_only, base_dir)

  if not status:
    _Fail(payload)

  return payload


def OSCoreEnv(os_name, inst_os, os_params, debug=0):
  """Calculate the basic environment for an os script.

  @type os_name: str
  @param os_name: full operating system name (including variant)
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type os_params: dict
  @param os_params: the OS parameters
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = {}
  api_version = \
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
  result["OS_API_VERSION"] = "%d" % api_version
  result["OS_NAME"] = inst_os.name
  result["DEBUG_LEVEL"] = "%d" % debug

  # OS variants
  if api_version >= constants.OS_API_V15 and inst_os.supported_variants:
    variant = objects.OS.GetVariant(os_name)
    if not variant:
      variant = inst_os.supported_variants[0]
  else:
    variant = ""
  result["OS_VARIANT"] = variant

  # OS params
  for pname, pvalue in os_params.items():
    result["OSP_%s" % pname.upper()] = pvalue

  # Set a default path otherwise programs called by OS scripts (or
  # even hooks called from OS scripts) might break, and we don't want
  # to have each script require setting a PATH variable
  result["PATH"] = constants.HOOKS_PATH

  return result


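# For an OS named "debootstrap+default" with os_params {"dhcp": "yes"},
# the resulting environment would look roughly like this (a sketch,
# assuming the OS declares API version 20 and a "default" variant):
#
#   {"OS_API_VERSION": "20", "OS_NAME": "debootstrap",
#    "OS_VARIANT": "default", "DEBUG_LEVEL": "0",
#    "OSP_DHCP": "yes", "PATH": constants.HOOKS_PATH}

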
def OSEnvironment(instance, inst_os, debug=0):
  """Calculate the environment for an os script.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)

  for attr in ["name", "os", "uuid", "ctime", "mtime", "primary_node"]:
    result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))

  result["HYPERVISOR"] = instance.hypervisor
  result["DISK_COUNT"] = "%d" % len(instance.disks)
  result["NIC_COUNT"] = "%d" % len(instance.nics)
  result["INSTANCE_SECONDARY_NODES"] = \
      ("%s" % " ".join(instance.secondary_nodes))

  # Disks
  for idx, disk in enumerate(instance.disks):
    real_disk = _OpenRealBD(disk)
    result["DISK_%d_PATH" % idx] = real_disk.dev_path
    result["DISK_%d_ACCESS" % idx] = disk.mode
    if constants.HV_DISK_TYPE in instance.hvparams:
      result["DISK_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result["DISK_%d_BACKEND_TYPE" % idx] = "block"
    elif disk.dev_type == constants.LD_FILE:
      result["DISK_%d_BACKEND_TYPE" % idx] = \
        "file:%s" % disk.physical_id[0]

  # NICs
  for idx, nic in enumerate(instance.nics):
    result["NIC_%d_MAC" % idx] = nic.mac
    if nic.ip:
      result["NIC_%d_IP" % idx] = nic.ip
    result["NIC_%d_MODE" % idx] = nic.nicparams[constants.NIC_MODE]
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      result["NIC_%d_BRIDGE" % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.nicparams[constants.NIC_LINK]:
      result["NIC_%d_LINK" % idx] = nic.nicparams[constants.NIC_LINK]
    if constants.HV_NIC_TYPE in instance.hvparams:
      result["NIC_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  # HV/BE params
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
    for key, value in source.items():
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)

  return result


def BlockdevGrow(disk, amount, dryrun, backingstore):
  """Grow a stack of block devices.

  This function is called recursively, with the children being the
  first ones to resize.

  @type disk: L{objects.Disk}
  @param disk: the disk to be grown
  @type amount: integer
  @param amount: the amount (in mebibytes) to grow with
  @type dryrun: boolean
  @param dryrun: whether to execute the operation in simulation mode
      only, without actually increasing the size
  @param backingstore: whether to execute the operation on backing storage
      only, or on "logical" storage only; e.g. DRBD is logical storage,
      whereas LVM, file, RBD are backing storage
  @rtype: (status, result)
  @return: a tuple with the status of the operation (True/False), and
      the error message if status is False

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    _Fail("Cannot find block device %s", disk)

  try:
    r_dev.Grow(amount, dryrun, backingstore)
  except errors.BlockDeviceError, err:
    _Fail("Failed to grow block device: %s", err, exc=True)


def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk ID as (vg, lv)

  """
  if disk.dev_type == constants.LD_DRBD8:
    if not disk.children:
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
            disk.unique_id)
    return BlockdevSnapshot(disk.children[0])
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # FIXME: choose a saner value for the snapshot size
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    else:
      _Fail("Cannot find block device %s", disk)
  else:
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
          disk.unique_id, disk.dev_type)


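# In practice the recursion is at most one level deep (a sketch): for a
# DRBD8 disk the call is forwarded to children[0], the data LV, and the
# returned ID is that snapshot LV's (vg, lv) pair, e.g. an illustrative
# ("xenvg", "inst1-disk0.snap").

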
def BlockdevSetInfo(disk, info):
  """Sets 'metadata' information on block devices.

  This function sets 'info' metadata on block devices. Initial
  information is set at device creation; this function should be used
  for example after renames.

  @type disk: L{objects.Disk}
  @param disk: the disk to be changed
  @type info: string
  @param info: new 'info' metadata
  @rtype: (status, result)
  @return: a tuple with the status of the operation (True/False), and
      the error message if status is False

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    _Fail("Cannot find block device %s", disk)

  try:
    r_dev.SetInfo(info)
  except errors.BlockDeviceError, err:
    _Fail("Failed to set information on block device: %s", err, exc=True)


def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  @rtype: None

  """
  destdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name + ".new")
  finaldestdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, "version", "0")
  config.set(constants.INISECT_EXP, "timestamp", "%d" % int(time.time()))
  config.set(constants.INISECT_EXP, "source", instance.primary_node)
  config.set(constants.INISECT_EXP, "os", instance.os)
  config.set(constants.INISECT_EXP, "compression", "none")

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, "name", instance.name)
  config.set(constants.INISECT_INS, "maxmem", "%d" %
             instance.beparams[constants.BE_MAXMEM])
  config.set(constants.INISECT_INS, "minmem", "%d" %
             instance.beparams[constants.BE_MINMEM])
  # "memory" is deprecated, but useful for exporting to old ganeti versions
  config.set(constants.INISECT_INS, "memory", "%d" %
             instance.beparams[constants.BE_MAXMEM])
  config.set(constants.INISECT_INS, "vcpus", "%d" %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, "disk_template", instance.disk_template)
  config.set(constants.INISECT_INS, "hypervisor", instance.hypervisor)
  config.set(constants.INISECT_INS, "tags", " ".join(instance.GetTags()))

  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, "nic%d_mac" %
               nic_count, "%s" % nic.mac)
    config.set(constants.INISECT_INS, "nic%d_ip" % nic_count, "%s" % nic.ip)
    for param in constants.NICS_PARAMETER_TYPES:
      config.set(constants.INISECT_INS, "nic%d_%s" % (nic_count, param),
                 "%s" % nic.nicparams.get(param, None))
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, "nic_count", "%d" % nic_total)

  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if disk:
      disk_total += 1
      config.set(constants.INISECT_INS, "disk%d_ivname" % disk_count,
                 ("%s" % disk.iv_name))
      config.set(constants.INISECT_INS, "disk%d_dump" % disk_count,
                 ("%s" % disk.physical_id[1]))
      config.set(constants.INISECT_INS, "disk%d_size" % disk_count,
                 ("%d" % disk.size))

  config.set(constants.INISECT_INS, "disk_count", "%d" % disk_total)

  # New-style hypervisor/backend parameters

  config.add_section(constants.INISECT_HYP)
  for name, value in instance.hvparams.items():
    if name not in constants.HVC_GLOBALS:
      config.set(constants.INISECT_HYP, name, str(value))

  config.add_section(constants.INISECT_BEP)
  for name, value in instance.beparams.items():
    config.set(constants.INISECT_BEP, name, str(value))

  config.add_section(constants.INISECT_OSP)
  for name, value in instance.osparams.items():
    config.set(constants.INISECT_OSP, name, str(value))

  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  shutil.rmtree(finaldestdir, ignore_errors=True)
  shutil.move(destdir, finaldestdir)


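# The resulting constants.EXPORT_CONF_FILE is a plain INI file; a trimmed,
# illustrative sample (all values hypothetical):
#
#   [export]
#   version = 0
#   timestamp = 1356006164
#   source = node1.example.com
#   os = debootstrap
#   compression = none
#
#   [instance]
#   name = inst1.example.com
#   maxmem = 512
#   ...

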
def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: L{objects.SerializableConfigParser}
  @return: a serializable config file containing the
      export info

  """
  cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    _Fail("Export info file doesn't have the required fields")

  return config.Dumps()


def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports

  """
  if os.path.isdir(pathutils.EXPORT_DIR):
    return sorted(utils.ListVisibleFiles(pathutils.EXPORT_DIR))
  else:
    _Fail("No exports directory")


def RemoveExport(export):
  """Remove an existing export from the node.

  @type export: str
  @param export: the name of the export to remove
  @rtype: None

  """
  target = utils.PathJoin(pathutils.EXPORT_DIR, export)

  try:
    shutil.rmtree(target)
  except EnvironmentError, err:
    _Fail("Error while removing the export: %s", err, exc=True)


def BlockdevRename(devlist):
  """Rename a list of block devices.

  @type devlist: list of tuples
  @param devlist: list of tuples of the form (disk, new_unique_id);
      disk is an L{objects.Disk} object describing the current disk,
      and new_unique_id is the name we rename it to
  @rtype: boolean
  @return: True if all renames succeeded, False otherwise

  """
  msgs = []
  result = True
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      msgs.append("Can't find device %s in rename" % str(disk))
      result = False
      continue
    try:
      old_rpath = dev.dev_path
      dev.Rename(unique_id)
      new_rpath = dev.dev_path
      if old_rpath != new_rpath:
        DevCacheManager.RemoveCache(old_rpath)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError, err:
      msgs.append("Can't rename device '%s' to '%s': %s" %
                  (dev, unique_id, err))
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
      result = False
  if not result:
    _Fail("; ".join(msgs))


def _TransformFileStorageDir(fs_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given fs_dir is within the cluster-wide default
  file_storage_dir or the shared_file_storage_dir, which are stored in
  SimpleStore. Only paths under those directories are allowed.

  @type fs_dir: str
  @param fs_dir: the path to check

  @return: the normalized path if valid, None otherwise

  """
  if not (constants.ENABLE_FILE_STORAGE or
          constants.ENABLE_SHARED_FILE_STORAGE):
    _Fail("File storage disabled at configure time")
  cfg = _GetConfig()
  fs_dir = os.path.normpath(fs_dir)
  base_fstore = cfg.GetFileStorageDir()
  base_shared = cfg.GetSharedFileStorageDir()
  if not (utils.IsBelowDir(base_fstore, fs_dir) or
          utils.IsBelowDir(base_shared, fs_dir)):
    _Fail("File storage directory '%s' is not under base file"
          " storage directory '%s' or shared storage directory '%s'",
          fs_dir, base_fstore, base_shared)
  return fs_dir


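# For instance (a sketch, assuming a hypothetical base directory of
# /srv/ganeti/file-storage):
#
#   _TransformFileStorageDir("/srv/ganeti/file-storage/inst1")
#     -> "/srv/ganeti/file-storage/inst1"
#   _TransformFileStorageDir("/srv/ganeti/file-storage/../../../etc")
#     -> raises RPCFail, since the path normalizes to /etc, which is
#        outside both base directories

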
def CreateFileStorageDir(file_storage_dir):
  """Create file storage directory.

  @type file_storage_dir: str
  @param file_storage_dir: directory to create

  @rtype: tuple
  @return: tuple with first element a boolean indicating whether dir
      creation was successful or not

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified storage dir '%s' is not a directory",
            file_storage_dir)
  else:
    try:
      os.makedirs(file_storage_dir, 0750)
    except OSError, err:
      _Fail("Cannot create file storage directory '%s': %s",
            file_storage_dir, err, exc=True)


def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.

  Remove it only if it's empty. If not, log an error and return.

  @type file_storage_dir: str
  @param file_storage_dir: the directory we should cleanup
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified storage directory '%s' is not a directory",
            file_storage_dir)
    # deletes dir only if empty, otherwise we want to fail the rpc call
    try:
      os.rmdir(file_storage_dir)
    except OSError, err:
      _Fail("Cannot remove file storage directory '%s': %s",
            file_storage_dir, err)


def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  @type old_file_storage_dir: str
  @param old_file_storage_dir: the current path
  @type new_file_storage_dir: str
  @param new_file_storage_dir: the name we should rename to
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  if not os.path.exists(new_file_storage_dir):
    if os.path.isdir(old_file_storage_dir):
      try:
        os.rename(old_file_storage_dir, new_file_storage_dir)
      except OSError, err:
        _Fail("Cannot rename '%s' to '%s': %s",
              old_file_storage_dir, new_file_storage_dir, err)
    else:
      _Fail("Specified storage dir '%s' is not a directory",
            old_file_storage_dir)
  else:
    if os.path.exists(old_file_storage_dir):
      _Fail("Cannot rename '%s' to '%s': both locations exist",
            old_file_storage_dir, new_file_storage_dir)


def _EnsureJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: None
  @raises RPCFail: if the file is not valid

  """
  if not utils.IsBelowDir(pathutils.QUEUE_DIR, file_name):
    _Fail("Passed job queue file '%s' does not belong to"
          " the queue directory '%s'", file_name, pathutils.QUEUE_DIR)


def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.io.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents
  @rtype: boolean
  @return: the success of the operation

  """
  file_name = vcluster.LocalizeVirtualPath(file_name)

  _EnsureJobQueueFile(file_name)
  getents = runtime.GetEnts()

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
                  gid=getents.masterd_gid)


def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: tuple
  @return: the success of the operation and payload

  """
  old = vcluster.LocalizeVirtualPath(old)
  new = vcluster.LocalizeVirtualPath(new)

  _EnsureJobQueueFile(old)
  _EnsureJobQueueFile(new)

  getents = runtime.GetEnts()

  utils.RenameFile(old, new, mkdir=True, mkdir_mode=0700,
                   dir_uid=getents.masterd_uid, dir_gid=getents.masterd_gid)


def BlockdevClose(instance_name, disks):
  """Closes the given block devices.

  This means they will be switched to secondary mode (in case of
  DRBD).

  @param instance_name: if the argument is not empty, the symlinks
      of this instance will be removed
  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to be closed
  @rtype: tuple (success, message)
  @return: a tuple of success and message, where success
      indicates the success of the operation, and message
      which will contain the error details in case we
      failed

  """
  bdevs = []
  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)

  msg = []
  for rd in bdevs:
    try:
      rd.Close()
    except errors.BlockDeviceError, err:
      msg.append(str(err))
  if msg:
    _Fail("Can't make devices secondary: %s", ",".join(msg))
  else:
    if instance_name:
      _RemoveBlockDevLinks(instance_name, disks)


def ValidateHVParams(hvname, hvparams):
  """Validates the given hypervisor parameters.

  @type hvname: string
  @param hvname: the hypervisor name
  @type hvparams: dict
  @param hvparams: the hypervisor parameters to be validated
  @rtype: None

  """
  try:
    hv_type = hypervisor.GetHypervisor(hvname)
    hv_type.ValidateParameters(hvparams)
  except errors.HypervisorError, err:
    _Fail(str(err), log=False)


def _CheckOSPList(os_obj, parameters):
  """Check whether a list of parameters is supported by the OS.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type parameters: list
  @param parameters: the list of parameters to check

  """
  supported = [v[0] for v in os_obj.supported_parameters]
  delta = frozenset(parameters).difference(supported)
  if delta:
    _Fail("The following parameters are not supported"
          " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))


def ValidateOS(required, osname, checks, osparams):
  """Validate the given OS' parameters.

  @type required: boolean
  @param required: whether absence of the OS should translate into
      failure or not
  @type osname: string
  @param osname: the OS to be validated
  @type checks: list
  @param checks: list of the checks to run (currently only 'parameters')
  @type osparams: dict
  @param osparams: dictionary with OS parameters
  @rtype: boolean
  @return: True if the validation passed, or False if the OS was not
      found and L{required} was false

  """
  if not constants.OS_VALIDATE_CALLS.issuperset(checks):
    _Fail("Unknown checks required for OS %s: %s", osname,
          set(checks).difference(constants.OS_VALIDATE_CALLS))

  name_only = objects.OS.GetName(osname)
  status, tbv = _TryOSFromDisk(name_only, None)

  if not status:
    if required:
      _Fail(tbv)
    else:
      return False

  if max(tbv.api_versions) < constants.OS_API_V20:
    return True

  if constants.OS_VALIDATE_PARAMETERS in checks:
    _CheckOSPList(tbv, osparams.keys())

  validate_env = OSCoreEnv(osname, tbv, osparams)
  result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
                        cwd=tbv.path, reset_env=True)
  if result.failed:
    logging.error("os validate command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    _Fail("OS validation script failed (%s), output: %s",
          result.fail_reason, result.output, log=False)

  return True


def DemoteFromMC():
  """Demotes the current node from master candidate role.

  """
  # try to ensure we're not the master by mistake
  master, myself = ssconf.GetMasterAndMyself()
  if master == myself:
    _Fail("ssconf status shows I'm the master node, will not demote")

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "check", constants.MASTERD])
  if not result.failed:
    _Fail("The master daemon is running, will not demote")

  try:
    if os.path.isfile(pathutils.CLUSTER_CONF_FILE):
      utils.CreateBackup(pathutils.CLUSTER_CONF_FILE)
  except EnvironmentError, err:
    if err.errno != errno.ENOENT:
      _Fail("Error while backing up cluster file: %s", err, exc=True)

  utils.RemoveFile(pathutils.CLUSTER_CONF_FILE)


def _GetX509Filenames(cryptodir, name):
  """Returns the full paths for the private key and certificate.

  """
  return (utils.PathJoin(cryptodir, name),
          utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
          utils.PathJoin(cryptodir, name, _X509_CERT_FILE))


def CreateX509Certificate(validity, cryptodir=pathutils.CRYPTO_KEYS_DIR):
  """Creates a new X509 certificate for SSL/TLS.

  @type validity: int
  @param validity: Validity in seconds
  @rtype: tuple; (string, string)
  @return: Certificate name and public part

  """
  (key_pem, cert_pem) = \
    utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
                                     min(validity, _MAX_SSL_CERT_VALIDITY))

  cert_dir = tempfile.mkdtemp(dir=cryptodir,
                              prefix="x509-%s-" % utils.TimestampForFilename())
  try:
    name = os.path.basename(cert_dir)
    assert len(name) > 5

    (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

    utils.WriteFile(key_file, mode=0400, data=key_pem)
    utils.WriteFile(cert_file, mode=0400, data=cert_pem)

    # Never return private key as it shouldn't leave the node
    return (name, cert_pem)
  except Exception:
    shutil.rmtree(cert_dir, ignore_errors=True)
    raise


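# The on-disk layout produced above looks like this (a sketch; the
# directory name is generated by tempfile.mkdtemp from the timestamped
# prefix, so the exact suffix varies):
#
#   <cryptodir>/x509-20120101010101-abc123/key    (mode 0400, private key)
#   <cryptodir>/x509-20120101010101-abc123/cert   (mode 0400, certificate)
#
# Only (name, cert_pem) is returned; the private key never leaves the node.

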
def RemoveX509Certificate(name, cryptodir=pathutils.CRYPTO_KEYS_DIR):
  """Removes an X509 certificate.

  @type name: string
  @param name: Certificate name

  """
  (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

  utils.RemoveFile(key_file)
  utils.RemoveFile(cert_file)

  try:
    os.rmdir(cert_dir)
  except EnvironmentError, err:
    _Fail("Cannot remove certificate directory '%s': %s",
          cert_dir, err)


def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
3097
  """Returns the command for the requested input/output.
3098

3099
  @type instance: L{objects.Instance}
3100
  @param instance: The instance object
3101
  @param mode: Import/export mode
3102
  @param ieio: Input/output type
3103
  @param ieargs: Input/output arguments
3104

3105
  """
3106
  assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
3107

    
3108
  env = None
3109
  prefix = None
3110
  suffix = None
3111
  exp_size = None
3112

    
3113
  if ieio == constants.IEIO_FILE:
3114
    (filename, ) = ieargs
3115

    
3116
    if not utils.IsNormAbsPath(filename):
3117
      _Fail("Path '%s' is not normalized or absolute", filename)
3118

    
3119
    real_filename = os.path.realpath(filename)
3120
    directory = os.path.dirname(real_filename)
3121

    
3122
    if not utils.IsBelowDir(pathutils.EXPORT_DIR, real_filename):
3123
      _Fail("File '%s' is not under exports directory '%s': %s",
3124
            filename, pathutils.EXPORT_DIR, real_filename)
3125

    
3126
    # Create directory
3127
    utils.Makedirs(directory, mode=0750)
3128

    
3129
    quoted_filename = utils.ShellQuote(filename)
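
    # "prefix" and "suffix" are shell fragments that end up in the
    # import/export daemon's transfer pipeline via the --cmd-prefix and
    # --cmd-suffix options (see StartImportExportDaemon below); here they
    # simply redirect the data stream from or into the file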
    if mode == constants.IEM_IMPORT:
      suffix = "> %s" % quoted_filename
    elif mode == constants.IEM_EXPORT:
      suffix = "< %s" % quoted_filename

      # Retrieve file size
      try:
        st = os.stat(filename)
      except EnvironmentError, err:
        logging.error("Can't stat(2) %s: %s", filename, err)
      else:
        exp_size = utils.BytesToMebibyte(st.st_size)

  elif ieio == constants.IEIO_RAW_DISK:
    (disk, ) = ieargs

    real_disk = _OpenRealBD(disk)

    if mode == constants.IEM_IMPORT:
      # we set a smaller block size here because, due to transport
      # buffering, more than 64-128k would mostly be ignored; we use
      # nocreat to fail if the device is not already there or we pass a
      # wrong path; we use notrunc to not attempt truncation on an LV
      # device; we use oflag=dsync to not buffer too much memory; this
      # means that at best, we flush every 64k, which will not be very
      # fast
      suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
                                    " bs=%s oflag=dsync"),
                                    real_disk.dev_path,
                                    str(64 * 1024))

    elif mode == constants.IEM_EXPORT:
      # the block size on the read dd is 1MiB to match our units
      prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
                                   real_disk.dev_path,
                                   str(1024 * 1024), # 1 MiB
                                   str(disk.size))
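      # illustrative result for a 1024 MiB disk (hypothetical device
      # path): "dd if=/dev/xenvg/disk0 bs=1048576 count=1024 |"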
      exp_size = disk.size

  elif ieio == constants.IEIO_SCRIPT:
    (disk, disk_index, ) = ieargs

    assert isinstance(disk_index, (int, long))

    real_disk = _OpenRealBD(disk)

    inst_os = OSFromDisk(instance.os)
    env = OSEnvironment(instance, inst_os)

    if mode == constants.IEM_IMPORT:
      env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
      env["IMPORT_INDEX"] = str(disk_index)
      script = inst_os.import_script

    elif mode == constants.IEM_EXPORT:
      env["EXPORT_DEVICE"] = real_disk.dev_path
      env["EXPORT_INDEX"] = str(disk_index)
      script = inst_os.export_script

    # TODO: Pass special environment only to script
    script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)

    if mode == constants.IEM_IMPORT:
      suffix = "| %s" % script_cmd

    elif mode == constants.IEM_EXPORT:
      prefix = "%s |" % script_cmd

    # Let script predict size
    exp_size = constants.IE_CUSTOM_SIZE

  else:
    _Fail("Invalid %s I/O mode %r", mode, ieio)

  return (env, prefix, suffix, exp_size)


def _CreateImportExportStatusDir(prefix):
  """Creates status directory for import/export.

  """
  return tempfile.mkdtemp(dir=pathutils.IMPORT_EXPORT_DIR,
                          prefix=("%s-%s-" %
                                  (prefix, utils.TimestampForFilename())))


def StartImportExportDaemon(mode, opts, host, port, instance, component,
                            ieio, ieioargs):
  """Starts an import or export daemon.

  @param mode: Import/export mode
  @type opts: L{objects.ImportExportOptions}
  @param opts: Daemon options
  @type host: string
  @param host: Remote host for export (None for import)
  @type port: int
  @param port: Remote port for export (None for import)
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type component: string
  @param component: which part of the instance is transferred now,
      e.g. 'disk/0'
  @param ieio: Input/output type
  @param ieioargs: Input/output arguments

  """
  if mode == constants.IEM_IMPORT:
    prefix = "import"

    if not (host is None and port is None):
      _Fail("Cannot specify host or port on import")

  elif mode == constants.IEM_EXPORT:
    prefix = "export"

    if host is None or port is None:
      _Fail("Host and port must be specified for an export")

  else:
    _Fail("Invalid mode %r", mode)
  if (opts.key_name is None) ^ (opts.ca_pem is None):
    _Fail("Cluster certificate can only be used for both key and CA")

  (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
    _GetImportExportIoCommand(instance, mode, ieio, ieioargs)

  if opts.key_name is None:
    # Use server.pem
    key_path = pathutils.NODED_CERT_FILE
    cert_path = pathutils.NODED_CERT_FILE
    assert opts.ca_pem is None
  else:
    (_, key_path, cert_path) = _GetX509Filenames(pathutils.CRYPTO_KEYS_DIR,
                                                 opts.key_name)
    assert opts.ca_pem is not None

  for i in [key_path, cert_path]:
    if not os.path.exists(i):
      _Fail("File '%s' does not exist" % i)

  status_dir = _CreateImportExportStatusDir("%s-%s" % (prefix, component))
  try:
    status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
    pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
    ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)

    if opts.ca_pem is None:
      # Use server.pem
      ca = utils.ReadFile(pathutils.NODED_CERT_FILE)
    else:
      ca = opts.ca_pem

    # Write CA file
    utils.WriteFile(ca_file, data=ca, mode=0400)

    cmd = [
      pathutils.IMPORT_EXPORT_DAEMON,
      status_file, mode,
      "--key=%s" % key_path,
      "--cert=%s" % cert_path,
      "--ca=%s" % ca_file,
      ]

    if host:
      cmd.append("--host=%s" % host)

    if port:
      cmd.append("--port=%s" % port)

    if opts.ipv6:
      cmd.append("--ipv6")
    else:
      cmd.append("--ipv4")

    if opts.compress:
      cmd.append("--compress=%s" % opts.compress)

    if opts.magic:
      cmd.append("--magic=%s" % opts.magic)

    if exp_size is not None:
      cmd.append("--expected-size=%s" % exp_size)

    if cmd_prefix:
      cmd.append("--cmd-prefix=%s" % cmd_prefix)

    if cmd_suffix:
      cmd.append("--cmd-suffix=%s" % cmd_suffix)

    if mode == constants.IEM_EXPORT:
      # Retry connection a few times when connecting to remote peer
      cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
      cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
    elif opts.connect_timeout is not None:
      assert mode == constants.IEM_IMPORT
      # Overall timeout for establishing connection while listening
      cmd.append("--connect-timeout=%s" % opts.connect_timeout)

    logfile = _InstanceLogName(prefix, instance.os, instance.name, component)

    # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
    # support for receiving a file descriptor for output
    utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
                      output=logfile)

    # The import/export name is simply the status directory name
    return os.path.basename(status_dir)

  except Exception:
    shutil.rmtree(status_dir, ignore_errors=True)
    raise


def GetImportExportStatus(names):
  """Returns import/export daemon status.

  @type names: sequence
  @param names: List of names
  @rtype: List of dicts
  @return: Returns a list of the state of each named import/export or None if a
           status couldn't be read

  """
  result = []

  for name in names:
    status_file = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name,
                                 _IES_STATUS_FILE)

    try:
      data = utils.ReadFile(status_file)
    except EnvironmentError, err:
      if err.errno != errno.ENOENT:
        raise
      data = None

    if not data:
      result.append(None)
      continue

    result.append(serializer.LoadJson(data))

  return result


def AbortImportExport(name):
  """Sends SIGTERM to a running import/export daemon.

  """
  logging.info("Abort import/export %s", name)

  status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)
  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is running with PID %s, sending SIGTERM",
                 name, pid)
    utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)


def CleanupImportExport(name):
  """Cleanup after an import or export.

  If the import/export daemon is still running it's killed. Afterwards the
  whole status directory is removed.

  """
  logging.info("Finalizing import/export %s", name)

  status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)

  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is still running with PID %s",
                 name, pid)
    utils.KillProcess(pid, waitpid=False)

  shutil.rmtree(status_dir, ignore_errors=True)


def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  """
  # set the correct physical ID
  my_name = netutils.Hostname.GetSysName()
  for cf in disks:
    cf.SetPhysicalID(my_name, nodes_ip)

  bdevs = []

  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)
  return bdevs


def DrbdDisconnectNet(nodes_ip, disks):
  """Disconnects the network on a list of drbd devices.

  """
  bdevs = _FindDisks(nodes_ip, disks)

  # disconnect disks
  for rd in bdevs:
    try:
      rd.DisconnectNet()
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration to standalone mode: %s",
            err, exc=True)


def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
  """Attaches the network on a list of drbd devices.

  """
  bdevs = _FindDisks(nodes_ip, disks)
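
  # in multi-master (dual-primary) mode, used e.g. during live migration,
  # the instance may end up running on this node, so the standard block
  # device symlinks for the instance must exist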
  if multimaster:
    for idx, rd in enumerate(bdevs):
      try:
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
      except EnvironmentError, err:
        _Fail("Can't create symlink: %s", err)
  # reconnect disks, switch to new master configuration and if
  # needed primary mode
  for rd in bdevs:
    try:
      rd.AttachNet(multimaster)
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration: %s", err)

  # wait until the disks are connected; we need to retry the re-attach
  # if the device becomes standalone, as this might happen if one node
  # disconnects and reconnects in a different mode before the other
  # node reconnects; in this case, one or both of the nodes will
  # decide it has the wrong configuration and switch to standalone

  def _Attach():
    all_connected = True

    for rd in bdevs:
      stats = rd.GetProcStatus()

      all_connected = (all_connected and
                       (stats.is_connected or stats.is_in_resync))

      if stats.is_standalone:
        # peer had different config info and this node became
        # standalone, even though this should not happen with the
        # new staged way of changing disk configs
        try:
          rd.AttachNet(multimaster)
        except errors.BlockDeviceError, err:
          _Fail("Can't change network configuration: %s", err)

    if not all_connected:
      raise utils.RetryAgain()

  try:
    # Start with a delay of 100 milliseconds, back off up to 5 seconds,
    # and give up after two minutes
    utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
  except utils.RetryTimeout:
    _Fail("Timeout in disk reconnecting")

  if multimaster:
    # change to primary mode
    for rd in bdevs:
      try:
        rd.Open()
      except errors.BlockDeviceError, err:
        _Fail("Can't change to primary mode: %s", err)


def DrbdWaitSync(nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  """
  def _helper(rd):
    stats = rd.GetProcStatus()
    if not (stats.is_connected or stats.is_in_resync):
      raise utils.RetryAgain()
    return stats

  bdevs = _FindDisks(nodes_ip, disks)

  min_resync = 100
  alldone = True
  for rd in bdevs:
    try:
      # poll each second for 15 seconds
      stats = utils.Retry(_helper, 1, 15, args=[rd])
    except utils.RetryTimeout:
      stats = rd.GetProcStatus()
      # last check
      if not (stats.is_connected or stats.is_in_resync):
        _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
    alldone = alldone and (not stats.is_in_resync)
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)
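
  # "min_resync" is the lowest sync percentage over all devices, i.e. the
  # progress of the device that is furthest behind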
  return (alldone, min_resync)


def GetDrbdUsermodeHelper():
  """Returns DRBD usermode helper currently configured.

  """
  try:
    return bdev.BaseDRBD.GetUsermodeHelper()
  except errors.BlockDeviceError, err:
    _Fail(str(err))


def PowercycleNode(hypervisor_type):
  """Hard-powercycle the node.

  Because we need to return first, and schedule the powercycle in the
  background, we won't be able to report failures nicely.

  """
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  try:
    pid = os.fork()
  except OSError:
    # if we can't fork, we'll pretend that we're in the child process
    pid = 0
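  # the parent process returns to the caller right away, while the child
  # stays behind, waits five seconds and then triggers the powercycle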
  if pid > 0:
    return "Reboot scheduled in 5 seconds"
  # ensure the child is running on ram
  try:
    utils.Mlockall()
  except Exception: # pylint: disable=W0703
    pass
  time.sleep(5)
  hyper.PowercycleNode()


class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not
  on the master side.

  """
  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        L{pathutils.HOOKS_BASE_DIR} (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = pathutils.HOOKS_BASE_DIR
    # yeah, _BASE_DIR is not valid for attributes, we use it like a
    # constant
    self._BASE_DIR = hooks_base_dir # pylint: disable=C0103

  def RunLocalHooks(self, node_list, hpath, phase, env):
    """Check that the hooks will be run only locally and then run them.

    """
    assert len(node_list) == 1
    node = node_list[0]
    _, myself = ssconf.GetMasterAndMyself()
    assert node == myself

    results = self.RunHooks(hpath, phase, env)

    # Return values in the form expected by HooksMaster
    return {node: (None, False, results)}

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    @type hpath: str
    @param hpath: the path to the hooks directory which
        holds the scripts
    @type phase: str
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @type env: dict
    @param env: dictionary with the environment for the hook
    @rtype: list
    @return: list of 3-element tuples:
      - script path
      - script result, either L{constants.HKR_SUCCESS} or
        L{constants.HKR_FAIL}
      - output of the script

    @raise errors.ProgrammerError: for invalid input
        parameters

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      _Fail("Unknown hooks phase '%s'", phase)

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = utils.PathJoin(self._BASE_DIR, subdir)

    results = []

    if not os.path.isdir(dir_name):
      # for non-existing/non-dirs, we simply exit instead of logging a
      # warning at every operation
      return results

    runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)

    for (relname, relstatus, runresult) in runparts_results:
      if relstatus == constants.RUNPARTS_SKIP:
        rrval = constants.HKR_SKIP
        output = ""
      elif relstatus == constants.RUNPARTS_ERR:
        rrval = constants.HKR_FAIL
        output = "Hook script execution error: %s" % runresult
      elif relstatus == constants.RUNPARTS_RUN:
        if runresult.failed:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
        output = utils.SafeEncode(runresult.output.strip())
      results.append(("%s/%s" % (subdir, relname), rrval, output))

    return results


class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  @staticmethod
  def Run(name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data

    @rtype: str
    @return: the stdout of the allocator script; failures are reported
       via L{_Fail} (i.e. an L{RPCFail} exception)

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      _Fail("iallocator module '%s' not found in the search path", name)

    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        _Fail("iallocator module '%s' failed: %s, output '%s'",
              name, result.fail_reason, result.output)
    finally:
      os.unlink(fin_name)

    return result.stdout


class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  """
  _DEV_PREFIX = "/dev/"
  _ROOT_DIR = pathutils.BDEV_CACHE_DIR

  @classmethod
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    @type dev_path: str
    @param dev_path: the C{/dev/} path name
    @rtype: str
    @return: the converted path name

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
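    # e.g. (illustrative) "/dev/md/md0" becomes "bdev_md_md0" under the
    # cache root directory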
    fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
    return fpath

  @classmethod
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    @type dev_path: str
    @param dev_path: the pathname of the device
    @type owner: str
    @param owner: the owner (instance name) of the device
    @type on_primary: bool
    @param on_primary: whether this is the primary
        node or not
    @type iv_name: str
    @param iv_name: the instance-visible name of the
        device, as in objects.Disk.iv_name

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    if on_primary:
      state = "primary"
    else:
      state = "secondary"
    if iv_name is None:
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
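    # the cache file holds a single "owner state iv_name" line,
    # e.g. (illustrative) "instance1.example.com primary disk/0"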
    try:
      utils.WriteFile(fpath, data=fdata)
    except EnvironmentError, err:
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)

  @classmethod
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    This is just a wrapper over L{utils.io.RemoveFile} with a converted
    path name and logging.

    @type dev_path: str
    @param dev_path: the pathname of the device

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    try:
      utils.RemoveFile(fpath)
    except EnvironmentError, err:
      logging.exception("Can't remove the bdev cache file for %s: %s",
                        dev_path, err)