# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import logging
import subprocess
from pathlib import Path
from typing import Iterable, List, Optional
import numpy as np
from compiler_gym.datasets import Benchmark, BenchmarkSource, Dataset
from compiler_gym.datasets.benchmark import BenchmarkWithSource
from compiler_gym.datasets.uri import BenchmarkUri
from compiler_gym.envs.llvm import llvm_benchmark
from compiler_gym.envs.llvm.llvm_benchmark import ClangInvocation
from compiler_gym.errors import BenchmarkInitError
from compiler_gym.service.proto import BenchmarkDynamicConfig, Command
from compiler_gym.util.commands import Popen, communicate
from compiler_gym.util.decorators import memoized_property
from compiler_gym.util.runfiles_path import runfiles_path
from compiler_gym.util.shell_format import plural
from compiler_gym.util.truncate import truncate
logger = logging.getLogger(__name__)
# The maximum value for the --seed argument to csmith.
UINT_MAX = (2**32) - 1
_CSMITH_BIN = runfiles_path("compiler_gym/third_party/csmith/csmith/bin/csmith")
_CSMITH_INCLUDES = runfiles_path(
"compiler_gym/third_party/csmith/csmith/include/csmith-2.3.0"
)
class CsmithBenchmark(BenchmarkWithSource):
"""A CSmith benchmark."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._src = None
self.proto.dynamic_config.MergeFrom(
BenchmarkDynamicConfig(
build_cmd=Command(
argument=["$CC", "$IN"] + llvm_benchmark.get_system_library_flags(),
outfile=["a.out"],
timeout_seconds=60,
),
run_cmd=Command(
argument=["./a.out"],
timeout_seconds=300,
),
)
)
@classmethod
def create(cls, uri: str, bitcode: bytes, src: bytes) -> Benchmark:
"""Create a benchmark from paths."""
benchmark = cls.from_file_contents(uri, bitcode)
benchmark._src = src # pylint: disable=protected-access
return benchmark
@memoized_property
def sources(self) -> Iterable[BenchmarkSource]:
return [
BenchmarkSource(filename="source.c", contents=self._src),
]
@property
def source(self) -> str:
"""Return the single source file contents as a string."""
return self._src.decode("utf-8")
[docs]class CsmithDataset(Dataset):
"""A dataset which uses Csmith to generate programs.
Csmith is a tool that can generate random conformant C99 programs. It is
described in the publication:
Yang, Xuejun, Yang Chen, Eric Eide, and John Regehr. "Finding and
understanding bugs in C compilers." In Proceedings of the 32nd ACM
SIGPLAN conference on Programming Language Design and Implementation
(PLDI), pp. 283-294. 2011.
For up-to-date information about Csmith, see:
https://embed.cs.utah.edu/csmith/
Note that Csmith is a tool that is used to find errors in compilers. As
such, there is a higher likelihood that the benchmark cannot be used for an
environment and that :meth:`env.reset()
<compiler_gym.envs.CompilerEnv.reset>` will raise :class:`BenchmarkInitError
<compiler_gym.datasets.BenchmarkInitError>`.
"""
def __init__(
self,
site_data_base: Path,
sort_order: int = 0,
csmith_bin: Optional[Path] = None,
csmith_includes: Optional[Path] = None,
):
"""Constructor.
:param site_data_base: The base path of a directory that will be used to
store installed files.
:param sort_order: An optional numeric value that should be used to
order this dataset relative to others. Lowest value sorts first.
:param csmith_bin: The path of the Csmith binary to use. If not
provided, the version of Csmith shipped with CompilerGym is used.
:param csmith_includes: The path of the Csmith includes directory. If
not provided, the includes of the Csmith shipped with CompilerGym is
used.
"""
super().__init__(
name="generator://csmith-v0",
description="Random conformant C99 programs",
references={
"Paper": "http://web.cse.ohio-state.edu/~rountev.1/5343/pdf/pldi11.pdf",
"Homepage": "https://embed.cs.utah.edu/csmith/",
},
license="BSD",
site_data_base=site_data_base,
sort_order=sort_order,
benchmark_class=CsmithBenchmark,
)
self.csmith_bin_path = csmith_bin or _CSMITH_BIN
self.csmith_includes_path = csmith_includes or _CSMITH_INCLUDES
# The command that is used to compile an LLVM-IR bitcode file from a
# Csmith input. Reads from stdin, writes to stdout.
self.clang_compile_command: List[str] = ClangInvocation.from_c_file(
"-", # Read from stdin.
copt=[
"-xc", # The C programming language.
"-ferror-limit=1", # Stop on first error.
"-w", # No warnings.
f"-I{self.csmith_includes_path}", # Include the Csmith headers.
],
).command(
outpath="-" # Write to stdout.
)
@property
def size(self) -> int:
# Actually 2^32 - 1, but practically infinite for all intents and
# purposes.
return 0
def benchmark_uris(self) -> Iterable[str]:
return (f"{self.name}/{i}" for i in range(UINT_MAX))
def benchmark_from_parsed_uri(self, uri: BenchmarkUri) -> CsmithBenchmark:
seed = int(uri.path[1:])
return self.benchmark_from_seed(seed)
def _random_benchmark(self, random_state: np.random.Generator) -> Benchmark:
seed = random_state.integers(UINT_MAX)
return self.benchmark_from_seed(seed)
def benchmark_from_seed(
self, seed: int, max_retries: int = 3, retry_count: int = 0
) -> CsmithBenchmark:
"""Get a benchmark from a uint32 seed.
:param seed: A number in the range 0 <= n < 2^32.
:return: A benchmark instance.
:raises OSError: If Csmith fails.
:raises BenchmarkInitError: If the C program generated by Csmith cannot
be lowered to LLVM-IR.
"""
if retry_count >= max_retries:
raise OSError(
f"Csmith failed after {retry_count} {plural(retry_count, 'attempt', 'attempts')} "
f"with seed {seed}"
)
self.install()
# Run csmith with the given seed and pipe the output to clang to
# assemble a bitcode.
logger.debug("Exec csmith --seed %d", seed)
try:
with Popen(
[str(self.csmith_bin_path), "--seed", str(seed)],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
) as csmith:
# Generate the C source.
src, stderr = communicate(csmith, timeout=300)
if csmith.returncode:
try:
stderr = "\n".join(
truncate(
stderr.decode("utf-8"), max_line_len=200, max_lines=20
)
)
logger.warning("Csmith failed with seed %d: %s", seed, stderr)
except UnicodeDecodeError:
# Failed to interpret the stderr output, generate a generic
# error message.
logger.warning("Csmith failed with seed %d", seed)
return self.benchmark_from_seed(
seed, max_retries=max_retries, retry_count=retry_count + 1
)
# Compile to IR.
with Popen(
self.clang_compile_command,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
) as clang:
stdout, _ = communicate(clang, input=src, timeout=300)
if clang.returncode:
compile_cmd = " ".join(self.clang_compile_command)
raise BenchmarkInitError(
f"Compilation job failed!\n"
f"Csmith seed: {seed}\n"
f"Command: {compile_cmd}\n"
)
except subprocess.TimeoutExpired:
raise BenchmarkInitError(
f"Benchmark generation using seed {seed} timed out"
)
return self.benchmark_class.create(f"{self.name}/{seed}", stdout, src)