| # Copyright (c) 2011 Google Inc. All rights reserved. |
| # Copyright (c) 2009, 2018 Apple Inc. All rights reserved. |
| # Copyright (c) 2010 Research In Motion Limited. All rights reserved. |
| # Copyright (c) 2013 University of Szeged. All rights reserved. |
| # |
| # Redistribution and use in source and binary forms, with or without |
| # modification, are permitted provided that the following conditions are |
| # met: |
| # |
| # * Redistributions of source code must retain the above copyright |
| # notice, this list of conditions and the following disclaimer. |
| # * Redistributions in binary form must reproduce the above |
| # copyright notice, this list of conditions and the following disclaimer |
| # in the documentation and/or other materials provided with the |
| # distribution. |
| # * Neither the name of Google Inc. nor the names of its |
| # contributors may be used to endorse or promote products derived from |
| # this software without specific prior written permission. |
| # |
| # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
| # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
| # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
| # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
| # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
| # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
| # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
| # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
| # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
| # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
| # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| # |
| # WebKit's Python module for interacting with Bugzilla |
| |
| import logging |
| import mimetypes |
| import re |
| import socket |
| import sys |
| import urllib |
| |
| from datetime import datetime # used in timestamp() |
| |
| from webkitpy.common.unicode_compatibility import BytesIO, StringIO, unicode |
| from webkitpy.common.net.bugzilla.attachment import Attachment |
| from webkitpy.common.net.bugzilla.bug import Bug |
| |
| from webkitpy.common.config import committers |
| import webkitpy.common.config.urls as config_urls |
| from webkitpy.common.net.bugzilla.constants import BUGZILLA_DATE_FORMAT |
| from webkitpy.common.net.credentials import Credentials |
| from webkitpy.common.net.networktransaction import NetworkTransaction |
| from webkitpy.common.system.user import User |
| from webkitpy.thirdparty.BeautifulSoup import BeautifulSoup, BeautifulStoneSoup, SoupStrainer |
| |
# Module-level logger, named after this module, used by the classes below.
_log = logging.getLogger(__name__)
| |
| |
class EditUsersParser(object):
    """Extracts user and group information from Bugzilla editusers.cgi HTML."""

    def __init__(self):
        # Maps a group's display name to the value of its label's "for"
        # attribute; filled lazily by group_string_from_name().
        self._group_name_to_group_string_cache = {}

    def _login_and_uid_from_row(self, row):
        """Return (login, user_id) for one results-table row, or None to skip it."""
        cell = row.find("td")
        # Header rows have no <td>, and an empty result set is rendered as a
        # fake "<none>" entry; neither describes a user.
        if not cell or cell.find(text="<none>"):
            return None
        # Otherwise the <td> holds a single <a> whose text is the login name
        # and whose href carries the numeric user id.
        link = cell.find("a")
        login_name = unicode(link.string).strip()
        uid_text = re.search(r"userid=(\d+)", str(link['href'])).group(1)
        return (login_name, int(uid_text))

    def login_userid_pairs_from_edit_user_results(self, results_page):
        """Return the (login, user_id) pairs listed in a user-search results page."""
        soup = BeautifulSoup(results_page, convertEntities=BeautifulSoup.HTML_ENTITIES)
        table = soup.find(id="admin_table")
        pairs = []
        for row in table('tr'):
            pair = self._login_and_uid_from_row(row)
            # Rows that do not describe a user come back as None.
            if pair:
                pairs.append(pair)
        return pairs

    def _group_name_and_string_from_row(self, row):
        """Return (group_name, group_string) read from the row's <label>."""
        label = row.find('label')
        form_field_name = unicode(label['for'])
        display_name = unicode(label.find('strong').string).rstrip(':')
        return (display_name, form_field_name)

    def user_dict_from_edit_user_page(self, page):
        """Return a dict of the fields shown on an edit-user page.

        Keys are the "for" attributes of the row labels. All group rows are
        folded into a single 'groups' key holding the set of checked group
        names.
        """
        soup = BeautifulSoup(page, convertEntities=BeautifulSoup.HTML_ENTITIES)
        user_dict = {}
        for row in soup.find("table", {'class': 'main'})('tr'):
            label = row.find('label')
            # Rows without a label are not ones we know how to parse, and the
            # <tr> that merely wraps the groups table is skipped as well.
            if not label or row.find('table'):
                continue

            field = label['for']
            if "group" not in field:
                user_dict[field] = unicode(row.find('td').string).strip()
                continue

            # We must be parsing a "tr" inside the inner group table.
            groups = user_dict.setdefault('groups', set())
            (group_name, _) = self._group_name_and_string_from_row(row)
            if row.find('input', {'type': 'checkbox', 'checked': 'checked'}):
                groups.add(group_name)
        return user_dict

    def _group_rows_from_edit_user_page(self, edit_user_page):
        """Return every <td class="groupname"> element of the edit-user page."""
        soup = BeautifulSoup(edit_user_page, convertEntities=BeautifulSoup.HTML_ENTITIES)
        return soup('td', {'class': 'groupname'})

    def group_string_from_name(self, edit_user_page, group_name):
        """Return the "group_NUMBER" string for group_name.

        Bugzilla's group strings may differ per install, so they are parsed
        from edit_user_page the first time and served from the cache after.
        Raises KeyError when group_name is unknown.
        """
        cache = self._group_name_to_group_string_cache
        if not cache:
            rows = self._group_rows_from_edit_user_page(edit_user_page)
            cache = dict(self._group_name_and_string_from_row(row) for row in rows)
            self._group_name_to_group_string_cache = cache
        return cache[group_name]
| |
| |
def timestamp():
    """Return the current local time as a 14-digit YYYYMMDDHHMMSS string."""
    now = datetime.now()
    return now.strftime("%Y%m%d%H%M%S")
