Statistics
| Branch: | Tag: | Revision:

root / lib / backend.py @ ca77edbc

History | View | Annotate | Download (78 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Functions used by the node daemon"""
23

    
24

    
25
import os
26
import os.path
27
import shutil
28
import time
29
import stat
30
import errno
31
import re
32
import subprocess
33
import random
34
import logging
35
import tempfile
36
import zlib
37
import base64
38

    
39
from ganeti import errors
40
from ganeti import utils
41
from ganeti import ssh
42
from ganeti import hypervisor
43
from ganeti import constants
44
from ganeti import bdev
45
from ganeti import objects
46
from ganeti import ssconf
47

    
48

    
49
def _GetConfig():
  """Build and return a fresh cluster configuration accessor.

  @rtype: L{ssconf.SimpleStore}
  @return: a newly created SimpleStore instance

  """
  return ssconf.SimpleStore()
57

    
58

    
59
def _GetSshRunner(cluster_name):
  """Build and return an SSH runner bound to the given cluster.

  @type cluster_name: str
  @param cluster_name: the cluster name, required by the
      SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: a newly created SshRunner instance

  """
  return ssh.SshRunner(cluster_name)
70

    
71

    
72
def _Decompress(data):
  """Reverse the compression applied by the RPC client.

  @type data: list or tuple
  @param data: two-element sequence of (encoding, payload) as
      produced by the RPC client
  @rtype: str
  @return: the decompressed payload

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, payload) = data
  if encoding == constants.RPC_ENCODING_NONE:
    # payload was sent verbatim
    return payload
  if encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    # payload was zlib-compressed, then base64-armored
    return zlib.decompress(base64.b64decode(payload))
  raise AssertionError("Unknown data encoding")
90

    
91

    
92
def _CleanDirectory(path, exclude=None):
  """Delete every visible regular file inside a directory.

  Subdirectories and symlinks are left in place; entries listed in
  C{exclude} are skipped.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: optional list of file paths to preserve; defaults
      to preserving nothing

  """
  if not os.path.isdir(path):
    return
  if exclude is None:
    exclude = []
  else:
    # normalize the excluded paths so comparisons are reliable
    exclude = [os.path.normpath(item) for item in exclude]

  for entry in utils.ListVisibleFiles(path):
    full_path = os.path.normpath(os.path.join(path, entry))
    if full_path in exclude:
      continue
    # only plain files are removed, never symlinks or directories
    if os.path.isfile(full_path) and not os.path.islink(full_path):
      utils.RemoveFile(full_path)
116

    
117

    
118
def JobQueuePurge():
  """Remove all job queue files and archived jobs.

  The queue lock file itself is preserved so the queue can still be
  locked afterwards.

  @rtype: None

  """
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
126

    
127

    
128
def GetMasterInfo():
  """Returns master information.

  This is an utility function to compute master information, either
  for consumption here or from the node daemon.

  @rtype: tuple
  @return: (master_netdev, master_ip, master_name) if we have a good
      configuration, otherwise (None, None, None)

  """
  try:
    cfg = _GetConfig()
    master_netdev = cfg.GetMasterNetdev()
    master_ip = cfg.GetMasterIP()
    master_node = cfg.GetMasterNode()
  # FIX: drop the unused "err" binding; logging.exception() already records
  # the active exception with its traceback
  except errors.ConfigurationError:
    logging.exception("Cluster configuration incomplete")
    return (None, None, None)
  return (master_netdev, master_ip, master_node)
148

    
149

    
150
def StartMaster(start_daemons):
  """Activate local node as master node.

  The function will always try activate the IP address of the master
  (unless someone else has it). It will also start the master daemons,
  based on the start_daemons parameter.

  @type start_daemons: boolean
  @param start_daemons: whether to also start the master
      daemons (ganeti-masterd and ganeti-rapi)
  @rtype: boolean
  @return: True unless one of the steps failed

  """
  ok = True
  master_netdev, master_ip, _ = GetMasterInfo()
  if not master_netdev:
    return False

  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
    if utils.OwnIpAddress(master_ip):
      # we already have the ip:
      logging.debug("Already started")
    else:
      logging.error("Someone else has the master ip, not activating")
      ok = False
  else:
    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
                           "dev", master_netdev, "label",
                           "%s:0" % master_netdev])
    if result.failed:
      logging.error("Can't activate master IP: %s", result.output)
      ok = False

    # FIX: pass the packet count as two argv elements ("-c", "3") instead of
    # the single string "-c 3", which only worked by accident of option
    # parsing inside arping
    result = utils.RunCmd(["arping", "-q", "-U", "-c", "3", "-I",
                           master_netdev, "-s", master_ip, master_ip])
    # we'll ignore the exit code of arping

  # and now start the master and rapi daemons
  if start_daemons:
    for daemon in 'ganeti-masterd', 'ganeti-rapi':
      result = utils.RunCmd([daemon])
      if result.failed:
        logging.error("Can't start daemon %s: %s", daemon, result.output)
        ok = False
  return ok
195

    
196

    
197
def StopMaster(stop_daemons):
  """Deactivate this node as master.

  The master IP address is always removed from the master network
  device; the master daemons are additionally stopped when requested.

  @type stop_daemons: boolean
  @param stop_daemons: whether to also stop the master daemons
      (ganeti-masterd and ganeti-rapi)
  @rtype: boolean
  @return: False if the master information can't be computed,
      True otherwise

  """
  master_netdev, master_ip, _ = GetMasterInfo()
  if not master_netdev:
    return False

  result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
                         "dev", master_netdev])
  if result.failed:
    logging.error("Can't remove the master IP, error: %s", result.output)
    # but otherwise ignore the failure

  if stop_daemons:
    # stop/kill the rapi and the master daemon
    for daemon_pid in constants.RAPI_PID, constants.MASTERD_PID:
      pidfile = utils.DaemonPidFileName(daemon_pid)
      utils.KillProcess(utils.ReadPidFile(pidfile))

  return True
226

    
227

    
228
def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
229
  """Joins this node to the cluster.
230

231
  This does the following:
232
      - updates the hostkeys of the machine (rsa and dsa)
233
      - adds the ssh private key to the user
234
      - adds the ssh public key to the users' authorized_keys file
235

236
  @type dsa: str
237
  @param dsa: the DSA private key to write
238
  @type dsapub: str
239
  @param dsapub: the DSA public key to write
240
  @type rsa: str
241
  @param rsa: the RSA private key to write
242
  @type rsapub: str
243
  @param rsapub: the RSA public key to write
244
  @type sshkey: str
245
  @param sshkey: the SSH private key to write
246
  @type sshpub: str
247
  @param sshpub: the SSH public key to write
248
  @rtype: boolean
249
  @return: the success of the operation
250

251
  """
252
  sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
