#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Functions used by the node daemon

@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
     the L{UploadFile} function
@var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
     in the L{_CleanDirectory} function

"""

# pylint: disable=E1103

# E1103: %s %r has no %r member (but some types could not be
# inferred), because the _TryOSFromDisk returns either (True, os_obj)
# or (False, "string") which confuses pylint


import os
import os.path
import shutil
import time
import stat
import errno
import re
import random
import logging
import tempfile
import zlib
import base64
import signal

from ganeti import errors
from ganeti import utils
from ganeti import ssh
from ganeti import hypervisor
from ganeti import constants
from ganeti import bdev
from ganeti import objects
from ganeti import ssconf
from ganeti import serializer
from ganeti import netutils
from ganeti import runtime
from ganeti import mcpu
from ganeti import compat
from ganeti import pathutils
from ganeti import vcluster


_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
_ALLOWED_CLEAN_DIRS = frozenset([
  pathutils.DATA_DIR,
  pathutils.JOB_QUEUE_ARCHIVE_DIR,
  pathutils.QUEUE_DIR,
  pathutils.CRYPTO_KEYS_DIR,
  ])
_MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
_X509_KEY_FILE = "key"
_X509_CERT_FILE = "cert"
_IES_STATUS_FILE = "status"
_IES_PID_FILE = "pid"
_IES_CA_FILE = "ca"

#: Valid LVS output line regex
_LVSLINE_REGEX = re.compile(r"^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6,})\|?$")
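
# A line as produced by "lvs --noheadings --separator='|'" and matched by
# the regex above looks like (hypothetical volume, for illustration only):
#   xenvg|test1|20.06|-wi-ao
# yielding the groups (vg_name, lv_name, lv_size, lv_attr); see
# GetVolumeList below for how the groups are interpreted.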

# Actions for the master setup script
_MASTER_START = "start"
_MASTER_STOP = "stop"


class RPCFail(Exception):
  """Class denoting RPC failure.

  Its argument is the error message.

  """


def _Fail(msg, *args, **kwargs):
  """Log an error and then raise an RPCFail exception.

  This exception is then handled specially in the ganeti daemon and
  turned into a 'failed' return type. As such, this function is a
  useful shortcut for logging the error and returning it to the master
  daemon.

  @type msg: string
  @param msg: the text of the exception
  @raise RPCFail

  """
  if args:
    msg = msg % args
  if "log" not in kwargs or kwargs["log"]: # if we should log this error
    if "exc" in kwargs and kwargs["exc"]:
      logging.exception(msg)
    else:
      logging.error(msg)
  raise RPCFail(msg)
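
# Usage sketch (illustrative only): positional arguments are interpolated
# into the message before logging and raising, so
#   _Fail("Can't reach node %s: %s", node_name, err, exc=True)
# logs the interpolated message with a traceback and raises RPCFail.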


def _GetConfig():
  """Simple wrapper to return a SimpleStore.

  @rtype: L{ssconf.SimpleStore}
  @return: a SimpleStore instance

  """
  return ssconf.SimpleStore()


def _GetSshRunner(cluster_name):
  """Simple wrapper to return an SshRunner.

  @type cluster_name: str
  @param cluster_name: the cluster name, which is needed
      by the SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: an SshRunner instance

  """
  return ssh.SshRunner(cluster_name)


def _Decompress(data):
  """Unpacks data compressed by the RPC client.

  @type data: list or tuple
  @param data: Data sent by RPC client
  @rtype: str
  @return: Decompressed data

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  if encoding == constants.RPC_ENCODING_NONE:
    return content
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    return zlib.decompress(base64.b64decode(content))
  else:
    raise AssertionError("Unknown data encoding")
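
# Wire-format example (hypothetical payload, matching the two encodings
# handled above):
#   _Decompress((constants.RPC_ENCODING_NONE, "abc")) == "abc"
#   _Decompress((constants.RPC_ENCODING_ZLIB_BASE64,
#                base64.b64encode(zlib.compress("abc")))) == "abc"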


def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults
      to the empty list

  """
  if path not in _ALLOWED_CLEAN_DIRS:
    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
          path)

  if not os.path.isdir(path):
    return
  if exclude is None:
    exclude = []
  else:
    # Normalize excluded paths
    exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = utils.PathJoin(path, rel_name)
    if full_name in exclude:
      continue
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)
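
# For instance, purging the job queue while keeping its lock file (as
# JobQueuePurge below does) amounts to:
#   _CleanDirectory(pathutils.QUEUE_DIR,
#                   exclude=[pathutils.JOB_QUEUE_LOCK_FILE])
# and any path outside _ALLOWED_CLEAN_DIRS is rejected via _Fail.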


def _BuildUploadFileList():
  """Build the list of allowed upload files.

  This is abstracted so that it's built only once at module import time.

  """
  allowed_files = set([
    pathutils.CLUSTER_CONF_FILE,
    constants.ETC_HOSTS,
    pathutils.SSH_KNOWN_HOSTS_FILE,
    pathutils.VNC_PASSWORD_FILE,
    pathutils.RAPI_CERT_FILE,
    pathutils.SPICE_CERT_FILE,
    pathutils.SPICE_CACERT_FILE,
    pathutils.RAPI_USERS_FILE,
    pathutils.CONFD_HMAC_KEY,
    pathutils.CLUSTER_DOMAIN_SECRET_FILE,
    ])

  for hv_name in constants.HYPER_TYPES:
    hv_class = hypervisor.GetHypervisorClass(hv_name)
    allowed_files.update(hv_class.GetAncillaryFiles()[0])

  assert pathutils.FILE_STORAGE_PATHS_FILE not in allowed_files, \
    "Allowed file storage paths should never be uploaded via RPC"

  return frozenset(allowed_files)


_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()


def JobQueuePurge():
  """Removes job queue files and archived jobs.

  @rtype: None

  """
  _CleanDirectory(pathutils.QUEUE_DIR, exclude=[pathutils.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(pathutils.JOB_QUEUE_ARCHIVE_DIR)


def GetMasterInfo():
  """Returns master information.

  This is a utility function to compute master information, either
  for consumption here or from the node daemon.

  @rtype: tuple
  @return: master_netdev, master_ip, master_name, primary_ip_family,
    master_netmask
  @raise RPCFail: in case of errors

  """
  try:
    cfg = _GetConfig()
    master_netdev = cfg.GetMasterNetdev()
    master_ip = cfg.GetMasterIP()
    master_netmask = cfg.GetMasterNetmask()
    master_node = cfg.GetMasterNode()
    primary_ip_family = cfg.GetPrimaryIPFamily()
  except errors.ConfigurationError, err:
    _Fail("Cluster configuration incomplete: %s", err, exc=True)
  return (master_netdev, master_ip, master_node, primary_ip_family,
          master_netmask)
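
# The returned tuple has the shape (hypothetical values; the fourth element
# is an address-family constant, e.g. socket.AF_INET):
#   ("eth0", "192.0.2.1", "node1.example.com", <ip_family>, 32)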


def RunLocalHooks(hook_opcode, hooks_path, env_builder_fn):
  """Decorator that runs hooks before and after the decorated function.

  @type hook_opcode: string
  @param hook_opcode: opcode of the hook
  @type hooks_path: string
  @param hooks_path: path of the hooks
  @type env_builder_fn: function
  @param env_builder_fn: function that returns a dictionary containing the
    environment variables for the hooks. Will get all the parameters of the
    decorated function.
  @raise RPCFail: in case of pre-hook failure

  """
  def decorator(fn):
    def wrapper(*args, **kwargs):
      _, myself = ssconf.GetMasterAndMyself()
      nodes = ([myself], [myself])  # these hooks run locally

      env_fn = compat.partial(env_builder_fn, *args, **kwargs)

      cfg = _GetConfig()
      hr = HooksRunner()
      hm = mcpu.HooksMaster(hook_opcode, hooks_path, nodes, hr.RunLocalHooks,
                            None, env_fn, logging.warning, cfg.GetClusterName(),
                            cfg.GetMasterNode())

      hm.RunPhase(constants.HOOKS_PHASE_PRE)
      result = fn(*args, **kwargs)
      hm.RunPhase(constants.HOOKS_PHASE_POST)

      return result
    return wrapper
  return decorator
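
# The decorator is applied below to the master IP turnup/turndown RPCs, e.g.:
#   @RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
#                  _BuildMasterIpEnv)
#   def ActivateMasterIp(master_params, use_external_mip_script):
#     ...
# so pre/post hooks run around the call with the environment built by
# _BuildMasterIpEnv from the same arguments.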


def _BuildMasterIpEnv(master_params, use_external_mip_script=None):
  """Builds environment variables for master IP hooks.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script (unused, but necessary per the implementation of the
    _RunLocalHooks decorator)

  """
  # pylint: disable=W0613
  ver = netutils.IPAddress.GetVersionFromAddressFamily(master_params.ip_family)
  env = {
    "MASTER_NETDEV": master_params.netdev,
    "MASTER_IP": master_params.ip,
    "MASTER_NETMASK": str(master_params.netmask),
    "CLUSTER_IP_VERSION": str(ver),
  }

  return env


def _RunMasterSetupScript(master_params, action, use_external_mip_script):
  """Execute the master IP address setup script.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type action: string
  @param action: action to pass to the script. Must be one of
    L{backend._MASTER_START} or L{backend._MASTER_STOP}
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise backend.RPCFail: if there are errors during the execution of the
    script

  """
  env = _BuildMasterIpEnv(master_params)

  if use_external_mip_script:
    setup_script = pathutils.EXTERNAL_MASTER_SETUP_SCRIPT
  else:
    setup_script = pathutils.DEFAULT_MASTER_SETUP_SCRIPT

  result = utils.RunCmd([setup_script, action], env=env, reset_env=True)

  if result.failed:
    _Fail("Failed to %s the master IP. Script return value: %s" %
          (action, result.exit_code), log=True)


@RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
               _BuildMasterIpEnv)
def ActivateMasterIp(master_params, use_external_mip_script):
  """Activate the IP address of the master daemon.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise RPCFail: in case of errors during the IP startup

  """
  _RunMasterSetupScript(master_params, _MASTER_START,
                        use_external_mip_script)


def StartMasterDaemons(no_voting):
  """Activate local node as master node.

  The function will start the master daemons (ganeti-masterd and ganeti-rapi).

  @type no_voting: boolean
  @param no_voting: whether to start ganeti-masterd without a node vote
      but still non-interactively
  @rtype: None

  """

  if no_voting:
    masterd_args = "--no-voting --yes-do-it"
  else:
    masterd_args = ""

  env = {
    "EXTRA_MASTERD_ARGS": masterd_args,
    }

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "start-master"], env=env)
  if result.failed:
    msg = "Can't start Ganeti master: %s" % result.output
    logging.error(msg)
    _Fail(msg)


@RunLocalHooks(constants.FAKE_OP_MASTER_TURNDOWN, "master-ip-turndown",
               _BuildMasterIpEnv)
def DeactivateMasterIp(master_params, use_external_mip_script):
  """Deactivate the master IP on this node.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise RPCFail: in case of errors during the IP turndown

  """
  _RunMasterSetupScript(master_params, _MASTER_STOP,
                        use_external_mip_script)


def StopMasterDaemons():
  """Stop the master daemons on this node.

  Stop the master daemons (ganeti-masterd and ganeti-rapi) on this node.

  @rtype: None

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop-master"])
  if result.failed:
    logging.error("Could not stop Ganeti master, command %s had exitcode %s"
                  " and error %s",
                  result.cmd, result.exit_code, result.output)


def ChangeMasterNetmask(old_netmask, netmask, master_ip, master_netdev):
  """Change the netmask of the master IP.

  @param old_netmask: the old value of the netmask
  @param netmask: the new value of the netmask
  @param master_ip: the master IP
  @param master_netdev: the master network device

  """
  if old_netmask == netmask:
    return

  if not netutils.IPAddress.Own(master_ip):
    _Fail("The master IP address is not up, not attempting to change its"
          " netmask")

  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
                         "%s/%s" % (master_ip, netmask),
                         "dev", master_netdev, "label",
                         "%s:0" % master_netdev])
  if result.failed:
    _Fail("Could not set the new netmask on the master IP address")

  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
                         "%s/%s" % (master_ip, old_netmask),
                         "dev", master_netdev, "label",
                         "%s:0" % master_netdev])
  if result.failed:
    _Fail("Could not bring down the master IP address with the old netmask")


def EtcHostsModify(mode, host, ip):
  """Modify a host entry in /etc/hosts.

  @param mode: The mode to operate in; either add or remove an entry
  @param host: The host to operate on
  @param ip: The ip associated with the entry

  """
  if mode == constants.ETC_HOSTS_ADD:
    if not ip:
      _Fail("Mode 'add' needs 'ip' parameter, but parameter not"
            " present")
    utils.AddHostToEtcHosts(host, ip)
  elif mode == constants.ETC_HOSTS_REMOVE:
    if ip:
      _Fail("Mode 'remove' does not allow 'ip' parameter, but"
            " parameter is present")
    utils.RemoveHostFromEtcHosts(host)
  else:
    _Fail("Mode not supported")


def LeaveCluster(modify_ssh_setup):
  """Cleans up and removes the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shut down the node daemon.

  @param modify_ssh_setup: boolean

  """
  _CleanDirectory(pathutils.DATA_DIR)
  _CleanDirectory(pathutils.CRYPTO_KEYS_DIR)
  JobQueuePurge()

  if modify_ssh_setup:
    try:
      priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.SSH_LOGIN_USER)

      utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))

      utils.RemoveFile(priv_key)
      utils.RemoveFile(pub_key)
    except errors.OpExecError:
      logging.exception("Error while processing ssh files")

  try:
    utils.RemoveFile(pathutils.CONFD_HMAC_KEY)
    utils.RemoveFile(pathutils.RAPI_CERT_FILE)
    utils.RemoveFile(pathutils.SPICE_CERT_FILE)
    utils.RemoveFile(pathutils.SPICE_CACERT_FILE)
    utils.RemoveFile(pathutils.NODED_CERT_FILE)
  except: # pylint: disable=W0702
    logging.exception("Error while removing cluster secrets")

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop", constants.CONFD])
  if result.failed:
    logging.error("Command %s failed with exitcode %s and error %s",
                  result.cmd, result.exit_code, result.output)

  # Raise a custom exception (handled in ganeti-noded)
  raise errors.QuitGanetiException(True, "Shutdown scheduled")


def _GetVgInfo(name):
  """Retrieves information about an LVM volume group.

  """
  # TODO: GetVGInfo supports returning information for multiple VGs at once
  vginfo = bdev.LogicalVolume.GetVGInfo([name])
  if vginfo:
    vg_free = int(round(vginfo[0][0], 0))
    vg_size = int(round(vginfo[0][1], 0))
  else:
    vg_free = None
    vg_size = None

  return {
    "name": name,
    "vg_free": vg_free,
    "vg_size": vg_size,
    }
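
# A successful lookup returns a dict of the form (hypothetical sizes in MiB):
#   {"name": "xenvg", "vg_free": 2048, "vg_size": 4096}
# with both size fields set to None when the volume group is not found.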


def _GetHvInfo(name):
  """Retrieves node information from a hypervisor.

  The information returned depends on the hypervisor. Common items:

    - vg_size is the size of the configured volume group in MiB
    - vg_free is the free size of the volume group in MiB
    - memory_dom0 is the memory allocated for domain0 in MiB
    - memory_free is the currently available (free) ram in MiB
    - memory_total is the total amount of ram in MiB
    - hv_version: the hypervisor version, if available

  """
  return hypervisor.GetHypervisor(name).GetNodeInfo()


def _GetNamedNodeInfo(names, fn):
  """Calls C{fn} for all names in C{names} and returns a list.

  @rtype: None or list

  """
  if names is None:
    return None
  else:
    return map(fn, names)


def GetNodeInfo(vg_names, hv_names):
  """Gives back a hash with different information about the node.

  @type vg_names: list of string
  @param vg_names: Names of the volume groups to ask for disk space information
  @type hv_names: list of string
  @param hv_names: Names of the hypervisors to ask for node information
  @rtype: tuple; (string, None/list, None/list)
  @return: Tuple containing boot ID, volume group information and hypervisor
    information

  """
  bootid = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
  vg_info = _GetNamedNodeInfo(vg_names, _GetVgInfo)
  hv_info = _GetNamedNodeInfo(hv_names, _GetHvInfo)

  return (bootid, vg_info, hv_info)


def VerifyNode(what, cluster_name):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  result = {}
  my_name = netutils.Hostname.GetSysName()
  port = netutils.GetDaemonPort(constants.NODED)
  vm_capable = my_name not in what.get(constants.NV_VMNODES, [])

  if constants.NV_HYPERVISOR in what and vm_capable:
    result[constants.NV_HYPERVISOR] = tmp = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
      try:
        val = hypervisor.GetHypervisor(hv_name).Verify()
      except errors.HypervisorError, err:
        val = "Error while checking hypervisor: %s" % str(err)
      tmp[hv_name] = val

  if constants.NV_HVPARAMS in what and vm_capable:
    result[constants.NV_HVPARAMS] = tmp = []
    for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
      try:
        logging.info("Validating hv %s, %s", hv_name, hvparms)
        hypervisor.GetHypervisor(hv_name).ValidateParameters(hvparms)
      except errors.HypervisorError, err:
        tmp.append((source, hv_name, str(err)))

  if constants.NV_FILELIST in what:
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
      what[constants.NV_FILELIST])

  if constants.NV_NODELIST in what:
    (nodes, bynode) = what[constants.NV_NODELIST]

    # Add nodes from other groups (different for each node)
    try:
      nodes.extend(bynode[my_name])
    except KeyError:
      pass

    # Use a random order
    random.shuffle(nodes)

    # Try to contact all nodes
    val = {}
    for node in nodes:
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
      if not success:
        val[node] = message

    result[constants.NV_NODELIST] = val

  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_pip = my_sip = None
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      tmp[my_name] = ("Can't find my own primary/secondary IP"
                      " in the node list")
    else:
      for name, pip, sip in what[constants.NV_NODENETTEST]:
        fail = []
        if not netutils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        if sip != pip:
          if not netutils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          tmp[name] = ("failure using the %s interface(s)" %
                       " and ".join(fail))

  if constants.NV_MASTERIP in what:
    # FIXME: add checks on incoming data structures (here and in the
    # rest of the function)
    master_name, master_ip = what[constants.NV_MASTERIP]
    if master_name == my_name:
      source = constants.IP4_ADDRESS_LOCALHOST
    else:
      source = None
    result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
                                                     source=source)

  if constants.NV_USERSCRIPTS in what:
    result[constants.NV_USERSCRIPTS] = \
      [script for script in what[constants.NV_USERSCRIPTS]
       if not (os.path.exists(script) and os.access(script, os.X_OK))]

  if constants.NV_OOB_PATHS in what:
    result[constants.NV_OOB_PATHS] = tmp = []
    for path in what[constants.NV_OOB_PATHS]:
      try:
        st = os.stat(path)
      except OSError, err:
        tmp.append("error stat()'ing out of band helper: %s" % err)
      else:
        if stat.S_ISREG(st.st_mode):
          if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
            tmp.append(None)
          else:
            tmp.append("out of band helper %s is not executable" % path)
        else:
          tmp.append("out of band helper %s is not a file" % path)

  if constants.NV_LVLIST in what and vm_capable:
    try:
      val = GetVolumeList(utils.ListVolumeGroups().keys())
    except RPCFail, err:
      val = str(err)
    result[constants.NV_LVLIST] = val

  if constants.NV_INSTANCELIST in what and vm_capable:
    # GetInstanceList can fail
    try:
      val = GetInstanceList(what[constants.NV_INSTANCELIST])
    except RPCFail, err:
      val = str(err)
    result[constants.NV_INSTANCELIST] = val

  if constants.NV_VGLIST in what and vm_capable:
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()

  if constants.NV_PVLIST in what and vm_capable:
    result[constants.NV_PVLIST] = \
      bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
                                   filter_allocatable=False)

  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
                                    constants.RELEASE_VERSION)

  if constants.NV_HVINFO in what and vm_capable:
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()

  if constants.NV_DRBDLIST in what and vm_capable:
    try:
      used_minors = bdev.DRBD8.GetUsedDevs().keys()
    except errors.BlockDeviceError, err:
      logging.warning("Can't get used minors list", exc_info=True)
      used_minors = str(err)
    result[constants.NV_DRBDLIST] = used_minors

  if constants.NV_DRBDHELPER in what and vm_capable:
    status = True
    try:
      payload = bdev.BaseDRBD.GetUsermodeHelper()
    except errors.BlockDeviceError, err:
      logging.error("Can't get DRBD usermode helper: %s", str(err))
      status = False
      payload = str(err)
    result[constants.NV_DRBDHELPER] = (status, payload)

  if constants.NV_NODESETUP in what:
    result[constants.NV_NODESETUP] = tmpr = []
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
      tmpr.append("The sysfs filesystem doesn't seem to be mounted"
                  " under /sys, missing required directories /sys/block"
                  " and /sys/class/net")
    if (not os.path.isdir("/proc/sys") or
        not os.path.isfile("/proc/sysrq-trigger")):
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
                  " under /proc, missing required directory /proc/sys and"
                  " the file /proc/sysrq-trigger")

  if constants.NV_TIME in what:
    result[constants.NV_TIME] = utils.SplitTime(time.time())

  if constants.NV_OSLIST in what and vm_capable:
    result[constants.NV_OSLIST] = DiagnoseOS()

  if constants.NV_BRIDGES in what and vm_capable:
    result[constants.NV_BRIDGES] = [bridge
                                    for bridge in what[constants.NV_BRIDGES]
                                    if not utils.BridgeExists(bridge)]
  return result
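
# A minimal "what" argument exercising two of the checks above could look
# like (hypothetical contents):
#   {constants.NV_FILELIST: [pathutils.CLUSTER_CONF_FILE],
#    constants.NV_VERSION: None}
# the returned dict then carries the file fingerprints and the
# (protocol, release) version pair under the same keys.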


def GetBlockDevSizes(devices):
  """Return the size of the given block devices.

  @type devices: list
  @param devices: list of block device nodes to query
  @rtype: dict
  @return:
    dictionary of all block devices under /dev (key). The value is their
    size in MiB.

    {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}

  """
  DEV_PREFIX = "/dev/"
  blockdevs = {}

  for devpath in devices:
    if not utils.IsBelowDir(DEV_PREFIX, devpath):
      continue

    try:
      st = os.stat(devpath)
    except EnvironmentError, err:
      logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
      continue

    if stat.S_ISBLK(st.st_mode):
      result = utils.RunCmd(["blockdev", "--getsize64", devpath])
      if result.failed:
        # We don't want to fail, just do not list this device as available
        logging.warning("Cannot get size for block device %s", devpath)
        continue

      size = int(result.stdout) / (1024 * 1024)
      blockdevs[devpath] = size
  return blockdevs
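
# The conversion above relies on "blockdev --getsize64" printing the device
# size in bytes, so a 130023424-byte device is reported as
#   {"/dev/sdx": 124}
# (hypothetical device name; 130023424 / 1024 / 1024 == 124 MiB).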


def GetVolumeList(vg_names):
  """Compute list of logical volumes and their size.

  @type vg_names: list
  @param vg_names: the volume groups whose LVs we should list, or
      empty for all volume groups
  @rtype: dict
  @return:
      dictionary of all partitions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'xenvg/test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.

  """
  lvs = {}
  sep = "|"
  if not vg_names:
    vg_names = []
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)

  for line in result.stdout.splitlines():
    line = line.strip()
    match = _LVSLINE_REGEX.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    vg_name, name, size, attr = match.groups()
    inactive = attr[4] == "-"
    online = attr[5] == "o"
    virtual = attr[0] == "v"
    if virtual:
      # we don't want to report such volumes as existing, since they
      # don't really hold data
      continue
    lvs[vg_name + "/" + name] = (size, inactive, online)

  return lvs
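
# Given the sample lvs line shown next to _LVSLINE_REGEX above,
#   xenvg|test1|20.06|-wi-ao
# the attribute string decodes as: not virtual (attr[0] != "v"), active
# (attr[4] != "-") and open (attr[5] == "o"), producing the entry
#   {"xenvg/test1": ("20.06", False, True)}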


def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary with keys volume name and values the
      size of the volume

  """
  return utils.ListVolumeGroups()


def NodeVolumes():
  """List all volumes on this node.

  @rtype: list
  @return:
    A list of dictionaries, each having four keys:
      - name: the logical volume name,
      - size: the size of the logical volume
      - dev: the physical device on which the LV lives
      - vg: the volume group to which it belongs

    In case of errors, we return an empty list and log the
    error.

    Note that since a logical volume can live on multiple physical
    volumes, the resulting list might include a logical volume
    multiple times.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s",
          result.output)

  def parse_dev(dev):
    return dev.split("(")[0]

  def handle_dev(dev):
    return [parse_dev(x) for x in dev.split(",")]

  def map_line(line):
    line = [v.strip() for v in line]
    return [{"name": line[0], "size": line[1],
             "dev": dev, "vg": line[3]} for dev in handle_dev(line[2])]

  all_devs = []
  for line in result.stdout.splitlines():
    if line.count("|") >= 3:
      all_devs.extend(map_line(line.split("|")))
    else:
      logging.warning("Strange line in the output from lvs: '%s'", line)
  return all_devs
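
# The "devices" column reported by lvs looks like "/dev/sda5(0)", or a
# comma-separated list of such entries for multi-PV volumes;
# parse_dev/handle_dev above strip the "(extent)" suffix, so one LV spread
# over two PVs yields two dictionaries in the result (hypothetical names).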


def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @rtype: boolean
  @return: C{True} if all of them exist, C{False} otherwise

  """
  missing = []
  for bridge in bridges_list:
    if not utils.BridgeExists(bridge):
      missing.append(bridge)

  if missing:
    _Fail("Missing bridges %s", utils.CommaJoin(missing))


def GetInstanceList(hypervisor_list):
  """Provides a list of instances.

  @type hypervisor_list: list
  @param hypervisor_list: the list of hypervisors to query for instance
      information

  @rtype: list
  @return: a list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com

  """
  results = []
  for hname in hypervisor_list:
    try:
      names = hypervisor.GetHypervisor(hname).ListInstances()
      results.extend(names)
    except errors.HypervisorError, err:
      _Fail("Error enumerating instances (hypervisor %s): %s",
            hname, err, exc=True)

  return results


def GetInstanceInfo(instance, hname):
  """Gives back the information about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus (int)

  """
  output = {}

  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is not None:
    output["memory"] = iinfo[2]
    output["vcpus"] = iinfo[3]
    output["state"] = iinfo[4]
    output["time"] = iinfo[5]

  return output


def GetInstanceMigratable(instance):
  """Checks whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.
  @raise RPCFail: if the instance is not running

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  iname = instance.name
  if iname not in hyper.ListInstances():
    _Fail("Instance %s is not running", iname)

  for idx in range(len(instance.disks)):
    link_name = _GetBlockDevSymlinkPath(iname, idx)
    if not os.path.islink(link_name):
      logging.warning("Instance %s is missing symlink %s for disk %d",
                      iname, link_name, idx)


def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  output = {}

  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
    if iinfo:
      for name, _, memory, vcpus, state, times in iinfo:
        value = {
          "memory": memory,
          "vcpus": vcpus,
          "state": state,
          "time": times,
          }
        if name in output:
          # we only check static parameters, like memory and vcpus,
          # and not state and time which can change between the
          # invocations of the different hypervisors
          for key in "memory", "vcpus":
            if value[key] != output[name][key]:
              _Fail("Instance %s is running twice"
                    " with different parameters", name)
        output[name] = value

  return output


def _InstanceLogName(kind, os_name, instance, component):
  """Compute the OS log filename for a given instance and operation.

  The instance name and os name are passed in as strings since not all
  operations have these as part of an instance object.

  @type kind: string
  @param kind: the operation type (e.g. add, import, etc.)
  @type os_name: string
  @param os_name: the os name
  @type instance: string
  @param instance: the name of the instance being imported/added/etc.
  @type component: string or None
  @param component: the name of the component of the instance being
      transferred

  """
  # TODO: Use tempfile.mkstemp to create unique filename
  if component:
    assert "/" not in component
    c_msg = "-%s" % component
  else:
    c_msg = ""
  base = ("%s-%s-%s%s-%s.log" %
          (kind, os_name, instance, c_msg, utils.TimestampForFilename()))
  return utils.PathJoin(pathutils.LOG_OS_DIR, base)
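
# A resulting path looks like (hypothetical OS and instance names):
#   <LOG_OS_DIR>/add-debootstrap-inst1.example.com-<timestamp>.log
# with "-<component>" inserted before the timestamp when a component is
# given.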
1105

    
1106

    
1107
def InstanceOsAdd(instance, reinstall, debug):
1108
  """Add an OS to an instance.
1109

1110
  @type instance: L{objects.Instance}
1111
  @param instance: Instance whose OS is to be installed
1112
  @type reinstall: boolean
1113
  @param reinstall: whether this is an instance reinstall
1114
  @type debug: integer
1115
  @param debug: debug level, passed to the OS scripts
1116
  @rtype: None
1117

1118
  """
1119
  inst_os = OSFromDisk(instance.os)
1120

    
1121
  create_env = OSEnvironment(instance, inst_os, debug)
1122
  if reinstall:
1123
    create_env["INSTANCE_REINSTALL"] = "1"
1124

    
1125
  logfile = _InstanceLogName("add", instance.os, instance.name, None)
1126

    
1127
  result = utils.RunCmd([inst_os.create_script], env=create_env,
1128
                        cwd=inst_os.path, output=logfile, reset_env=True)
1129
  if result.failed:
1130
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
1131
                  " output: %s", result.cmd, result.fail_reason, logfile,
1132
                  result.output)
1133
    lines = [utils.SafeEncode(val)
1134
             for val in utils.TailFile(logfile, lines=20)]
1135
    _Fail("OS create script failed (%s), last lines in the"
1136
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
1137

    
1138

    
1139
def RunRenameInstance(instance, old_name, debug):
1140
  """Run the OS rename script for an instance.
1141

1142
  @type instance: L{objects.Instance}
1143
  @param instance: Instance whose OS is to be installed
1144
  @type old_name: string
1145
  @param old_name: previous instance name
1146
  @type debug: integer
1147
  @param debug: debug level, passed to the OS scripts
1148
  @rtype: boolean
1149
  @return: the success of the operation
1150

1151
  """
1152
  inst_os = OSFromDisk(instance.os)
1153

    
1154
  rename_env = OSEnvironment(instance, inst_os, debug)
1155
  rename_env["OLD_INSTANCE_NAME"] = old_name
1156

    
1157
  logfile = _InstanceLogName("rename", instance.os,
1158
                             "%s-%s" % (old_name, instance.name), None)
1159

    
1160
  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
1161
                        cwd=inst_os.path, output=logfile, reset_env=True)
1162

    
1163
  if result.failed:
1164
    logging.error("os create command '%s' returned error: %s output: %s",
1165
                  result.cmd, result.fail_reason, result.output)
1166
    lines = [utils.SafeEncode(val)
1167
             for val in utils.TailFile(logfile, lines=20)]
1168
    _Fail("OS rename script failed (%s), last lines in the"
1169
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
1170

    
1171

    
1172
def _GetBlockDevSymlinkPath(instance_name, idx):
1173
  return utils.PathJoin(pathutils.DISK_LINKS_DIR, "%s%s%d" %
1174
                        (instance_name, constants.DISK_SEPARATOR, idx))
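
# For disk 0 of a hypothetical instance this yields a path like
#   <DISK_LINKS_DIR>/inst1.example.com<DISK_SEPARATOR>0
# e.g. "inst1.example.com:0" if the separator is configured as ":".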
1175

    
1176

    
1177
def _SymlinkBlockDev(instance_name, device_path, idx):
1178
  """Set up symlinks to a instance's block device.
1179

1180
  This is an auxiliary function run when an instance is start (on the primary
1181
  node) or when an instance is migrated (on the target node).
1182

1183

1184
  @param instance_name: the name of the target instance
1185
  @param device_path: path of the physical block device, on the node
1186
  @param idx: the disk index
1187
  @return: absolute path to the disk's symlink
1188

1189
  """
1190
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1191
  try:
1192
    os.symlink(device_path, link_name)
1193
  except OSError, err:
1194
    if err.errno == errno.EEXIST:
1195
      if (not os.path.islink(link_name) or
1196
          os.readlink(link_name) != device_path):
1197
        os.remove(link_name)
1198
        os.symlink(device_path, link_name)
1199
    else:
1200
      raise
1201

    
1202
  return link_name
1203

    
1204

    
1205
def _RemoveBlockDevLinks(instance_name, disks):
1206
  """Remove the block device symlinks belonging to the given instance.
1207

1208
  """
1209
  for idx, _ in enumerate(disks):
1210
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1211
    if os.path.islink(link_name):
1212
      try:
1213
        os.remove(link_name)
1214
      except OSError:
1215
        logging.exception("Can't remove symlink '%s'", link_name)
1216

    
1217

    
1218
def _GatherAndLinkBlockDevs(instance):
1219
  """Set up an instance's block device(s).
1220

1221
  This is run on the primary node at instance startup. The block
1222
  devices must be already assembled.
1223

1224
  @type instance: L{objects.Instance}
1225
  @param instance: the instance whose disks we shoul assemble
1226
  @rtype: list
1227
  @return: list of (disk_object, device_path)
1228

1229
  """
1230
  block_devices = []
1231
  for idx, disk in enumerate(instance.disks):
1232
    device = _RecursiveFindBD(disk)
1233
    if device is None:
1234
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
1235
                                    str(disk))
1236
    device.Open()
1237
    try:
1238
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1239
    except OSError, e:
1240
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1241
                                    e.strerror)
1242

    
1243
    block_devices.append((disk, link_name))
1244

    
1245
  return block_devices
1246

    
1247

    
1248
def StartInstance(instance, startup_paused):
1249
  """Start an instance.
1250

1251
  @type instance: L{objects.Instance}
1252
  @param instance: the instance object
1253
  @type startup_paused: bool
1254
  @param instance: pause instance at startup?
1255
  @rtype: None
1256

1257
  """
1258
  running_instances = GetInstanceList([instance.hypervisor])
1259

    
1260
  if instance.name in running_instances:
1261
    logging.info("Instance %s already running, not starting", instance.name)
1262
    return
1263

    
1264
  try:
1265
    block_devices = _GatherAndLinkBlockDevs(instance)
1266
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
1267
    hyper.StartInstance(instance, block_devices, startup_paused)
1268
  except errors.BlockDeviceError, err:
1269
    _Fail("Block device error: %s", err, exc=True)
1270
  except errors.HypervisorError, err:
1271
    _RemoveBlockDevLinks(instance.name, instance.disks)
1272
    _Fail("Hypervisor error: %s", err, exc=True)
1273

    
1274

    
1275
def InstanceShutdown(instance, timeout):
1276
  """Shut an instance down.
1277

1278
  @note: this functions uses polling with a hardcoded timeout.
1279

1280
  @type instance: L{objects.Instance}
1281
  @param instance: the instance object
1282
  @type timeout: integer
1283
  @param timeout: maximum timeout for soft shutdown
1284
  @rtype: None
1285

1286
  """
1287
  hv_name = instance.hypervisor
1288
  hyper = hypervisor.GetHypervisor(hv_name)
1289
  iname = instance.name
1290

    
1291
  if instance.name not in hyper.ListInstances():
1292
    logging.info("Instance %s not running, doing nothing", iname)
1293
    return
1294

    
1295
  class _TryShutdown:
1296
    def __init__(self):
1297
      self.tried_once = False
1298

    
1299
    def __call__(self):
1300
      if iname not in hyper.ListInstances():
1301
        return
1302

    
1303
      try:
1304
        hyper.StopInstance(instance, retry=self.tried_once)
1305
      except errors.HypervisorError, err:
1306
        if iname not in hyper.ListInstances():
1307
          # if the instance is no longer existing, consider this a
1308
          # success and go to cleanup
1309
          return
1310

    
1311
        _Fail("Failed to stop instance %s: %s", iname, err)
1312

    
1313
      self.tried_once = True
1314

    
1315
      raise utils.RetryAgain()
1316

    
1317
  try:
1318
    utils.Retry(_TryShutdown(), 5, timeout)
1319
  except utils.RetryTimeout:
1320
    # the shutdown did not succeed
1321
    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1322

    
1323
    try:
1324
      hyper.StopInstance(instance, force=True)
1325
    except errors.HypervisorError, err:
1326
      if iname in hyper.ListInstances():
1327
        # only raise an error if the instance still exists, otherwise
1328
        # the error could simply be "instance ... unknown"!
1329
        _Fail("Failed to force stop instance %s: %s", iname, err)
1330

    
1331
    time.sleep(1)
1332

    
1333
    if iname in hyper.ListInstances():
1334
      _Fail("Could not shutdown instance %s even by destroy", iname)
1335

    
1336
  try:
1337
    hyper.CleanupInstance(instance.name)
1338
  except errors.HypervisorError, err:
1339
    logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1340

    
1341
  _RemoveBlockDevLinks(iname, instance.disks)
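
# Note on the retry logic above: utils.Retry(_TryShutdown(), 5, timeout)
# re-invokes the callable (with a 5-second delay) for as long as it raises
# utils.RetryAgain; once "timeout" expires the forced StopInstance path is
# taken.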
1342

    
1343

    
1344
def InstanceReboot(instance, reboot_type, shutdown_timeout):
1345
  """Reboot an instance.
1346

1347
  @type instance: L{objects.Instance}
1348
  @param instance: the instance object to reboot
1349
  @type reboot_type: str
1350
  @param reboot_type: the type of reboot, one the following
1351
    constants:
1352
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1353
        instance OS, do not recreate the VM
1354
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1355
        restart the VM (at the hypervisor level)
1356
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1357
        not accepted here, since that mode is handled differently, in
1358
        cmdlib, and translates into full stop and start of the
1359
        instance (instead of a call_instance_reboot RPC)
1360
  @type shutdown_timeout: integer
1361
  @param shutdown_timeout: maximum timeout for soft shutdown
1362
  @rtype: None
1363

1364
  """
1365
  running_instances = GetInstanceList([instance.hypervisor])
1366

    
1367
  if instance.name not in running_instances:
1368
    _Fail("Cannot reboot instance %s that is not running", instance.name)
1369

    
1370
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1371
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1372
    try:
1373
      hyper.RebootInstance(instance)
1374
    except errors.HypervisorError, err:
1375
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1376
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1377
    try:
1378
      InstanceShutdown(instance, shutdown_timeout)
1379
      return StartInstance(instance, False)
1380
    except errors.HypervisorError, err:
1381
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1382
  else:
1383
    _Fail("Invalid reboot_type received: %s", reboot_type)
1384

    
1385

    
1386
def InstanceBalloonMemory(instance, memory):
1387
  """Resize an instance's memory.
1388

1389
  @type instance: L{objects.Instance}
1390
  @param instance: the instance object
1391
  @type memory: int
1392
  @param memory: new memory amount in MB
1393
  @rtype: None
1394

1395
  """
1396
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1397
  running = hyper.ListInstances()
1398
  if instance.name not in running:
1399
    logging.info("Instance %s is not running, cannot balloon", instance.name)
1400
    return
1401
  try:
1402
    hyper.BalloonInstanceMemory(instance, memory)
1403
  except errors.HypervisorError, err:
1404
    _Fail("Failed to balloon instance memory: %s", err, exc=True)
1405

    
1406

    
1407
def MigrationInfo(instance):
1408
  """Gather information about an instance to be migrated.
1409

1410
  @type instance: L{objects.Instance}
1411
  @param instance: the instance definition
1412

1413
  """
1414
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1415
  try:
1416
    info = hyper.MigrationInfo(instance)
1417
  except errors.HypervisorError, err:
1418
    _Fail("Failed to fetch migration information: %s", err, exc=True)
1419
  return info
1420

    
1421

    
1422
def AcceptInstance(instance, info, target):
1423
  """Prepare the node to accept an instance.
1424

1425
  @type instance: L{objects.Instance}
1426
  @param instance: the instance definition
1427
  @type info: string/data (opaque)
1428
  @param info: migration information, from the source node
1429
  @type target: string
1430
  @param target: target host (usually ip), on this node
1431

1432
  """
1433
  # TODO: why is this required only for DTS_EXT_MIRROR?
1434
  if instance.disk_template in constants.DTS_EXT_MIRROR:
1435
    # Create the symlinks, as the disks are not active
1436
    # in any way
1437
    try:
1438
      _GatherAndLinkBlockDevs(instance)
1439
    except errors.BlockDeviceError, err:
1440
      _Fail("Block device error: %s", err, exc=True)
1441

    
1442
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1443
  try:
1444
    hyper.AcceptInstance(instance, info, target)
1445
  except errors.HypervisorError, err:
1446
    if instance.disk_template in constants.DTS_EXT_MIRROR:
1447
      _RemoveBlockDevLinks(instance.name, instance.disks)
1448
    _Fail("Failed to accept instance: %s", err, exc=True)
1449

    
1450

    
1451
def FinalizeMigrationDst(instance, info, success):
1452
  """Finalize any preparation to accept an instance.
1453

1454
  @type instance: L{objects.Instance}
1455
  @param instance: the instance definition
1456
  @type info: string/data (opaque)
1457
  @param info: migration information, from the source node
1458
  @type success: boolean
1459
  @param success: whether the migration was a success or a failure
1460

1461
  """
1462
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1463
  try:
1464
    hyper.FinalizeMigrationDst(instance, info, success)
1465
  except errors.HypervisorError, err:
1466
    _Fail("Failed to finalize migration on the target node: %s", err, exc=True)
1467

    
1468

    
1469
def MigrateInstance(instance, target, live):
1470
  """Migrates an instance to another node.
1471

1472
  @type instance: L{objects.Instance}
1473
  @param instance: the instance definition
1474
  @type target: string
1475
  @param target: the target node name
1476
  @type live: boolean
1477
  @param live: whether the migration should be done live or not (the
1478
      interpretation of this parameter is left to the hypervisor)
1479
  @raise RPCFail: if migration fails for some reason
1480

1481
  """
1482
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1483

    
1484
  try:
1485
    hyper.MigrateInstance(instance, target, live)
1486
  except errors.HypervisorError, err:
1487
    _Fail("Failed to migrate instance: %s", err, exc=True)
1488

    
1489

    
1490
def FinalizeMigrationSource(instance, success, live):
1491
  """Finalize the instance migration on the source node.
1492

1493
  @type instance: L{objects.Instance}
1494
  @param instance: the instance definition of the migrated instance
1495
  @type success: bool
1496
  @param success: whether the migration succeeded or not
1497
  @type live: bool
1498
  @param live: whether the user requested a live migration or not
1499
  @raise RPCFail: If the execution fails for some reason
1500

1501
  """
1502
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1503

    
1504
  try:
1505
    hyper.FinalizeMigrationSource(instance, success, live)
1506
  except Exception, err:  # pylint: disable=W0703
1507
    _Fail("Failed to finalize the migration on the source node: %s", err,
1508
          exc=True)
1509

    
1510

    
1511
def GetMigrationStatus(instance):
1512
  """Get the migration status
1513

1514
  @type instance: L{objects.Instance}
1515
  @param instance: the instance that is being migrated
1516
  @rtype: L{objects.MigrationStatus}
1517
  @return: the status of the current migration (one of
1518
           L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
1519
           progress info that can be retrieved from the hypervisor
1520
  @raise RPCFail: If the migration status cannot be retrieved
1521

1522
  """
1523
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1524
  try:
1525
    return hyper.GetMigrationStatus(instance)
1526
  except Exception, err:  # pylint: disable=W0703
1527
    _Fail("Failed to get migration status: %s", err, exc=True)
1528

    
1529

    
1530
def BlockdevCreate(disk, size, owner, on_primary, info):
  """Creates a block device for an instance.

  @type disk: L{objects.Disk}
  @param disk: the object describing the disk we should create
  @type size: int
  @param size: the size of the physical underlying device, in MiB
  @type owner: str
  @param owner: the name of the instance for which disk is created,
      used for device cache data
  @type on_primary: boolean
  @param on_primary: indicates if it is the primary node or not
  @type info: string
  @param info: string that will be sent to the physical device
      creation, used for example to set (LVM) tags on LVs

  @return: the new unique_id of the device (this can sometimes be
      computed only after creation), or None. On secondary nodes,
      it's not required to return anything.

  """
  # TODO: remove the obsolete "size" argument
  # pylint: disable=W0613
  clist = []
  if disk.children:
    for child in disk.children:
      try:
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
      except errors.BlockDeviceError, err:
        _Fail("Can't assemble device %s: %s", child, err)
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
        try:
          # pylint: disable=E1103
          crdev.Open()
        except errors.BlockDeviceError, err:
          _Fail("Can't make child '%s' read-write: %s", child, err)
      clist.append(crdev)

  try:
    device = bdev.Create(disk, clist)
  except errors.BlockDeviceError, err:
    _Fail("Can't create block device: %s", err)

  if on_primary or disk.AssembleOnSecondary():
    try:
      device.Assemble()
    except errors.BlockDeviceError, err:
      _Fail("Can't assemble device after creation, unusual event: %s", err)
    if on_primary or disk.OpenOnSecondary():
      try:
        device.Open(force=True)
      except errors.BlockDeviceError, err:
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  device.SetInfo(info)

  return device.unique_id


def _WipeDevice(path, offset, size):
  """This function actually wipes the device.

  @param path: The path to the device to wipe
  @param offset: The offset in MiB in the file
  @param size: The size in MiB to write

  """
  # Internal sizes are always in Mebibytes; if the following "dd" command
  # should use a different block size the offset and size given to this
  # function must be adjusted accordingly before being passed to "dd".
  block_size = 1024 * 1024

  cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
         "bs=%s" % block_size, "oflag=direct", "of=%s" % path,
         "count=%d" % size]
  result = utils.RunCmd(cmd)

  if result.failed:
    _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
          result.fail_reason, result.output)


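# For reference, the RunCmd call above expands to a plain "dd" invocation;
# e.g. _WipeDevice("/dev/xenvg/disk0", 256, 1024) runs (device path and
# sizes here are made-up values for illustration):
#
#   dd if=/dev/zero seek=256 bs=1048576 oflag=direct of=/dev/xenvg/disk0 count=1024

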
def BlockdevWipe(disk, offset, size):
  """Wipes a block device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to wipe
  @type offset: int
  @param offset: The offset in MiB in the file
  @type size: int
  @param size: The size in MiB to write

  """
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError:
    rdev = None

  if not rdev:
    _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)

  # Cross-verify some of the parameters
  if offset < 0:
    _Fail("Negative offset")
  if size < 0:
    _Fail("Negative size")
  if offset > rdev.size:
    _Fail("Offset is bigger than device size")
  if (offset + size) > rdev.size:
    _Fail("The provided offset and size to wipe is bigger than device size")

  _WipeDevice(rdev.dev_path, offset, size)


def BlockdevPauseResumeSync(disks, pause):
  """Pause or resume the sync of the block device.

  @type disks: list of L{objects.Disk}
  @param disks: the disk objects we want to pause/resume
  @type pause: bool
  @param pause: Whether to pause or resume

  """
  success = []
  for disk in disks:
    try:
      rdev = _RecursiveFindBD(disk)
    except errors.BlockDeviceError:
      rdev = None

    if not rdev:
      success.append((False, ("Cannot change sync for device %s:"
                              " device not found" % disk.iv_name)))
      continue

    result = rdev.PauseResumeSync(pause)

    if result:
      success.append((result, None))
    else:
      if pause:
        msg = "Pause"
      else:
        msg = "Resume"
      success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))

  return success


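# Example of the per-disk result shape returned above (hypothetical disks
# and messages, for illustration only):
#
#   BlockdevPauseResumeSync([disk_a, disk_b], True)
#   => [(True, None), (False, "Pause for device disk/1 failed")]

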
def BlockdevRemove(disk):
  """Remove a block device.

  @note: This is intended to be called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk object we should remove
  @rtype: boolean
  @return: the success of the operation

  """
  msgs = []
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    # probably can't attach
    logging.info("Can't attach to device %s in remove", disk)
    rdev = None
  if rdev is not None:
    r_path = rdev.dev_path
    try:
      rdev.Remove()
    except errors.BlockDeviceError, err:
      msgs.append(str(err))
    if not msgs:
      DevCacheManager.RemoveCache(r_path)

  if disk.children:
    for child in disk.children:
      try:
        BlockdevRemove(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))


def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  @note: this function is called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk we try to assemble
  @type owner: str
  @param owner: the name of the instance which owns the disk
  @type as_primary: boolean
  @param as_primary: if we should make the block device
      read/write

  @return: the assembled device or None (in case no device
      was assembled)
  @raise errors.BlockDeviceError: in case there is an error
      during the activation of the children or the device
      itself

  """
  children = []
  if disk.children:
    mcn = disk.ChildrenNeeded()
    if mcn == -1:
      mcn = 0 # max number of Nones allowed
    else:
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
      try:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        if children.count(None) >= mcn:
          raise
        cdev = None
        logging.error("Error in child activation (but continuing): %s",
                      str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.Assemble(disk, children)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      r_dev.Open()
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)

  else:
    result = True
  return result


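# A worked example of the "max number of Nones" logic above, for a
# hypothetical mirrored disk with two children whose ChildrenNeeded()
# returns 1: mcn = 2 - 1 = 1, so the first child that fails to assemble is
# recorded as None and activation continues; a second failure then trips
# "children.count(None) >= mcn" and the error is re-raised. When
# ChildrenNeeded() returns -1 (all children required), mcn becomes 0 and
# the very first child failure propagates immediately.

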
def BlockdevAssemble(disk, owner, as_primary, idx):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  @rtype: str or boolean
  @return: a C{/dev/...} path for primary nodes, and
      C{True} for secondary nodes

  """
  try:
    result = _RecursiveAssembleBD(disk, owner, as_primary)
    if isinstance(result, bdev.BlockDev):
      # pylint: disable=E1103
      result = result.dev_path
      if as_primary:
        _SymlinkBlockDev(owner, result, idx)
  except errors.BlockDeviceError, err:
    _Fail("Error while assembling disk: %s", err, exc=True)
  except OSError, err:
    _Fail("Error while symlinking disk: %s", err, exc=True)

  return result


def BlockdevShutdown(disk):
  """Shut down a block device.

  First, if the device is assembled (Attach() is successful), then
  the device is shut down. Then the children of the device are
  shut down.

  This function is called recursively. Note that we don't cache the
  children or such, as opposed to assemble: shutting down different
  devices doesn't require that the upper device was active.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk we should
      shutdown
  @rtype: None

  """
  msgs = []
  r_dev = _RecursiveFindBD(disk)
  if r_dev is not None:
    r_path = r_dev.dev_path
    try:
      r_dev.Shutdown()
      DevCacheManager.RemoveCache(r_path)
    except errors.BlockDeviceError, err:
      msgs.append(str(err))

  if disk.children:
    for child in disk.children:
      try:
        BlockdevShutdown(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))


def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if new_bdevs.count(None) > 0:
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
  parent_bdev.AddChildren(new_bdevs)


def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
  devs = []
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
    if rpath is None:
      bd = _RecursiveFindBD(disk)
      if bd is None:
        _Fail("Can't find device %s while removing children", disk)
      else:
        devs.append(bd.dev_path)
    else:
      if not utils.IsNormAbsPath(rpath):
        _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
      devs.append(rpath)
  parent_bdev.RemoveChildren(devs)


def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return: List of L{objects.BlockDevStatus}, one for each disk
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  stats = []
  for dsk in disks:
    rbd = _RecursiveFindBD(dsk)
    if rbd is None:
      _Fail("Can't find device %s", dsk)

    stats.append(rbd.CombinedSyncStatus())

  return stats


def BlockdevGetmirrorstatusMulti(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return: List of tuples, (bool, status), one for each disk; bool denotes
    success/failure, status is L{objects.BlockDevStatus} on success, string
    otherwise

  """
  result = []
  for disk in disks:
    try:
      rbd = _RecursiveFindBD(disk)
      if rbd is None:
        result.append((False, "Can't find device %s" % disk))
        continue

      status = rbd.CombinedSyncStatus()
    except errors.BlockDeviceError, err:
      logging.exception("Error while getting disk status")
      result.append((False, str(err)))
    else:
      result.append((True, status))

  assert len(disks) == len(result)

  return result


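# Example of the result shape of the multi-disk status function above
# (illustrative values only):
#
#   BlockdevGetmirrorstatusMulti([d1, d2])
#   => [(True, <objects.BlockDevStatus>), (False, "Can't find device d2")]

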
def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  children = []
  if disk.children:
    for chdisk in disk.children:
      children.append(_RecursiveFindBD(chdisk))

  return bdev.FindDevice(disk, children)


def _OpenRealBD(disk):
  """Opens the underlying block device of a disk.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to open

  """
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    _Fail("Block device '%s' is not set up", disk)

  real_disk.Open()

  return real_disk


def BlockdevFind(disk):
  """Check if a device is activated.

  If it is, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk to find
  @rtype: None or objects.BlockDevStatus
  @return: None if the disk cannot be found, otherwise the current
           status information

  """
  try:
    rbd = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    _Fail("Failed to find device: %s", err, exc=True)

  if rbd is None:
    return None

  return rbd.GetSyncStatus()


def BlockdevGetsize(disks):
  """Computes the size of the given disks.

  If a disk is not found, returns None instead.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to compute the size for
  @rtype: list
  @return: list with elements None if the disk cannot be found,
      otherwise the size

  """
  result = []
  for cf in disks:
    try:
      rbd = _RecursiveFindBD(cf)
    except errors.BlockDeviceError:
      result.append(None)
      continue
    if rbd is None:
      result.append(None)
    else:
      result.append(rbd.GetActualSize())
  return result


def BlockdevExport(disk, dest_node, dest_path, cluster_name):
  """Export a block device to a remote node.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type dest_path: str
  @param dest_path: the destination path on the target node
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @rtype: None

  """
  real_disk = _OpenRealBD(disk)

  # the block size on the read dd is 1MiB to match our units
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
                               "dd if=%s bs=1048576 count=%s",
                               real_disk.dev_path, str(disk.size))

  # we set here a smaller block size as, due to ssh buffering, more
  # than 64-128k will be mostly ignored; we use nocreat to fail if the
  # device is not already there or we pass a wrong path; we use
  # notrunc to not attempt truncating an LV device; we use oflag=dsync
  # to not buffer too much memory; this means that at best, we flush
  # every 64k, which will not be very fast
  destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
                                " oflag=dsync", dest_path)

  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.SSH_LOGIN_USER,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = "|".join([expcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(["bash", "-c", command])

  if result.failed:
    _Fail("Disk copy command '%s' returned error: %s"
          " output: %s", command, result.fail_reason, result.output)


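# Putting the pieces above together, the executed pipeline looks roughly
# like the following (paths, host and size are invented for illustration):
#
#   set -e; set -o pipefail; dd if=/dev/xenvg/disk0 bs=1048576 count=1024 \
#     | ssh node2.example.com 'dd of=/tmp/target conv=nocreat,notrunc \
#       bs=65536 oflag=dsync'

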
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: string
  @param uid: the owner of the file
  @type gid: string
  @param gid: the group of the file
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: None

  """
  file_name = vcluster.LocalizeVirtualPath(file_name)

  if not os.path.isabs(file_name):
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)

  if file_name not in _ALLOWED_UPLOAD_FILES:
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
          file_name)

  raw_data = _Decompress(data)

  if not (isinstance(uid, basestring) and isinstance(gid, basestring)):
    _Fail("Invalid username/groupname type")

  getents = runtime.GetEnts()
  uid = getents.LookupUser(uid)
  gid = getents.LookupGroup(gid)

  utils.SafeWriteFile(file_name, None,
                      data=raw_data, mode=mode, uid=uid, gid=gid,
                      atime=atime, mtime=mtime)


def RunOob(oob_program, command, node, timeout):
  """Executes oob_program with given command on given node.

  @param oob_program: The path to the executable oob_program
  @param command: The command to invoke on oob_program
  @param node: The node given as an argument to the program
  @param timeout: Timeout after which we kill the oob program

  @return: stdout
  @raise RPCFail: If execution fails for some reason

  """
  result = utils.RunCmd([oob_program, command, node], timeout=timeout)

  if result.failed:
    _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
          result.fail_reason, result.output)

  return result.stdout


def _OSOndiskAPIVersion(os_dir):
  """Compute and return the API version of a given OS.

  This function will try to read the API version of the OS residing in
  the 'os_dir' directory.

  @type os_dir: str
  @param os_dir: the directory in which we should look for the OS
  @rtype: tuple
  @return: tuple (status, data) with status denoting the validity and
      data holding either the valid versions or an error message

  """
  api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)

  try:
    st = os.stat(api_file)
  except EnvironmentError, err:
    return False, ("Required file '%s' not found under path %s: %s" %
                   (constants.OS_API_FILE, os_dir, utils.ErrnoOrStr(err)))

  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    return False, ("File '%s' in %s is not a regular file" %
                   (constants.OS_API_FILE, os_dir))

  try:
    api_versions = utils.ReadFile(api_file).splitlines()
  except EnvironmentError, err:
    return False, ("Error while reading the API version file at %s: %s" %
                   (api_file, utils.ErrnoOrStr(err)))

  try:
    api_versions = [int(version.strip()) for version in api_versions]
  except (TypeError, ValueError), err:
    return False, ("API version(s) can't be converted to integer: %s" %
                   str(err))

  return True, api_versions


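# For illustration: if the API file (constants.OS_API_FILE) of an OS
# installed under e.g. /srv/ganeti/os/debootstrap contained the two lines
# "15" and "20", the function above would return (True, [15, 20]); a
# missing or non-numeric file yields (False, "<error message>") instead.
# The path shown here is only an example location from the OS search path.

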
def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{pathutils.OS_SEARCH_PATH})
  @rtype: list
  @return: a list of tuples (name, path, status, diagnose, variants,
      parameters, api_version) for all (potential) OSes under all
      search paths, where:
          - name is the (potential) OS name
          - path is the full path to the OS
          - status True/False is the validity of the OS
          - diagnose is the error message for an invalid OS, otherwise empty
          - variants is a list of supported OS variants, if any
          - parameters is a list of (name, help) parameters, if any
          - api_version is a list of supported OS API versions

  """
  if top_dirs is None:
    top_dirs = pathutils.OS_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
        break
      for name in f_names:
        os_path = utils.PathJoin(dir_name, name)
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
        if status:
          diagnose = ""
          variants = os_inst.supported_variants
          parameters = os_inst.supported_parameters
          api_versions = os_inst.api_versions
        else:
          diagnose = os_inst
          variants = parameters = api_versions = []
        result.append((name, os_path, status, diagnose, variants,
                       parameters, api_versions))

  return result


def _TryOSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: tuple
  @return: success and either the OS instance if we find a valid one,
      or error message

  """
  if base_dir is None:
    os_dir = utils.FindFile(name, pathutils.OS_SEARCH_PATH, os.path.isdir)
  else:
    os_dir = utils.FindFile(name, [base_dir], os.path.isdir)

  if os_dir is None:
    return False, "Directory for OS %s not found in search path" % name

  status, api_versions = _OSOndiskAPIVersion(os_dir)
  if not status:
    # push the error up
    return status, api_versions

  if not constants.OS_API_VERSIONS.intersection(api_versions):
    return False, ("API version mismatch for path '%s': found %s, want %s." %
                   (os_dir, api_versions, constants.OS_API_VERSIONS))

  # OS Files dictionary, we will populate it with the absolute path
  # names; if the value is True, then it is a required file, otherwise
  # an optional one
  os_files = dict.fromkeys(constants.OS_SCRIPTS, True)

  if max(api_versions) >= constants.OS_API_V15:
    os_files[constants.OS_VARIANTS_FILE] = False

  if max(api_versions) >= constants.OS_API_V20:
    os_files[constants.OS_PARAMETERS_FILE] = True
  else:
    del os_files[constants.OS_SCRIPT_VERIFY]

  for (filename, required) in os_files.items():
    os_files[filename] = utils.PathJoin(os_dir, filename)

    try:
      st = os.stat(os_files[filename])
    except EnvironmentError, err:
      if err.errno == errno.ENOENT and not required:
        del os_files[filename]
        continue
      return False, ("File '%s' under path '%s' is missing (%s)" %
                     (filename, os_dir, utils.ErrnoOrStr(err)))

    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      return False, ("File '%s' under path '%s' is not a regular file" %
                     (filename, os_dir))

    if filename in constants.OS_SCRIPTS:
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
        return False, ("File '%s' under path '%s' is not executable" %
                       (filename, os_dir))

  variants = []
  if constants.OS_VARIANTS_FILE in os_files:
    variants_file = os_files[constants.OS_VARIANTS_FILE]
    try:
      variants = utils.ReadFile(variants_file).splitlines()
    except EnvironmentError, err:
      # we accept missing files, but not other errors
      if err.errno != errno.ENOENT:
        return False, ("Error while reading the OS variants file at %s: %s" %
                       (variants_file, utils.ErrnoOrStr(err)))

  parameters = []
  if constants.OS_PARAMETERS_FILE in os_files:
    parameters_file = os_files[constants.OS_PARAMETERS_FILE]
    try:
      parameters = utils.ReadFile(parameters_file).splitlines()
    except EnvironmentError, err:
      return False, ("Error while reading the OS parameters file at %s: %s" %
                     (parameters_file, utils.ErrnoOrStr(err)))
    parameters = [v.split(None, 1) for v in parameters]

  os_obj = objects.OS(name=name, path=os_dir,
                      create_script=os_files[constants.OS_SCRIPT_CREATE],
                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
                      verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
                                                 None),
                      supported_variants=variants,
                      supported_parameters=parameters,
                      api_versions=api_versions)
  return True, os_obj


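# A sketch of the on-disk layout that _TryOSFromDisk validates, for a
# hypothetical OS named "debootstrap" declaring API version 20 (the file
# names follow the constants used above; the directory is an example):
#
#   /srv/ganeti/os/debootstrap/
#     ganeti_api_version                    # e.g. a single line: "20"
#     create import export rename verify    # executable scripts
#     parameters.list                       # "name description" per line
#     variants.list                         # optional list of variants

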
def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{RPCFail} exception, detailing why this is not a valid OS.

  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
  an exception but returns true/false status data.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise RPCFail: if we don't find a valid OS

  """
  name_only = objects.OS.GetName(name)
  status, payload = _TryOSFromDisk(name_only, base_dir)

  if not status:
    _Fail(payload)

  return payload


def OSCoreEnv(os_name, inst_os, os_params, debug=0):
  """Calculate the basic environment for an os script.

  @type os_name: str
  @param os_name: full operating system name (including variant)
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type os_params: dict
  @param os_params: the OS parameters
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = {}
  api_version = \
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
  result["OS_API_VERSION"] = "%d" % api_version
  result["OS_NAME"] = inst_os.name
  result["DEBUG_LEVEL"] = "%d" % debug

  # OS variants
  if api_version >= constants.OS_API_V15 and inst_os.supported_variants:
    variant = objects.OS.GetVariant(os_name)
    if not variant:
      variant = inst_os.supported_variants[0]
  else:
    variant = ""
  result["OS_VARIANT"] = variant

  # OS params
  for pname, pvalue in os_params.items():
    result["OSP_%s" % pname.upper()] = pvalue

  # Set a default path otherwise programs called by OS scripts (or
  # even hooks called from OS scripts) might break, and we don't want
  # to have each script require setting a PATH variable
  result["PATH"] = constants.HOOKS_PATH

  return result


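# Example of the environment computed by OSCoreEnv (values illustrative,
# assuming a hypothetical "inst_os" supporting API 20 and a "default"
# variant):
#
#   OSCoreEnv("debootstrap+default", inst_os, {"dhcp": "yes"}, debug=1)
#   => {"OS_API_VERSION": "20", "OS_NAME": "debootstrap", "DEBUG_LEVEL": "1",
#       "OS_VARIANT": "default", "OSP_DHCP": "yes",
#       "PATH": constants.HOOKS_PATH}

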
def OSEnvironment(instance, inst_os, debug=0):
  """Calculate the environment for an os script.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)

  for attr in ["name", "os", "uuid", "ctime", "mtime", "primary_node"]:
    result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))

  result["HYPERVISOR"] = instance.hypervisor
  result["DISK_COUNT"] = "%d" % len(instance.disks)
  result["NIC_COUNT"] = "%d" % len(instance.nics)
  result["INSTANCE_SECONDARY_NODES"] = \
      ("%s" % " ".join(instance.secondary_nodes))

  # Disks
  for idx, disk in enumerate(instance.disks):
    real_disk = _OpenRealBD(disk)
    result["DISK_%d_PATH" % idx] = real_disk.dev_path
    result["DISK_%d_ACCESS" % idx] = disk.mode
    if constants.HV_DISK_TYPE in instance.hvparams:
      result["DISK_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result["DISK_%d_BACKEND_TYPE" % idx] = "block"
    elif disk.dev_type == constants.LD_FILE:
      result["DISK_%d_BACKEND_TYPE" % idx] = \
        "file:%s" % disk.physical_id[0]

  # NICs
  for idx, nic in enumerate(instance.nics):
    result["NIC_%d_MAC" % idx] = nic.mac
    if nic.ip:
      result["NIC_%d_IP" % idx] = nic.ip
    result["NIC_%d_MODE" % idx] = nic.nicparams[constants.NIC_MODE]
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      result["NIC_%d_BRIDGE" % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.nicparams[constants.NIC_LINK]:
      result["NIC_%d_LINK" % idx] = nic.nicparams[constants.NIC_LINK]
    if constants.HV_NIC_TYPE in instance.hvparams:
      result["NIC_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  # HV/BE params
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
    for key, value in source.items():
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)

  return result


def BlockdevGrow(disk, amount, dryrun, backingstore):
  """Grow a stack of block devices.

  This function is called recursively, with the children being the
  first ones to resize.

  @type disk: L{objects.Disk}
  @param disk: the disk to be grown
  @type amount: integer
  @param amount: the amount (in mebibytes) to grow with
  @type dryrun: boolean
  @param dryrun: whether to execute the operation in simulation mode
      only, without actually increasing the size
  @param backingstore: whether to execute the operation on backing storage
      only, or on "logical" storage only; e.g. DRBD is logical storage,
      whereas LVM, file, RBD are backing storage
  @rtype: (status, result)
  @return: a tuple with the status of the operation (True/False), and
      the error message if status is False

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    _Fail("Cannot find block device %s", disk)

  try:
    r_dev.Grow(amount, dryrun, backingstore)
  except errors.BlockDeviceError, err:
    _Fail("Failed to grow block device: %s", err, exc=True)


def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk ID as (vg, lv)

  """
  if disk.dev_type == constants.LD_DRBD8:
    if not disk.children:
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
            disk.unique_id)
    return BlockdevSnapshot(disk.children[0])
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # FIXME: choose a saner value for the snapshot size
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    else:
      _Fail("Cannot find block device %s", disk)
  else:
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
          disk.unique_id, disk.dev_type)


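# Illustration of the recursion above: for a DRBD8 disk whose first child
# is an LV, the call chain is BlockdevSnapshot(drbd_disk) ->
# BlockdevSnapshot(lv_disk) -> r_dev.Snapshot(size), returning a (vg, lv)
# pair such as ("xenvg", "disk0.snap") (names invented for illustration).

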
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  @rtype: None

  """
  destdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name + ".new")
  finaldestdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, "version", "0")
  config.set(constants.INISECT_EXP, "timestamp", "%d" % int(time.time()))
  config.set(constants.INISECT_EXP, "source", instance.primary_node)
  config.set(constants.INISECT_EXP, "os", instance.os)
  config.set(constants.INISECT_EXP, "compression", "none")

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, "name", instance.name)
  config.set(constants.INISECT_INS, "maxmem", "%d" %
             instance.beparams[constants.BE_MAXMEM])
  config.set(constants.INISECT_INS, "minmem", "%d" %
             instance.beparams[constants.BE_MINMEM])
  # "memory" is deprecated, but useful for exporting to old ganeti versions
  config.set(constants.INISECT_INS, "memory", "%d" %
             instance.beparams[constants.BE_MAXMEM])
  config.set(constants.INISECT_INS, "vcpus", "%d" %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, "disk_template", instance.disk_template)
  config.set(constants.INISECT_INS, "hypervisor", instance.hypervisor)
  config.set(constants.INISECT_INS, "tags", " ".join(instance.GetTags()))

  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, "nic%d_mac" %
               nic_count, "%s" % nic.mac)
    config.set(constants.INISECT_INS, "nic%d_ip" % nic_count, "%s" % nic.ip)
    for param in constants.NICS_PARAMETER_TYPES:
      config.set(constants.INISECT_INS, "nic%d_%s" % (nic_count, param),
                 "%s" % nic.nicparams.get(param, None))
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, "nic_count", "%d" % nic_total)

  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if disk:
      disk_total += 1
      config.set(constants.INISECT_INS, "disk%d_ivname" % disk_count,
                 ("%s" % disk.iv_name))
      config.set(constants.INISECT_INS, "disk%d_dump" % disk_count,
                 ("%s" % disk.physical_id[1]))
      config.set(constants.INISECT_INS, "disk%d_size" % disk_count,
                 ("%d" % disk.size))

  config.set(constants.INISECT_INS, "disk_count", "%d" % disk_total)

  # New-style hypervisor/backend parameters

  config.add_section(constants.INISECT_HYP)
  for name, value in instance.hvparams.items():
    if name not in constants.HVC_GLOBALS:
      config.set(constants.INISECT_HYP, name, str(value))

  config.add_section(constants.INISECT_BEP)
  for name, value in instance.beparams.items():
    config.set(constants.INISECT_BEP, name, str(value))

  config.add_section(constants.INISECT_OSP)
  for name, value in instance.osparams.items():
    config.set(constants.INISECT_OSP, name, str(value))

  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  shutil.rmtree(finaldestdir, ignore_errors=True)
  shutil.move(destdir, finaldestdir)


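# The export configuration written above ends up as
# constants.EXPORT_CONF_FILE in the per-instance export directory and looks
# roughly like this (all values invented for illustration; section names
# follow the constants.INISECT_* values):
#
#   [export]
#   version = 0
#   timestamp = 1356000000
#   source = node1.example.com
#   os = debootstrap
#   compression = none
#
#   [instance]
#   name = inst1.example.com
#   maxmem = 1024
#   ...

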
def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: L{objects.SerializableConfigParser}
  @return: a serializable config file containing the
      export info

  """
  cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    _Fail("Export info file doesn't have the required fields")

  return config.Dumps()


def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports

  """
  if os.path.isdir(pathutils.EXPORT_DIR):
    return sorted(utils.ListVisibleFiles(pathutils.EXPORT_DIR))
  else:
    _Fail("No exports directory")


def RemoveExport(export):
  """Remove an existing export from the node.

  @type export: str
  @param export: the name of the export to remove
  @rtype: None

  """
  target = utils.PathJoin(pathutils.EXPORT_DIR, export)

  try:
    shutil.rmtree(target)
  except EnvironmentError, err:
    _Fail("Error while removing the export: %s", err, exc=True)


def BlockdevRename(devlist):
  """Rename a list of block devices.

  @type devlist: list of tuples
  @param devlist: list of tuples of the form (disk,
      new_logical_id, new_physical_id); disk is an
      L{objects.Disk} object describing the current disk,
      and new logical_id/physical_id is the name we
      rename it to
  @rtype: boolean
  @return: True if all renames succeeded, False otherwise

  """
  msgs = []
  result = True
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      msgs.append("Can't find device %s in rename" % str(disk))
      result = False
      continue
    try:
      old_rpath = dev.dev_path
      dev.Rename(unique_id)
      new_rpath = dev.dev_path
      if old_rpath != new_rpath:
        DevCacheManager.RemoveCache(old_rpath)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError, err:
      msgs.append("Can't rename device '%s' to '%s': %s" %
                  (dev, unique_id, err))
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
      result = False
  if not result:
    _Fail("; ".join(msgs))


def _TransformFileStorageDir(fs_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given fs_dir is within the cluster-wide default
  file_storage_dir or the shared_file_storage_dir, which are stored in
  SimpleStore. Only paths under those directories are allowed.

  @type fs_dir: str
  @param fs_dir: the path to check

  @return: the normalized path if valid, None otherwise

  """
  if not (constants.ENABLE_FILE_STORAGE or
          constants.ENABLE_SHARED_FILE_STORAGE):
    _Fail("File storage disabled at configure time")
  cfg = _GetConfig()
  fs_dir = os.path.normpath(fs_dir)
  base_fstore = cfg.GetFileStorageDir()
  base_shared = cfg.GetSharedFileStorageDir()
  if not (utils.IsBelowDir(base_fstore, fs_dir) or
          utils.IsBelowDir(base_shared, fs_dir)):
    _Fail("File storage directory '%s' is not under base file"
          " storage directory '%s' or shared storage directory '%s'",
          fs_dir, base_fstore, base_shared)
  return fs_dir


def CreateFileStorageDir(file_storage_dir):
  """Create file storage directory.

  @type file_storage_dir: str
  @param file_storage_dir: directory to create

  @rtype: tuple
  @return: tuple with first element a boolean indicating whether dir
      creation was successful or not

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified storage dir '%s' is not a directory",
            file_storage_dir)
  else:
    try:
      os.makedirs(file_storage_dir, 0750)
    except OSError, err:
      _Fail("Cannot create file storage directory '%s': %s",
            file_storage_dir, err, exc=True)


def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.

  Remove it only if it's empty. If not, log an error and return.

  @type file_storage_dir: str
  @param file_storage_dir: the directory we should clean up
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified Storage directory '%s' is not a directory",
            file_storage_dir)
    # deletes dir only if empty, otherwise we want to fail the rpc call
    try:
      os.rmdir(file_storage_dir)
    except OSError, err:
      _Fail("Cannot remove file storage directory '%s': %s",
            file_storage_dir, err)


def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  @type old_file_storage_dir: str
  @param old_file_storage_dir: the current path
  @type new_file_storage_dir: str
  @param new_file_storage_dir: the name we should rename to
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  if not os.path.exists(new_file_storage_dir):
    if os.path.isdir(old_file_storage_dir):
      try:
        os.rename(old_file_storage_dir, new_file_storage_dir)
      except OSError, err:
        _Fail("Cannot rename '%s' to '%s': %s",
              old_file_storage_dir, new_file_storage_dir, err)
    else:
      _Fail("Specified storage dir '%s' is not a directory",
            old_file_storage_dir)
  else:
    if os.path.exists(old_file_storage_dir):
      _Fail("Cannot rename '%s' to '%s': both locations exist",
            old_file_storage_dir, new_file_storage_dir)


def _EnsureJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: None
  @raises RPCFail: if the file is not valid

  """
  if not utils.IsBelowDir(pathutils.QUEUE_DIR, file_name):
    _Fail("Passed job queue file '%s' does not belong to"
          " the queue directory '%s'", file_name, pathutils.QUEUE_DIR)


def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.io.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents
  @rtype: boolean
  @return: the success of the operation

  """
  file_name = vcluster.LocalizeVirtualPath(file_name)

  _EnsureJobQueueFile(file_name)
  getents = runtime.GetEnts()

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
                  gid=getents.masterd_gid)


def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: tuple
  @return: the success of the operation and payload

  """
  old = vcluster.LocalizeVirtualPath(old)
  new = vcluster.LocalizeVirtualPath(new)

  _EnsureJobQueueFile(old)
  _EnsureJobQueueFile(new)

  getents = runtime.GetEnts()

  utils.RenameFile(old, new, mkdir=True, mkdir_mode=0700,
                   dir_uid=getents.masterd_uid, dir_gid=getents.masterd_gid)


def BlockdevClose(instance_name, disks):
  """Closes the given block devices.

  This means they will be switched to secondary mode (in case of
  DRBD).

  @param instance_name: if the argument is not empty, the symlinks
      of this instance will be removed
  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to be closed
  @rtype: tuple (success, message)
  @return: a tuple of success and message, where success
      indicates the success of the operation, and message
      contains the error details in case we failed

  """
  bdevs = []
  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)

  msg = []
  for rd in bdevs:
    try:
      rd.Close()
    except errors.BlockDeviceError, err:
      msg.append(str(err))
  if msg:
    _Fail("Can't make devices secondary: %s", ",".join(msg))
  else:
    if instance_name:
      _RemoveBlockDevLinks(instance_name, disks)


def ValidateHVParams(hvname, hvparams):
  """Validates the given hypervisor parameters.

  @type hvname: string
  @param hvname: the hypervisor name
  @type hvparams: dict
  @param hvparams: the hypervisor parameters to be validated
  @rtype: None

  """
  try:
    hv_type = hypervisor.GetHypervisor(hvname)
    hv_type.ValidateParameters(hvparams)
  except errors.HypervisorError, err:
    _Fail(str(err), log=False)


def _CheckOSPList(os_obj, parameters):
  """Check whether a list of parameters is supported by the OS.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type parameters: list
  @param parameters: the list of parameters to check

  """
  supported = [v[0] for v in os_obj.supported_parameters]
  delta = frozenset(parameters).difference(supported)
  if delta:
    _Fail("The following parameters are not supported"
          " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))


def ValidateOS(required, osname, checks, osparams):
  """Validate the given OS' parameters.

  @type required: boolean
  @param required: whether absence of the OS should translate into
      failure or not
  @type osname: string
  @param osname: the OS to be validated
  @type checks: list
  @param checks: list of the checks to run (currently only 'parameters')
  @type osparams: dict
  @param osparams: dictionary with OS parameters
  @rtype: boolean
  @return: True if the validation passed, or False if the OS was not
      found and L{required} was false

  """
  if not constants.OS_VALIDATE_CALLS.issuperset(checks):
    _Fail("Unknown checks required for OS %s: %s", osname,
          set(checks).difference(constants.OS_VALIDATE_CALLS))

  name_only = objects.OS.GetName(osname)
  status, tbv = _TryOSFromDisk(name_only, None)

  if not status:
    if required:
      _Fail(tbv)
    else:
      return False

  if max(tbv.api_versions) < constants.OS_API_V20:
    return True

  if constants.OS_VALIDATE_PARAMETERS in checks:
    _CheckOSPList(tbv, osparams.keys())

  validate_env = OSCoreEnv(osname, tbv, osparams)
  result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
                        cwd=tbv.path, reset_env=True)
  if result.failed:
    logging.error("os validate command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    _Fail("OS validation script failed (%s), output: %s",
          result.fail_reason, result.output, log=False)

  return True


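# A sketch of a typical invocation of ValidateOS (hypothetical values):
#
#   ValidateOS(True, "debootstrap+default",
#              [constants.OS_VALIDATE_PARAMETERS], {"dhcp": "yes"})
#
# This fails via RPCFail when the OS is missing (since required=True),
# rejects parameters the OS does not declare, and otherwise runs the OS's
# verify script with the environment built by OSCoreEnv.

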
def DemoteFromMC():
  """Demotes the current node from master candidate role.

  """
  # try to ensure we're not the master by mistake
  master, myself = ssconf.GetMasterAndMyself()
  if master == myself:
    _Fail("ssconf status shows I'm the master node, will not demote")

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "check", constants.MASTERD])
  if not result.failed:
    _Fail("The master daemon is running, will not demote")

  try:
    if os.path.isfile(pathutils.CLUSTER_CONF_FILE):
      utils.CreateBackup(pathutils.CLUSTER_CONF_FILE)
  except EnvironmentError, err:
    if err.errno != errno.ENOENT:
      _Fail("Error while backing up cluster file: %s", err, exc=True)

  utils.RemoveFile(pathutils.CLUSTER_CONF_FILE)


def _GetX509Filenames(cryptodir, name):
  """Returns the full paths for the private key and certificate.

  """
  return (utils.PathJoin(cryptodir, name),
          utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
          utils.PathJoin(cryptodir, name, _X509_CERT_FILE))


def CreateX509Certificate(validity, cryptodir=pathutils.CRYPTO_KEYS_DIR):
  """Creates a new X509 certificate for SSL/TLS.

  @type validity: int
  @param validity: Validity in seconds
  @rtype: tuple; (string, string)
  @return: Certificate name and public part

  """
  (key_pem, cert_pem) = \
    utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
                                     min(validity, _MAX_SSL_CERT_VALIDITY))

  cert_dir = tempfile.mkdtemp(dir=cryptodir,
                              prefix="x509-%s-" % utils.TimestampForFilename())
  try:
    name = os.path.basename(cert_dir)
    assert len(name) > 5

    (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

    utils.WriteFile(key_file, mode=0400, data=key_pem)
    utils.WriteFile(cert_file, mode=0400, data=cert_pem)

    # Never return private key as it shouldn't leave the node
    return (name, cert_pem)
  except Exception:
    shutil.rmtree(cert_dir, ignore_errors=True)
    raise


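# Resulting layout under the crypto directory for an invented certificate
# name (the "key"/"cert" file names come from _X509_KEY_FILE and
# _X509_CERT_FILE):
#
#   <cryptodir>/x509-20121201010101-abcdef/key    # mode 0400, private key
#   <cryptodir>/x509-20121201010101-abcdef/cert   # mode 0400, certificate

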
def RemoveX509Certificate(name, cryptodir=pathutils.CRYPTO_KEYS_DIR):
  """Removes an X509 certificate.

  @type name: string
  @param name: Certificate name

  """
  (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

  utils.RemoveFile(key_file)
  utils.RemoveFile(cert_file)

  try:
    os.rmdir(cert_dir)
  except EnvironmentError, err:
    _Fail("Cannot remove certificate directory '%s': %s",
          cert_dir, err)


def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
3066
  """Returns the command for the requested input/output.
3067

3068
  @type instance: L{objects.Instance}
3069
  @param instance: The instance object
3070
  @param mode: Import/export mode
3071
  @param ieio: Input/output type
3072
  @param ieargs: Input/output arguments
3073

3074
  """
3075
  assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
3076

    
3077
  env = None
3078
  prefix = None
3079
  suffix = None
3080
  exp_size = None
3081

    
3082
  if ieio == constants.IEIO_FILE:
3083
    (filename, ) = ieargs
3084

    
3085
    if not utils.IsNormAbsPath(filename):
3086
      _Fail("Path '%s' is not normalized or absolute", filename)
3087

    
3088
    real_filename = os.path.realpath(filename)
3089
    directory = os.path.dirname(real_filename)
3090

    
3091
    if not utils.IsBelowDir(pathutils.EXPORT_DIR, real_filename):
3092
      _Fail("File '%s' is not under exports directory '%s': %s",
3093
            filename, pathutils.EXPORT_DIR, real_filename)
3094

    
3095
    # Create directory
3096
    utils.Makedirs(directory, mode=0750)
3097

    
3098
    quoted_filename = utils.ShellQuote(filename)
3099

    
3100
    if mode == constants.IEM_IMPORT:
3101
      suffix = "> %s" % quoted_filename
3102
    elif mode == constants.IEM_EXPORT:
3103
      suffix = "< %s" % quoted_filename
3104

    
3105
      # Retrieve file size
3106
      try:
3107
        st = os.stat(filename)
3108
      except EnvironmentError, err:
3109
        logging.error("Can't stat(2) %s: %s", filename, err)
3110
      else:
3111
        exp_size = utils.BytesToMebibyte(st.st_size)
3112

    
3113
  elif ieio == constants.IEIO_RAW_DISK:
3114
    (disk, ) = ieargs
3115

    
3116
    real_disk = _OpenRealBD(disk)
3117

    
3118
    if mode == constants.IEM_IMPORT:
3119
      # we set here a smaller block size as, due to transport buffering, more
3120
      # than 64-128k will mostly ignored; we use nocreat to fail if the device
3121
      # is not already there or we pass a wrong path; we use notrunc to no
3122
      # attempt truncate on an LV device; we use oflag=dsync to not buffer too
3123
      # much memory; this means that at best, we flush every 64k, which will
3124
      # not be very fast
3125
      suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
3126
                                    " bs=%s oflag=dsync"),
3127
                                    real_disk.dev_path,
3128
                                    str(64 * 1024))
3129

    
3130
    elif mode == constants.IEM_EXPORT:
3131
      # the block size on the read dd is 1MiB to match our units
3132
      prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
3133
                                   real_disk.dev_path,
3134
                                   str(1024 * 1024), # 1 MB
3135
                                   str(disk.size))
3136
      exp_size = disk.size
3137

    
3138
  elif ieio == constants.IEIO_SCRIPT:
3139
    (disk, disk_index, ) = ieargs
3140

    
3141
    assert isinstance(disk_index, (int, long))
3142

    
3143
    real_disk = _OpenRealBD(disk)
3144

    
3145
    inst_os = OSFromDisk(instance.os)
3146
    env = OSEnvironment(instance, inst_os)
3147

    
3148
    if mode == constants.IEM_IMPORT:
3149
      env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
3150
      env["IMPORT_INDEX"] = str(disk_index)
3151
      script = inst_os.import_script
3152

    
3153
    elif mode == constants.IEM_EXPORT:
3154
      env["EXPORT_DEVICE"] = real_disk.dev_path
3155
      env["EXPORT_INDEX"] = str(disk_index)
3156
      script = inst_os.export_script
3157

    
3158
    # TODO: Pass special environment only to script
3159
    script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
3160

    
3161
    if mode == constants.IEM_IMPORT:
3162
      suffix = "| %s" % script_cmd
3163

    
3164
    elif mode == constants.IEM_EXPORT:
3165
      prefix = "%s |" % script_cmd
3166

    
3167
    # Let script predict size
3168
    exp_size = constants.IE_CUSTOM_SIZE
3169

    
3170
  else:
3171
    _Fail("Invalid %s I/O mode %r", mode, ieio)
3172

    
3173
  return (env, prefix, suffix, exp_size)
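
# Illustrative result (hypothetical path, not part of the module): for an
# export with ieio=constants.IEIO_FILE and a filename below EXPORT_DIR,
# the returned tuple is roughly
#   (None, None, "< /path/below/exports/disk0.snap", <file size in MiB>)
# i.e. no special environment, no pipeline prefix, and a suffix that
# feeds the file into the transport command assembled by the caller.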


def _CreateImportExportStatusDir(prefix):
  """Creates status directory for import/export.

  """
  return tempfile.mkdtemp(dir=pathutils.IMPORT_EXPORT_DIR,
                          prefix=("%s-%s-" %
                                  (prefix, utils.TimestampForFilename())))


def StartImportExportDaemon(mode, opts, host, port, instance, component,
                            ieio, ieioargs):
  """Starts an import or export daemon.

  @param mode: Import/export mode
  @type opts: L{objects.ImportExportOptions}
  @param opts: Daemon options
  @type host: string
  @param host: Remote host for export (None for import)
  @type port: int
  @param port: Remote port for export (None for import)
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type component: string
  @param component: which part of the instance is transferred now,
      e.g. 'disk/0'
  @param ieio: Input/output type
  @param ieioargs: Input/output arguments

  """
  if mode == constants.IEM_IMPORT:
    prefix = "import"

    if not (host is None and port is None):
      _Fail("Cannot specify host or port on import")

  elif mode == constants.IEM_EXPORT:
    prefix = "export"

    if host is None or port is None:
      _Fail("Host and port must be specified for an export")

  else:
    _Fail("Invalid mode %r", mode)

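  # key_name and ca_pem must be given together: either both are set (a
  # custom X509 key/CA pair is used) or both are unset, in which case the
  # cluster certificate serves as both key and CA.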
  if (opts.key_name is None) ^ (opts.ca_pem is None):
    _Fail("Cluster certificate can only be used for both key and CA")

  (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
    _GetImportExportIoCommand(instance, mode, ieio, ieioargs)

  if opts.key_name is None:
    # Use server.pem
    key_path = pathutils.NODED_CERT_FILE
    cert_path = pathutils.NODED_CERT_FILE
    assert opts.ca_pem is None
  else:
    (_, key_path, cert_path) = _GetX509Filenames(pathutils.CRYPTO_KEYS_DIR,
                                                 opts.key_name)
    assert opts.ca_pem is not None

  for i in [key_path, cert_path]:
    if not os.path.exists(i):
      _Fail("File '%s' does not exist" % i)

  status_dir = _CreateImportExportStatusDir("%s-%s" % (prefix, component))
  try:
    status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
    pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
    ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)

    if opts.ca_pem is None:
      # Use server.pem
      ca = utils.ReadFile(pathutils.NODED_CERT_FILE)
    else:
      ca = opts.ca_pem

    # Write CA file
    utils.WriteFile(ca_file, data=ca, mode=0400)

    cmd = [
      pathutils.IMPORT_EXPORT_DAEMON,
      status_file, mode,
      "--key=%s" % key_path,
      "--cert=%s" % cert_path,
      "--ca=%s" % ca_file,
      ]

    if host:
      cmd.append("--host=%s" % host)

    if port:
      cmd.append("--port=%s" % port)

    if opts.ipv6:
      cmd.append("--ipv6")
    else:
      cmd.append("--ipv4")

    if opts.compress:
      cmd.append("--compress=%s" % opts.compress)

    if opts.magic:
      cmd.append("--magic=%s" % opts.magic)

    if exp_size is not None:
      cmd.append("--expected-size=%s" % exp_size)

    if cmd_prefix:
      cmd.append("--cmd-prefix=%s" % cmd_prefix)

    if cmd_suffix:
      cmd.append("--cmd-suffix=%s" % cmd_suffix)

    if mode == constants.IEM_EXPORT:
      # Retry connection a few times when connecting to remote peer
      cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
      cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
    elif opts.connect_timeout is not None:
      assert mode == constants.IEM_IMPORT
      # Overall timeout for establishing connection while listening
      cmd.append("--connect-timeout=%s" % opts.connect_timeout)

    logfile = _InstanceLogName(prefix, instance.os, instance.name, component)

    # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
    # support for receiving a file descriptor for output
    utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
                      output=logfile)

    # The import/export name is simply the status directory name
    return os.path.basename(status_dir)

  except Exception:
    shutil.rmtree(status_dir, ignore_errors=True)
    raise
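
# Illustrative invocation (hypothetical values, not part of the module):
# for an export, the daemon command assembled above resembles
#   <IMPORT_EXPORT_DAEMON> <status_dir>/status export --key=... --cert=...
#     --ca=<status_dir>/ca --host=192.0.2.1 --port=1234 --ipv4
# with the dd or OS-script pipeline passed via --cmd-prefix/--cmd-suffix.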


def GetImportExportStatus(names):
  """Returns import/export daemon status.

  @type names: sequence
  @param names: List of names
  @rtype: List of dicts
  @return: a list with the state of each named import/export, with C{None}
           for any whose status file couldn't be read

  """
  result = []

  for name in names:
    status_file = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name,
                                 _IES_STATUS_FILE)

    try:
      data = utils.ReadFile(status_file)
    except EnvironmentError, err:
      if err.errno != errno.ENOENT:
        raise
      data = None

    if not data:
      result.append(None)
      continue

    result.append(serializer.LoadJson(data))

  return result
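
# Illustrative return value (hypothetical names, not part of the module):
#   GetImportExportStatus(["some-status-dir", "no-such-dir"])
# would return [<parsed status dict>, None], since a missing status file
# simply yields None for that entry.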


def AbortImportExport(name):
  """Sends SIGTERM to a running import/export daemon.

  """
  logging.info("Abort import/export %s", name)

  status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)
  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is running with PID %s, sending SIGTERM",
                 name, pid)
    utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)


def CleanupImportExport(name):
  """Cleanup after an import or export.

  If the import/export daemon is still running it's killed. Afterwards the
  whole status directory is removed.

  """
  logging.info("Finalizing import/export %s", name)

  status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)

  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is still running with PID %s",
                 name, pid)
    utils.KillProcess(pid, waitpid=False)

  shutil.rmtree(status_dir, ignore_errors=True)


def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  """
  # set the correct physical ID
  my_name = netutils.Hostname.GetSysName()
  for cf in disks:
    cf.SetPhysicalID(my_name, nodes_ip)

  bdevs = []

  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)
  return bdevs


def DrbdDisconnectNet(nodes_ip, disks):
  """Disconnects the network on a list of drbd devices.

  """
  bdevs = _FindDisks(nodes_ip, disks)

  # disconnect disks
  for rd in bdevs:
    try:
      rd.DisconnectNet()
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration to standalone mode: %s",
            err, exc=True)


def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
  """Attaches the network on a list of drbd devices.

  """
  bdevs = _FindDisks(nodes_ip, disks)

  if multimaster:
    for idx, rd in enumerate(bdevs):
      try:
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
      except EnvironmentError, err:
        _Fail("Can't create symlink: %s", err)
  # reconnect disks, switch to new master configuration and if
  # needed primary mode
  for rd in bdevs:
    try:
      rd.AttachNet(multimaster)
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration: %s", err)

  # wait until the disks are connected; we need to retry the re-attach
  # if the device becomes standalone, as this might happen if one node
  # disconnects and reconnects in a different mode before the other
  # node reconnects; in this case, one or both of the nodes will decide
  # it has the wrong configuration and switch to standalone

  def _Attach():
    all_connected = True

    for rd in bdevs:
      stats = rd.GetProcStatus()

      all_connected = (all_connected and
                       (stats.is_connected or stats.is_in_resync))

      if stats.is_standalone:
        # peer had different config info and this node became
        # standalone, even though this should not happen with the
        # new staged way of changing disk configs
        try:
          rd.AttachNet(multimaster)
        except errors.BlockDeviceError, err:
          _Fail("Can't change network configuration: %s", err)

    if not all_connected:
      raise utils.RetryAgain()

  try:
    # Start with a delay of 100 milliseconds, increasing by a factor of
    # 1.5 up to 5 seconds between attempts; give up after two minutes
    utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
  except utils.RetryTimeout:
    _Fail("Timeout in disk reconnecting")

  if multimaster:
    # change to primary mode
    for rd in bdevs:
      try:
        rd.Open()
      except errors.BlockDeviceError, err:
        _Fail("Can't change to primary mode: %s", err)


def DrbdWaitSync(nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  """
  def _helper(rd):
    stats = rd.GetProcStatus()
    if not (stats.is_connected or stats.is_in_resync):
      raise utils.RetryAgain()
    return stats

  bdevs = _FindDisks(nodes_ip, disks)

  min_resync = 100
  alldone = True
  for rd in bdevs:
    try:
      # poll each second for 15 seconds
      stats = utils.Retry(_helper, 1, 15, args=[rd])
    except utils.RetryTimeout:
      stats = rd.GetProcStatus()
      # last check
      if not (stats.is_connected or stats.is_in_resync):
        _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
    alldone = alldone and (not stats.is_in_resync)
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)

  return (alldone, min_resync)
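
# Illustrative return value (not part of the module): with all devices
# connected and one of them 42.5% through resync, DrbdWaitSync returns
# (False, 42.5); once no device is resyncing any more it returns
# (True, 100).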


def GetDrbdUsermodeHelper():
  """Returns the DRBD usermode helper currently configured.

  """
  try:
    return bdev.BaseDRBD.GetUsermodeHelper()
  except errors.BlockDeviceError, err:
    _Fail(str(err))


def PowercycleNode(hypervisor_type):
  """Hard-powercycle the node.

  Because we need to return first, and schedule the powercycle in the
  background, we won't be able to report failures nicely.

  """
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  try:
    pid = os.fork()
  except OSError:
    # if we can't fork, we'll pretend that we're in the child process
    pid = 0
  if pid > 0:
    return "Reboot scheduled in 5 seconds"
  # ensure the child is running in RAM
  try:
    utils.Mlockall()
  except Exception: # pylint: disable=W0703
    pass
  time.sleep(5)
  hyper.PowercycleNode()


class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not
  on the master side.

  """
  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        L{pathutils.HOOKS_BASE_DIR} (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = pathutils.HOOKS_BASE_DIR
    # yeah, _BASE_DIR is not valid for attributes, we use it like a
    # constant
    self._BASE_DIR = hooks_base_dir # pylint: disable=C0103

  def RunLocalHooks(self, node_list, hpath, phase, env):
    """Check that the hooks will be run only locally and then run them.

    """
    assert len(node_list) == 1
    node = node_list[0]
    _, myself = ssconf.GetMasterAndMyself()
    assert node == myself

    results = self.RunHooks(hpath, phase, env)

    # Return values in the form expected by HooksMaster
    return {node: (None, False, results)}

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    @type hpath: str
    @param hpath: the path to the hooks directory which
        holds the scripts
    @type phase: str
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @type env: dict
    @param env: dictionary with the environment for the hook
    @rtype: list
    @return: list of 3-element tuples:
      - script path
      - script result, one of L{constants.HKR_SUCCESS},
        L{constants.HKR_FAIL} or L{constants.HKR_SKIP}
      - output of the script

    @raise errors.ProgrammerError: for invalid input
        parameters

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      _Fail("Unknown hooks phase '%s'", phase)

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = utils.PathJoin(self._BASE_DIR, subdir)

    results = []

    if not os.path.isdir(dir_name):
      # for non-existing/non-dirs, we simply exit instead of logging a
      # warning at every operation
      return results

    runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)

    for (relname, relstatus, runresult) in runparts_results:
      if relstatus == constants.RUNPARTS_SKIP:
        rrval = constants.HKR_SKIP
        output = ""
      elif relstatus == constants.RUNPARTS_ERR:
        rrval = constants.HKR_FAIL
        output = "Hook script execution error: %s" % runresult
      elif relstatus == constants.RUNPARTS_RUN:
        if runresult.failed:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
        output = utils.SafeEncode(runresult.output.strip())
      results.append(("%s/%s" % (subdir, relname), rrval, output))

    return results
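
  # Illustrative example (hypothetical hook, not part of the module):
  # RunHooks("instance-start", constants.HOOKS_PHASE_POST, env) runs the
  # scripts in <hooks_base_dir>/instance-start-post.d and returns entries
  # such as ("instance-start-post.d/00-log", constants.HKR_SUCCESS, "ok").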


class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  @staticmethod
  def Run(name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data

    @rtype: str
    @return: the stdout of the allocator script; failures are reported
       via L{_Fail} instead of the return value

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      _Fail("iallocator module '%s' not found in the search path", name)

    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        _Fail("iallocator module '%s' failed: %s, output '%s'",
              name, result.fail_reason, result.output)
    finally:
      os.unlink(fin_name)

    return result.stdout


class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  """
  _DEV_PREFIX = "/dev/"
  _ROOT_DIR = pathutils.BDEV_CACHE_DIR

  @classmethod
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    @type dev_path: str
    @param dev_path: the C{/dev/} path name
    @rtype: str
    @return: the converted path name

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
    return fpath
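
  # Illustrative example (not part of the module): for
  # dev_path="/dev/xenvg/disk0" the cache file becomes
  # <_ROOT_DIR>/bdev_xenvg_disk0; a plain "/dev/sda" maps to bdev_sda.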

  @classmethod
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    @type dev_path: str
    @param dev_path: the pathname of the device
    @type owner: str
    @param owner: the owner (instance name) of the device
    @type on_primary: bool
    @param on_primary: whether this is the primary
        node or not
    @type iv_name: str
    @param iv_name: the instance-visible name of the
        device, as in objects.Disk.iv_name

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    if on_primary:
      state = "primary"
    else:
      state = "secondary"
    if iv_name is None:
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    try:
      utils.WriteFile(fpath, data=fdata)
    except EnvironmentError, err:
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)

  @classmethod
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    This is just a wrapper over L{utils.io.RemoveFile} with a converted
    path name and logging.

    @type dev_path: str
    @param dev_path: the pathname of the device

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    try:
      utils.RemoveFile(fpath)
    except EnvironmentError, err:
      logging.exception("Can't remove bdev cache file for %s: %s",
                        dev_path, err)