blob: f4dc29a8d17264d5833b1102ea172ffc0c514103 [file] [log] [blame]
#!/usr/bin/env python3
# Copyright (C) 2019-2020 Apple Inc. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of Apple Inc. ("Apple") nor the names of
# its contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY APPLE AND ITS CONTRIBUTORS "AS IS" AND ANY
# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL APPLE OR ITS CONTRIBUTORS BE LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
# THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# A webkitpy import needs to go first for autoinstaller to work with subsequent imports.
from webkitpy.common.net.credentials import Credentials
import argparse
import atexit
import logging
import requests
import os
from dateutil.parser import parse
# Console output is plain messages at INFO level and above.
logging.basicConfig(format='%(message)s', level=logging.INFO)
_log = logging.getLogger(os.path.basename(__file__))

# Session state filled in by get_bugzilla_token(): the REST API token and the
# Bugzilla user id of the person running the tool.
bugzilla_token = None
bugzilla_self_user_id = None
def get_bugzilla_token():
    """Log in to bugs.webkit.org and remember the session token.

    Reads the account and password through the webkitpy Credentials helper,
    calls the Bugzilla REST login endpoint, and stores the returned token and
    user id in the module globals ``bugzilla_token`` and
    ``bugzilla_self_user_id``. The token is scheduled for invalidation when
    the interpreter exits.

    Raises:
        Exception: if no credentials could be read, or if the login reply
            does not contain a token and user id (for example a rejected
            login).
    """
    global bugzilla_token
    global bugzilla_self_user_id

    credentials = Credentials('bugs.webkit.org', git_prefix='bugzilla')
    account, password = credentials.read_credentials(use_stored_credentials=True)
    if not account or not password:
        raise Exception('Could not parse security tool output to get bugs.webkit.org credentials')

    response = requests.get('https://bugs.webkit.org/rest/login', params={'login': account, 'password': password}).json()

    # A failed login carries no 'token'/'id'. Surface the server's own
    # 'message' field (if any) instead of failing with a bare KeyError.
    login = response if isinstance(response, dict) else {}
    if 'token' not in login or 'id' not in login:
        raise Exception('Could not log in to bugs.webkit.org: ' + str(login.get('message', 'unexpected response')))

    bugzilla_token = login['token']
    bugzilla_self_user_id = login['id']
    # Only register the logout hook once there is a token to invalidate.
    atexit.register(invalidate_bugzilla_token)
def invalidate_bugzilla_token():
    """Log out of bugs.webkit.org so the session token stops being usable.

    Does nothing when no token has been obtained.
    """
    if bugzilla_token is None:
        return
    # The token has to be sent as the 'token' query parameter. Appending the
    # bare value after '?' produces a query string with no parameter name.
    requests.get('https://bugs.webkit.org/rest/logout', params={'token': bugzilla_token})
def get_user_info(id):
    """Fetch the Bugzilla user record for a single account.

    Args:
        id: account identifier as a string; it is appended to the
            /rest/user/ URL as-is.

    Returns:
        The single entry of the 'users' list in the server's JSON reply.

    Raises:
        Exception: if the reply lists anything other than exactly one user.
    """
    query = {'include_disabled': True, 'token': bugzilla_token}
    reply = requests.get('https://bugs.webkit.org/rest/user/' + id, params=query).json()
    matches = reply['users']
    if len(matches) == 1:
        return matches[0]
    raise Exception('Unexpected number of accounts found for ' + id + ': ' + str(len(matches)))
def get_user_info_self():
    """Return the Bugzilla user record of the logged-in user.

    Uses the user id stored in ``bugzilla_self_user_id`` by the login step.
    """
    own_id = str(bugzilla_self_user_id)
    return get_user_info(own_id)
def can_use_this_tool():
    """Tell whether the logged-in user belongs to the groups this tool needs.

    Returns:
        True if the user is in the 'admin' group, or in all of 'editbugs',
        'editusers', 'Spam-Masters' and 'Security-Sensitive'; False otherwise.
    """
    group_names = {group['name'] for group in get_user_info_self()['groups']}
    if 'admin' in group_names:
        return True
    required = {'editbugs', 'editusers', 'Spam-Masters', 'Security-Sensitive'}
    return required.issubset(group_names)