253
                (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
254
                (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
255
                (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
256
  for name, content, mode in sshd_keys:
257
    utils.WriteFile(name, data=content, mode=mode)
258

    
259
  try:
260
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
261
                                                    mkdir=True)
262
  except errors.OpExecError, err:
263
    logging.exception("Error while processing user ssh files")
264
    return False
265

    
266
  for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
267
    utils.WriteFile(name, data=content, mode=0600)
268

    
269
  utils.AddAuthorizedKey(auth_keys, sshpub)
270

    
271
  utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
272

    
273
  return True
274

    
275

    
276
def LeaveCluster():
  """Cleans up and remove the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shutdown the node daemon.

  """
  _CleanDirectory(constants.DATA_DIR)
  JobQueuePurge()

  try:
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
  except errors.OpExecError:
    logging.exception("Error while processing ssh files")
    return

  # revoke our own public key from the authorized list
  pub_fd = open(pub_key, 'r')
  try:
    utils.RemoveAuthorizedKey(auth_keys, pub_fd.read(8192))
  finally:
    pub_fd.close()

  utils.RemoveFile(priv_key)
  utils.RemoveFile(pub_key)

  # Return a reassuring string to the caller, and quit
  raise errors.QuitGanetiException(False, 'Shutdown scheduled')
307

    
308

    
309
def GetNodeInfo(vgname, hypervisor_type):
  """Gives back a hash with different informations about the node.

  @type vgname: C{string}
  @param vgname: the name of the volume group to ask for disk space information
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for
      memory information
  @rtype: C{dict}
  @return: dictionary with the following keys:
      - vg_size is the size of the configured volume group in MiB
      - vg_free is the free size of the volume group in MiB
      - memory_dom0 is the memory allocated for domain0 in MiB
      - memory_free is the currently available (free) ram in MiB
      - memory_total is the total number of ram in MiB

  """
  node_info = {}

  # volume group data
  vginfo = _GetVGInfo(vgname)
  node_info['vg_size'] = vginfo['vg_size']
  node_info['vg_free'] = vginfo['vg_free']

  # hypervisor-provided data (memory etc.)
  hv_info = hypervisor.GetHypervisor(hypervisor_type).GetNodeInfo()
  if hv_info is not None:
    node_info.update(hv_info)

  # the boot id changes on every reboot, which lets callers detect
  # node restarts
  boot_id_fd = open("/proc/sys/kernel/random/boot_id", 'r')
  try:
    node_info["bootid"] = boot_id_fd.read(128).rstrip("\n")
  finally:
    boot_id_fd.close()

  return node_info
343

    
344

    
345
def VerifyNode(what, cluster_name):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @type cluster_name: str
  @param cluster_name: the cluster's name, used to build the SSH runner
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  result = {}

  # per-hypervisor self-checks
  if constants.NV_HYPERVISOR in what:
    result[constants.NV_HYPERVISOR] = tmp = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
      tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()

  # checksum the requested files so the master can compare them
  # across nodes
  if constants.NV_FILELIST in what:
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
      what[constants.NV_FILELIST])

  # ssh connectivity/hostname check; only failures are recorded
  if constants.NV_NODELIST in what:
    result[constants.NV_NODELIST] = tmp = {}
    # shuffled so that all cluster nodes don't hammer the same targets
    # in the same order (NOTE: this mutates the caller-supplied list)
    random.shuffle(what[constants.NV_NODELIST])
    for node in what[constants.NV_NODELIST]:
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
      if not success:
        tmp[node] = message

  # node daemon TCP reachability over primary/secondary IPs
  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_name = utils.HostInfo().name
    my_pip = my_sip = None
    # locate our own entry to learn which source addresses to ping from
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      tmp[my_name] = ("Can't find my own primary/secondary IP"
                      " in the node list")
    else:
      port = utils.GetNodeDaemonPort()
      for name, pip, sip in what[constants.NV_NODENETTEST]:
        fail = []
        if not utils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        # secondary check is skipped when it's the same address as
        # the primary
        if sip != pip:
          if not utils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          tmp[name] = ("failure using the %s interface(s)" %
                       " and ".join(fail))

  # logical volumes in the given VG
  if constants.NV_LVLIST in what:
    result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])

  # running instances for the given hypervisors
  if constants.NV_INSTANCELIST in what:
    result[constants.NV_INSTANCELIST] = GetInstanceList(
      what[constants.NV_INSTANCELIST])

  # known volume groups and their sizes
  if constants.NV_VGLIST in what:
    result[constants.NV_VGLIST] = ListVolumeGroups()

  # our protocol version, so the master can detect mismatched nodes
  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = constants.PROTOCOL_VERSION

  # node information as seen by one specific hypervisor
  if constants.NV_HVINFO in what:
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()

  # DRBD minors in use; errors degrade to an empty list with a warning
  if constants.NV_DRBDLIST in what:
    try:
      used_minors = bdev.DRBD8.GetUsedDevs().keys()
    except errors.BlockDeviceError:
      logging.warning("Can't get used minors list", exc_info=True)
      used_minors = []
    result[constants.NV_DRBDLIST] = used_minors

  return result
444

    
445

    
446
def GetVolumeList(vg_name):
  """Compute list of logical volumes and their size.

  @type vg_name: str
  @param vg_name: the volume group whose LVs we should list
  @rtype: dict
  @return:
      dictionary of all partions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.

  """
  lvs = {}
  sep = '|'
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-olv_name,lv_size,lv_attr", vg_name])
  if result.failed:
    logging.error("Failed to list logical volumes, lvs output: %s",
                  result.output)
    return result.output

  # FIX: the pattern is now a raw string; "\|" is not a valid escape in a
  # plain string literal and only worked because re re-parses the backslash
  valid_line_re = re.compile(r"^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
  for line in result.stdout.splitlines():
    line = line.strip()
    match = valid_line_re.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    name, size, attr = match.groups()
    # lv_attr position 5 is the state ('-' == inactive) and position 6
    # is the device open flag ('o' == online)
    inactive = attr[4] == '-'
    online = attr[5] == 'o'
    lvs[name] = (size, inactive, online)

  return lvs
485

    
486

    
487
def ListVolumeGroups():
  """Enumerate the volume groups known to this node.

  @rtype: dict
  @return: dictionary mapping volume group name to its size

  """
  return utils.ListVolumeGroups()
496

    
497

    
498
def NodeVolumes():
  """List all volumes on this node.

  @rtype: list
  @return:
    A list of dictionaries, each having four keys:
      - name: the logical volume name,
      - size: the size of the logical volume
      - dev: the physical device on which the LV lives
      - vg: the volume group to which it belongs

    In case of errors, we return an empty list and log the
    error.

    Note that since a logical volume can live on multiple physical
    volumes, the resulting list might include a logical volume
    multiple times.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    logging.error("Failed to list logical volumes, lvs output: %s",
                  result.output)
    return []

  def parse_dev(dev):
    # strip the "(extent)" suffix that lvs appends to device names
    if '(' in dev:
      return dev.split('(')[0]
    return dev

  def map_line(fields):
    return {
      'name': fields[0].strip(),
      'size': fields[1].strip(),
      'dev': parse_dev(fields[2].strip()),
      'vg': fields[3].strip(),
    }

  # only keep lines that actually contain all four separated fields
  return [map_line(line.split('|')) for line in result.stdout.splitlines()
          if line.count('|') >= 3]
541

    
542

    
543
def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @rtype: boolean
  @return: C{True} if all of them exist, C{False} otherwise

  """
  # short-circuit on the first missing bridge
  for brname in bridges_list:
    if not utils.BridgeExists(brname):
      return False

  return True
555

    
556

    
557
def GetInstanceList(hypervisor_list):
  """Provides a list of instances.

  @type hypervisor_list: list
  @param hypervisor_list: the list of hypervisors to query information

  @rtype: list
  @return: a list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com

  """
  results = []
  for hname in hypervisor_list:
    try:
      names = hypervisor.GetHypervisor(hname).ListInstances()
      results.extend(names)
    # FIX: drop the unused "err" binding; logging.exception() records the
    # traceback by itself
    except errors.HypervisorError:
      # FIX: corrected the "hypevisor" typo in the log message
      logging.exception("Error enumerating instances for hypervisor %s",
                        hname)
      # FIXME: should we somehow not propagate this to the master?
      raise

  return results
580

    
581

    
582
def GetInstanceInfo(instance, hname):
  """Gives back the informations about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)

  """
  output = {}

  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  # iinfo is None when the instance is not found; in that case an empty
  # dict is returned
  if iinfo is not None:
    # positions 2, 4 and 5 of the info tuple are memory, state and
    # cpu time respectively
    output['memory'] = iinfo[2]
    output['state'] = iinfo[4]
    output['time'] = iinfo[5]

  return output
606

    
607

    
608
def GetInstanceMigratable(instance):
  """Gives whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.

  @rtype: tuple
  @return: tuple of (result, description) where:
      - result: whether the instance can be migrated or not
      - description: a description of the issue, if relevant

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  if instance.name not in hyper.ListInstances():
    return (False, 'not running')

  # every disk must have its symlink in place, otherwise the instance
  # predates the symlink scheme and has to be restarted first
  for idx in range(len(instance.disks)):
    link_name = _GetBlockDevSymlinkPath(instance.name, idx)
    if not os.path.islink(link_name):
      return (False, 'not restarted since ganeti 1.2.5')

  return (True, '')
630

    
631

    
632
def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  output = {}

  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
    if iinfo:
      for name, inst_id, memory, vcpus, state, times in iinfo:
        value = {
          'memory': memory,
          'vcpus': vcpus,
          'state': state,
          'time': times,
          }
        # the same name showing up under two hypervisors with differing
        # data means a duplicate/conflicting instance
        if name in output and output[name] != value:
          raise errors.HypervisorError("Instance %s running duplicate"
                                       " with different parameters" % name)
        output[name] = value

  return output
668

    
669

    
670
def AddOSToInstance(instance):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @rtype: tuple
  @return: (success, message) describing the outcome of the operation

  """
  inst_os = OSFromDisk(instance.os)

  create_env = OSEnvironment(instance)

  logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                     instance.name, int(time.time()))

  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile,)
  if result.failed:
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", result.cmd, result.fail_reason, logfile,
                  result.output)
    # include the tail of the script's log in the error message, escaped
    # so it stays on one line per entry
    tail = [val.encode("string_escape")
            for val in utils.TailFile(logfile, lines=20)]
    return (False, "OS create script failed (%s), last lines in the"
            " log file:\n%s" % (result.fail_reason, "\n".join(tail)))

  return (True, "Successfully installed")
698

    
699

    
700
def RunRenameInstance(instance, old_name):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type old_name: string
  @param old_name: previous instance name
  @rtype: tuple
  @return: (success, message) describing the outcome of the operation

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance)
  rename_env['OLD_INSTANCE_NAME'] = old_name

  logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                           old_name,
                                           instance.name, int(time.time()))

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile)

  if result.failed:
    # FIX: the log message previously said "os create command" although this
    # is the rename script's failure path
    logging.error("os rename command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    lines = [val.encode("string_escape")
             for val in utils.TailFile(logfile, lines=20)]
    return (False, "OS rename script failed (%s), last lines in the"
            " log file:\n%s" % (result.fail_reason, "\n".join(lines)))

  return (True, "Rename successful")
