Skip to content

Commit a935c81

Browse files
bkietzjvanstratenwestonpace
committed
ARROW-15238: [C++] ARROW_ENGINE module with substrait consumer
Continuation of #11707. I'm taking over from @bkietz for now because he's unavailable right now for personal reasons. Closes #12279 from jvanstraten/substrait-consumer Lead-authored-by: Benjamin Kietzman <bengilgit@gmail.com> Co-authored-by: Jeroen van Straten <jeroen.van.straten@gmail.com> Co-authored-by: Weston Pace <weston.pace@gmail.com> Signed-off-by: Weston Pace <weston.pace@gmail.com>
1 parent f8689a1 commit a935c81

48 files changed

Lines changed: 4998 additions & 40 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.travis.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ jobs:
9999
-e ARROW_GCS=OFF
100100
-e ARROW_MIMALLOC=OFF
101101
-e ARROW_ORC=OFF
102+
-e ARROW_ENGINE=OFF
102103
-e ARROW_PARQUET=OFF
103104
-e ARROW_S3=OFF
104105
-e CMAKE_UNITY_BUILD=ON

cpp/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -351,7 +351,9 @@ if(ARROW_CUDA
351351
endif()
352352

353353
if(ARROW_ENGINE)
354+
set(ARROW_PARQUET ON)
354355
set(ARROW_COMPUTE ON)
356+
set(ARROW_DATASET ON)
355357
endif()
356358

357359
if(ARROW_SKYHOOK)

cpp/cmake_modules/DefineOptions.cmake

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}")
225225

226226
define_option(ARROW_DATASET "Build the Arrow Dataset Modules" OFF)
227227

228-
define_option(ARROW_ENGINE "Build the Arrow Execution Engine" OFF)
228+
define_option(ARROW_ENGINE "Build the Arrow Query Engine Module" OFF)
229229

230230
define_option(ARROW_FILESYSTEM "Build the Arrow Filesystem Layer" OFF)
231231

@@ -478,6 +478,16 @@ advised that if this is enabled 'install' will fail silently on components;\
478478
that have not been built"
479479
OFF)
480480

