diff --git a/analyzer/node_analyzer.py b/analyzer/node_analyzer.py
new file mode 100644
index 0000000..f31fca9
--- /dev/null
+++ b/analyzer/node_analyzer.py
@@ -0,0 +1,242 @@
+# -*- coding: utf-8 -*-
+import os, sys
+import argparse
+lib_path = os.path.abspath(os.path.join('..'))
+sys.path.append(lib_path)
+from conf import *
+from visualizer import *
+from analyzer import *
+import time
+import pprint
+import re
+import yaml
+from collections import OrderedDict
+import json
+import numpy
+import copy
+pp = pprint.PrettyPrinter(indent=4)
+
+class Node_analyzer(Analyzer):
+    def node_process_data(self):
+        self.tmp_dir = sys.argv[2]
+        self.node_name = sys.argv[7]
+        self.cluster["dest_dir"] = sys.argv[5]
+        self.result["session_name"] = sys.argv[5]
+        case_type = re.findall('\d\-\S+', self.cluster["dest_dir"])[0].split('-')[2]
+        if case_type == "vdbench":
+            self.result["description"] = "NULL"
+        user = self.cluster["user"]
+        if self.node_name in self.cluster["osds"]:
+            self.result["ceph"][self.node_name]={}
+            system, workload = self._node_process_data(self.tmp_dir)
+            self.result["ceph"][self.node_name]=system
+            self.result["ceph"].update(workload)
+        if self.node_name in self.cluster["rgw"]:
+            self.result["rgw"][self.node_name]={}
+            system, workload = self._node_process_data(self.tmp_dir)
+            self.result["rgw"][self.node_name]=system
+            self.result["rgw"].update(workload)
+        if self.node_name in self.cluster["client"]:
+            self.result["client"][self.node_name]={}
+            system, workload = self._node_process_data(self.tmp_dir)
+            self.result["client"][self.node_name]=system
+            self.result["workload"].update(workload)
+        if self.node_name in self.cluster["vclient"]:
+            params = self.result["session_name"].split('-')
+            self.cluster["vclient_disk"] = ["/dev/%s" % params[-1]]
+            self.result["vclient"][self.node_name]={}
+            system, workload = self._node_process_data(self.tmp_dir)
+            self.result["vclient"][self.node_name]=system
+            self.result["workload"].update(workload)
+
+        result = self.format_result_for_visualizer( self.result )
+        common.printout("LOG","Write analyzed results into result.json")
+        with open('%s/%s_result.json' % (self.tmp_dir, self.node_name), 'w') as f:
+            json.dump(result, f, indent=4)
+
+    def node_ceph_version(self, dest_dir):
+        node_list = []
+        node_list.extend(self.cluster["osds"])
+        node_list.append(self.cluster["head"])
+        version_list = {}
+        node_list = set(node_list)
+        for node in node_list:
+            if os.path.exists(os.path.join(dest_dir, node+'_ceph_version.txt')):
+                data = open(os.path.join(dest_dir, node+'_ceph_version.txt'), 'r')
+                if data:
+                    version_list[node] = data.read().strip('\n')
+                else:
+                    version_list[node] = 'None'
+            else:
+                version_list[node] = 'None'
+        return version_list
+
+    def _node_process_data(self, tmp_dir):
+        result = {}
+        fio_log_res = {}
+        workload_result = {}
+        for dir_name in os.listdir("%s" % (tmp_dir)):
+            if os.path.isfile("%s%s" % (tmp_dir, dir_name)):
+                common.printout("LOG","Processing %s_%s" % (tmp_dir, dir_name))
+                if 'smartinfo.txt' in dir_name:
+                    res = self.process_smartinfo_data( "%s/%s" % (tmp_dir, dir_name))
+                    result.update(res)
+                if 'cosbench' in dir_name:
+                    workload_result.update(self.process_cosbench_data("%s/%s" % (tmp_dir, dir_name), dir_name))
+                if '_sar.txt' in dir_name:
+                    result.update(self.process_sar_data("%s/%s" % (tmp_dir, dir_name)))
+                if 'totals.html' in dir_name:
+                    workload_result.update(self.process_vdbench_data("%s/%s" % (tmp_dir, dir_name), "%s_%s" % (tmp_dir, dir_name)))
+                if '_fio.txt' in dir_name:
+                    workload_result.update(self.process_fio_data("%s/%s" % (tmp_dir, dir_name), dir_name))
+                if '_fio_iops.1.log' in dir_name or '_fio_bw.1.log' in dir_name or '_fio_lat.1.log' in dir_name:
+                    if "_fio_iops.1.log" in dir_name:
+                        volume = dir_name.replace("_fio_iops.1.log", "")
+                    if "_fio_bw.1.log" in dir_name:
+                        volume = dir_name.replace("_fio_bw.1.log", "")
+                    if "_fio_lat.1.log" in dir_name:
+                        volume = dir_name.replace("_fio_lat.1.log", "")
+                    if volume not in fio_log_res:
+                        fio_log_res[volume] = {}
+                        fio_log_res[volume]["fio_log"] = {}
+                    fio_log_res[volume]["fio_log"] = self.process_fiolog_data("%s/%s" % (tmp_dir, dir_name), fio_log_res[volume]["fio_log"] )
+                    workload_result.update(fio_log_res)
+                if '_iostat.txt' in dir_name:
+                    res = self.node_process_iostat_data( tmp_dir, "%s/%s" % (tmp_dir, dir_name), self.node_name)
+                    result.update(res)
+                if '_interrupts_end.txt' in dir_name:
+                    if os.path.exists("%s/%s" % ( tmp_dir, dir_name.replace('end','start'))):
+                        interrupt_start = "%s/%s" % ( tmp_dir, dir_name.replace('end','start'))
+                        interrupt_end = "%s/%s" % ( tmp_dir, dir_name)
+                        self.interrupt_diff(tmp_dir, self.node_name, interrupt_start, interrupt_end)
+                if '_process_log.txt' in dir_name:
+                    res = self.process_log_data( "%s/%s" % (tmp_dir, dir_name) )
+                    result.update(res)
+                if '.asok.txt' in dir_name:
+                    try:
+                        res = self.process_perfcounter_data("%s/%s" % (tmp_dir, dir_name))
+                        for key, value in res.items():
+                            if dir_name not in workload_result:
+                                workload_result[dir_name] = OrderedDict()
+                            workload_result[dir_name][key] = value
+                    except:
+                        pass
+        return [result, workload_result]
+
+
+    def interrupt_diff(self, tmp_dir, node_name, s_path, e_path):
+        s_p = s_path
+        e_p = e_path
+        result_name = node_name+'_interrupt.txt'
+        result_path = os.path.join(tmp_dir, result_name)
+        s_l = []
+        e_l = []
+        diff_list = []
+        with open(s_p, 'r') as f:
+            s = f.readlines()
+        with open(e_p, 'r') as f:
+            e = f.readlines()
+        for i in s:
+            tmp = i.split(' ')
+            while '' in tmp:
+                tmp.remove('')
+            s_l.append(tmp)
+        for i in e:
+            tmp = i.split(' ')
+            while '' in tmp:
+                tmp.remove('')
+            e_l.append(tmp)
+        if self.check_interrupt(s_l, e_l):
+            for i in range(len(s_l)):
+                lines = []
+                for j in range(len(s_l[i])):
+                    if s_l[i][j].isdigit() and e_l[i][j].isdigit():
+                        lines.append(int(e_l[i][j]) - int(s_l[i][j]))
+                    else:
+                        lines.append(e_l[i][j])
+                diff_list.append(lines)
+            if os.path.exists(result_path):
+                os.remove(result_path)
+            output = open(result_path, 'w+')
+            for line in diff_list:
+                line_str = ''
+                for col in range(len(line)):
+                    if col != len(line)-1:
+                        line_str += str(line[col])+' '
+                    else:
+                        line_str += str(line[col])
+                output.writelines(line_str)
+            output.close()
+        else:
+            print 'ERROR: interrupt_start lines and interrupt_end lines are different! cannot calculate the difference!'
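# --- editor's note: the snippet below is illustrative only and is NOT part of the patch. ---
# It is a minimal standalone sketch of the same idea as interrupt_diff() above: read two
# /proc/interrupts snapshots and, row by row, emit the per-column delta (end - start) for
# numeric fields while passing label columns (CPU headers, IRQ ids, device names) through.
# The file names used in the usage comment are assumed, not taken from the collector.
def interrupt_delta(start_path, end_path):
    with open(start_path) as f:
        start_rows = [line.split() for line in f]
    with open(end_path) as f:
        end_rows = [line.split() for line in f]
    if len(start_rows) != len(end_rows):
        raise ValueError("snapshots have different line counts")
    diff_rows = []
    for s_row, e_row in zip(start_rows, end_rows):
        row = []
        for s_col, e_col in zip(s_row, e_row):
            # numeric columns become deltas, everything else is copied from the end snapshot
            row.append(str(int(e_col) - int(s_col)) if s_col.isdigit() and e_col.isdigit() else e_col)
        diff_rows.append(" ".join(row))
    return "\n".join(diff_rows)

# Example usage (hypothetical file names):
#   print(interrupt_delta("node1_interrupts_start.txt", "node1_interrupts_end.txt"))
# --- end editor's note ---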
+
+    def node_process_iostat_data(self, node, path, node_name):
+        result = {}
+        output_list = []
+        dict_diskformat = {}
+        if node_name in self.cluster["osds"]:
+            output_list = common.parse_disk_format( self.cluster['diskformat'] )
+            for i in range(len(output_list)):
+                disk_list=[]
+                for osd_journal in common.get_list(self.all_conf_data.get_list(node_name)):
+                    tmp_dev_name = osd_journal[i].split('/')[2]
+                    if 'nvme' in tmp_dev_name:
+                        tmp_dev_name = common.parse_nvme( tmp_dev_name )
+                    if tmp_dev_name not in disk_list:
+                        disk_list.append( tmp_dev_name )
+                dict_diskformat[output_list[i]]=disk_list
+        elif node_name in self.cluster["vclient"]:
+            vdisk_list = []
+            for disk in self.cluster["vclient_disk"]:
+                vdisk_list.append( disk.split('/')[2] )
+            output_list = ["vdisk"]
+        # get total second
+        runtime = common.bash("grep 'Device' "+path+" | wc -l ").strip()
+        for output in output_list:
+            if output != "vdisk":
+                disk_list = " ".join(dict_diskformat[output])
+                disk_num = len(list(set(dict_diskformat[output])))
+            else:
+                disk_list = " ".join(vdisk_list)
+                disk_num = len(vdisk_list)
+            stdout = common.bash( "grep 'Device' -m 1 "+path+" | awk -F\"Device:\" '{print $2}'; cat "+path+" | awk -v dev=\""+disk_list+"\" -v line="+runtime+" 'BEGIN{split(dev,dev_arr,\" \");dev_count=0;for(k in dev_arr){count[k]=0;dev_count+=1};for(i=1;i<=line;i++)for(j=1;j<=NF;j++){res_arr[i,j]=0}}{for(k in dev_arr)if(dev_arr[k]==$1){cur_line=count[k];for(j=2;j<=NF;j++){res_arr[cur_line,j]+=$j;}count[k]+=1;col=NF}}END{for(i=1;i<=line;i++){for(j=2;j<=col;j++)printf (res_arr[i,j]/dev_count)\"\"FS; print \"\"}}'")
+            result[output] = common.convert_table_to_2Dlist(stdout)
+            result[output]["disk_num"] = disk_num
+        return result
+
+def main(args):
+    parser = argparse.ArgumentParser(description='Analyzer tool')
+    parser.add_argument(
+        'operation',
+    )
+    parser.add_argument(
+        '--case_name',
+    )
+    parser.add_argument(
+        '--path',
+    )
+    parser.add_argument(
+        '--path_detail',
+    )
+    parser.add_argument(
+        '--node',
+    )
+    parser.add_argument(
+        '--node_name',
+    )
+    args = parser.parse_args(args)
+    process = Node_analyzer(args.path)
+    if args.operation == "node_process_data":
+        process.node_process_data()
+    else:
+        func = getattr(process, args.operation)
+        if func:
+            func(args.path_detail)
+
+if __name__ == '__main__':
+    import sys
+    main(sys.argv[1:])
diff --git a/benchmarking/mod/benchmark.py b/benchmarking/mod/benchmark.py
index ab845b3..854176c 100644
--- a/benchmarking/mod/benchmark.py
+++ b/benchmarking/mod/benchmark.py
@@ -1,11 +1,15 @@
 import subprocess
 from conf import *
 import copy
+import json
 import os, sys
 import time
 import re
 import uuid
+from visualizer import *
 from analyzer import *
+from collections import OrderedDict
+import threading
 lib_path = (
     os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
 class Benchmark(object):
@@ -59,10 +63,77 @@ def go(self, testcase, tuning):
         self.setStatus("Completed")
         common.printout("LOG","Post Process Result Data")
+        total_result = self.combine_nodes_result()
+        common.printout("LOG","Write analyzed results into result.json")
         try:
-            analyzer.main(['--path', self.cluster["dest_dir"], 'process_data'])
+            with open('%s/result.json' % self.cluster["dest_dir"], 'w') as f:
+                json.dump(total_result, f, indent=4)
+            view = visualizer.Visualizer(total_result, self.cluster["dest_dir"])
+            output = view.generate_summary_page()
         except:
-            common.printout("ERROR","analyzer failed, pls try cd analyzer; python analyzer.py --path %s process_data " % self.cluster["dest_dir"])
common.printout("ERROR","Write analyzed results into result.json failed") + + + def combine_nodes_result(self): + self.ab_nodes = self.check_osd_result_exists() + self.combine_result = OrderedDict() + self.nodes_result = OrderedDict() + self.clients_result = OrderedDict() + try: + for node in self.cluster["osd"]: + if node not in self.ab_nodes: + node_result = json.load(open('%s/raw/%s/%s_result.json' %(self.cluster["dest_dir"],node,node)), object_pairs_hook=OrderedDict) + if len(self.nodes_result) == 0: + self.nodes_result.update(node_result) + else: + for col in self.nodes_result["ceph"].keys(): + self.nodes_result["ceph"][col].update(node_result["ceph"][col]) + self.combine_result.update(self.nodes_result) + except Exception: + common.printout("ERROR","combine osd result failed!") + pass + + try: + for client in self.benchmark["distribution"].keys(): + client_reult = json.load(open('%s/raw/%s/%s_result.json' %(self.cluster["dest_dir"],client,client)), object_pairs_hook=OrderedDict) + if len(self.clients_result) == 0: + self.clients_result.update(client_reult) + else: + self.clients_result["workload"].update(client_reult["workload"]) + self.clients_result["client"].update(client_reult["client"]) + + self.combine_result["workload"] = self.clients_result["workload"] + self.combine_result["client"] = self.clients_result["client"] + except Exception: + common.printout("ERROR","combine client result failed!") + pass + ana = analyzer.Analyzer(self.cluster["dest_dir"]) + try: + self.combine_result["status"] = ana.getStatus() + self.combine_result["description"] = ana.getDescription() + except Exception: + common.printout("ERROR","combine status or decription failed!") + pass + + if len(self.combine_result) != 0: + self.combine_result = ana.summary_result(self.combine_result) + self.combine_result["summary"]["Download"] = {"Configuration":{"URL":"" % self.combine_result["session_name"]}} + node_ceph_version = {} + if ana.collect_node_ceph_version(os.path.join(self.cluster["dest_dir"],'raw')): + for key,value in ana.collect_node_ceph_version(os.path.join(self.cluster["dest_dir"],"raw")).items(): + node_ceph_version[key] = {"ceph_version":value} + self.combine_result["summary"]["Node"] = node_ceph_version + return self.combine_result + + def check_osd_result_exists(self): + nodes = self.cluster["osd"] + abnormal_node = [] + for osd in self.cluster["osd"]: + result_path = "%s/raw/%s/%s_result.json"%(self.cluster["dest_dir"],osd,osd) + if not os.path.exists(result_path): + abnormal_node.append(osd) + return abnormal_node + def create_image(self, volume_count, volume_size, poolname): user = self.cluster["user"] @@ -225,6 +296,9 @@ def run(self): common.printout("LOG","Start perfcounter data collector under %s " % nodes) common.pdsh(user, nodes, "echo `date +%s`' perfcounter start' >> %s/`hostname`_process_log.txt; for i in `seq 1 %d`; do find /var/run/ceph -name '*client*asok' | while read path; do filename=`echo $path | awk -F/ '{print $NF}'`;res_file=%s/`hostname`_${filename}.txt; echo `ceph --admin-daemon $path perf dump`, >> ${res_file} & done; sleep %s; done; echo `date +%s`' perfcounter stop' >> %s/`hostname`_process_log.txt;" % ('%s', dest_dir, time_tmp, dest_dir, monitor_interval, '%s', dest_dir), option="force") + def sleep(self): + time.sleep(30) + def archive(self): user = self.cluster["user"] head = self.cluster["head"] @@ -248,25 +322,98 @@ def archive(self): #write description to dir with open( "%s/conf/description" % dest_dir, 'w+' ) as f: f.write( self.benchmark["description"] ) + + 
+        #copy all.conf to all node's log path
+        self.cetune_path = os.path.join(self.cluster["tmp_dir"], 'CeTune/')
+        self.local_cetune_path = '/'
+        try:
+            for i in os.getcwd().split('/')[:-1]:
+                self.local_cetune_path = os.path.join(self.local_cetune_path, i)
+            for osd in self.cluster["osd"]:
+                common.scp(user, osd, "%s/conf/all.conf" % (self.local_cetune_path), "%s/all.conf" % self.cluster["tmp_dir"])
+
+            for client in self.benchmark["distribution"].keys():
+                common.scp(user, client, "%s/conf/all.conf" % (self.local_cetune_path), "%s/all.conf" % self.cluster["tmp_dir"])
+        except Exception:
+            common.printout("ERROR","copy all.conf to nodes failed!")
+            pass
+
+        #do analyzer at node
+        abnormal_node = []
+        self.threads = []
+        self.thread_id = {}
+        for node in self.cluster["osd"]:
+            self.node_list = []
+            self.node_list.append(node)
+            if self.check_node_cetune('root', node, self.cetune_path):
+                tr_name = 'tr_'+node
+                tr_name = threading.Thread(target=common.pdsh, args=(user, self.node_list, "cd /%s/analyzer/;python node_analyzer.py --path /%s node_process_data --case_name %s --node_name %s" % (self.cetune_path, self.cluster["tmp_dir"], dest_dir.split('/')[-1], node)))
+                self.threads.append(tr_name)
+        for i in self.threads:
+            i.setDaemon(True)
+            i.start()
+            i.join()
+
+        time.sleep(15)
+
         #collect osd data
         for node in self.cluster["osd"]:
+            common.bash("mkdir -p %s/raw/%s" % (dest_dir, node))
             common.rscp(user, node, "%s/raw/%s/" % (dest_dir, node), "%s/*.txt" % self.cluster["tmp_dir"])
+            common.rscp(user, node, "%s/raw/%s/" % (dest_dir, node), "%s/*.json" % self.cluster["tmp_dir"])
             if "blktrace" in self.cluster["collector"]:
                 common.rscp(user, node, "%s/raw/%s/" % (dest_dir, node), "%s/*blktrace*" % self.cluster["tmp_dir"])
             if "lttng" in self.cluster["collector"]:
                 common.rscp(user, node, "%s/raw/%s/" % (dest_dir, node), "%s/lttng-traces" % self.cluster["tmp_dir"])
 
+        #do analyzer at client
+        abnormal_client = []
+        self.threads = []
+        self.thread_id = {}
+        for client in self.benchmark["distribution"].keys():
+            self.client_list = []
+            self.client_list.append(client)
+            if self.check_node_cetune('root', client, self.cetune_path):
+                tr_name = 'tr_'+client
+                if client in self.cluster["head"]:
+                    tr_name = threading.Thread(target=common.pdsh, args=(user, self.client_list, "cd /%s/analyzer/;python node_analyzer.py --path /%s node_process_data --case_name %s --node_name %s" % (self.local_cetune_path, self.cluster["tmp_dir"], dest_dir.split('/')[-1], client)))
                else:
+                    tr_name = threading.Thread(target=common.pdsh, args=(user, self.client_list, "cd /%s/analyzer/;python node_analyzer.py --path /%s node_process_data --case_name %s --node_name %s" % (self.cetune_path, self.cluster["tmp_dir"], dest_dir.split('/')[-1], client)))
+                self.threads.append(tr_name)
+        for i in self.threads:
+            i.setDaemon(True)
+            i.start()
+            i.join()
+
+        time.sleep(15)
+
         #collect client data
         for node in self.benchmark["distribution"].keys():
             common.bash( "mkdir -p %s/raw/%s" % (dest_dir, node))
             common.rscp(user, node, "%s/raw/%s/" % (dest_dir, node), "%s/*.txt" % self.cluster["tmp_dir"])
+            common.rscp(user, node, "%s/raw/%s/" % (dest_dir, node), "%s/*.json" % self.cluster["tmp_dir"])
 
         #save real runtime
         if self.real_runtime:
             with open("%s/real_runtime.txt" % dest_dir, "w") as f:
                 f.write(str(int(self.real_runtime)))
+
+    def check_node_cetune(self, user, node, cetune_path):
+        try:
+            nodes = []
+            nodes.append(node)
+            reback_msg = common.pdsh(user, nodes, "ls /%s" % cetune_path, option = "check_return")
+            self.status = True
+            for i in reback_msg:
+                if "No such file" in i:
+                    self.status = False
+            return self.status
+        except Exception:
+            return False
+
+
+
     def stop_data_collecters(self):
         #2. clean running process
         user = self.cluster["user"]
diff --git a/conf/description.py b/conf/description.py
index 554e3c3..018ed7b 100644
--- a/conf/description.py
+++ b/conf/description.py
@@ -50,5 +50,5 @@ def get_defaultvalue_by_key(self,key):
         if key in DefaultValue.read_defaultvalue_to_dict().keys():
             return DefaultValue.read_defaultvalue_to_dict()[key]
         else:
-            print "the key is not exists."
+            #print "the key is not exists."
             return
diff --git a/visualizer/visualizer.py b/visualizer/visualizer.py
index b85b486..42a0d92 100644
--- a/visualizer/visualizer.py
+++ b/visualizer/visualizer.py
@@ -27,12 +27,13 @@ def __init__(self, result, path=None):
         else:
             all_path = path
         self.all_conf_data = config.Config("%s/all.conf" % all_path)
+        #self.db_path = "/home/"
         self.db_path = self.all_conf_data.get("dest_dir")
         self.result = result
         self.output = []
         if path:
             self.path = path
-            self.session_name = os.path.basename(path.strip('/'))
+            self.result["session_name"] = os.path.basename(path.strip('/'))
         self.dest_dir_remote_bak = self.all_conf_data.get("dest_dir_remote_bak", dotry = True)
         self.user = self.all_conf_data.get("user")
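Editor's note: the core of this patch is that each node now writes its own <node>_result.json (node_analyzer.py) and the head node merges them in Benchmark.combine_nodes_result(). The sketch below is a simplified, standalone illustration of that merge step, not part of the patch: it assumes the <dest_dir>/raw/<node>/<node>_result.json layout used above, that every per-node file shares the same top-level sections, and that section values are dicts; it also skips the workload/client merging and the summary step. The helper name is hypothetical.

# Illustrative sketch only -- mirrors the per-node merge in combine_nodes_result().
import json
import os
from collections import OrderedDict

def combine_node_results(dest_dir, nodes, section="ceph"):
    combined = OrderedDict()
    for node in nodes:
        path = os.path.join(dest_dir, "raw", node, "%s_result.json" % node)
        if not os.path.exists(path):
            # matches check_osd_result_exists(): nodes without a result file are skipped
            continue
        with open(path) as f:
            node_result = json.load(f, object_pairs_hook=OrderedDict)
        if not combined:
            # the first node's result seeds the overall structure
            combined.update(node_result)
        else:
            # later nodes only contribute their per-node entries under the given section
            for key in combined.get(section, {}):
                combined[section][key].update(node_result.get(section, {}).get(key, {}))
    return combined

# Example usage (hypothetical values):
#   result = combine_node_results("/mnt/data/1-rand-write", ["osd1", "osd2"])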