732

    
733

    
734
def _GetVGInfo(vg_name):
  """Get informations about the volume group.

  @type vg_name: str
  @param vg_name: the volume group which we query
  @rtype: dict
  @return:
    A dictionary with the following keys:
      - C{vg_size} is the total size of the volume group in MiB
      - C{vg_free} is the free size of the volume group in MiB
      - C{pv_count} are the number of physical disks in that VG

    If an error occurs during gathering of data, we return the same dict
    with keys all set to None.

  """
  retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])

  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
                         "--nosuffix", "--units=m", "--separator=:", vg_name])

  if retval.failed:
    logging.error("volume group %s not present", vg_name)
    return retdic
  valarr = retval.stdout.strip().rstrip(':').split(':')
  if len(valarr) == 3:
    try:
      retdic = {
        "vg_size": int(round(float(valarr[0]), 0)),
        "vg_free": int(round(float(valarr[1]), 0)),
        "pv_count": int(valarr[2]),
        }
    # FIX: drop the unused "err" binding; logging.exception() already logs
    # the active exception with traceback
    except ValueError:
      logging.exception("Fail to parse vgs output")
  else:
    logging.error("vgs output has the wrong number of fields (expected"
                  " three): %s", str(valarr))
  return retdic
772

    
773

    
774
def _GetBlockDevSymlinkPath(instance_name, idx):
  """Return the path of the symlink for an instance's disk.

  The link lives in the disk links directory and is named
  "<instance>:<disk index>".

  """
  link_basename = "%s:%d" % (instance_name, idx)
  return os.path.join(constants.DISK_LINKS_DIR, link_basename)
777

    
778

    
779
def _SymlinkBlockDev(instance_name, device_path, idx):
780
  """Set up symlinks to a instance's block device.
781

782
  This is an auxiliary function run when an instance is start (on the primary
783
  node) or when an instance is migrated (on the target node).
784

785

786
  @param instance_name: the name of the target instance
787
  @param device_path: path of the physical block device, on the node
788
  @param idx: the disk index
789
  @return: absolute path to the disk's symlink
790

791
  """
792
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
793
  try:
794
    os.symlink(device_path, link_name)
795
  except OSError, err:
796
    if err.errno == errno.EEXIST:
797
      if (not os.path.islink(link_name) or
798
          os.readlink(link_name) != device_path):
799
        os.remove(link_name)
800
        os.symlink(device_path, link_name)
801
    else:
802
      raise
803

    
804
  return link_name
805

    
806

    
807
def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  Failures to remove individual links are logged but not propagated.

  """
  for idx, _ in enumerate(disks):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if not os.path.islink(link_name):
      continue
    try:
      os.remove(link_name)
    except OSError:
      logging.exception("Can't remove symlink '%s'", link_name)
818

    
819

    
820
def _GatherAndLinkBlockDevs(instance):
821
  """Set up an instance's block device(s).
822

823
  This is run on the primary node at instance startup. The block
824
  devices must be already assembled.
825

826
  @type instance: L{objects.Instance}
827
  @param instance: the instance whose disks we shoul assemble
828
  @rtype: list
829
  @return: list of (disk_object, device_path)
830

831
  """
832
  block_devices = []
833
  for idx, disk in enumerate(instance.disks):
834
    device = _RecursiveFindBD(disk)
835
    if device is None:
836
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
837
                                    str(disk))
838
    device.Open()
839
    try:
840
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
841
    except OSError, e:
842
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
843
                                    e.strerror)
844

    
845
    block_devices.append((disk, link_name))
846

    
847
  return block_devices
848

    
849

    
850
def StartInstance(instance, extra_args):
851
  """Start an instance.
852

853
  @type instance: L{objects.Instance}
854
  @param instance: the instance object
855
  @rtype: boolean
856
  @return: whether the startup was successful or not
857

858
  """
859
  running_instances = GetInstanceList([instance.hypervisor])
860

    
861
  if instance.name in running_instances:
862
    return (True, "Already running")
863

    
864
  try:
865
    block_devices = _GatherAndLinkBlockDevs(instance)
866
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
867
    hyper.StartInstance(instance, block_devices, extra_args)
868
  except errors.BlockDeviceError, err:
869
    logging.exception("Failed to start instance")
870
    return (False, "Block device error: %s" % str(err))
871
  except errors.HypervisorError, err:
872
    logging.exception("Failed to start instance")
873
    _RemoveBlockDevLinks(instance.name, instance.disks)
874
    return (False, "Hypervisor error: %s" % str(err))
875

    
876
  return (True, "Instance started successfully")
877

    
878

    
879
def ShutdownInstance(instance):
880
  """Shut an instance down.
881

882
  @note: this functions uses polling with a hardcoded timeout.
883

884
  @type instance: L{objects.Instance}
885
  @param instance: the instance object
886
  @rtype: boolean
887
  @return: whether the startup was successful or not
888

889
  """
890
  hv_name = instance.hypervisor
891
  running_instances = GetInstanceList([hv_name])
892

    
893
  if instance.name not in running_instances:
894
    return True
895

    
896
  hyper = hypervisor.GetHypervisor(hv_name)
897
  try:
898
    hyper.StopInstance(instance)
899
  except errors.HypervisorError, err:
900
    logging.error("Failed to stop instance: %s" % err)
901
    return False
902

    
903
  # test every 10secs for 2min
904

    
905
  time.sleep(1)
906
  for dummy in range(11):
907
    if instance.name not in GetInstanceList([hv_name]):
908
      break
909
    time.sleep(10)
910
  else:
911
    # the shutdown did not succeed
912
    logging.error("Shutdown of '%s' unsuccessful, using destroy",
913
                  instance.name)
914

    
915
    try:
916
      hyper.StopInstance(instance, force=True)
917
    except errors.HypervisorError, err:
918
      logging.exception("Failed to stop instance: %s" % err)
919
      return False
920

    
921
    time.sleep(1)
922
    if instance.name in GetInstanceList([hv_name]):
923
      logging.error("Could not shutdown instance '%s' even by destroy",
924
                    instance.name)
925
      return False
926

    
927
  _RemoveBlockDevLinks(instance.name, instance.disks)
928

    
929
  return True
930

    
931

    
932
def RebootInstance(instance, reboot_type, extra_args):
  """Reboot an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object to reboot
  @type reboot_type: str
  @param reboot_type: the type of reboot, one the following
    constants:
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
        instance OS, do not recreate the VM
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
        restart the VM (at the hypervisor level)
      - the full reboot type (L{constants.INSTANCE_REBOOT_FULL})
        is not accepted here, since that mode is handled
        differently
  @param extra_args: extra arguments forwarded to L{StartInstance}
      when doing a hard reboot
  @rtype: boolean
  @return: the success of the operation

  """
  running_instances = GetInstanceList([instance.hypervisor])

  # rebooting a stopped instance makes no sense
  if instance.name not in running_instances:
    logging.error("Cannot reboot instance that is not running")
    return False

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
    try:
      hyper.RebootInstance(instance)
    except errors.HypervisorError, err:
      logging.exception("Failed to soft reboot instance")
      return False
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
    # hard reboot: full stop/start cycle at the hypervisor level
    try:
      ShutdownInstance(instance)
      StartInstance(instance, extra_args)
    except errors.HypervisorError, err:
      logging.exception("Failed to hard reboot instance")
      return False
  else:
    raise errors.ParameterError("reboot_type invalid")

  return True
975

    
976

    
977
def MigrationInfo(instance):
978
  """Gather information about an instance to be migrated.
979

980
  @type instance: L{objects.Instance}
981
  @param instance: the instance definition
982

983
  """
984
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
985
  try:
986
    info = hyper.MigrationInfo(instance)
987
  except errors.HypervisorError, err:
988
    msg = "Failed to fetch migration information"
989
    logging.exception(msg)
990
    return (False, '%s: %s' % (msg, err))
991
  return (True, info)
992

    
993

    
994
def AcceptInstance(instance, info, target):
995
  """Prepare the node to accept an instance.
996

997
  @type instance: L{objects.Instance}
998
  @param instance: the instance definition
999
  @type info: string/data (opaque)
1000
  @param info: migration information, from the source node
1001
  @type target: string
1002
  @param target: target host (usually ip), on this node
1003

1004
  """
1005
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1006
  try:
1007
    hyper.AcceptInstance(instance, info, target)
1008
  except errors.HypervisorError, err:
1009
    msg = "Failed to accept instance"
1010
    logging.exception(msg)
1011
    return (False, '%s: %s' % (msg, err))
1012
  return (True, "Accept successfull")
1013

    
1014

    
1015
def FinalizeMigration(instance, info, success):
1016
  """Finalize any preparation to accept an instance.
