Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions cpp/src/io/avro/avro_gpu.cu
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,8 @@ avro_decode_row(schemadesc_s const* schema,
if (dataptr != nullptr && dst_row >= 0) {
uint32_t v;
if (cur + 3 < end) {
v = unaligned_load<uint32_t>(cur);
v = (cur + 7 < end) ? cudf::io::unaligned_load_unsafe<uint32_t>(cur)
: cudf::io::unaligned_load<uint32_t>(cur);
cur += 4;
} else {
v = 0;
Expand All @@ -198,7 +199,8 @@ avro_decode_row(schemadesc_s const* schema,
if (dataptr != nullptr && dst_row >= 0) {
uint64_t v;
if (cur + 7 < end) {
v = unaligned_load<uint64_t>(cur);
v = (cur + 11 < end) ? cudf::io::unaligned_load_unsafe<uint64_t>(cur)
: cudf::io::unaligned_load<uint64_t>(cur);
cur += 8;
} else {
v = 0;
Expand Down
8 changes: 6 additions & 2 deletions cpp/src/io/comp/snap.cu
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,12 @@ static __device__ uint32_t FindFourByteMatch(snap_state_s* s,
uint32_t match_mask, literal_cnt;
if (t == 0) { s->copy_length = 0; }
do {
bool valid4 = (pos + t + 4 <= len);
uint32_t data32 = (valid4) ? cudf::io::unaligned_load<uint32_t>(src + pos + t) : 0;
bool const valid4 = (pos + t + 4 <= len);
bool const valid8 = (pos + t + 8 <= len);
uint32_t const data32 = (valid4) ? (valid8)
? cudf::io::unaligned_load_unsafe<uint32_t>(src + pos + t)
: cudf::io::unaligned_load<uint32_t>(src + pos + t)
: 0;
uint32_t hash = (valid4) ? snap_hash(data32) : 0;
uint32_t local_match = HashMatchAny(hash, t);
uint32_t local_match_lane = 31 - __clz(local_match & ((1 << t) - 1));
Expand Down
29 changes: 24 additions & 5 deletions cpp/src/io/parquet/experimental/dictionary_page_filter.cu
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,25 @@ __device__ __forceinline__ int64_t convert_to_timestamp64(int64_t const value,
return value * timestamp_scale;
}

/**
* @brief Helper function to unalign load a value from a page data buffer
*
* @param page_data Pointer to the page data buffer
* @param value_idx Index of the value to load
* @param page_size Size of the page data buffer
* @return Loaded value
*/
template <typename T>
__device__ __forceinline__ T unaligned_load(uint8_t const* page_data,
int32_t value_idx,
int32_t page_size)
{
auto const offset = value_idx * sizeof(T);
return (offset + sizeof(T) + sizeof(uint32_t) <= page_size)
? cudf::io::unaligned_load_unsafe<T>(page_data + (value_idx * sizeof(T)))
: cudf::io::unaligned_load<T>(page_data + (value_idx * sizeof(T)));
}

/**
* @brief Query `cuco::static_set`s to evaluate (many) input (in)equality predicates
*
Expand Down Expand Up @@ -489,7 +508,7 @@ __device__ T decode_fixed_width_value(PageInfo const& page,
// Handle timestamps
if constexpr (cudf::is_timestamp<T>()) {
auto const timestamp =
cudf::io::unaligned_load<uint32_t>(page_data + (value_idx * sizeof(uint32_t)));
unaligned_load<uint32_t>(page_data, value_idx, page.uncompressed_page_size);
if (timestamp_scale != 0) {
decoded_value = T{typename T::duration(static_cast<typename T::rep>(timestamp))};
} else {
Expand All @@ -513,7 +532,7 @@ __device__ T decode_fixed_width_value(PageInfo const& page,
}

auto const duration =
cudf::io::unaligned_load<uint32_t>(page_data + (value_idx * sizeof(uint32_t)));
unaligned_load<uint32_t>(page_data, value_idx, page.uncompressed_page_size);
decoded_value = T{static_cast<typename T::rep>(duration)};
}
// Handle other int32 encoded values including smaller bitwidths and decimal32
Expand All @@ -524,7 +543,7 @@ __device__ T decode_fixed_width_value(PageInfo const& page,
return {};
}
decoded_value = static_cast<T>(
cudf::io::unaligned_load<uint32_t>(page_data + (value_idx * sizeof(uint32_t))));
unaligned_load<uint32_t>(page_data, value_idx, page.uncompressed_page_size));
}
break;
}
Expand All @@ -538,7 +557,7 @@ __device__ T decode_fixed_width_value(PageInfo const& page,
// Handle timestamps
if constexpr (cudf::is_timestamp<T>()) {
int64_t const timestamp =
cudf::io::unaligned_load<uint64_t>(page_data + (value_idx * sizeof(int64_t)));
unaligned_load<uint64_t>(page_data, value_idx, page.uncompressed_page_size);
if (timestamp_scale != 0) {
decoded_value = T{typename T::duration(
static_cast<typename T::rep>(convert_to_timestamp64(timestamp, timestamp_scale)))};
Expand All @@ -549,7 +568,7 @@ __device__ T decode_fixed_width_value(PageInfo const& page,
// Handle durations and other int64 encoded values including decimal64
else {
decoded_value = static_cast<T>(
cudf::io::unaligned_load<uint64_t>(page_data + (value_idx * sizeof(uint64_t))));
unaligned_load<uint64_t>(page_data, value_idx, page.uncompressed_page_size));
}
break;
}
Expand Down
80 changes: 26 additions & 54 deletions cpp/src/io/parquet/page_data.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,10 @@ inline __device__ void gpuStoreOutput(uint32_t* dst,
uint32_t dict_pos,
uint32_t dict_size)
{
if (dict_pos < dict_size) {
*dst = cudf::io::unaligned_load<uint32_t>(src8 + dict_pos);
if (dict_pos + sizeof(uint32_t) <= dict_size) {
*dst = (dict_pos + 2 * sizeof(uint32_t) <= dict_size)
? cudf::io::unaligned_load_unsafe<uint32_t>(src8 + dict_pos)
: cudf::io::unaligned_load<uint32_t>(src8 + dict_pos);
} else {
*dst = 0;
}
Expand All @@ -84,22 +86,15 @@ inline __device__ void gpuStoreOutput(uint2* dst,
uint32_t dict_pos,
uint32_t dict_size)
{
uint2 v;
unsigned int ofs = 3 & reinterpret_cast<size_t>(src8);
src8 -= ofs; // align to 32-bit boundary
ofs <<= 3; // bytes -> bits
if (dict_pos < dict_size) {
v.x = *reinterpret_cast<uint32_t const*>(src8 + dict_pos + 0);
v.y = *reinterpret_cast<uint32_t const*>(src8 + dict_pos + 4);
if (ofs) {
uint32_t next = *reinterpret_cast<uint32_t const*>(src8 + dict_pos + 8);
v.x = __funnelshift_r(v.x, v.y, ofs);
v.y = __funnelshift_r(v.y, next, ofs);
}
} else {
v.x = v.y = 0;
uint2 value{.x = 0, .y = 0};
if (dict_pos + sizeof(uint64_t) <= dict_size) {
auto const bytebuf = (dict_pos + sizeof(uint64_t) + sizeof(uint32_t) <= dict_size)
? cudf::io::unaligned_load_unsafe<uint64_t>(src8 + dict_pos)
: cudf::io::unaligned_load<uint64_t>(src8 + dict_pos);
value.x = reinterpret_cast<uint32_t const*>(&bytebuf)[0];
value.y = reinterpret_cast<uint32_t const*>(&bytebuf)[1];
}
*dst = v;
*dst = value;
}

/**
Expand All @@ -119,7 +114,7 @@ inline __device__ void read_int96_timestamp(page_state_s* s,
using cuda::std::chrono::duration_cast;

uint8_t const* src8;
uint32_t dict_pos, dict_size = s->dict_size, ofs;
uint32_t dict_pos, dict_size = s->dict_size;

if (s->dict_base) {
// Dictionary
Expand All @@ -132,31 +127,20 @@ inline __device__ void read_int96_timestamp(page_state_s* s,
src8 = s->data_start;
}
dict_pos *= (uint32_t)s->dtype_len_in;
ofs = 3 & reinterpret_cast<size_t>(src8);
src8 -= ofs; // align to 32-bit boundary
ofs <<= 3; // bytes -> bits

if (dict_pos + 4 >= dict_size) {
*dst = 0;
return;
}

uint3 v;
int64_t nanos, days;
v.x = *reinterpret_cast<uint32_t const*>(src8 + dict_pos + 0);
v.y = *reinterpret_cast<uint32_t const*>(src8 + dict_pos + 4);
v.z = *reinterpret_cast<uint32_t const*>(src8 + dict_pos + 8);
if (ofs) {
uint32_t next = *reinterpret_cast<uint32_t const*>(src8 + dict_pos + 12);
v.x = __funnelshift_r(v.x, v.y, ofs);
v.y = __funnelshift_r(v.y, v.z, ofs);
v.z = __funnelshift_r(v.z, next, ofs);
}
nanos = v.y;
nanos <<= 32;
uint3 const v = (dict_pos + sizeof(uint3) + sizeof(uint32_t) <= dict_size)
? cudf::io::unaligned_load_unsafe<uint3>(src8 + dict_pos)
: cudf::io::unaligned_load<uint3>(src8 + dict_pos);
int64_t nanos = v.y;
nanos <<= cudf::detail::size_in_bits<int32_t>();
nanos |= v.x;
// Convert from Julian day at noon to UTC seconds
days = static_cast<int32_t>(v.z);
int64_t const days = static_cast<int32_t>(v.z);
cudf::duration_D d_d{
days - 2440588}; // TBD: Should be noon instead of midnight, but this matches pyarrow

Expand Down Expand Up @@ -192,7 +176,7 @@ inline __device__ void read_int64_timestamp(page_state_s* s,
int64_t* dst)
{
uint8_t const* src8;
uint32_t dict_pos, dict_size = s->dict_size, ofs;
uint32_t dict_pos, dict_size = s->dict_size;
int64_t ts;

if (s->dict_base) {
Expand All @@ -206,25 +190,13 @@ inline __device__ void read_int64_timestamp(page_state_s* s,
src8 = s->data_start;
}
dict_pos *= (uint32_t)s->dtype_len_in;
ofs = 3 & reinterpret_cast<size_t>(src8);
src8 -= ofs; // align to 32-bit boundary
ofs <<= 3; // bytes -> bits
if (dict_pos + 4 < dict_size) {
uint2 v;
int64_t val;
int32_t ts_scale;
v.x = *reinterpret_cast<uint32_t const*>(src8 + dict_pos + 0);
v.y = *reinterpret_cast<uint32_t const*>(src8 + dict_pos + 4);
if (ofs) {
uint32_t next = *reinterpret_cast<uint32_t const*>(src8 + dict_pos + 8);
v.x = __funnelshift_r(v.x, v.y, ofs);
v.y = __funnelshift_r(v.y, next, ofs);
}
val = v.y;
val <<= 32;
val |= v.x;

if (dict_pos + sizeof(int64_t) <= dict_size) {
int64_t const val = (dict_pos + sizeof(int64_t) + sizeof(uint32_t) <= dict_size)
? cudf::io::unaligned_load_unsafe<uint64_t>(src8 + dict_pos)
: cudf::io::unaligned_load<uint64_t>(src8 + dict_pos);
// Output to desired clock rate
ts_scale = s->ts_scale;
int32_t ts_scale = s->ts_scale;
if (ts_scale < 0) {
// round towards negative infinity
int sign = (val < 0);
Expand Down
37 changes: 36 additions & 1 deletion cpp/src/io/utilities/block_utils.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,42 @@ inline __device__ T warp_reduce_pos(T pos, uint32_t t)
}

template <typename T>
requires(cuda::std::is_integral_v<T>)
requires(cuda::std::is_same_v<T, uint32_t> or cuda::std::is_same_v<T, uint64_t> or
cuda::std::is_same_v<T, uint3>)
inline __device__ T unaligned_load_unsafe(uint8_t const* p)
{
uint32_t const offset = 3 & reinterpret_cast<uintptr_t>(p);
uint32_t const shift_bits = offset << 3;
auto const* p32 = reinterpret_cast<uint32_t const*>(p - offset);
if constexpr (cuda::std::is_same_v<T, uint32_t>) {
auto const v0 = p32[0];
return (offset) ? __funnelshift_r(v0, p32[1], shift_bits) : v0;
} else if constexpr (cuda::std::is_same_v<T, uint64_t>) {
auto v0 = p32[0];
auto v1 = p32[1];
if (offset) {
auto const next = p32[2];
v0 = __funnelshift_r(v0, v1, shift_bits);
v1 = __funnelshift_r(v1, next, shift_bits);
}
return (((uint64_t)v1) << 32) | v0;
} else if constexpr (cuda::std::is_same_v<T, uint3>) {
uint3 value{.x = 0, .y = 0, .z = 0};
value.x = p32[0];
value.y = p32[1];
value.z = p32[2];
if (offset) {
auto const next = p32[3];
value.x = __funnelshift_r(value.x, value.y, shift_bits);
value.y = __funnelshift_r(value.y, value.z, shift_bits);
value.z = __funnelshift_r(value.z, next, shift_bits);
}
return value;
}
}

template <typename T>
requires(cuda::std::is_integral_v<T> or cuda::std::is_same_v<T, uint3>)
inline __device__ T unaligned_load(uint8_t const* p)
{
T value;
Expand Down