classConversionManager:"""Manages document conversion using configured providers. In order to not make things super complex, all Converters will be implemented as sync. The manager will handle async I/O and thread pooling. """def__init__(self,config:ConversionConfig|list[DocumentConverter]):ifisinstance(config,list):self.config=ConversionConfig()self._converters=configelse:self.config=configself._converters=self._setup_converters()self._executor=ThreadPoolExecutor(max_workers=3)def__del__(self):self._executor.shutdown(wait=False)defsupports_file(self,path:JoinablePathLike)->bool:"""Check if any converter supports the file."""returnany(c.supports_file(path)forcinself._converters)defsupports_content(self,content:Any,mime_type:str|None=None)->bool:"""Check if any converter supports the file."""returnany(c.supports_content(content,mime_type)forcinself._converters)def_setup_converters(self)->list[DocumentConverter]:"""Create converter instances from config."""fromllmling_agent_converters.plain_converterimportPlainConverterconverters=[i.get_converter()foriinself.config.providersor[]ifi.enabled]# Always add PlainConverter as fallback# if it gets configured by user, that one gets preference.converters.append(PlainConverter())returnconvertersasyncdefconvert_file(self,path:JoinablePathLike)->str:"""Convert file using first supporting converter."""loop=asyncio.get_running_loop()content=awaitread_path(path,"rb")forconverterinself._converters:# Run support check in thread poolsupports=awaitloop.run_in_executor(self._executor,converter.supports_file,path)ifnotsupports:continue# Run conversion in thread poolimportmimetypestyp=mimetypes.guess_type(str(path))[0]returnawaitloop.run_in_executor(self._executor,converter.convert_content,content,typ,)returnstr(content)asyncdefconvert_content(self,content:Any,mime_type:str|None=None)->str:"""Convert content using first supporting converter."""loop=asyncio.get_running_loop()forconverterinself._converters:# Run support check in thread poolsupports=awaitloop.run_in_executor(self._executor,converter.supports_content,content,mime_type)ifnotsupports:continue# Run conversion in thread poolreturnawaitloop.run_in_executor(self._executor,converter.convert_content,content,mime_type)returnstr(content)# Fallback for unsupported content
asyncdefconvert_content(self,content:Any,mime_type:str|None=None)->str:"""Convert content using first supporting converter."""loop=asyncio.get_running_loop()forconverterinself._converters:# Run support check in thread poolsupports=awaitloop.run_in_executor(self._executor,converter.supports_content,content,mime_type)ifnotsupports:continue# Run conversion in thread poolreturnawaitloop.run_in_executor(self._executor,converter.convert_content,content,mime_type)returnstr(content)# Fallback for unsupported content
Source code in src/llmling_agent/prompts/conversion_manager.py
5960616263646566676869707172737475767778798081
asyncdefconvert_file(self,path:JoinablePathLike)->str:"""Convert file using first supporting converter."""loop=asyncio.get_running_loop()content=awaitread_path(path,"rb")forconverterinself._converters:# Run support check in thread poolsupports=awaitloop.run_in_executor(self._executor,converter.supports_file,path)ifnotsupports:continue# Run conversion in thread poolimportmimetypestyp=mimetypes.guess_type(str(path))[0]returnawaitloop.run_in_executor(self._executor,converter.convert_content,content,typ,)returnstr(content)
Source code in src/llmling_agent/prompts/conversion_manager.py
454647
defsupports_content(self,content:Any,mime_type:str|None=None)->bool:"""Check if any converter supports the file."""returnany(c.supports_content(content,mime_type)forcinself._converters)
Source code in src/llmling_agent/prompts/conversion_manager.py
414243
defsupports_file(self,path:JoinablePathLike)->bool:"""Check if any converter supports the file."""returnany(c.supports_file(path)forcinself._converters)