# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import logging
import shutil
import subprocess
import tempfile
from pathlib import Path
from threading import Lock
from typing import Iterable, Optional, Union
import numpy as np
from fasteners import InterProcessLock
from compiler_gym.datasets import Benchmark, BenchmarkSource, Dataset
from compiler_gym.datasets.benchmark import BenchmarkWithSource
from compiler_gym.datasets.uri import BenchmarkUri
from compiler_gym.envs.gcc.gcc import Gcc
from compiler_gym.util.commands import Popen
from compiler_gym.util.decorators import memoized_property
from compiler_gym.util.runfiles_path import runfiles_path
from compiler_gym.util.shell_format import plural
from compiler_gym.util.truncate import truncate
logger = logging.getLogger(__name__)
# The maximum value for the --seed argument to csmith.
UINT_MAX = (2**32) - 1
_CSMITH_BIN = runfiles_path("compiler_gym/third_party/csmith/csmith/bin/csmith")
_CSMITH_INCLUDES = runfiles_path(
"compiler_gym/third_party/csmith/csmith/include/csmith-2.3.0"
)
_CSMITH_INSTALL_LOCK = Lock()
# TODO(github.com/facebookresearch/CompilerGym/issues/325): This can be merged
# with the LLVM implementation.
class CsmithBenchmark(BenchmarkWithSource):
"""A CSmith benchmark."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._src = None
@classmethod
def create(cls, uri: str, bitcode: bytes, src: bytes) -> Benchmark:
"""Create a benchmark from paths."""
benchmark = cls.from_file_contents(uri, bitcode)
benchmark._src = src # pylint: disable=protected-access
return benchmark
@memoized_property
def sources(self) -> Iterable[BenchmarkSource]:
return [
BenchmarkSource(filename="source.c", contents=self._src),
]
@property
def source(self) -> str:
"""Return the single source file contents as a string."""
return self._src.decode("utf-8")
[docs]class CsmithDataset(Dataset):
"""A dataset which uses Csmith to generate programs.
Csmith is a tool that can generate random conformant C99 programs. It is
described in the publication:
Yang, Xuejun, Yang Chen, Eric Eide, and John Regehr. "Finding and
understanding bugs in C compilers." In Proceedings of the 32nd ACM
SIGPLAN conference on Programming Language Design and Implementation
(PLDI), pp. 283-294. 2011.
For up-to-date information about Csmith, see:
https://embed.cs.utah.edu/csmith/
Note that Csmith is a tool that is used to find errors in compilers. As
such, there is a higher likelihood that the benchmark cannot be used for an
environment and that :meth:`env.reset()
<compiler_gym.envs.CompilerEnv.reset>` will raise :class:`BenchmarkInitError
<compiler_gym.datasets.BenchmarkInitError>`.
"""
def __init__(
self,
gcc_bin: Union[Path, str],
site_data_base: Path,
sort_order: int = 0,
csmith_bin: Optional[Path] = None,
csmith_includes: Optional[Path] = None,
):
"""Constructor.
:param site_data_base: The base path of a directory that will be used to
store installed files.
:param sort_order: An optional numeric value that should be used to
order this dataset relative to others. Lowest value sorts first.
:param csmith_bin: The path of the Csmith binary to use. If not
provided, the version of Csmith shipped with CompilerGym is used.
:param csmith_includes: The path of the Csmith includes directory. If
not provided, the includes of the Csmith shipped with CompilerGym is
used.
"""
super().__init__(
name="generator://csmith-v0",
description="Random conformant C99 programs",
references={
"Paper": "http://web.cse.ohio-state.edu/~rountev.1/5343/pdf/pldi11.pdf",
"Homepage": "https://embed.cs.utah.edu/csmith/",
},
license="BSD",
site_data_base=site_data_base,
sort_order=sort_order,
benchmark_class=CsmithBenchmark,
)
self.gcc_bin = gcc_bin
self.csmith_bin_path = csmith_bin or _CSMITH_BIN
self.csmith_includes_path = csmith_includes or _CSMITH_INCLUDES
self._install_lockfile = self.site_data_path / ".install.LOCK"
@property
def size(self) -> int:
# Actually 2^32 - 1, but practically infinite for all intents and
# purposes.
return 0
@memoized_property
def gcc(self):
# Defer instantiation of Gcc from the constructor as it will fail if the
# given Gcc is not available. Memoize the result as initialization is
# expensive.
return Gcc(bin=self.gcc_bin)
def benchmark_uris(self) -> Iterable[str]:
return (f"{self.name}/{i}" for i in range(UINT_MAX))
def benchmark_from_parsed_uri(self, uri: BenchmarkUri) -> CsmithBenchmark:
return self.benchmark_from_seed(int(uri.path[1:]))
def _random_benchmark(self, random_state: np.random.Generator) -> Benchmark:
seed = random_state.integers(UINT_MAX)
return self.benchmark_from_seed(seed)
@property
def installed(self) -> bool:
return super().installed and (self.site_data_path / "includes").is_dir()
def install(self) -> None:
super().install()
if self.installed:
return
with _CSMITH_INSTALL_LOCK, InterProcessLock(self._install_lockfile):
if (self.site_data_path / "includes").is_dir():
return
# Copy the Csmith headers into the dataset's site directory path because
# in bazel builds this includes directory is a symlink, and we need
# actual files that we can use in a docker volume.
shutil.copytree(
self.csmith_includes_path,
self.site_data_path / "includes.tmp",
)
# Atomic directory rename to prevent race on install().
(self.site_data_path / "includes.tmp").rename(
self.site_data_path / "includes"
)
def benchmark_from_seed(
self, seed: int, max_retries: int = 3, retry_count: int = 0
) -> CsmithBenchmark:
"""Get a benchmark from a uint32 seed.
:param seed: A number in the range 0 <= n < 2^32.
:return: A benchmark instance.
:raises OSError: If Csmith fails.
:raises BenchmarkInitError: If the C program generated by Csmith cannot
be lowered to LLVM-IR.
"""
if retry_count >= max_retries:
raise OSError(
f"Csmith failed after {retry_count} {plural(retry_count, 'attempt', 'attempts')} "
f"with seed {seed}"
)
self.install()
# Run csmith with the given seed and pipe the output to clang to
# assemble a bitcode.
logger.debug("Exec csmith --seed %d", seed)
with Popen(
[str(self.csmith_bin_path), "--seed", str(seed)],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
) as csmith:
# Generate the C source.
src, stderr = csmith.communicate(timeout=300)
if csmith.returncode:
try:
stderr = "\n".join(
truncate(stderr.decode("utf-8"), max_line_len=200, max_lines=20)
)
logger.warning("Csmith failed with seed %d: %s", seed, stderr)
except UnicodeDecodeError:
# Failed to interpret the stderr output, generate a generic
# error message.
logger.warning("Csmith failed with seed %d", seed)
return self.benchmark_from_seed(
seed, max_retries=max_retries, retry_count=retry_count + 1
)
# Pre-process the source.
with tempfile.TemporaryDirectory() as tmpdir:
src_file = f"{tmpdir}/src.c"
with open(src_file, "wb") as f:
f.write(src)
preprocessed_src = self.gcc(
"-E",
"-I",
str(self.site_data_path / "includes"),
"-o",
"-",
src_file,
cwd=tmpdir,
timeout=60,
volumes={
str(self.site_data_path / "includes"): {
"bind": str(self.site_data_path / "includes"),
"mode": "ro",
}
},
)
return self.benchmark_class.create(
f"{self.name}/{seed}", preprocessed_src.encode("utf-8"), src
)