构建AI模型的CI/CD管道

在本文中,我们将探讨如何构建一个符合 Google MLOps 成熟度模型第二级别的功能管道:从模型的持续集成和持续训练开始,进一步测试训练好的模型,确保其在生产环境中的表现。本文假设读者已经对Python、深度学习、Docker、DevOps和Flask有一定的了解。在本系列的前几篇文章中,我们已经解释了如何持续集成模型变更,以及在收集到新数据时如何持续训练模型。本文将介绍如何在模拟生产环境的测试环境中测试训练好的模型:我们将从测试模型注册表中加载模型,通过模型API的副本将其公开,并对其进行测试。欢迎读者在这个阶段添加自己的测试。下面的图表显示了我们目前在整个项目流程中所处的位置。

代码文件结构

首先,需要从其仓库中获取原始代码。以下是代码文件结构的简要概述:

data_utils.py文件包含两个函数:一个用于检查测试模型注册表中是否存在模型,另一个用于在模型存在时加载该模型:

import datetime
import os
import sys

import cv2
import numpy as np
import pandas as pd
from google.cloud import storage


def previous_model(bucket_name, model_filename):
    """Check whether a model artifact exists under ``testing/`` in a GCS bucket.

    Args:
        bucket_name: Name of the Google Cloud Storage bucket to inspect.
        model_filename: File name of the model artifact.

    Returns:
        A ``(status, error)`` tuple. On success ``status`` is the result of
        ``Blob.exists`` and ``error`` is ``None``; if the check itself
        failed, ``status`` is ``None`` and ``error`` is the caught exception.
    """
    try:
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)
        # NOTE(review): the blob name ends with a trailing slash; confirm this
        # matches how the artifact is actually stored in the bucket.
        blob_name = '{}/{}/'.format('testing', model_filename)
        status = storage.Blob(bucket=bucket, name=blob_name).exists(storage_client)
        return status, None
    except Exception as e:
        # str(e) is required: concatenating a str with an exception object
        # raises TypeError and would hide the original error.
        print('Something went wrong when trying to check if previous model exists GCS bucket. Exception: ' + str(e), flush=True)
        return None, e


def load_model(bucket_name, model_filename):
    """Download the model artifact from ``testing/`` in a GCS bucket to ``/root``.

    Args:
        bucket_name: Name of the Google Cloud Storage bucket to read from.
        model_filename: File name of the model artifact; also used as the
            local file name under ``/root/``.

    Returns:
        ``(True, None)`` when the download succeeded, otherwise
        ``(False, error)`` where ``error`` is the caught exception.
    """
    try:
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)
        # NOTE(review): same trailing-slash blob name as in previous_model.
        blob1 = bucket.blob('{}/{}/'.format('testing', model_filename))
        blob1.download_to_filename('/root/' + str(model_filename))
        return True, None
    except Exception as e:
        # str(e) is required here for the same reason as in previous_model.
        print('Something went wrong when load previous model from GCS bucket. Exception: ' + str(e), flush=True)
        return False, e

email_notifications.py文件负责向产品所有者发送电子邮件通知,告知代码执行成功或出现问题:

import os
import smtplib

sender = 'example@gmail.com'
receiver = ['svirahonda@gmail.com']
smtp_provider = 'smtp.gmail.com'
smtp_port = 587
smtp_account = 'example@gmail.com'
# NOTE(review): credential is hard-coded in source; consider reading it from
# an environment variable or a secret store instead.
smtp_password = 'your_password'


def _send(subject, body):
    """Send one plain-text email; failures are printed, never raised."""
    try:
        message = 'Subject: {}\n\n{}'.format(subject, body)
        # The context manager closes the SMTP connection even when
        # starttls/login/sendmail fails.
        with smtplib.SMTP(smtp_provider, smtp_port) as server:
            server.starttls()
            server.login(smtp_account, smtp_password)
            server.sendmail(sender, receiver, message)
    except Exception as e:
        # Notifications are best-effort: report the failure and carry on.
        print('Something went wrong. Unable to send email.', flush=True)
        print('Exception: ', e, flush=True)


def send_update(message):
    """Email a status update about a finished test run.

    Args:
        message: Body text of the email.
    """
    _send('An automatic unit testing has ended recently.', message)


def exception(e_message):
    """Email a failure report about the testing API.

    Args:
        e_message: Body text describing what went wrong.
    """
    _send('Something went wrong with the testing API.', e_message)

task.py文件负责容器的执行流程。它协调Flask应用程序的初始化和结束、模型加载、模型测试以及电子邮件通知:

import datetime
import os
import sys
import threading
import time

import tensorflow as tf
from tensorflow.keras.models import load_model
import cv2
import jsonpickle
import numpy as np
import requests
from flask import Flask, Response, request, jsonify
from google.cloud import storage

import data_utils
import email_notifications

app = Flask(__name__)

model_name = 'best_model.hdf5'
bucket_name = 'automatictrainingcicd-aiplatform'
class_names = ['Normal', 'Viral Pneumonia', 'COVID-19']
headers = {'content-type': 'image/png'}
api = 'http://127.0.0.1:5000/'


