Shamrock 2025.10.0
Astrophysical Code
Loading...
Searching...
No Matches
ShamrockDump.cpp
Go to the documentation of this file.
1// -------------------------------------------------------//
2//
3// SHAMROCK code for hydrodynamics
4// Copyright (c) 2021-2026 Timothée David--Cléris <tim.shamrock@proton.me>
5// SPDX-License-Identifier: CeCILL Free Software License Agreement v2.1
6// Shamrock is licensed under the CeCILL 2.1 License, see LICENSE for more information
7//
8// -------------------------------------------------------//
9
16
18#include "shamcmdopt/env.hpp"
19#include "shamcomm/logs.hpp"
21
22namespace shamrock {
23
24 void write_shamrock_dump(std::string fname, std::string metadata_user, PatchScheduler &sched) {
25
26 StackEntry stack_loc{};
27
28 std::string metadata_patch = sched.serialize_patch_metadata().dump(4);
29
30 using namespace shamrock::patch;
31
32 std::vector<u64> pids;
33 std::vector<u64> bytecounts;
34 std::vector<sham::DeviceBuffer<u8>> datas;
35
36 // serialize patchdatas and push them into dat
37 sched.patch_data.for_each_patchdata([&](u64 pid, PatchDataLayer &pdat) {
38 auto ser_sz = pdat.serialize_buf_byte_size();
39 shamalgs::SerializeHelper ser(shamsys::instance::get_compute_scheduler_ptr());
40 ser.allocate(ser_sz, true);
41 pdat.serialize_buf(ser);
42
43 auto tmp = ser.finalize();
44 size_t bytecount = tmp.get_size();
45
46 pids.push_back(pid);
47 bytecounts.push_back(bytecount);
48 datas.push_back(std::move(tmp));
49 });
50
51 std::vector<u64> all_pids;
52 std::vector<u64> all_bytecounts;
53
55 pids, get_mpi_type<u64>(), all_pids, get_mpi_type<u64>(), MPI_COMM_WORLD);
57 bytecounts, get_mpi_type<u64>(), all_bytecounts, get_mpi_type<u64>(), MPI_COMM_WORLD);
58
59 std::vector<u64> all_offsets = all_bytecounts;
60
61 std::exclusive_scan(all_offsets.begin(), all_offsets.end(), all_offsets.begin(), u64{0});
62
63 using namespace nlohmann;
64
65 json j;
66 j["pids"] = all_pids;
67 j["bytecounts"] = all_bytecounts;
68 j["offsets"] = all_offsets;
69
70 std::string sout = j.dump(4);
71
72 // Write to the file
73
74 u64 head_ptr = 0;
75 MPI_File mfile{};
76
77 shamcomm::open_reset_file(mfile, fname);
78
79 shambase::Timer timer;
80 timer.start();
81
82 // do some perf investigation before enabling preallocation
83 bool preallocate = false;
84 if (preallocate) {
85 MPI_Offset tot_byte = all_offsets.back() + all_bytecounts.back() + metadata_user.size()
86 + metadata_patch.size() + sout.size() + sizeof(std::size_t) * 3;
87 MPICHECK(MPI_File_preallocate(mfile, tot_byte));
88 }
89
90 shamalgs::collective::write_header(mfile, metadata_user, head_ptr);
91 shamalgs::collective::write_header(mfile, metadata_patch, head_ptr);
92 shamalgs::collective::write_header(mfile, sout, head_ptr);
93
94 shamlog_debug_ln(
95 "ShamrockDump",
96 shambase::format(
97 "table sizes {} {} {}", metadata_patch.size(), metadata_user.size(), sout.size()));
98
99 if (/*do check*/ true) {
100 auto check_same_mpi = [](std::string s) {
101 u64 out = shamalgs::collective::allreduce_sum(s.size());
102 if (out != s.size() * shamcomm::world_size()) {
104 "ShamrockDump",
105 shambase::format(
106 "string size mismatch between all processes,\n size : {}\nthe "
107 "string : {}\n",
108 s.size(),
109 s));
111 "size mismatch in shamrock dump header");
112 }
113 };
114
115 check_same_mpi(metadata_user);
116 check_same_mpi(metadata_patch);
117 check_same_mpi(sout);
118 }
119
120 if (!shamcmdopt::getenv_str("SHAMDUMP_OFFSET_MODE_OLD").has_value()) {
121 // reset MPI view
122 MPICHECK(MPI_File_set_view(mfile, 0, MPI_BYTE, MPI_CHAR, "native", MPI_INFO_NULL));
123 }
124
125 // map of patch id -> all_pids idx
126 std::unordered_map<u64, size_t> map{};
127 for (u32 i = 0; i < all_pids.size(); i++) {
128 map[all_pids[i]] = i;
129 }
130
131 for (u32 i = 0; i < datas.size(); i++) {
132
133 u64 pid = pids[i];
134 u64 bytecount = bytecounts[i];
135
136 size_t off = all_offsets[map[pid]];
137 auto &data = datas[i];
138
139 shamcomm::CommunicationBuffer buf(data, shamsys::instance::get_compute_scheduler_ptr());
140
141 shamalgs::collective::write_at_large(mfile, buf.get_ptr(), bytecount, head_ptr + off);
142 }
143
144 // write data to file
145
146 MPI_File_close(&mfile);
147 timer.stop();
148
149 if (shamcomm::world_rank() == 0) {
150 size_t plist_len = all_offsets.size();
151 size_t max_head = all_offsets[plist_len - 1] + all_bytecounts[plist_len - 1] + head_ptr;
153 "Shamrock Dump",
154 shambase::format(
155 "dump to {}\n - took {}, bandwidth = {}/s",
156 fname,
157 timer.get_time_str(),
158 shambase::readable_sizeof(max_head / timer.elapsed_sec())));
159 }
160 }
161
162 void load_shamrock_dump(std::string fname, std::string &metadata_user, ShamrockCtx &ctx) {
163
164 StackEntry stack_loc{};
165
166 u64 head_ptr = 0;
167 MPI_File mfile{};
168
169 shamcomm::open_read_only_file(mfile, fname);
170
171 shambase::Timer timer;
172 timer.start();
173
174 std::string metadata_patch{};
175 std::string patchdata_infos{};
176
177 metadata_user = shamalgs::collective::read_header(mfile, head_ptr);
178 metadata_patch = shamalgs::collective::read_header(mfile, head_ptr);
179 patchdata_infos = shamalgs::collective::read_header(mfile, head_ptr);
180
181 if (!shamcmdopt::getenv_str("SHAMDUMP_OFFSET_MODE_OLD").has_value()) {
182 // reset MPI view
183 MPICHECK(MPI_File_set_view(mfile, 0, MPI_BYTE, MPI_CHAR, "native", MPI_INFO_NULL));
184 }
185 // logger::raw_ln(metadata_user, metadata_patch, patchdata_infos);
186
187 using namespace nlohmann;
188
189 json jmeta_patch = json::parse(metadata_patch);
190 json jpdat_info = json::parse(patchdata_infos);
191
192 ctx.pdata_layout_new();
193 *ctx.pdl = jmeta_patch.at("patchdata_layout").get<patch::PatchDataLayerLayout>();
194 ctx.init_sched(
195 jmeta_patch.at("crit_patch_split").get<u64>(),
196 jmeta_patch.at("crit_patch_merge").get<u64>());
197
198 auto &sched = shambase::get_check_ref(ctx.sched);
199
200 sched.patch_list = jmeta_patch.at("patchlist").get<SchedulerPatchList>();
201 sched.patch_tree = jmeta_patch.at("patchtree").get<scheduler::PatchTree>();
202 sched.patch_data.sim_box.from_json(jmeta_patch.at("sim_box"));
203
204 // edit patch owner to fit in new world size, or spread if more processes now
205 // a bit dirty but gets the job done for now
206 // ideally we should call a load balance once
207 for (auto &p : sched.patch_list.global) {
208 p.node_owner_id = p.node_owner_id % shamcomm::world_size();
209 }
210
211 // rebuild local patch list
212 auto loc_ids = sched.patch_list.build_local();
213
214 // Load patchdata according to new LB
215
216 std::vector<u64> all_offsets;
217 std::vector<u64> all_pids;
218 std::vector<u64> all_bytecounts;
219
220 all_bytecounts = jpdat_info.at("bytecounts").get<std::vector<u64>>();
221 all_offsets = jpdat_info.at("offsets").get<std::vector<u64>>();
222 all_pids = jpdat_info.at("pids").get<std::vector<u64>>();
223
224 struct PatchFileOffset {
225 u64 offset, bytecount;
226 };
227
228 std::unordered_map<u64, PatchFileOffset> off_table;
229
230 for (u32 i = 0; i < all_bytecounts.size(); i++) {
231 off_table[all_pids[i]] = {.offset = all_offsets[i], .bytecount = all_bytecounts[i]};
232 }
233
234 for (const auto &p : sched.patch_list.local) {
235 u64 pid = p.id_patch;
236 auto loc_file_info = off_table[pid];
237
239 loc_file_info.bytecount, shamsys::instance::get_compute_scheduler_ptr());
240
242 mfile, buf.get_ptr(), loc_file_info.bytecount, head_ptr + loc_file_info.offset);
243
245
247 shamsys::instance::get_compute_scheduler_ptr(), std::move(out), true);
248
249 patch::PatchDataLayer pdat = patch::PatchDataLayer::deserialize_buf(ser, ctx.pdl);
250
251 sched.patch_data.owned_data.add_obj(pid, std::move(pdat));
252 }
253
254 MPI_File_close(&mfile);
255 timer.stop();
256
257 if (shamcomm::world_rank() == 0) {
258 size_t plist_len = all_offsets.size();
259 size_t max_head = all_offsets[plist_len - 1] + all_bytecounts[plist_len - 1] + head_ptr;
261 "Shamrock Dump",
262 shambase::format(
263 "load dump from {}\n - took {}, bandwidth = {}/s",
264 fname,
265 timer.get_time_str(),
266 shambase::readable_sizeof(max_head / timer.elapsed_sec())));
267 }
268 }
269
270} // namespace shamrock
std::uint32_t u32
32 bit unsigned integer
std::uint64_t u64
64 bit unsigned integer
The MPI scheduler.
SchedulerPatchData patch_data
handle the data of the patches of the scheduler
Handle the patch list of the mpi scheduler.
A buffer allocated in USM (Unified Shared Memory).
size_t get_size() const
Gets the number of elements in the buffer.
Class Timer measures the time elapsed since the timer was started.
Definition time.hpp:35
std::string get_time_str() const
Converts the stored nanosecond time to a string representation.
Definition time.hpp:78
f64 elapsed_sec() const
Converts the stored nanosecond time to a floating point representation in seconds.
Definition time.hpp:87
void start()
Starts the timer.
Definition time.hpp:50
void stop()
Stops the timer and stores the elapsed time in nanoseconds.
Definition time.hpp:64
Shamrock communication buffers.
static sham::DeviceBuffer< u8 > convert_usm(CommunicationBuffer &&buf)
destroy the buffer and recover the held object
PatchDataLayer container class, the layout is described in patchdata_layout.
Patch Tree : Tree structure organisation for an abstract list of patches Nb : this tree is compatible...
Definition PatchTree.hpp:29
std::vector< int > vector_allgatherv(const std::vector< T > &send_vec, const MPI_Datatype &send_type, std::vector< T > &recv_vec, const MPI_Datatype &recv_type, const MPI_Comm comm)
allgatherv on vector with size query (size querying variant of vector_allgatherv_ks) //TODO add fault...
Definition exchanges.hpp:98
#define MPICHECK(mpicall)
Shortcut macro to check MPI return codes.
std::string readable_sizeof(double size)
Given a sizeof value, return a readable string. Example: readable_sizeof(1e9) -> "1....
Definition string.hpp:84
void throw_with_loc(std::string message, SourceLocation loc=SourceLocation{})
Throw an exception and append the source location to it.
T & get_check_ref(const std::unique_ptr< T > &ptr, SourceLocation loc=SourceLocation())
Takes a std::unique_ptr and returns a reference to the object it holds. It throws a std::runtime_erro...
Definition memory.hpp:110
std::optional< std::string > getenv_str(const char *env_var)
Get the content of the environment variable if it exists.
Definition env.cpp:24
void open_reset_file(MPI_File &fh, const std::string &fname)
Open a MPI file and remove its content.
Definition io.cpp:24
i32 world_rank()
Gives the rank of the current process in the MPI communicator.
Definition worldInfo.cpp:40
i32 world_size()
Gives the size of the MPI communicator.
Definition worldInfo.cpp:38
void open_read_only_file(MPI_File &fh, const std::string &fname)
Open a mpi file in read only mode.
Definition io.cpp:52
namespace for the main framework
Definition __init__.py:1
void load_shamrock_dump(std::string fname, std::string &metadata_user, ShamrockCtx &ctx)
Load a Shamrock dump file, restore the state of the patches and retrieve the user metadata.
void write_shamrock_dump(std::string fname, std::string metadata_user, PatchScheduler &sched)
Write a Shamrock dump file containing the current state of the patches and user supplied metadata.
std::string read_header(MPI_File fh, u64 &file_head_ptr)
Reads a string from a file using MPI and updates the file head pointer. The string is preceded by its...
Definition io.hpp:276
void write_header(MPI_File fh, std::string s, u64 &file_head_ptr)
Writes a string to a file using MPI and updates the file head pointer. The string is preceded by its ...
Definition io.hpp:262
void write_at_large(MPI_File fh, const u8 *buf, size_t len, u64 file_head_ptr)
Writes a large byte buffer at a given offset in a file using MPI.
Definition io.hpp:181
void read_at_large(MPI_File fh, u8 *buf, size_t len, u64 file_head_ptr)
Reads a large byte buffer at a given offset in a file using MPI.
Definition io.hpp:224
void info_ln(std::string module_name, Types... var2)
Prints a log message with multiple arguments followed by a newline.
Definition logs.hpp:133
void err_ln(std::string module_name, Types... var2)
Prints a log message with multiple arguments followed by a newline.
Definition logs.hpp:133
This file contains the definition for the stacktrace related functionality.
shambase::details::BasicStackEntry StackEntry
Alias for shambase::details::BasicStackEntry.