diff --git a/blocksync.py b/blocksync.py index 1654395..018c2e6 100644 --- a/blocksync.py +++ b/blocksync.py @@ -291,11 +291,21 @@ def sync(workerid, srcdev, dsthost, dstdev, options): t0 = time.time() t_last = t0 + + if os.path.isfile("syncpos" + f.name.replace('/','___') + "_w_" + str(workerid) + "_of_" + str(options.workers)): + syncposlog = open("syncpos" + f.name.replace('/','___') + "_w_" + str(workerid) + "_of_" + str(options.workers),"r") + startpos = int(syncposlog.readline()) - blocksize + same_blocks = ( (startpos - (chunksize * workerid)) / blocksize ) + syncposlog.close() + print("[worker %d] Start on %d of previous run" % (workerid, startpos), file = options.outfile) + f.seek(startpos) + size_blocks = ceil(chunksize / float(blocksize)) p_in.write(bytes(("%d\n%d\n" % (startpos, size_blocks)).encode("UTF-8"))) p_in.flush() print("[worker %d] Start syncing %d blocks..." % (workerid, size_blocks), file = options.outfile) + for l_block in getblocks(f, blocksize): l1_sum = hash1(l_block).digest() r1_sum = p_out.read(hash1len) @@ -319,13 +329,12 @@ def sync(workerid, srcdev, dsthost, dstdev, options): p_in.flush() p_in.write(l_block) p_in.flush() - + syncposlog = open("syncpos" + f.name.replace('/','___') + "_w_" + str(workerid) + "_of_" + str(options.workers),"w+") + syncposlog.write(str(f.tell())) + syncposlog.close() if pause_ms: time.sleep(pause_ms) - if not interactive: - continue - t1 = float(time.time()) if (t1 - t_last) >= interval: done_blocks = same_blocks + diff_blocks @@ -335,7 +344,7 @@ def sync(workerid, srcdev, dsthost, dstdev, options): last_blocks = done_blocks t_last = t1 - if (same_blocks + diff_blocks) == size_blocks: + if (same_blocks + diff_blocks) >= size_blocks: break rate = size_blocks * blocksize / (1024.0 * 1024) / (time.time() - t0)