
root / lib / backend.py @ 5a8648eb


1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Functions used by the node daemon
23

24
@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25
     the L{UploadFile} function
26
@var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27
     in the L{_CleanDirectory} function
28

29
"""
30

    
31
# pylint: disable=E1103
32

    
33
# E1103: %s %r has no %r member (but some types could not be
34
# inferred), because the _TryOSFromDisk returns either (True, os_obj)
35
# or (False, "string") which confuses pylint
36

    
37

    
38
import os
39
import os.path
40
import shutil
41
import time
42
import stat
43
import errno
44
import re
45
import random
46
import logging
47
import tempfile
48
import zlib
49
import base64
50
import signal
51

    
52
from ganeti import errors
53
from ganeti import utils
54
from ganeti import ssh
55
from ganeti import hypervisor
56
from ganeti import constants
57
from ganeti import bdev
58
from ganeti import objects
59
from ganeti import ssconf
60
from ganeti import serializer
61
from ganeti import netutils
62
from ganeti import runtime
63

    
64

    
65
_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
66
_ALLOWED_CLEAN_DIRS = frozenset([
67
  constants.DATA_DIR,
68
  constants.JOB_QUEUE_ARCHIVE_DIR,
69
  constants.QUEUE_DIR,
70
  constants.CRYPTO_KEYS_DIR,
71
  ])
72
_MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
73
_X509_KEY_FILE = "key"
74
_X509_CERT_FILE = "cert"
75
_IES_STATUS_FILE = "status"
76
_IES_PID_FILE = "pid"
77
_IES_CA_FILE = "ca"
78

    
79
#: Valid LVS output line regex
80
_LVSLINE_REGEX = re.compile(r"^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
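
# Example (illustrative): with the "lvs" invocation used by GetVolumeList
# below ("-ovg_name,lv_name,lv_size,lv_attr", separator "|", no suffix), a
# typical output line and its capture groups would look like:
#
#   "  xenvg|test1|20.06|-wi-ao"
#   _LVSLINE_REGEX.match(...).groups() == ("xenvg", "test1", "20.06", "-wi-ao")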
81

    
82

    
83
class RPCFail(Exception):
84
  """Class denoting RPC failure.
85

86
  Its argument is the error message.
87

88
  """
89

    
90

    
91
def _Fail(msg, *args, **kwargs):
92
  """Log an error and the raise an RPCFail exception.
93

94
  This exception is then handled specially in the ganeti daemon and
95
  turned into a 'failed' return type. As such, this function is a
96
  useful shortcut for logging the error and returning it to the master
97
  daemon.
98

99
  @type msg: string
100
  @param msg: the text of the exception
101
  @raise RPCFail: always, with the interpolated message
102

103
  """
104
  if args:
105
    msg = msg % args
106
  if "log" not in kwargs or kwargs["log"]: # if we should log this error
107
    if "exc" in kwargs and kwargs["exc"]:
108
      logging.exception(msg)
109
    else:
110
      logging.error(msg)
111
  raise RPCFail(msg)
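
# Example (illustrative; node_name and err are whatever the caller has at
# hand): a call like
#
#   _Fail("Can't contact node %s: %s", node_name, err, exc=True)
#
# logs the interpolated message together with the current traceback and then
# raises RPCFail("Can't contact node ...: ..."), which the node daemon turns
# into a 'failed' RPC result for the master.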
112

    
113

    
114
def _GetConfig():
115
  """Simple wrapper to return a SimpleStore.
116

117
  @rtype: L{ssconf.SimpleStore}
118
  @return: a SimpleStore instance
119

120
  """
121
  return ssconf.SimpleStore()
122

    
123

    
124
def _GetSshRunner(cluster_name):
125
  """Simple wrapper to return an SshRunner.
126

127
  @type cluster_name: str
128
  @param cluster_name: the cluster name, which is needed
129
      by the SshRunner constructor
130
  @rtype: L{ssh.SshRunner}
131
  @return: an SshRunner instance
132

133
  """
134
  return ssh.SshRunner(cluster_name)
135

    
136

    
137
def _Decompress(data):
138
  """Unpacks data compressed by the RPC client.
139

140
  @type data: list or tuple
141
  @param data: Data sent by RPC client
142
  @rtype: str
143
  @return: Decompressed data
144

145
  """
146
  assert isinstance(data, (list, tuple))
147
  assert len(data) == 2
148
  (encoding, content) = data
149
  if encoding == constants.RPC_ENCODING_NONE:
150
    return content
151
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
152
    return zlib.decompress(base64.b64decode(content))
153
  else:
154
    raise AssertionError("Unknown data encoding")
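
# Example (illustrative, payload string made up): the two encodings accepted
# above round-trip like this:
#
#   _Decompress((constants.RPC_ENCODING_NONE, "some config data"))
#       => "some config data"
#   _Decompress((constants.RPC_ENCODING_ZLIB_BASE64,
#                base64.b64encode(zlib.compress("some config data"))))
#       => "some config data"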
155

    
156

    
157
def _CleanDirectory(path, exclude=None):
158
  """Removes all regular files in a directory.
159

160
  @type path: str
161
  @param path: the directory to clean
162
  @type exclude: list
163
  @param exclude: list of files to be excluded, defaults
164
      to the empty list
165

166
  """
167
  if path not in _ALLOWED_CLEAN_DIRS:
168
    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
169
          path)
170

    
171
  if not os.path.isdir(path):
172
    return
173
  if exclude is None:
174
    exclude = []
175
  else:
176
    # Normalize excluded paths
177
    exclude = [os.path.normpath(i) for i in exclude]
178

    
179
  for rel_name in utils.ListVisibleFiles(path):
180
    full_name = utils.PathJoin(path, rel_name)
181
    if full_name in exclude:
182
      continue
183
    if os.path.isfile(full_name) and not os.path.islink(full_name):
184
      utils.RemoveFile(full_name)
185

    
186

    
187
def _BuildUploadFileList():
188
  """Build the list of allowed upload files.
189

190
  This is abstracted so that it's built only once at module import time.
191

192
  """
193
  allowed_files = set([
194
    constants.CLUSTER_CONF_FILE,
195
    constants.ETC_HOSTS,
196
    constants.SSH_KNOWN_HOSTS_FILE,
197
    constants.VNC_PASSWORD_FILE,
198
    constants.RAPI_CERT_FILE,
199
    constants.SPICE_CERT_FILE,
200
    constants.SPICE_CACERT_FILE,
201
    constants.RAPI_USERS_FILE,
202
    constants.CONFD_HMAC_KEY,
203
    constants.CLUSTER_DOMAIN_SECRET_FILE,
204
    ])
205

    
206
  for hv_name in constants.HYPER_TYPES:
207
    hv_class = hypervisor.GetHypervisorClass(hv_name)
208
    allowed_files.update(hv_class.GetAncillaryFiles())
209

    
210
  return frozenset(allowed_files)
211

    
212

    
213
_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
214

    
215

    
216
def JobQueuePurge():
217
  """Removes job queue files and archived jobs.
218

219
  @rtype: tuple
220
  @return: True, None
221

222
  """
223
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
224
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
225

    
226

    
227
def GetMasterInfo():
228
  """Returns master information.
229

230
  This is a utility function to compute master information, either
231
  for consumption here or from the node daemon.
232

233
  @rtype: tuple
234
  @return: master_netdev, master_ip, master_netmask, master_name,
235
    primary_ip_family
236
  @raise RPCFail: in case of errors
237

238
  """
239
  try:
240
    cfg = _GetConfig()
241
    master_netdev = cfg.GetMasterNetdev()
242
    master_ip = cfg.GetMasterIP()
243
    master_netmask = cfg.GetMasterNetmask()
244
    master_node = cfg.GetMasterNode()
245
    primary_ip_family = cfg.GetPrimaryIPFamily()
246
  except errors.ConfigurationError, err:
247
    _Fail("Cluster configuration incomplete: %s", err, exc=True)
248
  return (master_netdev, master_ip, master_netmask, master_node,
249
          primary_ip_family)
250

    
251

    
252
def ActivateMasterIp():
253
  """Activate the IP address of the master daemon.
254

255
  """
256
  # GetMasterInfo will raise an exception if not able to return data
257
  master_netdev, master_ip, master_netmask, _, family = GetMasterInfo()
258

    
259
  err_msg = None
260
  if netutils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
261
    if netutils.IPAddress.Own(master_ip):
262
      # we already have the ip:
263
      logging.debug("Master IP already configured, doing nothing")
264
    else:
265
      err_msg = "Someone else has the master ip, not activating"
266
      logging.error(err_msg)
267
  else:
268
    ipcls = netutils.IP4Address
269
    if family == netutils.IP6Address.family:
270
      ipcls = netutils.IP6Address
271

    
272
    result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
273
                           "%s/%s" % (master_ip, master_netmask),
274
                           "dev", master_netdev, "label",
275
                           "%s:0" % master_netdev])
276
    if result.failed:
277
      err_msg = "Can't activate master IP: %s" % result.output
278
      logging.error(err_msg)
279

    
280
    # we ignore the exit code of the following cmds
281
    if ipcls == netutils.IP4Address:
282
      utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev, "-s",
283
                    master_ip, master_ip])
284
    elif ipcls == netutils.IP6Address:
285
      try:
286
        utils.RunCmd(["ndisc6", "-q", "-r 3", master_ip, master_netdev])
287
      except errors.OpExecError:
288
        # TODO: Better error reporting
289
        logging.warning("Can't execute ndisc6, please install if missing")
290

    
291
  if err_msg:
292
    _Fail(err_msg)
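
# Example (illustrative, with made-up addresses): for master_ip 192.0.2.10,
# netmask 24 and netdev eth0, the code above ends up running roughly:
#
#   ip address add 192.0.2.10/24 dev eth0 label eth0:0
#   arping -q -U -c 3 -I eth0 -s 192.0.2.10 192.0.2.10   # IPv4, best effort
#
# (for IPv6 clusters, ndisc6 is used instead of arping).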
293

    
294

    
295
def StartMasterDaemons(no_voting):
296
  """Activate local node as master node.
297

298
  The function will start the master daemons (ganeti-masterd and ganeti-rapi).
299

300
  @type no_voting: boolean
301
  @param no_voting: whether to start ganeti-masterd without a node vote
302
      but still non-interactively
303
  @rtype: None
304

305
  """
306

    
307
  if no_voting:
308
    masterd_args = "--no-voting --yes-do-it"
309
  else:
310
    masterd_args = ""
311

    
312
  env = {
313
    "EXTRA_MASTERD_ARGS": masterd_args,
314
    }
315

    
316
  result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
317
  if result.failed:
318
    msg = "Can't start Ganeti master: %s" % result.output
319
    logging.error(msg)
320
    _Fail(msg)
321

    
322

    
323
def DeactivateMasterIp():
324
  """Deactivate the master IP on this node.
325

326
  """
327
  # TODO: log and report back to the caller the error failures; we
328
  # need to decide in which case we fail the RPC for this
329

    
330
  # GetMasterInfo will raise an exception if not able to return data
331
  master_netdev, master_ip, master_netmask, _, _ = GetMasterInfo()
332

    
333
  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
334
                         "%s/%s" % (master_ip, master_netmask),
335
                         "dev", master_netdev])
336
  if result.failed:
337
    logging.error("Can't remove the master IP, error: %s", result.output)
338
    # but otherwise ignore the failure
339

    
340

    
341
def StopMasterDaemons():
342
  """Stop the master daemons on this node.
343

344
  Stop the master daemons (ganeti-masterd and ganeti-rapi) on this node.
345

346
  @rtype: None
347

348
  """
349
  # TODO: log and report back to the caller the error failures; we
350
  # need to decide in which case we fail the RPC for this
351

    
352
  result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
353
  if result.failed:
354
    logging.error("Could not stop Ganeti master, command %s had exitcode %s"
355
                  " and error %s",
356
                  result.cmd, result.exit_code, result.output)
357

    
358

    
359
def ChangeMasterNetmask(netmask):
360
  """Change the netmask of the master IP.
361

362
  """
363
  master_netdev, master_ip, old_netmask, _, _ = GetMasterInfo()
364
  if old_netmask == netmask:
365
    return
366

    
367
  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
368
                         "%s/%s" % (master_ip, netmask),
369
                         "dev", master_netdev, "label",
370
                         "%s:0" % master_netdev])
371
  if result.failed:
372
    _Fail("Could not change the master IP netmask")
373

    
374
  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
375
                         "%s/%s" % (master_ip, old_netmask),
376
                         "dev", master_netdev, "label",
377
                         "%s:0" % master_netdev])
378
  if result.failed:
379
    _Fail("Could not change the master IP netmask")
380

    
381

    
382
def EtcHostsModify(mode, host, ip):
383
  """Modify a host entry in /etc/hosts.
384

385
  @param mode: The mode of operation, either add or remove an entry
386
  @param host: The host to operate on
387
  @param ip: The ip associated with the entry
388

389
  """
390
  if mode == constants.ETC_HOSTS_ADD:
391
    if not ip:
392
      RPCFail("Mode 'add' needs 'ip' parameter, but parameter not"
393
              " present")
394
    utils.AddHostToEtcHosts(host, ip)
395
  elif mode == constants.ETC_HOSTS_REMOVE:
396
    if ip:
397
      RPCFail("Mode 'remove' does not allow 'ip' parameter, but"
398
              " parameter is present")
399
    utils.RemoveHostFromEtcHosts(host)
400
  else:
401
    RPCFail("Mode not supported")
402

    
403

    
404
def LeaveCluster(modify_ssh_setup):
405
  """Cleans up and remove the current node.
406

407
  This function cleans up and prepares the current node to be removed
408
  from the cluster.
409

410
  If processing is successful, then it raises an
411
  L{errors.QuitGanetiException} which is used as a special case to
412
  shutdown the node daemon.
413

414
  @type modify_ssh_setup: boolean
  @param modify_ssh_setup: whether to clean up the node's SSH setup
415

416
  """
417
  _CleanDirectory(constants.DATA_DIR)
418
  _CleanDirectory(constants.CRYPTO_KEYS_DIR)
419
  JobQueuePurge()
420

    
421
  if modify_ssh_setup:
422
    try:
423
      priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
424

    
425
      utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
426

    
427
      utils.RemoveFile(priv_key)
428
      utils.RemoveFile(pub_key)
429
    except errors.OpExecError:
430
      logging.exception("Error while processing ssh files")
431

    
432
  try:
433
    utils.RemoveFile(constants.CONFD_HMAC_KEY)
434
    utils.RemoveFile(constants.RAPI_CERT_FILE)
435
    utils.RemoveFile(constants.SPICE_CERT_FILE)
436
    utils.RemoveFile(constants.SPICE_CACERT_FILE)
437
    utils.RemoveFile(constants.NODED_CERT_FILE)
438
  except: # pylint: disable=W0702
439
    logging.exception("Error while removing cluster secrets")
440

    
441
  result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
442
  if result.failed:
443
    logging.error("Command %s failed with exitcode %s and error %s",
444
                  result.cmd, result.exit_code, result.output)
445

    
446
  # Raise a custom exception (handled in ganeti-noded)
447
  raise errors.QuitGanetiException(True, "Shutdown scheduled")
448

    
449

    
450
def GetNodeInfo(vgname, hypervisor_type):
451
  """Gives back a hash with different information about the node.
452

453
  @type vgname: C{string}
454
  @param vgname: the name of the volume group to ask for disk space information
455
  @type hypervisor_type: C{str}
456
  @param hypervisor_type: the name of the hypervisor to ask for
457
      memory information
458
  @rtype: C{dict}
459
  @return: dictionary with the following keys:
460
      - vg_size is the size of the configured volume group in MiB
461
      - vg_free is the free size of the volume group in MiB
462
      - memory_dom0 is the memory allocated for domain0 in MiB
463
      - memory_free is the currently available (free) ram in MiB
464
      - memory_total is the total amount of RAM in MiB
465
      - hv_version: the hypervisor version, if available
466

467
  """
468
  outputarray = {}
469

    
470
  if vgname is not None:
471
    vginfo = bdev.LogicalVolume.GetVGInfo([vgname])
472
    vg_free = vg_size = None
473
    if vginfo:
474
      vg_free = int(round(vginfo[0][0], 0))
475
      vg_size = int(round(vginfo[0][1], 0))
476
    outputarray["vg_size"] = vg_size
477
    outputarray["vg_free"] = vg_free
478

    
479
  if hypervisor_type is not None:
480
    hyper = hypervisor.GetHypervisor(hypervisor_type)
481
    hyp_info = hyper.GetNodeInfo()
482
    if hyp_info is not None:
483
      outputarray.update(hyp_info)
484

    
485
  outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
486

    
487
  return outputarray
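
# Example (illustrative, numbers made up): for a node with a 20 GiB volume
# group and 4 GiB of RAM, the returned dictionary would look roughly like:
#
#   {"vg_size": 20480, "vg_free": 10240,
#    "memory_total": 4096, "memory_free": 3072, "memory_dom0": 1024,
#    "bootid": "dcf42a6b-..."}
#
# (the memory_* and hv_version entries come from the hypervisor's
# GetNodeInfo and therefore depend on the hypervisor type).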
488

    
489

    
490
def VerifyNode(what, cluster_name):
491
  """Verify the status of the local node.
492

493
  Based on the input L{what} parameter, various checks are done on the
494
  local node.
495

496
  If the I{filelist} key is present, this list of
497
  files is checksummed and the file/checksum pairs are returned.
498

499
  If the I{nodelist} key is present, we check that we have
500
  connectivity via ssh with the target nodes (and check the hostname
501
  report).
502

503
  If the I{node-net-test} key is present, we check that we have
504
  connectivity to the given nodes via both primary IP and, if
505
  applicable, secondary IPs.
506

507
  @type what: C{dict}
508
  @param what: a dictionary of things to check:
509
      - filelist: list of files for which to compute checksums
510
      - nodelist: list of nodes we should check ssh communication with
511
      - node-net-test: list of nodes we should check node daemon port
512
        connectivity with
513
      - hypervisor: list with hypervisors to run the verify for
514
  @rtype: dict
515
  @return: a dictionary with the same keys as the input dict, and
516
      values representing the result of the checks
517

518
  """
519
  result = {}
520
  my_name = netutils.Hostname.GetSysName()
521
  port = netutils.GetDaemonPort(constants.NODED)
522
  vm_capable = my_name not in what.get(constants.NV_VMNODES, [])
523

    
524
  if constants.NV_HYPERVISOR in what and vm_capable:
525
    result[constants.NV_HYPERVISOR] = tmp = {}
526
    for hv_name in what[constants.NV_HYPERVISOR]:
527
      try:
528
        val = hypervisor.GetHypervisor(hv_name).Verify()
529
      except errors.HypervisorError, err:
530
        val = "Error while checking hypervisor: %s" % str(err)
531
      tmp[hv_name] = val
532

    
533
  if constants.NV_HVPARAMS in what and vm_capable:
534
    result[constants.NV_HVPARAMS] = tmp = []
535
    for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
536
      try:
537
        logging.info("Validating hv %s, %s", hv_name, hvparms)
538
        hypervisor.GetHypervisor(hv_name).ValidateParameters(hvparms)
539
      except errors.HypervisorError, err:
540
        tmp.append((source, hv_name, str(err)))
541

    
542
  if constants.NV_FILELIST in what:
543
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
544
      what[constants.NV_FILELIST])
545

    
546
  if constants.NV_NODELIST in what:
547
    (nodes, bynode) = what[constants.NV_NODELIST]
548

    
549
    # Add nodes from other groups (different for each node)
550
    try:
551
      nodes.extend(bynode[my_name])
552
    except KeyError:
553
      pass
554

    
555
    # Use a random order
556
    random.shuffle(nodes)
557

    
558
    # Try to contact all nodes
559
    val = {}
560
    for node in nodes:
561
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
562
      if not success:
563
        val[node] = message
564

    
565
    result[constants.NV_NODELIST] = val
566

    
567
  if constants.NV_NODENETTEST in what:
568
    result[constants.NV_NODENETTEST] = tmp = {}
569
    my_pip = my_sip = None
570
    for name, pip, sip in what[constants.NV_NODENETTEST]:
571
      if name == my_name:
572
        my_pip = pip
573
        my_sip = sip
574
        break
575
    if not my_pip:
576
      tmp[my_name] = ("Can't find my own primary/secondary IP"
577
                      " in the node list")
578
    else:
579
      for name, pip, sip in what[constants.NV_NODENETTEST]:
580
        fail = []
581
        if not netutils.TcpPing(pip, port, source=my_pip):
582
          fail.append("primary")
583
        if sip != pip:
584
          if not netutils.TcpPing(sip, port, source=my_sip):
585
            fail.append("secondary")
586
        if fail:
587
          tmp[name] = ("failure using the %s interface(s)" %
588
                       " and ".join(fail))
589

    
590
  if constants.NV_MASTERIP in what:
591
    # FIXME: add checks on incoming data structures (here and in the
592
    # rest of the function)
593
    master_name, master_ip = what[constants.NV_MASTERIP]
594
    if master_name == my_name:
595
      source = constants.IP4_ADDRESS_LOCALHOST
596
    else:
597
      source = None
598
    result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
599
                                                  source=source)
600

    
601
  if constants.NV_OOB_PATHS in what:
602
    result[constants.NV_OOB_PATHS] = tmp = []
603
    for path in what[constants.NV_OOB_PATHS]:
604
      try:
605
        st = os.stat(path)
606
      except OSError, err:
607
        tmp.append("error stating out of band helper: %s" % err)
608
      else:
609
        if stat.S_ISREG(st.st_mode):
610
          if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
611
            tmp.append(None)
612
          else:
613
            tmp.append("out of band helper %s is not executable" % path)
614
        else:
615
          tmp.append("out of band helper %s is not a file" % path)
616

    
617
  if constants.NV_LVLIST in what and vm_capable:
618
    try:
619
      val = GetVolumeList(utils.ListVolumeGroups().keys())
620
    except RPCFail, err:
621
      val = str(err)
622
    result[constants.NV_LVLIST] = val
623

    
624
  if constants.NV_INSTANCELIST in what and vm_capable:
625
    # GetInstanceList can fail
626
    try:
627
      val = GetInstanceList(what[constants.NV_INSTANCELIST])
628
    except RPCFail, err:
629
      val = str(err)
630
    result[constants.NV_INSTANCELIST] = val
631

    
632
  if constants.NV_VGLIST in what and vm_capable:
633
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()
634

    
635
  if constants.NV_PVLIST in what and vm_capable:
636
    result[constants.NV_PVLIST] = \
637
      bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
638
                                   filter_allocatable=False)
639

    
640
  if constants.NV_VERSION in what:
641
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
642
                                    constants.RELEASE_VERSION)
643

    
644
  if constants.NV_HVINFO in what and vm_capable:
645
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
646
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()
647

    
648
  if constants.NV_DRBDLIST in what and vm_capable:
649
    try:
650
      used_minors = bdev.DRBD8.GetUsedDevs().keys()
651
    except errors.BlockDeviceError, err:
652
      logging.warning("Can't get used minors list", exc_info=True)
653
      used_minors = str(err)
654
    result[constants.NV_DRBDLIST] = used_minors
655

    
656
  if constants.NV_DRBDHELPER in what and vm_capable:
657
    status = True
658
    try:
659
      payload = bdev.BaseDRBD.GetUsermodeHelper()
660
    except errors.BlockDeviceError, err:
661
      logging.error("Can't get DRBD usermode helper: %s", str(err))
662
      status = False
663
      payload = str(err)
664
    result[constants.NV_DRBDHELPER] = (status, payload)
665

    
666
  if constants.NV_NODESETUP in what:
667
    result[constants.NV_NODESETUP] = tmpr = []
668
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
669
      tmpr.append("The sysfs filesytem doesn't seem to be mounted"
670
                  " under /sys, missing required directories /sys/block"
671
                  " and /sys/class/net")
672
    if (not os.path.isdir("/proc/sys") or
673
        not os.path.isfile("/proc/sysrq-trigger")):
674
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
675
                  " under /proc, missing required directory /proc/sys and"
676
                  " the file /proc/sysrq-trigger")
677

    
678
  if constants.NV_TIME in what:
679
    result[constants.NV_TIME] = utils.SplitTime(time.time())
680

    
681
  if constants.NV_OSLIST in what and vm_capable:
682
    result[constants.NV_OSLIST] = DiagnoseOS()
683

    
684
  if constants.NV_BRIDGES in what and vm_capable:
685
    result[constants.NV_BRIDGES] = [bridge
686
                                    for bridge in what[constants.NV_BRIDGES]
687
                                    if not utils.BridgeExists(bridge)]
688
  return result
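
# Example (illustrative): a minimal 'what' request asking only for file
# checksums and the software version, and the rough shape of the reply:
#
#   what = {constants.NV_FILELIST: [constants.CLUSTER_CONF_FILE],
#           constants.NV_VERSION: None}
#   VerifyNode(what, cluster_name) ==
#       {constants.NV_FILELIST: {<path>: <checksum>, ...},
#        constants.NV_VERSION: (constants.PROTOCOL_VERSION,
#                               constants.RELEASE_VERSION)}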
689

    
690

    
691
def GetBlockDevSizes(devices):
692
  """Return the size of the given block devices
693

694
  @type devices: list
695
  @param devices: list of block device nodes to query
696
  @rtype: dict
697
  @return:
698
    dictionary of all block devices under /dev (key). The value is their
699
    size in MiB.
700

701
    {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}
702

703
  """
704
  DEV_PREFIX = "/dev/"
705
  blockdevs = {}
706

    
707
  for devpath in devices:
708
    if not utils.IsBelowDir(DEV_PREFIX, devpath):
709
      continue
710

    
711
    try:
712
      st = os.stat(devpath)
713
    except EnvironmentError, err:
714
      logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
715
      continue
716

    
717
    if stat.S_ISBLK(st.st_mode):
718
      result = utils.RunCmd(["blockdev", "--getsize64", devpath])
719
      if result.failed:
720
        # We don't want to fail, just do not list this device as available
721
        logging.warning("Cannot get size for block device %s", devpath)
722
        continue
723

    
724
      size = int(result.stdout) / (1024 * 1024)
725
      blockdevs[devpath] = size
726
  return blockdevs
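
# Example (illustrative): "blockdev --getsize64" reports bytes, so a 1 GiB
# device yields 1073741824 / (1024 * 1024) == 1024 and the result would be
# e.g. {"/dev/disk/by-uuid/123456-12321231-312312-312": 1024}.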
727

    
728

    
729
def GetVolumeList(vg_names):
730
  """Compute list of logical volumes and their size.
731

732
  @type vg_names: list
733
  @param vg_names: the volume groups whose LVs we should list, or
734
      empty for all volume groups
735
  @rtype: dict
736
  @return:
737
      dictionary of all partitions (key) with value being a tuple of
738
      their size (in MiB), inactive and online status::
739

740
        {'xenvg/test1': ('20.06', True, True)}
741

742
      in case of errors, a string is returned with the error
743
      details.
744

745
  """
746
  lvs = {}
747
  sep = "|"
748
  if not vg_names:
749
    vg_names = []
750
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
751
                         "--separator=%s" % sep,
752
                         "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
753
  if result.failed:
754
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)
755

    
756
  for line in result.stdout.splitlines():
757
    line = line.strip()
758
    match = _LVSLINE_REGEX.match(line)
759
    if not match:
760
      logging.error("Invalid line returned from lvs output: '%s'", line)
761
      continue
762
    vg_name, name, size, attr = match.groups()
763
    inactive = attr[4] == "-"
764
    online = attr[5] == "o"
765
    virtual = attr[0] == "v"
766
    if virtual:
767
      # we don't want to report such volumes as existing, since they
768
      # don't really hold data
769
      continue
770
    lvs[vg_name + "/" + name] = (size, inactive, online)
771

    
772
  return lvs
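
# Example (illustrative): for an lvs line "xenvg|test1|20.06|-wi-ao" the
# attribute string is interpreted character by character: attr[0] != "v"
# (not a virtual volume, so it is reported), attr[4] == "a" (so inactive is
# False) and attr[5] == "o" (the device is open), giving
#
#   lvs["xenvg/test1"] == ("20.06", False, True)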
773

    
774

    
775
def ListVolumeGroups():
776
  """List the volume groups and their size.
777

778
  @rtype: dict
779
  @return: dictionary with keys volume name and values the
780
      size of the volume
781

782
  """
783
  return utils.ListVolumeGroups()
784

    
785

    
786
def NodeVolumes():
787
  """List all volumes on this node.
788

789
  @rtype: list
790
  @return:
791
    A list of dictionaries, each having four keys:
792
      - name: the logical volume name,
793
      - size: the size of the logical volume
794
      - dev: the physical device on which the LV lives
795
      - vg: the volume group to which it belongs
796

797
    In case of errors, we return an empty list and log the
798
    error.
799

800
    Note that since a logical volume can live on multiple physical
801
    volumes, the resulting list might include a logical volume
802
    multiple times.
803

804
  """
805
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
806
                         "--separator=|",
807
                         "--options=lv_name,lv_size,devices,vg_name"])
808
  if result.failed:
809
    _Fail("Failed to list logical volumes, lvs output: %s",
810
          result.output)
811

    
812
  def parse_dev(dev):
813
    return dev.split("(")[0]
814

    
815
  def handle_dev(dev):
816
    return [parse_dev(x) for x in dev.split(",")]
817

    
818
  def map_line(line):
819
    line = [v.strip() for v in line]
820
    return [{"name": line[0], "size": line[1],
821
             "dev": dev, "vg": line[3]} for dev in handle_dev(line[2])]
822

    
823
  all_devs = []
824
  for line in result.stdout.splitlines():
825
    if line.count("|") >= 3:
826
      all_devs.extend(map_line(line.split("|")))
827
    else:
828
      logging.warning("Strange line in the output from lvs: '%s'", line)
829
  return all_devs
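
# Example (illustrative): an lvs line such as
#
#   "test1|20.06|/dev/sda5(0),/dev/sdb5(0)|xenvg"
#
# is expanded by map_line/handle_dev into two entries, one per physical
# device:
#
#   {"name": "test1", "size": "20.06", "dev": "/dev/sda5", "vg": "xenvg"}
#   {"name": "test1", "size": "20.06", "dev": "/dev/sdb5", "vg": "xenvg"}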
830

    
831

    
832
def BridgesExist(bridges_list):
833
  """Check if a list of bridges exist on the current node.
834

835
  @raise RPCFail: if any of the given bridges is missing
837

838
  """
839
  missing = []
840
  for bridge in bridges_list:
841
    if not utils.BridgeExists(bridge):
842
      missing.append(bridge)
843

    
844
  if missing:
845
    _Fail("Missing bridges %s", utils.CommaJoin(missing))
846

    
847

    
848
def GetInstanceList(hypervisor_list):
849
  """Provides a list of instances.
850

851
  @type hypervisor_list: list
852
  @param hypervisor_list: the list of hypervisors to query information
853

854
  @rtype: list
855
  @return: a list of all running instances on the current node
856
    - instance1.example.com
857
    - instance2.example.com
858

859
  """
860
  results = []
861
  for hname in hypervisor_list:
862
    try:
863
      names = hypervisor.GetHypervisor(hname).ListInstances()
864
      results.extend(names)
865
    except errors.HypervisorError, err:
866
      _Fail("Error enumerating instances (hypervisor %s): %s",
867
            hname, err, exc=True)
868

    
869
  return results
870

    
871

    
872
def GetInstanceInfo(instance, hname):
873
  """Gives back the information about an instance as a dictionary.
874

875
  @type instance: string
876
  @param instance: the instance name
877
  @type hname: string
878
  @param hname: the hypervisor type of the instance
879

880
  @rtype: dict
881
  @return: dictionary with the following keys:
882
      - memory: memory size of instance (int)
883
      - state: xen state of instance (string)
884
      - time: cpu time of instance (float)
885

886
  """
887
  output = {}
888

    
889
  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
890
  if iinfo is not None:
891
    output["memory"] = iinfo[2]
892
    output["state"] = iinfo[4]
893
    output["time"] = iinfo[5]
894

    
895
  return output
896

    
897

    
898
def GetInstanceMigratable(instance):
899
  """Gives whether an instance can be migrated.
900

901
  @type instance: L{objects.Instance}
902
  @param instance: object representing the instance to be checked.
903

904
  @raise RPCFail: if the instance is not running
908

909
  """
910
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
911
  iname = instance.name
912
  if iname not in hyper.ListInstances():
913
    _Fail("Instance %s is not running", iname)
914

    
915
  for idx in range(len(instance.disks)):
916
    link_name = _GetBlockDevSymlinkPath(iname, idx)
917
    if not os.path.islink(link_name):
918
      logging.warning("Instance %s is missing symlink %s for disk %d",
919
                      iname, link_name, idx)
920

    
921

    
922
def GetAllInstancesInfo(hypervisor_list):
923
  """Gather data about all instances.
924

925
  This is the equivalent of L{GetInstanceInfo}, except that it
926
  computes data for all instances at once, thus being faster if one
927
  needs data about more than one instance.
928

929
  @type hypervisor_list: list
930
  @param hypervisor_list: list of hypervisors to query for instance data
931

932
  @rtype: dict
933
  @return: dictionary of instance: data, with data having the following keys:
934
      - memory: memory size of instance (int)
935
      - state: xen state of instance (string)
936
      - time: cpu time of instance (float)
937
      - vcpus: the number of vcpus
938

939
  """
940
  output = {}
941

    
942
  for hname in hypervisor_list:
943
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
944
    if iinfo:
945
      for name, _, memory, vcpus, state, times in iinfo:
946
        value = {
947
          "memory": memory,
948
          "vcpus": vcpus,
949
          "state": state,
950
          "time": times,
951
          }
952
        if name in output:
953
          # we only check static parameters, like memory and vcpus,
954
          # and not state and time which can change between the
955
          # invocations of the different hypervisors
956
          for key in "memory", "vcpus":
957
            if value[key] != output[name][key]:
958
              _Fail("Instance %s is running twice"
959
                    " with different parameters", name)
960
        output[name] = value
961

    
962
  return output
963

    
964

    
965
def _InstanceLogName(kind, os_name, instance, component):
966
  """Compute the OS log filename for a given instance and operation.
967

968
  The instance name and os name are passed in as strings since not all
969
  operations have these as part of an instance object.
970

971
  @type kind: string
972
  @param kind: the operation type (e.g. add, import, etc.)
973
  @type os_name: string
974
  @param os_name: the os name
975
  @type instance: string
976
  @param instance: the name of the instance being imported/added/etc.
977
  @type component: string or None
978
  @param component: the name of the component of the instance being
979
      transferred
980

981
  """
982
  # TODO: Use tempfile.mkstemp to create unique filename
983
  if component:
984
    assert "/" not in component
985
    c_msg = "-%s" % component
986
  else:
987
    c_msg = ""
988
  base = ("%s-%s-%s%s-%s.log" %
989
          (kind, os_name, instance, c_msg, utils.TimestampForFilename()))
990
  return utils.PathJoin(constants.LOG_OS_DIR, base)
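
# Example (illustrative names): adding an instance "inst1.example.com" with
# the "debootstrap" OS and no component produces a log file path of the form
#
#   <LOG_OS_DIR>/add-debootstrap-inst1.example.com-<timestamp>.log
#
# while an import of one of its disks would carry the component suffix, e.g.
# "import-debootstrap-inst1.example.com-disk0-<timestamp>.log".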
991

    
992

    
993
def InstanceOsAdd(instance, reinstall, debug):
994
  """Add an OS to an instance.
995

996
  @type instance: L{objects.Instance}
997
  @param instance: Instance whose OS is to be installed
998
  @type reinstall: boolean
999
  @param reinstall: whether this is an instance reinstall
1000
  @type debug: integer
1001
  @param debug: debug level, passed to the OS scripts
1002
  @rtype: None
1003

1004
  """
1005
  inst_os = OSFromDisk(instance.os)
1006

    
1007
  create_env = OSEnvironment(instance, inst_os, debug)
1008
  if reinstall:
1009
    create_env["INSTANCE_REINSTALL"] = "1"
1010

    
1011
  logfile = _InstanceLogName("add", instance.os, instance.name, None)
1012

    
1013
  result = utils.RunCmd([inst_os.create_script], env=create_env,
1014
                        cwd=inst_os.path, output=logfile, reset_env=True)
1015
  if result.failed:
1016
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
1017
                  " output: %s", result.cmd, result.fail_reason, logfile,
1018
                  result.output)
1019
    lines = [utils.SafeEncode(val)
1020
             for val in utils.TailFile(logfile, lines=20)]
1021
    _Fail("OS create script failed (%s), last lines in the"
1022
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
1023

    
1024

    
1025
def RunRenameInstance(instance, old_name, debug):
1026
  """Run the OS rename script for an instance.
1027

1028
  @type instance: L{objects.Instance}
1029
  @param instance: Instance whose OS is to be installed
1030
  @type old_name: string
1031
  @param old_name: previous instance name
1032
  @type debug: integer
1033
  @param debug: debug level, passed to the OS scripts
1034
  @rtype: boolean
1035
  @return: the success of the operation
1036

1037
  """
1038
  inst_os = OSFromDisk(instance.os)
1039

    
1040
  rename_env = OSEnvironment(instance, inst_os, debug)
1041
  rename_env["OLD_INSTANCE_NAME"] = old_name
1042

    
1043
  logfile = _InstanceLogName("rename", instance.os,
1044
                             "%s-%s" % (old_name, instance.name), None)
1045

    
1046
  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
1047
                        cwd=inst_os.path, output=logfile, reset_env=True)
1048

    
1049
  if result.failed:
1050
    logging.error("os create command '%s' returned error: %s output: %s",
1051
                  result.cmd, result.fail_reason, result.output)
1052
    lines = [utils.SafeEncode(val)
1053
             for val in utils.TailFile(logfile, lines=20)]
1054
    _Fail("OS rename script failed (%s), last lines in the"
1055
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
1056

    
1057

    
1058
def _GetBlockDevSymlinkPath(instance_name, idx):
1059
  return utils.PathJoin(constants.DISK_LINKS_DIR, "%s%s%d" %
1060
                        (instance_name, constants.DISK_SEPARATOR, idx))
1061

    
1062

    
1063
def _SymlinkBlockDev(instance_name, device_path, idx):
1064
  """Set up symlinks to a instance's block device.
1065

1066
  This is an auxiliary function run when an instance is started (on the
  primary node) or when an instance is migrated (on the target node).
1068

1069

1070
  @param instance_name: the name of the target instance
1071
  @param device_path: path of the physical block device, on the node
1072
  @param idx: the disk index
1073
  @return: absolute path to the disk's symlink
1074

1075
  """
1076
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1077
  try:
1078
    os.symlink(device_path, link_name)
1079
  except OSError, err:
1080
    if err.errno == errno.EEXIST:
1081
      if (not os.path.islink(link_name) or
1082
          os.readlink(link_name) != device_path):
1083
        os.remove(link_name)
1084
        os.symlink(device_path, link_name)
1085
    else:
1086
      raise
1087

    
1088
  return link_name
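
# Example (illustrative): for instance "inst1.example.com" and disk index 0,
# _GetBlockDevSymlinkPath returns a path of the form
# <DISK_LINKS_DIR>/inst1.example.com<DISK_SEPARATOR>0, and _SymlinkBlockDev
# points that link at the real device node (e.g. /dev/xenvg/<lv name>),
# replacing a stale link if one is already present.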
1089

    
1090

    
1091
def _RemoveBlockDevLinks(instance_name, disks):
1092
  """Remove the block device symlinks belonging to the given instance.
1093

1094
  """
1095
  for idx, _ in enumerate(disks):
1096
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1097
    if os.path.islink(link_name):
1098
      try:
1099
        os.remove(link_name)
1100
      except OSError:
1101
        logging.exception("Can't remove symlink '%s'", link_name)
1102

    
1103

    
1104
def _GatherAndLinkBlockDevs(instance):
1105
  """Set up an instance's block device(s).
1106

1107
  This is run on the primary node at instance startup. The block
1108
  devices must be already assembled.
1109

1110
  @type instance: L{objects.Instance}
1111
  @param instance: the instance whose disks we should assemble
1112
  @rtype: list
1113
  @return: list of (disk_object, device_path)
1114

1115
  """
1116
  block_devices = []
1117
  for idx, disk in enumerate(instance.disks):
1118
    device = _RecursiveFindBD(disk)
1119
    if device is None:
1120
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
1121
                                    str(disk))
1122
    device.Open()
1123
    try:
1124
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1125
    except OSError, e:
1126
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1127
                                    e.strerror)
1128

    
1129
    block_devices.append((disk, link_name))
1130

    
1131
  return block_devices
1132

    
1133

    
1134
def StartInstance(instance, startup_paused):
1135
  """Start an instance.
1136

1137
  @type instance: L{objects.Instance}
1138
  @param instance: the instance object
1139
  @type startup_paused: bool
1140
  @param startup_paused: whether to pause the instance at startup
1141
  @rtype: None
1142

1143
  """
1144
  running_instances = GetInstanceList([instance.hypervisor])
1145

    
1146
  if instance.name in running_instances:
1147
    logging.info("Instance %s already running, not starting", instance.name)
1148
    return
1149

    
1150
  try:
1151
    block_devices = _GatherAndLinkBlockDevs(instance)
1152
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
1153
    hyper.StartInstance(instance, block_devices, startup_paused)
1154
  except errors.BlockDeviceError, err:
1155
    _Fail("Block device error: %s", err, exc=True)
1156
  except errors.HypervisorError, err:
1157
    _RemoveBlockDevLinks(instance.name, instance.disks)
1158
    _Fail("Hypervisor error: %s", err, exc=True)
1159

    
1160

    
1161
def InstanceShutdown(instance, timeout):
1162
  """Shut an instance down.
1163

1164
  @note: this function uses polling with a hardcoded timeout.
1165

1166
  @type instance: L{objects.Instance}
1167
  @param instance: the instance object
1168
  @type timeout: integer
1169
  @param timeout: maximum timeout for soft shutdown
1170
  @rtype: None
1171

1172
  """
1173
  hv_name = instance.hypervisor
1174
  hyper = hypervisor.GetHypervisor(hv_name)
1175
  iname = instance.name
1176

    
1177
  if instance.name not in hyper.ListInstances():
1178
    logging.info("Instance %s not running, doing nothing", iname)
1179
    return
1180

    
1181
  class _TryShutdown:
1182
    def __init__(self):
1183
      self.tried_once = False
1184

    
1185
    def __call__(self):
1186
      if iname not in hyper.ListInstances():
1187
        return
1188

    
1189
      try:
1190
        hyper.StopInstance(instance, retry=self.tried_once)
1191
      except errors.HypervisorError, err:
1192
        if iname not in hyper.ListInstances():
1193
          # if the instance is no longer existing, consider this a
1194
          # success and go to cleanup
1195
          return
1196

    
1197
        _Fail("Failed to stop instance %s: %s", iname, err)
1198

    
1199
      self.tried_once = True
1200

    
1201
      raise utils.RetryAgain()
1202

    
1203
  try:
1204
    utils.Retry(_TryShutdown(), 5, timeout)
1205
  except utils.RetryTimeout:
1206
    # the shutdown did not succeed
1207
    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1208

    
1209
    try:
1210
      hyper.StopInstance(instance, force=True)
1211
    except errors.HypervisorError, err:
1212
      if iname in hyper.ListInstances():
1213
        # only raise an error if the instance still exists, otherwise
1214
        # the error could simply be "instance ... unknown"!
1215
        _Fail("Failed to force stop instance %s: %s", iname, err)
1216

    
1217
    time.sleep(1)
1218

    
1219
    if iname in hyper.ListInstances():
1220
      _Fail("Could not shutdown instance %s even by destroy", iname)
1221

    
1222
  try:
1223
    hyper.CleanupInstance(instance.name)
1224
  except errors.HypervisorError, err:
1225
    logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1226

    
1227
  _RemoveBlockDevLinks(iname, instance.disks)
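
# Illustrative timeline of the shutdown logic above, assuming a 120 second
# timeout: _TryShutdown asks the hypervisor for a soft stop and is retried
# (roughly every 5 seconds, via utils.Retry) as long as the instance is still
# listed and the timeout has not expired; only then is a forced
# StopInstance(force=True) issued, followed by CleanupInstance and removal of
# the block device symlinks.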
1228

    
1229

    
1230
def InstanceReboot(instance, reboot_type, shutdown_timeout):
1231
  """Reboot an instance.
1232

1233
  @type instance: L{objects.Instance}
1234
  @param instance: the instance object to reboot
1235
  @type reboot_type: str
1236
  @param reboot_type: the type of reboot, one of the following
1237
    constants:
1238
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1239
        instance OS, do not recreate the VM
1240
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1241
        restart the VM (at the hypervisor level)
1242
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1243
        not accepted here, since that mode is handled differently, in
1244
        cmdlib, and translates into full stop and start of the
1245
        instance (instead of a call_instance_reboot RPC)
1246
  @type shutdown_timeout: integer
1247
  @param shutdown_timeout: maximum timeout for soft shutdown
1248
  @rtype: None
1249

1250
  """
1251
  running_instances = GetInstanceList([instance.hypervisor])
1252

    
1253
  if instance.name not in running_instances:
1254
    _Fail("Cannot reboot instance %s that is not running", instance.name)
1255

    
1256
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1257
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1258
    try:
1259
      hyper.RebootInstance(instance)
1260
    except errors.HypervisorError, err:
1261
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1262
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1263
    try:
1264
      InstanceShutdown(instance, shutdown_timeout)
1265
      return StartInstance(instance, False)
1266
    except errors.HypervisorError, err:
1267
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1268
  else:
1269
    _Fail("Invalid reboot_type received: %s", reboot_type)
1270

    
1271

    
1272
def MigrationInfo(instance):
1273
  """Gather information about an instance to be migrated.
1274

1275
  @type instance: L{objects.Instance}
1276
  @param instance: the instance definition
1277

1278
  """
1279
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1280
  try:
1281
    info = hyper.MigrationInfo(instance)
1282
  except errors.HypervisorError, err:
1283
    _Fail("Failed to fetch migration information: %s", err, exc=True)
1284
  return info
1285

    
1286

    
1287
def AcceptInstance(instance, info, target):
1288
  """Prepare the node to accept an instance.
1289

1290
  @type instance: L{objects.Instance}
1291
  @param instance: the instance definition
1292
  @type info: string/data (opaque)
1293
  @param info: migration information, from the source node
1294
  @type target: string
1295
  @param target: target host (usually ip), on this node
1296

1297
  """
1298
  # TODO: why is this required only for DTS_EXT_MIRROR?
1299
  if instance.disk_template in constants.DTS_EXT_MIRROR:
1300
    # Create the symlinks, as the disks are not active
1301
    # in any way
1302
    try:
1303
      _GatherAndLinkBlockDevs(instance)
1304
    except errors.BlockDeviceError, err:
1305
      _Fail("Block device error: %s", err, exc=True)
1306

    
1307
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1308
  try:
1309
    hyper.AcceptInstance(instance, info, target)
1310
  except errors.HypervisorError, err:
1311
    if instance.disk_template in constants.DTS_EXT_MIRROR:
1312
      _RemoveBlockDevLinks(instance.name, instance.disks)
1313
    _Fail("Failed to accept instance: %s", err, exc=True)
1314

    
1315

    
1316
def FinalizeMigrationDst(instance, info, success):
1317
  """Finalize any preparation to accept an instance.
1318

1319
  @type instance: L{objects.Instance}
1320
  @param instance: the instance definition
1321
  @type info: string/data (opaque)
1322
  @param info: migration information, from the source node
1323
  @type success: boolean
1324
  @param success: whether the migration was a success or a failure
1325

1326
  """
1327
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1328
  try:
1329
    hyper.FinalizeMigrationDst(instance, info, success)
1330
  except errors.HypervisorError, err:
1331
    _Fail("Failed to finalize migration on the target node: %s", err, exc=True)
1332

    
1333

    
1334
def MigrateInstance(instance, target, live):
1335
  """Migrates an instance to another node.
1336

1337
  @type instance: L{objects.Instance}
1338
  @param instance: the instance definition
1339
  @type target: string
1340
  @param target: the target node name
1341
  @type live: boolean
1342
  @param live: whether the migration should be done live or not (the
1343
      interpretation of this parameter is left to the hypervisor)
1344
  @raise RPCFail: if migration fails for some reason
1345

1346
  """
1347
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1348

    
1349
  try:
1350
    hyper.MigrateInstance(instance, target, live)
1351
  except errors.HypervisorError, err:
1352
    _Fail("Failed to migrate instance: %s", err, exc=True)
1353

    
1354

    
1355
def FinalizeMigrationSource(instance, success, live):
1356
  """Finalize the instance migration on the source node.
1357

1358
  @type instance: L{objects.Instance}
1359
  @param instance: the instance definition of the migrated instance
1360
  @type success: bool
1361
  @param success: whether the migration succeeded or not
1362
  @type live: bool
1363
  @param live: whether the user requested a live migration or not
1364
  @raise RPCFail: If the execution fails for some reason
1365

1366
  """
1367
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1368

    
1369
  try:
1370
    hyper.FinalizeMigrationSource(instance, success, live)
1371
  except Exception, err:  # pylint: disable=W0703
1372
    _Fail("Failed to finalize the migration on the source node: %s", err,
1373
          exc=True)
1374

    
1375

    
1376
def GetMigrationStatus(instance):
1377
  """Get the migration status
1378

1379
  @type instance: L{objects.Instance}
1380
  @param instance: the instance that is being migrated
1381
  @rtype: L{objects.MigrationStatus}
1382
  @return: the status of the current migration (one of
1383
           L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
1384
           progress info that can be retrieved from the hypervisor
1385
  @raise RPCFail: If the migration status cannot be retrieved
1386

1387
  """
1388
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1389
  try:
1390
    return hyper.GetMigrationStatus(instance)
1391
  except Exception, err:  # pylint: disable=W0703
1392
    _Fail("Failed to get migration status: %s", err, exc=True)
1393

    
1394

    
1395
def BlockdevCreate(disk, size, owner, on_primary, info):
1396
  """Creates a block device for an instance.
1397

1398
  @type disk: L{objects.Disk}
1399
  @param disk: the object describing the disk we should create
1400
  @type size: int
1401
  @param size: the size of the physical underlying device, in MiB
1402
  @type owner: str
1403
  @param owner: the name of the instance for which disk is created,
1404
      used for device cache data
1405
  @type on_primary: boolean
1406
  @param on_primary:  indicates if it is the primary node or not
1407
  @type info: string
1408
  @param info: string that will be sent to the physical device
1409
      creation, used for example to set (LVM) tags on LVs
1410

1411
  @return: the new unique_id of the device (this can sometimes be
1412
      computed only after creation), or None. On secondary nodes,
1413
      it's not required to return anything.
1414

1415
  """
1416
  # TODO: remove the obsolete "size" argument
1417
  # pylint: disable=W0613
1418
  clist = []
1419
  if disk.children:
1420
    for child in disk.children:
1421
      try:
1422
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
1423
      except errors.BlockDeviceError, err:
1424
        _Fail("Can't assemble device %s: %s", child, err)
1425
      if on_primary or disk.AssembleOnSecondary():
1426
        # we need the children open in case the device itself has to
1427
        # be assembled
1428
        try:
1429
          # pylint: disable=E1103
1430
          crdev.Open()
1431
        except errors.BlockDeviceError, err:
1432
          _Fail("Can't make child '%s' read-write: %s", child, err)
1433
      clist.append(crdev)
1434

    
1435
  try:
1436
    device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1437
  except errors.BlockDeviceError, err:
1438
    _Fail("Can't create block device: %s", err)
1439

    
1440
  if on_primary or disk.AssembleOnSecondary():
1441
    try:
1442
      device.Assemble()
1443
    except errors.BlockDeviceError, err:
1444
      _Fail("Can't assemble device after creation, unusual event: %s", err)
1445
    device.SetSyncSpeed(constants.SYNC_SPEED)
1446
    if on_primary or disk.OpenOnSecondary():
1447
      try:
1448
        device.Open(force=True)
1449
      except errors.BlockDeviceError, err:
1450
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
1451
    DevCacheManager.UpdateCache(device.dev_path, owner,
1452
                                on_primary, disk.iv_name)
1453

    
1454
  device.SetInfo(info)
1455

    
1456
  return device.unique_id
1457

    
1458

    
1459
def _WipeDevice(path, offset, size):
1460
  """This function actually wipes the device.
1461

1462
  @param path: The path to the device to wipe
1463
  @param offset: The offset in MiB in the file
1464
  @param size: The size in MiB to write
1465

1466
  """
1467
  cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
1468
         "bs=%d" % constants.WIPE_BLOCK_SIZE, "oflag=direct", "of=%s" % path,
1469
         "count=%d" % size]
1470
  result = utils.RunCmd(cmd)
1471

    
1472
  if result.failed:
1473
    _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
1474
          result.fail_reason, result.output)
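
# Example (illustrative, device path made up): with WIPE_BLOCK_SIZE at 1 MiB,
# wiping the first 1024 MiB of a volume translates to roughly:
#
#   dd if=/dev/zero seek=0 bs=1048576 oflag=direct of=/dev/xenvg/disk0 count=1024
#
# i.e. both seek= and count= are effectively expressed in MiB.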
1475

    
1476

    
1477
def BlockdevWipe(disk, offset, size):
1478
  """Wipes a block device.
1479

1480
  @type disk: L{objects.Disk}
1481
  @param disk: the disk object we want to wipe
1482
  @type offset: int
1483
  @param offset: The offset in MiB in the file
1484
  @type size: int
1485
  @param size: The size in MiB to write
1486

1487
  """
1488
  try:
1489
    rdev = _RecursiveFindBD(disk)
1490
  except errors.BlockDeviceError:
1491
    rdev = None
1492

    
1493
  if not rdev:
1494
    _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)
1495

    
1496
  # Do cross verify some of the parameters
1497
  if offset > rdev.size:
1498
    _Fail("Offset is bigger than device size")
1499
  if (offset + size) > rdev.size:
1500
    _Fail("The provided offset and size to wipe is bigger than device size")
1501

    
1502
  _WipeDevice(rdev.dev_path, offset, size)
1503

    
1504

    
1505
def BlockdevPauseResumeSync(disks, pause):
1506
  """Pause or resume the sync of the block device.
1507

1508
  @type disks: list of L{objects.Disk}
  @param disks: the disk objects we want to pause/resume
  @type pause: bool
  @param pause: Whether to pause or resume
1512

1513
  """
1514
  success = []
1515
  for disk in disks:
1516
    try:
1517
      rdev = _RecursiveFindBD(disk)
1518
    except errors.BlockDeviceError:
1519
      rdev = None
1520

    
1521
    if not rdev:
1522
      success.append((False, ("Cannot change sync for device %s:"
1523
                              " device not found" % disk.iv_name)))
1524
      continue
1525

    
1526
    result = rdev.PauseResumeSync(pause)
1527

    
1528
    if result:
1529
      success.append((result, None))
1530
    else:
1531
      if pause:
1532
        msg = "Pause"
1533
      else:
1534
        msg = "Resume"
1535
      success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))
1536

    
1537
  return success
1538

    
1539

    
1540
def BlockdevRemove(disk):
1541
  """Remove a block device.
1542

1543
  @note: This is intended to be called recursively.
1544

1545
  @type disk: L{objects.Disk}
1546
  @param disk: the disk object we should remove
1547
  @rtype: boolean
1548
  @return: the success of the operation
1549

1550
  """
1551
  msgs = []
1552
  try:
1553
    rdev = _RecursiveFindBD(disk)
1554
  except errors.BlockDeviceError, err:
1555
    # probably can't attach
1556
    logging.info("Can't attach to device %s in remove", disk)
1557
    rdev = None
1558
  if rdev is not None:
1559
    r_path = rdev.dev_path
1560
    try:
1561
      rdev.Remove()
1562
    except errors.BlockDeviceError, err:
1563
      msgs.append(str(err))
1564
    if not msgs:
1565
      DevCacheManager.RemoveCache(r_path)
1566

    
1567
  if disk.children:
1568
    for child in disk.children:
1569
      try:
1570
        BlockdevRemove(child)
1571
      except RPCFail, err:
1572
        msgs.append(str(err))
1573

    
1574
  if msgs:
1575
    _Fail("; ".join(msgs))
1576

    
1577

    
1578
def _RecursiveAssembleBD(disk, owner, as_primary):
1579
  """Activate a block device for an instance.
1580

1581
  This is run on the primary and secondary nodes for an instance.
1582

1583
  @note: this function is called recursively.
1584

1585
  @type disk: L{objects.Disk}
1586
  @param disk: the disk we try to assemble
1587
  @type owner: str
1588
  @param owner: the name of the instance which owns the disk
1589
  @type as_primary: boolean
1590
  @param as_primary: if we should make the block device
1591
      read/write
1592

1593
  @return: the assembled device or None (in case no device
1594
      was assembled)
1595
  @raise errors.BlockDeviceError: in case there is an error
1596
      during the activation of the children or the device
1597
      itself
1598

1599
  """
1600
  children = []
1601
  if disk.children:
1602
    mcn = disk.ChildrenNeeded()
1603
    if mcn == -1:
1604
      mcn = 0 # max number of Nones allowed
1605
    else:
1606
      mcn = len(disk.children) - mcn # max number of Nones
1607
    for chld_disk in disk.children:
1608
      try:
1609
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1610
      except errors.BlockDeviceError, err:
1611
        if children.count(None) >= mcn:
1612
          raise
1613
        cdev = None
1614
        logging.error("Error in child activation (but continuing): %s",
1615
                      str(err))
1616
      children.append(cdev)
1617

    
1618
  if as_primary or disk.AssembleOnSecondary():
1619
    r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1620
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1621
    result = r_dev
1622
    if as_primary or disk.OpenOnSecondary():
1623
      r_dev.Open()
1624
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1625
                                as_primary, disk.iv_name)
1626

    
1627
  else:
1628
    result = True
1629
  return result
1630

    
1631

    
1632
def BlockdevAssemble(disk, owner, as_primary, idx):
1633
  """Activate a block device for an instance.
1634

1635
  This is a wrapper over _RecursiveAssembleBD.
1636

1637
  @rtype: str or boolean
1638
  @return: a C{/dev/...} path for primary nodes, and
1639
      C{True} for secondary nodes
1640

1641
  """
1642
  try:
1643
    result = _RecursiveAssembleBD(disk, owner, as_primary)
1644
    if isinstance(result, bdev.BlockDev):
1645
      # pylint: disable=E1103
1646
      result = result.dev_path
1647
      if as_primary:
1648
        _SymlinkBlockDev(owner, result, idx)
1649
  except errors.BlockDeviceError, err:
1650
    _Fail("Error while assembling disk: %s", err, exc=True)
1651
  except OSError, err:
1652
    _Fail("Error while symlinking disk: %s", err, exc=True)
1653

    
1654
  return result
1655

    
1656

    
1657
def BlockdevShutdown(disk):
1658
  """Shut down a block device.
1659

1660
  First, if the device is assembled (Attach() is successful), then
1661
  the device is shutdown. Then the children of the device are
1662
  shutdown.
1663

1664
  This function is called recursively. Note that, as opposed to
  assemble, we don't cache the children or such; shutting down
  different devices doesn't require the upper device to be active.
1667

1668
  @type disk: L{objects.Disk}
1669
  @param disk: the description of the disk we should
1670
      shutdown
1671
  @rtype: None
1672

1673
  """
1674
  msgs = []
1675
  r_dev = _RecursiveFindBD(disk)
1676
  if r_dev is not None:
1677
    r_path = r_dev.dev_path
1678
    try:
1679
      r_dev.Shutdown()
1680
      DevCacheManager.RemoveCache(r_path)
1681
    except errors.BlockDeviceError, err:
1682
      msgs.append(str(err))
1683

    
1684
  if disk.children:
1685
    for child in disk.children:
1686
      try:
1687
        BlockdevShutdown(child)
1688
      except RPCFail, err:
1689
        msgs.append(str(err))
1690

    
1691
  if msgs:
1692
    _Fail("; ".join(msgs))
1693

    
1694

    
1695
def BlockdevAddchildren(parent_cdev, new_cdevs):
1696
  """Extend a mirrored block device.
1697

1698
  @type parent_cdev: L{objects.Disk}
1699
  @param parent_cdev: the disk to which we should add children
1700
  @type new_cdevs: list of L{objects.Disk}
1701
  @param new_cdevs: the list of children which we should add
1702
  @rtype: None
1703

1704
  """
1705
  parent_bdev = _RecursiveFindBD(parent_cdev)
1706
  if parent_bdev is None:
1707
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
1708
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1709
  if new_bdevs.count(None) > 0:
1710
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1711
  parent_bdev.AddChildren(new_bdevs)
1712

    
1713

    
1714
def BlockdevRemovechildren(parent_cdev, new_cdevs):
1715
  """Shrink a mirrored block device.
1716

1717
  @type parent_cdev: L{objects.Disk}
1718
  @param parent_cdev: the disk from which we should remove children
1719
  @type new_cdevs: list of L{objects.Disk}
1720
  @param new_cdevs: the list of children which we should remove
1721
  @rtype: None
1722

1723
  """
1724
  parent_bdev = _RecursiveFindBD(parent_cdev)
1725
  if parent_bdev is None:
1726
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1727
  devs = []
1728
  for disk in new_cdevs:
1729
    rpath = disk.StaticDevPath()
1730
    if rpath is None:
1731
      bd = _RecursiveFindBD(disk)
1732
      if bd is None:
1733
        _Fail("Can't find device %s while removing children", disk)
1734
      else:
1735
        devs.append(bd.dev_path)
1736
    else:
1737
      if not utils.IsNormAbsPath(rpath):
1738
        _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
1739
      devs.append(rpath)
1740
  parent_bdev.RemoveChildren(devs)
1741

    
1742

    
1743
def BlockdevGetmirrorstatus(disks):
1744
  """Get the mirroring status of a list of devices.
1745

1746
  @type disks: list of L{objects.Disk}
1747
  @param disks: the list of disks which we should query
1748
  @rtype: list
1749
  @return: List of L{objects.BlockDevStatus}, one for each disk
1750
  @raise errors.BlockDeviceError: if any of the disks cannot be
1751
      found
1752

1753
  """
1754
  stats = []
1755
  for dsk in disks:
1756
    rbd = _RecursiveFindBD(dsk)
1757
    if rbd is None:
1758
      _Fail("Can't find device %s", dsk)
1759

    
1760
    stats.append(rbd.CombinedSyncStatus())
1761

    
1762
  return stats
1763

    
1764

    
1765
def BlockdevGetmirrorstatusMulti(disks):
1766
  """Get the mirroring status of a list of devices.
1767

1768
  @type disks: list of L{objects.Disk}
1769
  @param disks: the list of disks which we should query
1770
  @rtype: list
1771
  @return: List of tuples, (bool, status), one for each disk; bool denotes
1772
    success/failure, status is L{objects.BlockDevStatus} on success, string
1773
    otherwise
1774

1775
  """
1776
  result = []
1777
  for disk in disks:
1778
    try:
1779
      rbd = _RecursiveFindBD(disk)
1780
      if rbd is None:
1781
        result.append((False, "Can't find device %s" % disk))
1782
        continue
1783

    
1784
      status = rbd.CombinedSyncStatus()
1785
    except errors.BlockDeviceError, err:
1786
      logging.exception("Error while getting disk status")
1787
      result.append((False, str(err)))
1788
    else:
1789
      result.append((True, status))
1790

    
1791
  assert len(disks) == len(result)
1792

    
1793
  return result
1794

    
1795

    
1796
def _RecursiveFindBD(disk):
1797
  """Check if a device is activated.
1798

1799
  If so, return information about the real device.
1800

1801
  @type disk: L{objects.Disk}
1802
  @param disk: the disk object we need to find
1803

1804
  @return: None if the device can't be found,
1805
      otherwise the device instance
1806

1807
  """
1808
  children = []
1809
  if disk.children:
1810
    for chdisk in disk.children:
1811
      children.append(_RecursiveFindBD(chdisk))
1812

    
1813
  return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1814

    
1815

    
1816
def _OpenRealBD(disk):
1817
  """Opens the underlying block device of a disk.
1818

1819
  @type disk: L{objects.Disk}
1820
  @param disk: the disk object we want to open
1821

1822
  """
1823
  real_disk = _RecursiveFindBD(disk)
1824
  if real_disk is None:
1825
    _Fail("Block device '%s' is not set up", disk)
1826

    
1827
  real_disk.Open()
1828

    
1829
  return real_disk
1830

    
1831

    
1832
def BlockdevFind(disk):
1833
  """Check if a device is activated.
1834

1835
  If it is, return information about the real device.
1836

1837
  @type disk: L{objects.Disk}
1838
  @param disk: the disk to find
1839
  @rtype: None or objects.BlockDevStatus
1840
  @return: None if the disk cannot be found, otherwise the current
           information
1842

1843
  """
1844
  try:
1845
    rbd = _RecursiveFindBD(disk)
1846
  except errors.BlockDeviceError, err:
1847
    _Fail("Failed to find device: %s", err, exc=True)
1848

    
1849
  if rbd is None:
1850
    return None
1851

    
1852
  return rbd.GetSyncStatus()
1853

    
1854

    
1855
def BlockdevGetsize(disks):
1856
  """Computes the size of the given disks.
1857

1858
  If a disk is not found, returns None instead.
1859

1860
  @type disks: list of L{objects.Disk}
1861
  @param disks: the list of disks to compute the size for
1862
  @rtype: list
1863
  @return: list with elements None if the disk cannot be found,
1864
      otherwise the size
1865

1866
  """
1867
  result = []
1868
  for cf in disks:
1869
    try:
1870
      rbd = _RecursiveFindBD(cf)
1871
    except errors.BlockDeviceError:
1872
      result.append(None)
1873
      continue
1874
    if rbd is None:
1875
      result.append(None)
1876
    else:
1877
      result.append(rbd.GetActualSize())
1878
  return result
1879

    
1880

    
1881
def BlockdevExport(disk, dest_node, dest_path, cluster_name):
1882
  """Export a block device to a remote node.
1883

1884
  @type disk: L{objects.Disk}
1885
  @param disk: the description of the disk to export
1886
  @type dest_node: str
1887
  @param dest_node: the destination node to export to
1888
  @type dest_path: str
1889
  @param dest_path: the destination path on the target node
1890
  @type cluster_name: str
1891
  @param cluster_name: the cluster name, needed for SSH hostalias
1892
  @rtype: None
1893

1894
  """
1895
  real_disk = _OpenRealBD(disk)
1896

    
1897
  # the block size on the read dd is 1MiB to match our units
1898
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
1899
                               "dd if=%s bs=1048576 count=%s",
1900
                               real_disk.dev_path, str(disk.size))
1901

    
1902
  # we set a smaller block size here as, due to ssh buffering, more
  # than 64-128k will mostly be ignored; we use nocreat to fail if the
  # device is not already there or we pass a wrong path; we use
  # notrunc to not attempt truncation on an LV device; we use
  # oflag=dsync to avoid buffering too much memory; this means that at
  # best we flush every 64k, which will not be very fast
1908
  destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
1909
                                " oflag=dsync", dest_path)
1910

    
1911
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1912
                                                   constants.GANETI_RUNAS,
1913
                                                   destcmd)
1914

    
1915
  # all commands have been checked, so we're safe to combine them
1916
  command = "|".join([expcmd, utils.ShellQuoteArgs(remotecmd)])
1917

    
1918
  result = utils.RunCmd(["bash", "-c", command])
1919

    
1920
  if result.failed:
1921
    _Fail("Disk copy command '%s' returned error: %s"
1922
          " output: %s", command, result.fail_reason, result.output)
1923
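# A minimal illustrative sketch of the pipeline BlockdevExport builds above,
# not part of the module; the device path and destination are hypothetical:
#
#   dd if=/dev/xenvg/disk0 bs=1048576 count=1024 |
#     ssh <dest_node> "dd of=/dest/img conv=nocreat,notrunc bs=65536 oflag=dsync"
#
# i.e. a 1 MiB-block local read streamed over SSH into a small-block,
# synchronous write on the destination node.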

    
1924

    
1925
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1926
  """Write a file to the filesystem.
1927

1928
  This allows the master to overwrite(!) a file. It will only perform
1929
  the operation if the file belongs to a list of configuration files.
1930

1931
  @type file_name: str
1932
  @param file_name: the target file name
1933
  @type data: str
1934
  @param data: the new contents of the file
1935
  @type mode: int
1936
  @param mode: the mode to give the file (can be None)
1937
  @type uid: string
1938
  @param uid: the owner of the file
1939
  @type gid: string
1940
  @param gid: the group of the file
1941
  @type atime: float
1942
  @param atime: the atime to set on the file (can be None)
1943
  @type mtime: float
1944
  @param mtime: the mtime to set on the file (can be None)
1945
  @rtype: None
1946

1947
  """
1948
  if not os.path.isabs(file_name):
1949
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1950

    
1951
  if file_name not in _ALLOWED_UPLOAD_FILES:
1952
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1953
          file_name)
1954

    
1955
  raw_data = _Decompress(data)
1956

    
1957
  if not (isinstance(uid, basestring) and isinstance(gid, basestring)):
1958
    _Fail("Invalid username/groupname type")
1959

    
1960
  getents = runtime.GetEnts()
1961
  uid = getents.LookupUser(uid)
1962
  gid = getents.LookupGroup(gid)
1963

    
1964
  utils.SafeWriteFile(file_name, None,
1965
                      data=raw_data, mode=mode, uid=uid, gid=gid,
1966
                      atime=atime, mtime=mtime)
1967

    
1968

    
1969
def RunOob(oob_program, command, node, timeout):
1970
  """Executes oob_program with given command on given node.
1971

1972
  @param oob_program: The path to the executable oob_program
1973
  @param command: The command to invoke on oob_program
1974
  @param node: The node given as an argument to the program
1975
  @param timeout: Timeout after which we kill the oob program
1976

1977
  @return: stdout
1978
  @raise RPCFail: If execution fails for some reason
1979

1980
  """
1981
  result = utils.RunCmd([oob_program, command, node], timeout=timeout)
1982

    
1983
  if result.failed:
1984
    _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
1985
          result.fail_reason, result.output)
1986

    
1987
  return result.stdout
1988

    
1989

    
1990
def WriteSsconfFiles(values):
1991
  """Update all ssconf files.
1992

1993
  Wrapper around the SimpleStore.WriteFiles.
1994

1995
  """
1996
  ssconf.SimpleStore().WriteFiles(values)
1997

    
1998

    
1999
def _ErrnoOrStr(err):
2000
  """Format an EnvironmentError exception.
2001

2002
  If the L{err} argument has an errno attribute, it will be looked up
2003
  and converted into a textual C{E...} description. Otherwise the
2004
  string representation of the error will be returned.
2005

2006
  @type err: L{EnvironmentError}
2007
  @param err: the exception to format
2008

2009
  """
2010
  if hasattr(err, "errno"):
2011
    detail = errno.errorcode[err.errno]
2012
  else:
2013
    detail = str(err)
2014
  return detail
2015
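# A minimal illustrative sketch of _ErrnoOrStr above, not part of the module
# (the path is hypothetical):
#
#   try:
#     open("/nonexistent/path")
#   except EnvironmentError, err:
#     _ErrnoOrStr(err)      # -> "ENOENT"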

    
2016

    
2017
def _OSOndiskAPIVersion(os_dir):
2018
  """Compute and return the API version of a given OS.
2019

2020
  This function will try to read the API version of the OS residing in
2021
  the 'os_dir' directory.
2022

2023
  @type os_dir: str
2024
  @param os_dir: the directory in which we should look for the OS
2025
  @rtype: tuple
2026
  @return: tuple (status, data) with status denoting the validity and
2027
      data holding either the valid versions or an error message
2028

2029
  """
2030
  api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
2031

    
2032
  try:
2033
    st = os.stat(api_file)
2034
  except EnvironmentError, err:
2035
    return False, ("Required file '%s' not found under path %s: %s" %
2036
                   (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
2037

    
2038
  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
2039
    return False, ("File '%s' in %s is not a regular file" %
2040
                   (constants.OS_API_FILE, os_dir))
2041

    
2042
  try:
2043
    api_versions = utils.ReadFile(api_file).splitlines()
2044
  except EnvironmentError, err:
2045
    return False, ("Error while reading the API version file at %s: %s" %
2046
                   (api_file, _ErrnoOrStr(err)))
2047

    
2048
  try:
2049
    api_versions = [int(version.strip()) for version in api_versions]
2050
  except (TypeError, ValueError), err:
2051
    return False, ("API version(s) can't be converted to integer: %s" %
2052
                   str(err))
2053

    
2054
  return True, api_versions
2055
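# A minimal illustrative sketch of _OSOndiskAPIVersion above, not part of the
# module: for a (hypothetical) OS directory whose constants.OS_API_FILE
# contains the two lines "20" and "15", the call
#
#   status, api_versions = _OSOndiskAPIVersion("/srv/ganeti/os/debootstrap")
#
# would return status == True and api_versions == [20, 15].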

    
2056

    
2057
def DiagnoseOS(top_dirs=None):
2058
  """Compute the validity for all OSes.
2059

2060
  @type top_dirs: list
2061
  @param top_dirs: the list of directories in which to
2062
      search (if not given defaults to
2063
      L{constants.OS_SEARCH_PATH})
2064
  @rtype: list
2065
  @return: a list of tuples (name, path, status, diagnose, variants,
2066
      parameters, api_version) for all (potential) OSes under all
2067
      search paths, where:
2068
          - name is the (potential) OS name
2069
          - path is the full path to the OS
2070
          - status True/False is the validity of the OS
2071
          - diagnose is the error message for an invalid OS, otherwise empty
2072
          - variants is a list of supported OS variants, if any
2073
          - parameters is a list of (name, help) parameters, if any
2074
          - api_version is a list of supported OS API versions
2075

2076
  """
2077
  if top_dirs is None:
2078
    top_dirs = constants.OS_SEARCH_PATH
2079

    
2080
  result = []
2081
  for dir_name in top_dirs:
2082
    if os.path.isdir(dir_name):
2083
      try:
2084
        f_names = utils.ListVisibleFiles(dir_name)
2085
      except EnvironmentError, err:
2086
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
2087
        break
2088
      for name in f_names:
2089
        os_path = utils.PathJoin(dir_name, name)
2090
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
2091
        if status:
2092
          diagnose = ""
2093
          variants = os_inst.supported_variants
2094
          parameters = os_inst.supported_parameters
2095
          api_versions = os_inst.api_versions
2096
        else:
2097
          diagnose = os_inst
2098
          variants = parameters = api_versions = []
2099
        result.append((name, os_path, status, diagnose, variants,
2100
                       parameters, api_versions))
2101

    
2102
  return result
2103

    
2104

    
2105
def _TryOSFromDisk(name, base_dir=None):
2106
  """Create an OS instance from disk.
2107

2108
  This function will return an OS instance if the given name is a
2109
  valid OS name.
2110

2111
  @type base_dir: string
2112
  @keyword base_dir: Base directory containing OS installations.
2113
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
2114
  @rtype: tuple
2115
  @return: success and either the OS instance if we find a valid one,
2116
      or error message
2117

2118
  """
2119
  if base_dir is None:
2120
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
2121
  else:
2122
    os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
2123

    
2124
  if os_dir is None:
2125
    return False, "Directory for OS %s not found in search path" % name
2126

    
2127
  status, api_versions = _OSOndiskAPIVersion(os_dir)
2128
  if not status:
2129
    # push the error up
2130
    return status, api_versions
2131

    
2132
  if not constants.OS_API_VERSIONS.intersection(api_versions):
2133
    return False, ("API version mismatch for path '%s': found %s, want %s." %
2134
                   (os_dir, api_versions, constants.OS_API_VERSIONS))
2135

    
2136
  # OS Files dictionary, we will populate it with the absolute path
2137
  # names; if the value is True, then it is a required file, otherwise
2138
  # an optional one
2139
  os_files = dict.fromkeys(constants.OS_SCRIPTS, True)
2140

    
2141
  if max(api_versions) >= constants.OS_API_V15:
2142
    os_files[constants.OS_VARIANTS_FILE] = False
2143

    
2144
  if max(api_versions) >= constants.OS_API_V20:
2145
    os_files[constants.OS_PARAMETERS_FILE] = True
2146
  else:
2147
    del os_files[constants.OS_SCRIPT_VERIFY]
2148

    
2149
  for (filename, required) in os_files.items():
2150
    os_files[filename] = utils.PathJoin(os_dir, filename)
2151

    
2152
    try:
2153
      st = os.stat(os_files[filename])
2154
    except EnvironmentError, err:
2155
      if err.errno == errno.ENOENT and not required:
2156
        del os_files[filename]
2157
        continue
2158
      return False, ("File '%s' under path '%s' is missing (%s)" %
2159
                     (filename, os_dir, _ErrnoOrStr(err)))
2160

    
2161
    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
2162
      return False, ("File '%s' under path '%s' is not a regular file" %
2163
                     (filename, os_dir))
2164

    
2165
    if filename in constants.OS_SCRIPTS:
2166
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
2167
        return False, ("File '%s' under path '%s' is not executable" %
2168
                       (filename, os_dir))
2169

    
2170
  variants = []
2171
  if constants.OS_VARIANTS_FILE in os_files:
2172
    variants_file = os_files[constants.OS_VARIANTS_FILE]
2173
    try:
2174
      variants = utils.ReadFile(variants_file).splitlines()
2175
    except EnvironmentError, err:
2176
      # we accept missing files, but not other errors
2177
      if err.errno != errno.ENOENT:
2178
        return False, ("Error while reading the OS variants file at %s: %s" %
2179
                       (variants_file, _ErrnoOrStr(err)))
2180

    
2181
  parameters = []
2182
  if constants.OS_PARAMETERS_FILE in os_files:
2183
    parameters_file = os_files[constants.OS_PARAMETERS_FILE]
2184
    try:
2185
      parameters = utils.ReadFile(parameters_file).splitlines()
2186
    except EnvironmentError, err:
2187
      return False, ("Error while reading the OS parameters file at %s: %s" %
2188
                     (parameters_file, _ErrnoOrStr(err)))
2189
    parameters = [v.split(None, 1) for v in parameters]
2190

    
2191
  os_obj = objects.OS(name=name, path=os_dir,
2192
                      create_script=os_files[constants.OS_SCRIPT_CREATE],
2193
                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
2194
                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
2195
                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
2196
                      verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
2197
                                                 None),
2198
                      supported_variants=variants,
2199
                      supported_parameters=parameters,
2200
                      api_versions=api_versions)
2201
  return True, os_obj
2202

    
2203

    
2204
def OSFromDisk(name, base_dir=None):
2205
  """Create an OS instance from disk.
2206

2207
  This function will return an OS instance if the given name is a
2208
  valid OS name. Otherwise, it will raise an appropriate
2209
  L{RPCFail} exception, detailing why this is not a valid OS.
2210

2211
  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
2212
  an exception but returns true/false status data.
2213

2214
  @type base_dir: string
2215
  @keyword base_dir: Base directory containing OS installations.
2216
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
2217
  @rtype: L{objects.OS}
2218
  @return: the OS instance if we find a valid one
2219
  @raise RPCFail: if we don't find a valid OS
2220

2221
  """
2222
  name_only = objects.OS.GetName(name)
2223
  status, payload = _TryOSFromDisk(name_only, base_dir)
2224

    
2225
  if not status:
2226
    _Fail(payload)
2227

    
2228
  return payload
2229

    
2230

    
2231
def OSCoreEnv(os_name, inst_os, os_params, debug=0):
2232
  """Calculate the basic environment for an os script.
2233

2234
  @type os_name: str
2235
  @param os_name: full operating system name (including variant)
2236
  @type inst_os: L{objects.OS}
2237
  @param inst_os: operating system for which the environment is being built
2238
  @type os_params: dict
2239
  @param os_params: the OS parameters
2240
  @type debug: integer
2241
  @param debug: debug level (0 or 1, for OS Api 10)
2242
  @rtype: dict
2243
  @return: dict of environment variables
2244
  @raise errors.BlockDeviceError: if the block device
2245
      cannot be found
2246

2247
  """
2248
  result = {}
2249
  api_version = \
2250
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
2251
  result["OS_API_VERSION"] = "%d" % api_version
2252
  result["OS_NAME"] = inst_os.name
2253
  result["DEBUG_LEVEL"] = "%d" % debug
2254

    
2255
  # OS variants
2256
  if api_version >= constants.OS_API_V15 and inst_os.supported_variants:
2257
    variant = objects.OS.GetVariant(os_name)
2258
    if not variant:
2259
      variant = inst_os.supported_variants[0]
2260
  else:
2261
    variant = ""
2262
  result["OS_VARIANT"] = variant
2263

    
2264
  # OS params
2265
  for pname, pvalue in os_params.items():
2266
    result["OSP_%s" % pname.upper()] = pvalue
2267

    
2268
  return result
2269
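# A minimal illustrative sketch of OSCoreEnv above, not part of the module;
# the OS name, variant and parameters are hypothetical and the exact API
# version depends on what both sides support:
#
#   OSCoreEnv("debootstrap+default", inst_os, {"dhcp": "yes"}) ->
#     {"OS_API_VERSION": "20", "OS_NAME": "debootstrap",
#      "OS_VARIANT": "default", "OSP_DHCP": "yes", "DEBUG_LEVEL": "0"}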

    
2270

    
2271
def OSEnvironment(instance, inst_os, debug=0):
2272
  """Calculate the environment for an os script.
2273

2274
  @type instance: L{objects.Instance}
2275
  @param instance: target instance for the os script run
2276
  @type inst_os: L{objects.OS}
2277
  @param inst_os: operating system for which the environment is being built
2278
  @type debug: integer
2279
  @param debug: debug level (0 or 1, for OS Api 10)
2280
  @rtype: dict
2281
  @return: dict of environment variables
2282
  @raise errors.BlockDeviceError: if the block device
2283
      cannot be found
2284

2285
  """
2286
  result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)
2287

    
2288
  for attr in ["name", "os", "uuid", "ctime", "mtime", "primary_node"]:
2289
    result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))
2290

    
2291
  result["HYPERVISOR"] = instance.hypervisor
2292
  result["DISK_COUNT"] = "%d" % len(instance.disks)
2293
  result["NIC_COUNT"] = "%d" % len(instance.nics)
2294
  result["INSTANCE_SECONDARY_NODES"] = \
2295
      ("%s" % " ".join(instance.secondary_nodes))
2296

    
2297
  # Disks
2298
  for idx, disk in enumerate(instance.disks):
2299
    real_disk = _OpenRealBD(disk)
2300
    result["DISK_%d_PATH" % idx] = real_disk.dev_path
2301
    result["DISK_%d_ACCESS" % idx] = disk.mode
2302
    if constants.HV_DISK_TYPE in instance.hvparams:
2303
      result["DISK_%d_FRONTEND_TYPE" % idx] = \
2304
        instance.hvparams[constants.HV_DISK_TYPE]
2305
    if disk.dev_type in constants.LDS_BLOCK:
2306
      result["DISK_%d_BACKEND_TYPE" % idx] = "block"
2307
    elif disk.dev_type == constants.LD_FILE:
2308
      result["DISK_%d_BACKEND_TYPE" % idx] = \
2309
        "file:%s" % disk.physical_id[0]
2310

    
2311
  # NICs
2312
  for idx, nic in enumerate(instance.nics):
2313
    result["NIC_%d_MAC" % idx] = nic.mac
2314
    if nic.ip:
2315
      result["NIC_%d_IP" % idx] = nic.ip
2316
    result["NIC_%d_MODE" % idx] = nic.nicparams[constants.NIC_MODE]
2317
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2318
      result["NIC_%d_BRIDGE" % idx] = nic.nicparams[constants.NIC_LINK]
2319
    if nic.nicparams[constants.NIC_LINK]:
2320
      result["NIC_%d_LINK" % idx] = nic.nicparams[constants.NIC_LINK]
2321
    if constants.HV_NIC_TYPE in instance.hvparams:
2322
      result["NIC_%d_FRONTEND_TYPE" % idx] = \
2323
        instance.hvparams[constants.HV_NIC_TYPE]
2324

    
2325
  # HV/BE params
2326
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
2327
    for key, value in source.items():
2328
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)
2329

    
2330
  return result
2331

    
2332

    
2333
def BlockdevGrow(disk, amount, dryrun):
2334
  """Grow a stack of block devices.
2335

2336
  This function is called recursively, with the children being the
  first ones to be resized.
2338

2339
  @type disk: L{objects.Disk}
2340
  @param disk: the disk to be grown
2341
  @type amount: integer
2342
  @param amount: the amount (in mebibytes) to grow with
2343
  @type dryrun: boolean
2344
  @param dryrun: whether to execute the operation in simulation mode
2345
      only, without actually increasing the size
2346
  @rtype: (status, result)
2347
  @return: a tuple with the status of the operation (True/False), and
2348
      the error message if status is False
2349

2350
  """
2351
  r_dev = _RecursiveFindBD(disk)
2352
  if r_dev is None:
2353
    _Fail("Cannot find block device %s", disk)
2354

    
2355
  try:
2356
    r_dev.Grow(amount, dryrun)
2357
  except errors.BlockDeviceError, err:
2358
    _Fail("Failed to grow block device: %s", err, exc=True)
2359

    
2360

    
2361
def BlockdevSnapshot(disk):
2362
  """Create a snapshot copy of a block device.
2363

2364
  This function is called recursively, and the snapshot is actually created
2365
  just for the leaf lvm backend device.
2366

2367
  @type disk: L{objects.Disk}
2368
  @param disk: the disk to be snapshotted
2369
  @rtype: string
2370
  @return: snapshot disk ID as (vg, lv)
2371

2372
  """
2373
  if disk.dev_type == constants.LD_DRBD8:
2374
    if not disk.children:
2375
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
2376
            disk.unique_id)
2377
    return BlockdevSnapshot(disk.children[0])
2378
  elif disk.dev_type == constants.LD_LV:
2379
    r_dev = _RecursiveFindBD(disk)
2380
    if r_dev is not None:
2381
      # FIXME: choose a saner value for the snapshot size
2382
      # let's stay on the safe side and ask for the full size, for now
2383
      return r_dev.Snapshot(disk.size)
2384
    else:
2385
      _Fail("Cannot find block device %s", disk)
2386
  else:
2387
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
2388
          disk.unique_id, disk.dev_type)
2389

    
2390

    
2391
def FinalizeExport(instance, snap_disks):
2392
  """Write out the export configuration information.
2393

2394
  @type instance: L{objects.Instance}
2395
  @param instance: the instance which we export, used for
2396
      saving configuration
2397
  @type snap_disks: list of L{objects.Disk}
2398
  @param snap_disks: list of snapshot block devices, which
2399
      will be used to get the actual name of the dump file
2400

2401
  @rtype: None
2402

2403
  """
2404
  destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
2405
  finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)
2406

    
2407
  config = objects.SerializableConfigParser()
2408

    
2409
  config.add_section(constants.INISECT_EXP)
2410
  config.set(constants.INISECT_EXP, "version", "0")
2411
  config.set(constants.INISECT_EXP, "timestamp", "%d" % int(time.time()))
2412
  config.set(constants.INISECT_EXP, "source", instance.primary_node)
2413
  config.set(constants.INISECT_EXP, "os", instance.os)
2414
  config.set(constants.INISECT_EXP, "compression", "none")
2415

    
2416
  config.add_section(constants.INISECT_INS)
2417
  config.set(constants.INISECT_INS, "name", instance.name)
2418
  config.set(constants.INISECT_INS, "memory", "%d" %
2419
             instance.beparams[constants.BE_MEMORY])
2420
  config.set(constants.INISECT_INS, "vcpus", "%d" %
2421
             instance.beparams[constants.BE_VCPUS])
2422
  config.set(constants.INISECT_INS, "disk_template", instance.disk_template)
2423
  config.set(constants.INISECT_INS, "hypervisor", instance.hypervisor)
2424
  config.set(constants.INISECT_INS, "tags", " ".join(instance.GetTags()))
2425

    
2426
  nic_total = 0
2427
  for nic_count, nic in enumerate(instance.nics):
2428
    nic_total += 1
2429
    config.set(constants.INISECT_INS, "nic%d_mac" %
2430
               nic_count, "%s" % nic.mac)
2431
    config.set(constants.INISECT_INS, "nic%d_ip" % nic_count, "%s" % nic.ip)
2432
    for param in constants.NICS_PARAMETER_TYPES:
2433
      config.set(constants.INISECT_INS, "nic%d_%s" % (nic_count, param),
2434
                 "%s" % nic.nicparams.get(param, None))
2435
  # TODO: redundant: on load can read nics until it doesn't exist
2436
  config.set(constants.INISECT_INS, "nic_count", "%d" % nic_total)
2437

    
2438
  disk_total = 0
2439
  for disk_count, disk in enumerate(snap_disks):
2440
    if disk:
2441
      disk_total += 1
2442
      config.set(constants.INISECT_INS, "disk%d_ivname" % disk_count,
2443
                 ("%s" % disk.iv_name))
2444
      config.set(constants.INISECT_INS, "disk%d_dump" % disk_count,
2445
                 ("%s" % disk.physical_id[1]))
2446
      config.set(constants.INISECT_INS, "disk%d_size" % disk_count,
2447
                 ("%d" % disk.size))
2448

    
2449
  config.set(constants.INISECT_INS, "disk_count", "%d" % disk_total)
2450

    
2451
  # New-style hypervisor/backend parameters
2452

    
2453
  config.add_section(constants.INISECT_HYP)
2454
  for name, value in instance.hvparams.items():
2455
    if name not in constants.HVC_GLOBALS:
2456
      config.set(constants.INISECT_HYP, name, str(value))
2457

    
2458
  config.add_section(constants.INISECT_BEP)
2459
  for name, value in instance.beparams.items():
2460
    config.set(constants.INISECT_BEP, name, str(value))
2461

    
2462
  config.add_section(constants.INISECT_OSP)
2463
  for name, value in instance.osparams.items():
2464
    config.set(constants.INISECT_OSP, name, str(value))
2465

    
2466
  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
2467
                  data=config.Dumps())
2468
  shutil.rmtree(finaldestdir, ignore_errors=True)
2469
  shutil.move(destdir, finaldestdir)
2470

    
2471

    
2472
def ExportInfo(dest):
2473
  """Get export configuration information.
2474

2475
  @type dest: str
2476
  @param dest: directory containing the export
2477

2478
  @rtype: L{objects.SerializableConfigParser}
2479
  @return: a serializable config file containing the
2480
      export info
2481

2482
  """
2483
  cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)
2484

    
2485
  config = objects.SerializableConfigParser()
2486
  config.read(cff)
2487

    
2488
  if (not config.has_section(constants.INISECT_EXP) or
2489
      not config.has_section(constants.INISECT_INS)):
2490
    _Fail("Export info file doesn't have the required fields")
2491

    
2492
  return config.Dumps()
2493

    
2494

    
2495
def ListExports():
2496
  """Return a list of exports currently available on this machine.
2497

2498
  @rtype: list
2499
  @return: list of the exports
2500

2501
  """
2502
  if os.path.isdir(constants.EXPORT_DIR):
2503
    return sorted(utils.ListVisibleFiles(constants.EXPORT_DIR))
2504
  else:
2505
    _Fail("No exports directory")
2506

    
2507

    
2508
def RemoveExport(export):
2509
  """Remove an existing export from the node.
2510

2511
  @type export: str
2512
  @param export: the name of the export to remove
2513
  @rtype: None
2514

2515
  """
2516
  target = utils.PathJoin(constants.EXPORT_DIR, export)
2517

    
2518
  try:
2519
    shutil.rmtree(target)
2520
  except EnvironmentError, err:
2521
    _Fail("Error while removing the export: %s", err, exc=True)
2522

    
2523

    
2524
def BlockdevRename(devlist):
2525
  """Rename a list of block devices.
2526

2527
  @type devlist: list of tuples
2528
  @param devlist: list of tuples of the form  (disk,
2529
      new_logical_id, new_physical_id); disk is an
2530
      L{objects.Disk} object describing the current disk,
2531
      and new logical_id/physical_id is the name we
2532
      rename it to
2533
  @rtype: boolean
2534
  @return: True if all renames succeeded, False otherwise
2535

2536
  """
2537
  msgs = []
2538
  result = True
2539
  for disk, unique_id in devlist:
2540
    dev = _RecursiveFindBD(disk)
2541
    if dev is None:
2542
      msgs.append("Can't find device %s in rename" % str(disk))
2543
      result = False
2544
      continue
2545
    try:
2546
      old_rpath = dev.dev_path
2547
      dev.Rename(unique_id)
2548
      new_rpath = dev.dev_path
2549
      if old_rpath != new_rpath:
2550
        DevCacheManager.RemoveCache(old_rpath)
2551
        # FIXME: we should add the new cache information here, like:
2552
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2553
        # but we don't have the owner here - maybe parse from existing
2554
        # cache? for now, we only lose lvm data when we rename, which
2555
        # is less critical than DRBD or MD
2556
    except errors.BlockDeviceError, err:
2557
      msgs.append("Can't rename device '%s' to '%s': %s" %
2558
                  (dev, unique_id, err))
2559
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2560
      result = False
2561
  if not result:
2562
    _Fail("; ".join(msgs))
2563

    
2564

    
2565
def _TransformFileStorageDir(fs_dir):
2566
  """Checks whether given file_storage_dir is valid.
2567

2568
  Checks whether the given fs_dir is within the cluster-wide default
2569
  file_storage_dir or the shared_file_storage_dir, which are stored in
2570
  SimpleStore. Only paths under those directories are allowed.
2571

2572
  @type fs_dir: str
2573
  @param fs_dir: the path to check
2574

2575
  @return: the normalized path if valid, None otherwise
2576

2577
  """
2578
  if not constants.ENABLE_FILE_STORAGE:
2579
    _Fail("File storage disabled at configure time")
2580
  cfg = _GetConfig()
2581
  fs_dir = os.path.normpath(fs_dir)
2582
  base_fstore = cfg.GetFileStorageDir()
2583
  base_shared = cfg.GetSharedFileStorageDir()
2584
  if not (utils.IsBelowDir(base_fstore, fs_dir) or
2585
          utils.IsBelowDir(base_shared, fs_dir)):
2586
    _Fail("File storage directory '%s' is not under base file"
2587
          " storage directory '%s' or shared storage directory '%s'",
2588
          fs_dir, base_fstore, base_shared)
2589
  return fs_dir
2590

    
2591

    
2592
def CreateFileStorageDir(file_storage_dir):
2593
  """Create file storage directory.
2594

2595
  @type file_storage_dir: str
2596
  @param file_storage_dir: directory to create
2597

2598
  @rtype: tuple
2599
  @return: tuple with first element a boolean indicating whether dir
      creation was successful or not
2601

2602
  """
2603
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2604
  if os.path.exists(file_storage_dir):
2605
    if not os.path.isdir(file_storage_dir):
2606
      _Fail("Specified storage dir '%s' is not a directory",
2607
            file_storage_dir)
2608
  else:
2609
    try:
2610
      os.makedirs(file_storage_dir, 0750)
2611
    except OSError, err:
2612
      _Fail("Cannot create file storage directory '%s': %s",
2613
            file_storage_dir, err, exc=True)
2614

    
2615

    
2616
def RemoveFileStorageDir(file_storage_dir):
2617
  """Remove file storage directory.
2618

2619
  Remove it only if it's empty. If not, log an error and return.
2620

2621
  @type file_storage_dir: str
2622
  @param file_storage_dir: the directory we should cleanup
2623
  @rtype: tuple (success,)
2624
  @return: tuple of one element, C{success}, denoting
2625
      whether the operation was successful
2626

2627
  """
2628
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2629
  if os.path.exists(file_storage_dir):
2630
    if not os.path.isdir(file_storage_dir):
2631
      _Fail("Specified Storage directory '%s' is not a directory",
2632
            file_storage_dir)
2633
    # deletes dir only if empty, otherwise we want to fail the rpc call
2634
    try:
2635
      os.rmdir(file_storage_dir)
2636
    except OSError, err:
2637
      _Fail("Cannot remove file storage directory '%s': %s",
2638
            file_storage_dir, err)
2639

    
2640

    
2641
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2642
  """Rename the file storage directory.
2643

2644
  @type old_file_storage_dir: str
2645
  @param old_file_storage_dir: the current path
2646
  @type new_file_storage_dir: str
2647
  @param new_file_storage_dir: the name we should rename to
2648
  @rtype: tuple (success,)
2649
  @return: tuple of one element, C{success}, denoting
2650
      whether the operation was successful
2651

2652
  """
2653
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2654
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2655
  if not os.path.exists(new_file_storage_dir):
2656
    if os.path.isdir(old_file_storage_dir):
2657
      try:
2658
        os.rename(old_file_storage_dir, new_file_storage_dir)
2659
      except OSError, err:
2660
        _Fail("Cannot rename '%s' to '%s': %s",
2661
              old_file_storage_dir, new_file_storage_dir, err)
2662
    else:
2663
      _Fail("Specified storage dir '%s' is not a directory",
2664
            old_file_storage_dir)
2665
  else:
2666
    if os.path.exists(old_file_storage_dir):
2667
      _Fail("Cannot rename '%s' to '%s': both locations exist",
2668
            old_file_storage_dir, new_file_storage_dir)
2669

    
2670

    
2671
def _EnsureJobQueueFile(file_name):
2672
  """Checks whether the given filename is in the queue directory.
2673

2674
  @type file_name: str
2675
  @param file_name: the file name we should check
2676
  @rtype: None
2677
  @raises RPCFail: if the file is not valid
2678

2679
  """
2680
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
2681
  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2682

    
2683
  if not result:
2684
    _Fail("Passed job queue file '%s' does not belong to"
2685
          " the queue directory '%s'", file_name, queue_dir)
2686
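# A minimal illustrative sketch of _EnsureJobQueueFile above, not part of the
# module, assuming constants.QUEUE_DIR is "/var/lib/ganeti/queue":
#
#   _EnsureJobQueueFile("/var/lib/ganeti/queue/job-42")   # passes silently
#   _EnsureJobQueueFile("/tmp/job-42")                     # raises RPCFail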

    
2687

    
2688
def JobQueueUpdate(file_name, content):
2689
  """Updates a file in the queue directory.
2690

2691
  This is just a wrapper over L{utils.io.WriteFile}, with proper
2692
  checking.
2693

2694
  @type file_name: str
2695
  @param file_name: the job file name
2696
  @type content: str
2697
  @param content: the new job contents
2698
  @rtype: boolean
2699
  @return: the success of the operation
2700

2701
  """
2702
  _EnsureJobQueueFile(file_name)
2703
  getents = runtime.GetEnts()
2704

    
2705
  # Write and replace the file atomically
2706
  utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
2707
                  gid=getents.masterd_gid)
2708

    
2709

    
2710
def JobQueueRename(old, new):
2711
  """Renames a job queue file.
2712

2713
  This is just a wrapper over os.rename with proper checking.
2714

2715
  @type old: str
2716
  @param old: the old (actual) file name
2717
  @type new: str
2718
  @param new: the desired file name
2719
  @rtype: tuple
2720
  @return: the success of the operation and payload
2721

2722
  """
2723
  _EnsureJobQueueFile(old)
2724
  _EnsureJobQueueFile(new)
2725

    
2726
  utils.RenameFile(old, new, mkdir=True)
2727

    
2728

    
2729
def BlockdevClose(instance_name, disks):
2730
  """Closes the given block devices.
2731

2732
  This means they will be switched to secondary mode (in case of
2733
  DRBD).
2734

2735
  @param instance_name: if the argument is not empty, the symlinks
2736
      of this instance will be removed
2737
  @type disks: list of L{objects.Disk}
2738
  @param disks: the list of disks to be closed
2739
  @rtype: tuple (success, message)
2740
  @return: a tuple of success and message, where success
      indicates the success of the operation, and message
      will contain the error details in case we failed
2744

2745
  """
2746
  bdevs = []
2747
  for cf in disks:
2748
    rd = _RecursiveFindBD(cf)
2749
    if rd is None:
2750
      _Fail("Can't find device %s", cf)
2751
    bdevs.append(rd)
2752

    
2753
  msg = []
2754
  for rd in bdevs:
2755
    try:
2756
      rd.Close()
2757
    except errors.BlockDeviceError, err:
2758
      msg.append(str(err))
2759
  if msg:
2760
    _Fail("Can't make devices secondary: %s", ",".join(msg))
2761
  else:
2762
    if instance_name:
2763
      _RemoveBlockDevLinks(instance_name, disks)
2764

    
2765

    
2766
def ValidateHVParams(hvname, hvparams):
2767
  """Validates the given hypervisor parameters.
2768

2769
  @type hvname: string
2770
  @param hvname: the hypervisor name
2771
  @type hvparams: dict
2772
  @param hvparams: the hypervisor parameters to be validated
2773
  @rtype: None
2774

2775
  """
2776
  try:
2777
    hv_type = hypervisor.GetHypervisor(hvname)
2778
    hv_type.ValidateParameters(hvparams)
2779
  except errors.HypervisorError, err:
2780
    _Fail(str(err), log=False)
2781

    
2782

    
2783
def _CheckOSPList(os_obj, parameters):
2784
  """Check whether a list of parameters is supported by the OS.
2785

2786
  @type os_obj: L{objects.OS}
2787
  @param os_obj: OS object to check
2788
  @type parameters: list
2789
  @param parameters: the list of parameters to check
2790

2791
  """
2792
  supported = [v[0] for v in os_obj.supported_parameters]
2793
  delta = frozenset(parameters).difference(supported)
2794
  if delta:
2795
    _Fail("The following parameters are not supported"
2796
          " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))
2797

    
2798

    
2799
def ValidateOS(required, osname, checks, osparams):
2800
  """Validate the given OS' parameters.
2801

2802
  @type required: boolean
2803
  @param required: whether absence of the OS should translate into
2804
      failure or not
2805
  @type osname: string
2806
  @param osname: the OS to be validated
2807
  @type checks: list
2808
  @param checks: list of the checks to run (currently only 'parameters')
2809
  @type osparams: dict
2810
  @param osparams: dictionary with OS parameters
2811
  @rtype: boolean
2812
  @return: True if the validation passed, or False if the OS was not
2813
      found and L{required} was false
2814

2815
  """
2816
  if not constants.OS_VALIDATE_CALLS.issuperset(checks):
2817
    _Fail("Unknown checks required for OS %s: %s", osname,
2818
          set(checks).difference(constants.OS_VALIDATE_CALLS))
2819

    
2820
  name_only = objects.OS.GetName(osname)
2821
  status, tbv = _TryOSFromDisk(name_only, None)
2822

    
2823
  if not status:
2824
    if required:
2825
      _Fail(tbv)
2826
    else:
2827
      return False
2828

    
2829
  if max(tbv.api_versions) < constants.OS_API_V20:
2830
    return True
2831

    
2832
  if constants.OS_VALIDATE_PARAMETERS in checks:
2833
    _CheckOSPList(tbv, osparams.keys())
2834

    
2835
  validate_env = OSCoreEnv(osname, tbv, osparams)
2836
  result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
2837
                        cwd=tbv.path, reset_env=True)
2838
  if result.failed:
2839
    logging.error("os validate command '%s' returned error: %s output: %s",
2840
                  result.cmd, result.fail_reason, result.output)
2841
    _Fail("OS validation script failed (%s), output: %s",
2842
          result.fail_reason, result.output, log=False)
2843

    
2844
  return True
2845

    
2846

    
2847
def DemoteFromMC():
2848
  """Demotes the current node from master candidate role.
2849

2850
  """
2851
  # try to ensure we're not the master by mistake
2852
  master, myself = ssconf.GetMasterAndMyself()
2853
  if master == myself:
2854
    _Fail("ssconf status shows I'm the master node, will not demote")
2855

    
2856
  result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
2857
  if not result.failed:
2858
    _Fail("The master daemon is running, will not demote")
2859

    
2860
  try:
2861
    if os.path.isfile(constants.CLUSTER_CONF_FILE):
2862
      utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2863
  except EnvironmentError, err:
2864
    if err.errno != errno.ENOENT:
2865
      _Fail("Error while backing up cluster file: %s", err, exc=True)
2866

    
2867
  utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2868

    
2869

    
2870
def _GetX509Filenames(cryptodir, name):
2871
  """Returns the full paths for the private key and certificate.
2872

2873
  """
2874
  return (utils.PathJoin(cryptodir, name),
2875
          utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
2876
          utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
2877

    
2878

    
2879
def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
2880
  """Creates a new X509 certificate for SSL/TLS.
2881

2882
  @type validity: int
2883
  @param validity: Validity in seconds
2884
  @rtype: tuple; (string, string)
2885
  @return: Certificate name and public part
2886

2887
  """
2888
  (key_pem, cert_pem) = \
2889
    utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
2890
                                     min(validity, _MAX_SSL_CERT_VALIDITY))
2891

    
2892
  cert_dir = tempfile.mkdtemp(dir=cryptodir,
2893
                              prefix="x509-%s-" % utils.TimestampForFilename())
2894
  try:
2895
    name = os.path.basename(cert_dir)
2896
    assert len(name) > 5
2897

    
2898
    (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2899

    
2900
    utils.WriteFile(key_file, mode=0400, data=key_pem)
2901
    utils.WriteFile(cert_file, mode=0400, data=cert_pem)
2902

    
2903
    # Never return private key as it shouldn't leave the node
2904
    return (name, cert_pem)
2905
  except Exception:
2906
    shutil.rmtree(cert_dir, ignore_errors=True)
2907
    raise
2908

    
2909

    
2910
def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
2911
  """Removes a X509 certificate.
2912

2913
  @type name: string
2914
  @param name: Certificate name
2915

2916
  """
2917
  (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2918

    
2919
  utils.RemoveFile(key_file)
2920
  utils.RemoveFile(cert_file)
2921

    
2922
  try:
2923
    os.rmdir(cert_dir)
2924
  except EnvironmentError, err:
2925
    _Fail("Cannot remove certificate directory '%s': %s",
2926
          cert_dir, err)
2927

    
2928

    
2929
def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
2930
  """Returns the command for the requested input/output.
2931

2932
  @type instance: L{objects.Instance}
2933
  @param instance: The instance object
2934
  @param mode: Import/export mode
2935
  @param ieio: Input/output type
2936
  @param ieargs: Input/output arguments
2937

2938
  """
2939
  assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
2940

    
2941
  env = None
2942
  prefix = None
2943
  suffix = None
2944
  exp_size = None
2945

    
2946
  if ieio == constants.IEIO_FILE:
2947
    (filename, ) = ieargs
2948

    
2949
    if not utils.IsNormAbsPath(filename):
2950
      _Fail("Path '%s' is not normalized or absolute", filename)
2951

    
2952
    real_filename = os.path.realpath(filename)
2953
    directory = os.path.dirname(real_filename)
2954

    
2955
    if not utils.IsBelowDir(constants.EXPORT_DIR, real_filename):
2956
      _Fail("File '%s' is not under exports directory '%s': %s",
2957
            filename, constants.EXPORT_DIR, real_filename)
2958

    
2959
    # Create directory
2960
    utils.Makedirs(directory, mode=0750)
2961

    
2962
    quoted_filename = utils.ShellQuote(filename)
2963

    
2964
    if mode == constants.IEM_IMPORT:
2965
      suffix = "> %s" % quoted_filename
2966
    elif mode == constants.IEM_EXPORT:
2967
      suffix = "< %s" % quoted_filename
2968

    
2969
      # Retrieve file size
2970
      try:
2971
        st = os.stat(filename)
2972
      except EnvironmentError, err:
2973
        logging.error("Can't stat(2) %s: %s", filename, err)
2974
      else:
2975
        exp_size = utils.BytesToMebibyte(st.st_size)
2976

    
2977
  elif ieio == constants.IEIO_RAW_DISK:
2978
    (disk, ) = ieargs
2979

    
2980
    real_disk = _OpenRealBD(disk)
2981

    
2982
    if mode == constants.IEM_IMPORT:
2983
      # we set a smaller block size here as, due to transport buffering,
      # more than 64-128k will mostly be ignored; we use nocreat to fail
      # if the device is not already there or we pass a wrong path; we
      # use notrunc to not attempt truncation on an LV device; we use
      # oflag=dsync to avoid buffering too much memory; this means that
      # at best we flush every 64k, which will not be very fast
2989
      suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
2990
                                    " bs=%s oflag=dsync"),
2991
                                    real_disk.dev_path,
2992
                                    str(64 * 1024))
2993

    
2994
    elif mode == constants.IEM_EXPORT:
2995
      # the block size on the read dd is 1MiB to match our units
2996
      prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
2997
                                   real_disk.dev_path,
2998
                                   str(1024 * 1024), # 1 MB
2999
                                   str(disk.size))
3000
      exp_size = disk.size
3001

    
3002
  elif ieio == constants.IEIO_SCRIPT:
3003
    (disk, disk_index, ) = ieargs
3004

    
3005
    assert isinstance(disk_index, (int, long))
3006

    
3007
    real_disk = _OpenRealBD(disk)
3008

    
3009
    inst_os = OSFromDisk(instance.os)
3010
    env = OSEnvironment(instance, inst_os)
3011

    
3012
    if mode == constants.IEM_IMPORT:
3013
      env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
3014
      env["IMPORT_INDEX"] = str(disk_index)
3015
      script = inst_os.import_script
3016

    
3017
    elif mode == constants.IEM_EXPORT:
3018
      env["EXPORT_DEVICE"] = real_disk.dev_path
3019
      env["EXPORT_INDEX"] = str(disk_index)
3020
      script = inst_os.export_script
3021

    
3022
    # TODO: Pass special environment only to script
3023
    script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
3024

    
3025
    if mode == constants.IEM_IMPORT:
3026
      suffix = "| %s" % script_cmd
3027

    
3028
    elif mode == constants.IEM_EXPORT:
3029
      prefix = "%s |" % script_cmd
3030

    
3031
    # Let script predict size
3032
    exp_size = constants.IE_CUSTOM_SIZE
3033

    
3034
  else:
3035
    _Fail("Invalid %s I/O mode %r", mode, ieio)
3036

    
3037
  return (env, prefix, suffix, exp_size)
3038
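# A minimal illustrative sketch of _GetImportExportIoCommand above, not part
# of the module; the file name is hypothetical and assumed to live under
# constants.EXPORT_DIR. Exporting to a plain file, i.e. ieio ==
# constants.IEIO_FILE in export mode, returns roughly
#
#   (None, None, "< /var/lib/ganeti/export/inst1/disk0", <file size in MiB>)
#
# that is: no extra environment, no command prefix, a shell redirection
# suffix and the expected transfer size.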

    
3039

    
3040
def _CreateImportExportStatusDir(prefix):
3041
  """Creates status directory for import/export.
3042

3043
  """
3044
  return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR,
3045
                          prefix=("%s-%s-" %
3046
                                  (prefix, utils.TimestampForFilename())))
3047

    
3048

    
3049
def StartImportExportDaemon(mode, opts, host, port, instance, component,
3050
                            ieio, ieioargs):
3051
  """Starts an import or export daemon.
3052

3053
  @param mode: Import/export mode
3054
  @type opts: L{objects.ImportExportOptions}
3055
  @param opts: Daemon options
3056
  @type host: string
3057
  @param host: Remote host for export (None for import)
3058
  @type port: int
3059
  @param port: Remote port for export (None for import)
3060
  @type instance: L{objects.Instance}
3061
  @param instance: Instance object
3062
  @type component: string
3063
  @param component: which part of the instance is transferred now,
3064
      e.g. 'disk/0'
3065
  @param ieio: Input/output type
3066
  @param ieioargs: Input/output arguments
3067

3068
  """
3069
  if mode == constants.IEM_IMPORT:
3070
    prefix = "import"
3071

    
3072
    if not (host is None and port is None):
3073
      _Fail("Can not specify host or port on import")
3074

    
3075
  elif mode == constants.IEM_EXPORT:
3076
    prefix = "export"
3077

    
3078
    if host is None or port is None:
3079
      _Fail("Host and port must be specified for an export")
3080

    
3081
  else:
3082
    _Fail("Invalid mode %r", mode)
3083

    
3084
  if (opts.key_name is None) ^ (opts.ca_pem is None):
3085
    _Fail("Cluster certificate can only be used for both key and CA")
3086

    
3087
  (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
3088
    _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
3089

    
3090
  if opts.key_name is None:
3091
    # Use server.pem
3092
    key_path = constants.NODED_CERT_FILE
3093
    cert_path = constants.NODED_CERT_FILE
3094
    assert opts.ca_pem is None
3095
  else:
3096
    (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
3097
                                                 opts.key_name)
3098
    assert opts.ca_pem is not None
3099

    
3100
  for i in [key_path, cert_path]:
3101
    if not os.path.exists(i):
3102
      _Fail("File '%s' does not exist" % i)
3103

    
3104
  status_dir = _CreateImportExportStatusDir("%s-%s" % (prefix, component))
3105
  try:
3106
    status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
3107
    pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
3108
    ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
3109

    
3110
    if opts.ca_pem is None:
3111
      # Use server.pem
3112
      ca = utils.ReadFile(constants.NODED_CERT_FILE)
3113
    else:
3114
      ca = opts.ca_pem
3115

    
3116
    # Write CA file
3117
    utils.WriteFile(ca_file, data=ca, mode=0400)
3118

    
3119
    cmd = [
3120
      constants.IMPORT_EXPORT_DAEMON,
3121
      status_file, mode,
3122
      "--key=%s" % key_path,
3123
      "--cert=%s" % cert_path,
3124
      "--ca=%s" % ca_file,
3125
      ]
3126

    
3127
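
    # The remaining options are appended only when applicable. For an export,
    # the fully assembled command ends up looking roughly like
    #   <IMPORT_EXPORT_DAEMON> <status_file> export --key=... --cert=...
    #     --ca=... --host=<peer> --port=<port> --ipv4 \
    #     --cmd-prefix='<export script> |'
    # (illustrative only; the exact flags depend on the options below)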
    if host:
      cmd.append("--host=%s" % host)

    if port:
      cmd.append("--port=%s" % port)

    if opts.ipv6:
      cmd.append("--ipv6")
    else:
      cmd.append("--ipv4")

    if opts.compress:
      cmd.append("--compress=%s" % opts.compress)

    if opts.magic:
      cmd.append("--magic=%s" % opts.magic)

    if exp_size is not None:
      cmd.append("--expected-size=%s" % exp_size)

    if cmd_prefix:
      cmd.append("--cmd-prefix=%s" % cmd_prefix)

    if cmd_suffix:
      cmd.append("--cmd-suffix=%s" % cmd_suffix)

    if mode == constants.IEM_EXPORT:
      # Retry connection a few times when connecting to remote peer
      cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
      cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
    elif opts.connect_timeout is not None:
      assert mode == constants.IEM_IMPORT
      # Overall timeout for establishing connection while listening
      cmd.append("--connect-timeout=%s" % opts.connect_timeout)

    logfile = _InstanceLogName(prefix, instance.os, instance.name, component)

    # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
    # support for receiving a file descriptor for output
    utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
                      output=logfile)

    # The import/export name is simply the status directory name
    return os.path.basename(status_dir)

  except Exception:
    shutil.rmtree(status_dir, ignore_errors=True)
    raise


def GetImportExportStatus(names):
  """Returns import/export daemon status.

  @type names: sequence
  @param names: List of names
  @rtype: List of dicts
  @return: Returns a list of the state of each named import/export or None if a
           status couldn't be read

  """
  result = []

  for name in names:
    status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
                                 _IES_STATUS_FILE)

    try:
      data = utils.ReadFile(status_file)
    except EnvironmentError, err:
      if err.errno != errno.ENOENT:
        raise
      data = None

    if not data:
      result.append(None)
      continue

    result.append(serializer.LoadJson(data))

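  # Each entry corresponds positionally to an entry in "names"; None means
  # the daemon has not (yet) written a status file or the file was empty.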
  return result


def AbortImportExport(name):
  """Sends SIGTERM to a running import/export daemon.

  """
  logging.info("Abort import/export %s", name)

  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is running with PID %s, sending SIGTERM",
                 name, pid)
    utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)


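# Note: AbortImportExport above only signals the daemon; the status
# directory is left in place and is removed by CleanupImportExport below.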
def CleanupImportExport(name):
  """Cleanup after an import or export.

  If the import/export daemon is still running it's killed. Afterwards the
  whole status directory is removed.

  """
  logging.info("Finalizing import/export %s", name)

  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)

  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is still running with PID %s",
                 name, pid)
    utils.KillProcess(pid, waitpid=False)

  shutil.rmtree(status_dir, ignore_errors=True)


def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  """
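  # The physical ID of a disk is node-dependent (for DRBD it includes, for
  # instance, the addresses of both nodes), so it has to be recomputed here
  # from this node's name and the nodes_ip mapping before the devices can
  # be looked up.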
  # set the correct physical ID
  my_name = netutils.Hostname.GetSysName()
  for cf in disks:
    cf.SetPhysicalID(my_name, nodes_ip)

  bdevs = []

  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)
  return bdevs


def DrbdDisconnectNet(nodes_ip, disks):
  """Disconnects the network on a list of drbd devices.

  """
  bdevs = _FindDisks(nodes_ip, disks)

  # disconnect disks
  for rd in bdevs:
    try:
      rd.DisconnectNet()
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration to standalone mode: %s",
            err, exc=True)


def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
  """Attaches the network on a list of drbd devices.

  """
  bdevs = _FindDisks(nodes_ip, disks)

  if multimaster:
    for idx, rd in enumerate(bdevs):
      try:
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
      except EnvironmentError, err:
        _Fail("Can't create symlink: %s", err)
  # reconnect disks, switch to new master configuration and, if
  # needed, primary mode
  for rd in bdevs:
    try:
      rd.AttachNet(multimaster)
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration: %s", err)

  # wait until the disks are connected; we need to retry the re-attach
  # if the device becomes standalone, as this might happen if one node
  # disconnects and reconnects in a different mode before the other
  # node reconnects; in this case, one or both of the nodes will
  # decide it has the wrong configuration and switch to standalone

  def _Attach():
    all_connected = True

    for rd in bdevs:
      stats = rd.GetProcStatus()

      all_connected = (all_connected and
                       (stats.is_connected or stats.is_in_resync))

      if stats.is_standalone:
        # peer had different config info and this node became
        # standalone, even though this should not happen with the
        # new staged way of changing disk configs
        try:
          rd.AttachNet(multimaster)
        except errors.BlockDeviceError, err:
          _Fail("Can't change network configuration: %s", err)

    if not all_connected:
      raise utils.RetryAgain()

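  # The delay argument passed to utils.Retry below is a (start, factor,
  # maximum) tuple: presumably 0.1s to begin with, increased by a factor of
  # 1.5 per attempt and capped at 5s, with an overall timeout of two minutes.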
  try:
    # Start with a delay of 100 milliseconds and go up to 5 seconds
    utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
  except utils.RetryTimeout:
    _Fail("Timeout in disk reconnecting")

  if multimaster:
    # change to primary mode
    for rd in bdevs:
      try:
        rd.Open()
      except errors.BlockDeviceError, err:
        _Fail("Can't change to primary mode: %s", err)


def DrbdWaitSync(nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  """
  def _helper(rd):
    stats = rd.GetProcStatus()
    if not (stats.is_connected or stats.is_in_resync):
      raise utils.RetryAgain()
    return stats

  bdevs = _FindDisks(nodes_ip, disks)

  min_resync = 100
  alldone = True
  for rd in bdevs:
    try:
      # poll each second for 15 seconds
      stats = utils.Retry(_helper, 1, 15, args=[rd])
    except utils.RetryTimeout:
      stats = rd.GetProcStatus()
      # last check
      if not (stats.is_connected or stats.is_in_resync):
        _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
    alldone = alldone and (not stats.is_in_resync)
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)

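  # alldone is True only when no device is still resyncing; min_resync is
  # the lowest sync percentage seen across the devices (it stays at 100
  # when none of them reported a percentage).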
  return (alldone, min_resync)


def GetDrbdUsermodeHelper():
  """Returns the DRBD usermode helper currently configured.

  """
  try:
    return bdev.BaseDRBD.GetUsermodeHelper()
  except errors.BlockDeviceError, err:
    _Fail(str(err))


def PowercycleNode(hypervisor_type):
  """Hard-powercycle the node.

  Because we need to return first, and schedule the powercycle in the
  background, we won't be able to report failures nicely.

  """
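  # Fork so that the RPC call can return immediately: the parent reports
  # success, while the child (locked into RAM in case storage becomes
  # unavailable) waits five seconds and then triggers the
  # hypervisor-specific powercycle.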
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  try:
    pid = os.fork()
  except OSError:
    # if we can't fork, we'll pretend that we're in the child process
    pid = 0
  if pid > 0:
    return "Reboot scheduled in 5 seconds"
  # ensure the child is running on ram
  try:
    utils.Mlockall()
  except Exception: # pylint: disable=W0703
    pass
  time.sleep(5)
  hyper.PowercycleNode()


class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not
  on the master side.

  """
  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        L{constants.HOOKS_BASE_DIR} (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = constants.HOOKS_BASE_DIR
    # yeah, _BASE_DIR is not valid for attributes, we use it like a
    # constant
    self._BASE_DIR = hooks_base_dir # pylint: disable=C0103

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    @type hpath: str
    @param hpath: the path to the hooks directory which
        holds the scripts
    @type phase: str
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @type env: dict
    @param env: dictionary with the environment for the hook
    @rtype: list
    @return: list of 3-element tuples:
      - script path
      - script result, either L{constants.HKR_SUCCESS} or
        L{constants.HKR_FAIL}
      - output of the script

    @raise errors.ProgrammerError: for invalid input
        parameters

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      _Fail("Unknown hooks phase '%s'", phase)

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = utils.PathJoin(self._BASE_DIR, subdir)

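    # e.g. for an hpath of "instance-start" and the "pre" phase, scripts
    # would be collected from <hooks_base_dir>/instance-start-pre.d/
    # (hpath value chosen for illustration only)
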
    results = []

    if not os.path.isdir(dir_name):
      # for non-existing/non-dirs, we simply exit instead of logging a
      # warning at every operation
      return results

    runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)

    for (relname, relstatus, runresult) in runparts_results:
      if relstatus == constants.RUNPARTS_SKIP:
        rrval = constants.HKR_SKIP
        output = ""
      elif relstatus == constants.RUNPARTS_ERR:
        rrval = constants.HKR_FAIL
        output = "Hook script execution error: %s" % runresult
      elif relstatus == constants.RUNPARTS_RUN:
        if runresult.failed:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
        output = utils.SafeEncode(runresult.output.strip())
      results.append(("%s/%s" % (subdir, relname), rrval, output))

    return results


class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  @staticmethod
  def Run(name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data

    @rtype: str
    @return: the stdout of the allocator script; failures are reported
       through the L{_Fail}/RPC error mechanism

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      _Fail("iallocator module '%s' not found in the search path", name)

    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        _Fail("iallocator module '%s' failed: %s, output '%s'",
              name, result.fail_reason, result.output)
    finally:
      os.unlink(fin_name)

    return result.stdout


class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  """
  _DEV_PREFIX = "/dev/"
  _ROOT_DIR = constants.BDEV_CACHE_DIR

  @classmethod
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    @type dev_path: str
    @param dev_path: the C{/dev/} path name
    @rtype: str
    @return: the converted path name

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
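    # e.g. "/dev/drbd0" maps to <BDEV_CACHE_DIR>/bdev_drbd0 and
    # "/dev/xenvg/disk0" to <BDEV_CACHE_DIR>/bdev_xenvg_disk0
    # (device names chosen for illustration only)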
    return fpath

  @classmethod
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    @type dev_path: str
    @param dev_path: the pathname of the device
    @type owner: str
    @param owner: the owner (instance name) of the device
    @type on_primary: bool
    @param on_primary: whether this is the primary
        node or not
    @type iv_name: str
    @param iv_name: the instance-visible name of the
        device, as in objects.Disk.iv_name

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    if on_primary:
      state = "primary"
    else:
      state = "secondary"
    if iv_name is None:
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    try:
      utils.WriteFile(fpath, data=fdata)
    except EnvironmentError, err:
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)

  @classmethod
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    This is just a wrapper over L{utils.io.RemoveFile} with a converted
    path name and logging.

    @type dev_path: str
    @param dev_path: the pathname of the device

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    try:
      utils.RemoveFile(fpath)
    except EnvironmentError, err:
      logging.exception("Can't remove bdev cache file for %s: %s",
                        dev_path, err)