Statistics
| Branch: | Tag: | Revision:

root / lib / backend.py @ 23057d29

History | View | Annotate | Download (85.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Functions used by the node daemon
23

24
@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25
     the L{UploadFile} function
26

27
"""
28

    
29

    
30
import os
31
import os.path
32
import shutil
33
import time
34
import stat
35
import errno
36
import re
37
import subprocess
38
import random
39
import logging
40
import tempfile
41
import zlib
42
import base64
43

    
44
from ganeti import errors
45
from ganeti import utils
46
from ganeti import ssh
47
from ganeti import hypervisor
48
from ganeti import constants
49
from ganeti import bdev
50
from ganeti import objects
51
from ganeti import ssconf
52

    
53

    
54
# Kernel-provided unique identifier for the current boot; read in
# GetNodeInfo so callers can detect that this node has rebooted
_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
55

    
56

    
57
class RPCFail(Exception):
  """Class denoting RPC failure.

  Its argument is the error message.

  Raised via L{_Fail}; the node daemon handles it specially and turns
  it into a 'failed' RPC response for the master.

  """
63

    
64

    
65
def _Fail(msg, *args, **kwargs):
  """Log an error and then raise an RPCFail exception.

  This exception is then handled specially in the ganeti daemon and
  turned into a 'failed' return type. As such, this function is a
  useful shortcut for logging the error and returning it to the master
  daemon.

  @type msg: string
  @param msg: the text of the exception
  @keyword log: whether the error should be logged (defaults to True)
  @keyword exc: if true, log with a backtrace via logging.exception
  @raise RPCFail

  """
  if args:
    msg = msg % args
  # missing "log" keyword means "do log"
  if kwargs.get("log", True):
    if kwargs.get("exc"):
      logging.exception(msg)
    else:
      logging.error(msg)
  raise RPCFail(msg)
86

    
87

    
88
def _GetConfig():
  """Return a fresh configuration accessor.

  @rtype: L{ssconf.SimpleStore}
  @return: a newly created SimpleStore instance

  """
  store = ssconf.SimpleStore()
  return store
96

    
97

    
98
def _GetSshRunner(cluster_name):
  """Build an SshRunner for the given cluster.

  @type cluster_name: str
  @param cluster_name: the cluster name, which is needed
      by the SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: a newly created SshRunner instance

  """
  runner = ssh.SshRunner(cluster_name)
  return runner
109

    
110

    
111
def _Decompress(data):
  """Unpacks data compressed by the RPC client.

  @type data: list or tuple
  @param data: (encoding, content) pair sent by the RPC client
  @rtype: str
  @return: Decompressed data

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  if encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    return zlib.decompress(base64.b64decode(content))
  if encoding == constants.RPC_ENCODING_NONE:
    return content
  raise AssertionError("Unknown data encoding")
129

    
130

    
131
def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  Symlinks and subdirectories are left untouched.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults
      to the empty list

  """
  if not os.path.isdir(path):
    return
  if exclude:
    # Normalize excluded paths so they compare equal to the joined names
    exclude = [os.path.normpath(entry) for entry in exclude]
  else:
    exclude = []

  for rel_name in utils.ListVisibleFiles(path):
    full_name = os.path.normpath(os.path.join(path, rel_name))
    if full_name in exclude:
      continue
    # only plain files are removed; symlinks are deliberately kept
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)
155

    
156

    
157
def _BuildUploadFileList():
  """Build the list of allowed upload files.

  This is abstracted so that it's built only once at module import time.

  @rtype: frozenset
  @return: the paths which L{UploadFile} may write to

  """
  static_files = [
    constants.CLUSTER_CONF_FILE,
    constants.ETC_HOSTS,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,
    constants.RAPI_CERT_FILE,
    constants.RAPI_USERS_FILE,
    constants.HMAC_CLUSTER_KEY,
    ]

  allowed = set(static_files)
  # every hypervisor may declare extra files of its own
  for hv_name in constants.HYPER_TYPES:
    hv_class = hypervisor.GetHypervisorClass(hv_name)
    allowed.update(hv_class.GetAncillaryFiles())

  return frozenset(allowed)
178

    
179

    
180
# Computed once at import time; denotes which files are accepted by
# the L{UploadFile} function (see the module docstring)
_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
181

    
182

    
183
def JobQueuePurge():
  """Removes job queue files and archived jobs.

  @rtype: None

  """
  # the queue lock file itself is excluded from the cleanup
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
192

    
193

    
194
def GetMasterInfo():
  """Returns master information.

  This is an utility function to compute master information, either
  for consumption here or from the node daemon.

  @rtype: tuple
  @return: master_netdev, master_ip, master_name
  @raise RPCFail: in case of errors

  """
  try:
    cfg = _GetConfig()
    master_netdev = cfg.GetMasterNetdev()
    master_ip = cfg.GetMasterIP()
    master_node = cfg.GetMasterNode()
  except errors.ConfigurationError, err:
    # _Fail always raises, so the return below is reached only on success
    _Fail("Cluster configuration incomplete: %s", err, exc=True)
  return (master_netdev, master_ip, master_node)
213

    
214

    
215
def StartMaster(start_daemons, no_voting):
  """Activate local node as master node.

  The function will always try activate the IP address of the master
  (unless someone else has it). It will also start the master daemons,
  based on the start_daemons parameter.

  @type start_daemons: boolean
  @param start_daemons: whether to also start the master
      daemons (ganeti-masterd and ganeti-rapi)
  @type no_voting: boolean
  @param no_voting: whether to start ganeti-masterd without a node vote
      (if start_daemons is True), but still non-interactively
  @rtype: None
  @raise RPCFail: if any of the activation steps failed

  """
  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _ = GetMasterInfo()

  # errors are collected and reported together at the end, so that a
  # failure in one step does not prevent the following ones from running
  err_msgs = []
  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
    if utils.OwnIpAddress(master_ip):
      # we already have the ip:
      logging.debug("Master IP already configured, doing nothing")
    else:
      msg = "Someone else has the master ip, not activating"
      logging.error(msg)
      err_msgs.append(msg)
  else:
    # IP not reachable: add it as a /32 label alias on the master netdev
    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
                           "dev", master_netdev, "label",
                           "%s:0" % master_netdev])
    if result.failed:
      msg = "Can't activate master IP: %s" % result.output
      logging.error(msg)
      err_msgs.append(msg)

    # gratuitous ARP so neighbours update their caches for the new owner
    result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
                           "-s", master_ip, master_ip])
    # we'll ignore the exit code of arping

  # and now start the master and rapi daemons
  if start_daemons:
    daemons_params = {
        'ganeti-masterd': [],
        'ganeti-rapi': [],
        }
    if no_voting:
      daemons_params['ganeti-masterd'].append('--no-voting')
      daemons_params['ganeti-masterd'].append('--yes-do-it')
    for daemon in daemons_params:
      cmd = [daemon]
      cmd.extend(daemons_params[daemon])
      result = utils.RunCmd(cmd)
      if result.failed:
        msg = "Can't start daemon %s: %s" % (daemon, result.output)
        logging.error(msg)
        err_msgs.append(msg)

  if err_msgs:
    _Fail("; ".join(err_msgs))
276

    
277

    
278
def StopMaster(stop_daemons):
  """Deactivate this node as master.

  The function will always try to deactivate the IP address of the
  master. It will also stop the master daemons depending on the
  stop_daemons parameter.

  @type stop_daemons: boolean
  @param stop_daemons: whether to also stop the master daemons
      (ganeti-masterd and ganeti-rapi)
  @rtype: None

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this

  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _ = GetMasterInfo()

  # remove the /32 alias added by StartMaster
  result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
                         "dev", master_netdev])
  if result.failed:
    logging.error("Can't remove the master IP, error: %s", result.output)
    # but otherwise ignore the failure

  if stop_daemons:
    # stop/kill the rapi and the master daemon
    for daemon in constants.RAPI, constants.MASTERD:
      utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
307

    
308

    
309
def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
  """Joins this node to the cluster.

  This does the following:
      - updates the hostkeys of the machine (rsa and dsa)
      - adds the ssh private key to the user
      - adds the ssh public key to the users' authorized_keys file

  @type dsa: str
  @param dsa: the DSA private key to write
  @type dsapub: str
  @param dsapub: the DSA public key to write
  @type rsa: str
  @param rsa: the RSA private key to write
  @type rsapub: str
  @param rsapub: the RSA public key to write
  @type sshkey: str
  @param sshkey: the SSH private key to write
  @type sshpub: str
  @param sshpub: the SSH public key to write
  @rtype: boolean
  @return: the success of the operation

  """
  # host keys: private keys are mode 0600, public ones world-readable 0644
  sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
                (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
                (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
                (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
  for name, content, mode in sshd_keys:
    utils.WriteFile(name, data=content, mode=mode)

  try:
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
                                                    mkdir=True)
  except errors.OpExecError, err:
    _Fail("Error while processing user ssh files: %s", err, exc=True)

  for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
    utils.WriteFile(name, data=content, mode=0600)

  utils.AddAuthorizedKey(auth_keys, sshpub)

  # restart sshd so the new host keys take effect
  utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
352

    
353

    
354
def LeaveCluster(modify_ssh_setup):
  """Cleans up and remove the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shutdown the node daemon.

  @type modify_ssh_setup: boolean
  @param modify_ssh_setup: whether to also remove the user ssh keys

  """
  _CleanDirectory(constants.DATA_DIR)
  JobQueuePurge()

  if modify_ssh_setup:
    try:
      priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

      utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))

      utils.RemoveFile(priv_key)
      utils.RemoveFile(pub_key)
    except errors.OpExecError:
      logging.exception("Error while processing ssh files")

  try:
    utils.RemoveFile(constants.HMAC_CLUSTER_KEY)
    utils.RemoveFile(constants.RAPI_CERT_FILE)
    utils.RemoveFile(constants.SSL_CERT_FILE)
  except Exception:
    # best-effort cleanup: log and continue so the shutdown exception
    # below is still raised; this was previously a bare "except:", which
    # would also have swallowed SystemExit and KeyboardInterrupt
    logging.exception("Error while removing cluster secrets")

  confd_pid = utils.ReadPidFile(utils.DaemonPidFileName(constants.CONFD))

  if confd_pid:
    utils.KillProcess(confd_pid, timeout=2)

  # Raise a custom exception (handled in ganeti-noded)
  raise errors.QuitGanetiException(True, 'Shutdown scheduled')
