Skip to content

Openhands Scaffold

📅 发表于 2026/02/07
🔄 更新于 2026/02/07
👁️ -- 次访问
📝 0 字
0 分钟

OpenHands SWE (新版的,基于SDK清晰一些)

准备数据Eval信息等

主函数(SWEEvaluator,EvalMetadata等信息)

python
def main() -> None:
    prompt_dir = (Path(__file__).parent / "prompts").resolve()
    choices = [str(p.relative_to(Path.cwd())) for p in prompt_dir.glob("*.j2")]
    default_prompt_path = prompt_dir / "default.j2"
    assert default_prompt_path.exists(), (
        f"Default prompt {default_prompt_path} not found"
    )

    parser = get_parser()
    parser.add_argument(
        "--prompt-path",
        type=str,
        default=str(default_prompt_path),
        choices=choices,
        help="Path to prompt template file",
    )
    args = parser.parse_args()

    # Validate max_attempts
    if args.max_attempts < 1:
        raise ValueError(f"max_attempts must be >= 1, got {args.max_attempts}")

    llm_config_path = args.llm_config_path
    if not os.path.isfile(llm_config_path):
        raise ValueError(f"LLM config file {llm_config_path} does not exist")
    with open(llm_config_path, "r") as f:
        llm_config = f.read()
    llm = LLM.model_validate_json(llm_config)
    logger.info("Using LLM config: %s", llm.model_dump_json(indent=2))

    dataset_description = (
        args.dataset.replace("/", "__") + "-" + args.split.replace("/", "__")
    )

    structured_output_dir = construct_eval_output_dir(
        base_dir=args.output_dir,
        dataset_name=dataset_description,
        model_name=llm.model,
        max_iterations=args.max_iterations,
        eval_note=args.note,
    )

    # Create critic instance from parsed arguments
    critic = create_critic(args)
    logger.info(f"Using critic: {type(critic).__name__}")

    metadata = EvalMetadata(
        llm=llm,
        dataset=args.dataset,
        dataset_split=args.split,
        max_iterations=args.max_iterations,
        eval_output_dir=structured_output_dir,
        details={},
        prompt_path=args.prompt_path,
        eval_limit=args.n_limit,
        env_setup_commands=["export PIP_CACHE_DIR=~/.cache/pip"], 
        max_attempts=args.max_attempts,
        critic=critic,
        selected_instances_file=args.select,
        max_retries=args.max_retries,
        workspace_type=args.workspace,
    )

    # Run orchestrator with a simple JSONL writer
    # 构造SWEBenchEvaluator 
    evaluator = SWEBenchEvaluation( 
        metadata=metadata, 
        num_workers=args.num_workers,
    ) 
		
    # 开始运行
    evaluator.run(on_result=get_default_on_result_writer(evaluator.output_path)) 

    logger.info("Evaluation completed!")


if __name__ == "__main__":
    main()

准备Instances (SWEBenchEvaluation.prepare_instances)

python
def prepare_instances(self) -> List[EvalInstance]:
    logger.info("Setting up SWE-bench evaluation data")

    df = get_dataset(
        dataset_name=self.metadata.dataset,
        split=self.metadata.dataset_split,
        eval_limit=self.metadata.eval_limit,
        selected_instances_file=self.metadata.selected_instances_file,
    )

    instances: List[EvalInstance] = []
    for _, row in df.iterrows():
        inst_id = str(row["instance_id"])
        instances.append(EvalInstance(id=inst_id, data=row.to_dict()))

    logger.info("Total instances to process: %d", len(instances))
    return instances

Prompt

markdown
I have access to a python code repository in the directory {{ instance.repo_path }} . You can explore and modify files using the available tools. Consider the following issue description:

<issue_description>
{{ instance.problem_statement }}
</issue_description>

Can you help me implement the necessary changes to the repository so that the requirements specified in the <issue_description> are met?
I've already taken care of all changes to any of the test files described in the <issue_description>. This means you DON'T have to modify the testing logic or any of the tests in any way!
Also the development Python environment is already set up for you (i.e., all dependencies already installed), so you don't need to install other packages.
Your task is to make the minimal changes to non-test files in the {{ instance.repo_path }} directory to ensure the <issue_description> is satisfied.

Follow these phases to resolve the issue:

Phase 1. READING: read the problem and reword it in clearer terms
   1.1 If there are code or config snippets. Express in words any best practices or conventions in them.
   1.2 Hightlight message errors, method names, variables, file names, stack traces, and technical details.
   1.3 Explain the problem in clear terms.
   1.4 Enumerate the steps to reproduce the problem.
   1.5 Hightlight any best practices to take into account when testing and fixing the issue

Phase 2. RUNNING: install and run the tests on the repository
   2.1 Activate the environment by running  
       ./opt/miniconda3/etc/profile.d/conda.sh ; conda activate testbed
   2.2 Follow the readme
   2.3 Install the environment and anything needed
   2.4 Iterate and figure out how to run the tests

Phase 3. EXPLORATION: find the files that are related to the problem and possible solutions
   3.1 Use `grep` to search for relevant methods, classes, keywords and error messages.
   3.2 Identify all files related to the problem statement.
   3.3 Propose the methods and files to fix the issue and explain why.
   3.4 From the possible file locations, select the most likely location to fix the issue.

Phase 4. TEST CREATION: before implementing any fix, create a script to reproduce and verify the issue.
   4.1 Look at existing test files in the repository to understand the test format/structure.
   4.2 Create a minimal reproduction script that reproduces the located issue.
   4.3 Run the reproduction script to confirm you are reproducing the issue.
   4.4 Adjust the reproduction script as necessary.

Phase 5. FIX ANALYSIS: state clearly the problem and how to fix it
   5.1 State clearly what the problem is.
   5.2 State clearly where the problem is located.
   5.3 State clearly how the test reproduces the issue.
   5.4 State clearly the best practices to take into account in the fix.
   5.5 State clearly how to fix the problem.

Phase 6. FIX IMPLEMENTATION: Edit the source code to implement your chosen solution.
   6.1 Make minimal, focused changes to fix the issue.

Phase 7. VERIFICATION: Test your implementation thoroughly.
   7.1 Run your reproduction script to verify the fix works.
   7.2 Add edge cases to your test script to ensure comprehensive coverage.
   7.3 Run existing tests related to the modified code to ensure you haven't broken anything.

8. FINAL REVIEW: Carefully re-read the problem description and compare your changes with the base commit {{ instance.base_commit }}.
   8.1 Ensure you've fully addressed all requirements.
   8.2 Run any tests in the repository related to:
     8.2.1 The issue you are fixing
     8.2.2 The files you modified
     8.2.3 The functions you changed
   8.3 If any tests fail, revise your implementation until all tests pass

Be thorough in your exploration, testing, and reasoning. It's fine if your thinking process is lengthy - quality and completeness are more important than brevity.

准备环境 (SWEBenchEvaluation.prepare_workspace)

准备instance 工作空间

参考代码

核心流程

  • 拉取instance官方docker镜像
  • 构建openhands agent运行镜像,基于官方镜像做的一些补充。

主流程(prepare_workerspace)

python
# ---- Hook: prepare a workspace per instance ----------------------------------
def prepare_workspace(
    self,
    instance: EvalInstance,
    resource_factor: int = 1,
    forward_env: list[str] | None = None,
) -> RemoteWorkspace:
    """
    Use DockerWorkspace by default.

    Args:
        instance: The evaluation instance to prepare workspace for.
        resource_factor: Resource factor for runtime allocation (default: 1).
                       Higher values allocate more CPU/memory resources.
                       Used by APIRemoteWorkspace for remote runtime allocation.
    """
    # instance的官方docker镜像
    official_docker_image = get_official_docker_image(instance.id) 
    build_target = "source-minimal"
    custom_tag = extract_custom_tag(official_docker_image)
    # For non-binary targets, append target suffix
    suffix = f"-{build_target}" if build_target != "binary" else ""
    base_agent_image = (
        f"{EVAL_AGENT_SERVER_IMAGE}:{SDK_SHORT_SHA}-{custom_tag}{suffix}"
    )
    wrap_needed = should_wrap_instance_id(instance.id)
    agent_server_image = base_agent_image

    if self.metadata.workspace_type == "docker":
        SKIP_BUILD = os.getenv("SKIP_BUILD", "1").lower() in ("1", "true", "yes")
        logger.info(f"SKIP_BUILD={SKIP_BUILD}")
        if not SKIP_BUILD:
            logger.info(
                f"Building workspace from {official_docker_image} "
                f"for instance {instance.id}. "
                "This may take a while...\n"
                "You can run benchmarks/swebench/build_images.py and set "
                "SWE_BENCH_SKIP_BUILD=1 to skip building and use pre-built "
                "agent-server image."
            )
            # 构建镜像
            output = build_image(
                base_image=official_docker_image,
                target_image=EVAL_AGENT_SERVER_IMAGE,
                custom_tag=custom_tag,
                target=build_target,
                push=False,
            )
            logger.info(f"Image build output: {output}")
            assert output.error is None, f"Image build failed: {output.error}"
            if base_agent_image not in output.tags:
                raise RuntimeError(
                    f"Built image tags {output.tags} do not include expected tag "
                    f"{base_agent_image}"
                )
            if wrap_needed:
                wrapped_result = wrap_image(base_agent_image, push=False)
                if wrapped_result.error:
                    raise RuntimeError(
                        "Wrapped image build failed: "
                        f"{wrapped_result.error}; log={wrapped_result.log_path}"
                    )

        workspace = DockerWorkspace( 
            server_image=agent_server_image, 
            working_dir="/workspace", 
            forward_env=forward_env or [], 
        )
    elif self.metadata.workspace_type == "remote":
        runtime_api_key = os.getenv("RUNTIME_API_KEY")
        sdk_short_sha = os.getenv("SDK_SHORT_SHA", SDK_SHORT_SHA)
        if not runtime_api_key:
            raise ValueError(
                "RUNTIME_API_KEY environment variable is not set for remote workspace"
            )

        agent_server_image = (
            f"{EVAL_AGENT_SERVER_IMAGE}:{sdk_short_sha}-{custom_tag}{suffix}"
        )
        if not image_exists(agent_server_image):
            raise RuntimeError(
                f"Agent server image {agent_server_image} does not exist in container registry, "
                "make sure to build, push it, and make it public accessible before using remote workspace."
            )
        logger.info(
            f"Using remote workspace with image {agent_server_image} "
            f"(sdk sha: {sdk_short_sha}, resource_factor: {resource_factor})"
        )
        startup_timeout = float(os.getenv("REMOTE_RUNTIME_STARTUP_TIMEOUT", "600"))
        workspace = APIRemoteWorkspace(
            runtime_api_url=os.getenv(
                "RUNTIME_API_URL", "https://runtime.eval.all-hands.dev"
            ),
            runtime_api_key=runtime_api_key,
            server_image=agent_server_image,
            target_type="source" if "source" in build_target else "binary",
            forward_env=forward_env or [],
            resource_factor=resource_factor,
            init_timeout=startup_timeout,
            startup_wait_timeout=startup_timeout,
        )
    else:
        raise ValueError(
            f"Unsupported workspace_type: {self.metadata.workspace_type}"
        )

    for cmd in self.metadata.env_setup_commands or []: 
        res = workspace.execute_command(cmd) 
        if res.exit_code != 0: 
            raise RuntimeError(
                f"Failed to run env setup command '{cmd}': {res.stderr}"
            )
        logger.debug(f"Ran env setup command '{cmd}': {res.stdout}")
    return workspace

官方Instance Docker镜像

python
def get_official_docker_image(
    instance_id: str,
    docker_image_prefix="docker.io/swebench/",
) -> str:
    # Official SWE-Bench image
    # swebench/sweb.eval.x86_64.django_1776_django-11333:v1
    repo, name = instance_id.split("__")
    official_image_name = docker_image_prefix.rstrip("/")
    official_image_name += f"/sweb.eval.x86_64.{repo}_1776_{name}:latest".lower()
    logger.debug(f"Official SWE-Bench image: {official_image_name}")
    return official_image_name

build_image 构建运行时镜像主函数

build_image 主函数

python
def build_image(
    base_image: str,
    target_image: str,
    custom_tag: str,
    target: TargetType = "source-minimal",
    push: bool = False,
) -> BuildOutput:
    # Get SDK info from submodule to ensure tags use the correct SDK SHA
    git_ref, git_sha, sdk_version = _get_sdk_submodule_info() 

    opts = BuildOptions(
        base_image=base_image,
        custom_tags=custom_tag,
        image=target_image,
        target=target,
        # SWE-Bench only supports linux/amd64 images
        platforms=["linux/amd64"],
        push=push,
        # Override git info to use SDK submodule info instead of benchmarks repo
        git_ref=git_ref,
        git_sha=git_sha,
        sdk_version=sdk_version,
    )
    for t in opts.all_tags:
        # Check if image exists or not
        if image_exists(t):
            logger.info("Image %s already exists. Skipping build.", t)
            return BuildOutput(base_image=base_image, tags=[t], error=None)
    tags = build(opts)
    return BuildOutput(base_image=base_image, tags=tags, error=None) 

class BuildOutput(BaseModel):
    time: str = Field(default_factory=lambda: datetime.now(UTC).isoformat())
    base_image: str
    tags: list[str]
    error: str | None = None
    log_path: str | None = None

获取openhands sdk 模块信息

python
def _get_sdk_submodule_info() -> tuple[str, str, str]:
    """
    Get SDK version info from the vendor/software-agent-sdk submodule.

    Returns:
        tuple[str, str, str]: (git_ref, git_sha, sdk_version)
    """
    # Find the benchmarks repo root (where this file lives)
    benchmarks_root = Path(__file__).resolve().parent.parent.parent
    sdk_path = benchmarks_root / "vendor" / "software-agent-sdk"

    # Get submodule SHA directly from the checked-out submodule
    # This is more direct than parsing git submodule status output
    try:
        result = subprocess.run(
            ["git", "rev-parse", "HEAD"],
            cwd=sdk_path,
            capture_output=True,
            text=True,
            check=True,
        )
        git_sha = result.stdout.strip() 
    except subprocess.CalledProcessError:
        logger.warning(
            "Failed to get SDK submodule SHA, using 'unknown'. "
            "Make sure submodules are initialized."
        )
        git_sha = "unknown"

    # Get submodule ref (current branch or HEAD)
    try:
        result = subprocess.run(
            ["git", "symbolic-ref", "-q", "--short", "HEAD"],
            cwd=sdk_path,
            capture_output=True,
            text=True,
            check=True,
        )
        git_ref = result.stdout.strip() 
    except subprocess.CalledProcessError:
        git_ref = "unknown"

    # Get SDK version from pyproject.toml
    pyproject_path = sdk_path / "openhands-sdk" / "pyproject.toml"
    sdk_version = "unknown"
    try:
        if pyproject_path.exists():
            with pyproject_path.open("rb") as f:
                config = tomllib.load(f)
            sdk_version = config.get("project", {}).get("version", "unknown")
    except Exception as e:
        logger.warning(f"Failed to read SDK version from pyproject.toml: {e}")

    logger.info(
        f"SDK submodule info: ref={git_ref}, sha={git_sha[:7]}, version={sdk_version}"
    )
    return git_ref, git_sha, sdk_version 

构建openhands-scaffold运行时镜像

基于base_image构建openhands作为scaffold的instance运行时镜像

python
def build(opts: BuildOptions) -> list[str]:
    """Single entry point for building the agent-server image."""
    dockerfile_path = _get_dockerfile_path(opts.sdk_project_root) 
    push = opts.push
    if push is None:
        push = IN_CI

    tags = opts.all_tags
    cache_tag, cache_tag_base = opts.cache_tags

    ctx = _make_build_context(opts.sdk_project_root)
    logger.info(f"[build] Clean build context: {ctx}")
    # docker 构建参数
    args = [ 
        "docker", 
        "buildx", 
        "build",
        "--file",
        str(dockerfile_path), 
        "--target",
        opts.target,
        "--build-arg",
        f"BASE_IMAGE={opts.base_image}", 
    ]
    if push:
        args += ["--platform", ",".join(opts.platforms), "--push"]
    else:
        args += ["--load"]

    for t in tags:
        args += ["--tag", t]

    # -------- cache strategy --------
    driver = _active_buildx_driver() or "unknown"
    local_cache_dir = _default_local_cache_dir()
    cache_args: list[str] = []

    if push:
        # Remote/CI builds: use registry cache + inline for maximum reuse.
        cache_args += [
            "--cache-from",
            f"type=registry,ref={opts.image}:{cache_tag}",
            "--cache-from",
            f"type=registry,ref={opts.image}:{cache_tag_base}-main",
            "--cache-to",
            f"type=registry,ref={opts.image}:{cache_tag},mode=max",
        ]
        logger.info("[build] Cache: registry (remote/CI) + inline")
    else:
        # Local/dev builds: prefer local dir cache if
        # driver supports it; otherwise inline-only.
        if driver == "docker-container": 
            local_cache_dir.mkdir(parents=True, exist_ok=True)
            cache_args += [ 
                "--cache-from",
                f"type=local,src={str(local_cache_dir)}",
                "--cache-to",
                f"type=local,dest={str(local_cache_dir)},mode=max",
            ]
            logger.info(
                f"[build] Cache: local dir at {local_cache_dir} (driver={driver})"
            )
        else:
            logger.warning(
                f"[build] WARNING: Active buildx driver is '{driver}', "
                "which does not support local dir caching. Fallback to INLINE CACHE\n"
                " Consider running the following commands to set up a "
                "compatible buildx environment:\n"
                "  1. docker buildx create --name openhands-builder "
                "--driver docker-container --use\n"
                "  2. docker buildx inspect --bootstrap\n"
            )
            # docker driver can't export caches; fall back to inline metadata only.
            cache_args += ["--build-arg", "BUILDKIT_INLINE_CACHE=1"]
            logger.info(f"[build] Cache: inline only (driver={driver})")
    
    # 拼接缓存参数
    args += cache_args + [str(ctx)] 

    logger.info(
        f"[build] Building target='{opts.target}' image='{opts.image}' "
        f"custom_tags='{opts.custom_tags}' from base='{opts.base_image}' "
        f"for platforms='{opts.platforms if push else 'local-arch'}'"
    )
    logger.info(
        f"[build] Git ref='{opts.git_ref}' sha='{opts.git_sha}' "
        f"package_version='{opts.sdk_version}'"
    )
    logger.info(f"[build] Cache tag: {cache_tag}")

    try:
      	# 真正基于args去构建镜像
        res = _run(args, cwd=str(ctx)) 
        sys.stdout.write(res.stdout or "") 
    except subprocess.CalledProcessError as e:
        logger.error(f"[build] ERROR: Build failed with exit code {e.returncode}")
        logger.error(f"[build] Command: {' '.join(e.cmd)}")
        logger.error(f"[build] Full stdout:\n{e.output}")
        logger.error(f"[build] Full stderr:\n{e.stderr}")
        raise
    finally:
        logger.info(f"[build] Cleaning {ctx}")
        shutil.rmtree(ctx, ignore_errors=True)

    logger.info("[build] Done. Tags:")
    for t in tags:
        logger.info(f" - {t}")
    return tags

执行初始化环境命令

执行初始化环境的命令

核心目的

  • metadata中获取env_setup_commands
  • workspace里,一条一条地运行命令
  • 如果某一条命令失败,则会抛出错误终止

示例

  • env_setup_commands=["export PIP_CACHE_DIR=~/.cache/pip"],
python
def prepare_workspace(
    self,
    instance: EvalInstance,
    resource_factor: int = 1,
    forward_env: list[str] | None = None,
) -> RemoteWorkspace:
    
  # ---
  # ---
  # 执行一些初始化环境的命令
  for cmd in self.metadata.env_setup_commands or []: 
      res = workspace.execute_command(cmd) 
      if res.exit_code != 0: 
          raise RuntimeError(
              f"Failed to run env setup command '{cmd}': {res.stderr}"
          )
      logger.debug(f"Ran env setup command '{cmd}': {res.stdout}")
  return workspace

执行Instance

Agent交互(SWEBenchEvaluation.evaluate_instance)

evaluate_instance

核心流程

  • 准备3个工具:FileEditor, Terminal, TaskTracker
  • 初始化agent:cli_mode,命令行模式
  • 依照instance去生成repo_path,把testbed里的代码复制一份到repo_path,并做git还原,确保无更改。
  • 使用repo_path作为当前work_dir
  • 根据instance、workspace、metadata获取instruction
  • 构建conversation对象,进行agent循环
  • 执行完成后,手动做git add 和 git commit,生成git patch
  • 保存结果
python
def evaluate_instance(
    self, instance: EvalInstance, workspace: RemoteWorkspace
) -> EvalOutput:
    """
    Create conversation, run agent, collect history and git patch.
    Do not write files here; just return EvalOutput.
    """
    # 工具
    tools = get_default_tools(
        # Disable browser tools in CLI mode
        enable_browser=False, 
    )
    # Agent 
    agent = Agent(
        llm=self.metadata.llm,
        tools=tools,
        system_prompt_kwargs={"cli_mode": True}, 
        # TODO: we can enable condenser and security analyzer later
        # and have them configurable via EvalMetadata
        # condenser=get_default_condenser(
        #     llm=self.metadata.llm.model_copy(update={"service_id": "condenser"})
        # ),
        # security_analyzer=LLMSecurityAnalyzer(),
    )

    assert isinstance(workspace, RemoteWorkspace)
		
    # repo 路径
    repo_path = f"/workspace/{instance.data['repo'].split('/')[-1]}/"
    instance.data["repo_path"] = repo_path

    persist_callback = build_event_persistence_callback(
        run_id=self.metadata.eval_output_dir,
        instance_id=instance.id,
        attempt=self.current_attempt,
    )
    # 构建一个conversation 对象
    conversation = Conversation(
        agent=agent,
        workspace=workspace,
        callbacks=[persist_callback],
        max_iteration_per_run=self.metadata.max_iterations,
    )

    logger.info("repo_path: %s", repo_path)
    # cp 一份全新的代码到repo_path去,作为工作空间
    cp_testebed_repo = workspace.execute_command(
        (f"mkdir -p {repo_path} ; cp -r /testbed/. {repo_path}")
    )
    assert cp_testebed_repo.exit_code == 0, (
        f"cp_testebed_repo failed: {cp_testebed_repo.stderr}"
    )

    # git reset,确保无修改
    git_reset = workspace.execute_command(f"cd {repo_path} ; git reset --hard")
    assert git_reset.exit_code == 0, f"git reset failed: {git_reset.stderr}"

    # 获得prompt
    instruction = get_instruction(
        instance=instance.data,
        metadata=self.metadata,
        workspace_path=workspace.working_dir,
    )
    conversation.send_message(instruction)
    # Run conversation with fake user responses to handle agent messages
    run_conversation_with_fake_user_response(conversation)

    # 手动 git add
    workspace.execute_command(f"cd {repo_path} ; git add -A")

    # 手动 git commit
    workspace.execute_command(
        f"cd {repo_path} && "
        "git config --global user.email 'evaluation@openhands.dev' && "
        "git config --global user.name 'OpenHands Evaluation' && "
        "git commit -m 'patch'"
    )

    # 生成 git patch
    base_commit = instance.data["base_commit"]
    git_patch_result = workspace.execute_command(
        (f"cd {repo_path} ; git --no-pager diff --no-color {base_commit} HEAD")
    )
    assert git_patch_result.exit_code == 0, (
        f"git diff failed: {git_patch_result.stderr}"
    )
    # 获得git_patch
    git_patch = git_patch_result.stdout

    # EvalOutput is your model; keep fields consistent with prior JSONL
    out = EvalOutput(
        instance_id=instance.id,
        attempt=self.current_attempt,
        test_result={
            "git_patch": git_patch,
        },
        instruction=instruction,
        error=None,
        history=list(conversation.state.events),
        metrics=conversation.conversation_stats.get_combined_metrics(),
    )
    return out

分数评估

分数评估

代码

核心

  • 格式转换
  • 调用swebench.harness.run_evaluation做评估。

主函数

