Skip to content

Lean Processor

lean_automator.lean.processor

Orchestrates Lean code generation and verification for Knowledge Base items.

This module manages the process of generating formal Lean 4 code (both statement signatures and proofs) for mathematical items (KBItem) stored in the knowledge base. It interacts with an LLM client (GeminiClient) via the llm_interface module to generate the code based on LaTeX statements, informal proofs, and dependency context.

The core logic, orchestrated by generate_and_verify_lean, involves:

  1. Checking prerequisites and dependencies (_check_prerequisites_and_dependencies).

  2. Generating a Lean statement signature (_generate_statement_shell).

  3. Generating and verifying Lean proof tactics (_generate_and_verify_proof).

  4. Updating the KBItem status and content throughout the process.

  5. Providing batch processing capabilities (process_pending_lean_items).

Classes

Functions

generate_and_verify_lean(unique_name: str, client: GeminiClient, db_path: Optional[str] = None, lake_executable_path: str = 'lake', timeout_seconds: int = 120) -> bool async

Orchestrates the full Lean processing pipeline for a single KBItem.

Calls helper functions to check prerequisites, generate statement, and generate/verify proof as needed.

Parameters:

Name Type Description Default
unique_name str

The unique name of the KBItem to process.

required
client GeminiClient

An initialized LLM client instance.

required
db_path Optional[str]

Path to the knowledge base database file. Uses DEFAULT_DB_PATH if None.

None
lake_executable_path str

Path to the lake executable.

'lake'
timeout_seconds int

Timeout for lake build commands.

120

Returns:

Type Description
bool

True if the item is successfully processed (ends in PROVEN status), False otherwise.