395

    
396

    
397
def GetNodeInfo(vgname, hypervisor_type):
  """Gives back a hash with different information about the node.

  @type vgname: C{string}
  @param vgname: the name of the volume group to ask for disk space information
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for
      memory information
  @rtype: C{dict}
  @return: dictionary with the following keys:
      - vg_size is the size of the configured volume group in MiB
      - vg_free is the free size of the volume group in MiB
      - memory_dom0 is the memory allocated for domain0 in MiB
      - memory_free is the currently available (free) ram in MiB
      - memory_total is the total number of ram in MiB
      - bootid is the kernel boot id (changes on every reboot)

  """
  outputarray = {}
  vginfo = _GetVGInfo(vgname)
  outputarray['vg_size'] = vginfo['vg_size']
  outputarray['vg_free'] = vginfo['vg_free']

  hyper = hypervisor.GetHypervisor(hypervisor_type)
  hyp_info = hyper.GetNodeInfo()
  if hyp_info is not None:
    outputarray.update(hyp_info)

  # size=128 bounds the read; the boot id is a short UUID-like string
  outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")

  return outputarray
427

    
428

    
429
def VerifyNode(what, cluster_name):
430
  """Verify the status of the local node.
431

432
  Based on the input L{what} parameter, various checks are done on the
433
  local node.
434

435
  If the I{filelist} key is present, this list of
436
  files is checksummed and the file/checksum pairs are returned.
437

438
  If the I{nodelist} key is present, we check that we have
439
  connectivity via ssh with the target nodes (and check the hostname
440
  report).
441

442
  If the I{node-net-test} key is present, we check that we have
443
  connectivity to the given nodes via both primary IP and, if
444
  applicable, secondary IPs.
445

446
  @type what: C{dict}
447
  @param what: a dictionary of things to check:
448
      - filelist: list of files for which to compute checksums
449
      - nodelist: list of nodes we should check ssh communication with
450
      - node-net-test: list of nodes we should check node daemon port
451
        connectivity with
452
      - hypervisor: list with hypervisors to run the verify for
453
  @rtype: dict
454
  @return: a dictionary with the same keys as the input dict, and
455
      values representing the result of the checks
456

457
  """
458
  result = {}
459

    
460
  if constants.NV_HYPERVISOR in what:
461
    result[constants.NV_HYPERVISOR] = tmp = {}
462
    for hv_name in what[constants.NV_HYPERVISOR]:
463
      tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
464

    
465
  if constants.NV_FILELIST in what:
466
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
467
      what[constants.NV_FILELIST])
468

    
469
  if constants.NV_NODELIST in what:
470
    result[constants.NV_NODELIST] = tmp = {}
471
    random.shuffle(what[constants.NV_NODELIST])
472
    for node in what[constants.NV_NODELIST]:
473
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
474
      if not success:
475
        tmp[node] = message
476

    
477
  if constants.NV_NODENETTEST in what:
478
    result[constants.NV_NODENETTEST] = tmp = {}
479
    my_name = utils.HostInfo().name
480
    my_pip = my_sip = None
481
    for name, pip, sip in what[constants.NV_NODENETTEST]:
482
      if name == my_name:
483
        my_pip = pip
484
        my_sip = sip
485
        break
486
    if not my_pip:
487
      tmp[my_name] = ("Can't find my own primary/secondary IP"
488
                      " in the node list")
489
    else:
490
      port = utils.GetDaemonPort(constants.NODED)
491
      for name, pip, sip in what[constants.NV_NODENETTEST]:
492
        fail = []
493
        if not utils.TcpPing(pip, port, source=my_pip):
494
          fail.append("primary")
495
        if sip != pip:
496
          if not utils.TcpPing(sip, port, source=my_sip):
497
            fail.append("secondary")
498
        if fail:
499
          tmp[name] = ("failure using the %s interface(s)" %
500
                       " and ".join(fail))
501

    
502
  if constants.NV_LVLIST in what:
503
    result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
504

    
505
  if constants.NV_INSTANCELIST in what:
506
    result[constants.NV_INSTANCELIST] = GetInstanceList(
507
      what[constants.NV_INSTANCELIST])
508

    
509
  if constants.NV_VGLIST in what:
510
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()
511

    
512
  if constants.NV_VERSION in what:
513
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
514
                                    constants.RELEASE_VERSION)
515

    
516
  if constants.NV_HVINFO in what:
517
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
518
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()
519

    
520
  if constants.NV_DRBDLIST in what:
521
    try:
522
      used_minors = bdev.DRBD8.GetUsedDevs().keys()
523
    except errors.BlockDeviceError, err:
524
      logging.warning("Can't get used minors list", exc_info=True)
525
      used_minors = str(err)
526
    result[constants.NV_DRBDLIST] = used_minors
527

    
528
  if constants.NV_NODESETUP in what:
529
    result[constants.NV_NODESETUP] = tmpr = []
530
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
531
      tmpr.append("The sysfs filesytem doesn't seem to be mounted"
532
                  " under /sys, missing required directories /sys/block"
533
                  " and /sys/class/net")
534
    if (not os.path.isdir("/proc/sys") or
535
        not os.path.isfile("/proc/sysrq-trigger")):
536
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
537
                  " under /proc, missing required directory /proc/sys and"
538
                  " the file /proc/sysrq-trigger")
539
  return result
540

    
541

    
542
def GetVolumeList(vg_name):
  """Compute list of logical volumes and their size.

  @type vg_name: str
  @param vg_name: the volume group whose LVs we should list
  @rtype: dict
  @return:
      dictionary of all partitions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.

  """
  lvs = {}
  sep = '|'
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-olv_name,lv_size,lv_attr", vg_name])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)

  # name|size|attr, where attr is exactly 6 characters (lv_attr field)
  valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
  for line in result.stdout.splitlines():
    line = line.strip()
    match = valid_line_re.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    name, size, attr = match.groups()
    # attr character positions per lvs(8) lv_attr; TODO confirm against
    # the LVM version in use
    inactive = attr[4] == '-'
    online = attr[5] == 'o'
    virtual = attr[0] == 'v'
    if virtual:
      # we don't want to report such volumes as existing, since they
      # don't really hold data
      continue
    lvs[name] = (size, inactive, online)

  return lvs
584

    
585

    
586
def ListVolumeGroups():
  """List the volume groups and their size.

  Thin RPC-facing wrapper over L{utils.ListVolumeGroups}.

  @rtype: dict
  @return: dictionary with keys volume name and values the
      size of the volume

  """
  return utils.ListVolumeGroups()
595

    
596

    
597
def NodeVolumes():
  """List all volumes on this node.

  @rtype: list
  @return:
    A list of dictionaries, each having four keys:
      - name: the logical volume name,
      - size: the size of the logical volume
      - dev: the physical device on which the LV lives
      - vg: the volume group to which it belongs

    In case of errors, we return an empty list and log the
    error.

    Note that since a logical volume can live on multiple physical
    volumes, the resulting list might include a logical volume
    multiple times.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s",
          result.output)

  def parse_dev(dev):
    # drop the parenthesised suffix (if any) from the device field,
    # keeping only the device path itself
    if '(' in dev:
      return dev.split('(')[0]
    else:
      return dev

  def map_line(line):
    # turn one already-split lvs output line into the result dict
    return {
      'name': line[0].strip(),
      'size': line[1].strip(),
      'dev': parse_dev(line[2].strip()),
      'vg': line[3].strip(),
    }

  # lines with fewer than 4 fields are silently skipped
  return [map_line(line.split('|')) for line in result.stdout.splitlines()
          if line.count('|') >= 3]
639

    
640

    
641
def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @type bridges_list: list
  @param bridges_list: the bridge names to check for
  @rtype: boolean
  @return: C{True} if all of them exist, C{False} otherwise
  @raise RPCFail: if any of the bridges is missing

  """
  missing = [bridge for bridge in bridges_list
             if not utils.BridgeExists(bridge)]

  if missing:
    _Fail("Missing bridges %s", ", ".join(missing))
655

    
656

    
657
def GetInstanceList(hypervisor_list):
  """Provides a list of instances.

  @type hypervisor_list: list
  @param hypervisor_list: the list of hypervisors to query information

  @rtype: list
  @return: a list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com

  """
  results = []
  for hname in hypervisor_list:
    try:
      names = hypervisor.GetHypervisor(hname).ListInstances()
      results.extend(names)
    except errors.HypervisorError, err:
      # any single hypervisor failure fails the whole RPC
      _Fail("Error enumerating instances (hypervisor %s): %s",
            hname, err, exc=True)

  return results
679

    
680

    
681
def GetInstanceInfo(instance, hname):
  """Gives back the information about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)

  """
  output = {}

  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is not None:
    # tuple indices match the per-instance tuples unpacked in
    # GetAllInstancesInfo: 2=memory, 4=state, 5=cpu time
    output['memory'] = iinfo[2]
    output['state'] = iinfo[4]
    output['time'] = iinfo[5]

  # an empty dict is returned when the instance is not found
  return output