python
def main() -> None:
    """Main entry point for the script."""
    parser = argparse.ArgumentParser(
        description="Convert OpenHands output to SWE-Bench format and run evaluation",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
    uv run swebench-eval output.jsonl
    uv run swebench-eval /path/to/output.jsonl --dataset princeton-nlp/SWE-bench_Lite
    uv run swebench-eval output.jsonl --model-name "MyModel-v1.0"
        """,
    )

    parser.add_argument("input_file", help="Path to the OpenHands output.jsonl file")

    parser.add_argument(
        "--dataset",
        default="princeton-nlp/SWE-bench_Verified",
        help="SWE-Bench dataset to evaluate against "
        "(default: princeton-nlp/SWE-bench_Verified)",
    )

    parser.add_argument(
        "--output-file",
        help="Output file for SWE-Bench format "
        "(default: input_file with .swebench.jsonl extension)",
    )

    parser.add_argument(
        "--skip-evaluation",
        action="store_true",
        help="Only convert format, skip running evaluation",
    )

    parser.add_argument(
        "--model-name",
        default="openhands",
        help="Model name to use in the model_name_or_path field (default: openhands)",
    )

    parser.add_argument(
        "--workers",
        default="12",
        help="Number of workers to use when evaluating",
    )

    args = parser.parse_args()

    # Validate input file
    input_file = Path(args.input_file)
    if not input_file.exists():
        logger.error(f"Input file does not exist: {input_file}")
        sys.exit(1)

    if not input_file.suffix == ".jsonl":
        logger.warning(f"Input file does not have .jsonl extension: {input_file}")

    # Determine output file
    if args.output_file:
        output_file = Path(args.output_file)
    else:
        output_file = input_file.with_suffix(".swebench.jsonl")

    logger.info(f"Input file: {input_file}")
    logger.info(f"Output file: {output_file}")
    logger.info(f"Dataset: {args.dataset}")
    logger.info(f"Model name: {args.model_name}")

    try:
        # Convert format
        convert_to_swebench_format(str(input_file), str(output_file), args.model_name)

        if not args.skip_evaluation:
            # Run evaluation
            run_swebench_evaluation(str(output_file), args.dataset, args.workers)

            # Move report file to input file directory with .report.json extension
            # SWE-Bench creates: {model_name.replace("/", "__")}.eval_{output_file.stem}.json
            report_filename = (
                f"{args.model_name.replace('/', '__')}.eval_{output_file.stem}.json"
            )
            report_path = output_file.parent / report_filename
            dest_report_path = input_file.with_suffix(".report.json")

            shutil.move(str(report_path), str(dest_report_path))
            logger.info(f"Moved report file to: {dest_report_path}")

            # Update Laminar datapoints with evaluation scores
            LaminarService.get().update_evaluation_scores(
                str(input_file), str(dest_report_path)
            )

        # Generate cost report as final step
        generate_cost_report(str(input_file))

        logger.info("Script completed successfully!")

    except Exception as e:
        logger.error(f"Script failed: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()

格式转换

python
def convert_to_swebench_format(
    input_file: str, output_file: str, model_name: str = "OpenHands"
) -> None:
    """
    Convert OpenHands output.jsonl to SWE-Bench prediction format.

    OpenHands format:
    {
        "instance_id": "django__django-11333",
        "test_result": {
            "git_patch": "diff --git a/file.py b/file.py\n..."
        },
        "instruction": "...",
        "error": null,
        "history": [...]
    }

    SWE-Bench format:
    {
        "instance_id": "django__django-11333",
        "model_patch": "diff --git a/file.py b/file.py\n...",
        "model_name_or_path": "OpenHands"
    }
    """
    logger.info(f"Converting {input_file} to SWE-Bench format: {output_file}")

    converted_count = 0
    error_count = 0

    with open(input_file, "r") as infile, open(output_file, "w") as outfile:
        for line_num, line in enumerate(infile, 1):
            try:
                line = line.strip()
                if not line:
                    continue

                data = json.loads(line)

                # Extract required fields
                instance_id = data.get("instance_id")
                if not instance_id:
                    logger.warning(f"Line {line_num}: Missing instance_id")
                    error_count += 1
                    continue

                # Extract git_patch from test_result
                test_result = data.get("test_result", {})
                git_patch = test_result.get("git_patch", "")

                if not git_patch:
                    logger.warning(
                        f"Line {line_num}: Missing or empty git_patch for {instance_id}"
                    )
                    # Still create entry with empty patch
                    git_patch = ""

                # postprocess git_patch
                setup_files = ["pyproject.toml", "tox.ini", "setup.py"]
                git_patch = remove_files_from_patch(git_patch, setup_files)

                # Create SWE-Bench format entry
                swebench_entry = {
                    "instance_id": instance_id,
                    "model_patch": git_patch,
                    "model_name_or_path": model_name,
                }

                # Write to output file
                outfile.write(json.dumps(swebench_entry) + "\n")
                converted_count += 1

            except json.JSONDecodeError as e:
                logger.error(f"Line {line_num}: Invalid JSON - {e}")
                error_count += 1
            except Exception as e:
                logger.error(f"Line {line_num}: Unexpected error - {e}")
                error_count += 1

    logger.info(
        f"Conversion complete: {converted_count} entries converted, "
        f"{error_count} errors"
    )

    if converted_count == 0:
        raise ValueError("No valid entries were converted")
python
def run_swebench_evaluation(
    predictions_file: str,
    dataset: str = "princeton-nlp/SWE-bench_Verified",
    workers: str = "12",
) -> None:
    """
    Run SWE-Bench evaluation on the predictions file.

    Args:
        predictions_file: Path to the SWE-Bench format predictions file
        dataset: SWE-Bench dataset to evaluate against
        workers: Number of workers to use for evaluation
    """
    logger.info(f"Running SWE-Bench evaluation on {predictions_file}")

    try:
        # Get the directory of the predictions file
        predictions_path = Path(predictions_file)
        predictions_dir = predictions_path.parent
        predictions_filename = predictions_path.name

        # Run SWE-Bench evaluation using global python (not UV environment)
        # since swebench is installed globally
        cmd = [
            "uv",
            "run",
            "python",
            "-m",
            "swebench.harness.run_evaluation",
            "--dataset_name",
            dataset,
            "--predictions_path",
            predictions_filename,
            "--max_workers",
            str(workers),
            "--run_id",
            f"eval_{predictions_path.stem}",
        ]

        logger.info(f"Running command: {' '.join(cmd)}")
        logger.info(f"Working directory: {predictions_dir}")
        logger.info("SWE-Bench evaluation output:")
        print("-" * 80)

        # Stream output directly to console, running from predictions file directory
        result = subprocess.run(cmd, text=True, cwd=predictions_dir)

        print("-" * 80)
        if result.returncode == 0:
            logger.info("SWE-Bench evaluation completed successfully")
        else:
            logger.error(
                f"SWE-Bench evaluation failed with return code {result.returncode}"
            )
            raise subprocess.CalledProcessError(result.returncode, cmd)

    except FileNotFoundError:
        logger.error(
            "SWE-Bench evaluation command not found. "
            "Make sure SWE-Bench is properly installed."
        )
        raise
    except Exception as e:
        logger.error(f"Error running SWE-Bench evaluation: {e}")
        raise

工具概览(文件编辑+命令行+TaskTracker)

工具

代码地址

工具

  • FileEditorTool
  • TerminalTool
  • TaskTrackerTool
python
def get_default_tools(
    enable_browser: bool = True,
) -> list[Tool]:
    """Get the default set of tool specifications for the standard experience.

    Args:
        enable_browser: Whether to include browser tools.
    """
    register_default_tools(enable_browser=enable_browser)

    # Import tools to access their name attributes
    from openhands.tools.file_editor import FileEditorTool
    from openhands.tools.task_tracker import TaskTrackerTool
    from openhands.tools.terminal import TerminalTool

    tools = [
        Tool(name=TerminalTool.name),
        Tool(name=FileEditorTool.name),
        Tool(name=TaskTrackerTool.name),
    ]
    if enable_browser:
        from openhands.tools.browser_use import BrowserToolSet

        tools.append(Tool(name=BrowserToolSet.name))
    return tools

OpenHands Tools

Base 定义

Base Action

python
class Action(Schema, ABC):
    """Base schema for input action."""

    @property
    def visualize(self) -> Text:
        """Return Rich Text representation of this action.

        This method can be overridden by subclasses to customize visualization.
        The base implementation displays all action fields systematically.
        """
        content = Text()

        # Display action name
        action_name = self.__class__.__name__
        content.append("Action: ", style="bold")
        content.append(action_name)
        content.append("\n\n")

        # Display all action fields systematically
        content.append("Arguments:", style="bold")
        action_fields = self.model_dump()
        content.append(display_dict(action_fields))

        return content

Base Observation

Base Observation

属性

  • ERROR_MESSAGE_HEADER:出错时的补充信息
  • content:list,元素可是文本图像
  • is_error:是否出错

text()

  • 把content list里的文本内容,拼接在一起

to_llm_content()

  • 把content转换成llm见的格式。出错时,对相关内容前面加一个ERROR_MESSAGE_HEADER
python
class Observation(Schema, ABC):
    """Base schema for output observation."""

    ERROR_MESSAGE_HEADER: ClassVar[str] = "[An error occurred during execution.]\n"

    content: list[TextContent | ImageContent] = Field(
        default_factory=list,
        description=(
            "Content returned from the tool as a list of "
            "TextContent/ImageContent objects. "
            "When there is an error, it should be written in this field."
        ),
    )
    is_error: bool = Field(
        default=False, description="Whether the observation indicates an error"
    )

    @classmethod
    def from_text(
        cls,
        text: str,
        is_error: bool = False,
        **kwargs: Any,
    ) -> "Self":
        """Utility to create an Observation from a simple text string.

        Args:
            text: The text content to include in the observation.
            is_error: Whether this observation represents an error.
            **kwargs: Additional fields for the observation subclass.

        Returns:
            An Observation instance with the text wrapped in a TextContent.
        """
        return cls(content=[TextContent(text=text)], is_error=is_error, **kwargs)

    @property
    def text(self) -> str:
        """Extract all text content from the observation.

        Returns:
            Concatenated text from all TextContent items in content.
        """
        return "".join(
            item.text for item in self.content if isinstance(item, TextContent)
        )

    @property
    def to_llm_content(self) -> Sequence[TextContent | ImageContent]:
        """
        Default content formatting for converting observation to LLM readable content.
        Subclasses can override to provide richer content (e.g., images, diffs).
        """
        llm_content: list[TextContent | ImageContent] = []

        # If is_error is true, prepend error message
        if self.is_error:
            llm_content.append(TextContent(text=self.ERROR_MESSAGE_HEADER))

        # Add content (now always a list)
        llm_content.extend(self.content)

        return llm_content

    @property
    def visualize(self) -> Text:
        """Return Rich Text representation of this observation.

        Subclasses can override for custom visualization; by default we show the
        same text that would be sent to the LLM.
        """
        text = Text()

        if self.is_error:
            text.append("❌ ", style="red bold")
            text.append(self.ERROR_MESSAGE_HEADER, style="bold red")

        text_parts = content_to_str(self.to_llm_content)
        if text_parts:
            full_content = "".join(text_parts)
            text.append(full_content)
        else:
            text.append("[no text content]")
        return text

FileEditor Tool

FileEditorTool

代码地址

三层架构

  • 接口层:FileEditorTool
  • 逻辑层:FileEditorExecutor
  • 底层实现层:FileEditor

核心命令

  • View
  • create
  • Str_replace
  • insert
  • undo_edit

FileEditorAction

FileEditorAction

关键属性

  • command:执行的命令,只能有5个
    • view, create, str_replace, insert, undo_edit
  • path:文件路径

不同命令,需要不同的参数

  • createfile_text,文件的内容
  • str_replaceold_str, new_str,需要改变的新旧字符串
  • insert: insert_line,需要插入的行
  • viewview_range:需要查看的行数
python
class FileEditorAction(Action):
    """Schema for file editor operations."""
    # 命令
    command: CommandLiteral = Field( 
        description="The commands to run. Allowed options are: `view`, `create`, "
        "`str_replace`, `insert`, `undo_edit`."
    )
    # 文件路径
    path: str = Field(description="Absolute path to file or directory.") 
    file_text: str | None = Field(
        default=None,
        description="Required parameter of `create` command, with the content of "
        "the file to be created.",
    )
    old_str: str | None = Field(
        default=None,
        description="Required parameter of `str_replace` command containing the "
        "string in `path` to replace.",
    )
    new_str: str | None = Field(
        default=None,
        description="Optional parameter of `str_replace` command containing the "
        "new string (if not given, no string will be added). Required parameter "
        "of `insert` command containing the string to insert.",
    )
    insert_line: int | None = Field(
        default=None,
        ge=0,
        description="Required parameter of `insert` command. The `new_str` will "
        "be inserted AFTER the line `insert_line` of `path`.",
    )
    view_range: list[int] | None = Field(
        default=None,
        description="Optional parameter of `view` command when `path` points to a "
        "file. If none is given, the full file is shown. If provided, the file "
        "will be shown in the indicated line number range, e.g. [11, 12] will "
        "show lines 11 and 12. Indexing at 1 to start. Setting `[start_line, "
        "-1]` shows all lines from `start_line` to the end of the file.",
    )

FileEditorObservation

FileEditorObservation

属性

  • command, path, old_content, new_content等

方法

  • visualize:
  • _has_meaningful_diff
python
class FileEditorObservation(Observation):
    """A ToolResult that can be rendered as a CLI output."""

    command: CommandLiteral = Field(
        description=(
            "The command that was run: `view`, `create`, `str_replace`, "
            "`insert`, or `undo_edit`."
        )
    )

    path: str | None = Field(default=None, description="The file path that was edited.")
    prev_exist: bool = Field(
        default=True,
        description="Indicates if the file previously existed. If not, it was created.",
    )
    
    old_content: str | None = Field(
        default=None, description="The content of the file before the edit."
    )
    new_content: str | None = Field(
        default=None, description="The content of the file after the edit."
    )

    _diff_cache: Text | None = PrivateAttr(default=None)

    @property
    def visualize(self) -> Text:
        """Return Rich Text representation of this observation.

        Shows diff visualization for meaningful changes (file creation, successful
        edits), otherwise falls back to agent observation.
        """
        text = Text()

        if self.is_error:
            text.append("❌ ", style="red bold")
            text.append(self.ERROR_MESSAGE_HEADER, style="bold red")

        if not self._has_meaningful_diff:
            return super().visualize

        assert self.path is not None, "path should be set for meaningful diff"
        # Generate and cache diff visualization
        if not self._diff_cache:
            change_applied = self.command != "view" and not self.is_error
            self._diff_cache = visualize_diff(
                self.path,
                self.old_content,
                self.new_content,
                n_context_lines=2,
                change_applied=change_applied,
            )

        # Combine error prefix with diff visualization
        text.append(self._diff_cache)
        return text

    @property
    def _has_meaningful_diff(self) -> bool:
        """Check if there's a meaningful diff to display."""
        if self.is_error:
            return False

        if not self.path:
            return False

        if self.command not in ("create", "str_replace", "insert", "undo_edit"):
            return False

        # File creation case
        if self.command == "create" and self.new_content and not self.prev_exist:
            return True

        # File modification cases (str_replace, insert, undo_edit)
        if self.command in ("str_replace", "insert", "undo_edit"):
            # Need both old and new content to show meaningful diff
            if self.old_content is not None and self.new_content is not None:
                # Only show diff if content actually changed
                return self.old_content != self.new_content

        return False

FileEditorTool 定义和描述

FileEditorTool

工具描述

代码流程

  • 根据工作目录初始化真实的executor
  • 基础(前2行)和剩下的工具描述,但默认还是完整的工具描述
  • 根据working_dir增强工具描述
python
Command = Literal[
    "view",
    "create",
    "str_replace",
    "insert",
    "undo_edit",
]


TOOL_DESCRIPTION = """Custom editing tool for viewing, creating and editing files in plain-text format
* State is persistent across command calls and discussions with the user
* If `path` is a text file, `view` displays the result of applying `cat -n`. If `path` is a directory, `view` lists non-hidden files and directories up to 2 levels deep
* The `create` command cannot be used if the specified `path` already exists as a file
* If a `command` generates a long output, it will be truncated and marked with `<response clipped>`
* The `undo_edit` command will revert the last edit made to the file at `path`
* This tool can be used for creating and editing files in plain-text format.


Before using this tool:
1. Use the view tool to understand the file's contents and context
2. Verify the directory path is correct (only applicable when creating new files):
   - Use the view tool to verify the parent directory exists and is the correct location

When making edits:
   - Ensure the edit results in idiomatic, correct code
   - Do not leave the code in a broken state
   - Always use absolute file paths (starting with /)

CRITICAL REQUIREMENTS FOR USING THIS TOOL:

1. EXACT MATCHING: The `old_str` parameter must match EXACTLY one or more consecutive lines from the file, including all whitespace and indentation. The tool will fail if `old_str` matches multiple locations or doesn't match exactly with the file content.

2. UNIQUENESS: The `old_str` must uniquely identify a single instance in the file:
   - Include sufficient context before and after the change point (3-5 lines recommended)
   - If not unique, the replacement will not be performed

3. REPLACEMENT: The `new_str` parameter should contain the edited lines that replace the `old_str`. Both strings must be different.

Remember: when making multiple file edits in a row to the same file, you should prefer to send all edits in a single message with multiple calls to this tool, rather than multiple messages with a single call each.
"""  # noqa: E501


class FileEditorTool(ToolDefinition[FileEditorAction, FileEditorObservation]):
    """A ToolDefinition subclass that automatically initializes a FileEditorExecutor."""

    @classmethod
    def create(
        cls,
        conv_state: "ConversationState",
    ) -> Sequence["FileEditorTool"]:
        """Initialize FileEditorTool with a FileEditorExecutor.

        Args:
            conv_state: Conversation state to get working directory from.
                         If provided, workspace_root will be taken from
                         conv_state.workspace
        """
        # Import here to avoid circular imports
        from openhands.tools.file_editor.impl import FileEditorExecutor

        # 根据工作目录初始化真实的executor,
        executor = FileEditorExecutor(workspace_root=conv_state.workspace.working_dir) 

        # Build the tool description with conditional image viewing support
        # Split TOOL_DESCRIPTION to insert image viewing line after the second bullet
        description_lines = TOOL_DESCRIPTION.split("\n")
        # 基础和剩下的工具描述
        base_description = "\n".join(description_lines[:2])  # First two lines
        remaining_description = "\n".join(description_lines[2:])  # Rest of description

        # Add image viewing line if LLM supports vision
        if conv_state.agent.llm.vision_is_active():
            tool_description = (
                f"{base_description}\n"
                "* If `path` is an image file (.png, .jpg, .jpeg, .gif, .webp, "
                ".bmp), `view` displays the image content\n"
                f"{remaining_description}"
            )
        else:
            tool_description = TOOL_DESCRIPTION

        # Add working directory information to the tool description
        # to guide the agent to use the correct directory instead of root
        working_dir = conv_state.workspace.working_dir
        # 增强描述,working_dir
        enhanced_description = (
            f"{tool_description}\n\n"
            f"Your current working directory is: {working_dir}\n"
            f"When exploring project structure, start with this directory "
            f"instead of the root filesystem."
        )

        # Initialize the parent Tool with the executor
        return [
            cls(
                action_type=FileEditorAction,
                observation_type=FileEditorObservation,
                description=enhanced_description,
                annotations=ToolAnnotations(
                    title="file_editor",
                    readOnlyHint=False,
                    destructiveHint=True,
                    idempotentHint=False,
                    openWorldHint=False,
                ),
                executor=executor,
            )
        ]


# Automatically register the tool when this module is imported
register_tool(FileEditorTool.name, FileEditorTool)
markdown
文件编辑器工具使用指南
这是一款用于查看、创建及编辑纯文本文件的自定义工具。

核心特性
状态持久化:在不同的命令调用和用户对话之间,操作状态是持续保留的。

智能查看 (view):

如果路径是文件,view 会显示带行号的内容(类似于执行 cat -n)。

如果路径是目录,view 会列出非隐藏文件和子目录(最多显示 2 层深度)。

创建限制 (create):如果指定路径已存在同名文件,则无法使用 create 命令。

输出截断:如果命令生成的输出过长,系统会自动截断并标记为 <response clipped>。

撤销功能 (undo_edit):该命令会撤销对指定路径文件所做的最后一次修改。

用途:本工具专用于纯文本格式文件的创建与编辑。

使用前的准备工作
先观察后操作:先使用 view 工具了解文件的内容和上下文背景。

验证目录路径(仅适用于创建新文件):

使用 view 工具确认父目录确实存在,且位置正确。

编辑时的准则
代码质量:确保修改后的代码符合惯例且逻辑正确。

完整性:不要让代码处于无法运行或损坏的状态。

绝对路径:始终使用绝对文件路径(以 / 开头)。

⚠️ 至关重要的核心要求 (CRITICAL REQUIREMENTS)
精确匹配 (EXACT MATCHING): old_str 参数必须与文件中的一行或多行完全匹配,包括所有的空格、缩进和换行符。如果 old_str 匹配到多个位置,或者与文件内容有细微差别,操作将执行失败。

唯一性 (UNIQUENESS): old_str 必须能唯一锁定文件中的某个位置:

建议在修改点前后包含足够的上下文(推荐提供 3 到 5 行)。

如果匹配不唯一,系统将不会执行替换操作。

替换逻辑 (REPLACEMENT): new_str 参数应包含替换 old_str 后的编辑行。这两个字符串必须不相同。

温馨提示
当需要对同一个文件连续进行多次编辑时,应优先选择在单条消息中发送多个工具调用,而不是分多条消息、每条只调用一次。

FileEditorExecutor (执行包装,对内)

python
from pathlib import Path
from typing import TYPE_CHECKING

from openhands.sdk.tool import ToolExecutor


if TYPE_CHECKING:
    from openhands.sdk.conversation import LocalConversation
from openhands.tools.file_editor.definition import (
    CommandLiteral,
    FileEditorAction,
    FileEditorObservation,
)
from openhands.tools.file_editor.editor import FileEditor
from openhands.tools.file_editor.exceptions import ToolError


# Module-global editor instance (lazily initialized in file_editor)
_GLOBAL_EDITOR: FileEditor | None = None


class FileEditorExecutor(ToolExecutor):
    """File editor executor with configurable file restrictions."""

    def __init__(
        self,
        workspace_root: str | None = None,
        allowed_edits_files: list[str] | None = None,
    ):
        self.editor: FileEditor = FileEditor(workspace_root=workspace_root)
        self.allowed_edits_files: set[Path] | None = (
            {Path(f).resolve() for f in allowed_edits_files}
            if allowed_edits_files
            else None
        )

    def __call__(
        self,
        action: FileEditorAction,
        conversation: "LocalConversation | None" = None,  # noqa: ARG002
    ) -> FileEditorObservation:
        # Enforce allowed_edits_files restrictions
        if self.allowed_edits_files is not None and action.command != "view":
            action_path = Path(action.path).resolve()
            if action_path not in self.allowed_edits_files:
                return FileEditorObservation.from_text(
                    text=(
                        f"Operation '{action.command}' is not allowed "
                        f"on file '{action_path}'. "
                        f"Only the following files can be edited: "
                        f"{sorted(str(p) for p in self.allowed_edits_files)}"
                    ),
                    command=action.command,
                    is_error=True,
                )

        result: FileEditorObservation | None = None
        try:
            result = self.editor(
                command=action.command,
                path=action.path,
                file_text=action.file_text,
                view_range=action.view_range,
                old_str=action.old_str,
                new_str=action.new_str,
                insert_line=action.insert_line,
            )
        except ToolError as e:
            result = FileEditorObservation.from_text(
                text=e.message, command=action.command, is_error=True
            )
        assert result is not None, "file_editor should always return a result"
        return result


def file_editor(
    command: CommandLiteral,
    path: str,
    file_text: str | None = None,
    view_range: list[int] | None = None,
    old_str: str | None = None,
    new_str: str | None = None,
    insert_line: int | None = None,
) -> FileEditorObservation:
    """A global FileEditor instance to be used by the tool."""

    global _GLOBAL_EDITOR
    if _GLOBAL_EDITOR is None:
        _GLOBAL_EDITOR = FileEditor()

    result: FileEditorObservation | None = None
    try:
        result = _GLOBAL_EDITOR(
            command=command,
            path=path,
            file_text=file_text,
            view_range=view_range,
            old_str=old_str,
            new_str=new_str,
            insert_line=insert_line,
        )
    except ToolError as e:
        result = FileEditorObservation.from_text(
            text=e.message, command=command, is_error=True
        )
    assert result is not None, "file_editor should always return a result"
    return result

FileEditor (真实的底层实现)

python
import base64
import mimetypes
import os
import re
import shutil
import tempfile
from pathlib import Path
from typing import get_args

from binaryornot.check import is_binary

from openhands.sdk import ImageContent, TextContent
from openhands.sdk.logger import get_logger
from openhands.sdk.utils.truncate import maybe_truncate
from openhands.tools.file_editor.definition import (
    CommandLiteral,
    FileEditorObservation,
)
from openhands.tools.file_editor.exceptions import (
    EditorToolParameterInvalidError,
    EditorToolParameterMissingError,
    FileValidationError,
    ToolError,
)
from openhands.tools.file_editor.utils.config import SNIPPET_CONTEXT_WINDOW
from openhands.tools.file_editor.utils.constants import (
    BINARY_FILE_CONTENT_TRUNCATED_NOTICE,
    DIRECTORY_CONTENT_TRUNCATED_NOTICE,
    MAX_RESPONSE_LEN_CHAR,
    TEXT_FILE_CONTENT_TRUNCATED_NOTICE,
)
from openhands.tools.file_editor.utils.encoding import (
    EncodingManager,
    with_encoding,
)
from openhands.tools.file_editor.utils.history import FileHistoryManager
from openhands.tools.file_editor.utils.shell import run_shell_cmd


logger = get_logger(__name__)

# Supported image extensions for viewing as base64-encoded content
IMAGE_EXTENSIONS = {".png", ".jpg", ".jpeg", ".gif", ".webp", ".bmp"}


class FileEditor:
    """
    An filesystem editor tool that allows the agent to
    - view
    - create
    - navigate
    - edit files
    The tool parameters are defined by Anthropic and are not editable.

    Original implementation: https://github.com/anthropics/anthropic-quickstarts/blob/main/computer-use-demo/computer_use_demo/tools/edit.py
    """

    MAX_FILE_SIZE_MB: int = 10  # Maximum file size in MB
    _history_manager: FileHistoryManager
    _max_file_size: int
    _encoding_manager: EncodingManager
    _cwd: str

    def __init__(
        self,
        workspace_root: str | None = None,
        max_file_size_mb: int | None = None,
    ):
        """Initialize the editor.

        Args:
            max_file_size_mb: Maximum file size in MB. If None, uses the default
                MAX_FILE_SIZE_MB.
            workspace_root: Root directory that serves as the current working
                directory for relative path suggestions. Must be an absolute path.
                If None, no path suggestions will be provided for relative paths.
        """
        self._history_manager = FileHistoryManager(max_history_per_file=10)
        self._max_file_size = (
            (max_file_size_mb or self.MAX_FILE_SIZE_MB) * 1024 * 1024
        )  # Convert to bytes

        # Initialize encoding manager
        self._encoding_manager = EncodingManager()

        # Set cwd (current working directory) if workspace_root is provided
        if workspace_root is not None:
            workspace_path = Path(workspace_root)
            # Ensure workspace_root is an absolute path
            if not workspace_path.is_absolute():
                workspace_path = workspace_path.resolve()
            self._cwd = str(workspace_path)
        else:
            self._cwd = os.path.abspath(os.getcwd())
        logger.info(f"FileEditor initialized with cwd: {self._cwd}")

    def __call__(
        self,
        *,
        command: CommandLiteral,
        path: str,
        file_text: str | None = None,
        view_range: list[int] | None = None,
        old_str: str | None = None,
        new_str: str | None = None,
        insert_line: int | None = None,
    ) -> FileEditorObservation:
        _path = Path(path)
        self.validate_path(command, _path)
        if command == "view":
            return self.view(_path, view_range)
        elif command == "create":
            if file_text is None:
                raise EditorToolParameterMissingError(command, "file_text")
            self.write_file(_path, file_text)
            self._history_manager.add_history(_path, file_text)
            return FileEditorObservation.from_text(
                text=f"File created successfully at: {_path}",
                command=command,
                path=str(_path),
                new_content=file_text,
                prev_exist=False,
            )
        elif command == "str_replace":
            if old_str is None:
                raise EditorToolParameterMissingError(command, "old_str")
            if new_str is None:
                raise EditorToolParameterMissingError(command, "new_str")
            if new_str == old_str:
                raise EditorToolParameterInvalidError(
                    "new_str",
                    new_str,
                    "No replacement was performed. `new_str` and `old_str` must be "
                    "different.",
                )
            return self.str_replace(_path, old_str, new_str)
        elif command == "insert":
            if insert_line is None:
                raise EditorToolParameterMissingError(command, "insert_line")
            if new_str is None:
                raise EditorToolParameterMissingError(command, "new_str")
            return self.insert(_path, insert_line, new_str)
        elif command == "undo_edit":
            return self.undo_edit(_path)

        raise ToolError(
            f"Unrecognized command {command}. The allowed commands for "
            f"{self.__class__.__name__} tool are: {', '.join(get_args(CommandLiteral))}"
        )

    @with_encoding
    def _count_lines(self, path: Path, encoding: str = "utf-8") -> int:
        """
        Count the number of lines in a file safely.

        Args:
            path: Path to the file
            encoding: The encoding to use when reading the file (auto-detected by
                decorator)

        Returns:
            The number of lines in the file
        """
        with open(path, encoding=encoding) as f:
            return sum(1 for _ in f)

    @with_encoding
    def str_replace(
        self,
        path: Path,
        old_str: str,
        new_str: str | None,
    ) -> FileEditorObservation:
        """
        Implement the str_replace command, which replaces old_str with new_str in
        the file content.

        Args:
            path: Path to the file
            old_str: String to replace
            new_str: Replacement string
            enable_linting: Whether to run linting on the changes
            encoding: The encoding to use (auto-detected by decorator)
        """
        self.validate_file(path)
        new_str = new_str or ""

        # Read the entire file first to handle both single-line and multi-line
        # replacements
        file_content = self.read_file(path)

        # Find all occurrences using regex
        # Escape special regex characters in old_str to match it literally
        pattern = re.escape(old_str)
        occurrences = [
            (
                file_content.count("\n", 0, match.start()) + 1,  # line number
                match.group(),  # matched text
                match.start(),  # start position
            )
            for match in re.finditer(pattern, file_content)
        ]

        if not occurrences:
            # We found no occurrences, possibly because of extra white spaces at
            # either the front or back of the string.
            # Remove the white spaces and try again.
            old_str = old_str.strip()
            new_str = new_str.strip()
            pattern = re.escape(old_str)
            occurrences = [
                (
                    file_content.count("\n", 0, match.start()) + 1,  # line number
                    match.group(),  # matched text
                    match.start(),  # start position
                )
                for match in re.finditer(pattern, file_content)
            ]
            if not occurrences:
                raise ToolError(
                    f"No replacement was performed, old_str `{old_str}` did not "
                    f"appear verbatim in {path}."
                )
        if len(occurrences) > 1:
            line_numbers = sorted(set(line for line, _, _ in occurrences))
            raise ToolError(
                f"No replacement was performed. Multiple occurrences of old_str "
                f"`{old_str}` in lines {line_numbers}. Please ensure it is unique."
            )

        # We found exactly one occurrence
        replacement_line, matched_text, idx = occurrences[0]

        # Create new content by replacing just the matched text
        new_file_content = (
            file_content[:idx] + new_str + file_content[idx + len(matched_text) :]
        )

        # Write the new content to the file
        self.write_file(path, new_file_content)

        # Save the content to history
        self._history_manager.add_history(path, file_content)

        # Create a snippet of the edited section
        start_line = max(0, replacement_line - SNIPPET_CONTEXT_WINDOW)
        end_line = replacement_line + SNIPPET_CONTEXT_WINDOW + new_str.count("\n")

        # Read just the snippet range
        snippet = self.read_file(path, start_line=start_line + 1, end_line=end_line)

        # Prepare the success message
        success_message = f"The file {path} has been edited. "
        success_message += self._make_output(
            snippet, f"a snippet of {path}", start_line + 1
        )

        success_message += (
            "Review the changes and make sure they are as expected. Edit the "
            "file again if necessary."
        )
        return FileEditorObservation.from_text(
            text=success_message,
            command="str_replace",
            prev_exist=True,
            path=str(path),
            old_content=file_content,
            new_content=new_file_content,
        )

    def view(
        self, path: Path, view_range: list[int] | None = None
    ) -> FileEditorObservation:
        """
        View the contents of a file or a directory.
        """
        if path.is_dir():
            if view_range:
                raise EditorToolParameterInvalidError(
                    "view_range",
                    str(view_range),
                    "The `view_range` parameter is not allowed when `path` points to "
                    "a directory.",
                )

            # First count hidden files/dirs in current directory only
            # -mindepth 1 excludes . and .. automatically
            _, hidden_stdout, _ = run_shell_cmd(
                rf"find -L {path} -mindepth 1 -maxdepth 1 -name '.*'"
            )
            hidden_count = (
                len(hidden_stdout.strip().split("\n")) if hidden_stdout.strip() else 0
            )

            # Then get files/dirs up to 2 levels deep, excluding hidden entries at
            # both depth 1 and 2
            _, stdout, stderr = run_shell_cmd(
                rf"find -L {path} -maxdepth 2 -not \( -path '{path}/\.*' -o "
                rf"-path '{path}/*/\.*' \) | sort",
                truncate_notice=DIRECTORY_CONTENT_TRUNCATED_NOTICE,
            )
            if stderr:
                return FileEditorObservation.from_text(
                    text=stderr,
                    command="view",
                    is_error=True,
                    path=str(path),
                    prev_exist=True,
                )
            # Add trailing slashes to directories
            paths = stdout.strip().split("\n") if stdout.strip() else []
            formatted_paths = []
            for p in paths:
                if Path(p).is_dir():
                    formatted_paths.append(f"{p}/")
                else:
                    formatted_paths.append(p)

            msg = [
                f"Here's the files and directories up to 2 levels deep in {path}, "
                "excluding hidden items:\n" + "\n".join(formatted_paths)
            ]
            if hidden_count > 0:
                msg.append(
                    f"\n{hidden_count} hidden files/directories in this directory "
                    f"are excluded. You can use 'ls -la {path}' to see them."
                )
            stdout = "\n".join(msg)
            return FileEditorObservation.from_text(
                text=stdout,
                command="view",
                path=str(path),
                prev_exist=True,
            )

        # Check if the file is an image
        file_extension = path.suffix.lower()
        if file_extension in IMAGE_EXTENSIONS:
            # Read image file as base64
            try:
                with open(path, "rb") as f:
                    image_bytes = f.read()
                image_base64 = base64.b64encode(image_bytes).decode("utf-8")

                mime_type, _ = mimetypes.guess_type(str(path))
                if not mime_type or not mime_type.startswith("image/"):
                    mime_type = "image/png"
                output_msg = (
                    f"Image file {path} read successfully. Displaying image content."
                )
                image_url = f"data:{mime_type};base64,{image_base64}"
                return FileEditorObservation(
                    command="view",
                    content=[
                        TextContent(text=output_msg),
                        ImageContent(image_urls=[image_url]),
                    ],
                    path=str(path),
                    prev_exist=True,
                )
            except Exception as e:
                raise ToolError(f"Failed to read image file {path}: {e}") from None

        # Validate file and count lines
        self.validate_file(path)
        try:
            num_lines = self._count_lines(path)
        except UnicodeDecodeError as e:
            raise ToolError(
                f"Cannot view {path}: file contains binary content that cannot be "
                f"decoded as text. Error: {e}"
            ) from None

        start_line = 1
        if not view_range:
            file_content = self.read_file(path)
            output = self._make_output(file_content, str(path), start_line)

            return FileEditorObservation.from_text(
                text=output,
                command="view",
                path=str(path),
                prev_exist=True,
            )

        if len(view_range) != 2 or not all(isinstance(i, int) for i in view_range):
            raise EditorToolParameterInvalidError(
                "view_range",
                str(view_range),
                "It should be a list of two integers.",
            )

        start_line, end_line = view_range
        if start_line < 1 or start_line > num_lines:
            raise EditorToolParameterInvalidError(
                "view_range",
                str(view_range),
                f"Its first element `{start_line}` should be within the range of "
                f"lines of the file: {[1, num_lines]}.",
            )

        # Normalize end_line and provide a warning if it exceeds file length
        warning_message: str | None = None
        if end_line == -1:
            end_line = num_lines
        elif end_line > num_lines:
            warning_message = (
                f"We only show up to {num_lines} since there're only {num_lines} "
                "lines in this file."
            )
            end_line = num_lines

        if end_line < start_line:
            raise EditorToolParameterInvalidError(
                "view_range",
                str(view_range),
                f"Its second element `{end_line}` should be greater than or equal "
                f"to the first element `{start_line}`.",
            )

        file_content = self.read_file(path, start_line=start_line, end_line=end_line)

        # Get the detected encoding
        output = self._make_output(
            "\n".join(file_content.splitlines()), str(path), start_line
        )  # Remove extra newlines

        # Prepend warning if we truncated the end_line
        if warning_message:
            output = f"NOTE: {warning_message}\n{output}"

        return FileEditorObservation.from_text(
            text=output,
            command="view",
            path=str(path),
            prev_exist=True,
        )

    @with_encoding
    def write_file(self, path: Path, file_text: str, encoding: str = "utf-8") -> None:
        """
        Write the content of a file to a given path; raise a ToolError if an
        error occurs.

        Args:
            path: Path to the file to write
            file_text: Content to write to the file
            encoding: The encoding to use when writing the file (auto-detected by
                decorator)
        """
        self.validate_file(path)
        try:
            # Use open with encoding instead of path.write_text
            with open(path, "w", encoding=encoding) as f:
                f.write(file_text)
        except Exception as e:
            raise ToolError(f"Ran into {e} while trying to write to {path}") from None

    @with_encoding
    def insert(
        self,
        path: Path,
        insert_line: int,
        new_str: str,
        encoding: str = "utf-8",
    ) -> FileEditorObservation:
        """
        Implement the insert command, which inserts new_str at the specified line
        in the file content.

        Args:
            path: Path to the file
            insert_line: Line number where to insert the new content
            new_str: Content to insert
            enable_linting: Whether to run linting on the changes
            encoding: The encoding to use (auto-detected by decorator)
        """
        # Validate file and count lines
        self.validate_file(path)
        num_lines = self._count_lines(path)

        if insert_line < 0 or insert_line > num_lines:
            raise EditorToolParameterInvalidError(
                "insert_line",
                str(insert_line),
                f"It should be within the range of allowed values: {[0, num_lines]}",
            )

        new_str_lines = new_str.split("\n")

        # Create temporary file for the new content
        with tempfile.NamedTemporaryFile(
            mode="w", encoding=encoding, delete=False
        ) as temp_file:
            # Copy lines before insert point and save them for history
            history_lines = []
            with open(path, encoding=encoding) as f:
                for i, line in enumerate(f, 1):
                    if i > insert_line:
                        break
                    temp_file.write(line)
                    history_lines.append(line)

            # Insert new content
            for line in new_str_lines:
                temp_file.write(line + "\n")

            # Copy remaining lines and save them for history
            with open(path, encoding=encoding) as f:
                for i, line in enumerate(f, 1):
                    if i <= insert_line:
                        continue
                    temp_file.write(line)
                    history_lines.append(line)

        # Move temporary file to original location
        shutil.move(temp_file.name, path)

        # Read just the snippet range
        start_line = max(0, insert_line - SNIPPET_CONTEXT_WINDOW)
        end_line = min(
            num_lines + len(new_str_lines),
            insert_line + SNIPPET_CONTEXT_WINDOW + len(new_str_lines),
        )
        snippet = self.read_file(path, start_line=start_line + 1, end_line=end_line)

        # Save history - we already have the lines in memory
        file_text = "".join(history_lines)
        self._history_manager.add_history(path, file_text)

        # Read new content for result
        new_file_text = self.read_file(path)

        success_message = f"The file {path} has been edited. "
        success_message += self._make_output(
            snippet,
            "a snippet of the edited file",
            max(1, insert_line - SNIPPET_CONTEXT_WINDOW + 1),
        )

        success_message += (
            "Review the changes and make sure they are as expected (correct "
            "indentation, no duplicate lines, etc). Edit the file again if necessary."
        )
        return FileEditorObservation.from_text(
            text=success_message,
            command="insert",
            prev_exist=True,
            path=str(path),
            old_content=file_text,
            new_content=new_file_text,
        )

    def validate_path(self, command: CommandLiteral, path: Path) -> None:
        """
        Check that the path/command combination is valid.

        Validates:
        1. Path is absolute
        2. Path and command are compatible
        """
        # Check if its an absolute path
        if not path.is_absolute():
            suggestion_message = (
                "The path should be an absolute path, starting with `/`."
            )

            # Only suggest the absolute path if cwd is provided and the path exists
            if self._cwd is not None:
                suggested_path = self._cwd / path
                if suggested_path.exists():
                    suggestion_message += f" Maybe you meant {suggested_path}?"

            raise EditorToolParameterInvalidError(
                "path",
                str(path),
                suggestion_message,
            )

        # Check if path and command are compatible
        if command == "create" and path.exists():
            raise EditorToolParameterInvalidError(
                "path",
                str(path),
                f"File already exists at: {path}. Cannot overwrite files using "
                "command `create`.",
            )
        if command != "create" and not path.exists():
            raise EditorToolParameterInvalidError(
                "path",
                str(path),
                f"The path {path} does not exist. Please provide a valid path.",
            )
        if command != "view":
            if path.is_dir():
                raise EditorToolParameterInvalidError(
                    "path",
                    str(path),
                    f"The path {path} is a directory and only the `view` command can "
                    "be used on directories.",
                )

    def undo_edit(self, path: Path) -> FileEditorObservation:
        """
        Implement the undo_edit command.
        """
        current_text = self.read_file(path)
        old_text = self._history_manager.pop_last_history(path)
        if old_text is None:
            raise ToolError(f"No edit history found for {path}.")

        self.write_file(path, old_text)

        return FileEditorObservation.from_text(
            text=(
                f"Last edit to {path} undone successfully. "
                f"{self._make_output(old_text, str(path))}"
            ),
            command="undo_edit",
            path=str(path),
            prev_exist=True,
            old_content=current_text,
            new_content=old_text,
        )

    def validate_file(self, path: Path) -> None:
        """
        Validate a file for reading or editing operations.

        Args:
            path: Path to the file to validate

        Raises:
            FileValidationError: If the file fails validation
        """
        # Skip validation for directories or non-existent files (for create command)
        if not path.exists() or not path.is_file():
            return

        # Check file size
        file_size = os.path.getsize(path)
        max_size = self._max_file_size
        if file_size > max_size:
            raise FileValidationError(
                path=str(path),
                reason=(
                    f"File is too large ({file_size / 1024 / 1024:.1f}MB). "
                    f"Maximum allowed size is {int(max_size / 1024 / 1024)}MB."
                ),
            )

        # Check file type - allow image files
        file_extension = path.suffix.lower()
        if is_binary(str(path)) and file_extension not in IMAGE_EXTENSIONS:
            raise FileValidationError(
                path=str(path),
                reason=(
                    "File appears to be binary and this file type cannot be read "
                    "or edited by this tool."
                ),
            )

    @with_encoding
    def read_file(
        self,
        path: Path,
        start_line: int | None = None,
        end_line: int | None = None,
        encoding: str = "utf-8",  # Default will be overridden by decorator
    ) -> str:
        """
        Read the content of a file from a given path; raise a ToolError if an
        error occurs.

        Args:
            path: Path to the file to read
            start_line: Optional start line number (1-based). If provided with
                end_line, only reads that range.
            end_line: Optional end line number (1-based). Must be provided with
                start_line.
            encoding: The encoding to use when reading the file (auto-detected by
                decorator)
        """
        self.validate_file(path)
        try:
            if start_line is not None and end_line is not None:
                # Read only the specified line range
                lines = []
                with open(path, encoding=encoding) as f:
                    for i, line in enumerate(f, 1):
                        if i > end_line:
                            break
                        if i >= start_line:
                            lines.append(line)
                return "".join(lines)
            elif start_line is not None or end_line is not None:
                raise ValueError(
                    "Both start_line and end_line must be provided together"
                )
            else:
                # Use line-by-line reading to avoid loading entire file into memory
                with open(path, encoding=encoding) as f:
                    return "".join(f)
        except Exception as e:
            raise ToolError(f"Ran into {e} while trying to read {path}") from None

    def _make_output(
        self,
        snippet_content: str,
        snippet_description: str,
        start_line: int = 1,
        is_converted_markdown: bool = False,
    ) -> str:
        """
        Generate output for the CLI based on the content of a code snippet.
        """
        # If the content is converted from Markdown, we don't need line numbers
        if is_converted_markdown:
            snippet_content = maybe_truncate(
                snippet_content,
                truncate_after=MAX_RESPONSE_LEN_CHAR,
                truncate_notice=BINARY_FILE_CONTENT_TRUNCATED_NOTICE,
            )
            return (
                f"Here's the content of the file {snippet_description} displayed in "
                "Markdown format:\n" + snippet_content + "\n"
            )

        snippet_content = maybe_truncate(
            snippet_content,
            truncate_after=MAX_RESPONSE_LEN_CHAR,
            truncate_notice=TEXT_FILE_CONTENT_TRUNCATED_NOTICE,
        )

        snippet_content = "\n".join(
            [
                f"{i + start_line:6}\t{line}"
                for i, line in enumerate(snippet_content.split("\n"))
            ]
        )
        return (
            f"Here's the result of running `cat -n` on {snippet_description}:\n"
            + snippet_content
            + "\n"
        )

Terminal Tool

Terminal Tool

代码地址

两层架构

  • 接口层:TerminalTool
  • 逻辑层:TerminalExecutor

核心命令

TerminalAction

python
class TerminalAction(Action):
    """Schema for bash command execution."""

    command: str = Field(
        description="The bash command to execute. Can be empty string to view additional logs when previous exit code is `-1`. Can be `C-c` (Ctrl+C) to interrupt the currently running process. Note: You can only execute one bash command at a time. If you need to run multiple commands sequentially, you can use `&&` or `;` to chain them together."  # noqa
    )
    is_input: bool = Field(
        default=False,
        description="If True, the command is an input to the running process. If False, the command is a bash command to be executed in the terminal. Default is False.",  # noqa
    )
    timeout: float | None = Field(
        default=None,
        ge=0,
        description=f"Optional. Sets a maximum time limit (in seconds) for running the command. If the command takes longer than this limit, you’ll be asked whether to continue or stop it. If you don’t set a value, the command will instead pause and ask for confirmation when it produces no new output for {NO_CHANGE_TIMEOUT_SECONDS} seconds. Use a higher value if the command is expected to take a long time (like installation or testing), or if it has a known fixed duration (like sleep).",  # noqa
    )
    reset: bool = Field(
        default=False,
        description="If True, reset the terminal by creating a new session. Use this only when the terminal becomes unresponsive. Note that all previously set environment variables and session state will be lost after reset. Cannot be used with is_input=True.",  # noqa
    )

    @property
    def visualize(self) -> Text:
        """Return Rich Text representation with PS1-style bash prompt."""
        content = Text()

        # Create PS1-style prompt
        content.append("$ ", style="bold green")

        # Add command with syntax highlighting
        if self.command:
            content.append(self.command, style="white")
        else:
            content.append("[empty command]", style="italic")

        # Add metadata if present
        if self.is_input:
            content.append(" ", style="white")
            content.append("(input to running process)", style="yellow")

        if self.timeout is not None:
            content.append(" ", style="white")
            content.append(f"[timeout: {self.timeout}s]", style="cyan")

        if self.reset:
            content.append(" ", style="white")
            content.append("[reset terminal]", style="red bold")

        return content

TerminalObservation

python
class TerminalObservation(Observation):
    """A ToolResult that can be rendered as a CLI output."""

    command: str | None = Field(
        description="The bash command that was executed. Can be empty string if the observation is from a previous command that hit soft timeout and is not yet finished.",  # noqa
    )
    exit_code: int | None = Field(
        default=None,
        description="The exit code of the command. -1 indicates the process hit the soft timeout and is not yet finished.",  # noqa
    )
    timeout: bool = Field(
        default=False, description="Whether the command execution timed out."
    )
    metadata: CmdOutputMetadata = Field(
        default_factory=CmdOutputMetadata,
        description="Additional metadata captured from PS1 after command execution.",
    )
    full_output_save_dir: str | None = Field(
        default=None,
        description="Directory where full output files are saved",
    )

    @property
    def command_id(self) -> int | None:
        """Get the command ID from metadata."""
        return self.metadata.pid

    @property
    def to_llm_content(self) -> Sequence[TextContent | ImageContent]:
        llm_content: list[TextContent | ImageContent] = []

        # If is_error is true, prepend error message
        if self.is_error:
            llm_content.append(TextContent(text=self.ERROR_MESSAGE_HEADER))

        # TerminalObservation always has content as a single TextContent
        content_text = self.text

        ret = f"{self.metadata.prefix}{content_text}{self.metadata.suffix}"
        if self.metadata.working_dir:
            ret += f"\n[Current working directory: {self.metadata.working_dir}]"
        if self.metadata.py_interpreter_path:
            ret += f"\n[Python interpreter: {self.metadata.py_interpreter_path}]"
        if self.metadata.exit_code != -1:
            ret += f"\n[Command finished with exit code {self.metadata.exit_code}]"

        # Use enhanced truncation with file saving if working directory is available
        truncated_text = maybe_truncate(
            content=ret,
            truncate_after=MAX_CMD_OUTPUT_SIZE,
            save_dir=self.full_output_save_dir,
            tool_prefix="bash",
        )
        llm_content.append(TextContent(text=truncated_text))

        return llm_content

    @property
    def visualize(self) -> Text:
        """Return Rich Text representation with terminal-style output formatting."""
        text = Text()

        if self.is_error:
            text.append("❌ ", style="red bold")
            text.append(self.ERROR_MESSAGE_HEADER, style="bold red")

        # TerminalObservation always has content as a single TextContent
        content_text = self.text

        if content_text:
            # Style the output based on content
            output_lines = content_text.split("\n")
            for line in output_lines:
                if line.strip():
                    # Color error-like lines differently
                    if any(
                        keyword in line.lower()
                        for keyword in ["error", "failed", "exception", "traceback"]
                    ):
                        text.append(line, style="red")
                    elif any(
                        keyword in line.lower() for keyword in ["warning", "warn"]
                    ):
                        text.append(line, style="yellow")
                    elif line.startswith("+ "):  # bash -x output
                        text.append(line, style="cyan")
                    else:
                        text.append(line, style="white")
                text.append("\n")

        # Add metadata with styling
        if hasattr(self, "metadata") and self.metadata:
            if self.metadata.working_dir:
                text.append("\n📁 ", style="blue")
                text.append(
                    f"Working directory: {self.metadata.working_dir}", style="blue"
                )

            if self.metadata.py_interpreter_path:
                text.append("\n🐍 ", style="green")
                text.append(
                    f"Python interpreter: {self.metadata.py_interpreter_path}",
                    style="green",
                )

            if (
                hasattr(self.metadata, "exit_code")
                and self.metadata.exit_code is not None
            ):
                if self.metadata.exit_code == 0:
                    text.append("\n✅ ", style="green")
                    text.append(f"Exit code: {self.metadata.exit_code}", style="green")
                elif self.metadata.exit_code == -1:
                    text.append("\n⏳ ", style="yellow")
                    text.append("Process still running (soft timeout)", style="yellow")
                else:
                    text.append("\n❌ ", style="red")
                    text.append(f"Exit code: {self.metadata.exit_code}", style="red")

        return text

TerminalTool (定义和描述)

python
TOOL_DESCRIPTION = """Execute a bash command in the terminal within a persistent shell session.


### Command Execution
* One command at a time: You can only execute one bash command at a time. If you need to run multiple commands sequentially, use `&&` or `;` to chain them together.
* Persistent session: Commands execute in a persistent shell session where environment variables, virtual environments, and working directory persist between commands.
* Soft timeout: Commands have a soft timeout of 10 seconds, once that's reached, you have the option to continue or interrupt the command (see section below for details)
* Shell options: Do NOT use `set -e`, `set -eu`, or `set -euo pipefail` in shell scripts or commands in this environment. The runtime may not support them and can cause unusable shell sessions. If you want to run multi-line bash commands, write the commands to a file and then run it, instead.

### Long-running Commands
* For commands that may run indefinitely, run them in the background and redirect output to a file, e.g. `python3 app.py > server.log 2>&1 &`.
* For commands that may run for a long time (e.g. installation or testing commands), or commands that run for a fixed amount of time (e.g. sleep), you should set the "timeout" parameter of your function call to an appropriate value.
* If a bash command returns exit code `-1`, this means the process hit the soft timeout and is not yet finished. By setting `is_input` to `true`, you can:
  - Send empty `command` to retrieve additional logs
  - Send text (set `command` to the text) to STDIN of the running process
  - Send control commands like `C-c` (Ctrl+C), `C-d` (Ctrl+D), or `C-z` (Ctrl+Z) to interrupt the process
  - If you do C-c, you can re-start the process with a longer "timeout" parameter to let it run to completion

### Best Practices
* Directory verification: Before creating new directories or files, first verify the parent directory exists and is the correct location.
* Directory management: Try to maintain working directory by using absolute paths and avoiding excessive use of `cd`.

### Output Handling
* Output truncation: If the output exceeds a maximum length, it will be truncated before being returned.

### Terminal Reset
* Terminal reset: If the terminal becomes unresponsive, you can set the "reset" parameter to `true` to create a new terminal session. This will terminate the current session and start fresh.
* Warning: Resetting the terminal will lose all previously set environment variables, working directory changes, and any running processes. Use this only when the terminal stops responding to commands.
"""  # noqa


class TerminalTool(ToolDefinition[TerminalAction, TerminalObservation]):
    """A ToolDefinition subclass that automatically initializes a TerminalExecutor with auto-detection."""  # noqa: E501

    @classmethod
    def create(
        cls,
        conv_state: "ConversationState",
        username: str | None = None,
        no_change_timeout_seconds: int | None = None,
        terminal_type: Literal["tmux", "subprocess"] | None = None,
        shell_path: str | None = None,
        executor: ToolExecutor | None = None,
    ) -> Sequence["TerminalTool"]:
        """Initialize TerminalTool with executor parameters.

        Args:
            conv_state: Conversation state to get working directory from.
                         If provided, working_dir will be taken from
                         conv_state.workspace
            username: Optional username for the bash session
            no_change_timeout_seconds: Timeout for no output change
            terminal_type: Force a specific session type:
                         ('tmux', 'subprocess').
                         If None, auto-detect based on system capabilities:
                         - On Windows: PowerShell if available, otherwise subprocess
                         - On Unix-like: tmux if available, otherwise subprocess
            shell_path: Path to the shell binary (for subprocess terminal type only).
                       If None, will auto-detect bash from PATH.
        """
        # Import here to avoid circular imports
        from openhands.tools.terminal.impl import TerminalExecutor

        working_dir = conv_state.workspace.working_dir
        if not os.path.isdir(working_dir):
            raise ValueError(f"working_dir '{working_dir}' is not a valid directory")

        # Initialize the executor
        if executor is None:
            executor = TerminalExecutor(
                working_dir=working_dir,
                username=username,
                no_change_timeout_seconds=no_change_timeout_seconds,
                terminal_type=terminal_type,
                shell_path=shell_path,
                full_output_save_dir=conv_state.env_observation_persistence_dir,
            )

        # Initialize the parent ToolDefinition with the executor
        return [
            cls(
                action_type=TerminalAction,
                observation_type=TerminalObservation,
                description=TOOL_DESCRIPTION,
                annotations=ToolAnnotations(
                    title="terminal",
                    readOnlyHint=False,
                    destructiveHint=True,
                    idempotentHint=False,
                    openWorldHint=True,
                ),
                executor=executor,
            )
        ]


# Automatically register the tool when this module is imported
register_tool(TerminalTool.name, TerminalTool)
markdown
Bash 终端工具使用指南
在持久化的终端会话中执行 Bash 命令。

### 命令执行
单条执行:一次只能执行一条 Bash 命令。如果需要按顺序运行多条命令,请使用 && 或 ; 进行链式连接。

持久化会话:命令在持久化的 Shell 会话中运行。环境变量、虚拟环境和工作目录在不同命令之间会保持生效。

软超时:命令设有 10 秒的软超时。一旦达到该时间,你可以选择继续运行或中断该命令(详见下文)。

Shell 选项限制:切勿在 Shell 脚本或命令中使用 set -e、set -eu 或 set -euo pipefail。当前的运行时环境可能不支持这些选项,并可能导致 Shell 会话无法使用。如果需要运行多行 Bash 命令,请先将命令写入文件,然后再运行该文件。

### 长期运行的命令
无限运行的命令:对于可能无限期运行的命令,请将其放入后台运行并重定向输出到文件。例如:python3 app.py > server.log 2>&1 &。

耗时较长的命令:对于安装、测试或固定时长的命令(如 sleep),你应该在函数调用中设置合适的 timeout(超时)参数。

超时处理(状态码 -1):如果 Bash 命令返回退出代码 -1,意味着进程触发了软超时但尚未结束。通过将 is_input 设置为 true,你可以:

发送空的 command(命令)来检索更多日志。

发送文本到正在运行的进程的 STDIN(标准输入)。

发送控制命令,如 C-c (Ctrl+C)、C-d (Ctrl+D) 或 C-z (Ctrl+Z) 来中断进程。

如果执行了 C-c,你可以调大 timeout 参数重新启动进程,以便让其运行完成。

### 最佳实践
目录验证:在创建新目录或文件之前,先验证父目录是否存在且位置正确。

目录管理:尽量通过使用绝对路径来维持工作目录,避免过度使用 cd 命令。

### 输出处理
输出截断:如果输出超过最大长度限制,在返回结果前内容会被截断。

### 终端重置
终端重置:如果终端变得无响应,你可以将 reset 参数设置为 true 来创建一个新的终端会话。这会终止当前会话并重新开始。

警告:重置终端会导致所有之前设置的环境变量、工作目录更改以及正在运行的进程丢失。仅在终端完全停止响应命令时才使用此功能。

关键点解析(供开发者参考):
持久化(Persistence):这是 OpenHands 的一个核心特点。你在第一个步骤里 export API_KEY=xxx,在后面的步骤里这个变量依然存在,不需要重复设置。

软超时(Soft Timeout):为了防止模型因为一个卡死的命令而永久等待,系统设定了 10 秒。如果返回 -1,模型可以决定是继续等待日志(发送空指令)还是杀掉进程。

禁止 set -e:这是为了防止 Shell 会话因为某个非致命错误而意外退出。

交互式输入:通过 is_input 字段,AI 可以像人类一样与需要输入 y/n 或密码的终端程序进行简单的交互。

TerminalExecutor (执行包装,对内)

TerminalExecutor

持久化Session 管理

  • create_terminal_session 自动选择最合适后端(tmuxsubprocess)。
  • 状态保持运行命令+维护Shell 状态
    • 在上一个命令 cd 切换了目录,下一个命令依然会在那个目录下执行。
  • 重置机制 (reset):终端卡死或不可用,它可以彻底杀掉旧进程并干净新会话。

环境变量与隐私安全 (Secrets & Envs)

  • 自动注入 (_export_envs)
  • 敏感信息脱敏 (Masking):防止终端泄露敏感信息给AI,调用 mask_secrets_in_output
python
import json
from typing import TYPE_CHECKING, Literal

from openhands.sdk.llm import TextContent
from openhands.sdk.logger import get_logger
from openhands.sdk.tool import ToolExecutor


if TYPE_CHECKING:
    from openhands.sdk.conversation import LocalConversation
from openhands.tools.terminal.definition import (
    TerminalAction,
    TerminalObservation,
)
from openhands.tools.terminal.terminal.factory import create_terminal_session
from openhands.tools.terminal.terminal.terminal_session import TerminalSession


logger = get_logger(__name__)


class TerminalExecutor(ToolExecutor[TerminalAction, TerminalObservation]):
    session: TerminalSession
    shell_path: str | None

    def __init__(
        self,
        working_dir: str,
        username: str | None = None,
        no_change_timeout_seconds: int | None = None,
        terminal_type: Literal["tmux", "subprocess"] | None = None,
        shell_path: str | None = None,
        full_output_save_dir: str | None = None,
    ):
        """Initialize TerminalExecutor with auto-detected or specified session type.

        Args:
            working_dir: Working directory for bash commands
            username: Optional username for the bash session
            no_change_timeout_seconds: Timeout for no output change
            terminal_type: Force a specific session type:
                         ('tmux', 'subprocess').
                         If None, auto-detect based on system capabilities
            shell_path: Path to the shell binary (for subprocess terminal type only).
                       If None, will auto-detect bash from PATH.
            full_output_save_dir: Path to directory to save full output
                                  logs and files, used when truncation is needed.
        """
        self.shell_path = shell_path
        # 创建session
        self.session = create_terminal_session( 
            work_dir=working_dir, 
            username=username, 
            no_change_timeout_seconds=no_change_timeout_seconds, 
            terminal_type=terminal_type, 
            shell_path=shell_path,
        )
        self.session.initialize()
        self.full_output_save_dir: str | None = full_output_save_dir
        logger.info(
            f"TerminalExecutor initialized with working_dir: {working_dir}, "
            f"username: {username}, "
            f"terminal_type: {terminal_type or self.session.__class__.__name__}"
        )

    def _export_envs(
        self, action: TerminalAction, conversation: "LocalConversation | None" = None
    ) -> None:
        if not action.command.strip():
            return

        if action.is_input:
            return

        # Get secrets from conversation
        env_vars = {}
        if conversation is not None:
            try:
                secret_registry = conversation.state.secret_registry
                env_vars = secret_registry.get_secrets_as_env_vars(action.command)
            except Exception:
                env_vars = {}

        if not env_vars:
            return

        export_statements = []
        for key, value in env_vars.items():
            export_statements.append(f"export {key}={json.dumps(value)}")
        exports_cmd = " && ".join(export_statements)

        logger.debug(f"Exporting {len(env_vars)} environment variables before command")

        # Execute the export command separately to persist env in the session
        _ = self.session.execute(
            TerminalAction(
                command=exports_cmd,
                is_input=False,
                timeout=action.timeout,
            )
        )

    def reset(self) -> TerminalObservation:
        """Reset the terminal session by creating a new instance.

        Returns:
            TerminalObservation with reset confirmation message
        """
        original_work_dir = self.session.work_dir
        original_username = self.session.username
        original_no_change_timeout = self.session.no_change_timeout_seconds

        self.session.close()
        self.session = create_terminal_session(
            work_dir=original_work_dir,
            username=original_username,
            no_change_timeout_seconds=original_no_change_timeout,
            terminal_type=None,  # Let it auto-detect like before
            shell_path=self.shell_path,
        )
        self.session.initialize()

        logger.info(
            f"Terminal session reset successfully with working_dir: {original_work_dir}"
        )

        return TerminalObservation.from_text(
            text=(
                "Terminal session has been reset. All previous environment "
                "variables and session state have been cleared."
            ),
            command="[RESET]",
            exit_code=0,
        )

    def __call__(
        self,
        action: TerminalAction,
        conversation: "LocalConversation | None" = None,
    ) -> TerminalObservation:
        # Validate field combinations
        if action.reset and action.is_input:
            raise ValueError("Cannot use reset=True with is_input=True")

        if action.reset or self.session._closed:
            reset_result = self.reset()

            # Handle command execution after reset
            if action.command.strip():
                command_action = TerminalAction(
                    command=action.command,
                    timeout=action.timeout,
                    is_input=False,  # is_input validated to be False when reset=True
                )
                self._export_envs(command_action, conversation)
                # 执行
                command_result = self.session.execute(command_action) 

                # Extract text from content
                reset_text = reset_result.text
                command_text = command_result.text

                observation = command_result.model_copy(
                    update={
                        "content": [
                            TextContent(text=f"{reset_text}\n\n{command_text}")
                        ],
                        "command": f"[RESET] {action.command}",
                    }
                )
            else:
                # Reset only, no command to execute
                observation = reset_result
        else:
            # If env keys detected, export env values to bash as a separate action first
            self._export_envs(action, conversation)
            observation = self.session.execute(action)

        # Apply automatic secrets masking
        content_text = observation.text

        if content_text and conversation is not None:
            try:
                secret_registry = conversation.state.secret_registry
                masked_content = secret_registry.mask_secrets_in_output(content_text)
                if masked_content:
                    data = observation.model_dump(
                        exclude={"content", "full_output_save_dir"}
                    )
                    return TerminalObservation.from_text(
                        text=masked_content,
                        full_output_save_dir=self.full_output_save_dir,
                        **data,
                    )
            except Exception:
                pass

        return observation

    def close(self) -> None:
        """Close the terminal session and clean up resources."""
        if hasattr(self, "session"):
            self.session.close()

TaskTrackerTool

Terminal Tool

代码地址

背景

  • 复杂开发任务,AI容易走丢忘记进度
  • Task Tracker:让 AI 能够结构化地规划任务记录状态并同步给用户

2层架构

  • 接口层:TaskTrackerTool
  • 逻辑层:TaskTrackerExecutor

意义

  • 对AI:强制 AI 在动手前先思考Decomposition),并记录当前处在哪个环节外部记忆体

    • 它把AI从散漫开发者,变成了先计划再执行随时记录专业工程师
  • 对用户:提高了透明度不用读AI冗长思考日志只需看TaskList,就能知整体完成度

