4
4
5
5
using System ;
6
6
using System . Collections . Generic ;
7
+ using System . Data ;
8
+ using System . Data . Common ;
7
9
using System . Globalization ;
8
10
using System . IO ;
9
11
using System . Text ;
12
+ using System . Threading . Tasks ;
10
13
11
14
namespace Microsoft . Data . Analysis
12
15
{
@@ -109,12 +112,158 @@ public static DataFrame LoadCsv(string filename,
109
112
}
110
113
}
111
114
115
+ public static DataFrame LoadFrom ( IEnumerable < IList < object > > vals , IList < ( string , Type ) > columnInfos )
116
+ {
117
+ var columnsCount = columnInfos . Count ;
118
+ var columns = new List < DataFrameColumn > ( columnsCount ) ;
119
+
120
+ foreach ( var ( name , type ) in columnInfos )
121
+ {
122
+ var column = CreateColumn ( type , name ) ;
123
+ columns . Add ( column ) ;
124
+ }
125
+
126
+ var res = new DataFrame ( columns ) ;
127
+
128
+ foreach ( var items in vals )
129
+ {
130
+ for ( var c = 0 ; c < items . Count ; c ++ )
131
+ {
132
+ items [ c ] = items [ c ] ;
133
+ }
134
+ res . Append ( items , inPlace : true ) ;
135
+ }
136
+
137
+ return res ;
138
+ }
139
+
140
+ public void SaveTo ( DataTable table )
141
+ {
142
+ var columnsCount = Columns . Count ;
143
+
144
+ if ( table . Columns . Count == 0 )
145
+ {
146
+ foreach ( var column in Columns )
147
+ {
148
+ table . Columns . Add ( column . Name , column . DataType ) ;
149
+ }
150
+ }
151
+ else
152
+ {
153
+ if ( table . Columns . Count != columnsCount )
154
+ throw new ArgumentException ( ) ;
155
+ for ( var c = 0 ; c < columnsCount ; c ++ )
156
+ {
157
+ if ( table . Columns [ c ] . DataType != Columns [ c ] . DataType )
158
+ throw new ArgumentException ( ) ;
159
+ }
160
+ }
161
+
162
+ var items = new object [ columnsCount ] ;
163
+ foreach ( var row in Rows )
164
+ {
165
+ for ( var c = 0 ; c < columnsCount ; c ++ )
166
+ {
167
+ items [ c ] = row [ c ] ?? DBNull . Value ;
168
+ }
169
+ table . Rows . Add ( items ) ;
170
+ }
171
+ }
172
+
173
+ public DataTable ToTable ( )
174
+ {
175
+ var res = new DataTable ( ) ;
176
+ SaveTo ( res ) ;
177
+ return res ;
178
+ }
179
+
180
+ public static DataFrame FromSchema ( DbDataReader reader )
181
+ {
182
+ var columnsCount = reader . FieldCount ;
183
+ var columns = new DataFrameColumn [ columnsCount ] ;
184
+
185
+ for ( var c = 0 ; c < columnsCount ; c ++ )
186
+ {
187
+ var type = reader . GetFieldType ( c ) ;
188
+ var name = reader . GetName ( c ) ;
189
+ var column = CreateColumn ( type , name ) ;
190
+ columns [ c ] = column ;
191
+ }
192
+
193
+ var res = new DataFrame ( columns ) ;
194
+ return res ;
195
+ }
196
+
197
+ public static async Task < DataFrame > LoadFrom ( DbDataReader reader )
198
+ {
199
+ var res = FromSchema ( reader ) ;
200
+ var columnsCount = reader . FieldCount ;
201
+
202
+ var items = new object [ columnsCount ] ;
203
+ while ( await reader . ReadAsync ( ) )
204
+ {
205
+ for ( var c = 0 ; c < columnsCount ; c ++ )
206
+ {
207
+ items [ c ] = reader . IsDBNull ( c )
208
+ ? null
209
+ : reader [ c ] ;
210
+ }
211
+ res . Append ( items , inPlace : true ) ;
212
+ }
213
+
214
+ reader . Close ( ) ;
215
+
216
+ return res ;
217
+ }
218
+
219
+ public static async Task < DataFrame > LoadFrom ( DbDataAdapter adapter )
220
+ {
221
+ using var reader = await adapter . SelectCommand . ExecuteReaderAsync ( ) ;
222
+ return await LoadFrom ( reader ) ;
223
+ }
224
+
225
+ public void SaveTo ( DbDataAdapter dataAdapter , DbProviderFactory factory )
226
+ {
227
+ using var commandBuilder = factory . CreateCommandBuilder ( ) ;
228
+ commandBuilder . DataAdapter = dataAdapter ;
229
+ dataAdapter . InsertCommand = commandBuilder . GetInsertCommand ( ) ;
230
+ dataAdapter . UpdateCommand = commandBuilder . GetUpdateCommand ( ) ;
231
+ dataAdapter . DeleteCommand = commandBuilder . GetDeleteCommand ( ) ;
232
+
233
+ using var table = ToTable ( ) ;
234
+
235
+ var connection = dataAdapter . SelectCommand . Connection ;
236
+ var needClose = connection . TryOpen ( ) ;
237
+
238
+ try
239
+ {
240
+ using var transaction = connection . BeginTransaction ( ) ;
241
+ try
242
+ {
243
+ dataAdapter . Update ( table ) ;
244
+ }
245
+ catch
246
+ {
247
+ transaction . Rollback ( ) ;
248
+ transaction . Dispose ( ) ;
249
+ throw ;
250
+ }
251
+ transaction . Commit ( ) ;
252
+ }
253
+ finally
254
+ {
255
+ if ( needClose )
256
+ connection . Close ( ) ;
257
+ }
258
+ }
259
+
112
260
/// <summary>
113
261
/// return <paramref name="columnIndex"/> of <paramref name="columnNames"/> if not null or empty, otherwise return "Column{i}" where i is <paramref name="columnIndex"/>.
114
262
/// </summary>
115
263
/// <param name="columnNames">column names.</param>
116
264
/// <param name="columnIndex">column index.</param>
117
265
/// <returns></returns>
266
+
118
267
private static string GetColumnName ( string [ ] columnNames , int columnIndex )
119
268
{
120
269
var defaultColumnName = "Column" + columnIndex . ToString ( ) ;
@@ -126,7 +275,7 @@ private static string GetColumnName(string[] columnNames, int columnIndex)
126
275
return defaultColumnName ;
127
276
}
128
277
129
- private static DataFrameColumn CreateColumn ( Type kind , string [ ] columnNames , int columnIndex )
278
+ private static DataFrameColumn CreateColumn ( Type kind , string columnName )
130
279
{
131
280
DataFrameColumn ret ;
132
281
if ( kind == typeof ( bool ) )
@@ -143,7 +292,7 @@ private static DataFrameColumn CreateColumn(Type kind, string[] columnNames, int
143
292
}
144
293
else if ( kind == typeof ( string ) )
145
294
{
146
- ret = new StringDataFrameColumn ( GetColumnName ( columnNames , columnIndex ) , 0 ) ;
295
+ ret = new StringDataFrameColumn ( columnName , 0 ) ;
147
296
}
148
297
else if ( kind == typeof ( long ) )
149
298
{
@@ -187,7 +336,7 @@ private static DataFrameColumn CreateColumn(Type kind, string[] columnNames, int
187
336
}
188
337
else if ( kind == typeof ( DateTime ) )
189
338
{
190
- ret = new PrimitiveDataFrameColumn < DateTime > ( GetColumnName ( columnNames , columnIndex ) ) ;
339
+ ret = new PrimitiveDataFrameColumn < DateTime > ( columnName ) ;
191
340
}
192
341
else
193
342
{
@@ -196,6 +345,11 @@ private static DataFrameColumn CreateColumn(Type kind, string[] columnNames, int
196
345
return ret ;
197
346
}
198
347
348
+ private static DataFrameColumn CreateColumn ( Type kind , string [ ] columnNames , int columnIndex )
349
+ {
350
+ return CreateColumn ( kind , GetColumnName ( columnNames , columnIndex ) ) ;
351
+ }
352
+
199
353
private static DataFrame ReadCsvLinesIntoDataFrame ( WrappedStreamReaderOrStringReader wrappedReader ,
200
354
char separator = ',' , bool header = true ,
201
355
string [ ] columnNames = null , Type [ ] dataTypes = null ,
0 commit comments