705

    
706

    
707
def GetInstanceMigratable(instance):
  """Gives whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.

  @rtype: tuple
  @return: tuple of (result, description) where:
      - result: whether the instance can be migrated or not
      - description: a description of the issue, if relevant
  @raise RPCFail: if the instance is not running or was started before
      the block device symlinks were introduced

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  iname = instance.name
  if iname not in hyper.ListInstances():
    _Fail("Instance %s is not running", iname)

  # every disk must have its symlink, otherwise the instance predates
  # the symlink scheme and must be restarted before migration
  # (enumerate used for consistency with _RemoveBlockDevLinks)
  for idx, _ in enumerate(instance.disks):
    link_name = _GetBlockDevSymlinkPath(iname, idx)
    if not os.path.islink(link_name):
      _Fail("Instance %s was not restarted since ganeti 1.2.5", iname)
728

    
729

    
730
def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  output = {}

  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
    if iinfo:
      for name, _, memory, vcpus, state, times in iinfo:
        value = {
          'memory': memory,
          'vcpus': vcpus,
          'state': state,
          'time': times,
          }
        if name in output:
          # we only check static parameters, like memory and vcpus,
          # and not state and time which can change between the
          # invocations of the different hypervisors
          for key in 'memory', 'vcpus':
            if value[key] != output[name][key]:
              _Fail("Instance %s is running twice"
                    " with different parameters", name)
        output[name] = value

  return output
771

    
772

    
773
def InstanceOsAdd(instance, reinstall):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type reinstall: boolean
  @param reinstall: whether this is an instance reinstall
  @rtype: None
  @raise RPCFail: if the OS create script fails

  """
  inst_os = OSFromDisk(instance.os)

  create_env = OSEnvironment(instance, inst_os)
  if reinstall:
    create_env['INSTANCE_REINSTALL'] = "1"

  # per-run log file, named after os/instance/timestamp
  logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                     instance.name, int(time.time()))

  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile,)
  if result.failed:
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", result.cmd, result.fail_reason, logfile,
                  result.output)
    # only the last lines of the script log are reported back
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS create script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
802

    
803

    
804
def RunRenameInstance(instance, old_name):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type old_name: string
  @param old_name: previous instance name
  @rtype: boolean
  @return: the success of the operation
  @raise RPCFail: if the OS rename script fails

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance, inst_os)
  rename_env['OLD_INSTANCE_NAME'] = old_name

  # per-run log file, named after os/old name/new name/timestamp
  logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                           old_name,
                                           instance.name, int(time.time()))

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile)

  if result.failed:
    # message fixed: this is the rename script, not the create script
    # (the old text was copy-pasted from InstanceOsAdd)
    logging.error("os rename command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS rename script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
834

    
835

    
836
def _GetVGInfo(vg_name):
  """Get information about the volume group.

  @type vg_name: str
  @param vg_name: the volume group which we query
  @rtype: dict
  @return:
    A dictionary with the following keys:
      - C{vg_size} is the total size of the volume group in MiB
      - C{vg_free} is the free size of the volume group in MiB
      - C{pv_count} are the number of physical disks in that VG

    If an error occurs during gathering of data, we return the same dict
    with keys all set to None.

  """
  # pre-fill with None so error paths return the full key set
  retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])

  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
                         "--nosuffix", "--units=m", "--separator=:", vg_name])

  if retval.failed:
    logging.error("volume group %s not present", vg_name)
    return retdic
  # strip any trailing separator before splitting into the three fields
  valarr = retval.stdout.strip().rstrip(':').split(':')
  if len(valarr) == 3:
    try:
      retdic = {
        "vg_size": int(round(float(valarr[0]), 0)),
        "vg_free": int(round(float(valarr[1]), 0)),
        "pv_count": int(valarr[2]),
        }
    except ValueError, err:
      logging.exception("Fail to parse vgs output: %s", err)
  else:
    logging.error("vgs output has the wrong number of fields (expected"
                  " three): %s", str(valarr))
  return retdic
874

    
875

    
876
def _GetBlockDevSymlinkPath(instance_name, idx):
  """Return the symlink path for the given instance disk index.

  """
  link_basename = "%s:%d" % (instance_name, idx)
  return os.path.join(constants.DISK_LINKS_DIR, link_basename)
879

    
880

    
881
def _SymlinkBlockDev(instance_name, device_path, idx):
  """Set up symlinks to a instance's block device.

  This is an auxiliary function run when an instance is start (on the primary
  node) or when an instance is migrated (on the target node).


  @param instance_name: the name of the target instance
  @param device_path: path of the physical block device, on the node
  @param idx: the disk index
  @return: absolute path to the disk's symlink

  """
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
  try:
    os.symlink(device_path, link_name)
  except OSError, err:
    if err.errno == errno.EEXIST:
      # something already exists at the link path: replace it only if it
      # is not already a symlink pointing at the right device
      if (not os.path.islink(link_name) or
          os.readlink(link_name) != device_path):
        os.remove(link_name)
        os.symlink(device_path, link_name)
    else:
      raise

  return link_name
907

    
908

    
909
def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  Missing links are silently skipped; removal errors are logged but not
  propagated (cleanup is best-effort).

  """
  for idx in range(len(disks)):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if not os.path.islink(link_name):
      continue
    try:
      os.remove(link_name)
    except OSError:
      logging.exception("Can't remove symlink '%s'", link_name)
920

    
921

    
922
def _GatherAndLinkBlockDevs(instance):
923
  """Set up an instance's block device(s).
924

925
  This is run on the primary node at instance startup. The block
926
  devices must be already assembled.
927

928
  @type instance: L{objects.Instance}
929
  @param instance: the instance whose disks we shoul assemble
930
  @rtype: list
931
  @return: list of (disk_object, device_path)
932

933
  """
934
  block_devices = []
935
  for idx, disk in enumerate(instance.disks):
936
    device = _RecursiveFindBD(disk)
937
    if device is None:
938
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
939
                                    str(disk))
940
    device.Open()
941
    try:
942
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
943
    except OSError, e:
944
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
945
                                    e.strerror)
946

    
947
    block_devices.append((disk, link_name))
948

    
949
  return block_devices
950

    
951

    
952
def StartInstance(instance):
953
  """Start an instance.
954

955
  @type instance: L{objects.Instance}
956
  @param instance: the instance object
957
  @rtype: None
958

959
  """
960
  running_instances = GetInstanceList([instance.hypervisor])
961

    
962
  if instance.name in running_instances:
963
    logging.info("Instance %s already running, not starting", instance.name)
964
    return
965

    
966
  try:
967
    block_devices = _GatherAndLinkBlockDevs(instance)
968
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
969
    hyper.StartInstance(instance, block_devices)
970
  except errors.BlockDeviceError, err:
971
    _Fail("Block device error: %s", err, exc=True)
972
  except errors.HypervisorError, err:
973
    _RemoveBlockDevLinks(instance.name, instance.disks)
974
    _Fail("Hypervisor error: %s", err, exc=True)
975

    
976

    
977
def InstanceShutdown(instance, timeout):
978
  """Shut an instance down.
979

980
  @note: this functions uses polling with a hardcoded timeout.
981

982
  @type instance: L{objects.Instance}
983
  @param instance: the instance object
984
  @type timeout: integer
985
  @param timeout: maximum timeout for soft shutdown
986
  @rtype: None
987

988
  """
989
  hv_name = instance.hypervisor
990
  hyper = hypervisor.GetHypervisor(hv_name)
991
  running_instances = hyper.ListInstances()
992
  iname = instance.name
993

    
994
  if iname not in running_instances:
995
    logging.info("Instance %s not running, doing nothing", iname)
996
    return
997

    
998
  start = time.time()
999
  end = start + timeout
1000
  sleep_time = 5
1001

    
1002
  tried_once = False
1003
  while time.time() < end:
1004
    try:
1005
      hyper.StopInstance(instance, retry=tried_once)
1006
    except errors.HypervisorError, err:
1007
      if instance.name not in hyper.ListInstances():
1008
        # if the instance is no longer existing, consider this a
1009
        # success and go to cleanup
1010
        break
1011
      _Fail("Failed to stop instance %s: %s", iname, err)
1012
    tried_once = True
1013
    time.sleep(sleep_time)
1014
    if instance.name not in hyper.ListInstances():
1015
      break
1016
  else:
1017
    # the shutdown did not succeed
1018
    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1019

    
1020
    try:
1021
      hyper.StopInstance(instance, force=True)
1022
    except errors.HypervisorError, err:
1023
      if instance.name in hyper.ListInstances():
1024
        # only raise an error if the instance still exists, otherwise
1025
        # the error could simply be "instance ... unknown"!
1026
        _Fail("Failed to force stop instance %s: %s", iname, err)
1027

    
1028
    time.sleep(1)
1029
    if instance.name in GetInstanceList([hv_name]):
1030
      _Fail("Could not shutdown instance %s even by destroy", iname)
1031

    
1032
  _RemoveBlockDevLinks(iname, instance.disks)
1033

    
1034

    
1035
def InstanceReboot(instance, reboot_type, shutdown_timeout):
1036
  """Reboot an instance.
1037

1038
  @type instance: L{objects.Instance}
1039
  @param instance: the instance object to reboot
1040
  @type reboot_type: str
1041
  @param reboot_type: the type of reboot, one the following
1042
    constants:
1043
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1044
        instance OS, do not recreate the VM
1045
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1046
        restart the VM (at the hypervisor level)
1047
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1048
        not accepted here, since that mode is handled differently, in
1049
        cmdlib, and translates into full stop and start of the
1050
        instance (instead of a call_instance_reboot RPC)
1051
  @type shutdown_timeout: integer
1052
  @param shutdown_timeout: maximum timeout for soft shutdown
1053
  @rtype: None
1054

1055
  """
1056
  running_instances = GetInstanceList([instance.hypervisor])
1057

    
1058
  if instance.name not in running_instances:
1059
    _Fail("Cannot reboot instance %s that is not running", instance.name)
1060

    
1061
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1062
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1063
    try:
1064
      hyper.RebootInstance(instance)
1065
    except errors.HypervisorError, err:
1066
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1067
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1068
    try:
1069
      InstanceShutdown(instance, shutdown_timeout)
1070
      return StartInstance(instance)
1071
    except errors.HypervisorError, err:
1072
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1073
  else:
1074
    _Fail("Invalid reboot_type received: %s", reboot_type)
1075

    
1076

    
1077
def MigrationInfo(instance):
1078
  """Gather information about an instance to be migrated.
1079

1080
  @type instance: L{objects.Instance}
1081
  @param instance: the instance definition
1082

1083
  """
1084
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1085
  try:
1086
    info = hyper.MigrationInfo(instance)
1087
  except errors.HypervisorError, err:
1088
    _Fail("Failed to fetch migration information: %s", err, exc=True)
1089
  return info
1090

    
1091

    
1092
def AcceptInstance(instance, info, target):
1093
  """Prepare the node to accept an instance.
1094

1095
  @type instance: L{objects.Instance}
1096
  @param instance: the instance definition
1097
  @type info: string/data (opaque)
1098
  @param info: migration information, from the source node
1099
  @type target: string
1100
  @param target: target host (usually ip), on this node
1101

1102
  """
1103
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1104
  try:
1105
    hyper.AcceptInstance(instance, info, target)
1106
  except errors.HypervisorError, err:
1107
    _Fail("Failed to accept instance: %s", err, exc=True)
1108

    
1109

    
1110
def FinalizeMigration(instance, info, success):
1111
  """Finalize any preparation to accept an instance.
