Source code for codemetrics.core

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os.path
import typing

import lizard
import pandas as pd
import sklearn
import sklearn.cluster
import sklearn.feature_extraction.text

from . import internals, scm

__all__ = [
    "get_mass_changes",
    "get_ages",
    "get_hot_spots",
    "get_co_changes",
    "guess_components",
    "get_complexity",
]


def get_mass_changes(
    log: pd.DataFrame,
    min_path: typing.Optional[int] = None,
    max_changes_per_path: typing.Optional[float] = None,
) -> pd.DataFrame:
    """Extract mass changesets from the SCM log data frame.

    Calculate the number of files changed by each revision and extract the
    revisions that exceed the thresholds.

    Args:
        log: SCM log data, expected to contain at least revision, added,
            removed, and path columns.
        min_path: threshold on the number of files changed to consider the
            revision a mass change.
        max_changes_per_path: threshold on the number of changed lines
            (added + removed) per file changed.

    Returns:
        revisions that changed more files than the threshold, as a
        pd.DataFrame with columns revision, path, changes and changes_per_path.

    """
    data = log.reset_index().copy()[["revision", "path", "added", "removed"]]
    data["changes"] = data["added"] + data["removed"]
    data = (
        data[["revision", "path", "changes"]]
        .groupby("revision", as_index=False)
        .agg({"path": "count", "changes": "sum"})
        .assign(revision=lambda x: x["revision"].astype("string"))
    )
    data["changes_per_path"] = data["changes"] / data["path"]
    if min_path is not None:
        data = data[data["path"] >= min_path]
    if max_changes_per_path is not None:
        data = data[data["changes_per_path"] <= max_changes_per_path]
    return data

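# Illustrative sketch, not part of the original module: one possible call to
# get_mass_changes on an SCM log DataFrame. The thresholds below are arbitrary
# assumptions, not recommended defaults.
def _example_mass_changes(log: pd.DataFrame) -> pd.DataFrame:
    # Keep revisions that touched at least 10 files with no more than 200
    # changed lines (added + removed) per file, i.e. wide but shallow changes
    # such as renames or license-header updates.
    return get_mass_changes(log, min_path=10, max_changes_per_path=200.0)
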
def get_ages(
    data: pd.DataFrame,
    by: typing.Optional[typing.Sequence[str]] = None,
) -> pd.DataFrame:
    """Generate age of each file based on last change.

    Takes the output of an SCM log, or just its date column, and returns the
    age of the most recent modification for each group.

    Args:
        data: log or date column of log.
        by: keys used to group data before calculating the age.
            See pandas.DataFrame.groupby. Defaults to ['path'].

    Returns:
        age of most recent modification as pandas.DataFrame.

    Example::

        ages = codemetrics.get_ages(log_df)

    """
    if by is None:
        by = ["path"]
    now = pd.to_datetime(internals.get_now(), utc=True)
    rv = data.groupby(by)["date"].max().reset_index()
    rv["age"] = now - pd.to_datetime(rv["date"], utc=True)
    rv["age"] /= pd.Timedelta(1, unit="D")
    return rv.drop(columns=["date"])

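# Illustrative sketch, not part of the original module: ages grouped on more
# than one key. The "component" column is a hypothetical extra column that
# would have to be added to the log beforehand (e.g. via guess_components).
def _example_ages(log: pd.DataFrame) -> pd.DataFrame:
    # Age in days of the most recent change for each (component, path) pair.
    return get_ages(log, by=["component", "path"])
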
def get_hot_spots(log, loc, by=None, count_one_change_per=None):
    """Generate hot spots from SCM and loc data.

    Cross SCM log and lines of code, as an approximation of complexity, to
    determine paths that are complex and change often.

    Args:
        log: output log from SCM.
        loc: output from cloc.
        by: aggregation level. Defaults to path but can be another column.
        count_one_change_per: allows one to count one change per day or one
            change per JIRA ticket instead of one change per revision.

    Returns:
        pandas.DataFrame

    """
    if by is None:
        by = "path"
    if count_one_change_per is None:
        count_one_change_per = ["revision"]
    c_df = loc.copy()
    c_df = c_df.rename(columns={"code": "lines"})
    columns = count_one_change_per + [by]
    ch_df = log[columns].drop_duplicates()[by].value_counts().to_frame("changes")
    df = pd.merge(c_df, ch_df, right_index=True, left_on=by, how="outer").reset_index(
        drop=True
    )
    num_columns = df.select_dtypes(include=["number"]).columns
    df[num_columns] = df[num_columns].fillna(0)
    return df

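# Illustrative sketch, not part of the original module: crossing the SCM log
# with cloc-style line counts. `loc` is assumed to contain "path" and "code"
# columns; counting one change per day is an assumption that requires a "day"
# column, derived here from the log's "date" column.
def _example_hot_spots(log: pd.DataFrame, loc: pd.DataFrame) -> pd.DataFrame:
    log = log.assign(day=pd.to_datetime(log["date"], utc=True).dt.date)
    hot_spots = get_hot_spots(log, loc, count_one_change_per=["day"])
    # Paths that are both large and frequently changed float to the top.
    return hot_spots.sort_values(["changes", "lines"], ascending=False)
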
def get_co_changes(log=None, by=None, on=None):
    """Generate co-changes report.

    Returns a DataFrame with the following columns:

    - the aggregation column (path by default): primary path that changed.
    - dependency: other path that changed in the same revision.
    - changes, cochanges: number of changes of the primary path and number of
      changes shared with the dependency.
    - coupling: how often the paths change together (cochanges / changes).

    Args:
        log: output log from SCM.
        by: aggregation level. Defaults to path.
        on: field name to join/merge on. Defaults to revision.

    Returns:
        pandas.DataFrame

    """
    if by is None:
        by = "path"
    if on is None:
        on = "revision"
    df = log[[on, by]].drop_duplicates()
    sj = (
        pd.merge(df, df, on=on)
        .rename(columns={by + "_x": by, by + "_y": "dependency"})
        .groupby([by, "dependency"])
        .count()
        .reset_index()
    )
    result = pd.merge(
        sj[sj[by] == sj["dependency"]][[by, on]],
        sj[sj[by] != sj["dependency"]],
        on=by,
    ).rename(columns={on + "_x": "changes", on + "_y": "cochanges"})
    result["coupling"] = result["cochanges"] / result["changes"]
    return result[[by, "dependency", "changes", "cochanges", "coupling"]].sort_values(
        by="coupling", ascending=False
    )

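# Illustrative sketch, not part of the original module: file-level co-change
# coupling, keeping only pairs seen together in at least 5 revisions. The
# threshold is an arbitrary assumption.
def _example_co_changes(log: pd.DataFrame) -> pd.DataFrame:
    coupled = get_co_changes(log, by="path", on="revision")
    return coupled[coupled["cochanges"] >= 5]
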
def guess_components(paths, stop_words=None, n_clusters=8):
    """Guess components from an iterable of paths.

    Args:
        paths: list of strings containing file paths in the project.
        stop_words: stop words. Passed to TfidfVectorizer.
        n_clusters: number of clusters. Passed to MiniBatchKMeans.

    Returns:
        pandas.DataFrame

    See Also:
        sklearn.feature_extraction.text.TfidfVectorizer
        sklearn.cluster.MiniBatchKMeans

    """
    dirs = [os.path.dirname(p.replace("\\", "/")) for p in paths]
    vectorizer = sklearn.feature_extraction.text.TfidfVectorizer(stop_words=stop_words)
    transformed_dirs = vectorizer.fit_transform(dirs)
    algo = sklearn.cluster.MiniBatchKMeans
    clustering = algo(compute_labels=True, n_clusters=n_clusters)
    clustering.fit(transformed_dirs)

    def __cluster_name(center, threshold):
        df = pd.DataFrame(
            data={"feature": vectorizer.get_feature_names_out(), "weight": center}
        )
        df.sort_values(by=["weight", "feature"], ascending=False, inplace=True)
        if (df["weight"] <= threshold).all():
            return ""
        df = df[df["weight"] > threshold]
        return ".".join(df["feature"].tolist())

    cluster_names = [
        __cluster_name(center, 0.4) for center in clustering.cluster_centers_
    ]
    components = [cluster_names[lbl] for lbl in clustering.labels_]
    rv = pd.DataFrame(data={"path": paths, "component": components})
    rv.sort_values(by="component", inplace=True)
    return rv

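# Illustrative sketch, not part of the original module: guessing components
# from the distinct paths seen in a log. The stop words and cluster count are
# assumptions to be tuned for a given code base.
def _example_guess_components(log: pd.DataFrame) -> pd.DataFrame:
    paths = log["path"].dropna().unique()
    return guess_components(paths, stop_words=["src", "test", "tests"], n_clusters=6)
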
# Exclude the parameters field for now.
_lizard_fields = [
    fld
    for fld in vars(lizard.FunctionInfo("", "")).keys()
    if fld not in ["filename", "parameters", "full_parameters"]
]
_complexity_fields = _lizard_fields + "file_tokens file_nloc".split()

def get_complexity(
    group: typing.Union[pd.DataFrame, pd.Series], project: scm.Project
) -> pd.DataFrame:
    """Generate complexity information for files and revisions in dataframe.

    For each pair of (path, revision) in the input dataframe, analyze the
    code with lizard and return the output.

    Args:
        group: contains at least path and revision values.
        project: scm.Project derived instance used to retrieve files for
            specific revisions as `codemetrics.scm.DownloadResult` objects.

    Returns:
        Dataframe containing output of function-level lizard.analyze_

    Example::

        import codemetrics as cm
        log = cm.get_git_log()
        log.groupby(['revision', 'path']).\
            apply(get_complexity, project=project)

    .. _lizard.analyze: https://github.com/terryyin/lizard

    """
    if len(group) == 0:
        internals.log.info("empty group %s", group)
        return pd.DataFrame({k: [] for k in _complexity_fields})
    downloaded = project.download(group)
    path = downloaded.path
    content = downloaded.content
    info = lizard.analyze_file.analyze_source_code(path, content)
    df = pd.DataFrame(columns=_lizard_fields)
    if info.function_list:
        df = pd.DataFrame.from_records(
            [vars(d) for d in info.function_list], columns=_lizard_fields
        )
    df = (
        df.rename_axis("function")
        .assign(
            file_tokens=info.token_count,
            file_nloc=info.nloc,
            cyclomatic_complexity=lambda x: x["cyclomatic_complexity"].astype("Int32"),
            nloc=lambda x: x["nloc"].astype("Int32"),
            token_count=lambda x: x["token_count"].astype("Int32"),
            start_line=lambda x: x["start_line"].astype("Int32"),
            end_line=lambda x: x["end_line"].astype("Int32"),
            top_nesting_level=lambda x: x["top_nesting_level"].astype("Int32"),
            length=lambda x: x["length"].astype("Int32"),
            fan_in=lambda x: x["fan_in"].astype("Int32"),
            fan_out=lambda x: x["fan_out"].astype("Int32"),
            general_fan_out=lambda x: x["general_fan_out"].astype("Int32"),
        )
        .astype({"name": "string", "long_name": "string"})
    )
    return df

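# Illustrative sketch, not part of the original module: function-level
# complexity for every (revision, path) pair in a log. `project` is assumed
# to be an scm.Project subclass instance able to download file contents.
def _example_complexity(log: pd.DataFrame, project: scm.Project) -> pd.DataFrame:
    return log.groupby(["revision", "path"]).apply(get_complexity, project=project)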