Source code in lean_automator/lean/processor.py
async def generate_and_verify_lean(
    unique_name: str,
    client: GeminiClient,
    db_path: Optional[str] = None,
    lake_executable_path: str = "lake",
    timeout_seconds: int = 120,
) -> bool:
    """Orchestrates the full Lean processing pipeline for a single KBItem.

    Calls helper functions to check prerequisites, generate statement, and
    generate/verify proof as needed.

    Args:
        unique_name: The unique name of the KBItem to process.
        client: An initialized LLM client instance.
        db_path: Path to the knowledge base database file. Uses
            `DEFAULT_DB_PATH` if None.
        lake_executable_path: Path to the `lake` executable.
        timeout_seconds: Timeout for `lake build` commands.

    Returns:
        True if the item is successfully processed (ends in PROVEN status),
        False otherwise.
    """
    start_time = time.time()
    effective_db_path = db_path or DEFAULT_DB_PATH
    overall_success = False

    # --- Basic readiness checks ---
    # BUG FIX: the previous check used
    # `isinstance(client, type("DummyGeminiClient", (object,), {}))`, which
    # builds a brand-new class at call time — no object can be an instance of
    # a class created on the spot, so the check was always False. Compare the
    # client's class *name* instead so a placeholder "DummyGeminiClient"
    # (presumably substituted when the real client fails to import — TODO
    # confirm against the import fallback in this module) is actually rejected.
    if not client or type(client).__name__ == "DummyGeminiClient":
        logger.error(
            f"GeminiClient missing or invalid. Cannot process Lean for {unique_name}."
        )
        return False
    # Defensive guard: any of these may be None if optional imports failed.
    if not all(
        [
            KBItem,
            ItemStatus,
            ItemType,
            get_kb_item_by_name,
            save_kb_item,
            lean_interaction,
            lean_proof_repair,
            llm_interface,
        ]
    ):
        logger.critical(
            f"Core modules/types missing for {unique_name}. Cannot process."
        )
        return False

    try:
        # --- Step 1: Check prerequisites and dependencies ---
        item, dependency_items, error_msg = _check_prerequisites_and_dependencies(
            unique_name, effective_db_path
        )

        if error_msg:
            logger.warning(f"Prerequisite check failed for {unique_name}: {error_msg}")
            # Save ERROR status only if item exists and status indicates an error
            # occurred during processing
            if item and hasattr(item, "update_status") and hasattr(ItemStatus, "ERROR"):
                # Avoid overwriting PROVEN status if that was the reason
                # for the 'error' message
                if getattr(item, "status", None) != ItemStatus.PROVEN:
                    # Check if the error requires setting ERROR status
                    if (
                        "missing required latex_statement" in error_msg
                        or "not ready" in error_msg
                        or "not accessible" in error_msg
                    ):
                        item.update_status(ItemStatus.ERROR, error_msg)
                        await save_kb_item(item, client=None, db_path=effective_db_path)
            return False  # Prerequisite failed

        if not item:  # Should not happen if error_msg is None, but check defensively
            logger.error(
                f"Prerequisite check returned no item and no error for {unique_name}."
            )
            return False

        # If item is already proven, the check function handles logging,
        # we just exit successfully.
        if (
            hasattr(ItemStatus, "PROVEN")
            and getattr(item, "status", None) == ItemStatus.PROVEN
        ):
            return True

        # --- Step 2: Determine if statement generation is needed ---
        # Regenerate when no lean_code exists yet, or the item is still in a
        # pre-Lean status (fresh LaTeX acceptance or pending Lean work).
        needs_statement_generation = not getattr(item, "lean_code", None) or getattr(
            item, "status", None
        ) in {ItemStatus.LATEX_ACCEPTED, ItemStatus.PENDING_LEAN}
        statement_shell = None

        if needs_statement_generation:
            statement_shell = await _generate_statement_shell(
                item, dependency_items, client, effective_db_path
            )
            if not statement_shell:
                logger.error(f"Failed to generate statement shell for {unique_name}.")
                # Status already set to ERROR inside helper function
                return False
            # Re-fetch item state after generation as it was modified
            item = get_kb_item_by_name(unique_name, effective_db_path)
            if not item:
                logger.critical(
                    f"Item {unique_name} vanished after statement generation!"
                )
                return False  # Or raise
        else:
            logger.info(
                f"Skipping Lean statement generation for {unique_name}, "
                f"using existing lean_code."
            )
            statement_shell = getattr(item, "lean_code", None)
            # Validate existing shell: proof generation expects a ':= sorry'
            # placeholder it can replace with tactics.
            if not statement_shell or ":= sorry" not in statement_shell.strip():
                error_msg = (
                    f"Existing lean_code for {unique_name} is invalid "
                    f"(missing ':= sorry')."
                )
                logger.error(error_msg)
                if hasattr(item, "update_status") and hasattr(ItemStatus, "ERROR"):
                    item.update_status(ItemStatus.ERROR, error_msg)
                    await save_kb_item(item, client=None, db_path=effective_db_path)
                return False

        # --- Step 3: Handle non-provable items ---
        # Definitions/remarks etc. need only a statement, not a proof.
        item_type = getattr(item, "item_type", None)
        proof_required = getattr(item_type, "requires_proof", lambda: True)()

        if not proof_required:
            logger.info(
                f"Lean Proc: Statement generated/present for non-provable "
                f"{unique_name}. Marking PROVEN."
            )
            if hasattr(item, "update_status") and hasattr(ItemStatus, "PROVEN"):
                # Only update if not already proven
                if getattr(item, "status", None) != ItemStatus.PROVEN:
                    item.update_status(ItemStatus.PROVEN)
                    await save_kb_item(item, client=None, db_path=effective_db_path)
                overall_success = True  # Mark as success
            else:
                logger.error(
                    f"Failed to mark non-provable item {unique_name} as PROVEN."
                )
                overall_success = False  # Should not happen if checks passed
        else:
            # --- Step 4: Generate and verify proof ---
            proof_success = await _generate_and_verify_proof(
                item,
                statement_shell,
                dependency_items,
                client,
                effective_db_path,
                lake_executable_path,
                timeout_seconds,
            )
            overall_success = proof_success

    except Exception as e:
        logger.exception(
            f"Unhandled exception during generate_and_verify_lean for "
            f"{unique_name}: {e}"
        )
        # Attempt to mark item as error if possible
        try:
            item_err = get_kb_item_by_name(unique_name, effective_db_path)
            if (
                item_err
                and hasattr(item_err, "update_status")
                and hasattr(ItemStatus, "ERROR")
            ):
                if (
                    getattr(item_err, "status", None) != ItemStatus.PROVEN
                ):  # Avoid overwriting success
                    item_err.update_status(
                        ItemStatus.ERROR, f"Unhandled exception: {str(e)[:100]}"
                    )
                    await save_kb_item(item_err, client=None, db_path=effective_db_path)
        except Exception as save_err:
            logger.error(
                f"Failed to save ERROR status after unhandled exception for "
                f"{unique_name}: {save_err}"
            )
        overall_success = False  # Ensure failure on exception

    # --- Final Logging ---
    end_time = time.time()
    duration = end_time - start_time
    if overall_success:
        logger.info(
            f"Lean processing SUCCEEDED for {unique_name} in {duration:.2f} seconds."
        )
    else:
        # Fetch final state for accurate logging
        final_item = get_kb_item_by_name(unique_name, effective_db_path)
        final_status_name = "UNKNOWN"
        if final_item:
            final_status = getattr(final_item, "status", None)
            final_status_name = getattr(final_status, "name", "UNKNOWN")
            # Check again if it somehow ended as PROVEN despite failure path
            if hasattr(ItemStatus, "PROVEN") and final_status == ItemStatus.PROVEN:
                logger.warning(
                    f"Item {unique_name} ended as PROVEN despite processing path "
                    f"indicating failure. Assuming success."
                )
                overall_success = True  # Correct the outcome based on final state
                logger.info(
                    f"Lean processing SUCCEEDED (final status) for {unique_name} "
                    f"in {duration:.2f} seconds."
                )
                return True  # Return success

        logger.error(
            f"Lean processing FAILED for {unique_name}. "
            f"Final Status: {final_status_name}. Total time: {duration:.2f} seconds."
        )

    return overall_success

