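In this series of articles, we walk through setting up a Google Kubernetes Engine (GKE) cluster to deploy Docker containers. We assume that you are already familiar with the basics of deep learning, DevOps, Jenkins, and Kubernetes. In the previous article of the series, we built four automated Jenkins workflows. In this article, the last one in the series, we develop a semi-automated deployment to production that completes our CI/CD MLOps pipeline. It is semi-automated because, as a product owner, you will usually want to check the unit test results before deploying to production, to avoid service failures. Deployment to production could be done by hand, but automation is required to reach the Google MLOps Maturity Model goal.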
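The diagram below shows where we are in the project architecture. As implemented by the script in this article, deployment to production consists of cleaning up the jobs in the cluster, copying the model from the testing registry to the production registry, and then either shutting down the prediction service pods (if the service is active) or triggering the AutomaticTraining-PredictionAPI Jenkins workflow (if it is not).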
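We have been using Jenkins and Kubernetes to build our CI/CD solution. The script that follows shows how to interact with Jenkins and Kubernetes from Python to automate the deployment to production. The Python script runs locally.
Let's dive into the code. First, we import the required libraries and define the variables: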
from kubernetes import client, config
from google.cloud import storage
import jenkins
import time
import os
bucket_name = 'automatictrainingcicd-aiplatform'
model_name = 'best_model.hdf5'
Next, we declare a function that cleans up the completed jobs in the cluster:
def clean_jobs():
    # Use the local kubeconfig (the same credentials kubectl uses).
    config.load_kube_config()
    api_instance = client.BatchV1Api()
    print("Listing jobs:")
    api_response = api_instance.list_job_for_all_namespaces()
    jobs = []
    print('job-name job-namespace active succeeded failed start-time completion-time')
    for i in api_response.items:
        jobs.append([i.metadata.name, i.metadata.namespace])
        print("%s %s %s %s %s %s %s" % (i.metadata.name, i.metadata.namespace, i.status.active, i.status.succeeded, i.status.failed, i.status.start_time, i.status.completion_time))
    print('Deleting jobs...')
    if len(jobs) > 0:
        # Note: every listed job is deleted, regardless of its status.
        for i in range(len(jobs)):
            api_instance.delete_namespaced_job(jobs[i][0], jobs[i][1])
        print("Jobs deleted.")
    else:
        print("No jobs found.")
    return
The following function copies the model from the GCS testing registry to the production registry:
def model_to_production():
    # Authenticate with the service account key file.
    storage_client = storage.Client.from_service_account_json('AutomaticTrainingCICD-68f56bfa992e.json')
    bucket = storage_client.bucket(bucket_name)
    # Check that the model exists in the testing registry before copying it.
    status = storage.Blob(bucket=bucket, name='{}/{}'.format('testing', model_name)).exists(storage_client)
    if status:
        print('Copying model...')
        source_blob = bucket.blob('{}/{}'.format('testing', model_name))
        destination_blob_name = '{}/{}'.format('production', model_name)
        blob_copy = bucket.copy_blob(source_blob, bucket, destination_blob_name)
        print('Model from testing registry has been copied to production registry.')
    else:
        print('No model found at testing registry.')
    return
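The copy logic can be tried without touching a real bucket by passing the bucket in as an argument. The sketch below is only an illustration under that assumption: `promote_model`, `FakeBucket`, and `FakeBlob` are hypothetical names, and the fakes mimic only the three calls used here (`blob`, `exists`, `copy_blob`).

```python
def promote_model(bucket, model_name, source_prefix="testing", target_prefix="production"):
    """Copy the model between two prefixes of the same bucket.

    Returns True if the model was copied, False if it was not found.
    """
    source_name = "{}/{}".format(source_prefix, model_name)
    target_name = "{}/{}".format(target_prefix, model_name)
    source_blob = bucket.blob(source_name)
    if not source_blob.exists():
        return False
    bucket.copy_blob(source_blob, bucket, target_name)
    return True


class FakeBlob:
    """In-memory stand-in for a storage blob (illustration only)."""

    def __init__(self, bucket, name):
        self.bucket = bucket
        self.name = name

    def exists(self):
        return self.name in self.bucket.objects


class FakeBucket:
    """In-memory stand-in for a storage bucket (illustration only)."""

    def __init__(self, objects):
        self.objects = set(objects)

    def blob(self, name):
        return FakeBlob(self, name)

    def copy_blob(self, blob, destination_bucket, new_name):
        destination_bucket.objects.add(new_name)


if __name__ == "__main__":
    bucket = FakeBucket({"testing/best_model.hdf5"})
    print(promote_model(bucket, "best_model.hdf5"))
    print(sorted(bucket.objects))
```

With the real client, the `bucket` argument would be the object returned by `storage_client.bucket(bucket_name)`.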
Next, if the prediction service is active, the following function hands over to the pod shutdown; otherwise, it triggers the AutomaticTraining-PredictionAPI Jenkins workflow:
def check_services():
    # Load the kubeconfig here too, so the function also works when called on its own.
    config.load_kube_config()
    api_instance = client.CoreV1Api()
    api_response = api_instance.list_service_for_all_namespaces()
    print('Listing services:')
    print('service-namespace service-name')
    services = []
    for i in api_response.items:
        print("%s %s" % (i.metadata.namespace, i.metadata.name))
        services.append(i.metadata.name)
    # The prediction service is considered active if any service name starts with 'gke-api'.
    if True in (t.startswith('gke-api') for t in services):
        print('gke-api service is active. Proceeding to systematically shutdown its pods...')
        shutdown_pods()
    else:
        jenkins_build()
    return
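The membership test over a generator in `check_services` can also be expressed with the built-in `any()`, which reads a little more directly. A minimal sketch, where `service_is_active` is a hypothetical helper name and the service names are made up for the example:

```python
def service_is_active(service_names, prefix="gke-api"):
    """Return True if at least one service name starts with the given prefix."""
    return any(name.startswith(prefix) for name in service_names)


if __name__ == "__main__":
    print(service_is_active(["kubernetes", "gke-api-service"]))
    print(service_is_active(["kubernetes"]))
```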
If the prediction service is active, the following function takes care of shutting down its pods:
def shutdown_pods():
    config.load_kube_config()
    api_instance = client.CoreV1Api()
    print("Listing pods:")
    api_response = api_instance.list_pod_for_all_namespaces(watch=False)
    pods = []
    print('pod-ip-address pod-namespace pod-name')
    for i in api_response.items:
        print("%s %s %s" % (i.status.pod_ip, i.metadata.namespace, i.metadata.name))
        pods.append([i.metadata.name, i.metadata.namespace])
    print('Shutting down pods...')
    print('Deleting only gke-api pods...')
    if len(pods) > 0:
        for i in range(len(pods)):
            if pods[i][0].startswith('gke-api'):
                api_instance.delete_namespaced_pod(pods[i][0], pods[i][1])
                print("Pod '{}' shut down.".format(pods[i][0]))
                # Pause for two minutes before moving on to the next pod.
                time.sleep(120)
        print("All pods have been shut down.")
    else:
        print("No pods found.")
    return
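The fixed 120-second pause assumes that two minutes are enough before moving on to the next pod. One alternative is to poll a condition until it holds or a timeout expires. The helper below is a generic sketch under that assumption: `wait_until` is a hypothetical name, and the predicate passed to it is left to the caller.

```python
import time


def wait_until(predicate, timeout=120.0, interval=5.0, sleep=time.sleep, clock=time.monotonic):
    """Call predicate() repeatedly until it returns True or the timeout expires.

    Returns True if the predicate succeeded, False if the timeout was reached.
    The sleep and clock arguments exist so the helper can be tested without waiting.
    """
    deadline = clock() + timeout
    while True:
        if predicate():
            return True
        if clock() >= deadline:
            return False
        sleep(interval)


if __name__ == "__main__":
    attempts = {"count": 0}

    def ready_on_third_try():
        # Toy predicate: reports success on the third call.
        attempts["count"] += 1
        return attempts["count"] >= 3

    print(wait_until(ready_on_third_try, timeout=5, interval=0.01))
```

In the deployment script, the predicate might, for example, list the `gke-api` pods and check that each one reports `status.phase == 'Running'`; that check is not shown here.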
The following function is triggered if the prediction service is not active. It takes care of deploying the prediction service:
def jenkins_build():
    print('gke-api service is not active. Proceeding to build AutomaticTraining-PredictionAPI job at Jenkins.')
    server = jenkins.Jenkins('http://localhost:8080', username='your_username', password='your_password')
    server.build_job('AutomaticTraining-PredictionAPI')
    print('AutomaticTraining-PredictionAPI job has been triggered, check Jenkins logs for more information.')
    return
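Rather than writing the Jenkins username and password into the script, the connection settings could be read from the environment. The following is a minimal sketch: `jenkins_settings` is a hypothetical helper, and the variable names `JENKINS_URL`, `JENKINS_USER`, and `JENKINS_TOKEN` are assumptions chosen for this example, not names that Jenkins itself defines.

```python
import os


def jenkins_settings(environ=os.environ):
    """Read Jenkins connection settings from environment variables.

    Raises RuntimeError if the user name or the token is missing.
    """
    missing = [key for key in ("JENKINS_USER", "JENKINS_TOKEN") if not environ.get(key)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {
        # Fall back to the local Jenkins address used elsewhere in the script.
        "url": environ.get("JENKINS_URL", "http://localhost:8080"),
        "username": environ["JENKINS_USER"],
        "password": environ["JENKINS_TOKEN"],
    }


if __name__ == "__main__":
    demo_env = {"JENKINS_USER": "your_username", "JENKINS_TOKEN": "your_password"}
    print(jenkins_settings(demo_env)["url"])
```

The returned dictionary is meant to be unpacked into the constructor, as in `jenkins.Jenkins(**jenkins_settings())`.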
Finally, the main function runs the whole script in the required order:
def main():
    clean_jobs()
    model_to_production()
    check_services()

if __name__ == '__main__':
    main()
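Because the deployment is meant to be semi-automated, one option is to ask for an explicit confirmation before anything in the cluster is changed. A minimal sketch, assuming a hypothetical `confirm` helper that is not part of the original script:

```python
def confirm(prompt="Deploy to production? [y/N] ", reader=input):
    """Return True only if the user answers 'y' or 'yes' (case-insensitive)."""
    answer = reader(prompt).strip().lower()
    return answer in ("y", "yes")


if __name__ == "__main__":
    # A canned reader replaces the keyboard so the example runs unattended.
    print(confirm(reader=lambda _prompt: "yes"))
    print(confirm(reader=lambda _prompt: ""))
```

In `main()`, the three deployment steps could then be guarded by `if confirm():`, so that a review of the unit test results always precedes the rollout.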
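Once you run the Python script file, the console output should trace each step: the listed jobs and their deletion, the model copy, and then either the pod shutdown or the Jenkins job trigger.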
To double-check that you got new pods, run kubectl get pods before and after executing the script. You should see different pod identifiers.
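The same before-and-after comparison can be made on two lists of pod names, for instance ones collected from the output of kubectl get pods. A small sketch, where `pod_changes` is a hypothetical helper and the pod names are made up:

```python
def pod_changes(before, after):
    """Return (removed, added) pod names, each as a sorted list."""
    before_set, after_set = set(before), set(after)
    return sorted(before_set - after_set), sorted(after_set - before_set)


if __name__ == "__main__":
    removed, added = pod_changes(
        ["gke-api-aaa", "gke-api-bbb"],
        ["gke-api-ccc", "gke-api-ddd"],
    )
    print("removed:", removed)
    print("added:", added)
```

After a successful rollout, every old `gke-api` pod name should appear in the removed list and a new one in the added list.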
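To see what the final product looks like (including the interface "bonus"), take a look here. The public IP address of the service interface is where the service can be reached.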