| import hashlib |
| import os |
| import urlparse |
| from abc import ABCMeta, abstractmethod |
| from Queue import Empty |
| from collections import defaultdict, OrderedDict, deque |
| from multiprocessing import Queue |
| |
| import manifestinclude |
| import manifestexpected |
| import wpttest |
| from mozlog import structured |
| |
# Placeholders for modules that cannot be imported at module load time;
# they are populated by do_delayed_imports() once sys.path is configured.
manifest = None
manifest_update = None
download_from_github = None

def do_delayed_imports():
    """Import the manifest modules and bind them to module globals.

    This relies on an already loaded module having set the sys.path
    correctly :(
    """
    global manifest, manifest_update, download_from_github
    from manifest import manifest
    from manifest import update as manifest_update
    from manifest.download import download_from_github
| |
| |
class TestChunker(object):
    """Base class for splitting a test manifest into numbered chunks.

    Subclasses implement __call__ to yield only the manifest items
    belonging to chunk ``chunk_number`` of ``total_chunks``.
    """

    def __init__(self, total_chunks, chunk_number):
        self.total_chunks = total_chunks
        self.chunk_number = chunk_number
        # Chunk numbers are 1-based, so the number may not exceed the total.
        assert self.chunk_number <= self.total_chunks
        self.logger = structured.get_default_logger()
        assert self.logger

    def __call__(self, manifest):
        # Subclasses must yield the (test_type, test_path, tests) items
        # that fall into this chunk.
        raise NotImplementedError
| |
| |
class Unchunked(TestChunker):
    """Chunker used when chunking is disabled: passes every item through."""

    def __init__(self, *args, **kwargs):
        TestChunker.__init__(self, *args, **kwargs)
        # A pass-through chunker only makes sense for a single chunk.
        assert self.total_chunks == 1

    def __call__(self, manifest):
        # Everything belongs to the one and only chunk.
        for entry in manifest:
            yield entry
| |
| |
class HashChunker(TestChunker):
    """Assign manifest items to chunks by hashing the test path.

    A given path always hashes to the same chunk, so the assignment is
    stable across runs for a fixed number of chunks.
    """

    def __call__(self, manifest):
        selected = self.chunk_number - 1
        for test_type, test_path, tests in manifest:
            digest = int(hashlib.md5(test_path).hexdigest(), 16)
            if digest % self.total_chunks == selected:
                yield test_type, test_path, tests
| |
| |
class DirectoryHashChunker(TestChunker):
    """Like HashChunker except the directory is hashed.

    This ensures that all tests in the same directory end up in the same
    chunk.
    """

    def __call__(self, manifest):
        selected = self.chunk_number - 1
        for test_type, test_path, tests in manifest:
            dir_digest = int(hashlib.md5(os.path.dirname(test_path)).hexdigest(), 16)
            if dir_digest % self.total_chunks == selected:
                yield test_type, test_path, tests
