Source code for compiler_gym.envs.llvm.datasets.csmith

# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import logging
import subprocess
from pathlib import Path
from typing import Iterable, List, Optional

import numpy as np

from compiler_gym.datasets import Benchmark, BenchmarkSource, Dataset
from compiler_gym.datasets.benchmark import BenchmarkWithSource
from compiler_gym.datasets.uri import BenchmarkUri
from compiler_gym.envs.llvm import llvm_benchmark
from compiler_gym.envs.llvm.llvm_benchmark import ClangInvocation
from compiler_gym.errors import BenchmarkInitError
from compiler_gym.service.proto import BenchmarkDynamicConfig, Command
from compiler_gym.util.commands import Popen, communicate
from compiler_gym.util.decorators import memoized_property
from compiler_gym.util.runfiles_path import runfiles_path
from compiler_gym.util.shell_format import plural
from compiler_gym.util.truncate import truncate

logger = logging.getLogger(__name__)

# The maximum value for the --seed argument to csmith.
UINT_MAX = (2**32) - 1

_CSMITH_BIN = runfiles_path("compiler_gym/third_party/csmith/csmith/bin/csmith")
_CSMITH_INCLUDES = runfiles_path(
    "compiler_gym/third_party/csmith/csmith/include/csmith-2.3.0"
)


class CsmithBenchmark(BenchmarkWithSource):
    """A benchmark produced by Csmith that keeps its C source with it."""

    def __init__(self, *args, **kwargs):
        """Initialize the benchmark and register its build and run commands.

        All arguments are forwarded unchanged to the parent constructor.
        """
        super().__init__(*args, **kwargs)
        # Raw source bytes. Unset until create() assigns them.
        self._src = None

        # Build step: invoke "$CC" on "$IN" plus the system library flags,
        # producing "a.out", with a 60 second timeout.
        build_step = Command(
            argument=["$CC", "$IN"] + llvm_benchmark.get_system_library_flags(),
            outfile=["a.out"],
            timeout_seconds=60,
        )
        # Run step: execute the built "./a.out" with a 300 second timeout.
        run_step = Command(
            argument=["./a.out"],
            timeout_seconds=300,
        )
        dynamic_config = BenchmarkDynamicConfig(
            build_cmd=build_step,
            run_cmd=run_step,
        )
        self.proto.dynamic_config.MergeFrom(dynamic_config)

    @classmethod
    def create(cls, uri: str, bitcode: bytes, src: bytes) -> Benchmark:
        """Create a benchmark from bitcode contents and source bytes.

        :param uri: The URI passed to :code:`from_file_contents()`.

        :param bitcode: The bitcode contents passed to
            :code:`from_file_contents()`.

        :param src: The source bytes to attach to the benchmark.

        :return: The new benchmark with its source attached.
        """
        instance = cls.from_file_contents(uri, bitcode)
        instance._src = src  # pylint: disable=protected-access
        return instance

    @memoized_property
    def sources(self) -> Iterable[BenchmarkSource]:
        """A one-element list holding the source as a file named "source.c"."""
        source_file = BenchmarkSource(filename="source.c", contents=self._src)
        return [source_file]

    @property
    def source(self) -> str:
        """The contents of the single source file, decoded as UTF-8."""
        raw_source = self._src
        return raw_source.decode("utf-8")

