# Source code for langchain_prolog.runnable

# Public API of this module.
__all__ = [
    "PrologConfig",
    "PrologInput",
    "PrologSolution",
    "PrologResult",
    "PrologRunnable",
]
from importlib import resources
from pathlib import Path
from typing import (
    Any,
    AsyncIterator,
    Dict,
    Iterator,
    List,
    Literal,
    Optional,
    Sequence,
    Tuple,
    Type,
    Union,
)

import janus_swi as janus
import langchain
from langchain_core.callbacks.manager import (
    BaseCallbackHandler,
    CallbackManager,
    Callbacks,
)
from langchain_core.globals import get_verbose
from langchain_core.runnables import Runnable
from langchain_core.runnables.config import (
    RunnableConfig,
    get_config_list,
)
from pydantic import (
    BaseModel,
    ConfigDict,
    Field,
    PrivateAttr,
    create_model,
    field_validator,
)

from .exceptions import (
    PrologFileNotFoundError,
    PrologRuntimeError,
    PrologValueError,
)


# NOTE(review): importing this module overwrites these attributes on the shared `langchain`
# module object, i.e. process-wide for every langchain user, not only for PrologRunnable.
langchain.verbose = langchain.debug = langchain.llm_cache = False

# Query input: a Prolog query string, a dict (validated through PrologConfig.query_schema),
# a pydantic model instance, or None (runs the configured default predicate).
PrologInput = Union[str, Dict[Any, Any], BaseModel, None]
# One answer: a janus answer dict with its "truth" entry removed (presumably keyed by
# Prolog variable name).
PrologSolution = Dict[Any, Any]
# True / False for answers without bindings, otherwise a list of solutions.
# PrologRuntimeError only appears in batch results when return_exceptions=True.
PrologResult = Union[
    Literal[True],
    Literal[False],
    List[PrologSolution],
    PrologRuntimeError,
]


class _DoNothing(BaseCallbackHandler):
    """
    Callback handler whose chain hooks are all no-ops.

    PrologRunnable substitutes an instance of this class for a real run manager when it is used
    as a tool rather than as a chain, so its on_chain_end / on_chain_error calls go nowhere.
    """

    def on_chain_start(self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs: Any) -> None:
        """Ignore the chain-start event."""

    def on_chain_end(self, outputs: Dict[str, Any], **kwargs: Any) -> None:
        """Ignore the chain-end event."""

    def on_chain_error(self, error: BaseException, **kwargs: Any) -> None:
        """Ignore the chain-error event."""


class PrologConfig(BaseModel):
    """Configuration for the Prolog interpreter."""

    model_config = ConfigDict(
        arbitrary_types_allowed=True,
    )

    # Prolog source file consulted by PrologRunnable at construction time.
    rules_file: Optional[Union[str, Path]] = Field(default=None, description="Path to the Prolog rules file")
    # Functor used when a query carries only arguments (or no input at all).
    default_predicate: Optional[str] = Field(
        default=None,
        description="Default predicate to use when no predicate is specified",
    )
    # Model used to turn dict inputs into a query.
    query_schema: Optional[Type[BaseModel]] = Field(default=None, description="Pydantic model for input validation")
    # Flag name -> value, applied with set_prolog_flag/2 by PrologRunnable.
    prolog_flags: Dict[str, Any] = Field(default_factory=dict, description="Custom Prolog flags to set")

    @field_validator("rules_file", mode="before")
    def validate_rules_file(cls, v: Optional[Union[str, Path]]) -> Optional[Path]:
        """
        Normalize ``rules_file`` to a Path and check that it exists.

        Raises:
            PrologFileNotFoundError: If the path does not exist
        """
        if v is None:
            return None
        path = Path(v)
        if not path.exists():
            raise PrologFileNotFoundError(f"Prolog rules file not found: {path}")
        return path

    @field_validator("default_predicate", mode="before")
    def validate_default_predicate(cls, v: Optional[str]) -> Optional[str]:
        """
        Check that ``default_predicate`` can be used as an unquoted Prolog functor.

        Non-string values are passed through unchanged so that pydantic's own ``str`` validation
        reports them, instead of failing here with an AttributeError.

        Raises:
            PrologValueError: If the name is blank, not an identifier, or does not start with a
                lowercase letter (uppercase/underscore-initial names are Prolog variables)
        """
        if not isinstance(v, str):
            return v
        if not v.strip():
            raise PrologValueError("Default predicate cannot be empty")
        # isidentifier() alone accepts names such as "Parent" or "_p", which Prolog parses as
        # variables; every query built from them would be a syntax error.
        if not v.isidentifier() or not v[0].islower():
            raise PrologValueError("Default predicate must be a valid Prolog identifier")
        return v

    @field_validator("prolog_flags", mode="before")
    def validate_prolog_flags(cls, v: Dict[str, Any]) -> Dict[str, Any]:
        """
        Check that every key of ``prolog_flags`` names an existing Prolog flag.

        The flag names are collected in a single ``findall/3`` call. Flag values are bound to an
        anonymous variable, so they are never converted to Python. An empty mapping is accepted
        without querying Prolog.

        Raises:
            PrologValueError: If one or more flag names are unknown to the running Prolog
        """
        if not v:
            return v
        answer = janus.query_once("findall(Flag, current_prolog_flag(Flag, _), Flags)")
        valid_flags = set(answer.get("Flags") or [])
        invalid_flags = set(v.keys()) - valid_flags
        if invalid_flags:
            raise PrologValueError(f"Invalid Prolog flags: {invalid_flags}")
        return v