| |
| |
class EqualTimeChunker(TestChunker):
    """Chunker that balances chunks by estimated total runtime.

    Tests are grouped by (depth-3) subdirectory, split into an initial
    contiguous partition, and then directory groups are iteratively moved
    between adjacent chunks while doing so reduces a squared-error
    "badness" metric relative to the ideal per-chunk time.
    """

    def _group_by_directory(self, manifest_items):
        """Split the list of manifest items into a ordered dict that groups tests in
        so that anything in the same subdirectory beyond a depth of 3 is in the same
        group. So all tests in a/b/c, a/b/c/d and a/b/c/e will be grouped together
        and separate to tests in a/b/f

        Returns: tuple (ordered dict of {test_dir: PathData}, total estimated runtime)
        """

        class PathData(object):
            # Accumulated runtime estimate and manifest items for one
            # directory group.
            def __init__(self, path):
                self.path = path
                self.time = 0
                self.tests = []

        by_dir = OrderedDict()
        total_time = 0

        for i, (test_type, test_path, tests) in enumerate(manifest_items):
            # Group key: at most the first three components of the directory.
            test_dir = tuple(os.path.split(test_path)[0].split(os.path.sep)[:3])

            if test_dir not in by_dir:
                by_dir[test_dir] = PathData(test_dir)

            data = by_dir[test_dir]
            # Use each test's timeout as a proxy for its expected runtime.
            time = sum(test.default_timeout if test.timeout !=
                       "long" else test.long_timeout for test in tests)
            data.time += time
            total_time += time
            data.tests.append((test_type, test_path, tests))

        return by_dir, total_time

    def _maybe_remove(self, chunks, i, direction):
        """Trial removing a chunk from one chunk to an adjacent one.

        :param chunks: - the list of all chunks
        :param i: - the chunk index in the list of chunks to try removing from
        :param direction: either "next" if we are going to move from the end to
                          the subsequent chunk, or "prev" if we are going to move
                          from the start into the previous chunk.

        :returns bool: Did a chunk get moved?"""
        source_chunk = chunks[i]
        if direction == "next":
            target_chunk = chunks[i+1]
            path_index = -1
            move_func = lambda: target_chunk.appendleft(source_chunk.pop())
        elif direction == "prev":
            target_chunk = chunks[i-1]
            path_index = 0
            move_func = lambda: target_chunk.append(source_chunk.popleft())
        else:
            raise ValueError("Unexpected move direction %s" % direction)

        return self._maybe_move(source_chunk, target_chunk, path_index, move_func)

    def _maybe_add(self, chunks, i, direction):
        """Trial adding a chunk from one chunk to an adjacent one.

        :param chunks: - the list of all chunks
        :param i: - the chunk index in the list of chunks to try adding to
        :param direction: either "next" if we are going to remove from the
                          the subsequent chunk, or "prev" if we are going to remove
                          from the the previous chunk.

        :returns bool: Did a chunk get moved?"""
        target_chunk = chunks[i]
        if direction == "next":
            source_chunk = chunks[i+1]
            path_index = 0
            move_func = lambda: target_chunk.append(source_chunk.popleft())
        elif direction == "prev":
            source_chunk = chunks[i-1]
            path_index = -1
            move_func = lambda: target_chunk.appendleft(source_chunk.pop())
        else:
            raise ValueError("Unexpected move direction %s" % direction)

        return self._maybe_move(source_chunk, target_chunk, path_index, move_func)

    def _maybe_move(self, source_chunk, target_chunk, path_index, move_func):
        """Move from one chunk to another, assess the change in badness,
        and keep the move iff it decreases the badness score.

        :param source_chunk: chunk to move from
        :param target_chunk: chunk to move to
        :param path_index: 0 if we are moving from the start or -1 if we are moving from the
                           end
        :param move_func: Function that actually moves between chunks"""
        # Never empty a chunk entirely.
        if len(source_chunk.paths) <= 1:
            return False

        move_time = source_chunk.paths[path_index].time

        new_source_badness = self._badness(source_chunk.time - move_time)
        new_target_badness = self._badness(target_chunk.time + move_time)

        delta_badness = ((new_source_badness + new_target_badness) -
                         (source_chunk.badness + target_chunk.badness))
        # Only keep the move if it strictly reduces the combined badness.
        if delta_badness < 0:
            move_func()
            return True

        return False

    def _badness(self, time):
        """Metric of badness for a specific chunk

        :param time: the time for a specific chunk"""
        # Squared deviation from the ideal (mean) per-chunk runtime.
        return (time - self.expected_time)**2

    def _get_chunk(self, manifest_items):
        """Partition manifest_items into balanced chunks and return the tests
        of the chunk this chunker was configured for."""
        by_dir, total_time = self._group_by_directory(manifest_items)

        if len(by_dir) < self.total_chunks:
            raise ValueError("Tried to split into %i chunks, but only %i subdirectories included" % (
                self.total_chunks, len(by_dir)))

        # Ideal per-chunk runtime if time were divided perfectly evenly.
        self.expected_time = float(total_time) / self.total_chunks

        chunks = self._create_initial_chunks(by_dir)

        while True:
            # Move a test from one chunk to the next until doing so no longer
            # reduces the badness
            got_improvement = self._update_chunks(chunks)
            if not got_improvement:
                break

        self.logger.debug(self.expected_time)
        for i, chunk in chunks.iteritems():
            self.logger.debug("%i: %i, %i" % (i + 1, chunk.time, chunk.badness))

        # Sanity check: rebalancing must neither lose nor duplicate tests.
        assert self._all_tests(by_dir) == self._chunked_tests(chunks)

        return self._get_tests(chunks)

    @staticmethod
    def _all_tests(by_dir):
        """Return a set of all tests in the manifest from a grouping by directory"""
        return set(x[0] for item in by_dir.itervalues()
                   for x in item.tests)

    @staticmethod
    def _chunked_tests(chunks):
        """Return a set of all tests in the manifest from the chunk list"""
        return set(x[0] for chunk in chunks.itervalues()
                   for path in chunk.paths
                   for x in path.tests)


    def _create_initial_chunks(self, by_dir):
        """Create an initial unbalanced list of chunks.

        :param by_dir: All tests in the manifest grouped by subdirectory
        :returns list: A list of Chunk objects"""

        class Chunk(object):
            def __init__(self, paths, index):
                """List of PathData objects that together form a single chunk of
                tests"""
                self.paths = deque(paths)
                self.time = sum(item.time for item in paths)
                self.index = index

            def appendleft(self, path):
                """Add a PathData object to the start of the chunk"""
                self.paths.appendleft(path)
                self.time += path.time

            def append(self, path):
                """Add a PathData object to the end of the chunk"""
                self.paths.append(path)
                self.time += path.time

            def pop(self):
                """Remove PathData object from the end of the chunk"""
                assert len(self.paths) > 1
                self.time -= self.paths[-1].time
                return self.paths.pop()

            def popleft(self):
                """Remove PathData object from the start of the chunk"""
                assert len(self.paths) > 1
                self.time -= self.paths[0].time
                return self.paths.popleft()

            @property
            def badness(self_):  # noqa: N805
                """Badness metric for this chunk"""
                return self._badness(self_.time)

        # NOTE(review): relies on Python 2 integer division (truncating);
        # this would need // if ported to Python 3.
        initial_size = len(by_dir) / self.total_chunks
        chunk_boundaries = [initial_size * i
                            for i in xrange(self.total_chunks)] + [len(by_dir)]

        chunks = OrderedDict()
        for i, lower in enumerate(chunk_boundaries[:-1]):
            upper = chunk_boundaries[i + 1]
            # Python 2: dict.values() returns a list, so slicing works here.
            paths = by_dir.values()[lower:upper]
            chunks[i] = Chunk(paths, i)

        assert self._all_tests(by_dir) == self._chunked_tests(chunks)

        return chunks

    def _update_chunks(self, chunks):
        """Run a single iteration of the chunk update algorithm.

        :param chunks: - List of chunks
        """
        #TODO: consider replacing this with a heap
        sorted_chunks = sorted(chunks.values(), key=lambda x:-x.badness)
        got_improvement = False
        for chunk in sorted_chunks:
            # Under-full chunks try to pull work in; over-full chunks try to
            # push work out.
            if chunk.time < self.expected_time:
                f = self._maybe_add
            else:
                f = self._maybe_remove

            if chunk.index == 0:
                order = ["next"]
            elif chunk.index == self.total_chunks - 1:
                order = ["prev"]
            else:
                if chunk.time < self.expected_time:
                    # First try to add a test from the neighboring chunk with the
                    # greatest total time
                    if chunks[chunk.index + 1].time > chunks[chunk.index - 1].time:
                        order = ["next", "prev"]
                    else:
                        order = ["prev", "next"]
                else:
                    # First try to remove a test and add to the neighboring chunk with the
                    # lowest total time
                    if chunks[chunk.index + 1].time > chunks[chunk.index - 1].time:
                        order = ["prev", "next"]
                    else:
                        order = ["next", "prev"]

            for direction in order:
                if f(chunks, chunk.index, direction):
                    got_improvement = True
                    break

            if got_improvement:
                break

        return got_improvement

    def _get_tests(self, chunks):
        """Return the list of tests corresponding to the chunk number we are running.

        :param chunks: List of chunks"""
        tests = []
        for path in chunks[self.chunk_number - 1].paths:
            tests.extend(path.tests)

        return tests

    def __call__(self, manifest_iter):
        """Yield the manifest items belonging to this chunker's chunk."""
        # The balancing algorithm needs the full manifest up front.
        manifest = list(manifest_iter)
        tests = self._get_chunk(manifest)
        for item in tests:
            yield item
