[Oracle] Bulk Insert Data

 

命名空间:Oracle.DataAccess.Client

总结

壹、在30+万和60+万数据时,ArrayBind3回性导入和OracleBulkCopy时间相差不是一点都不小,不过ArrayBind格局相似都急需转移数据方式,占用了有的时光,而
OracleBulkCopy 则只须要简单处理一下 DataTable 数据源即可导入;

2、当数据量达到100+万时,ArrayBind
很简单并发内部存储器不足极度,此时只得利用分批次执行导入,遵照测试结果可知,次数越少,速度越快;而采用OracleBulkCopy 形式则很少出现内部存款和储蓄器不足现象,由此可见 OracleBulkCopy
占用内部存款和储蓄器比 ArrayBind 模式少;

三、接纳 OracleBulkCopy 导入时,先要禁用该表全体触发器,如若该表存在自增
ID 触发器,就相比麻烦了,得先禁止使用改变的自增 ID
的触发器,然后手动自增设置要导入的多寡,最终才得以导入;同时,这种导入方式不可并发,四个时刻只可以有一个用户在导入(因为自增ID交由程序处理),此时还索要锁表,制止别的人还要批量导入数据;

方式二:OracleBulkCopy

说明:

  1. OracleBulkCopy 采用 direct path 方式导入;

  2. 不支持 transaction,无法 Rollback;

  3. 若是该表存在触发器时,不可能选择 OracleBulkCopy(报那二个音信 Oracle
    Error: ORA-260八陆),除非先禁用该表的享有触发器;

  4. 进度中会自动启用 NOT NULL、UNIQUE 和 P大切诺基IMAPRADOY KEY 二种约束,其中 NOT
    NULL 约束在列数组绑定时表达,任何违背 NOT NULL
    约束规范的行数据都会舍弃;UNIQUE
    约束是在导入完毕后重建索引时认证,但是在 bulk copy
    时,允许违反索引约束,并在成功后将引得设置成禁止使用(UNUSABLE)状态;而且,假如索引壹始发景况正是剥夺(UNUSABLE)状态时,OracleBulkCopy
    是会报错的。

参照代码如下:

  1 /// <summary>
  2 /// 批量插入数据
  3 /// 该方法需要禁用该表所有触发器,并且插入的数据如果为空,是不会采用默认值
  4 /// </summary>
  5 /// <param name="table">数据表</param>
  6 /// <param name="targetTableName">数据库目标表名</param>
  7 /// <returns></returns>
  8 public bool InsertBulkData(DataTable table, string targetTableName)
  9 {
 10     bool result = false;
 11     string connStr = GetConnectionString();
 12     using (OracleConnection connection = new OracleConnection(connStr))
 13     {
 14         using (OracleBulkCopy bulkCopy = new OracleBulkCopy(connStr, OracleBulkCopyOptions.Default))
 15         {
 16             if (table != null && table.Rows.Count > 0)
 17             {
 18                 bulkCopy.DestinationTableName = targetTableName;
 19                 for (int i = 0; i < table.Columns.Count; i++)
 20                 {
 21                     string col = table.Columns[i].ColumnName;
 22                     bulkCopy.ColumnMappings.Add(col, col);
 23                 }
 24                 connection.Open();
 25                 bulkCopy.WriteToServer(table);
 26                 result = true;
 27             }
 28             bulkCopy.Close();
 29             bulkCopy.Dispose();
 30         }
 31     }
 32 
 33     return result;
 34 }

测试结果:

数据类型:4列NVARCHAR2,2列NUMBER

30+万(7.36M):用时 14:590

60+万(14.6M):用时 28:28

1048576(24.9M):用时 52:971

叠加,禁止使用表的保有外键SQL:

ALTER TABLE table_name DISABLE ALL TRIGGERS

工具:Microsoft Visual Studio Ultimate 2013 + Oracle SQL Developer
1.5.5 + Oracle Database 11g Enterprise Edition 11.2.0.1.0(32位)
+ TNS for 32-bit Windows 11.2.0.1.0

组件:Oracle.DataAccess.dll(2.112.1.0)

ODP.NET 版本:ODP.NET for .NET Framework 2.0 或 ODP.NET for .NET
Framework 4

参考资料:

1、ArrayBind
http://www.oracle.com/technetwork/issue-archive/2009/09-sep/o59odpnet-085168.html

2、ArrayBind
http://www.soaspx.com/dotnet/csharp/csharp_20130911_10501.html

三、Oracle数据导入方法
http://dbanotes.net/Oracle/All_About_Oracle_Data_Loading.htm

4、介绍OracleBulkCopy类
https://docs.oracle.com/cd/E11882_01/win.112/e23174/OracleBulkCopyClass.htm#ODPNT7446

