Revision 23752136 lib/jqueue.py

b/lib/jqueue.py
290 290
    assert self._last_serial is not None, ("Serial file was modified between"
291 291
                                           " check in jstore and here")
292 292

  
293
    # Get initial list of nodes
294
    self._nodes = self.context.cfg.GetNodeList()
295

  
296
    # TODO: Check consistency across nodes
297

  
293 298
    # Setup worker pool
294 299
    self._wpool = _JobQueueWorkerPool(self)
295 300

  
......
314 319
    finally:
315 320
      self.release()
316 321

  
322
  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Stores a file on this node and uploads it to the other nodes.

    The file is written locally first. It is then uploaded via RPC to every
    node of the cached node list except this host. Failed uploads are
    logged and counted, but nothing acts on the count yet.

    """
    utils.WriteFile(file_name, data=data)

    # Operate on a copy so the cached node list is left untouched
    targets = self._nodes[:]

    # This host already has the file, so it must not be an upload target
    if self._my_hostname in targets:
      targets.remove(self._my_hostname)

    upload_result = rpc.call_upload_file(targets, file_name)

    failed_nodes = 0
    for target in targets:
      if upload_result[target]:
        continue
      failed_nodes += 1
      logging.error("Copy of job queue file to node %s failed", target)

    # TODO: check failed_nodes
344

  
317 345
  def _FormatJobID(self, job_id):
318 346
    if not isinstance(job_id, (int, long)):
319 347
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
......
334 362
    serial = self._last_serial + 1
335 363

  
336 364
    # Write to file
337
    utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
338
                    data="%s\n" % serial)
365
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
366
                                        "%s\n" % serial)
339 367

  
340 368
    # Keep it only if we were able to write the file
341 369
    self._last_serial = serial
342 370

  
343
    # Distribute the serial to the other nodes
344
    try:
345
      nodes.remove(self._my_hostname)
346
    except ValueError:
347
      pass
348

  
349
    result = rpc.call_upload_file(nodes, constants.JOB_QUEUE_SERIAL_FILE)
350
    for node in nodes:
351
      if not result[node]:
352
        logging.error("copy of job queue file to node %s failed", node)
353

  
354 371
    return self._FormatJobID(serial)
355 372

  
356 373
  @staticmethod
......
450 467
  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    """Serializes a job and stores its job file locally and on other nodes.

    The job is dumped to JSON exactly once. Writing is delegated to
    _WriteAndReplicateFileUnlocked, which performs the local write itself
    before uploading the file, so no separate local write is done here.
    Afterwards the job cache is cleaned, with this job's ID passed as the
    exclusion list.

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    # Single write path: local write plus replication in one helper
    self._WriteAndReplicateFileUnlocked(filename, data)
    self._CleanCacheUnlocked([job.id])
457 474

  
458 475
  def _CleanCacheUnlocked(self, exclude):

Also available in: Unified diff