Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 109 additions & 65 deletions solver.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
# python >= 3.7

from lattice import *
from partt import *
from rm_solver import RMSolver
Expand All @@ -14,6 +14,7 @@
import logging
import unittest


class PartitionDerivationSolver:
def __init__(self, partt=SequentialPartition(), lat=TwoPointLattice(), timeout=3):
assert isinstance(lat, Lattice)
Expand All @@ -40,7 +41,7 @@ def time_check(self):
return True
return False

def sound_derive(self, pconset:List[PCon], p):
def sound_derive(self, pconset: List[PCon], p):
conset = []
for pcon in pconset:
if self.time_check():
Expand All @@ -50,7 +51,7 @@ def sound_derive(self, pconset:List[PCon], p):
# print("Sound Derive for {0}: {1}".format(p, pretty_conset_print(conset)))
return conset

def complete_derive(self, pconset:List[PCon], p):
def complete_derive(self, pconset: List[PCon], p):
conset = []
for pcon in pconset:
if self.time_check():
Expand All @@ -60,7 +61,7 @@ def complete_derive(self, pconset:List[PCon], p):
# print("Complete Derive for {0}: {1}".format(p, pretty_conset_print(conset)))
return conset

def solve_one_shot(self, pconset:List[PCon]):
def solve_one_shot(self, pconset: List[PCon]):
done = {}
self.partt.init_partt(pconset)
self.partt.one_shot_partt()
Expand All @@ -73,15 +74,17 @@ def solve_one_shot(self, pconset:List[PCon]):
return False
if globals.DEBUG:
print(".")
if self.rm.solve(self.sound_derive(pconset, predicate)): # sound == complete, either is good
if self.rm.solve(
self.sound_derive(pconset, predicate)
): # sound == complete, either is good
done[predicate] = self.rm.solution
else:
self.unsolved = [predicate]
return False
self.solution = done
return True

def solve_early_accept(self, pconset:List[PCon])->bool:
def solve_early_accept(self, pconset: List[PCon]) -> bool:
done = {}
self.partt.init_partt(pconset)
while not self.partt.stop_partt:
Expand Down Expand Up @@ -110,7 +113,7 @@ def solve_early_accept(self, pconset:List[PCon])->bool:
self.unsolved = self.partt.parttset
return False

def solve_early_reject(self, pconset:List[PCon])->bool:
def solve_early_reject(self, pconset: List[PCon]) -> bool:
done = {}
self.partt.init_partt(pconset)
while not self.partt.stop_partt:
Expand All @@ -135,7 +138,7 @@ def solve_early_reject(self, pconset:List[PCon])->bool:
self.solution = done
return True

def solve_hybrid(self, pconset:List[PCon])->bool:
def solve_hybrid(self, pconset: List[PCon]) -> bool:
done = {}
self.partt.init_partt(pconset)
while not self.partt.stop_partt:
Expand Down Expand Up @@ -167,17 +170,24 @@ def solve_hybrid(self, pconset:List[PCon])->bool:
# If reach here: the partition algorithm does not reach an equivalent state at the last iteration
raise PartitionNotEquivalentAtEndError

def solve_constraint(self, pconset:List[PCon], approach=globals.EARLY_ACCETP_APPROACH)->bool:
def solve_constraint(
self, pconset: List[PCon], approach=globals.EARLY_ACCETP_APPROACH
) -> bool:
self.partt.z3.init_pconset(pconset)

print(self.partt.__class__.__name__ + " Partition, " + globals.Approach[approach] + " Approach:")
print(
self.partt.__class__.__name__
+ " Partition, "
+ globals.Approach[approach]
+ " Approach:"
)

solve = False
for app in globals.Approach.keys():
if approach == app:
t_start = time.time()
self.set_stop_time(t_start + 60*self.timeout)
solve = getattr(self, "solve_"+globals.Approach[app].lower())(pconset)
self.set_stop_time(t_start + 60 * self.timeout)
solve = getattr(self, "solve_" + globals.Approach[app].lower())(pconset)
self.time = time.time() - t_start
break