| |
| |
class TestFilter(object):
    """Filter manifest items using include/exclude rules.

    The rules come either from an include-manifest file or from explicit
    include/exclude lists supplied by the caller.
    """

    def __init__(self, test_manifests, include=None, exclude=None, manifest_path=None):
        if manifest_path is None or include is not None:
            # No usable manifest file: start from an empty include manifest.
            self.manifest = manifestinclude.IncludeManifest.create()
            self.manifest.set_defaults()
        else:
            self.manifest = manifestinclude.get_manifest(manifest_path)

        if include:
            # An explicit include list means everything else is skipped.
            self.manifest.set("skip", "true")
            for test_id in include:
                self.manifest.add_include(test_manifests, test_id)

        if exclude:
            for test_id in exclude:
                self.manifest.add_exclude(test_manifests, test_id)

    def __call__(self, manifest_iter):
        """Yield manifest items restricted to the tests the rules include."""
        for test_type, test_path, tests in manifest_iter:
            included = set(test for test in tests
                           if self.manifest.include(test))
            if included:
                yield test_type, test_path, included
| |
| |
class TagFilter(object):
    """Filter tests to those carrying at least one of the wanted tags."""

    def __init__(self, tags):
        self.tags = set(tags)

    def __call__(self, test_iter):
        for test in test_iter:
            # Keep the test if its tag set intersects the wanted tags.
            if test.tags & self.tags:
                yield test
