Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
eca9647
beginning of refactor
etwest Oct 10, 2023
fe84d2f
refactor sketch and supernode. Needs more tests
etwest Oct 12, 2023
1348116
working refactor done. Missing some content
etwest Oct 18, 2023
778c656
rework how buckets are constructed
etwest Oct 19, 2023
0dcc30f
create a bucket type to make alpha and gamma values contiguous
etwest Oct 19, 2023
50576fd
sketch variable naming and configs
etwest Oct 20, 2023
dae73de
small bits of pull request feedback
etwest Oct 20, 2023
41c206a
comments, small changes, better CI
etwest Oct 21, 2023
6c60f01
workflow bug fix
etwest Oct 21, 2023
0c2d5c0
add config files... oops
etwest Oct 21, 2023
87e6232
workflow: slight tweak to matrix
etwest Oct 21, 2023
fb06eb0
1 column, better sketch constructor, spanning forest query
etwest Oct 24, 2023
bda0433
new query procedure, dsu, and more
etwest Oct 25, 2023
b04e86f
better query and raw buckets
etwest Oct 26, 2023
494c7f4
reworking query. Probably not correct yet.
etwest Oct 27, 2023
80c3e3d
more code but still not working
etwest Oct 27, 2023
8090892
working after changes to dsu
etwest Oct 27, 2023
6034472
switch dsu to random priority to fix a bug
etwest Oct 29, 2023
c44f948
better config printing
etwest Oct 31, 2023
496da2f
more tests and delete supernode tests
etwest Nov 2, 2023
e522c1e
add path correctness experiment
etwest Nov 5, 2023
9ebdb69
progress on better query algorithm. Not working yet
etwest Nov 10, 2023
23c94ff
working but somewhat slow
etwest Nov 11, 2023
3aaaa64
improved query performance
etwest Nov 14, 2023
ab2c6ec
remove sort bottleneck. Begin investigating post_processing as bottle…
etwest Nov 17, 2023
f1ae69f
make seed a mandatory argument of the CC algorithm
etwest Nov 26, 2023
c0c72d9
merge with refactor and more efficient return type
etwest Nov 29, 2023
ec061c7
oops forgot some files
etwest Nov 29, 2023
c23514c
add spanning forest return type
etwest Nov 29, 2023
3fbf54e
fewer rounds from math
etwest Dec 1, 2023
93d153e
fix tests
etwest Dec 2, 2023
d188fc3
Merge pull request #137 from GraphStreamingProject/refactor_query
etwest Dec 15, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 3 additions & 17 deletions .github/workflows/cmake.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@ name: CMake

on: [push]

env:
# Customize the CMake build type here (Release, Debug, RelWithDebInfo, etc.)
BUILD_TYPE: Release

jobs:
full-build:
# The CMake configure and build commands are platform agnostic and should work equally
Expand All @@ -16,16 +12,11 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest, ubuntu-20.04]
flags: ['"-DL0_SAMPLING"', '"-DNO_EAGER_DSU"', '""']

steps:
- uses: actions/checkout@v2

# - uses: actions/cache@v2
# id: InstalledDependencyCache
# with:
# path: ${{runner.workspace}}/install
# key: InstalledDependencyCache

- name: Create Build Environment
# Some projects don't allow in-source building, so create a separate build directory
# We'll use this as our working directory for all subsequent commands
Expand All @@ -36,18 +27,13 @@ jobs:
# access regardless of the host operating system
shell: bash
working-directory: ${{runner.workspace}}/build
# Note the current convention is to use the -S and -B options here to specify source
# and build directories, but this is only available with CMake 3.13 and higher.
# The CMake binaries on the Github Actions machines are (as of this writing) 3.12
env:
BOOST_ROOT: ${{runner.workspace}}/boost
run: cmake -DCMAKE_INSTALL_PREFIX=${{runner.workspace}}/install -DCMAKE_INSTALL_INCLUDEDIR=${{runner.workspace}}/install/include -DCMAKE_BUILD_TYPE=$BUILD_TYPE $GITHUB_WORKSPACE
run: cmake -DCMAKE_CXX_FLAGS=${{matrix.flags}} -DCMAKE_INSTALL_PREFIX=${{runner.workspace}}/install -DCMAKE_INSTALL_INCLUDEDIR=${{runner.workspace}}/install/include $GITHUB_WORKSPACE