481+
set(ARROW_SUBSTRAIT_REPO_DEFAULT "https://github.com/substrait-io/substrait")
482+
define_option_string(ARROW_SUBSTRAIT_REPO
483+
"Custom git repository URL for downloading Substrait sources.;\
484+
See also ARROW_SUBSTRAIT_TAG" "${ARROW_SUBSTRAIT_REPO_DEFAULT}")
485+
486+
set(ARROW_SUBSTRAIT_TAG_DEFAULT "e1b4c04a1b518912f4c4065b16a1b2c0ac8e14cf")
487+
define_option_string(ARROW_SUBSTRAIT_TAG
488+
"Custom git hash/tag/branch for Substrait repository.;\
489+
See also ARROW_SUBSTRAIT_REPO" "${ARROW_SUBSTRAIT_TAG_DEFAULT}")
490+
481491
option(ARROW_BUILD_CONFIG_SUMMARY_JSON "Summarize build configuration in a JSON file"
482492
ON)
483493
endif()
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
# - Find Arrow Engine (arrow/engine/api.h, libarrow_engine.a, libarrow_engine.so)
19+
#
20+
# This module requires Arrow from which it uses
21+
# arrow_find_package()
22+
#
23+
# This module defines
24+
# ARROW_ENGINE_FOUND, whether Arrow Engine has been found
25+
# ARROW_ENGINE_IMPORT_LIB,
26+
# path to libarrow_engine's import library (Windows only)
27+
# ARROW_ENGINE_INCLUDE_DIR, directory containing headers
28+
# ARROW_ENGINE_LIB_DIR, directory containing Arrow Engine libraries
29+
# ARROW_ENGINE_SHARED_LIB, path to libarrow_engine's shared library
30+
# ARROW_ENGINE_STATIC_LIB, path to libarrow_engine.a
31+
32+
if(DEFINED ARROW_ENGINE_FOUND)
33+
return()
34+
endif()
35+
36+
set(find_package_arguments)
37+
if(${CMAKE_FIND_PACKAGE_NAME}_FIND_VERSION)
38+
list(APPEND find_package_arguments "${${CMAKE_FIND_PACKAGE_NAME}_FIND_VERSION}")
39+
endif()
40+
if(${CMAKE_FIND_PACKAGE_NAME}_FIND_REQUIRED)
41+
list(APPEND find_package_arguments REQUIRED)
42+
endif()
43+
if(${CMAKE_FIND_PACKAGE_NAME}_FIND_QUIETLY)
44+
list(APPEND find_package_arguments QUIET)
45+
endif()
46+
find_package(Arrow ${find_package_arguments})
47+
find_package(Parquet ${find_package_arguments})
48+
49+
if(ARROW_FOUND AND PARQUET_FOUND)
50+
arrow_find_package(ARROW_ENGINE
51+
"${ARROW_HOME}"
52+
arrow_engine
53+
arrow/engine/api.h
54+
ArrowEngine
55+
arrow-engine)
56+
if(NOT ARROW_ENGINE_VERSION)
57+
set(ARROW_ENGINE_VERSION "${ARROW_VERSION}")
58+
endif()
59+
endif()
60+
61+
if("${ARROW_ENGINE_VERSION}" VERSION_EQUAL "${ARROW_VERSION}")
62+
set(ARROW_ENGINE_VERSION_MATCH TRUE)
63+
else()
64+
set(ARROW_ENGINE_VERSION_MATCH FALSE)
65+
endif()
66+
67+
mark_as_advanced(ARROW_ENGINE_IMPORT_LIB
68+
ARROW_ENGINE_INCLUDE_DIR
69+
ARROW_ENGINE_LIBS
70+
ARROW_ENGINE_LIB_DIR
71+
ARROW_ENGINE_SHARED_IMP_LIB
72+
ARROW_ENGINE_SHARED_LIB
73+
ARROW_ENGINE_STATIC_LIB
74+
ARROW_ENGINE_VERSION
75+
ARROW_ENGINE_VERSION_MATCH)
76+
77+
find_package_handle_standard_args(
78+
ArrowEngine
79+
REQUIRED_VARS ARROW_ENGINE_INCLUDE_DIR ARROW_ENGINE_LIB_DIR ARROW_ENGINE_VERSION_MATCH
80+
VERSION_VAR ARROW_ENGINE_VERSION)
81+
set(ARROW_ENGINE_FOUND ${ArrowEngine_FOUND})
82+
83+
if(ArrowEngine_FOUND AND NOT ArrowEngine_FIND_QUIETLY)
84+
message(STATUS "Found the Arrow Engine by ${ARROW_ENGINE_FIND_APPROACH}")
85+
message(STATUS "Found the Arrow Engine shared library: ${ARROW_ENGINE_SHARED_LIB}")
86+
message(STATUS "Found the Arrow Engine import library: ${ARROW_ENGINE_IMPORT_LIB}")
87+
message(STATUS "Found the Arrow Engine static library: ${ARROW_ENGINE_STATIC_LIB}")
88+
endif()

cpp/cmake_modules/ThirdpartyToolchain.cmake

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,8 @@ endif()
309309

310310
if(ARROW_ORC
311311
OR ARROW_FLIGHT
312-
OR ARROW_GANDIVA)
312+
OR ARROW_GANDIVA
313+
OR ARROW_ENGINE)
313314
set(ARROW_WITH_PROTOBUF ON)
314315
endif()
315316

@@ -1427,6 +1428,11 @@ macro(build_protobuf)
14271428
set(PROTOBUF_VENDORED TRUE)
14281429
set(PROTOBUF_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/protobuf_ep-install")
14291430
set(PROTOBUF_INCLUDE_DIR "${PROTOBUF_PREFIX}/include")
1431+
# This flag is based on what the user initially requested but if
1432+
# we've fallen back to building protobuf we always build it statically
1433+
# so we need to reset the flag so that we can link against it correctly
1434+
# later.
1435+
set(Protobuf_USE_STATIC_LIBS ON)
14301436
# Newer protobuf releases always have a lib prefix independent from CMAKE_STATIC_LIBRARY_PREFIX
14311437
set(PROTOBUF_STATIC_LIB
14321438
"${PROTOBUF_PREFIX}/lib/libprotobuf${CMAKE_STATIC_LIBRARY_SUFFIX}")
@@ -1533,7 +1539,7 @@ if(ARROW_WITH_PROTOBUF)
15331539
PC_PACKAGE_NAMES
15341540
protobuf)
15351541

1536-
if(ARROW_PROTOBUF_USE_SHARED AND MSVC_TOOLCHAIN)
1542+
if(NOT Protobuf_USE_STATIC_LIBS AND MSVC_TOOLCHAIN)
15371543
add_definitions(-DPROTOBUF_USE_DLLS)
15381544
endif()
15391545

cpp/examples/arrow/CMakeLists.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ if(ARROW_COMPUTE)
2121
add_arrow_example(compute_register_example)
2222
endif()
2323

24+
if(ARROW_ENGINE)
25+
add_arrow_example(engine_substrait_consumption EXTRA_LINK_LIBS arrow_engine_shared)
26+
endif()
27+
2428
if(ARROW_COMPUTE AND ARROW_CSV)
2529
add_arrow_example(compute_and_write_csv_example)
2630
endif()
Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include <arrow/api.h>
19+
#include <arrow/compute/api.h>
20+
#include <arrow/engine/api.h>
21+
22+
#include <cstdlib>
23+
#include <iostream>
24+
#include <memory>
25+
#include <vector>
26+
27+
namespace eng = arrow::engine;
28+
namespace cp = arrow::compute;
29+
30+
#define ABORT_ON_FAILURE(expr) \
31+
do { \
32+
arrow::Status status_ = (expr); \
33+
if (!status_.ok()) { \
34+
std::cerr << status_.message() << std::endl; \
35+
abort(); \
36+
} \
37+
} while (0);
38+
39+
class IgnoringConsumer : public cp::SinkNodeConsumer {
40+
public:
41+
explicit IgnoringConsumer(size_t tag) : tag_{tag} {}
42+
43+
arrow::Status Consume(cp::ExecBatch batch) override {
44+
// Consume a batch of data
45+
// (just print its row count to stdout)
46+
std::cout << "-" << tag_ << " consumed " << batch.length << " rows" << std::endl;
47+
return arrow::Status::OK();
48+
}
49+
50+
arrow::Future<> Finish() override {
51+
// Signal to the consumer that the last batch has been delivered
52+
// (we don't do any real work in this consumer so mark it finished immediately)
53+
//
54+
// The returned future should only finish when all outstanding tasks have completed
55+
// (after this method is called Consume is guaranteed not to be called again)
56+
std::cout << "-" << tag_ << " finished" << std::endl;
57+
return arrow::Future<>::MakeFinished();
58+
}
59+
60+
private:
61+
// A unique label for instances to help distinguish logging output if a plan has
62+
// multiple sinks
63+
//
64+
// In this example, this is set to the zero-based index of the relation tree in the plan
65+
size_t tag_;
66+
};
67+
68+
arrow::Future<std::shared_ptr<arrow::Buffer>> GetSubstraitFromServer(
69+
const std::string& filename) {
70+
// Emulate server interaction by parsing hard coded JSON
71+
std::string substrait_json = R"({
72+
"relations": [
73+
{"rel": {
74+
"read": {
75+
"base_schema": {
76+
"struct": {
77+
"types": [ {"i64": {}}, {"bool": {}} ]
78+
},
79+
"names": ["i", "b"]
80+
},
81+
"local_files": {
82+
"items": [
83+
{
84+
"uri_file": "file://FILENAME_PLACEHOLDER",
85+
"format": "FILE_FORMAT_PARQUET"
86+
}
87+
]
88+
}
89+
}
90+
}}
91+
],
92+
"extension_uris": [
93+
{
94+
"extension_uri_anchor": 7,
95+
"uri": "https://github.com/apache/arrow/blob/master/format/substrait/extension_types.yaml"
96+
}
97+
],
98+
"extensions": [
99+
{"extension_type": {
100+
"extension_uri_reference": 7,
101+
"type_anchor": 42,
102+
"name": "null"
103+
}},
104+
{"extension_type_variation": {
105+
"extension_uri_reference": 7,
106+
"type_variation_anchor": 23,
107+
"name": "u8"
108+
}},
109+
{"extension_function": {
110+
"extension_uri_reference": 7,
111+
"function_anchor": 42,
112+
"name": "add"
113+
}}
114+
]
115+
})";
116+
std::string filename_placeholder = "FILENAME_PLACEHOLDER";
117+
substrait_json.replace(substrait_json.find(filename_placeholder),
118+
filename_placeholder.size(), filename);
119+
return eng::internal::SubstraitFromJSON("Plan", substrait_json);
120+
}
121+
122+
int main(int argc, char** argv) {
123+
if (argc < 2) {
124+
std::cout << "Please specify a parquet file to scan" << std::endl;
125+
// Fake pass for CI
126+
return EXIT_SUCCESS;
127+
}
128+
129+
// Plans arrive at the consumer serialized in a Buffer, using the binary protobuf
130+
// serialization of a substrait Plan
131+
auto maybe_serialized_plan = GetSubstraitFromServer(argv[1]).result();
132+
ABORT_ON_FAILURE(maybe_serialized_plan.status());
133+
std::shared_ptr<arrow::Buffer> serialized_plan =
134+
std::move(maybe_serialized_plan).ValueOrDie();
135+
136+
// Print the received plan to stdout as JSON
137+
arrow::Result<std::string> maybe_plan_json =
138+
eng::internal::SubstraitToJSON("Plan", *serialized_plan);
139+
ABORT_ON_FAILURE(maybe_plan_json.status());
140+
std::cout << std::string(50, '#') << " received substrait::Plan:" << std::endl;
141+
std::cout << maybe_plan_json.ValueOrDie() << std::endl;
142+
143+
// The data sink(s) for plans is/are implicit in substrait plans, but explicit in
144+
// Arrow. Therefore, deserializing a plan requires a factory for consumers: each
145+
// time the root of a substrait relation tree is deserialized, an Arrow consumer is
146+
// constructed into which its batches will be piped.
147+
std::vector<std::shared_ptr<cp::SinkNodeConsumer>> consumers;
148+
std::function<std::shared_ptr<cp::SinkNodeConsumer>()> consumer_factory = [&] {
149+
// All batches produced by the plan will be fed into IgnoringConsumers:
150+
auto tag = consumers.size();
151+
consumers.emplace_back(new IgnoringConsumer{tag});
152+
return consumers.back();
153+
};
154+
155+
// Deserialize each relation tree in the substrait plan to an Arrow compute Declaration
156+
arrow::Result<std::vector<cp::Declaration>> maybe_decls =
157+
eng::DeserializePlan(*serialized_plan, consumer_factory);
158+
ABORT_ON_FAILURE(maybe_decls.status());
159+
std::vector<cp::Declaration> decls = std::move(maybe_decls).ValueOrDie();
160+
161+
// It's safe to drop the serialized plan; we don't leave references to its memory
162+
serialized_plan.reset();
163+
164+
// Construct an empty plan (note: configure Function registry and ThreadPool here)
165+
arrow::Result<std::shared_ptr<cp::ExecPlan>> maybe_plan = cp::ExecPlan::Make();
166+
ABORT_ON_FAILURE(maybe_plan.status());
167+
std::shared_ptr<cp::ExecPlan> plan = std::move(maybe_plan).ValueOrDie();
168+
169+
// Add decls to plan (note: configure ExecNode registry before this point)
170+
for (const cp::Declaration& decl : decls) {
171+
ABORT_ON_FAILURE(decl.AddToPlan(plan.get()).status());
172+
}
173+
174+
// Validate the plan and print it to stdout
175+
ABORT_ON_FAILURE(plan->Validate());
176+
std::cout << std::string(50, '#') << " produced arrow::ExecPlan:" << std::endl;
177+
std::cout << plan->ToString() << std::endl;
178+
179+
// Start the plan...
180+
std::cout << std::string(50, '#') << " consuming batches:" << std::endl;
181+
ABORT_ON_FAILURE(plan->StartProducing());
182+
183+
// ... and wait for it to finish
184+
ABORT_ON_FAILURE(plan->finished().status());
185+
return EXIT_SUCCESS;
186+
}

cpp/src/arrow/CMakeLists.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -725,6 +725,10 @@ if(ARROW_COMPUTE)
725725
add_subdirectory(compute)
726726
endif()
727727

728+
if(ARROW_ENGINE)
729+
add_subdirectory(engine)
730+
endif()
731+
728732
if(ARROW_CUDA)
729733
add_subdirectory(gpu)
730734
endif()

cpp/src/arrow/array/array_base.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,8 @@ std::string Array::ToString() const {
282282
return ss.str();
283283
}
284284

285+
void PrintTo(const Array& x, std::ostream* os) { *os << x.ToString(); }
286+
285287
Result<std::shared_ptr<Array>> Array::View(
286288
const std::shared_ptr<DataType>& out_type) const {
287289
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ArrayData> result,

0 commit comments

Comments
 (0)