def disable_user(user_id, reason):
    """Disable a Bugzilla account.

    Sends a PUT to /rest/user/<user_id> that sets 'email_enabled' to 0 and
    'login_denied_text' to the given reason.

    Args:
        user_id: id of the account to disable.
        reason: text stored as the account's login-denied message.

    Raises:
        requests.HTTPError: if the server answers with an error status.
    """
    url = 'https://bugs.webkit.org/rest/user/' + str(user_id)
    changes = {'email_enabled': 0, 'login_denied_text': reason, 'token': bugzilla_token}
    result = requests.put(url, params=changes)
    result.raise_for_status()
def get_bugs_created_by_user(id):
    """Search the WebKit and Security products for bugs reported by an account.

    Args:
        id: account identifier, matched exactly through the 'emailreporter1'
            search field.

    Returns:
        The 'bugs' list of the reply; each entry is limited to the id,
        summary and product fields.
    """
    search = {
        'product': ['WebKit', 'Security'],
        'email1': id,
        'emailreporter1': '1',
        'emailtype1': 'equals',
        'include_fields': 'id,summary,product',
        'token': bugzilla_token,
    }
    reply = requests.get('https://bugs.webkit.org/rest/bug', params=search)
    return reply.json()['bugs']
def get_bugs_commented_on_by_user(id):
    """Search the WebKit and Security products for bugs an account commented on.

    Args:
        id: account identifier, matched exactly through the 'emaillongdesc1'
            search field.

    Returns:
        The 'bugs' list of the reply; each entry is limited to the id,
        summary and product fields.
    """
    search = {
        'product': ['WebKit', 'Security'],
        'email1': id,
        'emaillongdesc1': '1',
        'emailtype1': 'equals',
        'include_fields': 'id,summary,product',
        'token': bugzilla_token,
    }
    reply = requests.get('https://bugs.webkit.org/rest/bug', params=search)
    return reply.json()['bugs']
def get_comments(bug_id):
    """Return the comments of one bug.

    Args:
        bug_id: id of the bug whose comments are fetched.

    Returns:
        The 'comments' value stored under this bug's id in the reply.
    """
    key = str(bug_id)
    url = 'https://bugs.webkit.org/rest/bug/' + key + '/comment'
    payload = requests.get(url, params={'token': bugzilla_token}).json()
    return payload['bugs'][key]["comments"]
def get_bugs_with_attachments_created_by_user(id):
    """Search the WebKit and Security products for bugs an account attached files to.

    Args:
        id: account identifier, compared for equality against the
            'attachments.submitter' search field.

    Returns:
        The 'bugs' list of the reply; each entry is limited to the id,
        summary and product fields.
    """
    search = {
        'product': ['WebKit', 'Security'],
        'f1': 'attachments.submitter',
        'o1': 'equals',
        'v1': id,
        'include_fields': 'id,summary,product',
        'token': bugzilla_token,
    }
    reply = requests.get('https://bugs.webkit.org/rest/bug', params=search)
    return reply.json()['bugs']
def get_attachments(bug_id):
    """Return the attachments of one bug.

    Args:
        bug_id: id of the bug whose attachments are fetched.

    Returns:
        The value stored under this bug's id in the reply's 'bugs' mapping;
        the request asks only for the id, bug_id, creator, creation_time,
        summary and is_private fields.
    """
    key = str(bug_id)
    query = {'include_fields': 'id,bug_id,creator,creation_time,summary,is_private', 'token': bugzilla_token}
    payload = requests.get('https://bugs.webkit.org/rest/bug/' + key + '/attachment', params=query).json()
    return payload['bugs'][key]
def hide_bug(bug_id):
    """Move a bug into the Spam product and turn off creator and CC access.

    A failed request is reported on stdout; no exception is raised for it.

    Args:
        bug_id: id of the bug to move.
    """
    changes = {
        'product': 'Spam',
        'component': 'Spam',
        'version': 'unspecified',
        'is_creator_accessible': False,
        'is_cc_accessible': False,
    }
    result = requests.put('https://bugs.webkit.org/rest/bug/' + str(bug_id),
                          json=changes,
                          params={'token': bugzilla_token})
    if result.ok:
        return
    print('!!! Failed to move bug ' + str(bug_id) + ': ' + result.text)
