Skip to content

Commit 10c6839

Browse files
committed
Implement Reallocate function
1 parent a17b313 commit 10c6839

10 files changed

Lines changed: 77 additions & 33 deletions

cpp/src/arrow/buffer.cc

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,13 +80,11 @@ Status PoolBuffer::Reserve(int64_t new_capacity) {
8080
uint8_t* new_data;
8181
new_capacity = BitUtil::RoundUpToMultipleOf64(new_capacity);
8282
if (mutable_data_) {
83-
RETURN_NOT_OK(pool_->Allocate(new_capacity, &new_data));
84-
memcpy(new_data, mutable_data_, size_);
85-
pool_->Free(mutable_data_, capacity_);
83+
RETURN_NOT_OK(pool_->Reallocate(capacity_, new_capacity, &mutable_data_));
8684
} else {
8785
RETURN_NOT_OK(pool_->Allocate(new_capacity, &new_data));
86+
mutable_data_ = new_data;
8887
}
89-
mutable_data_ = new_data;
9088
data_ = mutable_data_;
9189
capacity_ = new_capacity;
9290
}

cpp/src/arrow/builder-benchmark.cc

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,37 +25,42 @@ namespace arrow {
2525

2626
constexpr int64_t kFinalSize = 256;
2727

28-
static void BM_BuildPrimitiveArrayNoNulls(benchmark::State& state) { // NOLINT non-const reference
29-
// 1 MiB block
28+
static void BM_BuildPrimitiveArrayNoNulls(
29+
benchmark::State& state) { // NOLINT non-const reference
30+
// 2 MiB block
3031
std::vector<int64_t> data(256 * 1024, 100);
3132
while (state.KeepRunning()) {
3233
Int64Builder builder(default_memory_pool(), arrow::int64());
3334
for (int i = 0; i < kFinalSize; i++) {
34-
// Build up an array of 256 MiB in size
35+
// Build up an array of 512 MiB in size
3536
builder.Append(data.data(), data.size(), nullptr);
3637
}
3738
std::shared_ptr<Array> out;
3839
builder.Finish(&out);
3940
}
40-
state.SetBytesProcessed(state.iterations() * data.size() * sizeof(int64_t) * kFinalSize);
41+
state.SetBytesProcessed(
42+
state.iterations() * data.size() * sizeof(int64_t) * kFinalSize);
4143
}
4244

43-
BENCHMARK(BM_BuildPrimitiveArrayNoNulls)->Repetitions(3)->Unit(benchmark::kMillisecond);;
45+
BENCHMARK(BM_BuildPrimitiveArrayNoNulls)->Repetitions(3)->Unit(benchmark::kMillisecond);
46+
;
4447

45-
static void BM_BuildVectorNoNulls(benchmark::State& state) { // NOLINT non-const reference
46-
// 1 MiB block
48+
static void BM_BuildVectorNoNulls(
49+
benchmark::State& state) { // NOLINT non-const reference
50+
// 2 MiB block
4751
std::vector<int64_t> data(256 * 1024, 100);
4852
while (state.KeepRunning()) {
4953
std::vector<int64_t> builder;
5054
for (int i = 0; i < kFinalSize; i++) {
51-
// Build up an array of 256 MiB in size
55+
// Build up an array of 512 MiB in size
5256
builder.insert(builder.end(), data.cbegin(), data.cend());
5357
}
5458
}
55-
state.SetBytesProcessed(state.iterations() * data.size() * sizeof(int64_t) * kFinalSize);
59+
state.SetBytesProcessed(
60+
state.iterations() * data.size() * sizeof(int64_t) * kFinalSize);
5661
}
5762

58-
BENCHMARK(BM_BuildVectorNoNulls)->Repetitions(3)->Unit(benchmark::kMillisecond);;
63+
BENCHMARK(BM_BuildVectorNoNulls)->Repetitions(3)->Unit(benchmark::kMillisecond);
64+
;
5965

6066
} // namespace arrow
61-

cpp/src/arrow/io/interfaces.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ Status ReadableFileInterface::ReadAt(
4545
}
4646

4747
Status Writeable::Write(const std::string& data) {
48-
return Write(reinterpret_cast<const uint8_t*>(data.c_str()),
49-
static_cast<int64_t>(data.size()));
48+
return Write(
49+
reinterpret_cast<const uint8_t*>(data.c_str()), static_cast<int64_t>(data.size()));
5050
}
5151

5252
} // namespace io

cpp/src/arrow/io/io-file-test.cc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,9 @@ class MyMemoryPool : public MemoryPool {
291291
}
292292

293293
void Free(uint8_t* buffer, int64_t size) override { std::free(buffer); }
294+
Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) override {
295+
*ptr = reinterpret_cast<uint8_t*>(std::realloc(*ptr, new_size));
296+
}
294297