TaskTrackerAction

TaskTracker Action

TaskItem

  • 定义了单任务结构

  • title: 任务标题。

  • notes: 详细笔记(比如记录实现细节或遇到的坑)。

  • 3种status: todo(待办)、in_progress(进行中)、done(完成)。

TaskTrackerAction 只有2个命令

  • view: 查看当前清单。
  • plan: 提交一份完整的、更新后的任务列表(它是全量更新,不是增量更新)。
python
# Type alias for task tracker status
TaskTrackerStatusType = Literal["todo", "in_progress", "done"]


class TaskItem(BaseModel):
    title: str = Field(..., description="A brief title for the task.")
    notes: str = Field("", description="Additional details or notes about the task.")
    status: TaskTrackerStatusType = Field(
        "todo",
        description="The current status of the task. "
        "One of 'todo', 'in_progress', or 'done'.",
    )


class TaskTrackerAction(Action):
    """An action where the agent writes or updates a task list for task management."""

    command: Literal["view", "plan"] = Field(
        default="view",
        description="The command to execute. `view` shows the current task list. `plan` creates or updates the task list based on provided requirements and progress. Always `view` the current list before making changes.",  # noqa: E501
    )
    task_list: list[TaskItem] = Field(
        default_factory=list,
        description="The full task list. Required parameter of `plan` command.",
    )

    @property
    def visualize(self) -> Text:
        """Return Rich Text representation with task management styling."""
        content = Text()

        # Add command header with icon
        if self.command == "view":
            content.append("👀 ", style="blue")
            content.append("View Task List", style="blue")
        else:  # plan
            content.append("📋 ", style="green")
            content.append("Update Task List", style="green")

        # Show task count if planning
        if self.command == "plan" and self.task_list:
            content.append(f" ({len(self.task_list)} tasks)")

        return content

TaskTrackerObservation

python

class TaskTrackerObservation(Observation):
    """This data class represents the result of a task tracking operation."""

    command: Literal["view", "plan"] = Field(
        description='The command that was executed: "view" or "plan".'
    )
    task_list: list[TaskItem] = Field(
        default_factory=list, description="The current task list"
    )

    @property
    def visualize(self) -> Text:
        """Return Rich Text representation with task list formatting."""
        text = Text()

        if self.is_error:
            text.append("❌ ", style="red bold")
            text.append(self.ERROR_MESSAGE_HEADER, style="bold red")

        if self.task_list:
            # Count tasks by status
            todo_count = sum(1 for task in self.task_list if task.status == "todo")
            in_progress_count = sum(
                1 for task in self.task_list if task.status == "in_progress"
            )
            done_count = sum(1 for task in self.task_list if task.status == "done")

            # Show status summary
            if self.command == "plan":
                text.append("✅ ", style="green")
                text.append("Task list updated: ", style="green")
            else:  # view command
                text.append("📋 ", style="blue")
                text.append("Current task list: ", style="blue")

            # Status counts
            status_parts = []
            if todo_count:
                status_parts.append(f"{todo_count} todo")
            if in_progress_count:
                status_parts.append(f"{in_progress_count} in progress")
            if done_count:
                status_parts.append(f"{done_count} done")

            if status_parts:
                text.append(", ".join(status_parts), style="white")
                text.append("\n\n")

            # Show the actual task list
            for i, task in enumerate(self.task_list, 1):
                # Status icon
                if task.status == "done":
                    text.append("✅ ", style="green")
                elif task.status == "in_progress":
                    text.append("🔄 ", style="yellow")
                else:  # todo
                    text.append("⏳ ", style="blue")

                # Task title
                text.append(f"{i}. {task.title}", style="white")

                # NEW: show notes under the title if present
                if task.notes:
                    text.append("\n   Notes: " + task.notes, style="italic")

                if i < len(self.task_list):
                    text.append("\n")
        else:
            text.append("📝 ", style="blue")
            text.append("Task list is empty")

        return text