def hide_comments(bug_id, comment_ids):
    """Mark comments on a bug as private, then tag each one as spam.

    Failed requests are reported on stdout; no exception is raised for them,
    and tagging is attempted for every comment even if hiding failed.

    Args:
        bug_id: id of the bug the comments belong to.
        comment_ids: ids of the comments to hide and tag.
    """
    auth = {'token': bugzilla_token}

    # One request flips the private flag on all listed comments.
    private_flags = {}
    for comment_id in comment_ids:
        private_flags[str(comment_id)] = True
    result = requests.put('https://bugs.webkit.org/rest/bug/' + str(bug_id),
                          json={'comment_is_private': private_flags},
                          params=auth)
    if not result.ok:
        print('!!! Failed to hide comments for bug ' + str(bug_id) + ': ' + result.text)

    # Tags are set with one request per comment.
    for comment_id in comment_ids:
        tag_url = 'https://bugs.webkit.org/rest/bug/comment/' + str(comment_id) + '/tags'
        result = requests.put(tag_url,
                              json={'comment_id': comment_id, 'add': ['spam']},
                              params=auth)
        if not result.ok:
            print('!!! Failed to mark comment with spam tag: ' + result.text)
def hide_attachments(bug_id, attachment_ids):
    """Mark attachments as private.

    A failed request is reported on stdout; no exception is raised for it.
    Nothing is sent when there are no attachments to hide.

    Args:
        bug_id: id of the bug the attachments belong to; used only in the
            failure message.
        attachment_ids: ids of the attachments to make private.
    """
    ids = list(attachment_ids)
    if not ids:
        return
    # Address the request to one of the attachments being hidden rather than
    # to a fixed id, so the URL never names an attachment outside this list.
    # The full set of ids still travels in the JSON body.
    response = requests.put('https://bugs.webkit.org/rest/bug/attachment/' + str(ids[0]),
                            json={'ids': ids, 'is_private': True},
                            params={'token': bugzilla_token})
    if not response:
        print('!!! Failed to hide attachments for bug ' + str(bug_id) + ': ' + response.text)
def ask_yes_no(question, default='yes'):
    """Prompt on the terminal until the user gives a yes/no answer.

    Args:
        question: text shown in front of the [y/n] hint.
        default: 'yes', 'no' or None. The answer used when the user enters
            nothing; None means an explicit answer is required.

    Returns:
        True for a yes answer, False for a no answer.

    Raises:
        ValueError: if default is not 'yes', 'no' or None.
    """
    if default is None:
        prompt_string = ' [y/n] '
    elif default == 'yes':
        prompt_string = ' [Y/n] '
    elif default == 'no':
        prompt_string = ' [y/N] '
    else:
        # Fail up front with a clear message instead of reaching the prompt
        # with no hint string defined.
        raise ValueError('Invalid default answer: ' + repr(default))

    answers = {'yes': True, 'y': True, 'ye': True, 'no': False, 'n': False}
    while True:
        # Surrounding whitespace is ignored so that ' y' or 'no ' still count.
        response = input(question + prompt_string).strip().lower()
        if default is not None and response == '':
            return answers[default]
        if response in answers:
            return answers[response]
def sanitized_string(string):
    """Return the string with every ESC (0x1B) character removed.

    Applied to server-supplied text before it is printed.
    """
    escape = '\x1B'
    return string.replace(escape, '')
def _bug_line(bug):
    """Return the 'URL summary' line used to list a bug on the terminal."""
    return 'https://bugs.webkit.org/show_bug.cgi?id=' + str(bug['id']) + ' ' + sanitized_string(bug['summary'])


def _fetch_account_activity(account_id):
    """Return the user record for account_id, annotated with the bugs, comments and attachments it is linked to."""
    user_info = get_user_info(account_id)
    user_info['bugs_created'] = get_bugs_created_by_user(account_id)
    # Bugs the account created are handled as whole bugs, so they are left
    # out of the per-comment and per-attachment lists below.
    created_bug_ids = {bug['id'] for bug in user_info['bugs_created']}

    user_info['bugs_commented'] = [bug for bug in get_bugs_commented_on_by_user(account_id)
                                   if bug['id'] not in created_bug_ids]
    for bug in user_info['bugs_commented']:
        bug['comments'] = get_comments(bug['id'])

    user_info['bugs_with_attachments_added'] = [bug for bug in get_bugs_with_attachments_created_by_user(account_id)
                                                if bug['id'] not in created_bug_ids]
    for bug in user_info['bugs_with_attachments_added']:
        bug['attachments'] = get_attachments(bug['id'])
    return user_info


