使用Python连接Snowflake数据库

Snowflake数据库是一个基于云计算的数据仓库,支持SQL操作,因其可扩展性、多语言支持、处理大数据能力而广受欢迎。本文将介绍如何使用Python连接到Snowflake数据库,并执行数据的读写操作。

Snowflake数据库简介

Snowflake数据库与Amazon Web Services、Microsoft Azure和Google Cloud Platform兼容。本文主要关注如何将Snowflake与Python连接,以及在连接过程中遇到的一些错误。

连接Python到Snowflake数据库的不同方式

本文将概述如何使用Snowflake连接器、SQLAlchemy引擎和私钥连接Snowflake到Python。如果需要更深入了解,文章末尾提供了Snowflake官方文档的参考链接。

首先,需要安装支持pandas的Python连接器。在Jupyter notebook中使用以下命令安装:

!pip install snowflake-connector-python[pandas]

在客户端环境中连接Python到Snowflake DB时,需要提供以下信息。注意,使用SSO登录时,如果认证器设置为'externalbrowser',则不需要密码。Google Chrome是此操作最兼容的浏览器。

import snowflake.connector import pandas as pd import numpy as np ctx = snowflake.connector.connect( account = '', user = '', schema = '', warehouse='', role = '', authenticator='externalbrowser', ) cur = ctx.cursor()

现在,可以使用这个游标对象在Snowflake和Python之间进行数据读写操作。

由于已经安装了Python连接器,现在将安装SnowflakeSQLAlchemy包。在Jupyter notebook中使用以下命令安装:

!pip install --upgrade snowflake-sqlalchemy

假设在客户端环境中使用SSO外部浏览器认证,以下查询对于连接非常有用。

import pandas as pd import numpy as np from sqlalchemy import create_engine from snowflake.sqlalchemy import URL url = URL( account = '', user = '', database = '', schema = '', warehouse= '', role = '', authenticator='externalbrowser', ) engine = create_engine(url) connection = engine.connect()

现在,可以使用简单的命令从Snowflake DB表中获取数据到Python数据框。

query = '''select * from ''' data = pd.read_sql(query, connection)

同样,也可以将Python数据写入Snowflake表中。这个操作将截断并加载Snowflake表。如果想要增量加载,可以使用'append'代替'replace'。

data.to_sql('', engine, if_exists='replace', index=False, index_label=None)

除了常规信息(例如账户名、用户、数据库等)之外,还需要两个其他重要信息:一个是存储在Snowflake中的私钥的路径,另一个是访问密钥的密码。

from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.hazmat.primitives.asymmetric import dsa from cryptography.hazmat.primitives import serialization with open(path, "rb") as key: p_key= serialization.load_pem_private_key(key.read(),password=''.encode(),backend=default_backend()) pkb = p_key.private_bytes(encoding=serialization.Encoding.DER, format=serialization.PrivateFormat.PKCS8,encryption_algorithm=serialization.NoEncryption()) ctx = snowflake.connector.connect(user=user,account=account,private_key=pkb,warehouse=warehouse,database=database) if ctx: eng = sqlalchemy.create_engine(url,poolclass=sqlalchemy.pool.StaticPool,creator = lambda:ctx) else: eng=sqlalchemy.create_engine(url,creator=get_connect) con = eng.connect() url = URL(account = '',user = '',database = '',warehouse= '',role = '')

如果在执行上述代码后遇到“Could not deserialize key data”错误消息,那么需要检查提供的RSA密钥所在的路径。有时,如果密钥损坏或由于某些原因无法访问,也可能会出现上述错误。在这种情况下,可以通知Snowflake DBA生成一个新的RSA密钥。

cur = ctx.cursor() query = '''select * from ''' cur.execute(query) data = pd.DataFrame.from_records(iter(cur), columns=[x[0] for x in cur.description]) cur.close()
沪ICP备2024098111号-1
上海秋旦网络科技中心:上海市奉贤区金大公路8218号1幢 联系电话:17898875485