| |
| |
class ManifestLoader(object):
    """Load (and optionally update or download) the test manifests for a
    set of test paths."""

    def __init__(self, test_paths, force_manifest_update=False, manifest_download=False,
                 types=None, meta_filters=None):
        # The manifest modules can only be imported once sys.path is set up.
        do_delayed_imports()
        self.test_paths = test_paths
        self.force_manifest_update = force_manifest_update
        self.manifest_download = manifest_download
        self.types = types
        self.logger = structured.get_default_logger()
        self.meta_filters = meta_filters
        if self.logger is None:
            # No global logger configured: fall back to a private one.
            self.logger = structured.structuredlog.StructuredLogger("ManifestLoader")

    def load(self):
        """Load one manifest per entry in self.test_paths.

        :returns: dict mapping manifest object -> path data dict containing
                  "url_base" plus the entries of the test_paths value."""
        rv = {}
        for url_base, paths in self.test_paths.iteritems():
            manifest_file = self.load_manifest(url_base=url_base,
                                               **paths)
            path_data = {"url_base": url_base}
            path_data.update(paths)
            rv[manifest_file] = path_data
        return rv

    def load_manifest(self, tests_path, manifest_path, metadata_path, url_base="/", **kwargs):
        """Load a single manifest, downloading it first when requested.

        :param tests_path: root directory of the tests
        :param manifest_path: path to the manifest JSON file
        :param metadata_path: metadata directory; the manifest cache lives
                              in its ".cache" subdirectory
        :param url_base: URL prefix under which these tests are served"""
        cache_root = os.path.join(metadata_path, ".cache")
        if self.manifest_download:
            download_from_github(manifest_path, tests_path)
        return manifest.load_and_update(tests_path, manifest_path, url_base,
                                        cache_root=cache_root, update=self.force_manifest_update,
                                        meta_filters=self.meta_filters)
| |
| |
def iterfilter(filters, iter):
    """Chain a sequence of filters over an iterable and yield the results.

    Each filter is a callable taking an iterable and returning a new
    iterable; they are applied in order.

    :param filters: sequence of iterable -> iterable callables
    :param iter: initial iterable (NOTE: parameter shadows the builtin
                 ``iter``; kept for interface compatibility)
    """
    current = iter
    for apply_filter in filters:
        current = apply_filter(current)
    for item in current:
        yield item