| |
| |
| # A container for all of the logic for making and parsing bugzilla queries. |
class BugzillaQueries(object):
    """Builds Bugzilla query URLs and parses the pages they return.

    Only _load_query, _fetch_bug and _fetch_bugs_from_advanced_query access
    the Bugzilla object handed to the constructor.
    """

    # Shared prefix of the flag searches below: bugs in status UNCONFIRMED,
    # NEW, ASSIGNED or REOPENED whose flag equals the value the caller appends.
    _OPEN_BUGS_WITH_FLAG_QUERY = (
        "buglist.cgi?query_format=advanced"
        "&bug_status=UNCONFIRMED&bug_status=NEW&bug_status=ASSIGNED&bug_status=REOPENED"
        "&field0-0-0=flagtypes.name&type0-0-0=equals&value0-0-0=")

    def __init__(self, bugzilla):
        self._bugzilla = bugzilla

    @staticmethod
    def _quote(value):
        """Percent-encode value for a query string on both Python 2 and 3."""
        try:
            from urllib.parse import quote  # Python 3
        except ImportError:
            from urllib import quote  # Python 2
        return quote(value)

    def _is_xml_bugs_form(self, form):
        """Return True if form has a control whose id is "xml"."""
        # ClientForm.HTMLForm.find_control throws if the control is not found,
        # so we do a manual search instead.
        return any(control.id == "xml" for control in form.controls)

    # This is kinda a hack. There is probably a better way to get this information from bugzilla.
    def _parse_result_count(self, results_page):
        """Return the number of bugs announced by a results page (0 if unknown)."""
        count_element = BeautifulSoup(results_page).find(attrs={'class': 'bz_result_count'})
        # find() returns None when the element is missing; guard before
        # touching .string so that case is logged instead of raising.
        result_count_text = count_element.string if count_element is not None else None
        if result_count_text is None:
            _log.warning("BeautifulSoup returned None while finding class: bz_result_count in:\n{}".format(results_page))
            return 0
        first_word = result_count_text.strip().split(" ")[0]
        # The page spells out the two smallest counts.
        if first_word == "Zarro":
            return 0
        if first_word == "One":
            return 1
        return int(first_word)

    # Note: _load_query, _fetch_bug and _fetch_bugs_from_advanced_query
    # are the only methods which access self._bugzilla.

    def _load_query(self, query):
        """Authenticate, then open query relative to the bug server URL."""
        self._bugzilla.authenticate()
        full_url = "%s%s" % (config_urls.bug_server_url, query)
        return self._bugzilla.browser.open(full_url)

    def _fetch_bugs_from_advanced_query(self, query):
        """Return the list of bugs matched by a buglist.cgi query."""
        results_page = self._load_query(query)
        # Some simple searches can return a single result, in which case the
        # final URL is that bug's page.
        results_url = results_page.geturl()
        if "/show_bug.cgi?id=" in results_url:
            bug_id = int(results_url.split("=")[-1])
            return [self._fetch_bug(bug_id)]
        if not self._parse_result_count(results_page):
            _log.warning('Failed to find bugs for {}'.format(results_url))
            return []
        # Bugzilla results pages have an "XML" submit button at the bottom
        # which can be used to get an XML page containing all of the <bug> elements.
        # This is slightly lame in that it assumes _load_query used
        # self._bugzilla.browser and that it's in an acceptable state.
        self._bugzilla.browser.select_form(predicate=self._is_xml_bugs_form)
        bugs_xml = self._bugzilla.browser.submit()
        return self._bugzilla._parse_bugs_from_xml(bugs_xml)

    def _fetch_bug(self, bug_id):
        return self._bugzilla.fetch_bug(bug_id)

    def _fetch_bug_ids_advanced_query(self, query):
        """Return the bug ids listed on the results page of query."""
        soup = BeautifulSoup(self._load_query(query))
        # The contents of the <a> inside the cells in the first column happen
        # to be the bug id.
        return [int(bug_link_cell.find("a").string)
                for bug_link_cell in soup('td', "first-child")]

    def _parse_attachment_ids_request_query(self, page, since=None):
        """Return attachment ids linked for review on a request.cgi page.

        When since (a datetime) is given, rows of the "requests" table whose
        date is older than since are left out.
        """
        # Formats
        digits = re.compile(r"\d+")
        attachment_href = re.compile(r"attachment.cgi\?id=\d+&action=review")
        # if no date is given, return all ids
        if not since:
            attachment_links = SoupStrainer("a", href=attachment_href)
            return [int(digits.search(tag["href"]).group(0))
                    for tag in BeautifulSoup(page, parseOnlyThese=attachment_links)]

        # Parse the main table only
        date_format = re.compile(r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}")
        mtab = SoupStrainer("table", {"class": "requests"})
        soup = BeautifulSoup(page, parseOnlyThese=mtab)
        patch_ids = []

        for row in soup.findAll("tr"):
            patch_tag = row.find("a", {"href": attachment_href})
            if not patch_tag:
                continue
            patch_id = int(digits.search(patch_tag["href"]).group(0))
            date_tag = row.find("td", text=date_format)
            if date_tag and datetime.strptime(date_format.search(unicode(date_tag)).group(0), "%Y-%m-%d %H:%M") < since:
                continue
            patch_ids.append(patch_id)
        return patch_ids

    def _fetch_attachment_ids_request_query(self, query, since=None):
        return self._parse_attachment_ids_request_query(self._load_query(query), since)

    def _parse_quips(self, page):
        """Return the quips listed under "Existing quips:" as unicode strings."""
        soup = BeautifulSoup(page, convertEntities=BeautifulSoup.HTML_ENTITIES)
        quips = soup.find(text=re.compile(r"Existing quips:")).findNext("ul").findAll("li")
        return [unicode(quip_entry.string) for quip_entry in quips]

    def fetch_quips(self):
        return self._parse_quips(self._load_query("/quips.cgi?action=show"))

    # List of all r+'d bugs.
    def fetch_bug_ids_from_pending_commit_list(self):
        needs_commit_query_url = self._OPEN_BUGS_WITH_FLAG_QUERY + "review%2B"
        return self._fetch_bug_ids_advanced_query(needs_commit_query_url)

    def fetch_bugs_matching_quicksearch(self, search_string):
        # We may want to use a more explicit query than "quicksearch".
        # If quicksearch changes we should probably change to use
        # a normal buglist.cgi?query_format=advanced query.
        quicksearch_url = "buglist.cgi?quicksearch=%s" % self._quote(search_string)
        return self._fetch_bugs_from_advanced_query(quicksearch_url)

    # Currently this returns all bugs across all components.
    # In the future we may wish to extend this API to construct more restricted searches.
    def fetch_bugs_matching_search(self, search_string):
        query = "buglist.cgi?query_format=advanced"
        if search_string:
            query += "&short_desc_type=allwordssubstr&short_desc=%s" % self._quote(search_string)
        return self._fetch_bugs_from_advanced_query(query)

    def fetch_patches_from_pending_commit_list(self):
        """Return the reviewed patches of every bug on the pending-commit list."""
        return [patch
                for bug_id in self.fetch_bug_ids_from_pending_commit_list()
                for patch in self._fetch_bug(bug_id).reviewed_patches()]

    def fetch_bugs_from_review_queue(self, cc_email=None):
        """Return bugs with a review? flag, optionally only those CCing cc_email."""
        query = self._OPEN_BUGS_WITH_FLAG_QUERY + "review?"

        if cc_email:
            query += "&emailcc1=1&emailtype1=substring&email1=%s" % self._quote(cc_email)

        return self._fetch_bugs_from_advanced_query(query)

    def fetch_bug_ids_from_commit_queue(self):
        commit_queue_url = self._OPEN_BUGS_WITH_FLAG_QUERY + "commit-queue%2B&order=Last+Changed"
        return self._fetch_bug_ids_advanced_query(commit_queue_url)

    def fetch_patches_from_commit_queue(self):
        # This function will only return patches which have valid committers
        # set. It won't reject patches with invalid committers/reviewers.
        return [patch
                for bug_id in self.fetch_bug_ids_from_commit_queue()
                for patch in self._fetch_bug(bug_id).commit_queued_patches()]

    def fetch_bug_ids_from_review_queue(self):
        review_queue_url = self._OPEN_BUGS_WITH_FLAG_QUERY + "review?"
        return self._fetch_bug_ids_advanced_query(review_queue_url)

    # This method will make several requests to bugzilla.
    def fetch_patches_from_review_queue(self, limit=None):
        # [:None] returns the whole array.
        return [patch
                for bug_id in self.fetch_bug_ids_from_review_queue()[:limit]
                for patch in self._fetch_bug(bug_id).unreviewed_patches()]

    # NOTE: This is the only client of _fetch_attachment_ids_request_query
    # This method only makes one request to bugzilla.
    def fetch_attachment_ids_from_review_queue(self, since=None, only_security_bugs=False):
        review_queue_url = "request.cgi?action=queue&type=review&group=type"
        if only_security_bugs:
            review_queue_url += '&product=Security'
        return self._fetch_attachment_ids_request_query(review_queue_url, since)

    # This only works if your account has edituser privileges.
    # We could easily parse https://bugs.webkit.org/userprefs.cgi?tab=permissions to
    # check permissions, but bugzilla will just return an error if we don't have them.
    def fetch_login_userid_pairs_matching_substring(self, search_string):
        review_queue_url = "editusers.cgi?action=list&matchvalue=login_name&matchstr=%s&matchtype=substr" % self._quote(search_string)
        results_page = self._load_query(review_queue_url)
        # We could pull the EditUsersParser off Bugzilla if needed.
        return EditUsersParser().login_userid_pairs_from_edit_user_results(results_page)

    def is_invalid_bugzilla_email(self, search_string):
        """Return True if the request queue page reports no match for search_string."""
        review_queue_url = "request.cgi?action=queue&requester=%s&product=&type=review&requestee=&component=&group=requestee" % self._quote(search_string)
        results_page = self._load_query(review_queue_url)
        page_text = results_page.read()
        # On Python 3 the response body is bytes, which a str pattern cannot
        # search; on Python 2 bytes is str and nothing is converted.
        if isinstance(page_text, bytes) and not isinstance(page_text, str):
            page_text = page_text.decode('utf-8', 'replace')
        return bool(re.search("did not match anything", page_text))