class PrologRunnable(Runnable[PrologInput, PrologResult]):
    """
    A runnable that executes Prolog queries.

    Queries, flag settings and rule loading all go through the module-level ``janus`` interface.
    Nothing in this class isolates one instance's flags or loaded rules from another's.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    _prolog_config: PrologConfig = PrivateAttr()

    # Keyword arguments forwarded to janus.query; anything else is rejected by _execute_query.
    _QUERY_KWARGS = frozenset({"inputs", "truth_vals"})

    def __init__(
        self,
        prolog_config: Optional[Union[PrologConfig, Dict]] = None,
        *,
        verbose: bool = False,
        callbacks: Optional[Callbacks] = None,
        tags: Optional[List[str]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        name: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        """
        Initialize PrologRunnable with configuration.

        Sets the configured Prolog flags, consults the bundled ``consult_ex.pl`` helper and, if
        ``rules_file`` is configured, loads it.

        Args:
            prolog_config (PrologConfig | dict, optional): Configuration for the Prolog interpreter.
                A dict is validated into a PrologConfig; None means the default configuration.
            verbose (bool): Whether to enable verbose output. Defaults to False (the global
                langchain verbose setting is still honoured).
            callbacks (Callbacks, optional): Callback functions.
            tags (list[str], optional): Optional list of tags. "prolog" is added to a copy; the
                caller's list is not modified.
            metadata (dict, optional): Optional metadata dictionary. A copy is stored, extended
                with the dumped configuration under "prolog_config"; the caller's dict is not modified.
            name (str, optional): The name of the Runnable. Used for debugging and tracing.
            **kwargs: Additional arguments passed to the Runnable class constructor

        Raises:
            PrologValueError: If the configuration is invalid
            PrologFileNotFoundError: If the rules file doesn't exist
            PrologRuntimeError: If a flag can't be set, or consult_ex / the rules can't be loaded
        """
        # Convert dict to PrologConfig if necessary
        if isinstance(prolog_config, dict):
            prolog_config = PrologConfig(**prolog_config)
        elif prolog_config is None:
            prolog_config = PrologConfig()
        self._prolog_config = prolog_config.model_copy()

        # Set Prolog flags
        for flag, value in prolog_config.prolog_flags.items():
            prolog_value = self._format_flag_value(value)
            try:
                janus.query_once(f"set_prolog_flag({flag}, {prolog_value}).")
            except Exception as e:
                raise PrologRuntimeError(f"Error setting Prolog flag {flag} to {value}: {str(e)}") from e

        # Load the consult_ex module. as_file() yields a real filesystem path even when the
        # package is not installed as plain files (e.g. inside a zip).
        try:
            consult_ex = resources.files("langchain_prolog").joinpath("consult_ex.pl")
            with resources.as_file(consult_ex) as consult_ex_path:
                janus.consult(str(consult_ex_path))
        except Exception as e:
            raise PrologRuntimeError(f"Error loading Prolog consult_ex module: {str(e)}") from e

        # Load rules file if specified
        if prolog_config.rules_file:
            self.load_rules(prolog_config.rules_file)

        self.verbose = verbose or get_verbose()
        self.callbacks = callbacks
        # Build new containers: appending to / writing into the caller's objects would leak
        # "prolog" tags and config dumps into lists/dicts shared with other code.
        self.tags = [*(tags or []), "prolog"]
        self.metadata = {**(metadata or {}), "prolog_config": prolog_config.model_dump()}
        self.name = name or self.__class__.__name__
        Runnable.__init__(self, **kwargs)

    @property
    def prolog_config(self) -> PrologConfig:
        """Get the Prolog configuration."""
        return self._prolog_config

    @staticmethod
    def _format_flag_value(value: Any) -> str:
        """
        Render a flag value as Prolog source text for set_prolog_flag/2.

        Python booleans become the atoms ``true`` / ``false``. Their default text ``True`` /
        ``False`` would be read by Prolog as unbound variables. Other values are formatted as-is.
        """
        if isinstance(value, bool):
            return "true" if value else "false"
        return f"{value}"

    def load_rules(self, rules_file: Union[str, Path]) -> None:
        """
        Load Prolog rules from a file.

        The path is passed to Prolog as a query input and converted to an atom there. It is never
        spliced into the query text, so quotes or backslashes in the path cannot break the query.

        Args:
            rules_file (str | Path): Path to the Prolog rules file

        Raises:
            PrologFileNotFoundError: If rules_file doesn't exist
            PrologRuntimeError: If rules can't be loaded or have grammatical errors
        """
        rules_path = Path(rules_file)
        if not rules_path.exists():
            raise PrologFileNotFoundError(f"Prolog rules file not found: {rules_file}")
        try:
            result = janus.query_once(
                "atom_string(File, Path), consult_ex(File)",
                {"Path": rules_path.as_posix()},
            )
        except Exception as e:
            raise PrologRuntimeError(f"Error loading Prolog rules: {str(e)}") from e
        # query_once reports plain failure as {"truth": False} instead of raising.
        if result.get("truth") is False:
            raise PrologRuntimeError(f"Error loading Prolog rules: consult_ex failed for {rules_file}")

    @classmethod
    def create_schema(cls, predicate_name: str, arg_names: List[str]) -> Type[BaseModel]:
        """
        Create a Pydantic model for a Prolog predicate.

        Args:
            predicate_name (str): Name (functor) of the Prolog predicate
            arg_names (list[str]): List of argument names in order

        Returns:
            BaseModel: Pydantic model class for the predicate. Every field is required but may be
            None; None values become Prolog variables when the query is built.

        Raises:
            PrologValueError: If predicate_name is empty or arg_names contains invalid names
        """
        if not predicate_name or not isinstance(predicate_name, str) or not predicate_name.isidentifier():
            raise PrologValueError("predicate_name must be valid Python identifiers")
        if not all(isinstance(name, str) and name.isidentifier() for name in arg_names):
            raise PrologValueError("All argument names must be valid Python identifiers")
        fields: Dict[str, Any] = {name: (Optional[Any], ...) for name in arg_names}
        return create_model(predicate_name, __config__=ConfigDict(), **fields)

    def _process_input(self, input_data: PrologInput) -> Union[str, BaseModel]:
        """
        Process and validate input data.

        Args:
            input_data (str | dict | BaseModel): Input data to process

        Returns:
            str | BaseModel: Processed input ready for query building

        Raises:
            PrologValueError: If input type is invalid or schema is missing
        """
        if input_data is None:
            if not self._prolog_config.default_predicate:
                raise PrologValueError("Input data cannot be None if no default predicate set")
            return self._prolog_config.default_predicate + "()"
        if isinstance(input_data, (str, BaseModel)):
            return input_data
        if isinstance(input_data, dict) and self._prolog_config.query_schema:
            return self._prolog_config.query_schema(**input_data)
        raise PrologValueError("Invalid input type or missing schema for dictionary input")

    def _build_query(self, input_data: Union[str, BaseModel]) -> str:
        """
        Build a Prolog query from input data.

        Args:
            input_data (str | BaseModel): String query or Pydantic model instance

        Returns:
            str: Prolog query string

        Raises:
            PrologValueError: If input_data is of invalid type or invalid query format
        """
        if isinstance(input_data, str):
            if not input_data.strip() and not self._prolog_config.default_predicate:
                raise PrologValueError("Prolog query string cannot be empty")
            if "(" in input_data:
                # Cheap sanity check only: parentheses inside quoted atoms are counted too.
                if input_data.count("(") != input_data.count(")"):
                    raise PrologValueError("Mismatched parentheses in query")
                return input_data
            # Argument-only query: wrap it in the default predicate.
            if not self._prolog_config.default_predicate:
                raise PrologValueError(f"No default predicate set for argument-only query: {input_data}")
            return f"{self._prolog_config.default_predicate}({input_data})"
        if isinstance(input_data, BaseModel):
            # None-valued fields become Prolog variables named after the capitalized field name;
            # other values are inserted as their str() text.
            params = [
                str(field_value if field_value is not None else field_name.capitalize())
                for field_name, field_value in input_data
            ]
            return f"{input_data.__class__.__name__}({', '.join(params)})"
        raise PrologValueError("Input must be either string or valid Pydantic model")

    @staticmethod
    def _get_prolog_kwargs(config: RunnableConfig) -> Dict[str, Any]:
        """Return the janus query options stored in ``config["configurable"]["prolog_kwargs"]``."""
        return config.get("configurable", {}).get("prolog_kwargs", {})

    @classmethod
    def _query_kwargs(cls, config: RunnableConfig, kwargs: Dict[str, Any]) -> Dict[str, Any]:
        """
        Collect the janus query options for one call.

        Options come from ``config["configurable"]["prolog_kwargs"]`` plus any supported options
        (``inputs``, ``truth_vals``) passed directly as keyword arguments, e.g. via
        ``Runnable.bind``. Directly passed options take precedence. Other keyword arguments are
        not forwarded to janus.
        """
        merged = dict(cls._get_prolog_kwargs(config))
        merged.update((key, value) for key, value in kwargs.items() if key in cls._QUERY_KWARGS)
        return merged

    def _execute_query(self, query: str, **kwargs: Any) -> Iterator[Dict[Any, Any]]:
        """
        Execute a Prolog query safely.

        Args:
            query (str): Prolog query string
            **kwargs: Additional arguments for query execution.
                Supported keyword arguments: inputs, truth_vals

        Returns:
            Iterator of query solutions

        Raises:
            PrologValueError: If unsupported query keyword arguments are provided
            PrologRuntimeError: If query execution fails
        """
        invalid_kwargs = set(kwargs.keys()) - self._QUERY_KWARGS
        if invalid_kwargs:
            raise PrologValueError(f"Unsupported query arguments: {invalid_kwargs}")
        try:
            return janus.query(query, **kwargs)
        except Exception as e:
            raise PrologRuntimeError(f"Prolog execution error: {str(e)}") from e

    @staticmethod
    def _clean_solution(solution: Dict[Any, Any]) -> PrologSolution:
        """Clean a single solution dictionary (drop its "truth" entry)."""
        return {k: v for k, v in solution.items() if k != "truth"}

    def _start_run(self, input: PrologInput, config: RunnableConfig, kwargs: Dict[str, Any]) -> Any:
        """
        Open a callback run for ``input`` and return its run manager.

        When this object's class is not literally named "PrologRunnable" (it is being used as a
        tool), a _DoNothing handler is returned instead and no chain run is reported.
        """
        if self.__class__.__name__ != "PrologRunnable":
            return _DoNothing()
        callback_manager = CallbackManager.configure(
            inheritable_callbacks=config.get("callbacks"),
            local_callbacks=self.callbacks,
            verbose=config.get("verbose", self.verbose),  # type: ignore
            inheritable_tags=config.get("tags"),
            local_tags=self.tags,
            inheritable_metadata=config.get("metadata"),
            local_metadata=self.metadata,
        )
        return callback_manager.on_chain_start(
            {"name": config.get("run_name") or self.get_name()},
            {"input": input},
            **kwargs,
        )

    @staticmethod
    def _close_quietly(solutions: Any) -> None:
        """Best-effort close of a query iterator; no-op for None or objects without close()."""
        close = getattr(solutions, "close", None)
        if callable(close):
            try:
                close()
            except Exception:
                # Cleanup only: the query has presumably been closed already when it was exhausted.
                pass

    def invoke(self, input: PrologInput, config: Optional[RunnableConfig] = None, **kwargs: Any) -> PrologResult:
        """
        Execute a Prolog query and return all solutions at once.

        Args:
            input (str | dict | BaseModel): Prolog query
            config (RunnableConfig, optional): Optional langchain runnable configuration.
                Query options may also be given in ``config["configurable"]["prolog_kwargs"]``.
            **kwargs: Additional arguments for Prolog query execution.
                Supported keyword arguments by janus-swi: inputs, truth_vals.
                All keyword arguments are also passed to the callback manager's on_chain_start.

        Returns:
            Prolog query result. True, False or a list of dictionaries

        Raises:
            PrologValueError: If input type is invalid or schema is missing for a dictionary input
            PrologRuntimeError: If query execution fails
        """
        config = config or {}
        run_manager = self._start_run(input, config, kwargs)
        try:
            query = self._build_query(self._process_input(input))
            solutions = list(self._execute_query(query, **self._query_kwargs(config, kwargs)))
            result: PrologResult
            if not solutions or all(sol == {"truth": False} for sol in solutions):
                result = False
            elif all(sol == {"truth": True} for sol in solutions):
                result = True
            else:
                result = [self._clean_solution(sol) for sol in solutions]
            run_manager.on_chain_end({"output": result})
            return result
        except (PrologValueError, PrologRuntimeError) as e:
            run_manager.on_chain_error(e)
            raise
        except Exception as e:
            run_manager.on_chain_error(e)
            raise PrologRuntimeError(f"Prolog execution error: {str(e)}") from e

    async def ainvoke(
        self,
        input: PrologInput,
        config: Optional[RunnableConfig] = None,
        **kwargs: Any,
    ) -> PrologResult:
        """
        Execute a Prolog query asynchronously and return all the solutions at once.

        The query runs synchronously through invoke(), so it occupies the calling thread
        (the event loop) until it finishes.

        Args:
            input (str, dict, BaseModel): Prolog query
            config (RunnableConfig, optional): langchain runnable configuration
            **kwargs: Additional arguments for Prolog query execution.
                Supported keyword arguments by janus-swi: inputs, truth_vals

        Returns:
            PrologResult: Query result. True, False or a list of dictionaries

        Raises:
            PrologValueError: If input type is invalid or schema is missing for a dictionary input
            PrologRuntimeError: If query execution fails
        """
        return self.invoke(input, config=config, **kwargs)

    def stream(  # noqa: R701
        self, input: PrologInput, config: Optional[RunnableConfig] = None, **kwargs: Any
    ) -> Iterator[PrologResult]:
        """
        Execute a Prolog query and yield solutions one by one.

        Args:
            input (str, dict, BaseModel): Prolog query
            config (RunnableConfig, optional): langchain runnable configuration
            **kwargs: Additional arguments for Prolog query execution.
                Supported keyword arguments by janus-swi: inputs, truth_vals.
                All keyword arguments are also passed to the callback manager's on_chain_start.

        Yields:
            PrologResult: A single True or False when no answer carries variable bindings and
            all answers share one truth value; otherwise one single-element list per answer.

        Raises:
            PrologValueError: If input type is invalid or schema is missing for dictionary input
            PrologRuntimeError: If query execution fails
        """
        from itertools import chain

        config = config or {}
        run_manager = self._start_run(input, config, kwargs)
        solutions_iter: Any = None
        try:
            query = self._build_query(self._process_input(input))
            solutions_iter = self._execute_query(query, **self._query_kwargs(config, kwargs))

            first_solution = next(solutions_iter, None)
            if first_solution is None:
                # No answer at all: the query failed.
                run_manager.on_chain_end({"output": False})
                yield False
                return

            answers: Iterator[Dict[Any, Any]]
            if first_solution in ({"truth": True}, {"truth": False}):
                # Answers without bindings: buffer them so that a uniform truth value can be
                # reported as a single True / False.
                buffered = [first_solution, *solutions_iter]
                if all(sol == {"truth": False} for sol in buffered):
                    run_manager.on_chain_end({"output": False})
                    yield False
                    return
                if all(sol == {"truth": True} for sol in buffered):
                    run_manager.on_chain_end({"output": True})
                    yield True
                    return
                answers = iter(buffered)
            else:
                answers = chain((first_solution,), solutions_iter)

            # Yield solutions one by one
            all_results = []
            for solution in answers:
                result = [self._clean_solution(solution)]
                all_results.append(result)
                yield result
            run_manager.on_chain_end({"output": all_results})
        except (PrologValueError, PrologRuntimeError) as e:
            run_manager.on_chain_error(e)
            raise
        except Exception as e:
            run_manager.on_chain_error(e)
            raise PrologRuntimeError(f"Prolog execution error: {str(e)}") from e
        finally:
            # Runs also when the consumer stops iterating early, so the query is not left open.
            self._close_quietly(solutions_iter)

    async def astream(
        self, input: PrologInput, config: Optional[RunnableConfig] = None, **kwargs: Any
    ) -> AsyncIterator[PrologResult]:
        """
        Execute Prolog query asynchronously and yield results one by one.

        Results come from the synchronous stream(), so each step runs on the calling thread.

        Args:
            input (str, dict, BaseModel): Prolog query
            config (RunnableConfig, optional): langchain runnable configuration
            **kwargs: Additional arguments for Prolog query execution.
                Supported keyword arguments by janus-swi: inputs, truth_vals

        Yields:
            PrologResult: Query result. True, False or a list of dictionaries

        Raises:
            PrologValueError: If input type is invalid or schema is missing
            PrologRuntimeError: If query execution fails
        """
        for result in self.stream(input, config=config, **kwargs):
            yield result

    def batch(
        self,
        inputs: List[PrologInput],
        config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None,
        *,
        return_exceptions: bool = False,
        **kwargs: Optional[Any],
    ) -> List[PrologResult]:
        """
        Execute multiple Prolog queries and return all solutions.

        Queries run sequentially, in input order, through invoke().

        Args:
            inputs (list[str | dict | BaseModel]): List of Prolog queries
            config (RunnableConfig | list[RunnableConfig], optional): Optional langchain runnable configuration(s)
            return_exceptions (bool): If True, include exceptions as PrologRuntimeError in the results.
                Defaults to False.
            **kwargs: Additional arguments for Prolog query execution.
                Supported keyword arguments by janus-swi: inputs, truth_vals

        Returns:
            A list of Prolog queries results (True, False or a list of dictionaries).
            If return_exceptions is True, exception-throwing queries return PrologRuntimeError as the result

        Raises:
            PrologValueError: If inputs is not a list
            PrologRuntimeError: If execution fails and return_exceptions is False

        Todo:
            Implement batch execution using Prolog built-in concurrency
        """
        if not isinstance(inputs, list):
            raise PrologValueError("PrologRunnable batch inputs must be a list")

        configs = get_config_list(config, len(inputs))
        results: List[PrologResult] = []
        for input_item, config_item in zip(inputs, configs):
            try:
                results.append(self.invoke(input_item, config=config_item, **kwargs))
            except Exception as e:
                if not return_exceptions:
                    raise PrologRuntimeError(f"Prolog batch execution error: {str(e)}") from e
                results.append(PrologRuntimeError(str(e)))
        return results

    async def abatch(
        self,
        inputs: List[PrologInput],
        config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None,
        *,
        return_exceptions: bool = False,
        **kwargs: Optional[Any],
    ) -> List[PrologResult]:
        """
        Execute multiple Prolog queries asynchronously and return all solutions.

        Delegates to the synchronous batch(), so the queries run on the calling thread.

        Args:
            inputs (list[str | dict | BaseModel]): List of Prolog queries
            config (list[RunnableConfig], optional): Optional langchain runnable configuration(s)
            return_exceptions (bool): If True, include exceptions as PrologRuntimeError in the results.
                Defaults to False.
            **kwargs: Additional arguments for Prolog query execution.
                Supported keyword arguments by janus-swi: inputs, truth_vals

        Returns:
            List of Prolog queries results (True, False or list of dictionaries).
            If return_exceptions is True, exception-throwing queries return PrologRuntimeError as a result

        Raises:
            PrologValueError: If inputs is not a list
            PrologRuntimeError: If execution fails and return_exceptions is False
        """
        return self.batch(inputs, config=config, return_exceptions=return_exceptions, **kwargs)

    def batch_as_completed(
        self,
        inputs: Sequence[PrologInput],
        config: Optional[Union[RunnableConfig, Sequence[RunnableConfig]]] = None,
        *,
        return_exceptions: bool = False,
        **kwargs: Optional[Any],
    ) -> Iterator[Tuple[int, PrologResult]]:
        """
        Execute multiple Prolog queries and yield results one by one.

        Queries run sequentially, so results are yielded in input order.

        Args:
            inputs (sequence[str | dict | BaseModel]): Sequence of Prolog queries (must be a list)
            config (RunnableConfig | sequence[RunnableConfig], optional): Optional configuration(s) for the runnable
            return_exceptions: If True, include exceptions as PrologRuntimeError in results
            **kwargs: Additional arguments for Prolog query execution.
                Supported keyword arguments by janus-swi: inputs, truth_vals

        Yields:
            (index, result) tuples. If return_exceptions is True, exception-throwing queries
            yield PrologRuntimeError as the result

        Raises:
            PrologValueError: If inputs is not a list
            PrologRuntimeError: If execution fails and return_exceptions is False
        """
        if not isinstance(inputs, list):
            raise PrologValueError("PrologRunnable batch_as_completed inputs must be a list")

        configs = get_config_list(config, len(inputs))
        for i, (input_item, config_item) in enumerate(zip(inputs, configs)):
            try:
                result = self.invoke(input_item, config=config_item, **kwargs)
            except Exception as e:
                if not return_exceptions:
                    raise PrologRuntimeError(f"Prolog batch_as_completed execution error: {str(e)}") from e
                yield (i, PrologRuntimeError(str(e)))
            else:
                yield (i, result)

    async def abatch_as_completed(
        self,
        inputs: Sequence[PrologInput],
        config: Optional[Union[RunnableConfig, Sequence[RunnableConfig]]] = None,
        *,
        return_exceptions: bool = False,
        **kwargs: Optional[Any],
    ) -> AsyncIterator[Tuple[int, PrologResult]]:
        """
        Execute multiple Prolog queries asynchronously and yield results one by one.

        Results come from the synchronous batch_as_completed(), so each query runs on the calling thread.

        Args:
            inputs (sequence[str | dict | BaseModel]): Sequence of Prolog queries (must be a list)
            config (RunnableConfig | sequence[RunnableConfig], optional): Optional configuration(s) for the runnable
            return_exceptions: If True, include exceptions as PrologRuntimeError in results
            **kwargs: Additional arguments for Prolog query execution.
                Supported keyword arguments by janus-swi: inputs, truth_vals

        Yields:
            (index, result) tuples. If return_exceptions is True, exception-throwing queries
            yield PrologRuntimeError as the result

        Raises:
            PrologValueError: If inputs is not a list
            PrologRuntimeError: If execution fails and return_exceptions is False
        """
        for result in self.batch_as_completed(inputs, config=config, return_exceptions=return_exceptions, **kwargs):
            yield result