1112

1113
  @type instance: L{objects.Instance}
1114
  @param instance: the instance definition
1115
  @type info: string/data (opaque)
1116
  @param info: migration information, from the source node
1117
  @type success: boolean
1118
  @param success: whether the migration was a success or a failure
1119

1120
  """
1121
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1122
  try:
1123
    hyper.FinalizeMigration(instance, info, success)
1124
  except errors.HypervisorError, err:
1125
    _Fail("Failed to finalize migration: %s", err, exc=True)
1126

    
1127

    
1128
def MigrateInstance(instance, target, live):
1129
  """Migrates an instance to another node.
1130

1131
  @type instance: L{objects.Instance}
1132
  @param instance: the instance definition
1133
  @type target: string
1134
  @param target: the target node name
1135
  @type live: boolean
1136
  @param live: whether the migration should be done live or not (the
1137
      interpretation of this parameter is left to the hypervisor)
1138
  @rtype: tuple
1139
  @return: a tuple of (success, msg) where:
1140
      - succes is a boolean denoting the success/failure of the operation
1141
      - msg is a string with details in case of failure
1142

1143
  """
1144
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1145

    
1146
  try:
1147
    hyper.MigrateInstance(instance.name, target, live)
1148
  except errors.HypervisorError, err:
1149
    _Fail("Failed to migrate instance: %s", err, exc=True)
1150

    
1151

    
1152
def BlockdevCreate(disk, size, owner, on_primary, info):
1153
  """Creates a block device for an instance.
1154

1155
  @type disk: L{objects.Disk}
1156
  @param disk: the object describing the disk we should create
1157
  @type size: int
1158
  @param size: the size of the physical underlying device, in MiB
1159
  @type owner: str
1160
  @param owner: the name of the instance for which disk is created,
1161
      used for device cache data
1162
  @type on_primary: boolean
1163
  @param on_primary:  indicates if it is the primary node or not
1164
  @type info: string
1165
  @param info: string that will be sent to the physical device
1166
      creation, used for example to set (LVM) tags on LVs
1167

1168
  @return: the new unique_id of the device (this can sometime be
1169
      computed only after creation), or None. On secondary nodes,
1170
      it's not required to return anything.
1171

1172
  """
1173
  clist = []
1174
  if disk.children:
1175
    for child in disk.children:
1176
      try:
1177
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
1178
      except errors.BlockDeviceError, err:
1179
        _Fail("Can't assemble device %s: %s", child, err)
1180
      if on_primary or disk.AssembleOnSecondary():
1181
        # we need the children open in case the device itself has to
1182
        # be assembled
1183
        try:
1184
          crdev.Open()
1185
        except errors.BlockDeviceError, err:
1186
          _Fail("Can't make child '%s' read-write: %s", child, err)
1187
      clist.append(crdev)
1188

    
1189
  try:
1190
    device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1191
  except errors.BlockDeviceError, err:
1192
    _Fail("Can't create block device: %s", err)
1193

    
1194
  if on_primary or disk.AssembleOnSecondary():
1195
    try:
1196
      device.Assemble()
1197
    except errors.BlockDeviceError, err:
1198
      _Fail("Can't assemble device after creation, unusual event: %s", err)
1199
    device.SetSyncSpeed(constants.SYNC_SPEED)
1200
    if on_primary or disk.OpenOnSecondary():
1201
      try:
1202
        device.Open(force=True)
1203
      except errors.BlockDeviceError, err:
1204
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
1205
    DevCacheManager.UpdateCache(device.dev_path, owner,
1206
                                on_primary, disk.iv_name)
1207

    
1208
  device.SetInfo(info)
1209

    
1210
  return device.unique_id
1211

    
1212

    
1213
def BlockdevRemove(disk):
1214
  """Remove a block device.
1215

1216
  @note: This is intended to be called recursively.
1217

1218
  @type disk: L{objects.Disk}
1219
  @param disk: the disk object we should remove
1220
  @rtype: boolean
1221
  @return: the success of the operation
1222

1223
  """
1224
  msgs = []
1225
  try:
1226
    rdev = _RecursiveFindBD(disk)
1227
  except errors.BlockDeviceError, err:
1228
    # probably can't attach
1229
    logging.info("Can't attach to device %s in remove", disk)
1230
    rdev = None
1231
  if rdev is not None:
1232
    r_path = rdev.dev_path
1233
    try:
1234
      rdev.Remove()
1235
    except errors.BlockDeviceError, err:
1236
      msgs.append(str(err))
1237
    if not msgs:
1238
      DevCacheManager.RemoveCache(r_path)
1239

    
1240
  if disk.children:
1241
    for child in disk.children:
1242
      try:
1243
        BlockdevRemove(child)
1244
      except RPCFail, err:
1245
        msgs.append(str(err))
1246

    
1247
  if msgs:
1248
    _Fail("; ".join(msgs))
1249

    
1250

    
1251
def _RecursiveAssembleBD(disk, owner, as_primary):
1252
  """Activate a block device for an instance.
1253

1254
  This is run on the primary and secondary nodes for an instance.
1255

1256
  @note: this function is called recursively.
1257

1258
  @type disk: L{objects.Disk}
1259
  @param disk: the disk we try to assemble
1260
  @type owner: str
1261
  @param owner: the name of the instance which owns the disk
1262
  @type as_primary: boolean
1263
  @param as_primary: if we should make the block device
1264
      read/write
1265

1266
  @return: the assembled device or None (in case no device
1267
      was assembled)
1268
  @raise errors.BlockDeviceError: in case there is an error
1269
      during the activation of the children or the device
1270
      itself
1271

1272
  """
1273
  children = []
1274
  if disk.children:
1275
    mcn = disk.ChildrenNeeded()
1276
    if mcn == -1:
1277
      mcn = 0 # max number of Nones allowed
1278
    else:
1279
      mcn = len(disk.children) - mcn # max number of Nones
1280
    for chld_disk in disk.children:
1281
      try:
1282
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1283
      except errors.BlockDeviceError, err:
1284
        if children.count(None) >= mcn:
1285
          raise
1286
        cdev = None
1287
        logging.error("Error in child activation (but continuing): %s",
1288
                      str(err))
1289
      children.append(cdev)
1290

    
1291
  if as_primary or disk.AssembleOnSecondary():
1292
    r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1293
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1294
    result = r_dev
1295
    if as_primary or disk.OpenOnSecondary():
1296
      r_dev.Open()
1297
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1298
                                as_primary, disk.iv_name)
1299

    
1300
  else:
1301
    result = True
1302
  return result
1303

    
1304

    
1305
def BlockdevAssemble(disk, owner, as_primary):
1306
  """Activate a block device for an instance.
1307

1308
  This is a wrapper over _RecursiveAssembleBD.
1309

1310
  @rtype: str or boolean
1311
  @return: a C{/dev/...} path for primary nodes, and
1312
      C{True} for secondary nodes
1313

1314
  """
1315
  try:
1316
    result = _RecursiveAssembleBD(disk, owner, as_primary)
1317
    if isinstance(result, bdev.BlockDev):
1318
      result = result.dev_path
1319
  except errors.BlockDeviceError, err:
1320
    _Fail("Error while assembling disk: %s", err, exc=True)
1321

    
1322
  return result
1323

    
1324

    
1325
def BlockdevShutdown(disk):
1326
  """Shut down a block device.
1327

1328
  First, if the device is assembled (Attach() is successful), then
1329
  the device is shutdown. Then the children of the device are
1330
  shutdown.
1331

1332
  This function is called recursively. Note that we don't cache the
1333
  children or such, as oppossed to assemble, shutdown of different
1334
  devices doesn't require that the upper device was active.
1335

1336
  @type disk: L{objects.Disk}
1337
  @param disk: the description of the disk we should
1338
      shutdown
1339
  @rtype: None
1340

1341
  """
1342
  msgs = []
1343
  r_dev = _RecursiveFindBD(disk)
1344
  if r_dev is not None:
1345
    r_path = r_dev.dev_path
1346
    try:
1347
      r_dev.Shutdown()
1348
      DevCacheManager.RemoveCache(r_path)
1349
    except errors.BlockDeviceError, err:
1350
      msgs.append(str(err))
1351

    
1352
  if disk.children:
1353
    for child in disk.children:
1354
      try:
1355
        BlockdevShutdown(child)
1356
      except RPCFail, err:
1357
        msgs.append(str(err))
1358

    
1359
  if msgs:
1360
    _Fail("; ".join(msgs))
1361

    
1362

    
1363
def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if None in new_bdevs:
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
  parent_bdev.AddChildren(new_bdevs)
1380

    
1381

    
1382
def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
  devs = []
  for disk in new_cdevs:
    # prefer the static path when the disk type provides one; fall back
    # to resolving the device on this node
    rpath = disk.StaticDevPath()
    if rpath is not None:
      devs.append(rpath)
      continue
    bd = _RecursiveFindBD(disk)
    if bd is None:
      _Fail("Can't find device %s while removing children", disk)
    devs.append(bd.dev_path)
  parent_bdev.RemoveChildren(devs)
1407

    
1408

    
1409
def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: disk
  @return:
      a list of (mirror_done, estimated_time) tuples, which
      are the result of L{bdev.BlockDev.CombinedSyncStatus}
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  stats = []
  for dsk in disks:
    rbd = _RecursiveFindBD(dsk)
    if rbd is None:
      _Fail("Can't find device %s", dsk)
    stats.append(rbd.CombinedSyncStatus())
  return stats
1431

    
1432

    
1433
def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  if disk.children:
    children = [_RecursiveFindBD(chdisk) for chdisk in disk.children]
  else:
    children = []
  return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1451

    
1452

    
1453
def BlockdevFind(disk):
1454
  """Check if a device is activated.
1455

1456
  If it is, return information about the real device.
1457

1458
  @type disk: L{objects.Disk}
1459
  @param disk: the disk to find
1460
  @rtype: None or objects.BlockDevStatus
1461
  @return: None if the disk cannot be found, otherwise a the current
1462
           information
1463