| |
| |
class CommitQueueFlag(object):
    """Integer constants selecting how an uploaded patch's commit-queue flag is set."""
    # 0: leave the flag unset, 1: request the commit queue, 2: mark for landing.
    mark_for_nothing, mark_for_commit_queue, mark_for_landing = range(3)
| |
| class Bugzilla(object): |
def __init__(self, committers=committers.CommitterList()):
    """Create an unauthenticated client.

    committers is the contributor list consulted by _commit_queue_flag().
    Note that the default CommitterList is built once, when this function
    is defined, and is therefore shared by every instance using the default.
    """
    # The browser is created on first access through the `browser` property.
    self._browser = None
    self.authenticated = False
    self.cached_quips = []
    self.committers = committers
    self.edit_user_parser = EditUsersParser()
    self.queries = BugzillaQueries(self)
| |
| def _get_browser(self): |
| if not self._browser: |
| self.setdefaulttimeout(600) |
| from webkitpy.thirdparty.autoinstalled.mechanize import Browser |
| self._browser = Browser() |
| self._browser.set_handle_robots(False) |
| return self._browser |
| |
| def _set_browser(self, value): |
| self._browser = value |
| |
| browser = property(_get_browser, _set_browser) |
| |
def setdefaulttimeout(self, value):
    """Set the default timeout of the socket module to value.

    Kept as a method so the browser setup goes through self rather than
    calling the socket module directly.
    """
    socket.setdefaulttimeout(value)
| |
def open_url(self, url):
    """Open url with the browser inside a NetworkTransaction and return the result."""
    def fetch():
        return self.browser.open(url)

    # NOTE(review): NetworkTransaction presumably retries transient failures - confirm.
    return NetworkTransaction().run(fetch, url)
| |
def fetch_user(self, user_id):
    """Return the parsed edit-user page for user_id as a dict (logs in first)."""
    self.authenticate()
    edit_url = self.edit_user_url_for_id(user_id)
    page = self.open_url(edit_url)
    return self.edit_user_parser.user_dict_from_edit_user_page(page)
| |
def add_user_to_groups(self, user_id, group_names):
    """Select each named group on the user's edit form and submit it."""
    self.authenticate()
    edit_url = self.edit_user_url_for_id(user_id)
    page = self.open_url(edit_url)
    # The second form on the page (index 1) is the one that gets submitted.
    self.browser.select_form(nr=1)
    parser = self.edit_user_parser
    for name in group_names:
        control_name = parser.group_string_from_name(page, name)
        checkbox = self.browser.find_control(control_name).items[0]
        checkbox.selected = True
    self.browser.submit()
| |
def quips(self):
    """Return the list of quips, fetching it only while the cache is empty."""
    # We only fetch and parse the list of quips once per instantiation
    # so that we do not burden bugs.webkit.org.
    if self.cached_quips:
        return self.cached_quips
    self.cached_quips = self.queries.fetch_quips()
    return self.cached_quips