TaskTrackerExecutor

TaskTrackerEexcutor

持久化 (TASKS.json)

  • 不同于简单的内存变量,它会将任务列表保存到磁盘。即使 AI 重启或对话中断,进度也不会丢失

格式化输出

  • 它重写了 visualize 属性。
  • 在 OpenHands 界面,会看到带图标(⏳、🔄、✅)的精美进度条,而非枯燥JSON。
python
class TaskTrackerExecutor(ToolExecutor[TaskTrackerAction, TaskTrackerObservation]):
    """Executor for the task tracker tool."""

    save_dir: Path | None

    def __init__(self, save_dir: str | None = None):
        """Initialize TaskTrackerExecutor.

        Args:
            save_dir: Optional directory to save tasks to. If provided, tasks will be
                     persisted to save_dir/TASKS.md
        """
        self.save_dir = Path(save_dir) if save_dir else None
        logger.info(f"TaskTrackerExecutor initialized with save_dir: {self.save_dir}")
        self._task_list: list[TaskItem] = []

        # Load existing tasks if save_dir is provided and file exists
        if self.save_dir:
            self._load_tasks()

    def __call__(
        self,
        action: TaskTrackerAction,
        conversation: "LocalConversation | None" = None,  # noqa: ARG002
    ) -> TaskTrackerObservation:
        """Execute the task tracker action."""
        if action.command == "plan":
            # Update the task list
            self._task_list = action.task_list
            # Save to file if save_dir is provided
            if self.save_dir:
                self._save_tasks()
            return TaskTrackerObservation.from_text(
                text=(
                    f"Task list has been updated with {len(self._task_list)} item(s)."
                ),
                command=action.command,
                task_list=self._task_list,
            )
        elif action.command == "view":
            # Return the current task list
            if not self._task_list:
                return TaskTrackerObservation.from_text(
                    text=('No task list found. Use the "plan" command to create one.'),
                    command=action.command,
                    task_list=[],
                )
            content = self._format_task_list(self._task_list)
            return TaskTrackerObservation.from_text(
                text=content,
                command=action.command,
                task_list=self._task_list,
            )
        else:
            return TaskTrackerObservation.from_text(
                text=(
                    f"Unknown command: {action.command}. "
                    'Supported commands are "view" and "plan".'
                ),
                is_error=True,
                command=action.command,
                task_list=[],
            )

    def _format_task_list(self, task_list: list[TaskItem]) -> str:
        """Format the task list for display."""
        if not task_list:
            return "No tasks in the list."

        content = "# Task List\n\n"
        for i, task in enumerate(task_list, 1):
            status_icon = {"todo": "⏳", "in_progress": "🔄", "done": "✅"}.get(
                task.status, "⏳"
            )

            title = task.title
            notes = task.notes

            content += f"{i}. {status_icon} {title}\n"
            if notes:
                content += f"   {notes}\n"
            content += "\n"

        return content.strip()

    def _load_tasks(self) -> None:
        """Load tasks from the TASKS.json file if it exists."""
        if not self.save_dir:
            return

        tasks_file = self.save_dir / "TASKS.json"
        if not tasks_file.exists():
            return

        try:
            with open(tasks_file, encoding="utf-8") as f:
                self._task_list = [TaskItem.model_validate(d) for d in json.load(f)]
        except (OSError, json.JSONDecodeError, TypeError, ValidationError) as e:
            logger.warning(
                f"Failed to load tasks from {tasks_file}: {e}. Starting with "
                "an empty task list."
            )
            self._task_list = []

    def _save_tasks(self) -> None:
        """Save tasks to the TASKS.json file."""
        if not self.save_dir:
            return

        tasks_file = self.save_dir / "TASKS.json"
        try:
            # Create the directory if it doesn't exist
            self.save_dir.mkdir(parents=True, exist_ok=True)

            with open(tasks_file, "w", encoding="utf-8") as f:
                json.dump([task.model_dump() for task in self._task_list], f, indent=2)
        except OSError as e:
            logger.warning(f"Failed to save tasks to {tasks_file}: {e}")
            pass

TaskTrackerTool

python
# Tool definition with detailed description
TASK_TRACKER_DESCRIPTION = """This tool provides structured task management capabilities for development workflows.
It enables systematic tracking of work items, progress monitoring, and efficient
organization of complex development activities.

The tool maintains visibility into project status and helps communicate
progress effectively to users.

## Application Guidelines

Utilize this tool in the following situations:

1. Multi-phase development work - When projects involve multiple sequential or
   parallel activities
2. Complex implementation tasks - Work requiring systematic planning and
   coordination across multiple components
3. Explicit user request for task organization - When users specifically ask
   for structured task management
4. Multiple concurrent requirements - When users present several work items
   that need coordination
5. Project initiation - Capture and organize user requirements at project start
6. Work commencement - Update task status to in_progress before beginning
   implementation. Maintain focus by limiting active work to one task
7. Task completion - Update status to done and identify any additional work
   that emerged during implementation

## Situations Where Tool Usage Is Unnecessary

Avoid using this tool when:

1. Single atomic tasks that require no decomposition
2. Trivial operations where tracking adds no organizational value
3. Simple activities completable in minimal steps
4. Pure information exchange or discussion

Note: For single straightforward tasks, proceed with direct implementation
rather than creating tracking overhead.

## Usage Scenarios

**Scenario A: Feature Development with Validation**
User request: "Build a user authentication system with login/logout functionality.
Don't forget to include input validation and error handling!"

Response approach: I'll implement a user authentication system with comprehensive
validation. Let me organize this work systematically.

*Task breakdown includes:*
1. Design authentication API endpoints and data models
2. Implement user login/logout controllers with session management
3. Add client-side form validation and error display
4. Create backend input sanitization and security checks
5. Implement comprehensive error handling for edge cases
6. Perform integration testing and fix any issues

**Scenario B: Codebase Refactoring**
User request: "I need to update all instances of the method 'fetchData' to
'retrieveInformation' throughout the entire project"

Response approach: Let me first analyze your codebase to identify all locations
where 'fetchData' appears.

*After code analysis*

I've located 12 occurrences of 'fetchData' across 6 files in your project.
I'll create a systematic plan to update these references.

*Organized task list includes specific file-by-file updates*

**Scenario C: Multi-feature Development**
User request: "Set up a blog platform with these components: admin dashboard,
article management, comment system, and user profiles."

Response approach: I'll help build your blog platform. Let me break down these
components into manageable implementation phases.

*Creates structured plan with each feature decomposed into specific development tasks*

## Counter-examples - Direct Implementation Preferred

**Counter-example A: Simple inquiry**
User request: "What's the syntax for a for loop in JavaScript?"

Response approach: Here's how to write a for loop in JavaScript:

```javascript
for (let i = 0; i < array.length; i++) {
    console.log(array[i]);
}
```

*No task tracking needed - single informational response*

**Counter-example B: Minor modification**
User request: "Please add a docstring to the processPayment function explaining
its parameters."

Response approach: I'll add a docstring to explain the processPayment function.

*Direct edit without task tracking for simple modification*

## Status Management and Workflow

1. **Status Values**: Track work using these states:
   - todo: Not yet initiated
   - in_progress: Currently active (maintain single focus)
   - done: Successfully completed

2. **Workflow Practices**:
   - Update status dynamically as work progresses
   - Mark completion immediately upon task finish
   - Limit active work to ONE task at any given time
   - Complete current activities before initiating new ones
   - Remove obsolete tasks from tracking entirely

3. **Completion Criteria**:
   - Mark tasks as done only when fully achieved
   - Keep status as in_progress if errors, blocks, or partial completion exist
   - Create new tasks for discovered issues or dependencies
   - Never mark done when:
       - Test suites are failing
       - Implementation remains incomplete
       - Unresolved errors persist
       - Required resources are unavailable

4. **Task Organization**:
   - Write precise, actionable descriptions
   - Decompose complex work into manageable units
   - Use descriptive, clear naming conventions

When uncertain, favor using this tool. Proactive task management demonstrates
systematic approach and ensures comprehensive requirement fulfillment."""  # noqa: E501


class TaskTrackerTool(ToolDefinition[TaskTrackerAction, TaskTrackerObservation]):
    """A ToolDefinition subclass that automatically initializes a TaskTrackerExecutor."""  # noqa: E501

    @classmethod
    def create(cls, conv_state: "ConversationState") -> Sequence["TaskTrackerTool"]:
        """Initialize TaskTrackerTool with a TaskTrackerExecutor.

        Args:
            conv_state: Conversation state to get persistence directory from.
                         If provided, save_dir will be taken from
                         conv_state.persistence_dir
        """
        executor = TaskTrackerExecutor(save_dir=conv_state.persistence_dir)

        # Initialize the parent Tool with the executor
        return [
            cls(
                description=TASK_TRACKER_DESCRIPTION,
                action_type=TaskTrackerAction,
                observation_type=TaskTrackerObservation,
                annotations=ToolAnnotations(
                    readOnlyHint=False,
                    destructiveHint=False,
                    idempotentHint=True,
                    openWorldHint=False,
                ),
                executor=executor,
            )
        ]


# Automatically register the tool when this module is imported
register_tool(TaskTrackerTool.name, TaskTrackerTool)

OpenHands Agent

AgentBase

AgentBase

代码

llm

  • system_message:可能根据llm.model,加载特定模板。

tools和_tools 脚手架

  • tools:配置态,定义的技能,具体见上文关键tools。
  • _tools:运行态,在_initialize 阶段真正实例化的tools,并行解析加载的。

agent_context

  • 上下文。在生成system_message时,会把agent_context追加进去
  • 比如:AGENTS.md 里写了 不要使用sudo命令,则会追加到system_message末尾

filter_tools_regex 行为约束

  • 去掉一些tool,非常实用,用于对 Agent 的权限进行“微操”,

include_default_tools

  • 默认工具:FinishTool, ThinkTool
python
from openhands.sdk.context.agent_context import AgentContext
from openhands.sdk.context.condenser import CondenserBase
from openhands.sdk.context.prompts.prompt import render_template
from openhands.sdk.critic.base import CriticBase
from openhands.sdk.llm import LLM
from openhands.sdk.llm.utils.model_prompt_spec import get_model_prompt_spec
from openhands.sdk.logger import get_logger
from openhands.sdk.mcp import create_mcp_tools
from openhands.sdk.tool import (
    BUILT_IN_TOOL_CLASSES,
    BUILT_IN_TOOLS,
    Tool,
    ToolDefinition,
    resolve_tool,
)
from openhands.sdk.utils.models import DiscriminatedUnionMixin


if TYPE_CHECKING:
    from openhands.sdk.conversation import ConversationState, LocalConversation
    from openhands.sdk.conversation.types import (
        ConversationCallbackType,
        ConversationTokenCallbackType,
    )

logger = get_logger(__name__)


class AgentBase(DiscriminatedUnionMixin, ABC):
    """Abstract base class for OpenHands agents.

    Agents are stateless and should be fully defined by their configuration.
    This base class provides the common interface and functionality that all
    agent implementations must follow.
    """

    model_config = ConfigDict(
        frozen=True,
        arbitrary_types_allowed=True,
    )

    llm: LLM = Field(
        ...,
        description="LLM configuration for the agent.",
        examples=[
            {
                "model": "litellm_proxy/anthropic/claude-sonnet-4-5-20250929",
                "base_url": "https://llm-proxy.eval.all-hands.dev",
                "api_key": "your_api_key_here",
            }
        ],
    )
    tools: list[Tool] = Field(
        default_factory=list,
        description="List of tools to initialize for the agent.",
        examples=[
            {"name": "TerminalTool", "params": {}},
            {"name": "FileEditorTool", "params": {}},
            {
                "name": "TaskTrackerTool",
                "params": {},
            },
        ],
    )
    mcp_config: dict[str, Any] = Field(
        default_factory=dict,
        description="Optional MCP configuration dictionary to create MCP tools.",
        examples=[
            {"mcpServers": {"fetch": {"command": "uvx", "args": ["mcp-server-fetch"]}}}
        ],
    )
    filter_tools_regex: str | None = Field(
        default=None,
        description="Optional regex to filter the tools available to the agent by name."
        " This is applied after any tools provided in `tools` and any MCP tools are"
        " added.",
        examples=["^(?!repomix)(.*)|^repomix.*pack_codebase.*$"],
    )
    include_default_tools: list[str] = Field(
        default_factory=lambda: [tool.__name__ for tool in BUILT_IN_TOOLS],
        description=(
            "List of default tool class names to include. By default, the agent "
            "includes 'FinishTool' and 'ThinkTool'. Set to an empty list to disable "
            "all default tools, or provide a subset to include only specific ones. "
            "Example: include_default_tools=['FinishTool'] to only include FinishTool, "
            "or include_default_tools=[] to disable all default tools."
        ),
        examples=[["FinishTool", "ThinkTool"], ["FinishTool"], []],
    )
    agent_context: AgentContext | None = Field(
        default=None,
        description="Optional AgentContext to initialize "
        "the agent with specific context.",
        examples=[
            {
                "skills": [
                    {
                        "name": "AGENTS.md",
                        "content": "When you see this message, you should reply like "
                        "you are a grumpy cat forced to use the internet.",
                        "type": "repo",
                    },
                    {
                        "name": "flarglebargle",
                        "content": (
                            "IMPORTANT! The user has said the magic word "
                            '"flarglebargle". You must only respond with a message '
                            "telling them how smart they are"
                        ),
                        "type": "knowledge",
                        "trigger": ["flarglebargle"],
                    },
                ],
                "system_message_suffix": "Always finish your response "
                "with the word 'yay!'",
                "user_message_prefix": "The first character of your "
                "response should be 'I'",
            }
        ],
    )
    system_prompt_filename: str = Field(
        default="system_prompt.j2",
        description=(
            "System prompt template filename. Can be either:\n"
            "- A relative filename (e.g., 'system_prompt.j2') loaded from the "
            "agent's prompts directory\n"
            "- An absolute path (e.g., '/path/to/custom_prompt.j2')"
        ),
    )
    security_policy_filename: str = Field(
        default="security_policy.j2",
        description=(
            "Security policy template filename. Can be either:\n"
            "- A relative filename (e.g., 'security_policy.j2') loaded from the "
            "agent's prompts directory\n"
            "- An absolute path (e.g., '/path/to/custom_security_policy.j2')"
        ),
    )
    system_prompt_kwargs: dict[str, object] = Field(
        default_factory=dict,
        description="Optional kwargs to pass to the system prompt Jinja2 template.",
        examples=[{"cli_mode": True}],
    )

    condenser: CondenserBase | None = Field(
        default=None,
        description="Optional condenser to use for condensing conversation history.",
        examples=[
            {
                "kind": "LLMSummarizingCondenser",
                "llm": {
                    "model": "litellm_proxy/anthropic/claude-sonnet-4-5-20250929",
                    "base_url": "https://llm-proxy.eval.all-hands.dev",
                    "api_key": "your_api_key_here",
                },
                "max_size": 80,
                "keep_first": 10,
            }
        ],
    )

    critic: CriticBase | None = Field(
        default=None,
        description=(
            "EXPERIMENTAL: Optional critic to evaluate agent actions and messages "
            "in real-time. API and behavior may change without notice. "
            "May impact performance, especially in 'all_actions' mode."
        ),
        examples=[{"kind": "AgentFinishedCritic"}],
    )

    # Runtime materialized tools; private and non-serializable
    _tools: dict[str, ToolDefinition] = PrivateAttr(default_factory=dict)
    _initialized: bool = PrivateAttr(default=False)

    @property
    def prompt_dir(self) -> str:
        """Returns the directory where this class's module file is located."""
        module = sys.modules[self.__class__.__module__]
        module_file = module.__file__  # e.g. ".../mypackage/mymodule.py"
        if module_file is None:
            raise ValueError(f"Module file for {module} is None")
        return os.path.join(os.path.dirname(module_file), "prompts")

    @property
    def name(self) -> str:
        """Returns the name of the Agent."""
        return self.__class__.__name__

    @property
    def system_message(self) -> str:
        """Compute system message on-demand to maintain statelessness."""
        template_kwargs = dict(self.system_prompt_kwargs)
        # Add security_policy_filename to template kwargs
        template_kwargs["security_policy_filename"] = self.security_policy_filename
        template_kwargs.setdefault("model_name", self.llm.model)
        if (
            "model_family" not in template_kwargs
            or "model_variant" not in template_kwargs
        ):
            spec = get_model_prompt_spec(
                self.llm.model, getattr(self.llm, "model_canonical_name", None)
            )
            if "model_family" not in template_kwargs and spec.family:
                template_kwargs["model_family"] = spec.family
            if "model_variant" not in template_kwargs and spec.variant:
                template_kwargs["model_variant"] = spec.variant
        system_message = render_template(
            prompt_dir=self.prompt_dir,
            template_name=self.system_prompt_filename,
            **template_kwargs,
        )
        if self.agent_context:
            _system_message_suffix = self.agent_context.get_system_message_suffix(
                llm_model=self.llm.model,
                llm_model_canonical=self.llm.model_canonical_name,
            )
            if _system_message_suffix:
                system_message += "\n\n" + _system_message_suffix
        return system_message

    def init_state(
        self,
        state: ConversationState,
        on_event: ConversationCallbackType,  # noqa: ARG002
    ) -> None:
        """Initialize the empty conversation state to prepare the agent for user
        messages.

        Typically this involves adding system message

        NOTE: state will be mutated in-place.
        """
        self._initialize(state)

    def _initialize(self, state: ConversationState):
        """Create an AgentBase instance from an AgentSpec."""

        if self._initialized:
            logger.warning("Agent already initialized; skipping re-initialization.")
            return

        tools: list[ToolDefinition] = []

        # Use ThreadPoolExecutor to parallelize tool resolution
        with ThreadPoolExecutor(max_workers=4) as executor:
            futures = []

            # Submit tool resolution tasks
            for tool_spec in self.tools:
                future = executor.submit(resolve_tool, tool_spec, state)
                futures.append(future)

            # Submit MCP tools creation if configured
            if self.mcp_config:
                future = executor.submit(create_mcp_tools, self.mcp_config, 30)
                futures.append(future)

            # Collect results as they complete
            for future in futures:
                result = future.result()
                tools.extend(result)

        logger.info(
            f"Loaded {len(tools)} tools from spec: {[tool.name for tool in tools]}"
        )
        if self.filter_tools_regex:
            pattern = re.compile(self.filter_tools_regex)
            tools = [tool for tool in tools if pattern.match(tool.name)]
            logger.info(
                f"Filtered to {len(tools)} tools after applying regex filter: "
                f"{[tool.name for tool in tools]}",
            )

        # Include default tools from include_default_tools; not subject to regex
        # filtering. Use explicit mapping to resolve tool class names.
        for tool_name in self.include_default_tools:
            tool_class = BUILT_IN_TOOL_CLASSES.get(tool_name)
            if tool_class is None:
                raise ValueError(
                    f"Unknown built-in tool class: '{tool_name}'. "
                    f"Expected one of: {list(BUILT_IN_TOOL_CLASSES.keys())}"
                )
            tool_instances = tool_class.create(state)
            tools.extend(tool_instances)

        # Check tool types
        for tool in tools:
            if not isinstance(tool, ToolDefinition):
                raise ValueError(
                    f"Tool {tool} is not an instance of 'ToolDefinition'. "
                    f"Got type: {type(tool)}"
                )

        # Check name duplicates
        tool_names = [tool.name for tool in tools]
        if len(tool_names) != len(set(tool_names)):
            duplicates = set(name for name in tool_names if tool_names.count(name) > 1)
            raise ValueError(f"Duplicate tool names found: {duplicates}")

        # Store tools in a dict for easy access
        self._tools = {tool.name: tool for tool in tools}
        self._initialized = True

    @abstractmethod
    def step(
        self,
        conversation: LocalConversation,
        on_event: ConversationCallbackType,
        on_token: ConversationTokenCallbackType | None = None,
    ) -> None:
        """Taking a step in the conversation.

        Typically this involves:
        1. Making a LLM call
        2. Executing the tool
        3. Updating the conversation state with
            LLM calls (role="assistant") and tool results (role="tool")
        4.1 If conversation is finished, set state.execution_status to FINISHED
        4.2 Otherwise, just return, Conversation will kick off the next step

        If the underlying LLM supports streaming, partial deltas are forwarded to
        ``on_token`` before the full response is returned.

        NOTE: state will be mutated in-place.
        """

    def verify(
        self,
        persisted: AgentBase,
        events: Sequence[Any] | None = None,  # noqa: ARG002
    ) -> AgentBase:
        """Verify that we can resume this agent from persisted state.

        We do not merge configuration between persisted and runtime Agent
        instances. Instead, we verify compatibility requirements and then
        continue with the runtime-provided Agent.

        Compatibility requirements:
        - Agent class/type must match.
        - Tools must match exactly (same tool names).

        Tools are part of the system prompt and cannot be changed mid-conversation.
        To use different tools, start a new conversation or use conversation forking
        (see https://github.com/OpenHands/OpenHands/issues/8560).

        All other configuration (LLM, agent_context, condenser, etc.) can be
        freely changed between sessions.

        Args:
            persisted: The agent loaded from persisted state.
            events: Unused, kept for API compatibility.

        Returns:
            This runtime agent (self) if verification passes.

        Raises:
            ValueError: If agent class or tools don't match.
        """
        if persisted.__class__ is not self.__class__:
            raise ValueError(
                "Cannot load from persisted: persisted agent is of type "
                f"{persisted.__class__.__name__}, but self is of type "
                f"{self.__class__.__name__}."
            )

        # Collect explicit tool names
        runtime_names = {tool.name for tool in self.tools}
        persisted_names = {tool.name for tool in persisted.tools}

        # Add builtin tool names from include_default_tools
        # These are runtime names like 'finish', 'think'
        for tool_class_name in self.include_default_tools:
            tool_class = BUILT_IN_TOOL_CLASSES.get(tool_class_name)
            if tool_class is not None:
                runtime_names.add(tool_class.name)

        for tool_class_name in persisted.include_default_tools:
            tool_class = BUILT_IN_TOOL_CLASSES.get(tool_class_name)
            if tool_class is not None:
                persisted_names.add(tool_class.name)

        if runtime_names == persisted_names:
            return self

        # Tools don't match - this is not allowed
        missing_in_runtime = persisted_names - runtime_names
        added_in_runtime = runtime_names - persisted_names

        details: list[str] = []
        if missing_in_runtime:
            details.append(f"removed: {sorted(missing_in_runtime)}")
        if added_in_runtime:
            details.append(f"added: {sorted(added_in_runtime)}")

        raise ValueError(
            f"Cannot resume conversation: tools cannot be changed mid-conversation "
            f"({'; '.join(details)}). "
            f"To use different tools, start a new conversation."
        )

    def model_dump_succint(self, **kwargs):
        """Like model_dump, but excludes None fields by default."""
        if "exclude_none" not in kwargs:
            kwargs["exclude_none"] = True
        dumped = super().model_dump(**kwargs)
        # remove tool schema details for brevity
        if "tools" in dumped and isinstance(dumped["tools"], dict):
            dumped["tools"] = list(dumped["tools"].keys())
        return dumped

    def get_all_llms(self) -> Generator[LLM, None, None]:
        """Recursively yield unique *base-class* LLM objects reachable from `self`.

        - Returns actual object references (not copies).
        - De-dupes by `id(LLM)`.
        - Cycle-safe via a visited set for *all* traversed objects.
        - Only yields objects whose type is exactly `LLM` (no subclasses).
        - Does not handle dataclasses.
        """
        yielded_ids: set[int] = set()
        visited: set[int] = set()

        def _walk(obj: object) -> Iterable[LLM]:
            oid = id(obj)
            # Guard against cycles on anything we might recurse into
            if oid in visited:
                return ()
            visited.add(oid)

            # Traverse LLM based classes and its fields
            # e.g., LLMRouter that is a subclass of LLM
            # yet contains LLM in its fields
            if isinstance(obj, LLM):
                llm_out: list[LLM] = []

                # Yield only the *raw* base-class LLM (exclude subclasses)
                if type(obj) is LLM and oid not in yielded_ids:
                    yielded_ids.add(oid)
                    llm_out.append(obj)

                # Traverse all fields for LLM objects
                for name in type(obj).model_fields:
                    try:
                        val = getattr(obj, name)
                    except Exception:
                        continue
                    llm_out.extend(_walk(val))
                return llm_out

            # Pydantic models: iterate declared fields
            if isinstance(obj, BaseModel):
                model_out: list[LLM] = []
                for name in type(obj).model_fields:
                    try:
                        val = getattr(obj, name)
                    except Exception:
                        continue
                    model_out.extend(_walk(val))
                return model_out

            # Built-in containers
            if isinstance(obj, dict):
                dict_out: list[LLM] = []
                for k, v in obj.items():
                    dict_out.extend(_walk(k))
                    dict_out.extend(_walk(v))
                return dict_out

            if isinstance(obj, (list, tuple, set, frozenset)):
                container_out: list[LLM] = []
                for item in obj:
                    container_out.extend(_walk(item))
                return container_out

            # Unknown object types: nothing to do
            return ()

        # Drive the traversal from self
        yield from _walk(self)

    @property
    def tools_map(self) -> dict[str, ToolDefinition]:
        """Get the initialized tools map.
        Raises:
            RuntimeError: If the agent has not been initialized.
        """
        if not self._initialized:
            raise RuntimeError("Agent not initialized; call _initialize() before use")
        return self._tools

Agent

Agent

代码

核心

  • 感知-思考-安全审查-执行-观察
  • 状态初始化安全防御动作循环记忆压缩
Agent 核心流程

1. 初始化:SystemPrompt与记忆/init_state

  • 系统提示词(行为准则)工具定义(技能说明)作为第一条消息
  • 防篡改检查:扫描前几个事件,确保 SystemPromptEvent 处于起始位置。
  • 状态恢复:如果是恢复,会识别出已有Prompt并跳过初始化直接进入工作状态。

2.消息准备与记忆管理(step前期)

  • 确认处理:优先执行Pending Actions
  • 上下文压缩(如果超长)
    • 调用 prepare_llm_messagescondenser 会介入把对话记录总结成摘要。

3. 核心决策阶段 (step中期)

  • 调用LLM:调用 make_llm_completion
  • 响应解析纯文字(回复用户)或工具调用执行操作)。
  • 异常修复:若返回JSON 格式有问题,fix_malformed_tool_arguments 会尝试自动纠错

4. 动作安全审查

  • 高风险操作有关卡

  • 风险标注 (_extract_security_risk)

    • LLM 标注当前操作风险等级读文件 LOW写文件 MEDIUM执行终端命令 HIGH
  • 安全拦截 (_requires_user_confirmation)

    • 根据 confirmation_policy,如果风险过高,Agent 会强行挂起
    • 状态变为 WAITING_FOR_CONFIRMATION等待人类点击允许
  • 自我批评 (_evaluate_with_critic)

    • 如果配置了 critic 属性,它会审视即将执行的 Action,给出评分和反馈。

5. 执行与反馈阶段 (_execute_action_event)

  • 动作动作获得许可后,Agent开始与环境交互

  • 执行动作调用工具TerminalFileEditor)。

  • 生成观察:工具返回执行结果(如编译错误、文件内容、命令输出)。

  • 更新状态:将 Action 和 Observation(环境反馈)存入对话历史

6.任务终点与循环

  • 继续循环:如果任务未完成,Agent基于新的观察结果开始下一轮思考
  • 任务结束:如果LLM调用 FinishTool,状态变为 FINISHED,并将控制权交还给用户。
流程概览图
  1. 用户输入 init_state (加载背景)
  2. prepare_llm_messages 上下文管理 (必要时压缩)
  3. make_llm_completion LLM 推理 (生成动作)
  4. _extract_security_risk 风险评估 (安全第一)
  5. _requires_user_confirmation 人类干预 (高危拦截)
  6. _execute_action_event 物理执行 (产生结果)
  7. 存入 EventLog 回到步骤 2

第一步:前置检查与“记忆恢复”

python
# 1. 检查是否有用户刚才“待确认”的动作
pending_actions = ConversationState.get_unmatched_actions(state.events)
if pending_actions:
    self._execute_actions(...) # 用户点确认后,立即执行之前的动作
    return

第二步:生成决策(调用 LLM)

python
# 2. 准备消息(这里会触发 condenser 压缩)
_messages_or_condensation = prepare_llm_messages(..., condenser=self.condenser)

# 3. 让 LLM 说话或调用工具
llm_response = make_llm_completion(self.llm, _messages, tools=...)

第三步:处理工具调用与安全拦截

python
state.execution_status = ConversationExecutionStatus.WAITING_FOR_CONFIRMATION
return # 暂停执行,等待用户在 UI 上点“允许”

第四步:执行与观察 (Observation)

如果通过了安全检查,_execute_action_event 会真正驱动 tool 运行,并将结果(Observation)存回对话历史,进入下一个循环。

python
import json

from pydantic import ValidationError, model_validator

import openhands.sdk.security.analyzer as analyzer
import openhands.sdk.security.risk as risk
from openhands.sdk.agent.base import AgentBase
from openhands.sdk.agent.utils import (
    fix_malformed_tool_arguments,
    make_llm_completion,
    prepare_llm_messages,
)
from openhands.sdk.conversation import (
    ConversationCallbackType,
    ConversationState,
    ConversationTokenCallbackType,
    LocalConversation,
)
from openhands.sdk.conversation.state import ConversationExecutionStatus
from openhands.sdk.critic.base import CriticResult
from openhands.sdk.event import (
    ActionEvent,
    AgentErrorEvent,
    LLMConvertibleEvent,
    MessageEvent,
    ObservationEvent,
    SystemPromptEvent,
    TokenEvent,
    UserRejectObservation,
)
from openhands.sdk.event.condenser import (
    Condensation,
    CondensationRequest,
)
from openhands.sdk.llm import (
    LLMResponse,
    Message,
    MessageToolCall,
    ReasoningItemModel,
    RedactedThinkingBlock,
    TextContent,
    ThinkingBlock,
)
from openhands.sdk.llm.exceptions import (
    FunctionCallValidationError,
    LLMContextWindowExceedError,
)
from openhands.sdk.logger import get_logger
from openhands.sdk.observability.laminar import (
    maybe_init_laminar,
    observe,
    should_enable_observability,
)
from openhands.sdk.observability.utils import extract_action_name
from openhands.sdk.security.llm_analyzer import LLMSecurityAnalyzer
from openhands.sdk.tool import (
    Action,
    Observation,
)
from openhands.sdk.tool.builtins import (
    FinishAction,
    FinishTool,
    ThinkAction,
)


logger = get_logger(__name__)
maybe_init_laminar()

# Maximum number of events to scan during init_state defensive checks.
# SystemPromptEvent must appear within this prefix (at index 0 or 1).
INIT_STATE_PREFIX_SCAN_WINDOW = 3


