OpenHands SWE (新版的,基于SDK清晰一些)
准备数据Eval信息等
主函数(SWEEvaluator,EvalMetadata等信息)
def main() -> None:
    """CLI entry point: parse arguments, load the LLM config, and run the
    SWE-bench evaluation end to end."""
    # Collect the Jinja2 prompt templates shipped next to this file.
    prompt_dir = (Path(__file__).parent / "prompts").resolve()
    # NOTE(review): relative_to(Path.cwd()) raises ValueError when prompt_dir
    # is not under the current working directory — confirm the script is
    # always launched from the repo root.
    choices = [str(p.relative_to(Path.cwd())) for p in prompt_dir.glob("*.j2")]
    default_prompt_path = prompt_dir / "default.j2"
    assert default_prompt_path.exists(), (
        f"Default prompt {default_prompt_path} not found"
    )
    parser = get_parser()
    parser.add_argument(
        "--prompt-path",
        type=str,
        # NOTE(review): the default is an absolute path while `choices` holds
        # cwd-relative strings, so explicitly passing the default value on the
        # command line would be rejected by argparse — verify intended behavior.
        default=str(default_prompt_path),
        choices=choices,
        help="Path to prompt template file",
    )
    args = parser.parse_args()
    # Validate max_attempts early, before any expensive setup.
    if args.max_attempts < 1:
        raise ValueError(f"max_attempts must be >= 1, got {args.max_attempts}")
    # Load and validate the LLM configuration (a JSON file on disk).
    llm_config_path = args.llm_config_path
    if not os.path.isfile(llm_config_path):
        raise ValueError(f"LLM config file {llm_config_path} does not exist")
    with open(llm_config_path, "r") as f:
        llm_config = f.read()
    llm = LLM.model_validate_json(llm_config)
    logger.info("Using LLM config: %s", llm.model_dump_json(indent=2))
    # Filesystem-safe "<dataset>-<split>" label used in the output dir name.
    dataset_description = (
        args.dataset.replace("/", "__") + "-" + args.split.replace("/", "__")
    )
    structured_output_dir = construct_eval_output_dir(
        base_dir=args.output_dir,
        dataset_name=dataset_description,
        model_name=llm.model,
        max_iterations=args.max_iterations,
        eval_note=args.note,
    )
    # Create critic instance from parsed arguments
    critic = create_critic(args)
    logger.info(f"Using critic: {type(critic).__name__}")
    # Bundle every run parameter into a single metadata object.
    metadata = EvalMetadata(
        llm=llm,
        dataset=args.dataset,
        dataset_split=args.split,
        max_iterations=args.max_iterations,
        eval_output_dir=structured_output_dir,
        details={},
        prompt_path=args.prompt_path,
        eval_limit=args.n_limit,
        env_setup_commands=["export PIP_CACHE_DIR=~/.cache/pip"],
        max_attempts=args.max_attempts,
        critic=critic,
        selected_instances_file=args.select,
        max_retries=args.max_retries,
        workspace_type=args.workspace,
    )
    # Run orchestrator with a simple JSONL writer
    # Construct the SWEBenchEvaluation driver.
    evaluator = SWEBenchEvaluation(
        metadata=metadata,
        num_workers=args.num_workers,
    )
    # Start the evaluation loop; each result is streamed to the JSONL output.
    evaluator.run(on_result=get_default_on_result_writer(evaluator.output_path))
    logger.info("Evaluation completed!")


if __name__ == "__main__":
    main()

准备Instances (SWEBenchEvaluation.prepare_instances)
def prepare_instances(self) -> List[EvalInstance]:
    """Load the configured SWE-bench dataset split and wrap each row as an
    EvalInstance keyed by its instance_id."""
    logger.info("Setting up SWE-bench evaluation data")
    df = get_dataset(
        dataset_name=self.metadata.dataset,
        split=self.metadata.dataset_split,
        eval_limit=self.metadata.eval_limit,
        selected_instances_file=self.metadata.selected_instances_file,
    )
    # One EvalInstance per dataset row; the full row dict travels with it.
    instances: List[EvalInstance] = [
        EvalInstance(id=str(record["instance_id"]), data=record.to_dict())
        for _, record in df.iterrows()
    ]
    logger.info("Total instances to process: %d", len(instances))
    return instances

Prompt
I have access to a python code repository in the directory {{ instance.repo_path }} . You can explore and modify files using the available tools. Consider the following issue description:
<issue_description>
{{ instance.problem_statement }}
</issue_description>
Can you help me implement the necessary changes to the repository so that the requirements specified in the <issue_description> are met?
I've already taken care of all changes to any of the test files described in the <issue_description>. This means you DON'T have to modify the testing logic or any of the tests in any way!
Also the development Python environment is already set up for you (i.e., all dependencies already installed), so you don't need to install other packages.
Your task is to make the minimal changes to non-test files in the {{ instance.repo_path }} directory to ensure the <issue_description> is satisfied.
Follow these phases to resolve the issue:
Phase 1. READING: read the problem and reword it in clearer terms
1.1 If there are code or config snippets, express in words any best practices or conventions in them.
1.2 Highlight error messages, method names, variables, file names, stack traces, and technical details.
1.3 Explain the problem in clear terms.
1.4 Enumerate the steps to reproduce the problem.
1.5 Highlight any best practices to take into account when testing and fixing the issue
Phase 2. RUNNING: install and run the tests on the repository
2.1 Activate the environment by running
source /opt/miniconda3/etc/profile.d/conda.sh ; conda activate testbed
2.2 Follow the readme
2.3 Install the environment and anything needed
2.4 Iterate and figure out how to run the tests
Phase 3. EXPLORATION: find the files that are related to the problem and possible solutions
3.1 Use `grep` to search for relevant methods, classes, keywords and error messages.
3.2 Identify all files related to the problem statement.
3.3 Propose the methods and files to fix the issue and explain why.
3.4 From the possible file locations, select the most likely location to fix the issue.
Phase 4. TEST CREATION: before implementing any fix, create a script to reproduce and verify the issue.
4.1 Look at existing test files in the repository to understand the test format/structure.
4.2 Create a minimal reproduction script that reproduces the located issue.
4.3 Run the reproduction script to confirm you are reproducing the issue.
4.4 Adjust the reproduction script as necessary.
Phase 5. FIX ANALYSIS: state clearly the problem and how to fix it
5.1 State clearly what the problem is.
5.2 State clearly where the problem is located.
5.3 State clearly how the test reproduces the issue.
5.4 State clearly the best practices to take into account in the fix.
5.5 State clearly how to fix the problem.
Phase 6. FIX IMPLEMENTATION: Edit the source code to implement your chosen solution.
6.1 Make minimal, focused changes to fix the issue.
Phase 7. VERIFICATION: Test your implementation thoroughly.
7.1 Run your reproduction script to verify the fix works.
7.2 Add edge cases to your test script to ensure comprehensive coverage.
7.3 Run existing tests related to the modified code to ensure you haven't broken anything.
Phase 8. FINAL REVIEW: Carefully re-read the problem description and compare your changes with the base commit {{ instance.base_commit }}.
8.1 Ensure you've fully addressed all requirements.
8.2 Run any tests in the repository related to:
8.2.1 The issue you are fixing
8.2.2 The files you modified
8.2.3 The functions you changed
8.3 If any tests fail, revise your implementation until all tests pass
Be thorough in your exploration, testing, and reasoning. It's fine if your thinking process is lengthy - quality and completeness are more important than brevity.

准备环境 (SWEBenchEvaluation.prepare_workspace)
参考代码
核心流程
- 拉取instance官方docker镜像
- 构建openhands agent运行镜像,基于官方镜像做的一些补充。
主流程(prepare_workerspace)
# ---- Hook: prepare a workspace per instance ----------------------------------
def prepare_workspace(
    self,
    instance: EvalInstance,
    resource_factor: int = 1,
    forward_env: list[str] | None = None,
) -> RemoteWorkspace:
    """
    Use DockerWorkspace by default.
    Args:
        instance: The evaluation instance to prepare workspace for.
        resource_factor: Resource factor for runtime allocation (default: 1).
            Higher values allocate more CPU/memory resources.
            Used by APIRemoteWorkspace for remote runtime allocation.
        forward_env: Environment variable names forwarded into the workspace.
    """
    # Official per-instance SWE-bench Docker image.
    official_docker_image = get_official_docker_image(instance.id)
    build_target = "source-minimal"
    custom_tag = extract_custom_tag(official_docker_image)
    # For non-binary targets, append target suffix
    suffix = f"-{build_target}" if build_target != "binary" else ""
    base_agent_image = (
        f"{EVAL_AGENT_SERVER_IMAGE}:{SDK_SHORT_SHA}-{custom_tag}{suffix}"
    )
    wrap_needed = should_wrap_instance_id(instance.id)
    agent_server_image = base_agent_image
    if self.metadata.workspace_type == "docker":
        # NOTE(review): the code reads SKIP_BUILD but the log message below
        # tells users to set SWE_BENCH_SKIP_BUILD — confirm which env var name
        # is intended.
        SKIP_BUILD = os.getenv("SKIP_BUILD", "1").lower() in ("1", "true", "yes")
        logger.info(f"SKIP_BUILD={SKIP_BUILD}")
        if not SKIP_BUILD:
            logger.info(
                f"Building workspace from {official_docker_image} "
                f"for instance {instance.id}. "
                "This may take a while...\n"
                "You can run benchmarks/swebench/build_images.py and set "
                "SWE_BENCH_SKIP_BUILD=1 to skip building and use pre-built "
                "agent-server image."
            )
            # Build the agent-server image on top of the official image.
            output = build_image(
                base_image=official_docker_image,
                target_image=EVAL_AGENT_SERVER_IMAGE,
                custom_tag=custom_tag,
                target=build_target,
                push=False,
            )
            logger.info(f"Image build output: {output}")
            assert output.error is None, f"Image build failed: {output.error}"
            if base_agent_image not in output.tags:
                raise RuntimeError(
                    f"Built image tags {output.tags} do not include expected tag "
                    f"{base_agent_image}"
                )
            # Some instances need an extra wrapping layer on the built image.
            # NOTE(review): nesting reconstructed from a source with stripped
            # indentation — confirm wrap happens only when building.
            if wrap_needed:
                wrapped_result = wrap_image(base_agent_image, push=False)
                if wrapped_result.error:
                    raise RuntimeError(
                        "Wrapped image build failed: "
                        f"{wrapped_result.error}; log={wrapped_result.log_path}"
                    )
        workspace = DockerWorkspace(
            server_image=agent_server_image,
            working_dir="/workspace",
            forward_env=forward_env or [],
        )
    elif self.metadata.workspace_type == "remote":
        runtime_api_key = os.getenv("RUNTIME_API_KEY")
        sdk_short_sha = os.getenv("SDK_SHORT_SHA", SDK_SHORT_SHA)
        if not runtime_api_key:
            raise ValueError(
                "RUNTIME_API_KEY environment variable is not set for remote workspace"
            )
        agent_server_image = (
            f"{EVAL_AGENT_SERVER_IMAGE}:{sdk_short_sha}-{custom_tag}{suffix}"
        )
        # Remote workspaces never build locally: the image must already be
        # published and publicly pullable.
        if not image_exists(agent_server_image):
            raise RuntimeError(
                f"Agent server image {agent_server_image} does not exist in container registry, "
                "make sure to build, push it, and make it public accessible before using remote workspace."
            )
        logger.info(
            f"Using remote workspace with image {agent_server_image} "
            f"(sdk sha: {sdk_short_sha}, resource_factor: {resource_factor})"
        )
        startup_timeout = float(os.getenv("REMOTE_RUNTIME_STARTUP_TIMEOUT", "600"))
        workspace = APIRemoteWorkspace(
            runtime_api_url=os.getenv(
                "RUNTIME_API_URL", "https://runtime.eval.all-hands.dev"
            ),
            runtime_api_key=runtime_api_key,
            server_image=agent_server_image,
            target_type="source" if "source" in build_target else "binary",
            forward_env=forward_env or [],
            resource_factor=resource_factor,
            init_timeout=startup_timeout,
            startup_wait_timeout=startup_timeout,
        )
    else:
        raise ValueError(
            f"Unsupported workspace_type: {self.metadata.workspace_type}"
        )
    # Run the environment-setup commands inside the new workspace; the first
    # failing command aborts preparation.
    for cmd in self.metadata.env_setup_commands or []:
        res = workspace.execute_command(cmd)
        if res.exit_code != 0:
            raise RuntimeError(
                f"Failed to run env setup command '{cmd}': {res.stderr}"
            )
        logger.debug(f"Ran env setup command '{cmd}': {res.stdout}")
    return workspace

官方Instance Docker镜像
def get_official_docker_image(
instance_id: str,
docker_image_prefix="docker.io/swebench/",
) -> str:
# Official SWE-Bench image
# swebench/sweb.eval.x86_64.django_1776_django-11333:v1
repo, name = instance_id.split("__")
official_image_name = docker_image_prefix.rstrip("/")
official_image_name += f"/sweb.eval.x86_64.{repo}_1776_{name}:latest".lower()
logger.debug(f"Official SWE-Bench image: {official_image_name}")
return official_image_namebuild_image 构建运行时镜像主函数
build_image 主函数
def build_image(
    base_image: str,
    target_image: str,
    custom_tag: str,
    target: TargetType = "source-minimal",
    push: bool = False,
) -> BuildOutput:
    """Build (or reuse) the agent-server image derived from `base_image`.

    Args:
        base_image: Official SWE-bench image to build on top of.
        target_image: Repository name of the image to produce.
        custom_tag: Instance-specific tag component.
        target: Dockerfile build target (default: "source-minimal").
        push: Whether to push the built image to the registry.

    Returns:
        BuildOutput describing the produced (or already-existing) tags.
    """
    # Get SDK info from submodule to ensure tags use the correct SDK SHA
    git_ref, git_sha, sdk_version = _get_sdk_submodule_info()
    opts = BuildOptions(
        base_image=base_image,
        custom_tags=custom_tag,
        image=target_image,
        target=target,
        # SWE-Bench only supports linux/amd64 images
        platforms=["linux/amd64"],
        push=push,
        # Override git info to use SDK submodule info instead of benchmarks repo
        git_ref=git_ref,
        git_sha=git_sha,
        sdk_version=sdk_version,
    )
    # NOTE(review): this returns after the FIRST tag that already exists and
    # reports only that tag, even if other expected tags are missing — confirm
    # any one existing tag implies all of them exist.
    for t in opts.all_tags:
        # Check if image exists or not
        if image_exists(t):
            logger.info("Image %s already exists. Skipping build.", t)
            return BuildOutput(base_image=base_image, tags=[t], error=None)
    tags = build(opts)
    return BuildOutput(base_image=base_image, tags=tags, error=None)
class BuildOutput(BaseModel):
    """Result of an agent-server image build (or registry cache hit)."""

    # ISO-8601 UTC timestamp of when this result was created.
    time: str = Field(default_factory=lambda: datetime.now(UTC).isoformat())
    # The base image the build started from.
    base_image: str
    # All tags produced (a single pre-existing tag on a cache hit).
    tags: list[str]
    # Error description, or None on success.
    error: str | None = None
    # Path to the build log file, if one was written.
    log_path: str | None = None

获取openhands sdk 模块信息
def _get_sdk_submodule_info() -> tuple[str, str, str]:
    """
    Get SDK version info from the vendor/software-agent-sdk submodule.
    Returns:
        tuple[str, str, str]: (git_ref, git_sha, sdk_version)
    """
    # The benchmarks repo root is three levels above this file.
    benchmarks_root = Path(__file__).resolve().parent.parent.parent
    sdk_path = benchmarks_root / "vendor" / "software-agent-sdk"

    def _git_output(cmd: list[str]) -> str | None:
        # Run a git command inside the submodule; None when git exits non-zero.
        try:
            proc = subprocess.run(
                cmd,
                cwd=sdk_path,
                capture_output=True,
                text=True,
                check=True,
            )
        except subprocess.CalledProcessError:
            return None
        return proc.stdout.strip()

    # Commit SHA straight from the checked-out submodule — more direct than
    # parsing `git submodule status` output.
    git_sha = _git_output(["git", "rev-parse", "HEAD"])
    if git_sha is None:
        logger.warning(
            "Failed to get SDK submodule SHA, using 'unknown'. "
            "Make sure submodules are initialized."
        )
        git_sha = "unknown"

    # Current branch name; "unknown" when detached or unavailable.
    ref = _git_output(["git", "symbolic-ref", "-q", "--short", "HEAD"])
    git_ref = "unknown" if ref is None else ref

    # SDK package version comes from the submodule's pyproject.toml.
    pyproject_path = sdk_path / "openhands-sdk" / "pyproject.toml"
    sdk_version = "unknown"
    try:
        if pyproject_path.exists():
            with pyproject_path.open("rb") as f:
                config = tomllib.load(f)
            sdk_version = config.get("project", {}).get("version", "unknown")
    except Exception as e:
        logger.warning(f"Failed to read SDK version from pyproject.toml: {e}")

    logger.info(
        f"SDK submodule info: ref={git_ref}, sha={git_sha[:7]}, version={sdk_version}"
    )
    return git_ref, git_sha, sdk_version

构建openhands-scaffold运行时镜像
基于base_image构建openhands作为scaffold的instance运行时镜像
def build(opts: BuildOptions) -> list[str]:
    """Single entry point for building the agent-server image.

    Assembles and runs a `docker buildx build` command with tag, platform,
    and cache arguments derived from `opts`, always cleans up the temporary
    build context, and returns the list of tags that were built.
    """
    dockerfile_path = _get_dockerfile_path(opts.sdk_project_root)
    push = opts.push
    if push is None:
        # Default: push only when running in CI.
        push = IN_CI
    tags = opts.all_tags
    cache_tag, cache_tag_base = opts.cache_tags
    # Temporary, clean build context; removed in the finally block below.
    ctx = _make_build_context(opts.sdk_project_root)
    logger.info(f"[build] Clean build context: {ctx}")
    # Base docker buildx arguments.
    args = [
        "docker",
        "buildx",
        "build",
        "--file",
        str(dockerfile_path),
        "--target",
        opts.target,
        "--build-arg",
        f"BASE_IMAGE={opts.base_image}",
    ]
    if push:
        # Multi-platform build pushed to the registry.
        args += ["--platform", ",".join(opts.platforms), "--push"]
    else:
        # Local build loaded into the docker daemon (current arch only).
        args += ["--load"]
    for t in tags:
        args += ["--tag", t]
    # -------- cache strategy --------
    driver = _active_buildx_driver() or "unknown"
    local_cache_dir = _default_local_cache_dir()
    cache_args: list[str] = []
    if push:
        # Remote/CI builds: use registry cache + inline for maximum reuse.
        cache_args += [
            "--cache-from",
            f"type=registry,ref={opts.image}:{cache_tag}",
            "--cache-from",
            f"type=registry,ref={opts.image}:{cache_tag_base}-main",
            "--cache-to",
            f"type=registry,ref={opts.image}:{cache_tag},mode=max",
        ]
        logger.info("[build] Cache: registry (remote/CI) + inline")
    else:
        # Local/dev builds: prefer local dir cache if
        # driver supports it; otherwise inline-only.
        if driver == "docker-container":
            local_cache_dir.mkdir(parents=True, exist_ok=True)
            cache_args += [
                "--cache-from",
                f"type=local,src={str(local_cache_dir)}",
                "--cache-to",
                f"type=local,dest={str(local_cache_dir)},mode=max",
            ]
            logger.info(
                f"[build] Cache: local dir at {local_cache_dir} (driver={driver})"
            )
        else:
            logger.warning(
                f"[build] WARNING: Active buildx driver is '{driver}', "
                "which does not support local dir caching. Fallback to INLINE CACHE\n"
                " Consider running the following commands to set up a "
                "compatible buildx environment:\n"
                " 1. docker buildx create --name openhands-builder "
                "--driver docker-container --use\n"
                " 2. docker buildx inspect --bootstrap\n"
            )
            # docker driver can't export caches; fall back to inline metadata only.
            cache_args += ["--build-arg", "BUILDKIT_INLINE_CACHE=1"]
            logger.info(f"[build] Cache: inline only (driver={driver})")
    # Append the cache flags and finally the build-context path.
    args += cache_args + [str(ctx)]
    logger.info(
        f"[build] Building target='{opts.target}' image='{opts.image}' "
        f"custom_tags='{opts.custom_tags}' from base='{opts.base_image}' "
        f"for platforms='{opts.platforms if push else 'local-arch'}'"
    )
    logger.info(
        f"[build] Git ref='{opts.git_ref}' sha='{opts.git_sha}' "
        f"package_version='{opts.sdk_version}'"
    )
    logger.info(f"[build] Cache tag: {cache_tag}")
    try:
        # Run the assembled docker buildx command.
        res = _run(args, cwd=str(ctx))
        sys.stdout.write(res.stdout or "")
    except subprocess.CalledProcessError as e:
        logger.error(f"[build] ERROR: Build failed with exit code {e.returncode}")
        logger.error(f"[build] Command: {' '.join(e.cmd)}")
        logger.error(f"[build] Full stdout:\n{e.output}")
        logger.error(f"[build] Full stderr:\n{e.stderr}")
        raise
    finally:
        # Always remove the temporary build context, even on failure.
        logger.info(f"[build] Cleaning {ctx}")
        shutil.rmtree(ctx, ignore_errors=True)
    logger.info("[build] Done. Tags:")
    for t in tags:
        logger.info(f" - {t}")
    return tags

执行初始化环境命令
核心目的
- 从 metadata 中获取 env_setup_commands
- 在 workspace 里,一条一条地运行命令。
- 如果某一条命令失败,则会抛出错误,终止。
示例
env_setup_commands=["export PIP_CACHE_DIR=~/.cache/pip"],
def prepare_workspace(
    self,
    instance: EvalInstance,
    resource_factor: int = 1,
    forward_env: list[str] | None = None,
) -> RemoteWorkspace:
    # --- (workspace creation elided in this excerpt) ---
    # ---
    # Run the environment-initialization commands one by one inside the
    # workspace; the first failing command aborts preparation.
    for cmd in self.metadata.env_setup_commands or []:
        res = workspace.execute_command(cmd)
        if res.exit_code != 0:
            raise RuntimeError(
                f"Failed to run env setup command '{cmd}': {res.stderr}"
            )
        logger.debug(f"Ran env setup command '{cmd}': {res.stdout}")
    return workspace

执行Instance
Agent交互(SWEBenchEvaluation.evaluate_instance)
核心流程
- 准备3个工具:FileEditor, Terminal, TaskTracker
- 初始化agent:cli_mode,命令行模式
- 依照instance去生成repo_path,把testbed里的代码复制一份到repo_path,并做git还原,确保无更改。
- 使用repo_path作为当前work_dir
- 根据instance、workspace、metadata获取instruction
- 构建conversation对象,进行agent循环
- 执行完成后,手动做git add 和 git commit,生成git patch
- 保存结果
def evaluate_instance(
    self, instance: EvalInstance, workspace: RemoteWorkspace
) -> EvalOutput:
    """
    Create conversation, run agent, collect history and git patch.
    Do not write files here; just return EvalOutput.
    """
    # Tools: terminal + file editor + task tracker (browser disabled).
    tools = get_default_tools(
        # Disable browser tools in CLI mode
        enable_browser=False,
    )
    # Agent configured from the run metadata.
    agent = Agent(
        llm=self.metadata.llm,
        tools=tools,
        system_prompt_kwargs={"cli_mode": True},
        # TODO: we can enable condenser and security analyzer later
        # and have them configurable via EvalMetadata
        # condenser=get_default_condenser(
        #     llm=self.metadata.llm.model_copy(update={"service_id": "condenser"})
        # ),
        # security_analyzer=LLMSecurityAnalyzer(),
    )
    assert isinstance(workspace, RemoteWorkspace)
    # Working copy of the repository, e.g. /workspace/django/ for django/django.
    repo_path = f"/workspace/{instance.data['repo'].split('/')[-1]}/"
    instance.data["repo_path"] = repo_path
    # Persist every conversation event to disk as it happens.
    persist_callback = build_event_persistence_callback(
        run_id=self.metadata.eval_output_dir,
        instance_id=instance.id,
        attempt=self.current_attempt,
    )
    # Conversation object driving the agent loop.
    conversation = Conversation(
        agent=agent,
        workspace=workspace,
        callbacks=[persist_callback],
        max_iteration_per_run=self.metadata.max_iterations,
    )
    logger.info("repo_path: %s", repo_path)
    # Copy a fresh checkout from /testbed into repo_path as the work dir.
    cp_testebed_repo = workspace.execute_command(
        (f"mkdir -p {repo_path} ; cp -r /testbed/. {repo_path}")
    )
    assert cp_testebed_repo.exit_code == 0, (
        f"cp_testebed_repo failed: {cp_testebed_repo.stderr}"
    )
    # git reset --hard to guarantee a clean tree before the agent starts.
    git_reset = workspace.execute_command(f"cd {repo_path} ; git reset --hard")
    assert git_reset.exit_code == 0, f"git reset failed: {git_reset.stderr}"
    # Render the instruction prompt for this instance.
    instruction = get_instruction(
        instance=instance.data,
        metadata=self.metadata,
        workspace_path=workspace.working_dir,
    )
    conversation.send_message(instruction)
    # Run conversation with fake user responses to handle agent messages
    run_conversation_with_fake_user_response(conversation)
    # Manually stage everything the agent changed.
    workspace.execute_command(f"cd {repo_path} ; git add -A")
    # Manually commit so the diff below has a HEAD to compare against.
    # NOTE(review): exit codes of add/commit are not checked — a commit with
    # nothing staged fails silently and the diff is then against the old HEAD.
    workspace.execute_command(
        f"cd {repo_path} && "
        "git config --global user.email 'evaluation@openhands.dev' && "
        "git config --global user.name 'OpenHands Evaluation' && "
        "git commit -m 'patch'"
    )
    # Produce the git patch between the instance base commit and new HEAD.
    base_commit = instance.data["base_commit"]
    git_patch_result = workspace.execute_command(
        (f"cd {repo_path} ; git --no-pager diff --no-color {base_commit} HEAD")
    )
    assert git_patch_result.exit_code == 0, (
        f"git diff failed: {git_patch_result.stderr}"
    )
    # The model-produced patch in unified diff format.
    git_patch = git_patch_result.stdout
    # EvalOutput is your model; keep fields consistent with prior JSONL
    out = EvalOutput(
        instance_id=instance.id,
        attempt=self.current_attempt,
        test_result={
            "git_patch": git_patch,
        },
        instruction=instruction,
        error=None,
        history=list(conversation.state.events),
        metrics=conversation.conversation_stats.get_combined_metrics(),
    )
    return out