| |
def bug_url_for_bug_id(self, bug_id, xml=False):
    """Return the show_bug.cgi URL for bug_id, or None when bug_id is falsy.

    With xml=True the URL requests the XML form of the bug without
    attachment data.
    """
    if not bug_id:
        return None
    if xml:
        suffix = "&ctype=xml&excludefield=attachmentdata"
    else:
        suffix = ""
    return "%sshow_bug.cgi?id=%s%s" % (config_urls.bug_server_url, bug_id, suffix)
| |
def short_bug_url_for_bug_id(self, bug_id):
    """Return the short webkit.org/b/ URL for bug_id, or None when bug_id is falsy."""
    return ("http://webkit.org/b/%s" % bug_id) if bug_id else None
| |
def add_attachment_url(self, bug_id):
    """Return the URL of the "add an attachment" page for bug_id."""
    server_url = config_urls.bug_server_url
    return "%sattachment.cgi?action=enter&bugid=%s" % (server_url, bug_id)
| |
def attachment_url_for_id(self, attachment_id, action="view"):
    """Return the attachment.cgi URL for attachment_id, or None when it is falsy.

    An "&action=" parameter is appended only for a non-empty action other
    than "view".
    """
    if not attachment_id:
        return None
    if action and action != "view":
        action_param = "&action=%s" % action
    else:
        action_param = ""
    server_url = config_urls.bug_server_url
    return "%sattachment.cgi?id=%s%s" % (server_url, attachment_id, action_param)
| |
def edit_user_url_for_id(self, user_id):
    """Return the editusers.cgi URL that edits the user with user_id."""
    server_url = config_urls.bug_server_url
    return "%seditusers.cgi?action=edit&userid=%s" % (server_url, user_id)
| |
| def _parse_attachment_flag(self, |
| element, |
| flag_name, |
| attachment, |
| result_key): |
| flag = element.find('flag', attrs={'name': flag_name}) |
| if flag: |
| attachment[flag_name] = flag['status'] |
| if flag['status'] == '+': |
| attachment[result_key] = flag['setter'] |
| # Sadly show_bug.cgi?ctype=xml does not expose the flag modification date. |
| |
def _string_contents(self, soup):
    """Return the element's .string converted to a plain unicode object."""
    # WebKit's bugzilla instance uses UTF-8.
    # BeautifulStoneSoup always returns Unicode strings, however
    # .string yields a (unicode) NavigableString, which can confuse other
    # parts of the code, so it is converted to a real unicode() object.
    navigable_string = soup.string
    return unicode(navigable_string)
| |
@classmethod
def _parse_date(cls, date_string):
    """Parse a "DATE TIME TIMEZONE" string into a naive datetime.

    date_string must consist of exactly three space-separated fields,
    otherwise the unpacking raises ValueError. The time zone field is
    dropped.
    """
    date_part, time_part, _time_zone = date_string.split(" ")
    # A time with a single colon has no seconds; add them so it can be
    # parsed with BUGZILLA_DATE_FORMAT.
    if time_part.count(':') == 1:
        time_part += ':0'
    # Ignore the timezone because python doesn't understand timezones out of the box.
    return datetime.strptime("%s %s" % (date_part, time_part), BUGZILLA_DATE_FORMAT)
| |
| def _date_contents(self, soup): |
| return self._parse_date(self._string_contents(soup)) |
| |
| def _parse_attachment_element(self, element, bug_id): |
| attachment = {} |
| attachment['bug_id'] = bug_id |
| if sys.version_info > (3, 0): |
| attachment['is_obsolete'] = (element.has_attr('isobsolete') and element['isobsolete'] == "1") |
| attachment['is_patch'] = (element.has_attr('ispatch') and element['ispatch'] == "1") |
| else: |
| attachment['is_obsolete'] = (element.has_key('isobsolete') and element['isobsolete'] == "1") |
| attachment['is_patch'] = (element.has_key('ispatch') and element['ispatch'] == "1") |
| attachment['id'] = int(element.find('attachid').string) |
| # FIXME: No need to parse out the url here. |
| attachment['url'] = self.attachment_url_for_id(attachment['id']) |
| attachment["attach_date"] = self._date_contents(element.find("date")) |
| attachment['name'] = self._string_contents(element.find('desc')) |
| attachment['attacher_email'] = self._string_contents(element.find('attacher')) |
| attachment['type'] = self._string_contents(element.find('type')) |
| self._parse_attachment_flag( |
| element, 'review', attachment, 'reviewer_email') |
| self._parse_attachment_flag( |
| element, 'commit-queue', attachment, 'committer_email') |
| return attachment |
| |
| def _parse_log_descr_element(self, element): |
| comment = {} |
| comment['comment_email'] = self._string_contents(element.find('who')) |
| comment['comment_date'] = self._date_contents(element.find('bug_when')) |
| comment['text'] = self._string_contents(element.find('thetext')) |
| return comment |
| |
def _parse_bugs_from_xml(self, page):
    """Return a Bug for every <bug> element found in page."""
    soup = BeautifulSoup(page)
    bugs = []
    for bug_xml in soup('bug'):
        # Without the unicode() call, BeautifulSoup occasionally complains of being
        # passed None for no apparent reason.
        bug_dictionary = self._parse_bug_dictionary_from_xml(unicode(bug_xml))
        bugs.append(Bug(bug_dictionary, self))
    return bugs
| |
def _parse_bug_dictionary_from_xml(self, page):
    """Return a dict describing the bug in an XML page.

    An empty dict is returned when the <bug> element carries
    error="NotPermitted".
    """
    soup = BeautifulStoneSoup(page, convertEntities=BeautifulStoneSoup.XML_ENTITIES)
    bug_element = soup.find('bug')
    if bug_element and bug_element.get('error', '') == 'NotPermitted':
        _log.warning("You don't have permission to view this bug.")
        return {}

    text_of = self._string_contents
    bug_id = int(soup.find("bug_id").string)
    bug = {
        "id": bug_id,
        "title": text_of(soup.find("short_desc")),
        "bug_status": text_of(soup.find("bug_status")),
    }
    # dup_id is only recorded when the element is present.
    dup_id = soup.find("dup_id")
    if dup_id:
        bug["dup_id"] = text_of(dup_id)
    bug["reporter_email"] = text_of(soup.find("reporter"))
    bug["assigned_to_email"] = text_of(soup.find("assigned_to"))
    bug["cc_emails"] = [text_of(cc) for cc in soup.findAll('cc')]
    bug["attachments"] = [self._parse_attachment_element(attachment, bug_id)
                          for attachment in soup.findAll('attachment')]
    bug["comments"] = [self._parse_log_descr_element(comment)
                       for comment in soup.findAll('long_desc')]
    bug['groups'] = frozenset([text_of(group) for group in soup.findAll('group')])

    return bug
