Deployment with Docker Containers in a CI/CD MLOps Pipeline

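In this article series, we walk through setting up a Google Kubernetes Engine (GKE) cluster to deploy Docker containers. The articles assume that you are familiar with the basics of deep learning, DevOps, Jenkins, and Kubernetes. In the previous article of the series, we built four automated Jenkins workflows. In this article, the last one in the series, we develop a semi-automated deployment to production to complete the CI/CD MLOps pipeline. It is semi-automated because, as the product owner, you will typically want to check the unit test results before deploying to production in order to avoid service failures. Deployment to production could be done manually, but automation is required to meet the goal of the Google MLOps Maturity Model.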

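The diagram below shows where this step sits in the project architecture. Deployment to production consists of:

  • Copying the model file from the GCS testing registry to the production registry once unit testing has finished
  • Cleaning up the completed Kubernetes jobs
  • If the corresponding workflow has already been executed, starting a systematic shutdown of the prediction service pods, which forces Kubernetes to start new pods that load the new model, with zero service downtime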

Developing the Python Script

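So far we have been using Jenkins and Kubernetes to build the CI/CD solution. The following script shows how to interact with Jenkins and Kubernetes from Python to automate the deployment-to-production tasks. The Python script runs locally.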

Let's dive into the code. First, import the required libraries and define the variables:

from kubernetes import client, config
from google.cloud import storage
import jenkins
import time
import os

bucket_name = 'automatictrainingcicd-aiplatform'
model_name = 'best_model.hdf5'

Next, declare a function that cleans up the completed jobs in the cluster:

def clean_jobs():
    # Load the local kubeconfig so the client can reach the cluster
    config.load_kube_config()
    api_instance = client.BatchV1Api()
    print("Listing jobs:")
    api_response = api_instance.list_job_for_all_namespaces()
    jobs = []
    print('job-name job-namespace active succeeded failed start-time completion-time')
    for i in api_response.items:
        jobs.append([i.metadata.name, i.metadata.namespace])
        print("%s %s %s %s %s %s %s" % (i.metadata.name, i.metadata.namespace, i.status.active, i.status.succeeded, i.status.failed, i.status.start_time, i.status.completion_time))
    print('Deleting jobs...')
    if len(jobs) > 0:
        # Note: every job returned by the listing is deleted, whatever its status
        for name, namespace in jobs:
            api_instance.delete_namespaced_job(name, namespace)
        print("Jobs deleted.")
    else:
        print("No jobs found.")
    return

The following function copies the model from the GCS testing registry to the production registry:

def model_to_production():
    # Authenticate with the service account key file
    storage_client = storage.Client.from_service_account_json('AutomaticTrainingCICD-68f56bfa992e.json')
    bucket = storage_client.bucket(bucket_name)
    # Check that a model exists in the testing registry before copying
    status = storage.Blob(bucket=bucket, name='{}/{}'.format('testing', model_name)).exists(storage_client)
    if status:
        print('Copying model...')
        source_blob = bucket.blob('{}/{}'.format('testing', model_name))
        destination_blob_name = '{}/{}'.format('production', model_name)
        bucket.copy_blob(source_blob, bucket, destination_blob_name)
        print('Model from testing registry has been copied to production registry.')
    else:
        print('No model found at testing registry.')
    return

Next, if the prediction service is active, the following function starts the pod shutdown; otherwise, it triggers the AutomaticTraining-PredictionAPI Jenkins workflow:

def check_services():
    # Relies on the kubeconfig already loaded by clean_jobs()
    api_instance = client.CoreV1Api()
    api_response = api_instance.list_service_for_all_namespaces()
    print('Listing services:')
    print('service-namespace service-name')
    services = []
    for i in api_response.items:
        print("%s %s" % (i.metadata.namespace, i.metadata.name))
        services.append(i.metadata.name)
    # The prediction service is considered active if any service name starts with 'gke-api'
    if any(t.startswith('gke-api') for t in services):
        print('gke-api service is active. Proceeding to systematically shutdown its pods...')
        shutdown_pods()
    else:
        jenkins_build()
    return

The following function shuts down the pods when the prediction service is active:

def shutdown_pods():
    config.load_kube_config()
    api_instance = client.CoreV1Api()
    print("Listing pods:")
    api_response = api_instance.list_pod_for_all_namespaces(watch=False)
    pods = []
    print('pod-ip-address pod-namespace pod-name')
    for i in api_response.items:
        print("%s %s %s" % (i.status.pod_ip, i.metadata.namespace, i.metadata.name))
        pods.append([i.metadata.name, i.metadata.namespace])
    print('Shutting down pods...')
    print('Deleting only gke-api pods...')
    if len(pods) > 0:
        for name, namespace in pods:
            if name.startswith('gke-api'):
                api_instance.delete_namespaced_pod(name, namespace)
                print("Pod '{}' shut down.".format(name))
                # Pause before the next pod so a replacement has time to start
                time.sleep(120)
        print("All pods have been shut down.")
    else:
        print("No pods found.")
    return
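The fixed `time.sleep(120)` assumes that a replacement pod is ready within two minutes. One possible alternative is to poll a readiness check until it succeeds or a timeout expires. The sketch below shows only the generic polling loop; `wait_until` and `replacement_ready` are hypothetical names, and the check used here is a stand-in that turns true on its third call. A real check would have to query the cluster, which is not shown.

```python
import time


def wait_until(predicate, timeout=120.0, interval=5.0,
               sleep=time.sleep, clock=time.monotonic):
    """Call predicate() repeatedly until it returns True or timeout elapses.

    Returns True if the predicate succeeded, False if the timeout was reached.
    sleep and clock are injectable so the loop can be exercised without waiting.
    """
    deadline = clock() + timeout
    while True:
        if predicate():
            return True
        if clock() >= deadline:
            return False
        sleep(interval)


# Stand-in readiness check: reports "ready" on the third poll
calls = {"n": 0}


def replacement_ready():
    calls["n"] += 1
    return calls["n"] >= 3


ready = wait_until(replacement_ready, timeout=1.0, interval=0.0)
print(ready, calls["n"])
```

With a real readiness check in place of the stand-in, the pause after each deletion would end as soon as the new pod is up, and the return value would signal when a pod failed to come back in time.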

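If the prediction service is not active, the following function is triggered instead. It is in charge of deploying the prediction service: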

def jenkins_build():
    print('gke-api service is not active. Proceeding to build AutomaticTraining-PredictionAPI job at Jenkins.')
    # Replace the placeholder credentials with your own Jenkins login
    server = jenkins.Jenkins('http://localhost:8080', username='your_username', password='your_password')
    server.build_job('AutomaticTraining-PredictionAPI')
    print('AutomaticTraining-PredictionAPI job has been triggered, check Jenkins logs for more information.')
    return
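The function above carries the Jenkins credentials as literals in the script. A small sketch of reading them from environment variables instead is shown below. The variable names `JENKINS_URL`, `JENKINS_USER`, and `JENKINS_TOKEN` and the helper `jenkins_settings` are assumptions made for illustration; they do not come from the original project.

```python
import os


def jenkins_settings(env=None):
    """Collect Jenkins connection settings from environment variables.

    The variable names are hypothetical. The URL falls back to the
    local address used in the script; user and token are required.
    """
    env = os.environ if env is None else env
    missing = [key for key in ("JENKINS_USER", "JENKINS_TOKEN") if not env.get(key)]
    if missing:
        raise KeyError("Missing environment variables: " + ", ".join(missing))
    return {
        "url": env.get("JENKINS_URL", "http://localhost:8080"),
        "username": env["JENKINS_USER"],
        "password": env["JENKINS_TOKEN"],
    }


# Example with an explicit mapping instead of the real environment
settings = jenkins_settings({"JENKINS_USER": "alice", "JENKINS_TOKEN": "s3cret"})
print(settings["url"], settings["username"])
```

The resulting values could then be passed to the same constructor call the script already uses, for example `jenkins.Jenkins(settings["url"], username=settings["username"], password=settings["password"])`.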

Finally, the main function runs the whole script in the required order:

def main():
    clean_jobs()
    model_to_production()
    check_services()

if __name__ == '__main__':
    main()
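The deployment is described as semi-automated because the product owner may want to review the unit test results first. One way to make that checkpoint explicit is a confirmation prompt in front of the deployment steps. The sketch below is an assumption about how such a gate might look, not part of the original script; `confirm_deployment`, `run_if_confirmed`, and the stub steps are hypothetical names.

```python
def confirm_deployment(ask=input):
    """Ask for an explicit yes before deploying; anything else means no."""
    answer = ask("Unit tests reviewed. Deploy to production? [y/N] ")
    return answer.strip().lower() in ("y", "yes")


def run_if_confirmed(steps, ask=input):
    """Run each step in order only if the deployment was confirmed.

    Returns the names of the steps that were executed.
    """
    if not confirm_deployment(ask):
        return []
    executed = []
    for step in steps:
        step()
        executed.append(step.__name__)
    return executed


# Stub steps standing in for clean_jobs, model_to_production, check_services
log = []


def clean_jobs_stub():
    log.append("clean")


def model_to_production_stub():
    log.append("copy")


# Simulated answers instead of interactive input
declined = run_if_confirmed([clean_jobs_stub, model_to_production_stub], ask=lambda _: "n")
accepted = run_if_confirmed([clean_jobs_stub, model_to_production_stub], ask=lambda _: "yes")
print(declined, accepted, log)
```

In the real script, `main()` could pass the three deployment functions to such a gate, so that nothing touches the cluster until the prompt has been answered with a yes.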

Running the Script

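Once you run the Python script, you should see the following results:

  • All old, completed jobs are deleted
  • The model is copied to the production registry
  • The pods are terminated successfully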

To double-check that you got new pods, run kubectl get pods before and after executing the script. You should see different pod identifiers.
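The before-and-after comparison can also be done programmatically once the two lists of pod names have been collected. The sketch below is a small helper for that comparison; `compare_pods` is a hypothetical name, and the pod names are made up for illustration rather than taken from a real cluster.

```python
def compare_pods(before, after):
    """Compare two lists of pod names and report what changed."""
    before_set, after_set = set(before), set(after)
    return {
        "removed": sorted(before_set - after_set),
        "created": sorted(after_set - before_set),
        "unchanged": sorted(before_set & after_set),
    }


# Made-up pod names: both gke-api pods were replaced, one other pod was untouched
before = ["gke-api-7d9f-abcde", "gke-api-7d9f-fghij", "other-5c6b-zzzzz"]
after = ["gke-api-7d9f-klmno", "gke-api-7d9f-pqrst", "other-5c6b-zzzzz"]
diff = compare_pods(before, after)
print(diff)
```

If the shutdown worked as intended, no gke-api pod name should appear under "unchanged".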

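To see what the final product looks like (including the interface "bonus"), check here. The service is accessed at the public IP address of the service interface.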