- name: Build
working-directory: ${{runner.workspace}}/build
shell: bash
# Execute the build. You can specify a specific target with "--target <NAME>"
run: cmake --build . --config $BUILD_TYPE
run: cmake --build .

- name: Unit Testing
working-directory: ${{runner.workspace}}/build
Expand Down
35 changes: 18 additions & 17 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ set(CMAKE_CXX_STANDARD 14)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_CXX_EXTENSIONS ON)
set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY ${PROJECT_BINARY_DIR}/lib)
set(CMAKE_INTERPROCEDURAL_OPTIMIZATION TRUE)

# Make the default build type Release. If user or another
# project sets a different value than use that
Expand Down Expand Up @@ -90,48 +91,48 @@ set(BUILD_SHARED_LIBS "${SAVED_BUILD_SHARED_LIBS}" CACHE BOOL "" FORCE)
# AVAILABLE COMPILATION DEFINITIONS:
# VERIFY_SAMPLES_F Use a deterministic connected-components
# algorithm to verify post-processing.
# USE_EAGER_DSU Use the eager DSU query optimization if
# this flag is present.
# NO_EAGER_DSU Do not use the eager DSU query optimization
# if this flag is present.
# L0_SAMPLING Run the CubeSketch l0 sampling algorithm
# to ensure that we sample uniformly.
# Otherwise, run a support finding algorithm.
#
# Example:
# cmake -DCMAKE_CXX_FLAGS="-DL0_Sampling" ..

add_library(GraphZeppelin
src/graph.cpp
src/graph_configuration.cpp
src/supernode.cpp
src/graph_worker.cpp
src/l0_sampling/sketch.cpp
src/cc_sketch_alg.cpp
src/driver_configuration.cpp
src/cc_alg_configuration.cpp
src/sketch.cpp
src/util.cpp)
add_dependencies(GraphZeppelin GutterTree)
target_link_libraries(GraphZeppelin PUBLIC xxhash GutterTree)
target_include_directories(GraphZeppelin PUBLIC include/ include/l0_sampling/)
target_include_directories(GraphZeppelin PUBLIC include/)
target_compile_options(GraphZeppelin PUBLIC -fopenmp)
target_link_options(GraphZeppelin PUBLIC -fopenmp)
target_compile_definitions(GraphZeppelin PUBLIC XXH_INLINE_ALL USE_EAGER_DSU)
target_compile_definitions(GraphZeppelin PUBLIC XXH_INLINE_ALL)

add_library(GraphZeppelinVerifyCC
src/graph.cpp
src/graph_configuration.cpp
src/supernode.cpp
src/graph_worker.cpp
src/l0_sampling/sketch.cpp
src/cc_sketch_alg.cpp
src/driver_configuration.cpp
src/cc_alg_configuration.cpp
src/sketch.cpp
src/util.cpp
test/util/file_graph_verifier.cpp
test/util/mat_graph_verifier.cpp)
add_dependencies(GraphZeppelinVerifyCC GutterTree)
target_link_libraries(GraphZeppelinVerifyCC PUBLIC xxhash GutterTree)
target_include_directories(GraphZeppelinVerifyCC PUBLIC include/ include/l0_sampling/ include/test/)
target_include_directories(GraphZeppelinVerifyCC PUBLIC include/ include/test/)
target_compile_options(GraphZeppelinVerifyCC PUBLIC -fopenmp)
target_link_options(GraphZeppelinVerifyCC PUBLIC -fopenmp)
target_compile_definitions(GraphZeppelinVerifyCC PUBLIC XXH_INLINE_ALL VERIFY_SAMPLES_F USE_EAGER_DSU)
target_compile_definitions(GraphZeppelinVerifyCC PUBLIC XXH_INLINE_ALL VERIFY_SAMPLES_F)

