# Copyright (C) 2019 Apple Inc. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS BE LIABLE FOR
# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import calendar
import io
import json
import time
import zipfile
from cassandra.cqlengine import columns
from collections import defaultdict
from datetime import datetime
from resultsdbpy.controller.commit import Commit
from resultsdbpy.controller.configuration import Configuration
from resultsdbpy.model.commit_context import CommitContext
from resultsdbpy.model.configuration_context import ClusteredByConfiguration
class UploadContext(object):
    QUEUE_NAME = 'upload_queue'
    PROCESS_TIMEOUT = 24 * 60 * 60  # Queued work expires from Redis after 1 day
    MAX_ATTEMPTS = 3  # Drop a task after this many failed processing attempts
    RETRY_TIME = 5 * 60  # After 5 minutes, re-try a task even if it's in-flight
    MAX_TASKS_IN_SCAN = 10  # Hint for how many keys each Redis SCAN iteration examines
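    # Index of which suites have uploads for a given configuration and branch.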
    class SuitesByConfiguration(ClusteredByConfiguration):
        __table_name__ = 'suites_by_configuration_and_branch'
        suite = columns.Text(primary_key=True, required=True)
        branch = columns.Text(primary_key=True, required=True)
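    # Uploaded results, partitioned by suite and branch and clustered by commit
    # uuid in descending order so the most recent uploads are read first. The
    # commits and test_results columns hold zip-compressed JSON blobs.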
    class UploadsByConfiguration(ClusteredByConfiguration):
        __table_name__ = 'uploads_by_configuration_01'
        suite = columns.Text(partition_key=True, required=True)
        branch = columns.Text(partition_key=True, required=True)
        uuid = columns.BigInt(primary_key=True, required=True, clustering_order='DESC')
        sdk = columns.Text(primary_key=True, required=True)
        commits = columns.Blob(required=True)
        test_results = columns.Blob(required=True)
        time_uploaded = columns.DateTime(required=True)
        upload_version = columns.Integer(required=True)
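        # Inflate a row back into a plain dict, decompressing the zipped JSON
        # blobs and mapping the '?' sentinel stored for rows without an SDK
        # back to None.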
        def unpack(self):
            return dict(
                commits=[Commit.from_json(element) for element in json.loads(UploadContext.from_zip(bytearray(self.commits)))],
                sdk=None if self.sdk == '?' else self.sdk,
                test_results=json.loads(UploadContext.from_zip(bytearray(self.test_results))),
                timestamp=calendar.timegm(self.time_uploaded.timetuple()),
                version=self.upload_version,
            )
    def __init__(self, configuration_context, commit_context, ttl_seconds=None, async_processing=False):
        self.ttl_seconds = ttl_seconds
        self.configuration_context = configuration_context
        self.commit_context = commit_context
        self.cassandra = self.configuration_context.cassandra
        self._process_upload_callbacks = defaultdict(dict)
        with self:
            self.cassandra.create_table(self.SuitesByConfiguration)
            self.cassandra.create_table(self.UploadsByConfiguration)
        self._async_processing = async_processing
        self.redis = self.configuration_context.redis
    def __enter__(self):
        self.configuration_context.__enter__()
        self.commit_context.__enter__()
    def __exit__(self, *args, **kwargs):
        self.commit_context.__exit__(*args, **kwargs)
        self.configuration_context.__exit__(*args, **kwargs)
    def register_upload_callback(self, name, callback, suite=None):
        # If no suite is specified, all uploads from all suites will trigger the callback
        self._process_upload_callbacks[suite][name] = callback
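        # Usage sketch (hypothetical names; any callable accepting these
        # keyword arguments works):
        #   upload_context.register_upload_callback(
        #       'archive-results',
        #       lambda configuration, commits, suite, test_results, timestamp: ...,
        #       suite='layout-tests',
        #   )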
    @classmethod
    def to_zip(cls, value, archive_name='archive'):
        if not isinstance(value, str):
            raise TypeError(f'Expected type {str}, got {type(value)}')
        compressed_file = io.BytesIO()
        with zipfile.ZipFile(compressed_file, mode='w', compression=zipfile.ZIP_DEFLATED) as zip_file:
            zip_file.writestr(archive_name, value)
        return bytearray(compressed_file.getvalue())
    @classmethod
    def from_zip(cls, value, archive_name='archive'):
        if not isinstance(value, bytearray):
            raise TypeError(f'Expected type {bytearray}, got {type(value)}')
        compressed_file = io.BytesIO()
        compressed_file.write(value)
        with zipfile.ZipFile(compressed_file, mode='r') as zip_file:
            return zip_file.read(archive_name).decode('utf-8')
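        # Round-trip sketch: from_zip(to_zip('payload')) == 'payload'. Both
        # helpers operate on a single-entry zip archive named 'archive' by
        # default.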
    def synchronously_process_test_results(self, configuration, commits, suite, test_results, timestamp=None):
        timestamp = timestamp or time.time()
        # Allows partial errors to be forwarded back to the caller
        result = {}
        for name, callback in self._process_upload_callbacks[suite].items():
            result[name] = callback(configuration=configuration, commits=commits, suite=suite, test_results=test_results, timestamp=timestamp)
        for name, callback in self._process_upload_callbacks[None].items():
            result[name] = callback(configuration=configuration, commits=commits, suite=suite, test_results=test_results, timestamp=timestamp)
        return result
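    # Asynchronous processing uses two Redis keys per queued upload: a marker key
    # 'upload_queue:<hash>' tracking attempts and claim time, and a payload key
    # 'data_for_upload_queue:<hash>' holding the JSON to process. Both expire
    # after PROCESS_TIMEOUT seconds.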
    def _find_job_with_attempts(self):
        # Claim a queued task under a Redis lock. Returns (are_jobs_left, key, attempts):
        # (True, key, n) when a task was claimed, (True, None, None) when tasks exist
        # but are all in-flight, and (False, None, None) when the queue is empty.
        now = int(time.time())
        are_jobs_left = False
        will_attempt = 0
        with self.redis.lock(name=f'lock_{self.QUEUE_NAME}', timeout=60):
            for key in self.redis.scan_iter(match=f'{self.QUEUE_NAME}*', count=self.MAX_TASKS_IN_SCAN):
                are_jobs_left = True
                key = key.decode('utf-8')
                try:
                    value = json.loads(self.redis.get(key))
                    # Only claim tasks which are unclaimed or whose claim has gone stale.
                    if now > value.get('started_processing', 0) + self.RETRY_TIME:
                        will_attempt = value.get('attempts', 0) + 1
                except Exception:
                    # Treat a missing or malformed marker as a fresh task.
                    will_attempt = 1
                if will_attempt:
                    self.redis.set(
                        key,
                        json.dumps(dict(started_processing=now, attempts=will_attempt)),
                        ex=self.PROCESS_TIMEOUT,
                    )
                    return are_jobs_left, key, will_attempt
        return are_jobs_left, None, None
    def _do_job_for_key(self, key, attempts=1):
        # Load the queued payload and replay it through the synchronous path. On
        # success (or once MAX_ATTEMPTS is exhausted) the task's keys are deleted;
        # otherwise the marker is reset so another worker can retry the task.
        job_complete = False
        try:
            data = json.loads(self.redis.get(f'data_for_{key}'))
            self.synchronously_process_test_results(
                configuration=Configuration.from_json(data['configuration']),
                commits=[Commit.from_json(commit_json) for commit_json in data['commits']],
                suite=data['suite'],
                timestamp=data['timestamp'],
                test_results=data['test_results'],
            )
            job_complete = True
        finally:
            if job_complete or attempts >= self.MAX_ATTEMPTS:
                self.redis.delete(key)
                self.redis.delete(f'data_for_{key}')
            else:
                self.redis.set(
                    key,
                    json.dumps(dict(started_processing=0, attempts=attempts)),
                    ex=self.PROCESS_TIMEOUT,
                )
    def do_processing_work(self):
        jobs_left = True
        while jobs_left:
            jobs_left, key, attempts = self._find_job_with_attempts()
            if key:
                self._do_job_for_key(key, attempts=attempts)
            elif jobs_left:
                time.sleep(10)  # There are jobs, but other workers are processing them.
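    # A worker would typically drive the queue in a loop (sketch, with a
    # hypothetical polling interval):
    #   while True:
    #       upload_context.do_processing_work()
    #       time.sleep(60)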
    def process_test_results(self, configuration, commits, suite, test_results, timestamp=None):
        timestamp = timestamp or time.time()
        if not self._async_processing:
            return self.synchronously_process_test_results(configuration, commits, suite, test_results=test_results, timestamp=timestamp)
        for branch in self.commit_context.branch_keys_for_commits(commits):
            # Key each queued task by configuration, branch, commit uuid and suite so
            # that repeated uploads of the same results share one pending task.
            hash_key = hash(configuration) ^ hash(branch) ^ hash(self.commit_context.uuid_for_commits(commits)) ^ hash(suite)
            self.redis.set(
                f'{self.QUEUE_NAME}:{hash_key}',
                json.dumps(dict(started_processing=0, attempts=0)),
                ex=self.PROCESS_TIMEOUT,
            )
            self.redis.set(
                f'data_for_{self.QUEUE_NAME}:{hash_key}',
                json.dumps(dict(
                    configuration=Configuration.Encoder().default(configuration),
                    suite=suite,
                    commits=Commit.Encoder().default(commits),
                    timestamp=timestamp,
                    test_results=test_results,
                )),
                ex=self.PROCESS_TIMEOUT,
            )
        return {key: dict(status='Queued') for key in list(self._process_upload_callbacks[suite].keys()) + list(self._process_upload_callbacks[None].keys())}
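    # Persist an upload to Cassandra. Commits and results are stored as zipped
    # JSON; when ttl_seconds is set, rows expire relative to the commit
    # timestamp encoded in the upload's uuid.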
    def upload_test_results(self, configuration, commits, suite, test_results, timestamp=None, version=0):
        if not isinstance(suite, str):
            raise TypeError(f'Expected type {str}, got {type(suite)}')
        for commit in commits:
            if not isinstance(commit, Commit):
                raise TypeError(f'Expected type {Commit}, got {type(commit)}')
        if len(commits) < 1:
            raise ValueError('Each test result must have at least 1 associated commit')
        if not isinstance(test_results, dict):
            raise TypeError(f'Expected type {dict}, got {type(test_results)}')
        timestamp = timestamp or time.time()
        if not isinstance(timestamp, datetime):
            timestamp = datetime.utcfromtimestamp(int(timestamp))
        uuid = self.commit_context.uuid_for_commits(commits)
        branches = self.commit_context.branch_keys_for_commits(commits)
        with self:
            for branch in branches:
                self.configuration_context.register_configuration(configuration, branch=branch, timestamp=timestamp)
                self.configuration_context.insert_row_with_configuration(
                    self.SuitesByConfiguration.__table_name__, configuration, suite=suite, branch=branch,
                    ttl=int((uuid // Commit.TIMESTAMP_TO_UUID_MULTIPLIER) + self.ttl_seconds - time.time()) if self.ttl_seconds else None,
                )
                self.configuration_context.insert_row_with_configuration(
                    self.UploadsByConfiguration.__table_name__, configuration=configuration,
                    suite=suite, branch=branch, uuid=uuid, sdk=configuration.sdk or '?', time_uploaded=timestamp,
                    commits=self.to_zip(json.dumps(commits, cls=Commit.Encoder)),
                    test_results=self.to_zip(json.dumps(test_results)),
                    upload_version=version,
                    ttl=int((uuid // Commit.TIMESTAMP_TO_UUID_MULTIPLIER) + self.ttl_seconds - time.time()) if self.ttl_seconds else None,
                )
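    # Usage sketch (hypothetical objects; Configuration and Commit come from
    # resultsdbpy.controller):
    #   upload_context.upload_test_results(
    #       configuration, commits=[head_commit], suite='layout-tests',
    #       test_results={'results': ...}, timestamp=time.time(),
    #   )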
    def find_test_results(self, configurations, suite, branch=None, begin=None, end=None, recent=True, limit=100):
        # begin/end bound the commit-uuid range of returned uploads; branch defaults
        # to the commit context's default branch key.
        if not isinstance(suite, str):
            raise TypeError(f'Expected type {str}, got {type(suite)}')
        with self:
            result = {}
            for configuration in configurations:
                result.update({config: [value.unpack() for value in values] for config, values in self.configuration_context.select_from_table_with_configurations(
                    self.UploadsByConfiguration.__table_name__, configurations=[configuration], recent=recent,
                    suite=suite, sdk=configuration.sdk, branch=branch or self.commit_context.DEFAULT_BRANCH_KEY,
                    uuid__gte=CommitContext.convert_to_uuid(begin),
                    uuid__lte=CommitContext.convert_to_uuid(end, CommitContext.timestamp_to_uuid()), limit=limit,
                ).items()})
            return result
    def find_suites(self, configurations, branch=None, recent=True, limit=100):
        with self:
            return {
                config: [row.suite for row in rows] for config, rows in self.configuration_context.select_from_table_with_configurations(
                    self.SuitesByConfiguration.__table_name__, configurations=configurations, branch=branch, recent=recent, limit=limit,
                ).items()
            }
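# Base class for contexts whose callbacks are registered via
# UploadContext.register_upload_callback(); the default register() below is
# evidently meant to be overridden by subclasses.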
class UploadCallbackContext(object):
    @classmethod
    def partial_status(cls, exception=None):
        if exception:
            return dict(
                status='error',
                description=str(exception),
            )
        return dict(status='ok')
    def __init__(self, name, configuration_context, commit_context, ttl_seconds=None):
        self.name = name
        self.configuration_context = configuration_context
        self.commit_context = commit_context
        self.cassandra = self.configuration_context.cassandra
        self.ttl_seconds = ttl_seconds
    def __enter__(self):
        self.configuration_context.__enter__()
        self.commit_context.__enter__()
    def __exit__(self, *args, **kwargs):
        self.commit_context.__exit__(*args, **kwargs)
        self.configuration_context.__exit__(*args, **kwargs)
    def register(self, *args, **kwargs):
        return dict(status='error', description=f'No register implemented for {self.name}')
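# A minimal subclass sketch (hypothetical; concrete callback contexts live
# elsewhere in resultsdbpy.model):
#   class ExampleContext(UploadCallbackContext):
#       def register(self, configuration, commits, suite, test_results, timestamp):
#           try:
#               pass  # persist or index the upload here
#           except Exception as e:
#               return self.partial_status(exception=e)
#           return self.partial_status()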