blob: acd11f00b9ebbc76aa021ab2323a4858212f7404 [file] [log] [blame]
# Copyright (C) 2021-2022 Apple Inc. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS BE LIABLE FOR
# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import calendar
import re
import requests
import sys
import time
import webkitcorepy
from .issue import Issue
from .tracker import Tracker as GenericTracker
from datetime import datetime
from webkitbugspy import User
class Tracker(GenericTracker):
    """Issue tracker backed by the REST API of a Bugzilla instance.

    All requests are made against ``<url>/rest/...``. When credentials are
    available they are passed as ``login``/``password`` query arguments. Every
    4xx response consumes one of a limited number of login attempts; once they
    are exhausted, requests that require a login fail and optional ones are
    sent anonymously.
    """

    # Validates the root URL given to the constructor and captures everything
    # after the scheme as ``domain``.
    ROOT_RE = re.compile(r'\Ahttps?://(?P<domain>\S+)\Z')

    # Templates, formatted with the captured domain, matching a link to a
    # single bug (with and without the scheme) and capturing its ``id``.
    RE_TEMPLATES = [
        r'\Ahttps?://{}/show_bug.cgi\?id=(?P<id>\d+)\Z',
        r'\A{}/show_bug.cgi\?id=(?P<id>\d+)\Z',
    ]

    # Format of the ``creation_time`` strings parsed out of REST responses.
    _TIMESTAMP_FORMAT = '%Y-%m-%dT%H:%M:%SZ'

    class Encoder(GenericTracker.Encoder):
        """Encoder turning a bugzilla ``Tracker`` into a plain dictionary."""

        @webkitcorepy.decorators.hybridmethod
        def default(context, obj):
            """Return the dictionary form of ``obj``.

            A ``Tracker`` becomes ``dict(type='bugzilla', url=...)`` plus a
            ``res`` list holding the patterns of any regular expressions
            supplied on top of the built-in templates. Any other object is
            deferred to the parent encoder, which is only possible when
            ``context`` is an instance rather than the class itself.

            Raises:
                TypeError: ``obj`` is not a ``Tracker`` and ``context`` is a
                    class, so the parent implementation cannot be invoked.
            """
            if isinstance(obj, Tracker):
                result = dict(
                    type='bugzilla',
                    url=obj.url,
                )
                # Only expressions appended after the built-in templates need
                # serialising; the templates are rebuilt from the url.
                extra = obj._res[len(Tracker.RE_TEMPLATES):]
                if extra:
                    result['res'] = [compiled.pattern for compiled in extra]
                return result
            if isinstance(context, type):
                raise TypeError('Cannot invoke parent class when classmethod')
            return super(Tracker.Encoder, context).default(obj)

    def __init__(self, url, users=None, res=None, login_attempts=3, redact=None):
        """Create a tracker for the bugzilla instance rooted at ``url``.

        Args:
            url: Root URL of the instance; must match ``ROOT_RE``.
            users: Forwarded to the parent tracker constructor.
            res: Optional list of additional compiled regular expressions
                (each capturing an ``id`` group) recognised by ``from_string``.
            login_attempts: Number of 4xx responses tolerated before logins
                are considered exhausted. A falsy value allows a single one.
            redact: Forwarded to the parent tracker constructor.

        Raises:
            TypeError: ``url`` is not a valid bugzilla url.
        """
        super(Tracker, self).__init__(users=users, redact=redact)

        self._logins_left = login_attempts + 1 if login_attempts else 1
        match = self.ROOT_RE.match(url)
        if not match:
            raise TypeError("'{}' is not a valid bugzilla url".format(url))
        self.url = url
        self._res = [
            re.compile(template.format(match.group('domain')))
            for template in self.RE_TEMPLATES
        ] + (res or [])

    def _consume_login_on_client_error(self, response):
        """Use up one login attempt if ``response`` carries a 4xx status.

        ``response`` may be ``None`` (the request was never sent). It is
        compared against ``None`` explicitly because ``requests.Response``
        objects are falsy for 4xx and 5xx statuses.
        """
        if response is None:
            return
        if response.status_code // 100 == 4 and self._logins_left:
            self._logins_left -= 1

    @classmethod
    def _parse_timestamp(cls, string):
        """Convert a ``creation_time`` string into integer seconds since the epoch."""
        return int(calendar.timegm(datetime.strptime(string, cls._TIMESTAMP_FORMAT).timetuple()))

    def _user_from_bug(self, bug, key):
        """Resolve the user stored under ``key`` ('creator', 'assigned_to') of a bug dictionary.

        The richer ``<key>_detail`` entry is preferred when present; otherwise
        the bare ``<key>`` value is used as a username. Returns ``None`` when
        the bug carries neither.
        """
        detail = bug.get('{}_detail'.format(key))
        if detail:
            return self.user(
                name=detail.get('real_name'),
                username=detail.get('name'),
                email=detail.get('email'),
            )
        return self.user(username=bug[key]) if bug.get(key) else None

    def user(self, name=None, username=None, email=None):
        """Return the user matching the provided details, creating it if needed.

        The parent tracker's lookup is tried first. If that fails and all
        three details are known, the user is created directly. Otherwise the
        ``rest/user`` endpoint is queried, and the user is created either from
        the first match returned or, failing that, from the details provided.

        Raises:
            RuntimeError: Neither ``username`` nor ``email`` was provided and
                the user is not already known.
        """
        user = super(Tracker, self).user(name=name, username=username, email=email)
        if user:
            return user
        if name and username and email:
            return self.users.create(
                name=name,
                username=username,
                emails=[email],
            )
        if not username and not email:
            raise RuntimeError("Failed to find username for '{}'".format(name))

        response = requests.get('{url}/rest/user{query}'.format(
            url=self.url,
            query=self._login_arguments(
                required=False,
                query='names={name}'.format(name=username or email or name),
            ),
        ))
        self._consume_login_on_client_error(response)
        found = response.json().get('users') if response.status_code // 100 == 2 else None
        if not found:
            return self.users.create(
                name=name,
                username=username,
                emails=[email] if email else None,
            )
        # The 'name' field of the response is used both as username and email.
        return self.users.create(
            name=found[0]['real_name'],
            username=found[0]['name'],
            emails=[found[0]['name']],
        )

    def from_string(self, string):
        """Return the issue ``string`` links to, or ``None`` if no expression matches."""
        for regex in self._res:
            match = regex.match(string)
            if match:
                return self.issue(int(match.group('id')))
        return None

    def credentials(self, required=True, validate=False):
        """Return the credentials for this instance via ``webkitcorepy.credentials``.

        Candidate credentials are validated by requesting the user's own
        ``rest/user/<username>`` record with them; only a 200 response is
        accepted, anything else is reported on stderr.
        """
        def validater(username, password):
            quoted_username = requests.utils.quote(username)
            response = requests.get('{}/rest/user/{}?login={}&password={}'.format(
                self.url, quoted_username, quoted_username, requests.utils.quote(password),
            ))
            if response.status_code == 200:
                return True
            sys.stderr.write('Login to {} for {} failed\n'.format(self.url, username))
            return False

        return webkitcorepy.credentials(
            url=self.url,
            required=required,
            prompt=self.url.split('//')[-1],
            validater=validater,
            validate_existing_credentials=validate
        )

    def _login_arguments(self, required=False, query=None):
        """Build the query string (including the leading '?') for a request.

        Args:
            required: Whether the request cannot proceed without a login.
            query: Optional extra query arguments, without a leading '?' or '&'.

        Returns:
            ``?login=...&password=...[&query]`` when credentials are available,
            otherwise ``?query`` or the empty string.

        Raises:
            RuntimeError: A login is required but all attempts are used up.
        """
        anonymous = '?{}'.format(query) if query else ''
        if not self._logins_left:
            if required:
                raise RuntimeError('Exhausted login attempts')
            return anonymous

        username, password = self.credentials(required=required)
        if not username or not password:
            return anonymous
        return '?login={username}&password={password}{query}'.format(
            username=requests.utils.quote(username),
            password=requests.utils.quote(password),
            query='&{}'.format(query) if query else '',
        )

    @webkitcorepy.decorators.Memoize()
    def me(self):
        """Return the user owning the (required) credentials for this instance."""
        username, _ = self.credentials(required=True)
        return self.user(username=username)

    def issue(self, id):
        """Return the ``Issue`` with the given id on this tracker."""
        return Issue(id=int(id), tracker=self)

    def populate(self, issue, member=None):
        """Fetch the data backing ``member`` and cache it on ``issue``.

        The link and (empty) labels are always set. Depending on ``member``:
          * title, timestamp, creator, opened, assignee, watchers, project,
            component, version: a single ``rest/bug/<id>`` request fills all
            of them.
          * description, comments: a ``rest/bug/<id>/comment`` request fills
            both; the first comment returned is the description.
          * references: issues mentioned in the description and comments,
            followed by the bug's ``see_also`` links, without duplicates and
            excluding the issue itself.

        Failures are reported on stderr and leave the related members
        untouched. Returns ``issue``.
        """
        issue._link = '{}/show_bug.cgi?id={}'.format(self.url, issue.id)
        issue._labels = []

        if member in ('title', 'timestamp', 'creator', 'opened', 'assignee', 'watchers', 'project', 'component', 'version'):
            response = requests.get('{}/rest/bug/{}{}'.format(self.url, issue.id, self._login_arguments(required=False)))
            self._consume_login_on_client_error(response)
            bugs = response.json().get('bugs', []) if response.status_code == 200 else None
            if bugs:
                bug = bugs[0]
                issue._title = bug['summary']
                issue._timestamp = self._parse_timestamp(bug['creation_time'])
                issue._creator = self._user_from_bug(bug, 'creator')
                issue._opened = bug['status'] != 'RESOLVED'
                issue._assignee = self._user_from_bug(bug, 'assigned_to')
                issue._watchers = [self.user(username=name) for name in bug.get('cc', [])]
                issue._project = bug.get('product', '')
                issue._component = bug.get('component', '')
                issue._version = bug.get('version', '')
            else:
                sys.stderr.write("Failed to fetch '{}'\n".format(issue.link))

        if member in ['description', 'comments']:
            response = requests.get('{}/rest/bug/{}/comment{}'.format(self.url, issue.id, self._login_arguments(required=False)))
            self._consume_login_on_client_error(response)
            if response.status_code == 200:
                comments = response.json().get('bugs', {}).get(str(issue.id), {}).get('comments', None)
            else:
                comments = None

            if comments:
                issue._description = comments[0].get('text')
                issue._comments = [
                    Issue.Comment(
                        user=self.user(username=comment['creator']),
                        timestamp=self._parse_timestamp(comment['creation_time']),
                        content=comment['text'],
                    ) for comment in comments[1:]
                ]
            else:
                sys.stderr.write("Failed to fetch comments for '{}'\n".format(issue.link))

        if member == 'references':
            issue._references = []
            refs = set()

            def add_reference(candidate):
                # Skip unparsable links, duplicates and self-references.
                if not candidate or candidate.link in refs or candidate.id == issue.id:
                    return
                issue._references.append(candidate)
                refs.add(candidate.link)

            # Either member may still be None if fetching it failed; treat
            # that as "nothing to scan" instead of raising a TypeError.
            texts = [issue.description] + [comment.content for comment in issue.comments or []]
            for text in texts:
                if not text:
                    continue
                for match in self.REFERENCE_RE.findall(text):
                    add_reference(GenericTracker.from_string(match[0]) or self.from_string(match[0]))

            response = requests.get('{url}/rest/bug/{id}{query}'.format(
                url=self.url, id=issue.id,
                query=self._login_arguments(required=False, query='include_fields=see_also'),
            ))
            self._consume_login_on_client_error(response)
            bugs = response.json().get('bugs', []) if response.status_code == 200 else None
            if bugs:
                for link in bugs[0].get('see_also', []):
                    add_reference(GenericTracker.from_string(link) or self.from_string(link))
            else:
                sys.stderr.write("Failed to fetch related issues for '{}'\n".format(issue.link))

        return issue

    def set(self, issue, assignee=None, opened=None, why=None, project=None, component=None, version=None, **properties):
        """Modify ``issue`` on the server and mirror the change on the object.

        Args:
            issue: Issue to modify.
            assignee: ``User`` to assign the issue to.
            opened: ``True`` re-opens the issue, ``False`` resolves it as
                FIXED, ``None`` leaves the status alone.
            why: Comment attached to the change; defaults to 'Reopening bug'
                when re-opening.
            project, component, version: New classification. Omitted parts
                are inferred when only a single choice exists (the first
                known version is used when ``version`` is omitted).

        Returns:
            ``issue`` on success (or when there is nothing to change),
            ``None`` if the server rejected the change or no login was possible.

        Raises:
            TypeError: Unknown keyword property, or ``assignee`` is not a ``User``.
            ValueError: The project, component or version cannot be resolved.
        """
        update_dict = dict()

        if properties:
            raise TypeError("'{}' is an invalid property".format(list(properties.keys())[0]))

        if assignee:
            if not isinstance(assignee, User):
                raise TypeError("Must assign to '{}', not '{}'".format(User, type(assignee)))
            issue._assignee = self.user(name=assignee.name, username=assignee.username, email=assignee.email)
            update_dict['assigned_to'] = issue._assignee.username

        if opened is not None:
            issue._opened = bool(opened)
            if issue._opened:
                update_dict['status'] = 'REOPENED'
                why = why or 'Reopening bug'
            else:
                update_dict['status'] = 'RESOLVED'
                update_dict['resolution'] = 'FIXED'

        if why is not None:
            update_dict['comment'] = dict(body=why)

        if project or component or version:
            if not project and len(self.projects) == 1:
                project = list(self.projects.keys())[0]
            if not project:
                raise ValueError('No project provided')
            if not self.projects.get(project):
                raise ValueError("'{}' is not a recognized project".format(project))

            components = sorted(self.projects.get(project, {}).get('components', {}).keys())
            if not component and len(components) == 1:
                component = components[0]
            if not component:
                raise ValueError('No component provided')
            if component and component not in components:
                raise ValueError("'{}' is not a recognized component of '{}'".format(component, project))

            # Component-specific versions take precedence over project-wide ones.
            versions = []
            if component:
                versions = self.projects.get(project, {}).get('components', {}).get(component, {}).get('versions', [])
            if not versions:
                versions = self.projects.get(project, {}).get('versions', [])
            if not version:
                if not versions:
                    # Nothing to default to; report it like a missing project
                    # or component rather than failing with an IndexError.
                    raise ValueError('No version provided')
                version = versions[0]
            if version not in versions:
                raise ValueError("'{}' is not a recognized version in '{} {}'".format(version, project, component))

            update_dict['product'] = project
            update_dict['component'] = component
            update_dict['version'] = version

        if update_dict:
            update_dict['ids'] = [issue.id]
            response = None
            try:
                response = requests.put(
                    '{}/rest/bug/{}{}'.format(self.url, issue.id, self._login_arguments(required=True)),
                    json=update_dict,
                )
            except RuntimeError as e:
                sys.stderr.write('{}\n'.format(e))
            self._consume_login_on_client_error(response)
            if response is None or response.status_code // 100 != 2:
                # Drop the optimistic local values so they get re-fetched.
                if assignee:
                    issue._assignee = None
                if opened is not None:
                    issue._opened = None
                sys.stderr.write("Failed to modify '{}'\n".format(issue))
                return None
            elif project and component and version:
                issue._project = project
                issue._component = component
                issue._version = version

        return issue

    def add_comment(self, issue, text):
        """Post ``text`` as a new comment on ``issue``.

        Returns the resulting ``Issue.Comment`` (authored by ``me()`` and
        stamped with the local time), or ``None`` if posting failed.
        """
        response = None
        try:
            response = requests.post(
                '{}/rest/bug/{}/comment{}'.format(self.url, issue.id, self._login_arguments(required=True)),
                json=dict(comment=text),
            )
        except RuntimeError as e:
            sys.stderr.write('{}\n'.format(e))
        self._consume_login_on_client_error(response)
        if response is None or response.status_code // 100 != 2:
            sys.stderr.write("Failed to add comment to '{}'\n".format(issue))
            return None

        result = Issue.Comment(
            user=self.me(),
            timestamp=int(time.time()),
            content=text,
        )
        if not issue._comments:
            self.populate(issue, 'comments')
        # If the comments could not be fetched the cache stays unpopulated,
        # so there is nothing to append to.
        if issue._comments is not None:
            issue._comments.append(result)

        return result

    @property
    @webkitcorepy.decorators.Memoize()
    def projects(self):
        """Mapping of active product names to their description, versions and components.

        Each value is ``dict(description=..., versions=[...], components={...})``
        where ``components`` maps every active component name to
        ``dict(description=...)``. Products that cannot be queried are reported
        on stderr and skipped; an empty dictionary is returned if the list of
        enterable products cannot be retrieved at all.
        """
        response = requests.get('{}/rest/product_enterable{}'.format(self.url, self._login_arguments(required=False)))
        self._consume_login_on_client_error(response)
        if response.status_code // 100 != 2:
            sys.stderr.write("Failed to retrieve project list'\n")
            return dict()

        result = dict()
        for product_id in response.json().get('ids', []):
            id_response = requests.get('{}/rest/product/{}{}'.format(self.url, product_id, self._login_arguments(required=False)))
            # Inspect the per-product response, not the enclosing list response.
            self._consume_login_on_client_error(id_response)
            if id_response.status_code // 100 != 2:
                sys.stderr.write("Failed to query bugzilla about prod '{}'\n".format(product_id))
                continue

            for product in id_response.json()['products']:
                if not product['is_active']:
                    continue
                result[product['name']] = dict(
                    description=product['description'],
                    versions=[version['name'] for version in product['versions']],
                    components=dict(),
                )
                for component in product['components']:
                    if not component['is_active']:
                        continue
                    result[product['name']]['components'][component['name']] = dict(description=component['description'])

        return result

    def create(
        self, title, description,
        project=None, component=None, version=None, assign=True,
    ):
        """Create a new bug and return the corresponding issue.

        A missing project, component or version is picked automatically when
        only one choice exists, otherwise the user is prompted through
        ``webkitcorepy.Terminal.choose``.

        Args:
            title: Summary of the new bug (required).
            description: Description of the new bug (required).
            project, component, version: Classification of the new bug.
            assign: Assign the new bug to ``me()`` when true.

        Returns:
            The created issue, or ``None`` if the request failed.

        Raises:
            ValueError: Missing title or description, or an unrecognized
                project, component or version.
        """
        if not title:
            raise ValueError('Must define title to create bug')
        if not description:
            raise ValueError('Must define description to create bug')

        if not project and len(self.projects.keys()) == 1:
            project = list(self.projects.keys())[0]
        elif not project:
            project = webkitcorepy.Terminal.choose(
                'What project should the bug be associated with?',
                options=sorted(self.projects.keys()), numbered=True,
            )
        if project not in self.projects:
            raise ValueError("'{}' is not a recognized product on {}".format(project, self.url))

        if not component and len(self.projects[project]['components'].keys()) == 1:
            component = list(self.projects[project]['components'].keys())[0]
        elif not component:
            component = webkitcorepy.Terminal.choose(
                "What component in '{}' should the bug be associated with?".format(project),
                options=sorted(self.projects[project]['components'].keys()), numbered=True,
            )
        if component not in self.projects[project]['components']:
            raise ValueError("'{}' is not a recognized component in '{}'".format(component, project))

        if not version and len(self.projects[project]['versions']) == 1:
            version = self.projects[project]['versions'][0]
        elif not version:
            version = webkitcorepy.Terminal.choose(
                "What version of '{}' should the bug be associated with?".format(project),
                options=self.projects[project]['versions'], numbered=True,
            )
        if version not in self.projects[project]['versions']:
            raise ValueError("'{}' is not a recognized version for '{}'".format(version, project))

        params = dict(
            summary=title,
            description=description,
            product=project,
            component=component,
            version=version,
        )
        if assign:
            params['assigned_to'] = self.me().username

        response = None
        try:
            response = requests.post('{}/rest/bug{}'.format(self.url, self._login_arguments(required=True)), json=params)
        except RuntimeError as e:
            sys.stderr.write('{}\n'.format(e))
        self._consume_login_on_client_error(response)
        if response is None or response.status_code // 100 != 2:
            if response is None:
                # The request was never sent because no login was possible.
                message = 'Login attempts exhausted'
            else:
                try:
                    message = response.json().get('message', '?')
                except ValueError:
                    # The error body was not JSON.
                    message = '?'
            sys.stderr.write("Failed to create bug: {}\n".format(message))
            return None
        return self.issue(response.json()['id'])