29 changes: 29 additions & 0 deletions conf/common.py
@@ -16,6 +16,7 @@
import struct
from collections import OrderedDict
import argparse
import threading

cetune_log_file = "../conf/cetune_process.log"
cetune_error_file = "../conf/cetune_error.log"
@@ -607,3 +608,31 @@ def parse_disk_format( disk_format_str ):
disk_type_list = disk_format_str.split(":")
return disk_type_list

class FuncThread(threading.Thread):
    # Shared class-level state:
    #   tlist  - threads currently running (entries removed as threads finish)
    #   atlist - every thread started, so callers can join them all
    #   evnt   - pulsed by run() when a slot frees up below maxthreads
    tlist = []
    atlist = []
    maxthreads = 8
    evnt = threading.Event()
    lck = threading.Lock()

    def __init__(self, method, *args):
        self._method = method
        self._args = args
        threading.Thread.__init__(self)

    def run(self):
        self._method(*self._args)
        FuncThread.lck.acquire()
        FuncThread.tlist.remove(self)
        # A slot just opened up; wake any caller blocked in evnt.wait().
        if len(FuncThread.tlist) == FuncThread.maxthreads - 1:
            FuncThread.evnt.set()
            FuncThread.evnt.clear()
        FuncThread.lck.release()

    @staticmethod
    def newthread(method, *args):
        FuncThread.lck.acquire()
        ft = FuncThread(method, *args)
        FuncThread.tlist.append(ft)
        FuncThread.atlist.append(ft)
        FuncThread.lck.release()
        ft.start()
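
A minimal usage sketch of the helper above, assuming a hypothetical worker function do_one_step and a hypothetical work_items list (this mirrors the throttling pattern make_osds adopts below): the caller checks tlist against maxthreads under lck, waits on evnt when the pool is full, and finally joins everything through atlist.

def do_one_step(host, device):
    pass  # hypothetical worker, e.g. one remote mkfs invocation

FuncThread.maxthreads = 4                      # cap on concurrently running threads
for host, device in work_items:                # work_items is an assumed list of (host, device) pairs
    FuncThread.lck.acquire()
    if len(FuncThread.tlist) >= FuncThread.maxthreads:
        FuncThread.lck.release()
        FuncThread.evnt.wait()                 # block until run() signals a free slot
    else:
        FuncThread.lck.release()
    FuncThread.newthread(do_one_step, host, device)
for t in FuncThread.atlist:
    t.join()                                   # wait for every spawned thread to finish
FuncThread.tlist = []                          # reset shared state for the next batch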
1 change: 1 addition & 0 deletions conf/handler.py
@@ -200,6 +200,7 @@ def list_required_config(self):
required_list["cluster"]["list_mon"] = ""
required_list["cluster"]["enable_rgw"] = "false"
required_list["cluster"]["disk_format"] = "osd:journal"
required_list["cluster"]["parallel_deploy"] = "true"
required_list["ceph_hard_config"] = OrderedDict()
required_list["ceph_hard_config"]["public_network"] = ""
required_list["ceph_hard_config"]["cluster_network"] = ""
1 change: 1 addition & 0 deletions conf/help.conf
@@ -46,5 +46,6 @@ cosbench_controller = A host runs cosbench controller on, this node should be ab
cosbench_version = set to v0.4.c2 by default; when running a cosbench benchmark test, this cosbench version will be retrieved from github
custom_script = if benchmark_driver is set to 'hook', the user can add a '*.bash/*.sh' script here, and CeTune will run this script at runtime. Ex: "bash hook.bash"
disk_format = disk type and sequence of ${hostname}.
parallel_deploy = true/false; whether to deploy the ceph cluster in parallel.
disable_tuning_check = if true, cetune will not check ceph.conf before benchmark.
distributed_data_process = if true, cetune will distribute the data processing work to all nodes and summarize on the controller node. This option requires running controller_dependencies_install.py on all cetune workers.
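
For illustration, a hedged fragment of the cluster configuration that enables the new behaviour (key names follow help.conf and the defaults added in handler.py; the exact configuration file, e.g. conf/all.conf, depends on the local setup):

disk_format = osd:journal
parallel_deploy = true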
45 changes: 39 additions & 6 deletions deploy/mod/deploy.py
@@ -454,7 +454,7 @@ def cal_cephmap_diff(self, ceph_disk=False):

return cephconf_dict

def redeploy(self, gen_cephconf, ceph_disk=False):
def redeploy(self, gen_cephconf, ceph_disk=False, parallel_deploy=False):
common.printout("LOG","ceph.conf file generated")
if self.cluster["clean_build"] == "true":
clean_build = True
@@ -477,7 +477,7 @@ def redeploy(self, gen_cephconf, ceph_disk=False):
self.make_mon()
common.printout("LOG","Succeeded in building mon daemon")
common.printout("LOG","Started to build osd daemon")
self.make_osds(ceph_disk=ceph_disk)
self.make_osds(ceph_disk=ceph_disk, parallel_deploy=parallel_deploy)
common.printout("LOG","Succeeded in building osd daemon")
common.bash("cp -f ../conf/ceph.conf ../conf/ceph_current_status")

@@ -560,7 +560,7 @@ def distribute_conf(self):
common.pdsh(user, [node], "rm -rf /etc/ceph/ceph.conf")
common.scp(user, node, "../conf/ceph.conf", "/etc/ceph/")

def make_osds(self, osds=None, diff_map=None, ceph_disk=False):
def make_osds(self, osds=None, diff_map=None, ceph_disk=False, parallel_deploy=False):
user = self.cluster["user"]
if osds==None:
osds = sorted(self.cluster["osds"])
@@ -575,20 +575,53 @@ def make_osds(self, osds=None, diff_map=None, ceph_disk=False):
for line in mount_list_tmp.split('\n'):
tmp = line.split()
mount_list[node][tmp[0]] = tmp[2]

opts_mkfs = {}
opts = []
maxlen_od = 0
for osd in osds:
opts_mkfs[osd] = []
maxlen_od = len(diff_map[osd]) if len(diff_map[osd]) > maxlen_od else maxlen_od
for device_bundle_tmp in diff_map[osd]:
device_bundle = common.get_list(device_bundle_tmp)
osd_device = device_bundle[0][0]
journal_device = None
if self.cluster["ceph_conf"]["global"]["osd_objectstore"] == "filestore":
journal_device = device_bundle[0][1]
if not ceph_disk:
self.make_osd_fs( osd, osd_num, osd_device, journal_device, mount_list )
self.make_osd( osd, osd_num, osd_device, journal_device )
if parallel_deploy:
opts_mkfs[osd].append((osd, osd_num, osd_device, journal_device, mount_list))
opts.append((osd, osd_num, osd_device, journal_device))
else:
self.make_osd_fs( osd, osd_num, osd_device, journal_device, mount_list )
self.make_osd( osd, osd_num, osd_device, journal_device )
else:
self.make_osd_ceph_disk_prepare(osd, osd_device, journal_device, mount_list)
self.make_osd_ceph_disk_activate(osd, osd_device)
osd_num = osd_num+1
osd_num = osd_num + 1
if parallel_deploy:
common.printout("LOG", "==============begin mkfs in parallel ==============")
FuncThread = common.FuncThread
FuncThread.maxthreads = 5 * len(opts_mkfs.keys())
for index in range(maxlen_od):
for key in sorted(opts_mkfs.keys()):
if index >= len(opts_mkfs[key]) or len(opts_mkfs[key][index]) < 5:
continue
FuncThread.lck.acquire()
if len(FuncThread.tlist) >= FuncThread.maxthreads:
FuncThread.lck.release()
FuncThread.evnt.wait()
else:
FuncThread.lck.release()
FuncThread.newthread(self.make_osd_fs, opts_mkfs[key][index][0], opts_mkfs[key][index][1], opts_mkfs[key][index][2], opts_mkfs[key][index][3], opts_mkfs[key][index][4])
for t in FuncThread.atlist:
t.join()
FuncThread.tlist = []
common.printout("LOG", "===============end mkfs in parallel ===============")

for opt in opts:
if len(opt) == 4:
self.make_osd(opt[0], opt[1], opt[2], opt[3])

def make_osd_ceph_disk_prepare(self, osd, osd_device, journal_device, mount_list):
"""
4 changes: 2 additions & 2 deletions deploy/mod/deploy_rgw.py
@@ -25,10 +25,10 @@ def __init__(self, tunings=""):
self.cluster["proxy"] = self.all_conf_data.get("cosbench_controller_proxy")
self.cluster["auth_url"] = "http://%s/auth/v1.0;retry=9" % self.cluster["rgw"][0]

def redeploy(self, gen_cephconf, ceph_disk=False):
def redeploy(self, gen_cephconf, ceph_disk=False, parallel_deploy=False):
self.map_diff = self.cal_cephmap_diff()
rgw_nodes = self.map_diff["radosgw"]
super(self.__class__, self).redeploy(gen_cephconf, ceph_disk=False)
super(self.__class__, self).redeploy(gen_cephconf, ceph_disk=False, parallel_deploy=parallel_deploy)
self.rgw_dependency_install()
self.rgw_install()
self.gen_cephconf(ceph_disk=ceph_disk)
10 changes: 7 additions & 3 deletions deploy/run_deploy.py
@@ -33,6 +33,11 @@ def main(args):
default = False,
action='store_true'
)
parser.add_argument(
'--parallel_deploy',
default=False,
action='store_true'
)
parser.add_argument(
'--gen_cephconf',
default = False,
@@ -53,9 +58,8 @@ def main(args):
mydeploy = deploy.Deploy()
if args.with_rgw:
mydeploy = deploy_rgw.Deploy_RGW()
# mydeploy.deploy()
mydeploy.redeploy(args.gen_cephconf,
ceph_disk=args.ceph_disk)

mydeploy.redeploy(args.gen_cephconf, ceph_disk=args.ceph_disk, parallel_deploy=args.parallel_deploy)

if args.operation == "restart":
mydeploy = deploy.Deploy()
14 changes: 7 additions & 7 deletions workflow/workflow.py
@@ -27,6 +27,7 @@ def __init__(self):
self.cluster["mons"] = self.all_conf_data.get_list("list_mon")
self.cluster["rgw"] = self.all_conf_data.get_list("rgw_server")
self.cluster["rgw_enable"] = self.all_conf_data.get("enable_rgw")
self.cluster["parallel_deploy"] = self.all_conf_data.get("parallel_deploy")
self.cluster["disable_tuning_check"] = self.all_conf_data.get("disable_tuning_check")
self.cluster["osd_daemon_num"] = 0
for osd in self.cluster["osds"]:
@@ -46,21 +47,20 @@ def run(self):
controller = self.cluster["head"]
osds = self.cluster["osds"]
pwd = os.path.abspath(os.path.join('..'))
run_deploy_args = ['redeploy']
if len(self.cluster["rgw"]) and self.cluster["rgw_enable"]=="true":
with_rgw = True
else:
with_rgw = False
run_deploy_args.append('--with_rgw')
if not (self.cluster.has_key("parallel_deploy") and self.cluster["parallel_deploy"]=="false"):
run_deploy_args.append('--parallel_deploy')

for section in self.worksheet:
for work in self.worksheet[section]['workstages'].split(','):
if work == "deploy":
common.printout("LOG","Check ceph version, reinstall ceph if necessary")
tuner.main(['--section', section, 'apply_version'])
tuner.main(['--section', section, '--no_check', 'apply_tuning'])
common.printout("LOG","Start to redeploy ceph")
if with_rgw:
run_deploy.main(['--with_rgw','redeploy'])
else:
run_deploy.main(['redeploy'])
run_deploy.main(run_deploy_args)
tuner.main(['--section', section, 'apply_tuning'])
elif work == "benchmark":
if not common.check_ceph_running( user, controller ):