Implicit Mount Module

This module contains the main classes for establishing SFTP connections and performing file operations on remote servers. It uses LFTP for high-performance file transfers and provides a Pythonic interface for remote file operations, such as downloading files from a remote directory.

The main functionality of this package is provided by the ImplicitMount, IOHandler and RemotePathIterator classes:

  • The ImplicitMount class is a low-level wrapper for the LFTP shell, which is used to communicate with the remote directory. It should only be used directly by advanced users.

  • The IOHandler class is a high-level wrapper for the ImplicitMount class. It provides human-friendly methods for downloading files from a remote directory without requiring technical knowledge of LFTP.

  • The RemotePathIterator class is a high-level wrapper for the IOHandler class. It handles asynchronous streaming of files from a remote directory to a local directory using thread-safe buffers.

pyremotedata.implicit_mount.delete_file_or_dir(path: str | PathLike[str], *, missing_ok: bool = True, force: bool = False) -> None
class pyremotedata.implicit_mount.RemoteType(value)

Bases: int, Enum

An enumeration.

MISSING = 0
FILE = 1
DIRECTORY = 2
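
The members above can be reproduced locally to see how a caller might branch on a path type. This is a minimal sketch: the enum below is a local stand-in that mirrors the documented bases and values, and describe is a hypothetical helper that is not part of pyremotedata.

```python
from enum import Enum


class RemoteType(int, Enum):
    """Local stand-in mirroring the documented members and bases."""
    MISSING = 0
    FILE = 1
    DIRECTORY = 2


def describe(kind: RemoteType) -> str:
    """Hypothetical helper: map a path type to a short action label."""
    if kind is RemoteType.MISSING:
        return "nothing to do"
    if kind is RemoteType.FILE:
        return "download single file"
    return "mirror directory"


# Because the enum derives from int, members compare equal to plain integers.
assert RemoteType.FILE == 1
print(describe(RemoteType(2)))
```

Since the members are also integers, they can be stored or compared as plain numbers where convenient.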
class pyremotedata.implicit_mount.ImplicitMount(user: str | None = None, password: str | None = None, remote: str | None = None, port: int = 2222, verbose: bool = False, **kwargs)

Bases: object

This is a low-level wrapper of LFTP, which provides a pythonic interface for executing LFTP commands and reading the output. It provides a robust and efficient backend for communicating with a remote storage server using the SFTP protocol, using a persistent LFTP shell handled in the background by a subprocess. It is designed to be used as a base class for higher-level wrappers, such as the pyremotedata.implicit_mount.IOHandler class, or as a standalone class for users familiar with LFTP.

Note: The attributes of this class should not be used directly except for development or advanced use cases; in that case, all responsibility lies with the user.

Parameters:
user: str | None = None

The username to use for connecting to the remote directory.

password: str | None = None

The SFTP password to use, if any, when connecting to the remote host.

remote: str | None = None

The remote server to connect to. If user and password are supplied, this will default to ‘io.erda.au.dk’ for convenience.

port: int = 2222

The port to connect to (default: 2222).

verbose: bool = False

If True, print the commands executed by the class.

time_stamp_pattern = re.compile('^\\s*(\\S+\\s+){8}')
END_OF_OUTPUT = '# LFTP_END_OF_OUTPUT_IDENTIFIER {uuid} #'
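
The two class attributes above can be tried in isolation. The sketch below copies their values and shows what the regular expression matches and how the marker template is filled in. How the class uses them internally is an assumption here: the sample listing line and the simulated output stream are made up for illustration.

```python
import re
import uuid

# Values copied from the class attributes documented above.
time_stamp_pattern = re.compile(r'^\s*(\S+\s+){8}')
END_OF_OUTPUT = '# LFTP_END_OF_OUTPUT_IDENTIFIER {uuid} #'

# The pattern consumes the first eight whitespace-separated fields of a line.
# The sample line is invented (long-listing style) for illustration only.
line = "-rw-r--r--  1 user group  1234 Jan 01 12:00 data/file name.txt"
remainder = time_stamp_pattern.sub("", line)
print(remainder)

# Assumed usage: a unique marker is emitted after each command so that a
# reader knows where that command's output ends.
marker = END_OF_OUTPUT.format(uuid=uuid.uuid4())
fake_stream = ["a.txt", "b.txt", marker, "output of a later command"]

collected = []
for out_line in fake_stream:
    if out_line == marker:
        break  # everything before the marker belongs to this command
    collected.append(out_line)
print(collected)
```

A fresh UUID per command makes it very unlikely that ordinary output is mistaken for the marker.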
static format_options(**kwargs) -> str

Format keyword options as LFTP command-line arguments.

Parameters:
**kwargs

Keyword arguments to format.

Returns:

The formatted argument string.
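
The exact formatting rules are not documented here, so the following is only a sketch of how keyword options could be turned into an argument string. The function format_options_sketch and all of its rules (single-letter keys become short flags, longer keys become long flags with underscores mapped to dashes, True becomes a bare flag, False and None are dropped) are assumptions, not the library's behaviour.

```python
def format_options_sketch(**kwargs) -> str:
    """Hypothetical formatter; the rules below are assumed, not documented."""
    parts = []
    for key, value in kwargs.items():
        if value is None or value is False:
            continue  # assumed: disabled options are simply omitted
        short = len(key) == 1
        flag = f"-{key}" if short else f"--{key.replace('_', '-')}"
        if value is True:
            parts.append(flag)  # assumed: boolean options are bare flags
        elif short:
            parts.append(f"{flag} {value}")
        else:
            parts.append(f"{flag}={value}")
    return " ".join(parts)


print(format_options_sketch(n=4, use_cache=True, verbose=False))
```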

execute_command(command: str, output: bool = True, blocking: bool = True, execute: bool = True, default_args: dict | None = None, **kwargs) -> str | list[str] | None

Build and optionally execute an LFTP command.

Parameters:
command: str

The LFTP verb (and possibly arguments) to run.

output: bool = True

If True, return the command output.

blocking: bool = True

If True, wait for completion. If output is True, blocking must also be True.

execute: bool = True

If False, return the fully formatted command string instead of executing.

default_args: dict | None = None

Default options merged with **kwargs. Keyword arguments override defaults.

**kwargs

Additional options formatted via format_options().

Returns:

If execute is False, the formatted command string. If output is True, a list of output lines (or an empty list). Otherwise, None.
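
The two argument rules described above (keyword arguments override default_args, and output=True requires blocking=True) can be sketched as follows. The helper resolve_options is hypothetical, and raising ValueError for the invalid combination is an assumption; the library may handle that case differently.

```python
def resolve_options(default_args=None, output=True, blocking=True, **kwargs):
    """Hypothetical helper mirroring the documented merge and validation rules."""
    if output and not blocking:
        # Assumed error type; the documentation only states the constraint.
        raise ValueError("output=True requires blocking=True")
    merged = dict(default_args or {})
    merged.update(kwargs)  # explicit keyword arguments win over defaults
    return merged


options = resolve_options(default_args={"n": 5, "continue": True}, n=10)
print(options)
```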

mount(lftp_settings: dict | None = None) -> None

Open a persistent LFTP session and connect to the remote host.

Parameters:
lftp_settings: dict | None = None

LFTP settings to apply. If None, defaults from configuration are used.

Raises:
  • Exception – If the subprocess fails to start.

  • RuntimeError – If the connection to the remote directory fails.

unmount(timeout: float = 1) -> None

Terminate the LFTP session and release resources.

Parameters:
timeout: float = 1

Maximum time to wait before forcefully terminating the process.

exists(path: str, mode: str = 'any', execute: bool = True, **kwargs)

Check if a path (file/directory/link) exists.

Parameters:
path: str

Path to check.

mode: str = 'any'

Which types of file to match. Case-insensitive; currently “any” (default), “file” and “directory” are supported.

**kwargs

Additional arguments passed to execute_command.

Returns:

A boolean if execute is True; otherwise the command string.

pathtype(remote_path: str)
get(remote_path: str | list[str] | tuple[str, ...], local_path: str | list[str] | tuple[str, ...] | None = None, execute: bool = True, **kwargs)

Download file(s) using LFTP get.

Parameters:
remote_path: str | list[str] | tuple[str, ...]

Remote file path(s) to download.

local_path: str | list[str] | tuple[str, ...] | None = None

Local download destination path. If None, downloads to the current local directory.

execute: bool = True

If False, return the command string.

**kwargs

Additional options forwarded to get.

Returns:

Command string(s) when execute is False; otherwise, local destination path(s) based on the type of remote_path.

mget(remote_paths: list[str] | tuple[str, ...], local_destination_dir: str, execute: bool = True, default_args: dict | None = None, **kwargs)
pget(remote_path: str, local_path: str | None, execute: bool = True, default_args: dict | None = None, **kwargs) -> str | None

Download a single file using LFTP pget.

Parameters:
remote_path: str

Remote file path to download.

local_path: str | None

Local destination path. Defaults to the remote basename in the current local directory.

execute: bool = True

If False, return the command string instead of executing.

Returns:

Absolute local path.

put(local_path: str | list[str] | tuple[str, ...], remote_path: str | list[str] | tuple[str, ...] | None = None, execute: bool = True, **kwargs)

Upload file(s) using LFTP put.

Parameters:
local_path: str | list[str] | tuple[str, ...]

Local file(s) to upload.

remote_path: str | list[str] | tuple[str, ...] | None = None

Remote file path(s) to upload to. If None, uploads to the current remote directory.

execute: bool = True

If False, return the command string.

**kwargs

Additional options forwarded to put.

Returns:

Command string(s) when execute is False; otherwise, remote destination path(s) based on the type of remote_path.

mput(local_paths: list[str] | tuple[str, ...], remote_destination_dir: str, execute: bool = True, default_args: dict | None = None, **kwargs)
rm(path: str | list[str] | tuple[str, ...], force: bool = False, execute: bool = True, blocking: bool = True, **kwargs) -> str | None

Remove a file or directory.

Parameters:
path: str | list[str] | tuple[str, ...]

File or directory to delete.

force: bool = False

Force deletion of path (rm -rf). Directories, whether empty or not, can only be deleted with this argument.

**kwargs

Additional arguments passed to execute_command.

Returns:

The command string if execute is False; otherwise None.

du(path: str, all: bool = False, bytes: bool = True, execute: bool = True, blocking: bool = True, default_args: dict | None = None, **kwargs)

Get the size of a directory/path.

Parameters:
path: str

Path to examine.

all: bool = False

Return size of all files and subdirectories of path, including path itself, separately.

bytes: bool = True

Return size in bytes, otherwise in KB.

**kwargs

Passed to execute_command.

Returns:

A dict with path(s) as keys and sizes as values. If blocking is False, None is returned; if execute is False, the command string is returned.

ls(path: str = '.', recursive: bool = False, use_cache: bool = True, pbar: int = 0, _top: bool = True) -> list[str]

List files in a remote directory (optionally recursively).

Uses LFTP cls (and manual traversal for recursion) to return file paths relative to path.

Parameters:
path: str = '.'

Remote directory to search.

recursive: bool = False

If True, search recursively.

use_cache: bool = True

If True, use cls (cached). If False, use recls to force refresh.

pbar: int = 0

Progress heartbeat for recursive listings (0 to disable).

Returns:

Relative file paths.

lls(local_path: str = '.', **kwargs) -> list[str] | str | None

List files in a local directory via the LFTP shell.

Parameters:
local_path: str = '.'

Notes

Prefer native Python/OS listing. This is mainly useful for consistency and debugging through the same LFTP session.

cd(remote_path: str, **kwargs)
pwd() -> str

Return the current remote directory (LFTP pwd).

lcd(local_path: str) -> str

Change the current local directory (LFTP lcd).

Parameters:
local_path: str

Local directory to change to.

lpwd()

Get the current local directory.

Returns:

The current local directory.

mirror(remote: str, local: str, reverse: bool = False, output: bool = True, blocking: bool = True, execute: bool = True, default_args: dict | None = None, **kwargs) -> list[str] | None

Mirror a remote directory to a local destination (LFTP mirror).

Parameters:
remote: str

Remote directory to download (if not reverse).

local: str

Local destination directory (if not reverse).

reverse: bool = False

If True, mirror in the opposite direction: upload from local to remote.

**kwargs

Additional options forwarded to mirror.

Returns:

A list of newly downloaded files, or otherwise whatever ImplicitMount.execute_command returns for the given arguments.

class pyremotedata.implicit_mount.IOHandler(local_dir: str | None = None, user_confirmation: bool = False, clean: bool = False, user: str | None = None, password: str | None = None, remote: str | None = None, lftp_settings: dict[str, str] | None = None, **kwargs)

Bases: ImplicitMount

This is a high-level wrapper for the pyremotedata.implicit_mount.ImplicitMount class. It provides human-friendly methods for downloading files from a remote directory without requiring technical knowledge of LFTP.

To avoid SSH setup, use lftp_settings = {'sftp:connect-program': 'ssh -a -x -i <keyfile>'}, user = <USER>, remote = <REMOTE>.

Note: The attributes of this class should not be used directly except for development or advanced use cases; in that case, all responsibility lies with the user.

Parameters:
local_dir: str | None = None

The local directory to use for downloading files. If None, a temporary directory will be used (suggested, unless truly necessary).

user_confirmation: bool = False

If True, the user will be asked for confirmation before deleting files. (strongly suggested for debugging and testing)

clean: bool = False

If True, the local directory will be cleaned after the context manager is exited. (suggested, if not it may lead to rapid exhaustion of disk space)

lftp_settings: dict[str, str] | None = None

Additional LFTP settings or setting overrides (see https://lftp.yar.ru/lftp-man.html). The most common use case is probably lftp_settings = {'sftp:connect-program': 'ssh -a -x -i <keyfile>'}. The defaults can also be overridden by changing the PyRemoteData config file.

user: str | None = None

The username to use for connecting to the remote directory.

password: str | None = None

The SFTP password to use, if any, when connecting to the remote host.

remote: str | None = None

The remote server to connect to.

**kwargs

Keyword arguments to pass to the pyremotedata.implicit_mount.ImplicitMount constructor.

start() -> None

Initialize the connection to the remote directory.

Useful for interactive use, but should not be used in scripts; a context manager is safer and does the same.

stop() -> None

Close the connection to the remote directory.
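
The reason a context manager is safer than calling start() and stop() by hand can be shown without a network connection. The class below is a stand-in, not IOHandler itself, and the wiring of __enter__ to start() and __exit__ to stop() is an assumption based on the description above.

```python
class FakeHandler:
    """Stand-in with the documented start()/stop() pair; no network access."""

    def __init__(self):
        self.events = []

    def start(self):
        self.events.append("start")

    def stop(self):
        self.events.append("stop")

    # Assumed wiring: entering calls start(), exiting calls stop().
    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_type, exc, tb):
        self.stop()
        return False  # do not swallow exceptions


handler = FakeHandler()
try:
    with handler:
        raise RuntimeError("transfer failed")
except RuntimeError:
    pass

# stop() ran even though the body raised.
print(handler.events)
```

With manual start() and stop() calls, the same exception would skip stop() unless the caller added a try/finally block.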

download(remote_path: str | list[str] | tuple[str, ...], local_destination: str | list[str] | tuple[str, ...] | None = None, n: int = 14, blocking: bool = True, **kwargs)

Downloads one or more files or a directory from the remote directory to the given local destination.

Parameters:
remote_path: str | list[str] | tuple[str, ...]

The remote path(s) to download.

local_destination: str | list[str] | tuple[str, ...] | None = None

The local destination directory(s) to download the file(s) to. If None, the file(s) will be downloaded to the current local directory.

n: int = 14

Parallel connections to use if relevant (default=14).

blocking: bool = True

If True, the function will block until the download is complete.

**kwargs

Extra keyword arguments are passed to the IOHandler.multi_download, IOHandler.pget or IOHandler.mirror functions depending on the type of the remote path(s).

Returns:

The local path(s) of the downloaded file(s) or directory.

upload(local_path: str | list[str] | tuple[str, ...], remote_destination: str | list[str] | tuple[str, ...] | None = None, n: int = 14, blocking: bool = True, **kwargs)

Uploads one or more files or a directory to the remote destination.

Parameters:
local_path: str | list[str] | tuple[str, ...]

The local file(s) or directory to upload.

remote_destination: str | list[str] | tuple[str, ...] | None = None

Remote destination directory(s) of uploaded files. If None, uploads to the current remote directory.

n: int = 14

Parallel connections to use if relevant (default=14).

blocking: bool = True

If True, the function will block until the upload is complete.

**kwargs

Extra keyword arguments are passed to the IOHandler.multi_download, IOHandler.pget or IOHandler.mirror functions depending on the type of the remote path(s).

Returns:

The remote path(s) of the uploaded file(s) or directory.

sync(local_destination: str | None = None, direction: str = 'down', allow_root: bool = False, progress: bool = False, batch_size: int = 128, replace_local: bool = False, refresh_cache: bool = False, **kwargs)

Synchronizes the current remote directory with the given local destination.

Parameters:
local_destination: str | None = None

The local destination to synchronize the current remote directory to, defaults to “<CURRENT_LOCAL_DIRECTORY_PATH>/<CURRENT_REMOTE_DIRECTORY_NAME>”.

direction: str = 'down'

Synchronization direction; one of “down”, “up” or “both” (case-insensitive, default “down”). “down”: download the contents of the current remote directory to the local destination. “up”: upload the contents of the local destination to the current remote directory. “both”: first synchronize “down”, then synchronize “up”.

progress: bool = False

Show a progress bar.

batch_size: int = 128

Number of files passed to each download call.

replace_local: bool = False

By default, existing files are skipped. If this is enabled, existing files are deleted and refetched.

refresh_cache: bool = False

Recompute the file index of the remote directory. This can be extremely slow and is disabled by default.

**kwargs

Passed to IOHandler.download.

Returns:

A list of the local paths that have been synchronized, not including existing files if replace_local=False.

get_file_index(skip: int = 0, nmax: int | None = None, override: bool = False, store: bool = True, pattern: str | None = None) -> list[str]

Get a list of files in the current remote directory.

Parameters:
skip: int = 0

The number of files to skip.

nmax: int | None = None

The maximum number of files to include.

override: bool = False

If True, the file index will be overridden if it already exists.

store: bool = True

If True, the file index will be stored in the remote directory.

pattern: str | None = None

A regular expression pattern to filter the file names by, e.g. “\.txt$” to only include files with the “.txt” extension.

Returns:

A list of files in the current remote directory.
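
The effect of skip, nmax and pattern can be sketched on a plain list. The helper filter_index is hypothetical: whether the library filters before slicing, and whether it uses re.search or another matching function, are assumptions made for this illustration.

```python
import re


def filter_index(paths, skip=0, nmax=None, pattern=None):
    """Hypothetical helper; the order of filtering and slicing is assumed."""
    if pattern is not None:
        regex = re.compile(pattern)
        paths = [p for p in paths if regex.search(p)]
    end = None if nmax is None else skip + nmax
    return paths[skip:end]


index = ["a.txt", "b.csv", "c.txt", "notes_txt", "d.txt"]

# An escaped dot only matches a literal ".", so "notes_txt" is excluded.
print(filter_index(index, pattern=r"\.txt$"))

# Skip the first match and keep at most one file.
print(filter_index(index, skip=1, nmax=1, pattern=r"\.txt$"))
```

An unescaped dot matches any character, so a pattern such as ".txt$" would also select "notes_txt" in this sample list.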

cache_file_index(skip: int = 0, nmax: int | None = None, override: bool = False) -> None
clean()
class pyremotedata.implicit_mount.RemotePathIterator(io_handler: IOHandler, batch_size: int = 64, batch_parallel: int = 10, max_queued_batches: int = 3, n_local_files: int = 384, clear_local: bool = True, retry_base_delay: float = 0.5, retry_max_delay: float = 30.0, retry_timeout: float = 120.0, **kwargs)

Bases: object

Buffered iterator for streaming many remote files efficiently.

Downloads are performed in a background thread and yielded as (local_path, remote_path) tuples for consumption by the caller.

Parameters:
io_handler: IOHandler

Active pyremotedata.implicit_mount.IOHandler used for all transfers.

batch_size: int = 64

Files to download per batch. Larger batches can be more efficient but use more memory.

batch_parallel: int = 10

Parallel transfers per batch. Tune for fairness and server limits.

max_queued_batches: int = 3

Number of prefetched batches to keep queued. Higher values smooth throughput but require more local storage.

n_local_files: int = 384

Maximum number of local files to keep before deleting consumed files. Must exceed batch_size * max_queued_batches (2x recommended).

clear_local: bool = True

If True, delete consumed files to free space.

retry_base_delay: float = 0.5

Initial backoff (seconds) for single-item retry.

retry_max_delay: float = 30.0

Maximum backoff (seconds) per retry step.

retry_timeout: float = 120.0

Per-file hard timeout (seconds) before raising.

**kwargs

Forwarded to IOHandler.get_file_index to build the file index. Set store=False for read-only remotes (slower on first run). If store=False, override must also be False.

Yields:

(local_path, remote_path) – for each downloaded file.
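
The sizing rule for n_local_files stated above can be checked before constructing an iterator. This is a minimal sketch: check_buffer_sizes is a hypothetical helper, and whether the library itself validates these values (and how) is not documented here.

```python
def check_buffer_sizes(batch_size=64, max_queued_batches=3, n_local_files=384):
    """Hypothetical validation of the documented sizing rule."""
    minimum = batch_size * max_queued_batches
    if n_local_files <= minimum:
        raise ValueError(
            f"n_local_files={n_local_files} must exceed "
            f"batch_size * max_queued_batches = {minimum}"
        )
    # Ratio of the local file budget to the queued capacity (2x recommended).
    return n_local_files / minimum


ratio = check_buffer_sizes()
print(ratio)
```

With the documented defaults, the queue can hold 64 * 3 = 192 files, and the default n_local_files of 384 is exactly the recommended factor of two.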

shuffle() -> None

Shuffle the internal list of remote paths in-place.

Raises:

RuntimeError – If called while iterating.

subset(indices: list[int]) -> None

Restrict the iterator to a subset of indices (in-place).

Parameters:
indices: list[int]

Indices to keep. Accepts a list, a single int, or a slice.

Raises:
split(proportion: list[float | int] | None = None, indices: list[list[int]] | None = None) -> list[RemotePathIterator]

Split into multiple iterators that share the same backend.

Either proportion or indices must be provided (exclusively). The resulting iterators must not be used in parallel.

Parameters:
proportion: list[float | int] | None = None

Proportions used to allocate items to splits. Will be normalized if needed.

indices: list[list[int]] | None = None

Explicit index lists for each split.

Returns:

Independent iterators over disjoint subsets of the original paths.
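
One way to turn proportions into disjoint index lists is sketched below. The helper split_indices is hypothetical, and assigning contiguous index ranges in order is an assumption; the library may allocate items to splits differently.

```python
def split_indices(n_items, proportion):
    """Hypothetical helper: contiguous, disjoint index lists per proportion."""
    total = sum(proportion)
    if total <= 0:
        raise ValueError("proportions must sum to a positive number")
    splits, start, running = [], 0, 0
    for i, p in enumerate(proportion):
        running += p
        if i == len(proportion) - 1:
            end = n_items  # pin the last boundary so every index is used once
        else:
            end = round(running / total * n_items)  # normalized cumulative share
        splits.append(list(range(start, end)))
        start = end
    return splits


parts = split_indices(10, [8, 2])
print([len(p) for p in parts])
```

Dividing by the sum of the proportions is what makes unnormalized inputs such as [8, 2] behave like [0.8, 0.2]. Index lists produced this way could then be passed as the indices argument.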

download_files() -> None
start_download_queue() -> None

Start the background thread that performs batch downloads.