Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 61 additions & 24 deletions blocksync.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/bin/env python2
#!/usr/bin/env python3
"""
Synchronise block devices over the network

Expand Down Expand Up @@ -54,14 +54,17 @@
else:
USE_NOREUSE = USE_DONTNEED = False

def do_create(f, size):
f = open(f, 'a', 0)
def wrap(x):
return ("'%s'" % x)

def do_create(dev, size):
f = open(dev, 'wb', 0)
f.truncate(size)
f.close()


def do_open(f, mode):
f = open(f, mode)
def do_open(dev, mode):
f = open(dev, mode)
if USE_NOREUSE:
fadvise(f, 0, 0, POSIX_FADV_NOREUSE)
f.seek(0, 2)
Expand Down Expand Up @@ -106,8 +109,19 @@ def server(dev, deleteonexit, options):
if size > 0:
do_create(dev, size)

print(dev, blocksize)
f, size = do_open(dev, 'rb+')
print(dev)
print(blocksize)
try:
if options.reverse:
f, size = do_open(dev, 'rb')
else:
f, size = do_open(dev, 'rb+')

except Exception as e:
# Error accessing file
print("-1")
return

print(size)
sys.stdout.flush()

Expand All @@ -130,12 +144,16 @@ def server(dev, deleteonexit, options):
stdout.flush()
res = stdin.read(COMPLEN)
if res == DIFF:
newblock = stdin.read(blocksize)
newblocklen = len(newblock)
f.seek(-newblocklen, 1)
f.write(newblock)
if USE_DONTNEED:
fadvise(f, f.tell() - newblocklen, newblocklen, POSIX_FADV_DONTNEED)
if options.reverse:
stdout.write(block)
stdout.flush()
else:
newblock = stdin.read(blocksize)
newblocklen = len(newblock)
f.seek(-newblocklen, 1)
f.write(newblock)
if USE_DONTNEED:
fadvise(f, f.tell() - newblocklen, newblocklen, POSIX_FADV_DONTNEED)
if i == maxblock:
break

Expand Down Expand Up @@ -185,7 +203,10 @@ def sync(workerid, srcdev, dsthost, dstdev, options):
print("[worker %d] Local fadvise: %s" % (workerid, fadv), file = options.outfile)

try:
f, size = do_open(srcdev, 'rb')
if options.reverse:
f, size = do_open(srcdev, 'rb+')
else:
f, size = do_open(srcdev, 'rb')
except Exception as e:
print("[worker %d] Error accessing source device! %s" % (workerid, e), file = options.outfile)
sys.exit(1)
Expand Down Expand Up @@ -237,12 +258,15 @@ def sync(workerid, srcdev, dsthost, dstdev, options):
servercmd = 'tmpserver'
remotescript = copy_self(workerid, cmd)

cmd += [options.interpreter, remotescript, servercmd, dstdev, '-b', str(blocksize)]
cmd += [options.interpreter, remotescript, servercmd, wrap(dstdev), '-b', str(blocksize)]

cmd += ['-d', str(options.fadvise), '-1', options.hash]
if options.addhash:
cmd += ['-2', options.addhash]

if options.reverse:
cmd += ['-R']

print("[worker %d] Running: %s" % (workerid, " ".join(cmd[2 if options.passenv and (dsthost != 'localhost') else 0:])), file = options.outfile)

p = subprocess.Popen(cmd, bufsize=0, stdin=subprocess.PIPE, stdout=subprocess.PIPE, close_fds=True)
Expand All @@ -257,16 +281,18 @@ def sync(workerid, srcdev, dsthost, dstdev, options):
fadv = p_out.readline().decode('UTF-8').strip()
print("[worker %d] Remote fadvise: %s" % (workerid, fadv), file = options.outfile)

p_in.write(bytes(("%d\n" % (size if options.createdest else 0)).encode("UTF-8")))
p_in.write(bytes(("%d\n" % (size if options.createdest and not dryrun else 0)).encode("UTF-8")))
p_in.flush()

line = p_out.readline().decode('UTF-8')
# line = p_out.readline().decode('UTF-8')
a = p_out.readline().decode('UTF-8').strip()
b = p_out.readline().decode('UTF-8').strip()
p.poll()
if p.returncode is not None:
print("[worker %d] Failed creating destination file on the remote host!" % workerid, file = options.outfile)
sys.exit(1)

a, b = line.split()
# a, b = line.split()
if a != dstdev:
print("[worker %d] Dest device (%s) doesn't match with the remote host (%s)!" % (workerid, dstdev, a), file = options.outfile)
sys.exit(1)
Expand All @@ -280,6 +306,9 @@ def sync(workerid, srcdev, dsthost, dstdev, options):
print("[worker %d] Error accessing device on remote host!" % workerid, file = options.outfile)
sys.exit(1)
remote_size = int(line)
if remote_size < 0:
print("[worker %d] Remote device doesn't exists!" % workerid, file = options.outfile)
sys.exit(1)
if size > remote_size:
print("[worker %d] Source device size (%d) doesn't fit into remote device size (%d)!" % (workerid, size, remote_size), file = options.outfile)
sys.exit(1)
Expand Down Expand Up @@ -317,9 +346,16 @@ def sync(workerid, srcdev, dsthost, dstdev, options):
else:
p_in.write(DIFF)
p_in.flush()
p_in.write(l_block)
p_in.flush()

if options.reverse:
newblock = p_out.read(blocksize)
newblocklen = len(newblock)
f.seek(-newblocklen, 1)
f.write(newblock)
if USE_DONTNEED:
fadvise(f, f.tell() - newblocklen, newblocklen, POSIX_FADV_DONTNEED)
else:
p_in.write(l_block)
p_in.flush()
if pause_ms:
time.sleep(pause_ms)

Expand Down Expand Up @@ -355,16 +391,17 @@ def sync(workerid, srcdev, dsthost, dstdev, options):
parser.add_option("-2", "--additionalhash", dest = "addhash", help = "second hash used for extra comparison (default is none)")
parser.add_option("-d", "--fadvise", dest = "fadvise", type = "int", help = "lower cache pressure by using posix_fadivse (requires Python 3 or python-fadvise; 0 = off, 1 = local on, 2 = remote on, 3 = both on; defaults to 3)", default = 3)
parser.add_option("-p", "--pause", dest = "pause", type="int", help = "pause between processing blocks, reduces system load (ms, defaults to 0)", default = 0)
parser.add_option("-c", "--cipher", dest = "cipher", help = "cipher specification for SSH (defaults to blowfish)", default = "blowfish")
parser.add_option("-C", "--compress", dest = "compress", action = "store_true", help = "enable compression over SSH (defaults to on)", default = True)
parser.add_option("-c", "--cipher", dest = "cipher", help = "cipher specification for SSH (defaults to aes128-ctr)", default = "aes128-ctr")
parser.add_option("-N", "--nocompress", dest = "compress", action = "store_false", help = "enable compression over SSH (defaults to on)", default = True)
parser.add_option("-R", "--reverse", dest = "reverse", action = "store_true", help = "Remote location to local", default = False)
parser.add_option("-i", "--id", dest = "keyfile", help = "SSH public key file")
parser.add_option("-P", "--pass", dest = "passenv", help = "environment variable containing SSH password (requires sshpass)")
parser.add_option("-s", "--sudo", dest = "sudo", action = "store_true", help = "use sudo on the remote end (defaults to off)", default = False)
parser.add_option("-x", "--extraparams", dest = "sshparams", help = "additional parameters to pass to SSH")
parser.add_option("-n", "--dryrun", dest = "dryrun", action = "store_true", help = "do a dry run (don't write anything, just report differences)", default = False)
parser.add_option("-T", "--createdest", dest = "createdest", action = "store_true", help = "create destination file using truncate(2). Should be safe for subsequent syncs as truncate only modifies the file when the size differs", default = False)
parser.add_option("-S", "--script", dest = "script", help = "location of script on remote host (otherwise current script is sent over)")
parser.add_option("-I", "--interpreter", dest = "interpreter", help = "[full path to] interpreter used to invoke remote server (defaults to python2)", default = "python2")
parser.add_option("-I", "--interpreter", dest = "interpreter", help = "[full path to] interpreter used to invoke remote server (defaults to python3)", default = "python3")
parser.add_option("-t", "--interval", dest = "interval", type = "int", help = "interval between stats output (seconds, defaults to 1)", default = 1)
parser.add_option("-o", "--output", dest = "outfile", help = "send output to file instead of console")
parser.add_option("-f", "--force", dest = "force", action= "store_true", help = "force sync and DO NOT ask for confirmation if the destination file already exists")
Expand Down