Snowflake数据库是一个基于云计算的数据仓库,支持SQL操作,因其可扩展性、多语言支持、处理大数据能力而广受欢迎。本文将介绍如何使用Python连接到Snowflake数据库,并执行数据的读写操作。
Snowflake数据库与Amazon Web Services、Microsoft Azure和Google Cloud Platform兼容。本文主要关注如何将Snowflake与Python连接,以及在连接过程中遇到的一些错误。
本文将概述如何使用Snowflake连接器、SQLAlchemy引擎和私钥连接Snowflake到Python。如果需要更深入了解,文章末尾提供了Snowflake官方文档的参考链接。
首先,需要安装支持pandas的Python连接器。在Jupyter notebook中使用以下命令安装:
!pip install snowflake-connector-python[pandas]
在客户端环境中连接Python到Snowflake DB时,需要提供以下信息。注意,使用SSO登录时,如果认证器设置为'externalbrowser',则不需要密码。Google Chrome是此操作最兼容的浏览器。
import snowflake.connector
import pandas as pd
import numpy as np
ctx = snowflake.connector.connect(
account = '',
user = '',
schema = '',
warehouse='',
role = '',
authenticator='externalbrowser',
)
cur = ctx.cursor()
现在,可以使用这个游标对象在Snowflake和Python之间进行数据读写操作。
由于已经安装了Python连接器,现在将安装SnowflakeSQLAlchemy包。在Jupyter notebook中使用以下命令安装:
!pip install --upgrade snowflake-sqlalchemy
假设在客户端环境中使用SSO外部浏览器认证,以下查询对于连接非常有用。
import pandas as pd
import numpy as np
from sqlalchemy import create_engine
from snowflake.sqlalchemy import URL
url = URL(
account = '',
user = '',
database = '',
schema = '',
warehouse= '',
role = '',
authenticator='externalbrowser',
)
engine = create_engine(url)
connection = engine.connect()
现在,可以使用简单的命令从Snowflake DB表中获取数据到Python数据框。
query = '''select * from '''
data = pd.read_sql(query, connection)
同样,也可以将Python数据写入Snowflake表中。这个操作将截断并加载Snowflake表。如果想要增量加载,可以使用'append'代替'replace'。
data.to_sql('', engine, if_exists='replace', index=False, index_label=None)
除了常规信息(例如账户名、用户、数据库等)之外,还需要两个其他重要信息:一个是存储在Snowflake中的私钥的路径,另一个是访问密钥的密码。
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric import dsa
from cryptography.hazmat.primitives import serialization
with open(path, "rb") as key:
p_key= serialization.load_pem_private_key(key.read(),password=''.encode(),backend=default_backend())
pkb = p_key.private_bytes(encoding=serialization.Encoding.DER,
format=serialization.PrivateFormat.PKCS8,encryption_algorithm=serialization.NoEncryption())
ctx = snowflake.connector.connect(user=user,account=account,private_key=pkb,warehouse=warehouse,database=database)
if ctx:
eng = sqlalchemy.create_engine(url,poolclass=sqlalchemy.pool.StaticPool,creator = lambda:ctx)
else:
eng=sqlalchemy.create_engine(url,creator=get_connect)
con = eng.connect()
url = URL(account = '',user = '',database = '',warehouse= '',role = '')
如果在执行上述代码后遇到“Could not deserialize key data”错误消息,那么需要检查提供的RSA密钥所在的路径。有时,如果密钥损坏或由于某些原因无法访问,也可能会出现上述错误。在这种情况下,可以通知Snowflake DBA生成一个新的RSA密钥。
cur = ctx.cursor()
query = '''select * from '''
cur.execute(query)
data = pd.DataFrame.from_records(iter(cur), columns=[x[0] for x in cur.description])
cur.close()