1017

1018
  @type instance: L{objects.Instance}
1019
  @param instance: the instance definition
1020
  @type info: string/data (opaque)
1021
  @param info: migration information, from the source node
1022
  @type success: boolean
1023
  @param success: whether the migration was a success or a failure
1024

1025
  """
1026
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1027
  try:
1028
    hyper.FinalizeMigration(instance, info, success)
1029
  except errors.HypervisorError, err:
1030
    msg = "Failed to finalize migration"
1031
    logging.exception(msg)
1032
    return (False, '%s: %s' % (msg, err))
1033
  return (True, "Migration Finalized")
1034

    
1035

    
1036
def MigrateInstance(instance, target, live):
1037
  """Migrates an instance to another node.
1038

1039
  @type instance: L{objects.Instance}
1040
  @param instance: the instance definition
1041
  @type target: string
1042
  @param target: the target node name
1043
  @type live: boolean
1044
  @param live: whether the migration should be done live or not (the
1045
      interpretation of this parameter is left to the hypervisor)
1046
  @rtype: tuple
1047
  @return: a tuple of (success, msg) where:
1048
      - succes is a boolean denoting the success/failure of the operation
1049
      - msg is a string with details in case of failure
1050

1051
  """
1052
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1053

    
1054
  try:
1055
    hyper.MigrateInstance(instance.name, target, live)
1056
  except errors.HypervisorError, err:
1057
    msg = "Failed to migrate instance"
1058
    logging.exception(msg)
1059
    return (False, "%s: %s" % (msg, err))
1060
  return (True, "Migration successfull")
1061

    
1062

    
1063
def CreateBlockDevice(disk, size, owner, on_primary, info):
  """Creates a block device for an instance.

  @type disk: L{objects.Disk}
  @param disk: the object describing the disk we should create
  @type size: int
  @param size: the size of the physical underlying device, in MiB
  @type owner: str
  @param owner: the name of the instance for which disk is created,
      used for device cache data
  @type on_primary: boolean
  @param on_primary:  indicates if it is the primary node or not
  @type info: string
  @param info: string that will be sent to the physical device
      creation, used for example to set (LVM) tags on LVs

  @return: a (status, payload) tuple: on success (True, unique_id of
      the new device — this can sometimes be computed only after
      creation), on failure (False, error message)

  """
  # assemble all children first; the device cannot be created before
  # its components exist
  clist = []
  if disk.children:
    for child in disk.children:
      crdev = _RecursiveAssembleBD(child, owner, on_primary)
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
        crdev.Open()
      clist.append(crdev)

  try:
    device = bdev.Create(disk.dev_type, disk.physical_id, clist, size)
  except errors.GenericError, err:
    return False, "Can't create block device: %s" % str(err)

  if on_primary or disk.AssembleOnSecondary():
    if not device.Assemble():
      errorstring = "Can't assemble device after creation, very unusual event"
      logging.error(errorstring)
      return False, errorstring
    device.SetSyncSpeed(constants.SYNC_SPEED)
    if on_primary or disk.OpenOnSecondary():
      # force-open so the device is immediately usable on this node
      device.Open(force=True)
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  # set device-level info (e.g. LVM tags), regardless of assembly
  device.SetInfo(info)

  physical_id = device.unique_id
  return True, physical_id
1114

    
1115

    
1116
def RemoveBlockDevice(disk):
1117
  """Remove a block device.
1118

1119
  @note: This is intended to be called recursively.
1120

1121
  @type disk: L{objects.Disk}
1122
  @param disk: the disk object we should remove
1123
  @rtype: boolean
1124
  @return: the success of the operation
1125

1126
  """
1127
  try:
1128
    rdev = _RecursiveFindBD(disk)
1129
  except errors.BlockDeviceError, err:
1130
    # probably can't attach
1131
    logging.info("Can't attach to device %s in remove", disk)
1132
    rdev = None
1133
  if rdev is not None:
1134
    r_path = rdev.dev_path
1135
    result = rdev.Remove()
1136
    if result:
1137
      DevCacheManager.RemoveCache(r_path)
1138
  else:
1139
    result = True
1140
  if disk.children:
1141
    for child in disk.children:
1142
      result = result and RemoveBlockDevice(child)
1143
  return result
1144

    
1145

    
1146
def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  @note: this function is called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk we try to assemble
  @type owner: str
  @param owner: the name of the instance which owns the disk
  @type as_primary: boolean
  @param as_primary: if we should make the block device
      read/write

  @return: the assembled device object, or C{True} on nodes where
      the device is not assembled
  @raise errors.BlockDeviceError: in case there is an error
      during the activation of the children or the device
      itself

  """
  children = []
  if disk.children:
    # mcn is the maximum number of children that may fail to assemble
    # (appended as None) before we must propagate the error
    mcn = disk.ChildrenNeeded()
    if mcn == -1:
      mcn = 0 # max number of Nones allowed
    else:
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
      try:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        if children.count(None) >= mcn:
          # too many failed children already; re-raise
          raise
        cdev = None
        logging.debug("Error in child activation: %s", str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children)
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      r_dev.Open()
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)

  else:
    result = True
  return result
1197

    
1198

    
1199
def AssembleBlockDevice(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  @rtype: str or boolean
  @return: a C{/dev/...} path for primary nodes, and
      C{True} for secondary nodes

  """
  assembled = _RecursiveAssembleBD(disk, owner, as_primary)
  if isinstance(assembled, bdev.BlockDev):
    # primary (or open-on-secondary) node: expose the device path
    return assembled.dev_path
  return assembled
1213

    
1214

    
1215
def ShutdownBlockDevice(disk):
  """Shut down a block device.

  First, if the device is assembled (Attach() is successfull), then
  the device is shutdown. Then the children of the device are
  shutdown.

  This function is called recursively. Note that we don't cache the
  children or such, as oppossed to assemble, shutdown of different
  devices doesn't require that the upper device was active.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk we should
      shutdown
  @rtype: boolean
  @return: the success of the operation

  """
  bd = _RecursiveFindBD(disk)
  if bd is None:
    # device not assembled; nothing to shut down here
    result = True
  else:
    dev_path = bd.dev_path
    result = bd.Shutdown()
    if result:
      DevCacheManager.RemoveCache(dev_path)
  # recurse into the children, and-ing their results together
  for child in disk.children or []:
    result = result and ShutdownBlockDevice(child)
  return result
1245

    
1246

    
1247
def MirrorAddChildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: boolean
  @return: the success of the operation

  """
  parent = _RecursiveFindBD(parent_cdev)
  if parent is None:
    logging.error("Can't find parent device")
    return False
  children = [_RecursiveFindBD(cdev) for cdev in new_cdevs]
  if None in children:
    # at least one child could not be located
    logging.error("Can't find new device(s) to add: %s:%s",
                  children, new_cdevs)
    return False
  parent.AddChildren(children)
  return True
1269

    
1270

    
1271
def MirrorRemoveChildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: boolean
  @return: the success of the operation

  """
  parent = _RecursiveFindBD(parent_cdev)
  if parent is None:
    logging.error("Can't find parent in remove children: %s", parent_cdev)
    return False
  paths = []
  for cdev in new_cdevs:
    static_path = cdev.StaticDevPath()
    if static_path is not None:
      # the disk knows its device path without assembly
      paths.append(static_path)
      continue
    found = _RecursiveFindBD(cdev)
    if found is None:
      logging.error("Can't find dynamic device %s while removing children",
                    cdev)
      return False
    paths.append(found.dev_path)
  parent.RemoveChildren(paths)
  return True
1301

    
1302

    
1303
def GetMirrorStatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: disk
  @return:
      a list of (mirror_done, estimated_time) tuples, which
      are the result of L{bdev.BlockDev.CombinedSyncStatus}
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  status = []
  for disk in disks:
    found = _RecursiveFindBD(disk)
    if found is None:
      raise errors.BlockDeviceError("Can't find device %s" % str(disk))
    status.append(found.CombinedSyncStatus())
  return status
1323

    
1324

    
1325
def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return informations about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  if disk.children:
    # locate the children first; FindDevice needs them
    children = [_RecursiveFindBD(child) for child in disk.children]
  else:
    children = []
  return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
1343

    
1344

    
1345
def FindBlockDevice(disk):
  """Check if a device is activated.

  If it is, return informations about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk to find
  @rtype: None or tuple
  @return: None if the disk cannot be found, otherwise a
      tuple (device_path, major, minor, sync_percent,
      estimated_time, is_degraded)

  """
  found = _RecursiveFindBD(disk)
  if found is None:
    return None
  return (found.dev_path, found.major, found.minor) + found.GetSyncStatus()
1362

    
1363

    
1364
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name (must be absolute)
  @type data: str
  @param data: the new contents of the file (possibly compressed)
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: int
  @param uid: the owner of the file (can be -1 for default)
  @type gid: int
  @param gid: the group of the file (can be -1 for default)
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: boolean
  @return: the success of the operation; errors are logged
      in the node daemon log

  """
  if not os.path.isabs(file_name):
    logging.error("Filename passed to UploadFile is not absolute: '%s'",
                  file_name)
    return False

  # only a fixed whitelist of cluster files may be overwritten
  allowed_files = [
    constants.CLUSTER_CONF_FILE,
    constants.ETC_HOSTS,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,
    ]
  if file_name not in allowed_files:
    logging.error("Filename passed to UploadFile not in allowed"
                 " upload targets: '%s'", file_name)
    return False

  # the payload may be compressed on the wire; expand before writing
  utils.WriteFile(file_name, data=_Decompress(data), mode=mode, uid=uid,
                  gid=gid, atime=atime, mtime=mtime)
  return True
1411

    
1412

    
1413
def WriteSsconfFiles(values):
  """Update all ssconf files.

  Wrapper around the SimpleStore.WriteFiles.

  """
  # _GetConfig returns a fresh ssconf.SimpleStore instance
  _GetConfig().WriteFiles(values)
1420

    
1421

    
1422
def _ErrnoOrStr(err):
1423
  """Format an EnvironmentError exception.
1424

1425
  If the L{err} argument has an errno attribute, it will be looked up
1426
  and converted into a textual C{E...} description. Otherwise the
1427
  string representation of the error will be returned.
1428

1429
  @type err: L{EnvironmentError}
1430
  @param err: the exception to format
1431

1432
  """
1433
  if hasattr(err, 'errno'):
1434
    detail = errno.errorcode[err.errno]
1435
  else:
1436
    detail = str(err)
1437
  return detail
1438

    
1439

    
1440
def _OSOndiskVersion(name, os_dir):
  """Compute and return the API version of a given OS.

  This function will try to read the API version of the OS given by
  the 'name' parameter and residing in the 'os_dir' directory.

  @type name: str
  @param name: the OS name we should look for
  @type os_dir: str
  @param os_dir: the directory inwhich we should look for the OS
  @rtype: list of int
  @return: the list of integer API versions declared in the OS's
      'ganeti_api_version' file (one per line)
  @raise errors.InvalidOS: if the version file is missing, not a
      regular file, unreadable, or does not contain integers

  """
  api_file = os.path.sep.join([os_dir, "ganeti_api_version"])

  try:
    st = os.stat(api_file)
  except EnvironmentError, err:
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
                           " found (%s)" % _ErrnoOrStr(err))

  # reject symlink targets that are not plain files (devices, dirs, ...)
  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
                           " a regular file")

  try:
    f = open(api_file)
    try:
      api_versions = f.readlines()
    finally:
      f.close()
  except EnvironmentError, err:
    raise errors.InvalidOS(name, os_dir, "error while reading the"
                           " API version (%s)" % _ErrnoOrStr(err))

  # one version per line; strip whitespace/newlines then parse as int
  api_versions = [version.strip() for version in api_versions]
  try:
    api_versions = [int(version) for version in api_versions]
  except (TypeError, ValueError), err:
    raise errors.InvalidOS(name, os_dir,
                           "API version is not integer (%s)" % str(err))

  return api_versions
