分享MSSQL、MySql、Oracle的大数据批量导入方法及编制程序手法细节

1:MSSQL

SQL语法篇:

BULK INSERT   
   [ database_name . [ schema_name ] . | schema_name . ] [ table_name | view_name ]   
      FROM 'data_file'   
     [ WITH   
    (   
   [ [ , ] BATCHSIZE = batch_size ]   
   [ [ , ] CHECK_CONSTRAINTS ]   
   [ [ , ] CODEPAGE = { 'ACP' | 'OEM' | 'RAW' | 'code_page' } ]   
   [ [ , ] DATAFILETYPE =   
      { 'char' | 'native'| 'widechar' | 'widenative' } ]   
   [ [ , ] FIELDTERMINATOR = 'field_terminator' ]   
   [ [ , ] FIRSTROW = first_row ]   
   [ [ , ] FIRE_TRIGGERS ]   
   [ [ , ] FORMATFILE = 'format_file_path' ]   
   [ [ , ] KEEPIDENTITY ]   
   [ [ , ] KEEPNULLS ]   
   [ [ , ] KILOBYTES_PER_BATCH = kilobytes_per_batch ]   
   [ [ , ] LASTROW = last_row ]   
   [ [ , ] MAXERRORS = max_errors ]   
   [ [ , ] ORDER ( { column [ ASC | DESC ] } [ ,...n ] ) ]   
   [ [ , ] ROWS_PER_BATCH = rows_per_batch ]   
   [ [ , ] ROWTERMINATOR = 'row_terminator' ]   
   [ [ , ] TABLOCK ]   
   [ [ , ] ERRORFILE = 'file_name' ]   
    )]   

SQL示例:

 bulk insert 表名  from 'D:\mydata.txt' 
with 
 (fieldterminator=',', 
 rowterminator='\n', 
 check_constraints) 
select * from 表名 

由于C#提供了SqlBulkCopy,所以非DBA的大家,越来越多会通进度序来调用:

C#代码篇:

C#代码调用示例及细节,以下代码摘录自CYQ.Data:

using (SqlBulkCopy sbc = new SqlBulkCopy(con, (keepID ? SqlBulkCopyOptions.KeepIdentity : SqlBulkCopyOptions.Default) | SqlBulkCopyOptions.FireTriggers, sqlTran))
                    {
                        sbc.BatchSize = 100000;
                        sbc.DestinationTableName = SqlFormat.Keyword(mdt.TableName, DalType.MsSql);
                        sbc.BulkCopyTimeout = AppConfig.DB.CommandTimeout;
                        foreach (MCellStruct column in mdt.Columns)
                        {
                            sbc.ColumnMappings.Add(column.ColumnName, column.ColumnName);
                        }
                        sbc.WriteToServer(mdt);
                    }

有5个细节:

1:事务:

假如只是单个事务,构造函数能够是链接字符串。

要是急需和外部合成二个事务(比如先删除,再插入,那在同3个政工中)

就要求自个儿组织Connection对象和Transaction,在左右文中传递来处理。

2:插入是或不是引发触发器

通过SqlBulkCopyOptions.FireTriggers 引入

3:其余:批量数、超时时间、是还是不是写入主键ID。

恐怕引发的数据库Down机的地方:

在历史的历程中,小编遇上过的三个大坑是:

当数码的长度过长,数据的字段过短,发生多少二进制截断时,数据库服务甚至停掉了(恐怕是特例,只怕不是)。

之所以小心使用,尽力做好对外表数据做好数据长度验证。

2:MySql

有关MySql的批量,那是一段悲催的史迹,有多少个坑,直到前天,才发现并解决了。

SQL语法篇:

LOAD DATA [LOW_PRIORITY | CONCURRENT] [LOCAL] INFILE 'data.txt'
    [REPLACE | IGNORE]
    INTO TABLE tbl_name
    [FIELDS
        [TERMINATED BY 'string']
        [[OPTIONALLY] ENCLOSED BY 'char']
        [ESCAPED BY 'char' ]
    ]
    [LINES
        [STARTING BY 'string']
        [TERMINATED BY 'string']
    ]
    [IGNORE number LINES]
    [(col_name_or_user_var,...)]
    [SET col_name = expr,...)]

示例篇:

