在机器学习和深度学习领域,构建灵活且复杂的模型是一个常见的需求。传统的顺序模型(Sequential model)虽然适用于大多数问题,但在处理具有非线性拓扑结构、共享层或多输入多输出的模型时,其灵活性受限。为了解决这一问题,Keras 提供了函数式 API(Functional API),允许以更灵活的方式定义模型:它能够处理具有非线性拓扑结构的模型、共享层,以及多输入多输出的情况。
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, MaxPooling2D, Conv2D, GlobalAveragePooling2D, Input, concatenate, Dropout
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import numpy as np
import cv2
import os
# Read the raw house records: one whitespace-separated record per line.
with open('HousesInfo.txt', 'r') as info_file:
    raw_lines = info_file.readlines()

# Clean each line and convert its first five fields to floats.
records = []
for raw_line in raw_lines:
    fields = raw_line.strip().split()
    for position in range(5):
        fields[position] = float(fields[position])
    records.append(fields)

# Build the DataFrame with one named column per field.
df = pd.DataFrame(records, columns=['Bedrooms', 'Bathrooms', 'Area', 'Zipcode', 'Price'])
# The price is the regression target; every other column is an input feature.
X = df.drop(columns='Price')
y = df['Price']

# Standardize only Area and Zipcode. The scaled values come back as an
# unnamed two-column frame (columns 0 and 1) that is joined on the row index,
# after which the unscaled originals are dropped.
scaler = StandardScaler()
scaled_columns = pd.DataFrame(scaler.fit_transform(X[['Area', 'Zipcode']]))
X = np.array(X.join(scaled_columns).drop(['Area', 'Zipcode'], axis=1))

# The same scaler object is re-fitted on the price, so from here on it holds
# the price statistics rather than the feature statistics.
price_column = np.array(y).reshape(-1, 1)
y = scaler.fit_transform(price_column).reshape(-1)

# Hold out 15% of the rows for testing (fixed seed for reproducibility).
data_train, data_test, y_train, y_test = train_test_split(
    X, y, test_size=0.15, random_state=42
)
# Input for the four tabular features. `shape` must be a tuple that excludes
# the batch dimension; a bare int is rejected by newer Keras releases.
input_data = Input(shape=(4,))
# Dense branch for the tabular features. It is not trained on its own; its
# output is concatenated with the image features further down the script.
model1 = Dense(4, activation='relu', kernel_initializer='uniform')(input_data)
model1 = Dense(12, activation='relu')(model1)
# Folder holding the room photos, named "<house number>_<room>.jpg".
# NOTE(review): the original literal read "C:UsersKIITDownloadsHouse Images
# Dataset" with no path separators; the backslashes are restored here on the
# assumption that they were lost -- confirm the actual folder location.
DATADIR = r"C:\Users\KIIT\Downloads\House Images Dataset"
rooms = ['bathroom', 'bedroom', 'frontal', 'kitchen']
# One list of four file stems per house, for houses 1..535, in `rooms` order.
CATEGORIES = [[str(house) + '_' + room for room in rooms] for house in range(1, 536)]

# data[k] is an array stacking the four resized RGB photos of house k + 1.
data = []
for category in CATEGORIES:
    # A fresh list per house keeps `rooms` intact as the list of room names.
    house_images = []
    for stem in category:
        image_path = os.path.join(DATADIR, stem + '.jpg')
        bgr_image = cv2.imread(image_path)
        # cv2.imread signals an unreadable or missing file by returning None;
        # fail here with the offending path instead of inside cvtColor.
        if bgr_image is None:
            raise FileNotFoundError("Could not read image: " + image_path)
        rgb_image = cv2.cvtColor(bgr_image, cv2.COLOR_BGR2RGB)
        house_images.append(cv2.resize(rgb_image, (100, 100)))
    data.append(np.array(house_images))
# Stack all houses into one array and normalize the pixel values by 255.
X = np.array(data)
X = X / 255

# Axis 1 indexes the room type, in the order the photos were loaded:
# 0 = bathroom, 1 = bedroom, 2 = frontal, 3 = kitchen.
bathroom = X[:, 0]
bedroom = X[:, 1]
frontal = X[:, 2]
kitchen = X[:, 3]

# Split every room array and the target in a single call so all of them are
# guaranteed to share the same train/test rows. test_size and random_state
# match the tabular-feature split, which keeps the rows aligned across inputs.
(bathroom_train, bathroom_test,
 bedroom_train, bedroom_test,
 frontal_train, frontal_test,
 kitchen_train, kitchen_test,
 y_train, y_test) = train_test_split(
    bathroom, bedroom, frontal, kitchen, y, test_size=0.15, random_state=42
)
def _conv_branch(image_input):
    """Return the pooled convolutional features for one room-image input.

    Every call creates new layers, so each room type gets its own weights
    (nothing is shared between branches).
    """
    features = Conv2D(filters=32, kernel_size=(3, 3), padding='Same', activation='relu')(image_input)
    features = MaxPooling2D(pool_size=(2, 2))(features)
    features = Conv2D(filters=16, kernel_size=(3, 3), padding='Same', activation='relu')(features)
    features = MaxPooling2D(pool_size=(2, 2))(features)
    return GlobalAveragePooling2D()(features)


# One 100x100 RGB input per room type.
Input_bath = Input(shape=(100, 100, 3))
Input_bed = Input(shape=(100, 100, 3))
Input_front = Input(shape=(100, 100, 3))
Input_kitchen = Input(shape=(100, 100, 3))

# Run each room image through its own convolutional branch.
bath_final = _conv_branch(Input_bath)
bed_final = _conv_branch(Input_bed)
front_final = _conv_branch(Input_front)
kitchen_final = _conv_branch(Input_kitchen)

# Merge the four image feature vectors.
combined = concatenate([bath_final, bed_final, front_final, kitchen_final])

# Regularize with dropout, then reduce the image features with dense layers.
dropout = Dropout(0.5)(combined)
hidden1 = Dense(64, activation='relu')(dropout)
hidden2 = Dense(32, activation='relu')(hidden1)

# Merge the image features with the tabular-feature branch and regress the
# (standardized) price with a single linear unit.
combined2 = concatenate([model1, hidden2])
output = Dense(1, activation='linear')(combined2)

# Assemble the model. The input order here must match the order of the arrays
# passed to model.fit.
model = Model(
    inputs=[Input_bath, Input_bed, Input_front, Input_kitchen, input_data],
    outputs=output,
)
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.optimizers import Adam
# Stop once training has gone 60 epochs without improvement (the weights of
# the last epoch are kept), and cut the learning rate tenfold after 35 epochs
# without improvement.
early_stopping = EarlyStopping(patience=60, restore_best_weights=False)
reduce_lr = ReduceLROnPlateau(patience=35, factor=0.1)
callbacks = [early_stopping, reduce_lr]

# Mean squared error on the standardized price, optimized with Adam.
optimizer = Adam(learning_rate=0.001)
model.compile(loss='mse', optimizer=optimizer)

# The inputs are listed in the same order as the model's inputs; 15% of the
# training rows are held out for validation.
train_inputs = [bathroom_train, bedroom_train, frontal_train, kitchen_train, data_train]
model.fit(
    train_inputs,
    y_train,
    epochs=200,
    callbacks=callbacks,
    batch_size=96,
    validation_split=0.15,
)