import asyncio
import io
import os
import logging
import weakref
import random
from urllib.parse import urlparse
from decorator import decorator
from pathlib import Path
import aiohttp
import requests
from fsspec.asyn import (
AsyncFileSystem,
sync,
sync_wrapper,
)
from fsspec.callbacks import _DEFAULT_CALLBACK
from fsspec.exceptions import FSTimeoutError
from fsspec.utils import isfilelike, nullcontext, tokenize
from fsspec.implementations.memory import MemoryFile
# Built-in service endpoints; "STORAGE_SERVICE" is the default storage URL used
# by the FileSystem constructor and the module-level helper functions.
DEFAULT_CONFIG = {"STORAGE_SERVICE": "https://storage.oceanum.io"}
# Default for the ``batch_size`` argument of the FileSystem constructor.
_DEFAULT_BATCH_SIZE = 16
# NOTE(review): not referenced in the visible part of this file; presumably the
# batch size for operations that do not open local files - confirm before use.
_NOFILES_DEFAULT_BATCH_SIZE = 16
# Module-level logger used by the retry helper and the filesystem methods.
logger = logging.getLogger("fsspec.oceanum")
@decorator
async def retry_request(func, retries=6, *args, **kwargs):
    """Await ``func`` and retry it on transient client errors.

    Up to ``retries`` attempts are made. Every attempt after the first is
    preceded by a sleep of ``random() + 2 ** (attempt - 1)`` seconds, capped
    at 32 seconds. ``FileNotFoundError`` is never retried: it is logged at
    debug level and re-raised straight away. ``aiohttp`` client errors and
    ``FSTimeoutError`` are retried, and the last one is logged and re-raised
    once the attempts are exhausted.
    """
    final_attempt = retries - 1
    for attempt in range(retries):
        try:
            if attempt:
                # Exponential backoff with jitter, never longer than 32 s.
                backoff = min(random.random() + 2 ** (attempt - 1), 32)
                await asyncio.sleep(backoff)
            return await func(*args, **kwargs)
        except FileNotFoundError as e:
            # A missing object will not appear by retrying.
            logger.debug("Request returned 404")
            raise e
        except (aiohttp.client_exceptions.ClientError, FSTimeoutError) as e:
            if attempt == final_attempt:
                logger.exception(f"{func.__name__} out of retries on exception: {e}")
                raise e
async def get_client(**kwargs):
    """Create the HTTP client session used by the filesystem.

    Args:
        **kwargs: Passed unchanged to ``aiohttp.ClientSession``.

    Returns:
        aiohttp.ClientSession: A new client session.
    """
    return aiohttp.ClientSession(**kwargs)
