From 73ccf80d672f29bcdf0739e4dd1ee92198e6acd9 Mon Sep 17 00:00:00 2001
From: "Zhaowangx.Liang"
Date: Mon, 13 Mar 2017 15:31:53 +0800
Subject: [PATCH] Process make_osd_fs asynchronously

Signed-off-by: Zhaowangx.Liang
---
 conf/common.py           | 29 ++++++++++++++++++++++++++
 conf/handler.py          |  1 +
 conf/help.conf           |  1 +
 deploy/mod/deploy.py     | 45 ++++++++++++++++++++++++++++++++++------
 deploy/mod/deploy_rgw.py |  4 ++--
 deploy/run_deploy.py     | 10 ++++++---
 workflow/workflow.py     | 14 ++++++-------
 7 files changed, 86 insertions(+), 18 deletions(-)

diff --git a/conf/common.py b/conf/common.py
index c1ef073..af44292 100644
--- a/conf/common.py
+++ b/conf/common.py
@@ -16,6 +16,7 @@ import struct
 from collections import OrderedDict
 import argparse
+import threading
 
 cetune_log_file = "../conf/cetune_process.log"
 cetune_error_file = "../conf/cetune_error.log"
 
@@ -607,3 +608,31 @@ def parse_disk_format( disk_format_str ):
     disk_type_list = disk_format_str.split(":")
     return disk_type_list
 
+class FuncThread(threading.Thread):
+    tlist = []
+    atlist = []
+    maxthreads = 8
+    evnt = threading.Event()
+    lck = threading.Lock()
+    def __init__(self, method, *args):
+        self._method = method
+        self._args = args
+        threading.Thread.__init__(self)
+
+    def run(self):
+        self._method(*self._args)
+        FuncThread.lck.acquire()
+        FuncThread.tlist.remove(self)
+        if len(FuncThread.tlist) == FuncThread.maxthreads - 1:
+            FuncThread.evnt.set()
+            FuncThread.evnt.clear()
+        FuncThread.lck.release()
+
+    @staticmethod
+    def newthread(method, *args):
+        FuncThread.lck.acquire()
+        ft = FuncThread(method, *args)
+        FuncThread.tlist.append(ft)
+        FuncThread.atlist.append(ft)
+        FuncThread.lck.release()
+        ft.start()
diff --git a/conf/handler.py b/conf/handler.py
index facb3db..80b97fa 100644
--- a/conf/handler.py
+++ b/conf/handler.py
@@ -200,6 +200,7 @@ def list_required_config(self):
         required_list["cluster"]["list_mon"] = ""
         required_list["cluster"]["enable_rgw"] = "false"
         required_list["cluster"]["disk_format"] = "osd:journal"
+        required_list["cluster"]["parallel_deploy"] = "true"
         required_list["ceph_hard_config"] = OrderedDict()
         required_list["ceph_hard_config"]["public_network"] = ""
         required_list["ceph_hard_config"]["cluster_network"] = ""
diff --git a/conf/help.conf b/conf/help.conf
index b440f85..09b69c7 100644
--- a/conf/help.conf
+++ b/conf/help.conf
@@ -46,5 +46,6 @@ cosbench_controller = A host runs cosbench controller on, this node should be ab
 cosbench_version = set to v0.4.c2 by default, when run a cosbench benchmark test, this cosbench version will be retained from github
 custom_script = if benchmark_driver set to 'hook', user can add a '*.bash/*.sh' here, and CeTune will run this script at runtime. Ex: "bash hook.bash"
 disk_format = disk type and sequence of ${hostname}.
+parallel_deploy = true/false; whether to deploy ceph cluster in parallel.
 disable_tuning_check = if true, cetune will not check ceph.conf before benchmark.
 distributed_data_process = if true, cetune will distribute the data process work to all nodes and summarize in controller node. This option requires running controller_dependencies_install.py on all cetune workers.
diff --git a/deploy/mod/deploy.py b/deploy/mod/deploy.py
index 9d66690..97da085 100644
--- a/deploy/mod/deploy.py
+++ b/deploy/mod/deploy.py
@@ -454,7 +454,7 @@ def cal_cephmap_diff(self, ceph_disk=False):
 
         return cephconf_dict
 
-    def redeploy(self, gen_cephconf, ceph_disk=False):
+    def redeploy(self, gen_cephconf, ceph_disk=False, parallel_deploy=False):
         common.printout("LOG","ceph.conf file generated")
         if self.cluster["clean_build"] == "true":
             clean_build = True
@@ -477,7 +477,7 @@ def redeploy(self, gen_cephconf, ceph_disk=False):
         self.make_mon()
         common.printout("LOG","Succeeded in building mon daemon")
         common.printout("LOG","Started to build osd daemon")
-        self.make_osds(ceph_disk=ceph_disk)
+        self.make_osds(ceph_disk=ceph_disk, parallel_deploy=parallel_deploy)
         common.printout("LOG","Succeeded in building osd daemon")
 
         common.bash("cp -f ../conf/ceph.conf ../conf/ceph_current_status")
@@ -560,7 +560,7 @@ def distribute_conf(self):
             common.pdsh(user, [node], "rm -rf /etc/ceph/ceph.conf")
             common.scp(user, node, "../conf/ceph.conf", "/etc/ceph/")
 
-    def make_osds(self, osds=None, diff_map=None, ceph_disk=False):
+    def make_osds(self, osds=None, diff_map=None, ceph_disk=False, parallel_deploy=False):
         user = self.cluster["user"]
         if osds==None:
             osds = sorted(self.cluster["osds"])
@@ -575,7 +575,13 @@ def make_osds(self, osds=None, diff_map=None, ceph_disk=False):
             for line in mount_list_tmp.split('\n'):
                 tmp = line.split()
                 mount_list[node][tmp[0]] = tmp[2]
+
+        opts_mkfs = {}
+        opts = []
+        maxlen_od = 0
         for osd in osds:
+            opts_mkfs[osd] = []
+            maxlen_od = len(diff_map[osd]) if len(diff_map[osd]) > maxlen_od else maxlen_od
             for device_bundle_tmp in diff_map[osd]:
                 device_bundle = common.get_list(device_bundle_tmp)
                 osd_device = device_bundle[0][0]
@@ -583,12 +589,39 @@
                 if self.cluster["ceph_conf"]["global"]["osd_objectstore"] == "filestore":
                     journal_device = device_bundle[0][1]
                 if not ceph_disk:
-                    self.make_osd_fs( osd, osd_num, osd_device, journal_device, mount_list )
-                    self.make_osd( osd, osd_num, osd_device, journal_device )
+                    if parallel_deploy:
+                        opts_mkfs[osd].append((osd, osd_num, osd_device, journal_device, mount_list))
+                        opts.append((osd, osd_num, osd_device, journal_device))
+                    else:
+                        self.make_osd_fs( osd, osd_num, osd_device, journal_device, mount_list )
+                        self.make_osd( osd, osd_num, osd_device, journal_device )
                 else:
                     self.make_osd_ceph_disk_prepare(osd, osd_device, journal_device, mount_list)
                     self.make_osd_ceph_disk_activate(osd, osd_device)
-                osd_num = osd_num+1
+                osd_num = osd_num + 1
+        if parallel_deploy:
+            common.printout("LOG", "==============begin mkfs in parallel ==============")
+            FuncThread = common.FuncThread
+            FuncThread.maxthreads = 5 * len(opts_mkfs.keys())
+            for index in range(maxlen_od):
+                for key in sorted(opts_mkfs.keys()):
+                    if index >= len(opts_mkfs[key]) or len(opts_mkfs[key][index]) < 5:
+                        continue
+                    FuncThread.lck.acquire()
+                    if len(FuncThread.tlist) >= FuncThread.maxthreads:
+                        FuncThread.lck.release()
+                        FuncThread.evnt.wait()
+                    else:
+                        FuncThread.lck.release()
+                    FuncThread.newthread(self.make_osd_fs, opts_mkfs[key][index][0], opts_mkfs[key][index][1], opts_mkfs[key][index][2], opts_mkfs[key][index][3], opts_mkfs[key][index][4])
+            for t in FuncThread.atlist:
+                t.join()
+            FuncThread.tlist = []
+            common.printout("LOG", "===============end mkfs in parallel ===============")
+
+        for opt in opts:
+            if len(opt) == 4:
+                self.make_osd(opt[0], opt[1], opt[2], opt[3])
 
     def make_osd_ceph_disk_prepare(self, osd, osd_device, journal_device, mount_list):
         """
diff --git a/deploy/mod/deploy_rgw.py b/deploy/mod/deploy_rgw.py
index 95fab60..56806dd 100644
--- a/deploy/mod/deploy_rgw.py
+++ b/deploy/mod/deploy_rgw.py
@@ -25,10 +25,10 @@ def __init__(self, tunings=""):
         self.cluster["proxy"] = self.all_conf_data.get("cosbench_controller_proxy")
         self.cluster["auth_url"] = "http://%s/auth/v1.0;retry=9" % self.cluster["rgw"][0]
 
-    def redeploy(self, gen_cephconf, ceph_disk=False):
+    def redeploy(self, gen_cephconf, ceph_disk=False, parallel_deploy=False):
         self.map_diff = self.cal_cephmap_diff()
         rgw_nodes = self.map_diff["radosgw"]
-        super(self.__class__, self).redeploy(gen_cephconf, ceph_disk=False)
+        super(self.__class__, self).redeploy(gen_cephconf, ceph_disk=False, parallel_deploy=parallel_deploy)
         self.rgw_dependency_install()
         self.rgw_install()
         self.gen_cephconf(ceph_disk=ceph_disk)
diff --git a/deploy/run_deploy.py b/deploy/run_deploy.py
index 90dee96..6691260 100644
--- a/deploy/run_deploy.py
+++ b/deploy/run_deploy.py
@@ -33,6 +33,11 @@ def main(args):
         default = False,
         action='store_true'
     )
+    parser.add_argument(
+        '--parallel_deploy',
+        default=False,
+        action='store_true'
+    )
     parser.add_argument(
         '--gen_cephconf',
         default = False,
@@ -53,9 +58,8 @@ def main(args):
         mydeploy = deploy.Deploy()
         if args.with_rgw:
             mydeploy = deploy_rgw.Deploy_RGW()
-#        mydeploy.deploy()
-        mydeploy.redeploy(args.gen_cephconf,
-                ceph_disk=args.ceph_disk)
+
+        mydeploy.redeploy(args.gen_cephconf, ceph_disk=args.ceph_disk, parallel_deploy=args.parallel_deploy)
 
     if args.operation == "restart":
         mydeploy = deploy.Deploy()
diff --git a/workflow/workflow.py b/workflow/workflow.py
index 05a1cf0..024bfb9 100644
--- a/workflow/workflow.py
+++ b/workflow/workflow.py
@@ -27,6 +27,7 @@ def __init__(self):
         self.cluster["mons"] = self.all_conf_data.get_list("list_mon")
         self.cluster["rgw"] = self.all_conf_data.get_list("rgw_server")
         self.cluster["rgw_enable"] = self.all_conf_data.get("enable_rgw")
+        self.cluster["parallel_deploy"] = self.all_conf_data.get("parallel_deploy")
         self.cluster["disable_tuning_check"] = self.all_conf_data.get("disable_tuning_check")
         self.cluster["osd_daemon_num"] = 0
         for osd in self.cluster["osds"]:
@@ -46,10 +47,12 @@ def run(self):
         controller = self.cluster["head"]
         osds = self.cluster["osds"]
         pwd = os.path.abspath(os.path.join('..'))
+        run_deploy_args = ['redeploy']
         if len(self.cluster["rgw"]) and self.cluster["rgw_enable"]=="true":
-            with_rgw = True
-        else:
-            with_rgw = False
+            run_deploy_args.append('--with_rgw')
+        if not (self.cluster.has_key("parallel_deploy") and self.cluster["parallel_deploy"]=="false"):
+            run_deploy_args.append('--parallel_deploy')
+
         for section in self.worksheet:
             for work in self.worksheet[section]['workstages'].split(','):
                 if work == "deploy":
@@ -57,10 +60,7 @@ def run(self):
                     tuner.main(['--section', section, 'apply_version'])
                     tuner.main(['--section', section, '--no_check', 'apply_tuning'])
                     common.printout("LOG","Start to redeploy ceph")
-                    if with_rgw:
-                        run_deploy.main(['--with_rgw','redeploy'])
-                    else:
-                        run_deploy.main(['redeploy'])
+                    run_deploy.main(run_deploy_args)
                     tuner.main(['--section', section, 'apply_tuning'])
                 elif work == "benchmark":
                     if not common.check_ceph_running( user, controller ):
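
Not part of the patch itself: below is a minimal standalone sketch of the bounded-thread dispatch pattern that the new make_osds code drives through common.FuncThread. A hypothetical fake_mkfs task stands in for Deploy.make_osd_fs, and the wake-up signaling is slightly simplified relative to the patch. It shows the three pieces the change relies on between the "begin/end mkfs in parallel" log lines: a cap on concurrent worker threads, a dispatcher that blocks on an Event when the cap is reached, and a final join over every started thread before make_osd runs.

# Illustration only -- not CeTune code. fake_mkfs stands in for Deploy.make_osd_fs;
# the class mirrors the shape of common.FuncThread with simplified signaling.
import threading
import time
import random

class FuncThread(threading.Thread):
    tlist = []                   # threads currently running
    atlist = []                  # every thread started, joined at the end
    maxthreads = 8               # concurrency cap
    evnt = threading.Event()     # dispatcher sleeps on this when the cap is hit
    lck = threading.Lock()       # guards tlist/atlist

    def __init__(self, method, *args):
        threading.Thread.__init__(self)
        self._method = method
        self._args = args

    def run(self):
        self._method(*self._args)
        with FuncThread.lck:
            FuncThread.tlist.remove(self)
            FuncThread.evnt.set()        # wake the dispatcher; it re-checks the count

    @staticmethod
    def newthread(method, *args):
        with FuncThread.lck:
            ft = FuncThread(method, *args)
            FuncThread.tlist.append(ft)
            FuncThread.atlist.append(ft)
        ft.start()

def fake_mkfs(osd, osd_num, device):
    # stand-in for the real per-device mkfs/mount work
    time.sleep(random.uniform(0.1, 0.3))
    print("formatted %s (osd.%d) on %s" % (device, osd_num, osd))

if __name__ == "__main__":
    tasks = [("node%d" % (i % 3), i, "/dev/sd%s" % chr(ord('b') + i)) for i in range(10)]
    FuncThread.maxthreads = 4
    for osd, osd_num, device in tasks:
        while True:                          # throttle: wait until a slot is free
            FuncThread.lck.acquire()
            if len(FuncThread.tlist) >= FuncThread.maxthreads:
                FuncThread.evnt.clear()
                FuncThread.lck.release()
                FuncThread.evnt.wait()
            else:
                FuncThread.lck.release()
                break
        FuncThread.newthread(fake_mkfs, osd, osd_num, device)
    for t in FuncThread.atlist:              # same barrier the patch uses before make_osd
        t.join()
    print("all mkfs tasks finished")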