| #!/usr/bin/env python |
| |
| # Copyright (C) 2019 Apple Inc. All rights reserved. |
| # |
| # Redistribution and use in source and binary forms, with or without |
| # modification, are permitted provided that the following conditions |
| # are met: |
| # |
| # 1. Redistributions of source code must retain the above copyright |
| # notice, this list of conditions and the following disclaimer. |
| # 2. Redistributions in binary form must reproduce the above copyright |
| # notice, this list of conditions and the following disclaimer in the |
| # documentation and/or other materials provided with the distribution. |
| # 3. Neither the name of Apple Inc. ("Apple") nor the names of |
| # its contributors may be used to endorse or promote products derived |
| # from this software without specific prior written permission. |
| # |
| # THIS SOFTWARE IS PROVIDED BY APPLE AND ITS CONTRIBUTORS "AS IS" AND ANY |
| # EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
| # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE |
| # DISCLAIMED. IN NO EVENT SHALL APPLE OR ITS CONTRIBUTORS BE LIABLE FOR ANY |
| # DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES |
| # (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; |
| # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND |
| # ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
| # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF |
| # THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| |
| import argparse |
| import atexit |
| import logging |
| import requests |
| import os |
| import sys |
| from dateutil.parser import parse |
| from webkitpy.common.net.credentials import Credentials |
| |
# Print bare messages (no level/timestamp prefix) at INFO and above.
logging.basicConfig(level=logging.INFO, format='%(message)s')
# Logger named after this script's file name.
_log = logging.getLogger(os.path.basename(__file__))

# Session state; both values are filled in by get_bugzilla_token().
bugzilla_token = None
bugzilla_self_user_id = None
| |
def get_bugzilla_token():
    """Log in to bugs.webkit.org and remember the resulting session.

    Reads the account and password through webkitpy's Credentials helper,
    calls the REST login endpoint, stores the returned token and user id in
    the module globals ``bugzilla_token`` and ``bugzilla_self_user_id``, and
    registers invalidate_bugzilla_token() to run at interpreter exit.

    Raises:
        Exception: if no credentials could be read, or if the login response
            does not carry both a token and a user id (e.g. bad password).
    """
    global bugzilla_token
    global bugzilla_self_user_id

    credentials = Credentials('bugs.webkit.org', git_prefix='bugzilla')
    account, password = credentials.read_credentials(use_stored_credentials=True)
    if not account or not password:
        raise Exception('Could not parse security tool output to get bugs.webkit.org credentials')

    response = requests.get('https://bugs.webkit.org/rest/login', params={'login': account, 'password': password}).json()
    # A rejected login comes back as an error payload without these keys;
    # report it instead of failing with a bare KeyError.
    if 'token' not in response or 'id' not in response:
        raise Exception('Could not log in to bugs.webkit.org: %s' % (response.get('message', response),))

    bugzilla_token = response['token']
    bugzilla_self_user_id = response['id']
    atexit.register(invalidate_bugzilla_token)
| |
| |
def invalidate_bugzilla_token():
    """Log out of bugs.webkit.org so the stored session token stops working.

    Does nothing when no token has been obtained.
    """
    if bugzilla_token is None:
        return
    # The token has to be sent as the named `token` parameter; appending the
    # raw value after '?' produces a query string the server cannot match to
    # a session.
    requests.get('https://bugs.webkit.org/rest/logout', params={'token': bugzilla_token})
| |
| |
def get_user_info(id):
    """Fetch the Bugzilla user record for one account.

    Args:
        id: account identifier as a string; it is appended to the REST
            ``/rest/user/`` URL as-is.

    Returns:
        The single user dictionary from the response's ``users`` list.

    Raises:
        Exception: if the server reports an error (no ``users`` list in the
            response) or if the lookup does not match exactly one account.
    """
    response = requests.get('https://bugs.webkit.org/rest/user/' + id,
                            params={'include_disabled': True, 'token': bugzilla_token}).json()
    users = response.get('users')
    if users is None:
        # Error payloads have no 'users' key; surface the server's message
        # rather than an opaque KeyError.
        raise Exception('Could not look up account ' + id + ': %s' % (response.get('message', response),))
    if len(users) != 1:
        raise Exception('Unexpected number of accounts found for ' + id + ': ' + str(len(users)))
    return users[0]
| |
| |
def get_user_info_self():
    """Return the Bugzilla user record of the account this tool logged in as."""
    # The id is stored as returned by the login call; get_user_info()
    # concatenates it into a URL, so hand it over as a string.
    own_id = str(bugzilla_self_user_id)
    return get_user_info(own_id)