class Agent(AgentBase):
    """Main agent implementation for OpenHands.

    The Agent class provides the core functionality for running AI agents that can
    interact with tools, process messages, and execute actions. It inherits from
    AgentBase and implements the agent execution logic.

    Example:
        >>> from openhands.sdk import LLM, Agent, Tool
        >>> llm = LLM(model="claude-sonnet-4-20250514", api_key=SecretStr("key"))
        >>> tools = [Tool(name="TerminalTool"), Tool(name="FileEditorTool")]
        >>> agent = Agent(llm=llm, tools=tools)
    """

    @model_validator(mode="before")
    @classmethod
    def _add_security_prompt_as_default(cls, data):
        """Ensure llm_security_analyzer=True is always set before initialization."""
        if not isinstance(data, dict):
            return data

        kwargs = data.get("system_prompt_kwargs") or {}
        if not isinstance(kwargs, dict):
            kwargs = {}

        kwargs.setdefault("llm_security_analyzer", True)
        data["system_prompt_kwargs"] = kwargs
        return data

    def init_state(
        self,
        state: ConversationState,
        on_event: ConversationCallbackType,
    ) -> None:
        """Initialize conversation state.

        Invariants enforced by this method:
        - If a SystemPromptEvent is already present, it must be within the first 3
          events (index 0 or 1 in practice; index 2 is included in the scan window
          to detect a user message appearing before the system prompt).
        - A user MessageEvent should not appear before the SystemPromptEvent.

        These invariants keep event ordering predictable for downstream components
        (condenser, UI, etc.) and also prevent accidentally materializing the full
        event history during initialization.
        """
        super().init_state(state, on_event=on_event)

        # Defensive check: Analyze state to detect unexpected initialization scenarios
        # These checks help diagnose issues related to lazy loading and event ordering
        # See: https://github.com/OpenHands/software-agent-sdk/issues/1785
        #
        # NOTE: len() is O(1) for EventLog (file-backed implementation).
        event_count = len(state.events)

        # NOTE: state.events is intentionally an EventsListBase (Sequence-like), not
        # a plain list. Avoid materializing the full history via list(state.events)
        # here (conversations can reach 30k+ events).
        #
        # Invariant: when init_state is called, SystemPromptEvent (if present) must be
        # at index 0 or 1.
        #
        # Rationale:
        # - Local conversations start empty and init_state is responsible for adding
        #   the SystemPromptEvent as the first event.
        # - Remote conversations may receive an initial ConversationStateUpdateEvent
        #   from the agent-server immediately after subscription. In a typical remote
        #   session prefix you may see:
        #     [ConversationStateUpdateEvent, SystemPromptEvent, MessageEvent, ...]
        #
        # We intentionally only inspect the first few events (cheap for both local and
        # remote) to enforce this invariant.
        prefix_events = state.events[:INIT_STATE_PREFIX_SCAN_WINDOW]

        has_system_prompt = any(isinstance(e, SystemPromptEvent) for e in prefix_events)
        has_user_message = any(
            isinstance(e, MessageEvent) and e.source == "user" for e in prefix_events
        )
        # Log state for debugging initialization order issues
        logger.debug(
            f"init_state called: conversation_id={state.id}, "
            f"event_count={event_count}, "
            f"has_system_prompt={has_system_prompt}, "
            f"has_user_message={has_user_message}"
        )

        if has_system_prompt:
            # Restoring/resuming conversations is normal: a system prompt already
            # present means this conversation was initialized previously.
            logger.debug(
                "init_state: SystemPromptEvent already present; skipping init. "
                f"conversation_id={state.id}, event_count={event_count}."
            )
            return

        # Assert: A user message should never appear before the system prompt.
        #
        # NOTE: This is a best-effort check based on the first few events only.
        # Remote conversations can include a ConversationStateUpdateEvent near the
        # start, so we scan a small prefix window.
        if has_user_message:
            event_types = [type(e).__name__ for e in prefix_events]
            logger.error(
                f"init_state: User message found in prefix before SystemPromptEvent! "
                f"conversation_id={state.id}, prefix_events={event_types}"
            )
            raise AssertionError(
                "Unexpected state: user message exists before SystemPromptEvent. "
                f"conversation_id={state.id}, event_count={event_count}, "
                f"prefix_event_types={event_types}."
            )

        # Prepare system message
        event = SystemPromptEvent(
            source="agent",
            system_prompt=TextContent(text=self.system_message),
            # Tools are stored as ToolDefinition objects and converted to
            # OpenAI format with security_risk parameter during LLM completion.
            # See make_llm_completion() in agent/utils.py for details.
            tools=list(self.tools_map.values()),
        )
        on_event(event)

    def _should_evaluate_with_critic(self, action: Action | None) -> bool:
        """Determine if critic should evaluate based on action type and mode."""
        if self.critic is None:
            return False

        if self.critic.mode == "all_actions":
            return True

        # For "finish_and_message" mode, only evaluate FinishAction
        # (MessageEvent will be handled separately in step())
        if isinstance(action, FinishAction):
            return True

        return False

    def _evaluate_with_critic(
        self, conversation: LocalConversation, event: ActionEvent | MessageEvent
    ) -> CriticResult | None:
        """Run critic evaluation on the current event and history."""
        if self.critic is None:
            return None

        try:
            # Build event history including the current event
            events = list(conversation.state.events) + [event]
            llm_convertible_events = [
                e for e in events if isinstance(e, LLMConvertibleEvent)
            ]

            # Evaluate without git_patch for now
            critic_result = self.critic.evaluate(
                events=llm_convertible_events, git_patch=None
            )
            logger.info(
                f"✓ Critic evaluation: score={critic_result.score:.3f}, "
                f"success={critic_result.success}"
            )
            return critic_result
        except Exception as e:
            logger.error(f"✗ Critic evaluation failed: {e}", exc_info=True)
            return None

    def _execute_actions(
        self,
        conversation: LocalConversation,
        action_events: list[ActionEvent],
        on_event: ConversationCallbackType,
    ):
        for action_event in action_events:
            self._execute_action_event(conversation, action_event, on_event=on_event)

    @observe(name="agent.step", ignore_inputs=["state", "on_event"])
    def step(
        self,
        conversation: LocalConversation,
        on_event: ConversationCallbackType,
        on_token: ConversationTokenCallbackType | None = None,
    ) -> None:
        state = conversation.state
        # Check for pending actions (implicit confirmation)
        # and execute them before sampling new actions.
        pending_actions = ConversationState.get_unmatched_actions(state.events)
        if pending_actions:
            logger.info(
                "Confirmation mode: Executing %d pending action(s)",
                len(pending_actions),
            )
            self._execute_actions(conversation, pending_actions, on_event)
            return

        # Check if the last user message was blocked by a UserPromptSubmit hook
        # If so, skip processing and mark conversation as finished
        for event in reversed(list(state.events)):
            if isinstance(event, MessageEvent) and event.source == "user":
                reason = state.pop_blocked_message(event.id)
                if reason is not None:
                    logger.info(f"User message blocked by hook: {reason}")
                    state.execution_status = ConversationExecutionStatus.FINISHED
                    return
                break  # Only check the most recent user message

        # Prepare LLM messages using the utility function
        _messages_or_condensation = prepare_llm_messages(
            state.events, condenser=self.condenser, llm=self.llm
        )

        # Process condensation event before agent sampels another action
        if isinstance(_messages_or_condensation, Condensation):
            on_event(_messages_or_condensation)
            return

        _messages = _messages_or_condensation

        logger.debug(
            "Sending messages to LLM: "
            f"{json.dumps([m.model_dump() for m in _messages[1:]], indent=2)}"
        )

        try:
            llm_response = make_llm_completion(
                self.llm,
                _messages,
                tools=list(self.tools_map.values()),
                on_token=on_token,
            )
        except FunctionCallValidationError as e:
            logger.warning(f"LLM generated malformed function call: {e}")
            error_message = MessageEvent(
                source="user",
                llm_message=Message(
                    role="user",
                    content=[TextContent(text=str(e))],
                ),
            )
            on_event(error_message)
            return
        except LLMContextWindowExceedError as e:
            # If condenser is available and handles requests, trigger condensation
            if (
                self.condenser is not None
                and self.condenser.handles_condensation_requests()
            ):
                logger.warning(
                    "LLM raised context window exceeded error, triggering condensation"
                )
                on_event(CondensationRequest())
                return
            # No condenser available or doesn't handle requests; log helpful warning
            self._log_context_window_exceeded_warning()
            raise e

        # LLMResponse already contains the converted message and metrics snapshot
        message: Message = llm_response.message

        # Check if this is a reasoning-only response (e.g., from reasoning models)
        # or a message-only response without tool calls
        has_reasoning = (
            message.responses_reasoning_item is not None
            or message.reasoning_content is not None
            or (message.thinking_blocks and len(message.thinking_blocks) > 0)
        )
        has_content = any(
            isinstance(c, TextContent) and c.text.strip() for c in message.content
        )

        if message.tool_calls and len(message.tool_calls) > 0:
            if not all(isinstance(c, TextContent) for c in message.content):
                logger.warning(
                    "LLM returned tool calls but message content is not all "
                    "TextContent - ignoring non-text content"
                )

            # Generate unique batch ID for this LLM response
            thought_content = [c for c in message.content if isinstance(c, TextContent)]

            action_events: list[ActionEvent] = []
            for i, tool_call in enumerate(message.tool_calls):
                action_event = self._get_action_event(
                    tool_call,
                    conversation=conversation,
                    llm_response_id=llm_response.id,
                    on_event=on_event,
                    security_analyzer=state.security_analyzer,
                    thought=thought_content
                    if i == 0
                    else [],  # Only first gets thought
                    # Only first gets reasoning content
                    reasoning_content=message.reasoning_content if i == 0 else None,
                    # Only first gets thinking blocks
                    thinking_blocks=list(message.thinking_blocks) if i == 0 else [],
                    responses_reasoning_item=message.responses_reasoning_item
                    if i == 0
                    else None,
                )
                if action_event is None:
                    continue
                action_events.append(action_event)

            # Handle confirmation mode - exit early if actions need confirmation
            if self._requires_user_confirmation(state, action_events):
                return

            if action_events:
                self._execute_actions(conversation, action_events, on_event)

            # Emit VLLM token ids if enabled before returning
            self._maybe_emit_vllm_tokens(llm_response, on_event)
            return

        # No tool calls - emit message event for reasoning or content responses
        if not has_reasoning and not has_content:
            logger.warning("LLM produced empty response - continuing agent loop")

        msg_event = MessageEvent(
            source="agent",
            llm_message=message,
            llm_response_id=llm_response.id,
        )
        # Run critic evaluation if configured for finish_and_message mode
        if self.critic is not None and self.critic.mode == "finish_and_message":
            critic_result = self._evaluate_with_critic(conversation, msg_event)
            if critic_result is not None:
                # Create new event with critic result
                msg_event = msg_event.model_copy(
                    update={"critic_result": critic_result}
                )
        on_event(msg_event)

        # Emit VLLM token ids if enabled
        self._maybe_emit_vllm_tokens(llm_response, on_event)

        # Finish conversation if LLM produced content (awaits user input)
        # Continue if only reasoning without content (e.g., GPT-5 codex thinking)
        if has_content:
            logger.debug("LLM produced a message response - awaits user input")
            state.execution_status = ConversationExecutionStatus.FINISHED
            return

    def _requires_user_confirmation(
        self, state: ConversationState, action_events: list[ActionEvent]
    ) -> bool:
        """
        Decide whether user confirmation is needed to proceed.

        Rules:
            1. Confirmation mode is enabled
            2. Every action requires confirmation
            3. A single `FinishAction` never requires confirmation
            4. A single `ThinkAction` never requires confirmation
        """
        # A single `FinishAction` or `ThinkAction` never requires confirmation
        if len(action_events) == 1 and isinstance(
            action_events[0].action, (FinishAction, ThinkAction)
        ):
            return False

        # If there are no actions there is nothing to confirm
        if len(action_events) == 0:
            return False

        # If a security analyzer is registered, use it to grab the risks of the actions
        # involved. If not, we'll set the risks to UNKNOWN.
        if state.security_analyzer is not None:
            risks = [
                risk
                for _, risk in state.security_analyzer.analyze_pending_actions(
                    action_events
                )
            ]
        else:
            risks = [risk.SecurityRisk.UNKNOWN] * len(action_events)

        # Grab the confirmation policy from the state and pass in the risks.
        if any(state.confirmation_policy.should_confirm(risk) for risk in risks):
            state.execution_status = (
                ConversationExecutionStatus.WAITING_FOR_CONFIRMATION
            )
            return True

        return False

    def _extract_security_risk(
        self,
        arguments: dict,
        tool_name: str,
        read_only_tool: bool,
        security_analyzer: analyzer.SecurityAnalyzerBase | None = None,
    ) -> risk.SecurityRisk:
        requires_sr = isinstance(security_analyzer, LLMSecurityAnalyzer)
        raw = arguments.pop("security_risk", None)

        # Default risk value for action event
        # Tool is marked as read-only so security risk can be ignored
        if read_only_tool:
            return risk.SecurityRisk.UNKNOWN

        # Raises exception if failed to pass risk field when expected
        # Exception will be sent back to agent as error event
        # Strong models like GPT-5 can correct itself by retrying
        if requires_sr and raw is None:
            raise ValueError(
                f"Failed to provide security_risk field in tool '{tool_name}'"
            )

        # When using weaker models without security analyzer
        # safely ignore missing security risk fields
        if not requires_sr and raw is None:
            return risk.SecurityRisk.UNKNOWN

        # Raises exception if invalid risk enum passed by LLM
        security_risk = risk.SecurityRisk(raw)
        return security_risk

    def _extract_summary(self, tool_name: str, arguments: dict) -> str:
        """Extract and validate the summary field from tool arguments.

        Summary field is always requested but optional - if LLM doesn't provide
        it or provides invalid data, we generate a default summary using the
        tool name and arguments.

        Args:
            tool_name: Name of the tool being called
            arguments: Dictionary of tool arguments from LLM

        Returns:
            The summary string - either from LLM or a default generated one
        """
        summary = arguments.pop("summary", None)

        # If valid summary provided by LLM, use it
        if summary is not None and isinstance(summary, str) and summary.strip():
            return summary

        # Generate default summary: {tool_name}: {arguments}
        args_str = json.dumps(arguments)
        return f"{tool_name}: {args_str}"

    def _get_action_event(
        self,
        tool_call: MessageToolCall,
        conversation: LocalConversation,
        llm_response_id: str,
        on_event: ConversationCallbackType,
        security_analyzer: analyzer.SecurityAnalyzerBase | None = None,
        thought: list[TextContent] | None = None,
        reasoning_content: str | None = None,
        thinking_blocks: list[ThinkingBlock | RedactedThinkingBlock] | None = None,
        responses_reasoning_item: ReasoningItemModel | None = None,
    ) -> ActionEvent | None:
        """Converts a tool call into an ActionEvent, validating arguments.

        NOTE: state will be mutated in-place.
        """
        tool_name = tool_call.name
        tool = self.tools_map.get(tool_name, None)
        # Handle non-existing tools
        if tool is None:
            available = list(self.tools_map.keys())
            err = f"Tool '{tool_name}' not found. Available: {available}"
            logger.error(err)
            # Persist assistant function_call so next turn has matching call_id
            tc_event = ActionEvent(
                source="agent",
                thought=thought or [],
                reasoning_content=reasoning_content,
                thinking_blocks=thinking_blocks or [],
                responses_reasoning_item=responses_reasoning_item,
                tool_call=tool_call,
                tool_name=tool_call.name,
                tool_call_id=tool_call.id,
                llm_response_id=llm_response_id,
                action=None,
            )
            on_event(tc_event)
            event = AgentErrorEvent(
                error=err,
                tool_name=tool_name,
                tool_call_id=tool_call.id,
            )
            on_event(event)
            return

        # Validate arguments
        security_risk: risk.SecurityRisk = risk.SecurityRisk.UNKNOWN
        try:
            arguments = json.loads(tool_call.arguments)

            # Fix malformed arguments (e.g., JSON strings for list/dict fields)
            arguments = fix_malformed_tool_arguments(arguments, tool.action_type)
            security_risk = self._extract_security_risk(
                arguments,
                tool.name,
                tool.annotations.readOnlyHint if tool.annotations else False,
                security_analyzer,
            )
            assert "security_risk" not in arguments, (
                "Unexpected 'security_risk' key found in tool arguments"
            )

            summary = self._extract_summary(tool.name, arguments)

            action: Action = tool.action_from_arguments(arguments)
        except (json.JSONDecodeError, ValidationError, ValueError) as e:
            err = (
                f"Error validating args {tool_call.arguments} for tool "
                f"'{tool.name}': {e}"
            )
            # Persist assistant function_call so next turn has matching call_id
            tc_event = ActionEvent(
                source="agent",
                thought=thought or [],
                reasoning_content=reasoning_content,
                thinking_blocks=thinking_blocks or [],
                responses_reasoning_item=responses_reasoning_item,
                tool_call=tool_call,
                tool_name=tool_call.name,
                tool_call_id=tool_call.id,
                llm_response_id=llm_response_id,
                action=None,
            )
            on_event(tc_event)
            event = AgentErrorEvent(
                error=err,
                tool_name=tool_name,
                tool_call_id=tool_call.id,
            )
            on_event(event)
            return

        # Create initial action event
        action_event = ActionEvent(
            action=action,
            thought=thought or [],
            reasoning_content=reasoning_content,
            thinking_blocks=thinking_blocks or [],
            responses_reasoning_item=responses_reasoning_item,
            tool_name=tool.name,
            tool_call_id=tool_call.id,
            tool_call=tool_call,
            llm_response_id=llm_response_id,
            security_risk=security_risk,
            summary=summary,
        )

        # Run critic evaluation if configured
        if self._should_evaluate_with_critic(action):
            critic_result = self._evaluate_with_critic(conversation, action_event)
            if critic_result is not None:
                # Create new event with critic result
                action_event = action_event.model_copy(
                    update={"critic_result": critic_result}
                )

        on_event(action_event)
        return action_event

    @observe(ignore_inputs=["state", "on_event"])
    def _execute_action_event(
        self,
        conversation: LocalConversation,
        action_event: ActionEvent,
        on_event: ConversationCallbackType,
    ):
        """Execute an action event and update the conversation state.

        It will call the tool's executor and update the state & call callback fn
        with the observation.

        If the action was blocked by a PreToolUse hook (recorded in
        state.blocked_actions), a UserRejectObservation is emitted instead
        of executing the action.
        """
        state = conversation.state

        # Check if this action was blocked by a PreToolUse hook
        reason = state.pop_blocked_action(action_event.id)
        if reason is not None:
            logger.info(f"Action '{action_event.tool_name}' blocked by hook: {reason}")
            rejection = UserRejectObservation(
                action_id=action_event.id,
                tool_name=action_event.tool_name,
                tool_call_id=action_event.tool_call_id,
                rejection_reason=reason,
            )
            on_event(rejection)
            return rejection

        tool = self.tools_map.get(action_event.tool_name, None)
        if tool is None:
            raise RuntimeError(
                f"Tool '{action_event.tool_name}' not found. This should not happen "
                "as it was checked earlier."
            )

        # Execute actions!
        try:
            if should_enable_observability():
                tool_name = extract_action_name(action_event)
                observation: Observation = observe(name=tool_name, span_type="TOOL")(
                    tool
                )(action_event.action, conversation)
            else:
                observation = tool(action_event.action, conversation)
            assert isinstance(observation, Observation), (
                f"Tool '{tool.name}' executor must return an Observation"
            )
        except ValueError as e:
            # Tool execution raised a ValueError (e.g., invalid argument combination)
            # Convert to AgentErrorEvent so the agent can correct itself
            err = f"Error executing tool '{tool.name}': {e}"
            logger.warning(err)
            error_event = AgentErrorEvent(
                error=err,
                tool_name=tool.name,
                tool_call_id=action_event.tool_call.id,
            )
            on_event(error_event)
            return error_event

        obs_event = ObservationEvent(
            observation=observation,
            action_id=action_event.id,
            tool_name=tool.name,
            tool_call_id=action_event.tool_call.id,
        )
        on_event(obs_event)

        # Set conversation state
        if tool.name == FinishTool.name:
            state.execution_status = ConversationExecutionStatus.FINISHED
        return obs_event

    def _maybe_emit_vllm_tokens(
        self, llm_response: LLMResponse, on_event: ConversationCallbackType
    ) -> None:
        if (
            "return_token_ids" in self.llm.litellm_extra_body
        ) and self.llm.litellm_extra_body["return_token_ids"]:
            token_event = TokenEvent(
                source="agent",
                prompt_token_ids=llm_response.raw_response["prompt_token_ids"],
                response_token_ids=llm_response.raw_response["choices"][0][
                    "provider_specific_fields"
                ]["token_ids"],
            )
            on_event(token_event)

    def _log_context_window_exceeded_warning(self) -> None:
        """Log a helpful warning when context window is exceeded without a condenser."""
        if self.condenser is None:
            logger.warning(
                "\n"
                "=" * 80 + "\n"
                "⚠️  CONTEXT WINDOW EXCEEDED ERROR\n"
                "=" * 80 + "\n"
                "\n"
                "The LLM's context window has been exceeded, but no condenser is "
                "configured.\n"
                "\n"
                "Current configuration:\n"
                f"  • Condenser: None\n"
                f"  • LLM Model: {self.llm.model}\n"
                "\n"
                "To prevent this error, configure a condenser to automatically "
                "summarize\n"
                "conversation history when it gets too long.\n"
                "\n"
                "Example configuration:\n"
                "\n"
                "  from openhands.sdk import Agent, LLM\n"
                "  from openhands.sdk.context.condenser import "
                "LLMSummarizingCondenser\n"
                "\n"
                "  agent = Agent(\n"
                "      llm=LLM(model='your-model'),\n"
                "      condenser=LLMSummarizingCondenser(\n"
                "          llm=LLM(model='your-model'),  # Can use same or "
                "cheaper model\n"
                "          max_size=120,  # Maximum events before condensation\n"
                "          keep_first=4   # Number of initial events to preserve\n"
                "      )\n"
                "  )\n"
                "\n"
                "For more information, see: "
                "https://docs.openhands.dev/sdk/guides/context-condenser\n"
                "=" * 80
            )
        else:
            condenser_type = type(self.condenser).__name__
            handles_requests = self.condenser.handles_condensation_requests()
            condenser_config = self.condenser.model_dump(
                exclude={"llm"}, exclude_none=True
            )
            condenser_llm_obj = getattr(self.condenser, "llm", None)
            condenser_llm = (
                condenser_llm_obj.model if condenser_llm_obj is not None else "N/A"
            )

            logger.warning(
                "\n"
                "=" * 80 + "\n"
                "⚠️  CONTEXT WINDOW EXCEEDED ERROR\n"
                "=" * 80 + "\n"
                "\n"
                "The LLM's context window has been exceeded.\n"
                "\n"
                "Current configuration:\n"
                f"  • Condenser Type: {condenser_type}\n"
                f"  • Handles Condensation Requests: {handles_requests}\n"
                f"  • Condenser LLM: {condenser_llm}\n"
                f"  • Agent LLM Model: {self.llm.model}\n"
                f"  • Condenser Config: {json.dumps(condenser_config, indent=4)}\n"
                "\n"
                "Your condenser is configured but does not handle condensation "
                "requests\n"
                "(handles_condensation_requests() returned False).\n"
                "\n"
                "To fix this:\n"
                "  1. Use LLMSummarizingCondenser which handles condensation "
                "requests, OR\n"
                "  2. Implement handles_condensation_requests() in your custom "
                "condenser\n"
                "\n"
                "Example with LLMSummarizingCondenser:\n"
                "\n"
                "  from openhands.sdk.context.condenser import "
                "LLMSummarizingCondenser\n"
                "\n"
                "  agent = Agent(\n"
                "      llm=LLM(model='your-model'),\n"
                "      condenser=LLMSummarizingCondenser(\n"
                "          llm=LLM(model='your-model'),\n"
                "          max_size=120,\n"
                "          keep_first=4\n"
                "      )\n"
                "  )\n"
                "\n"
                "For more information, see: "
                "https://docs.openhands.dev/sdk/guides/context-condenser\n"
                "=" * 80
            )

OpenHands Workspace

基础返回值变量定义

接口返回值

代码地址

Git 相关

  • GitChange:GitChangeStatus (MOVED, ADDED, DELETED, UPDATED)
  • GitDiff

命令执行

  • command:执行的命令;exit_code:退出code
  • stdout:标准输出;stderr:错误输出
  • timeout_occurred:是否超时

文件操作(上传/下载)

  • success:是否成功;Error
  • source_pathdestination_path
python
from enum import Enum
from pathlib import Path

from pydantic import BaseModel


class GitChangeStatus(Enum):
    MOVED = "MOVED"
    ADDED = "ADDED"
    DELETED = "DELETED"
    UPDATED = "UPDATED"


class GitChange(BaseModel):
    status: GitChangeStatus
    path: Path


class GitDiff(BaseModel):
    modified: str | None
    original: str | None
python
"""Pydantic models for workspace operation results and build types."""

from typing import Literal

from pydantic import BaseModel, Field


TargetType = Literal["binary", "binary-minimal", "source", "source-minimal"]
PlatformType = Literal["linux/amd64", "linux/arm64"]


class CommandResult(BaseModel):
    """Result of executing a command in the workspace."""

    command: str = Field(description="The command that was executed")
    exit_code: int = Field(description="Exit code of the command") 
    stdout: str = Field(description="Standard output from the command") 
    stderr: str = Field(description="Standard error from the command")
    timeout_occurred: bool = Field(
        description="Whether the command timed out during execution"
    )


class FileOperationResult(BaseModel):
    """Result of a file upload or download operation."""

    success: bool = Field(description="Whether the operation was successful")
    source_path: str = Field(description="Path to the source file") 
    destination_path: str = Field(description="Path to the destination file") 
    file_size: int | None = Field(
        default=None, description="Size of the file in bytes (if successful)"
    )
    error: str | None = Field(
        default=None, description="Error message (if operation failed)"
    )

BaseWorkspace (基础接口)

BaseWorkerspace 核心能力

代码地址

关键属性

  • working_dir:工作目录

执行命令 execute_command, 最核心

  • 入参:commandcwd (当前工作目录)、timeout

上传下载文件 file_upload & file_download

  • 本地和沙箱环境之间,交换数据

git_changes & git_diff

  • git_changes:告诉工作区 有哪些文件被改动了
  • git_diff:具体的代码差异,Agent需要查看diff确认改动是否符合预期。

休眠和唤醒 pause & resume

  • 比如Docker Pause,冻结CPU,节省资源。

Python上下文管理器协议 __enter__ 和 __exit__

  • 可以使用with workspace 写代码,确保善后工作。
  • 比如Docker,清理临时挂载点。
py
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Annotated, Any

from pydantic import BeforeValidator, Field

from openhands.sdk.git.models import GitChange, GitDiff 
from openhands.sdk.logger import get_logger
from openhands.sdk.utils.models import DiscriminatedUnionMixin
from openhands.sdk.workspace.models import CommandResult, FileOperationResult 


logger = get_logger(__name__)


def _convert_path_to_str(v: str | Path) -> str:
    """Convert Path objects to string for working_dir."""
    if isinstance(v, Path):
        return str(v)
    return v


