Revision c8457ce7

b/daemons/ganeti-noded
708 708

  
709 709
    """
710 710
    # TODO: What if a file fails to rename?
711
    return [backend.JobQueueRename(old, new) for old, new in params]
711
    return True, [backend.JobQueueRename(old, new) for old, new in params]
712 712

  
713 713
  @staticmethod
714 714
  def perspective_jobqueue_set_drain(params):
b/lib/backend.py
147 147
def JobQueuePurge():
148 148
  """Removes job queue files and archived jobs.
149 149

  
150
  @rtype: None
150
  @rtype: tuple
151
  @return: True, None
151 152

  
152 153
  """
153 154
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
154 155
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
156
  return True, None
155 157

  
156 158

  
157 159
def GetMasterInfo():
......
2138 2140
  return True, None
2139 2141

  
2140 2142

  
2141
def _IsJobQueueFile(file_name):
2143
def _EnsureJobQueueFile(file_name):
2142 2144
  """Checks whether the given filename is in the queue directory.
2143 2145

  
2144 2146
  @type file_name: str
2145 2147
  @param file_name: the file name we should check
2146
  @rtype: boolean
2147
  @return: whether the file is under the queue directory
2148
  @rtype: None
2149
  @raises RPCFail: if the file is not valid
2148 2150

  
2149 2151
  """
2150 2152
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
2151 2153
  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2152 2154

  
2153 2155
  if not result:
2154
    logging.error("'%s' is not a file in the queue directory",
2155
                  file_name)
2156

  
2157
  return result
2156
    _Fail("Passed job queue file '%s' does not belong to"
2157
          " the queue directory '%s'", file_name, queue_dir)
2158 2158

  
2159 2159

  
2160 2160
def JobQueueUpdate(file_name, content):
......
2171 2171
  @return: the success of the operation
2172 2172

  
2173 2173
  """
2174
  if not _IsJobQueueFile(file_name):
2175
    return False
2174
  _EnsureJobQueueFile(file_name)
2176 2175

  
2177 2176
  # Write and replace the file atomically
2178 2177
  utils.WriteFile(file_name, data=_Decompress(content))
2179 2178

  
2180
  return True
2179
  return True, None
2181 2180

  
2182 2181

  
2183 2182
def JobQueueRename(old, new):
......
2189 2188
  @param old: the old (actual) file name
2190 2189
  @type new: str
2191 2190
  @param new: the desired file name
2192
  @rtype: boolean
2193
  @return: the success of the operation
2191
  @rtype: tuple
2192
  @return: the success of the operation and payload
2194 2193

  
2195 2194
  """
2196
  if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
2197
    return False
2195
  _EnsureJobQueueFile(old)
2196
  _EnsureJobQueueFile(new)
2198 2197

  
2199 2198
  utils.RenameFile(old, new, mkdir=True)
2200 2199

  
2201
  return True
2200
  return True, None
2202 2201

  
2203 2202

  
2204 2203
def JobQueueSetDrainFlag(drain_flag):
......
2208 2207

  
2209 2208
  @type drain_flag: boolean
2210 2209
  @param drain_flag: if True, will set the drain flag, otherwise reset it.
2211
  @rtype: boolean
2212
  @return: always True
2210
  @rtype: tuple
2211
  @return: always True, None
2213 2212
  @warning: the function always returns True
2214 2213

  
2215 2214
  """
......
2218 2217
  else:
2219 2218
    utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2220 2219

  
2221
  return True
2220
  return True, None
2222 2221

  
2223 2222

  
2224 2223
def BlockdevClose(instance_name, disks):
b/lib/jqueue.py
601 601
    assert node_name != self._my_hostname
602 602

  
603 603
    # Clean queue directory on added node
604
    rpc.RpcRunner.call_jobqueue_purge(node_name)
604
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
605
    msg = result.RemoteFailMsg()
606
    if msg:
607
      logging.warning("Cannot cleanup queue directory on node %s: %s",
608
                      node_name, msg)
605 609

  
606 610
    if not node.master_candidate:
607 611
      # remove if existing, ignoring errors
......
626 630
      result = rpc.RpcRunner.call_jobqueue_update([node_name],
627 631
                                                  [node.primary_ip],
628 632
                                                  file_name, content)
629
      if not result[node_name]:
630
        logging.error("Failed to upload %s to %s", file_name, node_name)
633
      msg = result[node_name].RemoteFailMsg()
634
      if msg:
635
        logging.error("Failed to upload file %s to node %s: %s",
636
                      file_name, node_name, msg)
631 637

  
632 638
    self._nodes[node_name] = node.primary_ip
633 639

  
......
664 670
    success = []
665 671

  
666 672
    for node in nodes:
667
      if result[node]:
668
        success.append(node)
669
      else:
673
      msg = result[node].RemoteFailMsg()
674
      if msg:
670 675
        failed.append(node)
671

  
672
    if failed:
673
      logging.error("%s failed on %s", failmsg, ", ".join(failed))
676
        logging.error("RPC call %s failed on node %s: %s",
677
                      result[node].call, node, msg)
678
      else:
679
        success.append(node)
674 680

  
675 681
    # +1 for the master node
676 682
    if (len(success) + 1) < len(failed):

Also available in: Unified diff