| |
| |
class TestLoader(object):
    """Load tests from the manifests, applying filters and chunking, and
    partition them into enabled and disabled sets per test type."""

    def __init__(self,
                 test_manifests,
                 test_types,
                 run_info,
                 manifest_filters=None,
                 meta_filters=None,
                 chunk_type="none",
                 total_chunks=1,
                 chunk_number=1,
                 include_https=True,
                 skip_timeout=False):

        self.test_types = test_types
        self.run_info = run_info

        self.manifest_filters = manifest_filters if manifest_filters is not None else []
        self.meta_filters = meta_filters if meta_filters is not None else []

        self.manifests = test_manifests
        self.tests = None
        self.disabled_tests = None
        self.include_https = include_https
        self.skip_timeout = skip_timeout

        self.chunk_type = chunk_type
        self.total_chunks = total_chunks
        self.chunk_number = chunk_number

        # Select the chunking strategy by name; KeyError for unknown types.
        self.chunker = {"none": Unchunked,
                        "hash": HashChunker,
                        "dir_hash": DirectoryHashChunker,
                        "equal_time": EqualTimeChunker}[chunk_type](total_chunks,
                                                                    chunk_number)

        self._test_ids = None

        # Cache of parsed __dir__.ini manifests, keyed by file path.
        self.directory_manifests = {}

        self._load_tests()

    @property
    def test_ids(self):
        # Lazily computed ids of all loaded tests (disabled first, then
        # enabled), cached after the first access.
        if self._test_ids is None:
            self._test_ids = []
            for test_dict in [self.disabled_tests, self.tests]:
                for test_type in self.test_types:
                    self._test_ids += [item.id for item in test_dict[test_type]]
        return self._test_ids

    def get_test(self, manifest_file, manifest_test, inherit_metadata, test_metadata):
        """Build a test object from a manifest entry and its expectation
        metadata (directory-inherited plus per-test)."""
        if test_metadata is not None:
            inherit_metadata.append(test_metadata)
            test_metadata = test_metadata.get_test(manifest_test.id)

        return wpttest.from_manifest(manifest_file, manifest_test, inherit_metadata, test_metadata)

    def load_dir_metadata(self, test_manifest, metadata_path, test_path):
        """Return the list of __dir__.ini manifests applying to test_path,
        ordered from the metadata root down to the test's own directory."""
        rv = []
        path_parts = os.path.dirname(test_path).split(os.path.sep)
        for i in xrange(len(path_parts) + 1):
            path = os.path.join(metadata_path, os.path.sep.join(path_parts[:i]), "__dir__.ini")
            if path not in self.directory_manifests:
                self.directory_manifests[path] = manifestexpected.get_dir_manifest(path,
                                                                                   self.run_info)
            manifest = self.directory_manifests[path]
            if manifest is not None:
                rv.append(manifest)
        return rv

    def load_metadata(self, test_manifest, metadata_path, test_path):
        """Return (inherited dir metadata list, per-test metadata) for a path."""
        inherit_metadata = self.load_dir_metadata(test_manifest, metadata_path, test_path)
        test_metadata = manifestexpected.get_manifest(
            metadata_path, test_path, test_manifest.url_base, self.run_info)
        return inherit_metadata, test_metadata

    def iter_tests(self):
        """Yield (test_path, test_type, test) for every test in this
        loader's chunk, after applying the manifest filters."""
        manifest_items = []
        manifests_by_url_base = {}

        # Sort manifests by url_base for a deterministic ordering.
        for manifest in sorted(self.manifests.keys(), key=lambda x:x.url_base):
            manifest_iter = iterfilter(self.manifest_filters,
                                       manifest.itertypes(*self.test_types))
            manifest_items.extend(manifest_iter)
            manifests_by_url_base[manifest.url_base] = manifest

        if self.chunker is not None:
            manifest_items = self.chunker(manifest_items)

        for test_type, test_path, tests in manifest_items:
            # All tests of an item share a url_base, so any one of them can
            # be used to look up the owning manifest.
            manifest_file = manifests_by_url_base[iter(tests).next().url_base]
            metadata_path = self.manifests[manifest_file]["metadata_path"]

            inherit_metadata, test_metadata = self.load_metadata(manifest_file, metadata_path, test_path)
            for test in tests:
                yield test_path, test_type, self.get_test(manifest_file, test, inherit_metadata, test_metadata)

    def _load_tests(self):
        """Read in the tests from the manifest file and add them to a queue"""
        tests = {"enabled":defaultdict(list),
                 "disabled":defaultdict(list)}

        for test_path, test_type, test in self.iter_tests():
            enabled = not test.disabled()
            # Loader options can additionally disable https-only tests and
            # tests expected to time out.
            if not self.include_https and test.environment["protocol"] == "https":
                enabled = False
            if self.skip_timeout and test.expected() == "TIMEOUT":
                enabled = False
            key = "enabled" if enabled else "disabled"
            tests[key][test_type].append(test)

        self.tests = tests["enabled"]
        self.disabled_tests = tests["disabled"]

    def groups(self, test_types, chunk_type="none", total_chunks=1, chunk_number=1):
        """Return the set of top-level groups (first URL path component)
        of the enabled tests for the given test types.

        NOTE(review): chunk_type, total_chunks and chunk_number are
        accepted but currently unused here."""
        groups = set()

        for test_type in test_types:
            for test in self.tests[test_type]:
                group = test.url.split("/")[1]
                groups.add(group)

        return groups
