diff options
Diffstat (limited to 'test-chill/testchill/gcov.py')
-rw-r--r-- | test-chill/testchill/gcov.py | 224 |
1 files changed, 224 insertions, 0 deletions
diff --git a/test-chill/testchill/gcov.py b/test-chill/testchill/gcov.py new file mode 100644 index 0000000..668c00e --- /dev/null +++ b/test-chill/testchill/gcov.py @@ -0,0 +1,224 @@ +from __future__ import print_function +import functools +import itertools +import os +import os.path +import sys + +from . import util + +class GcovFile(object): + def __init__(self, src_file_name, cov_file_path, lines, properties): + """ + @param src_file_name Name of the source file. + @param cov_file_path Full path to the coverage file. + @param lines List of GcovLine objects. + @param properties Properties from the coverage file. + """ + self.src_file_name = src_file_name + self.cov_file_path = cov_file_path + self.lines = lines + self.properties = properties + + @staticmethod + def parse_file(gcov, fname, process=None): + """ + Parse a file into a GcovFile object. + @param gcov Gcov object that tis file is a part of. + @param gname File name. + @param process Process name + """ + util.shell('gcov', [fname], wd=gcov.srcdir) + cov_file_path = os.path.join(gcov.srcdir, fname + '.gcov') + src_file_name = fname + if os.path.exists(cov_file_path): + with open(cov_file_path, 'r') as f: + lines, properties = GcovFile.parse_lines(f.readlines(), process) + return GcovFile(src_file_name, cov_file_path, lines, properties) + else: + return None + + @staticmethod + def parse_lines(str_lines, process): + """ + Parse a string from a coverage file into a list of GcovLine objects. + @param str_lines Full text of a coverage file. + @param process Name of the process that executed the code. + """ + properties = dict() + lines = [] + for line in str_lines: + if line[-1] == '\n': + line = line[0:-1] + pline = line.split(':') + pline = list(map(str.strip, pline[0:2])) + pline[2:] + if pline[1] == '0': + properties[pline[2]] = pline[3].strip() + elif pline[0][0] == '-': + lines.append(GcovLine(int(pline[1]), dict(), ':'.join(pline[2:]))) + elif pline[0][0] == '#': + lines.append(GcovLine(int(pline[1]), {process : 0}, ':'.join(pline[2:]))) + else: + lines.append(GcovLine(int(pline[1]), {process : int(pline[0])}, ':'.join(pline[2:]))) + return lines, properties + + @staticmethod + def union(left, right): + """ + Merge two different coverages of the same file into a single coverage object. + """ + return left | right + + def __or__(self, right): + """ + Merge two different coverages of the same file into a single coverage object. + """ + new_file = self.clone() + new_file.merge(right) + return new_file + + def __ior__(self, right): + """ + Merge two different coverages of the same file into a single coverage object. + """ + self.merge(right) + return self + + def merge(self, other): + """ + Merge another coeverage into self. + """ + assert self.src_file_name == other.src_file_name + GcovLine.merge_lines(self.lines, other.lines) + self.properties.update(other.properties) + + def clone(self): + """ + Create a shallow clone. + """ + return GcovFile(self.src_file_name, self.cov_file_path, list(self.lines), dict(self.properties)) + + +class GcovLine(object): + def __init__(self, lineno, count_by_process, code): + """ + @param lineno Line number. + @param count_by_prcess A dictionary of execution counts by name of the process that executed them. + @param code Source code from this line. + """ + self.lineno = lineno + self.count_by_process = count_by_process + self.code = code + + @staticmethod + def merge_lines(lines, other_lines): + """ + Merge lines from other_line into lines. + """ + for line, other_line in zip(lines, other_lines): + assert line.lineno == other_line.lineno + assert line.code == other_line.code + line.count_by_process.update(other_line.count_by_process) + + def count(self): + """ + The total number of times this line was executed. + """ + runable_list = [l for l in self.count_by_process.values() if l is not None] + if len(runable_list) == 0: + return None + else: + return sum(runable_list) + + def __repr__(self): + return str((self.lineno, self.count_by_process, self.code)) + + +class Gcov(object): + def __init__(self, srcdir): + self.srcdir = srcdir + self.files = dict() + + @staticmethod + def parse(srcdir, process=None): + gcov = Gcov(srcdir) + gcov._append(filter(lambda f: f is not None, map(functools.partial(GcovFile.parse_file, gcov, process=process), + util.filterext(['cc','c','cpp','h','hh'], os.listdir(srcdir))))) + return gcov + + def _append(self, files): + for f in files: + if f.src_file_name in self.files: + self.files[f.src_file_name].merge(f) + else: + self.files[f.src_file_name] = f + + def __or__(self, right): + new_cov = self.clone() + new_cov.merge(right) + return new_cov + + def __ior__(self, right): + self.merge(right) + return self + + @staticmethod + def union(left, right): + return left | right + + def merge(self, other): + self._append(other.files.values()) + + def clone(self): + new_cov = Gcov(self.srcdir) + new_cov._append(iter(f.clone() for f in self.files.values())) + return new_cov + + +class GcovSet(object): + def __init__(self): + self.coverage_by_program = dict() + + def addprogram(self, prog_name, src_dir): + self.coverage_by_program[prog_name] = Gcov(src_dir) + + def addcoverage(self, prog_name, process_name): + cov = self.coverage_by_program[prog_name] + cov.merge(Gcov.parse(cov.srcdir, process_name)) + + #def unexecuted_lines(self): + # covlist = sorted(self.coverage_by_program.values(), key=lambda c: c.srcdir) + # for src, grp in itertools.groupby(covlist, lambda c: c.srcdir): + # files = functools.reduce(lambda a, c: a | c, grp).files.values() + # file_lines = iter((f.src_file_name, iter(l for l in f.lines if l.count() == 0)) for f in files) + # yield src, file_lines + # + #def pretty_print(self, outfile=sys.stdout, width=60, stats=['unexecuted', 'unexecuted.bysrc']): + # print('='*width, file=outfile) + # print(' CODE COVERAGE', file=outfile) + # + # if 'unexecuted' in stats: + # print('='*width, file=outfile) + # print(' unexecuted lines', file=outfile) + # if 'unexecuted.bysrc' in stats: + # for src, file_lines in self.unexecuted_lines(): + # print((src + ':'), file=outfile) + # print('-'*width, file=outfile) + # for src_file_name, lines in file_lines: + # print(' ' + src_file_name + ':', file=outfile) + # for line in lines: + # print("{}:{}".format(str(line.lineno).rjust(5), line.code), file=outfile) + # #print('='*width, file=outfile) + # #print(prog, file=outfile) + # #print('-'*width, file=outfile) + + def _get_coverage_by_file(self): + return functools.reduce(lambda a,b: a|b, self.coverage_by_program.values()).files + + def _get_filenames(self): + return self.coverage_by_file.keys() + + coverage_by_file = property(_get_coverage_by_file) + filenames = property(_get_filenames) + + |