在本文中,将探讨如何通过Rust的并发特性实现大规模文档的异步分块和嵌入,以及如何与Weaviate向量数据库集成,实现高效的图像嵌入和搜索。这种技术可以显著减少内存使用量并加快处理速度。
在创建嵌入时,尤其是在处理大规模文档时,当前的嵌入框架采用两步过程:分块和嵌入。首先,从所有文件中提取文本,并创建块/节点。然后,这些块被送入具有特定批量大小的嵌入模型以处理嵌入。在这个过程中,块和嵌入保留在系统内存中。当文件和嵌入维度较小时,这不是问题。但当文件数量众多,且使用大型模型,尤其是多向量嵌入时,这就成为了问题。因此,处理这些需要高RAM来处理嵌入。此外,如果这是同步进行的,在创建块时会浪费大量时间,因为分块不是一个计算密集型操作。在创建块的同时,将它们传递给嵌入模型将是高效的。
解决方案是创建一个异步分块和嵌入任务。可以有效地使用Rust的并发模式和线程安全性来生成线程来处理这个任务。这是通过Rust的MPSC(多生产者单消费者)模块完成的,该模块在线程之间传递消息。因此,这创建了一个块流,传递到嵌入线程的缓冲区中。一旦缓冲区完成,它就嵌入块并将嵌入返回到主线程,然后主线程将它们发送到向量数据库。这确保了在单个操作上没有时间浪费,也没有瓶颈。此外,系统只存储缓冲区中的块和嵌入,并在它们移动到向量数据库后从内存中擦除它们。
from embed_anything import EmbedData
from embed_anything.vectordb import Adapter
class WeaviateAdapter(Adapter):
def __init__(self, api_key, url):
super().__init__(api_key)
self.client = weaviate.connect_to_weaviate_cloud(
cluster_url=url, auth_credentials=wvc.init.Auth.api_key(api_key)
)
if self.client.is_ready():
print("Weaviate is ready")
def create_index(self, index_name: str):
self.index_name = index_name
self.collection = self.client.collections.create(
index_name, vectorizer_config=wvc.config.Configure.Vectorizer.none()
)
return self.collection
def convert(self, embeddings: List[EmbedData]):
data = []
for embedding in embeddings:
property = embedding.metadata
property["text"] = embedding.text
data.append(
wvc.data.DataObject(properties=property, vector=embedding.embedding)
)
return data
def upsert(self, embeddings):
data = self.convert(embeddings)
self.client.collections.get(self.index_name).data.insert_many(data)
def delete_index(self, index_name: str):
self.client.collections.delete(index_name)
import embed_anything
model = embed_anything.EmbeddingModel.from_pretrained_cloud(
embed_anything.WhichModel.Clip,
model_id="openai/clip-vit-base-patch16")
data = embed_anything.embed_image_directory(
"\image_directory",
embeder=model,
adapter=weaviate_adapter,
config=embed_anything.ImageEmbedConfig(buffer_size=100),
)
query_vector = embed_anything.embed_query(["image of a cat"], embeder=model)[0].embedding
response = weaviate_adapter.collection.query.near_vector(
near_vector=query_vector,
limit=2,
return_metadata=wvc.query.MetadataQuery(certainty=True),
)