| |
| # Makes testing fetch_*_from_bug() possible until we have a better |
| # BugzillaNetwork abstraction. |
| |
def _fetch_bug_page(self, bug_id):
    """Open and return the XML page of bug_id, logging the URL first."""
    xml_url = self.bug_url_for_bug_id(bug_id, xml=True)
    _log.info("Fetching: %s" % xml_url)
    page = self.open_url(xml_url)
    return page
| |
def fetch_bug_dictionary(self, bug_id):
    """Log in if needed, then fetch bug_id's XML page and return it as a dict."""
    # Nothing is caught here: KeyboardInterrupt, like every other exception,
    # propagates to the caller.
    self.authenticate()
    page = self._fetch_bug_page(bug_id)
    return self._parse_bug_dictionary_from_xml(page)
| |
| # FIXME: A BugzillaCache object should provide all these fetch_ methods. |
| |
def fetch_bug(self, bug_id):
    """Return a Bug for bug_id, or None when no bug dictionary was obtained."""
    bug_dictionary = self.fetch_bug_dictionary(bug_id)
    if not bug_dictionary:
        return None
    return Bug(bug_dictionary, self)
| |
def fetch_attachment_contents(self, attachment_id):
    """Return the content read from the attachment's view URL."""
    url = self.attachment_url_for_id(attachment_id)
    _log.info("Fetching: %s" % url)
    # We need to authenticate to download patches from security bugs.
    self.authenticate()
    response = self.open_url(url)
    return response.read()
| |
class AccessError(Exception):
    """Raised when the bug behind an attachment could not be accessed.

    error_code is one of the constants below; bug_title is the text the
    caller extracted from the attachment page.
    """
    # Error codes: the viewer lacks permission, or any other access failure.
    NOT_PERMITTED = 1
    OTHER = 2

    def __init__(self, attachment_id, error_code, bug_title):
        message = 'Failed to access {}'.format(attachment_id)
        super(Bugzilla.AccessError, self).__init__(message)
        self.bug_title = bug_title
        self.error_code = error_code
        self.attachment_id = attachment_id
| |
def _parse_bug_title_from_attachment_page(self, page):
    """Return the <div id="bug_title"> element of page, or None if it has none."""
    soup = BeautifulSoup(page)
    return soup.find('div', attrs={'id': 'bug_title'})
| |
| def _parse_bug_id_from_attachment_page(self, page): |
| # The "Up" relation happens to point to the bug. |
| title = self._parse_bug_title_from_attachment_page(page) |
| if not title: |
| _log.warning("This attachment does not exist (or you don't have permission to view it).") |
| return (None, Bugzilla.AccessError.OTHER) |
| if title.getText() == 'Bug Access Denied': |
| _log.warning("You don't have permission to view this attachment.") |
| return (None, Bugzilla.AccessError.NOT_PERMITTED) |
| match = re.search("show_bug.cgi\?id=(?P<bug_id>\d+)", str(title)) |
| if not match: |
| _log.warning("Unable to parse bug id from attachment") |
| return (None, None) |
| return (int(match.group('bug_id')), None) |
| |
def bug_id_for_attachment_id(self, attachment_id, throw_on_access_error=False):
    """Run get_bug_id_for_attachment_id() inside a NetworkTransaction."""
    def lookup():
        return self.get_bug_id_for_attachment_id(attachment_id, throw_on_access_error)

    return NetworkTransaction().run(lookup)
| |
def get_bug_id_for_attachment_id(self, attachment_id, throw_on_access_error=False):
    """Return the id of the bug owning attachment_id, or None if unknown.

    Raises Bugzilla.AccessError when throw_on_access_error is set and the
    attachment page yielded an error code instead of a bug id.
    """
    self.authenticate()

    edit_url = self.attachment_url_for_id(attachment_id, 'edit')
    _log.info("Fetching: %s" % edit_url)
    page = self.open_url(edit_url)
    bug_id, error_code = self._parse_bug_id_from_attachment_page(page)
    if bug_id:
        return bug_id
    if throw_on_access_error and error_code is not None:
        bug_title = str(self._parse_bug_title_from_attachment_page(page))
        raise Bugzilla.AccessError(attachment_id, error_code, bug_title)
    return None
| |
| # FIXME: This should just return Attachment(id), which should be able to |
| # lazily fetch needed data. |
| |
def fetch_attachment(self, attachment_id, throw_on_access_error=False):
    """Return the attachment with attachment_id, or None if it cannot be found.

    Access errors from bug_id_for_attachment_id() propagate when
    throw_on_access_error is set.
    """
    # We could grab all the attachment details off of the attachment edit
    # page but we already have working code to do so off of the bugs page,
    # so re-use that.
    bug_id = self.bug_id_for_attachment_id(attachment_id, throw_on_access_error)
    if not bug_id:
        _log.warning("Unable to parse bug_id from attachment {}".format(attachment_id))
        return None
    bug = self.fetch_bug(bug_id)
    if bug is None:
        # fetch_bug() returns None when no bug dictionary could be obtained;
        # report that instead of failing on the attribute access below.
        _log.warning("Unable to fetch bug {} for attachment {}".format(bug_id, attachment_id))
        return None
    for attachment in bug.attachments(include_obsolete=True):
        if attachment.id() == int(attachment_id):
            return attachment
    _log.error("Error in fetching attachment {}, bug_id: {}".format(attachment_id, bug_id))
    return None  # This should never be hit.