| |
| |
def can_use_this_tool():
    """Tell whether the logged-in account has the group memberships this tool needs.

    Returns True for members of 'admin', or for accounts that belong to all of
    'editbugs', 'editusers', 'Spam-Masters' and 'Security-Sensitive'.
    """
    memberships = set(group['name'] for group in get_user_info_self()['groups'])
    if 'admin' in memberships:
        return True
    required = ('editbugs', 'editusers', 'Spam-Masters', 'Security-Sensitive')
    return all(name in memberships for name in required)
| |
| |
def disable_user(user_id, reason):
    """Update a Bugzilla account so that it is blocked.

    Sends a PUT for ``user_id`` that sets ``email_enabled`` to 0 and
    ``login_denied_text`` to ``reason``. Raises requests' HTTP error if the
    server answers with an error status.
    """
    endpoint = 'https://bugs.webkit.org/rest/user/' + str(user_id)
    changes = {'email_enabled': 0, 'login_denied_text': reason, 'token': bugzilla_token}
    result = requests.put(endpoint, params=changes)
    result.raise_for_status()
| |
| |
def get_bugs_created_by_user(id):
    """Return the WebKit/Security bugs whose reporter matches ``id`` exactly.

    Each returned bug dictionary is limited to the id, summary and product fields.
    """
    query = {
        'product': ['WebKit', 'Security'],
        'email1': id,
        'emailreporter1': '1',
        'emailtype1': 'equals',
        'include_fields': 'id,summary,product',
        'token': bugzilla_token,
    }
    reply = requests.get('https://bugs.webkit.org/rest/bug', params=query)
    return reply.json()['bugs']
| |
| |
def get_bugs_commented_on_by_user(id):
    """Return the WebKit/Security bugs that carry a comment from ``id``.

    Each returned bug dictionary is limited to the id, summary and product fields.
    """
    query = {
        'product': ['WebKit', 'Security'],
        'email1': id,
        'emaillongdesc1': '1',
        'emailtype1': 'equals',
        'include_fields': 'id,summary,product',
        'token': bugzilla_token,
    }
    reply = requests.get('https://bugs.webkit.org/rest/bug', params=query)
    return reply.json()['bugs']
| |
| |
def get_comments(bug_id):
    """Return the list of comment dictionaries attached to ``bug_id``."""
    bug_key = str(bug_id)
    endpoint = 'https://bugs.webkit.org/rest/bug/' + bug_key + '/comment'
    payload = requests.get(endpoint, params={'token': bugzilla_token}).json()
    # The response is keyed by the bug id rendered as a string.
    return payload['bugs'][bug_key]['comments']
| |
| |
def get_bugs_with_attachments_created_by_user(id):
    """Return the WebKit/Security bugs that have an attachment submitted by ``id``.

    Each returned bug dictionary is limited to the id, summary and product fields.
    """
    query = {
        'product': ['WebKit', 'Security'],
        'f1': 'attachments.submitter',
        'o1': 'equals',
        'v1': id,
        'include_fields': 'id,summary,product',
        'token': bugzilla_token,
    }
    reply = requests.get('https://bugs.webkit.org/rest/bug', params=query)
    return reply.json()['bugs']
| |
| |
def get_attachments(bug_id):
    """Return the attachment records of ``bug_id``.

    Only the id, bug_id, creator, creation_time, summary and is_private
    fields are requested for each attachment.
    """
    bug_key = str(bug_id)
    query = {'include_fields': 'id,bug_id,creator,creation_time,summary,is_private', 'token': bugzilla_token}
    payload = requests.get('https://bugs.webkit.org/rest/bug/' + bug_key + '/attachment', params=query).json()
    # The response is keyed by the bug id rendered as a string.
    return payload['bugs'][bug_key]
| |
| |
def hide_bug(bug_id):
    """Move ``bug_id`` into the Spam product/component.

    The same update also turns off the bug's is_creator_accessible and
    is_cc_accessible flags. A failed request is reported on stdout; nothing
    is raised.
    """
    new_fields = {
        'product': 'Spam',
        'component': 'Spam',
        'version': 'unspecified',
        'is_creator_accessible': False,
        'is_cc_accessible': False,
    }
    result = requests.put('https://bugs.webkit.org/rest/bug/' + str(bug_id),
                          json=new_fields,
                          params={'token': bugzilla_token})
    if result.ok:
        return
    print('!!! Failed to move bug ' + str(bug_id) + ': ' + result.text)
