# Async Operations

UPathTools provides comprehensive async/await support for filesystem operations, enabling efficient concurrent I/O and better performance for network-based filesystems.

## AsyncUPath

`AsyncUPath` is a wrapper around regular `UPath` objects that provides async versions of all I/O methods, prefixed with `a`.

### Creating AsyncUPath Objects

```python
import asyncio

from upathtools import AsyncUPath


async def main():
    # From a URL
    path = AsyncUPath("https://example.com/data.json")

    # From a local path
    local = AsyncUPath("/tmp/file.txt")

    # From an existing UPath
    from upath import UPath

    regular_path = UPath("s3://bucket/key")
    async_path = AsyncUPath._from_upath(regular_path)


asyncio.run(main())
```

### File Reading

```python
async def read_examples():
    path = AsyncUPath("https://example.com/data.json")

    # Read as bytes
    data: bytes = await path.aread_bytes()

    # Read as text
    text: str = await path.aread_text(encoding="utf-8")

    # Read with a custom encoding and error handling
    content = await path.aread_text(encoding="latin-1", errors="ignore")
```

### File Writing

```python
async def write_examples():
    path = AsyncUPath("s3://bucket/output.txt")

    # Write bytes
    await path.awrite_bytes(b"binary data")

    # Write text
    await path.awrite_text("Hello, world!", encoding="utf-8")
```

### Path Operations

```python
async def path_operations():
    path = AsyncUPath("/tmp/example")

    # Check existence
    exists: bool = await path.aexists()

    # Check whether it is a file or a directory
    is_file: bool = await path.ais_file()
    is_dir: bool = await path.ais_dir()

    # Create a directory
    await path.amkdir(parents=True, exist_ok=True)

    # Touch a file
    await path.atouch(exist_ok=True)

    # Remove a file
    await path.aunlink(missing_ok=True)

    # Remove a directory
    await path.armdir()

    # Get file stats
    stat = await path.astat()
    print(f"Size: {stat.st_size} bytes")
```

### Directory Iteration

```python
async def iterate_directory():
    path = AsyncUPath("/tmp")

    # Iterate over directory contents
    async for item in path.aiterdir():
        print(f"Found: {item}")
        if await item.ais_file():
            content = await item.aread_text()
```

### Globbing

```python
async def glob_examples():
    path = AsyncUPath("/project")

    # Find all Python files
    async for py_file in path.aglob("**/*.py"):
        print(py_file)

    # Recursive glob with a pattern
    async for match in path.arglob("*.json"):
        data = await match.aread_bytes()
```

### File Operations

```python
async def file_operations():
    source = AsyncUPath("/tmp/source.txt")
    target = AsyncUPath("/tmp/target.txt")

    # Copy a file
    copied = await source.acopy(target)

    # Move a file
    moved = await source.amove(target)

    # Open a file
    async with await source.aopen("r") as f:
        content = await f.read()
```

## Batch Operations

UPathTools provides high-level async functions for efficient batch file operations.

### read_folder()

Read multiple files concurrently:

```python
import asyncio

from upathtools import read_folder


async def main():
    # Read all Python files in parallel
    files = await read_folder(
        "src/",
        pattern="**/*.py",
        recursive=True,
        load_parallel=True,
        chunk_size=50,  # Process 50 files at a time
        mode="rt",
        encoding="utf-8",
    )

    # files is a dict mapping relative paths to contents
    for filepath, content in files.items():
        print(f"{filepath}: {len(content)} characters")


asyncio.run(main())
```

Parameters:

- `path`: Base directory to read from
- `pattern`: Glob pattern (default: `"**/*"`)
- `recursive`: Search subdirectories (default: `True`)
- `include_dirs`: Include directories in results (default: `False`)
- `exclude`: List of patterns to exclude
- `max_depth`: Maximum directory depth
- `mode`: Read mode (`"rt"` for text, `"rb"` for binary)
- `encoding`: Text encoding (default: `"utf-8"`)
- `load_parallel`: Load files concurrently (default: `True`)
- `chunk_size`: Files per batch (default: `50`)
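
The filtering parameters compose. As a sketch, combining `exclude` and `max_depth` might look like the following (the paths and patterns here are illustrative, not part of the API):

```python
from upathtools import read_folder


async def filtered_read():
    # Read Python files at most two directory levels deep,
    # skipping tests and bytecode caches (example patterns)
    return await read_folder(
        "src/",
        pattern="**/*.py",
        exclude=["**/test_*.py", "**/__pycache__/**"],
        max_depth=2,
        mode="rt",
    )
```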

### list_files()

List files matching a pattern without reading content:

```python
from upathtools import list_files


async def main():
    # Get a list of Python files
    files = await list_files(
        "src/",
        pattern="**/*.py",
        recursive=True,
        exclude=["**/test_*.py", "**/__pycache__/**"],
    )

    # files is a list of UPath objects
    for file in files:
        print(file)
```

With detailed info:

```python
async def main():
    # Get detailed file information
    file_infos = await list_files(
        "src/",
        pattern="**/*.py",
        detail=True,
    )

    # file_infos is a list of info dictionaries
    for info in file_infos:
        print(f"{info['name']}: {info['size']} bytes")
```

### read_folder_as_text()

Combine multiple files into a single text document:

```python
from upathtools import read_folder_as_text


async def main():
    # Create a concatenated document
    text = await read_folder_as_text(
        "docs/",
        pattern="**/*.md",
        recursive=True,
        load_parallel=True,
    )

    # Output format:
    # # Content of docs/intro.md
    #
    # Introduction text...
    #
    #
    # # Content of docs/guide.md
    #
    # Guide text...
    print(text)
```

### read_path()

Read a single file asynchronously:

```python
from upathtools import read_path


async def main():
    # Read as text
    text = await read_path("file.txt", mode="rt", encoding="utf-8")

    # Read as bytes
    data = await read_path("image.png", mode="rb")
```

## Async Filesystem Access

Custom filesystems inheriting from `BaseAsyncFileSystem` have native async support:

```python
from upathtools.filesystems import HTTPFileSystem


async def main():
    fs = HTTPFileSystem()

    # Async operations
    content = await fs._cat_file("https://example.com/data.json")
    files = await fs._ls("https://example.com/", detail=True)
    exists = await fs._exists("https://example.com/file.txt")

    # Get an async UPath
    path = fs.get_upath("https://example.com/", as_async=True)
    async for item in path.aiterdir():
        print(item)
```

## BaseUPath with Async Methods

Custom `UPath` classes can inherit from `BaseUPath` to get async methods:

```python
from upathtools.filesystems.base import BaseUPath


class MyPath(BaseUPath):
    """Custom path with async support."""


async def main():
    path = MyPath("/some/path")

    # All async methods are available
    content = await path.aread_text()
    async for item in path.aiterdir():
        print(item)
```

## Performance Considerations

### Parallel vs Sequential

For remote filesystems or many files, parallel loading provides significant speedups:

```python
# Sequential (slow for many files)
files = await read_folder("s3://bucket/", load_parallel=False)

# Parallel (much faster)
files = await read_folder("s3://bucket/", load_parallel=True, chunk_size=100)
```

### Chunk Size Tuning

The `chunk_size` parameter controls the trade-off between memory usage and speed:

```python
# Small chunks: lower memory, more overhead
files = await read_folder("data/", chunk_size=10)

# Large chunks: higher memory, less overhead
files = await read_folder("data/", chunk_size=200)

# A reasonable balance for most cases
files = await read_folder("data/", chunk_size=50)
```

## Async Context Managers

Use async context managers for file operations:

```python
async def safe_file_operations():
    path = AsyncUPath("/tmp/data.txt")

    # Proper resource cleanup
    async with await path.aopen("r") as f:
        content = await f.read()
    # File is automatically closed here
```

## Fallback Behavior

For filesystems without native async support, `AsyncUPath` automatically falls back to running operations in thread pools:

```python
async def main():
    # The local filesystem has no native async support
    path = AsyncUPath("/tmp/file.txt")

    # Automatically runs in a thread pool
    content = await path.aread_text()
```

This ensures all operations work consistently regardless of the underlying filesystem implementation.
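
Conceptually, the fallback resembles running the blocking call in a worker thread via `asyncio.to_thread`. The sketch below illustrates that general pattern only; it is not UPathTools' actual implementation:

```python
import asyncio

from upath import UPath


async def read_text_fallback(path: UPath) -> str:
    # Run the blocking sync read in a worker thread so the
    # event loop stays responsive (illustrative pattern only)
    return await asyncio.to_thread(path.read_text)
```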

## Error Handling

```python
import asyncio

from upathtools import AsyncUPath


async def robust_reading():
    path = AsyncUPath("https://example.com/might-not-exist.txt")

    try:
        content = await path.aread_text()
    except FileNotFoundError:
        print("File not found")
    except PermissionError:
        print("Permission denied")
    except asyncio.TimeoutError:
        print("Operation timed out")
    except Exception as e:
        print(f"Unexpected error: {e}")
```

## Advanced Usage

### Custom Async Filesystem

Create filesystems with native async operations:

```python
from typing import TypedDict

import aiohttp

from upathtools.filesystems.base import BaseAsyncFileSystem


class MyInfoDict(TypedDict):
    name: str
    size: int
    type: str


# MyPath is the BaseUPath subclass defined in the section above
class MyFileSystem(BaseAsyncFileSystem[MyPath, MyInfoDict]):
    protocol = "my"
    upath_cls = MyPath

    async def _ls(self, path: str, detail: bool = True) -> list[MyInfoDict]:
        # Native async implementation
        async with aiohttp.ClientSession() as session:
            async with session.get(f"{path}/list") as response:
                return await response.json()
```
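
To make the custom protocol resolvable from path strings, the class can be registered with fsspec. This uses fsspec's standard registration hook and assumes UPathTools performs no registration of its own for this class:

```python
import fsspec

# Register the protocol so "my://..." paths resolve to MyFileSystem
# (standard fsspec registration, shown here as an assumption about setup)
fsspec.register_implementation("my", MyFileSystem)
```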

### Concurrent Operations

Combine multiple async operations:

```python
import asyncio

from upathtools import AsyncUPath


async def concurrent_operations():
    paths = [AsyncUPath(f"https://example.com/file{i}.txt") for i in range(10)]

    # Read all files concurrently
    contents = await asyncio.gather(*[p.aread_text() for p in paths])

    # Process results
    for path, content in zip(paths, contents):
        print(f"{path}: {len(content)} characters")
```

## Best Practices

- Use async consistently: Don't mix sync and async operations in the same context
- Tune `chunk_size`: Balance memory usage and performance based on file sizes
- Handle errors gracefully: Network operations can fail, so always use try/except
- Use context managers: For file operations, use `async with` for proper cleanup
- Monitor concurrency: Too much parallelism can overwhelm systems or APIs; see the sketch after this list
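
One way to bound parallelism is an `asyncio.Semaphore` around each operation. A minimal sketch, where the limit of 10 is an arbitrary example value:

```python
import asyncio

from upathtools import AsyncUPath


async def bounded_reads(paths: list[AsyncUPath], limit: int = 10) -> list[str]:
    # Cap the number of in-flight reads to avoid overwhelming the backend
    semaphore = asyncio.Semaphore(limit)

    async def read_one(path: AsyncUPath) -> str:
        async with semaphore:
            return await path.aread_text()

    return await asyncio.gather(*[read_one(p) for p in paths])
```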