1487

    
1488

    
1489
def DiagnoseOS(top_dirs=None):
1490
  """Compute the validity for all OSes.
1491

1492
  @type top_dirs: list
1493
  @param top_dirs: the list of directories in which to
1494
      search (if not given defaults to
1495
      L{constants.OS_SEARCH_PATH})
1496
  @rtype: list of L{objects.OS}
1497
  @return: an OS object for each name in all the given
1498
      directories
1499

1500
  """
1501
  if top_dirs is None:
1502
    top_dirs = constants.OS_SEARCH_PATH
1503

    
1504
  result = []
1505
  for dir_name in top_dirs:
1506
    if os.path.isdir(dir_name):
1507
      try:
1508
        f_names = utils.ListVisibleFiles(dir_name)
1509
      except EnvironmentError, err:
1510
        logging.exception("Can't list the OS directory %s", dir_name)
1511
        break
1512
      for name in f_names:
1513
        try:
1514
          os_inst = OSFromDisk(name, base_dir=dir_name)
1515
          result.append(os_inst)
1516
        except errors.InvalidOS, err:
1517
          result.append(objects.OS.FromInvalidOS(err))
1518

    
1519
  return result
1520

    
1521

    
1522
def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{errors.InvalidOS} exception, detailing why this is not a valid OS.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise errors.InvalidOS: if we don't find a valid OS

  """
  if base_dir is None:
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
    if os_dir is None:
      raise errors.InvalidOS(name, None, "OS dir not found in search path")
  else:
    os_dir = os.path.sep.join([base_dir, name])

  api_versions = _OSOndiskVersion(name, os_dir)

  # the node must support the exact API version the OS declares
  if constants.OS_API_VERSION not in api_versions:
    raise errors.InvalidOS(name, os_dir, "API version mismatch"
                           " (found %s want %s)"
                           % (api_versions, constants.OS_API_VERSION))

  # OS Scripts dictionary, we will populate it with the actual script names
  os_scripts = dict.fromkeys(constants.OS_SCRIPTS)

  for script in os_scripts:
    os_scripts[script] = os.path.sep.join([os_dir, script])

    # each script must exist ...
    try:
      st = os.stat(os_scripts[script])
    except EnvironmentError, err:
      raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
                             (script, _ErrnoOrStr(err)))

    # ... be executable by the owner ...
    if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
      raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
                             script)

    # ... and be a regular file
    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
                             script)

  return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
                    create_script=os_scripts[constants.OS_SCRIPT_CREATE],
                    export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
                    import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
                    rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
                    api_versions=api_versions)
1578

    
1579
def OSEnvironment(instance, debug=0):
  """Calculate the environment for an os script.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = {}
  result['OS_API_VERSION'] = '%d' % constants.OS_API_VERSION
  result['INSTANCE_NAME'] = instance.name
  result['HYPERVISOR'] = instance.hypervisor
  result['DISK_COUNT'] = '%d' % len(instance.disks)
  result['NIC_COUNT'] = '%d' % len(instance.nics)
  result['DEBUG_LEVEL'] = '%d' % debug
  # export one DISK_<n>_* variable group per disk; the disk must be
  # assembled on this node so we can expose its device path
  for idx, disk in enumerate(instance.disks):
    real_disk = _RecursiveFindBD(disk)
    if real_disk is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                    str(disk))
    real_disk.Open()
    result['DISK_%d_PATH' % idx] = real_disk.dev_path
    # FIXME: When disks will have read-only mode, populate this
    result['DISK_%d_ACCESS' % idx] = 'W'
    if constants.HV_DISK_TYPE in instance.hvparams:
      result['DISK_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
    elif disk.dev_type == constants.LD_FILE:
      result['DISK_%d_BACKEND_TYPE' % idx] = \
        'file:%s' % disk.physical_id[0]
  # export one NIC_<n>_* variable group per network interface
  for idx, nic in enumerate(instance.nics):
    result['NIC_%d_MAC' % idx] = nic.mac
    if nic.ip:
      result['NIC_%d_IP' % idx] = nic.ip
    result['NIC_%d_BRIDGE' % idx] = nic.bridge
    if constants.HV_NIC_TYPE in instance.hvparams:
      result['NIC_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  return result
1626

    
1627
def GrowBlockDevice(disk, amount):
1628
  """Grow a stack of block devices.
1629

1630
  This function is called recursively, with the childrens being the
1631
  first ones to resize.
1632

1633
  @type disk: L{objects.Disk}
1634
  @param disk: the disk to be grown
1635
  @rtype: (status, result)
1636
  @return: a tuple with the status of the operation
1637
      (True/False), and the errors message if status
1638
      is False
1639

1640
  """
1641
  r_dev = _RecursiveFindBD(disk)
1642
  if r_dev is None:
1643
    return False, "Cannot find block device %s" % (disk,)
1644

    
1645
  try:
1646
    r_dev.Grow(amount)
1647
  except errors.BlockDeviceError, err:
1648
    return False, str(err)
1649

    
1650
  return True, None
1651

    
1652

    
1653
def SnapshotBlockDevice(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk path

  """
  if disk.children:
    if len(disk.children) == 1:
      # only one child, let's recurse on it
      return SnapshotBlockDevice(disk.children[0])
    # more than one child: recurse into the first one matching our size
    # (falls through, returning None, when no child matches)
    for child in disk.children:
      if child.size == disk.size:
        return SnapshotBlockDevice(child)
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is None:
      return None
    # let's stay on the safe side and ask for the full size, for now
    return r_dev.Snapshot(disk.size)
  else:
    raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
                                 " '%s' of type '%s'" %
                                 (disk.unique_id, disk.dev_type))
1686

    
1687

    
1688
def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
  """Export a block device snapshot to a remote node.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type instance: L{objects.Instance}
  @param instance: the instance object to whom the disk belongs
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @type idx: int
  @param idx: the index of the disk in the instance's disk list,
      used to export to the OS scripts environment
  @rtype: boolean
  @return: the success of the operation

  """
  export_env = OSEnvironment(instance)

  inst_os = OSFromDisk(instance.os)
  export_script = inst_os.export_script

  logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
                                     instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)
  # the snapshot must be assembled locally so the export script can
  # read from its device path
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(disk))
  real_disk.Open()

  export_env['EXPORT_DEVICE'] = real_disk.dev_path
  export_env['EXPORT_INDEX'] = str(idx)

  # destination: a ".new" staging dir; FinalizeExport renames it later
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  destfile = disk.physical_id[1]

  # the target command is built out of three individual commands,
  # which are joined by pipes; we check each individual command for
  # valid parameters
  expcmd = utils.BuildShellCmd("cd %s; %s 2>%s", inst_os.path,
                               export_script, logfile)

  comprcmd = "gzip"

  destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
                                destdir, destdir, destfile)
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.GANETI_RUNAS,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(command, env=export_env)

  if result.failed:
    logging.error("os snapshot export command '%s' returned error: %s"
                  " output: %s", command, result.fail_reason, result.output)
    return False

  return True
1752

    
1753

    
1754
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  @rtype: boolean
  @return: the success of the operation

  """
  # data is staged in "<name>.new" and atomically moved to "<name>"
  # at the end, so a failed export never leaves a half-written dir
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  # export-level metadata
  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, 'version', '0')
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
  config.set(constants.INISECT_EXP, 'os', instance.os)
  config.set(constants.INISECT_EXP, 'compression', 'gzip')

  # instance-level metadata
  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, 'name', instance.name)
  config.set(constants.INISECT_INS, 'memory', '%d' %
             instance.beparams[constants.BE_MEMORY])
  config.set(constants.INISECT_INS, 'vcpus', '%d' %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)

  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, 'nic%d_mac' %
               nic_count, '%s' % nic.mac)
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
               '%s' % nic.bridge)
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)

  # only disks that were actually snapshotted (non-None entries) are
  # recorded; disk_count therefore may be smaller than len(snap_disks)
  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if disk:
      disk_total += 1
      config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
                 ('%s' % disk.iv_name))
      config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
                 ('%s' % disk.physical_id[1]))
      config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
                 ('%d' % disk.size))

  config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)

  utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  # replace any previous export of the same instance with the new one
  shutil.rmtree(finaldestdir, True)
  shutil.move(destdir, finaldestdir)

  return True
