在本文中,我们将探讨如何构建一个符合 Google MLOps 成熟度模型第 2 级要求的功能性管道。我们将从模型的持续集成和持续训练开始,进而测试训练好的模型,确保其在生产环境中的表现。本文假设读者已经对 Python、深度学习、Docker、DevOps 和 Flask 有一定的了解。在本系列的前几篇文章中,我们已经解释了如何持续集成模型变更,以及在收集到新数据时如何持续训练模型。本文将介绍如何在模拟生产环境的测试环境中测试训练好的模型:我们将从测试模型注册表中加载模型,通过模型 API 的副本将其公开,并对其运行测试。欢迎在这个阶段添加你自己的测试。下图显示了我们在整个项目流程中所处的位置。
首先,我们需要从代码仓库中获取原始代码。以下是代码文件结构的简要概述:
data_utils.py文件包含了检查测试模型注册表中是否存在模型的函数,如果存在,则加载该模型:
import datetime
from google.cloud import storage
import pandas as pd
import numpy as np
import os
import cv2
import sys
def previous_model(bucket_name, model_filename):
    """Check whether a model artifact exists under ``testing/`` in a GCS bucket.

    Args:
        bucket_name: Name of the Google Cloud Storage bucket to inspect.
        model_filename: File name of the model inside the ``testing`` prefix.

    Returns:
        A ``(status, error)`` tuple. On success ``status`` is whatever
        ``Blob.exists`` returned and ``error`` is ``None``. If anything
        raises, ``status`` is ``None`` and ``error`` is the caught exception.
    """
    try:
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)
        # NOTE(review): the object name ends with a trailing slash
        # ('testing/<file>/') -- confirm this matches how the training stage
        # uploads the artifact.
        blob_name = '{}/{}/'.format('testing', model_filename)
        status = storage.Blob(bucket=bucket, name=blob_name).exists(storage_client)
        return status, None
    except Exception as e:
        # str(e) is required: concatenating a str with an exception object
        # raises TypeError, which would escape this handler and hide the
        # original error from the caller.
        print('Something went wrong when trying to check if previous model exists GCS bucket. Exception: ' + str(e), flush=True)
        return None, e
def load_model(bucket_name, model_filename):
    """Download the ``testing/`` model artifact from GCS to ``/root/``.

    Args:
        bucket_name: Name of the Google Cloud Storage bucket holding the model.
        model_filename: File name of the model inside the ``testing`` prefix;
            also used as the local file name under ``/root/``.

    Returns:
        ``(True, None)`` when the download completes, otherwise
        ``(False, error)`` where ``error`` is the caught exception.
    """
    try:
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)
        # NOTE(review): the object name ends with a trailing slash
        # ('testing/<file>/') -- confirm this matches the uploaded object name.
        blob1 = bucket.blob('{}/{}/'.format('testing', model_filename))
        blob1.download_to_filename('/root/' + str(model_filename))
        return True, None
    except Exception as e:
        # str(e) is required: concatenating a str with an exception object
        # raises TypeError, which would escape this handler instead of
        # returning the (False, error) result callers check.
        print('Something went wrong when load previous model from GCS bucket. Exception: ' + str(e), flush=True)
        return False, e
email_notifications.py 文件负责向产品负责人发送通知邮件,告知代码执行成功或执行过程中出现的问题:
import smtplib
import os
# Envelope sender address and list of recipients used by sendmail().
sender = 'example@gmail.com'
receiver = ['svirahonda@gmail.com']
# SMTP server host and port; the senders call starttls() on this connection.
smtp_provider = 'smtp.gmail.com'
smtp_port = 587
# Credentials can be supplied through the environment so that real secrets
# never need to be committed; the original placeholder values remain the
# defaults when the variables are not set.
smtp_account = os.environ.get('SMTP_ACCOUNT', 'example@gmail.com')
smtp_password = os.environ.get('SMTP_PASSWORD', 'your_password')
def send_update(message):
    """Email a status update about the automatic testing run.

    The text is sent with the fixed subject
    'An automatic unit testing has ended recently.'. Delivery is best-effort:
    any failure is printed and swallowed so the caller keeps running.

    Args:
        message: Body text of the notification.
    """
    message = 'Subject: {}\n\n{}'.format('An automatic unit testing has ended recently.', message)
    try:
        # The context manager closes the SMTP connection even when
        # starttls/login/sendmail raises.
        with smtplib.SMTP(smtp_provider, smtp_port) as server:
            server.starttls()
            server.login(smtp_account, smtp_password)
            server.sendmail(sender, receiver, message)
    except Exception as e:
        print('Something went wrong. Unable to send email.', flush=True)
        print('Exception: ', e, flush=True)
