|
110 | 110 | import java.util.Optional; |
111 | 111 | import java.util.Queue; |
112 | 112 | import java.util.Set; |
| 113 | +import java.util.StringJoiner; |
113 | 114 | import java.util.concurrent.ConcurrentHashMap; |
114 | 115 | import java.util.concurrent.Future; |
115 | 116 | import java.util.concurrent.ScheduledExecutorService; |
116 | 117 | import java.util.concurrent.TimeUnit; |
| 118 | +import java.util.concurrent.atomic.AtomicBoolean; |
117 | 119 | import java.util.concurrent.atomic.AtomicInteger; |
118 | 120 | import java.util.stream.Collectors; |
119 | 121 |
|
@@ -248,6 +250,21 @@ public SchemaPartitionResp getOrCreateSchemaPartition(final GetOrCreateSchemaPar |
248 | 250 | return resp; |
249 | 251 | } |
250 | 252 |
|
| 253 | + // Here we check if the related Databases exist again, |
| 254 | + // due to we don't have a transaction mechanism. |
| 255 | + for (final String database : req.getPartitionSlotsMap().keySet()) { |
| 256 | + if (!isDatabaseExist(database)) { |
| 257 | + return new SchemaPartitionResp( |
| 258 | + new TSStatus(TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()) |
| 259 | + .setMessage( |
| 260 | + String.format( |
| 261 | + "Create SchemaPartition failed because the database: %s does not exist", |
| 262 | + database)), |
| 263 | + false, |
| 264 | + null); |
| 265 | + } |
| 266 | + } |
| 267 | + |
251 | 268 | // Filter unassigned SchemaPartitionSlots |
252 | 269 | final Map<String, List<TSeriesPartitionSlot>> unassignedSchemaPartitionSlotsMap = |
253 | 270 | partitionInfo.filterUnassignedSchemaPartitionSlots(req.getPartitionSlotsMap()); |
@@ -311,14 +328,19 @@ public SchemaPartitionResp getOrCreateSchemaPartition(final GetOrCreateSchemaPar |
311 | 328 | final AtomicInteger unassignedSlotNum = new AtomicInteger(); |
312 | 329 | final Map<String, List<TSeriesPartitionSlot>> unassignedSchemaPartitionSlotsMap = |
313 | 330 | partitionInfo.filterUnassignedSchemaPartitionSlots(req.getPartitionSlotsMap()); |
| 331 | + StringJoiner errDatabases = new StringJoiner(", ", "[", "]"); |
314 | 332 | unassignedSchemaPartitionSlotsMap.forEach( |
315 | | - (database, unassignedSchemaPartitionSlots) -> |
316 | | - unassignedSlotNum.addAndGet(unassignedSchemaPartitionSlots.size())); |
| 333 | + (database, unassignedSchemaPartitionSlots) -> { |
| 334 | + if (!unassignedSchemaPartitionSlots.isEmpty()) { |
| 335 | + errDatabases.add(database); |
| 336 | + unassignedSlotNum.addAndGet(unassignedSchemaPartitionSlots.size()); |
| 337 | + } |
| 338 | + }); |
317 | 339 |
|
318 | 340 | final String errMsg = |
319 | 341 | String.format( |
320 | | - "Lacked %d/%d SchemaPartition allocation result in the response of getOrCreateSchemaPartition method", |
321 | | - unassignedSlotNum.get(), totalSlotNum.get()); |
| 342 | + "Lacked %d/%d SchemaPartition allocation result when get or create schema partitions for databases: %s", |
| 343 | + unassignedSlotNum.get(), totalSlotNum.get(), errDatabases); |
322 | 344 | LOGGER.error(errMsg); |
323 | 345 | resp.setStatus( |
324 | 346 | new TSStatus(TSStatusCode.LACK_PARTITION_ALLOCATION.getStatusCode()).setMessage(errMsg)); |
@@ -373,6 +395,21 @@ public DataPartitionResp getOrCreateDataPartition(final GetOrCreateDataPartition |
373 | 395 | return resp; |
374 | 396 | } |
375 | 397 |
|
| 398 | + // Here we check if the related Databases exist again, |
| 399 | + // due to we don't have a transaction mechanism. |
| 400 | + for (final String database : req.getPartitionSlotsMap().keySet()) { |
| 401 | + if (!isDatabaseExist(database)) { |
| 402 | + return new DataPartitionResp( |
| 403 | + new TSStatus(TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()) |
| 404 | + .setMessage( |
| 405 | + String.format( |
| 406 | + "Create DataPartition failed because the database: %s does not exist", |
| 407 | + database)), |
| 408 | + false, |
| 409 | + null); |
| 410 | + } |
| 411 | + } |
| 412 | + |
376 | 413 | // Filter unassigned DataPartitionSlots |
377 | 414 | Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> unassignedDataPartitionSlotsMap = |
378 | 415 | partitionInfo.filterUnassignedDataPartitionSlots(req.getPartitionSlotsMap()); |
@@ -446,16 +483,26 @@ public DataPartitionResp getOrCreateDataPartition(final GetOrCreateDataPartition |
446 | 483 | AtomicInteger unassignedSlotNum = new AtomicInteger(); |
447 | 484 | Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> unassignedDataPartitionSlotsMap = |
448 | 485 | partitionInfo.filterUnassignedDataPartitionSlots(req.getPartitionSlotsMap()); |
| 486 | + StringJoiner errDatabases = new StringJoiner(", ", "[", "]"); |
449 | 487 | unassignedDataPartitionSlotsMap.forEach( |
450 | | - (database, unassignedDataPartitionSlots) -> |
451 | | - unassignedDataPartitionSlots.forEach( |
452 | | - (seriesPartitionSlot, timeSlotList) -> |
453 | | - unassignedSlotNum.addAndGet(timeSlotList.getTimePartitionSlots().size()))); |
| 488 | + (database, unassignedDataPartitionSlots) -> { |
| 489 | + AtomicBoolean hasUnassignedSlot = new AtomicBoolean(false); |
| 490 | + unassignedDataPartitionSlots.forEach( |
| 491 | + (seriesPartitionSlot, timeSlotList) -> { |
| 492 | + if (!timeSlotList.getTimePartitionSlots().isEmpty()) { |
| 493 | + hasUnassignedSlot.set(true); |
| 494 | + unassignedSlotNum.addAndGet(timeSlotList.getTimePartitionSlots().size()); |
| 495 | + } |
| 496 | + }); |
| 497 | + if (hasUnassignedSlot.get()) { |
| 498 | + errDatabases.add(database); |
| 499 | + } |
| 500 | + }); |
454 | 501 |
|
455 | 502 | String errMsg = |
456 | 503 | String.format( |
457 | | - "Lacked %d/%d DataPartition allocation result in the response of getOrCreateDataPartition method", |
458 | | - unassignedSlotNum.get(), totalSlotNum.get()); |
| 504 | + "Lacked %d/%d DataPartition allocation result when get or create data partitions for databases: %s", |
| 505 | + unassignedSlotNum.get(), totalSlotNum.get(), errDatabases); |
459 | 506 | LOGGER.error(errMsg); |
460 | 507 | resp.setStatus( |
461 | 508 | new TSStatus(TSStatusCode.LACK_PARTITION_ALLOCATION.getStatusCode()).setMessage(errMsg)); |
|
0 commit comments