| |
def authenticate(self):
    """Log in to Bugzilla unless this instance is already authenticated.

    Stored credentials are used for the first attempt only. A failed login
    is logged and retried; the fifth failure raises Exception. On success
    self.authenticated is set and the login is stored in self.username.
    """
    if self.authenticated:
        return

    credentials = Credentials(config_urls.bug_server_host, git_prefix="bugzilla")

    attempts = 0
    while not self.authenticated:
        attempts += 1
        username, password = credentials.read_credentials(use_stored_credentials=attempts == 1)

        _log.info("Logging in as %s..." % username)
        self.open_url(config_urls.bug_server_url +
                      "index.cgi?GoAheadAndLogIn=1")
        self.browser.select_form(name="login")
        self.browser['Bugzilla_login'] = username
        self.browser['Bugzilla_password'] = password
        self.browser.find_control("Bugzilla_restrictlogin").items[0].selected = False
        response = self.browser.submit()

        page_text = response.read()
        # On Python 3 the response body is bytes, which a str pattern cannot
        # search; on Python 2 bytes is str and nothing is converted.
        if isinstance(page_text, bytes) and not isinstance(page_text, str):
            page_text = page_text.decode('utf-8', 'replace')
        match = re.search("<title>(.+?)</title>", page_text)
        # If the resulting page has a title, and it contains the word
        # "invalid" assume it's the login failure page.
        if match and re.search("Invalid", match.group(1), re.IGNORECASE):
            error_message = "Bugzilla login failed: %s" % match.group(1)
            # raise an exception only if this was the last attempt
            if attempts < 5:
                _log.error(error_message)
            else:
                raise Exception(error_message)
        else:
            self.authenticated = True
            self.username = username
| |
def _commit_queue_flag(self, commit_flag):
    """Translate a CommitQueueFlag value into a flag form value.

    Returns 'X' for mark_for_nothing and '?' for mark_for_commit_queue.
    mark_for_landing gives '+' only when the logged-in user is a listed
    contributor who can commit, and falls back to '?' with a warning
    otherwise.
    """
    if commit_flag == CommitQueueFlag.mark_for_nothing:
        return 'X'
    if commit_flag != CommitQueueFlag.mark_for_landing:
        return '?'

    user = self.committers.contributor_by_email(self.username)
    if not user:
        _log.warning("Your Bugzilla login is not listed in contributors.json. Uploading with cq? instead of cq+")
        return '?'
    if not user.can_commit:
        _log.warning("You're not a committer yet or haven't updated contributors.json yet. Uploading with cq? instead of cq+")
        return '?'
    return '+'
| |
def _fill_attachment_form(self,
                          description,
                          file_object,
                          mark_for_review=False,
                          commit_flag=CommitQueueFlag.mark_for_nothing,
                          is_patch=False,
                          filename=None,
                          mimetype=None):
    """Fill in the currently selected attachment form on the browser.

    filename defaults to "<timestamp>.patch". mimetype is guessed from the
    filename when not given and falls back to "text/plain".
    """
    form = self.browser
    form['description'] = description
    if is_patch:
        form['ispatch'] = ("1",)
    # FIXME: Should this use self._find_select_element_for_flag?
    # flag_type-1 is filled from mark_for_review, flag_type-3 from commit_flag.
    review_value = '?' if mark_for_review else 'X'
    form['flag_type-1'] = (review_value,)
    form['flag_type-3'] = (self._commit_queue_flag(commit_flag),)

    if not filename:
        filename = "%s.patch" % timestamp()
    if not mimetype:
        mimetypes.add_type('text/plain', '.patch')  # Make sure mimetypes knows about .patch
        mimetype = mimetypes.guess_type(filename)[0]
    if not mimetype:
        mimetype = "text/plain"  # Bugzilla might auto-guess for us and we might not need this?
    form.add_file(file_object, mimetype, filename, 'data')
| |
| def _file_object_for_upload(self, file_or_string): |
| if hasattr(file_or_string, 'read'): |
| return file_or_string |
| # Only if file_or_string is not already encoded do we want to encode it. |
| if isinstance(file_or_string, unicode): |
| file_or_string = file_or_string.encode('utf-8') |
| return BytesIO(file_or_string) |
| |
| # timestamp argument is just for unittests. |
def _filename_for_upload(self, file_object, bug_id, extension="txt", timestamp=timestamp):
    """Return file_object.name if it has one, else "bug-<id>-<timestamp>.<extension>"."""
    if not hasattr(file_object, "name"):
        return "bug-%s-%s.%s" % (bug_id, timestamp(), extension)
    return file_object.name
| |
def add_attachment_to_bug(self, bug_id, file_or_string, description, filename=None, comment_text=None, mimetype=None):
    """Upload file_or_string to bug_id as an attachment.

    A filename is generated when none is given. comment_text, if any, is
    logged and submitted as the comment of the upload.
    """
    self.authenticate()
    bug_url = self.bug_url_for_bug_id(bug_id)
    _log.info('Adding attachment "%s" to %s' % (description, bug_url))
    self.open_url(self.add_attachment_url(bug_id))
    self.browser.select_form(name="entryform")
    upload = self._file_object_for_upload(file_or_string)
    if not filename:
        filename = self._filename_for_upload(upload, bug_id)
    self._fill_attachment_form(description, upload, filename=filename, mimetype=mimetype)
    if comment_text:
        _log.info(comment_text)
        self.browser['comment'] = comment_text
    self.browser.submit()
| |
| @staticmethod |
| def _parse_attachment_id_from_add_patch_to_bug_response(response_html): |
| match = re.search('<title>Attachment (?P<attachment_id>\d+) added to Bug \d+</title>', response_html) |
| if match: |
| return match.group('attachment_id') |
| _log.warning('Unable to parse attachment id') |
| return None |
| |
| # FIXME: The arguments to this function should be simplified and then |
| # this should be merged into add_attachment_to_bug |
def add_patch_to_bug(self,
                     bug_id,
                     file_or_string,
                     description,
                     comment_text=None,
                     mark_for_review=False,
                     mark_for_commit_queue=False,
                     mark_for_landing=False):
    """Upload file_or_string to bug_id as a patch and return its attachment id.

    mark_for_landing takes precedence over mark_for_commit_queue. The
    result is whatever the response parser extracts, i.e. None when the
    response could not be parsed.
    """
    self.authenticate()
    bug_url = self.bug_url_for_bug_id(bug_id)
    _log.info('Adding patch "%s" to %s' % (description, bug_url))

    self.open_url(self.add_attachment_url(bug_id))
    self.browser.select_form(name="entryform")
    upload = self._file_object_for_upload(file_or_string)
    patch_name = self._filename_for_upload(upload, bug_id, extension="patch")

    if mark_for_landing:
        flag = CommitQueueFlag.mark_for_landing
    elif mark_for_commit_queue:
        flag = CommitQueueFlag.mark_for_commit_queue
    else:
        flag = CommitQueueFlag.mark_for_nothing

    self._fill_attachment_form(description,
                               upload,
                               mark_for_review=mark_for_review,
                               commit_flag=flag,
                               is_patch=True,
                               filename=patch_name)
    if comment_text:
        _log.info(comment_text)
        self.browser['comment'] = comment_text
    response = self.browser.submit()
    response_html = response.read()
    return self._parse_attachment_id_from_add_patch_to_bug_response(response_html)
