#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Git related functions."""
import datetime as dt
import pathlib as pl
import re
import typing
import numpy as np
import pandas as pd
import tqdm
from . import internals, scm
from .internals import log
# Name of the git executable used whenever a caller does not specify a client.
default_client = "git"
def _parse_path_info(path_info: str) -> typing.Tuple[str, typing.Optional[str]]:
    """Split the path column of a git log line into new and previous path.

    When the text holds exactly one " => " marker, both the current path and the
    path it was copied from are rebuilt. Git may factor the part shared by both
    paths around braces (``prefix{old => new}suffix``); in that case the prefix
    and suffix are re-attached to each side.

    Args:
        path_info: git log path information. Either a plain path or something
            like ``old_path => new_path``, possibly with the common parts
            factored with braces.

    Returns:
        rel_path and copy_from_path where copy_from_path is None when no single
        " => " marker is present.

    """
    sides = path_info.split(" => ")
    if len(sides) != 2:
        # No marker (or more than one): treat the whole text as the path.
        return path_info, None
    old_side, new_side = sides

    old_parts = old_side.split("{")
    new_parts = new_side.split("}")
    if len(old_parts) != 2 or len(new_parts) != 2:
        # Not the braced form: each side is already a complete path.
        return new_side, old_side

    prefix, old_middle = old_parts
    new_middle, suffix = new_parts
    # An empty side inside the braces leaves a doubled separator behind.
    copy_from_path = f"{prefix}{old_middle}{suffix}".replace("//", "/")
    rel_path = f"{prefix}{new_middle}{suffix}".replace("//", "/")
    return rel_path, copy_from_path