class CsmithDataset(Dataset):
    """A dataset which uses Csmith to generate programs.

    Csmith is a tool that can generate random conformant C99 programs. It is
    described in the publication:

        Yang, Xuejun, Yang Chen, Eric Eide, and John Regehr. "Finding and
        understanding bugs in C compilers." In Proceedings of the 32nd ACM
        SIGPLAN conference on Programming Language Design and Implementation
        (PLDI), pp. 283-294. 2011.

    For up-to-date information about Csmith, see:

        https://embed.cs.utah.edu/csmith/

    Note that Csmith is a tool that is used to find errors in compilers. As
    such, there is a higher likelihood that the benchmark cannot be used for an
    environment and that :meth:`env.reset()
    <compiler_gym.envs.CompilerEnv.reset>` will raise :class:`BenchmarkInitError
    <compiler_gym.datasets.BenchmarkInitError>`.
    """

    def __init__(
        self,
        site_data_base: Path,
        sort_order: int = 0,
        csmith_bin: Optional[Path] = None,
        csmith_includes: Optional[Path] = None,
    ):
        """Constructor.

        :param site_data_base: The base path of a directory that will be used
            to store installed files.

        :param sort_order: An optional numeric value that should be used to
            order this dataset relative to others. Lowest value sorts first.

        :param csmith_bin: The path of the Csmith binary to use. If not
            provided, the version of Csmith shipped with CompilerGym is used.

        :param csmith_includes: The path of the Csmith includes directory. If
            not provided, the includes of the Csmith shipped with CompilerGym
            is used.
        """
        super().__init__(
            name="generator://csmith-v0",
            description="Random conformant C99 programs",
            references={
                "Paper": "http://web.cse.ohio-state.edu/~rountev.1/5343/pdf/pldi11.pdf",
                "Homepage": "https://embed.cs.utah.edu/csmith/",
            },
            license="BSD",
            site_data_base=site_data_base,
            sort_order=sort_order,
            benchmark_class=CsmithBenchmark,
        )
        self.csmith_bin_path = csmith_bin or _CSMITH_BIN
        self.csmith_includes_path = csmith_includes or _CSMITH_INCLUDES
        # The command that is used to compile an LLVM-IR bitcode file from a
        # Csmith input. Reads from stdin, writes to stdout.
        self.clang_compile_command: List[str] = ClangInvocation.from_c_file(
            "-",  # Read from stdin.
            copt=[
                "-xc",  # The C programming language.
                "-ferror-limit=1",  # Stop on first error.
                "-w",  # No warnings.
                f"-I{self.csmith_includes_path}",  # Include the Csmith headers.
            ],
        ).command(
            outpath="-"  # Write to stdout.
        )

    @property
    def size(self) -> int:
        """The reported dataset size, which is always 0."""
        # Actually 2^32 - 1, but practically infinite for all intents and
        # purposes.
        return 0

    def benchmark_uris(self) -> Iterable[str]:
        """Lazily yield one URI of the form "<name>/<seed>" per seed in
        :code:`range(UINT_MAX)`.
        """
        return (f"{self.name}/{i}" for i in range(UINT_MAX))

    def benchmark_from_parsed_uri(self, uri: BenchmarkUri) -> CsmithBenchmark:
        """Build the benchmark whose seed is encoded in the URI path.

        The first character of :code:`uri.path` is skipped and the remainder
        is parsed as the integer seed.

        :param uri: The parsed benchmark URI.

        :return: A benchmark instance.

        :raises ValueError: If the path remainder is not an integer.
        """
        seed = int(uri.path[1:])
        return self.benchmark_from_seed(seed)

    def _random_benchmark(self, random_state: np.random.Generator) -> Benchmark:
        """Build a benchmark from a seed drawn with
        :code:`random_state.integers(UINT_MAX)`.
        """
        # Convert the NumPy integer scalar to a builtin int so that the value
        # matches the `seed: int` annotation of benchmark_from_seed().
        seed = int(random_state.integers(UINT_MAX))
        return self.benchmark_from_seed(seed)

    def benchmark_from_seed(
        self, seed: int, max_retries: int = 3, retry_count: int = 0
    ) -> CsmithBenchmark:
        """Get a benchmark from a uint32 seed.

        :param seed: A number in the range 0 <= n < 2^32.

        :param max_retries: The number of Csmith attempts after which
            :code:`OSError` is raised.

        :param retry_count: The number of attempts already made. This is
            incremented on each recursive retry after a Csmith failure.

        :return: A benchmark instance.

        :raises OSError: If Csmith fails.

        :raises BenchmarkInitError: If the C program generated by Csmith cannot
            be lowered to LLVM-IR, or if generation times out.
        """
        if retry_count >= max_retries:
            raise OSError(
                f"Csmith failed after {retry_count} {plural(retry_count, 'attempt', 'attempts')} "
                f"with seed {seed}"
            )

        self.install()

        # Run csmith with the given seed and pipe the output to clang to
        # assemble a bitcode.
        logger.debug("Exec csmith --seed %d", seed)
        try:
            with Popen(
                [str(self.csmith_bin_path), "--seed", str(seed)],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            ) as csmith:
                # Generate the C source.
                src, stderr = communicate(csmith, timeout=300)
                if csmith.returncode:
                    try:
                        message = truncate(
                            stderr.decode("utf-8"), max_line_len=200, max_lines=20
                        )
                        # NOTE(review): if truncate() returns a str rather than
                        # an iterable of lines, joining it directly would put a
                        # newline between every character. Only join non-str
                        # results. Confirm truncate()'s return type.
                        if not isinstance(message, str):
                            message = "\n".join(message)
                        logger.warning("Csmith failed with seed %d: %s", seed, message)
                    except UnicodeDecodeError:
                        # Failed to interpret the stderr output, generate a generic
                        # error message.
                        logger.warning("Csmith failed with seed %d", seed)
                    return self.benchmark_from_seed(
                        seed, max_retries=max_retries, retry_count=retry_count + 1
                    )

            # Compile to IR.
            with Popen(
                self.clang_compile_command,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.DEVNULL,
            ) as clang:
                stdout, _ = communicate(clang, input=src, timeout=300)
                if clang.returncode:
                    compile_cmd = " ".join(self.clang_compile_command)
                    raise BenchmarkInitError(
                        f"Compilation job failed!\n"
                        f"Csmith seed: {seed}\n"
                        f"Command: {compile_cmd}\n"
                    )
        except subprocess.TimeoutExpired as e:
            # Chain explicitly so the timeout is recorded as the cause.
            raise BenchmarkInitError(
                f"Benchmark generation using seed {seed} timed out"
            ) from e

        return self.benchmark_class.create(f"{self.name}/{seed}", stdout, src)