| |
| |
class TestSource(object):
    """Abstract source of test groups consumed from a shared queue."""

    __metaclass__ = ABCMeta

    def __init__(self, test_queue):
        self.test_queue = test_queue
        # The group currently being consumed and its metadata, if any.
        self.current_group = None
        self.current_metadata = None

    @abstractmethod
    # noqa: N805
    #@classmethod (doesn't compose with @abstractmethod)
    def make_queue(cls, tests, **kwargs):
        """Build the queue of (group, metadata) items from tests."""
        pass

    @classmethod
    def group_metadata(cls, state):
        # Default scope covers the whole server root.
        return {"scope": "/"}

    def group(self):
        """Return the current (group, metadata) pair, fetching a new one
        from the queue when the current group is missing or exhausted.

        Returns (None, None) once the queue is empty."""
        exhausted = not self.current_group or len(self.current_group) == 0
        if exhausted:
            try:
                fetched = self.test_queue.get(block=False)
            except Empty:
                return None, None
            self.current_group, self.current_metadata = fetched
        return self.current_group, self.current_metadata
| |
| |
class GroupedSource(TestSource):
    """TestSource that batches consecutive tests into groups, starting a
    new group whenever the subclass's new_group predicate fires."""

    @classmethod
    def new_group(cls, state, test, **kwargs):
        """Return True when *test* should start a new group."""
        raise NotImplementedError

    @classmethod
    def make_queue(cls, tests, **kwargs):
        test_queue = Queue()
        groups = []
        state = {}

        for test in tests:
            if cls.new_group(state, test, **kwargs):
                # Start a fresh group with metadata derived from the
                # predicate's updated state.
                groups.append((deque(), cls.group_metadata(state)))

            current_group, current_metadata = groups[-1]
            current_group.append(test)
            test.update_metadata(current_metadata)

        for queued_item in groups:
            test_queue.put(queued_item)
        return test_queue
| |
| |
class SingleTestSource(TestSource):
    """TestSource that spreads individual tests over worker processes by
    hashing each test id."""

    @classmethod
    def make_queue(cls, tests, **kwargs):
        test_queue = Queue()
        processes = kwargs["processes"]
        # One group and one metadata dict per worker process.
        process_groups = [deque([]) for _ in xrange(processes)]
        metadatas = [cls.group_metadata(None) for _ in xrange(processes)]

        for test in tests:
            slot = hash(test.id) % processes
            process_groups[slot].append(test)
            test.update_metadata(metadatas[slot])

        for queued_item in zip(process_groups, metadatas):
            test_queue.put(queued_item)

        return test_queue
| |
| |
class PathGroupedSource(GroupedSource):
    """GroupedSource grouping tests that share a URL path prefix.

    The ``depth`` keyword bounds how many leading path components are
    compared; True or 0 means compare the full parent path."""

    @classmethod
    def new_group(cls, state, test, **kwargs):
        depth = kwargs.get("depth")
        if depth is True or depth == 0:
            depth = None
        # Path components between the leading "/" and the file name,
        # truncated to the requested depth.
        current_path = urlparse.urlsplit(test.url).path.split("/")[1:-1][:depth]
        changed = current_path != state.get("prev_path")
        state["prev_path"] = current_path
        return changed

    @classmethod
    def group_metadata(cls, state):
        return {"scope": "/%s" % "/".join(state["prev_path"])}