| |
| # FIXME: There has to be a more concise way to write this method. |
| def _check_create_bug_response(self, response_html): |
| match = re.search("<title>Bug (?P<bug_id>\d+) Submitted[^<]*</title>", |
| response_html) |
| if match: |
| return match.group('bug_id') |
| |
| match = re.search( |
| '<div id="bugzilla-body">(?P<error_message>.+)<div id="footer">', |
| response_html, |
| re.DOTALL) |
| error_message = "FAIL" |
| if match: |
| text_lines = BeautifulSoup( |
| match.group('error_message')).findAll(text=True) |
| error_message = "\n" + '\n'.join( |
| [" " + line.strip() |
| for line in text_lines if line.strip()]) |
| raise Exception("Bug not created: %s" % error_message) |
| |
def create_bug(self,
               bug_title,
               bug_description,
               component=None,
               diff=None,
               patch_description=None,
               cc=None,
               blocked=None,
               assignee=None,
               mark_for_review=False,
               mark_for_commit_queue=False):
    """File a new bug in the WebKit product and return its id.

    component defaults to "New Bugs"; if the component is not offered by
    the form, the user is prompted to pick one. assignee defaults to the
    logged-in user. When diff is given it is attached as a patch described
    by patch_description. Raises Exception (from
    _check_create_bug_response) when Bugzilla does not confirm the bug.
    """
    self.authenticate()

    _log.info('Creating bug with title "%s"' % bug_title)
    self.open_url(config_urls.bug_server_url + "enter_bug.cgi?product=WebKit")
    self.browser.select_form(name="Create")
    component_items = self.browser.find_control('component').items
    # Build a real list: the names are tested for membership and then handed
    # to the prompt, and a Python 3 map() iterator would already be
    # exhausted by the membership test.
    component_names = [item.name for item in component_items]
    if not component:
        component = "New Bugs"
    if component not in component_names:
        component = User.prompt_with_list("Please pick a component:", component_names)
    self.browser["component"] = [component]
    if cc:
        self.browser["cc"] = cc
    if blocked:
        self.browser["blocked"] = unicode(blocked)
    if not assignee:
        assignee = self.username
    # The assignee field is left alone when the form has it disabled.
    if assignee and not self.browser.find_control("assigned_to").disabled:
        self.browser["assigned_to"] = assignee
    self.browser["short_desc"] = bug_title
    self.browser["comment"] = bug_description

    if diff:
        # _fill_attachment_form expects a file-like object
        # Patch files are already binary, so no encoding needed.
        assert(isinstance(diff, str))
        patch_file_object = StringIO(diff)
        commit_flag = CommitQueueFlag.mark_for_nothing
        if mark_for_commit_queue:
            commit_flag = CommitQueueFlag.mark_for_commit_queue

        self._fill_attachment_form(
            patch_description,
            patch_file_object,
            mark_for_review=mark_for_review,
            commit_flag=commit_flag,
            is_patch=True)

    response = self.browser.submit()

    bug_id = self._check_create_bug_response(response.read())
    _log.info("Bug %s created." % bug_id)
    _log.info("%sshow_bug.cgi?id=%s" % (config_urls.bug_server_url, bug_id))
    return bug_id
| |
def _find_select_element_for_flag(self, flag_name):
    """Return the select control of the current form used for *flag_name*.

    Only "review" and "commit-queue" are known; any other name raises
    ``Exception``.
    """
    # FIXME: This will break if we ever re-order attachment flags
    # The position of a name in this tuple is the `nr` passed to
    # find_control for the form's select controls.
    flags_in_form_order = ("review", "commit-queue")
    for position, known_flag in enumerate(flags_in_form_order):
        if flag_name == known_flag:
            return self.browser.find_control(type='select', nr=position)
    raise Exception("Don't know how to find flag named \"%s\"" % flag_name)
| |
def clear_attachment_flags(self,
                           attachment_id,
                           additional_comment_text=None):
    """Set the review and commit-queue flags of an attachment to "X".

    A comment naming the attachment is logged and entered on the edit
    form; *additional_comment_text*, when truthy, is appended to it after
    a blank line.
    """
    self.authenticate()

    comment_parts = ["Clearing flags on attachment: %s" % attachment_id]
    if additional_comment_text:
        comment_parts.append("%s" % additional_comment_text)
    full_comment = "\n\n".join(comment_parts)
    _log.info(full_comment)

    edit_url = self.attachment_url_for_id(attachment_id, 'edit')
    self.open_url(edit_url)
    self.browser.select_form(nr=1)
    self.browser.set_value(full_comment, name='comment', nr=1)
    for flag_name in ('review', 'commit-queue'):
        self._find_select_element_for_flag(flag_name).value = ("X",)
    self.browser.submit()
| |
def set_flag_on_attachment(self,
                           attachment_id,
                           flag_name,
                           flag_value,
                           comment_text=None):
    """Set the flag *flag_name* of an attachment to *flag_value*.

    *flag_name* must be a name ``_find_select_element_for_flag`` knows.
    When *comment_text* is truthy it is logged and entered into the edit
    form's comment field; otherwise nothing is logged and no comment is
    entered.
    """
    # FIXME: We need a way to test this function on a live bugzilla
    # instance.

    self.authenticate()
    # Only log a comment that exists: logging unconditionally emitted a
    # bare "None" line whenever the caller passed no comment.
    if comment_text:
        _log.info(comment_text)
    self.open_url(self.attachment_url_for_id(attachment_id, 'edit'))
    self.browser.select_form(nr=1)

    if comment_text:
        self.browser.set_value(comment_text, name='comment', nr=1)

    self._find_select_element_for_flag(flag_name).value = (flag_value,)
    self.browser.submit()