Expand All @@ -194,7 +204,13 @@ def solve_constraint(self, pconset:List[PCon], approach=globals.EARLY_ACCETP_APP
def pretty_solution(self):
msg = "{\n"
for predicate in self.solution:
msg += "\t" + predicate + " => " + RMSolver.pretty_solution(self.solution[predicate]) + "\n"
msg += (
"\t"
+ predicate
+ " => "
+ RMSolver.pretty_solution(self.solution[predicate])
+ "\n"
)
msg += "}"
return msg

Expand All @@ -204,68 +220,96 @@ def test_solver_combination(self):
solver = PartitionDerivationSolver(CombinationPartition())
parser = CoreConstraintParser()
for i in globals.Approach.keys():
self.assertTrue(solver.solve_constraint(parser.parse('''True => L <: ax And az <: ax; dddd>0 => H <: ay; Not(dddd>0) => L <: ay;
dddd>0 => H<: ay; dddd<0=> ay<:ax; True=>ax<:L'''), i))
self.assertTrue(
solver.solve_constraint(
parser.parse(
"""True => L <: ax And az <: ax; dddd>0 => H <: ay; Not(dddd>0) => L <: ay;
dddd>0 => H<: ay; dddd<0=> ay<:ax; True=>ax<:L"""
),
i,
)
)
for i in globals.Approach.keys():
self.assertFalse(solver.solve_constraint(parser.parse('''a>0=>H<:a And H<:c; a<0 => a<:L And L <: b; b>0=>H <:b; And(bbbb<0, (dddd<0)) => b<:L;
c>0=>H<:c And L<:a; c<0=>c<:L'''), i))
self.assertFalse(
solver.solve_constraint(
parser.parse(
"""a>0=>H<:a And H<:c; a<0 => a<:L And L <: b; b>0=>H <:b; And(bbbb<0, (dddd<0)) => b<:L;
c>0=>H<:c And L<:a; c<0=>c<:L"""
),
i,
)
)

def test_solver_sequential(self):
solver = PartitionDerivationSolver(SequentialPartition())
parser = CoreConstraintParser()
for i in globals.Approach.keys():
self.assertTrue(solver.solve_constraint(parser.parse('''True => L <: ax And az <: ax; d>0 => H <: ay; Not(d>0) => L <: ay;
d>0 => H<: ay; d<0=> ay<:ax; True=>ax<:L'''), i))
self.assertTrue(
solver.solve_constraint(
parser.parse(
"""True => L <: ax And az <: ax; d>0 => H <: ay; Not(d>0) => L <: ay;
d>0 => H<: ay; d<0=> ay<:ax; True=>ax<:L"""
),
i,
)
)
for i in globals.Approach.keys():
self.assertFalse(solver.solve_constraint(parser.parse('''a>0=>H<:a And H<:c; a<0 => a<:L And L <: b; b>0=>H <:b; And(b<0, (d<0)) => b<:L;
c>0=>H<:c And L<:a; c<0=>c<:L'''), i))
self.assertFalse(
solver.solve_constraint(
parser.parse(
"""a>0=>H<:a And H<:c; a<0 => a<:L And L <: b; b>0=>H <:b; And(b<0, (d<0)) => b<:L;
c>0=>H<:c And L<:a; c<0=>c<:L"""
),
i,
)
)


if __name__ == '__main__':
# if len(sys.argv) <= 1:
# print ("Usage: test_solver.py [test_file] -op [op]")
# print ("\t [test_file]: test file .con; use \'all\' to run all the file under tests/")
# print ("\t -partt [0-1]: -partt [0-1]: specifying partition algorithm: 0=sequential,")
# print ("\t\t\t1=combinational; if not specified, it tests all the partt ")
# print ("\t\t\talgorithms")
# print ("\t -appr [0-3]: specifying approaches: 0=hybrid, 1=early-accept, 2=one-shot,")
# print ("\t\t\t3=early-reject; if not specified, it tests [0,1,2] approaches.")
# print ("\t -time i : specifying time-out minutes; default i=3 min.")
# print ("\t -debug [0-1] : print debug message; default 1 with debug message on")
if __name__ == "__main__":
# if len(sys.argv) <= 1:
# print ("Usage: test_solver.py [test_file] -op [op]")
# print ("\t [test_file]: test file .con; use \'all\' to run all the file under tests/")
# print ("\t -partt [0-1]: -partt [0-1]: specifying partition algorithm: 0=sequential,")
# print ("\t\t\t1=combinational; if not specified, it tests all the partt ")
# print ("\t\t\talgorithms")
# print ("\t -appr [0-3]: specifying approaches: 0=hybrid, 1=early-accept, 2=one-shot,")
# print ("\t\t\t3=early-reject; if not specified, it tests [0,1,2] approaches.")
# print ("\t -time i : specifying time-out minutes; default i=3 min.")
# print ("\t -debug [0-1] : print debug message; default 1 with debug message on")

# else:
# file_name = output_file_name("Test1")
# # print file_name
# # try:
# partt = globals.ParttAlg.keys()
# appr = globals.Approach.keys()
# #appr.pop(globals.EARLY_REJECT_APPROACH)

# for i in range(2, len(sys.argv)):
# if sys.argv[i] == "-partt":
# partt = [int(sys.argv[i+1])]
# print (partt)
# elif sys.argv[i] == "-appr":
# appr = [int(sys.argv[i+1])]
# elif sys.argv[i] == "-time":
# globals.STOP_MIN = float(sys.argv[i+1])
# elif sys.argv[i] == "-debug":
# globals.DEBUG = int(sys.argv[i+1])
# else:
# file_name = output_file_name("Test1")
# # print file_name
# # try:
# partt = globals.ParttAlg.keys()
# appr = globals.Approach.keys()
# #appr.pop(globals.EARLY_REJECT_APPROACH)

# file_test = sys.argv[1]
# if sys.argv[1] == "all":
# file_test = [TEST_DIR + file for file in os.listdir(TEST_DIR)]
# for test_file_name in file_test:
# test_file(test_file_name, partt, appr)
# else:
# test_file(file_test, partt, appr)
# for i in range(2, len(sys.argv)):
# if sys.argv[i] == "-partt":
# partt = [int(sys.argv[i+1])]
# print (partt)
# elif sys.argv[i] == "-appr":
# appr = [int(sys.argv[i+1])]
# elif sys.argv[i] == "-time":
# globals.STOP_MIN = float(sys.argv[i+1])
# elif sys.argv[i] == "-debug":
# globals.DEBUG = int(sys.argv[i+1])

# # save result:
# if files:
# store_result_file(file_name, files, number, partt_alg, perform)
# file_test = sys.argv[1]
# if sys.argv[1] == "all":
# file_test = [TEST_DIR + file for file in os.listdir(TEST_DIR)]
# for test_file_name in file_test:
# test_file(test_file_name, partt, appr)
# else:
# test_file(file_test, partt, appr)

# if sys.argv[1] == 'all':
# plot_line_chart(file_name + plt_ext, partt, appr, number, partt_alg, perform)
unittest.main(argv=['first-arg-is-ignored'], exit=False, verbosity=2)
#unittest.main()
#unittest.main(verbosity=2)
# # save result:
# if files:
# store_result_file(file_name, files, number, partt_alg, perform)

# if sys.argv[1] == 'all':
# plot_line_chart(file_name + plt_ext, partt, appr, number, partt_alg, perform)
unittest.main(argv=["first-arg-is-ignored"], exit=False, verbosity=2)
# unittest.main()
# unittest.main(verbosity=2)
Loading