class BaseWorkspace(DiscriminatedUnionMixin, ABC):
    """Abstract base class for workspace implementations.

    Workspaces provide a sandboxed environment where agents can execute commands,
    read/write files, and perform other operations. All workspace implementations
    support the context manager protocol for safe resource management.

    Example:
        >>> with workspace:
        ...     result = workspace.execute_command("echo 'hello'")
        ...     content = workspace.read_file("example.txt")
    """

    working_dir: Annotated[ 
        str,
        BeforeValidator(_convert_path_to_str),
        Field(
            description=(
                "The working directory for agent operations and tool execution. "
                "Accepts both string paths and Path objects. "
                "Path objects are automatically converted to strings."
            )
        ),
    ]

    def __enter__(self) -> "BaseWorkspace":
        """Enter the workspace context.

        Returns:
            Self for use in with statements
        """
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Exit the workspace context and cleanup resources.

        Default implementation performs no cleanup. Subclasses should override
        to add cleanup logic (e.g., stopping containers, closing connections).

        Args:
            exc_type: Exception type if an exception occurred
            exc_val: Exception value if an exception occurred
            exc_tb: Exception traceback if an exception occurred
        """
        pass

    @abstractmethod
    def execute_command(
        self,
        command: str,
        cwd: str | Path | None = None,
        timeout: float = 30.0,
    ) -> CommandResult:
        """Execute a bash command on the system.

        Args:
            command: The bash command to execute
            cwd: Working directory for the command (optional)
            timeout: Timeout in seconds (defaults to 30.0)

        Returns:
            CommandResult: Result containing stdout, stderr, exit_code, and other
                metadata

        Raises:
            Exception: If command execution fails
        """
        ...

    @abstractmethod
    def file_upload(
        self,
        source_path: str | Path,
        destination_path: str | Path,
    ) -> FileOperationResult:
        """Upload a file to the system.

        Args:
            source_path: Path to the source file
            destination_path: Path where the file should be uploaded

        Returns:
            FileOperationResult: Result containing success status and metadata

        Raises:
            Exception: If file upload fails
        """
        ...

    @abstractmethod
    def file_download(
        self,
        source_path: str | Path,
        destination_path: str | Path,
    ) -> FileOperationResult:
        """Download a file from the system.

        Args:
            source_path: Path to the source file on the system
            destination_path: Path where the file should be downloaded

        Returns:
            FileOperationResult: Result containing success status and metadata

        Raises:
            Exception: If file download fails
        """
        ...

    @abstractmethod
    def git_changes(self, path: str | Path) -> list[GitChange]:
        """Get the git changes for the repository at the path given.

        Args:
            path: Path to the git repository

        Returns:
            list[GitChange]: List of changes

        Raises:
            Exception: If path is not a git repository or getting changes failed
        """

    @abstractmethod
    def git_diff(self, path: str | Path) -> GitDiff:
        """Get the git diff for the file at the path given.

        Args:
            path: Path to the file

        Returns:
            GitDiff: Git diff

        Raises:
            Exception: If path is not a git repository or getting diff failed
        """

    def pause(self) -> None:
        """Pause the workspace to conserve resources.

        For local workspaces, this is a no-op.
        For container-based workspaces, this pauses the container.

        Raises:
            NotImplementedError: If the workspace type does not support pausing.
        """
        raise NotImplementedError(f"{type(self).__name__} does not support pause()")

    def resume(self) -> None:
        """Resume a paused workspace.

        For local workspaces, this is a no-op.
        For container-based workspaces, this resumes the container.

        Raises:
            NotImplementedError: If the workspace type does not support resuming.
        """
        raise NotImplementedError(f"{type(self).__name__} does not support resume()")

RemoteWorkspace

RemoteWorkerspace

代码地址

核心架构

  • BaseWorkspace:提供标准接口定义
  • RemoteWorkspaceMixin真正执行细节,如构造URL处理HTTP负载解析API响应等。
  • RemoteWorkspace:调用者/接口,通过_execute方法驱动Mixin产生的generator

三级传递

  • 第一级SDK (代码)
    • ls等命令打包成HTTP Request发送出去
  • 第二级AgentServer (FastAPI)
    • 接收HTTP请求,解析JSON,调用底层runtime
    • 异步的,启动命令 返回command_id,然后client做状态轮询
  • 第三级Runtime/Sandbox(Docker)
    • 真正在linux容器里执行ls命令,把结果重定向输出给Server

Client-Server Generator 机制 (execute, Start+Poll)

Generator 机制

背景

  • 执行远程操作可能非常耗时。

Client-Server

  • RemoteWorkspace 本质是一个 HTTP Client
  • Serveropenhands-agent-server 接收请求,在它远端容器里执行命令。

Start-Polling 轮询机制

  • 第一步:发送启动命令请求
  • 第二步:命令没完成,Generator要求 轮询Polling State不断询问是否完成
  • 第三步:多次循环后命令结束,Generator产生最终结果

特点

  • 把网络通信和业务逻辑解耦。
RemoteWorkspace 作为Client
  • 根据对应方法,构造对应的Generator调用执行
  • generator获得初始kwargs
  • 循环调用:
    • 根据kwargs发起请求 [httpx.client]获得response
    • response给到[generator]生成新的kwargs
    • 不断重复1和2,直到终止
python
def execute_command(
    self,
    command: str,
    cwd: str | Path | None = None,
    timeout: float = 30.0,
) -> CommandResult:
    """Execute a bash command on the remote system.

    This method starts a bash command via the remote agent server API,
    then polls for the output until the command completes.

    Args:
        command: The bash command to execute
        cwd: Working directory (optional)
        timeout: Timeout in seconds

    Returns:
        CommandResult: Result with stdout, stderr, exit_code, and other metadata
    """
    # 产生generator和调用执行
    generator = self._execute_command_generator(command, cwd, timeout) 
    result = self._execute(generator) 
    return result


def _execute(self, generator: Generator[dict[str, Any], httpx.Response, Any]):
    try:
      	# 1. 问generator:第一个请求怎么发?
        kwargs = next(generator)
        while True:
          	# 2. 跑腿:去服务器发请求,拿回response
            response = self.client.request(**kwargs)
            # 3. 把response给generator,产出下一个请求参数
            kwargs = generator.send(response)
    except StopIteration as e:
        return e.value
RemoteWorkspaceMixin Start-Polling 机制

Start 启动命令

  • 目的:发送命令请求,获得 command_id
  • url:{self.host}/api/bash/start_bash_command
  • json: {"command": command}
  • 类型:post请求
  • 返回:command_id

轮询 状态

  • 目的:根据command_id 轮询命令状态,获得events
  • url: {self.host}/api/bash/bash_events/search
  • json: {"command_id__eq": "command_id", ...}
  • get 请求
  • 返回:search_result多个events
    • event里获取:stdout, stderr, exit_code 等内容。

最终返回结果

  • stdout:拼接多个stdout_parts
  • stderr: 拼接多个stderr_parts
  • exit_code
  • 最终,组成CommandResult对象作为最终返回结果
python
def _execute_command_generator(
    self,
    command: str,
    cwd: str | Path | None,
    timeout: float,
) -> Generator[dict[str, Any], httpx.Response, CommandResult]:
    """Execute a bash command on the remote system.

    This method starts a bash command via the remote agent server API,
    then polls for the output until the command completes.

    Args:
        command: The bash command to execute
        cwd: Working directory (optional)
        timeout: Timeout in seconds

    Returns:
        CommandResult: Result with stdout, stderr, exit_code, and other metadata
    """
    _logger.debug(f"Executing remote command: {command}")

    # Step 1: Start the bash command
    payload = { 
        "command": command, 
        "timeout": int(timeout),
    }
    if cwd is not None:
        payload["cwd"] = str(cwd)

    try:
        # Start the command
        # 1. start,获取command_id
        response: httpx.Response = yield { 
            "method": "POST",
            "url": f"{self.host}/api/bash/start_bash_command",
            "json": payload,
            "headers": self._headers,
            "timeout": timeout + 5.0,  # Add buffer to HTTP timeout
        }
        response.raise_for_status()
        bash_command = response.json()
        command_id = bash_command["id"]

        _logger.debug(f"Started command with ID: {command_id}")

        # Step 2: Poll for output until command completes
        # 2. 根据command_id,轮询状态是否完成
        start_time = time.time()
        stdout_parts = []
        stderr_parts = []
        exit_code = None
        last_order = -1  # Track highest order seen to fetch only new events
        seen_event_ids: set[str] = set()  # Track seen IDs to detect duplicates

        while time.time() - start_time < timeout:
            # Search for new events (order > last_order)
            params: dict[str, str | int] = { 
                "command_id__eq": command_id, 
                "sort_order": "TIMESTAMP", 
                "limit": 100,
                "kind__eq": "BashOutput",
            }
            if last_order >= 0:
                params["order__gt"] = last_order

            response = yield {
                "method": "GET", 
                "url": f"{self.host}/api/bash/bash_events/search",
                "params": params,
                "headers": self._headers,
                "timeout": timeout,
            }
            response.raise_for_status()
            search_result = response.json()

            # Process BashOutput events
            for event in search_result.get("items", []):
                if event.get("kind") == "BashOutput":
                    # Check for duplicates - safety check in case caller
                    # forgets to add kind__eq filter or API has a bug
                    event_id = event.get("id")
                    if event_id is not None:
                        if event_id in seen_event_ids:
                            raise RuntimeError(
                                f"Duplicate event received: {event_id}. "
                                "This should not happen with order__gt "
                                "filtering and kind filtering."
                            )
                        seen_event_ids.add(event_id)

                    # Track the highest order we've seen
                    event_order = event.get("order")
                    if event_order is not None and event_order > last_order:
                        last_order = event_order

                    if event.get("stdout"):
                        stdout_parts.append(event["stdout"])
                    if event.get("stderr"):
                        stderr_parts.append(event["stderr"])
                    if event.get("exit_code") is not None:
                        exit_code = event["exit_code"]

            # If we have an exit code, the command is complete
            if exit_code is not None:
                break

            # Wait a bit before polling again
            time.sleep(0.1)

        # If we timed out waiting for completion
        if exit_code is None:
            _logger.warning(f"Command timed out after {timeout} seconds: {command}")
            exit_code = -1
            stderr_parts.append(f"Command timed out after {timeout} seconds")

        # Combine all output parts
        stdout = "".join(stdout_parts) 
        stderr = "".join(stderr_parts) 

        return CommandResult( 
            command=command,
            exit_code=exit_code,
            stdout=stdout,
            stderr=stderr,
            timeout_occurred=exit_code == -1 and "timed out" in stderr,
        )

    except Exception as e:
        _logger.error(f"Remote command execution failed: {e}")
        return CommandResult(
            command=command,
            exit_code=-1,
            stdout="",
            stderr=f"Remote execution error: {str(e)}",
            timeout_occurred=False,
        )

RemoteWorkspace-源代码

RemoteWorkspace

属性

  • Clienthttpx.client

方法

  • 实现了BaseWorkspace定义接口
  • 同时继承RemoteWorkspaceMixin(generator),拥有API请求细节
  • 新增方法:_execute结合generator 轮询状态
python
class RemoteWorkspace(RemoteWorkspaceMixin, BaseWorkspace):
    """Remote workspace implementation that connects to an OpenHands agent server.

    RemoteWorkspace provides access to a sandboxed environment running on a remote
    OpenHands agent server. This is the recommended approach for production deployments
    as it provides better isolation and security.

    Example:
        >>> workspace = RemoteWorkspace(
        ...     host="https://agent-server.example.com",
        ...     working_dir="/workspace"
        ... )
        >>> with workspace:
        ...     result = workspace.execute_command("ls -la")
        ...     content = workspace.read_file("README.md")
    """

    _client: httpx.Client | None = PrivateAttr(default=None)

    def reset_client(self) -> None:
        """Reset the HTTP client to force re-initialization.

        This is useful when connection parameters (host, api_key) have changed
        and the client needs to be recreated with new values.
        """
        if self._client is not None:
            try:
                self._client.close()
            except Exception:
                pass
        self._client = None

    @property
    def client(self) -> httpx.Client:
        client = self._client
        if client is None:
            # Configure reasonable timeouts for HTTP requests
            # - connect: 10 seconds to establish connection
            # - read: 600 seconds (10 minutes) to read response (for LLM operations)
            # - write: 10 seconds to send request
            # - pool: 10 seconds to get connection from pool
            timeout = httpx.Timeout(
                connect=10.0, read=self.read_timeout, write=10.0, pool=10.0
            )
            client = httpx.Client( 
                base_url=self.host, 
                timeout=timeout, 
                headers=self._headers, 
                limits=httpx.Limits(max_connections=self.max_connections), 
            ) 
            self._client = client 
        return client 

    def _execute(self, generator: Generator[dict[str, Any], httpx.Response, Any]):
        try:
            kwargs = next(generator) 
            while True:
                response = self.client.request(**kwargs) 
                kwargs = generator.send(response) 
        except StopIteration as e:
            return e.value

    def execute_command(
        self,
        command: str,
        cwd: str | Path | None = None,
        timeout: float = 30.0,
    ) -> CommandResult:
        """Execute a bash command on the remote system.

        This method starts a bash command via the remote agent server API,
        then polls for the output until the command completes.

        Args:
            command: The bash command to execute
            cwd: Working directory (optional)
            timeout: Timeout in seconds

        Returns:
            CommandResult: Result with stdout, stderr, exit_code, and other metadata
        """
        generator = self._execute_command_generator(command, cwd, timeout) 
        result = self._execute(generator) 
        return result

    def file_upload(
        self,
        source_path: str | Path,
        destination_path: str | Path,
    ) -> FileOperationResult:
        """Upload a file to the remote system.

        Reads the local file and sends it to the remote system via HTTP API.

        Args:
            source_path: Path to the local source file
            destination_path: Path where the file should be uploaded on remote system

        Returns:
            FileOperationResult: Result with success status and metadata
        """
        generator = self._file_upload_generator(source_path, destination_path)
        result = self._execute(generator)
        return result

    def file_download(
        self,
        source_path: str | Path,
        destination_path: str | Path,
    ) -> FileOperationResult:
        """Download a file from the remote system.

        Requests the file from the remote system via HTTP API and saves it locally.

        Args:
            source_path: Path to the source file on remote system
            destination_path: Path where the file should be saved locally

        Returns:
            FileOperationResult: Result with success status and metadata
        """
        generator = self._file_download_generator(source_path, destination_path)
        result = self._execute(generator)
        return result

    def git_changes(self, path: str | Path) -> list[GitChange]:
        """Get the git changes for the repository at the path given.

        Args:
            path: Path to the git repository

        Returns:
            list[GitChange]: List of changes

        Raises:
            Exception: If path is not a git repository or getting changes failed
        """
        generator = self._git_changes_generator(path)
        result = self._execute(generator)
        return result

    def git_diff(self, path: str | Path) -> GitDiff:
        """Get the git diff for the file at the path given.

        Args:
            path: Path to the file

        Returns:
            GitDiff: Git diff

        Raises:
            Exception: If path is not a git repository or getting diff failed
        """
        generator = self._git_diff_generator(path)
        result = self._execute(generator)
        return result

    @property
    def alive(self) -> bool:
        """Check if the remote workspace is alive by querying the health endpoint.

        Returns:
            True if the health endpoint returns a successful response, False otherwise.
        """
        try:
            health_url = f"{self.host}/health"
            with urlopen(health_url, timeout=5.0) as resp:
                status = getattr(resp, "status", 200)
                return 200 <= status < 300
        except Exception:
            return False

RemoteWorkspaceMixin-源代码

RemoteWorkspaceMixin

属性

  • host, api_key
  • working_dir, read_timeout, max_connections

核心功能

  • generator,实现了各操作各阶段http请求参数
  • 支持执行命令文件上传下载git状态等。
python
class RemoteWorkspaceMixin(BaseModel):
    """Mixin providing remote workspace operations.
    This allows the same code to be used for sync and async."""

    host: str = Field(description="The remote host URL for the workspace.")
    api_key: str | None = Field(
        default=None, description="API key for authenticating with the remote host."
    )
    working_dir: str = Field(
        description="The working directory for agent operations and tool execution."
    )
    read_timeout: float = Field(
        default=600.0,
        description="Timeout in seconds for reading operations of httpx.Client.",
    )
    max_connections: int | None = Field(
        default=None,
        description="Maximum number of connections for httpx.Client. "
        "None means no limit, useful for running many conversations in parallel.",
    )

    def model_post_init(self, context: Any) -> None:
        # Set up remote host
        self.host = self.host.rstrip("/")
        return super().model_post_init(context)

    @property
    def _headers(self):
        headers = {}
        if self.api_key:
            headers["X-Session-API-Key"] = self.api_key
        return headers

    def _execute_command_generator(
        self,
        command: str,
        cwd: str | Path | None,
        timeout: float,
    ) -> Generator[dict[str, Any], httpx.Response, CommandResult]:
        """Execute a bash command on the remote system.

        This method starts a bash command via the remote agent server API,
        then polls for the output until the command completes.

        Args:
            command: The bash command to execute
            cwd: Working directory (optional)
            timeout: Timeout in seconds

        Returns:
            CommandResult: Result with stdout, stderr, exit_code, and other metadata
        """
        _logger.debug(f"Executing remote command: {command}")

        # Step 1: Start the bash command
        payload = {
            "command": command,
            "timeout": int(timeout),
        }
        if cwd is not None:
            payload["cwd"] = str(cwd)

        try:
            # Start the command
            response: httpx.Response = yield {
                "method": "POST",
                "url": f"{self.host}/api/bash/start_bash_command",
                "json": payload,
                "headers": self._headers,
                "timeout": timeout + 5.0,  # Add buffer to HTTP timeout
            }
            response.raise_for_status()
            bash_command = response.json()
            command_id = bash_command["id"]

            _logger.debug(f"Started command with ID: {command_id}")

            # Step 2: Poll for output until command completes
            start_time = time.time()
            stdout_parts = []
            stderr_parts = []
            exit_code = None
            last_order = -1  # Track highest order seen to fetch only new events
            seen_event_ids: set[str] = set()  # Track seen IDs to detect duplicates

            while time.time() - start_time < timeout:
                # Search for new events (order > last_order)
                params: dict[str, str | int] = {
                    "command_id__eq": command_id,
                    "sort_order": "TIMESTAMP",
                    "limit": 100,
                    "kind__eq": "BashOutput",
                }
                if last_order >= 0:
                    params["order__gt"] = last_order

                response = yield {
                    "method": "GET",
                    "url": f"{self.host}/api/bash/bash_events/search",
                    "params": params,
                    "headers": self._headers,
                    "timeout": timeout,
                }
                response.raise_for_status()
                search_result = response.json()

                # Process BashOutput events
                for event in search_result.get("items", []):
                    if event.get("kind") == "BashOutput":
                        # Check for duplicates - safety check in case caller
                        # forgets to add kind__eq filter or API has a bug
                        event_id = event.get("id")
                        if event_id is not None:
                            if event_id in seen_event_ids:
                                raise RuntimeError(
                                    f"Duplicate event received: {event_id}. "
                                    "This should not happen with order__gt "
                                    "filtering and kind filtering."
                                )
                            seen_event_ids.add(event_id)

                        # Track the highest order we've seen
                        event_order = event.get("order")
                        if event_order is not None and event_order > last_order:
                            last_order = event_order

                        if event.get("stdout"):
                            stdout_parts.append(event["stdout"])
                        if event.get("stderr"):
                            stderr_parts.append(event["stderr"])
                        if event.get("exit_code") is not None:
                            exit_code = event["exit_code"]

                # If we have an exit code, the command is complete
                if exit_code is not None:
                    break

                # Wait a bit before polling again
                time.sleep(0.1)

            # If we timed out waiting for completion
            if exit_code is None:
                _logger.warning(f"Command timed out after {timeout} seconds: {command}")
                exit_code = -1
                stderr_parts.append(f"Command timed out after {timeout} seconds")

            # Combine all output parts
            stdout = "".join(stdout_parts)
            stderr = "".join(stderr_parts)

            return CommandResult(
                command=command,
                exit_code=exit_code,
                stdout=stdout,
                stderr=stderr,
                timeout_occurred=exit_code == -1 and "timed out" in stderr,
            )

        except Exception as e:
            _logger.error(f"Remote command execution failed: {e}")
            return CommandResult(
                command=command,
                exit_code=-1,
                stdout="",
                stderr=f"Remote execution error: {str(e)}",
                timeout_occurred=False,
            )

    def _file_upload_generator(
        self,
        source_path: str | Path,
        destination_path: str | Path,
    ) -> Generator[dict[str, Any], httpx.Response, FileOperationResult]:
        """Upload a file to the remote system.

        Reads the local file and sends it to the remote system via HTTP API.

        Args:
            source_path: Path to the local source file
            destination_path: Path where the file should be uploaded on remote system

        Returns:
            FileOperationResult: Result with success status and metadata
        """
        source = Path(source_path)
        destination = Path(destination_path)

        _logger.debug(f"Remote file upload: {source} -> {destination}")

        try:
            # Read the file content
            with open(source, "rb") as f:
                file_content = f.read()

            # Prepare the upload
            files = {"file": (source.name, file_content)}
            data = {"destination_path": str(destination)}

            # Make HTTP call
            response: httpx.Response = yield {
                "method": "POST",
                "url": f"{self.host}/api/file/upload/{destination}",
                "files": files,
                "data": data,
                "headers": self._headers,
                "timeout": 60.0,
            }
            response.raise_for_status()
            result_data = response.json()

            # Convert the API response to our model
            return FileOperationResult(
                success=result_data.get("success", True),
                source_path=str(source),
                destination_path=str(destination),
                file_size=result_data.get("file_size"),
                error=result_data.get("error"),
            )

        except Exception as e:
            _logger.error(f"Remote file upload failed: {e}")
            return FileOperationResult(
                success=False,
                source_path=str(source),
                destination_path=str(destination),
                error=str(e),
            )

    def _file_download_generator(
        self,
        source_path: str | Path,
        destination_path: str | Path,
    ) -> Generator[dict[str, Any], httpx.Response, FileOperationResult]:
        """Download a file from the remote system.

        Requests the file from the remote system via HTTP API and saves it locally.

        Args:
            source_path: Path to the source file on remote system
            destination_path: Path where the file should be saved locally

        Returns:
            FileOperationResult: Result with success status and metadata
        """
        source = Path(source_path)
        destination = Path(destination_path)

        _logger.debug(f"Remote file download: {source} -> {destination}")

        try:
            # Construct URL with path parameter (not query parameter)
            # Double slash ensures FastAPI extracts path with leading slash
            # for absolute path validation
            source_str = str(source)
            url = f"/api/file/download//{source_str.lstrip('/')}"

            # Make HTTP call
            response = yield {
                "method": "GET",
                "url": url,
                "headers": self._headers,
                "timeout": 60.0,
            }
            response.raise_for_status()

            # Ensure destination directory exists
            destination.parent.mkdir(parents=True, exist_ok=True)

            # Write the file content
            with open(destination, "wb") as f:
                f.write(response.content)

            return FileOperationResult(
                success=True,
                source_path=str(source),
                destination_path=str(destination),
                file_size=len(response.content),
            )

        except Exception as e:
            _logger.error(f"Remote file download failed: {e}")
            return FileOperationResult(
                success=False,
                source_path=str(source),
                destination_path=str(destination),
                error=str(e),
            )

    def _git_changes_generator(
        self,
        path: str | Path,
    ) -> Generator[dict[str, Any], httpx.Response, list[GitChange]]:
        """Get the git changes for the repository at the path given.

        Args:
            path: Path to the git repository

        Returns:
            list[GitChange]: List of changes

        Raises:
            Exception: If path is not a git repository or getting changes failed
        """
        # Make HTTP call
        response = yield {
            "method": "GET",
            "url": Path("/api/git/changes") / self.working_dir / path,
            "headers": self._headers,
            "timeout": 60.0,
        }
        response.raise_for_status()
        type_adapter = TypeAdapter(list[GitChange])
        changes = type_adapter.validate_python(response.json())
        return changes

    def _git_diff_generator(
        self,
        path: str | Path,
    ) -> Generator[dict[str, Any], httpx.Response, GitDiff]:
        """Get the git diff for the file at the path given.

        Args:
            path: Path to the file

        Returns:
            GitDiff: Git diff

        Raises:
            Exception: If path is not a git repository or getting diff failed
        """
        # Make HTTP call
        response = yield {
            "method": "GET",
            "url": Path("/api/git/diff") / self.working_dir / path,
            "headers": self._headers,
            "timeout": 60.0,
        }
        response.raise_for_status()
        diff = GitDiff.model_validate(response.json())
        return diff

DockerWorkspace

核心功能

DockerWorkspace

代码地址

核心功能

  • 构建docker容器,运行之前预定义好的 openhands agent image,也只能运行这个镜像。
  • 同时继承自RemoteWorkspace,通过HTTP API 提供remote操作

关键属性

  • server_image:哪一个镜像,也就是openhands-agent-server的镜像
  • host_port:端口。
  • _image_name, _container_id
  • working_dir, forward_env:工作目录,
  • volumes & mount_dir:挂载,宿主机和容器做文件同步

关键方法

  • model_post_init:Pydantic V2当属性填充完成以后,自动会运行此方法
  • _start_container启动容器
  • cleanup:关闭容器

使用demo,执行完成以后,会自动调用cleanup方法

python
with DockerWorkspace(
      server_image="ghcr.io/openhands/agent-server:latest"
  ) as workspace:
  result = workspace.execute_command("ls -la")

Start Contanier 启动容器

python
def _start_container(self, image: str, context: Any) -> None:
    """Start the Docker container with the given image.

    This method handles all container lifecycle: port allocation, Docker
    validation, container creation, health checks, and RemoteWorkspace
    initialization.

    Args:
        image: The Docker image tag to use.
        context: The Pydantic context from model_post_init.
    """
    # Store the image name for cleanup
    self._image_name = image

    # Determine port
    if self.host_port is None:
        self.host_port = find_available_tcp_port() 
    else:
        self.host_port = int(self.host_port)

    if not check_port_available(self.host_port):
        raise RuntimeError(f"Port {self.host_port} is not available")

    if self.extra_ports:
        if not check_port_available(self.host_port + 1):
            raise RuntimeError(
                f"Port {self.host_port + 1} is not available for VSCode"
            )
        if not check_port_available(self.host_port + 2):
            raise RuntimeError(
                f"Port {self.host_port + 2} is not available for VNC"
            )

    # 确定保证docker 可用
    docker_ver = execute_command(["docker", "version"]).returncode 
    if docker_ver != 0:
        raise RuntimeError(
            "Docker is not available. Please install and start "
            "Docker Desktop/daemon."
        )

    # Prepare Docker run flags
    flags: list[str] = []
    for key in self.forward_env:
        if key in os.environ:
            flags += ["-e", f"{key}={os.environ[key]}"]

    for volume in self.volumes:
        flags += ["-v", volume]
        logger.info(f"Adding volume mount: {volume}")

    ports = ["-p", f"{self.host_port}:8000"]
    if self.extra_ports:
        ports += [
            "-p",
            f"{self.host_port + 1}:8001",  # VSCode
            "-p",
            f"{self.host_port + 2}:8002",  # Desktop VNC
        ]
    flags += ports

    # Add GPU support if enabled
    if self.enable_gpu:
        flags += ["--gpus", "all"]

    # Docker运行命令,根据image启动一个容器
    run_cmd = [
        "docker",
        "run",
        "-d",
        "--platform",
        self.platform,
        "--rm",
        "--ulimit",
        "nofile=65536:65536",  # prevent "too many open files" errors
        "--name",
        f"agent-server-{uuid.uuid4()}", 
        *flags,
        image,
        "--host",
        "0.0.0.0",
        "--port",
        "8000",
    ]
    proc = execute_command(run_cmd) 
    if proc.returncode != 0:
        raise RuntimeError(f"Failed to run docker container: {proc.stderr}")
		
    # 自动获取容器id
    self._container_id = proc.stdout.strip() 
    logger.info(f"Started container: {self._container_id}")

    # Optionally stream logs in background
    if self.detach_logs:
        self._logs_thread = threading.Thread(
            target=self._stream_docker_logs, daemon=True
        )
        self._logs_thread.start()

    # Set host for RemoteWorkspace to use
    # The container exposes port 8000, mapped to self.host_port
    # Override parent's host initialization
    # 动态重写host
    object.__setattr__(self, "host", f"http://localhost:{self.host_port}")
    # 无需API_KEY
    object.__setattr__(self, "api_key", None)

    # 可靠性保障, 确保FastAPI等服务初始化完成 Wait for container to be healthy
    self._wait_for_health()
    logger.info(f"Docker workspace is ready at {self.host}")

    # Now initialize the parent RemoteWorkspace with the container URL
    super().model_post_init(context)

wait_for_health

python
def _wait_for_health(self, timeout: float = 120.0) -> None:
    """Wait for the Docker container to become healthy."""
    start = time.time()
    health_url = f"http://127.0.0.1:{self.host_port}/health"

    while time.time() - start < timeout:
        try:
            with urlopen(health_url, timeout=1.0) as resp:
                if 200 <= getattr(resp, "status", 200) < 300:
                    return
        except Exception:
            pass

        # Check if container is still running
        if self._container_id:
            ps = execute_command(
                [
                    "docker",
                    "inspect",
                    "-f",
                    "{{.State.Running}}",
                    self._container_id,
                ]
            )
            if ps.stdout.strip() != "true":
                logs = execute_command(["docker", "logs", self._container_id])
                msg = (
                    "Container stopped unexpectedly. Logs:\n"
                    f"{logs.stdout}\n{logs.stderr}"
                )
                raise RuntimeError(msg)
        time.sleep(1)
  raise RuntimeError("Container failed to become healthy in time")

关闭容器

python
def __exit__(self, exc_type, exc_val, exc_tb) -> None:  # type: ignore[no-untyped-def]
    """Context manager exit - cleans up the Docker container."""
    self.cleanup()

def __del__(self) -> None:
    """Clean up the Docker container when the workspace is destroyed."""
    self.cleanup()

def cleanup(self) -> None:
    """Stop and remove the Docker container."""
    if self._container_id:
        # Stop logs streaming
        self._stop_logs.set()
        if self._logs_thread and self._logs_thread.is_alive():
            self._logs_thread.join(timeout=2)

        # Stop and remove the container
        logger.info(f"Stopping container: {self._container_id}")
        execute_command(["docker", "stop", self._container_id])
        self._container_id = None

    # Optionally delete the Docker image
    if self.cleanup_image and self._image_name:
        logger.info(f"Deleting Docker image: {self._image_name}")
        result = execute_command(["docker", "rmi", "-f", self._image_name]) 
        if result.returncode == 0:
            logger.info(f"Successfully deleted image: {self._image_name}")
        else:
            logger.warning(
                f"Failed to delete image {self._image_name}: {result.stderr}"
            )
        self._image_name = None

def pause(self) -> None:
    """Pause the Docker container to conserve resources.

    Uses `docker pause` to freeze all processes in the container without
    stopping it. The container can be resumed later with `resume()`.

    Raises:
        RuntimeError: If the container is not running or pause fails.
    """
    if not self._container_id:
        raise RuntimeError("Cannot pause: container is not running")

    logger.info(f"Pausing container: {self._container_id}")
    result = execute_command(["docker", "pause", self._container_id]) 
    if result.returncode != 0:
        raise RuntimeError(f"Failed to pause container: {result.stderr}")
    logger.info(f"Container paused: {self._container_id}")

def resume(self) -> None:
    """Resume a paused Docker container.

    Uses `docker unpause` to resume all processes in the container.

    Raises:
        RuntimeError: If the container is not running or resume fails.
    """
    if not self._container_id:
        raise RuntimeError("Cannot resume: container is not running")

    logger.info(f"Resuming container: {self._container_id}")
    result = execute_command(["docker", "unpause", self._container_id])
    if result.returncode != 0:
        raise RuntimeError(f"Failed to resume container: {result.stderr}")

    # Wait for health after resuming (use same timeout as initial startup)
    self._wait_for_health(timeout=120.0)
    logger.info(f"Container resumed: {self._container_id}")

宿主机执行命令(非容器内部)

宿主机执行命令

宿主机执行命令

  • utils.command.execute_command
  • 执行者:OpenHands 框架本身
  • 调用Python的subprocess模块启动管理Docker容器

容器内部执行命令

  • workspace.execute_command
  • 执行者:容器内部
  • 通过HTTP发给容器里的Server,让Agent去改代码跑测试
python
def execute_command(
    cmd: list[str] | str,
    env: dict[str, str] | None = None,
    cwd: str | None = None,
    timeout: float | None = None,
    print_output: bool = True,
) -> subprocess.CompletedProcess:
    # For string commands, use shell=True to handle shell operators properly
    # 字符串,就直接使用shell;如果是list,则直接传给内核
    if isinstance(cmd, str):
        cmd_to_run = cmd
        use_shell = True  # 字符串模式:交给 Shell 拼接和解析
        logger.info("$ %s", cmd)
    else:
        cmd_to_run = cmd
        use_shell = False # 列表模式:直接传给内核,不走 Shell
        # 注意这里!
        logger.info("$ %s", " ".join(shlex.quote(c) for c in cmd))

    proc = subprocess.Popen(
        cmd_to_run,
        cwd=cwd,
        env=sanitized_env(env),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        bufsize=1,
        shell=use_shell,
    )
    if proc is None:
        raise RuntimeError("Failed to start process")

    # Read line by line, echo to parent stdout/stderr
    stdout_lines: list[str] = []
    stderr_lines: list[str] = []
    if proc.stdout is None or proc.stderr is None:
        raise RuntimeError("Failed to capture stdout/stderr")

    def read_stream(stream, lines, output_stream):
        try:
            for line in stream:
                if print_output:
                    output_stream.write(line)
                    output_stream.flush()
                lines.append(line)
        except Exception as e:
            logger.error(f"Failed to read stream: {e}")

    # Read stdout and stderr concurrently to avoid deadlock
    stdout_thread = threading.Thread(
        target=read_stream, args=(proc.stdout, stdout_lines, sys.stdout)
    )
    stderr_thread = threading.Thread(
        target=read_stream, args=(proc.stderr, stderr_lines, sys.stderr)
    )

    stdout_thread.start()
    stderr_thread.start()

    try:
        proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        proc.kill()
        stdout_thread.join()
        stderr_thread.join()
        return subprocess.CompletedProcess(
            cmd_to_run,
            -1,  # Indicate timeout with -1 exit code
            "".join(stdout_lines),
            "".join(stderr_lines),
        )

    stdout_thread.join(timeout=timeout)
    stderr_thread.join(timeout=timeout)

    return subprocess.CompletedProcess(
        cmd_to_run,
        proc.returncode,
        "".join(stdout_lines),
        "".join(stderr_lines),
    )

源代码

python
class DockerWorkspace(RemoteWorkspace):
    """Remote workspace that sets up and manages a Docker container.

    This workspace creates a Docker container running a pre-built OpenHands agent
    server image, waits for it to become healthy, and then provides remote workspace
    operations through the container's HTTP API.

    Note: This class only works with pre-built images. To build images on-the-fly
    from a base image, use DockerDevWorkspace instead.

    Example:
        with DockerWorkspace(
            server_image="ghcr.io/openhands/agent-server:latest"
        ) as workspace:
            result = workspace.execute_command("ls -la")
    """

    # Override parent fields with defaults
    working_dir: str = Field(
        default="/workspace",
        description="Working directory inside the container.",
    )
    host: str = Field(
        default="",
        description=("Remote host URL (set automatically during container startup)."),
    )

    # Docker-specific configuration
    server_image: str | None = Field(
        default="ghcr.io/openhands/agent-server:latest-python",
        description="Pre-built agent server image to use.",
    )
    host_port: int | None = Field(
        default=None,
        description="Port to bind the container to. If None, finds available port.",
    )
    forward_env: list[str] = Field(
        default_factory=lambda: ["DEBUG"],
        description="Environment variables to forward to the container.",
    )
    mount_dir: str | None = Field(
        default=None,
        description="Optional host directory to mount into the container.",
    )
    volumes: list[str] = Field(
        default_factory=list,
        description="Additional volume mounts for the Docker container.",
    )
    detach_logs: bool = Field(
        default=True, description="Whether to stream Docker logs in background."
    )
    platform: PlatformType = Field(
        default="linux/amd64", description="Platform for the Docker image."
    )
    extra_ports: bool = Field(
        default=False,
        description="Whether to expose additional ports (VSCode, VNC).",
    )
    enable_gpu: bool = Field(
        default=False,
        description="Whether to enable GPU support with --gpus all.",
    )
    cleanup_image: bool = Field(
        default=False,
        description="Whether to delete the Docker image when cleaning up workspace.",
    )

    _container_id: str | None = PrivateAttr(default=None)
    _image_name: str | None = PrivateAttr(default=None)
    _logs_thread: threading.Thread | None = PrivateAttr(default=None)
    _stop_logs: threading.Event = PrivateAttr(default_factory=threading.Event)

    @model_validator(mode="after")
    def _validate_server_image(self):
        """Ensure server_image is set when using DockerWorkspace directly."""
        if self.__class__ is DockerWorkspace and self.server_image is None:
            raise ValueError("server_image must be provided")
        return self

    @model_validator(mode="after")
    def _validate_mount_dir(self):
        if self.mount_dir:
            warn_deprecated(
                "DockerWorkspace.mount_dir",
                deprecated_in="1.10.0",
                removed_in=None,
                details="Use DockerWorkspace.volumes instead",
            )
            self.volumes.append(f"{self.mount_dir}:/workspace")
        return self

    def model_post_init(self, context: Any) -> None:
        """Set up the Docker container and initialize the remote workspace."""
        # Subclasses should call get_image() to get the image to use
        # This allows them to build or prepare the image before container startup
        image = self.get_image()
        self._start_container(image, context)

    def get_image(self) -> str:
        """Get the Docker image to use for the container.

        Subclasses can override this to provide custom image resolution logic
        (e.g., building images on-the-fly).

        Returns:
            The Docker image tag to use.
        """
        if self.server_image is None:
            raise ValueError("server_image must be set")
        return self.server_image

    def _start_container(self, image: str, context: Any) -> None:
        """Start the Docker container with the given image.

        This method handles all container lifecycle: port allocation, Docker
        validation, container creation, health checks, and RemoteWorkspace
        initialization.

        Args:
            image: The Docker image tag to use.
            context: The Pydantic context from model_post_init.
        """
        # Store the image name for cleanup
        self._image_name = image

        # Determine port
        if self.host_port is None:
            self.host_port = find_available_tcp_port()
        else:
            self.host_port = int(self.host_port)

        if not check_port_available(self.host_port):
            raise RuntimeError(f"Port {self.host_port} is not available")

        if self.extra_ports:
            if not check_port_available(self.host_port + 1):
                raise RuntimeError(
                    f"Port {self.host_port + 1} is not available for VSCode"
                )
            if not check_port_available(self.host_port + 2):
                raise RuntimeError(
                    f"Port {self.host_port + 2} is not available for VNC"
                )

        # Ensure docker is available
        docker_ver = execute_command(["docker", "version"]).returncode
        if docker_ver != 0:
            raise RuntimeError(
                "Docker is not available. Please install and start "
                "Docker Desktop/daemon."
            )

        # Prepare Docker run flags
        flags: list[str] = []
        for key in self.forward_env:
            if key in os.environ:
                flags += ["-e", f"{key}={os.environ[key]}"]

        for volume in self.volumes:
            flags += ["-v", volume]
            logger.info(f"Adding volume mount: {volume}")

        ports = ["-p", f"{self.host_port}:8000"]
        if self.extra_ports:
            ports += [
                "-p",
                f"{self.host_port + 1}:8001",  # VSCode
                "-p",
                f"{self.host_port + 2}:8002",  # Desktop VNC
            ]
        flags += ports

        # Add GPU support if enabled
        if self.enable_gpu:
            flags += ["--gpus", "all"]

        # Run container
        run_cmd = [
            "docker",
            "run",
            "-d",
            "--platform",
            self.platform,
            "--rm",
            "--ulimit",
            "nofile=65536:65536",  # prevent "too many open files" errors
            "--name",
            f"agent-server-{uuid.uuid4()}",
            *flags,
            image,
            "--host",
            "0.0.0.0",
            "--port",
            "8000",
        ]
        proc = execute_command(run_cmd)
        if proc.returncode != 0:
            raise RuntimeError(f"Failed to run docker container: {proc.stderr}")

        self._container_id = proc.stdout.strip()
        logger.info(f"Started container: {self._container_id}")

        # Optionally stream logs in background
        if self.detach_logs:
            self._logs_thread = threading.Thread(
                target=self._stream_docker_logs, daemon=True
            )
            self._logs_thread.start()

        # Set host for RemoteWorkspace to use
        # The container exposes port 8000, mapped to self.host_port
        # Override parent's host initialization
        object.__setattr__(self, "host", f"http://localhost:{self.host_port}")
        object.__setattr__(self, "api_key", None)

        # Wait for container to be healthy
        self._wait_for_health()
        logger.info(f"Docker workspace is ready at {self.host}")

        # Now initialize the parent RemoteWorkspace with the container URL
        super().model_post_init(context)

    def _stream_docker_logs(self) -> None:
        """Stream Docker logs to stdout in the background."""
        if not self._container_id:
            return
        try:
            p = subprocess.Popen(
                ["docker", "logs", "-f", self._container_id],
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
            )
            if p.stdout is None:
                return
            for line in iter(p.stdout.readline, ""):
                if self._stop_logs.is_set():
                    break
                if line:
                    sys.stdout.write(f"[DOCKER] {line}")
                    sys.stdout.flush()
        except Exception as e:
            sys.stderr.write(f"Error streaming docker logs: {e}\n")
        finally:
            try:
                self._stop_logs.set()
            except Exception:
                pass

    def _wait_for_health(self, timeout: float = 120.0) -> None:
        """Wait for the Docker container to become healthy."""
        start = time.time()
        health_url = f"http://127.0.0.1:{self.host_port}/health"

        while time.time() - start < timeout:
            try:
                with urlopen(health_url, timeout=1.0) as resp:
                    if 200 <= getattr(resp, "status", 200) < 300:
                        return
            except Exception:
                pass

            # Check if container is still running
            if self._container_id:
                ps = execute_command(
                    [
                        "docker",
                        "inspect",
                        "-f",
                        "{{.State.Running}}",
                        self._container_id,
                    ]
                )
                if ps.stdout.strip() != "true":
                    logs = execute_command(["docker", "logs", self._container_id])
                    msg = (
                        "Container stopped unexpectedly. Logs:\n"
                        f"{logs.stdout}\n{logs.stderr}"
                    )
                    raise RuntimeError(msg)
            time.sleep(1)
        raise RuntimeError("Container failed to become healthy in time")

    def __enter__(self) -> "DockerWorkspace":
        """Context manager entry - returns the workspace itself."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:  # type: ignore[no-untyped-def]
        """Context manager exit - cleans up the Docker container."""
        self.cleanup()

    def __del__(self) -> None:
        """Clean up the Docker container when the workspace is destroyed."""
        self.cleanup()

    def cleanup(self) -> None:
        """Stop and remove the Docker container."""
        if self._container_id:
            # Stop logs streaming
            self._stop_logs.set()
            if self._logs_thread and self._logs_thread.is_alive():
                self._logs_thread.join(timeout=2)

            # Stop and remove the container
            logger.info(f"Stopping container: {self._container_id}")
            execute_command(["docker", "stop", self._container_id])
            self._container_id = None

        # Optionally delete the Docker image
        if self.cleanup_image and self._image_name:
            logger.info(f"Deleting Docker image: {self._image_name}")
            result = execute_command(["docker", "rmi", "-f", self._image_name])
            if result.returncode == 0:
                logger.info(f"Successfully deleted image: {self._image_name}")
            else:
                logger.warning(
                    f"Failed to delete image {self._image_name}: {result.stderr}"
                )
            self._image_name = None

    def pause(self) -> None:
        """Pause the Docker container to conserve resources.

        Uses `docker pause` to freeze all processes in the container without
        stopping it. The container can be resumed later with `resume()`.

        Raises:
            RuntimeError: If the container is not running or pause fails.
        """
        if not self._container_id:
            raise RuntimeError("Cannot pause: container is not running")

        logger.info(f"Pausing container: {self._container_id}")
        result = execute_command(["docker", "pause", self._container_id])
        if result.returncode != 0:
            raise RuntimeError(f"Failed to pause container: {result.stderr}")
        logger.info(f"Container paused: {self._container_id}")

    def resume(self) -> None:
        """Resume a paused Docker container.

        Uses `docker unpause` to resume all processes in the container.

        Raises:
            RuntimeError: If the container is not running or resume fails.
        """
        if not self._container_id:
            raise RuntimeError("Cannot resume: container is not running")

        logger.info(f"Resuming container: {self._container_id}")
        result = execute_command(["docker", "unpause", self._container_id])
        if result.returncode != 0:
            raise RuntimeError(f"Failed to resume container: {result.stderr}")

        # Wait for health after resuming (use same timeout as initial startup)
        self._wait_for_health(timeout=120.0)
        logger.info(f"Container resumed: {self._container_id}")