class _GitLogCollector(scm.ScmLogCollector):
    """Collect log from Git."""

    # Arguments appended to the git client. The double quotes are part of the
    # format argument itself; process_log_entries strips them from the lines it
    # reads back.
    _args = [
        "log",
        '--pretty=format:"[%h] [%an] [%ad] [%s]"',
        "--date=iso",
        "--numstat",
    ]

    def __init__(
        self,
        git_client: str = default_client,
        cwd: typing.Optional[pl.Path] = None,
        _pdb: bool = False,
    ):
        """Initialize.

        Compiles the regular expression used to parse the numstat lines.

        Args:
            git_client: name of git client.
            cwd: root of the directory under SCM.
            _pdb: debugging flag. It is stored as self._pdb and not otherwise
                used in this class.

        """
        super().__init__(cwd=cwd)
        self._pdb = _pdb
        self.git_client = git_client
        # <added> <removed> <path info>; "-" stands in for the counts of
        # binary files.
        self.log_re = re.compile(r"^([-\d]+)\s+([-\d]+)\s+(.*)$")

    def parse_path_elem(
        self, path_elem: str
    ) -> typing.Tuple[
        typing.Union[int, float],
        typing.Union[int, float],
        str,
        typing.Optional[str],
    ]:
        """Parse git output to identify lines added, removed and path.

        Also handle renamed path.

        Args:
            path_elem: path element line.

        Returns:
            Quadruplet of added, removed, rel_path, copy_from_path where
            copy_from_path may be None. added and removed are numpy.nan when
            git reports "-" instead of a count.

        Raises:
            ValueError: when the line does not match the expected layout.

        """
        match_log = self.log_re.match(path_elem)
        if not match_log:
            raise ValueError(f"{path_elem} not understood")
        added, removed, path_info = match_log.groups()
        rel_path, copy_from_path = _parse_path_info(path_info)
        added_count = np.nan if added == "-" else int(added)
        removed_count = np.nan if removed == "-" else int(removed)
        return added_count, removed_count, rel_path, copy_from_path

    def process_entry(
        self, log_entry: typing.List[str]
    ) -> typing.Generator[scm.LogEntry, None, None]:
        """Convert one git log entry into scm.LogEntry records.

        The first line is the header produced by the --pretty format, the
        following lines are the numstat lines of the commit. If the log
        includes entries about binary files, the added/removed fields are set
        to numpy.nan. Only the new name is used as path when a path changes;
        the previous name goes to copyfrompath.

        Args:
            log_entry: Git log entry paragraph (header line first).

        Yields:
            A single entry of kind "X" without path when the entry holds only
            the header, otherwise one entry of kind "f" per parsed path line.

        Raises:
            ValueError: when the header cannot be split into revision, author
                and date, or when the date does not match the expected format.

        """
        try:
            # Strip the outer brackets, then split on the field separator.
            rev, author, date_str, *remainder = log_entry[0][1:-1].split("] [")
        except ValueError:
            log.warning("failed to parse %s", log_entry[0])
            raise
        # The message itself may contain "] [": stitch it back together.
        msg = "] [".join(remainder)
        date = dt.datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S %z")
        if len(log_entry) < 2:
            yield scm.LogEntry(
                rev,
                author=author,
                date=date,
                path=None,
                message=msg,
                kind="X",
                added=0,
                removed=0,
                copyfrompath=None,
            )
            return
        for path_elem in log_entry[1:]:
            path_elem = path_elem.strip()
            if not path_elem:
                # A blank line ends the list of paths of this entry.
                break
            try:
                added, removed, relpath, copyfrompath = self.parse_path_elem(
                    path_elem
                )
            except ValueError as err:
                # Best effort: report the line and keep going with the next.
                log.error("failed to parse %s: %s", path_elem, err)
                continue
            yield scm.LogEntry(
                rev,
                author=author,
                date=date,
                path=relpath,
                message=msg,
                kind="f",
                added=added,
                removed=removed,
                copyfrompath=copyfrompath,
            )

    def process_log_entries(
        self, text: typing.Sequence[str]
    ) -> typing.Generator[scm.LogEntry, None, None]:
        """Group the lines of the git output by commit and parse each group.

        A line starting with "[" (after removing surrounding double quotes)
        opens a new entry. Lines found before the first header are ignored.

        Args:
            text: lines of the git log output.

        Yields:
            The entries produced by process_entry for each group of lines.

        """
        log_entry: typing.List[str] = []
        for line in text:
            # Unquote output. Not sure if anything is escaped though...
            if len(line) > 2 and line[0] == '"' and line[-1] == '"':
                line = line[1:-1]
            if line.startswith("["):
                if log_entry:
                    yield from self.process_entry(log_entry)
                log_entry = [line]
            elif log_entry:
                log_entry.append(line)
        if log_entry:
            yield from self.process_entry(log_entry)

    def get_log(
        self,
        path: str = ".",
        after: typing.Optional[dt.datetime] = None,
        before: typing.Optional[dt.datetime] = None,
        progress_bar: typing.Optional[tqdm.tqdm] = None,
    ) -> pd.DataFrame:
        """Retrieve log from git.

        Args:
            path: file or directory to get the log for. Defaults to .
            after: only get the log after time stamp. A missing value is
                resolved by internals.handle_default_dates.
            before: only get the log before time stamp. A missing value is
                resolved by internals.handle_default_dates.
            progress_bar: tqdm.tqdm progress bar.

        Returns:
            pandas.DataFrame built by process_log_output_to_df from the git
            output.

        Raises:
            ValueError: when progress_bar is given and after is still None
                once defaults have been resolved.

        """
        internals.check_run_in_root(path, self.cwd)
        after, before = internals.handle_default_dates(after, before)
        if progress_bar is not None and after is None:
            raise ValueError("progress_bar requires 'after' parameter")
        command = [self.git_client, *self._args]
        # Dates are handed to git with day granularity only.
        if after:
            command += ["--after", f"{after:%Y-%m-%d}"]
        if before:
            command += ["--before", f"{before:%Y-%m-%d}"]
        # "--" makes git treat what follows as a path, never as a revision,
        # which also works for paths missing from the working tree.
        command += ["--", path]
        results = internals.run(command, cwd=self.cwd).split("\n")
        return self.process_log_output_to_df(
            results, after=after, progress_bar=progress_bar
        )