# NOTE(review): before_first_request is deprecated in recent Flask releases;
# confirm the Flask version installed in the image still provides it.
@app.before_first_request
def before_first_request():
    """Start a background thread that loads the model and runs the self-test."""

    def initialize_job():
        # NOTE(review): sys.exit() inside a thread raises SystemExit in that
        # thread only; confirm whether the whole process is meant to stop.
        global model

        if len(tf.config.experimental.list_physical_devices('GPU')) > 0:
            tf.config.set_soft_device_placement(True)
            tf.debugging.set_log_device_placement(True)

        exists, check_error = data_utils.previous_model(bucket_name, model_name)
        if exists is None:
            # str() is required: check_error is an exception object, and
            # str + exception raises TypeError.
            email_notifications.exception('Something went wrong when trying to check if old testing model exists. Exception: ' + str(check_error) + '. Aborting automatic testing.')
            sys.exit(1)
        if not exists:
            email_notifications.send_update('There are no artifacts at model registry. Check GCP for more information.')
            sys.exit(1)

        downloaded, download_error = data_utils.load_model(bucket_name, model_name)
        if not downloaded:
            email_notifications.exception('Something went wrong when trying to load old /testing model. Exception: ' + str(download_error))
            sys.exit(1)

        try:
            # Relative path: presumably resolves to the file data_utils
            # downloaded into /root (the container's working directory).
            model = load_model(model_name)
        except Exception as e:
            email_notifications.exception('Something went wrong trying to test old /testing model. Exception: ' + str(e))
            sys.exit(1)

        api_test()

    thread = threading.Thread(target=initialize_job)
    thread.start()


@app.route('/init', methods=['GET', 'POST'])
def init():
    """Health-check endpoint; always answers 200 with a JSON message."""
    message = {'message': 'API initialized.'}
    response = jsonpickle.encode(message)
    return Response(response=response, status=200, mimetype="application/json")


@app.route('/', methods=['POST'])
def index():
    """Run a prediction on the raw image bytes sent in the request body.

    The body is interpreted as uint8 data reshaped to (128, 128, 3).

    Returns:
        A JSON response whose ``message`` is the predicted class index as a
        string, or ``'Error: ...'`` if anything failed (an email is sent too).
    """
    # The route only accepts POST, so no extra method check is needed.
    try:
        # frombuffer replaces the deprecated np.fromstring for binary input.
        image = np.frombuffer(request.data, np.uint8)
        image = image.reshape((128, 128, 3))
        batch = np.array([image]).astype(np.float16)
        result = np.argmax(model.predict(batch))
        return jsonify({'message': '{}'.format(str(result))})
    except Exception as e:
        json_response = jsonify({'message': 'Error: ' + str(e)})
        email_notifications.exception('Something went wrong when trying to make prediction via testing API. Exception: ' + str(e) + '. Aborting automatic testing.')
        return json_response


def self_initialize():
    """Poll ``/init`` from a background thread until the server answers 200.

    Presumably this first request is what triggers ``before_first_request``.
    """

    def initialization():
        global started
        started = False
        while not started:
            try:
                server_response = requests.get('http://127.0.0.1:5000/init')
                if server_response.status_code == 200:
                    started = True
            except requests.exceptions.RequestException:
                # Server is not accepting connections yet; retry after the pause.
                pass
            time.sleep(3)

    thread = threading.Thread(target=initialization)
    thread.start()


def api_test():
    """Post the bundled test image to the local API and report the outcome.

    Sends a success email and exits with status 0 when the predicted class
    index is 1; otherwise sends a failure email and exits with status 1.
    """
    try:
        image = cv2.imread('TEST_IMAGE.jpg')
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = cv2.resize(image, (128, 128))
        # tobytes replaces the deprecated ndarray.tostring alias.
        result = requests.post(api, data=image.tobytes(), headers=headers)
        result = result.json()
        prediction = int(result['message'])
        if prediction == 1:
            email_notifications.send_update('Testing stage has ended successfully. Shutting down container. Check the GCP logs for more information.')
            sys.exit(0)
        else:
            email_notifications.send_update('Testing stage has crashed. Check the GCP logs for more information.')
            sys.exit(1)
    except Exception as e:
        # SystemExit is not an Exception subclass, so the exits above
        # are not swallowed here.
        email_notifications.exception('Testing stage crashed with an exception: ' + str(e) + '. Check the GCP logs for more information.')
        sys.exit(1)


if __name__ == '__main__':
    self_initialize()
    # NOTE(review): debug=True on 0.0.0.0 exposes the Werkzeug debugger and
    # enables the reloader; confirm this is intended for the test container.
    app.run(host='0.0.0.0', debug=True, threaded=True)

Dockerfile提供了容器构建的规则:

# Image for the automatic model-testing container.
FROM gcr.io/deeplearning-platform-release/tf2-cpu.2-0
WORKDIR /root

RUN pip install pandas numpy google-cloud-storage scikit-learn opencv-python Flask jsonpickle

# Chain with && so a failing apt step fails the build; with ';' only the
# exit status of the last command is checked.
RUN apt-get update && apt-get install -y git libgl1-mesa-dev

# Cache buster: fetching random bytes changes this layer on every build, so
# the git clone below is always re-run and picks up the latest code.
ADD "https://www.random.org/cgi-bin/randbyte?nbytes=10&format=h" skipcache
RUN git clone https://github.com/sergiovirahonda/AutomaticTraining-UnitTesting.git

# Move the application files and the test image into the working directory.
RUN mv /root/AutomaticTraining-UnitTesting/data_utils.py /root
RUN mv /root/AutomaticTraining-UnitTesting/task.py /root
RUN mv /root/AutomaticTraining-UnitTesting/email_notifications.py /root
RUN mv /root/AutomaticTraining-UnitTesting/TEST_IMAGE.jpg /root

EXPOSE 5000
ENTRYPOINT ["python", "task.py"]
沪ICP备2024098111号-1
上海秋旦网络科技中心:上海市奉贤区金大公路8218号1幢 联系电话:17898875485