分数评估
主函数
def main() -> None:
    """Main entry point for the script."""
    parser = argparse.ArgumentParser(
        description="Convert OpenHands output to SWE-Bench format and run evaluation",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
uv run swebench-eval output.jsonl
uv run swebench-eval /path/to/output.jsonl --dataset princeton-nlp/SWE-bench_Lite
uv run swebench-eval output.jsonl --model-name "MyModel-v1.0"
""",
    )
    parser.add_argument("input_file", help="Path to the OpenHands output.jsonl file")
    parser.add_argument(
        "--dataset",
        default="princeton-nlp/SWE-bench_Verified",
        help="SWE-Bench dataset to evaluate against "
        "(default: princeton-nlp/SWE-bench_Verified)",
    )
    parser.add_argument(
        "--output-file",
        help="Output file for SWE-Bench format "
        "(default: input_file with .swebench.jsonl extension)",
    )
    parser.add_argument(
        "--skip-evaluation",
        action="store_true",
        help="Only convert format, skip running evaluation",
    )
    parser.add_argument(
        "--model-name",
        default="openhands",
        help="Model name to use in the model_name_or_path field (default: openhands)",
    )
    parser.add_argument(
        "--workers",
        default="12",
        help="Number of workers to use when evaluating",
    )
    args = parser.parse_args()
    # Validate input file
    input_file = Path(args.input_file)
    if not input_file.exists():
        logger.error(f"Input file does not exist: {input_file}")
        sys.exit(1)
    if not input_file.suffix == ".jsonl":
        logger.warning(f"Input file does not have .jsonl extension: {input_file}")
    # Determine output file: explicit flag wins, else derive from input name.
    if args.output_file:
        output_file = Path(args.output_file)
    else:
        output_file = input_file.with_suffix(".swebench.jsonl")
    logger.info(f"Input file: {input_file}")
    logger.info(f"Output file: {output_file}")
    logger.info(f"Dataset: {args.dataset}")
    logger.info(f"Model name: {args.model_name}")
    try:
        # Convert format
        convert_to_swebench_format(str(input_file), str(output_file), args.model_name)
        if not args.skip_evaluation:
            # Run evaluation
            run_swebench_evaluation(str(output_file), args.dataset, args.workers)
            # Move report file to input file directory with .report.json extension
            # SWE-Bench creates: {model_name.replace("/", "__")}.eval_{output_file.stem}.json
            report_filename = (
                f"{args.model_name.replace('/', '__')}.eval_{output_file.stem}.json"
            )
            report_path = output_file.parent / report_filename
            dest_report_path = input_file.with_suffix(".report.json")
            shutil.move(str(report_path), str(dest_report_path))
            logger.info(f"Moved report file to: {dest_report_path}")
            # Update Laminar datapoints with evaluation scores
            LaminarService.get().update_evaluation_scores(
                str(input_file), str(dest_report_path)
            )
        # Generate cost report as final step
        # NOTE(review): nesting relative to --skip-evaluation reconstructed
        # from indentation-stripped source — confirm this runs unconditionally.
        generate_cost_report(str(input_file))
        logger.info("Script completed successfully!")
    except Exception as e:
        # Top-level boundary: log the failure and exit non-zero.
        logger.error(f"Script failed: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()

格式转换
def convert_to_swebench_format(
    input_file: str, output_file: str, model_name: str = "OpenHands"
) -> None:
    """
    Convert OpenHands output.jsonl to SWE-Bench prediction format.
    OpenHands format:
    {
    "instance_id": "django__django-11333",
    "test_result": {
    "git_patch": "diff --git a/file.py b/file.py\n..."
    },
    "instruction": "...",
    "error": null,
    "history": [...]
    }
    SWE-Bench format:
    {
    "instance_id": "django__django-11333",
    "model_patch": "diff --git a/file.py b/file.py\n...",
    "model_name_or_path": "OpenHands"
    }
    """
    logger.info(f"Converting {input_file} to SWE-Bench format: {output_file}")
    converted_count = 0
    error_count = 0
    with open(input_file, "r") as infile, open(output_file, "w") as outfile:
        for line_num, line in enumerate(infile, 1):
            try:
                line = line.strip()
                if not line:
                    # Skip blank lines silently.
                    continue
                data = json.loads(line)
                # Extract required fields
                instance_id = data.get("instance_id")
                if not instance_id:
                    logger.warning(f"Line {line_num}: Missing instance_id")
                    error_count += 1
                    continue
                # Extract git_patch from test_result
                test_result = data.get("test_result", {})
                git_patch = test_result.get("git_patch", "")
                if not git_patch:
                    logger.warning(
                        f"Line {line_num}: Missing or empty git_patch for {instance_id}"
                    )
                    # Still create entry with empty patch
                    git_patch = ""
                # postprocess git_patch: drop hunks that touch packaging files
                setup_files = ["pyproject.toml", "tox.ini", "setup.py"]
                git_patch = remove_files_from_patch(git_patch, setup_files)
                # Create SWE-Bench format entry
                swebench_entry = {
                    "instance_id": instance_id,
                    "model_patch": git_patch,
                    "model_name_or_path": model_name,
                }
                # Write to output file
                outfile.write(json.dumps(swebench_entry) + "\n")
                converted_count += 1
            except json.JSONDecodeError as e:
                # Malformed line: count it and keep converting the rest.
                logger.error(f"Line {line_num}: Invalid JSON - {e}")
                error_count += 1
            except Exception as e:
                logger.error(f"Line {line_num}: Unexpected error - {e}")
                error_count += 1
    logger.info(
        f"Conversion complete: {converted_count} entries converted, "
        f"{error_count} errors"
    )
    if converted_count == 0:
raise ValueError("No valid entries were converted")def run_swebench_evaluation(
    predictions_file: str,
    dataset: str = "princeton-nlp/SWE-bench_Verified",
    workers: str = "12",
) -> None:
    """
    Run SWE-Bench evaluation on the predictions file.
    Args:
        predictions_file: Path to the SWE-Bench format predictions file
        dataset: SWE-Bench dataset to evaluate against
        workers: Number of workers to use for evaluation
    """
    logger.info(f"Running SWE-Bench evaluation on {predictions_file}")
    try:
        # Get the directory of the predictions file
        predictions_path = Path(predictions_file)
        predictions_dir = predictions_path.parent
        predictions_filename = predictions_path.name
        # Run SWE-Bench evaluation using global python (not UV environment)
        # since swebench is installed globally
        cmd = [
            "uv",
            "run",
            "python",
            "-m",
            "swebench.harness.run_evaluation",
            "--dataset_name",
            dataset,
            "--predictions_path",
            predictions_filename,
            "--max_workers",
            str(workers),
            "--run_id",
            f"eval_{predictions_path.stem}",
        ]
        logger.info(f"Running command: {' '.join(cmd)}")
        logger.info(f"Working directory: {predictions_dir}")
        logger.info("SWE-Bench evaluation output:")
        print("-" * 80)
        # Stream output directly to console, running from predictions file directory
        result = subprocess.run(cmd, text=True, cwd=predictions_dir)
        print("-" * 80)
        if result.returncode == 0:
            logger.info("SWE-Bench evaluation completed successfully")
        else:
            logger.error(
                f"SWE-Bench evaluation failed with return code {result.returncode}"
            )
            raise subprocess.CalledProcessError(result.returncode, cmd)
    except FileNotFoundError:
        logger.error(
            "SWE-Bench evaluation command not found. "
            "Make sure SWE-Bench is properly installed."
        )
        raise
    except Exception as e:
        # Re-raise after logging so the caller can abort the pipeline.
        logger.error(f"Error running SWE-Bench evaluation: {e}")
        raise

工具概览(文件编辑+命令行+TaskTracker)
def get_default_tools(
enable_browser: bool = True,
) -> list[Tool]:
"""Get the default set of tool specifications for the standard experience.
Args:
enable_browser: Whether to include browser tools.
"""
register_default_tools(enable_browser=enable_browser)
# Import tools to access their name attributes
from openhands.tools.file_editor import FileEditorTool
from openhands.tools.task_tracker import TaskTrackerTool
from openhands.tools.terminal import TerminalTool
tools = [
Tool(name=TerminalTool.name),
Tool(name=FileEditorTool.name),
Tool(name=TaskTrackerTool.name),
]
if enable_browser:
from openhands.tools.browser_use import BrowserToolSet
tools.append(Tool(name=BrowserToolSet.name))
return toolsOpenHands Tools
Base 定义
Base Action
class Action(Schema, ABC):
    """Base schema for input action."""

    @property
    def visualize(self) -> Text:
        """Return Rich Text representation of this action.
        This method can be overridden by subclasses to customize visualization.
        The base implementation displays all action fields systematically.
        """
        content = Text()
        # Display action name
        action_name = self.__class__.__name__
        content.append("Action: ", style="bold")
        content.append(action_name)
        content.append("\n\n")
        # Display all action fields systematically
        content.append("Arguments:", style="bold")
        # model_dump() yields all declared fields as a plain dict.
        action_fields = self.model_dump()
        content.append(display_dict(action_fields))
        return content

Base Observation
属性
- ERROR_MESSAGE_HEADER:出错时的补充信息
- content:list,元素可以是文本或图像。
- is_error:是否出错
text()
- 把content list里的文本内容,拼接在一起
to_llm_content()
- 把content转换成llm见的格式。出错时,对相关内容前面加一个
ERROR_MESSAGE_HEADER
class Observation(Schema, ABC):
    """Base schema for output observation."""

    # Prefix prepended to LLM-visible content when the observation is an error.
    ERROR_MESSAGE_HEADER: ClassVar[str] = "[An error occurred during execution.]\n"
    content: list[TextContent | ImageContent] = Field(
        default_factory=list,
        description=(
            "Content returned from the tool as a list of "
            "TextContent/ImageContent objects. "
            "When there is an error, it should be written in this field."
        ),
    )
    is_error: bool = Field(
        default=False, description="Whether the observation indicates an error"
    )

    @classmethod
    def from_text(
        cls,
        text: str,
        is_error: bool = False,
        **kwargs: Any,
    ) -> "Self":
        """Utility to create an Observation from a simple text string.
        Args:
            text: The text content to include in the observation.
            is_error: Whether this observation represents an error.
            **kwargs: Additional fields for the observation subclass.
        Returns:
            An Observation instance with the text wrapped in a TextContent.
        """
        return cls(content=[TextContent(text=text)], is_error=is_error, **kwargs)

    @property
    def text(self) -> str:
        """Extract all text content from the observation.
        Returns:
            Concatenated text from all TextContent items in content.
        """
        # Image items are intentionally skipped here.
        return "".join(
            item.text for item in self.content if isinstance(item, TextContent)
        )

    @property
    def to_llm_content(self) -> Sequence[TextContent | ImageContent]:
        """
        Default content formatting for converting observation to LLM readable content.
        Subclasses can override to provide richer content (e.g., images, diffs).
        """
        llm_content: list[TextContent | ImageContent] = []
        # If is_error is true, prepend error message
        if self.is_error:
            llm_content.append(TextContent(text=self.ERROR_MESSAGE_HEADER))
        # Add content (now always a list)
        llm_content.extend(self.content)
        return llm_content

    @property
    def visualize(self) -> Text:
        """Return Rich Text representation of this observation.
        Subclasses can override for custom visualization; by default we show the
        same text that would be sent to the LLM.
        """
        text = Text()
        if self.is_error:
            text.append("❌ ", style="red bold")
            text.append(self.ERROR_MESSAGE_HEADER, style="bold red")
        text_parts = content_to_str(self.to_llm_content)
        if text_parts:
            full_content = "".join(text_parts)
            text.append(full_content)
        else:
            # Placeholder so the visualization is never empty.
            text.append("[no text content]")
        return text

FileEditor Tool
代码地址
- openhands-tools/openhands/tools/file_editor/definition.py
- openhands-tools/openhands/tools/file_editor/impl.py
- openhands-tools/openhands/tools/file_editor/editor.py
三层架构
- 接口层:
FileEditorTool - 逻辑层:
FileEditorExecutor - 底层实现层:
FileEditor
核心命令
- View
- create
- Str_replace
- insert
- undo_edit
FileEditorAction
关键属性
- command:执行的命令,只能有5个
view,create,str_replace,insert,undo_edit
- path:文件路径
不同命令,需要不同的参数
- create:file_text,文件的内容
- str_replace:old_str,new_str,需要改变的新旧字符串
- insert:insert_line,需要插入的行
- view:view_range,需要查看的行数
class FileEditorAction(Action):
    """Schema for file editor operations."""

    # One of the five supported sub-commands.
    command: CommandLiteral = Field(
        description="The commands to run. Allowed options are: `view`, `create`, "
        "`str_replace`, `insert`, `undo_edit`."
    )
    # Target file or directory; must be absolute.
    path: str = Field(description="Absolute path to file or directory.")
    file_text: str | None = Field(
        default=None,
        description="Required parameter of `create` command, with the content of "
        "the file to be created.",
    )
    old_str: str | None = Field(
        default=None,
        description="Required parameter of `str_replace` command containing the "
        "string in `path` to replace.",
    )
    new_str: str | None = Field(
        default=None,
        description="Optional parameter of `str_replace` command containing the "
        "new string (if not given, no string will be added). Required parameter "
        "of `insert` command containing the string to insert.",
    )
    insert_line: int | None = Field(
        default=None,
        ge=0,
        description="Required parameter of `insert` command. The `new_str` will "
        "be inserted AFTER the line `insert_line` of `path`.",
    )
    view_range: list[int] | None = Field(
        default=None,
        description="Optional parameter of `view` command when `path` points to a "
        "file. If none is given, the full file is shown. If provided, the file "
        "will be shown in the indicated line number range, e.g. [11, 12] will "
        "show lines 11 and 12. Indexing at 1 to start. Setting `[start_line, "
        "-1]` shows all lines from `start_line` to the end of the file.",
    )

FileEditorObservation
属性
- command, path, old_content, new_content等
方法
visualize:_has_meaningful_diff
class FileEditorObservation(Observation):
"""A ToolResult that can be rendered as a CLI output."""
command: CommandLiteral = Field(
description=(
"The command that was run: `view`, `create`, `str_replace`, "
"`insert`, or `undo_edit`."
)
)
path: str | None = Field(default=None, description="The file path that was edited.")
prev_exist: bool = Field(
default=True,
description="Indicates if the file previously existed. If not, it was created.",
)
old_content: str | None = Field(
default=None, description="The content of the file before the edit."
)
new_content: str | None = Field(
default=None, description="The content of the file after the edit."
)
_diff_cache: Text | None = PrivateAttr(default=None)
@property
def visualize(self) -> Text:
"""Return Rich Text representation of this observation.
Shows diff visualization for meaningful changes (file creation, successful
edits), otherwise falls back to agent observation.
"""
text = Text()
if self.is_error:
text.append("❌ ", style="red bold")
text.append(self.ERROR_MESSAGE_HEADER, style="bold red")
if not self._has_meaningful_diff:
return super().visualize
assert self.path is not None, "path should be set for meaningful diff"
# Generate and cache diff visualization
if not self._diff_cache:
change_applied = self.command != "view" and not self.is_error
self._diff_cache = visualize_diff(
self.path,
self.old_content,
self.new_content,
n_context_lines=2,
change_applied=change_applied,
)
# Combine error prefix with diff visualization
text.append(self._diff_cache)
return text
@property
def _has_meaningful_diff(self) -> bool:
"""Check if there's a meaningful diff to display."""
if self.is_error:
return False
if not self.path:
return False
if self.command not in ("create", "str_replace", "insert", "undo_edit"):
return False
# File creation case
if self.command == "create" and self.new_content and not self.prev_exist:
return True
# File modification cases (str_replace, insert, undo_edit)
if self.command in ("str_replace", "insert", "undo_edit"):
# Need both old and new content to show meaningful diff
if self.old_content is not None and self.new_content is not None:
# Only show diff if content actually changed
return self.old_content != self.new_content
return FalseFileEditorTool 定义和描述
工具描述
代码流程
- 根据工作目录初始化真实的executor
- 拆分基础(前2行)和剩下的工具描述,但默认还是完整的工具描述
- 根据working_dir来增强工具描述
# The closed set of sub-commands understood by the file editor.
Command = Literal["view", "create", "str_replace", "insert", "undo_edit"]
# Static tool description presented to the LLM. FileEditorTool.create() may
# extend this text before registration (an image-viewing bullet when the LLM
# has vision, plus a working-directory note).
TOOL_DESCRIPTION = """Custom editing tool for viewing, creating and editing files in plain-text format
* State is persistent across command calls and discussions with the user
* If `path` is a text file, `view` displays the result of applying `cat -n`. If `path` is a directory, `view` lists non-hidden files and directories up to 2 levels deep
* The `create` command cannot be used if the specified `path` already exists as a file
* If a `command` generates a long output, it will be truncated and marked with `<response clipped>`
* The `undo_edit` command will revert the last edit made to the file at `path`
* This tool can be used for creating and editing files in plain-text format.
Before using this tool:
1. Use the view tool to understand the file's contents and context
2. Verify the directory path is correct (only applicable when creating new files):
- Use the view tool to verify the parent directory exists and is the correct location
When making edits:
- Ensure the edit results in idiomatic, correct code
- Do not leave the code in a broken state
- Always use absolute file paths (starting with /)
CRITICAL REQUIREMENTS FOR USING THIS TOOL:
1. EXACT MATCHING: The `old_str` parameter must match EXACTLY one or more consecutive lines from the file, including all whitespace and indentation. The tool will fail if `old_str` matches multiple locations or doesn't match exactly with the file content.
2. UNIQUENESS: The `old_str` must uniquely identify a single instance in the file:
- Include sufficient context before and after the change point (3-5 lines recommended)
- If not unique, the replacement will not be performed
3. REPLACEMENT: The `new_str` parameter should contain the edited lines that replace the `old_str`. Both strings must be different.
Remember: when making multiple file edits in a row to the same file, you should prefer to send all edits in a single message with multiple calls to this tool, rather than multiple messages with a single call each.
""" # noqa: E501
class FileEditorTool(ToolDefinition[FileEditorAction, FileEditorObservation]):
    """A ToolDefinition subclass that automatically initializes a FileEditorExecutor."""

    @classmethod
    def create(
        cls,
        conv_state: "ConversationState",
    ) -> Sequence["FileEditorTool"]:
        """Initialize FileEditorTool with a FileEditorExecutor.

        Args:
            conv_state: Conversation state to get the working directory from;
                workspace_root is taken from ``conv_state.workspace``.
        """
        # Imported here to avoid a circular import with the impl module.
        from openhands.tools.file_editor.impl import FileEditorExecutor

        # The executor does the real work, rooted at the working directory.
        executor = FileEditorExecutor(workspace_root=conv_state.workspace.working_dir)

        # Conditionally advertise image viewing: splice an extra bullet in
        # after the first two lines of the static description.
        desc_lines = TOOL_DESCRIPTION.split("\n")
        head = "\n".join(desc_lines[:2])
        tail = "\n".join(desc_lines[2:])
        if conv_state.agent.llm.vision_is_active():
            tool_description = (
                f"{head}\n"
                "* If `path` is an image file (.png, .jpg, .jpeg, .gif, .webp, "
                ".bmp), `view` displays the image content\n"
                f"{tail}"
            )
        else:
            tool_description = TOOL_DESCRIPTION

        # Append the working directory so the agent starts exploring there
        # instead of the filesystem root.
        working_dir = conv_state.workspace.working_dir
        enhanced_description = (
            f"{tool_description}\n\n"
            f"Your current working directory is: {working_dir}\n"
            f"When exploring project structure, start with this directory "
            f"instead of the root filesystem."
        )

        # Wire everything into a single registered tool instance.
        return [
            cls(
                action_type=FileEditorAction,
                observation_type=FileEditorObservation,
                description=enhanced_description,
                annotations=ToolAnnotations(
                    title="file_editor",
                    readOnlyHint=False,
                    destructiveHint=True,
                    idempotentHint=False,
                    openWorldHint=False,
                ),
                executor=executor,
            )
        ]
# Automatically register the tool when this module is imported.
register_tool(FileEditorTool.name, FileEditorTool)

文件编辑器工具使用指南
这是一款用于查看、创建及编辑纯文本文件的自定义工具。
核心特性
状态持久化:在不同的命令调用和用户对话之间,操作状态是持续保留的。
智能查看 (view):
如果路径是文件,view 会显示带行号的内容(类似于执行 cat -n)。
如果路径是目录,view 会列出非隐藏文件和子目录(最多显示 2 层深度)。
创建限制 (create):如果指定路径已存在同名文件,则无法使用 create 命令。
输出截断:如果命令生成的输出过长,系统会自动截断并标记为 <response clipped>。
撤销功能 (undo_edit):该命令会撤销对指定路径文件所做的最后一次修改。
用途:本工具专用于纯文本格式文件的创建与编辑。
使用前的准备工作
先观察后操作:先使用 view 工具了解文件的内容和上下文背景。
验证目录路径(仅适用于创建新文件):
使用 view 工具确认父目录确实存在,且位置正确。
编辑时的准则
代码质量:确保修改后的代码符合惯例且逻辑正确。
完整性:不要让代码处于无法运行或损坏的状态。
绝对路径:始终使用绝对文件路径(以 / 开头)。
⚠️ 至关重要的核心要求 (CRITICAL REQUIREMENTS)
精确匹配 (EXACT MATCHING): old_str 参数必须与文件中的一行或多行完全匹配,包括所有的空格、缩进和换行符。如果 old_str 匹配到多个位置,或者与文件内容有细微差别,操作将执行失败。
唯一性 (UNIQUENESS): old_str 必须能唯一锁定文件中的某个位置:
建议在修改点前后包含足够的上下文(推荐提供 3 到 5 行)。
如果匹配不唯一,系统将不会执行替换操作。
替换逻辑 (REPLACEMENT): new_str 参数应包含替换 old_str 后的编辑行。这两个字符串必须不相同。
温馨提示
当需要对同一个文件连续进行多次编辑时,应优先选择在单条消息中发送多个工具调用,而不是分多条消息、每条只调用一次。

FileEditorExecutor (执行包装,对内)
from pathlib import Path
from typing import TYPE_CHECKING
from openhands.sdk.tool import ToolExecutor
if TYPE_CHECKING:
from openhands.sdk.conversation import LocalConversation
from openhands.tools.file_editor.definition import (
CommandLiteral,
FileEditorAction,
FileEditorObservation,
)
from openhands.tools.file_editor.editor import FileEditor
from openhands.tools.file_editor.exceptions import ToolError
# Module-global editor instance (lazily initialized in file_editor)
_GLOBAL_EDITOR: FileEditor | None = None
class FileEditorExecutor(ToolExecutor):
    """File editor executor with configurable file restrictions."""

    def __init__(
        self,
        workspace_root: str | None = None,
        allowed_edits_files: list[str] | None = None,
    ):
        # The underlying editor, rooted at the workspace when one is given.
        self.editor: FileEditor = FileEditor(workspace_root=workspace_root)
        # Optional allowlist of editable files, resolved to absolute paths;
        # None (or an empty input list) means "no restriction".
        self.allowed_edits_files: set[Path] | None = (
            {Path(f).resolve() for f in allowed_edits_files}
            if allowed_edits_files
            else None
        )

    def __call__(
        self,
        action: FileEditorAction,
        conversation: "LocalConversation | None" = None,  # noqa: ARG002
    ) -> FileEditorObservation:
        """Execute one editor action, converting ToolError into an error observation."""
        # Enforce the allowlist for every mutating command (`view` is exempt).
        if self.allowed_edits_files is not None and action.command != "view":
            target = Path(action.path).resolve()
            if target not in self.allowed_edits_files:
                return FileEditorObservation.from_text(
                    text=(
                        f"Operation '{action.command}' is not allowed "
                        f"on file '{target}'. "
                        f"Only the following files can be edited: "
                        f"{sorted(str(p) for p in self.allowed_edits_files)}"
                    ),
                    command=action.command,
                    is_error=True,
                )
        try:
            return self.editor(
                command=action.command,
                path=action.path,
                file_text=action.file_text,
                view_range=action.view_range,
                old_str=action.old_str,
                new_str=action.new_str,
                insert_line=action.insert_line,
            )
        except ToolError as e:
            # Surface tool failures as error observations, not exceptions.
            return FileEditorObservation.from_text(
                text=e.message, command=action.command, is_error=True
            )
def file_editor(
    command: CommandLiteral,
    path: str,
    file_text: str | None = None,
    view_range: list[int] | None = None,
    old_str: str | None = None,
    new_str: str | None = None,
    insert_line: int | None = None,
) -> FileEditorObservation:
    """Run a command against a lazily created, module-global FileEditor."""
    global _GLOBAL_EDITOR
    if _GLOBAL_EDITOR is None:
        _GLOBAL_EDITOR = FileEditor()
    try:
        return _GLOBAL_EDITOR(
            command=command,
            path=path,
            file_text=file_text,
            view_range=view_range,
            old_str=old_str,
            new_str=new_str,
            insert_line=insert_line,
        )
    except ToolError as e:
        # Surface tool failures as error observations, not exceptions.
        return FileEditorObservation.from_text(
            text=e.message, command=command, is_error=True
        )

FileEditor (真实的底层实现)
import base64
import mimetypes
import os
import re
import shutil
import tempfile
from pathlib import Path
from typing import get_args
from binaryornot.check import is_binary
from openhands.sdk import ImageContent, TextContent
from openhands.sdk.logger import get_logger
from openhands.sdk.utils.truncate import maybe_truncate
from openhands.tools.file_editor.definition import (
CommandLiteral,
FileEditorObservation,
)
from openhands.tools.file_editor.exceptions import (
EditorToolParameterInvalidError,
EditorToolParameterMissingError,
FileValidationError,
ToolError,
)
from openhands.tools.file_editor.utils.config import SNIPPET_CONTEXT_WINDOW
from openhands.tools.file_editor.utils.constants import (
BINARY_FILE_CONTENT_TRUNCATED_NOTICE,
DIRECTORY_CONTENT_TRUNCATED_NOTICE,
MAX_RESPONSE_LEN_CHAR,
TEXT_FILE_CONTENT_TRUNCATED_NOTICE,
)
from openhands.tools.file_editor.utils.encoding import (
EncodingManager,
with_encoding,
)
from openhands.tools.file_editor.utils.history import FileHistoryManager
from openhands.tools.file_editor.utils.shell import run_shell_cmd
logger = get_logger(__name__)
# Supported image extensions for viewing as base64-encoded content
IMAGE_EXTENSIONS = {".png", ".jpg", ".jpeg", ".gif", ".webp", ".bmp"}
class FileEditor:
    """A filesystem editor tool that lets the agent view, create, navigate
    and edit files.

    The tool parameters are defined by Anthropic and are not editable.
    Original implementation: https://github.com/anthropics/anthropic-quickstarts/blob/main/computer-use-demo/computer_use_demo/tools/edit.py
    """

    # Default maximum file size in MB (overridable per instance via __init__).
    MAX_FILE_SIZE_MB: int = 10

    # Per-file undo history.
    _history_manager: FileHistoryManager
    # Size limit in bytes, derived from MAX_FILE_SIZE_MB.
    _max_file_size: int
    # Detects and remembers file encodings.
    _encoding_manager: EncodingManager
    # Absolute working directory used for relative-path suggestions.
    _cwd: str
def __init__(
    self,
    workspace_root: str | None = None,
    max_file_size_mb: int | None = None,
):
    """Initialize the editor.

    Args:
        workspace_root: Root directory that serves as the current working
            directory for relative path suggestions. Resolved to an absolute
            path if needed; defaults to the process cwd when omitted.
        max_file_size_mb: Maximum file size in MB. If None, uses the default
            MAX_FILE_SIZE_MB.
    """
    self._history_manager = FileHistoryManager(max_history_per_file=10)
    # Store the limit in bytes.
    self._max_file_size = (max_file_size_mb or self.MAX_FILE_SIZE_MB) * 1024 * 1024
    self._encoding_manager = EncodingManager()
    if workspace_root is not None:
        root = Path(workspace_root)
        # Normalize a relative workspace root to an absolute path.
        if not root.is_absolute():
            root = root.resolve()
        self._cwd = str(root)
    else:
        self._cwd = os.path.abspath(os.getcwd())
    logger.info(f"FileEditor initialized with cwd: {self._cwd}")
def __call__(
    self,
    *,
    command: CommandLiteral,
    path: str,
    file_text: str | None = None,
    view_range: list[int] | None = None,
    old_str: str | None = None,
    new_str: str | None = None,
    insert_line: int | None = None,
) -> FileEditorObservation:
    """Validate the path, then dispatch one editor command.

    Raises:
        EditorToolParameterMissingError: when a required parameter is absent.
        EditorToolParameterInvalidError: when a parameter value is invalid.
        ToolError: for unrecognized commands or execution failures.
    """
    target = Path(path)
    self.validate_path(command, target)

    if command == "view":
        return self.view(target, view_range)

    if command == "create":
        if file_text is None:
            raise EditorToolParameterMissingError(command, "file_text")
        self.write_file(target, file_text)
        # Seed the undo history with the freshly created content.
        self._history_manager.add_history(target, file_text)
        return FileEditorObservation.from_text(
            text=f"File created successfully at: {target}",
            command=command,
            path=str(target),
            new_content=file_text,
            prev_exist=False,
        )

    if command == "str_replace":
        if old_str is None:
            raise EditorToolParameterMissingError(command, "old_str")
        if new_str is None:
            raise EditorToolParameterMissingError(command, "new_str")
        if new_str == old_str:
            raise EditorToolParameterInvalidError(
                "new_str",
                new_str,
                "No replacement was performed. `new_str` and `old_str` must be "
                "different.",
            )
        return self.str_replace(target, old_str, new_str)

    if command == "insert":
        if insert_line is None:
            raise EditorToolParameterMissingError(command, "insert_line")
        if new_str is None:
            raise EditorToolParameterMissingError(command, "new_str")
        return self.insert(target, insert_line, new_str)

    if command == "undo_edit":
        return self.undo_edit(target)

    raise ToolError(
        f"Unrecognized command {command}. The allowed commands for "
        f"{self.__class__.__name__} tool are: {', '.join(get_args(CommandLiteral))}"
    )
@with_encoding
def _count_lines(self, path: Path, encoding: str = "utf-8") -> int:
    """Count the number of lines in a file safely.

    Args:
        path: Path to the file.
        encoding: Encoding used to read the file (auto-detected by the
            ``with_encoding`` decorator).

    Returns:
        The number of lines in the file.
    """
    count = 0
    with open(path, encoding=encoding) as f:
        for _ in f:
            count += 1
    return count
@with_encoding
def str_replace(
    self,
    path: Path,
    old_str: str,
    new_str: str | None,
) -> FileEditorObservation:
    """Replace a unique occurrence of ``old_str`` with ``new_str`` in ``path``.

    Args:
        path: Path to the file.
        old_str: String to replace; must occur exactly once.
        new_str: Replacement string (None is treated as empty).
    """
    self.validate_file(path)
    new_str = new_str or ""
    # Read the whole file once so both single-line and multi-line
    # replacements work against the same content snapshot.
    file_content = self.read_file(path)

    def locate(needle: str) -> list[tuple[int, str, int]]:
        # (1-based line number, matched text, start offset) per literal match.
        pat = re.escape(needle)
        return [
            (file_content.count("\n", 0, m.start()) + 1, m.group(), m.start())
            for m in re.finditer(pat, file_content)
        ]

    occurrences = locate(old_str)
    if not occurrences:
        # Possibly stray leading/trailing whitespace — strip both and retry.
        old_str = old_str.strip()
        new_str = new_str.strip()
        occurrences = locate(old_str)

    if not occurrences:
        raise ToolError(
            f"No replacement was performed, old_str `{old_str}` did not "
            f"appear verbatim in {path}."
        )
    if len(occurrences) > 1:
        line_numbers = sorted({line for line, _, _ in occurrences})
        raise ToolError(
            f"No replacement was performed. Multiple occurrences of old_str "
            f"`{old_str}` in lines {line_numbers}. Please ensure it is unique."
        )

    # Exactly one occurrence: splice in the replacement at its offset.
    replacement_line, matched_text, idx = occurrences[0]
    new_file_content = (
        file_content[:idx] + new_str + file_content[idx + len(matched_text) :]
    )
    self.write_file(path, new_file_content)
    # Record the pre-edit content so undo_edit can restore it.
    self._history_manager.add_history(path, file_content)

    # Build a context snippet around the edited region.
    start_line = max(0, replacement_line - SNIPPET_CONTEXT_WINDOW)
    end_line = replacement_line + SNIPPET_CONTEXT_WINDOW + new_str.count("\n")
    snippet = self.read_file(path, start_line=start_line + 1, end_line=end_line)

    success_message = f"The file {path} has been edited. "
    success_message += self._make_output(
        snippet, f"a snippet of {path}", start_line + 1
    )
    success_message += (
        "Review the changes and make sure they are as expected. Edit the "
        "file again if necessary."
    )
    return FileEditorObservation.from_text(
        text=success_message,
        command="str_replace",
        prev_exist=True,
        path=str(path),
        old_content=file_content,
        new_content=new_file_content,
    )
def view(
    self, path: Path, view_range: list[int] | None = None
) -> FileEditorObservation:
    """View the contents of a file or a directory."""
    if path.is_dir():
        if view_range:
            raise EditorToolParameterInvalidError(
                "view_range",
                str(view_range),
                "The `view_range` parameter is not allowed when `path` points to "
                "a directory.",
            )
        # Count hidden entries directly under `path`; -mindepth 1 excludes
        # `.` and `..` automatically.
        # NOTE(review): `path` is interpolated unquoted into the shell command,
        # so paths containing spaces/metacharacters would break here — confirm
        # upstream guarantees on workspace paths.
        _, hidden_stdout, _ = run_shell_cmd(
            rf"find -L {path} -mindepth 1 -maxdepth 1 -name '.*'"
        )
        hidden_count = (
            len(hidden_stdout.strip().split("\n")) if hidden_stdout.strip() else 0
        )
        # List visible entries up to 2 levels deep, excluding hidden entries
        # at both depth 1 and depth 2.
        _, stdout, stderr = run_shell_cmd(
            rf"find -L {path} -maxdepth 2 -not \( -path '{path}/\.*' -o "
            rf"-path '{path}/*/\.*' \) | sort",
            truncate_notice=DIRECTORY_CONTENT_TRUNCATED_NOTICE,
        )
        if stderr:
            return FileEditorObservation.from_text(
                text=stderr,
                command="view",
                is_error=True,
                path=str(path),
                prev_exist=True,
            )
        # Mark directories with a trailing slash for readability.
        entries = stdout.strip().split("\n") if stdout.strip() else []
        formatted_paths = [f"{p}/" if Path(p).is_dir() else p for p in entries]
        msg = [
            f"Here's the files and directories up to 2 levels deep in {path}, "
            "excluding hidden items:\n" + "\n".join(formatted_paths)
        ]
        if hidden_count > 0:
            msg.append(
                f"\n{hidden_count} hidden files/directories in this directory "
                f"are excluded. You can use 'ls -la {path}' to see them."
            )
        return FileEditorObservation.from_text(
            text="\n".join(msg),
            command="view",
            path=str(path),
            prev_exist=True,
        )

    # Image files are returned as base64 data URLs so the LLM can view them.
    if path.suffix.lower() in IMAGE_EXTENSIONS:
        try:
            with open(path, "rb") as f:
                image_bytes = f.read()
            image_base64 = base64.b64encode(image_bytes).decode("utf-8")
            mime_type, _ = mimetypes.guess_type(str(path))
            # Fall back to PNG when the MIME type can't be determined.
            if not mime_type or not mime_type.startswith("image/"):
                mime_type = "image/png"
            output_msg = (
                f"Image file {path} read successfully. Displaying image content."
            )
            image_url = f"data:{mime_type};base64,{image_base64}"
            return FileEditorObservation(
                command="view",
                content=[
                    TextContent(text=output_msg),
                    ImageContent(image_urls=[image_url]),
                ],
                path=str(path),
                prev_exist=True,
            )
        except Exception as e:
            raise ToolError(f"Failed to read image file {path}: {e}") from None

    # Text file: validate and count lines for range checks.
    self.validate_file(path)
    try:
        num_lines = self._count_lines(path)
    except UnicodeDecodeError as e:
        raise ToolError(
            f"Cannot view {path}: file contains binary content that cannot be "
            f"decoded as text. Error: {e}"
        ) from None

    start_line = 1
    if not view_range:
        # No range requested: show the whole file.
        file_content = self.read_file(path)
        output = self._make_output(file_content, str(path), start_line)
        return FileEditorObservation.from_text(
            text=output,
            command="view",
            path=str(path),
            prev_exist=True,
        )

    if len(view_range) != 2 or not all(isinstance(i, int) for i in view_range):
        raise EditorToolParameterInvalidError(
            "view_range",
            str(view_range),
            "It should be a list of two integers.",
        )
    start_line, end_line = view_range
    if start_line < 1 or start_line > num_lines:
        raise EditorToolParameterInvalidError(
            "view_range",
            str(view_range),
            f"Its first element `{start_line}` should be within the range of "
            f"lines of the file: {[1, num_lines]}.",
        )
    # Normalize end_line: -1 means "to end of file"; clamp overshoot and warn.
    warning_message: str | None = None
    if end_line == -1:
        end_line = num_lines
    elif end_line > num_lines:
        warning_message = (
            f"We only show up to {num_lines} since there're only {num_lines} "
            "lines in this file."
        )
        end_line = num_lines
    if end_line < start_line:
        raise EditorToolParameterInvalidError(
            "view_range",
            str(view_range),
            f"Its second element `{end_line}` should be greater than or equal "
            f"to the first element `{start_line}`.",
        )

    file_content = self.read_file(path, start_line=start_line, end_line=end_line)
    # splitlines/join strips extra trailing newlines before numbering.
    output = self._make_output(
        "\n".join(file_content.splitlines()), str(path), start_line
    )
    if warning_message:
        output = f"NOTE: {warning_message}\n{output}"
    return FileEditorObservation.from_text(
        text=output,
        command="view",
        path=str(path),
        prev_exist=True,
    )
@with_encoding
def write_file(self, path: Path, file_text: str, encoding: str = "utf-8") -> None:
    """Write ``file_text`` to ``path``; wrap any failure in a ToolError.

    Args:
        path: Path to the file to write.
        file_text: Content to write.
        encoding: Encoding used to write the file (auto-detected by the
            ``with_encoding`` decorator).
    """
    self.validate_file(path)
    try:
        # open() with an explicit encoding rather than path.write_text.
        with open(path, "w", encoding=encoding) as f:
            f.write(file_text)
    except Exception as e:
        raise ToolError(f"Ran into {e} while trying to write to {path}") from None
@with_encoding
def insert(
    self,
    path: Path,
    insert_line: int,
    new_str: str,
    encoding: str = "utf-8",
) -> FileEditorObservation:
    """Insert ``new_str`` AFTER line ``insert_line`` of ``path``.

    Args:
        path: Path to the file.
        insert_line: 0-based-inclusive anchor line; 0 inserts at the top.
        new_str: Content to insert.
        encoding: Encoding used for reads/writes (auto-detected by decorator).
    """
    self.validate_file(path)
    num_lines = self._count_lines(path)
    if insert_line < 0 or insert_line > num_lines:
        raise EditorToolParameterInvalidError(
            "insert_line",
            str(insert_line),
            f"It should be within the range of allowed values: {[0, num_lines]}",
        )

    new_str_lines = new_str.split("\n")
    # Stream the edit through a temp file so the whole file is never held in
    # memory; collect the original lines along the way for the undo history.
    with tempfile.NamedTemporaryFile(
        mode="w", encoding=encoding, delete=False
    ) as temp_file:
        history_lines = []
        # Lines up to and including the insert point.
        with open(path, encoding=encoding) as f:
            for i, line in enumerate(f, 1):
                if i > insert_line:
                    break
                temp_file.write(line)
                history_lines.append(line)
        # The inserted content (each piece newline-terminated).
        for line in new_str_lines:
            temp_file.write(line + "\n")
        # The remaining original lines.
        with open(path, encoding=encoding) as f:
            for i, line in enumerate(f, 1):
                if i <= insert_line:
                    continue
                temp_file.write(line)
                history_lines.append(line)
    # Replace the original file with the assembled result.
    shutil.move(temp_file.name, path)

    # Context snippet around the insertion point.
    start_line = max(0, insert_line - SNIPPET_CONTEXT_WINDOW)
    end_line = min(
        num_lines + len(new_str_lines),
        insert_line + SNIPPET_CONTEXT_WINDOW + len(new_str_lines),
    )
    snippet = self.read_file(path, start_line=start_line + 1, end_line=end_line)

    # Save history — the pre-edit lines are already in memory.
    file_text = "".join(history_lines)
    self._history_manager.add_history(path, file_text)
    new_file_text = self.read_file(path)

    success_message = f"The file {path} has been edited. "
    success_message += self._make_output(
        snippet,
        "a snippet of the edited file",
        max(1, insert_line - SNIPPET_CONTEXT_WINDOW + 1),
    )
    success_message += (
        "Review the changes and make sure they are as expected (correct "
        "indentation, no duplicate lines, etc). Edit the file again if necessary."
    )
    return FileEditorObservation.from_text(
        text=success_message,
        command="insert",
        prev_exist=True,
        path=str(path),
        old_content=file_text,
        new_content=new_file_text,
    )
def validate_path(self, command: CommandLiteral, path: Path) -> None:
    """Check that the path/command combination is valid.

    Validates:
    1. The path is absolute.
    2. The path and command are compatible (create vs. existing path,
       directory vs. non-view commands).
    """
    if not path.is_absolute():
        suggestion_message = (
            "The path should be an absolute path, starting with `/`."
        )
        # Suggest the workspace-relative resolution when it actually exists.
        if self._cwd is not None:
            suggested_path = self._cwd / path
            if suggested_path.exists():
                suggestion_message += f" Maybe you meant {suggested_path}?"
        raise EditorToolParameterInvalidError(
            "path",
            str(path),
            suggestion_message,
        )
    if command == "create" and path.exists():
        raise EditorToolParameterInvalidError(
            "path",
            str(path),
            f"File already exists at: {path}. Cannot overwrite files using "
            "command `create`.",
        )
    if command != "create" and not path.exists():
        raise EditorToolParameterInvalidError(
            "path",
            str(path),
            f"The path {path} does not exist. Please provide a valid path.",
        )
    # Directories only support `view`.
    if command != "view" and path.is_dir():
        raise EditorToolParameterInvalidError(
            "path",
            str(path),
            f"The path {path} is a directory and only the `view` command can "
            "be used on directories.",
        )
def undo_edit(self, path: Path) -> FileEditorObservation:
    """Revert the most recent edit recorded for ``path``.

    Raises:
        ToolError: when no history exists for the file.
    """
    current_text = self.read_file(path)
    previous_text = self._history_manager.pop_last_history(path)
    if previous_text is None:
        raise ToolError(f"No edit history found for {path}.")
    self.write_file(path, previous_text)
    message = (
        f"Last edit to {path} undone successfully. "
        f"{self._make_output(previous_text, str(path))}"
    )
    return FileEditorObservation.from_text(
        text=message,
        command="undo_edit",
        path=str(path),
        prev_exist=True,
        old_content=current_text,
        new_content=previous_text,
    )
def validate_file(self, path: Path) -> None:
    """Validate a file for reading or editing operations.

    Args:
        path: Path to the file to validate.

    Raises:
        FileValidationError: if the file is too large or is a non-image
            binary file.
    """
    # Directories and not-yet-existing files (create command) are exempt.
    if not path.exists() or not path.is_file():
        return
    # Enforce the size limit.
    size_bytes = os.path.getsize(path)
    limit = self._max_file_size
    if size_bytes > limit:
        raise FileValidationError(
            path=str(path),
            reason=(
                f"File is too large ({size_bytes / 1024 / 1024:.1f}MB). "
                f"Maximum allowed size is {int(limit / 1024 / 1024)}MB."
            ),
        )
    # Reject binary files unless they are viewable images.
    if is_binary(str(path)) and path.suffix.lower() not in IMAGE_EXTENSIONS:
        raise FileValidationError(
            path=str(path),
            reason=(
                "File appears to be binary and this file type cannot be read "
                "or edited by this tool."
            ),
        )
@with_encoding
def read_file(
    self,
    path: Path,
    start_line: int | None = None,
    end_line: int | None = None,
    encoding: str = "utf-8",  # Default overridden by the decorator
) -> str:
    """Read the content of ``path``; wrap any failure in a ToolError.

    Args:
        path: Path to the file to read.
        start_line: Optional 1-based start line; must be paired with end_line.
        end_line: Optional 1-based end line (inclusive).
        encoding: Encoding used to read the file (auto-detected by decorator).
    """
    self.validate_file(path)
    try:
        if start_line is not None and end_line is not None:
            # Stream only the requested range.
            selected: list[str] = []
            with open(path, encoding=encoding) as f:
                for lineno, line in enumerate(f, 1):
                    if lineno > end_line:
                        break
                    if lineno >= start_line:
                        selected.append(line)
            return "".join(selected)
        if start_line is not None or end_line is not None:
            raise ValueError(
                "Both start_line and end_line must be provided together"
            )
        # Stream the whole file instead of a single read() call.
        with open(path, encoding=encoding) as f:
            return "".join(f)
    except Exception as e:
        raise ToolError(f"Ran into {e} while trying to read {path}") from None
def _make_output(
self,
snippet_content: str,
snippet_description: str,
start_line: int = 1,
is_converted_markdown: bool = False,
) -> str:
"""
Generate output for the CLI based on the content of a code snippet.
"""
# If the content is converted from Markdown, we don't need line numbers
if is_converted_markdown:
snippet_content = maybe_truncate(
snippet_content,
truncate_after=MAX_RESPONSE_LEN_CHAR,
truncate_notice=BINARY_FILE_CONTENT_TRUNCATED_NOTICE,
)
return (
f"Here's the content of the file {snippet_description} displayed in "
"Markdown format:\n" + snippet_content + "\n"
)
snippet_content = maybe_truncate(
snippet_content,
truncate_after=MAX_RESPONSE_LEN_CHAR,
truncate_notice=TEXT_FILE_CONTENT_TRUNCATED_NOTICE,
)
snippet_content = "\n".join(
[
f"{i + start_line:6}\t{line}"
for i, line in enumerate(snippet_content.split("\n"))
]
)
return (
f"Here's the result of running `cat -n` on {snippet_description}:\n"
+ snippet_content
+ "\n"
)Terminal Tool
代码地址
两层架构
- 接口层:
TerminalTool - 逻辑层:
TerminalExecutor
核心命令
TerminalAction
class TerminalAction(Action):
    """Schema for bash command execution."""

    command: str = Field(
        description="The bash command to execute. Can be empty string to view additional logs when previous exit code is `-1`. Can be `C-c` (Ctrl+C) to interrupt the currently running process. Note: You can only execute one bash command at a time. If you need to run multiple commands sequentially, you can use `&&` or `;` to chain them together."  # noqa
    )
    is_input: bool = Field(
        default=False,
        description="If True, the command is an input to the running process. If False, the command is a bash command to be executed in the terminal. Default is False.",  # noqa
    )
    timeout: float | None = Field(
        default=None,
        ge=0,
        description=f"Optional. Sets a maximum time limit (in seconds) for running the command. If the command takes longer than this limit, you’ll be asked whether to continue or stop it. If you don’t set a value, the command will instead pause and ask for confirmation when it produces no new output for {NO_CHANGE_TIMEOUT_SECONDS} seconds. Use a higher value if the command is expected to take a long time (like installation or testing), or if it has a known fixed duration (like sleep).",  # noqa
    )
    reset: bool = Field(
        default=False,
        description="If True, reset the terminal by creating a new session. Use this only when the terminal becomes unresponsive. Note that all previously set environment variables and session state will be lost after reset. Cannot be used with is_input=True.",  # noqa
    )

    @property
    def visualize(self) -> Text:
        """Return Rich Text representation with PS1-style bash prompt."""
        prompt = Text()
        # Fake PS1 prompt marker, like an interactive shell.
        prompt.append("$ ", style="bold green")
        # The command itself, or a placeholder when it is empty.
        if self.command:
            prompt.append(self.command, style="white")
        else:
            prompt.append("[empty command]", style="italic")
        # Optional trailing annotations, each preceded by a plain space.
        annotations = [
            (self.is_input, "(input to running process)", "yellow"),
            (self.timeout is not None, f"[timeout: {self.timeout}s]", "cyan"),
            (self.reset, "[reset terminal]", "red bold"),
        ]
        for enabled, label, label_style in annotations:
            if enabled:
                prompt.append(" ", style="white")
                prompt.append(label, style=label_style)
        return prompt
TerminalObservation
class TerminalObservation(Observation):
    """A ToolResult that can be rendered as a CLI output."""

    command: str | None = Field(
        description="The bash command that was executed. Can be empty string if the observation is from a previous command that hit soft timeout and is not yet finished.",  # noqa
    )
    exit_code: int | None = Field(
        default=None,
        description="The exit code of the command. -1 indicates the process hit the soft timeout and is not yet finished.",  # noqa
    )
    timeout: bool = Field(
        default=False, description="Whether the command execution timed out."
    )
    metadata: CmdOutputMetadata = Field(
        default_factory=CmdOutputMetadata,
        description="Additional metadata captured from PS1 after command execution.",
    )
    full_output_save_dir: str | None = Field(
        default=None,
        description="Directory where full output files are saved",
    )

    @property
    def command_id(self) -> int | None:
        """Get the command ID from metadata."""
        # The PID recorded in the PS1 metadata doubles as the command id.
        return self.metadata.pid

    @property
    def to_llm_content(self) -> Sequence[TextContent | ImageContent]:
        """Build the text content sent back to the LLM for this observation."""
        llm_content: list[TextContent | ImageContent] = []
        # If is_error is true, prepend error message
        if self.is_error:
            llm_content.append(TextContent(text=self.ERROR_MESSAGE_HEADER))
        # TerminalObservation always has content as a single TextContent
        content_text = self.text
        # Wrap the raw output in the PS1-derived prefix/suffix, then append
        # contextual footers (cwd, interpreter, exit status).
        ret = f"{self.metadata.prefix}{content_text}{self.metadata.suffix}"
        if self.metadata.working_dir:
            ret += f"\n[Current working directory: {self.metadata.working_dir}]"
        if self.metadata.py_interpreter_path:
            ret += f"\n[Python interpreter: {self.metadata.py_interpreter_path}]"
        # exit_code == -1 means the command hit the soft timeout and is still
        # running, so no "finished" footer is added in that case.
        if self.metadata.exit_code != -1:
            ret += f"\n[Command finished with exit code {self.metadata.exit_code}]"
        # Use enhanced truncation with file saving if working directory is available
        truncated_text = maybe_truncate(
            content=ret,
            truncate_after=MAX_CMD_OUTPUT_SIZE,
            save_dir=self.full_output_save_dir,
            tool_prefix="bash",
        )
        llm_content.append(TextContent(text=truncated_text))
        return llm_content

    @property
    def visualize(self) -> Text:
        """Return Rich Text representation with terminal-style output formatting."""
        text = Text()
        if self.is_error:
            text.append("❌ ", style="red bold")
            text.append(self.ERROR_MESSAGE_HEADER, style="bold red")
        # TerminalObservation always has content as a single TextContent
        content_text = self.text
        if content_text:
            # Style the output based on content
            output_lines = content_text.split("\n")
            for line in output_lines:
                if line.strip():
                    # Color error-like lines differently
                    if any(
                        keyword in line.lower()
                        for keyword in ["error", "failed", "exception", "traceback"]
                    ):
                        text.append(line, style="red")
                    elif any(
                        keyword in line.lower() for keyword in ["warning", "warn"]
                    ):
                        text.append(line, style="yellow")
                    elif line.startswith("+ "):  # bash -x output
                        text.append(line, style="cyan")
                    else:
                        text.append(line, style="white")
                text.append("\n")
        # Add metadata with styling
        if hasattr(self, "metadata") and self.metadata:
            if self.metadata.working_dir:
                text.append("\n📁 ", style="blue")
                text.append(
                    f"Working directory: {self.metadata.working_dir}", style="blue"
                )
            if self.metadata.py_interpreter_path:
                text.append("\n🐍 ", style="green")
                text.append(
                    f"Python interpreter: {self.metadata.py_interpreter_path}",
                    style="green",
                )
            if (
                hasattr(self.metadata, "exit_code")
                and self.metadata.exit_code is not None
            ):
                # Three-way exit-code rendering: success, still-running, failure.
                if self.metadata.exit_code == 0:
                    text.append("\n✅ ", style="green")
                    text.append(f"Exit code: {self.metadata.exit_code}", style="green")
                elif self.metadata.exit_code == -1:
                    text.append("\n⏳ ", style="yellow")
                    text.append("Process still running (soft timeout)", style="yellow")
                else:
                    text.append("\n❌ ", style="red")
                    text.append(f"Exit code: {self.metadata.exit_code}", style="red")
        return text
TerminalTool (定义和描述)
TOOL_DESCRIPTION = """Execute a bash command in the terminal within a persistent shell session.
### Command Execution
* One command at a time: You can only execute one bash command at a time. If you need to run multiple commands sequentially, use `&&` or `;` to chain them together.
* Persistent session: Commands execute in a persistent shell session where environment variables, virtual environments, and working directory persist between commands.
* Soft timeout: Commands have a soft timeout of 10 seconds, once that's reached, you have the option to continue or interrupt the command (see section below for details)
* Shell options: Do NOT use `set -e`, `set -eu`, or `set -euo pipefail` in shell scripts or commands in this environment. The runtime may not support them and can cause unusable shell sessions. If you want to run multi-line bash commands, write the commands to a file and then run it, instead.
### Long-running Commands
* For commands that may run indefinitely, run them in the background and redirect output to a file, e.g. `python3 app.py > server.log 2>&1 &`.
* For commands that may run for a long time (e.g. installation or testing commands), or commands that run for a fixed amount of time (e.g. sleep), you should set the "timeout" parameter of your function call to an appropriate value.
* If a bash command returns exit code `-1`, this means the process hit the soft timeout and is not yet finished. By setting `is_input` to `true`, you can:
- Send empty `command` to retrieve additional logs
- Send text (set `command` to the text) to STDIN of the running process
- Send control commands like `C-c` (Ctrl+C), `C-d` (Ctrl+D), or `C-z` (Ctrl+Z) to interrupt the process
- If you do C-c, you can re-start the process with a longer "timeout" parameter to let it run to completion
### Best Practices
* Directory verification: Before creating new directories or files, first verify the parent directory exists and is the correct location.
* Directory management: Try to maintain working directory by using absolute paths and avoiding excessive use of `cd`.
### Output Handling
* Output truncation: If the output exceeds a maximum length, it will be truncated before being returned.
### Terminal Reset
* Terminal reset: If the terminal becomes unresponsive, you can set the "reset" parameter to `true` to create a new terminal session. This will terminate the current session and start fresh.
* Warning: Resetting the terminal will lose all previously set environment variables, working directory changes, and any running processes. Use this only when the terminal stops responding to commands.
""" # noqa
class TerminalTool(ToolDefinition[TerminalAction, TerminalObservation]):
    """A ToolDefinition subclass that automatically initializes a TerminalExecutor with auto-detection."""  # noqa: E501

    @classmethod
    def create(
        cls,
        conv_state: "ConversationState",
        username: str | None = None,
        no_change_timeout_seconds: int | None = None,
        terminal_type: Literal["tmux", "subprocess"] | None = None,
        shell_path: str | None = None,
        executor: ToolExecutor | None = None,
    ) -> Sequence["TerminalTool"]:
        """Initialize TerminalTool with executor parameters.

        Args:
            conv_state: Conversation state to get working directory from.
                If provided, working_dir will be taken from
                conv_state.workspace
            username: Optional username for the bash session
            no_change_timeout_seconds: Timeout for no output change
            terminal_type: Force a specific session type:
                ('tmux', 'subprocess').
                If None, auto-detect based on system capabilities:
                - On Windows: PowerShell if available, otherwise subprocess
                - On Unix-like: tmux if available, otherwise subprocess
            shell_path: Path to the shell binary (for subprocess terminal type only).
                If None, will auto-detect bash from PATH.
            executor: Optional pre-built executor; when None, a
                TerminalExecutor is constructed from the other arguments.

        Returns:
            A single-element sequence containing the configured tool.

        Raises:
            ValueError: If the conversation's working directory does not exist.
        """
        # Import here to avoid circular imports
        from openhands.tools.terminal.impl import TerminalExecutor

        working_dir = conv_state.workspace.working_dir
        if not os.path.isdir(working_dir):
            raise ValueError(f"working_dir '{working_dir}' is not a valid directory")
        # Initialize the executor
        if executor is None:
            executor = TerminalExecutor(
                working_dir=working_dir,
                username=username,
                no_change_timeout_seconds=no_change_timeout_seconds,
                terminal_type=terminal_type,
                shell_path=shell_path,
                full_output_save_dir=conv_state.env_observation_persistence_dir,
            )
        # Initialize the parent ToolDefinition with the executor
        return [
            cls(
                action_type=TerminalAction,
                observation_type=TerminalObservation,
                description=TOOL_DESCRIPTION,
                annotations=ToolAnnotations(
                    title="terminal",
                    readOnlyHint=False,
                    destructiveHint=True,
                    idempotentHint=False,
                    openWorldHint=True,
                ),
                executor=executor,
            )
        ]


# Automatically register the tool when this module is imported
register_tool(TerminalTool.name, TerminalTool)
Bash 终端工具使用指南
在持久化的终端会话中执行 Bash 命令。
### 命令执行
单条执行:一次只能执行一条 Bash 命令。如果需要按顺序运行多条命令,请使用 && 或 ; 进行链式连接。
持久化会话:命令在持久化的 Shell 会话中运行。环境变量、虚拟环境和工作目录在不同命令之间会保持生效。
软超时:命令设有 10 秒的软超时。一旦达到该时间,你可以选择继续运行或中断该命令(详见下文)。
Shell 选项限制:切勿在 Shell 脚本或命令中使用 set -e、set -eu 或 set -euo pipefail。当前的运行时环境可能不支持这些选项,并可能导致 Shell 会话无法使用。如果需要运行多行 Bash 命令,请先将命令写入文件,然后再运行该文件。
### 长期运行的命令
无限运行的命令:对于可能无限期运行的命令,请将其放入后台运行并重定向输出到文件。例如:python3 app.py > server.log 2>&1 &。
耗时较长的命令:对于安装、测试或固定时长的命令(如 sleep),你应该在函数调用中设置合适的 timeout(超时)参数。
超时处理(状态码 -1):如果 Bash 命令返回退出代码 -1,意味着进程触发了软超时但尚未结束。通过将 is_input 设置为 true,你可以:
发送空的 command(命令)来检索更多日志。
发送文本到正在运行的进程的 STDIN(标准输入)。
发送控制命令,如 C-c (Ctrl+C)、C-d (Ctrl+D) 或 C-z (Ctrl+Z) 来中断进程。
如果执行了 C-c,你可以调大 timeout 参数重新启动进程,以便让其运行完成。
### 最佳实践
目录验证:在创建新目录或文件之前,先验证父目录是否存在且位置正确。
目录管理:尽量通过使用绝对路径来维持工作目录,避免过度使用 cd 命令。
### 输出处理
输出截断:如果输出超过最大长度限制,在返回结果前内容会被截断。
### 终端重置
终端重置:如果终端变得无响应,你可以将 reset 参数设置为 true 来创建一个新的终端会话。这会终止当前会话并重新开始。
警告:重置终端会导致所有之前设置的环境变量、工作目录更改以及正在运行的进程丢失。仅在终端完全停止响应命令时才使用此功能。
关键点解析(供开发者参考):
持久化(Persistence):这是 OpenHands 的一个核心特点。你在第一个步骤里 export API_KEY=xxx,在后面的步骤里这个变量依然存在,不需要重复设置。
软超时(Soft Timeout):为了防止模型因为一个卡死的命令而永久等待,系统设定了 10 秒。如果返回 -1,模型可以决定是继续等待日志(发送空指令)还是杀掉进程。
禁止 set -e:这是为了防止 Shell 会话因为某个非致命错误而意外退出。
交互式输入:通过 is_input 字段,AI 可以像人类一样与需要输入 y/n 或密码的终端程序进行简单的交互。
TerminalExecutor(执行包装,对内)
持久化 Session 管理
- create_terminal_session:自动选择最合适的后端(tmux 或 subprocess)。
- 状态保持:运行命令并维护 Shell 状态。例如在上一个命令中用 cd 切换了目录,下一个命令依然会在那个目录下执行。
- 重置机制(reset):终端卡死或不可用时,它可以彻底杀掉旧进程并创建一个干净的新会话。
环境变量与隐私安全(Secrets & Envs)
- 自动注入(_export_envs):在执行命令前,把命令引用到的密钥以环境变量形式注入会话。
- 敏感信息脱敏(Masking):防止终端把敏感信息泄露给 AI,调用 mask_secrets_in_output 对输出进行掩码。
import json
from typing import TYPE_CHECKING, Literal
from openhands.sdk.llm import TextContent
from openhands.sdk.logger import get_logger
from openhands.sdk.tool import ToolExecutor
if TYPE_CHECKING:
from openhands.sdk.conversation import LocalConversation
from openhands.tools.terminal.definition import (
TerminalAction,
TerminalObservation,
)
from openhands.tools.terminal.terminal.factory import create_terminal_session
from openhands.tools.terminal.terminal.terminal_session import TerminalSession
logger = get_logger(__name__)
class TerminalExecutor(ToolExecutor[TerminalAction, TerminalObservation]):
    """Executes TerminalActions against a persistent terminal session.

    Owns the session lifecycle: creation (with backend auto-detection),
    secret/env injection before commands, reset when the terminal becomes
    unresponsive, and masking of secrets in command output.
    """

    # The live terminal session; recreated on reset().
    session: TerminalSession
    # Remembered so reset() can recreate the session with the same shell.
    shell_path: str | None

    def __init__(
        self,
        working_dir: str,
        username: str | None = None,
        no_change_timeout_seconds: int | None = None,
        terminal_type: Literal["tmux", "subprocess"] | None = None,
        shell_path: str | None = None,
        full_output_save_dir: str | None = None,
    ):
        """Initialize TerminalExecutor with auto-detected or specified session type.

        Args:
            working_dir: Working directory for bash commands
            username: Optional username for the bash session
            no_change_timeout_seconds: Timeout for no output change
            terminal_type: Force a specific session type:
                ('tmux', 'subprocess').
                If None, auto-detect based on system capabilities
            shell_path: Path to the shell binary (for subprocess terminal type only).
                If None, will auto-detect bash from PATH.
            full_output_save_dir: Path to directory to save full output
                logs and files, used when truncation is needed.
        """
        self.shell_path = shell_path
        # Create the session (backend auto-detected unless terminal_type forces one).
        self.session = create_terminal_session(
            work_dir=working_dir,
            username=username,
            no_change_timeout_seconds=no_change_timeout_seconds,
            terminal_type=terminal_type,
            shell_path=shell_path,
        )
        self.session.initialize()
        self.full_output_save_dir: str | None = full_output_save_dir
        logger.info(
            f"TerminalExecutor initialized with working_dir: {working_dir}, "
            f"username: {username}, "
            f"terminal_type: {terminal_type or self.session.__class__.__name__}"
        )

    def _export_envs(
        self, action: TerminalAction, conversation: "LocalConversation | None" = None
    ) -> None:
        """Export secrets referenced by the command as env vars in the session."""
        # Nothing to export for empty commands or stdin input to a running process.
        if not action.command.strip():
            return
        if action.is_input:
            return
        # Get secrets from conversation
        env_vars = {}
        if conversation is not None:
            try:
                secret_registry = conversation.state.secret_registry
                env_vars = secret_registry.get_secrets_as_env_vars(action.command)
            except Exception:
                # Best effort: a failure reading secrets must not block the command.
                env_vars = {}
        if not env_vars:
            return
        export_statements = []
        for key, value in env_vars.items():
            # json.dumps quotes the value so it survives shell word-splitting.
            export_statements.append(f"export {key}={json.dumps(value)}")
        exports_cmd = " && ".join(export_statements)
        logger.debug(f"Exporting {len(env_vars)} environment variables before command")
        # Execute the export command separately to persist env in the session
        _ = self.session.execute(
            TerminalAction(
                command=exports_cmd,
                is_input=False,
                timeout=action.timeout,
            )
        )

    def reset(self) -> TerminalObservation:
        """Reset the terminal session by creating a new instance.

        Returns:
            TerminalObservation with reset confirmation message
        """
        # Carry the old session's configuration over to the replacement.
        original_work_dir = self.session.work_dir
        original_username = self.session.username
        original_no_change_timeout = self.session.no_change_timeout_seconds
        self.session.close()
        self.session = create_terminal_session(
            work_dir=original_work_dir,
            username=original_username,
            no_change_timeout_seconds=original_no_change_timeout,
            terminal_type=None,  # Let it auto-detect like before
            shell_path=self.shell_path,
        )
        self.session.initialize()
        logger.info(
            f"Terminal session reset successfully with working_dir: {original_work_dir}"
        )
        return TerminalObservation.from_text(
            text=(
                "Terminal session has been reset. All previous environment "
                "variables and session state have been cleared."
            ),
            command="[RESET]",
            exit_code=0,
        )

    def __call__(
        self,
        action: TerminalAction,
        conversation: "LocalConversation | None" = None,
    ) -> TerminalObservation:
        """Run the action: optional reset, env injection, execution, masking."""
        # Validate field combinations
        if action.reset and action.is_input:
            raise ValueError("Cannot use reset=True with is_input=True")
        # Reset when explicitly requested, or implicitly if the session died.
        if action.reset or self.session._closed:
            reset_result = self.reset()
            # Handle command execution after reset
            if action.command.strip():
                command_action = TerminalAction(
                    command=action.command,
                    timeout=action.timeout,
                    is_input=False,  # is_input validated to be False when reset=True
                )
                self._export_envs(command_action, conversation)
                # Execute the command in the fresh session.
                command_result = self.session.execute(command_action)
                # Extract text from content
                reset_text = reset_result.text
                command_text = command_result.text
                # Merge the reset notice and the command output into one
                # observation so the caller sees both.
                observation = command_result.model_copy(
                    update={
                        "content": [
                            TextContent(text=f"{reset_text}\n\n{command_text}")
                        ],
                        "command": f"[RESET] {action.command}",
                    }
                )
            else:
                # Reset only, no command to execute
                observation = reset_result
        else:
            # If env keys detected, export env values to bash as a separate action first
            self._export_envs(action, conversation)
            observation = self.session.execute(action)
        # Apply automatic secrets masking
        content_text = observation.text
        if content_text and conversation is not None:
            try:
                secret_registry = conversation.state.secret_registry
                masked_content = secret_registry.mask_secrets_in_output(content_text)
                if masked_content:
                    # Rebuild the observation with the masked text; drop fields
                    # that from_text re-populates.
                    data = observation.model_dump(
                        exclude={"content", "full_output_save_dir"}
                    )
                    return TerminalObservation.from_text(
                        text=masked_content,
                        full_output_save_dir=self.full_output_save_dir,
                        **data,
                    )
            except Exception:
                # Masking is best-effort; fall through to the raw observation.
                pass
        return observation

    def close(self) -> None:
        """Close the terminal session and clean up resources."""
        # Guard: __init__ may have raised before `session` was assigned.
        if hasattr(self, "session"):
            self.session.close()
TaskTrackerTool
代码地址
背景
- 复杂开发任务中,AI 容易“走丢”、忘记进度。
- Task Tracker:让 AI 能够结构化地规划任务、记录状态并同步给用户。
两层架构
- 接口层:TaskTrackerTool
- 逻辑层:TaskTrackerExecutor
意义
- 对 AI:强制 AI 在动手前先思考(Decomposition),并记录当前处在哪个环节,相当于外部记忆体。它把 AI 从散漫的开发者,变成了先计划、再执行、随时记录的专业工程师。
- 对用户:提高了透明度。不用读 AI 冗长的思考日志,只需看 TaskList,就能知道整体完成度。
TaskTrackerAction
TaskItem
定义了
单任务结构。title: 任务标题。notes: 详细笔记(比如记录实现细节或遇到的坑)。3种status:todo(待办)、in_progress(进行中)、done(完成)。
TaskTrackerAction 只有2个命令
view: 查看当前清单。plan: 提交一份完整的、更新后的任务列表(它是全量更新,不是增量更新)。
# Type alias for task tracker status
TaskTrackerStatusType = Literal["todo", "in_progress", "done"]


class TaskItem(BaseModel):
    """A single tracked task: a short title, free-form notes, and a status."""

    title: str = Field(..., description="A brief title for the task.")
    notes: str = Field("", description="Additional details or notes about the task.")
    status: TaskTrackerStatusType = Field(
        "todo",
        description="The current status of the task. "
        "One of 'todo', 'in_progress', or 'done'.",
    )
class TaskTrackerAction(Action):
    """An action where the agent writes or updates a task list for task management."""

    command: Literal["view", "plan"] = Field(
        default="view",
        description="The command to execute. `view` shows the current task list. `plan` creates or updates the task list based on provided requirements and progress. Always `view` the current list before making changes.",  # noqa: E501
    )
    task_list: list[TaskItem] = Field(
        default_factory=list,
        description="The full task list. Required parameter of `plan` command.",
    )

    @property
    def visualize(self) -> Text:
        """Return Rich Text representation with task management styling."""
        rendered = Text()
        # Select icon, label, and colour by command kind.
        if self.command == "view":
            icon, label, colour = "👀 ", "View Task List", "blue"
        else:  # plan
            icon, label, colour = "📋 ", "Update Task List", "green"
        rendered.append(icon, style=colour)
        rendered.append(label, style=colour)
        # For plan commands, show how many tasks are being submitted.
        if self.command == "plan" and self.task_list:
            rendered.append(f" ({len(self.task_list)} tasks)")
        return rendered
TaskTrackerObservation
class TaskTrackerObservation(Observation):
    """This data class represents the result of a task tracking operation."""

    command: Literal["view", "plan"] = Field(
        description='The command that was executed: "view" or "plan".'
    )
    task_list: list[TaskItem] = Field(
        default_factory=list, description="The current task list"
    )

    @property
    def visualize(self) -> Text:
        """Return Rich Text representation with task list formatting."""
        text = Text()
        if self.is_error:
            text.append("❌ ", style="red bold")
            text.append(self.ERROR_MESSAGE_HEADER, style="bold red")
        if self.task_list:
            # Count tasks by status
            todo_count = sum(1 for task in self.task_list if task.status == "todo")
            in_progress_count = sum(
                1 for task in self.task_list if task.status == "in_progress"
            )
            done_count = sum(1 for task in self.task_list if task.status == "done")
            # Show status summary
            if self.command == "plan":
                text.append("✅ ", style="green")
                text.append("Task list updated: ", style="green")
            else:  # view command
                text.append("📋 ", style="blue")
                text.append("Current task list: ", style="blue")
            # Status counts — only non-zero buckets are shown.
            status_parts = []
            if todo_count:
                status_parts.append(f"{todo_count} todo")
            if in_progress_count:
                status_parts.append(f"{in_progress_count} in progress")
            if done_count:
                status_parts.append(f"{done_count} done")
            if status_parts:
                text.append(", ".join(status_parts), style="white")
            text.append("\n\n")
            # Show the actual task list
            for i, task in enumerate(self.task_list, 1):
                # Status icon
                if task.status == "done":
                    text.append("✅ ", style="green")
                elif task.status == "in_progress":
                    text.append("🔄 ", style="yellow")
                else:  # todo
                    text.append("⏳ ", style="blue")
                # Task title
                text.append(f"{i}. {task.title}", style="white")
                # NEW: show notes under the title if present
                if task.notes:
                    text.append("\n Notes: " + task.notes, style="italic")
                # No trailing newline after the final task.
                if i < len(self.task_list):
                    text.append("\n")
        else:
            text.append("📝 ", style="blue")
            text.append("Task list is empty")
        return text
TaskTrackerExecutor
持久化(TASKS.json)
- 不同于简单的内存变量,它会将任务列表保存到磁盘。即使 AI 重启或对话中断,进度也不会丢失。
格式化输出
- 它重写了 visualize 属性。
- 在 OpenHands 界面,会看到带图标(⏳、🔄、✅)的精美进度条,而非枯燥的 JSON。
class TaskTrackerExecutor(ToolExecutor[TaskTrackerAction, TaskTrackerObservation]):
    """Executor for the task tracker tool.

    Keeps the task list in memory and, when a save directory is configured,
    persists it to ``TASKS.json`` so progress survives restarts.
    """

    save_dir: Path | None

    def __init__(self, save_dir: str | None = None):
        """Initialize TaskTrackerExecutor.

        Args:
            save_dir: Optional directory to save tasks to. If provided, tasks will be
                persisted to save_dir/TASKS.json
        """
        # NOTE: docstring previously claimed TASKS.md; the code below reads and
        # writes TASKS.json.
        self.save_dir = Path(save_dir) if save_dir else None
        logger.info(f"TaskTrackerExecutor initialized with save_dir: {self.save_dir}")
        self._task_list: list[TaskItem] = []
        # Load existing tasks if save_dir is provided and file exists
        if self.save_dir:
            self._load_tasks()

    def __call__(
        self,
        action: TaskTrackerAction,
        conversation: "LocalConversation | None" = None,  # noqa: ARG002
    ) -> TaskTrackerObservation:
        """Execute the task tracker action (``plan`` or ``view``)."""
        if action.command == "plan":
            # `plan` is a full replacement of the task list, not a merge.
            self._task_list = action.task_list
            # Save to file if save_dir is provided
            if self.save_dir:
                self._save_tasks()
            return TaskTrackerObservation.from_text(
                text=(
                    f"Task list has been updated with {len(self._task_list)} item(s)."
                ),
                command=action.command,
                task_list=self._task_list,
            )
        elif action.command == "view":
            # Return the current task list
            if not self._task_list:
                return TaskTrackerObservation.from_text(
                    text=('No task list found. Use the "plan" command to create one.'),
                    command=action.command,
                    task_list=[],
                )
            content = self._format_task_list(self._task_list)
            return TaskTrackerObservation.from_text(
                text=content,
                command=action.command,
                task_list=self._task_list,
            )
        else:
            # Defensive: the schema restricts command to "view"/"plan", but an
            # unexpected value still yields an explicit error observation.
            return TaskTrackerObservation.from_text(
                text=(
                    f"Unknown command: {action.command}. "
                    'Supported commands are "view" and "plan".'
                ),
                is_error=True,
                command=action.command,
                task_list=[],
            )

    def _format_task_list(self, task_list: list[TaskItem]) -> str:
        """Format the task list as a numbered Markdown list with status icons."""
        if not task_list:
            return "No tasks in the list."
        content = "# Task List\n\n"
        for i, task in enumerate(task_list, 1):
            # Unknown statuses fall back to the "todo" icon.
            status_icon = {"todo": "⏳", "in_progress": "🔄", "done": "✅"}.get(
                task.status, "⏳"
            )
            title = task.title
            notes = task.notes
            content += f"{i}. {status_icon} {title}\n"
            if notes:
                content += f" {notes}\n"
            content += "\n"
        return content.strip()

    def _load_tasks(self) -> None:
        """Load tasks from the TASKS.json file if it exists."""
        if not self.save_dir:
            return
        tasks_file = self.save_dir / "TASKS.json"
        if not tasks_file.exists():
            return
        try:
            with open(tasks_file, encoding="utf-8") as f:
                self._task_list = [TaskItem.model_validate(d) for d in json.load(f)]
        except (OSError, json.JSONDecodeError, TypeError, ValidationError) as e:
            # A corrupt or unreadable file must not break the tool.
            logger.warning(
                f"Failed to load tasks from {tasks_file}: {e}. Starting with "
                "an empty task list."
            )
            self._task_list = []

    def _save_tasks(self) -> None:
        """Save tasks to the TASKS.json file."""
        if not self.save_dir:
            return
        tasks_file = self.save_dir / "TASKS.json"
        try:
            # Create the directory if it doesn't exist
            self.save_dir.mkdir(parents=True, exist_ok=True)
            with open(tasks_file, "w", encoding="utf-8") as f:
                json.dump([task.model_dump() for task in self._task_list], f, indent=2)
        except OSError as e:
            # Persistence is best-effort; log and continue with in-memory state.
            logger.warning(f"Failed to save tasks to {tasks_file}: {e}")
TaskTrackerTool
# Tool definition with detailed description
TASK_TRACKER_DESCRIPTION = """This tool provides structured task management capabilities for development workflows.
It enables systematic tracking of work items, progress monitoring, and efficient
organization of complex development activities.
The tool maintains visibility into project status and helps communicate
progress effectively to users.
## Application Guidelines
Utilize this tool in the following situations:
1. Multi-phase development work - When projects involve multiple sequential or
parallel activities
2. Complex implementation tasks - Work requiring systematic planning and
coordination across multiple components
3. Explicit user request for task organization - When users specifically ask
for structured task management
4. Multiple concurrent requirements - When users present several work items
that need coordination
5. Project initiation - Capture and organize user requirements at project start
6. Work commencement - Update task status to in_progress before beginning
implementation. Maintain focus by limiting active work to one task
7. Task completion - Update status to done and identify any additional work
that emerged during implementation
## Situations Where Tool Usage Is Unnecessary
Avoid using this tool when:
1. Single atomic tasks that require no decomposition
2. Trivial operations where tracking adds no organizational value
3. Simple activities completable in minimal steps
4. Pure information exchange or discussion
Note: For single straightforward tasks, proceed with direct implementation
rather than creating tracking overhead.
## Usage Scenarios
**Scenario A: Feature Development with Validation**
User request: "Build a user authentication system with login/logout functionality.
Don't forget to include input validation and error handling!"
Response approach: I'll implement a user authentication system with comprehensive
validation. Let me organize this work systematically.
*Task breakdown includes:*
1. Design authentication API endpoints and data models
2. Implement user login/logout controllers with session management
3. Add client-side form validation and error display
4. Create backend input sanitization and security checks
5. Implement comprehensive error handling for edge cases
6. Perform integration testing and fix any issues
**Scenario B: Codebase Refactoring**
User request: "I need to update all instances of the method 'fetchData' to
'retrieveInformation' throughout the entire project"
Response approach: Let me first analyze your codebase to identify all locations
where 'fetchData' appears.
*After code analysis*
I've located 12 occurrences of 'fetchData' across 6 files in your project.
I'll create a systematic plan to update these references.
*Organized task list includes specific file-by-file updates*
**Scenario C: Multi-feature Development**
User request: "Set up a blog platform with these components: admin dashboard,
article management, comment system, and user profiles."
Response approach: I'll help build your blog platform. Let me break down these
components into manageable implementation phases.
*Creates structured plan with each feature decomposed into specific development tasks*
## Counter-examples - Direct Implementation Preferred
**Counter-example A: Simple inquiry**
User request: "What's the syntax for a for loop in JavaScript?"
Response approach: Here's how to write a for loop in JavaScript:
```javascript
for (let i = 0; i < array.length; i++) {
console.log(array[i]);
}
```
*No task tracking needed - single informational response*
**Counter-example B: Minor modification**
User request: "Please add a docstring to the processPayment function explaining
its parameters."
Response approach: I'll add a docstring to explain the processPayment function.
*Direct edit without task tracking for simple modification*
## Status Management and Workflow
1. **Status Values**: Track work using these states:
- todo: Not yet initiated
- in_progress: Currently active (maintain single focus)
- done: Successfully completed
2. **Workflow Practices**:
- Update status dynamically as work progresses
- Mark completion immediately upon task finish
- Limit active work to ONE task at any given time
- Complete current activities before initiating new ones
- Remove obsolete tasks from tracking entirely
3. **Completion Criteria**:
- Mark tasks as done only when fully achieved
- Keep status as in_progress if errors, blocks, or partial completion exist
- Create new tasks for discovered issues or dependencies
- Never mark done when:
- Test suites are failing
- Implementation remains incomplete
- Unresolved errors persist
- Required resources are unavailable
4. **Task Organization**:
- Write precise, actionable descriptions
- Decompose complex work into manageable units
- Use descriptive, clear naming conventions
When uncertain, favor using this tool. Proactive task management demonstrates
systematic approach and ensures comprehensive requirement fulfillment.""" # noqa: E501
class TaskTrackerTool(ToolDefinition[TaskTrackerAction, TaskTrackerObservation]):
    """A ToolDefinition subclass that automatically initializes a TaskTrackerExecutor."""  # noqa: E501

    @classmethod
    def create(cls, conv_state: "ConversationState") -> Sequence["TaskTrackerTool"]:
        """Initialize TaskTrackerTool with a TaskTrackerExecutor.

        Args:
            conv_state: Conversation state to get persistence directory from.
                If provided, save_dir will be taken from
                conv_state.persistence_dir

        Returns:
            A single-element sequence containing the configured tool.
        """
        # Persist tasks alongside the conversation so progress survives restarts.
        executor = TaskTrackerExecutor(save_dir=conv_state.persistence_dir)
        # Initialize the parent Tool with the executor
        return [
            cls(
                description=TASK_TRACKER_DESCRIPTION,
                action_type=TaskTrackerAction,
                observation_type=TaskTrackerObservation,
                annotations=ToolAnnotations(
                    readOnlyHint=False,
                    destructiveHint=False,
                    idempotentHint=True,
                    openWorldHint=False,
                ),
                executor=executor,
            )
        ]


# Automatically register the tool when this module is imported
register_tool(TaskTrackerTool.name, TaskTrackerTool)
OpenHands Agent
AgentBase
代码
llm
system_message:可能根据llm.model,加载特定模板。
tools和_tools 脚手架
tools:配置态,定义的技能,具体见上文关键tools。_tools:运行态,在_initialize阶段真正实例化的tools,并行解析加载的。
agent_context
- 上下文。在生成 system_message 时,会把 agent_context 追加进去。
- 比如:AGENTS.md 里写了不要使用 sudo 命令,则会追加到 system_message 末尾。
filter_tools_regex 行为约束
- 去掉一些 tool,非常实用,用于对 Agent 的权限进行“微操”。
include_default_tools
- 默认工具:FinishTool、ThinkTool。
from openhands.sdk.context.agent_context import AgentContext
from openhands.sdk.context.condenser import CondenserBase
from openhands.sdk.context.prompts.prompt import render_template
from openhands.sdk.critic.base import CriticBase
from openhands.sdk.llm import LLM
from openhands.sdk.llm.utils.model_prompt_spec import get_model_prompt_spec
from openhands.sdk.logger import get_logger
from openhands.sdk.mcp import create_mcp_tools
from openhands.sdk.tool import (
BUILT_IN_TOOL_CLASSES,
BUILT_IN_TOOLS,
Tool,
ToolDefinition,
resolve_tool,
)
from openhands.sdk.utils.models import DiscriminatedUnionMixin
if TYPE_CHECKING:
from openhands.sdk.conversation import ConversationState, LocalConversation
from openhands.sdk.conversation.types import (
ConversationCallbackType,
ConversationTokenCallbackType,
)
# Module-level logger shared by all agent base-class code in this module.
logger = get_logger(__name__)
class AgentBase(DiscriminatedUnionMixin, ABC):
    """Abstract base class for OpenHands agents.

    Agents are stateless and should be fully defined by their configuration.
    This base class provides the common interface and functionality that all
    agent implementations must follow.
    """

    # Frozen: agent configuration is immutable after construction, which is
    # what guarantees the "stateless" contract above.
    model_config = ConfigDict(
        frozen=True,
        arbitrary_types_allowed=True,
    )

    llm: LLM = Field(
        ...,
        description="LLM configuration for the agent.",
        examples=[
            {
                "model": "litellm_proxy/anthropic/claude-sonnet-4-5-20250929",
                "base_url": "https://llm-proxy.eval.all-hands.dev",
                "api_key": "your_api_key_here",
            }
        ],
    )
    tools: list[Tool] = Field(
        default_factory=list,
        description="List of tools to initialize for the agent.",
        examples=[
            {"name": "TerminalTool", "params": {}},
            {"name": "FileEditorTool", "params": {}},
            {
                "name": "TaskTrackerTool",
                "params": {},
            },
        ],
    )
    mcp_config: dict[str, Any] = Field(
        default_factory=dict,
        description="Optional MCP configuration dictionary to create MCP tools.",
        examples=[
            {"mcpServers": {"fetch": {"command": "uvx", "args": ["mcp-server-fetch"]}}}
        ],
    )
    filter_tools_regex: str | None = Field(
        default=None,
        description="Optional regex to filter the tools available to the agent by name."
        " This is applied after any tools provided in `tools` and any MCP tools are"
        " added.",
        examples=["^(?!repomix)(.*)|^repomix.*pack_codebase.*$"],
    )
    include_default_tools: list[str] = Field(
        default_factory=lambda: [tool.__name__ for tool in BUILT_IN_TOOLS],
        description=(
            "List of default tool class names to include. By default, the agent "
            "includes 'FinishTool' and 'ThinkTool'. Set to an empty list to disable "
            "all default tools, or provide a subset to include only specific ones. "
            "Example: include_default_tools=['FinishTool'] to only include FinishTool, "
            "or include_default_tools=[] to disable all default tools."
        ),
        examples=[["FinishTool", "ThinkTool"], ["FinishTool"], []],
    )
    agent_context: AgentContext | None = Field(
        default=None,
        description="Optional AgentContext to initialize "
        "the agent with specific context.",
        examples=[
            {
                "skills": [
                    {
                        "name": "AGENTS.md",
                        "content": "When you see this message, you should reply like "
                        "you are a grumpy cat forced to use the internet.",
                        "type": "repo",
                    },
                    {
                        "name": "flarglebargle",
                        "content": (
                            "IMPORTANT! The user has said the magic word "
                            '"flarglebargle". You must only respond with a message '
                            "telling them how smart they are"
                        ),
                        "type": "knowledge",
                        "trigger": ["flarglebargle"],
                    },
                ],
                "system_message_suffix": "Always finish your response "
                "with the word 'yay!'",
                "user_message_prefix": "The first character of your "
                "response should be 'I'",
            }
        ],
    )
    system_prompt_filename: str = Field(
        default="system_prompt.j2",
        description=(
            "System prompt template filename. Can be either:\n"
            "- A relative filename (e.g., 'system_prompt.j2') loaded from the "
            "agent's prompts directory\n"
            "- An absolute path (e.g., '/path/to/custom_prompt.j2')"
        ),
    )
    security_policy_filename: str = Field(
        default="security_policy.j2",
        description=(
            "Security policy template filename. Can be either:\n"
            "- A relative filename (e.g., 'security_policy.j2') loaded from the "
            "agent's prompts directory\n"
            "- An absolute path (e.g., '/path/to/custom_security_policy.j2')"
        ),
    )
    system_prompt_kwargs: dict[str, object] = Field(
        default_factory=dict,
        description="Optional kwargs to pass to the system prompt Jinja2 template.",
        examples=[{"cli_mode": True}],
    )
    condenser: CondenserBase | None = Field(
        default=None,
        description="Optional condenser to use for condensing conversation history.",
        examples=[
            {
                "kind": "LLMSummarizingCondenser",
                "llm": {
                    "model": "litellm_proxy/anthropic/claude-sonnet-4-5-20250929",
                    "base_url": "https://llm-proxy.eval.all-hands.dev",
                    "api_key": "your_api_key_here",
                },
                "max_size": 80,
                "keep_first": 10,
            }
        ],
    )
    critic: CriticBase | None = Field(
        default=None,
        description=(
            "EXPERIMENTAL: Optional critic to evaluate agent actions and messages "
            "in real-time. API and behavior may change without notice. "
            "May impact performance, especially in 'all_actions' mode."
        ),
        examples=[{"kind": "AgentFinishedCritic"}],
    )

    # Runtime materialized tools; private and non-serializable
    _tools: dict[str, ToolDefinition] = PrivateAttr(default_factory=dict)
    _initialized: bool = PrivateAttr(default=False)

    @property
    def prompt_dir(self) -> str:
        """Returns the directory where this class's module file is located."""
        module = sys.modules[self.__class__.__module__]
        module_file = module.__file__  # e.g. ".../mypackage/mymodule.py"
        if module_file is None:
            raise ValueError(f"Module file for {module} is None")
        return os.path.join(os.path.dirname(module_file), "prompts")

    @property
    def name(self) -> str:
        """Returns the name of the Agent."""
        return self.__class__.__name__

    @property
    def system_message(self) -> str:
        """Compute system message on-demand to maintain statelessness."""
        template_kwargs = dict(self.system_prompt_kwargs)
        # Add security_policy_filename to template kwargs
        template_kwargs["security_policy_filename"] = self.security_policy_filename
        template_kwargs.setdefault("model_name", self.llm.model)
        # Resolve model family/variant only when the caller did not pin them.
        if (
            "model_family" not in template_kwargs
            or "model_variant" not in template_kwargs
        ):
            spec = get_model_prompt_spec(
                self.llm.model, getattr(self.llm, "model_canonical_name", None)
            )
            if "model_family" not in template_kwargs and spec.family:
                template_kwargs["model_family"] = spec.family
            if "model_variant" not in template_kwargs and spec.variant:
                template_kwargs["model_variant"] = spec.variant
        system_message = render_template(
            prompt_dir=self.prompt_dir,
            template_name=self.system_prompt_filename,
            **template_kwargs,
        )
        # Optional suffix from the AgentContext (e.g. repo AGENTS.md rules)
        # is appended after the rendered template.
        if self.agent_context:
            _system_message_suffix = self.agent_context.get_system_message_suffix(
                llm_model=self.llm.model,
                llm_model_canonical=self.llm.model_canonical_name,
            )
            if _system_message_suffix:
                system_message += "\n\n" + _system_message_suffix
        return system_message

    def init_state(
        self,
        state: ConversationState,
        on_event: ConversationCallbackType,  # noqa: ARG002
    ) -> None:
        """Initialize the empty conversation state to prepare the agent for user
        messages.

        Typically this involves adding system message.
        NOTE: state will be mutated in-place.
        """
        self._initialize(state)

    def _initialize(self, state: ConversationState):
        """Materialize configured tools into runtime ToolDefinition instances.

        Idempotent: a second call logs a warning and returns without work.
        """
        if self._initialized:
            logger.warning("Agent already initialized; skipping re-initialization.")
            return
        tools: list[ToolDefinition] = []
        # Use ThreadPoolExecutor to parallelize tool resolution
        with ThreadPoolExecutor(max_workers=4) as executor:
            futures = []
            # Submit tool resolution tasks
            for tool_spec in self.tools:
                future = executor.submit(resolve_tool, tool_spec, state)
                futures.append(future)
            # Submit MCP tools creation if configured
            # (second argument is presumably a timeout in seconds —
            # see create_mcp_tools for the authoritative contract)
            if self.mcp_config:
                future = executor.submit(create_mcp_tools, self.mcp_config, 30)
                futures.append(future)
            # Collect results; future.result() re-raises any worker exception.
            for future in futures:
                result = future.result()
                tools.extend(result)
        logger.info(
            f"Loaded {len(tools)} tools from spec: {[tool.name for tool in tools]}"
        )
        if self.filter_tools_regex:
            pattern = re.compile(self.filter_tools_regex)
            tools = [tool for tool in tools if pattern.match(tool.name)]
            logger.info(
                f"Filtered to {len(tools)} tools after applying regex filter: "
                f"{[tool.name for tool in tools]}",
            )
        # Include default tools from include_default_tools; not subject to regex
        # filtering. Use explicit mapping to resolve tool class names.
        for tool_name in self.include_default_tools:
            tool_class = BUILT_IN_TOOL_CLASSES.get(tool_name)
            if tool_class is None:
                raise ValueError(
                    f"Unknown built-in tool class: '{tool_name}'. "
                    f"Expected one of: {list(BUILT_IN_TOOL_CLASSES.keys())}"
                )
            tool_instances = tool_class.create(state)
            tools.extend(tool_instances)
        # Check tool types
        for tool in tools:
            if not isinstance(tool, ToolDefinition):
                raise ValueError(
                    f"Tool {tool} is not an instance of 'ToolDefinition'. "
                    f"Got type: {type(tool)}"
                )
        # Check name duplicates
        tool_names = [tool.name for tool in tools]
        if len(tool_names) != len(set(tool_names)):
            duplicates = set(name for name in tool_names if tool_names.count(name) > 1)
            raise ValueError(f"Duplicate tool names found: {duplicates}")
        # Store tools in a dict for easy access
        self._tools = {tool.name: tool for tool in tools}
        self._initialized = True

    @abstractmethod
    def step(
        self,
        conversation: LocalConversation,
        on_event: ConversationCallbackType,
        on_token: ConversationTokenCallbackType | None = None,
    ) -> None:
        """Taking a step in the conversation.

        Typically this involves:
        1. Making a LLM call
        2. Executing the tool
        3. Updating the conversation state with
           LLM calls (role="assistant") and tool results (role="tool")
        4.1 If conversation is finished, set state.execution_status to FINISHED
        4.2 Otherwise, just return, Conversation will kick off the next step

        If the underlying LLM supports streaming, partial deltas are forwarded to
        ``on_token`` before the full response is returned.
        NOTE: state will be mutated in-place.
        """

    def verify(
        self,
        persisted: AgentBase,
        events: Sequence[Any] | None = None,  # noqa: ARG002
    ) -> AgentBase:
        """Verify that we can resume this agent from persisted state.

        We do not merge configuration between persisted and runtime Agent
        instances. Instead, we verify compatibility requirements and then
        continue with the runtime-provided Agent.

        Compatibility requirements:
        - Agent class/type must match.
        - Tools must match exactly (same tool names).

        Tools are part of the system prompt and cannot be changed mid-conversation.
        To use different tools, start a new conversation or use conversation forking
        (see https://github.com/OpenHands/OpenHands/issues/8560).
        All other configuration (LLM, agent_context, condenser, etc.) can be
        freely changed between sessions.

        Args:
            persisted: The agent loaded from persisted state.
            events: Unused, kept for API compatibility.

        Returns:
            This runtime agent (self) if verification passes.

        Raises:
            ValueError: If agent class or tools don't match.
        """
        if persisted.__class__ is not self.__class__:
            raise ValueError(
                "Cannot load from persisted: persisted agent is of type "
                f"{persisted.__class__.__name__}, but self is of type "
                f"{self.__class__.__name__}."
            )
        # Collect explicit tool names
        runtime_names = {tool.name for tool in self.tools}
        persisted_names = {tool.name for tool in persisted.tools}
        # Add builtin tool names from include_default_tools
        # These are runtime names like 'finish', 'think'
        for tool_class_name in self.include_default_tools:
            tool_class = BUILT_IN_TOOL_CLASSES.get(tool_class_name)
            if tool_class is not None:
                runtime_names.add(tool_class.name)
        for tool_class_name in persisted.include_default_tools:
            tool_class = BUILT_IN_TOOL_CLASSES.get(tool_class_name)
            if tool_class is not None:
                persisted_names.add(tool_class.name)
        if runtime_names == persisted_names:
            return self
        # Tools don't match - this is not allowed
        missing_in_runtime = persisted_names - runtime_names
        added_in_runtime = runtime_names - persisted_names
        details: list[str] = []
        if missing_in_runtime:
            details.append(f"removed: {sorted(missing_in_runtime)}")
        if added_in_runtime:
            details.append(f"added: {sorted(added_in_runtime)}")
        raise ValueError(
            f"Cannot resume conversation: tools cannot be changed mid-conversation "
            f"({'; '.join(details)}). "
            f"To use different tools, start a new conversation."
        )

    def model_dump_succint(self, **kwargs):
        """Like model_dump, but excludes None fields by default.

        (Name kept as-is — "succint" — for backward compatibility with callers.)
        """
        if "exclude_none" not in kwargs:
            kwargs["exclude_none"] = True
        dumped = super().model_dump(**kwargs)
        # remove tool schema details for brevity
        if "tools" in dumped and isinstance(dumped["tools"], dict):
            dumped["tools"] = list(dumped["tools"].keys())
        return dumped

    def get_all_llms(self) -> Generator[LLM, None, None]:
        """Recursively yield unique *base-class* LLM objects reachable from `self`.

        - Returns actual object references (not copies).
        - De-dupes by `id(LLM)`.
        - Cycle-safe via a visited set for *all* traversed objects.
        - Only yields objects whose type is exactly `LLM` (no subclasses).
        - Does not handle dataclasses.
        """
        yielded_ids: set[int] = set()
        visited: set[int] = set()

        def _walk(obj: object) -> Iterable[LLM]:
            oid = id(obj)
            # Guard against cycles on anything we might recurse into
            if oid in visited:
                return ()
            visited.add(oid)
            # Traverse LLM based classes and its fields
            # e.g., LLMRouter that is a subclass of LLM
            # yet contains LLM in its fields
            if isinstance(obj, LLM):
                llm_out: list[LLM] = []
                # Yield only the *raw* base-class LLM (exclude subclasses)
                if type(obj) is LLM and oid not in yielded_ids:
                    yielded_ids.add(oid)
                    llm_out.append(obj)
                # Traverse all fields for LLM objects
                for name in type(obj).model_fields:
                    try:
                        val = getattr(obj, name)
                    except Exception:
                        # Skip fields whose accessor raises (e.g. lazy props)
                        continue
                    llm_out.extend(_walk(val))
                return llm_out
            # Pydantic models: iterate declared fields
            if isinstance(obj, BaseModel):
                model_out: list[LLM] = []
                for name in type(obj).model_fields:
                    try:
                        val = getattr(obj, name)
                    except Exception:
                        continue
                    model_out.extend(_walk(val))
                return model_out
            # Built-in containers
            if isinstance(obj, dict):
                dict_out: list[LLM] = []
                for k, v in obj.items():
                    dict_out.extend(_walk(k))
                    dict_out.extend(_walk(v))
                return dict_out
            if isinstance(obj, (list, tuple, set, frozenset)):
                container_out: list[LLM] = []
                for item in obj:
                    container_out.extend(_walk(item))
                return container_out
            # Unknown object types: nothing to do
            return ()

        # Drive the traversal from self
        yield from _walk(self)

    @property
    def tools_map(self) -> dict[str, ToolDefinition]:
        """Get the initialized tools map.

        Raises:
            RuntimeError: If the agent has not been initialized.
        """
        if not self._initialized:
            raise RuntimeError("Agent not initialized; call _initialize() before use")
        # BUGFIX: original text read `self._toolsAgent` (prose fused into the
        # source); the private attribute is `_tools`.
        return self._tools
1. 初始化:SystemPrompt与记忆/init_state
- 将
系统提示词(行为准则)和工具定义(技能说明)作为第一条消息。 - 防篡改检查:扫描前几个事件,确保
SystemPromptEvent处于起始位置。 - 状态恢复:如果是恢复,会识别出已有Prompt并跳过初始化直接进入工作状态。
2. 消息准备与记忆管理(step 前期)
- 确认处理:优先执行 Pending Actions
- 上下文压缩(如果超长):调用 prepare_llm_messages,condenser 会介入把对话记录总结成摘要。
3. 核心决策阶段 (step中期)
- 调用LLM:调用
make_llm_completion。 - 响应解析:纯文字(回复用户)或工具调用(
执行操作)。 - 异常修复:若返回JSON 格式有问题,
fix_malformed_tool_arguments会尝试自动纠错。
4. 动作安全审查
高风险操作都有关卡:
- 风险标注 (_extract_security_risk):LLM 标注当前操作风险等级:读文件 LOW,写文件 MEDIUM,执行终端命令 HIGH。
- 安全拦截 (_requires_user_confirmation):根据 confirmation_policy,如果风险过高,Agent 会强行挂起,状态变为 WAITING_FOR_CONFIRMATION,等待人类点击允许。
- 自我批评 (_evaluate_with_critic):如果配置了 critic 属性,它会审视即将执行的 Action,给出评分和反馈。
5. 执行与反馈阶段 (_execute_action_event)
动作动作获得许可后,Agent开始与环境交互
执行动作:
调用工具(Terminal或FileEditor)。生成观察:工具返回执行结果(如编译错误、文件内容、命令输出)。
更新状态:将
Action 和 Observation(环境反馈)存入对话历史。
6.任务终点与循环
- 继续循环:如果任务未完成,Agent基于
新的观察结果,开始下一轮思考。 - 任务结束:如果LLM调用
FinishTool,状态变为FINISHED,并将控制权交还给用户。
- 用户输入
init_state(加载背景) prepare_llm_messages上下文管理 (必要时压缩) make_llm_completionLLM 推理 (生成动作) _extract_security_risk风险评估 (安全第一) _requires_user_confirmation人类干预 (高危拦截) _execute_action_event物理执行 (产生结果) - 存入 EventLog
回到步骤 2
第一步:前置检查与“记忆恢复”
# 1. 检查是否有用户刚才“待确认”的动作
pending_actions = ConversationState.get_unmatched_actions(state.events)
if pending_actions:
    self._execute_actions(...)  # 用户点确认后,立即执行之前的动作
    return

第二步:生成决策(调用 LLM)
# 2. 准备消息(这里会触发 condenser 压缩)
_messages_or_condensation = prepare_llm_messages(..., condenser=self.condenser)
# 3. 让 LLM 说话或调用工具
llm_response = make_llm_completion(self.llm, _messages, tools=...)

第三步:处理工具调用与安全拦截
state.execution_status = ConversationExecutionStatus.WAITING_FOR_CONFIRMATION
return  # 暂停执行,等待用户在 UI 上点“允许”

第四步:执行与观察 (Observation)
如果通过了安全检查,_execute_action_event 会真正驱动 tool 运行,并将结果(Observation)存回对话历史,进入下一个循环。
import json
from pydantic import ValidationError, model_validator
import openhands.sdk.security.analyzer as analyzer
import openhands.sdk.security.risk as risk
from openhands.sdk.agent.base import AgentBase
from openhands.sdk.agent.utils import (
fix_malformed_tool_arguments,
make_llm_completion,
prepare_llm_messages,
)
from openhands.sdk.conversation import (
ConversationCallbackType,
ConversationState,
ConversationTokenCallbackType,
LocalConversation,
)
from openhands.sdk.conversation.state import ConversationExecutionStatus
from openhands.sdk.critic.base import CriticResult
from openhands.sdk.event import (
ActionEvent,
AgentErrorEvent,
LLMConvertibleEvent,
MessageEvent,
ObservationEvent,
SystemPromptEvent,
TokenEvent,
UserRejectObservation,
)
from openhands.sdk.event.condenser import (
Condensation,
CondensationRequest,
)
from openhands.sdk.llm import (
LLMResponse,
Message,
MessageToolCall,
ReasoningItemModel,
RedactedThinkingBlock,
TextContent,
ThinkingBlock,
)
from openhands.sdk.llm.exceptions import (
FunctionCallValidationError,
LLMContextWindowExceedError,
)
from openhands.sdk.logger import get_logger
from openhands.sdk.observability.laminar import (
maybe_init_laminar,
observe,
should_enable_observability,
)
from openhands.sdk.observability.utils import extract_action_name
from openhands.sdk.security.llm_analyzer import LLMSecurityAnalyzer
from openhands.sdk.tool import (
Action,
Observation,
)
from openhands.sdk.tool.builtins import (
FinishAction,
FinishTool,
ThinkAction,
)
# Module-level logger for the concrete Agent implementation.
logger = get_logger(__name__)
# Best-effort observability setup at import time ("maybe" suggests it is a
# no-op when Laminar is not configured — see maybe_init_laminar for details).
maybe_init_laminar()
# Maximum number of events to scan during init_state defensive checks.
# SystemPromptEvent must appear within this prefix (at index 0 or 1).
INIT_STATE_PREFIX_SCAN_WINDOW = 3
class Agent(AgentBase):
"""Main agent implementation for OpenHands.
The Agent class provides the core functionality for running AI agents that can
interact with tools, process messages, and execute actions. It inherits from
AgentBase and implements the agent execution logic.
Example:
>>> from openhands.sdk import LLM, Agent, Tool
>>> llm = LLM(model="claude-sonnet-4-20250514", api_key=SecretStr("key"))
>>> tools = [Tool(name="TerminalTool"), Tool(name="FileEditorTool")]
>>> agent = Agent(llm=llm, tools=tools)
"""
@model_validator(mode="before")
@classmethod
def _add_security_prompt_as_default(cls, data):
"""Ensure llm_security_analyzer=True is always set before initialization."""
if not isinstance(data, dict):
return data
kwargs = data.get("system_prompt_kwargs") or {}
if not isinstance(kwargs, dict):
kwargs = {}
kwargs.setdefault("llm_security_analyzer", True)
data["system_prompt_kwargs"] = kwargs
return data
    def init_state(
        self,
        state: ConversationState,
        on_event: ConversationCallbackType,
    ) -> None:
        """Initialize conversation state.

        Invariants enforced by this method:
        - If a SystemPromptEvent is already present, it must be within the first 3
          events (index 0 or 1 in practice; index 2 is included in the scan window
          to detect a user message appearing before the system prompt).
        - A user MessageEvent should not appear before the SystemPromptEvent.

        These invariants keep event ordering predictable for downstream components
        (condenser, UI, etc.) and also prevent accidentally materializing the full
        event history during initialization.
        """
        # Base-class init materializes the agent's tools (_initialize).
        super().init_state(state, on_event=on_event)
        # Defensive check: Analyze state to detect unexpected initialization scenarios
        # These checks help diagnose issues related to lazy loading and event ordering
        # See: https://github.com/OpenHands/software-agent-sdk/issues/1785
        #
        # NOTE: len() is O(1) for EventLog (file-backed implementation).
        event_count = len(state.events)
        # NOTE: state.events is intentionally an EventsListBase (Sequence-like), not
        # a plain list. Avoid materializing the full history via list(state.events)
        # here (conversations can reach 30k+ events).
        #
        # Invariant: when init_state is called, SystemPromptEvent (if present) must be
        # at index 0 or 1.
        #
        # Rationale:
        # - Local conversations start empty and init_state is responsible for adding
        #   the SystemPromptEvent as the first event.
        # - Remote conversations may receive an initial ConversationStateUpdateEvent
        #   from the agent-server immediately after subscription. In a typical remote
        #   session prefix you may see:
        #     [ConversationStateUpdateEvent, SystemPromptEvent, MessageEvent, ...]
        #
        # We intentionally only inspect the first few events (cheap for both local and
        # remote) to enforce this invariant.
        prefix_events = state.events[:INIT_STATE_PREFIX_SCAN_WINDOW]
        has_system_prompt = any(isinstance(e, SystemPromptEvent) for e in prefix_events)
        has_user_message = any(
            isinstance(e, MessageEvent) and e.source == "user" for e in prefix_events
        )
        # Log state for debugging initialization order issues
        logger.debug(
            f"init_state called: conversation_id={state.id}, "
            f"event_count={event_count}, "
            f"has_system_prompt={has_system_prompt}, "
            f"has_user_message={has_user_message}"
        )
        if has_system_prompt:
            # Restoring/resuming conversations is normal: a system prompt already
            # present means this conversation was initialized previously.
            logger.debug(
                "init_state: SystemPromptEvent already present; skipping init. "
                f"conversation_id={state.id}, event_count={event_count}."
            )
            return
        # Assert: A user message should never appear before the system prompt.
        #
        # NOTE: This is a best-effort check based on the first few events only.
        # Remote conversations can include a ConversationStateUpdateEvent near the
        # start, so we scan a small prefix window.
        if has_user_message:
            event_types = [type(e).__name__ for e in prefix_events]
            logger.error(
                f"init_state: User message found in prefix before SystemPromptEvent! "
                f"conversation_id={state.id}, prefix_events={event_types}"
            )
            raise AssertionError(
                "Unexpected state: user message exists before SystemPromptEvent. "
                f"conversation_id={state.id}, event_count={event_count}, "
                f"prefix_event_types={event_types}."
            )
        # Prepare system message
        event = SystemPromptEvent(
            source="agent",
            system_prompt=TextContent(text=self.system_message),
            # Tools are stored as ToolDefinition objects and converted to
            # OpenAI format with security_risk parameter during LLM completion.
            # See make_llm_completion() in agent/utils.py for details.
            tools=list(self.tools_map.values()),
        )
        on_event(event)
def _should_evaluate_with_critic(self, action: Action | None) -> bool:
"""Determine if critic should evaluate based on action type and mode."""
if self.critic is None:
return False
if self.critic.mode == "all_actions":
return True
# For "finish_and_message" mode, only evaluate FinishAction
# (MessageEvent will be handled separately in step())
if isinstance(action, FinishAction):
return True
return False
def _evaluate_with_critic(
self, conversation: LocalConversation, event: ActionEvent | MessageEvent
) -> CriticResult | None:
"""Run critic evaluation on the current event and history."""
if self.critic is None:
return None
try:
# Build event history including the current event
events = list(conversation.state.events) + [event]
llm_convertible_events = [
e for e in events if isinstance(e, LLMConvertibleEvent)
]
# Evaluate without git_patch for now
critic_result = self.critic.evaluate(
events=llm_convertible_events, git_patch=None
)
logger.info(
f"✓ Critic evaluation: score={critic_result.score:.3f}, "
f"success={critic_result.success}"
)
return critic_result
except Exception as e:
logger.error(f"✗ Critic evaluation failed: {e}", exc_info=True)
return None
def _execute_actions(
self,
conversation: LocalConversation,
action_events: list[ActionEvent],
on_event: ConversationCallbackType,
):
for action_event in action_events:
self._execute_action_event(conversation, action_event, on_event=on_event)
    @observe(name="agent.step", ignore_inputs=["state", "on_event"])
    def step(
        self,
        conversation: LocalConversation,
        on_event: ConversationCallbackType,
        on_token: ConversationTokenCallbackType | None = None,
    ) -> None:
        """Run one agent turn: flush pending actions or sample/execute a new one.

        Flow: pending-action execution → blocked-message check → message
        preparation (with optional condensation) → LLM completion → either
        tool-call execution (with confirmation gating) or a message event.
        """
        state = conversation.state
        # Check for pending actions (implicit confirmation)
        # and execute them before sampling new actions.
        pending_actions = ConversationState.get_unmatched_actions(state.events)
        if pending_actions:
            logger.info(
                "Confirmation mode: Executing %d pending action(s)",
                len(pending_actions),
            )
            self._execute_actions(conversation, pending_actions, on_event)
            return
        # Check if the last user message was blocked by a UserPromptSubmit hook
        # If so, skip processing and mark conversation as finished
        for event in reversed(list(state.events)):
            if isinstance(event, MessageEvent) and event.source == "user":
                reason = state.pop_blocked_message(event.id)
                if reason is not None:
                    logger.info(f"User message blocked by hook: {reason}")
                    state.execution_status = ConversationExecutionStatus.FINISHED
                    return
                break  # Only check the most recent user message
        # Prepare LLM messages using the utility function
        _messages_or_condensation = prepare_llm_messages(
            state.events, condenser=self.condenser, llm=self.llm
        )
        # Process condensation event before agent samples another action
        if isinstance(_messages_or_condensation, Condensation):
            on_event(_messages_or_condensation)
            return
        _messages = _messages_or_condensation
        # [1:] skips the system prompt when logging the outgoing payload.
        logger.debug(
            "Sending messages to LLM: "
            f"{json.dumps([m.model_dump() for m in _messages[1:]], indent=2)}"
        )
        try:
            llm_response = make_llm_completion(
                self.llm,
                _messages,
                tools=list(self.tools_map.values()),
                on_token=on_token,
            )
        except FunctionCallValidationError as e:
            # Feed the validation error back as a user message so the LLM
            # can correct its tool call on the next turn.
            logger.warning(f"LLM generated malformed function call: {e}")
            error_message = MessageEvent(
                source="user",
                llm_message=Message(
                    role="user",
                    content=[TextContent(text=str(e))],
                ),
            )
            on_event(error_message)
            return
        except LLMContextWindowExceedError as e:
            # If condenser is available and handles requests, trigger condensation
            if (
                self.condenser is not None
                and self.condenser.handles_condensation_requests()
            ):
                logger.warning(
                    "LLM raised context window exceeded error, triggering condensation"
                )
                on_event(CondensationRequest())
                return
            # No condenser available or doesn't handle requests; log helpful warning
            self._log_context_window_exceeded_warning()
            raise e
        # LLMResponse already contains the converted message and metrics snapshot
        message: Message = llm_response.message
        # Check if this is a reasoning-only response (e.g., from reasoning models)
        # or a message-only response without tool calls
        has_reasoning = (
            message.responses_reasoning_item is not None
            or message.reasoning_content is not None
            or (message.thinking_blocks and len(message.thinking_blocks) > 0)
        )
        has_content = any(
            isinstance(c, TextContent) and c.text.strip() for c in message.content
        )
        if message.tool_calls and len(message.tool_calls) > 0:
            if not all(isinstance(c, TextContent) for c in message.content):
                logger.warning(
                    "LLM returned tool calls but message content is not all "
                    "TextContent - ignoring non-text content"
                )
            # Generate unique batch ID for this LLM response
            thought_content = [c for c in message.content if isinstance(c, TextContent)]
            action_events: list[ActionEvent] = []
            for i, tool_call in enumerate(message.tool_calls):
                action_event = self._get_action_event(
                    tool_call,
                    conversation=conversation,
                    llm_response_id=llm_response.id,
                    on_event=on_event,
                    security_analyzer=state.security_analyzer,
                    thought=thought_content
                    if i == 0
                    else [],  # Only first gets thought
                    # Only first gets reasoning content
                    reasoning_content=message.reasoning_content if i == 0 else None,
                    # Only first gets thinking blocks
                    thinking_blocks=list(message.thinking_blocks) if i == 0 else [],
                    responses_reasoning_item=message.responses_reasoning_item
                    if i == 0
                    else None,
                )
                if action_event is None:
                    continue
                action_events.append(action_event)
            # Handle confirmation mode - exit early if actions need confirmation
            if self._requires_user_confirmation(state, action_events):
                return
            if action_events:
                self._execute_actions(conversation, action_events, on_event)
            # Emit VLLM token ids if enabled before returning
            self._maybe_emit_vllm_tokens(llm_response, on_event)
            return
        # No tool calls - emit message event for reasoning or content responses
        if not has_reasoning and not has_content:
            logger.warning("LLM produced empty response - continuing agent loop")
        msg_event = MessageEvent(
            source="agent",
            llm_message=message,
            llm_response_id=llm_response.id,
        )
        # Run critic evaluation if configured for finish_and_message mode
        if self.critic is not None and self.critic.mode == "finish_and_message":
            critic_result = self._evaluate_with_critic(conversation, msg_event)
            if critic_result is not None:
                # Create new event with critic result
                msg_event = msg_event.model_copy(
                    update={"critic_result": critic_result}
                )
        on_event(msg_event)
        # Emit VLLM token ids if enabled
        self._maybe_emit_vllm_tokens(llm_response, on_event)
        # Finish conversation if LLM produced content (awaits user input)
        # Continue if only reasoning without content (e.g., GPT-5 codex thinking)
        if has_content:
            logger.debug("LLM produced a message response - awaits user input")
            state.execution_status = ConversationExecutionStatus.FINISHED
        return
def _requires_user_confirmation(
self, state: ConversationState, action_events: list[ActionEvent]
) -> bool:
"""
Decide whether user confirmation is needed to proceed.
Rules:
1. Confirmation mode is enabled
2. Every action requires confirmation
3. A single `FinishAction` never requires confirmation
4. A single `ThinkAction` never requires confirmation
"""
# A single `FinishAction` or `ThinkAction` never requires confirmation
if len(action_events) == 1 and isinstance(
action_events[0].action, (FinishAction, ThinkAction)
):
return False
# If there are no actions there is nothing to confirm
if len(action_events) == 0:
return False
# If a security analyzer is registered, use it to grab the risks of the actions
# involved. If not, we'll set the risks to UNKNOWN.
if state.security_analyzer is not None:
risks = [
risk
for _, risk in state.security_analyzer.analyze_pending_actions(
action_events
)
]
else:
risks = [risk.SecurityRisk.UNKNOWN] * len(action_events)
# Grab the confirmation policy from the state and pass in the risks.
if any(state.confirmation_policy.should_confirm(risk) for risk in risks):
state.execution_status = (
ConversationExecutionStatus.WAITING_FOR_CONFIRMATION
)
return True
return False
def _extract_security_risk(
self,
arguments: dict,
tool_name: str,
read_only_tool: bool,
security_analyzer: analyzer.SecurityAnalyzerBase | None = None,
) -> risk.SecurityRisk:
requires_sr = isinstance(security_analyzer, LLMSecurityAnalyzer)
raw = arguments.pop("security_risk", None)
# Default risk value for action event
# Tool is marked as read-only so security risk can be ignored
if read_only_tool:
return risk.SecurityRisk.UNKNOWN
# Raises exception if failed to pass risk field when expected
# Exception will be sent back to agent as error event
# Strong models like GPT-5 can correct itself by retrying
if requires_sr and raw is None:
raise ValueError(
f"Failed to provide security_risk field in tool '{tool_name}'"
)
# When using weaker models without security analyzer
# safely ignore missing security risk fields
if not requires_sr and raw is None:
return risk.SecurityRisk.UNKNOWN
# Raises exception if invalid risk enum passed by LLM
security_risk = risk.SecurityRisk(raw)
return security_risk
def _extract_summary(self, tool_name: str, arguments: dict) -> str:
"""Extract and validate the summary field from tool arguments.
Summary field is always requested but optional - if LLM doesn't provide
it or provides invalid data, we generate a default summary using the
tool name and arguments.
Args:
tool_name: Name of the tool being called
arguments: Dictionary of tool arguments from LLM
Returns:
The summary string - either from LLM or a default generated one
"""
summary = arguments.pop("summary", None)
# If valid summary provided by LLM, use it
if summary is not None and isinstance(summary, str) and summary.strip():
return summary
# Generate default summary: {tool_name}: {arguments}
args_str = json.dumps(arguments)
return f"{tool_name}: {args_str}"
def _get_action_event(
    self,
    tool_call: MessageToolCall,
    conversation: LocalConversation,
    llm_response_id: str,
    on_event: ConversationCallbackType,
    security_analyzer: analyzer.SecurityAnalyzerBase | None = None,
    thought: list[TextContent] | None = None,
    reasoning_content: str | None = None,
    thinking_blocks: list[ThinkingBlock | RedactedThinkingBlock] | None = None,
    responses_reasoning_item: ReasoningItemModel | None = None,
) -> ActionEvent | None:
    """Converts a tool call into an ActionEvent, validating arguments.

    On any failure (unknown tool, malformed or invalid arguments) the
    assistant tool call is still persisted so the next turn has a matching
    call_id, an AgentErrorEvent is emitted, and None is returned.

    NOTE: state will be mutated in-place.
    """
    tool_name = tool_call.name
    tool = self.tools_map.get(tool_name, None)

    # Handle non-existing tools
    if tool is None:
        available = list(self.tools_map.keys())
        err = f"Tool '{tool_name}' not found. Available: {available}"
        logger.error(err)
        self._persist_tool_call_failure(
            err,
            tool_call,
            llm_response_id,
            on_event,
            thought,
            reasoning_content,
            thinking_blocks,
            responses_reasoning_item,
        )
        return None

    # Validate arguments
    security_risk: risk.SecurityRisk = risk.SecurityRisk.UNKNOWN
    try:
        arguments = json.loads(tool_call.arguments)
        # Fix malformed arguments (e.g., JSON strings for list/dict fields)
        arguments = fix_malformed_tool_arguments(arguments, tool.action_type)
        security_risk = self._extract_security_risk(
            arguments,
            tool.name,
            tool.annotations.readOnlyHint if tool.annotations else False,
            security_analyzer,
        )
        # _extract_security_risk pops the key, so it must be gone by now.
        assert "security_risk" not in arguments, (
            "Unexpected 'security_risk' key found in tool arguments"
        )
        summary = self._extract_summary(tool.name, arguments)
        action: Action = tool.action_from_arguments(arguments)
    except (json.JSONDecodeError, ValidationError, ValueError) as e:
        err = (
            f"Error validating args {tool_call.arguments} for tool "
            f"'{tool.name}': {e}"
        )
        self._persist_tool_call_failure(
            err,
            tool_call,
            llm_response_id,
            on_event,
            thought,
            reasoning_content,
            thinking_blocks,
            responses_reasoning_item,
        )
        return None

    # Create initial action event
    action_event = ActionEvent(
        action=action,
        thought=thought or [],
        reasoning_content=reasoning_content,
        thinking_blocks=thinking_blocks or [],
        responses_reasoning_item=responses_reasoning_item,
        tool_name=tool.name,
        tool_call_id=tool_call.id,
        tool_call=tool_call,
        llm_response_id=llm_response_id,
        security_risk=security_risk,
        summary=summary,
    )

    # Run critic evaluation if configured
    if self._should_evaluate_with_critic(action):
        critic_result = self._evaluate_with_critic(conversation, action_event)
        if critic_result is not None:
            # Create new event with critic result
            action_event = action_event.model_copy(
                update={"critic_result": critic_result}
            )

    on_event(action_event)
    return action_event

def _persist_tool_call_failure(
    self,
    err: str,
    tool_call: MessageToolCall,
    llm_response_id: str,
    on_event: ConversationCallbackType,
    thought: list[TextContent] | None,
    reasoning_content: str | None,
    thinking_blocks: list[ThinkingBlock | RedactedThinkingBlock] | None,
    responses_reasoning_item: ReasoningItemModel | None,
) -> None:
    """Persist the assistant function_call (so the next turn has a matching
    call_id) and then emit an AgentErrorEvent describing the failure.

    Extracted helper: this sequence was previously duplicated verbatim for
    both the unknown-tool and invalid-arguments error paths.
    """
    tc_event = ActionEvent(
        source="agent",
        thought=thought or [],
        reasoning_content=reasoning_content,
        thinking_blocks=thinking_blocks or [],
        responses_reasoning_item=responses_reasoning_item,
        tool_call=tool_call,
        tool_name=tool_call.name,
        tool_call_id=tool_call.id,
        llm_response_id=llm_response_id,
        action=None,
    )
    on_event(tc_event)
    on_event(
        AgentErrorEvent(
            error=err,
            tool_name=tool_call.name,
            tool_call_id=tool_call.id,
        )
    )
@observe(ignore_inputs=["state", "on_event"])
def _execute_action_event(
    self,
    conversation: LocalConversation,
    action_event: ActionEvent,
    on_event: ConversationCallbackType,
):
    """Execute an action event and update the conversation state.

    It will call the tool's executor and update the state & call callback fn
    with the observation.

    If the action was blocked by a PreToolUse hook (recorded in
    state.blocked_actions), a UserRejectObservation is emitted instead
    of executing the action.

    Args:
        conversation: Active conversation; its state is read and mutated.
        action_event: Previously validated action to execute.
        on_event: Callback invoked with every event this method emits.

    Returns:
        The emitted event: UserRejectObservation when blocked by a hook,
        AgentErrorEvent when the executor raised ValueError, otherwise the
        ObservationEvent produced by the tool.
    """
    state = conversation.state
    # Check if this action was blocked by a PreToolUse hook
    # (pop_blocked_action both reads and clears the block record).
    reason = state.pop_blocked_action(action_event.id)
    if reason is not None:
        logger.info(f"Action '{action_event.tool_name}' blocked by hook: {reason}")
        rejection = UserRejectObservation(
            action_id=action_event.id,
            tool_name=action_event.tool_name,
            tool_call_id=action_event.tool_call_id,
            rejection_reason=reason,
        )
        on_event(rejection)
        return rejection
    tool = self.tools_map.get(action_event.tool_name, None)
    if tool is None:
        # _get_action_event already rejected unknown tools upstream,
        # so reaching this branch indicates an internal inconsistency.
        raise RuntimeError(
            f"Tool '{action_event.tool_name}' not found. This should not happen "
            "as it was checked earlier."
        )
    # Execute actions!
    try:
        if should_enable_observability():
            # Wrap the executor call in a tracing span named after the action.
            tool_name = extract_action_name(action_event)
            observation: Observation = observe(name=tool_name, span_type="TOOL")(
                tool
            )(action_event.action, conversation)
        else:
            observation = tool(action_event.action, conversation)
        assert isinstance(observation, Observation), (
            f"Tool '{tool.name}' executor must return an Observation"
        )
    except ValueError as e:
        # Tool execution raised a ValueError (e.g., invalid argument combination)
        # Convert to AgentErrorEvent so the agent can correct itself
        err = f"Error executing tool '{tool.name}': {e}"
        logger.warning(err)
        error_event = AgentErrorEvent(
            error=err,
            tool_name=tool.name,
            tool_call_id=action_event.tool_call.id,
        )
        on_event(error_event)
        return error_event
    obs_event = ObservationEvent(
        observation=observation,
        action_id=action_event.id,
        tool_name=tool.name,
        tool_call_id=action_event.tool_call.id,
    )
    on_event(obs_event)
    # Set conversation state: the finish tool terminates the run.
    if tool.name == FinishTool.name:
        state.execution_status = ConversationExecutionStatus.FINISHED
    return obs_event
def _maybe_emit_vllm_tokens(
    self, llm_response: LLMResponse, on_event: ConversationCallbackType
) -> None:
    """Emit a TokenEvent with vLLM prompt/response token IDs when enabled.

    Only fires when ``return_token_ids`` is truthy in the LLM's
    ``litellm_extra_body``; otherwise this is a no-op.

    Args:
        llm_response: Response whose raw payload carries the token IDs.
        on_event: Callback invoked with the constructed TokenEvent.
    """
    # dict.get replaces the former membership test + double indexing.
    if not self.llm.litellm_extra_body.get("return_token_ids"):
        return
    raw = llm_response.raw_response
    token_event = TokenEvent(
        source="agent",
        prompt_token_ids=raw["prompt_token_ids"],
        response_token_ids=raw["choices"][0]["provider_specific_fields"][
            "token_ids"
        ],
    )
    on_event(token_event)
def _log_context_window_exceeded_warning(self) -> None:
"""Log a helpful warning when context window is exceeded without a condenser."""
if self.condenser is None:
logger.warning(
"\n"
"=" * 80 + "\n"
"⚠️ CONTEXT WINDOW EXCEEDED ERROR\n"
"=" * 80 + "\n"
"\n"
"The LLM's context window has been exceeded, but no condenser is "
"configured.\n"
"\n"
"Current configuration:\n"
f" • Condenser: None\n"
f" • LLM Model: {self.llm.model}\n"
"\n"
"To prevent this error, configure a condenser to automatically "
"summarize\n"
"conversation history when it gets too long.\n"
"\n"
"Example configuration:\n"
"\n"
" from openhands.sdk import Agent, LLM\n"
" from openhands.sdk.context.condenser import "
"LLMSummarizingCondenser\n"
"\n"
" agent = Agent(\n"
" llm=LLM(model='your-model'),\n"
" condenser=LLMSummarizingCondenser(\n"
" llm=LLM(model='your-model'), # Can use same or "
"cheaper model\n"
" max_size=120, # Maximum events before condensation\n"
" keep_first=4 # Number of initial events to preserve\n"
" )\n"
" )\n"
"\n"
"For more information, see: "
"https://docs.openhands.dev/sdk/guides/context-condenser\n"
"=" * 80
)
else:
condenser_type = type(self.condenser).__name__
handles_requests = self.condenser.handles_condensation_requests()
condenser_config = self.condenser.model_dump(
exclude={"llm"}, exclude_none=True
)
condenser_llm_obj = getattr(self.condenser, "llm", None)
condenser_llm = (
condenser_llm_obj.model if condenser_llm_obj is not None else "N/A"
)
logger.warning(
"\n"
"=" * 80 + "\n"
"⚠️ CONTEXT WINDOW EXCEEDED ERROR\n"
"=" * 80 + "\n"
"\n"
"The LLM's context window has been exceeded.\n"
"\n"
"Current configuration:\n"
f" • Condenser Type: {condenser_type}\n"
f" • Handles Condensation Requests: {handles_requests}\n"
f" • Condenser LLM: {condenser_llm}\n"
f" • Agent LLM Model: {self.llm.model}\n"
f" • Condenser Config: {json.dumps(condenser_config, indent=4)}\n"
"\n"
"Your condenser is configured but does not handle condensation "
"requests\n"
"(handles_condensation_requests() returned False).\n"
"\n"
"To fix this:\n"
" 1. Use LLMSummarizingCondenser which handles condensation "
"requests, OR\n"
" 2. Implement handles_condensation_requests() in your custom "
"condenser\n"
"\n"
"Example with LLMSummarizingCondenser:\n"
"\n"
" from openhands.sdk.context.condenser import "
"LLMSummarizingCondenser\n"
"\n"
" agent = Agent(\n"
" llm=LLM(model='your-model'),\n"
" condenser=LLMSummarizingCondenser(\n"
" llm=LLM(model='your-model'),\n"
" max_size=120,\n"
" keep_first=4\n"
" )\n"
" )\n"
"\n"
"For more information, see: "
"https://docs.openhands.dev/sdk/guides/context-condenser\n"
"=" * 80
)OpenHands Workspace
基础返回值变量定义
代码地址
Git 相关
GitChange:GitChangeStatus (MOVED,ADDED,DELETED,UPDATED)GitDiff
命令执行
command:执行的命令;exit_code:退出code;stdout:标准输出;stderr:错误输出;timeout_occurred:是否超时
文件操作(上传/下载)
success:是否成功;Error:source_path;destination_path
from enum import Enum
from pathlib import Path
from pydantic import BaseModel
class GitChangeStatus(Enum):
    """Status of a single file change in a git working tree."""

    MOVED = "MOVED"
    ADDED = "ADDED"
    DELETED = "DELETED"
    UPDATED = "UPDATED"
class GitChange(BaseModel):
    """One changed file reported by the workspace: its status and path."""

    status: GitChangeStatus
    path: Path
class GitDiff(BaseModel):
    """Git diff for a single file: original and modified content.

    Either side may be None — presumably no original for an added file and
    no modified content for a deleted one; TODO confirm against callers.
    """

    modified: str | None
    original: str | None

"""Pydantic models for workspace operation results and build types."""
from typing import Literal
from pydantic import BaseModel, Field
# Build flavor for workspace images; "minimal" variants presumably strip
# optional components — TODO confirm against the build pipeline.
TargetType = Literal["binary", "binary-minimal", "source", "source-minimal"]
# Container platforms supported when building workspace images.
PlatformType = Literal["linux/amd64", "linux/arm64"]
class CommandResult(BaseModel):
    """Result of executing a command in the workspace."""

    command: str = Field(description="The command that was executed")
    exit_code: int = Field(description="Exit code of the command")
    stdout: str = Field(description="Standard output from the command")
    stderr: str = Field(description="Standard error from the command")
    # Remote workspaces also surface timeouts as exit_code == -1 together
    # with a "timed out" message appended to stderr.
    timeout_occurred: bool = Field(
        description="Whether the command timed out during execution"
    )
class FileOperationResult(BaseModel):
    """Result of a file upload or download operation."""

    success: bool = Field(description="Whether the operation was successful")
    source_path: str = Field(description="Path to the source file")
    destination_path: str = Field(description="Path to the destination file")
    file_size: int | None = Field(
        default=None, description="Size of the file in bytes (if successful)"
    )
    error: str | None = Field(
        default=None, description="Error message (if operation failed)"
    )

BaseWorkspace (基础接口)
代码地址
关键属性
- working_dir:工作目录
执行命令 execute_command, 最核心
- 入参:
command、cwd(当前工作目录)、timeout
上传下载文件 file_upload & file_download
- 在
本地和沙箱环境之间,交换数据。
git_changes & git_diff
git_changes:告诉工作区有哪些文件被改动了;git_diff:具体的代码差异,Agent需要查看diff确认改动是否符合预期。
休眠和唤醒 pause & resume
- 比如Docker Pause,冻结CPU,节省资源。
Python上下文管理器协议 __enter__ 和 __exit__
- 可以使用
with workspace写代码,确保善后工作。 - 比如Docker,清理临时挂载点。
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Annotated, Any
from pydantic import BeforeValidator, Field
from openhands.sdk.git.models import GitChange, GitDiff
from openhands.sdk.logger import get_logger
from openhands.sdk.utils.models import DiscriminatedUnionMixin
from openhands.sdk.workspace.models import CommandResult, FileOperationResult
logger = get_logger(__name__)
def _convert_path_to_str(v: str | Path) -> str:
"""Convert Path objects to string for working_dir."""
if isinstance(v, Path):
return str(v)
return v
class BaseWorkspace(DiscriminatedUnionMixin, ABC):
    """Abstract base class for workspace implementations.

    Workspaces provide a sandboxed environment where agents can execute commands,
    read/write files, and perform other operations. All workspace implementations
    support the context manager protocol for safe resource management.

    Example:
        >>> with workspace:
        ...     result = workspace.execute_command("echo 'hello'")
        ...     content = workspace.read_file("example.txt")
    """

    working_dir: Annotated[
        str,
        BeforeValidator(_convert_path_to_str),
        Field(
            description=(
                "The working directory for agent operations and tool execution. "
                "Accepts both string paths and Path objects. "
                "Path objects are automatically converted to strings."
            )
        ),
    ]

    def __enter__(self) -> "BaseWorkspace":
        """Enter the workspace context.

        Returns:
            Self for use in with statements
        """
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Exit the workspace context and cleanup resources.

        Default implementation performs no cleanup. Subclasses should override
        to add cleanup logic (e.g., stopping containers, closing connections).

        Args:
            exc_type: Exception type if an exception occurred
            exc_val: Exception value if an exception occurred
            exc_tb: Exception traceback if an exception occurred
        """
        pass

    @abstractmethod
    def execute_command(
        self,
        command: str,
        cwd: str | Path | None = None,
        timeout: float = 30.0,
    ) -> CommandResult:
        """Execute a bash command on the system.

        Args:
            command: The bash command to execute
            cwd: Working directory for the command (optional)
            timeout: Timeout in seconds (defaults to 30.0)

        Returns:
            CommandResult: Result containing stdout, stderr, exit_code, and other
                metadata

        Raises:
            Exception: If command execution fails
        """
        ...

    @abstractmethod
    def file_upload(
        self,
        source_path: str | Path,
        destination_path: str | Path,
    ) -> FileOperationResult:
        """Upload a file to the system.

        Args:
            source_path: Path to the source file
            destination_path: Path where the file should be uploaded

        Returns:
            FileOperationResult: Result containing success status and metadata

        Raises:
            Exception: If file upload fails
        """
        ...

    @abstractmethod
    def file_download(
        self,
        source_path: str | Path,
        destination_path: str | Path,
    ) -> FileOperationResult:
        """Download a file from the system.

        Args:
            source_path: Path to the source file on the system
            destination_path: Path where the file should be downloaded

        Returns:
            FileOperationResult: Result containing success status and metadata

        Raises:
            Exception: If file download fails
        """
        ...

    @abstractmethod
    def git_changes(self, path: str | Path) -> list[GitChange]:
        """Get the git changes for the repository at the path given.

        Args:
            path: Path to the git repository

        Returns:
            list[GitChange]: List of changes

        Raises:
            Exception: If path is not a git repository or getting changes failed
        """

    @abstractmethod
    def git_diff(self, path: str | Path) -> GitDiff:
        """Get the git diff for the file at the path given.

        Args:
            path: Path to the file

        Returns:
            GitDiff: Git diff

        Raises:
            Exception: If path is not a git repository or getting diff failed
        """

    def pause(self) -> None:
        """Pause the workspace to conserve resources.

        For local workspaces, this is a no-op.
        For container-based workspaces, this pauses the container.

        Raises:
            NotImplementedError: If the workspace type does not support pausing.
        """
        raise NotImplementedError(f"{type(self).__name__} does not support pause()")

    def resume(self) -> None:
        """Resume a paused workspace.

        For local workspaces, this is a no-op.
        For container-based workspaces, this resumes the container.

        Raises:
            NotImplementedError: If the workspace type does not support resuming.
        """
        raise NotImplementedError(f"{type(self).__name__} does not support resume()")

RemoteWorkspace
代码地址
核心架构
BaseWorkspace:提供标准接口定义;RemoteWorkspaceMixin:真正执行细节,如构造URL、处理HTTP负载、解析API响应等;RemoteWorkspace:调用者/接口,通过_execute方法驱动Mixin产生的generator。
三级传递
- 第一级:
SDK (代码)。- 把
ls等命令打包成HTTP Request,发送出去
- 把
- 第二级:
AgentServer (FastAPI)。接收HTTP请求,解析JSON,调用底层runtime。- 异步的,
启动命令返回command_id,然后client做状态轮询。
- 第三级:
Runtime/Sandbox(Docker)- 真正在linux容器里
执行ls命令,把结果重定向输出给Server。
- 真正在linux容器里
Client-Server Generator 机制 (execute, Start+Poll)
背景
- 执行远程操作可能非常耗时。
Client-Server
RemoteWorkspace本质是一个HTTP ClientServer:openhands-agent-server接收请求,在它远端容器里执行命令。
Start-Polling 轮询机制
- 第一步:
发送启动命令请求 - 第二步:命令没完成,
Generator要求轮询Polling State,不断询问是否完成。 - 第三步:
多次循环后,命令结束,Generator产生最终结果。
特点
- 把网络通信和业务逻辑解耦。
- 根据
对应方法,构造对应的Generator,调用执行 - 从
generator获得初始kwargs - 循环调用:
根据kwargs发起请求[httpx.client],获得response- 把
response给到[generator],生成新的kwargs - 不断重复1和2,
直到终止
def execute_command(
    self,
    command: str,
    cwd: str | Path | None = None,
    timeout: float = 30.0,
) -> CommandResult:
    """Run a bash command on the remote system.

    Starts the command via the remote agent server API, then polls for the
    output until the command completes, by driving the request generator
    through _execute.

    Args:
        command: The bash command to execute
        cwd: Working directory (optional)
        timeout: Timeout in seconds

    Returns:
        CommandResult: Result with stdout, stderr, exit_code, and other metadata
    """
    # Build the start+poll generator, then drive it to completion.
    return self._execute(self._execute_command_generator(command, cwd, timeout))
def _execute(self, generator: Generator[dict[str, Any], httpx.Response, Any]):
try:
# 1. 问generator:第一个请求怎么发?
kwargs = next(generator)
while True:
# 2. 跑腿:去服务器发请求,拿回response
response = self.client.request(**kwargs)
# 3. 把response给generator,产出下一个请求参数
kwargs = generator.send(response)
except StopIteration as e:
return e.valueStart 启动命令
- 目的:发送
命令请求,获得command_id - url:
{self.host}/api/bash/start_bash_command - json:
{"command": command} - 类型:
post请求 - 返回:
command_id
轮询 状态
- 目的:根据
command_id轮询命令状态,获得events - url:
{self.host}/api/bash/bash_events/search - json:
{"command_id__eq": "command_id", ...} get 请求- 返回:
search_result,多个events- 从
event里获取:stdout,stderr,exit_code等内容。
- 从
最终返回结果
stdout:拼接多个stdout_partsstderr: 拼接多个stderr_partsexit_code- 最终,组成
CommandResult对象作为最终返回结果
def _execute_command_generator(
    self,
    command: str,
    cwd: str | Path | None,
    timeout: float,
) -> Generator[dict[str, Any], httpx.Response, CommandResult]:
    """Execute a bash command on the remote system.

    This method starts a bash command via the remote agent server API,
    then polls for the output until the command completes. It yields httpx
    request kwargs and receives httpx responses (driven by _execute).

    Args:
        command: The bash command to execute
        cwd: Working directory (optional)
        timeout: Timeout in seconds

    Returns:
        CommandResult: Result with stdout, stderr, exit_code, and other metadata
    """
    _logger.debug(f"Executing remote command: {command}")
    # Step 1: Start the bash command
    payload = {
        "command": command,
        "timeout": int(timeout),
    }
    if cwd is not None:
        payload["cwd"] = str(cwd)
    try:
        # Start the command and obtain its command_id for polling.
        response: httpx.Response = yield {
            "method": "POST",
            "url": f"{self.host}/api/bash/start_bash_command",
            "json": payload,
            "headers": self._headers,
            "timeout": timeout + 5.0,  # Add buffer to HTTP timeout
        }
        response.raise_for_status()
        bash_command = response.json()
        command_id = bash_command["id"]
        _logger.debug(f"Started command with ID: {command_id}")
        # Step 2: Poll for output (by command_id) until command completes
        start_time = time.time()
        stdout_parts = []
        stderr_parts = []
        exit_code = None
        last_order = -1  # Track highest order seen to fetch only new events
        seen_event_ids: set[str] = set()  # Track seen IDs to detect duplicates
        while time.time() - start_time < timeout:
            # Search for new events (order > last_order)
            params: dict[str, str | int] = {
                "command_id__eq": command_id,
                "sort_order": "TIMESTAMP",
                "limit": 100,
                "kind__eq": "BashOutput",
            }
            if last_order >= 0:
                params["order__gt"] = last_order
            response = yield {
                "method": "GET",
                "url": f"{self.host}/api/bash/bash_events/search",
                "params": params,
                "headers": self._headers,
                "timeout": timeout,
            }
            response.raise_for_status()
            search_result = response.json()
            # Process BashOutput events
            for event in search_result.get("items", []):
                if event.get("kind") == "BashOutput":
                    # Check for duplicates - safety check in case caller
                    # forgets to add kind__eq filter or API has a bug
                    event_id = event.get("id")
                    if event_id is not None:
                        if event_id in seen_event_ids:
                            raise RuntimeError(
                                f"Duplicate event received: {event_id}. "
                                "This should not happen with order__gt "
                                "filtering and kind filtering."
                            )
                        seen_event_ids.add(event_id)
                    # Track the highest order we've seen
                    event_order = event.get("order")
                    if event_order is not None and event_order > last_order:
                        last_order = event_order
                    if event.get("stdout"):
                        stdout_parts.append(event["stdout"])
                    if event.get("stderr"):
                        stderr_parts.append(event["stderr"])
                    if event.get("exit_code") is not None:
                        exit_code = event["exit_code"]
            # If we have an exit code, the command is complete
            if exit_code is not None:
                break
            # Wait a bit before polling again
            time.sleep(0.1)
        # If we timed out waiting for completion
        if exit_code is None:
            _logger.warning(f"Command timed out after {timeout} seconds: {command}")
            exit_code = -1
            stderr_parts.append(f"Command timed out after {timeout} seconds")
        # Combine all output parts
        stdout = "".join(stdout_parts)
        stderr = "".join(stderr_parts)
        return CommandResult(
            command=command,
            exit_code=exit_code,
            stdout=stdout,
            stderr=stderr,
            # Timeout marker: -1 plus the message appended above — assumes
            # the server never itself reports -1; TODO confirm.
            timeout_occurred=exit_code == -1 and "timed out" in stderr,
        )
    except Exception as e:
        # Best-effort: failures are reported as a failed CommandResult
        # rather than raised to the caller.
        _logger.error(f"Remote command execution failed: {e}")
        return CommandResult(
            command=command,
            exit_code=-1,
            stdout="",
            stderr=f"Remote execution error: {str(e)}",
            timeout_occurred=False,
        )

RemoteWorkspace-源代码
属性
Client:httpx.client
方法
- 实现了
BaseWorkspace定义接口 - 同时
继承RemoteWorkspaceMixin(generator),拥有API请求细节。 - 新增方法:
_execute,结合generator轮询状态。
class RemoteWorkspace(RemoteWorkspaceMixin, BaseWorkspace):
    """Remote workspace implementation that connects to an OpenHands agent server.

    RemoteWorkspace provides access to a sandboxed environment running on a remote
    OpenHands agent server. This is the recommended approach for production deployments
    as it provides better isolation and security.

    Example:
        >>> workspace = RemoteWorkspace(
        ...     host="https://agent-server.example.com",
        ...     working_dir="/workspace"
        ... )
        >>> with workspace:
        ...     result = workspace.execute_command("ls -la")
        ...     content = workspace.read_file("README.md")
    """

    # Lazily-created HTTP client; see the `client` property.
    _client: httpx.Client | None = PrivateAttr(default=None)

    def reset_client(self) -> None:
        """Reset the HTTP client to force re-initialization.

        This is useful when connection parameters (host, api_key) have changed
        and the client needs to be recreated with new values.
        """
        if self._client is not None:
            try:
                self._client.close()
            except Exception:
                # Best-effort close; a stale client must not block the reset.
                pass
            self._client = None

    @property
    def client(self) -> httpx.Client:
        client = self._client
        if client is None:
            # Configure reasonable timeouts for HTTP requests
            # - connect: 10 seconds to establish connection
            # - read: 600 seconds (10 minutes) to read response (for LLM operations)
            # - write: 10 seconds to send request
            # - pool: 10 seconds to get connection from pool
            timeout = httpx.Timeout(
                connect=10.0, read=self.read_timeout, write=10.0, pool=10.0
            )
            client = httpx.Client(
                base_url=self.host,
                timeout=timeout,
                headers=self._headers,
                limits=httpx.Limits(max_connections=self.max_connections),
            )
            self._client = client
        return client

    def _execute(self, generator: Generator[dict[str, Any], httpx.Response, Any]):
        """Drive a request generator: perform each yielded request, feed the
        response back, and return the generator's final value."""
        try:
            kwargs = next(generator)
            while True:
                response = self.client.request(**kwargs)
                kwargs = generator.send(response)
        except StopIteration as e:
            return e.value

    def execute_command(
        self,
        command: str,
        cwd: str | Path | None = None,
        timeout: float = 30.0,
    ) -> CommandResult:
        """Execute a bash command on the remote system.

        This method starts a bash command via the remote agent server API,
        then polls for the output until the command completes.

        Args:
            command: The bash command to execute
            cwd: Working directory (optional)
            timeout: Timeout in seconds

        Returns:
            CommandResult: Result with stdout, stderr, exit_code, and other metadata
        """
        generator = self._execute_command_generator(command, cwd, timeout)
        result = self._execute(generator)
        return result

    def file_upload(
        self,
        source_path: str | Path,
        destination_path: str | Path,
    ) -> FileOperationResult:
        """Upload a file to the remote system.

        Reads the local file and sends it to the remote system via HTTP API.

        Args:
            source_path: Path to the local source file
            destination_path: Path where the file should be uploaded on remote system

        Returns:
            FileOperationResult: Result with success status and metadata
        """
        generator = self._file_upload_generator(source_path, destination_path)
        result = self._execute(generator)
        return result

    def file_download(
        self,
        source_path: str | Path,
        destination_path: str | Path,
    ) -> FileOperationResult:
        """Download a file from the remote system.

        Requests the file from the remote system via HTTP API and saves it locally.

        Args:
            source_path: Path to the source file on remote system
            destination_path: Path where the file should be saved locally

        Returns:
            FileOperationResult: Result with success status and metadata
        """
        generator = self._file_download_generator(source_path, destination_path)
        result = self._execute(generator)
        return result

    def git_changes(self, path: str | Path) -> list[GitChange]:
        """Get the git changes for the repository at the path given.

        Args:
            path: Path to the git repository

        Returns:
            list[GitChange]: List of changes

        Raises:
            Exception: If path is not a git repository or getting changes failed
        """
        generator = self._git_changes_generator(path)
        result = self._execute(generator)
        return result

    def git_diff(self, path: str | Path) -> GitDiff:
        """Get the git diff for the file at the path given.

        Args:
            path: Path to the file

        Returns:
            GitDiff: Git diff

        Raises:
            Exception: If path is not a git repository or getting diff failed
        """
        generator = self._git_diff_generator(path)
        result = self._execute(generator)
        return result

    @property
    def alive(self) -> bool:
        """Check if the remote workspace is alive by querying the health endpoint.

        Returns:
            True if the health endpoint returns a successful response, False otherwise.
        """
        try:
            health_url = f"{self.host}/health"
            with urlopen(health_url, timeout=5.0) as resp:
                # Some response objects may lack `status`; default to 200
                # because urlopen raises on HTTP errors anyway.
                status = getattr(resp, "status", 200)
                return 200 <= status < 300
        except Exception:
            return False

RemoteWorkspaceMixin-源代码
属性
host,api_keyworking_dir,read_timeout,max_connections
核心功能
generator,实现了各操作各阶段的http请求参数- 支持
执行命令、文件上传下载、git状态等。
class RemoteWorkspaceMixin(BaseModel):
    """Mixin providing remote workspace operations.

    Each operation is implemented as a generator of HTTP request kwargs, so
    the same code can be driven by sync and async transports."""

    # Normalized in model_post_init (trailing slashes stripped).
    host: str = Field(description="The remote host URL for the workspace.")
    # Sent as the X-Session-API-Key header when set (see _headers).
    api_key: str | None = Field(
        default=None, description="API key for authenticating with the remote host."
    )
    working_dir: str = Field(
        description="The working directory for agent operations and tool execution."
    )
    read_timeout: float = Field(
        default=600.0,
        description="Timeout in seconds for reading operations of httpx.Client.",
    )
    max_connections: int | None = Field(
        default=None,
        description="Maximum number of connections for httpx.Client. "
        "None means no limit, useful for running many conversations in parallel.",
    )
def model_post_init(self, context: Any) -> None:
    """Normalize the configured host (strip trailing slashes) after init."""
    normalized_host = self.host.rstrip("/")
    self.host = normalized_host
    return super().model_post_init(context)
@property
def _headers(self):
    """HTTP headers for agent-server requests: the session API key when
    one is configured, otherwise empty."""
    if not self.api_key:
        return {}
    return {"X-Session-API-Key": self.api_key}
def _execute_command_generator(
    self,
    command: str,
    cwd: str | Path | None,
    timeout: float,
) -> Generator[dict[str, Any], httpx.Response, CommandResult]:
    """Execute a bash command on the remote system.

    This method starts a bash command via the remote agent server API,
    then polls for the output until the command completes. It yields httpx
    request kwargs and receives httpx responses (driven by the caller).

    Args:
        command: The bash command to execute
        cwd: Working directory (optional)
        timeout: Timeout in seconds

    Returns:
        CommandResult: Result with stdout, stderr, exit_code, and other metadata
    """
    _logger.debug(f"Executing remote command: {command}")
    # Step 1: Start the bash command
    payload = {
        "command": command,
        "timeout": int(timeout),
    }
    if cwd is not None:
        payload["cwd"] = str(cwd)
    try:
        # Start the command; the response carries the command_id to poll on.
        response: httpx.Response = yield {
            "method": "POST",
            "url": f"{self.host}/api/bash/start_bash_command",
            "json": payload,
            "headers": self._headers,
            "timeout": timeout + 5.0,  # Add buffer to HTTP timeout
        }
        response.raise_for_status()
        bash_command = response.json()
        command_id = bash_command["id"]
        _logger.debug(f"Started command with ID: {command_id}")
        # Step 2: Poll for output until command completes
        start_time = time.time()
        stdout_parts = []
        stderr_parts = []
        exit_code = None
        last_order = -1  # Track highest order seen to fetch only new events
        seen_event_ids: set[str] = set()  # Track seen IDs to detect duplicates
        while time.time() - start_time < timeout:
            # Search for new events (order > last_order)
            params: dict[str, str | int] = {
                "command_id__eq": command_id,
                "sort_order": "TIMESTAMP",
                "limit": 100,
                "kind__eq": "BashOutput",
            }
            if last_order >= 0:
                params["order__gt"] = last_order
            response = yield {
                "method": "GET",
                "url": f"{self.host}/api/bash/bash_events/search",
                "params": params,
                "headers": self._headers,
                "timeout": timeout,
            }
            response.raise_for_status()
            search_result = response.json()
            # Process BashOutput events
            for event in search_result.get("items", []):
                if event.get("kind") == "BashOutput":
                    # Check for duplicates - safety check in case caller
                    # forgets to add kind__eq filter or API has a bug
                    event_id = event.get("id")
                    if event_id is not None:
                        if event_id in seen_event_ids:
                            raise RuntimeError(
                                f"Duplicate event received: {event_id}. "
                                "This should not happen with order__gt "
                                "filtering and kind filtering."
                            )
                        seen_event_ids.add(event_id)
                    # Track the highest order we've seen
                    event_order = event.get("order")
                    if event_order is not None and event_order > last_order:
                        last_order = event_order
                    if event.get("stdout"):
                        stdout_parts.append(event["stdout"])
                    if event.get("stderr"):
                        stderr_parts.append(event["stderr"])
                    if event.get("exit_code") is not None:
                        exit_code = event["exit_code"]
            # If we have an exit code, the command is complete
            if exit_code is not None:
                break
            # Wait a bit before polling again
            time.sleep(0.1)
        # If we timed out waiting for completion
        if exit_code is None:
            _logger.warning(f"Command timed out after {timeout} seconds: {command}")
            exit_code = -1
            stderr_parts.append(f"Command timed out after {timeout} seconds")
        # Combine all output parts
        stdout = "".join(stdout_parts)
        stderr = "".join(stderr_parts)
        return CommandResult(
            command=command,
            exit_code=exit_code,
            stdout=stdout,
            stderr=stderr,
            # Timeout marker: -1 plus the message appended above — assumes
            # the server never itself reports -1; TODO confirm.
            timeout_occurred=exit_code == -1 and "timed out" in stderr,
        )
    except Exception as e:
        # Best-effort: failures are reported as a failed CommandResult
        # rather than raised to the caller.
        _logger.error(f"Remote command execution failed: {e}")
        return CommandResult(
            command=command,
            exit_code=-1,
            stdout="",
            stderr=f"Remote execution error: {str(e)}",
            timeout_occurred=False,
        )
def _file_upload_generator(
    self,
    source_path: str | Path,
    destination_path: str | Path,
) -> Generator[dict[str, Any], httpx.Response, FileOperationResult]:
    """Upload a file to the remote system.

    Reads the local file and sends it to the remote system via HTTP API.
    Never raises: any failure is returned as a FileOperationResult with
    success=False.

    Args:
        source_path: Path to the local source file
        destination_path: Path where the file should be uploaded on remote system

    Returns:
        FileOperationResult: Result with success status and metadata
    """
    source = Path(source_path)
    destination = Path(destination_path)
    _logger.debug(f"Remote file upload: {source} -> {destination}")
    try:
        # Read the file content (entire file in memory).
        with open(source, "rb") as f:
            file_content = f.read()
        # Prepare the multipart upload. The destination appears both in the
        # URL path and the form data — presumably the server reads one of
        # them; TODO confirm which the server honors.
        files = {"file": (source.name, file_content)}
        data = {"destination_path": str(destination)}
        # Make HTTP call
        response: httpx.Response = yield {
            "method": "POST",
            "url": f"{self.host}/api/file/upload/{destination}",
            "files": files,
            "data": data,
            "headers": self._headers,
            "timeout": 60.0,
        }
        response.raise_for_status()
        result_data = response.json()
        # Convert the API response to our model
        return FileOperationResult(
            success=result_data.get("success", True),
            source_path=str(source),
            destination_path=str(destination),
            file_size=result_data.get("file_size"),
            error=result_data.get("error"),
        )
    except Exception as e:
        _logger.error(f"Remote file upload failed: {e}")
        return FileOperationResult(
            success=False,
            source_path=str(source),
            destination_path=str(destination),
            error=str(e),
        )
def _file_download_generator(
    self,
    source_path: str | Path,
    destination_path: str | Path,
) -> Generator[dict[str, Any], httpx.Response, FileOperationResult]:
    """Download a file from the remote system.

    Requests the file from the remote system via HTTP API and saves it locally.

    Args:
        source_path: Path to the source file on remote system
        destination_path: Path where the file should be saved locally

    Returns:
        FileOperationResult: Result with success status and metadata
    """
    source = Path(source_path)
    destination = Path(destination_path)
    _logger.debug(f"Remote file download: {source} -> {destination}")
    try:
        # Construct URL with path parameter (not query parameter)
        # Double slash ensures FastAPI extracts path with leading slash
        # for absolute path validation
        source_str = str(source)
        url = f"/api/file/download//{source_str.lstrip('/')}"
        # Yield the request spec; the driver performs the HTTP call and
        # sends the httpx.Response back into this generator.
        response = yield {
            "method": "GET",
            "url": url,
            "headers": self._headers,
            "timeout": 60.0,
        }
        response.raise_for_status()
        # Ensure destination directory exists
        destination.parent.mkdir(parents=True, exist_ok=True)
        # Write the file content (response.content holds the full body
        # in memory).
        with open(destination, "wb") as f:
            f.write(response.content)
        return FileOperationResult(
            success=True,
            source_path=str(source),
            destination_path=str(destination),
            file_size=len(response.content),
        )
    except Exception as e:
        # Report failures as an unsuccessful result instead of raising.
        _logger.error(f"Remote file download failed: {e}")
        return FileOperationResult(
            success=False,
            source_path=str(source),
            destination_path=str(destination),
            error=str(e),
        )
def _git_changes_generator(
    self,
    path: str | Path,
) -> Generator[dict[str, Any], httpx.Response, list[GitChange]]:
    """Get the git changes for the repository at the path given.

    Args:
        path: Path to the git repository

    Returns:
        list[GitChange]: List of changes

    Raises:
        Exception: If path is not a git repository or getting changes failed
    """
    # Yield the request spec; the driver performs the HTTP call.
    # NOTE(review): "url" here is a pathlib.Path, not a str, and joining an
    # absolute component replaces everything before it - if
    # self.working_dir is absolute (e.g. "/workspace") the
    # "/api/git/changes" prefix is silently discarded. Confirm the driver
    # normalizes this as intended.
    response = yield {
        "method": "GET",
        "url": Path("/api/git/changes") / self.working_dir / path,
        "headers": self._headers,
        "timeout": 60.0,
    }
    response.raise_for_status()
    # Validate the JSON payload into a typed list of GitChange.
    type_adapter = TypeAdapter(list[GitChange])
    changes = type_adapter.validate_python(response.json())
    return changes
def _git_diff_generator(
    self,
    path: str | Path,
) -> Generator[dict[str, Any], httpx.Response, GitDiff]:
    """Get the git diff for the file at the path given.

    Args:
        path: Path to the file

    Returns:
        GitDiff: Git diff

    Raises:
        Exception: If path is not a git repository or getting diff failed
    """
    # Yield the request spec; the driver performs the HTTP call.
    # NOTE(review): same pathlib absolute-join footgun as
    # _git_changes_generator - an absolute self.working_dir discards the
    # "/api/git/diff" prefix. Confirm the driver normalizes this.
    response = yield {
        "method": "GET",
        "url": Path("/api/git/diff") / self.working_dir / path,
        "headers": self._headers,
        "timeout": 60.0,
    }
    response.raise_for_status()
    # Validate the JSON payload into a typed GitDiff model.
    diff = GitDiff.model_validate(response.json())
    return diff

DockerWorkspace
核心功能
代码地址
核心功能
- 构建 docker 容器,运行之前预定义好的 openhands agent image,也只能运行这个镜像。
- 同时继承自 RemoteWorkspace,通过 HTTP API 提供 remote 操作
关键属性
server_image:哪一个镜像,也就是 openhands-agent-server 的镜像。host_port:端口。_image_name、_container_id:镜像名与容器 id。working_dir:工作目录。forward_env:转发的环境变量。volumes & mount_dir:挂载,宿主机和容器做文件同步
关键方法
model_post_init:Pydantic V2当属性填充完成以后,自动会运行此方法。_start_container:启动容器cleanup:关闭容器
使用demo,执行完成以后,会自动调用cleanup方法。
with DockerWorkspace(
server_image="ghcr.io/openhands/agent-server:latest"
) as workspace:
    result = workspace.execute_command("ls -la")

Start Container 启动容器
def _start_container(self, image: str, context: Any) -> None:
    """Start the Docker container with the given image.

    This method handles all container lifecycle: port allocation, Docker
    validation, container creation, health checks, and RemoteWorkspace
    initialization.

    Args:
        image: The Docker image tag to use.
        context: The Pydantic context from model_post_init.
    """
    # Store the image name for cleanup
    self._image_name = image
    # Determine port
    if self.host_port is None:
        self.host_port = find_available_tcp_port()
    else:
        # A user-supplied port must be verified before use.
        self.host_port = int(self.host_port)
        if not check_port_available(self.host_port):
            raise RuntimeError(f"Port {self.host_port} is not available")
    if self.extra_ports:
        if not check_port_available(self.host_port + 1):
            raise RuntimeError(
                f"Port {self.host_port + 1} is not available for VSCode"
            )
        if not check_port_available(self.host_port + 2):
            raise RuntimeError(
                f"Port {self.host_port + 2} is not available for VNC"
            )
    # Make sure Docker is available before going any further
    docker_ver = execute_command(["docker", "version"]).returncode
    if docker_ver != 0:
        raise RuntimeError(
            "Docker is not available. Please install and start "
            "Docker Desktop/daemon."
        )
    # Prepare Docker run flags
    flags: list[str] = []
    # Forward only the allow-listed env vars that are actually set.
    for key in self.forward_env:
        if key in os.environ:
            flags += ["-e", f"{key}={os.environ[key]}"]
    for volume in self.volumes:
        flags += ["-v", volume]
        logger.info(f"Adding volume mount: {volume}")
    # The agent server always listens on 8000 inside the container.
    ports = ["-p", f"{self.host_port}:8000"]
    if self.extra_ports:
        ports += [
            "-p",
            f"{self.host_port + 1}:8001",  # VSCode
            "-p",
            f"{self.host_port + 2}:8002",  # Desktop VNC
        ]
    flags += ports
    # Add GPU support if enabled
    if self.enable_gpu:
        flags += ["--gpus", "all"]
    # Docker run command: start a detached container from the given image
    run_cmd = [
        "docker",
        "run",
        "-d",
        "--platform",
        self.platform,
        "--rm",
        "--ulimit",
        "nofile=65536:65536",  # prevent "too many open files" errors
        "--name",
        f"agent-server-{uuid.uuid4()}",
        *flags,
        image,
        "--host",
        "0.0.0.0",
        "--port",
        "8000",
    ]
    proc = execute_command(run_cmd)
    if proc.returncode != 0:
        raise RuntimeError(f"Failed to run docker container: {proc.stderr}")
    # `docker run -d` prints the new container id on stdout
    self._container_id = proc.stdout.strip()
    logger.info(f"Started container: {self._container_id}")
    # Optionally stream logs in background
    if self.detach_logs:
        self._logs_thread = threading.Thread(
            target=self._stream_docker_logs, daemon=True
        )
        self._logs_thread.start()
    # Set host for RemoteWorkspace to use
    # The container exposes port 8000, mapped to self.host_port
    # Override parent's host initialization
    # (object.__setattr__ bypasses Pydantic validation/frozen checks)
    object.__setattr__(self, "host", f"http://localhost:{self.host_port}")
    # No API key is needed for the local container
    object.__setattr__(self, "api_key", None)
    # Reliability guard: make sure the FastAPI server inside has finished
    # initializing. Wait for container to be healthy.
    self._wait_for_health()
    logger.info(f"Docker workspace is ready at {self.host}")
    # Now initialize the parent RemoteWorkspace with the container URL
    super().model_post_init(context)

wait_for_health
def _wait_for_health(self, timeout: float = 120.0) -> None:
    """Wait for the Docker container to become healthy.

    Polls the /health endpoint roughly once per second until it answers
    with a 2xx status or `timeout` seconds elapse.
    """
    start = time.time()
    # NOTE(review): polls 127.0.0.1 directly, independent of self.host.
    health_url = f"http://127.0.0.1:{self.host_port}/health"
    while time.time() - start < timeout:
        try:
            with urlopen(health_url, timeout=1.0) as resp:
                # Any 2xx response counts as healthy.
                if 200 <= getattr(resp, "status", 200) < 300:
                    return
        except Exception:
            # Server not accepting connections yet - keep polling.
            pass
        # Check if container is still running
        if self._container_id:
            ps = execute_command(
                [
                    "docker",
                    "inspect",
                    "-f",
                    "{{.State.Running}}",
                    self._container_id,
                ]
            )
            if ps.stdout.strip() != "true":
                # Container died during startup: surface its logs.
                logs = execute_command(["docker", "logs", self._container_id])
                msg = (
                    "Container stopped unexpectedly. Logs:\n"
                    f"{logs.stdout}\n{logs.stderr}"
                )
                raise RuntimeError(msg)
        time.sleep(1)
    raise RuntimeError("Container failed to become healthy in time")

关闭容器
def __exit__(self, exc_type, exc_val, exc_tb) -> None:  # type: ignore[no-untyped-def]
    """Context manager exit - cleans up the Docker container."""
    # Runs even when the with-body raised, so the container is always freed.
    self.cleanup()
def __del__(self) -> None:
    """Clean up the Docker container when the workspace is destroyed."""
    # NOTE(review): may run during interpreter shutdown when module globals
    # are partially torn down - confirm cleanup() tolerates that.
    self.cleanup()
def cleanup(self) -> None:
    """Stop and remove the Docker container.

    Safe to call repeatedly: handled state is reset to None afterwards.
    """
    if self._container_id:
        # Stop logs streaming
        self._stop_logs.set()
        if self._logs_thread and self._logs_thread.is_alive():
            self._logs_thread.join(timeout=2)
        # Stop the container; it was started with --rm, so stopping
        # also removes it.
        logger.info(f"Stopping container: {self._container_id}")
        execute_command(["docker", "stop", self._container_id])
        self._container_id = None
    # Optionally delete the Docker image
    if self.cleanup_image and self._image_name:
        logger.info(f"Deleting Docker image: {self._image_name}")
        result = execute_command(["docker", "rmi", "-f", self._image_name])
        if result.returncode == 0:
            logger.info(f"Successfully deleted image: {self._image_name}")
        else:
            logger.warning(
                f"Failed to delete image {self._image_name}: {result.stderr}"
            )
        self._image_name = None
def pause(self) -> None:
    """Freeze every process in the container via `docker pause`.

    The container stays allocated; bring it back with `resume()`.

    Raises:
        RuntimeError: If the container is not running or pause fails.
    """
    container = self._container_id
    if not container:
        raise RuntimeError("Cannot pause: container is not running")
    logger.info(f"Pausing container: {container}")
    outcome = execute_command(["docker", "pause", container])
    if outcome.returncode != 0:
        raise RuntimeError(f"Failed to pause container: {outcome.stderr}")
    logger.info(f"Container paused: {container}")
def resume(self) -> None:
    """Resume a paused Docker container.

    Uses `docker unpause` to resume all processes in the container.

    Raises:
        RuntimeError: If the container is not running or resume fails.
    """
    if not self._container_id:
        raise RuntimeError("Cannot resume: container is not running")
    logger.info(f"Resuming container: {self._container_id}")
    result = execute_command(["docker", "unpause", self._container_id])
    if result.returncode != 0:
        raise RuntimeError(f"Failed to resume container: {result.stderr}")
    # Wait for health after resuming (use same timeout as initial startup)
    self._wait_for_health(timeout=120.0)
    logger.info(f"Container resumed: {self._container_id}")

宿主机执行命令(非容器内部)
宿主机执行命令
utils.command.execute_command
- 执行者:OpenHands 框架本身
- 调用:Python 的 subprocess 模块,启动管理 Docker 容器
容器内部执行命令
workspace.execute_command
- 执行者:容器内部
- 通过:HTTP 发给容器里的 Server,让 Agent 去改代码、跑测试
def execute_command(
    cmd: list[str] | str,
    env: dict[str, str] | None = None,
    cwd: str | None = None,
    timeout: float | None = None,
    print_output: bool = True,
) -> subprocess.CompletedProcess:
    """Run a host-side command, echoing its output while capturing it.

    Args:
        cmd: Command as a shell string or an argv list.
        env: Extra environment, passed through sanitized_env().
        cwd: Working directory for the child process.
        timeout: Seconds to wait; on expiry the child is killed and a
            CompletedProcess with returncode -1 is returned.
        print_output: Echo the child's stdout/stderr to our own streams.

    Returns:
        subprocess.CompletedProcess with the captured stdout/stderr text.
    """
    # For string commands, use shell=True to handle shell operators properly
    # A string goes through the shell; a list is passed to the kernel directly.
    if isinstance(cmd, str):
        cmd_to_run = cmd
        use_shell = True  # string mode: let the shell parse and expand
        logger.info("$ %s", cmd)
    else:
        cmd_to_run = cmd
        use_shell = False  # list mode: exec directly, no shell involved
        # Quote each arg so the logged command line is copy-pasteable.
        logger.info("$ %s", " ".join(shlex.quote(c) for c in cmd))
    proc = subprocess.Popen(
        cmd_to_run,
        cwd=cwd,
        env=sanitized_env(env),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        bufsize=1,  # line-buffered, matches the line-by-line readers below
        shell=use_shell,
    )
    # Defensive check; Popen raises on failure rather than returning None,
    # so this branch is effectively unreachable.
    if proc is None:
        raise RuntimeError("Failed to start process")
    # Read line by line, echo to parent stdout/stderr
    stdout_lines: list[str] = []
    stderr_lines: list[str] = []
    if proc.stdout is None or proc.stderr is None:
        raise RuntimeError("Failed to capture stdout/stderr")

    def read_stream(stream, lines, output_stream):
        # Mirror each child line to our own stream and keep a copy.
        try:
            for line in stream:
                if print_output:
                    output_stream.write(line)
                    output_stream.flush()
                lines.append(line)
        except Exception as e:
            logger.error(f"Failed to read stream: {e}")

    # Read stdout and stderr concurrently to avoid deadlock
    stdout_thread = threading.Thread(
        target=read_stream, args=(proc.stdout, stdout_lines, sys.stdout)
    )
    stderr_thread = threading.Thread(
        target=read_stream, args=(proc.stderr, stderr_lines, sys.stderr)
    )
    stdout_thread.start()
    stderr_thread.start()
    try:
        proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        # Kill the child; its pipes close, which lets the readers finish.
        proc.kill()
        stdout_thread.join()
        stderr_thread.join()
        return subprocess.CompletedProcess(
            cmd_to_run,
            -1,  # Indicate timeout with -1 exit code
            "".join(stdout_lines),
            "".join(stderr_lines),
        )
    # NOTE(review): if these joins hit the timeout, the reader threads may
    # still be appending while the output is joined below - confirm intended.
    stdout_thread.join(timeout=timeout)
    stderr_thread.join(timeout=timeout)
    return subprocess.CompletedProcess(
        cmd_to_run,
        proc.returncode,
        "".join(stdout_lines),
        "".join(stderr_lines),
    )

源代码
class DockerWorkspace(RemoteWorkspace):
    """Remote workspace that sets up and manages a Docker container.

    This workspace creates a Docker container running a pre-built OpenHands agent
    server image, waits for it to become healthy, and then provides remote workspace
    operations through the container's HTTP API.

    Note: This class only works with pre-built images. To build images on-the-fly
    from a base image, use DockerDevWorkspace instead.

    Example:
        with DockerWorkspace(
            server_image="ghcr.io/openhands/agent-server:latest"
        ) as workspace:
            result = workspace.execute_command("ls -la")
    """

    # Override parent fields with defaults
    working_dir: str = Field(
        default="/workspace",
        description="Working directory inside the container.",
    )
    host: str = Field(
        default="",
        description=("Remote host URL (set automatically during container startup)."),
    )
    # Docker-specific configuration
    server_image: str | None = Field(
        default="ghcr.io/openhands/agent-server:latest-python",
        description="Pre-built agent server image to use.",
    )
    host_port: int | None = Field(
        default=None,
        description="Port to bind the container to. If None, finds available port.",
    )
    forward_env: list[str] = Field(
        default_factory=lambda: ["DEBUG"],
        description="Environment variables to forward to the container.",
    )
    mount_dir: str | None = Field(
        default=None,
        description="Optional host directory to mount into the container.",
    )
    volumes: list[str] = Field(
        default_factory=list,
        description="Additional volume mounts for the Docker container.",
    )
    detach_logs: bool = Field(
        default=True, description="Whether to stream Docker logs in background."
    )
    platform: PlatformType = Field(
        default="linux/amd64", description="Platform for the Docker image."
    )
    extra_ports: bool = Field(
        default=False,
        description="Whether to expose additional ports (VSCode, VNC).",
    )
    enable_gpu: bool = Field(
        default=False,
        description="Whether to enable GPU support with --gpus all.",
    )
    cleanup_image: bool = Field(
        default=False,
        description="Whether to delete the Docker image when cleaning up workspace.",
    )
    # Runtime-only state, excluded from the public model schema.
    _container_id: str | None = PrivateAttr(default=None)
    _image_name: str | None = PrivateAttr(default=None)
    _logs_thread: threading.Thread | None = PrivateAttr(default=None)
    _stop_logs: threading.Event = PrivateAttr(default_factory=threading.Event)

    @model_validator(mode="after")
    def _validate_server_image(self):
        """Ensure server_image is set when using DockerWorkspace directly."""
        # Subclasses may resolve their image another way (see get_image),
        # so only the base class enforces this.
        if self.__class__ is DockerWorkspace and self.server_image is None:
            raise ValueError("server_image must be provided")
        return self

    @model_validator(mode="after")
    def _validate_mount_dir(self):
        """Translate the deprecated mount_dir option into a volumes entry."""
        if self.mount_dir:
            warn_deprecated(
                "DockerWorkspace.mount_dir",
                deprecated_in="1.10.0",
                removed_in=None,
                details="Use DockerWorkspace.volumes instead",
            )
            self.volumes.append(f"{self.mount_dir}:/workspace")
        return self

    def model_post_init(self, context: Any) -> None:
        """Set up the Docker container and initialize the remote workspace."""
        # Subclasses should call get_image() to get the image to use
        # This allows them to build or prepare the image before container startup
        image = self.get_image()
        self._start_container(image, context)

    def get_image(self) -> str:
        """Get the Docker image to use for the container.

        Subclasses can override this to provide custom image resolution logic
        (e.g., building images on-the-fly).

        Returns:
            The Docker image tag to use.
        """
        if self.server_image is None:
            raise ValueError("server_image must be set")
        return self.server_image

    def _start_container(self, image: str, context: Any) -> None:
        """Start the Docker container with the given image.

        This method handles all container lifecycle: port allocation, Docker
        validation, container creation, health checks, and RemoteWorkspace
        initialization.

        Args:
            image: The Docker image tag to use.
            context: The Pydantic context from model_post_init.
        """
        # Store the image name for cleanup
        self._image_name = image
        # Determine port
        if self.host_port is None:
            self.host_port = find_available_tcp_port()
        else:
            # A user-supplied port must be verified before use.
            self.host_port = int(self.host_port)
            if not check_port_available(self.host_port):
                raise RuntimeError(f"Port {self.host_port} is not available")
        if self.extra_ports:
            if not check_port_available(self.host_port + 1):
                raise RuntimeError(
                    f"Port {self.host_port + 1} is not available for VSCode"
                )
            if not check_port_available(self.host_port + 2):
                raise RuntimeError(
                    f"Port {self.host_port + 2} is not available for VNC"
                )
        # Ensure docker is available
        docker_ver = execute_command(["docker", "version"]).returncode
        if docker_ver != 0:
            raise RuntimeError(
                "Docker is not available. Please install and start "
                "Docker Desktop/daemon."
            )
        # Prepare Docker run flags
        flags: list[str] = []
        # Forward only the allow-listed env vars that are actually set.
        for key in self.forward_env:
            if key in os.environ:
                flags += ["-e", f"{key}={os.environ[key]}"]
        for volume in self.volumes:
            flags += ["-v", volume]
            logger.info(f"Adding volume mount: {volume}")
        # The agent server always listens on 8000 inside the container.
        ports = ["-p", f"{self.host_port}:8000"]
        if self.extra_ports:
            ports += [
                "-p",
                f"{self.host_port + 1}:8001",  # VSCode
                "-p",
                f"{self.host_port + 2}:8002",  # Desktop VNC
            ]
        flags += ports
        # Add GPU support if enabled
        if self.enable_gpu:
            flags += ["--gpus", "all"]
        # Run container (--rm means stopping it also removes it; see cleanup)
        run_cmd = [
            "docker",
            "run",
            "-d",
            "--platform",
            self.platform,
            "--rm",
            "--ulimit",
            "nofile=65536:65536",  # prevent "too many open files" errors
            "--name",
            f"agent-server-{uuid.uuid4()}",
            *flags,
            image,
            "--host",
            "0.0.0.0",
            "--port",
            "8000",
        ]
        proc = execute_command(run_cmd)
        if proc.returncode != 0:
            raise RuntimeError(f"Failed to run docker container: {proc.stderr}")
        # `docker run -d` prints the new container id on stdout.
        self._container_id = proc.stdout.strip()
        logger.info(f"Started container: {self._container_id}")
        # Optionally stream logs in background
        if self.detach_logs:
            self._logs_thread = threading.Thread(
                target=self._stream_docker_logs, daemon=True
            )
            self._logs_thread.start()
        # Set host for RemoteWorkspace to use
        # The container exposes port 8000, mapped to self.host_port
        # Override parent's host initialization
        # (object.__setattr__ bypasses Pydantic validation/frozen checks)
        object.__setattr__(self, "host", f"http://localhost:{self.host_port}")
        object.__setattr__(self, "api_key", None)
        # Wait for container to be healthy
        self._wait_for_health()
        logger.info(f"Docker workspace is ready at {self.host}")
        # Now initialize the parent RemoteWorkspace with the container URL
        super().model_post_init(context)

    def _stream_docker_logs(self) -> None:
        """Stream Docker logs to stdout in the background."""
        if not self._container_id:
            return
        try:
            p = subprocess.Popen(
                ["docker", "logs", "-f", self._container_id],
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
            )
            if p.stdout is None:
                return
            for line in iter(p.stdout.readline, ""):
                # _stop_logs is only observed after a line arrives, so the
                # thread can linger on a quiet container until the next line.
                if self._stop_logs.is_set():
                    break
                if line:
                    sys.stdout.write(f"[DOCKER] {line}")
                    sys.stdout.flush()
        except Exception as e:
            sys.stderr.write(f"Error streaming docker logs: {e}\n")
        finally:
            # NOTE(review): the `docker logs -f` child is never
            # terminated/waited here; it only exits with the container -
            # confirm this doesn't accumulate processes in long runs.
            try:
                self._stop_logs.set()
            except Exception:
                pass

    def _wait_for_health(self, timeout: float = 120.0) -> None:
        """Wait for the Docker container to become healthy."""
        start = time.time()
        # NOTE(review): polls 127.0.0.1 while self.host uses "localhost".
        health_url = f"http://127.0.0.1:{self.host_port}/health"
        while time.time() - start < timeout:
            try:
                with urlopen(health_url, timeout=1.0) as resp:
                    # Any 2xx response counts as healthy.
                    if 200 <= getattr(resp, "status", 200) < 300:
                        return
            except Exception:
                # Server not accepting connections yet - keep polling.
                pass
            # Check if container is still running
            if self._container_id:
                ps = execute_command(
                    [
                        "docker",
                        "inspect",
                        "-f",
                        "{{.State.Running}}",
                        self._container_id,
                    ]
                )
                if ps.stdout.strip() != "true":
                    # Container died during startup: surface its logs.
                    logs = execute_command(["docker", "logs", self._container_id])
                    msg = (
                        "Container stopped unexpectedly. Logs:\n"
                        f"{logs.stdout}\n{logs.stderr}"
                    )
                    raise RuntimeError(msg)
            time.sleep(1)
        raise RuntimeError("Container failed to become healthy in time")

    def __enter__(self) -> "DockerWorkspace":
        """Context manager entry - returns the workspace itself."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:  # type: ignore[no-untyped-def]
        """Context manager exit - cleans up the Docker container."""
        self.cleanup()

    def __del__(self) -> None:
        """Clean up the Docker container when the workspace is destroyed."""
        # NOTE(review): may run during interpreter shutdown - confirm
        # cleanup() tolerates partially torn-down globals.
        self.cleanup()

    def cleanup(self) -> None:
        """Stop and remove the Docker container."""
        if self._container_id:
            # Stop logs streaming
            self._stop_logs.set()
            if self._logs_thread and self._logs_thread.is_alive():
                self._logs_thread.join(timeout=2)
            # Stop the container; it was started with --rm, so stopping
            # also removes it.
            logger.info(f"Stopping container: {self._container_id}")
            execute_command(["docker", "stop", self._container_id])
            self._container_id = None
        # Optionally delete the Docker image
        if self.cleanup_image and self._image_name:
            logger.info(f"Deleting Docker image: {self._image_name}")
            result = execute_command(["docker", "rmi", "-f", self._image_name])
            if result.returncode == 0:
                logger.info(f"Successfully deleted image: {self._image_name}")
            else:
                logger.warning(
                    f"Failed to delete image {self._image_name}: {result.stderr}"
                )
            self._image_name = None

    def pause(self) -> None:
        """Pause the Docker container to conserve resources.

        Uses `docker pause` to freeze all processes in the container without
        stopping it. The container can be resumed later with `resume()`.

        Raises:
            RuntimeError: If the container is not running or pause fails.
        """
        if not self._container_id:
            raise RuntimeError("Cannot pause: container is not running")
        logger.info(f"Pausing container: {self._container_id}")
        result = execute_command(["docker", "pause", self._container_id])
        if result.returncode != 0:
            raise RuntimeError(f"Failed to pause container: {result.stderr}")
        logger.info(f"Container paused: {self._container_id}")

    def resume(self) -> None:
        """Resume a paused Docker container.

        Uses `docker unpause` to resume all processes in the container.

        Raises:
            RuntimeError: If the container is not running or resume fails.
        """
        if not self._container_id:
            raise RuntimeError("Cannot resume: container is not running")
        logger.info(f"Resuming container: {self._container_id}")
        result = execute_command(["docker", "unpause", self._container_id])
        if result.returncode != 0:
            raise RuntimeError(f"Failed to resume container: {result.stderr}")
        # Wait for health after resuming (use same timeout as initial startup)
        self._wait_for_health(timeout=120.0)
        logger.info(f"Container resumed: {self._container_id}")

Openhands SWE 早期笔记 (旧版本代码,地址已移动)
SWE样例见:SWE数据样例
评测主流程-process_instance
def process_instance(
    instance: pd.Series,
    metadata: EvalMetadata,
    reset_logger: bool = True,
    runtime_failure_count: int = 0,
) -> EvalOutput:
    """Run the agent on one SWE-Bench instance and collect its git patch.

    Args:
        instance: One dataset row (carries instance_id etc.).
        metadata: Eval-wide settings; deep-copied below so per-instance
            details don't leak back to the caller.
        reset_logger: Not used in this body; kept for interface compatibility.
        runtime_failure_count: Number of prior runtime failures, recorded in
            metadata.details for reruns.
    """
    config = get_config(instance, metadata)
    # Work on a copy so per-instance details don't mutate shared metadata.
    metadata = copy.deepcopy(metadata)
    metadata.details['runtime_failure_count'] = runtime_failure_count
    metadata.details['remote_runtime_resource_factor'] = (
        config.sandbox.remote_runtime_resource_factor
    )
    # Build/pull the sandbox runtime for this instance
    runtime = create_runtime(config)
    call_async_from_sync(runtime.connect)
    try:
        # Initialize the runtime and build the instruction prompt
        initialize_runtime(runtime, instance, metadata)
        message_action = get_instruction(instance, metadata)
        # Here's how you can run the agent (similar to the `main` function) and get the final task state
        state: State | None = asyncio.run(
            run_controller(
                config=config,
                initial_user_action=message_action,
                runtime=runtime,
                fake_user_response_fn=AGENT_CLS_TO_FAKE_USER_RESPONSE_FN[
                    metadata.agent_class
                ],
            )
        )
        # if fatal error, throw EvalError to trigger re-run
        if is_fatal_evaluation_error(state.last_error):
            raise EvalException('Fatal error detected: ' + state.last_error)
        # ======= THIS IS SWE-Bench specific =======
        # Get git patch
        if DATASET_TYPE == 'SWE-bench-Live':
            from evaluation.benchmarks.swe_bench.live_utils import (
                complete_runtime as complete_runtime_fn,
            )
        else:
            complete_runtime_fn = complete_runtime
        return_val = complete_runtime_fn(runtime, instance)
        git_patch = return_val['git_patch']
        logger.info(
            f'Got git diff for instance {instance.instance_id}:\n--------\n{git_patch}\n--------'
        )
    finally:
        # Always tear down the sandbox, even when evaluation failed.
        runtime.close()
    # ==========================================
    # ======= Attempt to evaluate the agent's edits =======
    # we use eval_infer.sh to evaluate the agent's edits, not here
    # because the agent may alter the environment / testcases
    test_result = {
        'git_patch': git_patch,
    }
    # If you are working on some simpler benchmark that only evaluates the final model output (e.g., in a MessageAction)
    # You can simply get the LAST `MessageAction` from the returned `state.history` and parse it for evaluation.
    if state is None:
        raise ValueError('State should not be None.')
    # NOTE: this is NO LONGER the event stream, but an agent history that includes delegate agent's events
    histories = [event_to_dict(event) for event in state.history]
    metrics = get_metrics(state)
    # Save the output
    instruction = message_action.content
    if message_action.image_urls:
        instruction += (
            '\n\n<image_urls>' + '\n'.join(message_action.image_urls) + '</image_urls>'
        )
    output = EvalOutput(
        instance_id=instance.instance_id,
        instruction=instruction,
        instance=instance.to_dict(),  # SWE Bench specific
        test_result=test_result,
        metadata=metadata,
        history=histories,
        metrics=metrics,
        error=state.last_error if state and state.last_error else None,
    )
    return output

获取配置 OpenHandsConfig
- base_container_image:数据实例镜像地址,根据 instance_id 和任务去做解析
- sandbox 配置
OpenHandsConfig
def get_config(
    instance: pd.Series,
    metadata: EvalMetadata,
) -> OpenHandsConfig:
    """Build the full OpenHandsConfig (sandbox + LLM + agent) for one instance."""
    # We use a different instance image for the each instance of swe-bench eval
    use_swebench_official_image = DATASET_TYPE != 'SWE-Gym'
    # Docker image holding this specific instance's repo/environment
    base_container_image = get_instance_docker_image(
        instance['instance_id'],
        swebench_official_image=use_swebench_official_image,
    )
    logger.info(
        f'Using instance container image: {base_container_image}. '
        f'Please make sure this image exists. '
        f'Submit an issue on https://github.com/OpenHands/OpenHands if you run into any issues.'
    )
    # Sandbox configuration
    sandbox_config = get_default_sandbox_config_for_eval()
    # The per-instance dataset image becomes the sandbox base image
    sandbox_config.base_container_image = base_container_image
    sandbox_config.enable_auto_lint = True
    sandbox_config.use_host_network = False
    # Add platform to the sandbox config to solve issue 4401
    sandbox_config.platform = 'linux/amd64'
    sandbox_config.remote_runtime_resource_factor = get_instance_resource_factor(
        dataset_name=metadata.dataset,
        instance_id=instance['instance_id'],
    )
    # Main OpenHands config
    config = get_openhands_config_for_eval(
        metadata=metadata,
        enable_browser=RUN_WITH_BROWSING,
        runtime=os.environ.get('RUNTIME', 'docker'),
        sandbox_config=sandbox_config,
    )
    config.set_llm_config(
        update_llm_config_for_completions_logging(
            metadata.llm_config, metadata.eval_output_dir, instance['instance_id']
        )
    )
    # get 'draft_editor' config if exists
    config.set_llm_config(get_llm_config_arg('draft_editor'), 'draft_editor')
    model_routing_config = get_model_routing_config_arg()
    model_routing_config.llms_for_routing = (
        get_llms_for_routing_config()
    )  # Populate with LLMs for routing from config.toml file
    # Agent configuration
    agent_config = AgentConfig(
        enable_jupyter=False,
        enable_browsing=RUN_WITH_BROWSING,
        enable_llm_editor=ENABLE_LLM_EDITOR,
        enable_mcp=False,
        condenser=metadata.condenser_config,
        enable_prompt_extensions=False,
        model_routing=model_routing_config,
        system_prompt_filename=metadata.agent_config.system_prompt_filename
        if metadata.agent_config
        else 'system_prompt.j2',
    )
    config.set_agent_config(agent_config)
    return config

Sandbox Config
def get_default_sandbox_config_for_eval() -> SandboxConfig:
    """Default sandbox settings shared by all eval runs."""
    return SandboxConfig(
        use_host_network=False,
        # large enough timeout, since some testcases take very long to run
        timeout=300,
        api_key=os.environ.get('ALLHANDS_API_KEY', None),
        # A step is considered stuck after 30s without output change.
        runtime_startup_env_vars={'NO_CHANGE_TIMEOUT_SECONDS': '30'},
        remote_runtime_api_url=os.environ.get('SANDBOX_REMOTE_RUNTIME_API_URL'),
        keep_runtime_alive=False,
        remote_runtime_init_timeout=3600,
        remote_runtime_api_timeout=120,
        remote_runtime_enable_retries=True,
        remote_runtime_class='sysbox',
    )

EvalMetadata
eval metadata,评估时的一些设置
def make_metadata(
    llm_config: LLMConfig,
    dataset_name: str,
    agent_class: str,
    max_iterations: int,
    eval_note: str | None,
    eval_output_dir: str,
    data_split: str | None = None,
    details: dict[str, Any] | None = None,
    agent_config: AgentConfig | None = None,
    condenser_config: CondenserConfig | None = None,
) -> EvalMetadata:
    """Assemble EvalMetadata, create the output directory tree, and persist
    the metadata to <output>/metadata.json."""
    # Derive a filesystem-safe model name for the output path.
    model_name = llm_config.model.split('/')[-1]
    model_path = model_name.replace(':', '_').replace('@', '-')
    eval_note = f'_N_{eval_note}' if eval_note else ''
    eval_output_path = os.path.join(
        eval_output_dir,
        dataset_name,
        agent_class,
        f'{model_path}_maxiter_{max_iterations}{eval_note}',
    )
    pathlib.Path(eval_output_path).mkdir(parents=True, exist_ok=True)
    pathlib.Path(os.path.join(eval_output_path, 'logs')).mkdir(
        parents=True, exist_ok=True
    )
    logger.info(f'Using evaluation output directory: {eval_output_path}')
    metadata = EvalMetadata(
        agent_class=agent_class,
        llm_config=llm_config,
        agent_config=agent_config,
        max_iterations=max_iterations,
        eval_output_dir=eval_output_path,
        start_time=time.strftime('%Y-%m-%d %H:%M:%S'),
        # Record the exact code revision for reproducibility.
        git_commit=subprocess.check_output(['git', 'rev-parse', 'HEAD'])
        .decode('utf-8')
        .strip(),
        dataset=dataset_name,
        data_split=data_split,
        details=details,
        condenser_config=condenser_config
        if condenser_config
        else NoOpCondenserConfig(),
        instruction_template_name=os.environ.get('INSTRUCTION_TEMPLATE_NAME'),
    )
    metadata_json = metadata.model_dump_json()
    logger.info(f'Metadata: {metadata_json}')
    with open(os.path.join(eval_output_path, 'metadata.json'), 'w') as f:
        f.write(metadata_json)
    return metadata

镜像环境准备
解析实例镜像地址
def get_instance_docker_image(
    instance_id: str,
    swebench_official_image: bool = False,
) -> str:
    """Resolve the Docker image name for a dataset instance.

    Args:
        instance_id: e.g. "django__django-11333".
        swebench_official_image: Use the official SWE-Bench images instead of
            the OpenHands-built ones.
    """
    if swebench_official_image:
        # Official SWE-Bench image
        # swebench/sweb.eval.x86_64.django_1776_django-11333:v1
        # SWE-bench-Live uses the same naming convention as SWE-Bench
        if DATASET_TYPE == 'SWE-bench-Live':
            docker_image_prefix = 'docker.io/starryzhang/'
        elif DATASET_TYPE == 'SWE-bench':
            docker_image_prefix = 'docker.io/swebench/'
        elif DATASET_TYPE == 'SWE-rebench':
            docker_image_prefix = 'docker.io/swerebench/'
        # NOTE(review): any other DATASET_TYPE leaves docker_image_prefix
        # unbound and the next line raises UnboundLocalError - consider an
        # explicit else-raise with a clear message.
        repo, name = instance_id.split('__')
        image_name = f'{docker_image_prefix.rstrip("/")}/sweb.eval.x86_64.{repo}_1776_{name}:latest'.lower()
        logger.debug(f'Using official SWE-Bench image: {image_name}')
        return image_name
    else:
        # OpenHands version of the image
        # docker.io/xingyaoww/
        docker_image_prefix = DEFAULT_DOCKER_IMAGE_PREFIX
        image_name = 'sweb.eval.x86_64.' + instance_id
        image_name = image_name.replace(
            '__', '_s_'
        )  # to comply with docker image naming convention
        return (docker_image_prefix.rstrip('/') + '/' + image_name).lower()

通用 Runtime 构建入口
process_instance 处的调用代码
def process_instance(
instance: pd.Series,
metadata: EvalMetadata,
reset_logger: bool = True,
runtime_failure_count: int = 0,
) -> EvalOutput:
# ....
# ....
# 构建runtime
runtime = create_runtime(config)
call_async_from_sync(runtime.connect)
try:
initialize_runtime(runtime, instance, metadata)
message_action = get_instruction(instance, metadata)
# Here's how you can run the agent (similar to the `main` function) and get the final task state
state: State | None = asyncio.run(
run_controller(
config=config,
initial_user_action=message_action,
runtime=runtime,
fake_user_response_fn=AGENT_CLS_TO_FAKE_USER_RESPONSE_FN[
metadata.agent_class
],
)
    )

def create_runtime(
config: OpenHandsConfig,
llm_registry: LLMRegistry | None = None,
sid: str | None = None,
headless_mode: bool = True,
agent: Agent | None = None,
git_provider_tokens: PROVIDER_TOKEN_TYPE | None = None,
) -> Runtime:
"""Create a runtime for the agent to run on.
Args:
config: The app config.
sid: (optional) The session id. IMPORTANT: please don't set this unless you know what you're doing.
Set it to incompatible value will cause unexpected behavior on RemoteRuntime.
headless_mode: Whether the agent is run in headless mode. `create_runtime` is typically called within evaluation scripts,
where we don't want to have the VSCode UI open, so it defaults to True.
agent: (optional) The agent instance to use for configuring the runtime.
Returns:
The created Runtime instance (not yet connected or initialized).
"""
# if sid is provided on the command line, use it as the name of the event stream
# otherwise generate it on the basis of the configured jwt_secret
# we can do this better, this is just so that the sid is retrieved when we want to restore the session
session_id = sid or generate_sid(config)
# set up the event stream
file_store = get_file_store(config.file_store, config.file_store_path)
event_stream = EventStream(session_id, file_store)
# agent class
if agent:
agent_cls = type(agent)
else:
agent_cls = Agent.get_cls(config.default_agent)
# runtime and tools
runtime_cls = get_runtime_cls(config.runtime)
# logger.debug(f'Initializing runtime: {runtime_cls.__name__}')
logger.info(f'Initializing runtime: {runtime_cls.__name__}')
runtime: Runtime = runtime_cls(
config=config,
event_stream=event_stream,
sid=session_id,
plugins=agent_cls.sandbox_plugins,
headless_mode=headless_mode,
llm_registry=llm_registry or LLMRegistry(config),
git_provider_tokens=git_provider_tokens,
)
# Log the plugins that have been registered with the runtime for debugging purposes
logger.debug(
f'Runtime created with plugins: {[plugin.name for plugin in runtime.plugins]}'
)
    return runtime

DockerRuntime 构造函数
参考链接
关键参数
base_container_image:数据基础镜像runtime_container_image:运行时镜像?默认为Nonesandbox.local_runtime_url:
class DockerRuntime(ActionExecutionClient):
    """Runtime that executes agent actions inside a Docker container.

    This runtime subscribes to the event stream. When it receives an event, it
    forwards the event to the runtime client running inside the Docker
    environment.

    Args:
        config (OpenHandsConfig): The application configuration.
        event_stream (EventStream): The event stream to subscribe to.
        llm_registry (LLMRegistry): Registry used to resolve LLMs for the runtime.
        sid (str, optional): The session ID. Defaults to 'default'.
        plugins (list[PluginRequirement] | None, optional): List of plugin requirements. Defaults to None.
        env_vars (dict[str, str] | None, optional): Environment variables to set. Defaults to None.
        status_callback (Callable | None, optional): Callback invoked on runtime status changes.
        attach_to_existing (bool, optional): If True, attach to an already-running container instead of creating one.
        headless_mode (bool, optional): Whether the agent runs without the VSCode UI.
    """

    # Process-wide shutdown hook id; registered at most once for all instances.
    _shutdown_listener_id: UUID | None = None

    def __init__(
        self,
        config: OpenHandsConfig,
        event_stream: EventStream,
        llm_registry: LLMRegistry,
        sid: str = 'default',
        plugins: list[PluginRequirement] | None = None,
        env_vars: dict[str, str] | None = None,
        status_callback: Callable | None = None,
        attach_to_existing: bool = False,
        headless_mode: bool = True,
        user_id: str | None = None,
        git_provider_tokens: PROVIDER_TOKEN_TYPE | None = None,
        main_module: str = DEFAULT_MAIN_MODULE,
    ):
        # Register a single process-wide hook that stops all runtime containers
        # on interpreter shutdown.
        if not DockerRuntime._shutdown_listener_id:
            DockerRuntime._shutdown_listener_id = add_shutdown_listener(
                lambda: stop_all_containers(CONTAINER_NAME_PREFIX)
            )
        self.config = config
        self.status_callback = status_callback
        # Ports are resolved later (attach/init); -1 means "not assigned yet".
        self._host_port = -1
        self._container_port = -1
        self._vscode_port = -1
        self._app_ports: list[int] = []
        # Port locks to prevent race conditions when picking free host ports.
        self._host_port_lock: PortLock | None = None
        self._vscode_port_lock: PortLock | None = None
        self._app_port_locks: list[PortLock] = []
        if os.environ.get('DOCKER_HOST_ADDR'):
            logger.info(
                f'Using DOCKER_HOST_IP: {os.environ["DOCKER_HOST_ADDR"]} for local_runtime_url'
            )
            self.config.sandbox.local_runtime_url = (
                f'http://{os.environ["DOCKER_HOST_ADDR"]}'
            )
        self.docker_client: docker.DockerClient = self._init_docker_client()
        # NOTE: _container_port is still -1 here; api_url is recomputed once the
        # real container port is known (see _attach_to_container / init_container).
        self.api_url = f'{self.config.sandbox.local_runtime_url}:{self._container_port}'
        # base image = the data/base image; runtime image = the image the
        # runtime actually starts (may still be None and built on demand).
        self.base_container_image = self.config.sandbox.base_container_image
        self.runtime_container_image = self.config.sandbox.runtime_container_image
        # CONTAINER_NAME_PREFIX = 'openhands-runtime-'; sid is the session id.
        self.container_name = CONTAINER_NAME_PREFIX + sid
        self.container: Container | None = None
        self.main_module = main_module
        self.runtime_builder = DockerRuntimeBuilder(self.docker_client)
        # Buffer for container logs.
        self.log_streamer: LogStreamer | None = None
        super().__init__(
            config,
            event_stream,
            llm_registry,
            sid,
            plugins,
            env_vars,
            status_callback,
            attach_to_existing,
            headless_mode,
            user_id,
            git_provider_tokens,
        )
        # Log runtime_extra_deps after base class initialization so self.sid is available.
        if self.config.sandbox.runtime_extra_deps:
            self.log(
                'debug',
                f'Installing extra user-provided dependencies in the runtime image: {self.config.sandbox.runtime_extra_deps}',
            )

    @staticmethod
    @lru_cache(maxsize=1)
    def _init_docker_client() -> docker.DockerClient:
        """Create (and cache) the Docker client; raise if the daemon is unreachable."""
        try:
            return docker.from_env()
        except Exception:
            logger.error(
                'Launch docker client failed. Please make sure you have installed docker and started docker desktop/daemon.',
            )
            # Bare raise preserves the original exception and traceback.
            raise

Docker准备运行时容器
参考链接
核心方法
- 如果已有容器,则:启动并 attach 到该容器,设置 api url
- 如果没有容器,则:拉取/构建镜像,初始化容器
class DockerRuntime(ActionExecutionClient):
'''省略若干代码
'''
async def connect(self) -> None:
self.set_runtime_status(RuntimeStatus.STARTING_RUNTIME)
try:
await call_sync_from_async(self._attach_to_container)
except docker.errors.NotFound as e:
if self.attach_to_existing:
self.log(
'warning',
f'Container {self.container_name} not found.',
)
raise AgentRuntimeDisconnectedError from e
# 重新构建镜像
self.maybe_build_runtime_container_image()
self.log(
'info', f'Starting runtime with image: {self.runtime_container_image}'
)
# 初始化容器
await call_sync_from_async(self.init_container)
self.log(
'info',
f'Container started: {self.container_name}. VSCode URL: {self.vscode_url}',
)
if DEBUG_RUNTIME and self.container:
self.log_streamer = LogStreamer(self.container, self.log)
else:
self.log_streamer = None
if not self.attach_to_existing:
self.log('info', f'Waiting for client to become ready at {self.api_url}...')
self.set_runtime_status(RuntimeStatus.STARTING_RUNTIME)
await call_sync_from_async(self.wait_until_alive)
if not self.attach_to_existing:
self.log('info', 'Runtime is ready.')
if not self.attach_to_existing:
await call_sync_from_async(self.setup_initial_env)
self.log(
'debug',
f'Container initialized with plugins: {[plugin.name for plugin in self.plugins]}. VSCode URL: {self.vscode_url}',
)
if not self.attach_to_existing:
self.set_runtime_status(RuntimeStatus.READY)
self._runtime_initialized = True
for network_name in self.config.sandbox.additional_networks:
try:
network = self.docker_client.networks.get(network_name)
if self.container is not None:
network.connect(self.container)
else:
self.log(
'warning',
f'Container not available to connect to network {network_name}',
)
except Exception as e:
self.log(
'error',
f'Error: Failed to connect instance {self.container_name} to network {network_name}',
)
self.log('error', str(e))- 去
启动容器、设置api_url。
def _attach_to_container(self) -> None:
# 容器名称,在init里根据session id来自定义的
self.container = self.docker_client.containers.get(self.container_name)
if self.container.status == 'exited':
# 启动容器
self.container.start()
config = self.container.attrs['Config']
for env_var in config['Env']:
if env_var.startswith('port='):
self._host_port = int(env_var.split('port=')[1])
self._container_port = self._host_port
elif env_var.startswith('VSCODE_PORT='):
self._vscode_port = int(env_var.split('VSCODE_PORT=')[1])
self._app_ports = []
exposed_ports = config.get('ExposedPorts')
if exposed_ports:
for exposed_port in exposed_ports.keys():
exposed_port = int(exposed_port.split('/tcp')[0])
if (
exposed_port != self._host_port
and exposed_port != self._vscode_port
):
self._app_ports.append(exposed_port)
# api_url:sandbox_url + 容器端口
self.api_url = f'{self.config.sandbox.local_runtime_url}:{self._container_port}'
self.log(
'debug',
f'attached to container: {self.container_name} {self._container_port} {self.api_url}',
)attach失败,则重新创建镜像,根据实例的base_container_image
def maybe_build_runtime_container_image(self):
if self.runtime_container_image is None:
if self.base_container_image is None:
raise ValueError(
'Neither runtime container image nor base container image is set'
)
self.set_runtime_status(RuntimeStatus.BUILDING_RUNTIME)
self.runtime_container_image = build_runtime_image(
self.base_container_image,
self.runtime_builder,
platform=self.config.sandbox.platform,
extra_deps=self.config.sandbox.runtime_extra_deps,
force_rebuild=self.config.sandbox.force_rebuild_runtime,
extra_build_args=self.config.sandbox.runtime_extra_build_args,
enable_browser=self.config.enable_browser,
)Docker从头开始构建镜像
def build_runtime_image(
    base_image: str,
    runtime_builder: RuntimeBuilder,
    platform: str | None = None,
    extra_deps: str | None = None,
    build_folder: str | None = None,
    dry_run: bool = False,
    force_rebuild: bool = False,
    extra_build_args: list[str] | None = None,
    enable_browser: bool = True,
) -> str:
    """Prepares the final docker build folder.
    If dry_run is False, it will also build the OpenHands runtime Docker image using the docker build folder.
    Parameters:
    - base_image (str): The name of the base Docker image to use
    - runtime_builder (RuntimeBuilder): The runtime builder to use
    - platform (str): The target platform for the build (e.g. linux/amd64, linux/arm64)
    - extra_deps (str): Extra dependencies to install into the runtime image
    - build_folder (str): The directory to use for the build. If not provided a temporary directory will be used
    - dry_run (bool): if True, it will only prepare the build folder. It will not actually build the Docker image
    - force_rebuild (bool): if True, it will rebuild the runtime image from scratch from the base_image
    - extra_build_args (List[str]): Additional build arguments to pass to the builder
    - enable_browser (bool): Whether to enable browser support (install Playwright)
    Returns:
    - str: <image_repo>:<MD5 hash>. Where MD5 hash is the hash of the docker build folder
    See https://docs.all-hands.dev/usage/architecture/runtime for more details.
    """
    # When no build folder is supplied, do the work in a throwaway temp dir.
    if build_folder is None:
        with tempfile.TemporaryDirectory() as temp_dir:
            return build_runtime_image_in_folder(
                base_image=base_image,
                runtime_builder=runtime_builder,
                build_folder=Path(temp_dir),
                extra_deps=extra_deps,
                dry_run=dry_run,
                force_rebuild=force_rebuild,
                platform=platform,
                extra_build_args=extra_build_args,
                enable_browser=enable_browser,
            )
    return build_runtime_image_in_folder(
        base_image=base_image,
        runtime_builder=runtime_builder,
        build_folder=Path(build_folder),
        extra_deps=extra_deps,
        dry_run=dry_run,
        force_rebuild=force_rebuild,
        platform=platform,
        extra_build_args=extra_build_args,
        enable_browser=enable_browser,
    )

参考链接
核心内容
- 基础镜像,base_image:
- 实例的指定镜像
- 如swebench/sweb.eval.x86_64.scikit-learn_1776_scikit-learn-13439:latest
- 运行时镜像,runtime_image:
- Openhands真正启动时的镜像
- 以base_image为底座,把openhands的runtime_client和agent_skills等安装进去
三层缓存
- Versioned Tag:底座标签,OpenHands 版本 + 系统级软件 + 基础镜像名
- Lock Tag:依赖标签,Openhands所需要的python第三方包。
- Source Tag:源代码标签,最终运行的镜像。
构建策略
- 先找Source Tag
- 有的话,直接启动。
- 再找Lock Tag
- 代码变了,但依赖没变,需要重新构建一个
- 基于Lock Image,只把代码拷贝进去。
- 接着找 Versioned Tag
- 依赖也变了,最基础的OpenHands也没有。
- 从 Versioned镜像,重新安装依赖,拷贝代码。
- 全都找不到,重新开始构建
- 原数据镜像 + 更新系统 + 装python + 安装依赖 + 拷贝代码等。
def build_runtime_image_in_folder(
base_image: str,
runtime_builder: RuntimeBuilder,
build_folder: Path,
extra_deps: str | None,
dry_run: bool,
force_rebuild: bool,
platform: str | None = None,
extra_build_args: list[str] | None = None,
enable_browser: bool = True,
) -> str:
# 解析runtime_image_repo和tag,tag未使用
# 'ghcr.io/openhands/runtime', 'oh_v1.2.1_image_98232eab__1776_scikit-learn-13439_tag_latest'
runtime_image_repo, _ = get_runtime_image_repo_and_tag(base_image)
# 'oh_v1.2.1_2sfgez3r5yzdqiuh'
lock_tag = (
f'oh_v{get_version()}_{get_hash_for_lock_files(base_image, enable_browser)}'
)
# 'oh_v1.2.1_docker.io_s_swebench_s_sweb.eval.x86_64.scikit-learn_1776_scikit-learn-13439_t_latest'
versioned_tag = (
# truncate the base image to 96 characters to fit in the tag max length (128 characters)
f'oh_v{get_version()}_{get_tag_for_versioned_image(base_image)}'
)
# 'ghcr.io/openhands/runtime:oh_v1.2.1_docker.io_s_swebench_s_sweb.eval.x86_64.scikit-learn_1776_scikit-learn-13439_t_latest'
versioned_image_name = f'{runtime_image_repo}:{versioned_tag}'
# 'oh_v1.2.1_2sfgez3r5yzdqiuh_rdcry28kzl7vjpv1'
source_tag = f'{lock_tag}_{get_hash_for_source_files()}'
# 'ghcr.io/openhands/runtime:oh_v1.2.1_2sfgez3r5yzdqiuh_rdcry28kzl7vjpv1'
hash_image_name = f'{runtime_image_repo}:{source_tag}'
logger.info(f'Building image: {hash_image_name}')
if force_rebuild:
logger.debug(
f'Force rebuild: [{runtime_image_repo}:{source_tag}] from scratch.'
)
prep_build_folder(
build_folder,
base_image,
build_from=BuildFromImageType.SCRATCH,
extra_deps=extra_deps,
enable_browser=enable_browser,
)
if not dry_run:
_build_sandbox_image(
build_folder,
runtime_builder,
runtime_image_repo,
source_tag,
lock_tag,
versioned_tag,
platform,
extra_build_args=extra_build_args,
)
return hash_image_name
# 'ghcr.io/openhands/runtime:oh_v1.2.1_2sfgez3r5yzdqiuh'
lock_image_name = f'{runtime_image_repo}:{lock_tag}'
build_from = BuildFromImageType.SCRATCH
# If the exact image already exists, we do not need to build it
if runtime_builder.image_exists(hash_image_name, False):
logger.debug(f'Reusing Image [{hash_image_name}]')
return hash_image_name
# We look for an existing image that shares the same lock_tag. If such an image exists, we
# can use it as the base image for the build and just copy source files. This makes the build
# much faster.
if runtime_builder.image_exists(lock_image_name):
logger.debug(f'Build [{hash_image_name}] from lock image [{lock_image_name}]')
build_from = BuildFromImageType.LOCK
base_image = lock_image_name
elif runtime_builder.image_exists(versioned_image_name):
logger.info(
f'Build [{hash_image_name}] from versioned image [{versioned_image_name}]'
)
build_from = BuildFromImageType.VERSIONED
base_image = versioned_image_name
else:
logger.debug(f'Build [{hash_image_name}] from scratch')
prep_build_folder(build_folder, base_image, build_from, extra_deps, enable_browser)
if not dry_run:
_build_sandbox_image(
build_folder,
runtime_builder,
runtime_image_repo,
source_tag=source_tag,
lock_tag=lock_tag,
# Only tag the versioned image if we are building from scratch.
# This avoids too much layers when you lay one image on top of another multiple times
versioned_tag=(
versioned_tag if build_from == BuildFromImageType.SCRATCH else None
),
platform=platform,
extra_build_args=extra_build_args,
)
return hash_image_namedef get_runtime_image_repo_and_tag(base_image: str) -> tuple[str, str]:
"""Retrieves the Docker repo and tag associated with the Docker image.
Parameters:
- base_image (str): The name of the base Docker image
Returns:
- tuple[str, str]: The Docker repo and tag of the Docker image
"""
if get_runtime_image_repo() in base_image:
logger.debug(
f'The provided image [{base_image}] is already a valid runtime image.\n'
f'Will try to reuse it as is.'
)
if ':' not in base_image:
base_image = base_image + ':latest'
repo, tag = base_image.split(':')
return repo, tag
else:
if ':' not in base_image:
base_image = base_image + ':latest'
[repo, tag] = base_image.split(':')
# Hash the repo if it's too long
if len(repo) > 32:
repo_hash = hashlib.md5(repo[:-24].encode()).hexdigest()[:8]
repo = f'{repo_hash}_{repo[-24:]}' # Use 8 char hash + last 24 chars
repo = repo.replace('/', '_s_')
new_tag = f'oh_v{get_version()}_image_{repo}_tag_{tag}'
# if it's still too long, hash the entire image name
if len(new_tag) > 128:
new_tag = f'oh_v{get_version()}_image_{hashlib.md5(new_tag.encode()).hexdigest()[:64]}'
logger.warning(
f'The new tag [{new_tag}] is still too long, so we use an hash of the entire image name: {new_tag}'
)
return get_runtime_image_repo(), new_tag
def get_runtime_image_repo() -> str:
    """Return the runtime-image repository, overridable via OH_RUNTIME_RUNTIME_IMAGE_REPO."""
    return os.getenv('OH_RUNTIME_RUNTIME_IMAGE_REPO', 'ghcr.io/openhands/runtime')

# - copy the OpenHands source code
- 拷贝:skills文件夹
- 拷贝:pyproject.toml、poetry.lock文件
- 生成dockerfile,
/tmp/tmpomhu7xu3/Dockerfile
def prep_build_folder(
build_folder: Path,
base_image: str,
build_from: BuildFromImageType,
extra_deps: str | None,
enable_browser: bool = True,
) -> None:
# Copy the source code to directory. It will end up in build_folder/code
# If package is not found, build from source code
openhands_source_dir = Path(openhands.__file__).parent
project_root = openhands_source_dir.parent
logger.debug(f'Building source distribution using project root: {project_root}')
# Copy the 'openhands' directory (Source code)
shutil.copytree(
openhands_source_dir,
Path(build_folder, 'code', 'openhands'),
ignore=shutil.ignore_patterns(
'.*/',
'__pycache__/',
'*.pyc',
'*.md',
),
)
# Copy the 'skills' directory (Skills)
shutil.copytree(Path(project_root, 'skills'), Path(build_folder, 'code', 'skills'))
# Copy pyproject.toml and poetry.lock files
for file in ['pyproject.toml', 'poetry.lock']:
src = Path(openhands_source_dir, file)
if not src.exists():
src = Path(project_root, file)
shutil.copy2(src, Path(build_folder, 'code', file))
# Create a Dockerfile and write it to build_folder
dockerfile_content = _generate_dockerfile(
base_image,
build_from=build_from,
extra_deps=extra_deps,
enable_browser=enable_browser,
)
dockerfile_path = Path(build_folder, 'Dockerfile')
with open(str(dockerfile_path), 'w') as f:
f.write(dockerfile_content)Dockerfile 示例
Docker file 如下:
0. 核心功能
- 把swe-bench的毛坯房镜像,做一层精装修。
1. 基础镜像和全局变量
- FROM:指定基础数据镜像,如 swe-bench镜像。
- SHELL:从
/bin/sh改为/bin/bash,并开启命令模式 - ENV:设置环境变量
2. 系统工具安装
- apt-get,uv 等
3. 用户与权限管理
- 创建
openhands用户,UID=1000, - NOPASSWD:设置
无需密码即可执行sudo。
4. Python环境管理 (Micromamba + Poetry)
- 切换到用户openhands来安装。
- Micromamba:很快的conda替代品,管理python
- Poetry:管理项目依赖包
5. VSCode Server 安装
下载安装vscode server,启动网页版的vscode,AI和用户都可通过浏览器直接改代码。
6. 项目源代码拷贝
- openhands core、skills等文件夹
7. 自动化配置
- 激活环境、用户等,进入即可使用。
# Base image and global settings
FROM docker.io/swebench/sweb.eval.x86_64.scikit-learn_1776_scikit-learn-13439:latest
SHELL ["/bin/bash", "-c"]
# Shared environment variables
ENV POETRY_VIRTUALENVS_PATH=/openhands/poetry \
    MAMBA_ROOT_PREFIX=/openhands/micromamba \
    LANG=C.UTF-8 \
    LC_ALL=C.UTF-8 \
    EDITOR=code \
    VISUAL=code \
    GIT_EDITOR="code --wait" \
    OPENVSCODE_SERVER_ROOT=/openhands/.openvscode-server
# ================================================================
# START: Build Runtime Image from Scratch
# ================================================================
# This is used in cases where the base image is something more generic like nikolaik/python-nodejs
# rather than the current OpenHands release
# Set PATH early to ensure system commands are available
ENV PATH="/usr/bin:/bin:/usr/sbin:/sbin:$PATH"
# Install base system dependencies
RUN apt-get update && \
    apt-get install -y --no-install-recommends \
    wget curl ca-certificates sudo apt-utils git jq tmux build-essential ripgrep ffmpeg \
    coreutils util-linux procps findutils grep sed \
    libasound2-plugins libatomic1 && \
    (apt-get install -y --no-install-recommends libgl1 || apt-get install -y --no-install-recommends libgl1-mesa-glx) && \
    # Install Docker dependencies
    apt-get install -y --no-install-recommends apt-transport-https ca-certificates curl gnupg lsb-release
# Install uv (required by MCP)
RUN curl -LsSf https://astral.sh/uv/install.sh | env UV_INSTALL_DIR="/openhands/bin" sh
# Add /openhands/bin to PATH
ENV PATH="/openhands/bin:${PATH}"
# Remove UID 1000 and GID 1000 users/groups that might conflict with openhands user
RUN (if getent passwd 1000 | grep -q pn; then userdel pn; fi) && \
    (if getent passwd 1000 | grep -q ubuntu; then userdel ubuntu; fi) && \
    (if getent group 1000 | grep -q pn; then groupdel pn; fi) && \
    (if getent group 1000 | grep -q ubuntu; then groupdel ubuntu; fi)
# Create openhands group and user (with fallback IDs if 1000 is taken)
RUN (if getent group 1000 >/dev/null 2>&1; then \
        groupadd openhands; \
    else \
        groupadd -g 1000 openhands; \
    fi) && \
    (if getent passwd 1000 >/dev/null 2>&1; then \
        useradd -g openhands -m -s /bin/bash openhands; \
    else \
        useradd -u 1000 -g openhands -m -s /bin/bash openhands; \
    fi) && \
    usermod -aG sudo openhands && \
    echo 'openhands ALL=(ALL) NOPASSWD:ALL' >> /etc/sudoers && \
    # Set empty password for openhands user to allow passwordless su
    passwd -d openhands && \
    # Set empty password for root user as well to ensure su works in both directions
    passwd -d root && \
    # Ensure root can su to openhands without password by configuring PAM
    sed -i '/pam_rootok.so/d' /etc/pam.d/su && \
    sed -i '1i auth sufficient pam_rootok.so' /etc/pam.d/su
# Create necessary directories
RUN mkdir -p /openhands && \
    mkdir -p /openhands/logs && \
    mkdir -p /openhands/poetry && \
    chown -R openhands:openhands /openhands
# ================================================================
# Define Docker installation macro
# Install Docker only if not a swebench or mswebench image
# ================================================================
# Install micromamba (a fast conda replacement)
RUN mkdir -p /openhands/micromamba/bin && \
    /bin/bash -c "PREFIX_LOCATION=/openhands/micromamba BIN_FOLDER=/openhands/micromamba/bin INIT_YES=no CONDA_FORGE_YES=yes $(curl -L https://micro.mamba.pm/install.sh)" && \
    /openhands/micromamba/bin/micromamba config remove channels defaults && \
    /openhands/micromamba/bin/micromamba config list && \
    chown -R openhands:openhands /openhands/micromamba && \
    # Create read-only shared access to micromamba for all users
    # This allows both root and openhands users to access the same packages
    # while maintaining security by keeping openhands as the owner
    chmod -R 755 /openhands/micromamba && \
    # Create a separate writable location for root's micromamba cache/config
    mkdir -p /root/.local/share/micromamba && \
    # Set up environment variables for system-wide access
    echo 'export PATH="/openhands/micromamba/bin:$PATH"' >> /etc/environment
# Create the openhands virtual environment and install poetry and python
# Run as openhands user to avoid expensive chown -R operations later
# Switch to the openhands user for the install
USER openhands
RUN /openhands/micromamba/bin/micromamba create -n openhands -y && \
    /openhands/micromamba/bin/micromamba install -n openhands -c conda-forge poetry python=3.12 -y
USER root
# Create a clean openhands directory including only the pyproject.toml, poetry.lock and openhands/__init__.py
RUN \
    if [ -d /openhands/code ]; then rm -rf /openhands/code; fi && \
    mkdir -p /openhands/code/openhands && \
    touch /openhands/code/openhands/__init__.py && \
    chown -R openhands:openhands /openhands/code && \
    # Set global git configuration to ensure proper author/committer information
    git config --global user.name "openhands" && \
    git config --global user.email "openhands@all-hands.dev"
COPY --chown=openhands:openhands ./code/pyproject.toml ./code/poetry.lock /openhands/code/
# Install user-level dependencies as openhands user
WORKDIR /openhands/code
USER openhands
RUN \
    /openhands/micromamba/bin/micromamba config set changeps1 False && \
    /openhands/micromamba/bin/micromamba run -n openhands poetry config virtualenvs.path /openhands/poetry && \
    /openhands/micromamba/bin/micromamba run -n openhands poetry env use python3.12 && \
    # Install project dependencies
    /openhands/micromamba/bin/micromamba run -n openhands poetry install --only main,runtime --no-interaction --no-root && \
    # Clean up user caches
    /openhands/micromamba/bin/micromamba run -n openhands poetry cache clear --all . -n && \
    /openhands/micromamba/bin/micromamba clean --all
# Install system-level dependencies that require root
USER root
RUN \
    # Set environment variables (requires root)
    /openhands/micromamba/bin/micromamba run -n openhands poetry run python -c "import sys; print('OH_INTERPRETER_PATH=' + sys.executable)" >> /etc/environment && \
    # Set permissions for shared read-only access
    # Note: chown -R operations removed - files are now created with correct ownership
    # by running micromamba/poetry as openhands user (see install_dependencies_user)
    chmod -R 755 /openhands/poetry && \
    chmod -R 755 /openhands/micromamba && \
    mkdir -p /openhands/workspace && chmod -R g+rws,o+rw /openhands/workspace && \
    chown -R openhands:openhands /openhands/workspace && \
    # Ensure PATH includes system binaries early in startup
    echo 'export PATH="/usr/bin:/bin:/usr/sbin:/sbin:$PATH"' >> /etc/environment && \
    echo 'export PATH="/usr/bin:/bin:/usr/sbin:/sbin:$PATH"' >> /etc/bash.bashrc && \
    # Set up conda environment activation for all users
    echo 'eval "$(/openhands/micromamba/bin/micromamba shell hook --shell bash)"' >> /etc/bash.bashrc && \
    echo 'micromamba activate openhands 2>/dev/null || true' >> /etc/bash.bashrc && \
    # Set up environment for root user
    echo 'export PATH="/usr/bin:/bin:/usr/sbin:/sbin:/openhands/micromamba/bin:$PATH"' >> /root/.bashrc && \
    echo 'export PLAYWRIGHT_BROWSERS_PATH=/opt/playwright-browsers' >> /root/.bashrc && \
    echo 'eval "$(/openhands/micromamba/bin/micromamba shell hook --shell bash)"' >> /root/.bashrc && \
    echo 'micromamba activate openhands 2>/dev/null || true' >> /root/.bashrc && \
    # Clean up system packages (requires root)
    apt-get clean && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
# ================================================================
# END: Build Runtime Image from Scratch
# ================================================================
# Ensure openhands user/group and base dirs exist even when not building from scratch
USER root
RUN \
    # Ensure group exists (prefer GID 1000 if available)
    if ! getent group openhands >/dev/null 2>&1; then \
        if getent group 1000 >/dev/null 2>&1; then groupadd openhands; else groupadd -g 1000 openhands; fi; \
    fi && \
    # Ensure user exists (prefer UID 1000 if available)
    if ! id -u openhands >/dev/null 2>&1; then \
        if getent passwd 1000 >/dev/null 2>&1; then useradd -m -s /bin/bash -g openhands openhands; else useradd -u 1000 -g openhands -m -s /bin/bash openhands; fi; \
    fi && \
    # Ensure home and required directories exist before later steps
    mkdir -p /home/openhands && \
    mkdir -p /openhands && \
    mkdir -p $(dirname ${OPENVSCODE_SERVER_ROOT}) && \
    # Ensure ownership is correct for all OpenHands paths
    chown -R openhands:openhands /home/openhands || true && \
    chown -R openhands:openhands /openhands || true
# Reference:
# 1. https://github.com/gitpod-io/openvscode-server
# 2. https://github.com/gitpod-io/openvscode-releases
# Setup VSCode Server
ARG RELEASE_TAG="openvscode-server-v1.98.2"
ARG RELEASE_ORG="gitpod-io"
# ARG USERNAME=openvscode-server
# ARG USER_UID=1000
# ARG USER_GID=1000
RUN if [ -z "${RELEASE_TAG}" ]; then \
        echo "The RELEASE_TAG build arg must be set." >&2 && \
        exit 1; \
    fi && \
    arch=$(uname -m) && \
    if [ "${arch}" = "x86_64" ]; then \
        arch="x64"; \
    elif [ "${arch}" = "aarch64" ]; then \
        arch="arm64"; \
    elif [ "${arch}" = "armv7l" ]; then \
        arch="armhf"; \
    fi && \
    wget https://github.com/${RELEASE_ORG}/openvscode-server/releases/download/${RELEASE_TAG}/${RELEASE_TAG}-linux-${arch}.tar.gz && \
    tar -xzf ${RELEASE_TAG}-linux-${arch}.tar.gz && \
    if [ -d "${OPENVSCODE_SERVER_ROOT}" ]; then rm -rf "${OPENVSCODE_SERVER_ROOT}"; fi && \
    mv ${RELEASE_TAG}-linux-${arch} ${OPENVSCODE_SERVER_ROOT} && \
    cp ${OPENVSCODE_SERVER_ROOT}/bin/remote-cli/openvscode-server ${OPENVSCODE_SERVER_ROOT}/bin/remote-cli/code && \
    rm -f ${RELEASE_TAG}-linux-${arch}.tar.gz && \
    chown -R openhands:openhands ${OPENVSCODE_SERVER_ROOT}
# ================================================================
# Copy Project source files
# ================================================================
RUN if [ -d /openhands/code/openhands ]; then rm -rf /openhands/code/openhands; fi
COPY --chown=openhands:openhands ./code/pyproject.toml ./code/poetry.lock /openhands/code/
RUN if [ -d /openhands/code/skills ]; then rm -rf /openhands/code/skills; fi
COPY --chown=openhands:openhands ./code/skills /openhands/code/skills
COPY --chown=openhands:openhands ./code/openhands /openhands/code/openhands
RUN chmod a+rwx /openhands/code/openhands/__init__.py && \
    chown -R openhands:openhands /openhands/code
# ================================================================
# END: Build from versioned image
# ================================================================
# Install extra dependencies if specified (as openhands user)
# Set up environment for openhands user
USER root
RUN \
    # Set up environment for openhands user
    echo 'export PATH="/usr/bin:/bin:/usr/sbin:/sbin:/openhands/micromamba/bin:$PATH"' >> /home/openhands/.bashrc && \
    echo 'export PLAYWRIGHT_BROWSERS_PATH=/opt/playwright-browsers' >> /home/openhands/.bashrc && \
    echo 'eval "$(/openhands/micromamba/bin/micromamba shell hook --shell bash)"' >> /home/openhands/.bashrc && \
    echo 'micromamba activate openhands 2>/dev/null || true' >> /home/openhands/.bashrc && \
    chown openhands:openhands /home/openhands/.bashrc

DockerRuntimeBuilder
def build(
    self,
    path: str,
    tags: list[str],
    platform: str | None = None,
    extra_build_args: list[str] | None = None,
    use_local_cache: bool = False,
) -> str:
    """Builds a Docker image using BuildKit and handles the build logs appropriately.

    Args:
        path (str): The path to the Docker build context.
        tags (list[str]): A list of image tags to apply to the built image.
            ``tags[0]`` is the primary (source) tag; ``tags[1]``, if present,
            is a more generic tag applied after a successful build.
        platform (str, optional): The target platform for the build. Defaults to None.
        extra_build_args (list[str], optional): Additional arguments to pass to the
            Docker build command. Defaults to None.
        use_local_cache (bool, optional): Whether to use and update the local build
            cache. Defaults to False.

    Returns:
        str: The name of the built Docker image (``tags[0]``).

    Raises:
        AgentRuntimeBuildError: If the Docker server version is incompatible or if
            the built image cannot be found after the build.
        subprocess.CalledProcessError: If the buildx command exits non-zero.

    Note:
        This method uses Docker BuildKit for improved build performance and caching
        capabilities. If `use_local_cache` is True, it will attempt to use and update
        the build cache in a local directory. The `extra_build_args` parameter allows
        for passing additional Docker build arguments as needed.
    """
    self.docker_client = docker.from_env()
    version_info = self.docker_client.version()
    # Normalize versions like "20.10.2+dfsg1" or "4.9.4-rhel" to a plain dotted form.
    server_version = version_info.get('Version', '').split('+')[0].replace('-', '.')
    components = version_info.get('Components')
    # Podman advertises itself in the first component of the version payload.
    self.is_podman = (
        components is not None
        and len(components) > 0
        and components[0].get('Name', '').startswith('Podman')
    )
    if tuple(map(int, server_version.split('.'))) < (18, 9) and not self.is_podman:
        raise AgentRuntimeBuildError(
            'Docker server version must be >= 18.09 to use BuildKit'
        )
    if self.is_podman and tuple(map(int, server_version.split('.'))) < (4, 9):
        raise AgentRuntimeBuildError('Podman server version must be >= 4.9.0')

    if not DockerRuntimeBuilder.check_buildx(self.is_podman):
        # when running openhands in a container, there might not be a "docker"
        # binary available, in which case we need to download docker binary.
        # since the official openhands app image is built from debian, we use
        # debian way to install docker binary
        logger.info(
            'No docker binary available inside openhands-app container, trying to download online...'
        )
        commands = [
            'apt-get update',
            'apt-get install -y ca-certificates curl gnupg',
            'install -m 0755 -d /etc/apt/keyrings',
            'curl -fsSL https://download.docker.com/linux/debian/gpg -o /etc/apt/keyrings/docker.asc',
            'chmod a+r /etc/apt/keyrings/docker.asc',
            'echo \
            "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] https://download.docker.com/linux/debian \
            $(. /etc/os-release && echo "$VERSION_CODENAME") stable" | \
            tee /etc/apt/sources.list.d/docker.list > /dev/null',
            'apt-get update',
            'apt-get install -y docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin',
        ]
        for cmd in commands:
            try:
                subprocess.run(
                    cmd, shell=True, check=True, stdout=subprocess.DEVNULL
                )
            except subprocess.CalledProcessError as e:
                logger.error(f'Image build failed:\n{e}')
                logger.error(f'Command output:\n{e.output}')
                raise
        logger.info('Downloaded and installed docker binary')

    target_image_hash_name = tags[0]
    # Split on the LAST colon: the repo part may itself contain a colon when it
    # includes a registry host with a port (e.g. localhost:5000/repo:tag).
    target_image_repo, target_image_source_tag = target_image_hash_name.rsplit(':', 1)
    target_image_tag = tags[1].rsplit(':', 1)[1] if len(tags) > 1 else None

    buildx_cmd = [
        'docker' if not self.is_podman else 'podman',
        'buildx',
        'build',
        '--progress=plain',
        f'--build-arg=OPENHANDS_RUNTIME_VERSION={get_version()}',
        f'--build-arg=OPENHANDS_RUNTIME_BUILD_TIME={datetime.datetime.now().isoformat()}',
        f'--tag={target_image_hash_name}',
        '--load',
    ]

    # Include the platform argument only if platform is specified
    if platform:
        buildx_cmd.append(f'--platform={platform}')

    cache_dir = '/tmp/.buildx-cache'
    if use_local_cache and self._is_cache_usable(cache_dir):
        buildx_cmd.extend(
            [
                f'--cache-from=type=local,src={cache_dir}',
                f'--cache-to=type=local,dest={cache_dir},mode=max',
            ]
        )

    if extra_build_args:
        buildx_cmd.extend(extra_build_args)

    buildx_cmd.append(path)  # must be last!

    self.rolling_logger.start(
        f'================ {buildx_cmd[0].upper()} BUILD STARTED ================'
    )

    # Select the default buildx builder before building. Podman has no
    # "buildx use" subcommand (and may have no docker binary at all), and a
    # fire-and-forget Popen would leave an unreaped child, so run this
    # synchronously and only for Docker; a failure here is non-fatal.
    if not self.is_podman:
        subprocess.run(
            ['docker', 'buildx', 'use', 'default'],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            universal_newlines=True,
            check=False,
        )

    try:
        process = subprocess.Popen(
            buildx_cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            universal_newlines=True,
            bufsize=1,
        )

        output_lines = []
        if process.stdout:
            for line in iter(process.stdout.readline, ''):
                line = line.strip()
                if line:
                    output_lines.append(line)  # Store all output lines
                    self._output_logs(line)

        return_code = process.wait()
        if return_code != 0:
            # Use the collected output for error reporting
            output_str = '\n'.join(output_lines)
            raise subprocess.CalledProcessError(
                return_code,
                process.args,
                output=output_str,  # Use the collected output
                stderr=None,
            )
    except subprocess.CalledProcessError as e:
        logger.error(f'Image build failed with exit code {e.returncode}')
        if e.output:
            logger.error(f'Command output:\n{e.output}')
        elif self.rolling_logger.is_enabled() and self.rolling_logger.all_lines:
            logger.error(f'Docker build output:\n{self.rolling_logger.all_lines}')
        raise
    except subprocess.TimeoutExpired:
        logger.error('Image build timed out')
        raise
    except FileNotFoundError as e:
        # The build executable (docker/podman) was not found on PATH.
        logger.error(f'Build command executable not found: {e}')
        raise
    except PermissionError as e:
        logger.error(
            f'Permission denied when trying to execute the build command:\n{e}'
        )
        raise
    except Exception as e:
        logger.error(f'An unexpected error occurred during the build process: {e}')
        raise

    logger.info(f'Image [{target_image_hash_name}] build finished.')

    if target_image_tag:
        image = self.docker_client.images.get(target_image_hash_name)
        image.tag(target_image_repo, target_image_tag)
        logger.info(
            f'Re-tagged image [{target_image_hash_name}] with more generic tag [{target_image_tag}]'
        )

    # Check if the image is built successfully. images.get raises ImageNotFound
    # rather than returning None; the None check is kept as a defensive guard.
    image = self.docker_client.images.get(target_image_hash_name)
    if image is None:
        raise AgentRuntimeBuildError(
            f'Build failed: Image {target_image_hash_name} not found'
        )

    tags_str = (
        f'{target_image_source_tag}, {target_image_tag}'
        if target_image_tag
        else target_image_source_tag
    )
    logger.info(
        f'Image {target_image_repo} with tags [{tags_str}] built successfully'
    )
    return target_image_hash_name
"""Check if the image exists in the registry (try to pull it first) or in the local store.
Args:
image_name (str): The Docker image to check (<image repo>:<image tag>)
pull_from_repo (bool): Whether to pull from the remote repo if the image not present locally
Returns:
bool: Whether the Docker image exists in the registry or in the local store
"""
if not image_name:
logger.error(f'Invalid image name: `{image_name}`')
return False
try:
logger.debug(f'Checking, if image exists locally:\n{image_name}')
self.docker_client.images.get(image_name)
logger.debug('Image found locally.')
return True
except docker.errors.ImageNotFound:
if not pull_from_repo:
logger.debug(
f'Image {image_name} {colorize("not found", TermColor.WARNING)} locally'
)
return False
try:
logger.debug(
'Image not found locally. Trying to pull it, please wait...'
)
layers: dict[str, dict[str, str]] = {}
previous_layer_count = 0
if ':' in image_name:
image_repo, image_tag = image_name.split(':', 1)
else:
image_repo = image_name
image_tag = None
for line in self.docker_client.api.pull(
image_repo, tag=image_tag, stream=True, decode=True
):
self._output_build_progress(line, layers, previous_layer_count)
previous_layer_count = len(layers)
logger.debug('Image pulled')
return True
except docker.errors.ImageNotFound:
logger.debug('Could not find image locally or in registry.')
return False
except Exception as e:
msg = f'Image {colorize("could not be pulled", TermColor.ERROR)}: '
ex_msg = str(e)
if 'Not Found' in ex_msg:
msg += 'image not found in registry.'
else:
msg += f'{ex_msg}'
logger.debug(msg)
return False