diff --git a/age--1.7.0--y.y.y.sql b/age--1.7.0--y.y.y.sql index 4f8c54a30..f711c0d43 100644 --- a/age--1.7.0--y.y.y.sql +++ b/age--1.7.0--y.y.y.sql @@ -408,3 +408,13 @@ $function$; COMMENT ON FUNCTION ag_catalog.age_pg_upgrade_status() IS 'Returns the current pg_upgrade readiness status of the AGE installation.'; + +-- +-- VLE cache invalidation trigger function +-- Installed on graph label tables to catch SQL-level mutations +-- and increment the per-graph version counter for VLE cache invalidation. +-- +CREATE FUNCTION ag_catalog.age_invalidate_graph_cache() + RETURNS trigger + LANGUAGE c +AS 'MODULE_PATHNAME'; diff --git a/regress/expected/age_global_graph.out b/regress/expected/age_global_graph.out index 478637800..b49d1a2f4 100644 --- a/regress/expected/age_global_graph.out +++ b/regress/expected/age_global_graph.out @@ -413,6 +413,180 @@ NOTICE: graph "ag_graph_3" has been dropped (1 row) +----------------------------------------------------------------------------------------------------------------------------- +-- +-- VLE cache invalidation tests +-- +-- These tests verify that the graph version counter properly invalidates +-- the VLE hash table cache when the graph is mutated, and that thin +-- entry lazy property fetch returns correct data. +-- +-- Setup: create a graph with a chain a->b->c->d +SELECT * FROM create_graph('vle_cache_test'); +NOTICE: graph "vle_cache_test" has been created + create_graph +-------------- + +(1 row) + +SELECT * FROM cypher('vle_cache_test', $$ + CREATE (a:Node {name: 'a'})-[:Edge]->(b:Node {name: 'b'})-[:Edge]->(c:Node {name: 'c'})-[:Edge]->(d:Node {name: 'd'}) +$$) AS (v agtype); + v +--- +(0 rows) + +-- VLE query: find all paths from a's neighbors (should find b, b->c, b->c->d) +SELECT * FROM cypher('vle_cache_test', $$ + MATCH (a:Node {name: 'a'})-[:Edge*1..3]->(n:Node) + RETURN n.name + ORDER BY n.name +$$) AS (name agtype); + name +------ + "b" + "c" + "d" +(3 rows) + +-- Now add a new node e connected to d. This should invalidate the cache. +SELECT * FROM cypher('vle_cache_test', $$ + MATCH (d:Node {name: 'd'}) + CREATE (d)-[:Edge]->(:Node {name: 'e'}) +$$) AS (v agtype); + v +--- +(0 rows) + +-- VLE query again: should now also find e via a->b->c->d->e (4 hops won't reach, +-- but d->e is 1 hop from d, and a->b->c->d->e would be 4 hops from a). +-- Increase range to *1..4 to include e +SELECT * FROM cypher('vle_cache_test', $$ + MATCH (a:Node {name: 'a'})-[:Edge*1..4]->(n:Node) + RETURN n.name + ORDER BY n.name +$$) AS (name agtype); + name +------ + "b" + "c" + "d" + "e" +(4 rows) + +-- Test cache invalidation on DELETE: remove node c and its edges +SELECT * FROM cypher('vle_cache_test', $$ + MATCH (c:Node {name: 'c'}) + DETACH DELETE c +$$) AS (v agtype); + v +--- +(0 rows) + +-- VLE query: should only find b now (c is gone, so b->c path is broken) +SELECT * FROM cypher('vle_cache_test', $$ + MATCH (a:Node {name: 'a'})-[:Edge*1..4]->(n:Node) + RETURN n.name + ORDER BY n.name +$$) AS (name agtype); + name +------ + "b" +(1 row) + +-- Test cache invalidation on SET: change b's name property +SELECT * FROM cypher('vle_cache_test', $$ + MATCH (b:Node {name: 'b'}) + SET b.name = 'b_modified' + RETURN b.name +$$) AS (name agtype); + name +-------------- + "b_modified" +(1 row) + +-- VLE query: verify the updated property is returned via lazy fetch +SELECT * FROM cypher('vle_cache_test', $$ + MATCH (a:Node {name: 'a'})-[:Edge*1..4]->(n:Node) + RETURN n.name + ORDER BY n.name +$$) AS (name agtype); + name +-------------- + "b_modified" +(1 row) + +-- Test VLE with edge properties (exercises thin entry edge property fetch) +SELECT * FROM drop_graph('vle_cache_test', true); +NOTICE: drop cascades to 4 other objects +DETAIL: drop cascades to table vle_cache_test._ag_label_vertex +drop cascades to table vle_cache_test._ag_label_edge +drop cascades to table vle_cache_test."Node" +drop cascades to table vle_cache_test."Edge" +NOTICE: graph "vle_cache_test" has been dropped + drop_graph +------------ + +(1 row) + +SELECT * FROM create_graph('vle_cache_test2'); +NOTICE: graph "vle_cache_test2" has been created + create_graph +-------------- + +(1 row) + +SELECT * FROM cypher('vle_cache_test2', $$ + CREATE (a:N {name: 'a'})-[:E {weight: 1}]->(b:N {name: 'b'})-[:E {weight: 2}]->(c:N {name: 'c'}) +$$) AS (v agtype); + v +--- +(0 rows) + +-- VLE path output to verify edge properties are fetched correctly via +-- thin entry lazy fetch. Returning the full path forces build_path() +-- to call get_edge_entry_properties() for each edge in the result. +-- The output must contain the correct weight values (1 and 2). +SELECT * FROM cypher('vle_cache_test2', $$ + MATCH p=(a:N {name: 'a'})-[:E *1..2]->(n:N) + RETURN p + ORDER BY n.name +$$) AS (p agtype); + p +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ + [{"id": 844424930131969, "label": "N", "properties": {"name": "a"}}::vertex, {"id": 1125899906842626, "label": "E", "end_id": 844424930131970, "start_id": 844424930131969, "properties": {"weight": 1}}::edge, {"id": 844424930131970, "label": "N", "properties": {"name": "b"}}::vertex]::path + [{"id": 844424930131969, "label": "N", "properties": {"name": "a"}}::vertex, {"id": 1125899906842626, "label": "E", "end_id": 844424930131970, "start_id": 844424930131969, "properties": {"weight": 1}}::edge, {"id": 844424930131970, "label": "N", "properties": {"name": "b"}}::vertex, {"id": 1125899906842625, "label": "E", "end_id": 844424930131971, "start_id": 844424930131970, "properties": {"weight": 2}}::edge, {"id": 844424930131971, "label": "N", "properties": {"name": "c"}}::vertex]::path +(2 rows) + +-- VLE edge properties via UNWIND + relationships() to individually verify +-- each edge's properties are correctly fetched from the heap via TID. +SELECT * FROM cypher('vle_cache_test2', $$ + MATCH p=(a:N {name: 'a'})-[:E *1..2]->(n:N) + WITH p, n + UNWIND relationships(p) AS e + RETURN n.name, e.weight + ORDER BY n.name, e.weight +$$) AS (name agtype, weight agtype); + name | weight +------+-------- + "b" | 1 + "c" | 1 + "c" | 2 +(3 rows) + +-- Cleanup +SELECT * FROM drop_graph('vle_cache_test2', true); +NOTICE: drop cascades to 4 other objects +DETAIL: drop cascades to table vle_cache_test2._ag_label_vertex +drop cascades to table vle_cache_test2._ag_label_edge +drop cascades to table vle_cache_test2."N" +drop cascades to table vle_cache_test2."E" +NOTICE: graph "vle_cache_test2" has been dropped + drop_graph +------------ + +(1 row) + ----------------------------------------------------------------------------------------------------------------------------- -- -- End of tests diff --git a/regress/sql/age_global_graph.sql b/regress/sql/age_global_graph.sql index 376681a8b..7750393bb 100644 --- a/regress/sql/age_global_graph.sql +++ b/regress/sql/age_global_graph.sql @@ -146,6 +146,103 @@ RESET client_min_messages; SELECT * FROM drop_graph('ag_graph_1', true); SELECT * FROM drop_graph('ag_graph_2', true); SELECT * FROM drop_graph('ag_graph_3', true); + +----------------------------------------------------------------------------------------------------------------------------- +-- +-- VLE cache invalidation tests +-- +-- These tests verify that the graph version counter properly invalidates +-- the VLE hash table cache when the graph is mutated, and that thin +-- entry lazy property fetch returns correct data. +-- + +-- Setup: create a graph with a chain a->b->c->d +SELECT * FROM create_graph('vle_cache_test'); + +SELECT * FROM cypher('vle_cache_test', $$ + CREATE (a:Node {name: 'a'})-[:Edge]->(b:Node {name: 'b'})-[:Edge]->(c:Node {name: 'c'})-[:Edge]->(d:Node {name: 'd'}) +$$) AS (v agtype); + +-- VLE query: find all paths from a's neighbors (should find b, b->c, b->c->d) +SELECT * FROM cypher('vle_cache_test', $$ + MATCH (a:Node {name: 'a'})-[:Edge*1..3]->(n:Node) + RETURN n.name + ORDER BY n.name +$$) AS (name agtype); + +-- Now add a new node e connected to d. This should invalidate the cache. +SELECT * FROM cypher('vle_cache_test', $$ + MATCH (d:Node {name: 'd'}) + CREATE (d)-[:Edge]->(:Node {name: 'e'}) +$$) AS (v agtype); + +-- VLE query again: should now also find e via a->b->c->d->e (4 hops won't reach, +-- but d->e is 1 hop from d, and a->b->c->d->e would be 4 hops from a). +-- Increase range to *1..4 to include e +SELECT * FROM cypher('vle_cache_test', $$ + MATCH (a:Node {name: 'a'})-[:Edge*1..4]->(n:Node) + RETURN n.name + ORDER BY n.name +$$) AS (name agtype); + +-- Test cache invalidation on DELETE: remove node c and its edges +SELECT * FROM cypher('vle_cache_test', $$ + MATCH (c:Node {name: 'c'}) + DETACH DELETE c +$$) AS (v agtype); + +-- VLE query: should only find b now (c is gone, so b->c path is broken) +SELECT * FROM cypher('vle_cache_test', $$ + MATCH (a:Node {name: 'a'})-[:Edge*1..4]->(n:Node) + RETURN n.name + ORDER BY n.name +$$) AS (name agtype); + +-- Test cache invalidation on SET: change b's name property +SELECT * FROM cypher('vle_cache_test', $$ + MATCH (b:Node {name: 'b'}) + SET b.name = 'b_modified' + RETURN b.name +$$) AS (name agtype); + +-- VLE query: verify the updated property is returned via lazy fetch +SELECT * FROM cypher('vle_cache_test', $$ + MATCH (a:Node {name: 'a'})-[:Edge*1..4]->(n:Node) + RETURN n.name + ORDER BY n.name +$$) AS (name agtype); + +-- Test VLE with edge properties (exercises thin entry edge property fetch) +SELECT * FROM drop_graph('vle_cache_test', true); +SELECT * FROM create_graph('vle_cache_test2'); + +SELECT * FROM cypher('vle_cache_test2', $$ + CREATE (a:N {name: 'a'})-[:E {weight: 1}]->(b:N {name: 'b'})-[:E {weight: 2}]->(c:N {name: 'c'}) +$$) AS (v agtype); + +-- VLE path output to verify edge properties are fetched correctly via +-- thin entry lazy fetch. Returning the full path forces build_path() +-- to call get_edge_entry_properties() for each edge in the result. +-- The output must contain the correct weight values (1 and 2). +SELECT * FROM cypher('vle_cache_test2', $$ + MATCH p=(a:N {name: 'a'})-[:E *1..2]->(n:N) + RETURN p + ORDER BY n.name +$$) AS (p agtype); + +-- VLE edge properties via UNWIND + relationships() to individually verify +-- each edge's properties are correctly fetched from the heap via TID. +SELECT * FROM cypher('vle_cache_test2', $$ + MATCH p=(a:N {name: 'a'})-[:E *1..2]->(n:N) + WITH p, n + UNWIND relationships(p) AS e + RETURN n.name, e.weight + ORDER BY n.name, e.weight +$$) AS (name agtype, weight agtype); + +-- Cleanup +SELECT * FROM drop_graph('vle_cache_test2', true); + ----------------------------------------------------------------------------------------------------------------------------- -- -- End of tests diff --git a/sql/age_main.sql b/sql/age_main.sql index 59ada0f9f..3e9a71c92 100644 --- a/sql/age_main.sql +++ b/sql/age_main.sql @@ -381,3 +381,14 @@ CREATE FUNCTION ag_catalog._extract_label_id(graphid) STABLE PARALLEL SAFE AS 'MODULE_PATHNAME'; + +-- +-- VLE cache invalidation trigger function. +-- Installed on graph label tables to catch SQL-level mutations +-- (INSERT/UPDATE/DELETE/TRUNCATE) and increment the graph's +-- version counter so VLE caches are properly invalidated. +-- +CREATE FUNCTION ag_catalog.age_invalidate_graph_cache() + RETURNS trigger + LANGUAGE c +AS 'MODULE_PATHNAME'; diff --git a/src/backend/age.c b/src/backend/age.c index 2c016b021..f3e2c4e80 100644 --- a/src/backend/age.c +++ b/src/backend/age.c @@ -22,6 +22,29 @@ #include "optimizer/cypher_paths.h" #include "parser/cypher_analyze.h" #include "utils/ag_guc.h" +#include "utils/age_global_graph.h" + +#if PG_VERSION_NUM < 170000 +#include "miscadmin.h" + +/* saved hook pointers for PG < 17 shmem path */ +static shmem_request_hook_type prev_shmem_request_hook = NULL; +static shmem_startup_hook_type prev_shmem_startup_hook = NULL; + +static void age_shmem_request_hook(void) +{ + if (prev_shmem_request_hook) + prev_shmem_request_hook(); + age_graph_version_shmem_request(); +} + +static void age_shmem_startup_hook(void) +{ + if (prev_shmem_startup_hook) + prev_shmem_startup_hook(); + age_graph_version_shmem_startup(); +} +#endif /* PG_VERSION_NUM < 170000 */ PG_MODULE_MAGIC; @@ -35,6 +58,15 @@ void _PG_init(void) process_utility_hook_init(); post_parse_analyze_init(); define_config_params(); + +#if PG_VERSION_NUM < 170000 + /* Register shared memory hooks for graph version tracking. + * On PG 17+, DSM is used instead (no hooks needed). */ + prev_shmem_request_hook = shmem_request_hook; + shmem_request_hook = age_shmem_request_hook; + prev_shmem_startup_hook = shmem_startup_hook; + shmem_startup_hook = age_shmem_startup_hook; +#endif } void _PG_fini(void); diff --git a/src/backend/catalog/ag_catalog.c b/src/backend/catalog/ag_catalog.c index f4887f445..e5e91b556 100644 --- a/src/backend/catalog/ag_catalog.c +++ b/src/backend/catalog/ag_catalog.c @@ -25,12 +25,14 @@ #include "catalog/pg_class_d.h" #include "catalog/pg_namespace_d.h" #include "commands/defrem.h" +#include "nodes/parsenodes.h" #include "tcop/utility.h" #include "utils/lsyscache.h" #include "catalog/ag_graph.h" #include "catalog/ag_label.h" #include "utils/ag_cache.h" +#include "utils/age_global_graph.h" static object_access_hook_type prev_object_access_hook; static ProcessUtility_hook_type prev_process_utility_hook; @@ -94,19 +96,48 @@ void ag_ProcessUtility_hook(PlannedStmt *pstmt, const char *queryString, { drop_age_extension((DropStmt *)pstmt->utilityStmt); } - else if (prev_process_utility_hook) - { - (*prev_process_utility_hook) (pstmt, queryString, readOnlyTree, context, - params, queryEnv, dest, qc); - } else { - Assert(IsA(pstmt, PlannedStmt)); - Assert(pstmt->commandType == CMD_UTILITY); - Assert(queryString != NULL); /* required as of 8.4 */ - Assert(qc == NULL || qc->commandTag == CMDTAG_UNKNOWN); - standard_ProcessUtility(pstmt, queryString, readOnlyTree, context, - params, queryEnv, dest, qc); + /* + * Check for TRUNCATE on graph label tables. If any truncated + * table is a graph label table, increment the version counter + * for that graph to invalidate VLE caches. We do this before + * the truncate executes so the cache is invalidated regardless. + */ + if (IsA(pstmt->utilityStmt, TruncateStmt)) + { + TruncateStmt *tstmt = (TruncateStmt *) pstmt->utilityStmt; + ListCell *lc; + + foreach(lc, tstmt->relations) + { + RangeVar *rv = (RangeVar *) lfirst(lc); + Oid rel_oid = RangeVarGetRelid(rv, AccessShareLock, true); + + if (OidIsValid(rel_oid)) + { + Oid graph_oid = get_graph_oid_for_table(rel_oid); + + if (OidIsValid(graph_oid)) + increment_graph_version(graph_oid); + } + } + } + + if (prev_process_utility_hook) + { + (*prev_process_utility_hook) (pstmt, queryString, readOnlyTree, + context, params, queryEnv, dest, qc); + } + else + { + Assert(IsA(pstmt, PlannedStmt)); + Assert(pstmt->commandType == CMD_UTILITY); + Assert(queryString != NULL); + Assert(qc == NULL || qc->commandTag == CMDTAG_UNKNOWN); + standard_ProcessUtility(pstmt, queryString, readOnlyTree, context, + params, queryEnv, dest, qc); + } } } diff --git a/src/backend/commands/label_commands.c b/src/backend/commands/label_commands.c index 051bbc8a0..ac789ecce 100644 --- a/src/backend/commands/label_commands.c +++ b/src/backend/commands/label_commands.c @@ -25,9 +25,11 @@ #include "commands/defrem.h" #include "commands/sequence.h" #include "commands/tablecmds.h" +#include "catalog/pg_trigger.h" #include "miscadmin.h" #include "nodes/makefuncs.h" #include "parser/parser.h" +#include "parser/parse_func.h" #include "tcop/utility.h" #include "utils/acl.h" #include "utils/builtins.h" @@ -432,6 +434,64 @@ static void create_table_for_label(char *graph_name, char *label_name, create_index_on_column(schema_name, rel_name, "start_id", false); create_index_on_column(schema_name, rel_name, "end_id", false); } + + /* + * Install a cache invalidation trigger on the new label table, if the + * trigger function exists. The function is registered in the extension + * SQL (age_main.sql). It may not exist if running against an older + * version of the extension SQL that hasn't been upgraded yet. + * + * When installed, the trigger fires AFTER INSERT/UPDATE/DELETE/TRUNCATE + * (FOR EACH STATEMENT) and increments the graph's version counter so + * VLE caches are properly invalidated when the table is modified via SQL. + */ + { + Oid func_oid; + + /* check if the trigger function is registered in the catalog */ + func_oid = LookupFuncName( + list_make2(makeString("ag_catalog"), + makeString("age_invalidate_graph_cache")), + 0, NULL, true); + + if (OidIsValid(func_oid)) + { + CreateTrigStmt *trigger_stmt = makeNode(CreateTrigStmt); + PlannedStmt *trigger_wrapper; + + trigger_stmt->replace = false; + trigger_stmt->isconstraint = false; + trigger_stmt->trigname = "_age_cache_invalidate"; + trigger_stmt->relation = makeRangeVar(schema_name, rel_name, -1); + trigger_stmt->funcname = list_make2(makeString("ag_catalog"), + makeString("age_invalidate_graph_cache")); + trigger_stmt->args = NIL; + trigger_stmt->row = false; + trigger_stmt->timing = TRIGGER_TYPE_AFTER; + trigger_stmt->events = TRIGGER_TYPE_INSERT | TRIGGER_TYPE_UPDATE | + TRIGGER_TYPE_DELETE | TRIGGER_TYPE_TRUNCATE; + trigger_stmt->columns = NIL; + trigger_stmt->whenClause = NULL; + trigger_stmt->transitionRels = NIL; + trigger_stmt->deferrable = false; + trigger_stmt->initdeferred = false; + trigger_stmt->constrrel = NULL; + + trigger_wrapper = makeNode(PlannedStmt); + trigger_wrapper->commandType = CMD_UTILITY; + trigger_wrapper->canSetTag = false; + trigger_wrapper->utilityStmt = (Node *) trigger_stmt; + trigger_wrapper->stmt_location = -1; + trigger_wrapper->stmt_len = 0; + + ProcessUtility(trigger_wrapper, + "(generated CREATE TRIGGER command)", + false, PROCESS_UTILITY_SUBCOMMAND, + NULL, NULL, None_Receiver, NULL); + + CommandCounterIncrement(); + } + } } static void create_index_on_column(char *schema_name, diff --git a/src/backend/executor/cypher_create.c b/src/backend/executor/cypher_create.c index 495eb3a08..876b6f250 100644 --- a/src/backend/executor/cypher_create.c +++ b/src/backend/executor/cypher_create.c @@ -25,6 +25,7 @@ #include "catalog/ag_label.h" #include "executor/cypher_executor.h" #include "executor/cypher_utils.h" +#include "utils/age_global_graph.h" static void begin_cypher_create(CustomScanState *node, EState *estate, int eflags); @@ -249,6 +250,9 @@ static TupleTableSlot *exec_cypher_create(CustomScanState *node) /* update the current command Id */ CommandCounterIncrement(); + /* invalidate VLE cache — graph was mutated */ + increment_graph_version(css->graph_oid); + /* if this was a terminal CREATE just return NULL */ if (terminal) { diff --git a/src/backend/executor/cypher_delete.c b/src/backend/executor/cypher_delete.c index 58503ec27..19eca3ad6 100644 --- a/src/backend/executor/cypher_delete.c +++ b/src/backend/executor/cypher_delete.c @@ -28,6 +28,7 @@ #include "catalog/ag_label.h" #include "executor/cypher_executor.h" +#include "utils/age_global_graph.h" #include "executor/cypher_utils.h" static void begin_cypher_delete(CustomScanState *node, EState *estate, @@ -193,8 +194,14 @@ static TupleTableSlot *exec_cypher_delete(CustomScanState *node) */ static void end_cypher_delete(CustomScanState *node) { + cypher_delete_custom_scan_state *css = + (cypher_delete_custom_scan_state *)node; + check_for_connected_edges(node); + /* invalidate VLE cache — graph was mutated */ + increment_graph_version(css->delete_data->graph_oid); + hash_destroy(((cypher_delete_custom_scan_state *)node)->vertex_id_htab); ExecEndNode(node->ss.ps.lefttree); diff --git a/src/backend/executor/cypher_merge.c b/src/backend/executor/cypher_merge.c index 1edfc812d..66170c1ed 100644 --- a/src/backend/executor/cypher_merge.c +++ b/src/backend/executor/cypher_merge.c @@ -26,6 +26,7 @@ #include "catalog/ag_label.h" #include "executor/cypher_executor.h" #include "executor/cypher_utils.h" +#include "utils/age_global_graph.h" /* * The following structure is used to hold a single vertex or edge component @@ -936,6 +937,10 @@ static void end_cypher_merge(CustomScanState *node) /* increment the command counter */ CommandCounterIncrement(); + /* invalidate VLE cache if merge created anything */ + if (css->created_new_path) + increment_graph_version(css->graph_oid); + ExecEndNode(node->ss.ps.lefttree); foreach (lc, path->target_nodes) diff --git a/src/backend/executor/cypher_set.c b/src/backend/executor/cypher_set.c index 09d3d3b54..8dfa63268 100644 --- a/src/backend/executor/cypher_set.c +++ b/src/backend/executor/cypher_set.c @@ -26,6 +26,8 @@ #include "executor/cypher_executor.h" #include "executor/cypher_utils.h" +#include "utils/age_global_graph.h" +#include "catalog/ag_graph.h" static void begin_cypher_set(CustomScanState *node, EState *estate, int eflags); @@ -829,6 +831,9 @@ static TupleTableSlot *exec_cypher_set(CustomScanState *node) /* increment the command counter to reflect the updates */ CommandCounterIncrement(); + /* invalidate VLE cache — graph was mutated */ + increment_graph_version(get_graph_oid(css->set_list->graph_name)); + return NULL; } @@ -837,6 +842,9 @@ static TupleTableSlot *exec_cypher_set(CustomScanState *node) /* increment the command counter to reflect the updates */ CommandCounterIncrement(); + /* invalidate VLE cache — graph was mutated */ + increment_graph_version(get_graph_oid(css->set_list->graph_name)); + estate->es_result_relations = saved_resultRels; econtext->ecxt_scantuple = ExecProject(node->ss.ps.lefttree->ps_ProjInfo); diff --git a/src/backend/utils/adt/age_global_graph.c b/src/backend/utils/adt/age_global_graph.c index f99d75abe..4564bcba4 100644 --- a/src/backend/utils/adt/age_global_graph.c +++ b/src/backend/utils/adt/age_global_graph.c @@ -21,14 +21,24 @@ #include "access/heapam.h" #include "catalog/namespace.h" +#include "commands/trigger.h" #include "common/hashfn.h" #include "commands/label_commands.h" +#include "port/atomics.h" +#include "storage/lwlock.h" #include "utils/datum.h" #include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/snapmgr.h" #include "utils/builtins.h" +#if PG_VERSION_NUM >= 170000 +#include "storage/dsm_registry.h" +#else +#include "storage/ipc.h" +#include "storage/shmem.h" +#endif + #include "utils/age_global_graph.h" #include "catalog/ag_graph.h" #include "catalog/ag_label.h" @@ -41,6 +51,52 @@ #define VERTEX_HTAB_INITIAL_SIZE 1000000 #define EDGE_HTAB_INITIAL_SIZE 1000000 +/* Maximum number of graphs tracked for version counting */ +#define AGE_MAX_GRAPHS 128 + +/* + * Graph version counter entry. Stored in shared memory (DSM or shmem) + * so that all backends can see mutation events. The version counter is + * incremented by Cypher mutations (CREATE/DELETE/SET/MERGE) and by + * SQL triggers on label tables. VLE cache invalidation checks this + * counter instead of snapshot xmin/xmax/curcid. + */ +typedef struct GraphVersionEntry +{ + Oid graph_oid; /* graph identifier (0 = unused slot) */ + pg_atomic_uint64 version; /* monotonic change counter */ +} GraphVersionEntry; + +/* + * Shared memory state for graph version tracking. + * Contains a fixed-size array of per-graph version counters. + */ +typedef struct GraphVersionState +{ + LWLock lock; /* protects slot allocation only */ + int num_entries; /* number of active entries */ + GraphVersionEntry entries[AGE_MAX_GRAPHS]; +} GraphVersionState; + +/* + * Version mode detection — determined once per backend on first use. + * DSM: PG 17+ GetNamedDSMSegment (no shared_preload_libraries needed) + * SHMEM: PG < 17 with shared_preload_libraries + * SNAPSHOT: PG < 17 without shared_preload_libraries (current behavior) + */ +typedef enum +{ + VERSION_MODE_UNKNOWN = 0, + VERSION_MODE_DSM, + VERSION_MODE_SHMEM, + VERSION_MODE_SNAPSHOT +} VersionMode; + +static VersionMode version_mode = VERSION_MODE_UNKNOWN; + +/* For PG < 17 shmem path */ +static GraphVersionState *shmem_version_state = NULL; + /* internal data structures implementation */ /* vertex entry for the vertex_hashtable */ @@ -51,7 +107,7 @@ typedef struct vertex_entry ListGraphId *edges_out; /* List of exiting edges graphids (int64) */ ListGraphId *edges_self; /* List of selfloop edges graphids (int64) */ Oid vertex_label_table_oid; /* the label table oid */ - Datum vertex_properties; /* datum property value */ + ItemPointerData tid; /* physical tuple location for lazy fetch */ } vertex_entry; /* edge entry for the edge_hashtable */ @@ -59,7 +115,7 @@ typedef struct edge_entry { graphid edge_id; /* edge id, it is also the hash key */ Oid edge_label_table_oid; /* the label table oid */ - Datum edge_properties; /* datum property value */ + ItemPointerData tid; /* physical tuple location for lazy fetch */ graphid start_vertex_id; /* start vertex */ graphid end_vertex_id; /* end vertex */ } edge_entry; @@ -75,9 +131,10 @@ typedef struct GRAPH_global_context Oid graph_oid; /* graph oid for searching */ HTAB *vertex_hashtable; /* hashtable to hold vertex edge lists */ HTAB *edge_hashtable; /* hashtable to hold edge to vertex map */ - TransactionId xmin; /* transaction ids for this graph */ - TransactionId xmax; - CommandId curcid; /* currentCommandId graph was created with */ + uint64 graph_version; /* version counter for cache invalidation */ + TransactionId xmin; /* snapshot fallback: transaction xmin */ + TransactionId xmax; /* snapshot fallback: transaction xmax */ + CommandId curcid; /* snapshot fallback: command id */ int64 num_loaded_vertices; /* number of loaded vertices in this graph */ int64 num_loaded_edges; /* number of loaded edges in this graph */ ListGraphId *vertices; /* vertices for vertex hashtable cleanup */ @@ -111,35 +168,59 @@ static void freeze_GRAPH_global_hashtables(GRAPH_global_context *ggctx); static List *get_ag_labels_names(Snapshot snapshot, Oid graph_oid, char label_type); static bool insert_edge_entry(GRAPH_global_context *ggctx, graphid edge_id, - Datum edge_properties, graphid start_vertex_id, + ItemPointerData tid, graphid start_vertex_id, graphid end_vertex_id, Oid edge_label_table_oid); static bool insert_vertex_edge(GRAPH_global_context *ggctx, graphid start_vertex_id, graphid end_vertex_id, graphid edge_id, char *edge_label_name); static bool insert_vertex_entry(GRAPH_global_context *ggctx, graphid vertex_id, Oid vertex_label_table_oid, - Datum vertex_properties); + ItemPointerData tid); /* definitions */ /* * Helper function to determine validity of the passed GRAPH_global_context. - * This is based off of the current active snapshot, to see if the graph could - * have been modified. Ideally, we should find a way to more accurately know - * whether the particular graph was modified. + * + * Uses graph-specific version counters (via DSM or shmem) when available. + * Falls back to snapshot-based invalidation when shared memory is not + * initialized (PG < 17 without shared_preload_libraries). + * + * The version counter approach only invalidates when the specific graph + * has been mutated (via Cypher operations or SQL triggers), avoiding false + * invalidation from unrelated transactions on the server. */ bool is_ggctx_invalid(GRAPH_global_context *ggctx) { - Snapshot snap = GetActiveSnapshot(); + /* use version counter if DSM or SHMEM mode is active */ + if (version_mode == VERSION_MODE_DSM || version_mode == VERSION_MODE_SHMEM) + { + uint64 current_version = get_graph_version(ggctx->graph_oid); + + /* + * If current_version is 0, no mutations have been tracked through + * the version counter system yet. Fall through to snapshot-based + * checking for safety — the graph may have been mutated via paths + * that don't increment the counter (e.g., before executor hooks + * are in place, or via direct SQL without triggers). + * + * Once current_version > 0, we know the counter is actively + * tracking this graph and can rely on it exclusively. + */ + if (current_version > 0) + { + return (ggctx->graph_version != current_version); + } + /* fall through to snapshot check */ + } - /* - * If the transaction ids (xmin or xmax) or currentCommandId (curcid) have - * changed, then we have a graph that was updated. This means that the - * global context for this graph is no longer valid. - */ - return (ggctx->xmin != snap->xmin || - ggctx->xmax != snap->xmax || - ggctx->curcid != snap->curcid); + /* SNAPSHOT fallback: original behavior — check snapshot ids */ + { + Snapshot snap = GetActiveSnapshot(); + return (ggctx->xmin != snap->xmin || + ggctx->xmax != snap->xmax || + ggctx->curcid != snap->curcid); + } } /* * Helper function to create the global vertex and edge hashtables. One @@ -332,7 +413,7 @@ static List *get_ag_labels_names(Snapshot snapshot, Oid graph_oid, * current GRAPH global edge hashtable. */ static bool insert_edge_entry(GRAPH_global_context *ggctx, graphid edge_id, - Datum edge_properties, graphid start_vertex_id, + ItemPointerData tid, graphid start_vertex_id, graphid end_vertex_id, Oid edge_label_table_oid) { edge_entry *ee = NULL; @@ -378,12 +459,12 @@ static bool insert_edge_entry(GRAPH_global_context *ggctx, graphid edge_id, * for hash function collisions. */ ee->edge_id = edge_id; - ee->edge_properties = edge_properties; + ee->tid = tid; ee->start_vertex_id = start_vertex_id; ee->end_vertex_id = end_vertex_id; ee->edge_label_table_oid = edge_label_table_oid; - /* we also need to store the edge id for clean up of edge property datums */ + /* we also need to store the edge id for cleanup */ ggctx->edges = append_graphid(ggctx->edges, edge_id); /* increment the number of loaded edges */ @@ -398,7 +479,7 @@ static bool insert_edge_entry(GRAPH_global_context *ggctx, graphid edge_id, */ static bool insert_vertex_entry(GRAPH_global_context *ggctx, graphid vertex_id, Oid vertex_label_table_oid, - Datum vertex_properties) + ItemPointerData tid) { vertex_entry *ve = NULL; bool found = false; @@ -440,8 +521,8 @@ static bool insert_vertex_entry(GRAPH_global_context *ggctx, graphid vertex_id, ve->vertex_id = vertex_id; /* set the label table oid for this vertex */ ve->vertex_label_table_oid = vertex_label_table_oid; - /* set the datum vertex properties */ - ve->vertex_properties = vertex_properties; + /* set the TID for lazy property fetch */ + ve->tid = tid; /* set the NIL edge list */ ve->edges_in = NULL; ve->edges_out = NULL; @@ -590,7 +671,6 @@ static void load_vertex_hashtable(GRAPH_global_context *ggctx) while((tuple = heap_getnext(scan_desc, ForwardScanDirection)) != NULL) { graphid vertex_id; - Datum vertex_properties; bool inserted = false; /* something is wrong if this isn't true */ @@ -603,16 +683,11 @@ static void load_vertex_hashtable(GRAPH_global_context *ggctx) /* get the vertex id */ vertex_id = DatumGetInt64(column_get_datum(tupdesc, tuple, 0, "id", GRAPHIDOID, true)); - /* get the vertex properties datum */ - vertex_properties = column_get_datum(tupdesc, tuple, 1, - "properties", AGTYPEOID, true); - /* we need to make a copy of the properties datum */ - vertex_properties = datumCopy(vertex_properties, false, -1); - /* insert vertex into vertex hashtable */ + /* insert vertex into vertex hashtable with TID (no property copy) */ inserted = insert_vertex_entry(ggctx, vertex_id, vertex_label_table_oid, - vertex_properties); + tuple->t_self); /* warn if there is a duplicate */ if (!inserted) @@ -700,7 +775,6 @@ static void load_edge_hashtable(GRAPH_global_context *ggctx) graphid edge_id; graphid edge_vertex_start_id; graphid edge_vertex_end_id; - Datum edge_properties; bool inserted = false; /* something is wrong if this isn't true */ @@ -724,15 +798,9 @@ static void load_edge_hashtable(GRAPH_global_context *ggctx) 2, "end_id", GRAPHIDOID, true)); - /* get the edge properties datum */ - edge_properties = column_get_datum(tupdesc, tuple, 3, "properties", - AGTYPEOID, true); - - /* we need to make a copy of the properties datum */ - edge_properties = datumCopy(edge_properties, false, -1); - /* insert edge into edge hashtable */ - inserted = insert_edge_entry(ggctx, edge_id, edge_properties, + /* insert edge into edge hashtable with TID (no property copy) */ + inserted = insert_edge_entry(ggctx, edge_id, tuple->t_self, edge_vertex_start_id, edge_vertex_end_id, edge_label_table_oid); @@ -821,10 +889,6 @@ static bool free_specific_GRAPH_global_context(GRAPH_global_context *ggctx) return false; } - /* free the vertex's datumCopy properties */ - pfree_if_not_null(DatumGetPointer(value->vertex_properties)); - value->vertex_properties = 0; - /* free the edge list associated with this vertex */ free_ListGraphId(value->edges_in); free_ListGraphId(value->edges_out); @@ -838,12 +902,16 @@ static bool free_specific_GRAPH_global_context(GRAPH_global_context *ggctx) curr_vertex = next_vertex; } - /* free the edge properties, starting with the head */ + /* + * Verify edge entries exist in the hashtable. With thin entries, + * there are no property Datums to pfree — TIDs are stored inline. + * We still iterate to detect inconsistencies (missing edges). + * The hash_destroy below will handle actual memory cleanup. + */ curr_edge = peek_stack_head(ggctx->edges); while (curr_edge != NULL) { GraphIdNode *next_edge = NULL; - edge_entry *value = NULL; bool found = false; graphid edge_id; @@ -853,20 +921,16 @@ static bool free_specific_GRAPH_global_context(GRAPH_global_context *ggctx) /* get the current edge id */ edge_id = get_graphid(curr_edge); - /* retrieve the edge entry */ - value = (edge_entry *)hash_search(ggctx->edge_hashtable, - (void *)&edge_id, HASH_FIND, - &found); + /* verify the edge entry exists */ + hash_search(ggctx->edge_hashtable, (void *)&edge_id, + HASH_FIND, &found); + /* this is bad if it isn't found, but leave that to the caller */ if (found == false) { return false; } - /* free the edge's datumCopy properties */ - pfree_if_not_null(DatumGetPointer(value->edge_properties)); - value->edge_properties = 0; - /* move to the next edge */ curr_edge = next_edge; } @@ -1011,7 +1075,10 @@ GRAPH_global_context *manage_GRAPH_global_contexts(char *graph_name, new_ggctx->graph_name = pstrdup(graph_name); new_ggctx->graph_oid = graph_oid; - /* set the transaction ids */ + /* set the graph version counter for cache invalidation */ + new_ggctx->graph_version = get_graph_version(graph_oid); + + /* set snapshot fields for SNAPSHOT fallback mode */ new_ggctx->xmin = GetActiveSnapshot()->xmin; new_ggctx->xmax = GetActiveSnapshot()->xmax; new_ggctx->curcid = GetActiveSnapshot()->curcid; @@ -1261,9 +1328,55 @@ Oid get_vertex_entry_label_table_oid(vertex_entry *ve) return ve->vertex_label_table_oid; } +/* + * Fetch vertex properties on demand from the heap via stored TID. + * + * Returns a datumCopy of the properties in the current memory context. + * The caller does not need to free the result explicitly — it will be + * freed when the memory context is reset (typically the SRF multi-call + * context for VLE, which is cleaned up when the SRF completes). + * + * If the tuple is no longer visible (e.g., concurrent mutation between + * cache build and fetch), the version counter should have invalidated + * the cache. If we get here with a stale TID, it indicates a bug in + * the invalidation logic. + */ Datum get_vertex_entry_properties(vertex_entry *ve) { - return ve->vertex_properties; + Relation rel; + HeapTupleData tuple; + Buffer buffer; + Datum result = (Datum) 0; + + rel = table_open(ve->vertex_label_table_oid, AccessShareLock); + tuple.t_self = ve->tid; + + if (heap_fetch(rel, GetActiveSnapshot(), &tuple, &buffer, true)) + { + TupleDesc tupdesc = RelationGetDescr(rel); + bool isnull; + Datum props; + + /* properties is column 2 (1-indexed) */ + props = heap_getattr(&tuple, 2, tupdesc, &isnull); + if (!isnull) + result = datumCopy(props, false, -1); + + ReleaseBuffer(buffer); + } + + table_close(rel, AccessShareLock); + + /* + * If heap_fetch failed, the tuple is no longer visible. This should + * not happen under normal operation because the version counter + * invalidates the cache when the graph is mutated. + */ + if (result == (Datum) 0) + elog(ERROR, "get_vertex_entry_properties: stale TID - " + "vertex entry references a tuple that is no longer visible"); + + return result; } /* edge_entry accessor functions */ @@ -1277,9 +1390,41 @@ Oid get_edge_entry_label_table_oid(edge_entry *ee) return ee->edge_label_table_oid; } +/* + * Fetch edge properties on demand from the heap via stored TID. + * See get_vertex_entry_properties for memory and safety notes. + */ Datum get_edge_entry_properties(edge_entry *ee) { - return ee->edge_properties; + Relation rel; + HeapTupleData tuple; + Buffer buffer; + Datum result = (Datum) 0; + + rel = table_open(ee->edge_label_table_oid, AccessShareLock); + tuple.t_self = ee->tid; + + if (heap_fetch(rel, GetActiveSnapshot(), &tuple, &buffer, true)) + { + TupleDesc tupdesc = RelationGetDescr(rel); + bool isnull; + Datum props; + + /* properties is column 4 (1-indexed) */ + props = heap_getattr(&tuple, 4, tupdesc, &isnull); + if (!isnull) + result = datumCopy(props, false, -1); + + ReleaseBuffer(buffer); + } + + table_close(rel, AccessShareLock); + + if (result == (Datum) 0) + elog(ERROR, "get_edge_entry_properties: stale TID - " + "edge entry references a tuple that is no longer visible"); + + return result; } graphid get_edge_entry_start_vertex_id(edge_entry *ee) @@ -1531,3 +1676,291 @@ Datum age_graph_stats(PG_FUNCTION_ARGS) PG_RETURN_POINTER(agtype_value_to_agtype(result.res)); } + +/* + * ============================================================================ + * Graph Version Counter Implementation + * + * Provides per-graph monotonic version counters in shared memory for + * cross-backend VLE cache invalidation. Three modes are supported: + * + * DSM (PG 17+): Uses GetNamedDSMSegment — works without shared_preload_libs + * SHMEM (PG <17): Uses shmem_request/startup hooks — needs shared_preload_libs + * SNAPSHOT: Falls back to original snapshot-based invalidation + * ============================================================================ + */ + +#if PG_VERSION_NUM >= 170000 +/* + * DSM path: GetNamedDSMSegment init callback. + * Called once when the DSM segment is first created. + */ +static void age_dsm_init_callback(void *ptr) +{ + GraphVersionState *state = (GraphVersionState *) ptr; + + LWLockInitialize(&state->lock, + LWLockNewTrancheId()); + LWLockRegisterTranche(state->lock.tranche, "age_graph_version"); + state->num_entries = 0; + memset(state->entries, 0, sizeof(state->entries)); +} + +/* + * Get the shared GraphVersionState via DSM registry. + * The segment is created on first access and persists until server shutdown. + */ +static GraphVersionState *get_version_state_dsm(void) +{ + bool found; + + return (GraphVersionState *) + GetNamedDSMSegment("age_graph_versions", + sizeof(GraphVersionState), + age_dsm_init_callback, + &found); +} +#endif /* PG_VERSION_NUM >= 170000 */ + +/* + * SHMEM path: request and startup hooks for PG < 17. + * These are registered in _PG_init when shared_preload_libraries is used. + * On PG 17+, DSM is used instead and these functions are not called. + */ +#if PG_VERSION_NUM < 170000 +void age_graph_version_shmem_request(void) +{ + RequestAddinShmemSpace(MAXALIGN(sizeof(GraphVersionState))); +} + +void age_graph_version_shmem_startup(void) +{ + bool found; + + LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); + + shmem_version_state = + (GraphVersionState *) ShmemInitStruct("AGE Graph Version State", + sizeof(GraphVersionState), + &found); + if (!found) + { + LWLockInitialize(&shmem_version_state->lock, + LWLockNewTrancheId()); + LWLockRegisterTranche(shmem_version_state->lock.tranche, + "age_graph_version"); + shmem_version_state->num_entries = 0; + memset(shmem_version_state->entries, 0, + sizeof(shmem_version_state->entries)); + } + + LWLockRelease(AddinShmemInitLock); +} +#endif /* PG_VERSION_NUM < 170000 */ + +/* + * Detect which version mode to use. Called once per backend on first access. + * Emits a DEBUG1 log message indicating the chosen mode. + */ +static void detect_version_mode(void) +{ +#if PG_VERSION_NUM >= 170000 + version_mode = VERSION_MODE_DSM; + elog(DEBUG1, "AGE: VLE cache using DSM version counter"); +#else + if (shmem_version_state != NULL) + { + version_mode = VERSION_MODE_SHMEM; + elog(DEBUG1, "AGE: VLE cache using SHMEM version counter"); + } + else + { + version_mode = VERSION_MODE_SNAPSHOT; + elog(DEBUG1, "AGE: VLE cache using snapshot-based invalidation " + "(add AGE to shared_preload_libraries for better caching)"); + } +#endif +} + +/* + * Get a pointer to the GraphVersionState, regardless of mode. + * Returns NULL only in SNAPSHOT mode (no shared memory available). + */ +static GraphVersionState *get_version_state(void) +{ + if (version_mode == VERSION_MODE_UNKNOWN) + detect_version_mode(); + +#if PG_VERSION_NUM >= 170000 + if (version_mode == VERSION_MODE_DSM) + return get_version_state_dsm(); +#endif + + if (version_mode == VERSION_MODE_SHMEM) + return shmem_version_state; + + return NULL; +} + +/* + * Get the current version counter for a graph. + * Returns 0 if the graph has never been tracked or if shared memory + * is not available. Lock-free read via pg_atomic_read_u64. + */ +uint64 get_graph_version(Oid graph_oid) +{ + GraphVersionState *state = get_version_state(); + int i; + + if (state == NULL) + return 0; + + /* lock-free scan of the array */ + for (i = 0; i < state->num_entries; i++) + { + if (state->entries[i].graph_oid == graph_oid) + return pg_atomic_read_u64(&state->entries[i].version); + } + + return 0; +} + +/* + * Increment the version counter for a graph. + * Called after any graph mutation (Cypher or SQL trigger). + * Lock-free for existing entries; acquires LWLock only to allocate new slots. + */ +void increment_graph_version(Oid graph_oid) +{ + GraphVersionState *state = get_version_state(); + int i; + + if (state == NULL) + return; + + /* try to find existing entry (lock-free) */ + for (i = 0; i < state->num_entries; i++) + { + if (state->entries[i].graph_oid == graph_oid) + { + pg_atomic_fetch_add_u64(&state->entries[i].version, 1); + return; + } + } + + /* new graph — need lock to allocate slot */ + LWLockAcquire(&state->lock, LW_EXCLUSIVE); + + /* re-check after acquiring lock (another backend may have added it) */ + for (i = 0; i < state->num_entries; i++) + { + if (state->entries[i].graph_oid == graph_oid) + { + LWLockRelease(&state->lock); + pg_atomic_fetch_add_u64(&state->entries[i].version, 1); + return; + } + } + + /* add new entry */ + if (state->num_entries < AGE_MAX_GRAPHS) + { + int idx = state->num_entries; + + state->entries[idx].graph_oid = graph_oid; + pg_atomic_init_u64(&state->entries[idx].version, 1); + + /* + * Write barrier ensures the entry fields are fully visible to + * other backends before num_entries is incremented. This prevents + * readers on weak memory-ordering architectures (e.g., ARM) from + * seeing the incremented count before the entry is initialized. + */ + pg_write_barrier(); + state->num_entries++; + } + else + { + elog(WARNING, "AGE: graph version counter table full (%d graphs)", + AGE_MAX_GRAPHS); + } + + LWLockRelease(&state->lock); +} + +/* + * Helper function to look up the graph OID for a given label table OID. + * Scans ag_label catalog for a matching relation column. + * Returns InvalidOid if the table is not a graph label table. + */ +Oid get_graph_oid_for_table(Oid table_oid) +{ + ScanKeyData scan_key; + Relation ag_label_rel; + TableScanDesc scan; + HeapTuple tuple; + Oid graph_oid = InvalidOid; + + ag_label_rel = table_open(ag_label_relation_id(), AccessShareLock); + + ScanKeyInit(&scan_key, Anum_ag_label_relation, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(table_oid)); + + scan = table_beginscan_catalog(ag_label_rel, 1, &scan_key); + + if ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) + { + bool isnull; + Datum d = heap_getattr(tuple, Anum_ag_label_graph, + RelationGetDescr(ag_label_rel), &isnull); + if (!isnull) + graph_oid = DatumGetObjectId(d); + } + + table_endscan(scan); + table_close(ag_label_rel, AccessShareLock); + + return graph_oid; +} + +/* + * SQL-callable trigger function for VLE cache invalidation. + * Installed on graph label tables (AFTER INSERT/UPDATE/DELETE FOR EACH STATEMENT). + * Looks up which graph the triggering table belongs to and increments + * that graph's version counter. + */ +PG_FUNCTION_INFO_V1(age_invalidate_graph_cache); + +Datum age_invalidate_graph_cache(PG_FUNCTION_ARGS) +{ + TriggerData *trigdata; + Oid table_oid; + Oid graph_oid; + + /* verify called as trigger */ + if (!CALLED_AS_TRIGGER(fcinfo)) + { + ereport(ERROR, + (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED), + errmsg("age_invalidate_graph_cache: not called as trigger"))); + } + + trigdata = (TriggerData *) fcinfo->context; + table_oid = RelationGetRelid(trigdata->tg_relation); + + /* look up which graph this label table belongs to */ + graph_oid = get_graph_oid_for_table(table_oid); + + if (OidIsValid(graph_oid)) + { + increment_graph_version(graph_oid); + } + + /* + * Trigger protocol: return a null pointer without setting fcinfo->isnull. + * PG_RETURN_NULL() sets isnull=true, which violates the trigger protocol + * and causes "trigger function returned null value" errors during COPY. + */ + PG_RETURN_POINTER(NULL); +} diff --git a/src/backend/utils/adt/age_vle.c b/src/backend/utils/adt/age_vle.c index 9224ed612..733fe7179 100644 --- a/src/backend/utils/adt/age_vle.c +++ b/src/backend/utils/adt/age_vle.c @@ -364,54 +364,70 @@ static bool is_an_edge_match(VLE_local_context *vlelctx, edge_entry *ee) return false; } - /* get our edge's properties */ - edge_property = DATUM_GET_AGTYPE_P(get_edge_entry_properties(ee)); - /* get the containers */ - agtc_edge_property_constraint = &vlelctx->edge_property_constraint->root; - agtc_edge_property = &edge_property->root; - /* get the number of properties in the edge to be matched */ - num_edge_properties = AGTYPE_CONTAINER_SIZE(agtc_edge_property); - /* - * Check to see if the edge_properties object has AT LEAST as many pairs - * to compare as the edge_property_constraint object has pairs. If not, it - * can't possibly match. + * Fast path: if the label matched (or wasn't constrained) and there + * are no property constraints, the edge is a match. This avoids + * accessing edge properties entirely for label-only VLE patterns + * like [:KNOWS*1..3] which are the common case. */ - if (num_edge_property_constraints > num_edge_properties) + if (num_edge_property_constraints == 0) { - return false; + return true; } /* - * If the number of constraints are the same as the number of properties, - * then the datums would be the same if they match. + * Fetch edge properties once and cache locally. With thin entries, + * get_edge_entry_properties() does a heap_fetch, so we avoid calling + * it multiple times for the same edge. */ - if (num_edge_property_constraints == num_edge_properties) { - Datum edge_props = get_edge_entry_properties(ee); - uint32 edge_props_hash = datum_image_hash(edge_props, false, -1); + Datum edge_props_datum = get_edge_entry_properties(ee); + + edge_property = DATUM_GET_AGTYPE_P(edge_props_datum); + agtc_edge_property_constraint = &vlelctx->edge_property_constraint->root; + agtc_edge_property = &edge_property->root; + num_edge_properties = AGTYPE_CONTAINER_SIZE(agtc_edge_property); + + /* + * Check to see if the edge_properties object has AT LEAST as many + * pairs to compare as the edge_property_constraint object has pairs. + * If not, it can't possibly match. + */ + if (num_edge_property_constraints > num_edge_properties) + { + return false; + } - /* check the hash first */ - if (vlelctx->edge_property_constraint_hash == edge_props_hash) + /* + * If the number of constraints are the same as the number of + * properties, then the datums would be the same if they match. + */ + if (num_edge_property_constraints == num_edge_properties) { - /* if the hashes match, check the datum images */ - if (datum_image_eq(vlelctx->edge_property_constraint_datum, - edge_props, false, -1)) + uint32 edge_props_hash = datum_image_hash(edge_props_datum, + false, -1); + /* check the hash first */ + if (vlelctx->edge_property_constraint_hash == edge_props_hash) { - return true; + /* if the hashes match, check the datum images */ + if (datum_image_eq(vlelctx->edge_property_constraint_datum, + edge_props_datum, false, -1)) + { + return true; + } } - } - /* if we got here they aren't the same */ - return false; - } + /* if we got here they aren't the same */ + return false; + } - /* get the iterators */ - constraint_it = agtype_iterator_init(agtc_edge_property_constraint); - property_it = agtype_iterator_init(agtc_edge_property); + /* get the iterators */ + constraint_it = agtype_iterator_init(agtc_edge_property_constraint); + property_it = agtype_iterator_init(agtc_edge_property); - /* return the value of deep contains */ - return agtype_deep_contains(&property_it, &constraint_it, false); + /* return the value of deep contains */ + return agtype_deep_contains(&property_it, &constraint_it, false); + } } /* diff --git a/src/include/utils/age_global_graph.h b/src/include/utils/age_global_graph.h index 2b336a411..92044fc7e 100644 --- a/src/include/utils/age_global_graph.h +++ b/src/include/utils/age_global_graph.h @@ -59,4 +59,16 @@ Oid get_edge_entry_label_table_oid(edge_entry *ee); Datum get_edge_entry_properties(edge_entry *ee); graphid get_edge_entry_start_vertex_id(edge_entry *ee); graphid get_edge_entry_end_vertex_id(edge_entry *ee); + +/* Graph version counter functions — shared memory (DSM or shmem) */ +uint64 get_graph_version(Oid graph_oid); +void increment_graph_version(Oid graph_oid); +Oid get_graph_oid_for_table(Oid table_oid); + +/* Shared memory initialization for PG < 17 (shmem_request_hook path) */ +#if PG_VERSION_NUM < 170000 +void age_graph_version_shmem_request(void); +void age_graph_version_shmem_startup(void); +#endif + #endif