class FileSystem(AsyncFileSystem):
    """Datamesh storage filesystem.

    This follows the fsspec specification and can be used with dask.

    You can use this class directly, for example:

        fs = FileSystem(token="my_datamesh_token")
        fs.ls("/myfolder")

    or use fsspec convenience functions with protocol "oceanum". For example:

        of = fsspec.open("oceanum://myfolder/myfile.txt", token="my_datamesh_token")
    """

    def __init__(
        self,
        token: str | None = None,
        service: str = os.environ.get(
            "STORAGE_SERVICE", DEFAULT_CONFIG["STORAGE_SERVICE"]
        ),
        asynchronous: bool = False,
        loop: asyncio.AbstractEventLoop | None = None,
        timeout: int = 3600,
        batch_size: int = _DEFAULT_BATCH_SIZE,
    ):
        """Storage filesystem constructor

        Args:
            token (string): Your datamesh access token. Defaults to
                os.environ.get("DATAMESH_TOKEN", None). A token starting with
                "Bearer " is sent as an ``Authorization`` header, any other
                token as ``X-DATAMESH-TOKEN``.
            service (string, optional): URL of datamesh service. Defaults to
                os.environ.get("STORAGE_SERVICE", "https://storage.oceanum.io").
                The environment variable is read once, when this module is
                imported.
            asynchronous (bool, optional): Forwarded to ``AsyncFileSystem``.
            loop (asyncio.AbstractEventLoop, optional): Forwarded to
                ``AsyncFileSystem``.
            timeout (int, optional): Timeout for requests in seconds. Defaults to 3600.
            batch_size (int, optional): Number of concurrent requests. Defaults to 16.

        Raises:
            ValueError: Missing or invalid arguments
        """
        self._token = token or os.environ.get("DATAMESH_TOKEN", None)
        url = urlparse(service)
        self._proto = url.scheme
        self._host = url.netloc
        # Only scheme and host are kept; any path component of ``service`` is dropped.
        self._base_url = f"{self._proto}://{self._host}/"
        self._init_auth_headers(self._token)
        # ``self`` is bound implicitly by super(); passing it again would only
        # add a spurious positional argument.
        super().__init__(asynchronous=asynchronous, loop=loop, batch_size=batch_size)
        self.get_client = get_client
        self.client_kwargs = {
            "headers": self._auth_headers,
            "timeout": aiohttp.ClientTimeout(total=timeout, sock_read=timeout),
        }
        # Created lazily by set_session().
        self._session = None

    def _init_auth_headers(self, token: str | None):
        """Build ``self._auth_headers`` from ``token``.

        Raises:
            ValueError: If ``token`` is None.
        """
        if token is None:
            raise ValueError(
                "A valid key must be supplied as a connection constructor argument or defined in environment variables as DATAMESH_TOKEN"
            )
        if token.startswith("Bearer "):
            self._auth_headers = {"Authorization": token}
        else:
            self._auth_headers = {
                "X-DATAMESH-TOKEN": token,
            }

    @property
    def fsid(self):
        """Constant identifier of this filesystem type."""
        return "oceanum"

    @staticmethod
    def close_session(loop, session):
        """Close ``session``, falling back to closing its connector directly.

        If ``loop`` is running, the session is closed through it with a 0.1 s
        timeout. When the loop is missing or stopped, or the close times out,
        the underlying connector (if any) is closed synchronously instead.
        """
        if loop is not None and loop.is_running():
            try:
                sync(loop, session.close, timeout=0.1)
                return
            except (TimeoutError, FSTimeoutError):
                pass
        connector = getattr(session, "_connector", None)
        if connector is not None:
            # close after loop is dead
            connector._close()

    async def set_session(self):
        """Return the shared client session, creating it on first use."""
        if self._session is None:
            self._session = await self.get_client(loop=self.loop, **self.client_kwargs)
            if not self.asynchronous:
                # In synchronous mode, close the session when this object is
                # garbage collected.
                weakref.finalize(self, self.close_session, self.loop, self._session)
        return self._session

    async def _ls(
        self,
        path="",
        detail=True,
        file_prefix=None,
        match_glob=None,
        limit=None,
    ):
        """List the entries under ``path``.

        Args:
            path: Remote path to list.
            detail: If True return the listing dicts (with a "type" key added),
                otherwise only the entry names.
            file_prefix: Optional ``file_prefix`` query parameter.
            match_glob: Optional ``match_glob`` query parameter.
            limit: Optional ``limit`` query parameter.

        Raises:
            FileNotFoundError: If the path is not found, with or without a
                trailing slash, or the listing is empty.
        """
        logger.debug(path)
        session = await self.set_session()
        spath = path.lstrip("/")
        params = {}
        if limit:
            params["limit"] = limit
        if file_prefix:
            params["file_prefix"] = file_prefix
        if match_glob:
            params["match_glob"] = match_glob
        async with session.get(self._base_url + spath, params=params or None) as r:
            try:
                self._raise_not_found_for_status(r, path)
            except FileNotFoundError:
                # The storage endpoint enforces trailing slash for directories, so test for that
                if path.endswith("/"):
                    raise FileNotFoundError(path)
                return await self._ls(
                    path + "/",
                    detail=detail,
                    file_prefix=file_prefix,
                    match_glob=match_glob,
                    limit=limit,
                )
            listing = await r.json()
        if not listing:
            raise FileNotFoundError(path)
        if detail:
            return [
                {
                    **u,
                    "type": "directory" if u.get("contentType") == "folder" else "file",
                }
                for u in listing
            ]
        return [u["name"] for u in listing]

    ls = sync_wrapper(_ls)

    def _raise_not_found_for_status(self, response, url):
        """
        Raises FileNotFoundError for 404s, otherwise uses raise_for_status.
        """
        if response.status == 404:
            raise FileNotFoundError(url)
        response.raise_for_status()

    async def _cat_file(self, path, **kwargs):
        """Return the full content of the remote file ``path`` as bytes.

        Raises:
            FileNotFoundError: If the service answers 404.
        """
        logger.debug(path)
        session = await self.set_session()
        async with session.get(self._base_url + path.strip("/")) as r:
            # Check the status first so an error body is not downloaded for nothing.
            self._raise_not_found_for_status(r, path)
            return await r.read()

    @retry_request
    async def _get_file(
        self, rpath, lpath, chunk_size=5 * 2**20, callback=_DEFAULT_CALLBACK, **kwargs
    ):
        """Download remote ``rpath`` into ``lpath``.

        Args:
            rpath: Remote path. A path ending in "/" only creates the local
                directory ``lpath``.
            lpath: Local file path, or a writable file-like object (which is
                left open for the caller).
            chunk_size: Number of bytes read from the response per iteration.
            callback: fsspec callback that receives size and progress updates.

        Raises:
            FileNotFoundError: If the service answers 404.
        """
        to_filelike = isfilelike(lpath)
        # os.path.isdir() raises TypeError on file objects, so only probe real paths.
        if not to_filelike and os.path.isdir(lpath):
            return
        logger.debug(rpath)
        # endswith() also copes with an empty rpath, unlike rpath[-1].
        if rpath.endswith("/"):
            os.makedirs(lpath, exist_ok=True)
            return
        session = await self.set_session()
        async with session.get(self._base_url + rpath.lstrip("/")) as r:
            try:
                size = int(r.headers["content-length"])
            except (ValueError, KeyError):
                size = None
            callback.set_size(size)
            self._raise_not_found_for_status(r, rpath)
            outfile = lpath if to_filelike else open(lpath, "wb")
            try:
                while chunk := await r.content.read(chunk_size):
                    outfile.write(chunk)
                    callback.relative_update(len(chunk))
            finally:
                # Only close what was opened here.
                if not to_filelike:
                    outfile.close()

    async def _put_file(
        self,
        lpath,
        rpath,
        chunk_size=5 * 2**20,
        callback=_DEFAULT_CALLBACK,
        method="post",
        **kwargs,
    ):
        """Upload local ``lpath`` to remote ``rpath``.

        Args:
            lpath: Local file path, or an ``io.IOBase`` object to read from.
            rpath: Remote destination path.
            chunk_size: Number of bytes read and sent per chunk.
            callback: fsspec callback that receives size and progress updates.
            method: HTTP method to use, "post" or "put" (case-insensitive).

        Raises:
            ValueError: If ``method`` is neither "post" nor "put".
            FileNotFoundError: If the service answers 404.
        """
        method = method.lower()
        if method not in ("post", "put"):
            raise ValueError(
                f"method has to be either 'post' or 'put', not: {method!r}"
            )

        async def gen_chunks():
            # Support passing arbitrary file-like objects
            # and use them instead of streams.
            if isinstance(lpath, io.IOBase):
                context = nullcontext(lpath)
                use_seek = False  # might not support seeking
            else:
                context = open(lpath, "rb")
                use_seek = True
            with context as f:
                if use_seek:
                    # Seeking to the end returns the file size.
                    callback.set_size(f.seek(0, 2))
                    f.seek(0)
                else:
                    callback.set_size(getattr(f, "size", None))
                chunk = f.read(chunk_size)
                while chunk:
                    yield chunk
                    callback.relative_update(len(chunk))
                    chunk = f.read(chunk_size)

        session = await self.set_session()
        meth = getattr(session, method)
        async with meth(self._base_url + rpath.strip("/"), data=gen_chunks()) as resp:
            self._raise_not_found_for_status(resp, rpath)

    async def _exists(self, path, **kwargs):
        """Return True if a GET on ``path`` answers with a status below 400.

        Client and HTTP errors are reported as False.
        """
        try:
            logger.debug(path)
            session = await self.set_session()
            async with session.get(self._base_url + path.lstrip("/")) as r:
                return r.status < 400
        except (requests.HTTPError, aiohttp.ClientError):
            return False

    async def _isfile(self, path):
        """Return True if ``path`` is a file; any lookup failure gives False."""
        try:
            info = await self._info(path)
            return info["type"] == "file"
        except Exception:
            # Not a bare ``except``: task cancellation and KeyboardInterrupt
            # must still propagate.
            return False

    async def _isdir(self, path):
        """Return True if ``path`` is a directory; False if the lookup raises OSError."""
        try:
            info = await self._info(path)
            return info["type"] == "directory"
        except OSError:
            return False

    async def _open(
        self,
        path,
        mode="rb",
        chunk_size=5 * 2**8,
        **kwargs,
    ):
        """Download ``path`` completely and return it as an in-memory file.

        Args:
            path: Remote path to read.
            mode: Only "rb" is supported.
            chunk_size: Number of bytes read from the response per iteration.
                NOTE(review): the default is 1280 bytes whereas the other
                transfer methods use 5 * 2**20 - confirm this is intended.

        Returns:
            MemoryFile: Buffer holding the content, positioned at the start.

        Raises:
            NotImplementedError: If ``mode`` is not "rb".
            FileNotFoundError: If the service answers 404.
        """
        if mode != "rb":
            raise NotImplementedError
        logger.debug(path)
        session = await self.set_session()
        async with session.get(self._base_url + path.strip("/")) as r:
            self._raise_not_found_for_status(r, path)
            f = MemoryFile(None, None)
            try:
                chunk = True
                while chunk:
                    chunk = await r.content.read(chunk_size)
                    f.write(chunk)
            finally:
                f.seek(0)
        return f

    async def _info(self, path, **kwargs):
        """Get info of path

        Issues a HEAD request and builds the info from the response headers.

        Returns:
            dict: Keys "name", "size", "contentType", "modified" and "type".
            "size" is None when the response carries no content-length header.

        Raises:
            FileNotFoundError: If the service answers 404.
        """
        session = await self.set_session()
        async with session.head(self._base_url + path.lstrip("/")) as r:
            self._raise_not_found_for_status(r, path)
            content_length = r.headers.get("content-length")
            content_type = r.headers.get("content-type")
            return {
                "name": path,
                # int(None) would raise TypeError when the header is absent.
                "size": int(content_length) if content_length is not None else None,
                "contentType": content_type,
                "modified": r.headers.get("last-modified"),
                "type": "directory" if content_type == "folder" else "file",
            }

    async def _mkdir(self, path, create_parents=True, **kwargs):
        """Create directory ``path``.

        ``create_parents`` is mapped onto the ``exist_ok`` flag of
        ``_makedirs``.
        """
        logger.debug(path)
        await self._makedirs(path, exist_ok=bool(create_parents))

    async def _makedirs(self, path, exist_ok=False):
        """Create directory ``path`` by issuing a PUT to ``<path>/_``.

        NOTE(review): the "_" entry is presumably a placeholder that makes the
        folder exist on the service - confirm against the storage API.

        Raises:
            FileExistsError: If ``exist_ok`` is false and ``path`` exists.
            FileNotFoundError: If the service answers 404.
        """
        logger.debug(path)
        if not exist_ok and await self._exists(path):
            raise FileExistsError(path)
        session = await self.set_session()
        async with session.put(self._base_url + path.strip("/") + "/_") as r:
            self._raise_not_found_for_status(r, path)

    async def _cp_file(self, path1, path2, **kwargs):
        """Copy remote ``path1`` to remote ``path2``.

        The copy is requested with a POST to ``path2`` carrying ``path1`` in
        the ``x-copy-source`` header.

        Returns:
            bool: True if the service answered with status 201.

        Raises:
            FileNotFoundError: If the service answers 404.
        """
        logger.debug(f"{path1} -> {path2}")
        session = await self.set_session()
        async with session.post(
            self._base_url + path2.lstrip("/"), headers={"x-copy-source": path1}
        ) as r:
            self._raise_not_found_for_status(r, path1)
            return r.status == 201

    def ukey(self, path):
        """Unique identifier"""
        return tokenize(path)