def exception(e_message):
    """Email an error report about the testing API.

    The text is sent with the fixed subject
    'Something went wrong with the testing API.'. Delivery is best-effort:
    any failure is printed and swallowed so the caller keeps running.

    Args:
        e_message: Description of the problem to place in the email body.
    """
    try:
        message = 'Subject: {}\n\n{}'.format('Something went wrong with the testing API.', e_message)
        # The context manager closes the SMTP connection even when
        # starttls/login/sendmail raises.
        with smtplib.SMTP(smtp_provider, smtp_port) as server:
            server.starttls()
            server.login(smtp_account, smtp_password)
            server.sendmail(sender, receiver, message)
    except Exception as e:
        print('Something went wrong. Unable to send email.', flush=True)
        print('Exception: ', e, flush=True)
task.py 文件负责容器内的执行流程。它协调 Flask 应用程序的启动与结束、模型加载、模型测试以及电子邮件通知:
import tensorflow as tf
from tensorflow.keras.models import load_model
import jsonpickle
import data_utils, email_notifications
import sys
import os
from google.cloud import storage
import datetime
import numpy as np
import jsonpickle
import cv2
from flask import Flask, Response, request, jsonify
import threading
import requests
import time
# Flask application that exposes the model under test.
app = Flask(__name__)
# File name of the model artifact; passed to the data_utils helpers and to
# Keras load_model().
model_name = 'best_model.hdf5'
# GCS bucket name passed to the data_utils helpers.
bucket_name = 'automatictrainingcicd-aiplatform'
# Class labels; presumably index-aligned with the model output -- TODO confirm.
class_names = ['Normal', 'Viral Pneumonia', 'COVID-19']
# HTTP headers sent with the self-test POST issued by api_test().
headers = {'content-type': 'image/png'}
# Base URL of the local prediction endpoint that api_test() posts to.
api = 'http://127.0.0.1:5000/'
# NOTE(review): `before_first_request` was deprecated in Flask 2.2 and removed
# in 2.3 -- pin the Flask version or migrate this hook before upgrading.
@app.before_first_request
def before_first_request():
    """Kick off model loading and the API self-test in a background thread.

    Runs once, before the first request is handled. The work happens in a
    separate thread so the triggering request is not blocked.
    """
    def initialize_job():
        """Fetch the testing model, load it into the global ``model``, run api_test()."""
        if len(tf.config.experimental.list_physical_devices('GPU')) > 0:
            tf.config.set_soft_device_placement(True)
        # NOTE(review): SOURCE lost its indentation; device-placement logging
        # is assumed to be unconditional -- confirm against the repository.
        tf.debugging.set_log_device_placement(True)
        global model

        # NOTE(review): sys.exit() raises SystemExit, which only ends this
        # worker thread, not the Flask process -- confirm that is the intended
        # shutdown behaviour.
        found, check_error = data_utils.previous_model(bucket_name, model_name)
        if found is None:
            # str() is required: check_error is an exception object and cannot
            # be concatenated to a str directly.
            email_notifications.exception('Something went wrong when trying to check if old testing model exists. Exception: ' + str(check_error) + '. Aborting automatic testing.')
            sys.exit(1)
        if not found:
            email_notifications.send_update('There are no artifacts at model registry. Check GCP for more information.')
            sys.exit(1)

        downloaded, download_error = data_utils.load_model(bucket_name, model_name)
        if not downloaded:
            email_notifications.exception('Something went wrong when trying to load old /testing model. Exception: ' + str(download_error))
            sys.exit(1)

        try:
            # `load_model` here is the Keras loader imported at file level.
            model = load_model(model_name)
        except Exception as e:
            email_notifications.exception('Something went wrong trying to test old /testing model. Exception: ' + str(e))
            sys.exit(1)

        api_test()

    thread = threading.Thread(target=initialize_job)
    thread.start()
@app.route('/init', methods=['GET', 'POST'])
def init():
    """Return a fixed 200 JSON acknowledgement that the API is up."""
    payload = jsonpickle.encode({'message': 'API initialized.'})
    return Response(response=payload, status=200, mimetype="application/json")