1464
  """
1465
  try:
1466
    rbd = _RecursiveFindBD(disk)
1467
  except errors.BlockDeviceError, err:
1468
    _Fail("Failed to find device: %s", err, exc=True)
1469

    
1470
  if rbd is None:
1471
    return None
1472

    
1473
  return rbd.GetSyncStatus()
1474

    
1475

    
1476
def BlockdevGetsize(disks):
1477
  """Computes the size of the given disks.
1478

1479
  If a disk is not found, returns None instead.
1480

1481
  @type disks: list of L{objects.Disk}
1482
  @param disks: the list of disk to compute the size for
1483
  @rtype: list
1484
  @return: list with elements None if the disk cannot be found,
1485
      otherwise the size
1486

1487
  """
1488
  result = []
1489
  for cf in disks:
1490
    try:
1491
      rbd = _RecursiveFindBD(cf)
1492
    except errors.BlockDeviceError, err:
1493
      result.append(None)
1494
      continue
1495
    if rbd is None:
1496
      result.append(None)
1497
    else:
1498
      result.append(rbd.GetActualSize())
1499
  return result
1500

    
1501

    
1502
def BlockdevExport(disk, dest_node, dest_path, cluster_name):
  """Export a block device to a remote node.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type dest_path: str
  @param dest_path: the destination path on the target node
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @rtype: None

  """
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    _Fail("Block device '%s' is not set up", disk)

  real_disk.Open()

  # the block size on the read dd is 1MiB to match our units
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
                               "dd if=%s bs=1048576 count=%s",
                               real_disk.dev_path, str(disk.size))

  # we set here a smaller block size as, due to ssh buffering, more
  # than 64-128k will mostly ignored; we use nocreat to fail if the
  # device is not already there or we pass a wrong path; we use
  # notrunc to no attempt truncate on an LV device; we use oflag=dsync
  # to not buffer too much memory; this means that at best, we flush
  # every 64k, which will not be very fast
  destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
                                " oflag=dsync", dest_path)
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.GANETI_RUNAS,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])
  result = utils.RunCmd(["bash", "-c", command])

  if result.failed:
    _Fail("Disk copy command '%s' returned error: %s"
          " output: %s", command, result.fail_reason, result.output)
1548

    
1549

    
1550
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: int
  @param uid: the owner of the file (can be -1 for default)
  @type gid: int
  @param gid: the group of the file (can be -1 for default)
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: None

  """
  # reject anything that is not an allowed, absolute target
  if not os.path.isabs(file_name):
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
  if file_name not in _ALLOWED_UPLOAD_FILES:
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
          file_name)

  utils.WriteFile(file_name, data=_Decompress(data), mode=mode,
                  uid=uid, gid=gid, atime=atime, mtime=mtime)
1584

    
1585

    
1586
def WriteSsconfFiles(values):
  """Update all ssconf files.

  Wrapper around the SimpleStore.WriteFiles.

  @param values: the new ssconf key/value pairs to write

  """
  store = ssconf.SimpleStore()
  store.WriteFiles(values)
1593

    
1594

    
1595
def _ErrnoOrStr(err):
1596
  """Format an EnvironmentError exception.
1597

1598
  If the L{err} argument has an errno attribute, it will be looked up
1599
  and converted into a textual C{E...} description. Otherwise the
1600
  string representation of the error will be returned.
1601

1602
  @type err: L{EnvironmentError}
1603
  @param err: the exception to format
1604

1605
  """
1606
  if hasattr(err, 'errno'):
1607
    detail = errno.errorcode[err.errno]
1608
  else:
1609
    detail = str(err)
1610
  return detail
1611

    
1612

    
1613
def _OSOndiskAPIVersion(name, os_dir):
1614
  """Compute and return the API version of a given OS.
1615

1616
  This function will try to read the API version of the OS given by
1617
  the 'name' parameter and residing in the 'os_dir' directory.
1618

1619
  @type name: str
1620
  @param name: the OS name we should look for
1621
  @type os_dir: str
1622
  @param os_dir: the directory inwhich we should look for the OS
1623
  @rtype: tuple
1624
  @return: tuple (status, data) with status denoting the validity and
1625
      data holding either the vaid versions or an error message
1626

1627
  """
1628
  api_file = os.path.sep.join([os_dir, constants.OS_API_FILE])
1629

    
1630
  try:
1631
    st = os.stat(api_file)
1632
  except EnvironmentError, err:
