Skip to content

Commit

Permalink
remove std::thread read_csv_worker, useless and buggy in seastar
Browse files Browse the repository at this point in the history
  • Loading branch information
longqimin committed Jan 10, 2025
1 parent 85d8c63 commit c17b8f8
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 33 deletions.
8 changes: 5 additions & 3 deletions include/internal/csv_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -280,10 +280,12 @@ namespace csv {
return false;
else {
// Reading thread is not active => start another one
if (this->read_csv_worker.joinable())
this->read_csv_worker.join();
// if (this->read_csv_worker.joinable())
// this->read_csv_worker.join();

// this->read_csv_worker = std::thread(&CSVReader::read_csv, this, internals::ITERATION_CHUNK_SIZE);
this->read_csv(internals::ITERATION_CHUNK_SIZE);

this->read_csv_worker = std::thread(&CSVReader::read_csv, this, internals::ITERATION_CHUNK_SIZE);
}
}
else if (this->records->front().size() != this->n_cols &&
Expand Down
13 changes: 7 additions & 6 deletions include/internal/csv_reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,9 @@ namespace csv {
CSVReader& operator=(const CSVReader&) = delete; // No copy assignment
CSVReader& operator=(CSVReader&& other) = default;
~CSVReader() {
if (this->read_csv_worker.joinable()) {
this->read_csv_worker.join();
}
// if (this->read_csv_worker.joinable()) {
// this->read_csv_worker.join();
// }
}

/** @name Retrieving CSV Rows */
Expand Down Expand Up @@ -216,13 +216,14 @@ namespace csv {

/** @name Multi-Threaded File Reading: Flags and State */
///@{
std::thread read_csv_worker; /**< Worker thread for read_csv() */
// std::thread read_csv_worker; /**< Worker thread for read_csv() */
///@}

/** Read initial chunk to get metadata */
void initial_read() {
this->read_csv_worker = std::thread(&CSVReader::read_csv, this, internals::ITERATION_CHUNK_SIZE);
this->read_csv_worker.join();
// this->read_csv_worker = std::thread(&CSVReader::read_csv, this, internals::ITERATION_CHUNK_SIZE);
// this->read_csv_worker.join();
this->read_csv(internals::ITERATION_CHUNK_SIZE);
}

void trim_header();
Expand Down
5 changes: 3 additions & 2 deletions include/internal/csv_reader_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ namespace csv {
/** Return an iterator to the first row in the reader */
CSV_INLINE CSVReader::iterator CSVReader::begin() {
if (this->records->empty()) {
this->read_csv_worker = std::thread(&CSVReader::read_csv, this, internals::ITERATION_CHUNK_SIZE);
this->read_csv_worker.join();
// this->read_csv_worker = std::thread(&CSVReader::read_csv, this, internals::ITERATION_CHUNK_SIZE);
// this->read_csv_worker.join();
this->read_csv(internals::ITERATION_CHUNK_SIZE);

// Still empty => return end iterator
if (this->records->empty()) return this->end();
Expand Down
26 changes: 15 additions & 11 deletions single_include/csv.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6354,9 +6354,9 @@ namespace csv {
CSVReader& operator=(const CSVReader&) = delete; // No copy assignment
CSVReader& operator=(CSVReader&& other) = default;
~CSVReader() {
if (this->read_csv_worker.joinable()) {
this->read_csv_worker.join();
}
// if (this->read_csv_worker.joinable()) {
// this->read_csv_worker.join();
// }
}

/** @name Retrieving CSV Rows */
Expand Down Expand Up @@ -6436,13 +6436,14 @@ namespace csv {

/** @name Multi-Threaded File Reading: Flags and State */
///@{
std::thread read_csv_worker; /**< Worker thread for read_csv() */
// std::thread read_csv_worker; /**< Worker thread for read_csv() */
///@}

/** Read initial chunk to get metadata */
void initial_read() {
this->read_csv_worker = std::thread(&CSVReader::read_csv, this, internals::ITERATION_CHUNK_SIZE);
this->read_csv_worker.join();
// this->read_csv_worker = std::thread(&CSVReader::read_csv, this, internals::ITERATION_CHUNK_SIZE);
// this->read_csv_worker.join();
this->read_csv(internals::ITERATION_CHUNK_SIZE);
}

void trim_header();
Expand Down Expand Up @@ -7576,10 +7577,12 @@ namespace csv {
return false;
else {
// Reading thread is not active => start another one
if (this->read_csv_worker.joinable())
this->read_csv_worker.join();
// if (this->read_csv_worker.joinable())
// this->read_csv_worker.join();

// this->read_csv_worker = std::thread(&CSVReader::read_csv, this, internals::ITERATION_CHUNK_SIZE);
this->read_csv(internals::ITERATION_CHUNK_SIZE);

this->read_csv_worker = std::thread(&CSVReader::read_csv, this, internals::ITERATION_CHUNK_SIZE);
}
}
else if (this->records->front().size() != this->n_cols &&
Expand Down Expand Up @@ -7733,8 +7736,9 @@ namespace csv {
/** Return an iterator to the first row in the reader */
CSV_INLINE CSVReader::iterator CSVReader::begin() {
if (this->records->empty()) {
this->read_csv_worker = std::thread(&CSVReader::read_csv, this, internals::ITERATION_CHUNK_SIZE);
this->read_csv_worker.join();
// this->read_csv_worker = std::thread(&CSVReader::read_csv, this, internals::ITERATION_CHUNK_SIZE);
// this->read_csv_worker.join();
this->read_csv(internals::ITERATION_CHUNK_SIZE);

// Still empty => return end iterator
if (this->records->empty()) return this->end();
Expand Down
26 changes: 15 additions & 11 deletions single_include_test/csv.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6354,9 +6354,9 @@ namespace csv {
CSVReader& operator=(const CSVReader&) = delete; // No copy assignment
CSVReader& operator=(CSVReader&& other) = default;
~CSVReader() {
if (this->read_csv_worker.joinable()) {
this->read_csv_worker.join();
}
// if (this->read_csv_worker.joinable()) {
// this->read_csv_worker.join();
// }
}

/** @name Retrieving CSV Rows */
Expand Down Expand Up @@ -6436,13 +6436,14 @@ namespace csv {

/** @name Multi-Threaded File Reading: Flags and State */
///@{
std::thread read_csv_worker; /**< Worker thread for read_csv() */
// std::thread read_csv_worker; /**< Worker thread for read_csv() */
///@}

/** Read initial chunk to get metadata */
void initial_read() {
this->read_csv_worker = std::thread(&CSVReader::read_csv, this, internals::ITERATION_CHUNK_SIZE);
this->read_csv_worker.join();
// this->read_csv_worker = std::thread(&CSVReader::read_csv, this, internals::ITERATION_CHUNK_SIZE);
// this->read_csv_worker.join();
this->read_csv(internals::ITERATION_CHUNK_SIZE);
}

void trim_header();
Expand Down Expand Up @@ -7576,10 +7577,12 @@ namespace csv {
return false;
else {
// Reading thread is not active => start another one
if (this->read_csv_worker.joinable())
this->read_csv_worker.join();
// if (this->read_csv_worker.joinable())
// this->read_csv_worker.join();

// this->read_csv_worker = std::thread(&CSVReader::read_csv, this, internals::ITERATION_CHUNK_SIZE);
this->read_csv(internals::ITERATION_CHUNK_SIZE);

this->read_csv_worker = std::thread(&CSVReader::read_csv, this, internals::ITERATION_CHUNK_SIZE);
}
}
else if (this->records->front().size() != this->n_cols &&
Expand Down Expand Up @@ -7733,8 +7736,9 @@ namespace csv {
/** Return an iterator to the first row in the reader */
CSV_INLINE CSVReader::iterator CSVReader::begin() {
if (this->records->empty()) {
this->read_csv_worker = std::thread(&CSVReader::read_csv, this, internals::ITERATION_CHUNK_SIZE);
this->read_csv_worker.join();
// this->read_csv_worker = std::thread(&CSVReader::read_csv, this, internals::ITERATION_CHUNK_SIZE);
// this->read_csv_worker.join();
this->read_csv(internals::ITERATION_CHUNK_SIZE);

// Still empty => return end iterator
if (this->records->empty()) return this->end();
Expand Down

0 comments on commit c17b8f8

Please sign in to comment.