5、http://dba.stackexchange.com/questions/7287/what-specifically-does-oraclebulkcopy-do-and-how-can-i-optimize-its-performance

方式一:ArrayBind

当插入一条数据时,SQL 语句如下:

INSERT INTO table_name VALUES (:col1, :col2, :col3, :col4, :col5)

  1 public void InsertDataRow(Dictionary<string, object> dataRow)
  2 {
  3     StringBuilder sbCmdText = new StringBuilder();
  4     sbCmdText.AppendFormat("INSERT INTO {0}(", m_TableName);
  5     sbCmdText.Append(string.Join(",", dataRow.Keys.ToArray()));
  6     sbCmdText.Append(") VALUES (");
  7     sbCmdText.Append(":" + string.Join(",:", dataRow.Keys.ToArray()));
  8     sbCmdText.Append(")");
  9 
 10     using (OracleConnection conn = new OracleConnection())
 11     {
 12         using (OracleCommand cmd = conn.CreateCommand())
 13         {
 14             cmd.CommandType = CommandType.Text;
 15             cmd.CommandText = sbCmdText.ToString();
 16             OracleParameter parameter = null;
 17             OracleDbType dbType = OracleDbType.Object;
 18             foreach (string colName in dataRow.Keys)
 19             {
 20                 dbType = GetOracleDbType(dataRow[colName]);
 21                 parameter = new OracleParameter(colName, dbType);
 22                 parameter.Direction = ParameterDirection.Input;
 23                 parameter.OracleDbTypeEx = dbType;
 24                 parameter.Value = dataRow[colName];
 25                 cmd.Parameters.Add(parameter);
 26             }
 27             conn.Open();
 28             int result = cmd.ExecuteNonQuery();
 29         }
 30     }
 31 }

那儿,每一个 OracleParameter 的 Value 值都予以单个字段的
贰个具体值,那种也是最棒守旧的插入数据的艺术。

Oracle V六 中 OCI 编制程序接口参预了数组接口脾气。

当使用 ArrayBind 时,OraleParameter 的 Value 值则是赋予单个字段的
二个数组,即多条数据的该字段组合成的3个数组。此时 Oracle 仅须要执行一遍SQL
语句,即可在内部存款和储蓄器中批量解析并导入数据,收缩程序与数据库之间往来的操作,其独到之处就是数量导入的完整时间分明回落,特别是进度占用
CPU 的时刻。

要是数据源是 DataTable 类型,首先把 DataTable 数据源,转换来object[][] 类型,然后绑定 OracleParameter 的 Value
值为对应字段的1个 Object[] 数组即可;参考代码如下:

  1 /// <summary>
  2 /// 批量插入大数据量
  3 /// </summary>
  4 /// <param name="columnData">列名-列数据字典</param>
  5 /// <param name="dataCount">数据量</param>
  6 /// <returns>插入数据量</returns>
  7 public int InsertBigData(Dictionary<string, object> columnData, int dataCount)
  8 {
  9     int result = 0;
 10     if (columnData == null || columnData.Count < 1)
 11     {
 12         return result;
 13     }
 14     string[] colHeaders = columnData.Keys.ToArray();
 15     StringBuilder sbCmdText = new StringBuilder();
 16     if (columnData.Count > 0)
 17     {
 18         // 拼接INSERT的SQL语句
 19         sbCmdText.AppendFormat("INSERT INTO {0}(", m_TableName);
 20         sbCmdText.Append(string.Join(",", colHeaders));
 21         sbCmdText.Append(") VALUES (");
 22         sbCmdText.Append(m_ParameterPrefix + string.Join("," + m_ParameterPrefix, colHeaders));
 23         sbCmdText.Append(")");
 24         OracleConnection connection = null;
 25         try
 26         {
 27             connection = new OracleConnection(GetConnectionString());
 28             using (OracleCommand command = connection.CreateCommand())
 29             {
 30                 command.ArrayBindCount = dataCount;
 31                 command.BindByName = true;
 32                 command.CommandType = CommandType.Text;
 33                 command.CommandText = sbCmdText.ToString();
 34                 command.CommandTimeout = 1800;
 35                 OracleParameter parameter;
 36                 OracleDbType dbType = OracleDbType.Object;
 37                 foreach (string colName in colHeaders)
 38                 {
 39                     dbType = GetOracleDbType(columnData[colName]);
 40                     parameter = new OracleParameter(colName, dbType);
 41                     parameter.Direction = ParameterDirection.Input;
 42                     parameter.OracleDbTypeEx = dbType;
 43                     parameter.Value = columnData[colName];
 44                     command.Parameters.Add(parameter);
 45                 }
 46                 connection.Open();
 47                 OracleTransaction trans = connection.BeginTransaction();
 48                 try
 49                 {
 50                     command.Transaction = trans;
 51                     result = command.ExecuteNonQuery();
 52                     trans.Commit();
 53                 }
 54                 catch (Exception ex)
 55                 {
 56                     trans.Rollback();
 57                     throw ex;
 58                 }
 59             }
 60         }
 61         finally
 62         {
 63             if (connection != null)
 64             {
 65                 connection.Close();
 66                 connection.Dispose();
 67             }
 68             GC.Collect();
 69             GC.WaitForFullGCComplete();
 70         }
 71     }
 72     return result;
 73 }

