blob: d10b54ffa43f9dec04ad36f2e30bcc18907e9eca [file] [log] [blame]
# Copyright (C) 2021 Apple Inc. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS BE LIABLE FOR
# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import calendar
import os
import re
import requests
import sys
from .issue import Issue
from .tracker import Tracker as GenericTracker
from datetime import datetime
from webkitcorepy import credentials, decorators, OutputCapture
class Tracker(GenericTracker):
    """Tracker implementation that talks to a Bugzilla instance over its REST API.

    Issues are addressed by their numeric Bugzilla id and linked to as
    ``<url>/show_bug.cgi?id=<id>``. Issue data is fetched lazily, one group of
    members at a time, through ``populate()``.
    """

    ROOT_RE = re.compile(r'\Ahttps?://(?P<domain>\S+)\Z')

    def __init__(self, url, users=None, res=None):
        """Create a tracker for the Bugzilla instance rooted at ``url``.

        :param url: Root of the Bugzilla instance; must start with ``http://`` or ``https://``.
        :param users: Forwarded unchanged to the generic tracker's constructor.
        :param res: Optional list of extra compiled patterns (each with an ``id`` group)
            that ``from_string()`` tries after the built-in ones.
        :raises self.Exception: if ``url`` does not look like an http(s) URL.
        """
        super(Tracker, self).__init__(users=users)

        match = self.ROOT_RE.match(url)
        if not match:
            raise self.Exception("'{}' is not a valid bugzilla url".format(url))
        self.url = url

        # The domain is literal text supplied by the caller, not a pattern: escape it (and the
        # '.' in 'show_bug.cgi') so that e.g. 'bugs.webkit.org' cannot match 'bugsXwebkit.org'.
        domain = re.escape(match.group('domain'))
        self._res = [
            re.compile(r'\Ahttps?://{}/show_bug\.cgi\?id=(?P<id>\d+)\Z'.format(domain)),
            re.compile(r'\A{}/show_bug\.cgi\?id=(?P<id>\d+)\Z'.format(domain)),
        ] + (res or [])

    @staticmethod
    def _epoch_from_bugzilla_time(value):
        """Convert a ``YYYY-MM-DDTHH:MM:SSZ`` string into integer seconds since the epoch (UTC)."""
        parsed = datetime.strptime(value, '%Y-%m-%dT%H:%M:%SZ')
        return int(calendar.timegm(parsed.timetuple()))

    def _user_from_bug_field(self, bug, field):
        """Resolve the user stored under ``field`` ('creator' or 'assigned_to') of a bug dict.

        The ``<field>_detail`` dictionary is preferred when present because it carries the
        real name and email in addition to the login; otherwise the bare login under
        ``field`` is looked up. Returns None when the bug has neither.
        """
        detail = bug.get('{}_detail'.format(field))
        if detail:
            return self.user(
                name=detail.get('real_name'),
                username=detail.get('name'),
                email=detail.get('email'),
            )
        if bug.get(field):
            return self.user(username=bug[field])
        return None

    def user(self, name=None, username=None, email=None):
        """Return the user matching the given details, creating a record if necessary.

        Lookup order:
          1. whatever the generic tracker's ``user()`` already knows;
          2. if name, username and email are all given, a new record built from them;
          3. a query against ``<url>/rest/user``, falling back to the caller's own
             details when the query fails or returns no users.

        :raises RuntimeError: if no existing user matched and neither ``username``
            nor ``email`` was provided to query with.
        """
        user = super(Tracker, self).user(name=name, username=username, email=email)
        if user:
            return user

        if name and username and email:
            return self.users.create(
                name=name,
                username=username,
                emails=[email],
            )
        if not username and not email:
            raise RuntimeError("Failed to find username for '{}'".format(name))

        # At least one of username/email is set at this point.
        response = requests.get('{}/rest/user?names={}'.format(self.url, username or email))
        found = response.json().get('users') if response.status_code // 100 == 2 else None
        if not found:
            return self.users.create(
                name=name,
                username=username,
                emails=[email] if email else None,
            )

        # The login name reported by the server is used as both username and email.
        return self.users.create(
            name=found[0]['real_name'],
            username=found[0]['name'],
            emails=[found[0]['name']],
        )

    def from_string(self, string):
        """Return the issue that ``string`` links to, or None if no known pattern matches."""
        for regex in self._res:
            match = regex.match(string)
            if match:
                return self.issue(int(match.group('id')))
        return None

    def credentials(self, required=True):
        """Look up credentials for this tracker's URL via ``webkitcorepy.credentials``.

        The URL without its scheme is used as the prompt.
        """
        return credentials(
            url=self.url,
            required=required,
            prompt=self.url.split('//')[-1],
        )

    def issue(self, id):
        """Return an ``Issue`` with the given id bound to this tracker."""
        return Issue(id=int(id), tracker=self)

    def populate(self, issue, member=None):
        """Fill in the fields of ``issue`` that belong to the same group as ``member``.

        The issue's link is always set. Beyond that:
          * 'title', 'timestamp', 'creator', 'opened', 'assignee', 'watchers' are all
            filled from a single ``/rest/bug/<id>`` request;
          * 'description' and 'comments' are filled from ``/rest/bug/<id>/comment``
            (the first comment is the description);
          * 'references' scans the description and comments for issue links and adds
            the bug's ``see_also`` entries.

        A failed or empty fetch writes a message to stderr and leaves the corresponding
        fields untouched.

        :returns: the same ``issue`` object.
        """
        issue._link = '{}/show_bug.cgi?id={}'.format(self.url, issue.id)

        if member in ['title', 'timestamp', 'creator', 'opened', 'assignee', 'watchers']:
            response = requests.get('{}/rest/bug/{}'.format(self.url, issue.id))
            bugs = response.json().get('bugs', []) if response.status_code == 200 else None
            if bugs:
                bug = bugs[0]
                issue._title = bug['summary']
                issue._timestamp = self._epoch_from_bugzilla_time(bug['creation_time'])
                issue._creator = self._user_from_bug_field(bug, 'creator')

                # Bugzilla reports openness directly via 'is_open'; trust it when available,
                # since statuses other than RESOLVED (e.g. VERIFIED, CLOSED) can also be closed.
                # Fall back to the status comparison for servers that omit the field.
                if bug.get('is_open') is not None:
                    issue._opened = bool(bug['is_open'])
                else:
                    issue._opened = bug['status'] != 'RESOLVED'

                issue._assignee = self._user_from_bug_field(bug, 'assigned_to')
                issue._watchers = [self.user(username=name) for name in bug.get('cc', [])]
            else:
                sys.stderr.write("Failed to fetch '{}'\n".format(issue.link))

        if member in ['description', 'comments']:
            response = requests.get('{}/rest/bug/{}/comment'.format(self.url, issue.id))
            if response.status_code == 200:
                comments = response.json().get('bugs', {}).get(str(issue.id), {}).get('comments', None)
            else:
                comments = None

            if comments:
                # The first entry is the bug's original description, the rest are replies.
                issue._description = comments[0].get('text')
                issue._comments = [
                    Issue.Comment(
                        user=self.user(username=comment['creator']),
                        timestamp=self._epoch_from_bugzilla_time(comment['creation_time']),
                        content=comment['text'],
                    ) for comment in comments[1:]
                ]
            else:
                sys.stderr.write("Failed to fetch comments for '{}'\n".format(issue.link))

        if member == 'references':
            issue._references = []
            seen_links = set()

            def add_reference(link):
                # Record the issue `link` points at, skipping non-issues, duplicates and
                # self-references.
                candidate = GenericTracker.from_string(link) or self.from_string(link)
                if not candidate or candidate.link in seen_links or candidate.id == issue.id:
                    return
                issue._references.append(candidate)
                seen_links.add(candidate.link)

            # NOTE(review): Issue is defined elsewhere; its description/comments appear to
            # remain None when the comment fetch above fails, so guard against that instead
            # of raising TypeError while scanning.
            texts = [issue.description] + [comment.content for comment in (issue.comments or [])]
            for text in texts:
                if not text:
                    continue
                for match in self.REFERENCE_RE.findall(text):
                    add_reference(match[0])

            response = requests.get('{}/rest/bug/{}?include_fields=see_also'.format(self.url, issue.id))
            bugs = response.json().get('bugs', []) if response.status_code == 200 else None
            if bugs:
                for link in bugs[0].get('see_also', []):
                    add_reference(link)
            else:
                sys.stderr.write("Failed to fetch related issues for '{}'\n".format(issue.link))

        return issue