@app.route('/', methods=['POST'])
def index():
    """Predict the class of an image posted as raw bytes.

    The request body is interpreted as unsigned 8-bit values and reshaped to a
    single 128x128x3 image, so it must contain exactly 128*128*3 bytes.

    Returns:
        A JSON response ``{'message': '<class index>'}`` on success, or
        ``{'message': 'Error: <details>'}`` if anything raises; in the latter
        case an error email is sent as well.
    """
    # The route is registered for POST only, so Flask rejects other methods
    # before this view runs; no explicit method check is needed.
    try:
        # np.frombuffer replaces np.fromstring, whose binary mode is deprecated.
        pixels = np.frombuffer(request.data, dtype=np.uint8)
        # Add the leading batch axis expected by model.predict; astype()
        # returns a fresh writable float16 copy.
        batch = pixels.reshape((1, 128, 128, 3)).astype(np.float16)
        result = model.predict(batch)
        result = np.argmax(result)
        message = {'message': '{}'.format(str(result))}
        return jsonify(message)
    except Exception as e:
        message = {'message': 'Error: ' + str(e)}
        json_response = jsonify(message)
        email_notifications.exception('Something went wrong when trying to make prediction via testing API. Exception: ' + str(e) + '. Aborting automatic testing.')
        return json_response
def self_initialize():
    """Start a background thread that polls ``/init`` until it answers 200.

    The poll is what delivers the first request to the Flask app. The
    module-level ``started`` flag is set to True once a 200 is received.
    """
    def initialization():
        """Poll the local /init endpoint, pausing 3 seconds after each attempt."""
        global started
        started = False
        while not started:
            try:
                server_response = requests.get('http://127.0.0.1:5000/init')
                started = server_response.status_code == 200
            except requests.exceptions.RequestException:
                # The server is not accepting connections yet; retry after
                # the pause. Only request errors are swallowed so genuine
                # programming errors are not hidden.
                pass
            time.sleep(3)

    thread = threading.Thread(target=initialization)
    thread.start()
def api_test():
    """Post the bundled test image to the local API and report the outcome.

    Reads ``TEST_IMAGE.jpg``, converts it to RGB, resizes it to 128x128 and
    posts the raw pixel bytes to ``api``. A prediction of class index 1 is
    treated as success; any other prediction or any exception is reported as a
    failure. An email is sent in every case, then ``sys.exit`` is called with
    0 on success and 1 otherwise.
    """
    try:
        image = cv2.imread('TEST_IMAGE.jpg')
        if image is None:
            # cv2.imread signals a missing or unreadable file by returning
            # None; fail with a clear message instead of an opaque cvtColor
            # error.
            raise FileNotFoundError('TEST_IMAGE.jpg could not be read.')
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = cv2.resize(image, (128, 128))
        # tobytes() replaces the deprecated ndarray.tostring() alias.
        result = requests.post(api, data=image.tobytes(), headers=headers)
        result = result.json()
        prediction = int(result['message'])
        # NOTE(review): 1 is presumably the known label of TEST_IMAGE.jpg --
        # confirm against the class list.
        if prediction == 1:
            email_notifications.send_update('Testing stage has ended successfully. Shutting down container. Check the GCP logs for more information.')
            # SystemExit is not an Exception subclass, so the handler below
            # does not intercept it.
            sys.exit(0)
        else:
            email_notifications.send_update('Testing stage has crashed. Check the GCP logs for more information.')
            sys.exit(1)
    except Exception as e:
        email_notifications.exception('Testing stage crashed with an exception: ' + str(e) + '. Check the GCP logs for more information.')
        sys.exit(1)
if __name__ == '__main__':
    # Start the poller before app.run(), which blocks while serving.
    self_initialize()
    # debug is off: the server listens on all interfaces, and Flask's debug
    # mode enables the interactive debugger (arbitrary code execution) and
    # the reloader, which re-executes this script in a child process.
    app.run(host='0.0.0.0', debug=False, threaded=True)
Dockerfile提供了容器构建的规则:
# Base image; judging by its name, a TensorFlow 2.0 CPU deep-learning image.
FROM gcr.io/deeplearning-platform-release/tf2-cpu.2-0
WORKDIR /root
RUN pip install pandas numpy google-cloud-storage scikit-learn opencv-python Flask jsonpickle
# Chain with && so a failing `apt-get update` or install fails the build
# instead of being silently ignored, as it is with `;`.
RUN apt-get update && apt-get install -y git libgl1-mesa-dev
# Cache buster: presumably the URL returns different bytes on every build, so
# the layers below are rebuilt and the clone always fetches the latest code.
ADD "https://www.random.org/cgi-bin/randbyte?nbytes=10&format=h" skipcache
RUN git clone https://github.com/sergiovirahonda/AutomaticTraining-UnitTesting.git
# Move the application files next to the entrypoint in /root.
RUN mv /root/AutomaticTraining-UnitTesting/data_utils.py /root
RUN mv /root/AutomaticTraining-UnitTesting/task.py /root
RUN mv /root/AutomaticTraining-UnitTesting/email_notifications.py /root
RUN mv /root/AutomaticTraining-UnitTesting/TEST_IMAGE.jpg /root
# Port served by the Flask app in task.py.
EXPOSE 5000
ENTRYPOINT ["python", "task.py"]