+ @staticmethod
+ def _FormatSubmitError(msg, ops):
+ """Formats errors which occurred while submitting a job.
+
+ """
+ return ("%s; opcodes %s" %
+ (msg, utils.CommaJoin(op.Summary() for op in ops)))
+
+ @staticmethod
+ def _ResolveJobDependencies(resolve_fn, deps):
+ """Resolves relative job IDs in dependencies.
+
+ @type resolve_fn: callable
+ @param resolve_fn: Function to resolve a relative job ID
+ @type deps: list
+ @param deps: Dependencies
+ @rtype: list
+ @return: Resolved dependencies
+
+ """
+ result = []
+
+ for (dep_job_id, dep_status) in deps:
+ if ht.TRelativeJobId(dep_job_id):
+ assert ht.TInt(dep_job_id) and dep_job_id < 0
+ try:
+ job_id = resolve_fn(dep_job_id)
+ except IndexError:
+ # Abort
+ return (False, "Unable to resolve relative job ID %s" % dep_job_id)
+ else:
+ job_id = dep_job_id
+
+ result.append((job_id, dep_status))
+
+ return (True, result)
+
+ def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
+ """Create and store multiple jobs.
+
+ @see: L{_SubmitJobUnlocked}
+
+ """
+ results = []
+ added_jobs = []
+
+ def resolve_fn(job_idx, reljobid):
+ assert reljobid < 0
+ return (previous_job_ids + job_ids[:job_idx])[reljobid]
+
+ for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
+ for op in ops:
+ if getattr(op, opcodes.DEPEND_ATTR, None):
+ (status, data) = \
+ self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
+ op.depends)
+ if not status:
+ # Abort resolving dependencies
+ assert ht.TNonEmptyString(data), "No error message"
+ break
+ # Use resolved dependencies
+ op.depends = data
+ else:
+ try:
+ job = self._SubmitJobUnlocked(job_id, ops)
+ except errors.GenericError, err:
+ status = False
+ data = self._FormatSubmitError(str(err), ops)
+ else:
+ status = True
+ data = job_id
+ added_jobs.append(job)
+
+ results.append((status, data))
+
+ return (results, added_jobs)
+
+ @locking.ssynchronized(_LOCK)