# root / lib / backend.py @ revision 6906a9d8

#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Functions used by the node daemon"""


import os
import os.path
import shutil
import time
import stat
import errno
import re
import subprocess
import random
import logging
import tempfile
import zlib
import base64

from ganeti import errors
from ganeti import utils
from ganeti import ssh
from ganeti import hypervisor
from ganeti import constants
from ganeti import bdev
from ganeti import objects
from ganeti import ssconf


def _GetConfig():
  """Simple wrapper to return a SimpleStore.

  @rtype: L{ssconf.SimpleStore}
  @return: a SimpleStore instance

  """
  return ssconf.SimpleStore()


def _GetSshRunner(cluster_name):
  """Simple wrapper to return an SshRunner.

  @type cluster_name: str
  @param cluster_name: the cluster name, which is needed
      by the SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: an SshRunner instance

  """
  return ssh.SshRunner(cluster_name)


def _Decompress(data):
  """Unpacks data compressed by the RPC client.

  @type data: list or tuple
  @param data: Data sent by RPC client
  @rtype: str
  @return: Decompressed data

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  if encoding == constants.RPC_ENCODING_NONE:
    return content
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    return zlib.decompress(base64.b64decode(content))
  else:
    raise AssertionError("Unknown data encoding")


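# Usage sketch (illustrative, not part of the original module): the RPC
# client sends _Decompress() a two-element tuple, for example:
#
#   plain = (constants.RPC_ENCODING_NONE, "some data")
#   packed = (constants.RPC_ENCODING_ZLIB_BASE64,
#             base64.b64encode(zlib.compress("some data")))
#
# Both forms decode back to the same string: _Decompress(plain) and
# _Decompress(packed) each return "some data".
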
def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults
      to the empty list

  """
  if not os.path.isdir(path):
    return
  if exclude is None:
    exclude = []
  else:
    # Normalize excluded paths
    exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = os.path.normpath(os.path.join(path, rel_name))
    if full_name in exclude:
      continue
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)


def JobQueuePurge():
  """Removes job queue files and archived jobs.

  @rtype: None

  """
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)


def GetMasterInfo():
  """Returns master information.

  This is a utility function to compute master information, either
  for consumption here or from the node daemon.

  @rtype: tuple
  @return: (master_netdev, master_ip, master_name) if we have a good
      configuration, otherwise (None, None, None)

  """
  try:
    cfg = _GetConfig()
    master_netdev = cfg.GetMasterNetdev()
    master_ip = cfg.GetMasterIP()
    master_node = cfg.GetMasterNode()
  except errors.ConfigurationError, err:
    logging.exception("Cluster configuration incomplete")
    return (None, None, None)
  return (master_netdev, master_ip, master_node)


def StartMaster(start_daemons):
  """Activate local node as master node.

  The function will always try to activate the IP address of the master
  (unless someone else has it). It will also start the master daemons,
  based on the start_daemons parameter.

  @type start_daemons: boolean
  @param start_daemons: whether to also start the master
      daemons (ganeti-masterd and ganeti-rapi)
  @rtype: None

  """
  ok = True
  master_netdev, master_ip, _ = GetMasterInfo()
  if not master_netdev:
    return False

  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
    if utils.OwnIpAddress(master_ip):
      # we already have the ip:
      logging.debug("Already started")
    else:
      logging.error("Someone else has the master ip, not activating")
      ok = False
  else:
    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
                           "dev", master_netdev, "label",
                           "%s:0" % master_netdev])
    if result.failed:
      logging.error("Can't activate master IP: %s", result.output)
      ok = False

    result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
                           "-s", master_ip, master_ip])
    # we'll ignore the exit code of arping

  # and now start the master and rapi daemons
  if start_daemons:
    for daemon in 'ganeti-masterd', 'ganeti-rapi':
      result = utils.RunCmd([daemon])
      if result.failed:
        logging.error("Can't start daemon %s: %s", daemon, result.output)
        ok = False
  return ok


def StopMaster(stop_daemons):
  """Deactivate this node as master.

  The function will always try to deactivate the IP address of the
  master. It will also stop the master daemons depending on the
  stop_daemons parameter.

  @type stop_daemons: boolean
  @param stop_daemons: whether to also stop the master daemons
      (ganeti-masterd and ganeti-rapi)
  @rtype: None

  """
  master_netdev, master_ip, _ = GetMasterInfo()
  if not master_netdev:
    return False

  result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
                         "dev", master_netdev])
  if result.failed:
    logging.error("Can't remove the master IP, error: %s", result.output)
    # but otherwise ignore the failure

  if stop_daemons:
    # stop/kill the rapi and the master daemon
    for daemon in constants.RAPI_PID, constants.MASTERD_PID:
      utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))

  return True


def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
  """Joins this node to the cluster.

  This does the following:
      - updates the hostkeys of the machine (rsa and dsa)
      - adds the ssh private key to the user
      - adds the ssh public key to the user's authorized_keys file

  @type dsa: str
  @param dsa: the DSA private key to write
  @type dsapub: str
  @param dsapub: the DSA public key to write
  @type rsa: str
  @param rsa: the RSA private key to write
  @type rsapub: str
  @param rsapub: the RSA public key to write
  @type sshkey: str
  @param sshkey: the SSH private key to write
  @type sshpub: str
  @param sshpub: the SSH public key to write
  @rtype: boolean
  @return: the success of the operation

  """
  sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
                (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
                (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
                (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
  for name, content, mode in sshd_keys:
    utils.WriteFile(name, data=content, mode=mode)

  try:
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
                                                    mkdir=True)
  except errors.OpExecError, err:
    logging.exception("Error while processing user ssh files")
    return False

  for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
    utils.WriteFile(name, data=content, mode=0600)

  utils.AddAuthorizedKey(auth_keys, sshpub)

  utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])

  return True


def LeaveCluster():
  """Cleans up and removes the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shutdown the node daemon.

  """
  _CleanDirectory(constants.DATA_DIR)
  JobQueuePurge()

  try:
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
  except errors.OpExecError:
    logging.exception("Error while processing ssh files")
    return

  f = open(pub_key, 'r')
  try:
    utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
  finally:
    f.close()

  utils.RemoveFile(priv_key)
  utils.RemoveFile(pub_key)

  # Return a reassuring string to the caller, and quit
  raise errors.QuitGanetiException(False, 'Shutdown scheduled')


def GetNodeInfo(vgname, hypervisor_type):
  """Gives back a hash with different information about the node.

  @type vgname: C{string}
  @param vgname: the name of the volume group to ask for disk space information
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for
      memory information
  @rtype: C{dict}
  @return: dictionary with the following keys:
      - vg_size is the size of the configured volume group in MiB
      - vg_free is the free size of the volume group in MiB
      - memory_dom0 is the memory allocated for domain0 in MiB
      - memory_free is the currently available (free) ram in MiB
      - memory_total is the total amount of ram in MiB

  """
  outputarray = {}
  vginfo = _GetVGInfo(vgname)
  outputarray['vg_size'] = vginfo['vg_size']
  outputarray['vg_free'] = vginfo['vg_free']

  hyper = hypervisor.GetHypervisor(hypervisor_type)
  hyp_info = hyper.GetNodeInfo()
  if hyp_info is not None:
    outputarray.update(hyp_info)

  f = open("/proc/sys/kernel/random/boot_id", 'r')
  try:
    outputarray["bootid"] = f.read(128).rstrip("\n")
  finally:
    f.close()

  return outputarray


def VerifyNode(what, cluster_name):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  result = {}

  if constants.NV_HYPERVISOR in what:
    result[constants.NV_HYPERVISOR] = tmp = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
      tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()

  if constants.NV_FILELIST in what:
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
      what[constants.NV_FILELIST])

  if constants.NV_NODELIST in what:
    result[constants.NV_NODELIST] = tmp = {}
    random.shuffle(what[constants.NV_NODELIST])
    for node in what[constants.NV_NODELIST]:
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
      if not success:
        tmp[node] = message

  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_name = utils.HostInfo().name
    my_pip = my_sip = None
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      tmp[my_name] = ("Can't find my own primary/secondary IP"
                      " in the node list")
    else:
      port = utils.GetNodeDaemonPort()
      for name, pip, sip in what[constants.NV_NODENETTEST]:
        fail = []
        if not utils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        if sip != pip:
          if not utils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          tmp[name] = ("failure using the %s interface(s)" %
                       " and ".join(fail))

  if constants.NV_LVLIST in what:
    result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])

  if constants.NV_INSTANCELIST in what:
    result[constants.NV_INSTANCELIST] = GetInstanceList(
      what[constants.NV_INSTANCELIST])

  if constants.NV_VGLIST in what:
    result[constants.NV_VGLIST] = ListVolumeGroups()

  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = constants.PROTOCOL_VERSION

  if constants.NV_HVINFO in what:
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()

  if constants.NV_DRBDLIST in what:
    try:
      used_minors = bdev.DRBD8.GetUsedDevs().keys()
    except errors.BlockDeviceErrors:
      logging.warning("Can't get used minors list", exc_info=True)
      used_minors = []
    result[constants.NV_DRBDLIST] = used_minors

  return result


def GetVolumeList(vg_name):
  """Compute list of logical volumes and their size.

  @type vg_name: str
  @param vg_name: the volume group whose LVs we should list
  @rtype: dict
  @return:
      dictionary of all partitions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.

  """
  lvs = {}
  sep = '|'
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-olv_name,lv_size,lv_attr", vg_name])
  if result.failed:
    logging.error("Failed to list logical volumes, lvs output: %s",
                  result.output)
    return result.output

  valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
  for line in result.stdout.splitlines():
    line = line.strip()
    match = valid_line_re.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    name, size, attr = match.groups()
    inactive = attr[4] == '-'
    online = attr[5] == 'o'
    lvs[name] = (size, inactive, online)

  return lvs


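# Illustrative note (an assumed sample of typical "lvs" output, not taken
# from this file): with the flags used above, a line looks roughly like
#
#   test1|20.06|-wi-ao
#
# so attr[4] ('a' here) encodes the active state and attr[5] ('o' here) the
# open state, which is what GetVolumeList() turns into the
# (size, inactive, online) tuple.
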
def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary with keys volume name and values the
      size of the volume

  """
  return utils.ListVolumeGroups()


def NodeVolumes():
  """List all volumes on this node.

  @rtype: list
  @return:
    A list of dictionaries, each having four keys:
      - name: the logical volume name,
      - size: the size of the logical volume
      - dev: the physical device on which the LV lives
      - vg: the volume group to which it belongs

    In case of errors, we return an empty list and log the
    error.

    Note that since a logical volume can live on multiple physical
    volumes, the resulting list might include a logical volume
    multiple times.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    logging.error("Failed to list logical volumes, lvs output: %s",
                  result.output)
    return []

  def parse_dev(dev):
    if '(' in dev:
      return dev.split('(')[0]
    else:
      return dev

  def map_line(line):
    return {
      'name': line[0].strip(),
      'size': line[1].strip(),
      'dev': parse_dev(line[2].strip()),
      'vg': line[3].strip(),
    }

  return [map_line(line.split('|')) for line in result.stdout.splitlines()
          if line.count('|') >= 3]


def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @rtype: boolean
  @return: C{True} if all of them exist, C{False} otherwise

  """
  for bridge in bridges_list:
    if not utils.BridgeExists(bridge):
      return False

  return True


def GetInstanceList(hypervisor_list):
  """Provides a list of instances.

  @type hypervisor_list: list
  @param hypervisor_list: the list of hypervisors to query information

  @rtype: list
  @return: a list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com

  """
  results = []
  for hname in hypervisor_list:
    try:
      names = hypervisor.GetHypervisor(hname).ListInstances()
      results.extend(names)
    except errors.HypervisorError, err:
      logging.exception("Error enumerating instances for hypervisor %s", hname)
      # FIXME: should we somehow not propagate this to the master?
      raise

  return results


def GetInstanceInfo(instance, hname):
  """Gives back the information about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)

  """
  output = {}

  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is not None:
    output['memory'] = iinfo[2]
    output['state'] = iinfo[4]
    output['time'] = iinfo[5]

  return output


def GetInstanceMigratable(instance):
  """Checks whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.

  @rtype: tuple
  @return: tuple of (result, description) where:
      - result: whether the instance can be migrated or not
      - description: a description of the issue, if relevant

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  if instance.name not in hyper.ListInstances():
    return (False, 'not running')

  for idx in range(len(instance.disks)):
    link_name = _GetBlockDevSymlinkPath(instance.name, idx)
    if not os.path.islink(link_name):
      return (False, 'not restarted since ganeti 1.2.5')

  return (True, '')


def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  output = {}

  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
    if iinfo:
      for name, inst_id, memory, vcpus, state, times in iinfo:
        value = {
          'memory': memory,
          'vcpus': vcpus,
          'state': state,
          'time': times,
          }
        if name in output and output[name] != value:
          raise errors.HypervisorError("Instance %s running duplicate"
                                       " with different parameters" % name)
        output[name] = value

  return output


def AddOSToInstance(instance):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @rtype: boolean
  @return: the success of the operation

  """
  inst_os = OSFromDisk(instance.os)

  create_env = OSEnvironment(instance)

  logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                     instance.name, int(time.time()))

  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile,)
  if result.failed:
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", result.cmd, result.fail_reason, logfile,
                  result.output)
    lines = [val.encode("string_escape")
             for val in utils.TailFile(logfile, lines=20)]
    return (False, "OS create script failed (%s), last lines in the"
            " log file:\n%s" % (result.fail_reason, "\n".join(lines)))

  return (True, "Successfully installed")


def RunRenameInstance(instance, old_name):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type old_name: string
  @param old_name: previous instance name
  @rtype: boolean
  @return: the success of the operation

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance)
  rename_env['OLD_INSTANCE_NAME'] = old_name

  logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                           old_name,
                                           instance.name, int(time.time()))

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile)

  if result.failed:
    logging.error("os rename command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    lines = [val.encode("string_escape")
             for val in utils.TailFile(logfile, lines=20)]
    return (False, "OS rename script failed (%s), last lines in the"
            " log file:\n%s" % (result.fail_reason, "\n".join(lines)))

  return (True, "Rename successful")


def _GetVGInfo(vg_name):
  """Get information about the volume group.

  @type vg_name: str
  @param vg_name: the volume group which we query
  @rtype: dict
  @return:
    A dictionary with the following keys:
      - C{vg_size} is the total size of the volume group in MiB
      - C{vg_free} is the free size of the volume group in MiB
      - C{pv_count} are the number of physical disks in that VG

    If an error occurs during gathering of data, we return the same dict
    with keys all set to None.

  """
  retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])

  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
                         "--nosuffix", "--units=m", "--separator=:", vg_name])

  if retval.failed:
    logging.error("volume group %s not present", vg_name)
    return retdic
  valarr = retval.stdout.strip().rstrip(':').split(':')
  if len(valarr) == 3:
    try:
      retdic = {
        "vg_size": int(round(float(valarr[0]), 0)),
        "vg_free": int(round(float(valarr[1]), 0)),
        "pv_count": int(valarr[2]),
        }
    except ValueError, err:
      logging.exception("Failed to parse vgs output")
  else:
    logging.error("vgs output has the wrong number of fields (expected"
                  " three): %s", str(valarr))
  return retdic


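# Illustrative note (an assumed sample of "vgs" output, not taken from this
# file): with the "--separator=:" flags above, vgs prints something like
#
#   1279.59:821.59:1:
#
# which _GetVGInfo() strips and splits into ["1279.59", "821.59", "1"], i.e.
# vg_size=1280, vg_free=822 (MiB, rounded) and pv_count=1.
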
def _GetBlockDevSymlinkPath(instance_name, idx):
  return os.path.join(constants.DISK_LINKS_DIR,
                      "%s:%d" % (instance_name, idx))


def _SymlinkBlockDev(instance_name, device_path, idx):
  """Set up symlinks to an instance's block device.

  This is an auxiliary function run when an instance is started (on the
  primary node) or when an instance is migrated (on the target node).


  @param instance_name: the name of the target instance
  @param device_path: path of the physical block device, on the node
  @param idx: the disk index
  @return: absolute path to the disk's symlink

  """
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
  try:
    os.symlink(device_path, link_name)
  except OSError, err:
    if err.errno == errno.EEXIST:
      if (not os.path.islink(link_name) or
          os.readlink(link_name) != device_path):
        os.remove(link_name)
        os.symlink(device_path, link_name)
    else:
      raise

  return link_name


def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  """
  for idx, disk in enumerate(disks):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if os.path.islink(link_name):
      try:
        os.remove(link_name)
      except OSError:
        logging.exception("Can't remove symlink '%s'", link_name)


def _GatherAndLinkBlockDevs(instance):
  """Set up an instance's block device(s).

  This is run on the primary node at instance startup. The block
  devices must be already assembled.

  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should assemble
  @rtype: list
  @return: list of (disk_object, device_path)

  """
  block_devices = []
  for idx, disk in enumerate(instance.disks):
    device = _RecursiveFindBD(disk)
    if device is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
                                    str(disk))
    device.Open()
    try:
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
    except OSError, e:
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
                                    e.strerror)

    block_devices.append((disk, link_name))

  return block_devices


def StartInstance(instance, extra_args):
  """Start an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @rtype: boolean
  @return: whether the startup was successful or not

  """
  running_instances = GetInstanceList([instance.hypervisor])

  if instance.name in running_instances:
    return (True, "Already running")

  try:
    block_devices = _GatherAndLinkBlockDevs(instance)
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    hyper.StartInstance(instance, block_devices, extra_args)
  except errors.BlockDeviceError, err:
    logging.exception("Failed to start instance")
    return (False, "Block device error: %s" % str(err))
  except errors.HypervisorError, err:
    logging.exception("Failed to start instance")
    _RemoveBlockDevLinks(instance.name, instance.disks)
    return (False, "Hypervisor error: %s" % str(err))

  return (True, "Instance started successfully")


def ShutdownInstance(instance):
  """Shut an instance down.

  @note: this function uses polling with a hardcoded timeout.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @rtype: boolean
  @return: whether the shutdown was successful or not

  """
  hv_name = instance.hypervisor
  running_instances = GetInstanceList([hv_name])

  if instance.name not in running_instances:
    return True

  hyper = hypervisor.GetHypervisor(hv_name)
  try:
    hyper.StopInstance(instance)
  except errors.HypervisorError, err:
    logging.error("Failed to stop instance: %s" % err)
    return False

  # test every 10secs for 2min

  time.sleep(1)
  for dummy in range(11):
    if instance.name not in GetInstanceList([hv_name]):
      break
    time.sleep(10)
  else:
    # the shutdown did not succeed
    logging.error("shutdown of '%s' unsuccessful, using destroy", instance)

    try:
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      logging.exception("Failed to stop instance: %s" % err)
      return False

    time.sleep(1)
    if instance.name in GetInstanceList([hv_name]):
      logging.error("could not shutdown instance '%s' even by destroy",
                    instance.name)
      return False

  _RemoveBlockDevLinks(instance.name, instance.disks)

  return True


def RebootInstance(instance, reboot_type, extra_args):
  """Reboot an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object to reboot
  @type reboot_type: str
  @param reboot_type: the type of reboot, one of the following
    constants:
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
        instance OS, do not recreate the VM
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
        restart the VM (at the hypervisor level)
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL})
        is not accepted here, since that mode is handled
        differently
  @rtype: boolean
  @return: the success of the operation

  """
  running_instances = GetInstanceList([instance.hypervisor])

  if instance.name not in running_instances:
    logging.error("Cannot reboot instance that is not running")
    return False

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
    try:
      hyper.RebootInstance(instance)
    except errors.HypervisorError, err:
      logging.exception("Failed to soft reboot instance")
      return False
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
    try:
      ShutdownInstance(instance)
      StartInstance(instance, extra_args)
    except errors.HypervisorError, err:
      logging.exception("Failed to hard reboot instance")
      return False
  else:
    raise errors.ParameterError("reboot_type invalid")

  return True


def MigrationInfo(instance):
  """Gather information about an instance to be migrated.

  @type instance: L{objects.Instance}
  @param instance: the instance definition

  """
  return (True, '')


def AcceptInstance(instance, info, target):
  """Prepare the node to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type target: string
  @param target: target host (usually ip), on this node

  """
  return (True, "Accept successful")


def FinalizeMigration(instance, info, success):
  """Finalize any preparation to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type success: boolean
  @param success: whether the migration was a success or a failure

  """
  return (True, "Migration Finalized")


def MigrateInstance(instance, target, live):
  """Migrates an instance to another node.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type target: string
  @param target: the target node name
  @type live: boolean
  @param live: whether the migration should be done live or not (the
      interpretation of this parameter is left to the hypervisor)
  @rtype: tuple
  @return: a tuple of (success, msg) where:
      - success is a boolean denoting the success/failure of the operation
      - msg is a string with details in case of failure

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)

  try:
    hyper.MigrateInstance(instance.name, target, live)
  except errors.HypervisorError, err:
    msg = "Failed to migrate instance"
    logging.exception(msg)
    return (False, "%s: %s" % (msg, err))
  return (True, "Migration successful")


def CreateBlockDevice(disk, size, owner, on_primary, info):
  """Creates a block device for an instance.

  @type disk: L{objects.Disk}
  @param disk: the object describing the disk we should create
  @type size: int
  @param size: the size of the physical underlying device, in MiB
  @type owner: str
  @param owner: the name of the instance for which disk is created,
      used for device cache data
  @type on_primary: boolean
  @param on_primary:  indicates if it is the primary node or not
  @type info: string
  @param info: string that will be sent to the physical device
      creation, used for example to set (LVM) tags on LVs

  @return: the new unique_id of the device (this can sometime be
      computed only after creation), or None. On secondary nodes,
      it's not required to return anything.

  """
  clist = []
  if disk.children:
    for child in disk.children:
      crdev = _RecursiveAssembleBD(child, owner, on_primary)
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
        crdev.Open()
      clist.append(crdev)

  try:
    device = bdev.Create(disk.dev_type, disk.physical_id, clist, size)
  except errors.GenericError, err:
    return False, "Can't create block device: %s" % str(err)

  if on_primary or disk.AssembleOnSecondary():
    if not device.Assemble():
      errorstring = "Can't assemble device after creation, very unusual event"
      logging.error(errorstring)
      return False, errorstring
    device.SetSyncSpeed(constants.SYNC_SPEED)
    if on_primary or disk.OpenOnSecondary():
      device.Open(force=True)
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  device.SetInfo(info)

  physical_id = device.unique_id
  return True, physical_id


def RemoveBlockDevice(disk):
  """Remove a block device.

  @note: This is intended to be called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk object we should remove
  @rtype: boolean
  @return: the success of the operation

  """
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    # probably can't attach
    logging.info("Can't attach to device %s in remove", disk)
    rdev = None
  if rdev is not None:
    r_path = rdev.dev_path
    result = rdev.Remove()
    if result:
      DevCacheManager.RemoveCache(r_path)
  else:
    result = True
  if disk.children:
    for child in disk.children:
      result = result and RemoveBlockDevice(child)
  return result


def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  @note: this function is called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk we try to assemble
  @type owner: str
  @param owner: the name of the instance which owns the disk
  @type as_primary: boolean
  @param as_primary: if we should make the block device
      read/write

  @return: the assembled device or None (in case no device
      was assembled)
  @raise errors.BlockDeviceError: in case there is an error
      during the activation of the children or the device
      itself

  """
  children = []
  if disk.children:
    mcn = disk.ChildrenNeeded()
    if mcn == -1:
      mcn = 0 # max number of Nones allowed
    else:
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
      try:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        if children.count(None) >= mcn:
          raise
        cdev = None
        logging.debug("Error in child activation: %s", str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children)
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      r_dev.Open()
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)

  else:
    result = True
  return result


def AssembleBlockDevice(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  @rtype: str or boolean
  @return: a C{/dev/...} path for primary nodes, and
      C{True} for secondary nodes

  """
  result = _RecursiveAssembleBD(disk, owner, as_primary)
  if isinstance(result, bdev.BlockDev):
    result = result.dev_path
  return result


def ShutdownBlockDevice(disk):
  """Shut down a block device.

  First, if the device is assembled (Attach() is successful), then
  the device is shutdown. Then the children of the device are
  shutdown.

  This function is called recursively. Note that we don't cache the
  children or such, as opposed to assemble, shutdown of different
  devices doesn't require that the upper device was active.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk we should
      shutdown
  @rtype: boolean
  @return: the success of the operation

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is not None:
    r_path = r_dev.dev_path
    result = r_dev.Shutdown()
    if result:
      DevCacheManager.RemoveCache(r_path)
  else:
    result = True
  if disk.children:
    for child in disk.children:
      result = result and ShutdownBlockDevice(child)
  return result


def MirrorAddChildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: boolean
  @return: the success of the operation

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    logging.error("Can't find parent device")
    return False
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if new_bdevs.count(None) > 0:
    logging.error("Can't find new device(s) to add: %s:%s",
                  new_bdevs, new_cdevs)
    return False
  parent_bdev.AddChildren(new_bdevs)
  return True


def MirrorRemoveChildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: boolean
  @return: the success of the operation

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    logging.error("Can't find parent in remove children: %s", parent_cdev)
    return False
  devs = []
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
    if rpath is None:
      bd = _RecursiveFindBD(disk)
      if bd is None:
        logging.error("Can't find dynamic device %s while removing children",
                      disk)
        return False
      else:
        devs.append(bd.dev_path)
    else:
      devs.append(rpath)
  parent_bdev.RemoveChildren(devs)
  return True


def GetMirrorStatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: disk
  @return:
      a list of (mirror_done, estimated_time) tuples, which
      are the result of L{bdev.BlockDev.CombinedSyncStatus}
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  stats = []
  for dsk in disks:
    rbd = _RecursiveFindBD(dsk)
    if rbd is None:
      raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
    stats.append(rbd.CombinedSyncStatus())
  return stats


def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  children = []
  if disk.children:
    for chdisk in disk.children:
      children.append(_RecursiveFindBD(chdisk))

  return bdev.FindDevice(disk.dev_type, disk.physical_id, children)


def FindBlockDevice(disk):
  """Check if a device is activated.

  If it is, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk to find
  @rtype: None or tuple
  @return: None if the disk cannot be found, otherwise a
      tuple (device_path, major, minor, sync_percent,
      estimated_time, is_degraded)

  """
  rbd = _RecursiveFindBD(disk)
  if rbd is None:
    return rbd
  return (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus()


def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: int
  @param uid: the owner of the file (can be -1 for default)
  @type gid: int
  @param gid: the group of the file (can be -1 for default)
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: boolean
  @return: the success of the operation; errors are logged
      in the node daemon log

  """
  if not os.path.isabs(file_name):
    logging.error("Filename passed to UploadFile is not absolute: '%s'",
                  file_name)
    return False

  allowed_files = [
    constants.CLUSTER_CONF_FILE,
    constants.ETC_HOSTS,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,
    ]

  if file_name not in allowed_files:
    logging.error("Filename passed to UploadFile not in allowed"
                 " upload targets: '%s'", file_name)
    return False

  raw_data = _Decompress(data)

  utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
                  atime=atime, mtime=mtime)
  return True


def WriteSsconfFiles(values):
  """Update all ssconf files.

  Wrapper around the SimpleStore.WriteFiles.

  """
  ssconf.SimpleStore().WriteFiles(values)


def _ErrnoOrStr(err):
  """Format an EnvironmentError exception.

  If the L{err} argument has an errno attribute, it will be looked up
  and converted into a textual C{E...} description. Otherwise the
  string representation of the error will be returned.

  @type err: L{EnvironmentError}
  @param err: the exception to format

  """
  if hasattr(err, 'errno'):
    detail = errno.errorcode[err.errno]
  else:
    detail = str(err)
  return detail


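# Illustrative example (not part of the original module): an ENOENT failure
# such as
#
#   try:
#     os.stat("/nonexistent")
#   except EnvironmentError, err:
#     print _ErrnoOrStr(err)   # prints "ENOENT"
#
# is rendered as the symbolic errno name rather than the full message.
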
def _OSOndiskVersion(name, os_dir):
  """Compute and return the API version of a given OS.

  This function will try to read the API version of the OS given by
  the 'name' parameter and residing in the 'os_dir' directory.

  @type name: str
  @param name: the OS name we should look for
  @type os_dir: str
  @param os_dir: the directory in which we should look for the OS
  @rtype: list
  @return:
      the list of API versions (integers) declared by the OS
  @raise errors.InvalidOS: if the OS cannot be found

  """
  api_file = os.path.sep.join([os_dir, "ganeti_api_version"])

  try:
    st = os.stat(api_file)
  except EnvironmentError, err:
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
                           " found (%s)" % _ErrnoOrStr(err))

  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
                           " a regular file")

  try:
    f = open(api_file)
    try:
      api_versions = f.readlines()
    finally:
      f.close()
  except EnvironmentError, err:
    raise errors.InvalidOS(name, os_dir, "error while reading the"
                           " API version (%s)" % _ErrnoOrStr(err))

  api_versions = [version.strip() for version in api_versions]
  try:
    api_versions = [int(version) for version in api_versions]
  except (TypeError, ValueError), err:
    raise errors.InvalidOS(name, os_dir,
                           "API version is not integer (%s)" % str(err))

  return api_versions


def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{constants.OS_SEARCH_PATH})
  @rtype: list of L{objects.OS}
  @return: an OS object for each name in all the given
      directories

  """
  if top_dirs is None:
    top_dirs = constants.OS_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the OS directory %s", dir_name)
        break
      for name in f_names:
        try:
          os_inst = OSFromDisk(name, base_dir=dir_name)
          result.append(os_inst)
        except errors.InvalidOS, err:
          result.append(objects.OS.FromInvalidOS(err))

  return result


def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{errors.InvalidOS} exception, detailing why this is not a valid OS.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise errors.InvalidOS: if we don't find a valid OS

  """
  if base_dir is None:
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
    if os_dir is None:
      raise errors.InvalidOS(name, None, "OS dir not found in search path")
  else:
    os_dir = os.path.sep.join([base_dir, name])

  api_versions = _OSOndiskVersion(name, os_dir)

  if constants.OS_API_VERSION not in api_versions:
    raise errors.InvalidOS(name, os_dir, "API version mismatch"
                           " (found %s want %s)"
                           % (api_versions, constants.OS_API_VERSION))

  # OS Scripts dictionary, we will populate it with the actual script names
  os_scripts = dict.fromkeys(constants.OS_SCRIPTS)

  for script in os_scripts:
    os_scripts[script] = os.path.sep.join([os_dir, script])

    try:
      st = os.stat(os_scripts[script])
    except EnvironmentError, err:
      raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
                             (script, _ErrnoOrStr(err)))

    if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
      raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
                             script)

    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
                             script)


  return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
                    create_script=os_scripts[constants.OS_SCRIPT_CREATE],
                    export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
                    import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
                    rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
                    api_versions=api_versions)

def OSEnvironment(instance, debug=0):
  """Calculate the environment for an os script.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = {}
  result['OS_API_VERSION'] = '%d' % constants.OS_API_VERSION
  result['INSTANCE_NAME'] = instance.name
  result['HYPERVISOR'] = instance.hypervisor
  result['DISK_COUNT'] = '%d' % len(instance.disks)
  result['NIC_COUNT'] = '%d' % len(instance.nics)
  result['DEBUG_LEVEL'] = '%d' % debug
  for idx, disk in enumerate(instance.disks):
    real_disk = _RecursiveFindBD(disk)
    if real_disk is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                    str(disk))
    real_disk.Open()
    result['DISK_%d_PATH' % idx] = real_disk.dev_path
    # FIXME: When disks will have read-only mode, populate this
    result['DISK_%d_ACCESS' % idx] = 'W'
    if constants.HV_DISK_TYPE in instance.hvparams:
      result['DISK_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
    elif disk.dev_type == constants.LD_FILE:
      result['DISK_%d_BACKEND_TYPE' % idx] = \
        'file:%s' % disk.physical_id[0]
  for idx, nic in enumerate(instance.nics):
    result['NIC_%d_MAC' % idx] = nic.mac
    if nic.ip:
      result['NIC_%d_IP' % idx] = nic.ip
    result['NIC_%d_BRIDGE' % idx] = nic.bridge
    if constants.HV_NIC_TYPE in instance.hvparams:
      result['NIC_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  return result

1606
  """Grow a stack of block devices.
1607

1608
  This function is called recursively, with the childrens being the
1609
  first ones to resize.
1610

1611
  @type disk: L{objects.Disk}
1612
  @param disk: the disk to be grown
1613
  @rtype: (status, result)
1614
  @return: a tuple with the status of the operation
1615
      (True/False), and the errors message if status
1616
      is False
1617

1618
  """
1619
  r_dev = _RecursiveFindBD(disk)
1620
  if r_dev is None:
1621
    return False, "Cannot find block device %s" % (disk,)
1622

    
1623
  try:
1624
    r_dev.Grow(amount)
1625
  except errors.BlockDeviceError, err:
1626
    return False, str(err)
1627

    
1628
  return True, None
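
# Illustrative note for GrowBlockDevice() above: callers are expected to
# unpack the (status, message) pair, e.g. (hypothetical caller code):
#   status, msg = GrowBlockDevice(disk, 1024)
#   if not status:
#     logging.error("grow failed: %s", msg)
# Treating the amount as mebibytes is an assumption based on how disk sizes
# are handled elsewhere in the code, not something stated here.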


def SnapshotBlockDevice(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk path

  """
  if disk.children:
    if len(disk.children) == 1:
      # only one child, let's recurse on it
      return SnapshotBlockDevice(disk.children[0])
    else:
      # more than one child, choose one that matches
      for child in disk.children:
        if child.size == disk.size:
          # return implies breaking the loop
          return SnapshotBlockDevice(child)
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    else:
      return None
  else:
    raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
                                 " '%s' of type '%s'" %
                                 (disk.unique_id, disk.dev_type))
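
# Illustrative note for SnapshotBlockDevice() above (a reading of the code,
# not extra documented behaviour): for a typical DRBD disk with LVM children
# of the same size as the disk itself, the recursion descends into the first
# matching child and only that logical volume is snapshotted; plain LVM disks
# are snapshotted directly.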


def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
  """Export a block device snapshot to a remote node.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type instance: L{objects.Instance}
  @param instance: the instance object to which the disk belongs
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @type idx: int
  @param idx: the index of the disk in the instance's disk list,
      used to export to the OS scripts environment
  @rtype: boolean
  @return: the success of the operation

  """
  export_env = OSEnvironment(instance)

  inst_os = OSFromDisk(instance.os)
  export_script = inst_os.export_script

  logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
                                     instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(disk))
  real_disk.Open()

  export_env['EXPORT_DEVICE'] = real_disk.dev_path
  export_env['EXPORT_INDEX'] = str(idx)

  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  destfile = disk.physical_id[1]

  # the target command is built out of three individual commands,
  # which are joined by pipes; we check each individual command for
  # valid parameters
  expcmd = utils.BuildShellCmd("cd %s; %s 2>%s", inst_os.path,
                               export_script, logfile)

  comprcmd = "gzip"

  destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
                                destdir, destdir, destfile)
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.GANETI_RUNAS,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(command, env=export_env)

  if result.failed:
    logging.error("os snapshot export command '%s' returned error: %s"
                  " output: %s", command, result.fail_reason, result.output)
    return False

  return True
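
# Illustrative note for ExportSnapshot() above: the assembled command is a
# three-stage shell pipeline of roughly this shape (placeholders in angle
# brackets, not literal values):
#   cd <os_dir>; ./export 2><logfile> | gzip | \
#     ssh <dest_node> 'mkdir -p <destdir> && cat > <destdir>/<snapshot_name>'
# The export script reads EXPORT_DEVICE and EXPORT_INDEX from the environment
# built by OSEnvironment().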


def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  @rtype: boolean
  @return: the success of the operation

  """
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, 'version', '0')
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
  config.set(constants.INISECT_EXP, 'os', instance.os)
  config.set(constants.INISECT_EXP, 'compression', 'gzip')

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, 'name', instance.name)
  config.set(constants.INISECT_INS, 'memory', '%d' %
             instance.beparams[constants.BE_MEMORY])
  config.set(constants.INISECT_INS, 'vcpus', '%d' %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)

  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, 'nic%d_mac' %
               nic_count, '%s' % nic.mac)
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
               '%s' % nic.bridge)
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, 'nic_count', '%d' % nic_total)

  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if disk:
      disk_total += 1
      config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
                 ('%s' % disk.iv_name))
      config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
                 ('%s' % disk.physical_id[1]))
      config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
                 ('%d' % disk.size))

  config.set(constants.INISECT_INS, 'disk_count', '%d' % disk_total)

  utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  shutil.rmtree(finaldestdir, True)
  shutil.move(destdir, finaldestdir)

  return True
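
# Illustrative note for FinalizeExport() above: the generated ini-style file
# contains two sections along these lines (section names actually come from
# constants.INISECT_EXP/INISECT_INS; all values below are made-up):
#   [export]
#   version = 0
#   timestamp = 1220000000
#   source = node1.example.com
#   os = debootstrap
#   compression = gzip
#   [instance]
#   name = inst1.example.com
#   memory = 512
#   vcpus = 1
#   disk_template = drbd
#   nic_count = 1
#   disk_count = 1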


def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: L{objects.SerializableConfigParser}
  @return: a serializable config file containing the
      export info

  """
  cff = os.path.join(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    return None

  return config


def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
  """Import an os image into an instance.

  @type instance: L{objects.Instance}
  @param instance: instance to import the disks into
  @type src_node: string
  @param src_node: source node for the disk images
  @type src_images: list of string
  @param src_images: absolute paths of the disk images
  @rtype: list of boolean
  @return: each boolean represents the success of importing the n-th disk

  """
  import_env = OSEnvironment(instance)
  inst_os = OSFromDisk(instance.os)
  import_script = inst_os.import_script

  logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
                                        instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)

  comprcmd = "gunzip"
  impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
                               import_script, logfile)

  final_result = []
  for idx, image in enumerate(src_images):
    if image:
      destcmd = utils.BuildShellCmd('cat %s', image)
      remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
                                                       constants.GANETI_RUNAS,
                                                       destcmd)
      command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
      import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
      import_env['IMPORT_INDEX'] = str(idx)
      result = utils.RunCmd(command, env=import_env)
      if result.failed:
        logging.error("Disk import command '%s' returned error: %s"
                      " output: %s", command, result.fail_reason,
                      result.output)
        final_result.append(False)
      else:
        final_result.append(True)
    else:
      final_result.append(True)

  return final_result


def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports

  """
  if os.path.isdir(constants.EXPORT_DIR):
    return utils.ListVisibleFiles(constants.EXPORT_DIR)
  else:
    return []


def RemoveExport(export):
  """Remove an existing export from the node.

  @type export: str
  @param export: the name of the export to remove
  @rtype: boolean
  @return: the success of the operation

  """
  target = os.path.join(constants.EXPORT_DIR, export)

  shutil.rmtree(target)
  # TODO: catch some of the relevant exceptions and provide a pretty
  # error message if rmtree fails.

  return True


def RenameBlockDevices(devlist):
  """Rename a list of block devices.

  @type devlist: list of tuples
  @param devlist: list of tuples of the form (disk, new_unique_id);
      disk is an L{objects.Disk} object describing the current disk,
      and new_unique_id is the name (logical_id) we rename it to
  @rtype: boolean
  @return: True if all renames succeeded, False otherwise

  """
  result = True
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      result = False
      continue
    try:
      old_rpath = dev.dev_path
      dev.Rename(unique_id)
      new_rpath = dev.dev_path
      if old_rpath != new_rpath:
        DevCacheManager.RemoveCache(old_rpath)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError, err:
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
      result = False
  return result


def _TransformFileStorageDir(file_storage_dir):
  """Checks whether the given file_storage_dir is valid.

  Checks whether the given file_storage_dir is within the cluster-wide
  default file_storage_dir stored in SimpleStore. Only paths under that
  directory are allowed.

  @type file_storage_dir: str
  @param file_storage_dir: the path to check

  @return: the normalized path if valid, None otherwise

  """
  cfg = _GetConfig()
  file_storage_dir = os.path.normpath(file_storage_dir)
  base_file_storage_dir = cfg.GetFileStorageDir()
  if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
      base_file_storage_dir):
    logging.error("file storage directory '%s' is not under base file"
                  " storage directory '%s'",
                  file_storage_dir, base_file_storage_dir)
    return None
  return file_storage_dir
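
# Illustrative note for _TransformFileStorageDir() above: assuming the
# cluster-wide base directory is /srv/ganeti/file-storage (hypothetical),
#   _TransformFileStorageDir("/srv/ganeti/file-storage/inst1")  -> accepted
#   _TransformFileStorageDir("/tmp/evil")                       -> None
# Note that os.path.commonprefix does a plain character-wise prefix
# comparison, not a path-component-aware one.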


def CreateFileStorageDir(file_storage_dir):
  """Create file storage directory.

  @type file_storage_dir: str
  @param file_storage_dir: directory to create

  @rtype: tuple
  @return: tuple with first element a boolean indicating whether dir
      creation was successful or not

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  result = True,
  if not file_storage_dir:
    result = False,
  else:
    if os.path.exists(file_storage_dir):
      if not os.path.isdir(file_storage_dir):
        logging.error("'%s' is not a directory", file_storage_dir)
        result = False,
    else:
      try:
        os.makedirs(file_storage_dir, 0750)
      except OSError, err:
        logging.error("Cannot create file storage directory '%s': %s",
                      file_storage_dir, err)
        result = False,
  return result


def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.

  Remove it only if it's empty. If not, log an error and return.

  @type file_storage_dir: str
  @param file_storage_dir: the directory we should clean up
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  result = True,
  if not file_storage_dir:
    result = False,
  else:
    if os.path.exists(file_storage_dir):
      if not os.path.isdir(file_storage_dir):
        logging.error("'%s' is not a directory", file_storage_dir)
        result = False,
      # deletes dir only if empty, otherwise we want to return False
      try:
        os.rmdir(file_storage_dir)
      except OSError, err:
        logging.exception("Cannot remove file storage directory '%s'",
                          file_storage_dir)
        result = False,
  return result


def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  @type old_file_storage_dir: str
  @param old_file_storage_dir: the current path
  @type new_file_storage_dir: str
  @param new_file_storage_dir: the name we should rename to
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  result = True,
  if not old_file_storage_dir or not new_file_storage_dir:
    result = False,
  else:
    if not os.path.exists(new_file_storage_dir):
      if os.path.isdir(old_file_storage_dir):
        try:
          os.rename(old_file_storage_dir, new_file_storage_dir)
        except OSError, err:
          logging.exception("Cannot rename '%s' to '%s'",
                            old_file_storage_dir, new_file_storage_dir)
          result = False,
      else:
        logging.error("'%s' is not a directory", old_file_storage_dir)
        result = False,
    else:
      if os.path.exists(old_file_storage_dir):
        logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
                      old_file_storage_dir, new_file_storage_dir)
        result = False,
  return result


def _IsJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: boolean
  @return: whether the file is under the queue directory

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)

  if not result:
    logging.error("'%s' is not a file in the queue directory",
                  file_name)

  return result


def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents
  @rtype: boolean
  @return: the success of the operation

  """
  if not _IsJobQueueFile(file_name):
    return False

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=_Decompress(content))

  return True


def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: boolean
  @return: the success of the operation

  """
  if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
    return False

  utils.RenameFile(old, new, mkdir=True)

  return True


def JobQueueSetDrainFlag(drain_flag):
  """Set the drain flag for the queue.

  This will set or unset the queue drain flag.

  @type drain_flag: boolean
  @param drain_flag: if True, will set the drain flag, otherwise reset it.
  @rtype: boolean
  @return: always True
  @warning: the function always returns True

  """
  if drain_flag:
    utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
  else:
    utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)

  return True


def CloseBlockDevices(instance_name, disks):
  """Closes the given block devices.

  This means they will be switched to secondary mode (in case of
  DRBD).

  @param instance_name: if the argument is not empty, the symlinks
      of this instance will be removed
  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to be closed
  @rtype: tuple (success, message)
  @return: a tuple of success and message, where success
      indicates the success of the operation, and message
      will contain the error details in case we failed

  """
  bdevs = []
  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      return (False, "Can't find device %s" % cf)
    bdevs.append(rd)

  msg = []
  for rd in bdevs:
    try:
      rd.Close()
    except errors.BlockDeviceError, err:
      msg.append(str(err))
  if msg:
    return (False, "Can't make devices secondary: %s" % ",".join(msg))
  else:
    if instance_name:
      _RemoveBlockDevLinks(instance_name, disks)
    return (True, "All devices secondary")


def ValidateHVParams(hvname, hvparams):
  """Validates the given hypervisor parameters.

  @type hvname: string
  @param hvname: the hypervisor name
  @type hvparams: dict
  @param hvparams: the hypervisor parameters to be validated
  @rtype: tuple (success, message)
  @return: a tuple of success and message, where success
      indicates the success of the operation, and message
      will contain the error details in case we failed

  """
  try:
    hv_type = hypervisor.GetHypervisor(hvname)
    hv_type.ValidateParameters(hvparams)
    return (True, "Validation passed")
  except errors.HypervisorError, err:
    return (False, str(err))


def DemoteFromMC():
  """Demotes the current node from master candidate role.

  """
  # try to ensure we're not the master by mistake
  master, myself = ssconf.GetMasterAndMyself()
  if master == myself:
    return (False, "ssconf status shows I'm the master node, will not demote")
  pid_file = utils.DaemonPidFileName(constants.MASTERD_PID)
  if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
    return (False, "The master daemon is running, will not demote")
  try:
    utils.CreateBackup(constants.CLUSTER_CONF_FILE)
  except EnvironmentError, err:
    if err.errno != errno.ENOENT:
      return (False, "Error while backing up cluster file: %s" % str(err))
  utils.RemoveFile(constants.CLUSTER_CONF_FILE)
  return (True, "Done")


def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  """
  # set the correct physical ID
  my_name = utils.HostInfo().name
  for cf in disks:
    cf.SetPhysicalID(my_name, nodes_ip)

  bdevs = []

  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      return (False, "Can't find device %s" % cf)
    bdevs.append(rd)
  return (True, bdevs)


def DrbdDisconnectNet(nodes_ip, disks):
  """Disconnects the network on a list of drbd devices.

  """
  status, bdevs = _FindDisks(nodes_ip, disks)
  if not status:
    return status, bdevs

  # disconnect disks
  for rd in bdevs:
    try:
      rd.DisconnectNet()
    except errors.BlockDeviceError, err:
      logging.exception("Failed to go into standalone mode")
      return (False, "Can't change network configuration: %s" % str(err))
  return (True, "All disks are now disconnected")


def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
  """Attaches the network on a list of drbd devices.

  """
  status, bdevs = _FindDisks(nodes_ip, disks)
  if not status:
    return status, bdevs

  if multimaster:
    for idx, rd in enumerate(bdevs):
      try:
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
      except EnvironmentError, err:
        return (False, "Can't create symlink: %s" % str(err))
  # reconnect disks, switch to new master configuration and if
  # needed primary mode
  for rd in bdevs:
    try:
      rd.AttachNet(multimaster)
    except errors.BlockDeviceError, err:
      return (False, "Can't change network configuration: %s" % str(err))
  # wait until the disks are connected; we need to retry the re-attach
  # if the device becomes standalone, as this might happen if one
  # node disconnects and reconnects in a different mode before the
  # other node reconnects; in this case, one or both of the nodes will
  # decide it has wrong configuration and switch to standalone
  RECONNECT_TIMEOUT = 2 * 60
  sleep_time = 0.100 # start with 100 milliseconds
  timeout_limit = time.time() + RECONNECT_TIMEOUT
  while time.time() < timeout_limit:
    all_connected = True
    for rd in bdevs:
      stats = rd.GetProcStatus()
      if not (stats.is_connected or stats.is_in_resync):
        all_connected = False
      if stats.is_standalone:
        # peer had different config info and this node became
        # standalone, even though this should not happen with the
        # new staged way of changing disk configs
        try:
          rd.ReAttachNet(multimaster)
        except errors.BlockDeviceError, err:
          return (False, "Can't change network configuration: %s" % str(err))
    if all_connected:
      break
    time.sleep(sleep_time)
    sleep_time = min(5, sleep_time * 1.5)
  if not all_connected:
    return (False, "Timeout in disk reconnecting")
  if multimaster:
    # change to primary mode
    for rd in bdevs:
      rd.Open()
  if multimaster:
    msg = "multi-master and primary"
  else:
    msg = "single-master"
  return (True, "Disks are now configured as %s" % msg)


def DrbdWaitSync(nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  """
  status, bdevs = _FindDisks(nodes_ip, disks)
  if not status:
    return status, bdevs

  min_resync = 100
  alldone = True
  failure = False
  for rd in bdevs:
    stats = rd.GetProcStatus()
    if not (stats.is_connected or stats.is_in_resync):
      failure = True
      break
    alldone = alldone and (not stats.is_in_resync)
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)
  return (not failure, (alldone, min_resync))


class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not
  on the master side.

  """
  RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")

  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        L{constants.HOOKS_BASE_DIR} (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = constants.HOOKS_BASE_DIR
    self._BASE_DIR = hooks_base_dir

  @staticmethod
  def ExecHook(script, env):
    """Exec one hook script.

    @type script: str
    @param script: the full path to the script
    @type env: dict
    @param env: the environment with which to exec the script
    @rtype: tuple (success, message)
    @return: a tuple of success and message, where success
        indicates the success of the operation, and message
        will contain the error details in case we failed

    """
    # exec the process using subprocess and log the output
    fdstdin = None
    try:
      fdstdin = open("/dev/null", "r")
      child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT, close_fds=True,
                               shell=False, cwd="/", env=env)
      output = ""
      try:
        output = child.stdout.read(4096)
        child.stdout.close()
      except EnvironmentError, err:
        output += "Hook script error: %s" % str(err)

      while True:
        try:
          result = child.wait()
          break
        except EnvironmentError, err:
          if err.errno == errno.EINTR:
            continue
          raise
    finally:
      # try not to leak fds
      for fd in (fdstdin, ):
        if fd is not None:
          try:
            fd.close()
          except EnvironmentError, err:
            # just log the error
            #logging.exception("Error while closing fd %s", fd)
            pass

    return result == 0, output

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    @type hpath: str
    @param hpath: the path to the hooks directory which
        holds the scripts
    @type phase: str
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @type env: dict
    @param env: dictionary with the environment for the hook
    @rtype: list
    @return: list of 3-element tuples:
      - script path
      - script result, either L{constants.HKR_SUCCESS} or
        L{constants.HKR_FAIL}
      - output of the script

    @raise errors.ProgrammerError: for invalid input
        parameters

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
    rr = []

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = "%s/%s" % (self._BASE_DIR, subdir)
    try:
      dir_contents = utils.ListVisibleFiles(dir_name)
    except OSError, err:
      # FIXME: must log output in case of failures
      return rr

    # we use the standard python sort order,
    # so 00name is the recommended naming scheme
    dir_contents.sort()
    for relname in dir_contents:
      fname = os.path.join(dir_name, relname)
      if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
          self.RE_MASK.match(relname) is not None):
        rrval = constants.HKR_SKIP
        output = ""
      else:
        result, output = self.ExecHook(fname, env)
        if not result:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
      rr.append(("%s/%s" % (subdir, relname), rrval, output))

    return rr
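
# Illustrative note for HooksRunner.RunHooks() above: for a hypothetical hook
# path "instance-add" and the "post" phase, scripts are looked up under
# <hooks_base_dir>/instance-add-post.d/ and executed in lexical order, so a
# layout such as
#   instance-add-post.d/00-logger
#   instance-add-post.d/10-update-dns
# yields result tuples like ("instance-add-post.d/00-logger", HKR_SUCCESS,
# "<output>"). Non-executable files or names not matching RE_MASK are
# reported with HKR_SKIP.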


class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  def Run(self, name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data

    @rtype: tuple
    @return: four element tuple of:
       - run status (one of the IARUN_ constants)
       - stdout
       - stderr
       - fail reason (as from L{utils.RunResult})

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      return (constants.IARUN_NOTFOUND, None, None, None)

    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        return (constants.IARUN_FAILURE, result.stdout, result.stderr,
                result.fail_reason)
    finally:
      os.unlink(fin_name)

    return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)


class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  """
  _DEV_PREFIX = "/dev/"
  _ROOT_DIR = constants.BDEV_CACHE_DIR

  @classmethod
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    @type dev_path: str
    @param dev_path: the C{/dev/} path name
    @rtype: str
    @return: the converted path name

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
    return fpath
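
  # Illustrative note for _ConvertPath() above: a device path such as
  # /dev/xenvg/inst1-disk0 (hypothetical) maps to the cache file
  # <BDEV_CACHE_DIR>/bdev_xenvg_inst1-disk0.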

  @classmethod
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    @type dev_path: str
    @param dev_path: the pathname of the device
    @type owner: str
    @param owner: the owner (instance name) of the device
    @type on_primary: bool
    @param on_primary: whether this is the primary
        node or not
    @type iv_name: str
    @param iv_name: the instance-visible name of the
        device, as in objects.Disk.iv_name

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    if on_primary:
      state = "primary"
    else:
      state = "secondary"
    if iv_name is None:
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    try:
      utils.WriteFile(fpath, data=fdata)
    except EnvironmentError, err:
      logging.exception("Can't update bdev cache for %s", dev_path)

  @classmethod
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    This is just a wrapper over L{utils.RemoveFile} with a converted
    path name and logging.

    @type dev_path: str
    @param dev_path: the pathname of the device

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    try:
      utils.RemoveFile(fpath)
    except EnvironmentError, err:
      logging.exception("Can't remove the bdev cache for %s", dev_path)