Oracle 1Oracle 2

  1 /// <summary>
  2 /// 根据数据类型获取OracleDbType
  3 /// </summary>
  4 /// <param name="value">数据</param>
  5 /// <returns>数据的Oracle类型</returns>
  6 private static OracleDbType GetOracleDbType(object value)
  7 {
  8     OracleDbType dataType = OracleDbType.Object;
  9     if (value is string[])
 10     {
 11         dataType = OracleDbType.Varchar2;
 12     }
 13     else if (value is DateTime[])
 14     {
 15         dataType = OracleDbType.TimeStamp;
 16     }
 17     else if (value is int[] || value is short[])
 18     {
 19         dataType = OracleDbType.Int32;
 20     }
 21     else if (value is long[])
 22     {
 23         dataType = OracleDbType.Int64;
 24     }
 25     else if (value is decimal[] || value is double[] || value is float[])
 26     {
 27         dataType = OracleDbType.Decimal;
 28     }
 29     else if (value is Guid[])
 30     {
 31         dataType = OracleDbType.Varchar2;
 32     }
 33     else if (value is bool[] || value is Boolean[])
 34     {
 35         dataType = OracleDbType.Byte;
 36     }
 37     else if (value is byte[])
 38     {
 39         dataType = OracleDbType.Blob;
 40     }
 41     else if (value is char[])
 42     {
 43         dataType = OracleDbType.Char;
 44     }
 45     return dataType;
 46 }

GetOracleDbType

说明:如若利用分次(每一次一万数额)执行 InsertBigData
方法,速度反倒比一遍性执行 InsertBigData 方法慢,详见上边测试结果;

测试结果:

无索引,数据类型:4列NVARCHAR2,2列NUMBER

30+万(柒.3陆M):二回性导入用时 一伍:62三,每便一千0导入用时

60+万(1四.陆M):一遍性导入用时 2八:20柒,每回一千0导入用时 一:二:300

Oracle,十0+万(2四.九M):叁回性导入报如下至极

Oracle 3

此时实在从财富监视器上得以摸清仍有可用内存,但是还是报
OutOfMemoryException,所以思疑应该是3个 bug;

假诺每趟一千0导入用时 二:九:252

假设老是陆仟0导入用时 58:10一

叠加 InsertBigData 方法运用示例:

Oracle 4Oracle 5

  1 // 每10000数据导入一次
  2 Dictionary<string, object> columnsData = new Dictionary<string, object>();
  3 int dataCount = m_SourceDataTable.Rows.Count;
  4 int times = dataCount / 10000 + (dataCount % 10000 == 0 ? 0 : 1);
  5 for (int i = 0; i < times; i++)
  6 {
  7     int startIndex = i * 10000;
  8     int endIndex = (i + 1) * 10000;
  9     endIndex = endIndex > dataCount ? dataCount : endIndex;
 10     int currDataCount = endIndex - startIndex;
 11     columnsData.Add("COL1", new string[currDataCount]);
 12     columnsData.Add("COL2", new string[currDataCount]);
 13     columnsData.Add("COL3", new decimal[currDataCount]);
 14     columnsData.Add("COL4", new string[currDataCount]);
 15     columnsData.Add("COL5", new decimal[currDataCount]);
 16     columnsData.Add("COL6", new string[currDataCount]);
 17     for (int rowIndex = startIndex; rowIndex < endIndex; rowIndex++)
 18     {
 19         int dicRowIndex = rowIndex - startIndex;// 列数据行索引
 20         foreach (string colName in columnsData.Keys)
 21         {
 22             object cell = m_SourceDataTable.Rows[rowIndex][colName];
 23             string cellStr = (cell + "").TrimEnd(new char[] { '\0', ' ' });
 24             if (colName == "COL3" || colName == "COL5")
 25             {
 26                 decimal value = 0;
 27                 decimal.TryParse(cellStr, out value);
 28                 ((decimal[])columnsData[colName])[dicRowIndex] = value;
 29             }
 30             else
 31             {
 32                 ((string[])columnsData[colName])[dicRowIndex] = cellStr;
 33             }
 34         }
 35     }
 36     m_DAL.InsertBigData(columnsData, currDataCount);
 37 
 38     columnsData.Clear();
 39     GC.Collect();
 40     GC.WaitForFullGCComplete();
 41 }

View Code

相关文章