if (BUILD_EXE)
add_executable(tests
test/test_runner.cpp
test/graph_test.cpp
test/sketch_test.cpp
test/supernode_test.cpp
test/dsu_test.cpp
test/util_test.cpp
test/util/file_graph_verifier.cpp
Expand Down
106 changes: 106 additions & 0 deletions include/ascii_file_stream.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
#pragma once

#include <fstream>
#include <iostream>
#include <cassert>

#include "graph_stream.h"

// A human-readable graph stream backed by a whitespace-delimited text file.
//
// File layout:
//   <num_vertices> <num_edges>          (header, written by write_header())
//   [<type>] <src> <dst>                (one update per line; <type> only if has_type)
//
// NOTE: get_update_buffer() is NOT thread safe (single shared fstream cursor).
class AsciiFileStream : public GraphStream {
 public:
  /**
   * Open (or create) an ascii stream file.
   * If the file already exists, the "<num_vertices> <num_edges>" header is read
   * immediately. Otherwise an empty file is created and write_header() must be
   * called before writing updates.
   * @param file_name  path to the stream file
   * @param has_type   if true, each update line carries a leading type field
   * @throws StreamException if the file cannot be opened
   */
  AsciiFileStream(std::string file_name, bool has_type = true)
      : file_name(file_name), has_type(has_type) {
    // Probe for existence with a read-only open so we don't create the file
    // as a side effect of the check.
    bool stream_exists = false;
    {
      std::fstream check(file_name, std::fstream::in);
      stream_exists = check.is_open();
    }

    if (stream_exists)
      stream_file.open(file_name, std::fstream::in | std::fstream::out);
    else
      // trunc is required to create a brand-new file in in|out mode
      stream_file.open(file_name, std::fstream::in | std::fstream::out | std::fstream::trunc);

    if (!stream_file.is_open())
      throw StreamException("AsciiFileStream: could not open " + file_name);

    if (stream_exists)
      stream_file >> num_vertices >> num_edges;  // parse header
  }

  /**
   * Read up to num_updates updates into upd_buf.
   * When the end of the stream (or the configured break point) is reached, a
   * BREAKPOINT sentinel update is appended and counted in the return value.
   * @param upd_buf      destination buffer (must hold num_updates entries)
   * @param num_updates  maximum number of updates to read
   * @return number of entries written to upd_buf, including any BREAKPOINT
   */
  inline size_t get_update_buffer(GraphStreamUpdate* upd_buf, size_t num_updates) {
    assert(upd_buf != nullptr);

    size_t i = 0;
    for (; i < num_updates; i++) {
      GraphStreamUpdate& upd = upd_buf[i];

      if (upd_offset >= num_edges || upd_offset >= break_edge_idx) {
        // exhausted the stream (or hit the break point): emit sentinel
        upd.type = BREAKPOINT;
        upd.edge = {0, 0};
        return i + 1;
      }
      int type = INSERT;  // default when the file carries no type column
      if (has_type)
        stream_file >> type;
      stream_file >> upd.edge.src >> upd.edge.dst;
      upd.type = type;
      ++upd_offset;
    }
    return i;
  }

  // get_update_buffer() is not thread safe
  inline bool get_update_is_thread_safe() { return false; }

  /**
   * Write the stream header at the beginning of the file and update the
   * in-memory vertex/edge counts to match.
   */
  inline void write_header(node_id_t num_verts, edge_id_t num_edg) {
    stream_file.seekp(0); // seek to beginning
    stream_file << num_verts << " " << num_edg << std::endl;
    num_vertices = num_verts;
    num_edges = num_edg;
  }

  /**
   * Append num_updates updates at the current write position.
   * Does NOT update num_edges; call set_num_edges()/write_header() yourself.
   */
  inline void write_updates(GraphStreamUpdate* upd_buf, edge_id_t num_updates) {
    for (edge_id_t i = 0; i < num_updates; i++) {
      auto upd = upd_buf[i];
      if (has_type)
        stream_file << (int) upd.type << " ";
      stream_file << upd.edge.src << " " << upd.edge.dst << std::endl;
    }
  }

  // Adjust the edge count without rewriting the on-disk header.
  inline void set_num_edges(edge_id_t num_edg) {
    num_edges = num_edg;
  }

  /**
   * Rewind the stream to the first update. Only pos == 0 is supported because
   * text lines are variable width, so there is no O(1) byte offset for an
   * arbitrary update index.
   * @throws StreamException if pos != 0
   */
  inline void seek(edge_id_t pos) {
    if (pos != 0)
      throw StreamException("AsciiFileStream: stream does not support seeking by update index");
    // BUGFIX: clear eof/fail bits first -- seekp/seekg on a stream whose
    // failbit is set are no-ops, which previously made the stream unusable
    // after a short read or any failed extraction.
    stream_file.clear();
    stream_file.seekp(0); stream_file.seekg(0);
    upd_offset = 0;
  }

  /**
   * Stop serving updates at index break_idx (a BREAKPOINT is emitted there).
   * @return false if break_idx has already been passed, true otherwise
   */
  inline bool set_break_point(edge_id_t break_idx) {
    if (break_idx < upd_offset) return false;
    break_edge_idx = break_idx;
    return true;
  }

  // Write "<stream-type> <file-name>" so the stream can be reconstructed later.
  inline void serialize_metadata(std::ostream& out) {
    out << AsciiFile << " " << file_name << std::endl;
  }

  // Inverse of serialize_metadata(): rebuild a stream from its metadata line.
  static GraphStream* construct_from_metadata(std::istream& in) {
    std::string file_name_from_stream;
    in >> file_name_from_stream;
    return new AsciiFileStream(file_name_from_stream);
  }

 private:
  const std::string file_name;
  const bool has_type;            // does each line carry a type column?
  std::fstream stream_file;
  edge_id_t break_edge_idx = -1;  // -1 wraps to max: "no break point"
  edge_id_t upd_offset = 0;       // index of the next update to read
};
153 changes: 153 additions & 0 deletions include/binary_file_stream.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
#pragma once
#include <fcntl.h>
#include <unistd.h> //open and close

#include <atomic>
#include <cassert>
#include <cstring>
#include <fstream>
#include <iostream>

#include "graph_stream.h"

// A binary graph stream backed by a file of fixed-width GraphStreamUpdate
// records, preceded by a "<num_vertices><num_edges>" header.
//
// Reads use pread() against an atomic byte offset, so get_update_buffer() is
// safe to call from many threads concurrently.
class BinaryFileStream : public GraphStream {
 public:
  /**
   * Open a BinaryFileStream
   * @param file_name       Name of the stream file
   * @param open_read_only  if true, open for reading and parse the header;
   *                        if false, open/create for writing (write_header()
   *                        must be called before write_updates())
   * @throws StreamException if the file cannot be opened or the header is short
   */
  BinaryFileStream(std::string file_name, bool open_read_only = true)
      : read_only(open_read_only), file_name(file_name) {
    if (read_only)
      stream_fd = open(file_name.c_str(), O_RDONLY, S_IRUSR);
    else
      stream_fd = open(file_name.c_str(), O_WRONLY | O_CREAT, S_IRUSR | S_IWUSR);

    // BUGFIX: open() returns -1 on failure (fd 0 is a valid descriptor), so
    // the old `if (!stream_fd)` check could never detect an error.
    if (stream_fd < 0)
      throw StreamException("BinaryFileStream: Could not open stream file " + file_name +
                            ". Does it exist?");

    // read header from the input file
    if (read_only) {
      if (read(stream_fd, (char*)&num_vertices, sizeof(num_vertices)) != sizeof(num_vertices))
        throw StreamException("BinaryFileStream: Could not read number of nodes");
      if (read(stream_fd, (char*)&num_edges, sizeof(num_edges)) != sizeof(num_edges))
        throw StreamException("BinaryFileStream: Could not read number of edges");

      end_of_file = (num_edges * edge_size) + header_size;
      stream_off = header_size;       // first update follows the header
      set_break_point(-1);            // -1 == END_OF_STREAM: no break point
    }
  }

  ~BinaryFileStream() {
    // BUGFIX: fd 0 is valid and -1 is the error value; close only real fds.
    if (stream_fd >= 0) close(stream_fd);
  }

  /**
   * Read up to num_updates updates into upd_buf. Thread safe: each caller
   * atomically claims a byte range of the file and reads it with pread().
   * A BREAKPOINT sentinel is appended (and counted) when the end of the
   * stream or the break point is reached.
   * @param upd_buf      destination buffer (must hold num_updates entries)
   * @param num_updates  maximum number of updates to read
   * @return number of entries written to upd_buf, including any BREAKPOINT
   */
  inline size_t get_update_buffer(GraphStreamUpdate* upd_buf, size_t num_updates) {
    assert(upd_buf != nullptr);

    // many threads may execute this line simultaneously creating edge cases
    size_t bytes_to_read = num_updates * edge_size;
    size_t read_off = stream_off.fetch_add(bytes_to_read, std::memory_order_relaxed);

    // catch these edge cases here
    if (read_off + bytes_to_read > break_index) {
      bytes_to_read = read_off > break_index ? 0 : break_index - read_off;
      stream_off = break_index.load();  // clamp shared offset back to the break
      upd_buf[bytes_to_read / edge_size] = {BREAKPOINT, {0, 0}};
    }
    // read into the buffer
    assert(bytes_to_read % edge_size == 0);
    size_t bytes_read = 0;
    while (bytes_read < bytes_to_read) {
      // BUGFIX: advance the destination in BYTES. `upd_buf + bytes_read`
      // scaled by sizeof(GraphStreamUpdate), so any partial pread() resumed
      // edge_size times too far into the buffer (out-of-bounds write).
      ssize_t r = pread(stream_fd, (char*)upd_buf + bytes_read, bytes_to_read - bytes_read,
                        read_off + bytes_read);
      if (r == -1) throw StreamException("BinaryFileStream: Could not perform pread");
      if (r == 0) throw StreamException("BinaryFileStream: pread() got no data");
      bytes_read += r;
    }

    size_t upds_read = bytes_to_read / edge_size;
    if (upds_read < num_updates) {
      GraphStreamUpdate& upd = upd_buf[upds_read];
      upd.type = BREAKPOINT;
      upd.edge = {0, 0};
      return upds_read + 1;
    }
    return upds_read;
  }

  // get_update_buffer() is thread safe! :)
  inline bool get_update_is_thread_safe() { return true; }

  // write the number of nodes and edges to the stream
  inline void write_header(node_id_t num_verts, edge_id_t num_edg) {
    if (read_only) throw StreamException("BinaryFileStream: stream not open for writing!");

    lseek(stream_fd, 0, SEEK_SET);
    ssize_t r1 = write(stream_fd, (char*)&num_verts, sizeof(num_verts));
    ssize_t r2 = write(stream_fd, (char*)&num_edg, sizeof(num_edg));

    if (r1 + r2 != header_size) {
      perror("write_header");
      throw StreamException("BinaryFileStream: could not write header to stream file");
    }

    stream_off = header_size;
    num_vertices = num_verts;
    num_edges = num_edg;
    end_of_file = (num_edges * edge_size) + header_size;
  }

  // write an edge to the stream
  inline void write_updates(GraphStreamUpdate* upd, edge_id_t num_updates) {
    if (read_only) throw StreamException("BinaryFileStream: stream not open for writing!");

    size_t bytes_to_write = num_updates * edge_size;

    size_t bytes_written = 0;
    while (bytes_written < bytes_to_write) {
      // write() may be partial; loop until the whole buffer is flushed
      ssize_t r = write(stream_fd, (char*)upd + bytes_written, bytes_to_write - bytes_written);
      if (r == -1) throw StreamException("BinaryFileStream: Could not perform write");
      bytes_written += r;
    }
  }

  // seek to a position in the stream (by update index, not bytes)
  inline void seek(edge_id_t edge_idx) { stream_off = edge_idx * edge_size + header_size; }

  /**
   * Stop serving updates at index break_idx (END_OF_STREAM for no limit).
   * The break point is clamped to the end of the file.
   * @return false if the stream has already read past break_idx, true otherwise
   */
  inline bool set_break_point(edge_id_t break_idx) {
    edge_id_t byte_index = END_OF_STREAM;
    if (break_idx != END_OF_STREAM) {
      byte_index = header_size + break_idx * edge_size;
    }
    if (byte_index < stream_off) return false;
    break_index = byte_index;
    if (break_index > end_of_file) break_index = end_of_file;
    return true;
  }

  // Write "<stream-type> <file-name>" so the stream can be reconstructed later.
  inline void serialize_metadata(std::ostream& out) {
    out << BinaryFile << " " << file_name << std::endl;
  }

  // Inverse of serialize_metadata(): rebuild a stream from its metadata line.
  static GraphStream* construct_from_metadata(std::istream& in) {
    std::string file_name_from_stream;
    in >> file_name_from_stream;
    return new BinaryFileStream(file_name_from_stream);
  }

 private:
  int stream_fd;
  edge_id_t end_of_file;                 // byte offset one past the last update
  std::atomic<edge_id_t> stream_off;     // next byte offset to read/write
  std::atomic<edge_id_t> break_index;    // byte offset at which reads stop
  const bool read_only;                  // is stream read only?
  const std::string file_name;

  // size of binary encoded edge and buffer read size
  static constexpr size_t edge_size = sizeof(GraphStreamUpdate);
  static constexpr size_t header_size = sizeof(node_id_t) + sizeof(edge_id_t);
};
Loading