LOAD DATA LOCAL INFILE 'C:\\Users\\cyq\\AppData\\Local\\Temp\\BulkCopy.csv' INTO TABLE `BulkCopy` CHARACTER SET utf8 FIELDS TERMINATED BY '$,$' LINES TERMINATED BY '
' (`ID`,`Name`,`CreateTime`,`Sex`)

就算MySql.Data.dll 提供了MySqlBulkLoader,但是看源码只是生成了个Load
Data 并用ADO.NET执行,

主导大坑的转移*.csv数据文件的竟然没提供,所以自个儿生成语句并实行就好了,不要求用它。

C#代码篇:

以下代码摘自CYQ.Data,是一段明天才改正好的代码:

 private static string MDataTableToFile(MDataTable dt, bool keepID, DalType dalType)
        {
            string path = Path.GetTempPath() + dt.TableName + ".csv";
            using (StreamWriter sw = new StreamWriter(path, false, new UTF8Encoding(false)))
            {
                MCellStruct ms;
                string value;
                foreach (MDataRow row in dt.Rows)
                {
                    for (int i = 0; i < dt.Columns.Count; i++)
                    {
                        #region 设置值
                        ms = dt.Columns[i];
                        if (!keepID && ms.IsAutoIncrement)
                        {
                            continue;
                        }
                        else if (dalType == DalType.MySql && row[i].IsNull)
                        {
                            sw.Write("\\N");//Mysql用\N表示null值。
                        }
                        else
                        {
                            value = row[i].ToString();
                            if (ms.SqlType == SqlDbType.Bit)
                            {
                                int v = (value.ToLower() == "true" || value == "1") ? 1 : 0;
                                if (dalType == DalType.MySql)
                                {
                                    byte[] b = new byte[1];
                                    b[0] = (byte)v;
                                    value = System.Text.Encoding.UTF8.GetString(b);//mysql必须用字节存档。
                                }
                                else
                                {
                                    value = v.ToString();
                                }

                            }
                            else
                            {
                                value = value.Replace("\\", "\\\\");//处理转义符号
                            }
                            sw.Write(value);
                        }

                        if (i != dt.Columns.Count - 1)//不是最后一个就输出
                        {
                            sw.Write(AppConst.SplitChar);
                        }
                        #endregion
                    }
                    sw.WriteLine();
                }
            }
            if (Path.DirectorySeparatorChar == '\\')
            {
                path = path.Replace(@"\", @"\\");
            }
            return path;
        }

如上代码是发生1个csv文件,用于被调用,有五个宗旨的坑,费了自笔者许多日子:

1:Bit类型数据导不进入?

2:第贰行数据自增ID被重置为1?

那五个难题,网上搜不到答案,放纵到今日,觉的应该化解了,然后就把它消除了。

缓解的思路是那般的:

A:先用Load Data OutFile导出三个文件,再用Load Data InFile导入文本。

一初步自作者用记事本打开看了一下,又随手Ctrl+S了一晃,结果发现标题和自家的同样,让自家嫌疑竟然不援救?

直到明日,重新导出,中间不看了,直接导入,发现它依旧又健康的,于是,思维一转:

B:把自身生成的文书和指令产生的文本,实行了十六进制比对,结果发现:

图片 1

Bit类型本人生成的的多少:是0,1,在十六进制下展现是30、31。

命令发生的数目在十六进制是00、01,查了下资料,发现MySql的Bit存档的Bit是二进制。

于是,把0,1用字节表示,再转字符串,再存档,就好了。

于是那样一段代码发生了(网上的DataTable转CSV代码都以没处理的,都不亮堂他们是怎么跑的,难道都没有定义Bit类型?):

if (ms.SqlType == SqlDbType.Bit)
{
     int v = (value.ToLower() == "true" || value == "1") ? 1 : 0;
     if (dalType == DalType.MySql)
     {
           byte[] b = new byte[1];
           b[0] = (byte)v;
           value = System.Text.Encoding.UTF8.GetString(b);//mysql必须用字节存档。
       }
      else
       {
            value = v.ToString();
       }
}

其它关于Null值,用\N表示。

消除完第②个难点,剩下正是第四个难题了,为何第一个行代码的主键会被置为1?

要么比对十六进制,结果惊人的意识:

图片 2

是BOM头,让它错识别了第一个主键值,所以被忽视主键,用了第二个自增值1替代了。

那也解释了为什么只要重新保存的数目都有Bug的原由。

于是,化解的艺术就是StreaWrite的时候,不生成BOM头,怎么处理呢?

于是就有了以下的代码:

 using (StreamWriter sw = new StreamWriter(path, false, new UTF8Encoding(false)))
{
       ...................
}

经过New多个Encoding,并钦定参数为false,替代我们健康的System.Text.Encoding.UTF8Encoding。

那一个细节很隐衷,不说您都猜不道。。。

3:Oracle

SQL语法篇

LOAD[DATA]
[ { INFILE | INDDN } {file | * }
[STREAM | RECORD | FIXED length [BLOCKSIZE size]|
VARIABLE [length] ]
[ { BADFILE | BADDN } file ]
{DISCARDS | DISCARDMAX} integr ]
[ {INDDN | INFILE} . . . ]
[ APPEND | REPLACE | INSERT ]
[RECLENT integer]
[ { CONCATENATE integer |
CONTINUEIF { [THIS | NEXT] (start[: end])LAST }
Operator { 'string' | X 'hex' } } ]
INTO TABLE [user.]table
[APPEND | REPLACE|INSERT]
[WHEN condition [AND condition]...]
[FIELDS [delimiter] ]
(
column {
RECNUM | CONSTANT value |
SEQUENCE ( { integer | MAX |COUNT} [, increment] ) |
[POSITION ( { start [end] | * [ + integer] }
) ]
datatype
[TERMINATED [ BY ] {WHITESPACE| [X] 'character' } ]
[ [OPTIONALLY] ENCLOSE[BY] [X]'charcter']
[NULLIF condition ]
[DEFAULTIF condotion]
}
[ ,...]
)

以上配置存档成一个CTL文件,再由以下的吩咐调用:

Sqlldr userid=用户名/密码@数据库 control=文件名.ctl 

C#语法篇:

.NET里大概有三种操作Oracle的一手:

1:System.Data.OracleClient
(须求设置客户端)没有带批量办法(还区分x86和x64)。

2:Oracle.DataAccess  (需求设置客户端)带批量格局(也区分x86和x64)。

3:Oracle.ManagedDataAccess
(不要求设置客户端)没带批量措施(不区分x86和x64,但仅补助.NET
4.0或以上)

Oracle.DataAccess
带的批量主意叫:OracleBulkCopy,由于使用格局和SqlBulkCopy大概相同,就不介绍了。

图片 3

假使调用程序所在的服务器安装了Oracle客户端,能够开展以下办法的调用:

流程如下:

1:产生*.cvs数据文件,见MySql中的代码,一样用的。

2:产生*.ctl控制文件,把变化的Load Data 语句存档成1个*.ctl文件即可。

3:用sqlidr.exe执行CTL文件,那里悲催的一些是,不可能用ADO.NET调用,只可以用经过调用,所以,那个批量只能单独行使。

调用进度的连带代码:

 bool hasSqlLoader = false;
        private bool HasSqlLoader() //检测是否安装了客户端。
        {
            hasSqlLoader = false;
            Process proc = new Process();
            proc.StartInfo.FileName = "sqlldr";
            proc.StartInfo.CreateNoWindow = true;
            proc.StartInfo.UseShellExecute = false;
            proc.StartInfo.RedirectStandardOutput = true;

            proc.OutputDataReceived += new DataReceivedEventHandler(proc_OutputDataReceived);
            proc.Start();
            proc.BeginOutputReadLine();
            proc.WaitForExit();
            return hasSqlLoader;
        }

        void proc_OutputDataReceived(object sender, DataReceivedEventArgs e)
        {
            if (!hasSqlLoader)
            {
                hasSqlLoader = e.Data.StartsWith("SQL*Loader:");
            }
        }
        //已经实现,但没有事务,所以暂时先不引入。
        private bool ExeSqlLoader(string arg)
        {
            try
            {
                Process proc = new Process();
                proc.StartInfo.FileName = "sqlldr";
                proc.StartInfo.Arguments = arg;
                proc.Start();
                proc.WaitForExit();
                return true;
            }
            catch
            {

            }
            return false;
        }

总结:

乘机大数指标推广,数据间的批量活动必然越来频仍的被提到,所以无论是是用SQL脚本,依旧友好写代码,或是用DBImport工具,都将成必需技能之一了!

鉴于此,分享一下自身在这一块费过的力和填过的坑,供大家参考!

相关文章