1633
    return False, ("Required file '%s' not found under path %s: %s" %
1634
                   (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
1635

    
1636
  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1637
    return False, ("File '%s' in %s is not a regular file" %
1638
                   (constants.OS_API_FILE, os_dir))
1639

    
1640
  try:
1641
    api_versions = utils.ReadFile(api_file).splitlines()
1642
  except EnvironmentError, err:
1643
    return False, ("Error while reading the API version file at %s: %s" %
1644
                   (api_file, _ErrnoOrStr(err)))
1645

    
1646
  try:
1647
    api_versions = [int(version.strip()) for version in api_versions]
1648
  except (TypeError, ValueError), err:
1649
    return False, ("API version(s) can't be converted to integer: %s" %
1650
                   str(err))
1651

    
1652
  return True, api_versions
1653

    
1654

    
1655
def DiagnoseOS(top_dirs=None):
1656
  """Compute the validity for all OSes.
1657

1658
  @type top_dirs: list
1659
  @param top_dirs: the list of directories in which to
1660
      search (if not given defaults to
1661
      L{constants.OS_SEARCH_PATH})
1662
  @rtype: list of L{objects.OS}
1663
  @return: a list of tuples (name, path, status, diagnose, variants)
1664
      for all (potential) OSes under all search paths, where:
1665
          - name is the (potential) OS name
1666
          - path is the full path to the OS
1667
          - status True/False is the validity of the OS
1668
          - diagnose is the error message for an invalid OS, otherwise empty
1669
          - variants is a list of supported OS variants, if any
1670

1671
  """
1672
  if top_dirs is None:
1673
    top_dirs = constants.OS_SEARCH_PATH
1674

    
1675
  result = []
1676
  for dir_name in top_dirs:
1677
    if os.path.isdir(dir_name):
1678
      try:
1679
        f_names = utils.ListVisibleFiles(dir_name)
1680
      except EnvironmentError, err:
1681
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1682
        break
1683
      for name in f_names:
1684
        os_path = os.path.sep.join([dir_name, name])
1685
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
1686
        if status:
1687
          diagnose = ""
1688
          variants = os_inst.supported_variants
1689
        else:
1690
          diagnose = os_inst
1691
          variants = []
1692
        result.append((name, os_path, status, diagnose, variants))
1693

    
1694
  return result
1695

    
1696

    
1697
def _TryOSFromDisk(name, base_dir=None):
1698
  """Create an OS instance from disk.
1699

1700
  This function will return an OS instance if the given name is a
1701
  valid OS name.
1702

1703
  @type base_dir: string
1704
  @keyword base_dir: Base directory containing OS installations.
1705
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
1706
  @rtype: tuple
1707
  @return: success and either the OS instance if we find a valid one,
1708
      or error message
1709

1710
  """
1711
  if base_dir is None:
1712
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1713
    if os_dir is None:
1714
      return False, "Directory for OS %s not found in search path" % name
1715
  else:
1716
    os_dir = os.path.sep.join([base_dir, name])
1717

    
1718
  status, api_versions = _OSOndiskAPIVersion(name, os_dir)
1719
  if not status:
1720
    # push the error up
1721
    return status, api_versions
1722

    
1723
  if not constants.OS_API_VERSIONS.intersection(api_versions):
1724
    return False, ("API version mismatch for path '%s': found %s, want %s." %
1725
                   (os_dir, api_versions, constants.OS_API_VERSIONS))
1726

    
1727
  # OS Files dictionary, we will populate it with the absolute path names
1728
  os_files = dict.fromkeys(constants.OS_SCRIPTS)
1729

    
1730
  if max(api_versions) >= constants.OS_API_V15:
1731
    os_files[constants.OS_VARIANTS_FILE] = ''
1732

    
1733
  for filename in os_files:
1734
    os_files[filename] = os.path.sep.join([os_dir, filename])
1735

    
1736
    try:
1737
      st = os.stat(os_files[filename])
1738
    except EnvironmentError, err:
1739
      return False, ("File '%s' under path '%s' is missing (%s)" %
1740
                     (filename, os_dir, _ErrnoOrStr(err)))
1741

    
1742
    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1743
      return False, ("File '%s' under path '%s' is not a regular file" %
1744
                     (filename, os_dir))
1745

    
1746
    if filename in constants.OS_SCRIPTS:
1747
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1748
        return False, ("File '%s' under path '%s' is not executable" %
1749
                       (filename, os_dir))
1750

    
1751
  variants = None
1752
  if constants.OS_VARIANTS_FILE in os_files:
1753
    variants_file = os_files[constants.OS_VARIANTS_FILE]
1754
    try:
1755
      variants = utils.ReadFile(variants_file).splitlines()
1756
    except EnvironmentError, err:
1757
      return False, ("Error while reading the OS variants file at %s: %s" %
1758
                     (variants_file, _ErrnoOrStr(err)))
1759
    if not variants:
1760
      return False, ("No supported os variant found")
1761

    
1762
  os_obj = objects.OS(name=name, path=os_dir,
1763
                      create_script=os_files[constants.OS_SCRIPT_CREATE],
1764
                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
1765
                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
1766
                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
1767
                      supported_variants=variants,
1768
                      api_versions=api_versions)
1769
  return True, os_obj
1770

    
1771

    
1772
def OSFromDisk(name, base_dir=None):
  """Return an L{objects.OS} instance for the given OS name.

  Thin raising wrapper over L{_TryOSFromDisk}: that helper returns a
  (status, payload) pair instead of raising, and here the failure case
  is converted into an L{RPCFail} exception.

  @type name: string
  @param name: the OS name, possibly carrying a "+variant" suffix
  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise RPCFail: if we don't find a valid OS

  """
  # only the base name (before any "+variant" suffix) exists on disk
  base_name = name.split("+", 1)[0]
  status, payload = _TryOSFromDisk(base_name, base_dir)

  if status:
    return payload

  # payload holds the error message in the failure case
  _Fail(payload)
def OSEnvironment(instance, os, debug=0):
  """Calculate the environment for an os script.

  Builds a flat dict of OS_*/INSTANCE_*/DISK_*/NIC_* variables which is
  later passed as the environment of the OS create/import/export/rename
  scripts.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type os: L{objects.OS}
  @param os: operating system for which the environment is being built
      (note: this parameter shadows the stdlib C{os} module inside this
      function)
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = {}
  # highest API version supported by both this Ganeti and the OS
  api_version = max(constants.OS_API_VERSIONS.intersection(os.api_versions))
  result['OS_API_VERSION'] = '%d' % api_version
  result['INSTANCE_NAME'] = instance.name
  result['INSTANCE_OS'] = instance.os
  result['HYPERVISOR'] = instance.hypervisor
  result['DISK_COUNT'] = '%d' % len(instance.disks)
  result['NIC_COUNT'] = '%d' % len(instance.nics)
  result['DEBUG_LEVEL'] = '%d' % debug
  if api_version >= constants.OS_API_V15:
    try:
      # the variant is encoded in the instance OS name as "name+variant"
      variant = instance.os.split('+', 1)[1]
    except IndexError:
      # no explicit variant: fall back to the OS' first supported one
      variant = os.supported_variants[0]
    result['OS_VARIANT'] = variant
  for idx, disk in enumerate(instance.disks):
    real_disk = _RecursiveFindBD(disk)
    if real_disk is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                    str(disk))
    # make sure the device is activated/opened before exposing its path
    real_disk.Open()
    result['DISK_%d_PATH' % idx] = real_disk.dev_path
    result['DISK_%d_ACCESS' % idx] = disk.mode
    if constants.HV_DISK_TYPE in instance.hvparams:
      result['DISK_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
    elif disk.dev_type == constants.LD_FILE:
      # for file-based disks, also expose the backing file path
      result['DISK_%d_BACKEND_TYPE' % idx] = \
        'file:%s' % disk.physical_id[0]
  for idx, nic in enumerate(instance.nics):
    result['NIC_%d_MAC' % idx] = nic.mac
    if nic.ip:
      result['NIC_%d_IP' % idx] = nic.ip
    result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      # legacy variable: in bridged mode the link is the bridge name
      result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.nicparams[constants.NIC_LINK]:
      result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
    if constants.HV_NIC_TYPE in instance.hvparams:
      result['NIC_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  # export all backend and hypervisor parameters as INSTANCE_BE_*/_HV_*
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
    for key, value in source.items():
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)

  return result
def BlockdevGrow(disk, amount):
  """Grow a stack of block devices.

  This function is called recursively, with the childrens being the
  first ones to resize.

  @type disk: L{objects.Disk}
  @param disk: the disk to be grown
  @type amount: integer
  @param amount: the amount (in MiB) to grow with
  @rtype: None
  @raise RPCFail: if the device cannot be found or grown

  """
  rbd = _RecursiveFindBD(disk)
  if rbd is None:
    _Fail("Cannot find block device %s", disk)

  try:
    rbd.Grow(amount)
  except errors.BlockDeviceError as err:
    _Fail("Failed to grow block device: %s", err, exc=True)
def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk path

  """
  if disk.children:
    if len(disk.children) == 1:
      # only one child, let's recurse on it
      return BlockdevSnapshot(disk.children[0])
    else:
      # more than one child, choose one that matches
      for child in disk.children:
        if child.size == disk.size:
          # return implies breaking the loop
          return BlockdevSnapshot(child)
      # NOTE(review): if no child matches the parent's size we fall
      # through and implicitly return None instead of failing - callers
      # presumably treat None as "no snapshot"; confirm before changing
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    else:
      _Fail("Cannot find block device %s", disk)
  else:
    # only LVM leaves are snapshottable
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
          disk.unique_id, disk.dev_type)
def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
  """Export a block device snapshot to a remote node.

  The export runs as a shell pipeline: the OS export script reads the
  device, its output is gzip-compressed and streamed over SSH into a
  dump file in the remote node's export staging directory.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type instance: L{objects.Instance}
  @param instance: the instance object to whom the disk belongs
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @type idx: int
  @param idx: the index of the disk in the instance's disk list,
      used to export to the OS scripts environment
  @rtype: None
  @raise RPCFail: if the device is not set up or the export pipeline fails

  """
  inst_os = OSFromDisk(instance.os)
  export_env = OSEnvironment(instance, inst_os)

  export_script = inst_os.export_script

  # per-export log file, named after OS, instance and timestamp
  logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
                                     instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    _Fail("Block device '%s' is not set up", disk)

  real_disk.Open()

  export_env['EXPORT_DEVICE'] = real_disk.dev_path
  export_env['EXPORT_INDEX'] = str(idx)

  # ".new" staging directory; FinalizeExport moves it into place later
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  destfile = disk.physical_id[1]

  # the target command is built out of three individual commands,
  # which are joined by pipes; we check each individual command for
  # valid parameters
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; cd %s; %s 2>%s",
                               inst_os.path, export_script, logfile)

  comprcmd = "gzip"

  destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
                                destdir, destdir, destfile)
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.GANETI_RUNAS,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])

  # run under bash, as "set -o pipefail" used above is a bash feature
  result = utils.RunCmd(["bash", "-c", command], env=export_env)

  if result.failed:
    _Fail("OS snapshot export command '%s' returned error: %s"
          " output: %s", command, result.fail_reason, result.output)
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  Writes the export's config file into the ".new" staging directory and
  then atomically replaces any previous export of this instance by
  moving the staging directory into its final place.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  @rtype: None

  """
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  # export-level metadata section
  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, 'version', '0')
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
  config.set(constants.INISECT_EXP, 'os', instance.os)
  config.set(constants.INISECT_EXP, 'compression', 'gzip')

  # instance-level data section
  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, 'name', instance.name)
  config.set(constants.INISECT_INS, 'memory', '%d' %
             instance.beparams[constants.BE_MEMORY])
  config.set(constants.INISECT_INS, 'vcpus', '%d' %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)

  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, 'nic%d_mac' %
               nic_count, '%s' % nic.mac)
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
               '%s' % nic.bridge)
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)

  # only disks that actually have a snapshot (non-None entries) are
  # recorded, so disk_count may be lower than len(instance.disks)
  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if disk:
      disk_total += 1
      config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
                 ('%s' % disk.iv_name))
      config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
                 ('%s' % disk.physical_id[1]))
      config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
                 ('%d' % disk.size))

  config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)

  # write config into the staging dir, then replace the old export (if
  # any) with the new one in a single rename/move
  utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  shutil.rmtree(finaldestdir, True)
  shutil.move(destdir, finaldestdir)
def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: L{objects.SerializableConfigParser}
  @return: a serializable config file containing the
      export info
  @raise RPCFail: if the config file lacks the required sections

  """
  config_file = os.path.join(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(config_file)

  # both the export and the instance sections must be present
  required_sections = (constants.INISECT_EXP, constants.INISECT_INS)
  if not all(config.has_section(sect) for sect in required_sections):
    _Fail("Export info file doesn't have the required fields")

  return config.Dumps()
def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
  """Import an os image into an instance.

  For each non-empty entry in src_images, a pipeline is run which
  streams the dump file from the source node over SSH, decompresses it
  and feeds it to the OS import script.

  @type instance: L{objects.Instance}
  @param instance: instance to import the disks into
  @type src_node: string
  @param src_node: source node for the disk images
  @type src_images: list of string
  @param src_images: absolute paths of the disk images
  @type cluster_name: string
  @param cluster_name: the cluster name, needed for SSH hostalias
  @rtype: None
  @raise RPCFail: if importing any of the disks failed

  """
  inst_os = OSFromDisk(instance.os)
  import_env = OSEnvironment(instance, inst_os)
  import_script = inst_os.import_script

  logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
                                        instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0o750)

  comprcmd = "gunzip"
  impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
                               import_script, logfile)

  final_result = []
  for idx, image in enumerate(src_images):
    if not image:
      # no dump file for this disk index, nothing to import
      continue
    destcmd = utils.BuildShellCmd('cat %s', image)
    remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
                                                     constants.GANETI_RUNAS,
                                                     destcmd)
    command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
    import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
    import_env['IMPORT_INDEX'] = str(idx)
    result = utils.RunCmd(command, env=import_env)
    if result.failed:
      logging.error("Disk import command '%s' returned error: %s"
                    " output: %s", command, result.fail_reason,
                    result.output)
      # FIX: use a slice ([-100:]) to keep the last 100 characters of
      # the output; the previous indexing ([-100]) yielded a single
      # character and raised IndexError for output shorter than 100
      # characters
      final_result.append("error importing disk %d: %s, %s" %
                          (idx, result.fail_reason, result.output[-100:]))

  if final_result:
    _Fail("; ".join(final_result), log=False)
def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports
  @raise RPCFail: if the exports directory does not exist

  """
  # guard clause: without the directory there is nothing to list
  if not os.path.isdir(constants.EXPORT_DIR):
    _Fail("No exports directory")

  return utils.ListVisibleFiles(constants.EXPORT_DIR)
def RemoveExport(export):
  """Remove an existing export from the node.

  @type export: str
  @param export: the name of the export to remove
  @rtype: None
  @raise RPCFail: if the removal fails

  """
  export_path = os.path.join(constants.EXPORT_DIR, export)

  try:
    shutil.rmtree(export_path)
  except EnvironmentError as err:
    _Fail("Error while removing the export: %s", err, exc=True)
def BlockdevRename(devlist):
  """Rename a list of block devices.

  @type devlist: list of tuples
  @param devlist: list of tuples of the form (disk, new_unique_id);
      disk is an L{objects.Disk} object describing the current disk,
      and new_unique_id is the unique id we rename it to
  @rtype: None
  @raise RPCFail: if any of the renames failed (all renames are
      attempted first, then the collected errors are reported at once)

  """
  msgs = []
  result = True
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      msgs.append("Can't find device %s in rename" % str(disk))
      result = False
      continue
    try:
      old_rpath = dev.dev_path
      dev.Rename(unique_id)
      new_rpath = dev.dev_path
      # the rename may change the device path; drop the stale cache entry
      if old_rpath != new_rpath:
        DevCacheManager.RemoveCache(old_rpath)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError, err:
      msgs.append("Can't rename device '%s' to '%s': %s" %
                  (dev, unique_id, err))
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
      result = False
  if not result:
    _Fail("; ".join(msgs))
def _TransformFileStorageDir(file_storage_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given file_storage_dir is within the cluster-wide
  default file_storage_dir stored in SimpleStore. Only paths under that
  directory are allowed.

  @type file_storage_dir: str
  @param file_storage_dir: the path to check

  @return: the normalized path if valid, None otherwise
  @raise RPCFail: if the path is not under the base directory

  """
  cfg = _GetConfig()
  file_storage_dir = os.path.normpath(file_storage_dir)
  base_file_storage_dir = os.path.normpath(cfg.GetFileStorageDir())
  # FIX: os.path.commonprefix compares character by character, not by
  # path component, so e.g. "/srv/ganeti-evil" used to pass a check
  # against a base of "/srv/ganeti"; require the path to be the base
  # itself or a proper subpath of it instead
  if (file_storage_dir != base_file_storage_dir and
      not file_storage_dir.startswith(base_file_storage_dir +
                                      os.path.sep)):
    _Fail("File storage directory '%s' is not under base file"
          " storage directory '%s'", file_storage_dir, base_file_storage_dir)
  return file_storage_dir
