在处理大量数据时,如何高效地将数据导入MySQL服务器是一个常见的问题。本文将介绍一种通过构建插入查询并分批执行的方法,以提高数据同步的效率。这种方法特别适用于需要从不同ERP系统和电子商务平台同步数据的场景。
为了优化和改进当前的集成软件核心,开发了这段代码。该软件负责从不同的ERP系统和电子商务平台集成和同步数据,通常涉及从MS SQL Server和Oracle数据库移动数据到MySQL服务器数据库。随着软件不断根据新的要求或新的ERP系统的不同结构进行参数化,数据访问和目标需要新的实体,除了现有的实体之外,因此需要一个例程来轻松适应这些新的列、表或系统,并批量将这些数据加载到中间暂存区域,以便与电子商务平台同步。
首先,需要使用数据填充DataTables,并将它们添加到DataSet中。假设DataTables已经被填充。GetDataSet()函数允许添加所有想要加载到MySQL的DataTables,并以DataSet的形式返回它们。每个DataTable将对应于源数据库数据集的整个表定义或列的子集(由填充数据集的查询定义)。
public DataSet GetDataSet()
{
DataSet ds = new DataSet();
DataTable dtProducts = new DataTable();
DataTable dtCustomers = new DataTable();
dtProducts.TableName = "Products";
dtCustomers.TableName = "Customers";
ds.Tables.Add(dtProducts);
ds.Tables.Add(dtCustomers);
return ds;
}
BuildInsertQueriesByBatch()是本文的主要函数。它负责构建并连接每个DataTable中的行。第一个参数是要加载到MySQL中的DataTable,batch整数参数是想要每个批次加载的行数,这也是这个函数的关键,因此根据其值,过程将获得更好的性能。如果DataSet有2000行,batch变量的值是200...那么这个过程将收集这200行,并用一条指令插入它们,将2000行插入10条指令中,大大减少了总时间。
public string BuildInsertQueriesByBatch(DataTable dt, int batch)
{
string sReturnSql = "";
string sTableName = "";
int nPosition = 0;
int nRows = 0;
sTableName = dt.TableName;
sReturnSql += "TRUNCATE TABLE " + sTableName + ";";
foreach (DataRow dr in dt.Rows)
{
sReturnSql += "INSERT INTO " + sTableName + "(";
nPosition = 0;
foreach (DataColumn column in dt.Columns)
{
if (nPosition == 0)
{
sReturnSql += "" + column.ColumnName + "";
nPosition += 1;
}
else
{
sReturnSql += "," + column.ColumnName + "";
}
}
sReturnSql += ") VALUES(";
nPosition = 0;
foreach (DataColumn column in dt.Columns)
{
if (nPosition == 0)
{
sReturnSql += this.DefineInsertType(column.DataType.ToString(), dr[column.ColumnName].ToString());
nPosition += 1;
}
else
{
sReturnSql += "," + this.DefineInsertType(column.DataType.ToString(), dr[column.ColumnName].ToString());
}
}
sReturnSql += ");";
nRows++;
if (nRows == batch)
{
ExecuteRequest(sReturnSql);
sReturnSql = "";
nRows = 0;
}
}
ExecuteRequest(sReturnSql);
nRows = 0;
return sReturnSql;
}
DefineInsertType()函数将不同的数据类型格式化为正确的格式,以便将它们添加到插入语句中,基于MySQL标准。
private object DefineInsertType(string sType, string sReceived)
{
object oReturn;
oReturn = null;
sReceived = this.FormatValue(sReceived.ToString());
if (sType.ToLower().IndexOf("int") != -1)
{
if (sReceived == null || sReceived.ToString() == string.Empty)
{
oReturn = 1;
}
else
{
oReturn = sReceived;
}
}
// ... 更多数据类型的处理 ...
return oReturn;
}
ExecuteRequest()函数执行由batch变量值收集的插入语句的数量。要运行这个函数,使用MySQL的Connector/Net驱动程序,可在以下链接获得:https://dev.mysql.com/downloads/connector/net/6.0.html
public int ExecuteRequest(string sRequest)
{
MySql.Data.MySqlClient.MySqlConnection oMySqlConnection;
MySql.Data.MySqlClient.MySqlCommand oSqlCommand;
int i = -1;
oSqlCommand = new MySql.Data.MySqlClient.MySqlCommand();
oMySqlConnection = new MySql.Data.MySqlClient.MySqlConnection("MySqlConnectionString");
try
{
oSqlCommand.Connection = oMySqlConnection;
oSqlCommand.CommandType = System.Data.CommandType.Text;
oSqlCommand.CommandText = sRequest;
if (oMySqlConnection.State == System.Data.ConnectionState.Closed)
oMySqlConnection.Open();
oSqlCommand.ExecuteNonQuery();
oSqlCommand.Connection.Close();
oMySqlConnection.Close();
i = 1;
}
catch (System.Data.SqlClient.SqlException ex)
{
i = -1;
oMySqlConnection.Close();
}
return i;
}
有了所有这些必需的函数和生成多个插入语句的逻辑,只需要调用主函数BuildInsertQueriesByBatch(),如下所示:
public void ExecuteExportNew()
{
DataSet ds = new DataSet();
ds = GetDataSet();
BuildInsertQueriesByBatch(ds.Tables["Products"], 200);
BuildInsertQueriesByBatch(ds.Tables["Customers"], 100);
}
两次调用BuildInsertQueriesByBatch()函数,一次针对每个DataTable。如果有更多的DataTable,将需要为DataSet中的每个表添加一个指令,包括DataTable名称和所需的batch数值。