1818

    
1819

    
1820
def ExportInfo(dest):
  """Read and validate the configuration of an export.

  @type dest: str
  @param dest: directory containing the export

  @rtype: L{objects.SerializableConfigParser}
  @return: the parsed export configuration, or None if any
      required section is missing

  """
  config = objects.SerializableConfigParser()
  config.read(os.path.join(dest, constants.EXPORT_CONF_FILE))

  # both the export and the instance sections must be present for
  # the export to be considered valid
  for section in (constants.INISECT_EXP, constants.INISECT_INS):
    if not config.has_section(section):
      return None

  return config
1841

    
1842

    
1843
def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
1844
  """Import an os image into an instance.
1845

1846
  @type instance: L{objects.Instance}
1847
  @param instance: instance to import the disks into
1848
  @type src_node: string
1849
  @param src_node: source node for the disk images
1850
  @type src_images: list of string
1851
  @param src_images: absolute paths of the disk images
1852
  @rtype: list of boolean
1853
  @return: each boolean represent the success of importing the n-th disk
1854

1855
  """
1856
  import_env = OSEnvironment(instance)
1857
  inst_os = OSFromDisk(instance.os)
1858
  import_script = inst_os.import_script
1859

    
1860
  logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1861
                                        instance.name, int(time.time()))
1862
  if not os.path.exists(constants.LOG_OS_DIR):
1863
    os.mkdir(constants.LOG_OS_DIR, 0750)
1864

    
1865
  comprcmd = "gunzip"
1866
  impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
1867
                               import_script, logfile)
1868

    
1869
  final_result = []
1870
  for idx, image in enumerate(src_images):
1871
    if image:
1872
      destcmd = utils.BuildShellCmd('cat %s', image)
1873
      remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
1874
                                                       constants.GANETI_RUNAS,
1875
                                                       destcmd)
1876
      command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1877
      import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
1878
      import_env['IMPORT_INDEX'] = str(idx)
1879
      result = utils.RunCmd(command, env=import_env)
1880
      if result.failed:
1881
        logging.error("Disk import command '%s' returned error: %s"
1882
                      " output: %s", command, result.fail_reason,
1883
                      result.output)
1884
        final_result.append(False)
1885
      else:
1886
        final_result.append(True)
1887
    else:
1888
      final_result.append(True)
1889

    
1890
  return final_result
1891

    
1892

    
1893
def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports

  """
  # without an export directory there can be no exports
  if not os.path.isdir(constants.EXPORT_DIR):
    return []
  return utils.ListVisibleFiles(constants.EXPORT_DIR)
1904

    
1905

    
1906
def RemoveExport(export):
1907
  """Remove an existing export from the node.
1908

1909
  @type export: str
1910
  @param export: the name of the export to remove
1911
  @rtype: boolean
1912
  @return: the success of the operation
1913

1914
  """
1915
  target = os.path.join(constants.EXPORT_DIR, export)
1916

    
1917
  shutil.rmtree(target)
1918
  # TODO: catch some of the relevant exceptions and provide a pretty
1919
  # error message if rmtree fails.
1920

    
1921
  return True
1922

    
1923

    
1924
def RenameBlockDevices(devlist):
1925
  """Rename a list of block devices.
1926

1927
  @type devlist: list of tuples
1928
  @param devlist: list of tuples of the form  (disk,
1929
      new_logical_id, new_physical_id); disk is an
1930
      L{objects.Disk} object describing the current disk,
1931
      and new logical_id/physical_id is the name we
1932
      rename it to
1933
  @rtype: boolean
1934
  @return: True if all renames succeeded, False otherwise
1935

1936
  """
1937
  result = True
1938
  for disk, unique_id in devlist:
1939
    dev = _RecursiveFindBD(disk)
1940
    if dev is None:
1941
      result = False
1942
      continue
1943
    try:
1944
      old_rpath = dev.dev_path
1945
      dev.Rename(unique_id)
1946
      new_rpath = dev.dev_path
1947
      if old_rpath != new_rpath:
1948
        DevCacheManager.RemoveCache(old_rpath)
1949
        # FIXME: we should add the new cache information here, like:
1950
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
1951
        # but we don't have the owner here - maybe parse from existing
1952
        # cache? for now, we only lose lvm data when we rename, which
1953
        # is less critical than DRBD or MD
1954
    except errors.BlockDeviceError, err:
1955
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
1956
      result = False
1957
  return result
1958

    
1959

    
1960
def _TransformFileStorageDir(file_storage_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given file_storage_dir is within the cluster-wide
  default file_storage_dir stored in SimpleStore. Only paths under that
  directory are allowed.

  @type file_storage_dir: str
  @param file_storage_dir: the path to check

  @return: the normalized path if valid, None otherwise

  """
  cfg = _GetConfig()
  file_storage_dir = os.path.normpath(file_storage_dir)
  base_file_storage_dir = cfg.GetFileStorageDir()
  # note: os.path.commonprefix compares character by character, so it
  # would wrongly accept e.g. "<base>2/foo" as being under "<base>";
  # we therefore require either equality or a real path-component
  # boundary after the base directory
  if (file_storage_dir != base_file_storage_dir and
      not file_storage_dir.startswith(base_file_storage_dir + os.sep)):
    logging.error("file storage directory '%s' is not under base file"
                  " storage directory '%s'",
                  file_storage_dir, base_file_storage_dir)
    return None
  return file_storage_dir
1983

    
1984

    
1985
def CreateFileStorageDir(file_storage_dir):
1986
  """Create file storage directory.
1987

1988
  @type file_storage_dir: str
1989
  @param file_storage_dir: directory to create
1990

1991
  @rtype: tuple
1992
  @return: tuple with first element a boolean indicating wheter dir
1993
      creation was successful or not
1994

1995
  """
1996
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1997
  result = True,
1998
  if not file_storage_dir:
1999
    result = False,
2000
  else:
2001
    if os.path.exists(file_storage_dir):
2002
      if not os.path.isdir(file_storage_dir):
2003
        logging.error("'%s' is not a directory", file_storage_dir)
2004
        result = False,
2005
    else:
2006
      try:
2007
        os.makedirs(file_storage_dir, 0750)
2008
      except OSError, err:
2009
        logging.error("Cannot create file storage directory '%s': %s",
2010
                      file_storage_dir, err)