def CreateFileStorageDir(file_storage_dir):
  """Create file storage directory.

  @type file_storage_dir: str
  @param file_storage_dir: directory to create

  @rtype: None
  @raise RPCFail: if the path exists but is not a directory, or
      if the directory cannot be created

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)

  if not os.path.exists(file_storage_dir):
    try:
      os.makedirs(file_storage_dir, 0o750)
    except OSError as err:
      _Fail("Cannot create file storage directory '%s': %s",
            file_storage_dir, err, exc=True)
  elif not os.path.isdir(file_storage_dir):
    # something already exists at that path, but it's not a directory
    _Fail("Specified storage dir '%s' is not a directory",
          file_storage_dir)
def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.

  Remove it only if it's empty. If not log an error and return.

  @type file_storage_dir: str
  @param file_storage_dir: the directory we should cleanup
  @rtype: None
  @raise RPCFail: if the path is not a directory or cannot be removed

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)

  # a non-existing directory counts as already removed
  if not os.path.exists(file_storage_dir):
    return

  if not os.path.isdir(file_storage_dir):
    _Fail("Specified Storage directory '%s' is not a directory",
          file_storage_dir)

  # deletes dir only if empty, otherwise we want to fail the rpc call
  try:
    os.rmdir(file_storage_dir)
  except OSError as err:
    _Fail("Cannot remove file storage directory '%s': %s",
          file_storage_dir, err)
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  @type old_file_storage_dir: str
  @param old_file_storage_dir: the current path
  @type new_file_storage_dir: str
  @param new_file_storage_dir: the name we should rename to
  @rtype: None
  @raise RPCFail: if both paths exist, the source is not a directory,
      or the rename fails

  """
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)

  if os.path.exists(new_file_storage_dir):
    # having both locations is an error; only the target existing is
    # silently accepted (nothing left to rename)
    if os.path.exists(old_file_storage_dir):
      _Fail("Cannot rename '%s' to '%s': both locations exist",
            old_file_storage_dir, new_file_storage_dir)
    return

  if not os.path.isdir(old_file_storage_dir):
    _Fail("Specified storage dir '%s' is not a directory",
          old_file_storage_dir)

  try:
    os.rename(old_file_storage_dir, new_file_storage_dir)
  except OSError as err:
    _Fail("Cannot rename '%s' to '%s': %s",
          old_file_storage_dir, new_file_storage_dir, err)
def _EnsureJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: None
  @raises RPCFail: if the file is not valid

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  # FIX: os.path.commonprefix compares character by character, not by
  # path component, so a path like "<queue_dir>-evil/job" used to pass
  # the check; normalize the candidate (defusing ".." components) and
  # require it to be inside the queue directory proper
  file_name = os.path.normpath(file_name)
  result = (file_name == queue_dir or
            file_name.startswith(queue_dir + os.path.sep))

  if not result:
    _Fail("Passed job queue file '%s' does not belong to"
          " the queue directory '%s'", file_name, queue_dir)
def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents
  @rtype: None
  @raise RPCFail: if the file is outside the queue directory

  """
  # refuse to touch anything outside the queue directory
  _EnsureJobQueueFile(file_name)

  data = _Decompress(content)
  # utils.WriteFile writes and replaces the file atomically
  utils.WriteFile(file_name, data=data)
def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: None
  @raise RPCFail: if either name is outside the queue directory

  """
  # both source and target must live inside the queue directory
  for name in (old, new):
    _EnsureJobQueueFile(name)

  utils.RenameFile(old, new, mkdir=True)
def JobQueueSetDrainFlag(drain_flag):
  """Set the drain flag for the queue.

  The flag is represented by the presence of the drain file: it is
  created to drain the queue and removed to resume normal operation.

  @type drain_flag: boolean
  @param drain_flag: if True, will set the drain flag, otherwise reset it.
  @rtype: None

  """
  if not drain_flag:
    utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
  else:
    utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
def BlockdevClose(instance_name, disks):
  """Closes the given block devices.

  This means they will be switched to secondary mode (in case of
  DRBD).

  @param instance_name: if the argument is not empty, the symlinks
      of this instance will be removed
  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to be closed
  @rtype: None
  @raise RPCFail: if a device cannot be found or closed

  """
  # resolve all disks before touching any of them
  bdevs = []
  for disk in disks:
    bdev_inst = _RecursiveFindBD(disk)
    if bdev_inst is None:
      _Fail("Can't find device %s", disk)
    bdevs.append(bdev_inst)

  # close everything, collecting errors instead of stopping early
  errs = []
  for bdev_inst in bdevs:
    try:
      bdev_inst.Close()
    except errors.BlockDeviceError as err:
      errs.append(str(err))

  if errs:
    _Fail("Can't make devices secondary: %s", ",".join(errs))

  # only clean up the symlinks after all devices closed successfully
  if instance_name:
    _RemoveBlockDevLinks(instance_name, disks)
def ValidateHVParams(hvname, hvparams):
  """Validates the given hypervisor parameters.

  @type hvname: string
  @param hvname: the hypervisor name
  @type hvparams: dict
  @param hvparams: the hypervisor parameters to be validated
  @rtype: None
  @raise RPCFail: if the parameters are not valid

  """
  try:
    # hypervisor lookup itself may also raise HypervisorError
    hypervisor.GetHypervisor(hvname).ValidateParameters(hvparams)
  except errors.HypervisorError as err:
    _Fail(str(err), log=False)
def DemoteFromMC():
  """Demotes the current node from master candidate role.

  Performs two safety checks (the node must not be the master according
  to ssconf, and the master daemon must not be running locally), then
  backs up and removes the local cluster configuration file.

  @rtype: None
  @raise RPCFail: if the node looks like the master, the master daemon
      is running, or the config backup fails

  """
  # try to ensure we're not the master by mistake
  master, myself = ssconf.GetMasterAndMyself()
  if master == myself:
    _Fail("ssconf status shows I'm the master node, will not demote")
  # second safety net: refuse if a master daemon is alive locally
  pid_file = utils.DaemonPidFileName(constants.MASTERD)
  if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
    _Fail("The master daemon is running, will not demote")
  try:
    # keep a backup of the config before deleting it
    if os.path.isfile(constants.CLUSTER_CONF_FILE):
      utils.CreateBackup(constants.CLUSTER_CONF_FILE)
  except EnvironmentError, err:
    # a vanished file between the isfile check and the backup is fine
    if err.errno != errno.ENOENT:
      _Fail("Error while backing up cluster file: %s", err, exc=True)
  utils.RemoveFile(constants.CLUSTER_CONF_FILE)
def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  @param nodes_ip: node-to-IP mapping passed to each disk's
      SetPhysicalID
  @type disks: list of L{objects.Disk}
  @param disks: the disks to resolve
  @return: list of block device instances, in the same order as disks
  @raise RPCFail: if any device cannot be found

  """
  # first fill in the node-dependent part of every disk's physical ID
  local_name = utils.HostInfo().name
  for disk in disks:
    disk.SetPhysicalID(local_name, nodes_ip)

  # then resolve each one to its block device instance
  bdevs = []
  for disk in disks:
    found = _RecursiveFindBD(disk)
    if found is None:
      _Fail("Can't find device %s", disk)
    bdevs.append(found)

  return bdevs
def DrbdDisconnectNet(nodes_ip, disks):
  """Disconnects the network on a list of drbd devices.

  @param nodes_ip: node-to-IP mapping, forwarded to L{_FindDisks}
  @type disks: list of L{objects.Disk}
  @param disks: the DRBD disks to switch to standalone mode
  @raise RPCFail: if a device cannot be found or disconnected

  """
  # switch every device to standalone (network-disconnected) mode
  for rd in _FindDisks(nodes_ip, disks):
    try:
      rd.DisconnectNet()
    except errors.BlockDeviceError as err:
      _Fail("Can't change network configuration to standalone mode: %s",
            err, exc=True)
def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
  """Attaches the network on a list of drbd devices.

  Reconnects the devices and then polls them (with exponential backoff,
  up to a fixed timeout) until they are all connected or resyncing,
  retrying the attach for any device that fell back to standalone.

  @param nodes_ip: node-to-IP mapping, forwarded to L{_FindDisks}
  @type disks: list of L{objects.Disk}
  @param disks: the DRBD disks to reconnect
  @type instance_name: str
  @param instance_name: used to name the device symlinks in
      multimaster mode
  @type multimaster: boolean
  @param multimaster: whether to also switch the devices to primary
      (used during live migration)
  @raise RPCFail: on symlink/attach errors or on reconnection timeout

  """
  bdevs = _FindDisks(nodes_ip, disks)

  if multimaster:
    # in multimaster mode the instance will run here too, so it needs
    # its device symlinks in place
    for idx, rd in enumerate(bdevs):
      try:
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
      except EnvironmentError, err:
        _Fail("Can't create symlink: %s", err)
  # reconnect disks, switch to new master configuration and if
  # needed primary mode
  for rd in bdevs:
    try:
      rd.AttachNet(multimaster)
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration: %s", err)
  # wait until the disks are connected; we need to retry the re-attach
  # if the device becomes standalone, as this might happen if the one
  # node disconnects and reconnects in a different mode before the
  # other node reconnects; in this case, one or both of the nodes will
  # decide it has wrong configuration and switch to standalone
  RECONNECT_TIMEOUT = 2 * 60
  sleep_time = 0.100 # start with 100 miliseconds
  timeout_limit = time.time() + RECONNECT_TIMEOUT
  while time.time() < timeout_limit:
    all_connected = True
    for rd in bdevs:
      stats = rd.GetProcStatus()
      if not (stats.is_connected or stats.is_in_resync):
        all_connected = False
      if stats.is_standalone:
        # peer had different config info and this node became
        # standalone, even though this should not happen with the
        # new staged way of changing disk configs
        try:
          rd.AttachNet(multimaster)
        except errors.BlockDeviceError, err:
          _Fail("Can't change network configuration: %s", err)
    if all_connected:
      break
    # exponential backoff, capped at 5 seconds per iteration
    time.sleep(sleep_time)
    sleep_time = min(5, sleep_time * 1.5)
  if not all_connected:
    _Fail("Timeout in disk reconnecting")
  if multimaster:
    # change to primary mode
    for rd in bdevs:
      try:
        rd.Open()
      except errors.BlockDeviceError, err:
        _Fail("Can't change to primary mode: %s", err)
def DrbdWaitSync(nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  @param nodes_ip: node-to-IP mapping, forwarded to L{_FindDisks}
  @type disks: list of L{objects.Disk}
  @param disks: the DRBD disks to inspect
  @rtype: tuple
  @return: (alldone, min_resync) where alldone is True if no device is
      still resyncing, and min_resync is the minimum sync percentage
      seen across all devices (100 if none reported one)
  @raise RPCFail: if a device is neither connected nor resyncing

  """
  bdevs = _FindDisks(nodes_ip, disks)

  min_resync = 100
  alldone = True
  for rd in bdevs:
    stats = rd.GetProcStatus()
    # a device that is neither connected nor syncing is broken
    if not (stats.is_connected or stats.is_in_resync):
      _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
    if stats.is_in_resync:
      alldone = False
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)

  return (alldone, min_resync)