| |
| |
def hide_comments(bug_id, comment_ids):
    """Mark the given comments on ``bug_id`` private, then tag each one 'spam'.

    Failures are reported on stdout and do not stop the remaining requests.
    """
    auth = {'token': bugzilla_token}

    # One request flips the privacy flag for every listed comment.
    privacy_flags = dict((str(comment_id), True) for comment_id in comment_ids)
    result = requests.put('https://bugs.webkit.org/rest/bug/' + str(bug_id),
                          json={'comment_is_private': privacy_flags},
                          params=auth)
    if not result.ok:
        print('!!! Failed to hide comments for bug ' + str(bug_id) + ': ' + result.text)

    # Tags are set per comment, so this takes one request each.
    for comment_id in comment_ids:
        tag_url = 'https://bugs.webkit.org/rest/bug/comment/' + str(comment_id) + '/tags'
        result = requests.put(tag_url,
                              json={'comment_id': comment_id, 'add': ['spam']},
                              params=auth)
        if not result.ok:
            print('!!! Failed to mark comment with spam tag: ' + result.text)
| |
def hide_attachments(bug_id, attachment_ids):
    """Mark the given attachments private in a single request.

    ``bug_id`` is only used in the failure message, which goes to stdout.
    """
    # NOTE(review): the URL always names attachment 1 while the real targets
    # travel in the JSON 'ids' list -- presumably the body takes precedence on
    # the server; confirm against the Bugzilla REST documentation.
    update = {'ids': attachment_ids, 'is_private': True}
    result = requests.put('https://bugs.webkit.org/rest/bug/attachment/1',
                          json=update,
                          params={'token': bugzilla_token})
    if result.ok:
        return
    print('!!! Failed to hide attachments for bug ' + str(bug_id) + ': ' + result.text)
| |
| |
def ask_yes_no(question, default='yes'):
    """Ask a yes/no question on the terminal until a valid answer is given.

    Args:
        question: text shown before the ``[y/n]`` hint.
        default: 'yes', 'no' or None. The answer assumed when the user just
            presses Enter; with None an explicit answer is required.

    Returns:
        True for yes/ye/y, False for no/n (case-insensitive).

    Raises:
        ValueError: if ``default`` is not one of 'yes', 'no' or None.
    """
    prompts = {None: ' [y/n] ', 'yes': ' [Y/n] ', 'no': ' [y/N] '}
    # Reject a bad default up front; otherwise the prompt text would be
    # undefined and the failure would only surface once the prompt is shown.
    if default not in prompts:
        raise ValueError('invalid default answer: ' + repr(default))
    prompt_string = prompts[default]

    answers = {'yes': True, 'y': True, 'ye': True, 'no': False, 'n': False}

    # raw_input only exists on Python 2; Python 3 calls it input.
    try:
        read_line = raw_input
    except NameError:
        read_line = input

    while True:
        sys.stdout.write(question + prompt_string)
        # The prompt has no trailing newline, so push it out before blocking.
        sys.stdout.flush()
        response = read_line().lower()
        if default is not None and response == '':
            return answers[default]
        if response in answers:
            return answers[response]
| |
| |
def sanitized_string(string):
    """Return ``string`` with every ESC (0x1B) character removed, ready to print.

    Stripping ESC keeps text fetched from Bugzilla from emitting terminal
    escape sequences. On Python 2 the result is UTF-8 encoded bytes (the
    native ``str``), as before; on Python 3 it is returned as text so it can
    still be concatenated with other strings.
    """
    # Removing ESC before encoding is equivalent to removing byte 0x1B
    # afterwards: UTF-8 never uses 0x1B inside a multi-byte sequence.
    cleaned = string.replace(u'\x1B', u'')
    if sys.version_info[0] < 3:
        return cleaned.encode('utf-8')
    return cleaned
| |
def _fetch_account_activity(account_id):
    """Return the user record for account_id, annotated with its bugs, comments and attachments."""
    user_info = get_user_info(account_id)
    user_info['bugs_created'] = get_bugs_created_by_user(account_id)
    # Bugs the account created are hidden wholesale, so they are left out of
    # the per-comment and per-attachment lists below.
    created_bug_ids = set(bug['id'] for bug in user_info['bugs_created'])

    user_info['bugs_commented'] = [bug for bug in get_bugs_commented_on_by_user(account_id)
                                   if bug['id'] not in created_bug_ids]
    for bug in user_info['bugs_commented']:
        bug['comments'] = get_comments(bug['id'])

    user_info['bugs_with_attachments_added'] = [bug for bug in get_bugs_with_attachments_created_by_user(account_id)
                                                if bug['id'] not in created_bug_ids]
    for bug in user_info['bugs_with_attachments_added']:
        bug['attachments'] = get_attachments(bug['id'])
    return user_info