295298
int64_t bytes_allocated() const override { return -1; }
296299

cpp/src/arrow/jemalloc/jemalloc-builder-benchmark.cc

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,22 +25,24 @@ namespace arrow {
2525

2626
constexpr int64_t kFinalSize = 256;
2727

28-
static void BM_BuildPrimitiveArrayNoNulls(benchmark::State& state) { // NOLINT non-const reference
29-
// 1 MiB block
28+
static void BM_BuildPrimitiveArrayNoNulls(
29+
benchmark::State& state) { // NOLINT non-const reference
30+
// 2 MiB block
3031
std::vector<int64_t> data(256 * 1024, 100);
3132
while (state.KeepRunning()) {
3233
Int64Builder builder(jemalloc::MemoryPool::default_pool(), arrow::int64());
3334
for (int i = 0; i < kFinalSize; i++) {
34-
// Build up an array of 256 MiB in size
35+
// Build up an array of 512 MiB in size
3536
builder.Append(data.data(), data.size(), nullptr);
3637
}
3738
std::shared_ptr<Array> out;
3839
builder.Finish(&out);
3940
}
40-
state.SetBytesProcessed(state.iterations() * data.size() * sizeof(int64_t) * kFinalSize);
41+
state.SetBytesProcessed(
42+
state.iterations() * data.size() * sizeof(int64_t) * kFinalSize);
4143
}
4244

43-
BENCHMARK(BM_BuildPrimitiveArrayNoNulls)->Repetitions(3)->Unit(benchmark::kMillisecond);;
45+
BENCHMARK(BM_BuildPrimitiveArrayNoNulls)->Repetitions(3)->Unit(benchmark::kMillisecond);
46+
;
4447

4548
} // namespace arrow
46-

cpp/src/arrow/jemalloc/jemalloc-memory_pool-test.cc

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,4 +51,3 @@ TEST(JemallocMemoryPool, OOM) {
5151
} // namespace test
5252
} // namespace jemalloc
5353
} // namespace arrow
54-

cpp/src/arrow/jemalloc/memory_pool.cc

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,19 +35,32 @@ MemoryPool* MemoryPool::default_pool() {
3535

3636
MemoryPool::MemoryPool() : allocated_size_(0) {}
3737

38-
MemoryPool::~MemoryPool() {};
39-
38+
MemoryPool::~MemoryPool(){};
39+
4040
Status MemoryPool::Allocate(int64_t size, uint8_t** out) {
4141
*out = reinterpret_cast<uint8_t*>(mallocx(size, MALLOCX_ALIGN(kAlignment)));
4242
if (*out == NULL) {
4343
std::stringstream ss;
4444
ss << "malloc of size " << size << " failed";
4545
return Status::OutOfMemory(ss.str());
46-
}
46+
}
4747
allocated_size_ += size;
4848
return Status::OK();
4949
}
5050

