root / lib / backend.py @ a5ce2ea2

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Functions used by the node daemon

@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
     the L{UploadFile} function
@var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
     in the L{_CleanDirectory} function

"""

# pylint: disable=E1103

# E1103: %s %r has no %r member (but some types could not be
# inferred), because the _TryOSFromDisk returns either (True, os_obj)
# or (False, "string") which confuses pylint


import os
import os.path
import shutil
import time
import stat
import errno
import re
import random
import logging
import tempfile
import zlib
import base64
import signal

from ganeti import errors
from ganeti import utils
from ganeti import ssh
from ganeti import hypervisor
from ganeti import constants
from ganeti import bdev
from ganeti import objects
from ganeti import ssconf
from ganeti import serializer
from ganeti import netutils
from ganeti import runtime
from ganeti import mcpu
from ganeti import compat
from ganeti import pathutils


_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
_ALLOWED_CLEAN_DIRS = frozenset([
  pathutils.DATA_DIR,
  pathutils.JOB_QUEUE_ARCHIVE_DIR,
  pathutils.QUEUE_DIR,
  pathutils.CRYPTO_KEYS_DIR,
  ])
_MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
_X509_KEY_FILE = "key"
_X509_CERT_FILE = "cert"
_IES_STATUS_FILE = "status"
_IES_PID_FILE = "pid"
_IES_CA_FILE = "ca"

#: Valid LVS output line regex
_LVSLINE_REGEX = re.compile(r"^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6,})\|?$")

# Actions for the master setup script
_MASTER_START = "start"
_MASTER_STOP = "stop"


class RPCFail(Exception):
  """Class denoting RPC failure.

  Its argument is the error message.

  """


def _Fail(msg, *args, **kwargs):
  """Log an error and then raise an RPCFail exception.

  This exception is then handled specially in the ganeti daemon and
  turned into a 'failed' return type. As such, this function is a
  useful shortcut for logging the error and returning it to the master
  daemon.

  @type msg: string
  @param msg: the text of the exception
  @raise RPCFail

  """
  if args:
    msg = msg % args
  if "log" not in kwargs or kwargs["log"]: # if we should log this error
    if "exc" in kwargs and kwargs["exc"]:
      logging.exception(msg)
    else:
      logging.error(msg)
  raise RPCFail(msg)
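
# Usage sketch (editorial illustration, not part of the original module):
#
#   _Fail("Can't read file %s: %s", path, err, exc=True)
#
# logs the interpolated message with a traceback (because of exc=True) and
# then raises RPCFail; passing log=False raises without logging anything.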


def _GetConfig():
  """Simple wrapper to return a SimpleStore.

  @rtype: L{ssconf.SimpleStore}
  @return: a SimpleStore instance

  """
  return ssconf.SimpleStore()


def _GetSshRunner(cluster_name):
  """Simple wrapper to return an SshRunner.

  @type cluster_name: str
  @param cluster_name: the cluster name, which is needed
      by the SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: an SshRunner instance

  """
  return ssh.SshRunner(cluster_name)


def _Decompress(data):
  """Unpacks data compressed by the RPC client.

  @type data: list or tuple
  @param data: Data sent by RPC client
  @rtype: str
  @return: Decompressed data

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  if encoding == constants.RPC_ENCODING_NONE:
    return content
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    return zlib.decompress(base64.b64decode(content))
  else:
    raise AssertionError("Unknown data encoding")
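
# Wire-format sketch (editorial illustration): the RPC client sends a
# two-element (encoding, content) pair, one of
#
#   (constants.RPC_ENCODING_NONE, "payload")
#   (constants.RPC_ENCODING_ZLIB_BASE64,
#    base64.b64encode(zlib.compress("payload")))
#
# and _Decompress returns the original "payload" string in both cases.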


def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults
      to the empty list

  """
  if path not in _ALLOWED_CLEAN_DIRS:
    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
          path)

  if not os.path.isdir(path):
    return
  if exclude is None:
    exclude = []
  else:
    # Normalize excluded paths
    exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = utils.PathJoin(path, rel_name)
    if full_name in exclude:
      continue
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)
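
# Usage sketch (editorial illustration): only whitelisted directories may be
# cleaned, and individual files can be preserved, e.g.
#
#   _CleanDirectory(pathutils.QUEUE_DIR,
#                   exclude=[pathutils.JOB_QUEUE_LOCK_FILE])
#
# Any path outside _ALLOWED_CLEAN_DIRS triggers _Fail; symlinks and
# subdirectories are left untouched.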


def _BuildUploadFileList():
  """Build the list of allowed upload files.

  This is abstracted so that it's built only once at module import time.

  """
  allowed_files = set([
    pathutils.CLUSTER_CONF_FILE,
    constants.ETC_HOSTS,
    pathutils.SSH_KNOWN_HOSTS_FILE,
    pathutils.VNC_PASSWORD_FILE,
    pathutils.RAPI_CERT_FILE,
    pathutils.SPICE_CERT_FILE,
    pathutils.SPICE_CACERT_FILE,
    pathutils.RAPI_USERS_FILE,
    pathutils.CONFD_HMAC_KEY,
    pathutils.CLUSTER_DOMAIN_SECRET_FILE,
    ])

  for hv_name in constants.HYPER_TYPES:
    hv_class = hypervisor.GetHypervisorClass(hv_name)
    allowed_files.update(hv_class.GetAncillaryFiles()[0])

  return frozenset(allowed_files)


_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()


def JobQueuePurge():
  """Removes job queue files and archived jobs.

  @rtype: None

  """
  _CleanDirectory(pathutils.QUEUE_DIR, exclude=[pathutils.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(pathutils.JOB_QUEUE_ARCHIVE_DIR)


def GetMasterInfo():
  """Returns master information.

  This is a utility function to compute master information, either
  for consumption here or from the node daemon.

  @rtype: tuple
  @return: master_netdev, master_ip, master_name, primary_ip_family,
    master_netmask
  @raise RPCFail: in case of errors

  """
  try:
    cfg = _GetConfig()
    master_netdev = cfg.GetMasterNetdev()
    master_ip = cfg.GetMasterIP()
    master_netmask = cfg.GetMasterNetmask()
    master_node = cfg.GetMasterNode()
    primary_ip_family = cfg.GetPrimaryIPFamily()
  except errors.ConfigurationError, err:
    _Fail("Cluster configuration incomplete: %s", err, exc=True)
  return (master_netdev, master_ip, master_node, primary_ip_family,
          master_netmask)


def RunLocalHooks(hook_opcode, hooks_path, env_builder_fn):
  """Decorator that runs hooks before and after the decorated function.

  @type hook_opcode: string
  @param hook_opcode: opcode of the hook
  @type hooks_path: string
  @param hooks_path: path of the hooks
  @type env_builder_fn: function
  @param env_builder_fn: function that returns a dictionary containing the
    environment variables for the hooks. Will get all the parameters of the
    decorated function.
  @raise RPCFail: in case of pre-hook failure

  """
  def decorator(fn):
    def wrapper(*args, **kwargs):
      _, myself = ssconf.GetMasterAndMyself()
      nodes = ([myself], [myself])  # these hooks run locally

      env_fn = compat.partial(env_builder_fn, *args, **kwargs)

      cfg = _GetConfig()
      hr = HooksRunner()
      hm = mcpu.HooksMaster(hook_opcode, hooks_path, nodes, hr.RunLocalHooks,
                            None, env_fn, logging.warning, cfg.GetClusterName(),
                            cfg.GetMasterNode())

      hm.RunPhase(constants.HOOKS_PHASE_PRE)
      result = fn(*args, **kwargs)
      hm.RunPhase(constants.HOOKS_PHASE_POST)

      return result
    return wrapper
  return decorator
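
# Usage sketch (editorial illustration; ActivateMasterIp below is a real
# use of this decorator):
#
#   @RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
#                  _BuildMasterIpEnv)
#   def ActivateMasterIp(master_params, use_external_mip_script):
#     ...
#
# Pre-hooks run before the wrapped function (a pre-hook failure raises
# RPCFail and aborts the call), post-hooks run after it, and the function's
# return value is passed through unchanged.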


def _BuildMasterIpEnv(master_params, use_external_mip_script=None):
  """Builds environment variables for master IP hooks.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script (unused, but necessary per the implementation of the
    L{RunLocalHooks} decorator)

  """
  # pylint: disable=W0613
  ver = netutils.IPAddress.GetVersionFromAddressFamily(master_params.ip_family)
  env = {
    "MASTER_NETDEV": master_params.netdev,
    "MASTER_IP": master_params.ip,
    "MASTER_NETMASK": str(master_params.netmask),
    "CLUSTER_IP_VERSION": str(ver),
  }

  return env


def _RunMasterSetupScript(master_params, action, use_external_mip_script):
  """Execute the master IP address setup script.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type action: string
  @param action: action to pass to the script. Must be one of
    L{backend._MASTER_START} or L{backend._MASTER_STOP}
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise backend.RPCFail: if there are errors during the execution of the
    script

  """
  env = _BuildMasterIpEnv(master_params)

  if use_external_mip_script:
    setup_script = pathutils.EXTERNAL_MASTER_SETUP_SCRIPT
  else:
    setup_script = pathutils.DEFAULT_MASTER_SETUP_SCRIPT

  result = utils.RunCmd([setup_script, action], env=env, reset_env=True)

  if result.failed:
    _Fail("Failed to %s the master IP. Script return value: %s" %
          (action, result.exit_code), log=True)


@RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
               _BuildMasterIpEnv)
def ActivateMasterIp(master_params, use_external_mip_script):
  """Activate the IP address of the master daemon.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise RPCFail: in case of errors during the IP startup

  """
  _RunMasterSetupScript(master_params, _MASTER_START,
                        use_external_mip_script)


def StartMasterDaemons(no_voting):
  """Activate local node as master node.

  The function will start the master daemons (ganeti-masterd and ganeti-rapi).

  @type no_voting: boolean
  @param no_voting: whether to start ganeti-masterd without a node vote
      but still non-interactively
  @rtype: None

  """

  if no_voting:
    masterd_args = "--no-voting --yes-do-it"
  else:
    masterd_args = ""

  env = {
    "EXTRA_MASTERD_ARGS": masterd_args,
    }

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "start-master"], env=env)
  if result.failed:
    msg = "Can't start Ganeti master: %s" % result.output
    logging.error(msg)
    _Fail(msg)


@RunLocalHooks(constants.FAKE_OP_MASTER_TURNDOWN, "master-ip-turndown",
               _BuildMasterIpEnv)
def DeactivateMasterIp(master_params, use_external_mip_script):
  """Deactivate the master IP on this node.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise RPCFail: in case of errors during the IP turndown

  """
  _RunMasterSetupScript(master_params, _MASTER_STOP,
                        use_external_mip_script)


def StopMasterDaemons():
  """Stop the master daemons on this node.

  Stop the master daemons (ganeti-masterd and ganeti-rapi) on this node.

  @rtype: None

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop-master"])
  if result.failed:
    logging.error("Could not stop Ganeti master, command %s had exitcode %s"
                  " and error %s",
                  result.cmd, result.exit_code, result.output)


def ChangeMasterNetmask(old_netmask, netmask, master_ip, master_netdev):
  """Change the netmask of the master IP.

  @param old_netmask: the old value of the netmask
  @param netmask: the new value of the netmask
  @param master_ip: the master IP
  @param master_netdev: the master network device

  """
  if old_netmask == netmask:
    return

  if not netutils.IPAddress.Own(master_ip):
    _Fail("The master IP address is not up, not attempting to change its"
          " netmask")

  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
                         "%s/%s" % (master_ip, netmask),
                         "dev", master_netdev, "label",
                         "%s:0" % master_netdev])
  if result.failed:
    _Fail("Could not set the new netmask on the master IP address")

  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
                         "%s/%s" % (master_ip, old_netmask),
                         "dev", master_netdev, "label",
                         "%s:0" % master_netdev])
  if result.failed:
    _Fail("Could not bring down the master IP address with the old netmask")


def EtcHostsModify(mode, host, ip):
  """Modify a host entry in /etc/hosts.

  @param mode: The mode to operate in; either add or remove an entry
  @param host: The host to operate on
  @param ip: The IP associated with the entry

  """
  if mode == constants.ETC_HOSTS_ADD:
    if not ip:
      _Fail("Mode 'add' needs 'ip' parameter, but parameter not"
            " present")
    utils.AddHostToEtcHosts(host, ip)
  elif mode == constants.ETC_HOSTS_REMOVE:
    if ip:
      _Fail("Mode 'remove' does not allow 'ip' parameter, but"
            " parameter is present")
    utils.RemoveHostFromEtcHosts(host)
  else:
    _Fail("Mode not supported")


def LeaveCluster(modify_ssh_setup):
  """Cleans up and removes the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shutdown the node daemon.

  @type modify_ssh_setup: boolean
  @param modify_ssh_setup: whether to remove the SSH keys of this node

  """
  _CleanDirectory(pathutils.DATA_DIR)
  _CleanDirectory(pathutils.CRYPTO_KEYS_DIR)
  JobQueuePurge()

  if modify_ssh_setup:
    try:
      priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

      utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))

      utils.RemoveFile(priv_key)
      utils.RemoveFile(pub_key)
    except errors.OpExecError:
      logging.exception("Error while processing ssh files")

  try:
    utils.RemoveFile(pathutils.CONFD_HMAC_KEY)
    utils.RemoveFile(pathutils.RAPI_CERT_FILE)
    utils.RemoveFile(pathutils.SPICE_CERT_FILE)
    utils.RemoveFile(pathutils.SPICE_CACERT_FILE)
    utils.RemoveFile(pathutils.NODED_CERT_FILE)
  except: # pylint: disable=W0702
    logging.exception("Error while removing cluster secrets")

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop", constants.CONFD])
  if result.failed:
    logging.error("Command %s failed with exitcode %s and error %s",
                  result.cmd, result.exit_code, result.output)

  # Raise a custom exception (handled in ganeti-noded)
  raise errors.QuitGanetiException(True, "Shutdown scheduled")


def _GetVgInfo(name):
  """Retrieves information about an LVM volume group.

  """
  # TODO: GetVGInfo supports returning information for multiple VGs at once
  vginfo = bdev.LogicalVolume.GetVGInfo([name])
  if vginfo:
    vg_free = int(round(vginfo[0][0], 0))
    vg_size = int(round(vginfo[0][1], 0))
  else:
    vg_free = None
    vg_size = None

  return {
    "name": name,
    "vg_free": vg_free,
    "vg_size": vg_size,
    }


def _GetHvInfo(name):
  """Retrieves node information from a hypervisor.

  The information returned depends on the hypervisor. Common items:

    - vg_size is the size of the configured volume group in MiB
    - vg_free is the free size of the volume group in MiB
    - memory_dom0 is the memory allocated for domain0 in MiB
    - memory_free is the currently available (free) RAM in MiB
    - memory_total is the total amount of RAM in MiB
    - hv_version: the hypervisor version, if available

  """
  return hypervisor.GetHypervisor(name).GetNodeInfo()


def _GetNamedNodeInfo(names, fn):
  """Calls C{fn} for all names in C{names} and returns a list of results.

  @rtype: None or list

  """
  if names is None:
    return None
  else:
    return map(fn, names)


def GetNodeInfo(vg_names, hv_names):
  """Gives back a hash with different information about the node.

  @type vg_names: list of string
  @param vg_names: Names of the volume groups to ask for disk space information
  @type hv_names: list of string
  @param hv_names: Names of the hypervisors to ask for node information
  @rtype: tuple; (string, None/list, None/list)
  @return: Tuple containing boot ID, volume group information and hypervisor
    information

  """
  bootid = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
  vg_info = _GetNamedNodeInfo(vg_names, _GetVgInfo)
  hv_info = _GetNamedNodeInfo(hv_names, _GetHvInfo)

  return (bootid, vg_info, hv_info)
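
# Return-shape sketch (editorial illustration with made-up values):
#
#   GetNodeInfo(["xenvg"], ["xen-pvm"])
#   => ("9f2eae5c-...",                                    # node boot ID
#       [{"name": "xenvg", "vg_free": 1024, "vg_size": 4096}],
#       [{"memory_dom0": 512, "memory_free": 1024, ...}])  # per hypervisor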


def VerifyNode(what, cluster_name):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  result = {}
  my_name = netutils.Hostname.GetSysName()
  port = netutils.GetDaemonPort(constants.NODED)
  vm_capable = my_name not in what.get(constants.NV_VMNODES, [])

  if constants.NV_HYPERVISOR in what and vm_capable:
    result[constants.NV_HYPERVISOR] = tmp = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
      try:
        val = hypervisor.GetHypervisor(hv_name).Verify()
      except errors.HypervisorError, err:
        val = "Error while checking hypervisor: %s" % str(err)
      tmp[hv_name] = val

  if constants.NV_HVPARAMS in what and vm_capable:
    result[constants.NV_HVPARAMS] = tmp = []
    for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
      try:
        logging.info("Validating hv %s, %s", hv_name, hvparms)
        hypervisor.GetHypervisor(hv_name).ValidateParameters(hvparms)
      except errors.HypervisorError, err:
        tmp.append((source, hv_name, str(err)))

  if constants.NV_FILELIST in what:
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
      what[constants.NV_FILELIST])

  if constants.NV_NODELIST in what:
    (nodes, bynode) = what[constants.NV_NODELIST]

    # Add nodes from other groups (different for each node)
    try:
      nodes.extend(bynode[my_name])
    except KeyError:
      pass

    # Use a random order
    random.shuffle(nodes)

    # Try to contact all nodes
    val = {}
    for node in nodes:
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
      if not success:
        val[node] = message

    result[constants.NV_NODELIST] = val

  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_pip = my_sip = None
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      tmp[my_name] = ("Can't find my own primary/secondary IP"
                      " in the node list")
    else:
      for name, pip, sip in what[constants.NV_NODENETTEST]:
        fail = []
        if not netutils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        if sip != pip:
          if not netutils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          tmp[name] = ("failure using the %s interface(s)" %
                       " and ".join(fail))

  if constants.NV_MASTERIP in what:
    # FIXME: add checks on incoming data structures (here and in the
    # rest of the function)
    master_name, master_ip = what[constants.NV_MASTERIP]
    if master_name == my_name:
      source = constants.IP4_ADDRESS_LOCALHOST
    else:
      source = None
    result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
                                                     source=source)

  if constants.NV_USERSCRIPTS in what:
    result[constants.NV_USERSCRIPTS] = \
      [script for script in what[constants.NV_USERSCRIPTS]
       if not (os.path.exists(script) and os.access(script, os.X_OK))]

  if constants.NV_OOB_PATHS in what:
    result[constants.NV_OOB_PATHS] = tmp = []
    for path in what[constants.NV_OOB_PATHS]:
      try:
        st = os.stat(path)
      except OSError, err:
        tmp.append("error stat()'ing out of band helper: %s" % err)
      else:
        if stat.S_ISREG(st.st_mode):
          if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
            tmp.append(None)
          else:
            tmp.append("out of band helper %s is not executable" % path)
        else:
          tmp.append("out of band helper %s is not a file" % path)

  if constants.NV_LVLIST in what and vm_capable:
    try:
      val = GetVolumeList(utils.ListVolumeGroups().keys())
    except RPCFail, err:
      val = str(err)
    result[constants.NV_LVLIST] = val

  if constants.NV_INSTANCELIST in what and vm_capable:
    # GetInstanceList can fail
    try:
      val = GetInstanceList(what[constants.NV_INSTANCELIST])
    except RPCFail, err:
      val = str(err)
    result[constants.NV_INSTANCELIST] = val

  if constants.NV_VGLIST in what and vm_capable:
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()

  if constants.NV_PVLIST in what and vm_capable:
    result[constants.NV_PVLIST] = \
      bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
                                   filter_allocatable=False)

  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
                                    constants.RELEASE_VERSION)

  if constants.NV_HVINFO in what and vm_capable:
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()

  if constants.NV_DRBDLIST in what and vm_capable:
    try:
      used_minors = bdev.DRBD8.GetUsedDevs().keys()
    except errors.BlockDeviceError, err:
      logging.warning("Can't get used minors list", exc_info=True)
      used_minors = str(err)
    result[constants.NV_DRBDLIST] = used_minors

  if constants.NV_DRBDHELPER in what and vm_capable:
    status = True
    try:
      payload = bdev.BaseDRBD.GetUsermodeHelper()
    except errors.BlockDeviceError, err:
      logging.error("Can't get DRBD usermode helper: %s", str(err))
      status = False
      payload = str(err)
    result[constants.NV_DRBDHELPER] = (status, payload)

  if constants.NV_NODESETUP in what:
    result[constants.NV_NODESETUP] = tmpr = []
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
      tmpr.append("The sysfs filesystem doesn't seem to be mounted"
                  " under /sys, missing required directories /sys/block"
                  " and /sys/class/net")
    if (not os.path.isdir("/proc/sys") or
        not os.path.isfile("/proc/sysrq-trigger")):
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
                  " under /proc, missing required directory /proc/sys and"
                  " the file /proc/sysrq-trigger")

  if constants.NV_TIME in what:
    result[constants.NV_TIME] = utils.SplitTime(time.time())

  if constants.NV_OSLIST in what and vm_capable:
    result[constants.NV_OSLIST] = DiagnoseOS()

  if constants.NV_BRIDGES in what and vm_capable:
    result[constants.NV_BRIDGES] = [bridge
                                    for bridge in what[constants.NV_BRIDGES]
                                    if not utils.BridgeExists(bridge)]
  return result


def GetBlockDevSizes(devices):
  """Return the size of the given block devices.

  @type devices: list
  @param devices: list of block device nodes to query
  @rtype: dict
  @return:
    dictionary of all block devices under /dev (key). The value is their
    size in MiB.

    {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}

  """
  DEV_PREFIX = "/dev/"
  blockdevs = {}

  for devpath in devices:
    if not utils.IsBelowDir(DEV_PREFIX, devpath):
      continue

    try:
      st = os.stat(devpath)
    except EnvironmentError, err:
      logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
      continue

    if stat.S_ISBLK(st.st_mode):
      result = utils.RunCmd(["blockdev", "--getsize64", devpath])
      if result.failed:
        # We don't want to fail, just do not list this device as available
        logging.warning("Cannot get size for block device %s", devpath)
        continue

      size = int(result.stdout) / (1024 * 1024)
      blockdevs[devpath] = size
  return blockdevs
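
# Usage sketch (editorial illustration): paths outside /dev and paths that
# are not block-device nodes are skipped silently, e.g.
#
#   GetBlockDevSizes(["/dev/sda1", "/etc/passwd"])
#   => {"/dev/sda1": 512}    # size in MiB, via "blockdev --getsize64"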


def GetVolumeList(vg_names):
  """Compute list of logical volumes and their size.

  @type vg_names: list
  @param vg_names: the volume groups whose LVs we should list, or
      empty for all volume groups
  @rtype: dict
  @return:
      dictionary of all partitions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'xenvg/test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.

  """
  lvs = {}
  sep = "|"
  if not vg_names:
    vg_names = []
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)

  for line in result.stdout.splitlines():
    line = line.strip()
    match = _LVSLINE_REGEX.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    vg_name, name, size, attr = match.groups()
    inactive = attr[4] == "-"
    online = attr[5] == "o"
    virtual = attr[0] == "v"
    if virtual:
      # we don't want to report such volumes as existing, since they
      # don't really hold data
      continue
    lvs[vg_name + "/" + name] = (size, inactive, online)

  return lvs
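
# Parsing sketch (editorial illustration): an lvs output line such as
#
#   "  xenvg|test1|20.06|-wi-ao"
#
# is matched by _LVSLINE_REGEX and stored as
#
#   {"xenvg/test1": ("20.06", False, True)}
#
# since attr[4] != "-" (the LV is active) and attr[5] == "o" (it is open);
# LVs whose attr[0] == "v" (virtual) are skipped entirely.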


def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary with volume group names as keys and their
      sizes as values

  """
  return utils.ListVolumeGroups()


def NodeVolumes():
  """List all volumes on this node.

  @rtype: list
  @return:
    A list of dictionaries, each having four keys:
      - name: the logical volume name,
      - size: the size of the logical volume
      - dev: the physical device on which the LV lives
      - vg: the volume group to which it belongs

    In case of errors, we return an empty list and log the
    error.

    Note that since a logical volume can live on multiple physical
    volumes, the resulting list might include a logical volume
    multiple times.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s",
          result.output)

  def parse_dev(dev):
    return dev.split("(")[0]

  def handle_dev(dev):
    return [parse_dev(x) for x in dev.split(",")]

  def map_line(line):
    line = [v.strip() for v in line]
    return [{"name": line[0], "size": line[1],
             "dev": dev, "vg": line[3]} for dev in handle_dev(line[2])]

  all_devs = []
  for line in result.stdout.splitlines():
    if line.count("|") >= 3:
      all_devs.extend(map_line(line.split("|")))
    else:
      logging.warning("Strange line in the output from lvs: '%s'", line)
  return all_devs


def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @raise RPCFail: if any of the bridges is missing

  """
  missing = []
  for bridge in bridges_list:
    if not utils.BridgeExists(bridge):
      missing.append(bridge)

  if missing:
    _Fail("Missing bridges %s", utils.CommaJoin(missing))


def GetInstanceList(hypervisor_list):
  """Provides a list of instances.

  @type hypervisor_list: list
  @param hypervisor_list: the list of hypervisors to query for information

  @rtype: list
  @return: a list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com

  """
  results = []
  for hname in hypervisor_list:
    try:
      names = hypervisor.GetHypervisor(hname).ListInstances()
      results.extend(names)
    except errors.HypervisorError, err:
      _Fail("Error enumerating instances (hypervisor %s): %s",
            hname, err, exc=True)

  return results


def GetInstanceInfo(instance, hname):
  """Gives back the information about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)

  """
  output = {}

  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is not None:
    output["memory"] = iinfo[2]
    output["state"] = iinfo[4]
    output["time"] = iinfo[5]

  return output


def GetInstanceMigratable(instance):
  """Gives whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.

  @raise RPCFail: if the instance is not running on this node

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  iname = instance.name
  if iname not in hyper.ListInstances():
    _Fail("Instance %s is not running", iname)

  for idx in range(len(instance.disks)):
    link_name = _GetBlockDevSymlinkPath(iname, idx)
    if not os.path.islink(link_name):
      logging.warning("Instance %s is missing symlink %s for disk %d",
                      iname, link_name, idx)


def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  output = {}

  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
    if iinfo:
      for name, _, memory, vcpus, state, times in iinfo:
        value = {
          "memory": memory,
          "vcpus": vcpus,
          "state": state,
          "time": times,
          }
        if name in output:
          # we only check static parameters, like memory and vcpus,
          # and not state and time which can change between the
          # invocations of the different hypervisors
          for key in "memory", "vcpus":
            if value[key] != output[name][key]:
              _Fail("Instance %s is running twice"
                    " with different parameters", name)
        output[name] = value

  return output


def _InstanceLogName(kind, os_name, instance, component):
  """Compute the OS log filename for a given instance and operation.

  The instance name and os name are passed in as strings since not all
  operations have these as part of an instance object.

  @type kind: string
  @param kind: the operation type (e.g. add, import, etc.)
  @type os_name: string
  @param os_name: the os name
  @type instance: string
  @param instance: the name of the instance being imported/added/etc.
  @type component: string or None
  @param component: the name of the component of the instance being
      transferred

  """
  # TODO: Use tempfile.mkstemp to create unique filename
  if component:
    assert "/" not in component
    c_msg = "-%s" % component
  else:
    c_msg = ""
  base = ("%s-%s-%s%s-%s.log" %
          (kind, os_name, instance, c_msg, utils.TimestampForFilename()))
  return utils.PathJoin(pathutils.LOG_OS_DIR, base)
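
# Filename sketch (editorial illustration; the directory and timestamp
# format come from pathutils.LOG_OS_DIR and utils.TimestampForFilename):
#
#   _InstanceLogName("add", "debootstrap", "instance1.example.com", None)
#   => "<LOG_OS_DIR>/add-debootstrap-instance1.example.com-<timestamp>.log"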


def InstanceOsAdd(instance, reinstall, debug):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type reinstall: boolean
  @param reinstall: whether this is an instance reinstall
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: None

  """
  inst_os = OSFromDisk(instance.os)

  create_env = OSEnvironment(instance, inst_os, debug)
  if reinstall:
    create_env["INSTANCE_REINSTALL"] = "1"

  logfile = _InstanceLogName("add", instance.os, instance.name, None)

  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile, reset_env=True)
  if result.failed:
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", result.cmd, result.fail_reason, logfile,
                  result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS create script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)


def RunRenameInstance(instance, old_name, debug):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be renamed
  @type old_name: string
  @param old_name: previous instance name
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: boolean
  @return: the success of the operation

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance, inst_os, debug)
  rename_env["OLD_INSTANCE_NAME"] = old_name

  logfile = _InstanceLogName("rename", instance.os,
                             "%s-%s" % (old_name, instance.name), None)

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile, reset_env=True)

  if result.failed:
    logging.error("os rename command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS rename script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)


def _GetBlockDevSymlinkPath(instance_name, idx):
  return utils.PathJoin(pathutils.DISK_LINKS_DIR, "%s%s%d" %
                        (instance_name, constants.DISK_SEPARATOR, idx))
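
# Path sketch (editorial illustration; the directory and separator come
# from pathutils.DISK_LINKS_DIR and constants.DISK_SEPARATOR):
#
#   _GetBlockDevSymlinkPath("instance1.example.com", 0)
#   => "<DISK_LINKS_DIR>/instance1.example.com<DISK_SEPARATOR>0"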


def _SymlinkBlockDev(instance_name, device_path, idx):
  """Set up symlinks to an instance's block device.

  This is an auxiliary function run when an instance is started (on the
  primary node) or when an instance is migrated (on the target node).

  @param instance_name: the name of the target instance
  @param device_path: path of the physical block device, on the node
  @param idx: the disk index
  @return: absolute path to the disk's symlink

  """
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
  try:
    os.symlink(device_path, link_name)
  except OSError, err:
    if err.errno == errno.EEXIST:
      if (not os.path.islink(link_name) or
          os.readlink(link_name) != device_path):
        os.remove(link_name)
        os.symlink(device_path, link_name)
    else:
      raise

  return link_name


def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  """
  for idx, _ in enumerate(disks):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if os.path.islink(link_name):
      try:
        os.remove(link_name)
      except OSError:
        logging.exception("Can't remove symlink '%s'", link_name)


def _GatherAndLinkBlockDevs(instance):
  """Set up an instance's block device(s).

  This is run on the primary node at instance startup. The block
  devices must be already assembled.

  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should assemble
  @rtype: list
  @return: list of (disk_object, device_path)

  """
  block_devices = []
  for idx, disk in enumerate(instance.disks):
    device = _RecursiveFindBD(disk)
    if device is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
                                    str(disk))
    device.Open()
    try:
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
    except OSError, e:
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
                                    e.strerror)

    block_devices.append((disk, link_name))

  return block_devices


def StartInstance(instance, startup_paused):
  """Start an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type startup_paused: bool
  @param startup_paused: pause instance at startup?
  @rtype: None

  """
  running_instances = GetInstanceList([instance.hypervisor])

  if instance.name in running_instances:
    logging.info("Instance %s already running, not starting", instance.name)
    return

  try:
    block_devices = _GatherAndLinkBlockDevs(instance)
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    hyper.StartInstance(instance, block_devices, startup_paused)
  except errors.BlockDeviceError, err:
    _Fail("Block device error: %s", err, exc=True)
  except errors.HypervisorError, err:
    _RemoveBlockDevLinks(instance.name, instance.disks)
    _Fail("Hypervisor error: %s", err, exc=True)


def InstanceShutdown(instance, timeout):
  """Shut an instance down.

  @note: this function uses polling with a hardcoded timeout.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type timeout: integer
  @param timeout: maximum timeout for soft shutdown
  @rtype: None

  """
  hv_name = instance.hypervisor
  hyper = hypervisor.GetHypervisor(hv_name)
  iname = instance.name

  if instance.name not in hyper.ListInstances():
    logging.info("Instance %s not running, doing nothing", iname)
    return

  class _TryShutdown:
    def __init__(self):
      self.tried_once = False

    def __call__(self):
      if iname not in hyper.ListInstances():
        return

      try:
        hyper.StopInstance(instance, retry=self.tried_once)
      except errors.HypervisorError, err:
        if iname not in hyper.ListInstances():
          # if the instance is no longer existing, consider this a
          # success and go to cleanup
          return

        _Fail("Failed to stop instance %s: %s", iname, err)

      self.tried_once = True

      raise utils.RetryAgain()

  try:
    utils.Retry(_TryShutdown(), 5, timeout)
  except utils.RetryTimeout:
    # the shutdown did not succeed
    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)

    try:
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      if iname in hyper.ListInstances():
        # only raise an error if the instance still exists, otherwise
        # the error could simply be "instance ... unknown"!
        _Fail("Failed to force stop instance %s: %s", iname, err)

    time.sleep(1)

    if iname in hyper.ListInstances():
      _Fail("Could not shutdown instance %s even by destroy", iname)

  try:
    hyper.CleanupInstance(instance.name)
  except errors.HypervisorError, err:
    logging.warning("Failed to execute post-shutdown cleanup step: %s", err)

  _RemoveBlockDevLinks(iname, instance.disks)


def InstanceReboot(instance, reboot_type, shutdown_timeout):
  """Reboot an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object to reboot
  @type reboot_type: str
  @param reboot_type: the type of reboot, one of the following
    constants:
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
        instance OS, do not recreate the VM
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
        restart the VM (at the hypervisor level)
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
        not accepted here, since that mode is handled differently, in
        cmdlib, and translates into full stop and start of the
        instance (instead of a call_instance_reboot RPC)
  @type shutdown_timeout: integer
  @param shutdown_timeout: maximum timeout for soft shutdown
  @rtype: None

  """
  running_instances = GetInstanceList([instance.hypervisor])

  if instance.name not in running_instances:
    _Fail("Cannot reboot instance %s that is not running", instance.name)

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
    try:
      hyper.RebootInstance(instance)
    except errors.HypervisorError, err:
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
    try:
      InstanceShutdown(instance, shutdown_timeout)
      return StartInstance(instance, False)
    except errors.HypervisorError, err:
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
  else:
    _Fail("Invalid reboot_type received: %s", reboot_type)


def InstanceBalloonMemory(instance, memory):
  """Resize an instance's memory.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type memory: int
  @param memory: new memory amount in MB
  @rtype: None

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  running = hyper.ListInstances()
  if instance.name not in running:
    logging.info("Instance %s is not running, cannot balloon", instance.name)
    return
  try:
    hyper.BalloonInstanceMemory(instance, memory)
  except errors.HypervisorError, err:
    _Fail("Failed to balloon instance memory: %s", err, exc=True)
1399

    
1400

    
1401
def MigrationInfo(instance):
1402
  """Gather information about an instance to be migrated.
1403

1404
  @type instance: L{objects.Instance}
1405
  @param instance: the instance definition
1406

1407
  """
1408
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1409
  try:
1410
    info = hyper.MigrationInfo(instance)
1411
  except errors.HypervisorError, err:
1412
    _Fail("Failed to fetch migration information: %s", err, exc=True)
1413
  return info
1414

    
1415

    
1416
def AcceptInstance(instance, info, target):
1417
  """Prepare the node to accept an instance.
1418

1419
  @type instance: L{objects.Instance}
1420
  @param instance: the instance definition
1421
  @type info: string/data (opaque)
1422
  @param info: migration information, from the source node
1423
  @type target: string
1424
  @param target: target host (usually ip), on this node
1425

1426
  """
1427
  # TODO: why is this required only for DTS_EXT_MIRROR?
1428
  if instance.disk_template in constants.DTS_EXT_MIRROR:
1429
    # Create the symlinks, as the disks are not active
1430
    # in any way
1431
    try:
1432
      _GatherAndLinkBlockDevs(instance)
1433
    except errors.BlockDeviceError, err:
1434
      _Fail("Block device error: %s", err, exc=True)
1435

    
1436
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1437
  try:
1438
    hyper.AcceptInstance(instance, info, target)
1439
  except errors.HypervisorError, err:
1440
    if instance.disk_template in constants.DTS_EXT_MIRROR:
1441
      _RemoveBlockDevLinks(instance.name, instance.disks)
1442
    _Fail("Failed to accept instance: %s", err, exc=True)
1443

    
1444

    
1445
def FinalizeMigrationDst(instance, info, success):
1446
  """Finalize any preparation to accept an instance.
1447

1448
  @type instance: L{objects.Instance}
1449
  @param instance: the instance definition
1450
  @type info: string/data (opaque)
1451
  @param info: migration information, from the source node
1452
  @type success: boolean
1453
  @param success: whether the migration was a success or a failure
1454

1455
  """
1456
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1457
  try:
1458
    hyper.FinalizeMigrationDst(instance, info, success)
1459
  except errors.HypervisorError, err:
1460
    _Fail("Failed to finalize migration on the target node: %s", err, exc=True)
1461

    
1462

    
1463
def MigrateInstance(instance, target, live):
1464
  """Migrates an instance to another node.
1465

1466
  @type instance: L{objects.Instance}
1467
  @param instance: the instance definition
1468
  @type target: string
1469
  @param target: the target node name
1470
  @type live: boolean
1471
  @param live: whether the migration should be done live or not (the
1472
      interpretation of this parameter is left to the hypervisor)
1473
  @raise RPCFail: if migration fails for some reason
1474

1475
  """
1476
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1477

    
1478
  try:
1479
    hyper.MigrateInstance(instance, target, live)
1480
  except errors.HypervisorError, err:
1481
    _Fail("Failed to migrate instance: %s", err, exc=True)
1482

    
1483

    
1484
def FinalizeMigrationSource(instance, success, live):
1485
  """Finalize the instance migration on the source node.
1486

1487
  @type instance: L{objects.Instance}
1488
  @param instance: the instance definition of the migrated instance
1489
  @type success: bool
1490
  @param success: whether the migration succeeded or not
1491
  @type live: bool
1492
  @param live: whether the user requested a live migration or not
1493
  @raise RPCFail: If the execution fails for some reason
1494

1495
  """
1496
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1497

    
1498
  try:
1499
    hyper.FinalizeMigrationSource(instance, success, live)
1500
  except Exception, err:  # pylint: disable=W0703
1501
    _Fail("Failed to finalize the migration on the source node: %s", err,
1502
          exc=True)
1503

    
1504

    
1505
def GetMigrationStatus(instance):
1506
  """Get the migration status
1507

1508
  @type instance: L{objects.Instance}
1509
  @param instance: the instance that is being migrated
1510
  @rtype: L{objects.MigrationStatus}
1511
  @return: the status of the current migration (one of
1512
           L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
1513
           progress info that can be retrieved from the hypervisor
1514
  @raise RPCFail: If the migration status cannot be retrieved
1515

1516
  """
1517
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1518
  try:
1519
    return hyper.GetMigrationStatus(instance)
1520
  except Exception, err:  # pylint: disable=W0703
1521
    _Fail("Failed to get migration status: %s", err, exc=True)


def BlockdevCreate(disk, size, owner, on_primary, info):
  """Creates a block device for an instance.

  @type disk: L{objects.Disk}
  @param disk: the object describing the disk we should create
  @type size: int
  @param size: the size of the physical underlying device, in MiB
  @type owner: str
  @param owner: the name of the instance for which disk is created,
      used for device cache data
  @type on_primary: boolean
  @param on_primary: indicates if it is the primary node or not
  @type info: string
  @param info: string that will be sent to the physical device
      creation, used for example to set (LVM) tags on LVs

  @return: the new unique_id of the device (this can sometimes be
      computed only after creation), or None. On secondary nodes,
      it's not required to return anything.

  """
  # TODO: remove the obsolete "size" argument
  # pylint: disable=W0613
  clist = []
  if disk.children:
    for child in disk.children:
      try:
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
      except errors.BlockDeviceError, err:
        _Fail("Can't assemble device %s: %s", child, err)
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
        try:
          # pylint: disable=E1103
          crdev.Open()
        except errors.BlockDeviceError, err:
          _Fail("Can't make child '%s' read-write: %s", child, err)
      clist.append(crdev)

  try:
    device = bdev.Create(disk, clist)
  except errors.BlockDeviceError, err:
    _Fail("Can't create block device: %s", err)

  if on_primary or disk.AssembleOnSecondary():
    try:
      device.Assemble()
    except errors.BlockDeviceError, err:
      _Fail("Can't assemble device after creation, unusual event: %s", err)
    if on_primary or disk.OpenOnSecondary():
      try:
        device.Open(force=True)
      except errors.BlockDeviceError, err:
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  device.SetInfo(info)

  return device.unique_id


def _WipeDevice(path, offset, size):
  """This function actually wipes the device.

  @param path: The path to the device to wipe
  @param offset: The offset in MiB in the file
  @param size: The size in MiB to write

  """
  cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
         "bs=%d" % constants.WIPE_BLOCK_SIZE, "oflag=direct", "of=%s" % path,
         "count=%d" % size]
  result = utils.RunCmd(cmd)

  if result.failed:
    _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
          result.fail_reason, result.output)
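
# Illustrative sketch (not executed anywhere; the device path is a
# hypothetical example): assuming constants.WIPE_BLOCK_SIZE is 1 MiB, so
# that dd's seek/count units line up with the MiB-based offset/size
# arguments, wiping the first 2048 MiB of a volume boils down to:
#   _WipeDevice("/dev/xvdb", 0, 2048)
#   => dd if=/dev/zero seek=0 bs=1048576 oflag=direct of=/dev/xvdb count=2048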


def BlockdevWipe(disk, offset, size):
  """Wipes a block device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to wipe
  @type offset: int
  @param offset: The offset in MiB in the file
  @type size: int
  @param size: The size in MiB to write

  """
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError:
    rdev = None

  if not rdev:
    _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)

  # Cross-verify some of the parameters
  if offset > rdev.size:
    _Fail("Offset is bigger than device size")
  if (offset + size) > rdev.size:
    _Fail("The provided offset and size to wipe is bigger than device size")

  _WipeDevice(rdev.dev_path, offset, size)


def BlockdevPauseResumeSync(disks, pause):
  """Pause or resume the sync of the block devices.

  @type disks: list of L{objects.Disk}
  @param disks: the disk objects we want to pause/resume
  @type pause: bool
  @param pause: Whether to pause or resume

  """
  success = []
  for disk in disks:
    try:
      rdev = _RecursiveFindBD(disk)
    except errors.BlockDeviceError:
      rdev = None

    if not rdev:
      success.append((False, ("Cannot change sync for device %s:"
                              " device not found" % disk.iv_name)))
      continue

    result = rdev.PauseResumeSync(pause)

    if result:
      success.append((result, None))
    else:
      if pause:
        msg = "Pause"
      else:
        msg = "Resume"
      success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))

  return success
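
# Illustrative note: the return value is one (status, error) pair per disk;
# e.g. for a found first disk and a missing second one it looks like
#   [(True, None),
#    (False, "Cannot change sync for device disk/1: device not found")]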


def BlockdevRemove(disk):
  """Remove a block device.

  @note: This is intended to be called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk object we should remove
  @rtype: boolean
  @return: the success of the operation

  """
  msgs = []
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    # probably can't attach
    logging.info("Can't attach to device %s in remove", disk)
    rdev = None
  if rdev is not None:
    r_path = rdev.dev_path
    try:
      rdev.Remove()
    except errors.BlockDeviceError, err:
      msgs.append(str(err))
    if not msgs:
      DevCacheManager.RemoveCache(r_path)

  if disk.children:
    for child in disk.children:
      try:
        BlockdevRemove(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))


def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  @note: this function is called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk we try to assemble
  @type owner: str
  @param owner: the name of the instance which owns the disk
  @type as_primary: boolean
  @param as_primary: if we should make the block device
      read/write

  @return: the assembled device or None (in case no device
      was assembled)
  @raise errors.BlockDeviceError: in case there is an error
      during the activation of the children or the device
      itself

  """
  children = []
  if disk.children:
    mcn = disk.ChildrenNeeded()
    if mcn == -1:
      mcn = 0 # max number of Nones allowed
    else:
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
      try:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        if children.count(None) >= mcn:
          raise
        cdev = None
        logging.error("Error in child activation (but continuing): %s",
                      str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.Assemble(disk, children)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      r_dev.Open()
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)

  else:
    result = True
  return result
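
# Illustrative note on the mcn arithmetic above: for a mirrored disk with
# two children where ChildrenNeeded() returns 1, mcn becomes 2 - 1 = 1,
# i.e. one failed child is tolerated (appended as None, degraded mode)
# while a second failure re-raises the BlockDeviceError.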


def BlockdevAssemble(disk, owner, as_primary, idx):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  @rtype: str or boolean
  @return: a C{/dev/...} path for primary nodes, and
      C{True} for secondary nodes

  """
  try:
    result = _RecursiveAssembleBD(disk, owner, as_primary)
    if isinstance(result, bdev.BlockDev):
      # pylint: disable=E1103
      result = result.dev_path
      if as_primary:
        _SymlinkBlockDev(owner, result, idx)
  except errors.BlockDeviceError, err:
    _Fail("Error while assembling disk: %s", err, exc=True)
  except OSError, err:
    _Fail("Error while symlinking disk: %s", err, exc=True)

  return result


def BlockdevShutdown(disk):
  """Shut down a block device.

  First, if the device is assembled (Attach() is successful), then
  the device is shutdown. Then the children of the device are
  shutdown.

  This function is called recursively. Note that, as opposed to
  assemble, we don't cache the children here: shutting down a device
  doesn't require that the devices above it were active.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk we should
      shutdown
  @rtype: None

  """
  msgs = []
  r_dev = _RecursiveFindBD(disk)
  if r_dev is not None:
    r_path = r_dev.dev_path
    try:
      r_dev.Shutdown()
      DevCacheManager.RemoveCache(r_path)
    except errors.BlockDeviceError, err:
      msgs.append(str(err))

  if disk.children:
    for child in disk.children:
      try:
        BlockdevShutdown(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))


def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if new_bdevs.count(None) > 0:
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
  parent_bdev.AddChildren(new_bdevs)


def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
  devs = []
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
    if rpath is None:
      bd = _RecursiveFindBD(disk)
      if bd is None:
        _Fail("Can't find device %s while removing children", disk)
      else:
        devs.append(bd.dev_path)
    else:
      if not utils.IsNormAbsPath(rpath):
        _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
      devs.append(rpath)
  parent_bdev.RemoveChildren(devs)


def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return: List of L{objects.BlockDevStatus}, one for each disk
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  stats = []
  for dsk in disks:
    rbd = _RecursiveFindBD(dsk)
    if rbd is None:
      _Fail("Can't find device %s", dsk)

    stats.append(rbd.CombinedSyncStatus())

  return stats


def BlockdevGetmirrorstatusMulti(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return: List of tuples, (bool, status), one for each disk; bool denotes
    success/failure, status is L{objects.BlockDevStatus} on success, string
    otherwise

  """
  result = []
  for disk in disks:
    try:
      rbd = _RecursiveFindBD(disk)
      if rbd is None:
        result.append((False, "Can't find device %s" % disk))
        continue

      status = rbd.CombinedSyncStatus()
    except errors.BlockDeviceError, err:
      logging.exception("Error while getting disk status")
      result.append((False, str(err)))
    else:
      result.append((True, status))

  assert len(disks) == len(result)

  return result


def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  children = []
  if disk.children:
    for chdisk in disk.children:
      children.append(_RecursiveFindBD(chdisk))

  return bdev.FindDevice(disk, children)


def _OpenRealBD(disk):
  """Opens the underlying block device of a disk.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to open

  """
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    _Fail("Block device '%s' is not set up", disk)

  real_disk.Open()

  return real_disk


def BlockdevFind(disk):
  """Check if a device is activated.

  If it is, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk to find
  @rtype: None or objects.BlockDevStatus
  @return: None if the disk cannot be found, otherwise the current
           information

  """
  try:
    rbd = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    _Fail("Failed to find device: %s", err, exc=True)

  if rbd is None:
    return None

  return rbd.GetSyncStatus()


def BlockdevGetsize(disks):
  """Computes the size of the given disks.

  If a disk is not found, returns None instead.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to compute the size for
  @rtype: list
  @return: list with elements None if the disk cannot be found,
      otherwise the size

  """
  result = []
  for cf in disks:
    try:
      rbd = _RecursiveFindBD(cf)
    except errors.BlockDeviceError:
      result.append(None)
      continue
    if rbd is None:
      result.append(None)
    else:
      result.append(rbd.GetActualSize())
  return result


def BlockdevExport(disk, dest_node, dest_path, cluster_name):
  """Export a block device to a remote node.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type dest_path: str
  @param dest_path: the destination path on the target node
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @rtype: None

  """
  real_disk = _OpenRealBD(disk)

  # the block size on the read dd is 1MiB to match our units
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
                               "dd if=%s bs=1048576 count=%s",
                               real_disk.dev_path, str(disk.size))

  # we set here a smaller block size as, due to ssh buffering, more
  # than 64-128k will be mostly ignored; we use nocreat to fail if the
  # device is not already there or we pass a wrong path; we use notrunc
  # to not attempt truncation on an LV device; we use oflag=dsync to
  # not buffer too much memory; this means that at best, we flush
  # every 64k, which will not be very fast
  destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
                                " oflag=dsync", dest_path)

  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.GANETI_RUNAS,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = "|".join([expcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(["bash", "-c", command])

  if result.failed:
    _Fail("Disk copy command '%s' returned error: %s"
          " output: %s", command, result.fail_reason, result.output)


def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: string
  @param uid: the owner of the file
  @type gid: string
  @param gid: the group of the file
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: None

  """
  if not os.path.isabs(file_name):
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)

  if file_name not in _ALLOWED_UPLOAD_FILES:
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
          file_name)

  raw_data = _Decompress(data)

  if not (isinstance(uid, basestring) and isinstance(gid, basestring)):
    _Fail("Invalid username/groupname type")

  getents = runtime.GetEnts()
  uid = getents.LookupUser(uid)
  gid = getents.LookupGroup(gid)

  utils.SafeWriteFile(file_name, None,
                      data=raw_data, mode=mode, uid=uid, gid=gid,
                      atime=atime, mtime=mtime)


def RunOob(oob_program, command, node, timeout):
  """Executes oob_program with given command on given node.

  @param oob_program: The path to the executable oob_program
  @param command: The command to invoke on oob_program
  @param node: The node given as an argument to the program
  @param timeout: Timeout after which we kill the oob program

  @return: stdout
  @raise RPCFail: If execution fails for some reason

  """
  result = utils.RunCmd([oob_program, command, node], timeout=timeout)

  if result.failed:
    _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
          result.fail_reason, result.output)

  return result.stdout


def WriteSsconfFiles(values):
  """Update all ssconf files.

  Wrapper around the SimpleStore.WriteFiles.

  """
  ssconf.SimpleStore().WriteFiles(values)


def _OSOndiskAPIVersion(os_dir):
  """Compute and return the API version of a given OS.

  This function will try to read the API version of the OS residing in
  the 'os_dir' directory.

  @type os_dir: str
  @param os_dir: the directory in which we should look for the OS
  @rtype: tuple
  @return: tuple (status, data) with status denoting the validity and
      data holding either the valid versions or an error message

  """
  api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)

  try:
    st = os.stat(api_file)
  except EnvironmentError, err:
    return False, ("Required file '%s' not found under path %s: %s" %
                   (constants.OS_API_FILE, os_dir, utils.ErrnoOrStr(err)))

  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    return False, ("File '%s' in %s is not a regular file" %
                   (constants.OS_API_FILE, os_dir))

  try:
    api_versions = utils.ReadFile(api_file).splitlines()
  except EnvironmentError, err:
    return False, ("Error while reading the API version file at %s: %s" %
                   (api_file, utils.ErrnoOrStr(err)))

  try:
    api_versions = [int(version.strip()) for version in api_versions]
  except (TypeError, ValueError), err:
    return False, ("API version(s) can't be converted to integer: %s" %
                   str(err))

  return True, api_versions
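
# Illustrative note: an API file whose contents are the two lines "20" and
# "15" yields (True, [20, 15]); a missing file yields a (False, "Required
# file ... not found under path ...") tuple instead.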


def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{pathutils.OS_SEARCH_PATH})
  @rtype: list of L{objects.OS}
  @return: a list of tuples (name, path, status, diagnose, variants,
      parameters, api_version) for all (potential) OSes under all
      search paths, where:
          - name is the (potential) OS name
          - path is the full path to the OS
          - status True/False is the validity of the OS
          - diagnose is the error message for an invalid OS, otherwise empty
          - variants is a list of supported OS variants, if any
          - parameters is a list of (name, help) parameters, if any
          - api_version is a list of supported OS API versions

  """
  if top_dirs is None:
    top_dirs = pathutils.OS_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
        break
      for name in f_names:
        os_path = utils.PathJoin(dir_name, name)
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
        if status:
          diagnose = ""
          variants = os_inst.supported_variants
          parameters = os_inst.supported_parameters
          api_versions = os_inst.api_versions
        else:
          diagnose = os_inst
          variants = parameters = api_versions = []
        result.append((name, os_path, status, diagnose, variants,
                       parameters, api_versions))

  return result


def _TryOSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: tuple
  @return: success and either the OS instance if we find a valid one,
      or error message

  """
  if base_dir is None:
    os_dir = utils.FindFile(name, pathutils.OS_SEARCH_PATH, os.path.isdir)
  else:
    os_dir = utils.FindFile(name, [base_dir], os.path.isdir)

  if os_dir is None:
    return False, "Directory for OS %s not found in search path" % name

  status, api_versions = _OSOndiskAPIVersion(os_dir)
  if not status:
    # push the error up
    return status, api_versions

  if not constants.OS_API_VERSIONS.intersection(api_versions):
    return False, ("API version mismatch for path '%s': found %s, want %s." %
                   (os_dir, api_versions, constants.OS_API_VERSIONS))

  # OS Files dictionary, we will populate it with the absolute path
  # names; if the value is True, then it is a required file, otherwise
  # an optional one
  os_files = dict.fromkeys(constants.OS_SCRIPTS, True)

  if max(api_versions) >= constants.OS_API_V15:
    os_files[constants.OS_VARIANTS_FILE] = False

  if max(api_versions) >= constants.OS_API_V20:
    os_files[constants.OS_PARAMETERS_FILE] = True
  else:
    del os_files[constants.OS_SCRIPT_VERIFY]

  for (filename, required) in os_files.items():
    os_files[filename] = utils.PathJoin(os_dir, filename)

    try:
      st = os.stat(os_files[filename])
    except EnvironmentError, err:
      if err.errno == errno.ENOENT and not required:
        del os_files[filename]
        continue
      return False, ("File '%s' under path '%s' is missing (%s)" %
                     (filename, os_dir, utils.ErrnoOrStr(err)))

    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      return False, ("File '%s' under path '%s' is not a regular file" %
                     (filename, os_dir))

    if filename in constants.OS_SCRIPTS:
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
        return False, ("File '%s' under path '%s' is not executable" %
                       (filename, os_dir))

  variants = []
  if constants.OS_VARIANTS_FILE in os_files:
    variants_file = os_files[constants.OS_VARIANTS_FILE]
    try:
      variants = utils.ReadFile(variants_file).splitlines()
    except EnvironmentError, err:
      # we accept missing files, but not other errors
      if err.errno != errno.ENOENT:
        return False, ("Error while reading the OS variants file at %s: %s" %
                       (variants_file, utils.ErrnoOrStr(err)))

  parameters = []
  if constants.OS_PARAMETERS_FILE in os_files:
    parameters_file = os_files[constants.OS_PARAMETERS_FILE]
    try:
      parameters = utils.ReadFile(parameters_file).splitlines()
    except EnvironmentError, err:
      return False, ("Error while reading the OS parameters file at %s: %s" %
                     (parameters_file, utils.ErrnoOrStr(err)))
    parameters = [v.split(None, 1) for v in parameters]

  os_obj = objects.OS(name=name, path=os_dir,
                      create_script=os_files[constants.OS_SCRIPT_CREATE],
                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
                      verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
                                                 None),
                      supported_variants=variants,
                      supported_parameters=parameters,
                      api_versions=api_versions)
  return True, os_obj
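
# Illustrative summary of the checks above: the OS scripts must be regular,
# executable files; the variants file is optional from API version 15 on;
# for API version 20+ the parameters file is required and the verify script
# is kept, while for older APIs the verify script is dropped from the
# checked set. Any violation yields a (False, <error message>) tuple.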


def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{RPCFail} exception, detailing why this is not a valid OS.

  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
  an exception but returns true/false status data.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise RPCFail: if we don't find a valid OS

  """
  name_only = objects.OS.GetName(name)
  status, payload = _TryOSFromDisk(name_only, base_dir)

  if not status:
    _Fail(payload)

  return payload


def OSCoreEnv(os_name, inst_os, os_params, debug=0):
  """Calculate the basic environment for an os script.

  @type os_name: str
  @param os_name: full operating system name (including variant)
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type os_params: dict
  @param os_params: the OS parameters
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = {}
  api_version = \
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
  result["OS_API_VERSION"] = "%d" % api_version
  result["OS_NAME"] = inst_os.name
  result["DEBUG_LEVEL"] = "%d" % debug

  # OS variants
  if api_version >= constants.OS_API_V15 and inst_os.supported_variants:
    variant = objects.OS.GetVariant(os_name)
    if not variant:
      variant = inst_os.supported_variants[0]
  else:
    variant = ""
  result["OS_VARIANT"] = variant

  # OS params
  for pname, pvalue in os_params.items():
    result["OSP_%s" % pname.upper()] = pvalue

  # Set a default path otherwise programs called by OS scripts (or
  # even hooks called from OS scripts) might break, and we don't want
  # to have each script require setting a PATH variable
  result["PATH"] = constants.HOOKS_PATH

  return result
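
# Illustrative note (hypothetical OS and parameter names): for os_name
# "debootstrap+default" with os_params {"dhcp": "yes"}, the resulting
# environment contains roughly
#   OS_API_VERSION=20, OS_NAME=debootstrap, OS_VARIANT=default,
#   OSP_DHCP=yes, DEBUG_LEVEL=0 and PATH=constants.HOOKS_PATH.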


def OSEnvironment(instance, inst_os, debug=0):
  """Calculate the environment for an os script.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)

  for attr in ["name", "os", "uuid", "ctime", "mtime", "primary_node"]:
    result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))

  result["HYPERVISOR"] = instance.hypervisor
  result["DISK_COUNT"] = "%d" % len(instance.disks)
  result["NIC_COUNT"] = "%d" % len(instance.nics)
  result["INSTANCE_SECONDARY_NODES"] = \
      ("%s" % " ".join(instance.secondary_nodes))

  # Disks
  for idx, disk in enumerate(instance.disks):
    real_disk = _OpenRealBD(disk)
    result["DISK_%d_PATH" % idx] = real_disk.dev_path
    result["DISK_%d_ACCESS" % idx] = disk.mode
    if constants.HV_DISK_TYPE in instance.hvparams:
      result["DISK_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result["DISK_%d_BACKEND_TYPE" % idx] = "block"
    elif disk.dev_type == constants.LD_FILE:
      result["DISK_%d_BACKEND_TYPE" % idx] = \
        "file:%s" % disk.physical_id[0]

  # NICs
  for idx, nic in enumerate(instance.nics):
    result["NIC_%d_MAC" % idx] = nic.mac
    if nic.ip:
      result["NIC_%d_IP" % idx] = nic.ip
    result["NIC_%d_MODE" % idx] = nic.nicparams[constants.NIC_MODE]
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      result["NIC_%d_BRIDGE" % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.nicparams[constants.NIC_LINK]:
      result["NIC_%d_LINK" % idx] = nic.nicparams[constants.NIC_LINK]
    if constants.HV_NIC_TYPE in instance.hvparams:
      result["NIC_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  # HV/BE params
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
    for key, value in source.items():
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)

  return result


def BlockdevGrow(disk, amount, dryrun, backingstore):
  """Grow a stack of block devices.

  This function is called recursively, with the children being
  resized first.

  @type disk: L{objects.Disk}
  @param disk: the disk to be grown
  @type amount: integer
  @param amount: the amount (in mebibytes) to grow with
  @type dryrun: boolean
  @param dryrun: whether to execute the operation in simulation mode
      only, without actually increasing the size
  @param backingstore: whether to execute the operation on backing storage
      only, or on "logical" storage only; e.g. DRBD is logical storage,
      whereas LVM, file, RBD are backing storage
  @rtype: (status, result)
  @return: a tuple with the status of the operation (True/False), and
      the error message if status is False

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    _Fail("Cannot find block device %s", disk)

  try:
    r_dev.Grow(amount, dryrun, backingstore)
  except errors.BlockDeviceError, err:
    _Fail("Failed to grow block device: %s", err, exc=True)


def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk ID as (vg, lv)

  """
  if disk.dev_type == constants.LD_DRBD8:
    if not disk.children:
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
            disk.unique_id)
    return BlockdevSnapshot(disk.children[0])
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # FIXME: choose a saner value for the snapshot size
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    else:
      _Fail("Cannot find block device %s", disk)
  else:
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
          disk.unique_id, disk.dev_type)


def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  @rtype: None

  """
  destdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name + ".new")
  finaldestdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, "version", "0")
  config.set(constants.INISECT_EXP, "timestamp", "%d" % int(time.time()))
  config.set(constants.INISECT_EXP, "source", instance.primary_node)
  config.set(constants.INISECT_EXP, "os", instance.os)
  config.set(constants.INISECT_EXP, "compression", "none")

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, "name", instance.name)
  config.set(constants.INISECT_INS, "maxmem", "%d" %
             instance.beparams[constants.BE_MAXMEM])
  config.set(constants.INISECT_INS, "minmem", "%d" %
             instance.beparams[constants.BE_MINMEM])
  # "memory" is deprecated, but useful for exporting to old ganeti versions
  config.set(constants.INISECT_INS, "memory", "%d" %
             instance.beparams[constants.BE_MAXMEM])
  config.set(constants.INISECT_INS, "vcpus", "%d" %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, "disk_template", instance.disk_template)
  config.set(constants.INISECT_INS, "hypervisor", instance.hypervisor)
  config.set(constants.INISECT_INS, "tags", " ".join(instance.GetTags()))

  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, "nic%d_mac" %
               nic_count, "%s" % nic.mac)
    config.set(constants.INISECT_INS, "nic%d_ip" % nic_count, "%s" % nic.ip)
    for param in constants.NICS_PARAMETER_TYPES:
      config.set(constants.INISECT_INS, "nic%d_%s" % (nic_count, param),
                 "%s" % nic.nicparams.get(param, None))
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, "nic_count", "%d" % nic_total)

  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if disk:
      disk_total += 1
      config.set(constants.INISECT_INS, "disk%d_ivname" % disk_count,
                 ("%s" % disk.iv_name))
      config.set(constants.INISECT_INS, "disk%d_dump" % disk_count,
                 ("%s" % disk.physical_id[1]))
      config.set(constants.INISECT_INS, "disk%d_size" % disk_count,
                 ("%d" % disk.size))

  config.set(constants.INISECT_INS, "disk_count", "%d" % disk_total)

  # New-style hypervisor/backend parameters

  config.add_section(constants.INISECT_HYP)
  for name, value in instance.hvparams.items():
    if name not in constants.HVC_GLOBALS:
      config.set(constants.INISECT_HYP, name, str(value))

  config.add_section(constants.INISECT_BEP)
  for name, value in instance.beparams.items():
    config.set(constants.INISECT_BEP, name, str(value))

  config.add_section(constants.INISECT_OSP)
  for name, value in instance.osparams.items():
    config.set(constants.INISECT_OSP, name, str(value))

  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  shutil.rmtree(finaldestdir, ignore_errors=True)
  shutil.move(destdir, finaldestdir)
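
# Illustrative sketch (hypothetical values; the INISECT_* section names are
# assumed to be the usual "export"/"instance" strings): the generated export
# config file has roughly the shape
#   [export]
#   version = 0
#   os = debootstrap
#   [instance]
#   name = inst1.example.com
#   disk0_ivname = disk/0
#   disk_count = 1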


def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: string
  @return: a serialized form of the config file containing the
      export info

  """
  cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    _Fail("Export info file doesn't have the required fields")

  return config.Dumps()


def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports

  """
  if os.path.isdir(pathutils.EXPORT_DIR):
    return sorted(utils.ListVisibleFiles(pathutils.EXPORT_DIR))
  else:
    _Fail("No exports directory")


def RemoveExport(export):
  """Remove an existing export from the node.

  @type export: str
  @param export: the name of the export to remove
  @rtype: None

  """
  target = utils.PathJoin(pathutils.EXPORT_DIR, export)

  try:
    shutil.rmtree(target)
  except EnvironmentError, err:
    _Fail("Error while removing the export: %s", err, exc=True)


def BlockdevRename(devlist):
  """Rename a list of block devices.

  @type devlist: list of tuples
  @param devlist: list of tuples of the form (disk,
      new_logical_id, new_physical_id); disk is an
      L{objects.Disk} object describing the current disk,
      and new logical_id/physical_id is the name we
      rename it to
  @rtype: boolean
  @return: True if all renames succeeded, False otherwise

  """
  msgs = []
  result = True
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      msgs.append("Can't find device %s in rename" % str(disk))
      result = False
      continue
    try:
      old_rpath = dev.dev_path
      dev.Rename(unique_id)
      new_rpath = dev.dev_path
      if old_rpath != new_rpath:
        DevCacheManager.RemoveCache(old_rpath)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError, err:
      msgs.append("Can't rename device '%s' to '%s': %s" %
                  (dev, unique_id, err))
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
      result = False
  if not result:
    _Fail("; ".join(msgs))


def _TransformFileStorageDir(fs_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given fs_dir is within the cluster-wide default
  file_storage_dir or the shared_file_storage_dir, which are stored in
  SimpleStore. Only paths under those directories are allowed.

  @type fs_dir: str
  @param fs_dir: the path to check

  @return: the normalized path if valid, None otherwise

  """
  if not constants.ENABLE_FILE_STORAGE:
    _Fail("File storage disabled at configure time")
  cfg = _GetConfig()
  fs_dir = os.path.normpath(fs_dir)
  base_fstore = cfg.GetFileStorageDir()
  base_shared = cfg.GetSharedFileStorageDir()
  if not (utils.IsBelowDir(base_fstore, fs_dir) or
          utils.IsBelowDir(base_shared, fs_dir)):
    _Fail("File storage directory '%s' is not under base file"
          " storage directory '%s' or shared storage directory '%s'",
          fs_dir, base_fstore, base_shared)
  return fs_dir
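
# Illustrative note (hypothetical paths): with a configured file storage
# directory of /srv/ganeti/file-storage, a request for
# "/srv/ganeti/file-storage/inst1" is returned normalized, while
# "/tmp/inst1" makes the RPC fail.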


def CreateFileStorageDir(file_storage_dir):
  """Create file storage directory.

  @type file_storage_dir: str
  @param file_storage_dir: directory to create

  @rtype: tuple
  @return: tuple with first element a boolean indicating whether dir
      creation was successful or not

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified storage dir '%s' is not a directory",
            file_storage_dir)
  else:
    try:
      os.makedirs(file_storage_dir, 0750)
    except OSError, err:
      _Fail("Cannot create file storage directory '%s': %s",
            file_storage_dir, err, exc=True)


def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.

  Remove it only if it's empty. If not, log an error and return.

  @type file_storage_dir: str
  @param file_storage_dir: the directory we should cleanup
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified storage directory '%s' is not a directory",
            file_storage_dir)
    # deletes dir only if empty, otherwise we want to fail the rpc call
    try:
      os.rmdir(file_storage_dir)
    except OSError, err:
      _Fail("Cannot remove file storage directory '%s': %s",
            file_storage_dir, err)


def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  @type old_file_storage_dir: str
  @param old_file_storage_dir: the current path
  @type new_file_storage_dir: str
  @param new_file_storage_dir: the name we should rename to
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  if not os.path.exists(new_file_storage_dir):
    if os.path.isdir(old_file_storage_dir):
      try:
        os.rename(old_file_storage_dir, new_file_storage_dir)
      except OSError, err:
        _Fail("Cannot rename '%s' to '%s': %s",
              old_file_storage_dir, new_file_storage_dir, err)
    else:
      _Fail("Specified storage dir '%s' is not a directory",
            old_file_storage_dir)
  else:
    if os.path.exists(old_file_storage_dir):
      _Fail("Cannot rename '%s' to '%s': both locations exist",
            old_file_storage_dir, new_file_storage_dir)


def _EnsureJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: None
  @raises RPCFail: if the file is not valid

  """
  queue_dir = os.path.normpath(pathutils.QUEUE_DIR)
  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)

  if not result:
    _Fail("Passed job queue file '%s' does not belong to"
          " the queue directory '%s'", file_name, queue_dir)


def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.io.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents
  @rtype: boolean
  @return: the success of the operation

  """
  _EnsureJobQueueFile(file_name)
  getents = runtime.GetEnts()

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
                  gid=getents.masterd_gid)


def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: tuple
  @return: the success of the operation and payload

  """
  _EnsureJobQueueFile(old)
  _EnsureJobQueueFile(new)

  getents = runtime.GetEnts()

  utils.RenameFile(old, new, mkdir=True, mkdir_mode=0700,
                   dir_uid=getents.masterd_uid, dir_gid=getents.masterd_gid)


def BlockdevClose(instance_name, disks):
  """Closes the given block devices.

  This means they will be switched to secondary mode (in case of
  DRBD).

  @param instance_name: if the argument is not empty, the symlinks
      of this instance will be removed
  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to be closed
  @rtype: tuple (success, message)
  @return: a tuple of success and message, where success
      indicates the success of the operation, and message
      which will contain the error details in case we
      failed

  """
  bdevs = []
  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)

  msg = []
  for rd in bdevs:
    try:
      rd.Close()
    except errors.BlockDeviceError, err:
      msg.append(str(err))
  if msg:
    _Fail("Can't make devices secondary: %s", ",".join(msg))
  else:
    if instance_name:
      _RemoveBlockDevLinks(instance_name, disks)


def ValidateHVParams(hvname, hvparams):
  """Validates the given hypervisor parameters.

  @type hvname: string
  @param hvname: the hypervisor name
  @type hvparams: dict
  @param hvparams: the hypervisor parameters to be validated
  @rtype: None

  """
  try:
    hv_type = hypervisor.GetHypervisor(hvname)
    hv_type.ValidateParameters(hvparams)
  except errors.HypervisorError, err:
    _Fail(str(err), log=False)


def _CheckOSPList(os_obj, parameters):
  """Check whether a list of parameters is supported by the OS.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type parameters: list
  @param parameters: the list of parameters to check

  """
  supported = [v[0] for v in os_obj.supported_parameters]
  delta = frozenset(parameters).difference(supported)
  if delta:
    _Fail("The following parameters are not supported"
          " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))


def ValidateOS(required, osname, checks, osparams):
  """Validate the given OS' parameters.

  @type required: boolean
  @param required: whether absence of the OS should translate into
      failure or not
  @type osname: string
  @param osname: the OS to be validated
  @type checks: list
  @param checks: list of the checks to run (currently only 'parameters')
  @type osparams: dict
  @param osparams: dictionary with OS parameters
  @rtype: boolean
  @return: True if the validation passed, or False if the OS was not
      found and L{required} was false

  """
  if not constants.OS_VALIDATE_CALLS.issuperset(checks):
    _Fail("Unknown checks required for OS %s: %s", osname,
          set(checks).difference(constants.OS_VALIDATE_CALLS))

  name_only = objects.OS.GetName(osname)
  status, tbv = _TryOSFromDisk(name_only, None)

  if not status:
    if required:
      _Fail(tbv)
    else:
      return False

  if max(tbv.api_versions) < constants.OS_API_V20:
    return True

  if constants.OS_VALIDATE_PARAMETERS in checks:
    _CheckOSPList(tbv, osparams.keys())

  validate_env = OSCoreEnv(osname, tbv, osparams)
  result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
                        cwd=tbv.path, reset_env=True)
  if result.failed:
    logging.error("os validate command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    _Fail("OS validation script failed (%s), output: %s",
          result.fail_reason, result.output, log=False)

  return True


def DemoteFromMC():
  """Demotes the current node from master candidate role.

  """
  # try to ensure we're not the master by mistake
  master, myself = ssconf.GetMasterAndMyself()
  if master == myself:
    _Fail("ssconf status shows I'm the master node, will not demote")

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "check", constants.MASTERD])
  if not result.failed:
    _Fail("The master daemon is running, will not demote")

  try:
    if os.path.isfile(pathutils.CLUSTER_CONF_FILE):
      utils.CreateBackup(pathutils.CLUSTER_CONF_FILE)
  except EnvironmentError, err:
    if err.errno != errno.ENOENT:
      _Fail("Error while backing up cluster file: %s", err, exc=True)

  utils.RemoveFile(pathutils.CLUSTER_CONF_FILE)


def _GetX509Filenames(cryptodir, name):
  """Returns the full paths for the private key and certificate.

  """
  return (utils.PathJoin(cryptodir, name),
          utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
          utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
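
# Illustrative note (hypothetical arguments): given the _X509_KEY_FILE and
# _X509_CERT_FILE constants defined at the top of this module,
#   _GetX509Filenames("/var/lib/ganeti/crypto", "abc123")
# returns ("/var/lib/ganeti/crypto/abc123",
#          "/var/lib/ganeti/crypto/abc123/key",
#          "/var/lib/ganeti/crypto/abc123/cert").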


def CreateX509Certificate(validity, cryptodir=pathutils.CRYPTO_KEYS_DIR):
  """Creates a new X509 certificate for SSL/TLS.

  @type validity: int
  @param validity: Validity in seconds
  @rtype: tuple; (string, string)
  @return: Certificate name and public part

  """
  (key_pem, cert_pem) = \
    utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
                                     min(validity, _MAX_SSL_CERT_VALIDITY))

  cert_dir = tempfile.mkdtemp(dir=cryptodir,
                              prefix="x509-%s-" % utils.TimestampForFilename())
  try:
    name = os.path.basename(cert_dir)
    assert len(name) > 5

    (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

    utils.WriteFile(key_file, mode=0400, data=key_pem)
    utils.WriteFile(cert_file, mode=0400, data=cert_pem)

    # Never return private key as it shouldn't leave the node
    return (name, cert_pem)
  except Exception:
    shutil.rmtree(cert_dir, ignore_errors=True)
    raise


def RemoveX509Certificate(name, cryptodir=pathutils.CRYPTO_KEYS_DIR):
  """Removes an X509 certificate.

  @type name: string
  @param name: Certificate name

  """
  (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

  utils.RemoveFile(key_file)
  utils.RemoveFile(cert_file)

  try:
    os.rmdir(cert_dir)
  except EnvironmentError, err:
    _Fail("Cannot remove certificate directory '%s': %s",
          cert_dir, err)


def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
  """Returns the command for the requested input/output.

  @type instance: L{objects.Instance}
  @param instance: The instance object
  @param mode: Import/export mode
  @param ieio: Input/output type
  @param ieargs: Input/output arguments

  """
  assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)

  env = None
  prefix = None
  suffix = None
  exp_size = None

  if ieio == constants.IEIO_FILE:
    (filename, ) = ieargs

    if not utils.IsNormAbsPath(filename):
      _Fail("Path '%s' is not normalized or absolute", filename)

    real_filename = os.path.realpath(filename)
    directory = os.path.dirname(real_filename)

    if not utils.IsBelowDir(pathutils.EXPORT_DIR, real_filename):
      _Fail("File '%s' is not under exports directory '%s': %s",
            filename, pathutils.EXPORT_DIR, real_filename)

    # Create directory
    utils.Makedirs(directory, mode=0750)

    quoted_filename = utils.ShellQuote(filename)

    if mode == constants.IEM_IMPORT:
      suffix = "> %s" % quoted_filename
    elif mode == constants.IEM_EXPORT:
      suffix = "< %s" % quoted_filename

      # Retrieve file size
      try:
        st = os.stat(filename)
      except EnvironmentError, err:
        logging.error("Can't stat(2) %s: %s", filename, err)
      else:
        exp_size = utils.BytesToMebibyte(st.st_size)

  elif ieio == constants.IEIO_RAW_DISK:
    (disk, ) = ieargs

    real_disk = _OpenRealBD(disk)

    if mode == constants.IEM_IMPORT:
      # we set here a smaller block size as, due to transport buffering, more
      # than 64-128k will mostly be ignored; we use nocreat to fail if the
      # device is not already there or we pass a wrong path; we use notrunc to
      # not attempt to truncate an LV device; we use oflag=dsync to not buffer
      # too much memory; this means that at best, we flush every 64k, which
      # will not be very fast
      suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
                                    " bs=%s oflag=dsync"),
                                    real_disk.dev_path,
                                    str(64 * 1024))

    elif mode == constants.IEM_EXPORT:
      # the block size on the read dd is 1MiB to match our units
      prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
                                   real_disk.dev_path,
                                   str(1024 * 1024), # 1 MiB
                                   str(disk.size))
      exp_size = disk.size

  elif ieio == constants.IEIO_SCRIPT:
    (disk, disk_index, ) = ieargs

    assert isinstance(disk_index, (int, long))

    real_disk = _OpenRealBD(disk)

    inst_os = OSFromDisk(instance.os)
    env = OSEnvironment(instance, inst_os)

    if mode == constants.IEM_IMPORT:
      env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
      env["IMPORT_INDEX"] = str(disk_index)
      script = inst_os.import_script

    elif mode == constants.IEM_EXPORT:
      env["EXPORT_DEVICE"] = real_disk.dev_path
      env["EXPORT_INDEX"] = str(disk_index)
      script = inst_os.export_script

    # TODO: Pass special environment only to script
    script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)

    if mode == constants.IEM_IMPORT:
      suffix = "| %s" % script_cmd

    elif mode == constants.IEM_EXPORT:
      prefix = "%s |" % script_cmd

    # Let script predict size
    exp_size = constants.IE_CUSTOM_SIZE

  else:
    _Fail("Invalid %s I/O mode %r", mode, ieio)

  return (env, prefix, suffix, exp_size)
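
# Illustration (hypothetical device path and size): exporting a 1024 MiB disk
# via IEIO_RAW_DISK would return roughly
#
#   (None, "dd if=/dev/drbd0 bs=1048576 count=1024 |", None, 1024)
#
# i.e. no extra environment, a dd read pipeline as prefix, no suffix, and the
# expected transfer size in mebibytes. StartImportExportDaemon below passes
# prefix/suffix to the daemon via --cmd-prefix/--cmd-suffix.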


def _CreateImportExportStatusDir(prefix):
  """Creates status directory for import/export.

  """
  return tempfile.mkdtemp(dir=pathutils.IMPORT_EXPORT_DIR,
                          prefix=("%s-%s-" %
                                  (prefix, utils.TimestampForFilename())))


def StartImportExportDaemon(mode, opts, host, port, instance, component,
                            ieio, ieioargs):
  """Starts an import or export daemon.

  @param mode: Import/export mode
  @type opts: L{objects.ImportExportOptions}
  @param opts: Daemon options
  @type host: string
  @param host: Remote host for export (None for import)
  @type port: int
  @param port: Remote port for export (None for import)
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type component: string
  @param component: which part of the instance is transferred now,
      e.g. 'disk/0'
  @param ieio: Input/output type
  @param ieioargs: Input/output arguments

  """
  if mode == constants.IEM_IMPORT:
    prefix = "import"

    if not (host is None and port is None):
      _Fail("Can not specify host or port on import")

  elif mode == constants.IEM_EXPORT:
    prefix = "export"

    if host is None or port is None:
      _Fail("Host and port must be specified for an export")

  else:
    _Fail("Invalid mode %r", mode)

  if (opts.key_name is None) ^ (opts.ca_pem is None):
    _Fail("Cluster certificate can only be used for both key and CA")

  (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
    _GetImportExportIoCommand(instance, mode, ieio, ieioargs)

  if opts.key_name is None:
    # Use server.pem
    key_path = pathutils.NODED_CERT_FILE
    cert_path = pathutils.NODED_CERT_FILE
    assert opts.ca_pem is None
  else:
    (_, key_path, cert_path) = _GetX509Filenames(pathutils.CRYPTO_KEYS_DIR,
                                                 opts.key_name)
    assert opts.ca_pem is not None

  for i in [key_path, cert_path]:
    if not os.path.exists(i):
      _Fail("File '%s' does not exist" % i)

  status_dir = _CreateImportExportStatusDir("%s-%s" % (prefix, component))
  try:
    status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
    pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
    ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)

    if opts.ca_pem is None:
      # Use server.pem
      ca = utils.ReadFile(pathutils.NODED_CERT_FILE)
    else:
      ca = opts.ca_pem

    # Write CA file
    utils.WriteFile(ca_file, data=ca, mode=0400)

    cmd = [
      pathutils.IMPORT_EXPORT_DAEMON,
      status_file, mode,
      "--key=%s" % key_path,
      "--cert=%s" % cert_path,
      "--ca=%s" % ca_file,
      ]

    if host:
      cmd.append("--host=%s" % host)

    if port:
      cmd.append("--port=%s" % port)

    if opts.ipv6:
      cmd.append("--ipv6")
    else:
      cmd.append("--ipv4")

    if opts.compress:
      cmd.append("--compress=%s" % opts.compress)

    if opts.magic:
      cmd.append("--magic=%s" % opts.magic)

    if exp_size is not None:
      cmd.append("--expected-size=%s" % exp_size)

    if cmd_prefix:
      cmd.append("--cmd-prefix=%s" % cmd_prefix)

    if cmd_suffix:
      cmd.append("--cmd-suffix=%s" % cmd_suffix)

    if mode == constants.IEM_EXPORT:
      # Retry connection a few times when connecting to remote peer
      cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
      cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
    elif opts.connect_timeout is not None:
      assert mode == constants.IEM_IMPORT
      # Overall timeout for establishing connection while listening
      cmd.append("--connect-timeout=%s" % opts.connect_timeout)

    logfile = _InstanceLogName(prefix, instance.os, instance.name, component)

    # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
    # support for receiving a file descriptor for output
    utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
                      output=logfile)

    # The import/export name is simply the status directory name
    return os.path.basename(status_dir)

  except Exception:
    shutil.rmtree(status_dir, ignore_errors=True)
    raise
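
# Sketch of the resulting daemon invocation (hypothetical values; the exact
# flags are the ones appended to cmd above):
#
#   <IMPORT_EXPORT_DAEMON> <status_dir>/status export \
#       --key=... --cert=... --ca=<status_dir>/ca --host=... --port=... \
#       --ipv4 --cmd-prefix='dd if=... |'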


def GetImportExportStatus(names):
  """Returns import/export daemon status.

  @type names: sequence
  @param names: List of names
  @rtype: List of dicts
  @return: Returns a list of the state of each named import/export or None if a
           status couldn't be read

  """
  result = []

  for name in names:
    status_file = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name,
                                 _IES_STATUS_FILE)

    try:
      data = utils.ReadFile(status_file)
    except EnvironmentError, err:
      if err.errno != errno.ENOENT:
        raise
      data = None

    if not data:
      result.append(None)
      continue

    result.append(serializer.LoadJson(data))

  return result


def AbortImportExport(name):
  """Sends SIGTERM to a running import/export daemon.

  """
  logging.info("Abort import/export %s", name)

  status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)
  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is running with PID %s, sending SIGTERM",
                 name, pid)
    utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)


def CleanupImportExport(name):
  """Cleanup after an import or export.

  If the import/export daemon is still running it's killed. Afterwards the
  whole status directory is removed.

  """
  logging.info("Finalizing import/export %s", name)

  status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)

  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is still running with PID %s",
                 name, pid)
    utils.KillProcess(pid, waitpid=False)

  shutil.rmtree(status_dir, ignore_errors=True)
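
# Typical lifecycle (illustrative): StartImportExportDaemon() returns a name,
# the master polls it with GetImportExportStatus([name]), may stop it early
# with AbortImportExport(name), and finally calls CleanupImportExport(name)
# to kill any leftover process and remove the status directory.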


def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  """
  # set the correct physical ID
  my_name = netutils.Hostname.GetSysName()
  for cf in disks:
    cf.SetPhysicalID(my_name, nodes_ip)

  bdevs = []

  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)
  return bdevs


def DrbdDisconnectNet(nodes_ip, disks):
  """Disconnects the network on a list of drbd devices.

  """
  bdevs = _FindDisks(nodes_ip, disks)

  # disconnect disks
  for rd in bdevs:
    try:
      rd.DisconnectNet()
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration to standalone mode: %s",
            err, exc=True)


def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
  """Attaches the network on a list of drbd devices.

  """
  bdevs = _FindDisks(nodes_ip, disks)

  if multimaster:
    for idx, rd in enumerate(bdevs):
      try:
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
      except EnvironmentError, err:
        _Fail("Can't create symlink: %s", err)
  # reconnect disks, switch to the new master configuration and, if
  # needed, to primary mode
  for rd in bdevs:
    try:
      rd.AttachNet(multimaster)
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration: %s", err)

  # wait until the disks are connected; we need to retry the re-attach
  # if the device becomes standalone, as this might happen if one node
  # disconnects and reconnects in a different mode before the other
  # node reconnects; in this case, one or both of the nodes will
  # decide it has the wrong configuration and switch to standalone

  def _Attach():
    all_connected = True

    for rd in bdevs:
      stats = rd.GetProcStatus()

      all_connected = (all_connected and
                       (stats.is_connected or stats.is_in_resync))

      if stats.is_standalone:
        # peer had different config info and this node became
        # standalone, even though this should not happen with the
        # new staged way of changing disk configs
        try:
          rd.AttachNet(multimaster)
        except errors.BlockDeviceError, err:
          _Fail("Can't change network configuration: %s", err)

    if not all_connected:
      raise utils.RetryAgain()

  try:
    # Start with a delay of 100 milliseconds and go up to 5 seconds
    utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
  except utils.RetryTimeout:
    _Fail("Timeout in disk reconnecting")

  if multimaster:
    # change to primary mode
    for rd in bdevs:
      try:
        rd.Open()
      except errors.BlockDeviceError, err:
        _Fail("Can't change to primary mode: %s", err)


def DrbdWaitSync(nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  """
  def _helper(rd):
    stats = rd.GetProcStatus()
    if not (stats.is_connected or stats.is_in_resync):
      raise utils.RetryAgain()
    return stats

  bdevs = _FindDisks(nodes_ip, disks)

  min_resync = 100
  alldone = True
  for rd in bdevs:
    try:
      # poll each second for 15 seconds
      stats = utils.Retry(_helper, 1, 15, args=[rd])
    except utils.RetryTimeout:
      stats = rd.GetProcStatus()
      # last check
      if not (stats.is_connected or stats.is_in_resync):
        _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
    alldone = alldone and (not stats.is_in_resync)
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)

  return (alldone, min_resync)
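
# Reading the result (illustrative): (False, 42.1) means at least one device
# is still resyncing, with the least-synchronized one at 42.1%; (True, 100)
# means every device is connected and fully synchronized.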


def GetDrbdUsermodeHelper():
  """Returns the DRBD usermode helper currently configured.

  """
  try:
    return bdev.BaseDRBD.GetUsermodeHelper()
  except errors.BlockDeviceError, err:
    _Fail(str(err))


def PowercycleNode(hypervisor_type):
  """Hard-powercycle the node.

  Because we need to return first, and schedule the powercycle in the
  background, we won't be able to report failures nicely.

  """
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  try:
    pid = os.fork()
  except OSError:
    # if we can't fork, we'll pretend that we're in the child process
    pid = 0
  if pid > 0:
    return "Reboot scheduled in 5 seconds"
  # ensure the child is running in RAM
  try:
    utils.Mlockall()
  except Exception: # pylint: disable=W0703
    pass
  time.sleep(5)
  hyper.PowercycleNode()


class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not
  on the master side.

  """
  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        L{pathutils.HOOKS_BASE_DIR} (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = pathutils.HOOKS_BASE_DIR
    # yeah, _BASE_DIR is not valid for attributes, we use it like a
    # constant
    self._BASE_DIR = hooks_base_dir # pylint: disable=C0103

  def RunLocalHooks(self, node_list, hpath, phase, env):
    """Check that the hooks will be run only locally and then run them.

    """
    assert len(node_list) == 1
    node = node_list[0]
    _, myself = ssconf.GetMasterAndMyself()
    assert node == myself

    results = self.RunHooks(hpath, phase, env)

    # Return values in the form expected by HooksMaster
    return {node: (None, False, results)}

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    @type hpath: str
    @param hpath: the path to the hooks directory which
        holds the scripts
    @type phase: str
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @type env: dict
    @param env: dictionary with the environment for the hook
    @rtype: list
    @return: list of 3-element tuples:
      - script path
      - script result, either L{constants.HKR_SUCCESS} or
        L{constants.HKR_FAIL}
      - output of the script

    @raise errors.ProgrammerError: for invalid input
        parameters

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      _Fail("Unknown hooks phase '%s'", phase)

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = utils.PathJoin(self._BASE_DIR, subdir)

    results = []

    if not os.path.isdir(dir_name):
      # for non-existing/non-dirs, we simply exit instead of logging a
      # warning at every operation
      return results

    runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)

    for (relname, relstatus, runresult) in runparts_results:
      if relstatus == constants.RUNPARTS_SKIP:
        rrval = constants.HKR_SKIP
        output = ""
      elif relstatus == constants.RUNPARTS_ERR:
        rrval = constants.HKR_FAIL
        output = "Hook script execution error: %s" % runresult
      elif relstatus == constants.RUNPARTS_RUN:
        if runresult.failed:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
        output = utils.SafeEncode(runresult.output.strip())
      results.append(("%s/%s" % (subdir, relname), rrval, output))

    return results
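
# Example (illustrative): for hpath "instance-start" and the PRE phase,
# RunHooks() runs every script in "<hooks_base_dir>/instance-start-pre.d/"
# through utils.RunParts and returns tuples such as
# ("instance-start-pre.d/01_example", constants.HKR_SUCCESS, "ok").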


class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  @staticmethod
  def Run(name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data

    @rtype: str
    @return: the stdout of the allocator script (failures are reported
        via L{_Fail})

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      _Fail("iallocator module '%s' not found in the search path", name)

    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        _Fail("iallocator module '%s' failed: %s, output '%s'",
              name, result.fail_reason, result.output)
    finally:
      os.unlink(fin_name)

    return result.stdout
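
# Example (illustrative; "hail" is the standard htools allocator, the request
# object is hypothetical):
#
#   stdout = IAllocatorRunner.Run("hail", serializer.DumpJson(request))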


class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  """
  _DEV_PREFIX = "/dev/"
  _ROOT_DIR = pathutils.BDEV_CACHE_DIR

  @classmethod
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    @type dev_path: str
    @param dev_path: the C{/dev/} path name
    @rtype: str
    @return: the converted path name

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
    return fpath
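
  # Example (illustrative): _ConvertPath("/dev/md/0") yields
  # "<BDEV_CACHE_DIR>/bdev_md_0".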

  @classmethod
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    @type dev_path: str
    @param dev_path: the pathname of the device
    @type owner: str
    @param owner: the owner (instance name) of the device
    @type on_primary: bool
    @param on_primary: whether this is the primary
        node or not
    @type iv_name: str
    @param iv_name: the instance-visible name of the
        device, as in objects.Disk.iv_name

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    if on_primary:
      state = "primary"
    else:
      state = "secondary"
    if iv_name is None:
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    try:
      utils.WriteFile(fpath, data=fdata)
    except EnvironmentError, err:
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)

  @classmethod
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    This is just a wrapper over L{utils.io.RemoveFile} with a converted
    path name and logging.

    @type dev_path: str
    @param dev_path: the pathname of the device

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    try:
      utils.RemoveFile(fpath)
    except EnvironmentError, err:
      logging.exception("Can't remove bdev cache for %s: %s", dev_path, err)