def ls(
    path: str,
    recursive: bool,
    detail: bool = False,
    token: str | None = None,
    service: str = DEFAULT_CONFIG["STORAGE_SERVICE"],
    **kwargs,
):
    """List contents in the oceanum storage.

    Parameters
    ----------
    path: str
        Path to list.
    recursive: bool
        List subdirectories recursively.
    detail: bool
        Return detailed information about each content.
    token: str
        Oceanum datamesh token.
    service: str
        Oceanum storage service URL.
    **kwargs
        Extra keyword arguments passed to ``FileSystem.find``.

    Returns
    -------
    contents: list | dict
        List of contents or dictionary with detailed information about each content.

    Raises
    ------
    FileNotFoundError
        If nothing is found under ``path``.
    aiohttp.client_exceptions.ClientError
        If the request fails, for instance because the token is not authorised.

    """
    fs = FileSystem(token=token, service=service)
    try:
        # Without --recursive only the first level is listed.
        maxdepth = None if recursive else 1
        paths = fs.find(
            path, maxdepth=maxdepth, withdirs=True, detail=detail, **kwargs
        )
        if not paths:
            raise FileNotFoundError(f"Path {path} not found")
        return paths
    except aiohttp.client_exceptions.ClientError as err:
        raise aiohttp.client_exceptions.ClientError(
            f"Path {path} not found or not authorised (check datamesh token)"
        ) from err
