Shamrock 2025.10.0
Astrophysical Code
Loading...
Searching...
No Matches
RequestList.hpp
Go to the documentation of this file.
1// -------------------------------------------------------//
2//
3// SHAMROCK code for hydrodynamics
4// Copyright (c) 2021-2026 Timothée David--Cléris <tim.shamrock@proton.me>
5// SPDX-License-Identifier: CeCILL Free Software License Agreement v2.1
6// Shamrock is licensed under the CeCILL 2.1 License, see LICENSE for more information
7//
8// -------------------------------------------------------//
9
10#pragma once
11
#include "shambase/exception.hpp"
#include "shambase/time.hpp"
#include "shamcomm/logs.hpp"
#include "shamcomm/wrapper.hpp"
#include <stdexcept>
#include <string>
#include <vector>
24
25namespace shamalgs::collective {
26
28
// MPI request handles; new_request() appends one entry per call.
29 std::vector<MPI_Request> rqs;
// Completion flag for the request stored at the same index in rqs.
30 std::vector<bool> is_ready;
31
// Number of requests currently flagged as completed in is_ready.
32 size_t ready_count = 0;
33
34 public:
35 MPI_Request &new_request() {
36 rqs.emplace_back();
37 is_ready.push_back(false);
38 return rqs.back();
39 }
40
41 size_t size() const { return rqs.size(); }
42 bool is_event_ready(size_t i) const { return is_ready[i]; }
43 std::vector<MPI_Request> &requests() { return rqs; }
44
45 void test_ready() {
46 for (size_t i = 0; i < rqs.size(); i++) {
47 if (!is_ready[i]) {
48 int ready;
49 shamcomm::mpi::Test(&rqs[i], &ready, MPI_STATUS_IGNORE);
50 if (ready) {
51 is_ready[i] = true;
52 ready_count++;
53 }
54 }
55 }
56 }
57
58 bool all_ready() const { return ready_count == rqs.size(); }
59
60 void wait_all() {
61 if (ready_count == rqs.size()) {
62 return;
63 }
64 std::vector<MPI_Status> st_lst(rqs.size());
65 shamcomm::mpi::Waitall(
66 shambase::narrow_or_throw<i32>(rqs.size()), rqs.data(), st_lst.data());
67 ready_count = rqs.size();
68 is_ready.assign(rqs.size(), true);
69 }
70
71 size_t remain_count_no_test() { return rqs.size() - ready_count; }
72
73 size_t remain_count() {
74 test_ready();
75 return rqs.size() - ready_count;
76 }
77
78 void report_timeout() const {
79 std::string err_msg = "";
80 for (size_t i = 0; i < rqs.size(); i++) {
81 if (!is_ready[i]) {
82 err_msg += shambase::format("request {} is not ready\n", i);
83 }
84 }
85 std::string msg = shambase::format("timeout : \n{}", err_msg);
87 }
88
89 // spin lock until the number of in-flight requests is less than max_in_flight
90 void spin_lock_partial_wait(size_t max_in_flight, f64 timeout, f64 print_freq) {
91
92 if (rqs.size() < max_in_flight) {
93 return;
94 }
95
96 shambase::Timer twait;
97 twait.start();
98 f64 last_print_time = 0;
99 size_t in_flight;
100
101 while ((in_flight = remain_count()) >= max_in_flight) {
102 twait.end();
103 if (twait.elasped_sec() > timeout) {
104 report_timeout();
105 }
106
107 if (twait.elasped_sec() - last_print_time > print_freq) {
108 logger::warn_ln(
109 "SparseComm",
110 "too many messages in flight :",
111 in_flight,
112 "/",
113 max_in_flight);
114 last_print_time = twait.elasped_sec();
115 }
116 }
117 }
118 };
119
120} // namespace shamalgs::collective
double f64
Alias for double.
Class Timer measures the time elapsed since the timer was started.
Definition time.hpp:96
void end()
Stops the timer and stores the elapsed time in nanoseconds.
Definition time.hpp:111
f64 elasped_sec() const
Converts the stored nanosecond time to a floating point representation in seconds.
Definition time.hpp:123
void start()
Starts the timer.
Definition time.hpp:106
void throw_with_loc(std::string message, SourceLocation loc=SourceLocation{})
Throw an exception and append the source location to it.
Utilities for safe type narrowing conversions.