Revision c8457ce7
b/daemons/ganeti-noded | ||
---|---|---|
708 | 708 |
|
709 | 709 |
""" |
710 | 710 |
# TODO: What if a file fails to rename? |
711 |
return [backend.JobQueueRename(old, new) for old, new in params] |
|
711 |
return True, [backend.JobQueueRename(old, new) for old, new in params]
|
|
712 | 712 |
|
713 | 713 |
@staticmethod |
714 | 714 |
def perspective_jobqueue_set_drain(params): |
b/lib/backend.py | ||
---|---|---|
147 | 147 |
def JobQueuePurge(): |
148 | 148 |
"""Removes job queue files and archived jobs. |
149 | 149 |
|
150 |
@rtype: None |
|
150 |
@rtype: tuple |
|
151 |
@return: True, None |
|
151 | 152 |
|
152 | 153 |
""" |
153 | 154 |
_CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE]) |
154 | 155 |
_CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR) |
156 |
return True, None |
|
155 | 157 |
|
156 | 158 |
|
157 | 159 |
def GetMasterInfo(): |
... | ... | |
2138 | 2140 |
return True, None |
2139 | 2141 |
|
2140 | 2142 |
|
2141 |
def _IsJobQueueFile(file_name):
|
|
2143 |
def _EnsureJobQueueFile(file_name):
|
|
2142 | 2144 |
"""Checks whether the given filename is in the queue directory. |
2143 | 2145 |
|
2144 | 2146 |
@type file_name: str |
2145 | 2147 |
@param file_name: the file name we should check |
2146 |
@rtype: boolean
|
|
2147 |
@return: whether the file is under the queue directory
|
|
2148 |
@rtype: None
|
|
2149 |
@raises RPCFail: if the file is not valid
|
|
2148 | 2150 |
|
2149 | 2151 |
""" |
2150 | 2152 |
queue_dir = os.path.normpath(constants.QUEUE_DIR) |
2151 | 2153 |
result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir) |
2152 | 2154 |
|
2153 | 2155 |
if not result: |
2154 |
logging.error("'%s' is not a file in the queue directory", |
|
2155 |
file_name) |
|
2156 |
|
|
2157 |
return result |
|
2156 |
_Fail("Passed job queue file '%s' does not belong to" |
|
2157 |
" the queue directory '%s'", file_name, queue_dir) |
|
2158 | 2158 |
|
2159 | 2159 |
|
2160 | 2160 |
def JobQueueUpdate(file_name, content): |
... | ... | |
2171 | 2171 |
@return: the success of the operation |
2172 | 2172 |
|
2173 | 2173 |
""" |
2174 |
if not _IsJobQueueFile(file_name): |
|
2175 |
return False |
|
2174 |
_EnsureJobQueueFile(file_name) |
|
2176 | 2175 |
|
2177 | 2176 |
# Write and replace the file atomically |
2178 | 2177 |
utils.WriteFile(file_name, data=_Decompress(content)) |
2179 | 2178 |
|
2180 |
return True |
|
2179 |
return True, None
|
|
2181 | 2180 |
|
2182 | 2181 |
|
2183 | 2182 |
def JobQueueRename(old, new): |
... | ... | |
2189 | 2188 |
@param old: the old (actual) file name |
2190 | 2189 |
@type new: str |
2191 | 2190 |
@param new: the desired file name |
2192 |
@rtype: boolean
|
|
2193 |
@return: the success of the operation |
|
2191 |
@rtype: tuple
|
|
2192 |
@return: the success of the operation and payload
|
|
2194 | 2193 |
|
2195 | 2194 |
""" |
2196 |
if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
|
|
2197 |
return False
|
|
2195 |
_EnsureJobQueueFile(old)
|
|
2196 |
_EnsureJobQueueFile(new)
|
|
2198 | 2197 |
|
2199 | 2198 |
utils.RenameFile(old, new, mkdir=True) |
2200 | 2199 |
|
2201 |
return True |
|
2200 |
return True, None
|
|
2202 | 2201 |
|
2203 | 2202 |
|
2204 | 2203 |
def JobQueueSetDrainFlag(drain_flag): |
... | ... | |
2208 | 2207 |
|
2209 | 2208 |
@type drain_flag: boolean |
2210 | 2209 |
@param drain_flag: if True, will set the drain flag, otherwise reset it. |
2211 |
@rtype: boolean
|
|
2212 |
@return: always True |
|
2210 |
@rtype: truple
|
|
2211 |
@return: always True, None
|
|
2213 | 2212 |
@warning: the function always returns True |
2214 | 2213 |
|
2215 | 2214 |
""" |
... | ... | |
2218 | 2217 |
else: |
2219 | 2218 |
utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE) |
2220 | 2219 |
|
2221 |
return True |
|
2220 |
return True, None
|
|
2222 | 2221 |
|
2223 | 2222 |
|
2224 | 2223 |
def BlockdevClose(instance_name, disks): |
b/lib/jqueue.py | ||
---|---|---|
601 | 601 |
assert node_name != self._my_hostname |
602 | 602 |
|
603 | 603 |
# Clean queue directory on added node |
604 |
rpc.RpcRunner.call_jobqueue_purge(node_name) |
|
604 |
result = rpc.RpcRunner.call_jobqueue_purge(node_name) |
|
605 |
msg = result.RemoteFailMsg() |
|
606 |
if msg: |
|
607 |
logging.warning("Cannot cleanup queue directory on node %s: %s", |
|
608 |
node_name, msg) |
|
605 | 609 |
|
606 | 610 |
if not node.master_candidate: |
607 | 611 |
# remove if existing, ignoring errors |
... | ... | |
626 | 630 |
result = rpc.RpcRunner.call_jobqueue_update([node_name], |
627 | 631 |
[node.primary_ip], |
628 | 632 |
file_name, content) |
629 |
if not result[node_name]: |
|
630 |
logging.error("Failed to upload %s to %s", file_name, node_name) |
|
633 |
msg = result[node_name].RemoteFailMsg() |
|
634 |
if msg: |
|
635 |
logging.error("Failed to upload file %s to node %s: %s", |
|
636 |
file_name, node_name, msg) |
|
631 | 637 |
|
632 | 638 |
self._nodes[node_name] = node.primary_ip |
633 | 639 |
|
... | ... | |
664 | 670 |
success = [] |
665 | 671 |
|
666 | 672 |
for node in nodes: |
667 |
if result[node]: |
|
668 |
success.append(node) |
|
669 |
else: |
|
673 |
msg = result[node].RemoteFailMsg() |
|
674 |
if msg: |
|
670 | 675 |
failed.append(node) |
671 |
|
|
672 |
if failed: |
|
673 |
logging.error("%s failed on %s", failmsg, ", ".join(failed)) |
|
676 |
logging.error("RPC call %s failed on node %s: %s", |
|
677 |
result[node].call, node, msg) |
|
678 |
else: |
|
679 |
success.append(node) |
|
674 | 680 |
|
675 | 681 |
# +1 for the master node |
676 | 682 |
if (len(success) + 1) < len(failed): |
Also available in: Unified diff