def get(
    source: str,
    dest: str,
    recursive: bool = False,
    token: str | None = None,
    service: str = DEFAULT_CONFIG["STORAGE_SERVICE"],
):
    """Copy remote source to local dest, or multiple sources to directory.

    Parameters
    ----------
    source: str
        Path to get.
    dest: str
        Destination path.
    recursive: bool
        Get directories recursively.
    token: str
        Oceanum datamesh token.
    service: str
        Oceanum storage service URL.

    Raises
    ------
    IsADirectoryError
        If ``source`` is a directory and ``recursive`` is False.
    NotADirectoryError
        If ``source`` is a file and ``recursive`` is True.
    FileNotFoundError
        If ``dest`` ends with a path separator but is not an existing directory.

    Notes
    -----
    - Directory dest defined with a trailing slash must exist, consistent with gsutil.
    - Non-existing file dest path is allowed, consistent with gsutil (all required
      intermediate directories are created).

    """
    fs = FileSystem(token=token, service=service)
    is_source_dir = fs.isdir(source)

    # Ensure attempting to copy folder without recursive option does not fail silently
    if is_source_dir and not recursive:
        raise IsADirectoryError(f"--recursive is required to get directory {source}")

    # Deal with mimetype error when trying to copy file with recursive option
    if not is_source_dir and recursive:
        raise NotADirectoryError(f"Source {source} is a file, do not use --recursive")

    # Prevent from downloading into a folder that does not exist
    if not Path(dest).is_dir() and dest.endswith(os.path.sep):
        raise FileNotFoundError(f"Destination directory {dest} not found")

    source_name = Path(source).name

    # Expand destination file name to avoid IsADirectoryError
    if not is_source_dir and Path(dest).is_dir():
        dest = str(Path(dest) / source_name)

    # Expand destination folder name if different than source's to keep path structure
    if is_source_dir and Path(dest) != Path(source):
        dest = str(Path(dest) / source_name)

    # Downloading
    fs.get(source, dest, recursive=recursive)