def _print_account_activity(user):
    """Print a summary of one account's bugs, comments and attachments, followed by a blank line."""
    bug_url = 'https://bugs.webkit.org/show_bug.cgi?id='

    print(sanitized_string(user['real_name']) + ' <' + user['name'] + '>')
    if not user['can_login']:
        print('*** already disabled ***')

    if user['bugs_created']:
        print('Created ' + str(len(user['bugs_created'])) + ' bug(s):')
        for bug in user['bugs_created']:
            print(bug_url + str(bug['id']) + ' ' + sanitized_string(bug['summary']))

    if user['bugs_commented']:
        print('Commented on ' + str(len(user['bugs_commented'])) + ' bug(s):')
        for bug in user['bugs_commented']:
            print(bug_url + str(bug['id']) + ' ' + sanitized_string(bug['summary']))
            for comment in bug['comments']:
                if comment['creator'] == user['name']:
                    print('Comment ' + str(comment['count']) + ', ' + str(parse(comment['creation_time'])) + ': ' + sanitized_string(comment['text']))

    if user['bugs_with_attachments_added']:
        print('Added attachments to ' + str(len(user['bugs_with_attachments_added'])) + ' bug(s):')
        for bug in user['bugs_with_attachments_added']:
            print(bug_url + str(bug['id']) + ' ' + sanitized_string(bug['summary']))
            for attachment in bug['attachments']:
                if attachment['creator'] == user['name']:
                    print('Attachment ' + str(attachment['id']) + ', ' + str(parse(attachment['creation_time'])) + ': ' + sanitized_string(attachment['summary']))
    print('')


def _hide_account_content(user):
    """Move the account's bugs to Spam and make its comments and attachments on other bugs private."""
    for bug in user['bugs_created']:
        if bug['product'] == 'Spam':
            print('Bug ' + str(bug['id']) + ' is already in the Spam product, skipping')
            continue
        print('Moving bug ' + str(bug['id']) + ' to the Spam product')
        hide_bug(bug['id'])

    for bug in user['bugs_commented']:
        comments_to_hide = []
        for comment in bug['comments']:
            # Other people's comments on the same bug are left alone.
            if comment['creator'] != user['name']:
                continue
            if comment['is_private']:
                print('Comment ' + str(comment['count']) + ' on bug ' + str(bug['id']) + ' is already private, skipping')
                continue
            assert comment['bug_id'] == bug['id']
            comments_to_hide.append(comment['id'])
        if comments_to_hide:
            print('Hiding comment(s) from user ' + str(user['name']) + ' on bug ' + str(bug['id']))
            hide_comments(bug['id'], comments_to_hide)

    for bug in user['bugs_with_attachments_added']:
        attachments_to_hide = []
        for attachment in bug['attachments']:
            if attachment['creator'] != user['name']:
                continue
            if attachment['is_private']:
                print('Attachment ' + str(attachment['id']) + ' on bug ' + str(bug['id']) + ' is already private, skipping')
                continue
            assert attachment['bug_id'] == bug['id']
            attachments_to_hide.append(attachment['id'])
        if attachments_to_hide:
            print('Deleting attachment(s) from user ' + str(user['name']) + ' on bug ' + str(bug['id']))
            hide_attachments(bug['id'], attachments_to_hide)


def main():
    """Command-line entry point: review, then block, the accounts named on the command line.

    Logs in, checks that the caller has the required group memberships,
    prints each account's activity, and after a yes/no confirmation disables
    the accounts and hides their bugs, comments and attachments.

    Exits with status 1 when the caller lacks permission and with status 0
    when the confirmation is declined.
    """
    parser = argparse.ArgumentParser(description='Block Bugzilla spammers, and hide their comments.')
    parser.add_argument('accounts', nargs='+', help='accounts to block')
    args = parser.parse_args()

    get_bugzilla_token()

    if not can_use_this_tool():
        print('You need to be a Bugzilla admin to use this tool.')
        # sys.exit rather than the bare exit() helper, which is only
        # installed by the site module.
        sys.exit(1)

    print('Fetching account activity...')
    users_to_disable = [_fetch_account_activity(account_id) for account_id in args.accounts]

    for user in users_to_disable:
        _print_account_activity(user)

    if not ask_yes_no("Block all these accounts, and hide their bugs and comments?"):
        print("*** Exiting, no work performed ***")
        sys.exit(0)

    # Block all of the users first, so that they don't receive e-mail notifications about further actions.
    for user in users_to_disable:
        if not user['can_login']:
            print('User ' + str(user['name']) + ' is already disabled, skipping')
            continue
        print('Disabling user ' + str(user['name']))
        disable_user(user['id'], 'spam')

    for user in users_to_disable:
        _hide_account_content(user)
| |
| |
# Run the tool only when executed as a script, not when imported.
if __name__ == '__main__':
    main()