查看原文
其他

DuckDB访问数据湖

alitrack alitrack 2022-10-01

DuckDB 不仅可以方便访问 Pandas DataFrameCSVParquet、PyArrow,甚至可以方便地访问数据湖。

安装需要的包

pip install duckdb deltalake

DeltaTable 表示特定版本的增量表的状态。这包括哪些文件是当前表的一部分、表的结构以及其他元数据,例如创建时间。

加载本地文件系统的数据湖(当前版本)

path = "/tmp/iris_delta"
from deltalake import DeltaTable
dt = DeltaTable(path)

本次使用的测试数据由 PySpark +Delta Lake 生成, 更多相关信息可以访问 Delta Lake 快速入门-PySpark 版

使用 DuckDB 访问

import duckdb
con=duckdb.connect()
def dsql(sql):
    return con.execute(sql).df()

ds = dt.to_pyarrow_dataset()
sql = "select * from ds limit 3"
dsql(sql)


当前版本只有 50 条信息

sql = "select count(*) from ds"
dsql(sql)


时间旅行(Time Travel)

除了可以访问最新版本的数据,还可以通过提供要加载的版本号来加载相应的版本:

dt = DeltaTable(path, version=1)
ds = dt.to_pyarrow_dataset()
sql = "select count(*) from ds"
dsql(sql)


也可以在加载表格后,通过使用版本号或日期时间字符串更改版本:

dt.load_version(1)
#等价于👇的
from datetime import datetime
timestamp =(datetime
            .fromtimestamp(1650358332868 / 1e3)
            .astimezone()
            .isoformat())
dt.load_with_datetime(timestamp)
dt.files()


这里的时间戳可以通过下面的命令获得,

#版本历史信息
dt.history()

返回

[{'timestamp': 1650358316226,
  'operation': 'CREATE TABLE',
  'operationParameters': {'isManaged': 'false',
   'description': None,
   'partitionBy': '[]',
   'properties': '{}'},
  'isolationLevel': 'Serializable',
  'isBlindAppend': True,
  'operationMetrics': {},
  'engineInfo': 'Apache-Spark/3.2.1 Delta-Lake/1.2.0',
  'txnId': '0b21cbea-5672-4367-84be-5d6c2fd07aef'},
 {'timestamp': 1650358332868,
  'operation': 'WRITE',
  'operationParameters': {'mode': 'Append', 'partitionBy': '[]'},
  'readVersion': 0,
  'isolationLevel': 'Serializable',
  'isBlindAppend': True,
  'operationMetrics': {'numFiles': '1',
   'numOutputRows': '150',
   'numOutputBytes': '2792'},
  'engineInfo': 'Apache-Spark/3.2.1 Delta-Lake/1.2.0',
  'txnId': 'aea8a270-4f3d-4ad3-8a63-2d9aebedefa7'},
 {'timestamp': 1650358460951,
  'operation': 'CREATE OR REPLACE TABLE',
  'operationParameters': {'isManaged': 'false',
   'description': None,
   'partitionBy': '["Species"]',
   'properties': '{}'},
  'readVersion': 1,
  'isolationLevel': 'Serializable',
  'isBlindAppend': False,
  'operationMetrics': {},
  'engineInfo': 'Apache-Spark/3.2.1 Delta-Lake/1.2.0',
  'txnId': '679a731d-9456-4ed0-aa63-3ce8a4ece8bc'},
 {'timestamp': 1650358516037,
  'operation': 'WRITE',
  'operationParameters': {'mode': 'Append', 'partitionBy': '[]'},
  'readVersion': 2,
  'isolationLevel': 'Serializable',
  'isBlindAppend': True,
  'operationMetrics': {'numFiles': '3',
   'numOutputRows': '150',
   'numOutputBytes': '5612'},
  'engineInfo': 'Apache-Spark/3.2.1 Delta-Lake/1.2.0',
  'txnId': '443c8613-6dd7-4442-9111-adfd8dca7a67'},
 {'timestamp': 1650358641212,
  'operation': 'DELETE',
  'operationParameters': {'predicate': '[]'},
  'readVersion': 3,
  'isolationLevel': 'Serializable',
  'isBlindAppend': False,
  'operationMetrics': {'numRemovedFiles': '3',
   'executionTimeMs': '194',
   'scanTimeMs': '192',
   'rewriteTimeMs': '0'},
  'engineInfo': 'Apache-Spark/3.2.1 Delta-Lake/1.2.0',
  'txnId': 'c6f4373f-3f7b-41ca-8fa1-11501dbd2715'},
 {'timestamp': 1650358662597,
  'operation': 'WRITE',
  'operationParameters': {'mode': 'Append', 'partitionBy': '[]'},
  'readVersion': 4,
  'isolationLevel': 'Serializable',
  'isBlindAppend': True,
  'operationMetrics': {'numFiles': '3',
   'numOutputRows': '50',
   'numOutputBytes': '5031'},
  'engineInfo': 'Apache-Spark/3.2.1 Delta-Lake/1.2.0',
  'txnId': 'cee79e6b-63bc-48a1-9818-ccb37e17d811'}]

多种文件系统支持

除了本地文件系统,可以通过 storage_options 来配置存储后端,如 AWS S3,

storage_options = {"AWS_ACCESS_KEY_ID""AWS_ACCESS_KEY_ID",
     "AWS_SECRET_ACCESS_KEY":"AWS_SECRET_ACCESS_KEY"}
dt = DeltaTable("../rust/tests/data/delta-0.2.0",
  storage_options=storage_options)

或者,如果您有一个数据目录,您可以通过引用数据库和表名来加载它。目前仅支持 AWS Glue。

对于 AWS Glue 目录,使用 AWS 环境变量进行身份验证。

from deltalake import DeltaTable
from deltalake import DataCatalog
database_name = "simple_database"
table_name = "simple_table"
data_catalog = DataCatalog.AWS
dt = DeltaTable.from_data_catalog(data_catalog=data_catalog,
    database_name=database_name,
    table_name=table_name)
dt.to_pyarrow_table().to_pydict()
{'id': [57956789]}

除了本地文件系统,还支持以下后端:

  • AWS S3,由前缀 检测 s3://。可以使用与 CLI 相同的方式使用环境变量指定 AWS 凭证。
  • Azure Data Lake Storage Gen 2,由前缀 检测 adls2://。请注意,  必须按照说明设置 Azure 存储帐户[1]
  • Google Cloud Storage,由前缀 检测 gs://


更多的访问方式

除了使用 SQL 的方式访问,可以可以这样的方式

import duckdb
from deltalake import DeltaTable
dt = DeltaTable(path)
ds = dt.to_pyarrow_dataset()
ex_data = duckdb.arrow(ds)
(ex_data
 .filter("Species = 'virginica' and Sepal_Length > 7")
 .project("Sepal_Length")
 .to_df())
#返回
 Sepal_Length
0 7.2
1 7.1
2 7.2

从上面的代码可以看出,DuckDB 其实是借助于强大的 DeltaTable 和 PyArrow 来实现对数据湖的访问。


参考资料

[1]

设置 Azure 存储帐户: https://github.com/delta-io/delta-rs/blob/main/docs/ADLSGen2-HOWTO.md


您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存