def PowercycleNode(hypervisor_type):
  """Hard-powercycle the node.

  Because we need to return first, and schedule the powercycle in the
  background, we won't be able to report failures nicely.

  @type hypervisor_type: str
  @param hypervisor_type: name of the hypervisor whose PowercycleNode
      implementation will be used
  @rtype: str
  @return: (in the parent process) a message that the reboot is scheduled;
      the forked child never returns normally

  """
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  try:
    pid = os.fork()
  except OSError:
    # if we can't fork, we'll pretend that we're in the child process
    pid = 0
  if pid > 0:
    # parent: reply to the RPC caller and let the child do the work
    return "Reboot scheduled in 5 seconds"
  # child (or fork-failed) path: give the RPC reply time to go out,
  # then powercycle the machine
  time.sleep(5)
  hyper.PowercycleNode()
class HooksRunner(object):
2567
  """Hook runner.
2568

2569
  This class is instantiated on the node side (ganeti-noded) and not
2570
  on the master side.
2571

2572
  """
2573
  RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
2574

    
2575
  def __init__(self, hooks_base_dir=None):
2576
    """Constructor for hooks runner.
2577

2578
    @type hooks_base_dir: str or None
2579
    @param hooks_base_dir: if not None, this overrides the
2580
        L{constants.HOOKS_BASE_DIR} (useful for unittests)
2581

2582
    """
2583
    if hooks_base_dir is None:
2584
      hooks_base_dir = constants.HOOKS_BASE_DIR
2585
    self._BASE_DIR = hooks_base_dir
2586

    
2587
  @staticmethod
2588
  def ExecHook(script, env):
2589
    """Exec one hook script.
2590

2591
    @type script: str
2592
    @param script: the full path to the script
2593
    @type env: dict
2594
    @param env: the environment with which to exec the script
2595
    @rtype: tuple (success, message)
2596
    @return: a tuple of success and message, where success
2597
        indicates the succes of the operation, and message
2598
        which will contain the error details in case we
2599
        failed
2600

2601
    """
2602
    # exec the process using subprocess and log the output
2603
    fdstdin = None
2604
    try:
2605
      fdstdin = open("/dev/null", "r")
2606
      child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
2607
                               stderr=subprocess.STDOUT, close_fds=True,
2608
                               shell=False, cwd="/", env=env)
2609
      output = ""
2610
      try:
2611
        output = child.stdout.read(4096)
2612
        child.stdout.close()
2613
      except EnvironmentError, err:
2614
        output += "Hook script error: %s" % str(err)
2615

    
2616
      while True:
2617
        try:
2618
          result = child.wait()
2619
          break
2620
        except EnvironmentError, err:
2621
          if err.errno == errno.EINTR:
2622
            continue
2623
          raise
2624
    finally:
2625
      # try not to leak fds
2626
      for fd in (fdstdin, ):
2627
        if fd is not None:
2628
          try:
2629
            fd.close()
2630
          except EnvironmentError, err:
2631
            # just log the error
2632
            #logging.exception("Error while closing fd %s", fd)
2633
            pass
2634

    
2635
    return result == 0, utils.SafeEncode(output.strip())
2636

    
2637
  def RunHooks(self, hpath, phase, env):
2638
    """Run the scripts in the hooks directory.
2639

2640
    @type hpath: str
2641
    @param hpath: the path to the hooks directory which
2642
        holds the scripts
2643
    @type phase: str
2644
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
2645
        L{constants.HOOKS_PHASE_POST}
2646
    @type env: dict
2647
    @param env: dictionary with the environment for the hook
2648
    @rtype: list
2649
    @return: list of 3-element tuples:
2650
      - script path
2651
      - script result, either L{constants.HKR_SUCCESS} or
2652
        L{constants.HKR_FAIL}
2653
      - output of the script
2654

2655
    @raise errors.ProgrammerError: for invalid input
2656
        parameters
2657

2658
    """
2659
    if phase == constants.HOOKS_PHASE_PRE:
2660
      suffix = "pre"
2661
    elif phase == constants.HOOKS_PHASE_POST:
2662
      suffix = "post"
2663
    else:
2664
      _Fail("Unknown hooks phase '%s'", phase)
2665

    
2666
    rr = []
2667

    
2668
    subdir = "%s-%s.d" % (hpath, suffix)
2669
    dir_name = "%s/%s" % (self._BASE_DIR, subdir)
2670
    try:
2671
      dir_contents = utils.ListVisibleFiles(dir_name)
2672
    except OSError:
2673
      # FIXME: must log output in case of failures
2674
      return rr
2675

    
2676
    # we use the standard python sort order,
2677
    # so 00name is the recommended naming scheme
2678
    dir_contents.sort()
2679
    for relname in dir_contents:
2680
      fname = os.path.join(dir_name, relname)
2681
      if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
2682
          self.RE_MASK.match(relname) is not None):
2683
        rrval = constants.HKR_SKIP
2684
        output = ""
2685
      else:
2686
        result, output = self.ExecHook(fname, env)
2687
        if not result:
2688
          rrval = constants.HKR_FAIL
2689
        else:
2690
          rrval = constants.HKR_SUCCESS
2691
      rr.append(("%s/%s" % (subdir, relname), rrval, output))
2692

    
2693
    return rr
2694

    
2695

    
2696
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  def Run(self, name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data

    @rtype: str
    @return: the stdout of the allocator script

    @raise RPCFail: if the script can't be found in the search path
        or exits with an error

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      _Fail("iallocator module '%s' not found in the search path", name)

    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      try:
        os.write(fd, idata)
      finally:
        # close the fd even if the write fails, to avoid leaking it
        os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        _Fail("iallocator module '%s' failed: %s, output '%s'",
              name, result.fail_reason, result.output)
    finally:
      os.unlink(fin_name)

    return result.stdout
2736
class DevCacheManager(object):
2737
  """Simple class for managing a cache of block device information.
2738

2739
  """
2740
  _DEV_PREFIX = "/dev/"
2741
  _ROOT_DIR = constants.BDEV_CACHE_DIR
2742

    
2743
  @classmethod
2744
  def _ConvertPath(cls, dev_path):
2745
    """Converts a /dev/name path to the cache file name.
2746

2747
    This replaces slashes with underscores and strips the /dev
2748
    prefix. It then returns the full path to the cache file.
2749

2750
    @type dev_path: str
2751
    @param dev_path: the C{/dev/} path name
2752
    @rtype: str
2753
    @return: the converted path name
2754

2755
    """
2756
    if dev_path.startswith(cls._DEV_PREFIX):
2757
      dev_path = dev_path[len(cls._DEV_PREFIX):]
2758
    dev_path = dev_path.replace("/", "_")
2759
    fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2760
    return fpath
2761

    
2762
  @classmethod
2763
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2764
    """Updates the cache information for a given device.
2765

2766
    @type dev_path: str
2767
    @param dev_path: the pathname of the device
2768
    @type owner: str
2769
    @param owner: the owner (instance name) of the device
2770
    @type on_primary: bool
2771
    @param on_primary: whether this is the primary
2772
        node nor not
2773
    @type iv_name: str
2774
    @param iv_name: the instance-visible name of the
2775
        device, as in objects.Disk.iv_name
2776

2777
    @rtype: None
2778

2779
    """
2780
    if dev_path is None:
2781
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
2782
      return
2783
    fpath = cls._ConvertPath(dev_path)
2784
    if on_primary:
2785
      state = "primary"
2786
    else:
2787
      state = "secondary"
2788
    if iv_name is None:
2789
      iv_name = "not_visible"
2790
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2791
    try:
2792
      utils.WriteFile(fpath, data=fdata)
2793
    except EnvironmentError, err:
2794
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
2795

    
2796
  @classmethod
2797
  def RemoveCache(cls, dev_path):
2798
    """Remove data for a dev_path.
2799

2800
    This is just a wrapper over L{utils.RemoveFile} with a converted
2801
    path name and logging.
2802

2803
    @type dev_path: str
2804
    @param dev_path: the pathname of the device
2805

2806
    @rtype: None
2807

2808
    """
2809
    if dev_path is None:
2810
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
2811
      return
2812
    fpath = cls._ConvertPath(dev_path)
2813
    try:
2814
      utils.RemoveFile(fpath)
2815
    except EnvironmentError, err:
2816
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)