| |
| # FIXME: All of these bug editing methods have a ridiculous amount of |
| # copy/paste code. |
| |
def obsolete_attachment(self, attachment_id, comment_text=None):
    """Mark an attachment obsolete and set its review/commit-queue flags to "X".

    When *comment_text* is truthy it is logged and entered on the edit
    form before the form is submitted.
    """
    self.authenticate()

    _log.info("Obsoleting attachment: %s" % attachment_id)
    edit_url = self.attachment_url_for_id(attachment_id, 'edit')
    self.open_url(edit_url)
    self.browser.select_form(nr=1)

    obsolete_control = self.browser.find_control('isobsolete')
    obsolete_control.items[0].selected = True

    # Clearing the flags as well takes the attachment out of the
    # review and commit queues.
    for flag_name in ('review', 'commit-queue'):
        self._find_select_element_for_flag(flag_name).value = ("X",)

    if comment_text:
        _log.info(comment_text)
        # The form has two textareas named 'comment', one of them hidden,
        # so the control has to be picked by name and position.
        self.browser.set_value(comment_text, name='comment', nr=1)

    self.browser.submit()
| |
def add_cc_to_bug(self, bug_id, email_address_list):
    """Submit the addresses in *email_address_list* as new CCs on a bug.

    The addresses are joined with ", " and written to the "newcc" field
    of the bug's "changeform".
    """
    self.authenticate()

    _log.info("Adding %s to the CC list for bug %s" % (email_address_list, bug_id))
    bug_url = self.bug_url_for_bug_id(bug_id)
    self.open_url(bug_url)
    self.browser.select_form(name="changeform")
    new_cc_value = ", ".join(email_address_list)
    self.browser["newcc"] = new_cc_value
    self.browser.submit()
| |
def add_keyword_to_bug(self, bug_id, keyword):
    """Add *keyword* to a bug's keywords without dropping the existing ones.

    The current value of the "keywords" field of the bug's "changeform" is
    read, *keyword* is appended unless it is already listed, and the form
    is submitted.  When the field is empty the submitted value is exactly
    *keyword*.
    """
    self.authenticate()

    _log.info("Adding %s to the keyword list for bug %s" % (keyword, bug_id))
    self.open_url(self.bug_url_for_bug_id(bug_id))
    self.browser.select_form(name="changeform")

    # Assigning the field replaces its whole value, so writing only the
    # new keyword would discard whatever the field already held.
    # NOTE(review): presumably the form is pre-filled with the bug's
    # current keywords, separated by commas and/or whitespace -- confirm.
    current_value = self.browser["keywords"] or ""
    keywords = [name for name in re.split(r"[,\s]+", current_value) if name]
    if keyword not in keywords:
        keywords.append(keyword)
    self.browser["keywords"] = ", ".join(keywords)
    self.browser.submit()
| |
def post_comment_to_bug(self, bug_id, comment_text, cc=None, see_also=None):
    """Post *comment_text* on a bug, optionally adding CCs and See Also links.

    *cc* and *see_also*, when truthy, are iterables of strings that are
    joined with ", " and written to the "newcc" and "see_also" fields.
    """
    self.authenticate()

    _log.info("Adding comment to bug %s" % bug_id)
    bug_url = self.bug_url_for_bug_id(bug_id)
    self.open_url(bug_url)
    self.browser.select_form(name="changeform")

    self.browser["comment"] = comment_text
    if see_also:
        _log.info("Adding PR link to See Also List for bug %s" % bug_id)
        see_also_value = ", ".join(see_also)
        self.browser["see_also"] = see_also_value
    if cc:
        cc_value = ", ".join(cc)
        self.browser["newcc"] = cc_value

    self.browser.submit()
| |
def close_bug_as_fixed(self, bug_id, comment_text=None):
    """Set a bug to RESOLVED/FIXED, with an optional closing comment."""
    self.authenticate()

    _log.info("Closing bug %s as fixed" % bug_id)
    bug_url = self.bug_url_for_bug_id(bug_id)
    self.open_url(bug_url)
    self.browser.select_form(name="changeform")

    if comment_text:
        self.browser['comment'] = comment_text
    # Both fields are list-valued form controls.
    self.browser['bug_status'] = ['RESOLVED']
    self.browser['resolution'] = ['FIXED']

    self.browser.submit()
| |
def _has_control(self, form, id):
    """Return True when one of *form*'s controls has the given *id*."""
    control_ids = []
    for control in form.controls:
        control_ids.append(control.id)
    return id in control_ids
| |
def reassign_bug(self, bug_id, assignee=None, comment_text=None):
    """Assign a bug to *assignee*, defaulting to ``self.username``.

    If the selected form has no control with id "assigned_to", a warning
    is logged and nothing is submitted.  When *comment_text* is truthy it
    is logged and entered as a comment along with the reassignment.
    """
    self.authenticate()

    assignee = assignee or self.username

    _log.info("Assigning bug %s to %s" % (bug_id, assignee))
    bug_url = self.bug_url_for_bug_id(bug_id)
    self.open_url(bug_url)
    self.browser.select_form(name="changeform")

    can_assign = self._has_control(self.browser, "assigned_to")
    if not can_assign:
        _log.warning(
            "Failed to assign bug to you (can't find assigned_to) control.\n"
            "Ignore this message if you don't have EditBugs privileges (https://bugs.webkit.org/userprefs.cgi?tab=permissions)")
        return

    if comment_text:
        _log.info(comment_text)
        self.browser["comment"] = comment_text
    self.browser["assigned_to"] = assignee
    self.browser.submit()
| |
def reopen_bug(self, bug_id, comment_text):
    """Re-open a bug and post *comment_text* on it.

    The bug's status is set to "REOPENED" when the form offers it,
    otherwise to "UNCONFIRMED" when that is offered.  If neither is
    available the status is left alone and a message is logged.  The
    comment is submitted in every case.
    """
    self.authenticate()

    _log.info("Re-opening bug %s" % bug_id)
    # Bugzilla requires a comment when re-opening a bug, so we know it will
    # never be None.
    _log.info(comment_text)
    bug_url = self.bug_url_for_bug_id(bug_id)
    self.open_url(bug_url)
    self.browser.select_form(name="changeform")
    status_control = self.browser.find_control("bug_status", type="select")

    # Collect the offered status names up front so availability can be
    # checked without using exceptions for control flow.
    offered_statuses = [item.name for item in status_control.items]

    # Preference order: a bug that was never confirmed has no "REOPENED"
    # state, only "UNCONFIRMED".
    for wanted_status in ("REOPENED", "UNCONFIRMED"):
        if wanted_status in offered_statuses:
            status_control.value = [wanted_status]
            break
    else:
        # FIXME: This logic is slightly backwards. We won't print this
        # message if the bug is already open with state "UNCONFIRMED".
        _log.info("Did not reopen bug %s, it appears to already be open with status %s." % (bug_id, status_control.value))

    self.browser['comment'] = comment_text
    self.browser.submit()