Openhands SWE 早期笔记 (旧版本代码,地址已移动)

SWE样例见:SWE数据样例

评测主流程-process_instance

参考链接
python
def process_instance(
    instance: pd.Series,
    metadata: EvalMetadata,
    reset_logger: bool = True,
    runtime_failure_count: int = 0,
) -> EvalOutput:
    config = get_config(instance, metadata)

    metadata = copy.deepcopy(metadata)
    metadata.details['runtime_failure_count'] = runtime_failure_count
    metadata.details['remote_runtime_resource_factor'] = (
        config.sandbox.remote_runtime_resource_factor
    )
		
    # 拉取构建
    runtime = create_runtime(config) 
    call_async_from_sync(runtime.connect)

    try:
      	# 初始化,获取prompt
        initialize_runtime(runtime, instance, metadata)
        message_action = get_instruction(instance, metadata)

        # Here's how you can run the agent (similar to the `main` function) and get the final task state
        state: State | None = asyncio.run(
            run_controller(
                config=config,
                initial_user_action=message_action,
                runtime=runtime,
                fake_user_response_fn=AGENT_CLS_TO_FAKE_USER_RESPONSE_FN[
                    metadata.agent_class
                ],
            )
        )

        # if fatal error, throw EvalError to trigger re-run
        if is_fatal_evaluation_error(state.last_error):
            raise EvalException('Fatal error detected: ' + state.last_error)

        # ======= THIS IS SWE-Bench specific =======
        # Get git patch
        if DATASET_TYPE == 'SWE-bench-Live':
            from evaluation.benchmarks.swe_bench.live_utils import (
                complete_runtime as complete_runtime_fn,
            )
        else:
            complete_runtime_fn = complete_runtime
        return_val = complete_runtime_fn(runtime, instance)
        git_patch = return_val['git_patch']
        logger.info(
            f'Got git diff for instance {instance.instance_id}:\n--------\n{git_patch}\n--------'
        )
    finally:
        runtime.close()
    # ==========================================

    # ======= Attempt to evaluate the agent's edits =======
    # we use eval_infer.sh to evaluate the agent's edits, not here
    # because the agent may alter the environment / testcases
    test_result = {
        'git_patch': git_patch,
    }

    # If you are working on some simpler benchmark that only evaluates the final model output (e.g., in a MessageAction)
    # You can simply get the LAST `MessageAction` from the returned `state.history` and parse it for evaluation.
    if state is None:
        raise ValueError('State should not be None.')

    # NOTE: this is NO LONGER the event stream, but an agent history that includes delegate agent's events
    histories = [event_to_dict(event) for event in state.history]
    metrics = get_metrics(state)

    # Save the output
    instruction = message_action.content
    if message_action.image_urls:
        instruction += (
            '\n\n<image_urls>' + '\n'.join(message_action.image_urls) + '</image_urls>'
        )
    output = EvalOutput(
        instance_id=instance.instance_id,
        instruction=instruction,
        instance=instance.to_dict(),  # SWE Bench specific
        test_result=test_result,
        metadata=metadata,
        history=histories,
        metrics=metrics,
        error=state.last_error if state and state.last_error else None,
    )
    return output

获取配置OpenHandsConfig

关键配置
  • base_container_image:数据实例镜像地址,根据instance_id任务做解析
  • sandbox 配置

OpenHandsConfig

python
def get_config(
    instance: pd.Series,
    metadata: EvalMetadata,
) -> OpenHandsConfig:
    # We use a different instance image for the each instance of swe-bench eval
    use_swebench_official_image = DATASET_TYPE != 'SWE-Gym'
    # 数据实例的镜像地址
    base_container_image = get_instance_docker_image(
        instance['instance_id'],
        swebench_official_image=use_swebench_official_image,
    )
    logger.info(
        f'Using instance container image: {base_container_image}. '
        f'Please make sure this image exists. '
        f'Submit an issue on https://github.com/OpenHands/OpenHands if you run into any issues.'
    )
    # sandbox 配置
    sandbox_config = get_default_sandbox_config_for_eval()
    # 数据实例的镜像地址,即数据基础镜像
    sandbox_config.base_container_image = base_container_image
    sandbox_config.enable_auto_lint = True
    sandbox_config.use_host_network = False
    # Add platform to the sandbox config to solve issue 4401
    sandbox_config.platform = 'linux/amd64'
    sandbox_config.remote_runtime_resource_factor = get_instance_resource_factor(
        dataset_name=metadata.dataset,
        instance_id=instance['instance_id'],
    )
    # openhands config,主配置
    config = get_openhands_config_for_eval( 
        metadata=metadata,
        enable_browser=RUN_WITH_BROWSING,
        runtime=os.environ.get('RUNTIME', 'docker'),
        sandbox_config=sandbox_config,
    )

    config.set_llm_config(
        update_llm_config_for_completions_logging(
            metadata.llm_config, metadata.eval_output_dir, instance['instance_id']
        )
    )
    # get 'draft_editor' config if exists
    config.set_llm_config(get_llm_config_arg('draft_editor'), 'draft_editor')

    model_routing_config = get_model_routing_config_arg()
    model_routing_config.llms_for_routing = (
        get_llms_for_routing_config()
    )  # Populate with LLMs for routing from config.toml file
    
    # agent 配置
    agent_config = AgentConfig(
        enable_jupyter=False,
        enable_browsing=RUN_WITH_BROWSING,
        enable_llm_editor=ENABLE_LLM_EDITOR,
        enable_mcp=False,
        condenser=metadata.condenser_config,
        enable_prompt_extensions=False,
        model_routing=model_routing_config,
        system_prompt_filename=metadata.agent_config.system_prompt_filename
        if metadata.agent_config
        else 'system_prompt.j2',
    )
    config.set_agent_config(agent_config)

    return config

Sandbox Config

python
def get_default_sandbox_config_for_eval() -> SandboxConfig:
    return SandboxConfig(
        use_host_network=False,
        # large enough timeout, since some testcases take very long to run
        timeout=300,
        api_key=os.environ.get('ALLHANDS_API_KEY', None),
        runtime_startup_env_vars={'NO_CHANGE_TIMEOUT_SECONDS': '30'},
        remote_runtime_api_url=os.environ.get('SANDBOX_REMOTE_RUNTIME_API_URL'),
        keep_runtime_alive=False,
        remote_runtime_init_timeout=3600,
        remote_runtime_api_timeout=120,
        remote_runtime_enable_retries=True,
        remote_runtime_class='sysbox',
    )

EvalMetadata

eval metadata,评估时的一些设置

python
def make_metadata(
    llm_config: LLMConfig,
    dataset_name: str,
    agent_class: str,
    max_iterations: int,
    eval_note: str | None,
    eval_output_dir: str,
    data_split: str | None = None,
    details: dict[str, Any] | None = None,
    agent_config: AgentConfig | None = None,
    condenser_config: CondenserConfig | None = None,
) -> EvalMetadata:
    model_name = llm_config.model.split('/')[-1]
    model_path = model_name.replace(':', '_').replace('@', '-')
    eval_note = f'_N_{eval_note}' if eval_note else ''

    eval_output_path = os.path.join(
        eval_output_dir,
        dataset_name,
        agent_class,
        f'{model_path}_maxiter_{max_iterations}{eval_note}',
    )

    pathlib.Path(eval_output_path).mkdir(parents=True, exist_ok=True)
    pathlib.Path(os.path.join(eval_output_path, 'logs')).mkdir(
        parents=True, exist_ok=True
    )
    logger.info(f'Using evaluation output directory: {eval_output_path}')

    metadata = EvalMetadata(
        agent_class=agent_class,
        llm_config=llm_config,
        agent_config=agent_config,
        max_iterations=max_iterations,
        eval_output_dir=eval_output_path,
        start_time=time.strftime('%Y-%m-%d %H:%M:%S'),
        git_commit=subprocess.check_output(['git', 'rev-parse', 'HEAD'])
        .decode('utf-8')
        .strip(),
        dataset=dataset_name,
        data_split=data_split,
        details=details,
        condenser_config=condenser_config
        if condenser_config
        else NoOpCondenserConfig(),
        instruction_template_name=os.environ.get('INSTRUCTION_TEMPLATE_NAME'),
    )
    metadata_json = metadata.model_dump_json()
    logger.info(f'Metadata: {metadata_json}')
    with open(os.path.join(eval_output_path, 'metadata.json'), 'w') as f:
        f.write(metadata_json)

    return metadata

镜像环境准备

解析实例镜像地址

python
def get_instance_docker_image(
    instance_id: str,
    swebench_official_image: bool = False,
) -> str:
    if swebench_official_image:
        # Official SWE-Bench image
        # swebench/sweb.eval.x86_64.django_1776_django-11333:v1
        # SWE-bench-Live uses the same naming convention as SWE-Bench
        if DATASET_TYPE == 'SWE-bench-Live':
            docker_image_prefix = 'docker.io/starryzhang/'
        elif DATASET_TYPE == 'SWE-bench':
            docker_image_prefix = 'docker.io/swebench/'
        elif DATASET_TYPE == 'SWE-rebench':
            docker_image_prefix = 'docker.io/swerebench/'
        repo, name = instance_id.split('__')
        image_name = f'{docker_image_prefix.rstrip("/")}/sweb.eval.x86_64.{repo}_1776_{name}:latest'.lower()
        logger.debug(f'Using official SWE-Bench image: {image_name}')
        return image_name
    else:
        # OpenHands version of the image
        # docker.io/xingyaoww/
        docker_image_prefix = DEFAULT_DOCKER_IMAGE_PREFIX
        image_name = 'sweb.eval.x86_64.' + instance_id
        image_name = image_name.replace(
            '__', '_s_'
        )  # to comply with docker image naming convention
        return (docker_image_prefix.rstrip('/') + '/' + image_name).lower()

通用Runtime构建入口

process_instance 处的调用代码

python
def process_instance(
    instance: pd.Series,
    metadata: EvalMetadata,
    reset_logger: bool = True,
    runtime_failure_count: int = 0,
) -> EvalOutput:
  # ....
  # ....
  # 构建runtime
  runtime = create_runtime(config) 
  call_async_from_sync(runtime.connect) 
  try:
      initialize_runtime(runtime, instance, metadata) 
      message_action = get_instruction(instance, metadata) 

      # Here's how you can run the agent (similar to the `main` function) and get the final task state
      state: State | None = asyncio.run(
          run_controller( 
              config=config, 
              initial_user_action=message_action, 
              runtime=runtime, 
              fake_user_response_fn=AGENT_CLS_TO_FAKE_USER_RESPONSE_FN[ 
                  metadata.agent_class 
              ], 
          ) 
      )
通用create_runtime函数

参考链接

python
def create_runtime(
    config: OpenHandsConfig,
    llm_registry: LLMRegistry | None = None,
    sid: str | None = None,
    headless_mode: bool = True,
    agent: Agent | None = None,
    git_provider_tokens: PROVIDER_TOKEN_TYPE | None = None,
) -> Runtime:
    """Create a runtime for the agent to run on.

    Args:
        config: The app config.
        sid: (optional) The session id. IMPORTANT: please don't set this unless you know what you're doing.
            Set it to incompatible value will cause unexpected behavior on RemoteRuntime.
        headless_mode: Whether the agent is run in headless mode. `create_runtime` is typically called within evaluation scripts,
            where we don't want to have the VSCode UI open, so it defaults to True.
        agent: (optional) The agent instance to use for configuring the runtime.

    Returns:
        The created Runtime instance (not yet connected or initialized).
    """
    # if sid is provided on the command line, use it as the name of the event stream
    # otherwise generate it on the basis of the configured jwt_secret
    # we can do this better, this is just so that the sid is retrieved when we want to restore the session
    session_id = sid or generate_sid(config)

    # set up the event stream
    file_store = get_file_store(config.file_store, config.file_store_path)
    event_stream = EventStream(session_id, file_store)

    # agent class
    if agent:
        agent_cls = type(agent)
    else:
        agent_cls = Agent.get_cls(config.default_agent)

    # runtime and tools
    runtime_cls = get_runtime_cls(config.runtime)
    # logger.debug(f'Initializing runtime: {runtime_cls.__name__}')
    logger.info(f'Initializing runtime: {runtime_cls.__name__}')
    runtime: Runtime = runtime_cls(
        config=config,
        event_stream=event_stream,
        sid=session_id,
        plugins=agent_cls.sandbox_plugins,
        headless_mode=headless_mode,
        llm_registry=llm_registry or LLMRegistry(config),
        git_provider_tokens=git_provider_tokens,
    )

    # Log the plugins that have been registered with the runtime for debugging purposes
    logger.debug(
        f'Runtime created with plugins: {[plugin.name for plugin in runtime.plugins]}'
    )

    return runtime

DockerRuntime 构造函数

DockerRuntime 构造函数

参考链接

关键参数

  • base_container_image:数据基础镜像
  • runtime_container_image:运行时镜像?默认为None
  • sandbox.local_runtime_url
python
class DockerRuntime(ActionExecutionClient):
    """This runtime will subscribe the event stream.

    When receive an event, it will send the event to runtime-client which run inside the docker environment.

    Args:
        config (OpenHandsConfig): The application configuration.
        event_stream (EventStream): The event stream to subscribe to.
        sid (str, optional): The session ID. Defaults to 'default'.
        plugins (list[PluginRequirement] | None, optional): List of plugin requirements. Defaults to None.
        env_vars (dict[str, str] | None, optional): Environment variables to set. Defaults to None.
    """

    _shutdown_listener_id: UUID | None = None

    def __init__(
        self,
        config: OpenHandsConfig,
        event_stream: EventStream,
        llm_registry: LLMRegistry,
        sid: str = 'default',
        plugins: list[PluginRequirement] | None = None,
        env_vars: dict[str, str] | None = None,
        status_callback: Callable | None = None,
        attach_to_existing: bool = False,
        headless_mode: bool = True,
        user_id: str | None = None,
        git_provider_tokens: PROVIDER_TOKEN_TYPE | None = None,
        main_module: str = DEFAULT_MAIN_MODULE,
    ):
        if not DockerRuntime._shutdown_listener_id:
            DockerRuntime._shutdown_listener_id = add_shutdown_listener(
                lambda: stop_all_containers(CONTAINER_NAME_PREFIX)
            )

        self.config = config
        self.status_callback = status_callback

        self._host_port = -1
        self._container_port = -1
        self._vscode_port = -1
        self._app_ports: list[int] = []

        # Port locks to prevent race conditions
        self._host_port_lock: PortLock | None = None
        self._vscode_port_lock: PortLock | None = None
        self._app_port_locks: list[PortLock] = []

        if os.environ.get('DOCKER_HOST_ADDR'):
            logger.info(
                f'Using DOCKER_HOST_IP: {os.environ["DOCKER_HOST_ADDR"]} for local_runtime_url'
            )
            self.config.sandbox.local_runtime_url = (
                f'http://{os.environ["DOCKER_HOST_ADDR"]}'
            )

        self.docker_client: docker.DockerClient = self._init_docker_client() 
        self.api_url = f'{self.config.sandbox.local_runtime_url}:{self._container_port}'
        # base_container_image
        self.base_container_image = self.config.sandbox.base_container_image 
        self.runtime_container_image = self.config.sandbox.runtime_container_image 
        # CONTAINER_NAME_PREFIX = 'openhands-runtime-', sid 为session id
        self.container_name = CONTAINER_NAME_PREFIX + sid 
        self.container: Container | None = None
        self.main_module = main_module

        self.runtime_builder = DockerRuntimeBuilder(self.docker_client)

        # Buffer for container logs
        self.log_streamer: LogStreamer | None = None

        super().__init__(
            config,
            event_stream,
            llm_registry,
            sid,
            plugins,
            env_vars,
            status_callback,
            attach_to_existing,
            headless_mode,
            user_id,
            git_provider_tokens,
        )

        # Log runtime_extra_deps after base class initialization so self.sid is available
        if self.config.sandbox.runtime_extra_deps:
            self.log(
                'debug',
                f'Installing extra user-provided dependencies in the runtime image: {self.config.sandbox.runtime_extra_deps}',
            )
  	@staticmethod
    @lru_cache(maxsize=1)
    def _init_docker_client() -> docker.DockerClient:
        try:
            return docker.from_env()
        except Exception as ex:
            logger.error(
                'Launch docker client failed. Please make sure you have installed docker and started docker desktop/daemon.',
            )
            raise ex

Docker准备运行时容器

connect 主方法

参考链接

核心方法

  • 如果已有容器,则
    • 启动并attach到该容器设置api url
  • 如果没有容器,则
    • 拉取构建镜像
    • 初始化容器
python
class DockerRuntime(ActionExecutionClient):
  '''省略若干代码
  '''
  async def connect(self) -> None:
      self.set_runtime_status(RuntimeStatus.STARTING_RUNTIME)
      try:
          await call_sync_from_async(self._attach_to_container) 
      except docker.errors.NotFound as e: 
          if self.attach_to_existing:
              self.log(
                  'warning',
                  f'Container {self.container_name} not found.',
              )
              raise AgentRuntimeDisconnectedError from e
          # 重新构建镜像
          self.maybe_build_runtime_container_image() 
          self.log(
              'info', f'Starting runtime with image: {self.runtime_container_image}'
          )
          # 初始化容器
          await call_sync_from_async(self.init_container) 
          self.log(
              'info',
              f'Container started: {self.container_name}. VSCode URL: {self.vscode_url}',
          )

      if DEBUG_RUNTIME and self.container:
          self.log_streamer = LogStreamer(self.container, self.log)
      else:
          self.log_streamer = None

      if not self.attach_to_existing:
          self.log('info', f'Waiting for client to become ready at {self.api_url}...')
          self.set_runtime_status(RuntimeStatus.STARTING_RUNTIME)

      await call_sync_from_async(self.wait_until_alive)

      if not self.attach_to_existing:
          self.log('info', 'Runtime is ready.')

      if not self.attach_to_existing:
          await call_sync_from_async(self.setup_initial_env)

      self.log(
          'debug',
          f'Container initialized with plugins: {[plugin.name for plugin in self.plugins]}. VSCode URL: {self.vscode_url}',
      )
      if not self.attach_to_existing:
          self.set_runtime_status(RuntimeStatus.READY)
      self._runtime_initialized = True

      for network_name in self.config.sandbox.additional_networks:
          try:
              network = self.docker_client.networks.get(network_name)
              if self.container is not None:
                  network.connect(self.container)
              else:
                  self.log(
                      'warning',
                      f'Container not available to connect to network {network_name}',
                  )
          except Exception as e:
              self.log(
                  'error',
                  f'Error: Failed to connect instance {self.container_name} to network {network_name}',
              )
              self.log('error', str(e))
Attatch 到已有容器
  • 启动容器设置api_url
python
def _attach_to_container(self) -> None:
  # 容器名称,在init里根据session id来自定义的
  self.container = self.docker_client.containers.get(self.container_name) 
  if self.container.status == 'exited':
    	# 启动容器
      self.container.start() 

  config = self.container.attrs['Config']
  for env_var in config['Env']:
      if env_var.startswith('port='):
          self._host_port = int(env_var.split('port=')[1])
          self._container_port = self._host_port
      elif env_var.startswith('VSCODE_PORT='):
          self._vscode_port = int(env_var.split('VSCODE_PORT=')[1])

  self._app_ports = []
  exposed_ports = config.get('ExposedPorts')
  if exposed_ports:
      for exposed_port in exposed_ports.keys():
          exposed_port = int(exposed_port.split('/tcp')[0])
          if (
              exposed_port != self._host_port
              and exposed_port != self._vscode_port
          ):
              self._app_ports.append(exposed_port)
  # api_url:sandbox_url + 容器端口
  self.api_url = f'{self.config.sandbox.local_runtime_url}:{self._container_port}'
  self.log(
      'debug',
      f'attached to container: {self.container_name} {self._container_port} {self.api_url}',
  )

attach失败,则重新创建镜像,根据实例的base_container_image

python
def maybe_build_runtime_container_image(self):
  if self.runtime_container_image is None: 
      if self.base_container_image is None: 
          raise ValueError(
              'Neither runtime container image nor base container image is set'
          )
      self.set_runtime_status(RuntimeStatus.BUILDING_RUNTIME)
      self.runtime_container_image = build_runtime_image( 
          self.base_container_image, 
          self.runtime_builder,
          platform=self.config.sandbox.platform,
          extra_deps=self.config.sandbox.runtime_extra_deps,
          force_rebuild=self.config.sandbox.force_rebuild_runtime,
          extra_build_args=self.config.sandbox.runtime_extra_build_args,
          enable_browser=self.config.enable_browser,
      )

Docker从头开始构建镜像

build_runtime_image 入口

参考链接

核心内容

  • 传入base_imageruntime_builder
