# Copyright (C) 2020-2022 Apple Inc. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS BE LIABLE FOR
# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import calendar
import logging
import os
import json
import re
import six
import subprocess
import sys
import time
from collections import defaultdict
from webkitcorepy import run, decorators, NestedFuzzyDict
from webkitscmpy.local import Scm
from webkitscmpy import remote, Commit, Contributor, log
class Git(Scm):
class Cache(object):
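# Caches the mapping between commit hashes, git-svn revisions and identifiers for each branch,
# and persists that mapping to 'identifiers.json' in the repository's common git directory so
# identifiers do not need to be recomputed from `git log` on every invocation.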
def __init__(self, repo, guaranteed_for=10):
self.repo = repo
self._ordered_commits = {}
self._hash_to_identifiers = NestedFuzzyDict(primary_size=6)
self._ordered_revisions = {}
self._revisions_to_identifiers = {}
self._last_populated = {}
self._guaranteed_for = guaranteed_for
self.load()
def load(self):
if not os.path.exists(self.path):
return
try:
with open(self.path) as file:
content = json.load(file)
self._ordered_commits = content['hashes']
self._ordered_revisions = content['revisions']
self._fill(self.repo.default_branch)
for branch in self._ordered_commits.keys():
if branch == self.repo.default_branch:
continue
self._fill(branch)
except BaseException:
pass
@property
def path(self):
return os.path.join(self.repo.common_directory, 'identifiers.json')
def _fill(self, branch):
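# Walk the cached commit list for 'branch' from its tip backwards, assigning an identifier to each
# hash (and to its revision, when one is known), stopping once it reaches a commit whose identifier
# is already owned by this branch or by a higher-priority branch.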
default_branch = self.repo.default_branch
if branch == default_branch:
branch_point = None
else:
branch_point = int(self._hash_to_identifiers[self._ordered_commits[branch][0]].split('@')[0])
index = len(self._ordered_commits[branch]) - 1
while index:
identifier = self._hash_to_identifiers.get(self._ordered_commits[branch][index])
if identifier:
id_branch = identifier.split('@')[-1]
if branch in (default_branch, id_branch):
break
if branch != self.repo.prioritize_branches((branch, id_branch)):
break
identifier = '{}@{}'.format('{}.{}'.format(branch_point, index) if branch_point else index, branch)
self._hash_to_identifiers[self._ordered_commits[branch][index]] = identifier
if self._ordered_revisions[branch][index]:
self._revisions_to_identifiers[self._ordered_revisions[branch][index]] = identifier
index -= 1
def populate(self, branch=None):
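# Build (or refresh) the ordered hash and revision lists for 'branch' by walking `git log`, stopping
# once the walk intersects commits that already have identifiers, then re-derive identifiers via
# _fill() and write the cache back to disk. Results are treated as fresh for 'guaranteed_for' seconds.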
branch = branch or self.repo.branch
if not branch:
return
if self._last_populated.get(branch, 0) + self._guaranteed_for > time.time():
return
default_branch = self.repo.default_branch
is_default_branch = branch == default_branch
if branch not in self._ordered_commits:
self._ordered_commits[branch] = [''] if is_default_branch else []
self._ordered_revisions[branch] = [0] if is_default_branch else []
# If we aren't on the default branch, we will need the default branch to determine when
# our branch intersects with the default branch.
if not is_default_branch:
self.populate(branch=self.repo.default_branch)
hashes = []
revisions = []
def _append(branch, hash, revision=None):
hashes.append(hash)
revisions.append(revision)
identifier = self._hash_to_identifiers.get(hash, '')
return identifier.endswith(default_branch) or identifier.endswith(branch)
intersected = False
log = None
try:
kwargs = dict()
if sys.version_info >= (3, 6):
kwargs = dict(encoding='utf-8')
self._last_populated[branch] = time.time()
log = subprocess.Popen(
[self.repo.executable(), 'log', branch, '--no-decorate', '--date=unix', '--'],
cwd=self.repo.root_path,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
**kwargs
)
if log.poll():
raise self.repo.Exception("Failed to construct branch history for '{}'".format(branch))
hash = None
revision = None
line = log.stdout.readline()
while line:
if line.startswith(' git-svn-id: '):
match = self.repo.GIT_SVN_REVISION.match(line.lstrip())
if match:
revision = int(match.group('revision'))
if not line.startswith('commit '):
line = log.stdout.readline()
continue
if hash and _append(branch, hash, revision=revision):
hash = None
intersected = True
break
hash = line.split(' ')[1].rstrip()
revision = None
line = log.stdout.readline()
if hash:
intersected = _append(branch, hash, revision=revision)
finally:
# If our `git log` operation failed, we can't count on the validity of our cache
if log and log.returncode:
return
if log and log.poll() is None:
log.kill()
if not hashes or intersected and len(hashes) <= 1:
return
hashes.reverse()
revisions.reverse()
order = len(self._ordered_commits[branch]) - 1
while order > 0:
if hashes[0] == self._ordered_commits[branch][order]:
order -= 1
break
order -= 1
self._ordered_commits[branch] = self._ordered_commits[branch][:order + 1] + hashes
self._ordered_revisions[branch] = self._ordered_revisions[branch][:order + 1] + revisions
self._fill(branch)
try:
with open(self.path, 'w') as file:
json.dump(dict(
hashes=self._ordered_commits,
revisions=self._ordered_revisions,
), file, indent=4)
except (IOError, OSError):
self.repo.log("Failed to write identifier cache to '{}'".format(self.path))
def clear(self, branch):
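# Drop the cached data for 'branch', rebuild the identifier tables from the remaining branches and
# rewrite the cache file.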
for d in [self._ordered_commits, self._ordered_revisions, self._last_populated]:
if branch in d:
del d[branch]
self._hash_to_identifiers = NestedFuzzyDict(primary_size=6)
self._revisions_to_identifiers = {}
if self.repo.default_branch not in self._ordered_commits:
return
self._fill(self.repo.default_branch)
for branch in self._ordered_commits.keys():
if branch == self.repo.default_branch:
continue
self._fill(branch)
try:
with open(self.path, 'w') as file:
json.dump(dict(
hashes=self._ordered_commits,
revisions=self._ordered_revisions,
), file, indent=4)
except (IOError, OSError):
self.repo.log("Failed to write identifier cache to '{}'".format(self.path))
def to_hash(self, revision=None, identifier=None, populate=True, branch=None):
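# Resolve a revision or identifier to a commit hash using the cached tables, falling back to
# reloading the on-disk cache and, when 'populate' is allowed, repopulating from `git log`.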
if revision:
identifier = self.to_identifier(revision=revision, populate=populate, branch=branch)
parts = Commit._parse_identifier(identifier, do_assert=False)
if not parts:
return None
_, b_count, branch = parts
if b_count < 0:
return None
if branch in self._ordered_commits and len(self._ordered_commits[branch]) > b_count:
return self._ordered_commits[branch][b_count]
self.load()
if branch in self._ordered_commits and len(self._ordered_commits[branch]) > b_count:
return self._ordered_commits[branch][b_count]
if populate:
self.populate(branch=branch)
return self.to_hash(identifier=identifier, populate=False)
return None
def to_revision(self, hash=None, identifier=None, populate=True, branch=None):
if hash:
identifier = self.to_identifier(hash=hash, populate=populate, branch=branch)
parts = Commit._parse_identifier(identifier, do_assert=False)
if not parts:
return None
_, b_count, branch = parts
if b_count < 0:
return None
if branch in self._ordered_revisions and len(self._ordered_revisions[branch]) > b_count:
return self._ordered_revisions[branch][b_count]
self.load()
if branch in self._ordered_revisions and len(self._ordered_revisions[branch]) > b_count:
return self._ordered_revisions[branch][b_count]
if populate:
self.populate(branch=branch)
return self.to_revision(identifier=identifier, populate=False)
return None
def to_identifier(self, hash=None, revision=None, populate=True, branch=None):
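# Resolve a hash or git-svn revision to an identifier string, following the same cache, reload and
# populate fallback order as to_hash() and to_revision().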
revision = Commit._parse_revision(revision, do_assert=False)
if revision:
if revision in self._revisions_to_identifiers:
return self._revisions_to_identifiers[revision]
self.load()
if revision in self._revisions_to_identifiers:
return self._revisions_to_identifiers[revision]
if populate:
self.populate(branch=branch)
return self.to_identifier(revision=revision, populate=False)
return None
hash = Commit._parse_hash(hash, do_assert=False)
if hash:
try:
candidate = self._hash_to_identifiers.get(hash)
except KeyError: # Means the hash wasn't specific enough
return None
if candidate:
return candidate
self.load()
candidate = self._hash_to_identifiers.get(hash)
if candidate:
return candidate
if populate:
self.populate(branch=branch)
return self.to_identifier(hash=hash, populate=False)
return None
GIT_COMMIT = re.compile(r'commit (?P<hash>[0-9a-f]+)')
SSH_REMOTE = re.compile('(ssh://)?git@(?P<host>[^:/]+)[:/](?P<path>.+).git')
HTTP_REMOTE = re.compile(r'(?P<protocol>https?)://(?P<host>[^\/]+)/(?P<path>.+).git')
REMOTE_BRANCH = re.compile(r'remotes\/(?P<remote>[^\/]+)\/(?P<branch>.+)')
USER_REMOTE = re.compile(r'(?P<name>[^:]+):(?P<branch>.+)')
GIT_CONFIG_EXTENSION = 'git_config_extension'
PROJECT_CONFIG_OPTIONS = {
'pull.rebase': ['true', 'false'],
'webkitscmpy.pull-request': ['overwrite', 'append'],
'webkitscmpy.history': ['when-user-owned', 'disabled', 'always', 'never'],
'webkitscmpy.update-fork': ['true', 'false'],
'webkitscmpy.auto-check': ['true', 'false'],
}
CONFIG_LOCATIONS = ['global', 'repository', 'project']
@classmethod
@decorators.Memoize()
def executable(cls):
return Scm.executable('git')
@classmethod
def is_checkout(cls, path):
return run([cls.executable(), 'rev-parse', '--show-toplevel'], cwd=path, capture_output=True).returncode == 0
@decorators.hybridmethod
@decorators.Memoize()
def config(context, location=None):
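# Return git configuration as a dictionary. Invoked on the class, this only reads the global config;
# on an instance it reads the checkout's config (optionally scoped by 'location'), merges in the
# project-level defaults and validates known webkitscmpy options against PROJECT_CONFIG_OPTIONS.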
args = [context.executable(), 'config', '-l']
kwargs = dict(capture_output=True, encoding='utf-8')
if location and location not in context.CONFIG_LOCATIONS:
raise TypeError("'{}' is not a valid git config location".format(location))
if isinstance(context, type) and location in ['repository', 'project']:
raise TypeError("Cannot find '{}' git config without local checkout".format(location))
if isinstance(context, type) or location == 'global':
args += ['--global']
else:
kwargs['cwd'] = context.root_path
if location == 'project':
# Without a project config, use the library defaults
if not context.metadata or not os.path.isfile(os.path.join(context.metadata, context.GIT_CONFIG_EXTENSION)):
return {key: values[0] for key, values in context.PROJECT_CONFIG_OPTIONS.items()}
args += ['--file', os.path.join(context.metadata, context.GIT_CONFIG_EXTENSION)]
command = run(args, **kwargs)
if command.returncode:
sys.stderr.write("Failed to run '{}'{}\n".format(
' '.join(args),
'' if isinstance(context, type) else ' in {}'.format(context.root_path),
))
return {}
result = {}
for line in command.stdout.splitlines():
parts = line.split('=')
result[parts[0]] = '='.join(parts[1:])
# When no location argument is provided, combine the project config and the repository config
if not isinstance(context, type) and not location:
default_config_values = context.config(location='project')
else:
default_config_values = {key: values[0] for key, values in Git.PROJECT_CONFIG_OPTIONS.items()}
for key, value in default_config_values.items():
if not result.get(key):
result[key] = value
elif not Git.PROJECT_CONFIG_OPTIONS.get(key):
continue
elif result.get(key) not in Git.PROJECT_CONFIG_OPTIONS[key]:
sys.stderr.write("'{}' is not a valid value for '{}', using '{}' instead\n".format(result[key], key, value))
result[key] = value
return result
def __init__(self, path, dev_branches=None, prod_branches=None, contributors=None, id=None, cached=sys.version_info > (3, 0)):
super(Git, self).__init__(path, dev_branches=dev_branches, prod_branches=prod_branches, contributors=contributors, id=id)
self._branch = None
self.cache = self.Cache(self) if self.root_path and cached else None
if not self.root_path:
raise OSError('Provided path {} is not a git repository'.format(path))
@decorators.Memoize(cached=False)
def info(self):
if not self.is_svn:
raise self.Exception('Cannot run SVN info on a git checkout which is not git-svn')
info_result = run([self.executable(), 'svn', 'info'], cwd=self.path, capture_output=True, encoding='utf-8')
if info_result.returncode:
return {}
result = {}
for line in info_result.stdout.splitlines():
split = line.split(': ')
result[split[0]] = ': '.join(split[1:])
return result
@property
@decorators.Memoize()
def is_svn(self):
config = os.path.join(self.common_directory, 'config')
if not os.path.isfile(config):
return False
with open(config, 'r') as config:
for line in config.readlines():
if line.startswith('[svn-remote "svn"]'):
return True
return False
@property
def is_git(self):
return True
@property
@decorators.Memoize()
def root_path(self):
result = run([self.executable(), 'rev-parse', '--show-toplevel'], cwd=self.path, capture_output=True, encoding='utf-8')
if result.returncode:
return None
return result.stdout.rstrip()
@property
@decorators.Memoize()
def common_directory(self):
result = run([self.executable(), 'rev-parse', '--git-common-dir'], cwd=self.root_path, capture_output=True, encoding='utf-8')
if result.returncode:
return os.path.join(self.root_path, '.git')
return os.path.abspath(os.path.join(self.root_path, result.stdout.rstrip()))
@property
@decorators.Memoize()
def default_branch(self):
for name in ['HEAD', 'main', 'master']:
result = run([self.executable(), 'rev-parse', '--symbolic-full-name', 'refs/remotes/origin/{}'.format(name)],
cwd=self.path, capture_output=True, encoding='utf-8')
s = result.stdout.strip()
if result.returncode == 0 and s:
assert s.startswith('refs/remotes/origin/')
return s[len('refs/remotes/origin/'):]
candidates = self.branches
if 'main' in candidates:
return 'main'
if 'master' in candidates:
return 'master'
return None
@property
def branch(self):
if self._branch:
return self._branch
status = run([self.executable(), 'status'], cwd=self.root_path, capture_output=True, encoding='utf-8')
if status.returncode:
raise self.Exception('Failed to run `git status` for {}'.format(self.root_path))
if status.stdout.splitlines()[0].startswith('HEAD detached at'):
return None
result = run([self.executable(), 'rev-parse', '--abbrev-ref', 'HEAD'], cwd=self.root_path, capture_output=True, encoding='utf-8')
if result.returncode:
raise self.Exception('Failed to retrieve branch for {}'.format(self.root_path))
self._branch = result.stdout.rstrip()
return self._branch
@property
def branches(self):
return self.branches_for()
def tags(self, remote=None):
if not remote:
tags = run([self.executable(), 'tag'], cwd=self.root_path, capture_output=True, encoding='utf-8')
if tags.returncode:
raise self.Exception('Failed to retrieve tag list for {}'.format(self.root_path))
return tags.stdout.splitlines()
tags = run([self.executable(), 'ls-remote', '--tags', remote], cwd=self.root_path, capture_output=True, encoding='utf-8')
if tags.returncode:
raise self.Exception('Failed to retrieve tag list for {} in {}'.format(remote, self.root_path))
result = []
for line in tags.stdout.splitlines():
if line.endswith('^{}'):
continue
result.append('/'.join(line.split('/')[2:]))
return result
def url(self, name=None, cached=None):
return self.config(cached=cached).get('remote.{}.url'.format(name or 'origin'))
@decorators.Memoize()
def remote(self, name=None):
url = self.url(name=name)
ssh_match = self.SSH_REMOTE.match(url)
http_match = self.HTTP_REMOTE.match(url)
if ssh_match:
url = 'https://{}/{}'.format(ssh_match.group('host'), ssh_match.group('path'))
elif http_match:
url = '{}://{}/{}'.format(http_match.group('protocol'), http_match.group('host'), http_match.group('path'))
try:
return remote.Scm.from_url(url)
except OSError:
pass
return None
def _commit_count(self, native_parameter):
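# Count the non-merge commits matching a native `git rev-list` argument, used to turn a ref or a
# '<base>..<head>' range into an identifier offset.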
revision_count = run(
[self.executable(), 'rev-list', '--count', '--no-merges', native_parameter],
cwd=self.root_path, capture_output=True, encoding='utf-8',
)
if revision_count.returncode:
raise self.Exception('Failed to retrieve revision count for {}'.format(native_parameter))
return int(revision_count.stdout)
@decorators.Memoize(cached=False)
def branches_for(self, hash=None, remote=True):
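# List the branches containing 'hash' (or all branches when no hash is given). Returns local branches
# when remote=False, the union of local and remote branches when remote=True, a single remote's
# branches when 'remote' is a string, and a remote-to-branches mapping otherwise.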
branch = run(
[self.executable(), 'branch'] + (['--contains', hash] if hash else ['-a']),
cwd=self.root_path,
capture_output=True,
encoding='utf-8',
)
if branch.returncode:
raise self.Exception('Failed to retrieve branch list for {}'.format(self.root_path))
result = defaultdict(set)
for branch in [branch.lstrip(' *') for branch in filter(lambda branch: '->' not in branch, branch.stdout.splitlines())]:
match = self.REMOTE_BRANCH.match(branch)
if match:
result[match.group('remote')].add(match.group('branch'))
else:
result[None].add(branch)
if remote is False:
return sorted(result[None])
if remote is True:
return sorted(set.union(*result.values()))
if isinstance(remote, str):
return sorted(result[remote])
return result
def commit(self, hash=None, revision=None, identifier=None, branch=None, tag=None, include_log=True, include_identifier=True):
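# Construct a fully qualified Commit from any one of a hash, revision, identifier, branch or tag,
# using the identifier cache where possible and falling back to `git log` otherwise.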
# Only git-svn checkouts can convert revisions to fully qualified commits, unless we happen to have an SVN cache built
if revision:
if hash:
raise ValueError('Cannot define both hash and revision')
hash = self.cache.to_hash(revision=revision, branch=branch) if self.cache else None
# If we don't have an SVN cache built, and we're not git-svn, we can't reason about revisions
if revision and not hash and not self.is_svn:
raise self.Exception('This git checkout does not support SVN revisions')
# Determine the hash for a provided Subversion revision
elif revision and not hash:
if hash:
raise ValueError('Cannot define both hash and revision')
revision = Commit._parse_revision(revision, do_assert=True)
revision_log = run(
[self.executable(), 'svn', 'find-rev', 'r{}'.format(revision)],
cwd=self.root_path, capture_output=True, encoding='utf-8',
timeout=3,
)
if revision_log.returncode:
raise self.Exception("Failed to retrieve commit information for 'r{}'".format(revision))
hash = revision_log.stdout.rstrip()
if not hash:
raise self.Exception("Failed to find 'r{}'".format(revision))
default_branch = self.default_branch
parsed_branch_point = None
log_format = ['-1', '--no-decorate', '--date=unix'] if include_log else ['-1', '--no-decorate', '--date=unix', '--format=short']
# Determine the `git log` output and branch for a given identifier
if identifier is not None:
if revision:
raise ValueError('Cannot define both revision and identifier')
if hash:
raise ValueError('Cannot define both hash and identifier')
if tag:
raise ValueError('Cannot define both tag and identifier')
parsed_branch_point, identifier, parsed_branch = Commit._parse_identifier(identifier, do_assert=True)
if parsed_branch:
if branch and branch != parsed_branch:
raise ValueError(
"Caller passed both 'branch' and 'identifier', but specified different branches ({} and {})".format(
branch, parsed_branch,
),
)
branch = parsed_branch
hash = self.cache.to_hash(identifier='{}@{}'.format(identifier, parsed_branch), branch=branch) if self.cache else None
# If the cache managed to convert the identifier to a hash, we can skip some computation
if hash:
log = run(
[self.executable(), 'log', hash] + log_format + ['--'],
cwd=self.root_path,
capture_output=True,
encoding='utf-8',
)
if log.returncode:
raise self.Exception("Failed to retrieve commit information for '{}'".format(hash))
# The cache has failed to convert the identifier, we need to do it the expensive way
else:
baseline = branch or 'HEAD'
is_default = baseline == default_branch
if baseline == 'HEAD':
is_default = default_branch in self.branches_for(baseline)
if is_default and parsed_branch_point:
raise self.Exception('Cannot provide a branch point for a commit on the default branch')
base_count = self._commit_count(baseline if is_default else '{}..{}'.format(default_branch, baseline))
if identifier > base_count:
raise self.Exception('Identifier {} cannot be found on the specified branch in the current checkout'.format(identifier))
log = run(
[self.executable(), 'log', '{}~{}'.format(branch or 'HEAD', base_count - identifier)] + log_format + ['--'],
cwd=self.root_path,
capture_output=True,
encoding='utf-8',
)
if log.returncode:
raise self.Exception("Failed to retrieve commit information for 'i{}@{}'".format(identifier, branch or 'HEAD'))
# Negative identifiers are actually commits on the default branch, so we will need to re-compute the identifier
if identifier < 0 and is_default:
raise self.Exception('Illegal negative identifier on the default branch')
if identifier < 0:
identifier = None
# Determine the `git log` output for a given branch or tag
elif branch or tag:
if hash:
raise ValueError('Cannot define both tag/branch and hash')
if branch and tag:
raise ValueError('Cannot define both tag and branch')
log = run([self.executable(), 'log', branch or tag] + log_format + ['--'], cwd=self.root_path, capture_output=True, encoding='utf-8')
if log.returncode:
raise self.Exception("Failed to retrieve commit information for '{}'".format(branch or tag))
# Determine the `git log` output for a given hash
else:
hash = Commit._parse_hash(hash, do_assert=True)
log = run([self.executable(), 'log', hash or 'HEAD'] + log_format + ['--'], cwd=self.root_path, capture_output=True, encoding='utf-8')
if log.returncode:
raise self.Exception("Failed to retrieve commit information for '{}'".format(hash or 'HEAD'))
# Fully define the hash from the `git log` output
match = self.GIT_COMMIT.match(log.stdout.splitlines()[0])
if not match:
raise self.Exception('Invalid commit hash in git log')
hash = match.group('hash')
branch_point = None
# A commit is often on multiple branches; the canonical branch is the one with the highest priority
if branch != default_branch:
branch = self.prioritize_branches(self.branches_for(hash))
if not identifier and include_identifier:
cached_identifier = self.cache.to_identifier(hash=hash, branch=branch) if self.cache else None
if cached_identifier:
branch_point, identifier, branch = Commit._parse_identifier(cached_identifier)
# Compute the identifier if the function did not receive one and we were asked to
if not identifier and include_identifier:
identifier = self._commit_count(hash if branch == default_branch else '{}..{}'.format(default_branch, hash))
# Only compute the branch point if we're on something other than the default branch
if not branch_point and include_identifier and branch != default_branch:
branch_point = self._commit_count(hash) - identifier
if branch_point and parsed_branch_point and branch_point != parsed_branch_point:
raise ValueError("Provided 'branch_point' does not match branch point of specified branch")
# Check the commit log for a git-svn revision
logcontent = '\n'.join(line[4:] for line in log.stdout.splitlines()[4:])
matches = self.GIT_SVN_REVISION.findall(logcontent)
revision = int(matches[-1].split('@')[0]) if matches else None
# We only care about when a commit was committed
commit_time = run(
[self.executable(), 'show', '-s', '--format=%ct', hash],
cwd=self.root_path, capture_output=True, encoding='utf-8',
)
if commit_time.returncode:
raise self.Exception('Failed to retrieve commit time for {}'.format(hash))
timestamp = int(commit_time.stdout.lstrip())
# Comparing commits in different repositories involves comparing timestamps. This is problematic because in git,
# it's possible for a series of commits to share a commit time. To handle this case, we assign each commit a
# zero-indexed "order" within its timestamp.
order = 0
while not identifier or order + 1 < identifier + (branch_point or 0):
commit_time = run(
[self.executable(), 'show', '-s', '--format=%ct', '{}~{}'.format(hash, order + 1)],
cwd=self.root_path, capture_output=True, encoding='utf-8',
)
if commit_time.returncode:
break
if int(commit_time.stdout.lstrip()) != timestamp:
break
order += 1
return Commit(
repository_id=self.id,
hash=hash,
revision=revision,
identifier=identifier if include_identifier else None,
branch_point=branch_point,
branch=branch,
timestamp=timestamp,
order=order,
author=Contributor.from_scm_log(log.stdout.splitlines()[1], self.contributors),
message=logcontent if include_log else None,
)
def _args_from_content(self, content, include_log=True):
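# Parse the author, commit timestamp, git-svn revision and message out of a single commit's
# `git log --format=fuller` output and return them as keyword arguments for Commit().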
author = None
timestamp = None
for line in content.splitlines()[:4]:
split = line.split(': ')
if split[0] == 'Author':
author = Contributor.from_scm_log(line.lstrip(), self.contributors)
elif split[0] == 'CommitDate':
timestamp = int(line.split(' ')[-1])
message = ''
for line in content.splitlines()[5:]:
message += line[4:] + '\n'
matches = self.GIT_SVN_REVISION.findall(message)
return dict(
revision=int(matches[-1].split('@')[0]) if matches else None,
author=author,
timestamp=timestamp,
message=message.rstrip() if include_log else None,
)
def commits(self, begin=None, end=None, include_log=True, include_identifier=True):
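# Generate Commit objects for every commit between 'begin' and 'end' by streaming `git log`,
# decrementing identifiers as the walk moves backwards and bumping 'order' for runs of commits
# that share a timestamp.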
begin, end = self._commit_range(begin=begin, end=end, include_identifier=include_identifier)
try:
log = None
log = subprocess.Popen(
[self.executable(), 'log', '--format=fuller', '--no-decorate', '--date=unix', '{}..{}'.format(begin.hash, end.hash), '--'],
cwd=self.root_path,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
**(dict(encoding='utf-8') if sys.version_info > (3, 6) else dict())
)
if log.poll():
raise self.Exception("Failed to construct history for '{}'".format(end.branch))
line = log.stdout.readline()
previous = [end]
while line:
if not line.startswith('commit '):
raise OSError('Failed to parse `git log` format')
branch_point = previous[-1].branch_point
identifier = previous[-1].identifier
hash = line.split(' ')[-1].rstrip()
if hash != previous[-1].hash:
identifier -= 1
if not identifier:
identifier = branch_point
branch_point = None
content = ''
line = log.stdout.readline()
while line and not line.startswith('commit '):
content += line
line = log.stdout.readline()
commit = Commit(
repository_id=self.id,
hash=hash,
branch=end.branch if identifier and branch_point else self.default_branch,
identifier=identifier if include_identifier else None,
branch_point=branch_point if include_identifier else None,
order=0,
**self._args_from_content(content, include_log=include_log)
)
# Ensure that we don't duplicate the first and last commits
if commit.hash == previous[-1].hash:
previous[-1] = commit
# If we share a timestamp with the previous commit, this commit's order must be lower than that of
# the commits already cached in previous
elif commit.timestamp == previous[-1].timestamp:
for cached in previous:
cached.order += 1
previous.append(commit)
# If we don't share a timestamp with the previous set of commits, we should return all commits
# cached in previous.
else:
for cached in previous:
yield cached
previous = [commit]
for cached in previous:
cached.order += begin.order
yield cached
finally:
if log and log.poll() is None:
log.kill()
def find(self, argument, include_log=True, include_identifier=True):
if not isinstance(argument, six.string_types):
raise ValueError("Expected 'argument' to be a string, not '{}'".format(type(argument)))
# Map any candidate default branch to the one used by this repository
if argument in self.DEFAULT_BRANCHES:
argument = self.default_branch
# See if the argument the user specified is a recognized commit format
parsed_commit = Commit.parse(argument, do_assert=False)
if parsed_commit:
if parsed_commit.branch in self.DEFAULT_BRANCHES:
parsed_commit.branch = self.default_branch
return self.commit(
hash=parsed_commit.hash,
revision=parsed_commit.revision,
identifier=parsed_commit.identifier,
branch=parsed_commit.branch,
include_log=include_log,
include_identifier=include_identifier,
)
# The argument isn't a recognized commit format; hopefully it is a valid git ref of some form
output = run(
[self.executable(), 'rev-parse', argument],
cwd=self.root_path, capture_output=True, encoding='utf-8',
)
if output.returncode:
raise ValueError("'{}' is not an argument recognized by git".format(argument))
return self.commit(hash=output.stdout.rstrip(), include_log=include_log, include_identifier=include_identifier)
def _to_git_ref(self, argument):
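# Convert a user-provided argument to something git natively understands: identifiers and revisions
# are resolved to their hash, anything else is passed through unchanged.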
if not argument:
return None
if not isinstance(argument, six.string_types):
raise ValueError("Expected 'argument' to be a string, not '{}'".format(type(argument)))
parsed_commit = Commit.parse(argument, do_assert=False)
if parsed_commit and not parsed_commit.hash:
return self.commit(
hash=parsed_commit.hash,
revision=parsed_commit.revision,
identifier=parsed_commit.identifier,
branch=parsed_commit.branch,
include_log=False,
include_identifier=False,
).hash
return argument
def checkout(self, argument):
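# Check out a ref, identifier or revision. Arguments of the form '<username>:<branch>' are treated as
# a branch on a GitHub fork: the fork is added as a remote (and fetched, if needed) before checkout.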
self._branch = None
if log.level > logging.WARNING:
log_arg = ['-q']
elif log.level < logging.WARNING:
log_arg = ['--progress']
else:
log_arg = []
match = self.USER_REMOTE.match(argument)
rmt = self.remote()
if match and isinstance(rmt, remote.GitHub):
name = match.group('name')
username = name.split('/')[0]
repo_name = rmt.name if '/' not in name else name.split('/', 1)[-1]
name = username + repo_name[len(rmt.name):]
if not self.url(name):
url = self.url()
if '://' in url:
rmt = '{}://{}/{}/{}.git'.format(url.split(':')[0], url.split('/')[2], username, repo_name)
elif ':' in url:
rmt = '{}:{}/{}.git'.format(url.split(':')[0], username, repo_name)
else:
sys.stderr.write("Failed to convert '{}' to '{}' remote\n".format(url, username))
return None
if run(
[self.executable(), 'remote', 'add', name, rmt],
capture_output=True, cwd=self.root_path,
).returncode:
sys.stderr.write("Failed to add remote '{}' as '{}'\n".format(rmt, name))
return None
self.config.clear()
branch = match.group('branch')
rc = run(
[self.executable(), 'checkout'] + ['-B', branch, '{}/{}'.format(name, branch)] + log_arg,
cwd=self.root_path,
).returncode
if not rc:
return self.commit()
if rc == 128:
run([self.executable(), 'fetch', name], cwd=self.root_path)
return None if run(
[self.executable(), 'checkout'] + ['-B', branch, '{}/{}'.format(name, branch)] + log_arg,
cwd=self.root_path,
).returncode else self.commit()
return None if run(
[self.executable(), 'checkout', self._to_git_ref(argument)] + log_arg + ['--'],
cwd=self.root_path,
).returncode else self.commit()
def rebase(self, target, base=None, head='HEAD', recommit=True):
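# Rebase 'head' onto 'target' (optionally starting from 'base'), refusing to rebase production
# branches. When 'recommit' is set, the rebased commits are re-dated with `git filter-branch` so
# their committer dates reflect the time of the rebase.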
if head == self.default_branch or self.prod_branches.match(head):
raise RuntimeError("Rebasing production branch '{}' banned in tooling!".format(head))
target = self._to_git_ref(target)
base = self._to_git_ref(base)
head = self._to_git_ref(head)
code = run([self.executable(), 'rebase', '--onto', target, base or target, head], cwd=self.root_path).returncode
if self.cache:
self.cache.clear(head if head != 'HEAD' else self.branch)
if code or not recommit:
return code
return run([
self.executable(), 'filter-branch', '-f',
'--env-filter', "GIT_AUTHOR_DATE='{date}';GIT_COMMITTER_DATE='{date}'".format(
date='{} -{}'.format(int(time.time()), self.gmtoffset())
), 'refs/heads/{}...{}'.format(target, head),
], cwd=self.root_path, env={'FILTER_BRANCH_SQUELCH_WARNING': '1'}, capture_output=True).returncode
def fetch(self, branch, remote='origin'):
return run(
[self.executable(), 'fetch', remote, '{}:{}'.format(branch, branch)],
cwd=self.root_path,
).returncode
def pull(self, rebase=None, branch=None, remote='origin'):
commit = self.commit() if self.is_svn or branch else None
code = 0
if branch and self.branch != branch:
code = self.fetch(branch=branch, remote=remote)
if not code:
command = [self.executable(), 'pull'] + ([remote, branch] if branch else [])
if rebase is True:
command += ['--rebase=True', '--autostash']
elif rebase is False:
command += ['--rebase=False']
code = run(command, cwd=self.root_path).returncode
if self.cache and rebase and branch != self.branch:
self.cache.clear(self.branch)
if not code and branch and rebase:
result = run([self.executable(), 'rev-parse', 'HEAD'], cwd=self.root_path, capture_output=True, encoding='utf-8')
if not result.returncode and result.stdout.rstrip() != commit.hash:
code = run([
self.executable(),
'filter-branch', '-f',
'--env-filter', "GIT_AUTHOR_DATE='{date}';GIT_COMMITTER_DATE='{date}'".format(
date='{} -{}'.format(int(time.time()), self.gmtoffset())
), 'HEAD...{}'.format('{}/{}'.format(remote, branch)),
], cwd=self.root_path, env={'FILTER_BRANCH_SQUELCH_WARNING': '1'}).returncode
if not code and self.is_svn and commit.revision:
return run([
self.executable(), 'svn', 'fetch', '--log-window-size=5000', '-r', '{}:HEAD'.format(commit.revision),
], cwd=self.root_path).returncode
return code
def clean(self):
return run([
self.executable(), 'reset', 'HEAD', '--hard',
], cwd=self.root_path).returncode
def modified(self, staged=None):
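# Return the list of modified files. With an explicit 'staged' value only that set is returned;
# otherwise, staged changes are returned alone unless every staged change is a plain addition, in
# which case unstaged changes are included as well.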
if staged in [True, False]:
command = run(
[self.executable(), 'diff', '--name-only'] + (['--staged'] if staged else []),
capture_output=True, encoding='utf-8', cwd=self.root_path,
)
if command.returncode:
return []
return command.stdout.splitlines()
# When the user hasn't specified what they're looking for, we need to make some assumptions.
# If all staged files are added, the user probably wants to include non-staged files too
command = run(
[self.executable(), 'diff', '--name-status', '--staged'],
capture_output=True, encoding='utf-8', cwd=self.root_path,
)
if command.returncode:
return []
added = set()
for line in command.stdout.splitlines():
state, file = line.split(None, 1)
if state == 'A':
added.add(file)
staged = self.modified(staged=True)
if set(staged) - added:
return staged
return staged + self.modified(staged=False)
def diff_lines(self, base, head=None):
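# Yield the lines of `git diff` between 'base' and 'head' (or for 'base' alone), resolving
# identifiers and revisions to hashes first and streaming the output line by line.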
base = self._to_git_ref(base)
head = self._to_git_ref(head)
kwargs = dict()
if sys.version_info >= (3, 6):
kwargs = dict(encoding='utf-8')
target = '{}..{}'.format(base, head) if head else base
proc = subprocess.Popen(
[self.executable(), 'diff', target],
cwd=self.root_path,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
**kwargs
)
if proc.poll():
sys.stderr.write("Failed to generate diff for '{}'\n".format(target))
line = proc.stdout.readline()
while line:
yield line.rstrip()
line = proc.stdout.readline()