class _GitFileDownloader(scm.ScmDownloader):
    """Fetch the content of a file at a given revision from Git."""

    def __init__(self, git_client: str = None, cwd: pl.Path = None):
        """Set up the downloader around the ``show`` sub-command.

        Args:
            git_client: name of git client. default_client is used when the
                value is falsy.
            cwd: forwarded as cwd to the parent class.

        """
        super().__init__(
            client=git_client or default_client, command=["show"], cwd=cwd
        )

    def _download(
        self, revision: str, path: typing.Optional[str]
    ) -> scm.DownloadResult:
        """Run the command for ``<revision>:<path>`` and wrap its output."""
        target = f"{revision}:{path}"
        output = internals.run(self.command + [target], cwd=self.cwd)
        return scm.DownloadResult(revision, path, output)
class GitProject(scm.Project):
    """Project for git SCM."""

    def __init__(self, cwd: pl.Path = pl.Path("."), client: str = "git"):
        """Initialize a Git project.

        Args:
            cwd: root of the SCM project. Defaults to current directory.
            client: git client. Defaults to git.

        """
        super().__init__(cwd)
        self.client = client

    def download(self, data: pd.DataFrame) -> scm.DownloadResult:
        """Download a file revision from Git.

        Only the first row of data is used.

        Args:
            data: pd.DataFrame containing at least revision and path. A single
                row passed as a pd.Series is accepted as well.

        Returns:
            Result of the download for the first (revision, path) pair.

        """
        downloader = _GitFileDownloader(git_client=self.client, cwd=self.cwd)
        df = data[["revision", "path"]]
        if isinstance(df, pd.Series):
            # Selecting from a Series yields a Series: turn it into one row.
            df = df.to_frame().T
        revision, path = next(df.itertuples(index=False))
        return downloader.download(revision, path)

    def get_log(
        self,
        path: str = ".",
        after: typing.Optional[dt.datetime] = None,
        before: typing.Optional[dt.datetime] = None,
        progress_bar: typing.Optional[tqdm.tqdm] = None,
        # FIXME: Needed for Subversion though may be a better way.
        relative_url: typing.Optional[str] = None,
        _pdb: bool = False,
    ) -> pd.DataFrame:
        """Entry point to retrieve git log.

        Args:
            path: location of checked out file/directory to get the log for.
            after: only get the log after time stamp.
            before: only get the log before time stamp.
            progress_bar: tqdm.tqdm progress bar.
            relative_url: not used by this method; see the FIXME in the
                signature.
            _pdb: debugging flag forwarded to the log collector.

        Returns:
            pandas.DataFrame as returned by _GitLogCollector.get_log.

        Example::

            last_year = datetime.datetime.now() - datetime.timedelta(365)
            log_df = GitProject().get_log(path='src', after=last_year)

        """
        collector = _GitLogCollector(git_client=self.client, cwd=self.cwd, _pdb=_pdb)
        return collector.get_log(
            after=after, before=before, path=path, progress_bar=progress_bar
        )
def download(
    data: pd.DataFrame,
    client: typing.Optional[str] = None,
    cwd: typing.Optional[pl.Path] = None,
) -> scm.DownloadResult:
    """Download a file revision from Git.

    Only the first row of data is used.

    Args:
        data: dataframe containing at least the revision and path columns to
            identify the file to download. A single row passed as a pd.Series
            is accepted as well.
        client: Git client executable. Defaults to git.
        cwd: working directory, typically the root of the directory under SCM.

    Returns:
        Result of the download for the first (revision, path) pair.

    """
    if not client:
        client = default_client
    downloader = _GitFileDownloader(git_client=client, cwd=cwd)
    selection = data[["revision", "path"]]
    if isinstance(selection, pd.Series):
        # Selecting from a Series yields a Series: turn it into one row.
        selection = selection.to_frame().T
    revision, path = next(selection.itertuples(index=False))
    return downloader.download(revision, path)