Read Delta tables without any Spark

jeppe742, updated 2022-10-03 06:09:13


Delta Lake Reader

The Delta format, developed by Databricks, is often used to build data lakes or lakehouses.

While it has many benefits, one of the downsides of Delta tables is that they rely on Spark to read the data. This might be infeasible, or at least introduce a lot of overhead, if you want to build data applications like Streamlit apps or ML APIs on top of the data in your Delta tables.

This package tries to fix this by providing a lightweight Python wrapper around the Delta file format, without any Spark dependencies.

Installation

Install the package using pip

pip install delta-lake-reader

This will only install the minimal dependencies for working with the local file system. To access Delta tables stored in popular cloud storage services, use one of the following commands to include the cloud-specific dependencies:

Azure

pip install delta-lake-reader[azure]

Amazon Web Services (AWS)

pip install delta-lake-reader[aws]

Google Cloud Platform (GCP)

pip install delta-lake-reader[gcp]

Usage

The package is built on PyArrow and FSSpec.

This means that you get all the features of PyArrow, like predicate pushdown, partition pruning and easy interoperability with Pandas.

Meanwhile, FSSpec serves as a filesystem-agnostic backend that lets you read files from many places, including popular cloud providers.

To read a Delta table, first create a DeltaTable object. This reads the Delta transaction log to find the current files and to get the schema, but it does not read any data. To read the content of the table, call to_table() to get a pyarrow.Table object, or to_pandas() to get a pandas.DataFrame.
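To make the idea of "reading the transaction log" concrete: a Delta table keeps a _delta_log directory of numbered JSON commit files, where each line is one action, such as add or remove for a data file. The sketch below replays those commits with the standard library only to find the files that make up the latest version. It is an illustration, not this package's implementation: the helpers active_files and write_commit are hypothetical, checkpoint parquet files are ignored, and schema handling (the metaData action) is left out.

```python
from __future__ import annotations

import json
import os
import tempfile


def active_files(table_path: str) -> set[str]:
    """Replay the JSON commits in <table_path>/_delta_log and return the
    relative paths of the data files that make up the latest version.

    Hypothetical helper: checkpoint (.parquet) files are ignored, so it only
    works while the full history is still available as JSON commits.
    """
    log_dir = os.path.join(table_path, "_delta_log")
    # Commit files are named by zero-padded version numbers, so sorting the
    # names lexically also sorts them by version.
    commits = sorted(f for f in os.listdir(log_dir) if f.endswith(".json"))
    files: set[str] = set()
    for name in commits:
        with open(os.path.join(log_dir, name)) as fh:
            for line in fh:
                if not line.strip():
                    continue
                action = json.loads(line)
                # Other actions (commitInfo, protocol, metaData) do not
                # change the file list and are skipped here.
                if "add" in action:
                    files.add(action["add"]["path"])
                elif "remove" in action:
                    files.discard(action["remove"]["path"])
    return files


def write_commit(log_dir: str, version: int, actions: list[dict]) -> None:
    """Hypothetical demo helper: write one commit as newline-delimited JSON."""
    with open(os.path.join(log_dir, f"{version:020d}.json"), "w") as fh:
        fh.write("\n".join(json.dumps(a) for a in actions) + "\n")


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as table:
        log_dir = os.path.join(table, "_delta_log")
        os.makedirs(log_dir)
        write_commit(log_dir, 0, [{"add": {"path": "part-0.parquet"}},
                                  {"add": {"path": "part-1.parquet"}}])
        # Version 1 replaces part-0 with part-2 (e.g. after an UPDATE)
        write_commit(log_dir, 1, [{"remove": {"path": "part-0.parquet"}},
                                  {"add": {"path": "part-2.parquet"}}])
        print(sorted(active_files(table)))  # ['part-1.parquet', 'part-2.parquet']
```

Only after this file list is known does any Parquet data need to be read, which is why creating a DeltaTable object is cheap.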

Local file system

```python
from deltalake import DeltaTable

# native file path. Can be relative or absolute
table_path = "somepath/mytable"

# Get table as pyarrow table
df = DeltaTable(table_path).to_table()

# Get table as pandas dataframe
df = DeltaTable(table_path).to_pandas()
```

Azure

The Azure integration is based on the adlfs package, developed by the Dask community.

The credential used to authenticate against the storage account can be a SAS token, an access key, or one of the azure.identity classes. See authentication using the Azure SDK for more information.

```python
from deltalake import DeltaTable
from adlfs import AzureBlobFileSystem

# example url 'abfss://mycontainer@myStorageAccount.dfs.core.windows.net/somepath/mytable'
fs = AzureBlobFileSystem(
    account_name="myStorageAccount",
    credential='...'
)
df = DeltaTable("mycontainer/somepath/mytable", file_system=fs).to_pandas()
```

Amazon Web Services (AWS)

The AWS integration is based on the s3fs package, developed by the Dask community.

To authenticate, you can either specify the access key and secret or, since s3fs is built on boto, use any of boto's authentication methods. See authentication using the AWS SDK for more information.

```python
from deltalake import DeltaTable
from s3fs import S3FileSystem

# example url 's3://myBucket/somepath/mytable'
fs = S3FileSystem()  # authenticate using environment variables, in this example
df = DeltaTable("myBucket/somepath/mytable", file_system=fs).to_pandas()
```

Google Cloud Platform (GCP)

The GCP integration is based on the gcsfs package, developed by the Dask community.

For more information about authentication with GCP, see the gcsfs documentation or the GCP documentation.

```python
from deltalake import DeltaTable
from gcsfs import GCSFileSystem

# example url 'gs://myBucket/somepath/mytable'
fs = GCSFileSystem()  # authenticate using environment variables, in this example
df = DeltaTable("myBucket/somepath/mytable", file_system=fs).to_pandas()
```

Time travel

One of the features of the Delta format is the ability to time travel.

This can be done using the as_version method. Note that this currently only supports specific versions, not timestamps.

```python
from deltalake import DeltaTable

df = DeltaTable("somepath/mytable").as_version(5).to_pandas()
```

Time traveling to a version that has been vacuumed currently results in undefined behavior.
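Conceptually, time travel to version N means replaying only the commits 0..N of the transaction log. The standard-library sketch below illustrates that idea and also shows why vacuumed versions are a problem: an old version can still reference data files that VACUUM has physically deleted. The helper files_at_version is hypothetical, ignores checkpoints, and is not how as_version is implemented in this package.

```python
from __future__ import annotations

import json
import os
import tempfile


def files_at_version(table_path: str, version: int) -> tuple[set[str], set[str]]:
    """Replay commits 0..version of <table_path>/_delta_log.

    Returns (active, missing): the data files that make up `version`, and the
    subset of those that no longer exist on disk. Hypothetical helper that
    assumes every commit up to `version` is still a JSON file (no checkpoints).
    """
    log_dir = os.path.join(table_path, "_delta_log")
    active: set[str] = set()
    for v in range(version + 1):
        commit = os.path.join(log_dir, f"{v:020d}.json")
        with open(commit) as fh:  # raises FileNotFoundError if the commit is gone
            for line in fh:
                if not line.strip():
                    continue
                action = json.loads(line)
                if "add" in action:
                    active.add(action["add"]["path"])
                elif "remove" in action:
                    active.discard(action["remove"]["path"])
    # VACUUM deletes files no longer referenced by recent versions, so an old
    # version can still point at data files that have been physically removed.
    missing = {p for p in active if not os.path.exists(os.path.join(table_path, p))}
    return active, missing


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as table:
        log_dir = os.path.join(table, "_delta_log")
        os.makedirs(log_dir)
        commits = [
            [{"add": {"path": "a.parquet"}}],
            [{"remove": {"path": "a.parquet"}}, {"add": {"path": "b.parquet"}}],
        ]
        for v, actions in enumerate(commits):
            with open(os.path.join(log_dir, f"{v:020d}.json"), "w") as fh:
                fh.write("\n".join(json.dumps(a) for a in actions))
        # Simulate a vacuum: only b.parquet is still on disk
        open(os.path.join(table, "b.parquet"), "w").close()
        print(files_at_version(table, 1))  # ({'b.parquet'}, set())
        print(files_at_version(table, 0))  # ({'a.parquet'}, {'a.parquet'})
```

A non-empty missing set is exactly the situation where reading that version cannot succeed, so a check like this could be used to fail early instead of hitting undefined behavior.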

Predicate Pushdown, Partition Pruning & Columnar file formats

Since the resulting DeltaTable is based on pyarrow.dataset.Dataset, you get many useful features for free.

DeltaTable.to_table is inherited from pyarrow.dataset.Dataset.to_table. This means that you can pass arguments like filter, which will do partition pruning and predicate pushdown. If you have a partitioned dataset, partition pruning can substantially reduce the amount of data that needs to be downloaded. Predicate pushdown will not have any effect on the amount of data downloaded, but it will reduce the size of the dataset once it is loaded into memory.

Furthermore, since the underlying Parquet file format is columnar, you can select a subset of columns to be read from the files. This is done by passing a list of column names to to_table.

See the documentation of to_pandas or to_table for a description of all arguments.

```python
import pyarrow.dataset as ds
from deltalake import DeltaTable

# Predicate pushdown.
# If the table is partitioned on age, it will also do partition pruning
df = DeltaTable("...").to_table(filter=ds.field("age") >= 18).to_pandas()

# Only load a subset of columns
df = DeltaTable("...").to_table(columns=["age", "name"]).to_pandas()
```

Read more about filtering data using PyArrow
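To illustrate what partition pruning means for a Delta table: every add action in the transaction log records the file's partition values, so files whose partitions cannot match the filter can be discarded before anything is downloaded. The sketch below shows that file-level step with the standard library only; prune_files and the sample add actions are hypothetical, and according to the section above, passing filter to to_table already lets pyarrow do this for you.

```python
from __future__ import annotations

from typing import Callable, Optional


def prune_files(
    add_actions: list[dict],
    predicate: Callable[[dict[str, Optional[str]]], bool],
) -> list[str]:
    """Return the paths of the files whose partition values satisfy `predicate`.

    Delta records partition values as strings (or null) in each add action's
    "partitionValues" map, so the predicate receives strings and must convert
    them itself. Files that fail the predicate are never opened or downloaded.
    """
    return [
        action["path"]
        for action in add_actions
        if predicate(action.get("partitionValues", {}))
    ]


if __name__ == "__main__":
    # Hypothetical add actions for a table partitioned on "age"
    adds = [
        {"path": "age=17/part-0.parquet", "partitionValues": {"age": "17"}},
        {"path": "age=18/part-0.parquet", "partitionValues": {"age": "18"}},
        {"path": "age=42/part-0.parquet", "partitionValues": {"age": "42"}},
    ]
    # Same intent as filter=ds.field("age") >= 18, applied at the file level only
    kept = prune_files(adds, lambda pv: pv["age"] is not None and int(pv["age"]) >= 18)
    print(kept)  # ['age=18/part-0.parquet', 'age=42/part-0.parquet']
```

Predicate pushdown on non-partition columns happens later, inside the Parquet files that survive this step, which is why it saves memory rather than download volume.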

Bring Your Own Filesystem

Since the implementation uses FSSpec for filesystem abstraction, you can in principle use any FSSpec filesystem. See more about the available FSSpec implementations.

```python
fs = SomeFSSpecFilesystem()
df = DeltaTable(path, file_system=fs).to_pandas()
```

Performance comparison with PySpark

It is possible to run PySpark in local mode, which means you can run Spark code without having to spin up an entire cluster. This, however, still involves a big performance and resource usage overhead. To investigate whether this module is actually faster than using PySpark, I ran a small experiment.

The time to read a table into a pandas DataFrame was measured for a table with 3 columns and various numbers of rows. The tables were stored locally on a VM (8 vCPUs, 32 GB RAM). This is arguably a synthetic test case, since you would normally store your table in a remote blob store, where network latency would even out the results a bit. PySpark was, however, still given an advantage, since the timer was only started after the Spark session had been created, which can take several seconds. Furthermore, the resource usage of PySpark should be significantly higher, both in terms of CPU and RAM, which can be another limiting factor. Finally, reading data from remote blob storage often requires adding cloud-specific JARs to the runtime, which can be tedious to get working.

In this experiment, delta-lake-reader was on average about 100x faster than PySpark.
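A timing setup like the one described above could be sketched as follows, assuming each reader is wrapped in a zero-argument callable and that expensive setup, such as creating the Spark session, happens before the callable is handed over. The helper median_runtime, the untimed warm-up call and the repeat count are illustrative assumptions, not the code used to produce the reported numbers.

```python
import statistics
import time
from typing import Callable


def median_runtime(read: Callable[[], object], repeats: int = 5) -> float:
    """Return the median wall-clock time in seconds of calling `read()`.

    One untimed warm-up call is made first so that one-off costs (imports,
    caches) are not counted, in the same spirit as excluding Spark startup.
    """
    read()  # warm-up, not timed
    timings = []
    for _ in range(repeats):
        start = time.perf_counter()
        read()
        timings.append(time.perf_counter() - start)
    return statistics.median(timings)


if __name__ == "__main__":
    # Stand-in workloads. In a real comparison these would be, for example,
    # lambda: DeltaTable(path).to_pandas() and, with a session created beforehand,
    # lambda: spark.read.format("delta").load(path).toPandas()
    fast = median_runtime(lambda: sum(range(10_000)))
    slow = median_runtime(lambda: sum(range(1_000_000)))
    print(f"speedup: {slow / fast:.1f}x")
```

Using the median rather than the mean keeps a single slow run (for example a garbage collection pause) from dominating the comparison.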

Disclaimer (2021-01-15)

Databricks recently announced a standalone reader for Delta tables in a blog post. The standalone reader is JVM-based, but an "official" Rust implementation with Python bindings also exists. Back then, the Python bindings couldn't be pip installed, which was a major inconvenience for Python developers, but this has since been added. While there is a lot of overlap between these two projects, this project still supports a few additional features compared to the Rust implementation, like more alternatives for authenticating to Azure (identity-based, instead of only relying on the account key) and support for more file systems, like GCP buckets. If, however, you are interested in a more actively maintained package, I would recommend taking a look at the Delta Rust implementation. Although the idea for this library was conceived independently, some inspiration has been taken from the Rust library.

Read more

Delta Table paper

Delta transaction log

PyArrow Documentation

FSSpec Documentation

Issues

Azure Function - Error Reading Delta Table from Azure Storage Account

opened on 2022-12-20 18:38:12 by rafael-gomez-61

Azure Function - When trying to access a delta table on our Azure storage account, I get an error when I call DeltaTable class. Same code on PyCharm, no error.

I need HELP!! Am I missing something???

I have the following defined in requirements.txt:

azure-functions
azure-identity
pyodbc
delta-lake-reader[azure]

Requirement already satisfied: delta-lake-reader[azure] in c:\working-folder\az-func\http-register-dta-source\.venv\lib\site-packages (from -r requirements.txt (line 8)) (0.2.13)

```
def readDeltaTableSchema(container_name: str, schema_name: str, delta_table: str) -> str:
    from deltalake import DeltaTable
    from adlfs import AzureBlobFileSystem

    VZNTDIRECTORYID = os.getenv('AZ_STORAGE_VZNTDIRECTORYID')
    VZNTID = os.getenv('AZ_STORAGE_VZNTID')
    VZNTSECRET = os.getenv('AZ_STORAGE_VZNTSECRET')

    az_account_name = os.getenv('AZ_STORAGE_STARBURST_ACCT')
    vznt_tenant_storage_account = os.getenv('AZ_STORAGE_STARBURST_ACCT')
    Storage_URL = "https://{vznt_tenant_storage_account}.dfs.core.windows.net"

    az_container = container_name
    az_schema = schema_name
    az_delta_table = delta_table

    url = f"abfss://{az_container}@{az_account_name}.dfs.core.windows.net/delta/{az_schema}/{az_delta_table}"

    fs = AzureBlobFileSystem(
        account_name=az_account_name, account_url=Storage_URL,
        client_id=VZNTID, client_secret=VZNTSECRET, tenant_id=VZNTDIRECTORYID
    )

    deltaTableSchemaMeta = DeltaTable(url, file_system=fs)  # <-- error raised here
```

Error:

```
[2022-12-20T18:27:16.855Z] Executed 'Functions.register-data-sourceHTTPTrigger' (Failed, Id=bbfa069d-d7c9-475c-860b-3e593a0e0378, Duration=16026ms)
[2022-12-20T18:27:16.858Z] System.Private.CoreLib: Exception while executing function: Functions.register-data-sourceHTTPTrigger. System.Private.CoreLib: Result: Failure
Exception: HttpResponseError: Operation returned an invalid status 'The specifed resource name contains invalid characters.'
ErrorCode:InvalidResourceName
Stack:
  File "C:\ProgramData\chocolatey\lib\azure-functions-core-tools-3\tools\workers\python\3.9/WINDOWS/X64\azure_functions_worker\dispatcher.py", line 402, in _handle__invocation_request
    call_result = await self._loop.run_in_executor(
  File "C:\Program Files\Python39\lib\concurrent\futures\thread.py", line 52, in run
    result = self.fn(*self.args, **self.kwargs)
  File "C:\ProgramData\chocolatey\lib\azure-functions-core-tools-3\tools\workers\python\3.9/WINDOWS/X64\azure_functions_worker\dispatcher.py", line 606, in _run_sync_func
    return ExtensionManager.get_sync_invocation_wrapper(context,
  File "C:\ProgramData\chocolatey\lib\azure-functions-core-tools-3\tools\workers\python\3.9/WINDOWS/X64\azure_functions_worker\extension.py", line 215, in _raw_invocation_wrapper
    result = function(args)
  File "C:\working-folder\az-func\http-register-dta-source\register-data-sourceHTTPTrigger\__init__.py", line 30, in main
    storage_access()
  File "C:\working-folder\az-func\http-register-dta-source\register-data-sourceHTTPTrigger\__init__.py", line 62, in storage_access
    val = readDeltaTableSchema(az_container, az_schema, az_delta_table)
  File "C:\working-folder\az-func\http-register-dta-source\register-data-sourceHTTPTrigger\__init__.py", line 91, in readDeltaTableSchema
    deltaTableSchemaMeta = DeltaTable(url, file_system=fs)
  File "C:\working-folder\az-func\http-register-dta-source\.venv\lib\site-packages\deltalake\deltatable.py", line 40, in __init__
    if not self._is_delta_table():
  File "C:\working-folder\az-func\http-register-dta-source\.venv\lib\site-packages\deltalake\deltatable.py", line 62, in _is_delta_table
    return self.filesystem.exists(f"{self.log_path}")
  File "C:\working-folder\az-func\http-register-dta-source\.venv\lib\site-packages\adlfs\spec.py", line 1292, in exists
    return sync(self.loop, self._exists, path)
  File "C:\working-folder\az-func\http-register-dta-source\.venv\lib\site-packages\fsspec\asyn.py", line 71, in sync
    raise return_result
  File "C:\working-folder\az-func\http-register-dta-source\.venv\lib\site-packages\fsspec\asyn.py", line 25, in _runner
    result[0] = await coro
  File "C:\working-folder\az-func\http-register-dta-source\.venv\lib\site-packages\adlfs\spec.py", line 1314, in _exists
    if await bc.exists():
  File "C:\working-folder\az-func\http-register-dta-source\.venv\lib\site-packages\azure\core\tracing\decorator_async.py", line 79, in wrapper_use_tracer
    return await func(*args, **kwargs)
  File "C:\working-folder\az-func\http-register-dta-source\.venv\lib\site-packages\azure\storage\blob\aio\_blob_client_async.py", line 652, in exists
    process_storage_error(error)
  File "C:\working-folder\az-func\http-register-dta-source\.venv\lib\site-packages\azure\storage\blob\_shared\response_handlers.py", line 185, in process_storage_error
    exec("raise error from None")   # pylint: disable=exec-used # nosec
  File "<string>", line 1, in <module>
```

Problem with reading tables from deltalake using AzureBlobFileSystem in adlfs #python

opened on 2022-09-08 08:07:06 by shr-poojary

Hi, I am facing an issue while reading Delta tables from Azure Delta Lake using AzureBlobFileSystem in adlfs. I am not sure if it's an issue with syntax or library versions. Kindly help!!

Libraries:
1. delta-lake-reader (Version: 0.2.13)
2. adlfs (Version: 0.7.7)
3. azure-identity (Version: 1.7.1)

```
token_credential = ClientSecretCredential(active_directory_tenant_id, active_directory_client_id, active_directory_client_secret)

fs = AzureBlobFileSystem(
    account_name=storage_account,
    credential=token_credential
)

dt = DeltaTable('containername/folder1/folder2/tablefolder', file_system=fs)  # tablefolder contains the _delta_log
```

Error:

```
TypeError                                 Traceback (most recent call last)
 in
      7 pathtosa = deltaLakeFinalUri + conatainerPathToDeltaTable + deltaTableName
      8 print(pathtosa)
----> 9 dt = DeltaTable('dqdata/RAW/Revenue/actual_revenue_and_margin', file_system=fs)
     10 df=dt.to_pandas()

~\AppData\Roaming\Python\Python38\site-packages\deltalake\deltatable.py in __init__(self, path, file_system)
     43 Make sure you point to the root of a delta table"""
     44 )
---> 45 self._as_newest_version()
     46
     47 # The PyArrow Dataset is exposed by a factory class,

~\AppData\Roaming\Python\Python38\site-packages\deltalake\deltatable.py in _as_newest_version(self)
    149 # apply remaining versions. This can be a maximum of 9 versions.
    150 # we will just break when we don't find any newer logs
--> 151 self._apply_partial_logs(version=self.checkpoint + 9)
    152
    153 def to_table(self, *args, **kwargs):

~\AppData\Roaming\Python\Python38\site-packages\deltalake\deltatable.py in _apply_partial_logs(self, version)
    130 elif "metaData" in meta_data.keys():
    131 schema_string = meta_data["metaData"]["schemaString"]
--> 132 self.schema = schema_from_string(schema_string)
    133 # Stop if we have reatched the desired version
    134 if self.version == version:

~\AppData\Roaming\Python\Python38\site-packages\deltalake\schema.py in schema_from_string(schema_string)
     17 pa_type = map_type(type)
     18
---> 19 fields.append(pa.field(name, pa_type, nullable=nullable, metadata=metadata))
     20 return pa.schema(fields)
     21

C:\ProgramData\Anaconda3\lib\site-packages\pyarrow\types.pxi in pyarrow.lib.field()

C:\ProgramData\Anaconda3\lib\site-packages\pyarrow\types.pxi in pyarrow.lib.ensure_metadata()

C:\ProgramData\Anaconda3\lib\site-packages\pyarrow\types.pxi in pyarrow.lib.KeyValueMetadata.__init__()

C:\ProgramData\Anaconda3\lib\site-packages\pyarrow\lib.cp38-win_amd64.pyd in string.from_py.__pyx_convert_string_from_py_std__in_string()

TypeError: expected bytes, int found

```

Problem with reading blobs with Deltatable from deltalake and AzureBlobFileSystem from adlfs; with python

opened on 2022-04-04 08:34:14 by vvandermeij

Summary

I am trying to host an app online that has to read blobs in Delta Lake format from Azure Blob Storage. To do this, I use AzureBlobFileSystem from adlfs and DeltaTable from deltalake. When hosting the website (with Azure App Service), everything works well: when given input, the app reads the data from Azure Blob Storage and returns the correct tables. The problem occurs when the app has been online and unused for several hours: when the app then tries to read from the Delta Lake storage, it fails and raises an HttpResponseError with a traceback that is not clear enough for me to understand what the actual issue is. When I restart the app, everything works fine again until you wait for a few hours, after which the same bug returns. I am not sure if the problem occurs in the adlfs package or the deltalake package. I hope that someone here can help me understand where and why it is failing, and knows a solution to the problem! Thanks in advance

Traceback

```
HttpResponseError: The range specified is invalid for the current size of the resource.
RequestId:2988ea13-601e-0083-40f7-47b68d000000
Time:2022-04-04T07:42:05.9077599Z
ErrorCode:InvalidRange
Content: <?xml version="1.0" encoding="utf-8"?>InvalidRange The range specified is invalid for the current size of the resource. RequestId:2988ea13-601e-0083-40f7-47b68d000000 Time:2022-04-04T07:42:05.9077599Z

Traceback:
File "/opt/venv/lib/python3.9/site-packages/streamlit/script_runner.py", line 379, in _run_script
    exec(code, module.__dict__)
File "/app/src/incasso/dashboard_form.py", line 95, in <module>
    eenheid = find_eenheidnum(deelcontractnummer, fs).iloc[0]
File "/opt/venv/lib/python3.9/site-packages/sklego/pandas_utils.py", line 81, in wrapper
    result = func(*args, **kwargs)
File "/opt/venv/lib/python3.9/site-packages/incasso/dashboard_functions.py", line 29, in find_eenheidnum
    DeltaTable("20-silver/edh/woc_contracten", file_system=fs)
File "/opt/venv/lib/python3.9/site-packages/deltalake/deltatable.py", line 45, in __init__
    self._as_newest_version()
File "/opt/venv/lib/python3.9/site-packages/deltalake/deltatable.py", line 151, in _as_newest_version
    self._apply_partial_logs(version=self.checkpoint + 9)
File "/opt/venv/lib/python3.9/site-packages/deltalake/deltatable.py", line 113, in _apply_partial_logs
    for line in log:
File "/opt/venv/lib/python3.9/site-packages/fsspec/spec.py", line 1616, in __next__
    out = self.readline()
File "/opt/venv/lib/python3.9/site-packages/fsspec/spec.py", line 1613, in readline
    return self.readuntil(b"\n")
File "/opt/venv/lib/python3.9/site-packages/fsspec/spec.py", line 1596, in readuntil
    part = self.read(blocks or self.blocksize)
File "/opt/venv/lib/python3.9/site-packages/fsspec/spec.py", line 1565, in read
    out = self.cache._fetch(self.loc, self.loc + length)
File "/opt/venv/lib/python3.9/site-packages/fsspec/caching.py", line 154, in _fetch
    self.cache = self.fetcher(start, end)  # new block replaces old
File "/opt/venv/lib/python3.9/site-packages/fsspec/asyn.py", line 91, in wrapper
    return sync(self.loop, func, *args, **kwargs)
File "/opt/venv/lib/python3.9/site-packages/fsspec/asyn.py", line 71, in sync
    raise return_result
File "/opt/venv/lib/python3.9/site-packages/fsspec/asyn.py", line 25, in _runner
    result[0] = await coro
File "/opt/venv/lib/python3.9/site-packages/adlfs/spec.py", line 1804, in _async_fetch_range
    stream = await self.container_client.download_blob(
File "/opt/venv/lib/python3.9/site-packages/azure/core/tracing/decorator_async.py", line 74, in wrapper_use_tracer
    return await func(*args, **kwargs)
File "/opt/venv/lib/python3.9/site-packages/azure/storage/blob/aio/_container_client_async.py", line 1000, in download_blob
    return await blob_client.download_blob(
File "/opt/venv/lib/python3.9/site-packages/azure/core/tracing/decorator_async.py", line 74, in wrapper_use_tracer
    return await func(*args, **kwargs)
File "/opt/venv/lib/python3.9/site-packages/azure/storage/blob/aio/_blob_client_async.py", line 480, in download_blob
    await downloader._setup()  # pylint: disable=protected-access
File "/opt/venv/lib/python3.9/site-packages/azure/storage/blob/aio/_download_async.py", line 250, in _setup
    self._response = await self._initial_request()
File "/opt/venv/lib/python3.9/site-packages/azure/storage/blob/aio/_download_async.py", line 336, in _initial_request
    process_storage_error(error)
File "/opt/venv/lib/python3.9/site-packages/azure/storage/blob/_shared/response_handlers.py", line 181, in process_storage_error
    exec("raise error from None")  # pylint: disable=exec-used # nosec
File "<string>", line 1, in <module>
```

Failed building wheel for aiohttp, multidict, yarl, ...

opened on 2020-12-28 12:34:57 by jeppe742

When installing the azure, aws or gcp optional dependencies, the installation fails to build the wheels for aiohttp, multidict, yarl and some others.

This is due to an older version of pip. Update pip using pip install -U pip before installing to fix this.

pip>=20 is also set as a dependency, so running the installation twice should also fix any issues related to old pip versions.

Installation of cryptography fails for Azure dependencies on windows

opened on 2020-12-28 00:58:48 by jeppe742

From version 3.3, cryptography switched from compiled wheels for each Python version to the ABI3 format. This, however, requires a very recent version of pip to work; pip==20.0 seemed to do the trick for me.

Otherwise, pip will fail to find a compiled wheel and will try to compile it from source, which requires OpenSSL to be installed.

Jeppe Johan Waarkjær Olsen

Senior Data Engineer | M.Sc in Data Science