def _print_account_activity(user):
    """Print a summary of one account's bugs, comments and attachments."""
    print(sanitized_string(user['real_name']) + ' <' + user['name'] + '>')
    if not user['can_login']:
        print('*** already disabled ***')
    if user['bugs_created']:
        print('Created ' + str(len(user['bugs_created'])) + ' bug(s):')
        for bug in user['bugs_created']:
            print(_bug_line(bug))
    if user['bugs_commented']:
        print('Commented on ' + str(len(user['bugs_commented'])) + ' bug(s):')
        for bug in user['bugs_commented']:
            print(_bug_line(bug))
            for comment in bug['comments']:
                if comment['creator'] == user['name']:
                    print('Comment ' + str(comment['count']) + ', ' + str(parse(comment['creation_time'])) + ': ' + sanitized_string(comment['text']))
    if user['bugs_with_attachments_added']:
        print('Added attachments to ' + str(len(user['bugs_with_attachments_added'])) + ' bug(s):')
        for bug in user['bugs_with_attachments_added']:
            print(_bug_line(bug))
            for attachment in bug['attachments']:
                if attachment['creator'] == user['name']:
                    print('Attachment ' + str(attachment['id']) + ', ' + str(parse(attachment['creation_time'])) + ': ' + sanitized_string(attachment['summary']))
    print()


def _disable_users(users):
    """Disable every account in users that can still log in."""
    for user in users:
        if not user['can_login']:
            print('User ' + str(user['name']) + ' is already disabled, skipping')
            continue
        print('Disabling user ' + str(user['name']))
        disable_user(user['id'], 'spam')


def _hide_account_content(user):
    """Move the account's bugs to the Spam product and make its comments and attachments private."""
    for bug in user['bugs_created']:
        if bug['product'] == 'Spam':
            print('Bug ' + str(bug['id']) + ' is already in the Spam product, skipping')
            continue
        print('Moving bug ' + str(bug['id']) + ' to the Spam product')
        hide_bug(bug['id'])

    for bug in user['bugs_commented']:
        comments_to_hide = []
        for comment in bug['comments']:
            if comment['creator'] != user['name']:
                continue
            if comment['is_private']:
                print('Comment ' + str(comment['count']) + ' on bug ' + str(bug['id']) + ' is already private, skipping')
                continue
            # Checked with an explicit raise rather than `assert`, so the
            # guard still runs under `python -O`.
            if comment['bug_id'] != bug['id']:
                raise Exception('Comment ' + str(comment['id']) + ' does not belong to bug ' + str(bug['id']))
            comments_to_hide.append(comment['id'])
        if comments_to_hide:
            print('Hiding comment(s) from user ' + str(user['name']) + ' on bug ' + str(bug['id']))
            hide_comments(bug['id'], comments_to_hide)

    for bug in user['bugs_with_attachments_added']:
        attachments_to_hide = []
        for attachment in bug['attachments']:
            if attachment['creator'] != user['name']:
                continue
            if attachment['is_private']:
                print('Attachment ' + str(attachment['id']) + ' on bug ' + str(bug['id']) + ' is already private, skipping')
                continue
            if attachment['bug_id'] != bug['id']:
                raise Exception('Attachment ' + str(attachment['id']) + ' does not belong to bug ' + str(bug['id']))
            attachments_to_hide.append(attachment['id'])
        if attachments_to_hide:
            # Attachments are made private, not removed, so say "Hiding".
            print('Hiding attachment(s) from user ' + str(user['name']) + ' on bug ' + str(bug['id']))
            hide_attachments(bug['id'], attachments_to_hide)


def main():
    """Command-line entry point: block the given accounts and hide their content.

    Logs in, checks that the caller has the required Bugzilla groups, gathers
    and prints each account's activity, and asks for confirmation. On
    confirmation it disables the accounts, moves their bugs to the Spam
    product, and makes their comments and attachments on other bugs private.

    Raises:
        SystemExit: with status 1 if the caller lacks the required groups,
            or status 0 if the confirmation prompt is declined.
    """
    parser = argparse.ArgumentParser(description='Block Bugzilla spammers, and hide their comments.')
    parser.add_argument('accounts', nargs='+', help='accounts to block')
    args = parser.parse_args()

    get_bugzilla_token()

    if not can_use_this_tool():
        print('You need to be a Bugzilla admin to use this tool.')
        # SystemExit is raised directly; the `exit` helper is only present
        # when the `site` module has been loaded.
        raise SystemExit(1)

    print('Fetching account activity...')
    users_to_disable = [_fetch_account_activity(account_id) for account_id in args.accounts]

    for user in users_to_disable:
        _print_account_activity(user)

    if not ask_yes_no("Block all these accounts, and hide their bugs and comments?"):
        print("*** Exiting, no work performed ***")
        raise SystemExit(0)

    # Block all of the users first, so that they don't receive e-mail notifications about further actions.
    _disable_users(users_to_disable)

    for user in users_to_disable:
        _hide_account_content(user)
# Run the tool only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()