API Reference

This section contains the complete API reference for the simple-code-execution library.

Core Modules

Main Package

Execution Engine

Module for executing code.

code_execution.execution.seconds_to_human(seconds)[source]

Converts seconds to a human readable format.
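The exact output format is not shown in this reference. As a minimal sketch, assuming an HH:MM:SS layout (the function name and the format are both assumptions, not the library's implementation), the conversion could look like this:

```python
def seconds_to_human_sketch(seconds: float) -> str:
    """Hypothetical helper: format a duration in seconds as HH:MM:SS."""
    total = int(seconds)  # drop the fractional part for display
    hours, remainder = divmod(total, 3600)
    minutes, secs = divmod(remainder, 60)
    return f"{hours:02d}:{minutes:02d}:{secs:02d}"


print(seconds_to_human_sketch(3725))
```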

code_execution.execution.safe_execute(command_to_run: List[str], working_dir: Path, timeout: int = 10, num_times: int = 1, stdin: str | None = None, stdout_postprocessor: Callable | None = None) CommandResult[source]

Executes a list of commands safely.

Parameters:
  • command_to_run – The command to run.

  • working_dir – The working directory to run them in.

  • timeout – The timeout.

  • num_times – Number of times to execute the command. Useful for getting runtime and memory means.

  • stdin – The stdin for the command.

  • stdout_postprocessor – A postprocessor for the stdout.

Returns:

The result of executing the command.
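To make the timeout and stdin handling concrete, here is a minimal standard-library sketch of running one command in a working directory. It is not the library's implementation: the function name, the returned dict, and the use of -1 as the return code on timeout are all assumptions made for illustration.

```python
import subprocess
import sys
import time
from pathlib import Path
from typing import Dict, List, Optional


def safe_execute_sketch(
    command_to_run: List[str],
    working_dir: Path,
    timeout: int = 10,
    stdin: Optional[str] = None,
) -> Dict:
    """Hypothetical stand-in: run one command and capture its output."""
    start = time.perf_counter()
    try:
        proc = subprocess.run(
            command_to_run,
            cwd=working_dir,
            input=stdin,
            capture_output=True,
            text=True,
            timeout=timeout,
        )
        return_code, stdout, stderr = proc.returncode, proc.stdout, proc.stderr
        timed_out = False
    except subprocess.TimeoutExpired:
        # Assumption: report a timeout with a sentinel return code.
        return_code, stdout, stderr, timed_out = -1, "", "", True
    return {
        "return_code": return_code,
        "runtime": time.perf_counter() - start,
        "stdout": stdout,
        "stderr": stderr,
        "timed_out": timed_out,
    }


result = safe_execute_sketch(
    [sys.executable, "-c", "print(input())"], Path("."), stdin="hi\n"
)
print(result["stdout"].strip(), result["timed_out"])
```

The dict keys mirror the fields documented for CommandResult so the sketch is easy to compare against the real return value.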

code_execution.execution.serial_execute_code(key, sample: CommandsToRun) ExecutionResult[source]

Execute a file of code.

Parameters:
  • sample – The sample to run.

Returns:

The execution result.

code_execution.execution.execute_single(execution_dict: Dict) Tuple[Tuple, ExecutionResult][source]

Executes a single program.

code_execution.execution.batched_execute_code(to_run: List[Dict]) List[Dict][source]

Executes a batch of commands.

code_execution.execution.sizeof_fmt(num, suffix='B')[source]

Human readable file size.
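The reference does not show the output format. A common way to write such a helper, given here as a sketch that assumes binary (1024-based) units and one decimal place, is:

```python
def sizeof_fmt_sketch(num: float, suffix: str = "B") -> str:
    """Hypothetical helper: format a byte count using binary prefixes."""
    for unit in ("", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"):
        if abs(num) < 1024.0:
            return f"{num:3.1f}{unit}{suffix}"
        num /= 1024.0  # move up to the next unit
    return f"{num:.1f}Yi{suffix}"


print(sizeof_fmt_sketch(1536))
```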

code_execution.execution.threaded_execution(to_run, execution_fn, max_threads, is_batched: bool = False)[source]

Executes a list of commands in parallel.

code_execution.execution.execute_commands(predictions, config: ExecutionConfig) Tuple[float, float, List[ExecutionResult]][source]

Executes a list of commands.

Entry Points

Module for entrypoints for code execution.

class code_execution.entrypoints.ChunkExecutionResult(results: Dict[Tuple[int, int], ExecutionResult], write_elapsed: float, exec_elapsed: float, pure_exec_elapsed: float, write_timings: Dict[str, float], cleanup_timings: Dict[str, float])[source]

Bases: object

Stores the execution result for a chunk of commands.

results: Dict[Tuple[int, int], ExecutionResult]
write_elapsed: float
exec_elapsed: float
pure_exec_elapsed: float
write_timings: Dict[str, float]
cleanup_timings: Dict[str, float]
__init__(results: Dict[Tuple[int, int], ExecutionResult], write_elapsed: float, exec_elapsed: float, pure_exec_elapsed: float, write_timings: Dict[str, float], cleanup_timings: Dict[str, float]) None
code_execution.entrypoints.execute_predictions(config: ExecutionConfig, pred_list: List[Dict], preprocessor: Callable[[Dict], Executable | List[Executable]], postprocessor: Callable[[Dict, Dict], Dict] | None = None, debug_dir: Path | None = None, preproc_returns_list: bool = False, preproc_batch_size: int = 1, error_directory: Path | None = None) OverallExecutionResults[source]

Executes the program predictions.

First preprocesses the commands to run, writes them to disk, then executes them, and finally postprocesses the results.

Parameters:
  • config – The config for execution.

  • pred_list – The list of predictions to execute.

  • preprocessor – The preprocessor function to create files and commands.

  • postprocessor – The postprocessor function for processing results.

  • debug_dir – Directory to save all files for debugging.

  • preproc_returns_list – Whether preprocessor returns one or many items.

  • preproc_batch_size – The batch size for preprocessing.

  • error_directory – Directory to save errors to.

Returns:

The results of the execution.

Return type:

OverallExecutionResults
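The four stages described above (preprocess, write to disk, execute, postprocess) can be sketched with the standard library alone. This is an illustration of the flow, not the library's code: it runs serially, uses plain dicts instead of Executable and ExecutionResult, and every name in it is hypothetical.

```python
import subprocess
import sys
import tempfile
from pathlib import Path
from typing import Callable, Dict, List


def run_pipeline_sketch(
    pred_list: List[Dict],
    preprocessor: Callable[[Dict], Dict],
    postprocessor: Callable[[Dict, Dict], Dict],
    timeout: int = 10,
) -> List[Dict]:
    """Hypothetical serial version of preprocess -> write -> execute -> postprocess."""
    results = []
    with tempfile.TemporaryDirectory() as tmp:
        for idx, pred in enumerate(pred_list):
            # 1. Preprocess: turn the prediction into files plus a command.
            executable = preprocessor(pred)
            # 2. Write: each prediction gets its own working directory.
            pred_dir = Path(tmp) / f"pred{idx}"
            pred_dir.mkdir()
            for name, contents in executable["files"].items():
                (pred_dir / name).write_text(contents)
            # 3. Execute the command inside that directory.
            proc = subprocess.run(
                executable["command"],
                cwd=pred_dir,
                capture_output=True,
                text=True,
                timeout=timeout,
            )
            raw = {"return_code": proc.returncode, "stdout": proc.stdout}
            # 4. Postprocess: merge the raw result back into the prediction.
            results.append(postprocessor(pred, raw))
    return results


def example_preprocessor(pred: Dict) -> Dict:
    return {"files": {"main.py": pred["code"]}, "command": [sys.executable, "main.py"]}


def example_postprocessor(pred: Dict, raw: Dict) -> Dict:
    return {**pred, "passed": raw["return_code"] == 0, "stdout": raw["stdout"]}


outputs = run_pipeline_sketch(
    [{"code": "print(1 + 1)"}], example_preprocessor, example_postprocessor
)
print(outputs[0]["passed"], outputs[0]["stdout"].strip())
```

The preprocessor and postprocessor here play the same roles as the preprocessor and postprocessor arguments of execute_predictions, which is why they are passed in as callables rather than hard-coded.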

Data Structures

Data structures for code execution.

class code_execution.data_structures.Command(command: ~typing.List[str], timeout: float | None = None, num_times: int = 1, stdin: ~typing.List[str] = <factory>)[source]

Bases: object

Dataclass for a command to execute.

Parameters:
  • command – The command to execute.

  • timeout – The timeout for the command. If not set, the default timeout is used.

  • num_times – Number of times to execute the command.

  • stdin – The stdin for the command.

command: List[str]
timeout: float | None = None
num_times: int = 1
stdin: List[str]
__init__(command: ~typing.List[str], timeout: float | None = None, num_times: int = 1, stdin: ~typing.List[str] = <factory>) None
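The `<factory>` default shown for stdin means the dataclass field is built with a default factory, so each instance gets its own list. A small sketch with a hypothetical class that mirrors the documented fields:

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class CommandSketch:
    """Hypothetical mirror of the documented Command fields."""

    command: List[str]
    timeout: Optional[float] = None  # None means "use the default timeout"
    num_times: int = 1
    stdin: List[str] = field(default_factory=list)  # rendered as <factory>


first = CommandSketch(command=["python", "main.py"])
second = CommandSketch(command=["python", "other.py"], timeout=5.0)
first.stdin.append("input line")
print(first.stdin, second.stdin)
```

Because the list comes from a factory, appending to one command's stdin does not affect another command.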
class code_execution.data_structures.CommandResult(return_code: int, runtime: float, stdout: str, stderr: str, timed_out: bool, had_unexpected_error: bool = False)[source]

Bases: object

Dataclass for the result of executing a command.

Parameters:
  • return_code – The return code.

  • runtime – The runtime.

  • stdout – The stdout.

  • stderr – The stderr.

  • timed_out – Whether the command timed out.

  • had_unexpected_error – Whether the command had an unexpected error.

return_code: int
runtime: float
stdout: str
stderr: str
timed_out: bool
had_unexpected_error: bool = False
property had_error: bool

Whether the last command had an error.

__init__(return_code: int, runtime: float, stdout: str, stderr: str, timed_out: bool, had_unexpected_error: bool = False) None
class code_execution.data_structures.ExecutionResult(key: str, command_results: List[CommandResult], elapsed: float, cwd: str, tracked_files: Dict[str, str], expected_num_commands: int, writing_time: float = -1, cleanup_time: float = -1, preprocess_time: float = -1)[source]

Bases: object

Dataclass for the result of executing a list of commands.

Parameters:
  • key – The key for the result.

  • command_results – The results of the commands.

  • elapsed – The elapsed time.

  • cwd – The current working directory.

  • tracked_files – The tracked files.

  • expected_num_commands – The expected number of commands run.

key: str
command_results: List[CommandResult]
elapsed: float
cwd: str
tracked_files: Dict[str, str]
expected_num_commands: int
writing_time: float = -1
cleanup_time: float = -1
preprocess_time: float = -1
property timed_out: bool

Whether the last command timed out.

property had_error: bool

Whether the last command had an error.

property last_cmd: CommandResult

The last command result.

all_had_return_code(return_code: int) bool[source]

Whether all commands had the given return code.

to_dict(include_command_results: bool = False) Dict[source]

Converts the result to a dictionary.

classmethod invalid_result(key: str, num_commands: int = 1, runtime: float = 10.0, return_code: int = 1, stdout: str = 'SyntaxError', stderr: str = 'Invalid', elapsed: float = 10.0) ExecutionResult[source]

Creates a dummy ExecutionResult that represents an invalid result. Useful for when your preprocessor finds a program you want to skip execution for.

__init__(key: str, command_results: List[CommandResult], elapsed: float, cwd: str, tracked_files: Dict[str, str], expected_num_commands: int, writing_time: float = -1, cleanup_time: float = -1, preprocess_time: float = -1) None
code_execution.data_structures.default_should_early_stop(cmd_idx: int, res: CommandResult, expected_rtr_code: int | None = 0, stop_for_timeout: bool = True, **_k) bool[source]
class code_execution.data_structures.Executable(files: ~typing.Dict[str, str], commands: ~typing.List[~code_execution.data_structures.Command], tracked_files: ~typing.List[str] = <factory>, should_early_stop: ~typing.Callable[[int, ~code_execution.data_structures.CommandResult], bool] = <function default_should_early_stop>, stdout_postprocessor: ~typing.Callable[[str], str] | None = None)[source]

Bases: object

Dataclass to represent the commands and setup needed to execute a prediction.

Parameters:
  • files – The files to write.

  • commands – The commands to run.

  • tracked_files – The files to get contents of after execution.

  • should_early_stop – A function that takes the index of the command and its result, and returns whether execution should stop early. It must be picklable.

files: Dict[str, str]
commands: List[Command]
tracked_files: List[str]
should_early_stop(res: CommandResult, expected_rtr_code: int | None = 0, stop_for_timeout: bool = True, **_k) bool
stdout_postprocessor: Callable[[str], str] | None = None
__init__(files: ~typing.Dict[str, str], commands: ~typing.List[~code_execution.data_structures.Command], tracked_files: ~typing.List[str] = <factory>, should_early_stop: ~typing.Callable[[int, ~code_execution.data_structures.CommandResult], bool] = <function default_should_early_stop>, stdout_postprocessor: ~typing.Callable[[str], str] | None = None) None
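An early-stop predicate with the documented signature might look like the sketch below. The body is an assumption based only on the parameter names (expected_rtr_code, stop_for_timeout); the library's default may differ. It is written as a module-level function because lambdas and nested functions cannot be pickled by the standard pickle module.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ResultSketch:
    """Hypothetical stand-in with the two CommandResult fields used here."""

    return_code: int
    timed_out: bool


def should_early_stop_sketch(
    cmd_idx: int,
    res: ResultSketch,
    expected_rtr_code: Optional[int] = 0,
    stop_for_timeout: bool = True,
    **_k,
) -> bool:
    # Assumed rule 1: a timeout ends the remaining commands.
    if stop_for_timeout and res.timed_out:
        return True
    # Assumed rule 2: an unexpected return code ends the remaining commands.
    if expected_rtr_code is not None and res.return_code != expected_rtr_code:
        return True
    return False


print(should_early_stop_sketch(0, ResultSketch(return_code=1, timed_out=False)))
```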
class code_execution.data_structures.CommandsToRun(cwd: ~pathlib.Path, commands: ~typing.List[~code_execution.data_structures.Command], tracked_files: ~typing.List[str] = <factory>, should_early_stop: ~typing.Callable[[int, ~code_execution.data_structures.CommandResult], bool] = <function default_should_early_stop>, stdout_postprocessor: ~typing.Callable[[str], str] | None = None)[source]

Bases: object

Dataclass to represent the information needed to run a command.

The main reason to have this class is to avoid the need to pass around the raw files to every function.

Parameters:
  • cwd – The current working directory.

  • commands – The commands to run.

  • tracked_files – The files to get contents of after execution.

cwd: Path
commands: List[Command]
tracked_files: List[str]
should_early_stop(res: CommandResult, expected_rtr_code: int | None = 0, stop_for_timeout: bool = True, **_k) bool
stdout_postprocessor: Callable[[str], str] | None = None
__init__(cwd: ~pathlib.Path, commands: ~typing.List[~code_execution.data_structures.Command], tracked_files: ~typing.List[str] = <factory>, should_early_stop: ~typing.Callable[[int, ~code_execution.data_structures.CommandResult], bool] = <function default_should_early_stop>, stdout_postprocessor: ~typing.Callable[[str], str] | None = None) None
class code_execution.data_structures.OverallExecutionResults(results: List[Dict], net_time: float, pure_exec_time: float, execution_time: float, writing_time: float, postprocessing_time: float, preprocessing_time: float, timestamp: str = None)[source]

Bases: object

results: List[Dict]
net_time: float
pure_exec_time: float
execution_time: float
writing_time: float
postprocessing_time: float
preprocessing_time: float
timestamp: str = None
property timing_dict: Dict
__init__(results: List[Dict], net_time: float, pure_exec_time: float, execution_time: float, writing_time: float, postprocessing_time: float, preprocessing_time: float, timestamp: str | None = None) None

Configuration

This file contains the configuration for the code execution module.

class code_execution.configs.ExecutionConfig(num_workers: int, max_tasks_per_process: int | None = None, write_rate_limit: int = 768, chunk_size: int = 1, batch_size: int = 1, disable_tqdm: bool = False, default_timeout: int = 10, max_execute_at_once: int = -1, num_executors: int = 4, log_freq: int = 1000, buffer_size: int = 100, display_write_progress: bool = False, write_log_freq: int = 100000)[source]

Bases: object

Config for execution.

Parameters:
  • num_workers – The number of workers to use.

  • max_tasks_per_process – The maximum number of tasks to run per process. If not None, the worker will be killed every max_tasks_per_process tasks and a new one will be created.

  • write_rate_limit – The rate limit for writing files. By default it is 768.

  • chunk_size – The chunk size for parallel execution.

  • batch_size – The batch size to use for all parallel operations, including pre- and post-processing.

  • disable_tqdm – Whether to disable tqdm.

  • execution_chunk_size – The chunk size for execution.

  • default_timeout – The default timeout for execution.

  • max_execute_at_once – The maximum number of predictions to execute at a single time.

  • num_executors – The number of executor processes running.

  • log_freq – How often to log progress.

  • buffer_size – Chunk size to use for execution.

  • display_write_progress – Display progress bars for writing and cleaning up.

  • write_log_freq – Frequency for writing log messages.

num_workers: int
max_tasks_per_process: int | None = None
write_rate_limit: int = 768
chunk_size: int = 1
batch_size: int = 1
disable_tqdm: bool = False
default_timeout: int = 10
max_execute_at_once: int = -1
num_executors: int = 4
log_freq: int = 1000
buffer_size: int = 100
display_write_progress: bool = False
write_log_freq: int = 100000
property batched

Whether to use batched processing.

__init__(num_workers: int, max_tasks_per_process: int | None = None, write_rate_limit: int = 768, chunk_size: int = 1, batch_size: int = 1, disable_tqdm: bool = False, default_timeout: int = 10, max_execute_at_once: int = -1, num_executors: int = 4, log_freq: int = 1000, buffer_size: int = 100, display_write_progress: bool = False, write_log_freq: int = 100000) None

Processing

Functions for preprocessing and postprocessing the commands to run.

class code_execution.processing.PredTimingsCollection(preprocess_time: Dict[str, float], writing_time: Dict[str, float], cleanup_time: Dict[str, float])[source]

Bases: object

Stores the timings per prediction for different aspects.

preprocess_time: Dict[str, float]
writing_time: Dict[str, float]
cleanup_time: Dict[str, float]
__init__(preprocess_time: Dict[str, float], writing_time: Dict[str, float], cleanup_time: Dict[str, float]) None
code_execution.processing.default_postprocessor(prediction: Dict, result: ExecutionResult, **_) Dict[source]

Adds the result to the prediction dict.

code_execution.processing.preprocess_commands(config: ExecutionConfig, dir_to_use: Path, pred_list: List[Dict], preprocessor: Callable[[Dict], Executable | ExecutionResult], preproc_returns_list: bool = False, batch_size: int = 1) Tuple[List[Dict], List[Dict], Dict[Tuple[int, int], ExecutionResult]][source]

Preprocesses the commands to run.

Parameters:
  • config – The execution config.

  • dir_to_use – The directory to use for execution.

  • pred_list – The list of predictions.

  • preprocessor – The preprocessor to use.

  • preproc_returns_list – Whether the preprocessor returns a list of executables.

  • batch_size – The batch size to use for execution.

  • error_directory – The directory to save errors to.

Returns:

  • files_to_write – The files to write to disk.

  • commands_to_run – The commands to run.

  • filtered_out – The results that were filtered out during preprocessing; these will be added back after execution.

  • timings – The timings of preprocessing each example.

code_execution.processing.postprocess_commands(raw_preds: Dict, results: Dict[Tuple[int, int], ExecutionResult], postprocessor: Callable[[Dict, ExecutionResult], Dict], timings: PredTimingsCollection, returned_multiple: bool, disable_tqdm: bool = False, log_freq: int = 1000) List[Dict][source]

Postprocesses the commands after execution.

Parameters:
  • raw_preds (Dict) – The raw predictions before postprocessing, used to add back information.

  • results (Dict[Tuple[int, int], ExecutionResult]) – The results of executions where the key is used for ordering and the value is the result post execution.

  • postprocessor (Callable) – The postprocessor function to use.

  • timings (PredTimingsCollection) – The timings of the predictions.

  • returned_multiple (bool) – Whether the preprocessor returned multiple results per prediction.

  • disable_tqdm (bool, optional) – Whether to disable tqdm. Defaults to False.

  • log_freq (int, optional) – How often to log. Defaults to 1000.

Returns:

The postprocessed results.

Return type:

List[Dict]

File Writing

Module for handling writing executables to disk.

exception code_execution.file_writing.WritingFailure[source]

Bases: Exception

Exception raised when writing a file fails.

code_execution.file_writing.write_executables(files_to_write: List[Tuple], write_rate_limit: int, enable_tqdm: bool = False, log_freq: int = 100000)[source]

Writes the executables to the disk.

Parameters:
  • files_to_write (List[Dict]) – The list of files to write. Each item is a dict where the key is an absolute path to the file and the value is the contents.

  • write_rate_limit (int) – The asynchronous write rate limit.

  • enable_tqdm (bool, optional) – Whether to enable the progress bars. Defaults to False.

Raises:

ValueError – If the prediction directory does not exist.

code_execution.file_writing.cleanup(files: List[Tuple], rate_limit: int, enable_tqdm: bool = False)[source]

Cleans up the executables on the disk.

Parameters:
  • files (List[Tuple]) – The list of files to clean up.

  • rate_limit (int) – The rate limit (# threads) for cleaning up the files.

  • disable_tqdm (bool) – Disable the progress bars.

  • quiet (bool, optional) – Whether to suppress logging. Defaults to False.

Raises:

ValueError – If the prediction directory exists after cleanup.

Code Trees

code_execution.code_trees.safe_ast_parse(code) Module[source]

Safely parse a string of code into an AST, if possible. Otherwise return None.

code_execution.code_trees.is_valid_python(code)[source]

Checks if the code is valid Python.
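The parse-or-None pattern described for safe_ast_parse, and the validity check built on it, can be sketched as follows. Which exceptions the library actually catches is not documented here, so the tuple below is an assumption, as are the function names.

```python
import ast
from typing import Optional


def safe_ast_parse_sketch(code: str) -> Optional[ast.Module]:
    """Hypothetical helper: return the parsed module, or None if parsing fails."""
    try:
        return ast.parse(code)
    except (SyntaxError, ValueError, RecursionError, MemoryError):
        # Assumption: treat any parse failure as "no tree available".
        return None


def is_valid_python_sketch(code: str) -> bool:
    return safe_ast_parse_sketch(code) is not None


print(is_valid_python_sketch("x = 1"), is_valid_python_sketch("def f(:"))
```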

code_execution.code_trees.is_simple_test_case(tree)[source]

Checks if the test case is an assert with a function call on the left.

code_execution.code_trees.get_global_imports(tree: Module) List[str][source]

Get the global imports from an ast tree as a list of strings.
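One way to collect imports as strings is to unparse the import nodes at the top level of the module. The sketch below assumes "global" means module-level statements only and relies on ast.unparse, which requires Python 3.9 or later; the function name is hypothetical.

```python
import ast
from typing import List


def get_global_imports_sketch(tree: ast.Module) -> List[str]:
    """Hypothetical helper: source text of each top-level import statement."""
    return [
        ast.unparse(node)
        for node in tree.body  # only direct children, so nested imports are skipped
        if isinstance(node, (ast.Import, ast.ImportFrom))
    ]


source = "import os\nfrom sys import path\nx = 1\ndef f():\n    import json\n"
print(get_global_imports_sketch(ast.parse(source)))
```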

code_execution.code_trees.convert_call_to_assert(call: str, expected_output: str, requires_float=False, return_str: bool = False) Module | str[source]

Converts call code to an assertion with an expected output.

The call code must end in an ast.Expr node, which is the node that will be converted to an assertion.

The expected output must be an expression.

Parameters:
  • call – The code to be converted to an assertion.

  • expected_output – The expected output of the call.

  • requires_float – Whether the expected output is a float. If so, we will add a tolerance of 1e-6.

  • return_str – Whether to return the converted code as a string or as an ast tree.

Returns:

The converted ast tree or the converted code.
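The conversion described above can be sketched with the ast module: take the trailing expression, compare it with the parsed expected output, and replace it with an assert. This is an illustration under assumptions, not the library's code. In particular, the float branch uses abs(call - expected) <= 1e-6, which is one plausible reading of "a tolerance of 1e-6", and ast.unparse requires Python 3.9 or later.

```python
import ast


def convert_call_to_assert_sketch(
    call: str, expected_output: str, requires_float: bool = False
) -> str:
    """Hypothetical helper: rewrite the last expression of `call` as an assert."""
    tree = ast.parse(call)
    last = tree.body[-1]
    if not isinstance(last, ast.Expr):
        raise ValueError("call code must end in an expression")
    expected = ast.parse(expected_output, mode="eval").body
    if requires_float:
        # Assumed tolerance check: abs(<call> - <expected>) <= 1e-6
        diff = ast.BinOp(left=last.value, op=ast.Sub(), right=expected)
        test = ast.Compare(
            left=ast.Call(func=ast.Name(id="abs", ctx=ast.Load()), args=[diff], keywords=[]),
            ops=[ast.LtE()],
            comparators=[ast.Constant(value=1e-6)],
        )
    else:
        test = ast.Compare(left=last.value, ops=[ast.Eq()], comparators=[expected])
    tree.body[-1] = ast.Assert(test=test, msg=None)
    ast.fix_missing_locations(tree)
    return ast.unparse(tree)


print(convert_call_to_assert_sketch("add(1, 2)", "3"))
```

Any statements before the final expression (setup code, for example) are kept as they are; only the last node is replaced.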

code_execution.code_trees.convert_test_list_to_assert(test_list: List[Tuple[str, str, bool] | str], timeout: float = -1.0, convert_to_string: bool = False) List[AST | str][source]

Converts a list of test cases to assertion nodes.

Parameters:
  • test_list – A list of test cases. Each test case can be a string or a tuple of (call, output, requires_float). If the test case is a string, it will be parsed as a call. If it is a tuple, it will be converted to an assertion.

  • timeout – The timeout for parsing the test cases.

  • convert_to_string – Whether to convert the resulting AST to a string.

Returns:

A list of converted test cases as AST nodes or strings.

code_execution.code_trees.wrap_assert_in_try_print(idx: int, call: str, output: str, requires_float: bool, print_formatter: Callable[[int], Tuple[str, str, List[Tuple[str, str]]]]) str[source]

Wraps a test case in a try-except block that prints the result.

The resulting code will be:

```
try:
    {ASSERTION}
    print({pass_str})
except AssertionError:
    print({fail_str})
```

The exceptions will be appended as:

```
except {exception_type} as e:
    print({print_string})
```

Parameters:
  • idx – The index of the test case.

  • call – The call code.

  • output – The expected output code.

  • requires_float – Whether the expected output is a float.

  • print_formatter – A function that takes in the index and returns the pass string, the fail string, and a list of length-2 tuples for exceptions. The exception tuples should be in the format (exception_type, print_string). The except clause will be `except {exception_type} as e: print({print_string})`. The resulting strings will be passed directly to print().
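The template above can be assembled with plain string building. The sketch below is hypothetical and simplified: it takes an already-built assertion string instead of the call, output, and requires_float arguments, and the formatter shown is only an example of the documented return shape.

```python
from typing import Callable, List, Tuple

Formatter = Callable[[int], Tuple[str, str, List[Tuple[str, str]]]]


def wrap_assert_in_try_print_sketch(
    idx: int, assertion: str, print_formatter: Formatter
) -> str:
    """Hypothetical helper: wrap one assertion in the try/except template."""
    pass_str, fail_str, exceptions = print_formatter(idx)
    lines = [
        "try:",
        f"    {assertion}",
        f"    print({pass_str})",
        "except AssertionError:",
        f"    print({fail_str})",
    ]
    for exception_type, print_string in exceptions:
        lines.append(f"except {exception_type} as e:")
        lines.append(f"    print({print_string})")
    return "\n".join(lines)


def example_formatter(idx: int):
    # The strings are inserted into print(...) verbatim, so they must be code.
    prefix = f"TEST_{idx}___"
    return (
        repr(prefix + "PASSED"),
        repr(prefix + "FAILED"),
        [("Exception", f"{prefix!r} + type(e).__name__")],
    )


print(wrap_assert_in_try_print_sketch(0, "assert 1 + 1 == 2", example_formatter))
```

Because the pass, fail, and exception strings are pasted directly into print(), the formatter returns quoted literals or expressions rather than bare text.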

code_execution.code_trees.remove_deep_trees(code_lines: List[str], tree_process_func: Callable, timeout: float)[source]
code_execution.code_trees.default_tree_process_func(code: str)[source]
code_execution.code_trees.remove_trees_from_lists(codes: ~typing.List, tree_process_func: ~typing.Callable = <function default_tree_process_func>, timeout=2, num_workers=4, batch_size=100, **parallel_kwargs) List[List[str]][source]

Removes deep trees from the code.

Utilities

Utility functions for code execution.

code_execution.utils.in_notebook()[source]

Checks if the code is running in a notebook.

class code_execution.utils.RunThread(func, *args, **kwargs)[source]

Bases: Thread

Class that will allow asyncio to run in a thread when called from Jupyter.

__init__(func, *args, **kwargs)[source]

This constructor should always be called with keyword arguments. Arguments are:

group should be None; reserved for future extension when a ThreadGroup class is implemented.

target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.

name is the thread name. By default, a unique name is constructed of the form “Thread-N” where N is a small decimal number.

args is the argument tuple for the target invocation. Defaults to ().

kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.

run()[source]

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

code_execution.utils.notebook_safe_async_run(target, *args, **kwargs)[source]

Run an async function in a thread.

code_execution.utils.wrap_processor(processor_fn: Callable, batch_size: int, returns_list: bool) Callable[source]

Wraps a processor function to handle batching.

code_execution.utils.get_pred_dir(idx: int, parent: Path)[source]

Gets the prediction directory for a prediction.

exception code_execution.utils.ContextTimeLimitException[source]

Bases: Exception

Timeout error for running commands.

code_execution.utils.timeout_signal_handler(signum, frame)[source]
class code_execution.utils.TimeoutContext(seconds, on_end=None)[source]

Bases: object

__init__(seconds, on_end=None)[source]
code_execution.utils.timeout_decorator(seconds: int = 10)[source]
code_execution.utils.time_limit(seconds: float, on_end: Callable = None)[source]

Sets a time limit.

class code_execution.utils.WriteOnlyStringIO(initial_value='', newline='\n')[source]

Bases: StringIO

StringIO that throws an exception when it is read from.

read(*args, **kwargs)[source]

Read at most size characters, returned as a string.

If the argument is negative or omitted, read until EOF is reached. Return an empty string at EOF.

readline(*args, **kwargs)[source]

Read until newline or EOF.

Returns an empty string if EOF is hit immediately.

readlines(*args, **kwargs)[source]

Return a list of lines from the stream.

hint can be specified to control the number of lines read: no more lines will be read if the total size (in bytes/characters) of all lines so far exceeds hint.

readable(*args, **kwargs)[source]

Returns True if the IO object can be read.
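The method descriptions above are inherited from StringIO; per the class summary, reading is expected to raise instead. A minimal sketch of such a write-only stream follows. The class name, the exception type (IOError), and readable() returning False are assumptions.

```python
import io


class WriteOnlyStringIOSketch(io.StringIO):
    """Hypothetical write-only stream: writes work, every read raises."""

    def read(self, *args, **kwargs):
        raise IOError("stream is write-only")

    def readline(self, *args, **kwargs):
        raise IOError("stream is write-only")

    def readlines(self, *args, **kwargs):
        raise IOError("stream is write-only")

    def readable(self, *args, **kwargs):
        # Report the stream as unreadable so callers can check before reading.
        return False


stream = WriteOnlyStringIOSketch()
stream.write("captured output")
print(stream.getvalue(), stream.readable())
```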

class code_execution.utils.redirect_stdin(new_target)[source]

Bases: _RedirectStream

code_execution.utils.swallow_io()[source]
code_execution.utils.get_module_and_call(module_name: str) Tuple[str, str, str][source]

Gets the import code, call, and module source code for a module.

Parameters:

module_name (str) – The name of the module to get.

Returns:

Import code, call, and module source code.

Return type:

Tuple[str, str, str]

code_execution.utils.get_results_from_generator(generator: Generator, total: int, target_returns_multiple: bool, garbage_collect_freq: int, log_freq: int)[source]

Gets the results from a generator.

Parameters:
  • generator (Generator) – The generator to get results from.

  • total (int) – The total number of items in the generator.

  • target_returns_multiple (bool) – If the target returns multiple items per iteration.

  • disable_tqdm (bool) – Whether to disable the progress bar.

  • garbage_collect_freq (int) – How often to perform garbage collection.

  • log_freq (int) – How often to log if not using tqdm.

  • quiet (bool, optional) – Whether to suppress logging. Defaults to False.

Returns:

The results from the generator.

Return type:

List
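The role of target_returns_multiple and garbage_collect_freq can be illustrated with a short loop. This is a sketch under assumptions: it leaves out progress bars and logging, and the function name is hypothetical.

```python
import gc
from typing import Iterable, List


def collect_results_sketch(
    generator: Iterable,
    target_returns_multiple: bool,
    garbage_collect_freq: int,
) -> List:
    """Hypothetical helper: drain a generator into a flat or nested list."""
    results: List = []
    for num_seen, item in enumerate(generator, start=1):
        if target_returns_multiple:
            results.extend(item)  # each item is itself a list of results
        else:
            results.append(item)
        if num_seen % garbage_collect_freq == 0:
            gc.collect()  # periodically release memory held by finished items
    return results


print(collect_results_sketch(iter([[1, 2], [3]]), True, 2))
```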

code_execution.utils.run_in_parallel(target: Callable, args: List, num_workers: int, desc: str | None = None, max_tasks_per_process: int | None = None, disable_tqdm: bool = False, garbage_collect_freq: int = 500, chunk_size: int = 1, log_freq: int = 500, target_returns_multiple: bool = False, tqdm_kwargs: Dict | None = None) List[source]

Runs a function in parallel.

Parameters:
  • target (Callable) – The function to run.

  • args (List) – The arguments to pass to the function.

  • num_workers (int) – The number of workers to use.

  • desc (str) – The description to use for the progress bar.

  • max_tasks_per_process (Optional[int], optional) – Maximum number of tasks before starting a new process. Defaults to None.

  • disable_tqdm (bool, optional) – Disable the progress bar. Defaults to False.

  • garbage_collect_freq (int, optional) – How often to perform garbage collection. Defaults to 500.

  • chunk_size (int, optional) – The chunk size to use for imap. Defaults to 1.

  • log_freq (int, optional) – How often to log if not using tqdm. Defaults to 500.

  • target_returns_multiple (bool, optional) – If the target returns multiple so that .extend is used instead of .append. Defaults to False.

  • tqdm_kwargs (Optional[Dict], optional) – Additional keyword arguments to pass to tqdm. Defaults to None.

Returns:

The results of target(a) for each a in args.

Return type:

List

code_execution.utils.configure_logging(level: int = 10, format: str | None = None, datefmt: str | None = None)[source]
code_execution.utils.get_mem_limit_code(mem_limit: str | None, trailing: str = '\n') str[source]

Gets the code to set the memory limit.

Parameters:
  • mem_limit (str) – The memory limit value as a string. You can do something like “4 * 1024” or “1024”. If None, will return an empty string.

  • trailing – The trailing characters to add to the code.

Returns:

The code to set the memory limit.

Return type:

str

Metrics

Metrics for evaluating the performance of the code execution.

code_execution.metrics.naive_process_result(result)[source]

The most naive way to process the result.

code_execution.metrics.pass_at_k(n, c, k)[source]
Parameters:
  • n – total number of samples

  • c – number of correct samples

  • k – k in pass@k
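The reference gives no formula. Assuming the function follows the widely used unbiased estimator, pass@k = 1 - C(n - c, k) / C(n, k), a pure-Python sketch is shown below; the function name and the choice of estimator are assumptions.

```python
from math import comb


def pass_at_k_sketch(n: int, c: int, k: int) -> float:
    """Hypothetical helper: probability that at least one of k samples is correct."""
    if n - c < k:
        # Fewer than k incorrect samples exist, so any k samples include a correct one.
        return 1.0
    return 1.0 - comb(n - c, k) / comb(n, k)


print(pass_at_k_sketch(n=10, c=3, k=1))
```

With k = 1 the estimate reduces to the fraction of correct samples, c / n.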

code_execution.metrics.calc_question_metric(question_predictions)[source]

Calculate question metrics from the predictions.