2011
        result = False,
2012
  return result
2013

    
2014

    
2015
def RemoveFileStorageDir(file_storage_dir):
2016
  """Remove file storage directory.
2017

2018
  Remove it only if it's empty. If not log an error and return.
2019

2020
  @type file_storage_dir: str
2021
  @param file_storage_dir: the directory we should cleanup
2022
  @rtype: tuple (success,)
2023
  @return: tuple of one element, C{success}, denoting
2024
      whether the operation was successfull
2025

2026
  """
2027
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2028
  result = True,
2029
  if not file_storage_dir:
2030
    result = False,
2031
  else:
2032
    if os.path.exists(file_storage_dir):
2033
      if not os.path.isdir(file_storage_dir):
2034
        logging.error("'%s' is not a directory", file_storage_dir)
2035
        result = False,
2036
      # deletes dir only if empty, otherwise we want to return False
2037
      try:
2038
        os.rmdir(file_storage_dir)
2039
      except OSError, err:
2040
        logging.exception("Cannot remove file storage directory '%s'",
2041
                          file_storage_dir)
2042
        result = False,
2043
  return result
2044

    
2045

    
2046
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2047
  """Rename the file storage directory.
2048

2049
  @type old_file_storage_dir: str
2050
  @param old_file_storage_dir: the current path
2051
  @type new_file_storage_dir: str
2052
  @param new_file_storage_dir: the name we should rename to
2053
  @rtype: tuple (success,)
2054
  @return: tuple of one element, C{success}, denoting
2055
      whether the operation was successful
2056

2057
  """
2058
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2059
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2060
  result = True,
2061
  if not old_file_storage_dir or not new_file_storage_dir:
2062
    result = False,
2063
  else:
2064
    if not os.path.exists(new_file_storage_dir):
2065
      if os.path.isdir(old_file_storage_dir):
2066
        try:
2067
          os.rename(old_file_storage_dir, new_file_storage_dir)
2068
        except OSError, err:
2069
          logging.exception("Cannot rename '%s' to '%s'",
2070
                            old_file_storage_dir, new_file_storage_dir)
2071
          result =  False,
2072
      else:
2073
        logging.error("'%s' is not a directory", old_file_storage_dir)
2074
        result = False,
2075
    else:
2076
      if os.path.exists(old_file_storage_dir):
2077
        logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
2078
                      old_file_storage_dir, new_file_storage_dir)
2079
        result = False,
2080
  return result
2081

    
2082

    
2083
def _IsJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: boolean
  @return: whether the file is under the queue directory

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  # note: os.path.commonprefix compares character by character, so a
  # path like "<queue_dir>-evil/job" would wrongly be accepted; require
  # a real path-component boundary after the queue directory instead
  result = file_name.startswith(queue_dir + os.sep)

  if not result:
    logging.error("'%s' is not a file in the queue directory",
                  file_name)

  return result
2100

    
2101

    
2102
def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents
  @rtype: boolean
  @return: the success of the operation

  """
  if _IsJobQueueFile(file_name):
    # Write and replace the file atomically
    utils.WriteFile(file_name, data=_Decompress(content))
    return True
  return False
2123

    
2124

    
2125
def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: boolean
  @return: the success of the operation

  """
  # both the source and the destination must live under the queue
  # directory
  if _IsJobQueueFile(old) and _IsJobQueueFile(new):
    utils.RenameFile(old, new, mkdir=True)
    return True
  return False
2144

    
2145

    
2146
def JobQueueSetDrainFlag(drain_flag):
  """Set the drain flag for the queue.

  This will set or unset the queue drain flag.

  @type drain_flag: boolean
  @param drain_flag: if True, will set the drain flag, otherwise reset it.
  @rtype: boolean
  @return: always True
  @warning: the function always returns True

  """
  flag_file = constants.JOB_QUEUE_DRAIN_FILE
  if drain_flag:
    # the (empty) flag file's existence marks the queue as drained
    utils.WriteFile(flag_file, data="", close=True)
  else:
    utils.RemoveFile(flag_file)

  return True
2164

    
2165

    
2166
def CloseBlockDevices(instance_name, disks):
2167
  """Closes the given block devices.
2168

2169
  This means they will be switched to secondary mode (in case of
2170
  DRBD).
2171

2172
  @param instance_name: if the argument is not empty, the symlinks
2173
      of this instance will be removed
2174
  @type disks: list of L{objects.Disk}
2175
  @param disks: the list of disks to be closed
2176
  @rtype: tuple (success, message)
2177
  @return: a tuple of success and message, where success
2178
      indicates the succes of the operation, and message
2179
      which will contain the error details in case we
2180
      failed
2181

2182
  """
2183
  bdevs = []
2184
  for cf in disks:
2185
    rd = _RecursiveFindBD(cf)
2186
    if rd is None:
2187
      return (False, "Can't find device %s" % cf)
2188
    bdevs.append(rd)
2189

    
2190
  msg = []
2191
  for rd in bdevs:
2192
    try:
2193
      rd.Close()
2194
    except errors.BlockDeviceError, err:
2195
      msg.append(str(err))
2196
  if msg:
2197
    return (False, "Can't make devices secondary: %s" % ",".join(msg))
2198
  else:
2199
    if instance_name:
2200
      _RemoveBlockDevLinks(instance_name, disks)
2201
    return (True, "All devices secondary")
2202

    
2203

    
2204
def ValidateHVParams(hvname, hvparams):
2205
  """Validates the given hypervisor parameters.
2206

2207
  @type hvname: string
2208
  @param hvname: the hypervisor name
2209
  @type hvparams: dict
2210
  @param hvparams: the hypervisor parameters to be validated
2211
  @rtype: tuple (success, message)
2212
  @return: a tuple of success and message, where success
2213
      indicates the succes of the operation, and message
2214
      which will contain the error details in case we
2215
      failed
2216

2217
  """
2218
  try:
2219
    hv_type = hypervisor.GetHypervisor(hvname)
2220
    hv_type.ValidateParameters(hvparams)
2221
    return (True, "Validation passed")
2222
  except errors.HypervisorError, err:
2223
    return (False, str(err))
2224

    
2225

    
2226
def DemoteFromMC():
2227
  """Demotes the current node from master candidate role.
2228

2229
  """
2230
  # try to ensure we're not the master by mistake
2231
  master, myself = ssconf.GetMasterAndMyself()
2232
  if master == myself:
2233
    return (False, "ssconf status shows I'm the master node, will not demote")
2234
  pid_file = utils.DaemonPidFileName(constants.MASTERD_PID)
2235
  if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
2236
    return (False, "The master daemon is running, will not demote")
2237
  try:
2238
    utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2239
  except EnvironmentError, err:
2240
    if err.errno != errno.ENOENT:
2241
      return (False, "Error while backing up cluster file: %s" % str(err))
2242
  utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2243
  return (True, "Done")
2244

    
2245

    
2246
def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  @rtype: tuple
  @return: (True, list of block devices) on success, otherwise
      (False, error message)

  """
  # first pass: fix up the physical IDs on all disks
  node_name = utils.HostInfo().name
  for disk in disks:
    disk.SetPhysicalID(node_name, nodes_ip)

  # second pass: resolve each disk to its block device
  bdevs = []
  for disk in disks:
    rd = _RecursiveFindBD(disk)
    if rd is None:
      return (False, "Can't find device %s" % disk)
    bdevs.append(rd)
  return (True, bdevs)
2263

    
2264

    
2265
def DrbdDisconnectNet(nodes_ip, disks):
2266
  """Disconnects the network on a list of drbd devices.
2267

