diff --git a/Apache-IoTDB-Client-CSharp-UserCase/Program.cs b/Apache-IoTDB-Client-CSharp-UserCase/Program.cs index 281104d..8bc7605 100644 --- a/Apache-IoTDB-Client-CSharp-UserCase/Program.cs +++ b/Apache-IoTDB-Client-CSharp-UserCase/Program.cs @@ -99,7 +99,7 @@ static async Task ExecuteQueryStatement() await session_pool.Open(false); var res = await session_pool.ExecuteQueryStatementAsync("select * from root.ln.wf01.wt01"); res.ShowTableNames(); - while (res.HasNext()) + while (await res.HasNextAsync()) { Console.WriteLine(res.Next()); } diff --git a/samples/Apache.IoTDB.Samples/SessionPoolTest.AlignedRecord.cs b/samples/Apache.IoTDB.Samples/SessionPoolTest.AlignedRecord.cs index 3126d1f..0e64099 100644 --- a/samples/Apache.IoTDB.Samples/SessionPoolTest.AlignedRecord.cs +++ b/samples/Apache.IoTDB.Samples/SessionPoolTest.AlignedRecord.cs @@ -28,7 +28,11 @@ public partial class SessionPoolTest { public async Task TestInsertAlignedRecord() { - var session_pool = new SessionPool(host, port, poolSize); + var session_pool = new SessionPool.Builder() + .SetHost(host) + .SetPort(port) + .SetPoolSize(poolSize) + .Build(); int status; await session_pool.Open(false); if (debug) session_pool.OpenDebugMode(); @@ -51,7 +55,7 @@ public async Task TestInsertAlignedRecord() var start_ms = DateTime.Now.Ticks / 10000; for (var timestamp = 1; timestamp <= fetchSize * processedSize; timestamp++) { - var rowRecord = new RowRecord(timestamp, values, measures); + var rowRecord = new RowRecord(timestamp, values, measures, new List { TSDataType.TEXT, TSDataType.BOOLEAN, TSDataType.INT32 }); var task = session_pool.InsertAlignedRecordAsync( string.Format("{0}.{1}", testDatabaseName, testDevice), rowRecord); tasks.Add(task); @@ -65,7 +69,11 @@ public async Task TestInsertAlignedRecord() } public async Task TestInsertAlignedStringRecord() { - var session_pool = new SessionPool(host, port, poolSize); + var session_pool = new SessionPool.Builder() + .SetHost(host) + .SetPort(port) + .SetPoolSize(poolSize) + .Build(); var status = 0; await session_pool.Open(false); if (debug) session_pool.OpenDebugMode(); @@ -98,7 +106,7 @@ public async Task TestInsertAlignedStringRecord() Console.WriteLine(string.Format("total insert aligned string record time is {0}", end_ms - start_ms)); var res = await session_pool.ExecuteQueryStatementAsync("select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice)); var res_cnt = 0; - while (res.HasNext()) + while (await res.HasNextAsync()) { res_cnt++; res.Next(); @@ -111,7 +119,11 @@ public async Task TestInsertAlignedStringRecord() } public async Task TestInsertAlignedRecords() { - var session_pool = new SessionPool(host, port, poolSize); + var session_pool = new SessionPool.Builder() + .SetHost(host) + .SetPort(port) + .SetPoolSize(poolSize) + .Build(); await session_pool.Open(false); if (debug) session_pool.OpenDebugMode(); @@ -168,6 +180,10 @@ public async Task TestInsertAlignedRecords() testMeasurements[5], testMeasurements[6] }); + var dataTypes_lst = new List>() { }; + dataTypes_lst.Add(new List() { TSDataType.BOOLEAN, TSDataType.INT32 }); + dataTypes_lst.Add(new List() { TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64, TSDataType.DOUBLE }); + dataTypes_lst.Add(new List() { TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64, TSDataType.DOUBLE, TSDataType.FLOAT, TSDataType.TEXT }); var values_lst = new List>() { }; values_lst.Add(new List() { true, (int)123 }); values_lst.Add(new List() { true, (int)123, (long)456, (double)1.1 }); @@ -177,7 +193,7 @@ public async Task TestInsertAlignedRecords() var rowRecords = new List() { }; for (var i = 0; i < 3; i++) { - var rowRecord = new RowRecord(timestamp_lst[i], values_lst[i], measurements_lst[i]); + var rowRecord = new RowRecord(timestamp_lst[i], values_lst[i], measurements_lst[i], dataTypes_lst[i]); rowRecords.Add(rowRecord); } @@ -185,7 +201,7 @@ public async Task TestInsertAlignedRecords() System.Diagnostics.Debug.Assert(status == 0); var res = await session_pool.ExecuteQueryStatementAsync( "select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice) + " where time<10"); - SessionPoolTest.PrintDataSetByString(res); + await SessionPoolTest.PrintDataSetByString(res); Console.WriteLine(rowRecords); System.Diagnostics.Debug.Assert(true); @@ -201,7 +217,8 @@ public async Task TestInsertAlignedRecords() { device_id.Add(string.Format("{0}.{1}", testDatabaseName, testDevice)); rowRecords.Add(new RowRecord(timestamp, new List() { true, (int)123 }, - new List() { testMeasurements[1], testMeasurements[2] })); + new List() { testMeasurements[1], testMeasurements[2] }, + new List() { TSDataType.BOOLEAN, TSDataType.INT32 })); if (timestamp % fetchSize == 0) { tasks.Add(session_pool.InsertAlignedRecordsAsync(device_id, rowRecords)); @@ -216,7 +233,7 @@ public async Task TestInsertAlignedRecords() res.ShowTableNames(); var record_count = fetchSize * processedSize; var res_count = 0; - while (res.HasNext()) + while (await res.HasNextAsync()) { res_count += 1; Console.WriteLine(res.Next()); @@ -234,7 +251,11 @@ public async Task TestInsertAlignedRecords() } public async Task TestInsertAlignedStringRecords() { - var session_pool = new SessionPool(host, port, poolSize); + var session_pool = new SessionPool.Builder() + .SetHost(host) + .SetPort(port) + .SetPoolSize(poolSize) + .Build(); await session_pool.Open(false); if (debug) session_pool.OpenDebugMode(); @@ -267,7 +288,7 @@ public async Task TestInsertAlignedStringRecords() System.Diagnostics.Debug.Assert(status == 0); var res = await session_pool.ExecuteQueryStatementAsync( "select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice) + " where time<10"); - SessionPoolTest.PrintDataSetByString(res); + await SessionPoolTest.PrintDataSetByString(res); await res.Close(); @@ -298,7 +319,7 @@ public async Task TestInsertAlignedStringRecords() "select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice)); res.ShowTableNames(); var res_count = 0; - while (res.HasNext()) + while (await res.HasNextAsync()) { Console.WriteLine(res.Next()); res_count += 1; @@ -314,7 +335,11 @@ public async Task TestInsertAlignedStringRecords() } public async Task TestInsertAlignedRecordsOfOneDevice() { - var session_pool = new SessionPool(host, port, poolSize); + var session_pool = new SessionPool.Builder() + .SetHost(host) + .SetPort(port) + .SetPoolSize(poolSize) + .Build(); await session_pool.Open(false); if (debug) session_pool.OpenDebugMode(); @@ -377,17 +402,21 @@ public async Task TestInsertAlignedRecordsOfOneDevice() values_lst.Add(new List() {true, (int) 123, (long) 456, (double) 1.1, (float) 10001.1, "test_record"}); var timestamp_lst = new List() { 1, 2, 3 }; + var dataTypes_lst = new List>() { }; + dataTypes_lst.Add(new List() { TSDataType.BOOLEAN, TSDataType.INT32 }); + dataTypes_lst.Add(new List() { TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64, TSDataType.DOUBLE }); + dataTypes_lst.Add(new List() { TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64, TSDataType.DOUBLE, TSDataType.FLOAT, TSDataType.TEXT }); var rowRecords = new List() { }; for (var i = 0; i < 3; i++) { - var rowRecord = new RowRecord(timestamp_lst[i], values_lst[i], measurements_lst[i]); + var rowRecord = new RowRecord(timestamp_lst[i], values_lst[i], measurements_lst[i], dataTypes_lst[i]); rowRecords.Add(rowRecord); } status = await session_pool.InsertAlignedRecordsOfOneDeviceAsync(device_id, rowRecords); System.Diagnostics.Debug.Assert(status == 0); var res = await session_pool.ExecuteQueryStatementAsync( "select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice) + " where time<10"); - SessionPoolTest.PrintDataSetByString(res); + await SessionPoolTest.PrintDataSetByString(res); await res.Close(); rowRecords = new List() { }; @@ -395,7 +424,8 @@ public async Task TestInsertAlignedRecordsOfOneDevice() for (var timestamp = 4; timestamp <= fetchSize * processedSize; timestamp++) { rowRecords.Add(new RowRecord(timestamp, new List() { true, (int)123 }, - new List() { testMeasurements[1], testMeasurements[2] })); + new List() { testMeasurements[1], testMeasurements[2] }, + new List() { TSDataType.BOOLEAN, TSDataType.INT32 })); if (timestamp % fetchSize == 0) { tasks.Add(session_pool.InsertAlignedRecordsOfOneDeviceAsync(device_id, rowRecords)); @@ -407,7 +437,7 @@ public async Task TestInsertAlignedRecordsOfOneDevice() res = await session_pool.ExecuteQueryStatementAsync( "select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice)); var res_count = 0; - while (res.HasNext()) + while (await res.HasNextAsync()) { res_count += 1; res.Next(); @@ -423,7 +453,11 @@ public async Task TestInsertAlignedRecordsOfOneDevice() } public async Task TestInsertAlignedStringRecordsOfOneDevice() { - var session_pool = new SessionPool(host, port, poolSize); + var session_pool = new SessionPool.Builder() + .SetHost(host) + .SetPort(port) + .SetPoolSize(poolSize) + .Build(); await session_pool.Open(false); if (debug) session_pool.OpenDebugMode(); @@ -454,7 +488,7 @@ public async Task TestInsertAlignedStringRecordsOfOneDevice() System.Diagnostics.Debug.Assert(status == 0); var res = await session_pool.ExecuteQueryStatementAsync( "select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice) + " where time<10"); - SessionPoolTest.PrintDataSetByString(res); + await SessionPoolTest.PrintDataSetByString(res); await res.Close(); // large data test @@ -480,7 +514,7 @@ public async Task TestInsertAlignedStringRecordsOfOneDevice() res = await session_pool.ExecuteQueryStatementAsync( "select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice)); var res_count = 0; - while (res.HasNext()) + while (await res.HasNextAsync()) { res_count += 1; res.Next(); diff --git a/samples/Apache.IoTDB.Samples/SessionPoolTest.AlignedTablet.cs b/samples/Apache.IoTDB.Samples/SessionPoolTest.AlignedTablet.cs index 0229cf8..86de613 100644 --- a/samples/Apache.IoTDB.Samples/SessionPoolTest.AlignedTablet.cs +++ b/samples/Apache.IoTDB.Samples/SessionPoolTest.AlignedTablet.cs @@ -29,7 +29,11 @@ public partial class SessionPoolTest { public async Task TestInsertAlignedTablet() { - var session_pool = new SessionPool(host, port, poolSize); + var session_pool = new SessionPool.Builder() + .SetHost(host) + .SetPort(port) + .SetPoolSize(poolSize) + .Build(); var status = 0; await session_pool.Open(false); if (debug) session_pool.OpenDebugMode(); @@ -54,7 +58,7 @@ public async Task TestInsertAlignedTablet() System.Diagnostics.Debug.Assert(status == 0); var res = await session_pool.ExecuteQueryStatementAsync( "select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice) + " where time<15"); - SessionPoolTest.PrintDataSetByString(res); + await SessionPoolTest.PrintDataSetByString(res); await res.Close(); // large data test @@ -83,7 +87,7 @@ public async Task TestInsertAlignedTablet() "select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice)); res.ShowTableNames(); var res_count = 0; - while (res.HasNext()) + while (await res.HasNextAsync()) { res_count += 1; res.Next(); @@ -100,7 +104,11 @@ public async Task TestInsertAlignedTablet() public async Task TestInsertAlignedTablets() { - var session_pool = new SessionPool(host, port, poolSize); + var session_pool = new SessionPool.Builder() + .SetHost(host) + .SetPort(port) + .SetPoolSize(poolSize) + .Build(); var status = 0; await session_pool.Open(false); if (debug) session_pool.OpenDebugMode(); @@ -148,7 +156,7 @@ public async Task TestInsertAlignedTablets() System.Diagnostics.Debug.Assert(status == 0); var res = await session_pool.ExecuteQueryStatementAsync( "select * from " + string.Format("{0}.{1}", testDatabaseName, testDevices[1]) + " where time<15"); - SessionPoolTest.PrintDataSetByString(res); + await SessionPoolTest.PrintDataSetByString(res); // large data test var tasks = new List>(); @@ -175,7 +183,7 @@ public async Task TestInsertAlignedTablets() "select * from " + string.Format("{0}.{1}", testDatabaseName, testDevices[1])); res.ShowTableNames(); var res_count = 0; - while (res.HasNext()) + while (await res.HasNextAsync()) { res_count += 1; res.Next(); diff --git a/samples/Apache.IoTDB.Samples/SessionPoolTest.Record.cs b/samples/Apache.IoTDB.Samples/SessionPoolTest.Record.cs index a09d796..10932ff 100644 --- a/samples/Apache.IoTDB.Samples/SessionPoolTest.Record.cs +++ b/samples/Apache.IoTDB.Samples/SessionPoolTest.Record.cs @@ -30,7 +30,11 @@ public partial class SessionPoolTest public async Task TestInsertRecord() { - var session_pool = new SessionPool(host, port, poolSize); + var session_pool = new SessionPool.Builder() + .SetHost(host) + .SetPort(port) + .SetPoolSize(poolSize) + .Build(); int status; await session_pool.Open(false); if (debug) session_pool.OpenDebugMode(); @@ -58,7 +62,7 @@ public async Task TestInsertRecord() var start_ms = DateTime.Now.Ticks / 10000; for (var timestamp = 1; timestamp <= fetchSize * processedSize; timestamp++) { - var rowRecord = new RowRecord(timestamp, values, measures); + var rowRecord = new RowRecord(timestamp, values, measures, new List { TSDataType.TEXT, TSDataType.BOOLEAN, TSDataType.INT32 }); var task = session_pool.InsertRecordAsync( string.Format("{0}.{1}", testDatabaseName, testDevice), rowRecord); tasks.Add(task); @@ -73,7 +77,11 @@ public async Task TestInsertRecord() } public async Task TestInsertStringRecord() { - var session_pool = new SessionPool(host, port, poolSize); + var session_pool = new SessionPool.Builder() + .SetHost(host) + .SetPort(port) + .SetPoolSize(poolSize) + .Build(); var status = 0; await session_pool.Open(false); if (debug) session_pool.OpenDebugMode(); @@ -111,7 +119,7 @@ public async Task TestInsertStringRecord() Console.WriteLine(string.Format("total insert string record time is {0}", end_ms - start_ms)); var res = await session_pool.ExecuteQueryStatementAsync("select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice)); var res_cnt = 0; - while (res.HasNext()) + while (await res.HasNextAsync()) { res_cnt++; res.Next(); @@ -124,7 +132,11 @@ public async Task TestInsertStringRecord() } public async Task TestInsertStrRecord() { - var session_pool = new SessionPool(host, port, poolSize); + var session_pool = new SessionPool.Builder() + .SetHost(host) + .SetPort(port) + .SetPoolSize(poolSize) + .Build(); var status = 0; await session_pool.Open(false); if (debug) session_pool.OpenDebugMode(); @@ -143,13 +155,13 @@ public async Task TestInsertStrRecord() var measures = new List { testMeasurements[1], testMeasurements[2] }; var values = new List { (int)1, (int)2 }; - var rowRecord = new RowRecord(1, values, measures); + var rowRecord = new RowRecord(1, values, measures, new List { TSDataType.INT32, TSDataType.INT32 }); status = await session_pool.InsertRecordAsync( string.Format("{0}.{1}", testDatabaseName, testDevice), rowRecord); System.Diagnostics.Debug.Assert(status == 0); var res = await session_pool.ExecuteQueryStatementAsync( "select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice) + " where time<2"); - SessionPoolTest.PrintDataSetByString(res); + await SessionPoolTest.PrintDataSetByString(res); await res.Close(); @@ -157,7 +169,7 @@ public async Task TestInsertStrRecord() // large data test var rowRecords = new List() { }; for (var timestamp = 2; timestamp <= fetchSize * processedSize; timestamp++) - rowRecords.Add(new RowRecord(timestamp, values, measures)); + rowRecords.Add(new RowRecord(timestamp, values, measures, new List { TSDataType.INT32, TSDataType.INT32 })); for (var timestamp = 2; timestamp <= fetchSize * processedSize; timestamp++) { @@ -170,7 +182,7 @@ public async Task TestInsertStrRecord() res = await session_pool.ExecuteQueryStatementAsync( "select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice)); var res_count = 0; - while (res.HasNext()) + while (await res.HasNextAsync()) { res_count += 1; res.Next(); @@ -185,7 +197,11 @@ public async Task TestInsertStrRecord() } public async Task TestInsertRecords() { - var session_pool = new SessionPool(host, port, poolSize); + var session_pool = new SessionPool.Builder() + .SetHost(host) + .SetPort(port) + .SetPoolSize(poolSize) + .Build(); await session_pool.Open(false); if (debug) session_pool.OpenDebugMode(); @@ -245,17 +261,18 @@ public async Task TestInsertRecords() {true, (int) 123, (long) 456, (double) 1.1, (float) 10001.1, "test_record"}); var timestamp_lst = new List() { 1, 2, 3 }; var rowRecords = new List() { }; - for (var i = 0; i < 3; i++) - { - var rowRecord = new RowRecord(timestamp_lst[i], values_lst[i], measurements_lst[i]); - rowRecords.Add(rowRecord); - } + rowRecords.Add(new RowRecord(timestamp_lst[0], values_lst[0], measurements_lst[0], + new List { TSDataType.BOOLEAN, TSDataType.INT32 })); + rowRecords.Add(new RowRecord(timestamp_lst[1], values_lst[1], measurements_lst[1], + new List { TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64, TSDataType.DOUBLE })); + rowRecords.Add(new RowRecord(timestamp_lst[2], values_lst[2], measurements_lst[2], + new List { TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64, TSDataType.DOUBLE, TSDataType.FLOAT, TSDataType.TEXT })); status = await session_pool.InsertRecordsAsync(device_id, rowRecords); System.Diagnostics.Debug.Assert(status == 0); var res = await session_pool.ExecuteQueryStatementAsync( "select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice) + " where time<10"); - SessionPoolTest.PrintDataSetByString(res); + await SessionPoolTest.PrintDataSetByString(res); await res.Close(); Console.WriteLine(status); @@ -268,7 +285,8 @@ public async Task TestInsertRecords() { device_id.Add(string.Format("{0}.{1}", testDatabaseName, testDevice)); rowRecords.Add(new RowRecord(timestamp, new List() { true, (int)123 }, - new List() { testMeasurements[1], testMeasurements[2] })); + new List() { testMeasurements[1], testMeasurements[2] }, + new List { TSDataType.BOOLEAN, TSDataType.INT32 })); if (timestamp % fetchSize == 0) { tasks.Add(session_pool.InsertRecordsAsync(device_id, rowRecords)); @@ -285,7 +303,7 @@ public async Task TestInsertRecords() res.ShowTableNames(); var res_count = 0; - while (res.HasNext()) + while (await res.HasNextAsync()) { res_count += 1; res.Next(); @@ -300,7 +318,7 @@ public async Task TestInsertRecords() res = await session_pool.ExecuteQueryStatementAsync(sql); res.ShowTableNames(); RowRecord row = null; - while (res.HasNext()) + while (await res.HasNextAsync()) { row = res.Next(); break; @@ -318,7 +336,11 @@ public async Task TestInsertRecords() } public async Task TestInsertStringRecords() { - var session_pool = new SessionPool(host, port, poolSize); + var session_pool = new SessionPool.Builder() + .SetHost(host) + .SetPort(port) + .SetPoolSize(poolSize) + .Build(); await session_pool.Open(false); if (debug) session_pool.OpenDebugMode(); @@ -351,7 +373,7 @@ public async Task TestInsertStringRecords() System.Diagnostics.Debug.Assert(status == 0); var res = await session_pool.ExecuteQueryStatementAsync( "select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice) + " where time<10"); - SessionPoolTest.PrintDataSetByString(res); + await SessionPoolTest.PrintDataSetByString(res); await res.Close(); @@ -383,7 +405,7 @@ public async Task TestInsertStringRecords() res.ShowTableNames(); var record_count = fetchSize * processedSize; var res_count = 0; - while (res.HasNext()) + while (await res.HasNextAsync()) { Console.WriteLine(res.Next()); res_count += 1; @@ -400,7 +422,11 @@ public async Task TestInsertStringRecords() } public async Task TestInsertRecordsOfOneDevice() { - var session_pool = new SessionPool(host, port, poolSize); + var session_pool = new SessionPool.Builder() + .SetHost(host) + .SetPort(port) + .SetPoolSize(poolSize) + .Build(); await session_pool.Open(false); if (debug) session_pool.OpenDebugMode(); @@ -457,17 +483,18 @@ public async Task TestInsertRecordsOfOneDevice() {true, (int) 123, (long) 456, (double) 1.1, (float) 10001.1, "test_record"}); var timestamp_lst = new List() { 1, 2, 3 }; var rowRecords = new List() { }; - for (var i = 0; i < 3; i++) - { - var rowRecord = new RowRecord(timestamp_lst[i], values_lst[i], measurements_lst[i]); - rowRecords.Add(rowRecord); - } + rowRecords.Add(new RowRecord(timestamp_lst[0], values_lst[0], measurements_lst[0], + new List { TSDataType.BOOLEAN, TSDataType.INT32 })); + rowRecords.Add(new RowRecord(timestamp_lst[1], values_lst[1], measurements_lst[1], + new List { TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64, TSDataType.DOUBLE })); + rowRecords.Add(new RowRecord(timestamp_lst[2], values_lst[2], measurements_lst[2], + new List { TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64, TSDataType.DOUBLE, TSDataType.FLOAT, TSDataType.TEXT })); status = await session_pool.InsertRecordsOfOneDeviceAsync(device_id, rowRecords); System.Diagnostics.Debug.Assert(status == 0); var res = await session_pool.ExecuteQueryStatementAsync( "select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice) + " where time<10"); - SessionPoolTest.PrintDataSetByString(res); + await SessionPoolTest.PrintDataSetByString(res); await res.Close(); // large data test @@ -476,7 +503,8 @@ public async Task TestInsertRecordsOfOneDevice() for (var timestamp = 4; timestamp <= fetchSize * processedSize; timestamp++) { rowRecords.Add(new RowRecord(timestamp, new List() { true, (int)123 }, - new List() { testMeasurements[1], testMeasurements[2] })); + new List() { testMeasurements[1], testMeasurements[2] }, + new List { TSDataType.BOOLEAN, TSDataType.INT32 })); if (timestamp % fetchSize == 0) { tasks.Add(session_pool.InsertRecordsOfOneDeviceAsync(device_id, rowRecords)); @@ -488,7 +516,7 @@ public async Task TestInsertRecordsOfOneDevice() res = await session_pool.ExecuteQueryStatementAsync( "select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice)); var res_count = 0; - while (res.HasNext()) + while (await res.HasNextAsync()) { res_count += 1; res.Next(); @@ -504,7 +532,11 @@ public async Task TestInsertRecordsOfOneDevice() } public async Task TestInsertStringRecordsOfOneDevice() { - var session_pool = new SessionPool(host, port, poolSize); + var session_pool = new SessionPool.Builder() + .SetHost(host) + .SetPort(port) + .SetPoolSize(poolSize) + .Build(); await session_pool.Open(false); if (debug) session_pool.OpenDebugMode(); @@ -541,7 +573,7 @@ public async Task TestInsertStringRecordsOfOneDevice() System.Diagnostics.Debug.Assert(status == 0); var res = await session_pool.ExecuteQueryStatementAsync( "select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice) + " where time<10"); - SessionPoolTest.PrintDataSetByString(res); + await SessionPoolTest.PrintDataSetByString(res); await res.Close(); // large data test @@ -567,7 +599,7 @@ public async Task TestInsertStringRecordsOfOneDevice() res = await session_pool.ExecuteQueryStatementAsync( "select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice)); var res_count = 0; - while (res.HasNext()) + while (await res.HasNextAsync()) { res_count += 1; res.Next(); @@ -584,7 +616,11 @@ public async Task TestInsertStringRecordsOfOneDevice() public async Task TestInsertRecordsWithAllType() { - var session_pool = new SessionPool(host, port, poolSize); + var session_pool = new SessionPool.Builder() + .SetHost(host) + .SetPort(port) + .SetPoolSize(poolSize) + .Build(); var status = 0; await session_pool.Open(false); if (debug) session_pool.OpenDebugMode(); @@ -648,7 +684,7 @@ public async Task TestInsertRecordsWithAllType() "select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice)); res.ShowTableNames(); var res_count = 0; - while (res.HasNext()) + while (await res.HasNextAsync()) { Console.WriteLine(res.Next()); res_count += 1; diff --git a/samples/Apache.IoTDB.Samples/SessionPoolTest.Tablet.cs b/samples/Apache.IoTDB.Samples/SessionPoolTest.Tablet.cs index b200b20..f9d2b89 100644 --- a/samples/Apache.IoTDB.Samples/SessionPoolTest.Tablet.cs +++ b/samples/Apache.IoTDB.Samples/SessionPoolTest.Tablet.cs @@ -29,7 +29,11 @@ public partial class SessionPoolTest { public async Task TestInsertTablet() { - var session_pool = new SessionPool(host, port, poolSize); + var session_pool = new SessionPool.Builder() + .SetHost(host) + .SetPort(port) + .SetPoolSize(poolSize) + .Build(); var status = 0; await session_pool.Open(false); if (debug) session_pool.OpenDebugMode(); @@ -55,7 +59,7 @@ public async Task TestInsertTablet() System.Diagnostics.Debug.Assert(status == 0); var res = await session_pool.ExecuteQueryStatementAsync( "select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice) + " where time<15"); - SessionPoolTest.PrintDataSetByString(res); + await SessionPoolTest.PrintDataSetByString(res); await res.Close(); // large data test @@ -83,7 +87,7 @@ public async Task TestInsertTablet() "select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice)); res.ShowTableNames(); var res_count = 0; - while (res.HasNext()) + while (await res.HasNextAsync()) { res_count += 1; res.Next(); @@ -100,7 +104,11 @@ public async Task TestInsertTablet() public async Task TestInsertTablets() { - var session_pool = new SessionPool(host, port, poolSize); + var session_pool = new SessionPool.Builder() + .SetHost(host) + .SetPort(port) + .SetPoolSize(poolSize) + .Build(); var status = 0; await session_pool.Open(false); if (debug) session_pool.OpenDebugMode(); @@ -148,7 +156,7 @@ public async Task TestInsertTablets() // System.Diagnostics.Debug.Assert(status == 0); var res = await session_pool.ExecuteQueryStatementAsync( "select * from " + string.Format("{0}.{1}", testDatabaseName, testDevices[1]) + " where time<15"); - SessionPoolTest.PrintDataSetByString(res); + await SessionPoolTest.PrintDataSetByString(res); // large data test @@ -176,7 +184,7 @@ public async Task TestInsertTablets() "select * from " + string.Format("{0}.{1}", testDatabaseName, testDevices[1])); res.ShowTableNames(); var res_count = 0; - while (res.HasNext()) + while (await res.HasNextAsync()) { res_count += 1; res.Next(); @@ -192,7 +200,11 @@ public async Task TestInsertTablets() } public async Task TestInsertTabletWithNullValue() { - var session_pool = new SessionPool(host, port, poolSize); + var session_pool = new SessionPool.Builder() + .SetHost(host) + .SetPort(port) + .SetPoolSize(poolSize) + .Build(); var status = 0; await session_pool.Open(false); if (debug) session_pool.OpenDebugMode(); @@ -215,7 +227,7 @@ public async Task TestInsertTabletWithNullValue() "select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice)); res.ShowTableNames(); var res_count = 0; - while (res.HasNext()) + while (await res.HasNextAsync()) { Console.WriteLine(res.Next()); res_count += 1; @@ -230,7 +242,11 @@ public async Task TestInsertTabletWithNullValue() public async Task TestInsertTabletWithAllType() { - var session_pool = new SessionPool(host, port, poolSize); + var session_pool = new SessionPool.Builder() + .SetHost(host) + .SetPort(port) + .SetPoolSize(poolSize) + .Build(); var status = 0; await session_pool.Open(false); if (debug) session_pool.OpenDebugMode(); @@ -278,7 +294,7 @@ public async Task TestInsertTabletWithAllType() "select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice)); res.ShowTableNames(); var res_count = 0; - while (res.HasNext()) + while (await res.HasNextAsync()) { Console.WriteLine(res.Next()); res_count += 1; diff --git a/samples/Apache.IoTDB.Samples/SessionPoolTest.Template.cs b/samples/Apache.IoTDB.Samples/SessionPoolTest.Template.cs index 866a7ab..c9174cd 100644 --- a/samples/Apache.IoTDB.Samples/SessionPoolTest.Template.cs +++ b/samples/Apache.IoTDB.Samples/SessionPoolTest.Template.cs @@ -28,7 +28,11 @@ public partial class SessionPoolTest { public async Task TestCreateAndDropSchemaTemplate() { - var session_pool = new SessionPool(host, port, poolSize); + var session_pool = new SessionPool.Builder() + .SetHost(host) + .SetPort(port) + .SetPoolSize(poolSize) + .Build(); await session_pool.Open(false); if (debug) session_pool.OpenDebugMode(); @@ -63,7 +67,11 @@ public async Task TestCreateAndDropSchemaTemplate() public async Task TestSetAndUnsetSchemaTemplate() { - var session_pool = new SessionPool(host, port, poolSize); + var session_pool = new SessionPool.Builder() + .SetHost(host) + .SetPort(port) + .SetPoolSize(poolSize) + .Build(); await session_pool.Open(false); if (debug) session_pool.OpenDebugMode(); diff --git a/samples/Apache.IoTDB.Samples/SessionPoolTest.TestNetwork.cs b/samples/Apache.IoTDB.Samples/SessionPoolTest.TestNetwork.cs index fffd540..9eb4952 100644 --- a/samples/Apache.IoTDB.Samples/SessionPoolTest.TestNetwork.cs +++ b/samples/Apache.IoTDB.Samples/SessionPoolTest.TestNetwork.cs @@ -28,7 +28,11 @@ public partial class SessionPoolTest { public async Task TestTestInsertRecord() { - var session_pool = new SessionPool(host, port, poolSize); + var session_pool = new SessionPool.Builder() + .SetHost(host) + .SetPort(port) + .SetPoolSize(poolSize) + .Build(); int status; await session_pool.Open(false); if (debug) session_pool.OpenDebugMode(); @@ -58,7 +62,7 @@ public async Task TestTestInsertRecord() var start_ms = DateTime.Now.Ticks / 10000; for (var timestamp = 1; timestamp <= fetchSize * processedSize; timestamp++) { - var rowRecord = new RowRecord(timestamp, values, measures); + var rowRecord = new RowRecord(timestamp, values, measures, new List { TSDataType.TEXT, TSDataType.BOOLEAN, TSDataType.INT32 }); var task = session_pool.TestInsertRecordAsync( string.Format("{0}.{1}", testDatabaseName, testDevice), rowRecord); tasks.Add(task); @@ -74,7 +78,11 @@ public async Task TestTestInsertRecord() public async Task TestTestInsertRecords() { - var session_pool = new SessionPool(host, port, poolSize); + var session_pool = new SessionPool.Builder() + .SetHost(host) + .SetPort(port) + .SetPoolSize(poolSize) + .Build(); await session_pool.Open(false); if (debug) session_pool.OpenDebugMode(); @@ -132,11 +140,15 @@ public async Task TestTestInsertRecords() values_lst.Add(new List() { true, (int)123, (long)456, (double)1.1 }); values_lst.Add(new List() {true, (int) 123, (long) 456, (double) 1.1, (float) 10001.1, "test_record"}); + var datatypes_lst = new List>() { }; + datatypes_lst.Add(new List() { TSDataType.BOOLEAN, TSDataType.INT32 }); + datatypes_lst.Add(new List() { TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64, TSDataType.DOUBLE }); + datatypes_lst.Add(new List() { TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64, TSDataType.DOUBLE, TSDataType.FLOAT, TSDataType.TEXT }); var timestamp_lst = new List() { 1, 2, 3 }; var rowRecords = new List() { }; for (var i = 0; i < 3; i++) { - var rowRecord = new RowRecord(timestamp_lst[i], values_lst[i], measurements_lst[i]); + var rowRecord = new RowRecord(timestamp_lst[i], values_lst[i], measurements_lst[i], datatypes_lst[i]); rowRecords.Add(rowRecord); } @@ -144,7 +156,7 @@ public async Task TestTestInsertRecords() // System.Diagnostics.Debug.Assert(status == 0); var res = await session_pool.ExecuteQueryStatementAsync( "select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice) + " where time<10"); - SessionPoolTest.PrintDataSetByString(res); + await SessionPoolTest.PrintDataSetByString(res); await res.Close(); @@ -156,7 +168,8 @@ public async Task TestTestInsertRecords() { device_id.Add(string.Format("{0}.{1}", testDatabaseName, testDevice)); rowRecords.Add(new RowRecord(timestamp, new List() { true, (int)123 }, - new List() { testMeasurements[1], testMeasurements[2] })); + new List() { testMeasurements[1], testMeasurements[2] }, + new List() { TSDataType.BOOLEAN, TSDataType.INT32 })); if (timestamp % fetchSize == 0) { tasks.Add(session_pool.TestInsertRecordsAsync(device_id, rowRecords)); @@ -171,7 +184,7 @@ public async Task TestTestInsertRecords() res.ShowTableNames(); var record_count = fetchSize * processedSize; var res_count = 0; - while (res.HasNext()) + while (await res.HasNextAsync()) { res_count += 1; res.Next(); @@ -188,7 +201,11 @@ public async Task TestTestInsertRecords() public async Task TestTestInsertTablet() { - var session_pool = new SessionPool(host, port, poolSize); + var session_pool = new SessionPool.Builder() + .SetHost(host) + .SetPort(port) + .SetPoolSize(poolSize) + .Build(); var status = 0; await session_pool.Open(false); if (debug) session_pool.OpenDebugMode(); @@ -213,7 +230,7 @@ public async Task TestTestInsertTablet() System.Diagnostics.Debug.Assert(status == 0); var res = await session_pool.ExecuteQueryStatementAsync( "select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice) + " where time<15"); - SessionPoolTest.PrintDataSetByString(res); + await SessionPoolTest.PrintDataSetByString(res); await res.Close(); // large data test @@ -241,7 +258,7 @@ public async Task TestTestInsertTablet() "select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice)); res.ShowTableNames(); var res_count = 0; - while (res.HasNext()) + while (await res.HasNextAsync()) { res_count += 1; res.Next(); @@ -257,7 +274,11 @@ public async Task TestTestInsertTablet() public async Task TestTestInsertTablets() { - var session_pool = new SessionPool(host, port, poolSize); + var session_pool = new SessionPool.Builder() + .SetHost(host) + .SetPort(port) + .SetPoolSize(poolSize) + .Build(); var status = 0; await session_pool.Open(false); if (debug) session_pool.OpenDebugMode(); @@ -305,7 +326,7 @@ public async Task TestTestInsertTablets() // System.Diagnostics.Debug.Assert(status == 0); var res = await session_pool.ExecuteQueryStatementAsync( "select * from " + string.Format("{0}.{1}", testDatabaseName, testDevices[1]) + " where time<15"); - SessionPoolTest.PrintDataSetByString(res); + await SessionPoolTest.PrintDataSetByString(res); await res.Close(); // large data test @@ -333,7 +354,7 @@ public async Task TestTestInsertTablets() "select * from " + string.Format("{0}.{1}", testDatabaseName, testDevices[1])); res.ShowTableNames(); var res_count = 0; - while (res.HasNext()) + while (await res.HasNextAsync()) { res_count += 1; res.Next(); diff --git a/samples/Apache.IoTDB.Samples/SessionPoolTest.TimeSeries.cs b/samples/Apache.IoTDB.Samples/SessionPoolTest.TimeSeries.cs index 049407c..08d78f2 100644 --- a/samples/Apache.IoTDB.Samples/SessionPoolTest.TimeSeries.cs +++ b/samples/Apache.IoTDB.Samples/SessionPoolTest.TimeSeries.cs @@ -100,7 +100,11 @@ public async Task TestDeleteTimeSeries() } public async Task TestCreateTimeSeries() { - var session_pool = new SessionPool(host, port, poolSize); + var session_pool = new SessionPool.Builder() + .SetHost(host) + .SetPort(port) + .SetPoolSize(poolSize) + .Build(); await session_pool.Open(false); if (debug) session_pool.OpenDebugMode(); @@ -172,7 +176,11 @@ public async Task TestCreateAlignedTimeseries() } public async Task TestCheckTimeSeriesExists() { - var session_pool = new SessionPool(host, port, poolSize); + var session_pool = new SessionPool.Builder() + .SetHost(host) + .SetPort(port) + .SetPoolSize(poolSize) + .Build(); var status = 0; await session_pool.Open(false); if (debug) session_pool.OpenDebugMode(); diff --git a/samples/Apache.IoTDB.Samples/SessionPoolTest.Utils.cs b/samples/Apache.IoTDB.Samples/SessionPoolTest.Utils.cs index 94f1e85..6b9ba4a 100644 --- a/samples/Apache.IoTDB.Samples/SessionPoolTest.Utils.cs +++ b/samples/Apache.IoTDB.Samples/SessionPoolTest.Utils.cs @@ -19,13 +19,14 @@ using System; using System.Collections.Generic; +using System.Threading.Tasks; using Apache.IoTDB.DataStructure; namespace Apache.IoTDB.Samples { public partial class SessionPoolTest { - public static void PrintDataSetByObject(SessionDataSet sessionDataSet) + public static async Task PrintDataSetByObject(SessionDataSet sessionDataSet) { IReadOnlyList columns = sessionDataSet.GetColumnNames(); @@ -35,7 +36,7 @@ public static void PrintDataSetByObject(SessionDataSet sessionDataSet) } Console.WriteLine(); - while (sessionDataSet.HasNext()) + while (await sessionDataSet.HasNextAsync()) { for (int i = 0; i < columns.Count; i++) { @@ -47,7 +48,7 @@ public static void PrintDataSetByObject(SessionDataSet sessionDataSet) } } - public static void PrintDataSetByString(SessionDataSet sessionDataSet) + public static async Task PrintDataSetByString(SessionDataSet sessionDataSet) { IReadOnlyList columns = sessionDataSet.GetColumnNames(); @@ -57,7 +58,7 @@ public static void PrintDataSetByString(SessionDataSet sessionDataSet) } Console.WriteLine(); - while (sessionDataSet.HasNext()) + while (await sessionDataSet.HasNextAsync()) { for (int i = 0; i < columns.Count; i++) { diff --git a/samples/Apache.IoTDB.Samples/SessionPoolTest.cs b/samples/Apache.IoTDB.Samples/SessionPoolTest.cs index c561c96..4d6138d 100644 --- a/samples/Apache.IoTDB.Samples/SessionPoolTest.cs +++ b/samples/Apache.IoTDB.Samples/SessionPoolTest.cs @@ -192,14 +192,18 @@ public async Task TestOpenWithNodeUrlsAndInsertOneRecord() status = await session_pool.CreateTimeSeries( string.Format("{0}.{1}.{2}", testDatabaseName, testDevice, testMeasurements[2]), TSDataType.TEXT, TSEncoding.PLAIN, Compressor.SNAPPY); - var rowRecord = new RowRecord(1668404120807, new() { "1111111", "22222", "333333" }, new() { testMeasurements[0], testMeasurements[1], testMeasurements[2] }); + var rowRecord = new RowRecord(1668404120807, new List() { "1111111", "22222", "333333" }, new List() { testMeasurements[0], testMeasurements[1], testMeasurements[2] }, new List() { TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT }); status = await session_pool.InsertRecordsAsync(new List() { string.Format("{0}.{1}", testDatabaseName, testDevice) }, new List() { rowRecord }); Debug.Assert(status == 0); Console.WriteLine("TestOpenWithNodeUrlsAndInsertOneRecord Passed!"); } public async Task TestInsertOneRecord() { - var session_pool = new SessionPool(host, port, 1); + var session_pool = new SessionPool.Builder() + .SetHost(host) + .SetPort(port) + .SetPoolSize(1) + .Build(); await session_pool.Open(false); if (debug) session_pool.OpenDebugMode(); await session_pool.DeleteDatabaseAsync(testDatabaseName); @@ -212,12 +216,16 @@ public async Task TestInsertOneRecord() status = await session_pool.CreateTimeSeries( string.Format("{0}.{1}.{2}", testDatabaseName, testDevice, testMeasurements[2]), TSDataType.TEXT, TSEncoding.PLAIN, Compressor.SNAPPY); - var rowRecord = new RowRecord(1668404120807, new() { "1111111", "22222", "333333" }, new() { testMeasurements[0], testMeasurements[1], testMeasurements[2] }); + var rowRecord = new RowRecord(1668404120807, new List() { "1111111", "22222", "333333" }, new List() { testMeasurements[0], testMeasurements[1], testMeasurements[2] }, new List() { TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT }); status = await session_pool.InsertRecordsAsync(new List() { string.Format("{0}.{1}", testDatabaseName, testDevice) }, new List() { rowRecord }); } public async Task TestGetTimeZone() { - var session_pool = new SessionPool(host, port, poolSize); + var session_pool = new SessionPool.Builder() + .SetHost(host) + .SetPort(port) + .SetPoolSize(poolSize) + .Build(); await session_pool.Open(false); if (debug) session_pool.OpenDebugMode(); @@ -233,7 +241,11 @@ public async Task TestGetTimeZone() public async Task TestCreateAndDeleteDatabase() { - var session_pool = new SessionPool(host, port, poolSize); + var session_pool = new SessionPool.Builder() + .SetHost(host) + .SetPort(port) + .SetPoolSize(poolSize) + .Build(); var status = 0; await session_pool.Open(false); if (debug) session_pool.OpenDebugMode(); @@ -249,7 +261,11 @@ public async Task TestCreateAndDeleteDatabase() public async Task TestDeleteDatabase() { - var session_pool = new SessionPool(host, port, poolSize); + var session_pool = new SessionPool.Builder() + .SetHost(host) + .SetPort(port) + .SetPoolSize(poolSize) + .Build(); await session_pool.Open(false); if (debug) session_pool.OpenDebugMode(); @@ -270,7 +286,11 @@ public async Task TestDeleteDatabase() public async Task TestSetTimeZone() { - var session_pool = new SessionPool(host, port, poolSize); + var session_pool = new SessionPool.Builder() + .SetHost(host) + .SetPort(port) + .SetPoolSize(poolSize) + .Build(); await session_pool.Open(false); if (debug) session_pool.OpenDebugMode(); @@ -284,7 +304,11 @@ public async Task TestSetTimeZone() public async Task TestDeleteData() { - var session_pool = new SessionPool(host, port, poolSize); + var session_pool = new SessionPool.Builder() + .SetHost(host) + .SetPort(port) + .SetPoolSize(poolSize) + .Build(); var status = 0; await session_pool.Open(false); if (debug) session_pool.OpenDebugMode(); @@ -310,21 +334,22 @@ public async Task TestDeleteData() testMeasurements[1], testMeasurements[2], testMeasurements[3] }; var values = new List { "test_text", true, (int)123 }; + var dataTypes = new List { TSDataType.TEXT, TSDataType.BOOLEAN, TSDataType.INT32 }; status = await session_pool.InsertRecordAsync( - string.Format("{0}.{1}", testDatabaseName, testDevice), new RowRecord(1, values, measures)); + string.Format("{0}.{1}", testDatabaseName, testDevice), new RowRecord(1, values, measures, dataTypes)); System.Diagnostics.Debug.Assert(status == 0); status = await session_pool.InsertRecordAsync( - string.Format("{0}.{1}", testDatabaseName, testDevice), new RowRecord(2, values, measures)); + string.Format("{0}.{1}", testDatabaseName, testDevice), new RowRecord(2, values, measures, dataTypes)); System.Diagnostics.Debug.Assert(status == 0); status = await session_pool.InsertRecordAsync( - string.Format("{0}.{1}", testDatabaseName, testDevice), new RowRecord(3, values, measures)); + string.Format("{0}.{1}", testDatabaseName, testDevice), new RowRecord(3, values, measures, dataTypes)); System.Diagnostics.Debug.Assert(status == 0); status = await session_pool.InsertRecordAsync( - string.Format("{0}.{1}", testDatabaseName, testDevice), new RowRecord(4, values, measures)); + string.Format("{0}.{1}", testDatabaseName, testDevice), new RowRecord(4, values, measures, dataTypes)); System.Diagnostics.Debug.Assert(status == 0); var res = await session_pool.ExecuteQueryStatementAsync( "select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice) + " where time<10"); - SessionPoolTest.PrintDataSetByString(res); + await SessionPoolTest.PrintDataSetByString(res); await res.Close(); var ts_path_lst = new List() @@ -335,7 +360,7 @@ public async Task TestDeleteData() await session_pool.DeleteDataAsync(ts_path_lst, 2, 3); res = await session_pool.ExecuteQueryStatementAsync( "select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice) + " where time<10"); - SessionPoolTest.PrintDataSetByString(res); + await SessionPoolTest.PrintDataSetByString(res); await res.Close(); status = await session_pool.DeleteDatabaseAsync(testDatabaseName); @@ -346,7 +371,11 @@ public async Task TestDeleteData() public async Task TestNonSql() { - var session_pool = new SessionPool(host, port, poolSize); + var session_pool = new SessionPool.Builder() + .SetHost(host) + .SetPort(port) + .SetPoolSize(poolSize) + .Build(); var status = 0; await session_pool.Open(false); if (debug) session_pool.OpenDebugMode(); @@ -373,7 +402,7 @@ await session_pool.ExecuteNonQueryStatementAsync( var res = await session_pool.ExecuteQueryStatementAsync( "select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice) + " where time<10"); - SessionPoolTest.PrintDataSetByString(res); + await SessionPoolTest.PrintDataSetByString(res); await res.Close(); status = await session_pool.DeleteDatabaseAsync(testDatabaseName); @@ -425,7 +454,11 @@ await cnt.CreateCommand( public async Task TestSqlQuery() { - var session_pool = new SessionPool(host, port, poolSize); + var session_pool = new SessionPool.Builder() + .SetHost(host) + .SetPort(port) + .SetPoolSize(poolSize) + .Build(); var status = 0; await session_pool.Open(false); if (debug) session_pool.OpenDebugMode(); @@ -450,28 +483,28 @@ await session_pool.ExecuteNonQueryStatementAsync( "insert into " + string.Format("{0}.{1}", testDatabaseName, testDevice) + "(timestamp, status, hardware) VALUES (7, true,'lz')"); var res = await session_pool.ExecuteQueryStatementAsync("show timeseries root"); - SessionPoolTest.PrintDataSetByString(res); + await SessionPoolTest.PrintDataSetByString(res); await res.Close(); Console.WriteLine("SHOW TIMESERIES ROOT sql passed!"); res = await session_pool.ExecuteQueryStatementAsync("show devices"); - SessionPoolTest.PrintDataSetByString(res); + await SessionPoolTest.PrintDataSetByString(res); await res.Close(); Console.WriteLine("SHOW DEVICES sql passed!"); res = await session_pool.ExecuteQueryStatementAsync($"COUNT TIMESERIES {testDatabaseName}"); - SessionPoolTest.PrintDataSetByString(res); + await SessionPoolTest.PrintDataSetByString(res); await res.Close(); Console.WriteLine("COUNT TIMESERIES root sql Passed"); res = await session_pool.ExecuteQueryStatementAsync("select * from root.ln.wf01 where time<10"); - SessionPoolTest.PrintDataSetByString(res); + await SessionPoolTest.PrintDataSetByString(res); await res.Close(); Console.WriteLine("SELECT sql Passed"); res = await session_pool.ExecuteQueryStatementAsync( "select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice) + " where time<10"); - SessionPoolTest.PrintDataSetByString(res); + await SessionPoolTest.PrintDataSetByString(res); await res.Close(); status = await session_pool.DeleteDatabaseAsync(testDatabaseName); @@ -481,7 +514,11 @@ await session_pool.ExecuteNonQueryStatementAsync( } public async Task TestRawDataQuery() { - var session_pool = new SessionPool(host, port, poolSize); + var session_pool = new SessionPool.Builder() + .SetHost(host) + .SetPort(port) + .SetPoolSize(poolSize) + .Build(); var status = 0; await session_pool.Open(false); if (debug) session_pool.OpenDebugMode(); @@ -501,7 +538,7 @@ public async Task TestRawDataQuery() var device_id_lst = new List() { }; for (int i = 1; i <= fetchSize * processedSize; i++) { - var record = new RowRecord(i, values, measurements); + var record = new RowRecord(i, values, measurements, data_type_lst); records.Add(record); device_id_lst.Add(device_id); } @@ -512,7 +549,7 @@ public async Task TestRawDataQuery() var res = await session_pool.ExecuteRawDataQuery(paths, 10, fetchSize * processedSize); var count = 0; - while (res.HasNext()) + while (await res.HasNextAsync()) { count++; res.Next(); @@ -528,7 +565,11 @@ public async Task TestRawDataQuery() } public async Task TestLastDataQuery() { - var session_pool = new SessionPool(host, port, poolSize); + var session_pool = new SessionPool.Builder() + .SetHost(host) + .SetPort(port) + .SetPoolSize(poolSize) + .Build(); var status = 0; await session_pool.Open(false); if (debug) session_pool.OpenDebugMode(); @@ -548,7 +589,7 @@ public async Task TestLastDataQuery() var device_id_lst = new List() { }; for (int i = 1; i <= fetchSize * processedSize; i++) { - var record = new RowRecord(i, values, measurements); + var record = new RowRecord(i, values, measurements, data_type_lst); records.Add(record); device_id_lst.Add(device_id); } @@ -559,7 +600,7 @@ public async Task TestLastDataQuery() var res = await session_pool.ExecuteLastDataQueryAsync(paths, fetchSize * processedSize - 10); var count = 0; - while (res.HasNext()) + while (await res.HasNextAsync()) { Console.WriteLine(count); res.Next(); @@ -603,7 +644,7 @@ public async Task TestMultiNodeDataFetch() var device_id_lst = new List() { }; for (int i = 1; i <= fetchSize * processedSize * 4 + 783; i++) { - var record = new RowRecord(i, values, measurements); + var record = new RowRecord(i, values, measurements, data_type_lst); records.Add(record); device_id_lst.Add(device_id); } @@ -623,7 +664,7 @@ public async Task TestMultiNodeDataFetch() Console.WriteLine(); var count = 0; - while (res.HasNext()) count++; + while (await res.HasNextAsync()) count++; Console.WriteLine(count + " " + (fetchSize * processedSize * 4 + 783)); System.Diagnostics.Debug.Assert(count == fetchSize * processedSize * 4 + 783); diff --git a/samples/Apache.IoTDB.Samples/TableSessionPoolTest.cs b/samples/Apache.IoTDB.Samples/TableSessionPoolTest.cs index cebcd2f..4760c98 100644 --- a/samples/Apache.IoTDB.Samples/TableSessionPoolTest.cs +++ b/samples/Apache.IoTDB.Samples/TableSessionPoolTest.cs @@ -73,14 +73,14 @@ await tableSessionPool.ExecuteNonQueryStatementAsync( // show tables from current database var res = await tableSessionPool.ExecuteQueryStatementAsync("SHOW TABLES"); res.ShowTableNames(); - while (res.HasNext()) Console.WriteLine(res.Next()); + while (await res.HasNextAsync()) Console.WriteLine(res.Next()); await res.Close(); // show tables by specifying another database // using SHOW tables FROM res = await tableSessionPool.ExecuteQueryStatementAsync("SHOW TABLES FROM test1"); res.ShowTableNames(); - while (res.HasNext()) Console.WriteLine(res.Next()); + while (await res.HasNextAsync()) Console.WriteLine(res.Next()); await res.Close(); var tableName = "testTable1"; @@ -123,7 +123,7 @@ await tableSessionPool.ExecuteNonQueryStatementAsync( res = await tableSessionPool.ExecuteQueryStatementAsync("select * from testTable1 " + "where region_id = '1' and plant_id in ('3', '5') and device_id = '3'"); res.ShowTableNames(); - while (res.HasNext()) Console.WriteLine(res.Next()); + while (await res.HasNextAsync()) Console.WriteLine(res.Next()); await res.Close(); await tableSessionPool.Close(); @@ -148,7 +148,7 @@ public async Task TestUseDatabase() // show tables from current database var res = await tableSessionPool.ExecuteQueryStatementAsync("SHOW TABLES"); res.ShowTableNames(); - while (res.HasNext()) Console.WriteLine(res.Next()); + while (await res.HasNextAsync()) Console.WriteLine(res.Next()); await res.Close(); await tableSessionPool.ExecuteNonQueryStatementAsync("use test2"); @@ -156,7 +156,7 @@ public async Task TestUseDatabase() // show tables from current database res = await tableSessionPool.ExecuteQueryStatementAsync("SHOW TABLES"); res.ShowTableNames(); - while (res.HasNext()) Console.WriteLine(res.Next()); + while (await res.HasNextAsync()) Console.WriteLine(res.Next()); await res.Close(); await tableSessionPool.Close(); @@ -225,7 +225,7 @@ await tableSessionPool.ExecuteNonQueryStatementAsync( var res = await tableSessionPool.ExecuteQueryStatementAsync("select count(*) from " + tableName + " where f1 is null"); - while (res.HasNext()) + while (await res.HasNextAsync()) { var row = res.Next(); Console.WriteLine(row); diff --git a/src/Apache.IoTDB.Data/IoTDBCommand.cs b/src/Apache.IoTDB.Data/IoTDBCommand.cs index f717a83..1a28508 100644 --- a/src/Apache.IoTDB.Data/IoTDBCommand.cs +++ b/src/Apache.IoTDB.Data/IoTDBCommand.cs @@ -368,6 +368,7 @@ private RowRecord BindParamters(IoTDBParameterCollection pms) { var measures = new List(); var values = new List(); + var dataTypes = new List(); for (int i = 0; i < pms.Count; i++) @@ -380,13 +381,16 @@ private RowRecord BindParamters(IoTDBParameterCollection pms) { case TypeCode.Boolean: values.Add((tp.Value as bool?).GetValueOrDefault()); + dataTypes.Add(TSDataType.BOOLEAN); break; case TypeCode.Char: values.Add(tp.Value as string); + dataTypes.Add(TSDataType.TEXT); break; case TypeCode.Byte: case TypeCode.SByte: values.Add((tp.Value as byte?).GetValueOrDefault()); + dataTypes.Add(TSDataType.INT32); break; case TypeCode.DateTime: var t0 = tp.Value as DateTime?; @@ -395,43 +399,53 @@ private RowRecord BindParamters(IoTDBParameterCollection pms) throw new ArgumentException($"InvalidArgumentOfDateTime{tp.Value}"); } values.Add(t0.GetValueOrDefault()); + dataTypes.Add(TSDataType.DATE); break; case TypeCode.DBNull: break; case TypeCode.Single: values.Add((tp.Value as float?).GetValueOrDefault()); + dataTypes.Add(TSDataType.FLOAT); break; case TypeCode.Decimal: case TypeCode.Double: values.Add((tp.Value as double?).GetValueOrDefault()); + dataTypes.Add(TSDataType.DOUBLE); break; case TypeCode.Int16: values.Add((tp.Value as short?).GetValueOrDefault()); + dataTypes.Add(TSDataType.INT32); break; case TypeCode.Int32: values.Add((tp.Value as int?).GetValueOrDefault()); + dataTypes.Add(TSDataType.INT32); break; case TypeCode.Int64: values.Add((tp.Value as long?).GetValueOrDefault()); + dataTypes.Add(TSDataType.INT64); break; case TypeCode.UInt16: values.Add((tp.Value as short?).GetValueOrDefault()); + dataTypes.Add(TSDataType.INT32); break; case TypeCode.UInt32: values.Add((tp.Value as uint?).GetValueOrDefault()); + dataTypes.Add(TSDataType.INT64); break; case TypeCode.UInt64: values.Add((tp.Value as ulong?).GetValueOrDefault()); + dataTypes.Add(TSDataType.INT64); break; case TypeCode.String: default: values.Add(tp.Value as string); + dataTypes.Add(TSDataType.TEXT); break; } } - return new RowRecord(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), values, measures); + return new RowRecord(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), values, measures, dataTypes); } /// diff --git a/src/Apache.IoTDB.Data/IoTDBDataReader.cs b/src/Apache.IoTDB.Data/IoTDBDataReader.cs index fe946b9..9010ced 100644 --- a/src/Apache.IoTDB.Data/IoTDBDataReader.cs +++ b/src/Apache.IoTDB.Data/IoTDBDataReader.cs @@ -137,7 +137,9 @@ public override bool Read() { throw new InvalidOperationException($"DataReaderClosed{nameof(Read)}"); } +#pragma warning disable CS0618 // HasNext is obsolete, but DbDataReader.Read() must be synchronous if (_dataSet.HasNext()) +#pragma warning restore CS0618 { rowdata = _dataSet.Next(); } @@ -455,7 +457,9 @@ public override int GetValues(object[] values) /// A System.Data.DataTable that describes the column metadata. public override DataTable GetSchemaTable() { +#pragma warning disable CS0618 // HasNext is obsolete, but GetSchemaTable() must be synchronous if (_dataSet.HasNext()) +#pragma warning restore CS0618 { rowdata = _dataSet.Next(); } diff --git a/src/Apache.IoTDB/DataStructure/RpcDataSet.cs b/src/Apache.IoTDB/DataStructure/RpcDataSet.cs index 5eeed48..350c417 100644 --- a/src/Apache.IoTDB/DataStructure/RpcDataSet.cs +++ b/src/Apache.IoTDB/DataStructure/RpcDataSet.cs @@ -140,7 +140,7 @@ public RpcDataSet(string sql, List columnNameList, List columnTy _tsBlockSize = 0; _tsBlockIndex = -1; - _zoneId = TimeZoneInfo.FindSystemTimeZoneById(zoneId); + _zoneId = FindTimeZoneSafe(zoneId); if (columnIndex2TsBlockColumnIndexList.Count != _columnNameList.Count) throw new ArgumentException("Column index list size mismatch"); @@ -194,7 +194,13 @@ public async Task Close() _isClosed = true; } + [Obsolete("Use NextAsync() instead. This synchronous method may cause deadlocks in certain synchronization contexts.")] public bool Next() + { + return NextAsync().ConfigureAwait(false).GetAwaiter().GetResult(); + } + + public async Task NextAsync() { if (HasCachedBlock()) { @@ -212,7 +218,7 @@ public bool Next() if (_moreData) { - bool hasResultSet = FetchResults(); + bool hasResultSet = await FetchResultsAsync().ConfigureAwait(false); if (hasResultSet && HasCachedByteBuffer()) { ConstructOneTsBlock(); @@ -221,11 +227,11 @@ public bool Next() } } - Close().Wait(); + await Close().ConfigureAwait(false); return false; } - private bool FetchResults() + private async Task FetchResultsAsync() { if (_isClosed) throw new InvalidOperationException("Dataset closed"); @@ -242,16 +248,14 @@ private bool FetchResults() try { - var task = _client.ServiceClient.fetchResultsV2Async(req); - var resp = task.ConfigureAwait(false).GetAwaiter().GetResult(); + var resp = await _client.ServiceClient.fetchResultsV2Async(req).ConfigureAwait(false); if (!resp.HasResultSet) { - Close().Wait(); + await Close().ConfigureAwait(false); return false; } - // return _queryResult != null && _queryResultIndex < _queryResultSize; _queryResult = resp.QueryResult; _queryResultIndex = 0; _queryResultSize = _queryResult?.Count ?? 0; @@ -410,6 +414,7 @@ private int GetIntByTsBlockColumnIndex(int tsBlockColumnIndex) CheckRecord(); if (!IsNull(tsBlockColumnIndex, _tsBlockIndex)) { + if (tsBlockColumnIndex == -1) return checked((int)_curTsBlock.GetTimeByIndex(_tsBlockIndex)); _lastReadWasNull = false; TSDataType dataType = _curTsBlock.GetColumn(tsBlockColumnIndex).GetDataType(); if (dataType == TSDataType.INT64) @@ -624,6 +629,8 @@ public RowRecord GetRow() IReadOnlyList columns = _columnNameList; int i = 0; List fieldList = new List(); + List measurementList = new List(); + List dataTypeList = new List(); long timestamp = 0; foreach (string columnName in columns) { @@ -631,6 +638,15 @@ public RowRecord GetRow() string typeStr = _columnTypeList[i]; TSDataType dataType = Client.GetDataTypeByStr(typeStr); + // Identify the real time column by tsBlock index, not by data type + int tsBlockColumnIndex = GetTsBlockColumnIndexForColumnName(columnName); + if (tsBlockColumnIndex == -1) + { + timestamp = GetLong(columnName); + i += 1; + continue; + } + switch (dataType) { case TSDataType.BOOLEAN: @@ -643,8 +659,7 @@ public RowRecord GetRow() localfield = GetLong(columnName); break; case TSDataType.TIMESTAMP: - localfield = null; - timestamp = GetLong(columnName); + localfield = GetLong(columnName); break; case TSDataType.FLOAT: localfield = GetFloat(columnName); @@ -654,19 +669,28 @@ public RowRecord GetRow() break; case TSDataType.TEXT: case TSDataType.STRING: + localfield = GetString(columnName); + break; case TSDataType.BLOB: + var binary = GetBinary(columnName); + localfield = binary?.Data; + break; case TSDataType.DATE: - localfield = GetString(columnName); + localfield = GetDate(columnName); break; default: string err_msg = "value format not supported"; throw new TException(err_msg, null); } if (localfield != null) + { fieldList.Add(localfield); + measurementList.Add(columnName); + dataTypeList.Add(dataType); + } i += 1; } - return new RowRecord(timestamp, fieldList, _columnNameList); + return new RowRecord(timestamp, fieldList, measurementList, dataTypeList); } public DateTime GetTimestampByIndex(int columnIndex) @@ -701,6 +725,10 @@ public DateTime GetDate(string columnName) private DateTime GetDateByTsBlockColumnIndex(int tsBlockColumnIndex) { + if (tsBlockColumnIndex == -1) + { + return GetTimestampByTsBlockColumnIndex(tsBlockColumnIndex); + } int value = GetIntByTsBlockColumnIndex(tsBlockColumnIndex); return Int32ToDate(value); } @@ -768,6 +796,75 @@ private string FormatDatetime(string format, string precision, long value, TimeZ return convertedTime.ToString(format); } + private static readonly Dictionary IanaToWindows = new Dictionary(StringComparer.OrdinalIgnoreCase) + { + { "Asia/Shanghai", "China Standard Time" }, + { "Asia/Chongqing", "China Standard Time" }, + { "Asia/Hong_Kong", "China Standard Time" }, + { "Asia/Urumqi", "China Standard Time" }, + { "Asia/Tokyo", "Tokyo Standard Time" }, + { "Asia/Seoul", "Korea Standard Time" }, + { "Asia/Singapore", "Singapore Standard Time" }, + { "Asia/Kolkata", "India Standard Time" }, + { "Asia/Calcutta", "India Standard Time" }, + { "Asia/Dubai", "Arabian Standard Time" }, + { "Europe/London", "GMT Standard Time" }, + { "Europe/Paris", "Romance Standard Time" }, + { "Europe/Berlin", "W. Europe Standard Time" }, + { "Europe/Moscow", "Russian Standard Time" }, + { "America/New_York", "Eastern Standard Time" }, + { "America/Chicago", "Central Standard Time" }, + { "America/Denver", "Mountain Standard Time" }, + { "America/Los_Angeles", "Pacific Standard Time" }, + { "America/Sao_Paulo", "E. South America Standard Time" }, + { "Australia/Sydney", "AUS Eastern Standard Time" }, + { "Pacific/Auckland", "New Zealand Standard Time" }, + { "Etc/UTC", "UTC" }, + { "UTC", "UTC" }, + { "GMT", "GMT Standard Time" }, + }; + + internal static TimeZoneInfo FindTimeZoneSafe(string zoneId) + { + if (string.IsNullOrEmpty(zoneId)) + return TimeZoneInfo.Utc; + + try + { + return TimeZoneInfo.FindSystemTimeZoneById(zoneId); + } + catch (TimeZoneNotFoundException) + { + // On Windows, IANA IDs (e.g. "Asia/Shanghai") are not recognized by older .NET runtimes. + // Try mapping to Windows time zone ID. + if (IanaToWindows.TryGetValue(zoneId, out string windowsId)) + { + try + { + return TimeZoneInfo.FindSystemTimeZoneById(windowsId); + } + catch (TimeZoneNotFoundException) { } + } + + // Reverse lookup: if a Windows ID was passed on a non-Windows system + foreach (var kvp in IanaToWindows) + { + if (string.Equals(kvp.Value, zoneId, StringComparison.OrdinalIgnoreCase)) + { + try + { + return TimeZoneInfo.FindSystemTimeZoneById(kvp.Key); + } + catch (TimeZoneNotFoundException) { } + } + } + + throw new TimeZoneNotFoundException( + $"Cannot resolve time zone ID '{zoneId}'. " + + $"Ensure it is a valid IANA (e.g. 'Asia/Shanghai') or Windows (e.g. 'China Standard Time') time zone ID."); + } + } + private int GetTsBlockColumnIndexForColumnName(string columnName) { if (!_columnName2TsBlockColumnIndexMap.TryGetValue(columnName, out int index)) diff --git a/src/Apache.IoTDB/DataStructure/SessionDataSet.cs b/src/Apache.IoTDB/DataStructure/SessionDataSet.cs index bc7cde4..dc21928 100644 --- a/src/Apache.IoTDB/DataStructure/SessionDataSet.cs +++ b/src/Apache.IoTDB/DataStructure/SessionDataSet.cs @@ -68,7 +68,9 @@ public SessionDataSet( DefaultTimeout, _zoneId, ColumnIndex2TsBlockColumnIndexList ); } + [Obsolete("Use HasNextAsync() instead. This synchronous method may cause deadlocks in certain synchronization contexts.")] public bool HasNext() => _rpcDataSet.Next(); + public async Task HasNextAsync() => await _rpcDataSet.NextAsync().ConfigureAwait(false); public RowRecord Next() => _rpcDataSet.GetRow(); public bool IsNull(string columnName) => _rpcDataSet.IsNullByColumnName(columnName); public bool IsNullByIndex(int columnIndex) => _rpcDataSet.IsNullByIndex(columnIndex); diff --git a/src/Apache.IoTDB/SessionPool.cs b/src/Apache.IoTDB/SessionPool.cs index 1d2aecf..8d42282 100644 --- a/src/Apache.IoTDB/SessionPool.cs +++ b/src/Apache.IoTDB/SessionPool.cs @@ -737,7 +737,7 @@ public async Task CheckTimeSeriesExistsAsync(string tsPath) { var sql = "SHOW TIMESERIES " + tsPath; var sessionDataSet = await ExecuteQueryStatementAsync(sql); - bool timeSeriesExists = sessionDataSet.HasNext(); + bool timeSeriesExists = await sessionDataSet.HasNextAsync(); await sessionDataSet.Close(); // be sure to close the SessionDataSet to put the client back to the pool return timeSeriesExists; }