python
def build_runtime_image(
    base_image: str,
    runtime_builder: RuntimeBuilder,
    platform: str | None = None,
    extra_deps: str | None = None,
    build_folder: str | None = None,
    dry_run: bool = False,
    force_rebuild: bool = False,
    extra_build_args: list[str] | None = None,
    enable_browser: bool = True,
) -> str:
    """Prepares the final docker build folder.

    If dry_run is False, it will also build the OpenHands runtime Docker image using the docker build folder.

    Parameters:
    - base_image (str): The name of the base Docker image to use
    - runtime_builder (RuntimeBuilder): The runtime builder to use
    - platform (str): The target platform for the build (e.g. linux/amd64, linux/arm64)
    - extra_deps (str):
    - build_folder (str): The directory to use for the build. If not provided a temporary directory will be used
    - dry_run (bool): if True, it will only ready the build folder. It will not actually build the Docker image
    - force_rebuild (bool): if True, it will create the Dockerfile which uses the base_image
    - extra_build_args (List[str]): Additional build arguments to pass to the builder
    - enable_browser (bool): Whether to enable browser support (install Playwright)

    Returns:
    - str: <image_repo>:<MD5 hash>. Where MD5 hash is the hash of the docker build folder

    See https://docs.all-hands.dev/usage/architecture/runtime for more details.
    """
    if build_folder is None:
        with tempfile.TemporaryDirectory() as temp_dir:
            result = build_runtime_image_in_folder( 
                base_image=base_image, 
                runtime_builder=runtime_builder,
                build_folder=Path(temp_dir),
                extra_deps=extra_deps,
                dry_run=dry_run,
                force_rebuild=force_rebuild,
                platform=platform,
                extra_build_args=extra_build_args,
                enable_browser=enable_browser,
            )
            return result

    result = build_runtime_image_in_folder(
        base_image=base_image,
        runtime_builder=runtime_builder,
        build_folder=Path(build_folder),
        extra_deps=extra_deps,
        dry_run=dry_run,
        force_rebuild=force_rebuild,
        platform=platform,
        extra_build_args=extra_build_args,
        enable_browser=enable_browser,
    )
    return result
build_runtime_image_in_folder

参考链接

核心内容

  • 基础镜像,base_image:
    • 实例的指定镜像
    • 如swebench/sweb.eval.x86_64.scikit-learn_1776_scikit-learn-13439:latest
  • 运行时镜像,runtime_image:
    • Openhands真正启动时的镜像
    • 以base_image为底座,把openhands的runtime_client和agent_skills等安装进去

三层缓存

  • Versioned Tag:底座标签,OpenHands 版本 + 系统级软件 + 基础镜像名
  • Lock Tag:依赖标签,Openhands所需要的python第三方包。
  • Source Tag:源代码标签,最终运行的镜像。

构建策略

  • 先找Source Tag
    • 有的话,直接启动。
  • 再找Lock Tag
    • 代码变量,但重新搞一个
    • 基于Lock Image,只把代码拷贝进去。
  • 接着找 Versioned Tag
    • 依赖也变了,最基础的OpenHadns也没有。
    • 从 Versioned镜像,重新安装依赖,拷贝代码。
  • 全都找不到,重新开始构建
    • 原数据镜像 + 更新系统 + 装python + 安装依赖 + 拷贝代码等。
python
def build_runtime_image_in_folder(
    base_image: str,
    runtime_builder: RuntimeBuilder,
    build_folder: Path,
    extra_deps: str | None,
    dry_run: bool,
    force_rebuild: bool,
    platform: str | None = None,
    extra_build_args: list[str] | None = None,
    enable_browser: bool = True,
) -> str:
  	# 解析runtime_image_repo和tag,tag未使用
    # 'ghcr.io/openhands/runtime', 'oh_v1.2.1_image_98232eab__1776_scikit-learn-13439_tag_latest'
    runtime_image_repo, _ = get_runtime_image_repo_and_tag(base_image) 
    # 'oh_v1.2.1_2sfgez3r5yzdqiuh'
    lock_tag = (
        f'oh_v{get_version()}_{get_hash_for_lock_files(base_image, enable_browser)}'
    )
    # 'oh_v1.2.1_docker.io_s_swebench_s_sweb.eval.x86_64.scikit-learn_1776_scikit-learn-13439_t_latest'
    versioned_tag = (
        # truncate the base image to 96 characters to fit in the tag max length (128 characters)
        f'oh_v{get_version()}_{get_tag_for_versioned_image(base_image)}'
    )
    # 'ghcr.io/openhands/runtime:oh_v1.2.1_docker.io_s_swebench_s_sweb.eval.x86_64.scikit-learn_1776_scikit-learn-13439_t_latest'
    versioned_image_name = f'{runtime_image_repo}:{versioned_tag}'
    # 'oh_v1.2.1_2sfgez3r5yzdqiuh_rdcry28kzl7vjpv1'
    source_tag = f'{lock_tag}_{get_hash_for_source_files()}'
    # 'ghcr.io/openhands/runtime:oh_v1.2.1_2sfgez3r5yzdqiuh_rdcry28kzl7vjpv1'
    hash_image_name = f'{runtime_image_repo}:{source_tag}'

    logger.info(f'Building image: {hash_image_name}')
    if force_rebuild:
        logger.debug(
            f'Force rebuild: [{runtime_image_repo}:{source_tag}] from scratch.'
        )
        prep_build_folder(
            build_folder,
            base_image,
            build_from=BuildFromImageType.SCRATCH,
            extra_deps=extra_deps,
            enable_browser=enable_browser,
        )
        if not dry_run:
            _build_sandbox_image(
                build_folder,
                runtime_builder,
                runtime_image_repo,
                source_tag,
                lock_tag,
                versioned_tag,
                platform,
                extra_build_args=extra_build_args,
            )
        return hash_image_name
    # 'ghcr.io/openhands/runtime:oh_v1.2.1_2sfgez3r5yzdqiuh'
    lock_image_name = f'{runtime_image_repo}:{lock_tag}'
    build_from = BuildFromImageType.SCRATCH

    # If the exact image already exists, we do not need to build it
    if runtime_builder.image_exists(hash_image_name, False):
        logger.debug(f'Reusing Image [{hash_image_name}]')
        return hash_image_name

    # We look for an existing image that shares the same lock_tag. If such an image exists, we
    # can use it as the base image for the build and just copy source files. This makes the build
    # much faster.
    if runtime_builder.image_exists(lock_image_name):
        logger.debug(f'Build [{hash_image_name}] from lock image [{lock_image_name}]')
        build_from = BuildFromImageType.LOCK
        base_image = lock_image_name
    elif runtime_builder.image_exists(versioned_image_name):
        logger.info(
            f'Build [{hash_image_name}] from versioned image [{versioned_image_name}]'
        )
        build_from = BuildFromImageType.VERSIONED
        base_image = versioned_image_name
    else:
        logger.debug(f'Build [{hash_image_name}] from scratch')

    prep_build_folder(build_folder, base_image, build_from, extra_deps, enable_browser)
    if not dry_run:
        _build_sandbox_image(
            build_folder,
            runtime_builder,
            runtime_image_repo,
            source_tag=source_tag,
            lock_tag=lock_tag,
            # Only tag the versioned image if we are building from scratch.
            # This avoids too much layers when you lay one image on top of another multiple times
            versioned_tag=(
                versioned_tag if build_from == BuildFromImageType.SCRATCH else None
            ),
            platform=platform,
            extra_build_args=extra_build_args,
        )

    return hash_image_name
python
def get_runtime_image_repo_and_tag(base_image: str) -> tuple[str, str]:
    """Retrieves the Docker repo and tag associated with the Docker image.

    Parameters:
    - base_image (str): The name of the base Docker image

    Returns:
    - tuple[str, str]: The Docker repo and tag of the Docker image
    """
    if get_runtime_image_repo() in base_image:
        logger.debug(
            f'The provided image [{base_image}] is already a valid runtime image.\n'
            f'Will try to reuse it as is.'
        )

        if ':' not in base_image:
            base_image = base_image + ':latest'
        repo, tag = base_image.split(':')
        return repo, tag
    else:
        if ':' not in base_image:
            base_image = base_image + ':latest'
        [repo, tag] = base_image.split(':')

        # Hash the repo if it's too long
        if len(repo) > 32:
            repo_hash = hashlib.md5(repo[:-24].encode()).hexdigest()[:8]

            repo = f'{repo_hash}_{repo[-24:]}'  # Use 8 char hash + last 24 chars
        repo = repo.replace('/', '_s_')

        new_tag = f'oh_v{get_version()}_image_{repo}_tag_{tag}'

        # if it's still too long, hash the entire image name
        if len(new_tag) > 128:
            new_tag = f'oh_v{get_version()}_image_{hashlib.md5(new_tag.encode()).hexdigest()[:64]}'
            logger.warning(
                f'The new tag [{new_tag}] is still too long, so we use an hash of the entire image name: {new_tag}'
            )

        return get_runtime_image_repo(), new_tag

def get_runtime_image_repo() -> str:
    return os.getenv('OH_RUNTIME_RUNTIME_IMAGE_REPO', 'ghcr.io/openhands/runtime')
笔记
  • copy openhands源代码
  • 拷贝:skills文件夹
  • 拷贝:pyproject.toml、petry.lock文件
  • 生成dockerfile,/tmp/tmpomhu7xu3/Dockerfile
python
def prep_build_folder(
    build_folder: Path,
    base_image: str,
    build_from: BuildFromImageType,
    extra_deps: str | None,
    enable_browser: bool = True,
) -> None:
    # Copy the source code to directory. It will end up in build_folder/code
    # If package is not found, build from source code
    openhands_source_dir = Path(openhands.__file__).parent
    project_root = openhands_source_dir.parent
    logger.debug(f'Building source distribution using project root: {project_root}')

    # Copy the 'openhands' directory (Source code)
    shutil.copytree(
        openhands_source_dir,
        Path(build_folder, 'code', 'openhands'),
        ignore=shutil.ignore_patterns(
            '.*/',
            '__pycache__/',
            '*.pyc',
            '*.md',
        ),
    )

    # Copy the 'skills' directory (Skills)
    shutil.copytree(Path(project_root, 'skills'), Path(build_folder, 'code', 'skills'))

    # Copy pyproject.toml and poetry.lock files
    for file in ['pyproject.toml', 'poetry.lock']:
        src = Path(openhands_source_dir, file)
        if not src.exists():
            src = Path(project_root, file)
        shutil.copy2(src, Path(build_folder, 'code', file))

    # Create a Dockerfile and write it to build_folder
    dockerfile_content = _generate_dockerfile(
        base_image,
        build_from=build_from,
        extra_deps=extra_deps,
        enable_browser=enable_browser,
    )
    dockerfile_path = Path(build_folder, 'Dockerfile')
    with open(str(dockerfile_path), 'w') as f:
        f.write(dockerfile_content)

Dockerfile 示例

Docker file 如下:

Docker file

0. 核心功能

  • 把swe-bench的毛坯房镜像,做一层精装修。

1. 基础镜像和全局变量

  • FROM:指定基础数据镜像,如 swe-bench镜像。
  • SHELL:从 /bin/sh 改为 /bin/bash,并开启命令模式
  • ENV:设置环境变量

2. 系统工具安装

  • apt-get,uv 等

3. 用户与权限管理

  • 创建openhands用户,UID=1000
  • NOPASSWD:设置无需密码即可执行sudo

4. Python环境管理 (Micromamba + Poeytry)

  • 切换到用户openhands来安装。
  • Micromamba:很快的conda替代品,管理python
  • Poetry:管理项目依赖包

5. VSCoder Server 安装

  • 下载安装vscode server,启动网页版的vscode,AI和用户都可通过浏览器直接改代码

6. 项目源代码拷贝

  • openhands core、skills等文件夹

7. 自动化配置

  • 激活环境、用户等,进入即可使用。
bash
# 基础镜像与全局变量
FROM docker.io/swebench/sweb.eval.x86_64.scikit-learn_1776_scikit-learn-13439:latest
SHELL ["/bin/bash", "-c"]
# 公共环境变量,Shared environment variables
ENV POETRY_VIRTUALENVS_PATH=/openhands/poetry \
    MAMBA_ROOT_PREFIX=/openhands/micromamba \
    LANG=C.UTF-8 \
    LC_ALL=C.UTF-8 \
    EDITOR=code \
    VISUAL=code \
    GIT_EDITOR="code --wait" \
    OPENVSCODE_SERVER_ROOT=/openhands/.openvscode-server
bash
# ================================================================
# START: Build Runtime Image from Scratch
# ================================================================
# This is used in cases where the base image is something more generic like nikolaik/python-nodejs
# rather than the current OpenHands release


# Set PATH early to ensure system commands are available
ENV PATH="/usr/bin:/bin:/usr/sbin:/sbin:$PATH"

# Install base system dependencies

RUN apt-get update && \
    apt-get install -y --no-install-recommends \
        wget curl ca-certificates sudo apt-utils git jq tmux build-essential ripgrep ffmpeg \
        coreutils util-linux procps findutils grep sed \
        libasound2-plugins libatomic1 && \
    (apt-get install -y --no-install-recommends libgl1 || apt-get install -y --no-install-recommends libgl1-mesa-glx) && \
        # Install Docker dependencies
    apt-get install -y --no-install-recommends apt-transport-https ca-certificates curl gnupg lsb-release


# Install uv (required by MCP)
RUN curl -LsSf https://astral.sh/uv/install.sh | env UV_INSTALL_DIR="/openhands/bin" sh
# Add /openhands/bin to PATH
ENV PATH="/openhands/bin:${PATH}"

# Remove UID 1000 and GID 1000 users/groups that might conflict with openhands user
RUN (if getent passwd 1000 | grep -q pn; then userdel pn; fi) && \
    (if getent passwd 1000 | grep -q ubuntu; then userdel ubuntu; fi) && \
    (if getent group 1000 | grep -q pn; then groupdel pn; fi) && \
    (if getent group 1000 | grep -q ubuntu; then groupdel ubuntu; fi)

# Create openhands group and user (with fallback IDs if 1000 is taken)
RUN (if getent group 1000 >/dev/null 2>&1; then \
        groupadd openhands; \
    else \
        groupadd -g 1000 openhands; \
    fi) && \
    (if getent passwd 1000 >/dev/null 2>&1; then \
        useradd -g openhands -m -s /bin/bash openhands; \
    else \
        useradd -u 1000 -g openhands -m -s /bin/bash openhands; \
    fi) && \
    usermod -aG sudo openhands && \
    echo 'openhands ALL=(ALL) NOPASSWD:ALL' >> /etc/sudoers && \
    # Set empty password for openhands user to allow passwordless su
    passwd -d openhands && \
    # Set empty password for root user as well to ensure su works in both directions
    passwd -d root && \
    # Ensure root can su to openhands without password by configuring PAM
    sed -i '/pam_rootok.so/d' /etc/pam.d/su && \
    sed -i '1i auth sufficient pam_rootok.so' /etc/pam.d/su

# Create necessary directories
RUN mkdir -p /openhands && \
    mkdir -p /openhands/logs && \
    mkdir -p /openhands/poetry && \
    chown -R openhands:openhands /openhands


# ================================================================
# Define Docker installation macro

# Install Docker only if not a swebench or mswebench image

# ================================================================

# Install micromamba,conda替代品
RUN mkdir -p /openhands/micromamba/bin && \
    /bin/bash -c "PREFIX_LOCATION=/openhands/micromamba BIN_FOLDER=/openhands/micromamba/bin INIT_YES=no CONDA_FORGE_YES=yes $(curl -L https://micro.mamba.pm/install.sh)" && \
    /openhands/micromamba/bin/micromamba config remove channels defaults && \
    /openhands/micromamba/bin/micromamba config list && \
    chown -R openhands:openhands /openhands/micromamba && \
    # Create read-only shared access to micromamba for all users
    # This allows both root and openhands users to access the same packages
    # while maintaining security by keeping openhands as the owner
    chmod -R 755 /openhands/micromamba && \
    # Create a separate writable location for root's micromamba cache/config
    mkdir -p /root/.local/share/micromamba && \
    # Set up environment variables for system-wide access
    echo 'export PATH="/openhands/micromamba/bin:$PATH"' >> /etc/environment

# Create the openhands virtual environment and install poetry and python
# Run as openhands user to avoid expensive chown -R operations later

# openhands 用户
USER openhands
RUN /openhands/micromamba/bin/micromamba create -n openhands -y && \
    /openhands/micromamba/bin/micromamba install -n openhands -c conda-forge poetry python=3.12 -y
USER root

# Create a clean openhands directory including only the pyproject.toml, poetry.lock and openhands/__init__.py
RUN \
    if [ -d /openhands/code ]; then rm -rf /openhands/code; fi && \
    mkdir -p /openhands/code/openhands && \
    touch /openhands/code/openhands/__init__.py && \
    chown -R openhands:openhands /openhands/code && \
    # Set global git configuration to ensure proper author/committer information
    git config --global user.name "openhands" && \
    git config --global user.email "openhands@all-hands.dev"

COPY --chown=openhands:openhands ./code/pyproject.toml ./code/poetry.lock /openhands/code/


# Install user-level dependencies as openhands user
# 安装openhands用户级依赖
WORKDIR /openhands/code

USER openhands
RUN \
    /openhands/micromamba/bin/micromamba config set changeps1 False && \
    /openhands/micromamba/bin/micromamba run -n openhands poetry config virtualenvs.path /openhands/poetry && \
    /openhands/micromamba/bin/micromamba run -n openhands poetry env use python3.12 && \
    # Install project dependencies
    /openhands/micromamba/bin/micromamba run -n openhands poetry install --only main,runtime --no-interaction --no-root && \
    # Clean up user caches
    /openhands/micromamba/bin/micromamba run -n openhands poetry cache clear --all . -n && \
    /openhands/micromamba/bin/micromamba clean --all


# Install system-level dependencies that require root
USER root
RUN \
    
    # Set environment variables (requires root)
    /openhands/micromamba/bin/micromamba run -n openhands poetry run python -c "import sys; print('OH_INTERPRETER_PATH=' + sys.executable)" >> /etc/environment && \
    # Set permissions for shared read-only access
    # Note: chown -R operations removed - files are now created with correct ownership
    # by running micromamba/poetry as openhands user (see install_dependencies_user)
    chmod -R 755 /openhands/poetry && \
    chmod -R 755 /openhands/micromamba && \
    mkdir -p /openhands/workspace && chmod -R g+rws,o+rw /openhands/workspace && \
    chown -R openhands:openhands /openhands/workspace && \
    # Ensure PATH includes system binaries early in startup
    echo 'export PATH="/usr/bin:/bin:/usr/sbin:/sbin:$PATH"' >> /etc/environment && \
    echo 'export PATH="/usr/bin:/bin:/usr/sbin:/sbin:$PATH"' >> /etc/bash.bashrc && \
    # Set up conda environment activation for all users
    echo 'eval "$(/openhands/micromamba/bin/micromamba shell hook --shell bash)"' >> /etc/bash.bashrc && \
    echo 'micromamba activate openhands 2>/dev/null || true' >> /etc/bash.bashrc && \
    # Set up environment for root user
    echo 'export PATH="/usr/bin:/bin:/usr/sbin:/sbin:/openhands/micromamba/bin:$PATH"' >> /root/.bashrc && \
    echo 'export PLAYWRIGHT_BROWSERS_PATH=/opt/playwright-browsers' >> /root/.bashrc && \
    echo 'eval "$(/openhands/micromamba/bin/micromamba shell hook --shell bash)"' >> /root/.bashrc && \
    echo 'micromamba activate openhands 2>/dev/null || true' >> /root/.bashrc && \
    # Clean up system packages (requires root)
    apt-get clean && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*



# ================================================================
# END: Build Runtime Image from Scratch
# ================================================================
bash
# Ensure openhands user/group and base dirs exist even when not building from scratch
USER root
RUN \
    # Ensure group exists (prefer GID 1000 if available)
    if ! getent group openhands >/dev/null 2>&1; then \
        if getent group 1000 >/dev/null 2>&1; then groupadd openhands; else groupadd -g 1000 openhands; fi; \
    fi && \
    # Ensure user exists (prefer UID 1000 if available)
    if ! id -u openhands >/dev/null 2>&1; then \
        if getent passwd 1000 >/dev/null 2>&1; then useradd -m -s /bin/bash -g openhands openhands; else useradd -u 1000 -g openhands -m -s /bin/bash openhands; fi; \
    fi && \
    # Ensure home and required directories exist before later steps
    mkdir -p /home/openhands && \
    mkdir -p /openhands && \
    mkdir -p $(dirname ${OPENVSCODE_SERVER_ROOT}) && \
    # Ensure ownership is correct for all OpenHands paths
    chown -R openhands:openhands /home/openhands || true && \
    chown -R openhands:openhands /openhands || true


# Reference:
# 1. https://github.com/gitpod-io/openvscode-server
# 2. https://github.com/gitpod-io/openvscode-releases

# Setup VSCode Server
ARG RELEASE_TAG="openvscode-server-v1.98.2"
ARG RELEASE_ORG="gitpod-io"
# ARG USERNAME=openvscode-server
# ARG USER_UID=1000
# ARG USER_GID=1000

RUN if [ -z "${RELEASE_TAG}" ]; then \
        echo "The RELEASE_TAG build arg must be set." >&2 && \
        exit 1; \
    fi && \
    arch=$(uname -m) && \
    if [ "${arch}" = "x86_64" ]; then \
        arch="x64"; \
    elif [ "${arch}" = "aarch64" ]; then \
        arch="arm64"; \
    elif [ "${arch}" = "armv7l" ]; then \
        arch="armhf"; \
    fi && \
    wget https://github.com/${RELEASE_ORG}/openvscode-server/releases/download/${RELEASE_TAG}/${RELEASE_TAG}-linux-${arch}.tar.gz && \
    tar -xzf ${RELEASE_TAG}-linux-${arch}.tar.gz && \
    if [ -d "${OPENVSCODE_SERVER_ROOT}" ]; then rm -rf "${OPENVSCODE_SERVER_ROOT}"; fi && \
    mv ${RELEASE_TAG}-linux-${arch} ${OPENVSCODE_SERVER_ROOT} && \
    cp ${OPENVSCODE_SERVER_ROOT}/bin/remote-cli/openvscode-server ${OPENVSCODE_SERVER_ROOT}/bin/remote-cli/code && \
    rm -f ${RELEASE_TAG}-linux-${arch}.tar.gz && \
    chown -R openhands:openhands ${OPENVSCODE_SERVER_ROOT}


# ================================================================
# Copy Project source files
# ================================================================
RUN if [ -d /openhands/code/openhands ]; then rm -rf /openhands/code/openhands; fi
COPY --chown=openhands:openhands ./code/pyproject.toml ./code/poetry.lock /openhands/code/
RUN if [ -d /openhands/code/skills ]; then rm -rf /openhands/code/skills; fi
COPY --chown=openhands:openhands ./code/skills /openhands/code/skills
COPY --chown=openhands:openhands ./code/openhands /openhands/code/openhands
RUN chmod a+rwx /openhands/code/openhands/__init__.py && \
    chown -R openhands:openhands /openhands/code


# ================================================================
# END: Build from versioned image
# ================================================================
bash
# Install extra dependencies if specified (as openhands user)

# Set up environment for openhands user
USER root
RUN \
    # Set up environment for openhands user
    echo 'export PATH="/usr/bin:/bin:/usr/sbin:/sbin:/openhands/micromamba/bin:$PATH"' >> /home/openhands/.bashrc && \
    echo 'export PLAYWRIGHT_BROWSERS_PATH=/opt/playwright-browsers' >> /home/openhands/.bashrc && \
    echo 'eval "$(/openhands/micromamba/bin/micromamba shell hook --shell bash)"' >> /home/openhands/.bashrc && \
    echo 'micromamba activate openhands 2>/dev/null || true' >> /home/openhands/.bashrc && \
    chown openhands:openhands /home/openhands/.bashrc

DockerRuntimeBuilder

python
def build(
    self,
    path: str,
    tags: list[str],
    platform: str | None = None,
    extra_build_args: list[str] | None = None,
    use_local_cache: bool = False,
) -> str:
    """Builds a Docker image using BuildKit and handles the build logs appropriately.

    Args:
        path (str): The path to the Docker build context.
        tags (list[str]): A list of image tags to apply to the built image.
        platform (str, optional): The target platform for the build. Defaults to None.
        use_local_cache (bool, optional): Whether to use and update the local build cache. Defaults to True.
        extra_build_args (list[str], optional): Additional arguments to pass to the Docker build command. Defaults to None.

    Returns:
        str: The name of the built Docker image.

    Raises:
        AgentRuntimeBuildError: If the Docker server version is incompatible or if the build process fails.

    Note:
        This method uses Docker BuildKit for improved build performance and caching capabilities.
        If `use_local_cache` is True, it will attempt to use and update the build cache in a local directory.
        The `extra_build_args` parameter allows for passing additional Docker build arguments as needed.
    """
    self.docker_client = docker.from_env()
    version_info = self.docker_client.version()
    server_version = version_info.get('Version', '').split('+')[0].replace('-', '.')
    components = version_info.get('Components')
    self.is_podman = (
        components is not None
        and len(components) > 0
        and components[0].get('Name', '').startswith('Podman')
    )
    if tuple(map(int, server_version.split('.'))) < (18, 9) and not self.is_podman:
        raise AgentRuntimeBuildError(
            'Docker server version must be >= 18.09 to use BuildKit'
        )

    if self.is_podman and tuple(map(int, server_version.split('.'))) < (4, 9):
        raise AgentRuntimeBuildError('Podman server version must be >= 4.9.0')

    if not DockerRuntimeBuilder.check_buildx(self.is_podman):
        # when running openhands in a container, there might not be a "docker"
        # binary available, in which case we need to download docker binary.
        # since the official openhands app image is built from debian, we use
        # debian way to install docker binary
        logger.info(
            'No docker binary available inside openhands-app container, trying to download online...'
        )
        commands = [
            'apt-get update',
            'apt-get install -y ca-certificates curl gnupg',
            'install -m 0755 -d /etc/apt/keyrings',
            'curl -fsSL https://download.docker.com/linux/debian/gpg -o /etc/apt/keyrings/docker.asc',
            'chmod a+r /etc/apt/keyrings/docker.asc',
            'echo \
              "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] https://download.docker.com/linux/debian \
              $(. /etc/os-release && echo "$VERSION_CODENAME") stable" | \
              tee /etc/apt/sources.list.d/docker.list > /dev/null',
            'apt-get update',
            'apt-get install -y docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin',
        ]
        for cmd in commands:
            try:
                subprocess.run(
                    cmd, shell=True, check=True, stdout=subprocess.DEVNULL
                )
            except subprocess.CalledProcessError as e:
                logger.error(f'Image build failed:\n{e}')
                logger.error(f'Command output:\n{e.output}')
                raise
        logger.info('Downloaded and installed docker binary')

    target_image_hash_name = tags[0]
    target_image_repo, target_image_source_tag = target_image_hash_name.split(':')
    target_image_tag = tags[1].split(':')[1] if len(tags) > 1 else None

    buildx_cmd = [
        'docker' if not self.is_podman else 'podman',
        'buildx',
        'build',
        '--progress=plain',
        f'--build-arg=OPENHANDS_RUNTIME_VERSION={get_version()}',
        f'--build-arg=OPENHANDS_RUNTIME_BUILD_TIME={datetime.datetime.now().isoformat()}',
        f'--tag={target_image_hash_name}',
        '--load',
    ]

    # Include the platform argument only if platform is specified
    if platform:
        buildx_cmd.append(f'--platform={platform}')

    cache_dir = '/tmp/.buildx-cache'
    if use_local_cache and self._is_cache_usable(cache_dir):
        buildx_cmd.extend(
            [
                f'--cache-from=type=local,src={cache_dir}',
                f'--cache-to=type=local,dest={cache_dir},mode=max',
            ]
        )

    if extra_build_args:
        buildx_cmd.extend(extra_build_args)

    buildx_cmd.append(path)  # must be last!

    self.rolling_logger.start(
        f'================ {buildx_cmd[0].upper()} BUILD STARTED ================'
    )

    builder_cmd = ['docker', 'buildx', 'use', 'default']
    subprocess.Popen(
        builder_cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        universal_newlines=True,
    )

    try:
        process = subprocess.Popen(
            buildx_cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            universal_newlines=True,
            bufsize=1,
        )

        output_lines = []
        if process.stdout:
            for line in iter(process.stdout.readline, ''):
                line = line.strip()
                if line:
                    output_lines.append(line)  # Store all output lines
                    self._output_logs(line)

        return_code = process.wait()

        if return_code != 0:
            # Use the collected output for error reporting
            output_str = '\n'.join(output_lines)
            raise subprocess.CalledProcessError(
                return_code,
                process.args,
                output=output_str,  # Use the collected output
                stderr=None,
            )

    except subprocess.CalledProcessError as e:
        logger.error(f'Image build failed with exit code {e.returncode}')
        if e.output:
            logger.error(f'Command output:\n{e.output}')
        elif self.rolling_logger.is_enabled() and self.rolling_logger.all_lines:
            logger.error(f'Docker build output:\n{self.rolling_logger.all_lines}')
        raise

    except subprocess.TimeoutExpired:
        logger.error('Image build timed out')
        raise

    except FileNotFoundError as e:
        logger.error(f'Python executable not found: {e}')
        raise

    except PermissionError as e:
        logger.error(
            f'Permission denied when trying to execute the build command:\n{e}'
        )
        raise

    except Exception as e:
        logger.error(f'An unexpected error occurred during the build process: {e}')
        raise

    logger.info(f'Image [{target_image_hash_name}] build finished.')

    if target_image_tag:
        image = self.docker_client.images.get(target_image_hash_name)
        image.tag(target_image_repo, target_image_tag)
        logger.info(
            f'Re-tagged image [{target_image_hash_name}] with more generic tag [{target_image_tag}]'
        )

    # Check if the image is built successfully
    image = self.docker_client.images.get(target_image_hash_name)
    if image is None:
        raise AgentRuntimeBuildError(
            f'Build failed: Image {target_image_hash_name} not found'
        )

    tags_str = (
        f'{target_image_source_tag}, {target_image_tag}'
        if target_image_tag
        else target_image_source_tag
    )
    logger.info(
        f'Image {target_image_repo} with tags [{tags_str}] built successfully'
    )
    return target_image_hash_name
python
def image_exists(self, image_name: str, pull_from_repo: bool = True) -> bool:
    """Check if the image exists in the registry (try to pull it first) or in the local store.

    Args:
        image_name (str): The Docker image to check (<image repo>:<image tag>)
        pull_from_repo (bool): Whether to pull from the remote repo if the image not present locally
    Returns:
        bool: Whether the Docker image exists in the registry or in the local store
    """
    if not image_name:
        logger.error(f'Invalid image name: `{image_name}`')
        return False

    try:
        logger.debug(f'Checking, if image exists locally:\n{image_name}')
        self.docker_client.images.get(image_name)
        logger.debug('Image found locally.')
        return True
    except docker.errors.ImageNotFound:
        if not pull_from_repo:
            logger.debug(
                f'Image {image_name} {colorize("not found", TermColor.WARNING)} locally'
            )
            return False
        try:
            logger.debug(
                'Image not found locally. Trying to pull it, please wait...'
            )

            layers: dict[str, dict[str, str]] = {}
            previous_layer_count = 0

            if ':' in image_name:
                image_repo, image_tag = image_name.split(':', 1)
            else:
                image_repo = image_name
                image_tag = None

            for line in self.docker_client.api.pull(
                image_repo, tag=image_tag, stream=True, decode=True
            ):
                self._output_build_progress(line, layers, previous_layer_count)
                previous_layer_count = len(layers)
            logger.debug('Image pulled')
            return True
        except docker.errors.ImageNotFound:
            logger.debug('Could not find image locally or in registry.')
            return False
        except Exception as e:
            msg = f'Image {colorize("could not be pulled", TermColor.ERROR)}: '
            ex_msg = str(e)
            if 'Not Found' in ex_msg:
                msg += 'image not found in registry.'
            else:
                msg += f'{ex_msg}'
            logger.debug(msg)
            return False

交互代码

总访客数:   ·   总访问量:
PLM's Blog @ 2016 - 2026