def put(
    source: str,
    dest: str,
    recursive: bool = False,
    token: str | None = None,
    service: str = DEFAULT_CONFIG["STORAGE_SERVICE"],
):
    """Copy local source to remote dest, or multiple sources to directory.

    Parameters
    ----------
    source: str
        Local path to put.
    dest: str
        Remote destination path.
    recursive: bool
        Put directories recursively.
    token: str
        Oceanum datamesh token.
    service: str
        Oceanum storage service URL.

    Raises
    ------
    FileNotFoundError
        If ``source`` does not exist locally.
    IsADirectoryError
        If ``source`` is a directory and ``recursive`` is False.
    FileExistsError
        If ``source`` is a directory and ``dest`` is an existing remote file.

    """
    fs = FileSystem(token=token, service=service)
    source_path = Path(source)

    # Deal with ClientOsError when trying to copy non-existing source
    if not source_path.exists():
        raise FileNotFoundError(f"Source {source} not found")

    source_is_dir = source_path.is_dir()

    # Ensure attempting to copy folder without recursive option does not fail silently
    if source_is_dir and not recursive:
        raise IsADirectoryError(f"--recursive is required to put directory {source}")

    # Raise if trying to upload a folder into an existing file. The local test
    # comes first so the remote lookups only run when they can matter.
    if source_is_dir and fs.exists(dest) and not fs.isdir(dest):
        raise FileExistsError(f"Destination {dest} is an existing file")

    # Uploading
    fs.put(source, dest, recursive=recursive)
def rm(
    path: str,
    recursive: bool = False,
    token: str | None = None,
    service: str = DEFAULT_CONFIG["STORAGE_SERVICE"],
):
    """Remove path file or directory.

    Parameters
    ----------
    path: str
        Path to remove.
    recursive: bool
        Remove directories recursively.
    token: str
        Oceanum datamesh token.
    service: str
        Oceanum storage service URL.

    Raises
    ------
    NotImplementedError
        Always; removal is not implemented yet.

    """
    raise NotImplementedError("rm not implemented yet")