51+
Status MemoryPool::Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) {
52+
*ptr = reinterpret_cast<uint8_t*>(rallocx(*ptr, new_size, MALLOCX_ALIGN(kAlignment)));
53+
if (*ptr == NULL) {
54+
std::stringstream ss;
55+
ss << "realloc of size " << new_size << " failed";
56+
return Status::OutOfMemory(ss.str());
57+
}
58+
59+
allocated_size_ += new_size - old_size;
60+
61+
return Status::OK();
62+
}
63+
5164
void MemoryPool::Free(uint8_t* buffer, int64_t size) {
5265
allocated_size_ -= size;
5366
free(buffer);
@@ -57,5 +70,5 @@ int64_t MemoryPool::bytes_allocated() const {
5770
return allocated_size_.load();
5871
}
5972

60-
} // jemalloc
61-
} // arrow
73+
} // jemalloc
74+
} // arrow

cpp/src/arrow/jemalloc/memory_pool.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
#ifndef ARROW_JEMALLOC_MEMORY_POOL_H
2121
#define ARROW_JEMALLOC_MEMORY_POOL_H
2222

23-
#include "arrow/memory_pool.h"
23+
#include "arrow/memory_pool.h"
2424

2525
#include <atomic>
2626

@@ -32,14 +32,15 @@ namespace jemalloc {
3232

3333
class ARROW_EXPORT MemoryPool : public ::arrow::MemoryPool {
3434
public:
35-
static MemoryPool* default_pool();
35+
static MemoryPool* default_pool();
3636

3737
MemoryPool(MemoryPool const&) = delete;
38-
MemoryPool& operator=(MemoryPool const&) = delete;
38+
MemoryPool& operator=(MemoryPool const&) = delete;
3939

4040
virtual ~MemoryPool();
4141

4242
Status Allocate(int64_t size, uint8_t** out) override;
43+
Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) override;
4344
void Free(uint8_t* buffer, int64_t size) override;
4445

4546
int64_t bytes_allocated() const override;
@@ -50,7 +51,6 @@ class ARROW_EXPORT MemoryPool : public ::arrow::MemoryPool {
5051
std::atomic<int64_t> allocated_size_;
5152
};
5253

53-
5454
} // namespace jemalloc
5555
} // namespace arrow
5656

cpp/src/arrow/memory_pool.cc

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ class InternalMemoryPool : public MemoryPool {
6767
virtual ~InternalMemoryPool();
6868

6969
Status Allocate(int64_t size, uint8_t** out) override;
70+
Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) override;
7071

7172
void Free(uint8_t* buffer, int64_t size) override;
7273

@@ -85,6 +86,28 @@ Status InternalMemoryPool::Allocate(int64_t size, uint8_t** out) {
8586
return Status::OK();
8687
}
8788

89+
Status InternalMemoryPool::Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) {
90+
std::lock_guard<std::mutex> guard(pool_lock_);
91+
92+
// Note: We cannot use realloc() here as it doesn't guarantee alignment.
93+
94+
// Allocate new chunk
95+
uint8_t* out;
96+
RETURN_NOT_OK(AllocateAligned(new_size, &out));
97+
// Copy contents and release old memory chunk
98+
memcpy(out, *ptr, std::min(new_size, old_size));
99+
#ifdef _MSC_VER
100+
_aligned_free(*ptr);
101+
#else
102+
std::free(*ptr);
103+
#endif
104+
*ptr = out;
105+
106+
bytes_allocated_ += new_size - old_size;
107+
108+
return Status::OK();
109+
}
110+
88111
int64_t InternalMemoryPool::bytes_allocated() const {
89112
std::lock_guard<std::mutex> guard(pool_lock_);
90113
return bytes_allocated_;

cpp/src/arrow/memory_pool.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ class ARROW_EXPORT MemoryPool {
3131
virtual ~MemoryPool();
3232

3333
virtual Status Allocate(int64_t size, uint8_t** out) = 0;
34+
virtual Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) = 0;
3435
virtual void Free(uint8_t* buffer, int64_t size) = 0;
3536

3637
virtual int64_t bytes_allocated() const = 0;

0 commit comments

Comments
 (0)