2268
  """
2269
  status, bdevs = _FindDisks(nodes_ip, disks)
2270
  if not status:
2271
    return status, bdevs
2272

    
2273
  # disconnect disks
2274
  for rd in bdevs:
2275
    try:
2276
      rd.DisconnectNet()
2277
    except errors.BlockDeviceError, err:
2278
      logging.exception("Failed to go into standalone mode")
2279
      return (False, "Can't change network configuration: %s" % str(err))
2280
  return (True, "All disks are now disconnected")
2281

    
2282

    
2283
def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
  """Attaches the network on a list of drbd devices.

  @param nodes_ip: the node IPs used to compute the disks' physical
      IDs (see L{_FindDisks})
  @type disks: list of L{objects.Disk}
  @param disks: the disks whose network configuration should be
      (re)attached
  @type instance_name: str
  @param instance_name: instance whose block device symlinks are
      created when switching to multi-master mode
  @type multimaster: bool
  @param multimaster: whether to also switch the devices to primary
      (multi-master) mode after reconnecting

  @rtype: tuple (success, message)
  @return: success flag plus a human-readable detail message

  """
  status, bdevs = _FindDisks(nodes_ip, disks)
  if not status:
    return status, bdevs

  # in multi-master mode the instance will run here, so it needs its
  # device symlinks in place before the devices are opened
  if multimaster:
    for idx, rd in enumerate(bdevs):
      try:
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
      except EnvironmentError, err:
        return (False, "Can't create symlink: %s" % str(err))
  # reconnect disks, switch to new master configuration and if
  # needed primary mode
  for rd in bdevs:
    try:
      rd.AttachNet(multimaster)
    except errors.BlockDeviceError, err:
      return (False, "Can't change network configuration: %s" % str(err))
  # wait until the disks are connected; we need to retry the re-attach
  # if the device becomes standalone, as this might happen if the one
  # node disconnects and reconnects in a different mode before the
  # other node reconnects; in this case, one or both of the nodes will
  # decide it has wrong configuration and switch to standalone
  RECONNECT_TIMEOUT = 2 * 60
  sleep_time = 0.100 # start with 100 milliseconds
  timeout_limit = time.time() + RECONNECT_TIMEOUT
  while time.time() < timeout_limit:
    all_connected = True
    for rd in bdevs:
      stats = rd.GetProcStatus()
      if not (stats.is_connected or stats.is_in_resync):
        all_connected = False
      if stats.is_standalone:
        # peer had different config info and this node became
        # standalone, even though this should not happen with the
        # new staged way of changing disk configs
        try:
          rd.ReAttachNet(multimaster)
        except errors.BlockDeviceError, err:
          return (False, "Can't change network configuration: %s" % str(err))
    if all_connected:
      break
    # exponential backoff between polls, capped at 5 seconds
    time.sleep(sleep_time)
    sleep_time = min(5, sleep_time * 1.5)
  if not all_connected:
    return (False, "Timeout in disk reconnecting")
  if multimaster:
    # change to primary mode
    for rd in bdevs:
      rd.Open()
  if multimaster:
    msg = "multi-master and primary"
  else:
    msg = "single-master"
  return (True, "Disks are now configured as %s" % msg)
2341

    
2342

    
2343
def DrbdWaitSync(nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  @rtype: tuple
  @return: (status, (alldone, min_resync)) where status is False if
      any device is neither connected nor resyncing

  """
  status, bdevs = _FindDisks(nodes_ip, disks)
  if not status:
    return status, bdevs

  min_resync = 100
  alldone = True
  sync_failed = False
  for rd in bdevs:
    stats = rd.GetProcStatus()
    # a device that is neither connected nor resyncing is broken
    if not (stats.is_connected or stats.is_in_resync):
      sync_failed = True
      break
    if stats.is_in_resync:
      alldone = False
    if stats.sync_percent is not None:
      # track the least-synchronized device
      min_resync = min(min_resync, stats.sync_percent)
  return (not sync_failed, (alldone, min_resync))
2363

    
2364

    
2365
class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not
  on the master side.

  """
  # only scripts whose name matches this are run (no dots, so backup
  # files and the like are skipped)
  RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")

  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        L{constants.HOOKS_BASE_DIR} (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = constants.HOOKS_BASE_DIR
    self._BASE_DIR = hooks_base_dir

  @staticmethod
  def ExecHook(script, env):
    """Exec one hook script.

    @type script: str
    @param script: the full path to the script
    @type env: dict
    @param env: the environment with which to exec the script
    @rtype: tuple (success, message)
    @return: a tuple of success and message, where success
        indicates the success of the operation, and message
        which will contain the error details in case we
        failed

    """
    # exec the process using subprocess and log the output
    fdstdin = None
    try:
      fdstdin = open("/dev/null", "r")
      child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT, close_fds=True,
                               shell=False, cwd="/", env=env)
      output = ""
      try:
        # only the first 4 KiB of combined stdout/stderr is captured
        output = child.stdout.read(4096)
        child.stdout.close()
      except EnvironmentError, err:
        output += "Hook script error: %s" % str(err)

      # wait() can be interrupted by signals (EINTR); retry until the
      # child has really exited
      while True:
        try:
          result = child.wait()
          break
        except EnvironmentError, err:
          if err.errno == errno.EINTR:
            continue
          raise
    finally:
      # try not to leak fds
      for fd in (fdstdin, ):
        if fd is not None:
          try:
            fd.close()
          except EnvironmentError, err:
            # just log the error
            #logging.exception("Error while closing fd %s", fd)
            pass

    return result == 0, output

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    @type hpath: str
    @param hpath: the path to the hooks directory which
        holds the scripts
    @type phase: str
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @type env: dict
    @param env: dictionary with the environment for the hook
    @rtype: list
    @return: list of 3-element tuples:
      - script path
      - script result, either L{constants.HKR_SUCCESS} or
        L{constants.HKR_FAIL}
      - output of the script

    @raise errors.ProgrammerError: for invalid input
        parameters

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
    rr = []

    # hooks live in <base>/<hpath>-<phase>.d/
    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = "%s/%s" % (self._BASE_DIR, subdir)
    try:
      dir_contents = utils.ListVisibleFiles(dir_name)
    except OSError, err:
      # a missing hooks directory simply means no hooks to run
      # FIXME: must log output in case of failures
      return rr

    # we use the standard python sort order,
    # so 00name is the recommended naming scheme
    dir_contents.sort()
    for relname in dir_contents:
      fname = os.path.join(dir_name, relname)
      # skip anything that is not an executable regular file with an
      # allowed name
      if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
          self.RE_MASK.match(relname) is not None):
        rrval = constants.HKR_SKIP
        output = ""
      else:
        result, output = self.ExecHook(fname, env)
        if not result:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
      rr.append(("%s/%s" % (subdir, relname), rrval, output))

    return rr
2492

    
2493

    
2494
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  def Run(self, name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data

    @rtype: tuple
    @return: four element tuple of:
       - run status (one of the IARUN_ constants)
       - stdout
       - stderr
       - fail reason (as from L{utils.RunResult})

    """
    script_path = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                 os.path.isfile)
    if script_path is None:
      return (constants.IARUN_NOTFOUND, None, None, None)

    # the allocator reads its input from a file passed as argument
    input_fd, input_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(input_fd, idata)
      os.close(input_fd)
      result = utils.RunCmd([script_path, input_name])
      if result.failed:
        return (constants.IARUN_FAILURE, result.stdout, result.stderr,
                result.fail_reason)
    finally:
      # always clean up the temporary input file
      os.unlink(input_name)

    return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
2534

    
2535

    
2536
class DevCacheManager(object):
2537
  """Simple class for managing a cache of block device information.
2538

2539
  """
2540
  _DEV_PREFIX = "/dev/"
2541
  _ROOT_DIR = constants.BDEV_CACHE_DIR
2542

    
2543
  @classmethod
2544
  def _ConvertPath(cls, dev_path):
2545
    """Converts a /dev/name path to the cache file name.
2546

2547
    This replaces slashes with underscores and strips the /dev
2548
    prefix. It then returns the full path to the cache file.
2549

2550
    @type dev_path: str
2551
    @param dev_path: the C{/dev/} path name
2552
    @rtype: str
2553
    @return: the converted path name
2554

2555
    """
2556
    if dev_path.startswith(cls._DEV_PREFIX):
2557
      dev_path = dev_path[len(cls._DEV_PREFIX):]
2558
    dev_path = dev_path.replace("/", "_")
2559
    fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2560
    return fpath
2561

    
2562
  @classmethod
2563
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2564
    """Updates the cache information for a given device.
2565

2566
    @type dev_path: str
2567
    @param dev_path: the pathname of the device
2568
    @type owner: str
2569
    @param owner: the owner (instance name) of the device
2570
    @type on_primary: bool
2571
    @param on_primary: whether this is the primary
2572
        node nor not
2573
    @type iv_name: str
2574
    @param iv_name: the instance-visible name of the
2575
        device, as in objects.Disk.iv_name
2576

2577
    @rtype: None
2578

2579
    """
2580
    if dev_path is None:
2581
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
2582
      return
2583
    fpath = cls._ConvertPath(dev_path)
2584
    if on_primary:
2585
      state = "primary"
2586
    else:
2587
      state = "secondary"
2588
    if iv_name is None:
2589
      iv_name = "not_visible"
2590
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2591
    try:
2592
      utils.WriteFile(fpath, data=fdata)
2593
    except EnvironmentError, err:
2594
      logging.exception("Can't update bdev cache for %s", dev_path)
2595

    
2596
  @classmethod
2597
  def RemoveCache(cls, dev_path):
2598
    """Remove data for a dev_path.
2599

2600
    This is just a wrapper over L{utils.RemoveFile} with a converted
2601
    path name and logging.
2602

2603
    @type dev_path: str
2604
    @param dev_path: the pathname of the device
2605

2606
    @rtype: None
2607

2608
    """
2609
    if dev_path is None:
2610
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
2611
      return
2612
    fpath = cls._ConvertPath(dev_path)
2613
    try:
2614
      utils.RemoveFile(fpath)
2615
    except EnvironmentError, err:
2616
      logging.exception("Can't update bdev cache for %s", dev_path)