process_pending_lean_items(client: GeminiClient, db_path: Optional[str] = None, limit: Optional[int] = None, process_statuses: Optional[List[ItemStatus]] = None, **kwargs) async

Processes multiple items requiring Lean code generation and verification.

Queries the database for items in specified statuses (defaulting to LATEX_ACCEPTED, PENDING_LEAN, LEAN_VALIDATION_FAILED). It then iterates through eligible items (checking status again before processing), calls generate_and_verify_lean for each one up to an optional limit, and logs summary statistics.

Parameters:

Name Type Description Default
client GeminiClient

An initialized LLM client instance.

required
db_path Optional[str]

Path to the database file. If None, uses DEFAULT_DB_PATH.

None
limit Optional[int]

Max number of items to process in this batch.

None
process_statuses Optional[List[ItemStatus]]

List of statuses to query for processing. Defaults to LATEX_ACCEPTED, PENDING_LEAN, LEAN_VALIDATION_FAILED.

None
**kwargs

Additional keyword arguments passed to generate_and_verify_lean.

{}
Source code in lean_automator/lean/processor.py
async def process_pending_lean_items(
    client: GeminiClient,
    db_path: Optional[str] = None,
    limit: Optional[int] = None,
    process_statuses: Optional[List[ItemStatus]] = None,
    **kwargs,  # Pass other args like lake path, timeout to generate_and_verify_lean
):
    """Processes multiple items requiring Lean code generation and verification.

    Queries the database for items in specified statuses (defaulting to
    LATEX_ACCEPTED, PENDING_LEAN, LEAN_VALIDATION_FAILED). It then iterates
    through eligible items (checking status again before processing), calls
    `generate_and_verify_lean` for each one up to an optional limit, and logs
    summary statistics.

    Args:
        client: An initialized LLM client instance.
        db_path: Path to the database file. If None, uses `DEFAULT_DB_PATH`.
        limit: Max number of items to process in this batch.
        process_statuses: List of statuses to query for processing. Defaults
            to LATEX_ACCEPTED, PENDING_LEAN, LEAN_VALIDATION_FAILED.
        **kwargs: Additional keyword arguments passed to `generate_and_verify_lean`.
    """
    # Basic dependency checks (any of these may be None if imports failed)
    if not all(
        [
            KBItem,
            ItemStatus,
            ItemType,
            get_items_by_status,
            get_kb_item_by_name,
            save_kb_item,
        ]
    ):
        logger.critical(
            "Required KB modules/functions not fully loaded. "
            "Cannot batch process Lean items."
        )
        return
    # BUG FIX: the previous check used
    # `isinstance(client, type("DummyGeminiClient", (object,), {}))`, which
    # compares against a class created on the spot and is therefore always
    # False. Compare the class name instead so a placeholder client is
    # actually rejected (same fix as in generate_and_verify_lean).
    if not client or type(client).__name__ == "DummyGeminiClient":
        logger.error("GeminiClient missing or invalid. Cannot batch process.")
        return

    effective_db_path = db_path or DEFAULT_DB_PATH
    # Determine target statuses
    try:
        default_statuses = {
            ItemStatus.PENDING_LEAN,
            ItemStatus.LATEX_ACCEPTED,
            ItemStatus.LEAN_VALIDATION_FAILED,
        }
        if process_statuses is None:
            process_statuses_set = default_statuses
        else:
            valid_statuses = {s for s in process_statuses if isinstance(s, ItemStatus)}
            # BUG FIX: comparing len(valid_statuses) (a set) to
            # len(process_statuses) (a list) falsely flagged *duplicated*
            # valid statuses as invalid. Check for non-ItemStatus members
            # directly instead.
            if any(not isinstance(s, ItemStatus) for s in process_statuses):
                logger.warning(
                    "Invalid status types provided to process_pending_lean_items."
                )
            process_statuses_set = (
                valid_statuses if valid_statuses else default_statuses
            )
            if not valid_statuses:
                logger.warning("No valid process_statuses provided, using defaults.")
    except AttributeError:
        logger.critical(
            "Cannot determine batch processing statuses: "
            "ItemStatus members not accessible."
        )
        return

    processed_count = 0
    success_count = 0
    fail_count = 0
    items_to_process_names = []

    status_names = [getattr(s, "name", "UNKNOWN") for s in process_statuses_set]
    logger.info(
        f"Starting Lean batch processing. Querying for statuses: {status_names}."
    )

    # --- Collect eligible items ---
    for status in process_statuses_set:
        if limit is not None and len(items_to_process_names) >= limit:
            break
        try:
            if not callable(get_items_by_status):
                continue  # Skip if function not loaded

            items_gen = get_items_by_status(status, effective_db_path)
            count_for_status = 0
            for item in items_gen:
                if limit is not None and len(items_to_process_names) >= limit:
                    break
                item_unique_name = getattr(item, "unique_name", None)
                if not item_unique_name:
                    continue

                # Add item if not already collected
                if item_unique_name not in items_to_process_names:
                    # Check if non-provable item with code can be marked PROVEN directly
                    item_type = getattr(item, "item_type", None)
                    item_lean_code = getattr(item, "lean_code", None)
                    needs_proof = getattr(item_type, "requires_proof", lambda: True)()
                    current_item_status = getattr(
                        item, "status", None
                    )  # Should be same as loop 'status'

                    if (
                        not needs_proof
                        and item_lean_code
                        and current_item_status == ItemStatus.LATEX_ACCEPTED
                    ):
                        logger.info(
                            f"Pre-marking non-provable item {item_unique_name} "
                            f"with code as PROVEN."
                        )
                        if hasattr(item, "update_status") and hasattr(
                            ItemStatus, "PROVEN"
                        ):
                            item.update_status(ItemStatus.PROVEN)
                            await save_kb_item(
                                item, client=None, db_path=effective_db_path
                            )
                        continue  # Skip adding to main processing list

                    # Otherwise, add to list for processing
                    items_to_process_names.append(item_unique_name)
                    count_for_status += 1

            logger.debug(
                f"Found {count_for_status} potential items with status "
                f"{getattr(status, 'name', 'UNKNOWN')}"
            )

        except Exception as e:
            logger.error(
                f"Failed to retrieve items with status "
                f"{getattr(status, 'name', 'UNKNOWN')}: {e}"
            )

    if not items_to_process_names:
        logger.info("No eligible items found requiring Lean processing.")
        return

    logger.info(
        f"Collected {len(items_to_process_names)} unique items for Lean processing."
    )

    # --- Process collected items ---
    for unique_name in items_to_process_names:
        # Re-fetch item state before processing to double-check eligibility
        # (another worker or an earlier iteration may have changed it).
        try:
            current_item_state = get_kb_item_by_name(unique_name, effective_db_path)
            if not current_item_state:
                logger.warning(
                    f"Skipping {unique_name}: Item disappeared before processing."
                )
                continue

            item_status = getattr(current_item_state, "status", None)
            # Check if status is still one of the target statuses
            if item_status not in process_statuses_set:
                status_name = getattr(item_status, "name", "None")
                logger.info(
                    f"Skipping {unique_name}: Status ({status_name}) changed, "
                    f"no longer eligible."
                )
                continue

        except Exception as fetch_err:
            logger.error(
                f"Error re-fetching state for {unique_name}: {fetch_err}. Skipping."
            )
            continue

        # Proceed with processing the eligible item
        item_id = getattr(current_item_state, "id", "N/A")
        status_name = getattr(item_status, "name", "None")
        logger.info(
            f"--- Processing Lean for: {unique_name} (ID: {item_id}, "
            f"Status: {status_name}) ---"
        )
        processed_count += 1
        try:
            # Call the main orchestrator function
            success = await generate_and_verify_lean(
                unique_name, client, effective_db_path, **kwargs
            )
            if success:
                success_count += 1
            else:
                fail_count += 1
            # Detailed outcome logged within generate_and_verify_lean
        except Exception as e:
            # Catch unexpected errors from the orchestrator
            logger.exception(
                f"Critical error during batch processing of {unique_name}: {e}"
            )
            fail_count += 1
            # Attempt to mark item as ERROR in DB
            try:
                err_item = get_kb_item_by_name(unique_name, effective_db_path)
                if (
                    err_item
                    and hasattr(err_item, "update_status")
                    and hasattr(ItemStatus, "ERROR")
                ):
                    current_err_status = getattr(err_item, "status", None)
                    is_proven = (
                        hasattr(ItemStatus, "PROVEN")
                        and current_err_status == ItemStatus.PROVEN
                    )
                    if not is_proven:  # Don't overwrite success
                        err_item.update_status(
                            ItemStatus.ERROR,
                            f"Batch processing crashed: {str(e)[:100]}",
                        )
                        await save_kb_item(
                            err_item, client=None, db_path=effective_db_path
                        )
            except Exception as save_err:
                logger.error(
                    f"Failed to save ERROR status for {unique_name} after batch crash: "
                    f"{save_err}"
                )

        logger.info(f"--- Finished processing Lean for {unique_name} ---")

    logger.info(
        f"Lean Batch Processing Complete. Total Processed: {processed_count}, "
        f"Succeeded: {success_count}, Failed: {fail_count}"
    )