C# 封装SqlBulkCopy,让批量插入更方便

Stella981
• 阅读 802

关于 SqlServer 批量插入的方式,前段时间也有大神给出了好几种批量插入的方式及对比测试(http://www.cnblogs.com/jiekzou/p/6145550.html),估计大家也都明白,最佳的方式就是用 SqlBulkCopy。我对 SqlBulkCopy 封装成了一个 Helper 方法,使得批量插入更加方便,先看看封装后的方法定义:

public static class SqlConnectionExtension
{
    /// <summary>
    /// 使用 SqlBulkCopy 向 destinationTableName 表插入数据
    /// </summary>
    /// <typeparam name="TModel">必须拥有与目标表所有字段对应属性</typeparam>
    /// <param name="conn"></param>
    /// <param name="modelList">要插入的数据</param>
    /// <param name="batchSize">SqlBulkCopy.BatchSize</param>
    /// <param name="destinationTableName">如果为 null,则使用 TModel 名称作为 destinationTableName</param>
    /// <param name="bulkCopyTimeout">SqlBulkCopy.BulkCopyTimeout</param>
    /// <param name="externalTransaction">要使用的事务</param>
    public static void BulkCopy<TModel>(this SqlConnection conn, List<TModel> modelList, int batchSize, string destinationTableName = null, int? bulkCopyTimeout = null, SqlTransaction externalTransaction = null);
}

上面都有详细解释,相信大家一看就会明白,接下来演示下用法及效果:

先创建一个测试的 Users 表:

CREATE TABLE [dbo].[Users](
    [Id] [uniqueidentifier] NOT NULL,
    [Name] [nvarchar](100) NULL,
    [Gender] [int] NULL,
    [Age] [int] NULL,
    [CityId] [int] NULL,
    [OpTime] [datetime] NULL,
    CONSTRAINT [PK_Users] PRIMARY KEY CLUSTERED([Id] ASC)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
 ) ON [PRIMARY]

然后定义一个与表映射的 Model,记住,由于 SqlBulkCopy 的特性,定义的 Model 必须拥有与表所有的字段对应的属性:

public enum Gender
{
    Man = 1,
    Woman
}

public class User
{
    public Guid Id { get; set; }
    public string Name { get; set; }
    public Gender? Gender { get; set; }
    public int? Age { get; set; }
    public int? CityId { get; set; }
    public DateTime? OpTime { get; set; }
}

制造些数据,然后就可以直接插入了:

List<User> usersToInsert = new List<User>();
usersToInsert.Add(new User() { Id = Guid.NewGuid(), Name = "so1", Gender = Gender.Man, Age = 18, CityId = 1, OpTime = DateTime.Now });
usersToInsert.Add(new User() { Id = Guid.NewGuid(), Name = "so2", Gender = Gender.Man, Age = 19, CityId = 2, OpTime = DateTime.Now });
usersToInsert.Add(new User() { Id = Guid.NewGuid(), Name = "so3", Gender = Gender.Man, Age = 20, CityId = 3, OpTime = DateTime.Now });
usersToInsert.Add(new User() { Id = Guid.NewGuid(), Name = "so4", Gender = Gender.Man, Age = 21, CityId = 4, OpTime = DateTime.Now });

using (SqlConnection conn = new SqlConnection("Data Source = .;Initial Catalog = Chloe;Integrated Security = SSPI;"))
{
    conn.BulkCopy(usersToInsert, 20000, "Users");
}

执行插入后表数据:

C# 封装SqlBulkCopy,让批量插入更方便

很方便吧,定义好 Model,调用 BulkCopy 方法就能插入了。这个方法主要解决了两个问题:1.免去手动构造 DataTable 和向 DataTable 填充数据,要知道,SqlBulkCopy 要求 DataTable 的列必须和表列顺序一致,如果手动构造 DataTable 的话会使代码很难维护;2.不用亲自 new 出 SqlBulkCopy 对象以及手动给 SqlBulkCopy 对象设置各种值,如 DestinationTableName、BulkCopyTimeout、BatchSize 等,用封装的方法,直接传相应的值就好了。接下来贴干货,简单介绍下实现。

先了解 SqlBulkCopy 的定义(部分):

public sealed class SqlBulkCopy : IDisposable
{
    public SqlBulkCopy(SqlConnection connection);
    public SqlBulkCopy(string connectionString);
    public SqlBulkCopy(string connectionString, SqlBulkCopyOptions copyOptions);
    public SqlBulkCopy(SqlConnection connection, SqlBulkCopyOptions copyOptions, SqlTransaction externalTransaction);

    public int BatchSize { get; set; }
    public int BulkCopyTimeout { get; set; }
    public SqlBulkCopyColumnMappingCollection ColumnMappings { get; }
    public string DestinationTableName { get; set; }
    public bool EnableStreaming { get; set; }
    public int NotifyAfter { get; set; }
    public event SqlRowsCopiedEventHandler SqlRowsCopied;

    public void Close();
    public void WriteToServer(DataRow[] rows);
    public void WriteToServer(DataTable table);
    public void WriteToServer(IDataReader reader);
    public void WriteToServer(DataTable table, DataRowState rowState);
}

我们只需关注 WriteToServer 方法。因为我们的数据源不是数据库或excel,所以我们直接不考虑 WriteToServer(IDataReader reader)。WriteToServer(DataRow[] rows) 直接无视,不多解释,所以我们只需考虑用 WriteToServer(DataTable table) 就行了。开干!

一、构造一个结构严谨的 DataTable。 
由于 SqlBulkCopy 要求 DataTable 的列必须和表列顺序一致,并且不能多也不能少,所以,我们首先要创建一个和目标表字段顺序一致的 DataTable,先查出目标表的结构:

static List<SysColumn> GetTableColumns(SqlConnection sourceConn, string tableName)
{
    string sql = string.Format("select * from syscolumns inner join sysobjects on syscolumns.id=sysobjects.id where sysobjects.xtype='U' and sysobjects.name='{0}' order by syscolumns.colid asc", tableName);

    List<SysColumn> columns = new List<SysColumn>();
    using (SqlConnection conn = (SqlConnection)((ICloneable)sourceConn).Clone())
    {
        conn.Open();
        using (var reader = conn.ExecuteReader(sql))
        {
            while (reader.Read())
            {
                SysColumn column = new SysColumn();
                column.Name = reader.GetDbValue("name");
                column.ColOrder = reader.GetDbValue("colorder");

                columns.Add(column);
            }
        }
        conn.Close();
    }

    return columns;
}

得到基本的表结构 List,再创建“严格”的 DataTable 对象:

DataTable dt = new DataTable();

Type modelType = typeof(TModel);

List<SysColumn> columns = GetTableColumns(conn, tableName);
List<PropertyInfo> mappingProps = new List<PropertyInfo>();

var props = modelType.GetProperties();
for (int i = 0; i < columns.Count; i++)
{
    var column = columns[i];
    PropertyInfo mappingProp = props.Where(a => a.Name == column.Name).FirstOrDefault();
    if (mappingProp == null)
        throw new Exception(string.Format("model 类型 '{0}'未定义与表 '{1}' 列名为 '{2}' 映射的属性", modelType.FullName, tableName, column.Name));

    mappingProps.Add(mappingProp);
    Type dataType = GetUnderlyingType(mappingProp.PropertyType);
    if (dataType.IsEnum)
        dataType = typeof(int);
    dt.Columns.Add(new DataColumn(column.Name, dataType));
}

注意,构造 DataColumn 时,要给 Column 设置 DataType,及数据类型。因为如果不指定数据类型,默认是 string 类型,那样会导致将数据发送至数据库时会引起数据转换,会有些许无谓的性能损耗,同时,如果不指定数据类型,导入一些数据类型时可能会失败,比如模型属性是 Guid 类型,导入时会出现类型转换失败异常。

二、利用反射,获取属性值,构造一行一行的 DataRow,填充 DataTable:

foreach (var model in modelList)
{
    DataRow dr = dt.NewRow();
    for (int i = 0; i < mappingProps.Count; i++)
    {
        PropertyInfo prop = mappingProps[i];
        object value = prop.GetValue(model);

        if (GetUnderlyingType(prop.PropertyType).IsEnum)
        {
            if (value != null)
                value = (int)value;
        }

        dr[i] = value ?? DBNull.Value;
    }

    dt.Rows.Add(dr);
}

三、一个完整包含数据的 DataTable 对象就创建好了,我们就可以使用 SqlBulkCopy 插入数据了:

public static void BulkCopy<TModel>(this SqlConnection conn, List<TModel> modelList, int batchSize, string destinationTableName = null, int? bulkCopyTimeout = null, SqlTransaction externalTransaction = null)
{
    bool shouldCloseConnection = false;

    if (string.IsNullOrEmpty(destinationTableName))
        destinationTableName = typeof(TModel).Name;

    DataTable dtToWrite = ToSqlBulkCopyDataTable(modelList, conn, destinationTableName);

    SqlBulkCopy sbc = null;

    try
    {
        if (externalTransaction != null)
            sbc = new SqlBulkCopy(conn, SqlBulkCopyOptions.Default, externalTransaction);
        else
            sbc = new SqlBulkCopy(conn);

        using (sbc)
        {
            sbc.BatchSize = batchSize;
            sbc.DestinationTableName = destinationTableName;

            if (bulkCopyTimeout != null)
                sbc.BulkCopyTimeout = bulkCopyTimeout.Value;

            if (conn.State != ConnectionState.Open)
            {
                shouldCloseConnection = true;
                conn.Open();
            }

            sbc.WriteToServer(dtToWrite);
        }
    }
    finally
    {
        if (shouldCloseConnection && conn.State == ConnectionState.Open)
            conn.Close();
    }
}

完事,一个批量插入的 Helper 方法就这么产生了,最终的完整实现如下:

C# 封装SqlBulkCopy,让批量插入更方便 C# 封装SqlBulkCopy,让批量插入更方便

public static class SqlConnectionExtension
{
    /// <summary>
    /// 使用 SqlBulkCopy 向 destinationTableName 表插入数据
    /// </summary>
    /// <typeparam name="TModel">必须拥有与目标表所有字段对应属性</typeparam>
    /// <param name="conn"></param>
    /// <param name="modelList">要插入的数据</param>
    /// <param name="batchSize">SqlBulkCopy.BatchSize</param>
    /// <param name="destinationTableName">如果为 null,则使用 TModel 名称作为 destinationTableName</param>
    /// <param name="bulkCopyTimeout">SqlBulkCopy.BulkCopyTimeout</param>
    /// <param name="externalTransaction">要使用的事务</param>
    public static void BulkCopy<TModel>(this SqlConnection conn, List<TModel> modelList, int batchSize, string destinationTableName = null, int? bulkCopyTimeout = null, SqlTransaction externalTransaction = null)
    {
        bool shouldCloseConnection = false;

        if (string.IsNullOrEmpty(destinationTableName))
            destinationTableName = typeof(TModel).Name;

        DataTable dtToWrite = ToSqlBulkCopyDataTable(modelList, conn, destinationTableName);

        SqlBulkCopy sbc = null;

        try
        {
            if (externalTransaction != null)
                sbc = new SqlBulkCopy(conn, SqlBulkCopyOptions.Default, externalTransaction);
            else
                sbc = new SqlBulkCopy(conn);

            using (sbc)
            {
                sbc.BatchSize = batchSize;
                sbc.DestinationTableName = destinationTableName;

                if (bulkCopyTimeout != null)
                    sbc.BulkCopyTimeout = bulkCopyTimeout.Value;

                if (conn.State != ConnectionState.Open)
                {
                    shouldCloseConnection = true;
                    conn.Open();
                }

                sbc.WriteToServer(dtToWrite);
            }
        }
        finally
        {
            if (shouldCloseConnection && conn.State == ConnectionState.Open)
                conn.Close();
        }
    }

    public static DataTable ToSqlBulkCopyDataTable<TModel>(List<TModel> modelList, SqlConnection conn, string tableName)
    {
        DataTable dt = new DataTable();

        Type modelType = typeof(TModel);

        List<SysColumn> columns = GetTableColumns(conn, tableName);
        List<PropertyInfo> mappingProps = new List<PropertyInfo>();

        var props = modelType.GetProperties();
        for (int i = 0; i < columns.Count; i++)
        {
            var column = columns[i];
            PropertyInfo mappingProp = props.Where(a => a.Name == column.Name).FirstOrDefault();
            if (mappingProp == null)
                throw new Exception(string.Format("model 类型 '{0}'未定义与表 '{1}' 列名为 '{2}' 映射的属性", modelType.FullName, tableName, column.Name));

            mappingProps.Add(mappingProp);
            Type dataType = GetUnderlyingType(mappingProp.PropertyType);
            if (dataType.IsEnum)
                dataType = typeof(int);
            dt.Columns.Add(new DataColumn(column.Name, dataType));
        }

        foreach (var model in modelList)
        {
            DataRow dr = dt.NewRow();
            for (int i = 0; i < mappingProps.Count; i++)
            {
                PropertyInfo prop = mappingProps[i];
                object value = prop.GetValue(model);

                if (GetUnderlyingType(prop.PropertyType).IsEnum)
                {
                    if (value != null)
                        value = (int)value;
                }

                dr[i] = value ?? DBNull.Value;
            }

            dt.Rows.Add(dr);
        }

        return dt;
    }
    static List<SysColumn> GetTableColumns(SqlConnection sourceConn, string tableName)
    {
        string sql = string.Format("select * from syscolumns inner join sysobjects on syscolumns.id=sysobjects.id where sysobjects.xtype='U' and sysobjects.name='{0}' order by syscolumns.colid asc", tableName);

        List<SysColumn> columns = new List<SysColumn>();
        using (SqlConnection conn = (SqlConnection)((ICloneable)sourceConn).Clone())
        {
            conn.Open();
            using (var reader = conn.ExecuteReader(sql))
            {
                while (reader.Read())
                {
                    SysColumn column = new SysColumn();
                    column.Name = reader.GetDbValue("name");
                    column.ColOrder = reader.GetDbValue("colorder");

                    columns.Add(column);
                }
            }
            conn.Close();
        }

        return columns;
    }

    static Type GetUnderlyingType(Type type)
    {
        Type unType = Nullable.GetUnderlyingType(type); ;
        if (unType == null)
            unType = type;

        return unType;
    }

    class SysColumn
    {
        public string Name { get; set; }
        public int ColOrder { get; set; }
    }
}

完整代码

代码不多,仅仅150行,大家可以直接拷走拿去用。其中用了反射,估计吃瓜群众可能不淡定了~哈哈,如果你真有大数据插入需求,这点反射消耗相对大数据插入简直九牛一毛,微乎其微,放心好了。

转自:https://www.cnblogs.com/so9527/p/6193154.html

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
Opencv中Mat矩阵相乘——点乘、dot、mul运算详解
Opencv中Mat矩阵相乘——点乘、dot、mul运算详解2016年09月02日00:00:36 \牧野(https://www.oschina.net/action/GoToLink?urlhttps%3A%2F%2Fme.csdn.net%2Fdcrmg) 阅读数:59593
Stella981 Stella981
3年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Easter79 Easter79
3年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这