Source code for codemetrics.svn

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""_SvnLogCollector related functions."""

import datetime as dt
import pathlib as pl
import re
import subprocess
import typing

# noinspection PyPep8Naming
import xml.etree.ElementTree as ET

import numpy as np
import pandas as pd
import tqdm

from . import internals, scm
from .internals import log

default_client = "svn"


def to_date(datestr: str):
    """Convert str to datetime.datetime.

    The date returned by _SvnLogCollector is UTC according to git-svn man
    page. Date tzinfo is set to UTC.

    added and removed columns are set to np.nan for now.

    """
    from dateutil import parser

    return parser.parse(datestr).replace(tzinfo=dt.timezone.utc)
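

# Illustrative example (not part of the original module), doctest style; the
# timestamp is a placeholder in the format produced by `svn log --xml`:
#
# >>> to_date("2018-02-24T11:14:11.000000Z")
# datetime.datetime(2018, 2, 24, 11, 14, 11, tzinfo=datetime.timezone.utc)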


def to_bool(bool_str: str):
    """Convert str to bool."""
    bool_str_lc = bool_str.lower()
    if bool_str_lc in ("true", "1", "t"):
        return True
    if bool_str_lc in ("false", "0", "f", ""):
        return False
    raise ValueError(f"cannot interpret {bool_str} as a bool")
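

# Illustrative examples (not part of the original module), doctest style;
# to_bool handles the text-mods/prop-mods attributes found in svn's XML log:
#
# >>> to_bool("true"), to_bool("T"), to_bool("")
# (True, True, False)
# >>> to_bool("yes")
# Traceback (most recent call last):
#     ...
# ValueError: cannot interpret yes as a bool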


class _SvnLogCollector(scm.ScmLogCollector):
    """_ScmLogCollector interface adapter for Subversion."""

    _args = "log --xml -v".split()

    def __init__(
        self,
        cwd: pl.Path = None,
        svn_client: str = None,
        relative_url: str = None,
    ) -> None:
        """Initialize.

        Args:
            cwd: root of the directory under SCM.
            svn_client: name of svn client.
            relative_url: Subversion relative url (e.g. /project/trunk/).

        """
        if not svn_client:
            svn_client = default_client
        super().__init__(cwd=cwd)
        self.svn_client = svn_client
        # FIXME: Can we get rid of _relative_url?
        self._relative_url = relative_url

    def update_urls(self) -> typing.Optional[str]:
        """Relative URL so we can generate local paths."""
        rel_url_re = re.compile(r"^Relative URL: \^(.*)/?$")
        if not self._relative_url:
            # noinspection PyPep8
            for line in internals.run(
                [self.svn_client, "info", "."], cwd=self.cwd
            ).split("\n"):
                match = rel_url_re.match(line)
                if match:
                    self._relative_url = match.group(1)
                    break
        return self._relative_url

    @property
    def relative_url(self) -> str:
        """Relative url of Subversion repository (e.g. /trunk/project).

        Note the caret (^) at the beginning is stripped and there is no
        trailing slash at the end.

        """
        if self._relative_url is None:
            self.update_urls()
        assert self._relative_url is not None
        return self._relative_url

    @staticmethod
    def _extract(
        elem: ET.Element, sub: str, log_entry: str, on_error=None
    ) -> typing.Optional[str]:
        """Extract subelement from element."""
        try:
            subel = elem.find(f"./{sub}")
            if subel is not None:
                return subel.text
        except (AttributeError, SyntaxError) as err:
            log.warning("failed to retrieve %s in %s: %s", sub, log_entry, err)
            if on_error == "raise":
                raise
        return None

    def process_entry(self, log_entry: str):
        """Convert a single xml <logentry/> element to csv rows.

        Args:
            log_entry: <logentry/> element.

        Yields:
            One or more csv rows.

        """
        elem = ET.fromstring(log_entry)
        rev = elem.attrib["revision"]
        author = self._extract(elem, "author", log_entry)
        date = self._extract(elem, "date", log_entry, "raise")
        message = self._extract(elem, "msg", log_entry)
        if message is not None:
            message = message.replace("\n", " ")
        rel_url_slash = self.relative_url + "/"
        for path_elem in elem.findall("*/path"):
            other = {}
            for sub in [
                "text-mods",
                "kind",
                "action",
                "prop-mods",
                "copyfrom-rev",
                "copyfrom-path",
            ]:
                try:
                    other[sub] = path_elem.attrib[sub]
                except (AttributeError, SyntaxError, KeyError):
                    other[sub] = np.nan
            try:
                path_elem_text = path_elem.text
                assert path_elem_text is not None
                path = path_elem_text.replace(rel_url_slash, "")
            except (AttributeError, SyntaxError, ValueError) as err:
                msg = f"{err} processing rev {rev}"
                log.warning(msg)
                path = msg
            assert date is not None, "expected datetime got None"
            entry = scm.LogEntry(
                rev,
                author,
                to_date(date),
                path=path,
                message=message,
                textmods=to_bool(other["text-mods"]),
                kind=other["kind"],
                action=other["action"],
                propmods=to_bool(other["prop-mods"]),
                copyfromrev=other["copyfrom-rev"],
                copyfrompath=other["copyfrom-path"],
                added=np.nan,
                removed=np.nan,
            )
            yield entry

    def process_log_entries(self, xml):
        # See parent.
        log_entry = ""
        for line in xml:
            if line.startswith("<logentry"):
                log_entry += line
                continue
            if not log_entry:
                continue
            log_entry += line
            if not line.startswith("</logentry>"):
                continue
            yield from self.process_entry(log_entry)
            log_entry = ""

    def get_log(
        self,
        path: str = ".",
        after: dt.datetime = None,
        before: dt.datetime = None,
        progress_bar: tqdm.tqdm = None,
    ) -> pd.DataFrame:
        """Entry point to retrieve the Subversion log.

        Call svn log --xml -v and return the output as a DataFrame.

        Args:
            path: location to get the log for.
            after: only get the log after time stamp. Defaults to one year ago.
            before: only get the log before time stamp. Defaults to now.
            progress_bar: tqdm.tqdm progress bar.

        Returns:
            pandas.DataFrame with columns matching the fields of
            codemetrics.scm.LogEntry.

        Example::

            last_year = datetime.datetime.now() - datetime.timedelta(365)
            log_df = cm.svn.get_svn_log(path='src', after=last_year)

        """
        internals.check_run_in_root(path, self.cwd)
        after, before = internals.handle_default_dates(after, before)
        before_str = "HEAD"
        if before:
            before_str = "{" + f"{before:%Y-%m-%d}" + "}"
        after_str = "{" + f"{after:%Y-%m-%d}" + "}"
        # noinspection PyPep8
        command = (
            [self.svn_client]
            + _SvnLogCollector._args
            + ["-r", f"{after_str}:{before_str}", path]
        )
        results = internals.run(command, cwd=self.cwd).split("\n")
        return self.process_log_output_to_df(
            results, after=after, progress_bar=progress_bar
        )
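

# For reference (illustrative, not part of the original module): with the
# defaults above, _SvnLogCollector.get_log shells out to a command of the
# form below, where the dates come from `after`/`before` (2020-01-01 is a
# placeholder) and "HEAD" is used when `before` is not given:
#
#     svn log --xml -v -r {2020-01-01}:HEAD .
#
# The XML output is then parsed entry by entry in process_log_entries().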


class SvnDownloader(scm.ScmDownloader):
    """Download files from Subversion."""

    def __init__(
        self, command: typing.List[str], svn_client: str = None, cwd: pl.Path = None
    ) -> None:
        """Initialize downloader.

        Args:
            command: svn sub-command and arguments (e.g. ["cat", "-r"]).
            svn_client: name of svn client.
            cwd: root of the directory under SCM.

        """
        if not svn_client:
            svn_client = default_client
        super().__init__(command, client=svn_client, cwd=cwd)

    def _download(self, revision: str, path: str = None) -> scm.DownloadResult:
        """Download specific file and revision from Subversion.

        Args:
            revision: revision number of the change set.
            path (optional): specific path to look up in this revision.

        Return:
            Result of the svn command (e.g. cat, diff, ...).

        """
        command = self.command + [revision]
        if path:
            command += [path]
        content = internals.run(command, cwd=self.cwd)
        return scm.DownloadResult(revision, path, content)
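

# Illustrative usage (not part of the original module), mirroring how
# SvnProject.download and get_diff_stats build their downloaders. The
# revision and path below are placeholders.
#
#     downloader = SvnDownloader(["cat", "-r"], svn_client="svn")
#     result = downloader.download("1234", "src/module.py")
#     # result is a scm.DownloadResult(revision, path, content)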


def get_diff_stats(
    data: pd.DataFrame, svn_client: str = None, chunks=None, cwd: pl.Path = None
) -> typing.Union[None, pd.DataFrame]:
    """Download diff chunks statistics from Subversion.

    Args:
        data: data frame for the change set, containing at least a revision
            column.
        svn_client: Subversion client executable. Defaults to svn.
        chunks: if True, return statistics by chunk. Otherwise, return just
            added and removed columns for each path. If chunks is None,
            defaults to True for a data frame and False for a series.
        cwd: root of the directory under SCM.

    Returns:
        DataFrame containing the statistics for each chunk.

    Example::

        import pandas as pd
        import codemetrics as cm
        log = cm.get_svn_log().set_index(['revision', 'path'])
        log.loc[:, ['added', 'removed']] = log.reset_index().\\
            groupby('revision').\\
            apply(cm.svn.get_diff_stats, chunks=False)

    """
    if not svn_client:
        svn_client = default_client
    data = data.reset_index()  # prevents messing with input frame.
    try:
        revision = data.iloc[0]["revision"]
    except Exception as err:
        log.warning("cannot find revision in group: %s\n%s", str(err), data)
        return None
    downloader = SvnDownloader(["diff", "--git", "-c"], svn_client=svn_client, cwd=cwd)
    try:
        downloaded = downloader.download(revision)
    except subprocess.CalledProcessError as err:
        message = f"cannot retrieve diff for {revision}: {err}: {err.stderr}"
        log.warning(message)
        return None
    df = scm.parse_diff_chunks(downloaded)
    if chunks is None:
        chunks = data.ndim == 2
    if not chunks:
        df = df[["path", "added", "removed"]].groupby(["path"]).sum()
    else:
        df = df.set_index(["path", "chunk"])
    return df
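

# Illustrative variant (not part of the original module) of the docstring
# example above: with chunks=True the result keeps one row per diff chunk,
# indexed by (path, chunk), instead of being summed per path:
#
#     log.reset_index().groupby('revision').apply(cm.svn.get_diff_stats, chunks=True)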


class SvnProject(scm.Project):
    """Project for Subversion SCM."""

    def __init__(self, cwd: pl.Path = pl.Path(), client: str = "svn"):
        """Initialize a Subversion project.

        Args:
            cwd: root of the SCM project.
            client: svn client.

        """
        super().__init__(cwd)
        self.client = client

    def download(self, data: pd.DataFrame) -> scm.DownloadResult:
        """Download results from Subversion.

        Args:
            data: pd.DataFrame containing at least revision and path.

        Returns:
            scm.DownloadResult with the file content.

        """
        downloader = SvnDownloader(["cat", "-r"], svn_client=self.client, cwd=self.cwd)
        df = data[["revision", "path"]]
        if isinstance(df, pd.Series):
            df = df.to_frame().T
        revision, path = next(df.itertuples(index=False))
        return downloader.download(revision, path)

    def get_log(
        self,
        path: str = ".",
        after: dt.datetime = None,
        before: dt.datetime = None,
        progress_bar: tqdm.tqdm = None,
        # FIXME: Why do we need path _and_ relative_url
        relative_url: str = None,
        _pdb=False,
    ) -> pd.DataFrame:
        """Entry point to retrieve svn log.

        Args:
            path: location to retrieve the log for.
            after: only get the log after time stamp. Defaults to one year ago.
            before: only get the log before time stamp. Defaults to now.
            progress_bar: tqdm.tqdm progress bar.
            relative_url: Subversion relative url (e.g. /project/trunk/).
            _pdb: drop in debugger on parsing errors.

        Returns:
            pandas.DataFrame with columns matching the fields of
            :class:`codemetrics.scm.LogEntry`.

        Example::

            last_year = datetime.datetime.now() - datetime.timedelta(365)
            log_df = cm.svn.get_svn_log(path='src', after=last_year)

        """
        collector = _SvnLogCollector(
            cwd=self.cwd, svn_client=self.client, relative_url=relative_url
        )
        return collector.get_log(
            path=path, after=after, before=before, progress_bar=progress_bar
        )
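

# Illustrative end-to-end usage (not part of the original module), combining
# the docstring examples above; the dates and paths are placeholders:
#
#     import datetime
#     import codemetrics as cm
#
#     project = cm.svn.SvnProject()  # defaults to the `svn` client and cwd
#     last_year = datetime.datetime.now() - datetime.timedelta(365)
#     log = project.get_log(path='src', after=last_year)
#     stats = log.groupby('revision').apply(cm.svn.get_diff_stats, chunks=False)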