Distributed Ranges
Loading...
Searching...
No Matches
distributed_vector.hpp
1// SPDX-FileCopyrightText: Intel Corporation
2//
3// SPDX-License-Identifier: BSD-3-Clause
4
5#pragma once
6
7#include <dr/mp/algorithms/fill.hpp>
8#include <dr/mp/allocator.hpp>
9#include <dr/mp/containers/distribution.hpp>
10#include <dr/mp/containers/segment.hpp>
11
12namespace dr::mp {
13
15 dr::rma_window win_;
16
17public:
18 void *allocate(std::size_t data_size) {
19 assert(data_size > 0);
20 void *data = __detail::allocator<std::byte>().allocate(data_size);
21 DRLOG("called MPI allocate({}) -> got:{}", data_size, data);
22 win_.create(default_comm(), data, data_size);
23 active_wins().insert(win_.mpi_win());
24 return data;
25 }
26
27 void deallocate(void *data, std::size_t data_size) {
28 assert(data_size > 0);
29 DRLOG("calling MPI deallocate ({}, data_size:{})", data, data_size);
30 active_wins().erase(win_.mpi_win());
31 win_.free();
32 __detail::allocator<std::byte>().deallocate(static_cast<std::byte *>(data),
33 data_size);
34 }
35
36 void getmem(void *dst, std::size_t offset, std::size_t datalen,
37 int segment_index) {
38 DRLOG("calling MPI get(dst:{}, "
39 "segm_offset:{}, size:{}, peer:{})",
40 dst, offset, datalen, segment_index);
41
42#if (MPI_VERSION >= 4) || \
43 (defined(I_MPI_NUMVERSION) && (I_MPI_NUMVERSION > 20211200000))
44 // 64-bit API inside
45 win_.get(dst, datalen, segment_index, offset);
46#else
47 for (std::size_t remainder = datalen, off = 0UL; remainder > 0;) {
48 std::size_t s = std::min(remainder, (std::size_t)INT_MAX);
49 DRLOG("{}:{} win_.get total {} now {} bytes at off {}, dst offset {}",
50 default_comm().rank(), __LINE__, datalen, s, off, offset + off);
51 win_.get((uint8_t *)dst + off, s, segment_index, offset + off);
52 off += s;
53 remainder -= s;
54 }
55#endif
56 }
57
58 void putmem(void const *src, std::size_t offset, std::size_t datalen,
59 int segment_index) {
60 DRLOG("calling MPI put(segm_offset:{}, "
61 "src:{}, size:{}, peer:{})",
62 offset, src, datalen, segment_index);
63
64#if (MPI_VERSION >= 4) || \
65 (defined(I_MPI_NUMVERSION) && (I_MPI_NUMVERSION > 20211200000))
66 if (mp::use_sycl()) {
67 // 32-bit API inside for sycl based buffers
68 for (std::size_t remainder = datalen, off = 0UL; remainder > 0;) {
69 std::size_t s = std::min(remainder, (std::size_t)INT_MAX);
70 DRLOG("{}:{} win_.put {} bytes at off {}, dst offset {}",
71 default_comm().rank(), __LINE__, s, off, offset + off);
72 win_.put((uint8_t *)src + off, s, segment_index, offset + off);
73 off += s;
74 remainder -= s;
75 }
76 } else {
77 // 64-bit API inside
78 win_.put(src, datalen, segment_index, offset);
79 }
80#else
81 for (std::size_t remainder = datalen, off = 0UL; remainder > 0;) {
82 std::size_t s = std::min(remainder, (std::size_t)INT_MAX);
83 DRLOG("{}:{} win_.put {} bytes at off {}, dst offset {}",
84 default_comm().rank(), __LINE__, s, off, offset + off);
85 win_.put((uint8_t *)src + off, s, segment_index, offset + off);
86 off += s;
87 remainder -= s;
88 }
89#endif
90 }
91
92 std::size_t getrank() const { return win_.communicator().rank(); }
93
94 void fence() const { win_.fence(); }
95};
96
97#ifdef DRISHMEM
98class IshmemBackend {
99 void *shared_mem_;
100
101public:
102 void *allocate(std::size_t data_size) {
103 assert(data_size > 0);
104 shared_mem_ = ishmem_malloc(data_size);
105 DRLOG("called ishmem_malloc({}) -> got:{}", data_size, shared_mem_);
106 return shared_mem_;
107 }
108
109 void deallocate(void *data, std::size_t data_size) {
110 assert(data_size > 0);
111 assert(data == shared_mem_);
112 drlog.debug("calling ishmem_free({})\n", data);
113 ishmem_free(data);
114 }
115
116 void getmem(void *dst, std::size_t offset, std::size_t datalen,
117 int segment_index) {
118 void *src = static_cast<std::byte *>(shared_mem_) + offset;
119
120 DRLOG("calling ishmem_getmem(dst:{}, src:{} (= dv:{} + "
121 "segm_offset:{}), size:{}, peer:{})",
122 dst, src, shared_mem_, offset, datalen, segment_index);
123
124 ishmem_getmem(dst, src, datalen, segment_index);
125 }
126
127 void putmem(void const *src, std::size_t offset, std::size_t datalen,
128 int segment_index) {
129 void *dst = static_cast<std::byte *>(shared_mem_) + offset;
130 DRLOG("calling ishmem_putmem(dst:{} (= dv:{} + segm_offset:{}), "
131 "src:{}, size:{}, peer:{})",
132 dst, shared_mem_, offset, src, datalen, segment_index);
133 ishmem_putmem(dst, src, datalen, segment_index);
134 }
135
136 std::size_t getrank() const {
137 auto my_process_segment_index = ishmem_my_pe();
138 DRLOG("called ishmem_my_pe() -> {}", my_process_segment_index);
139 return my_process_segment_index;
140 }
141
142 void fence() const {
143 // TODO: to have locality use ishmemx_fence_work_group
144 ishmem_fence();
145 }
146};
147#endif
148
150template <typename T, class BackendT = MpiBackend> class distributed_vector {
151
152public:
153 using value_type = T;
154 using size_type = std::size_t;
155 using difference_type = std::ptrdiff_t;
156 using backend_type = BackendT;
157
158 class iterator {
159 public:
160 using iterator_category = std::random_access_iterator_tag;
161 using value_type = typename distributed_vector::value_type;
162 using difference_type = typename distributed_vector::difference_type;
163
164 iterator() {}
165 iterator(const distributed_vector *parent, difference_type offset)
166 : parent_(parent), offset_(offset) {}
167
168 auto operator+(difference_type n) const {
169 return iterator(parent_, offset_ + n);
170 }
171 friend auto operator+(difference_type n, const iterator &other) {
172 return other + n;
173 }
174 auto operator-(difference_type n) const {
175 return iterator(parent_, offset_ - n);
176 }
177 auto operator-(iterator other) const { return offset_ - other.offset_; }
178
179 auto &operator+=(difference_type n) {
180 offset_ += n;
181 return *this;
182 }
183 auto &operator-=(difference_type n) {
184 offset_ -= n;
185 return *this;
186 }
187 auto &operator++() {
188 offset_++;
189 return *this;
190 }
191 auto operator++(int) {
192 auto old = *this;
193 offset_++;
194 return old;
195 }
196 auto &operator--() {
197 offset_--;
198 return *this;
199 }
200 auto operator--(int) {
201 auto old = *this;
202 offset_--;
203 return old;
204 }
205
206 bool operator==(iterator other) const {
207 if (parent_ == nullptr || other.parent_ == nullptr) {
208 return false;
209 } else {
210 return offset_ == other.offset_;
211 }
212 }
213 auto operator<=>(iterator other) const {
214 assert(parent_ == other.parent_);
215 return offset_ <=> other.offset_;
216 }
217
218 auto operator*() const {
219 auto segment_size = parent_->segment_size_;
220 return parent_
221 ->segments()[offset_ / segment_size][offset_ % segment_size];
222 }
223 auto operator[](difference_type n) const { return *(*this + n); }
224
225 auto local() {
226 auto segment_size = parent_->segment_size_;
227 return (parent_->segments()[offset_ / segment_size].begin() +
228 offset_ % segment_size)
229 .local();
230 }
231
232 //
233 // Support for distributed ranges
234 //
235 // distributed iterator provides segments
236 // remote iterator provides local
237 //
238 auto segments() {
239 return dr::__detail::drop_segments(parent_->segments(), offset_);
240 }
241
242 private:
243 const distributed_vector *parent_ = nullptr;
244 difference_type offset_;
245 };
246
247 // Do not copy
248 // We need a move constructor for the implementation of reduce algorithm
249 distributed_vector(const distributed_vector &) = delete;
250 distributed_vector &operator=(const distributed_vector &) = delete;
251 distributed_vector(distributed_vector &&) { assert(false); }
252
255 init(size, dist);
256 }
257
259 distributed_vector(std::size_t size, value_type fill_value,
260 distribution dist = distribution()) {
261 init(size, dist);
262 mp::fill(*this, fill_value);
263 }
264
266 if (!finalized()) {
267 fence();
268 if (data_ != nullptr) {
269 backend.deallocate(data_, data_size_ * sizeof(value_type));
270 }
271
272 delete halo_;
273 }
274 }
275
277 auto begin() const { return iterator(this, 0); }
279 auto end() const { return begin() + size_; }
280
282 auto size() const { return size_; }
284 auto operator[](difference_type n) const { return *(begin() + n); }
285 auto &halo() const { return *halo_; }
286
287 auto segments() const { return rng::views::all(segments_); }
288
289 void fence() { backend.fence(); }
290
291 auto segment_size() const { return segment_size_; }
292
293 auto get_segment_offset(std::size_t segment_id) const {
294 return segment_id * segment_size_;
295 }
296
297private:
298 void init(auto size, auto dist) {
299 size_ = size;
300 distribution_ = dist;
301
302 // determine the distribution of data
303 auto comm_size = default_comm().size(); // dr-style ignore
304 auto hb = dist.halo();
305 std::size_t gran = dist.granularity();
306 // TODO: make this an error that is reported back to user
307 assert(size % gran == 0 && "size must be a multiple of the granularity");
308 assert(hb.prev % gran == 0 && "size must be a multiple of the granularity");
309 assert(hb.next % gran == 0 && "size must be a multiple of the granularity");
310 segment_size_ = gran * std::max({(size / gran + comm_size - 1) / comm_size,
311 hb.prev / gran, hb.next / gran});
312
313 data_size_ = segment_size_ + hb.prev + hb.next;
314
315 if (size_ > 0) {
316 data_ = static_cast<T *>(backend.allocate(data_size_ * sizeof(T)));
317 }
318
319 halo_ = new span_halo<T>(default_comm(), data_, data_size_, hb);
320
321 std::size_t segment_index = 0;
322 for (std::size_t i = 0; i < size; i += segment_size_) {
323 segments_.emplace_back(this, segment_index++,
324 std::min(segment_size_, size - i), data_size_);
325 }
326
327 fence();
328 }
329
330 friend dv_segment_iterator<distributed_vector>;
331
332 std::size_t segment_size_ = 0;
333 std::size_t data_size_ = 0; // size + halo
334 T *data_ = nullptr;
335 span_halo<T> *halo_;
336
337 distribution distribution_;
338 std::size_t size_;
339 std::vector<dv_segment<distributed_vector>> segments_;
340 BackendT backend;
341};
342
343template <typename T, typename B>
344auto &halo(const distributed_vector<T, B> &dv) {
345 return dv.halo();
346}
347
348} // namespace dr::mp
Definition: distributed_vector.hpp:14
Definition: allocator.hpp:11
Definition: distributed_vector.hpp:158
distributed vector
Definition: distributed_vector.hpp:150
auto size() const
Returns size.
Definition: distributed_vector.hpp:282
auto operator[](difference_type n) const
Returns reference using index.
Definition: distributed_vector.hpp:284
distributed_vector(std::size_t size, value_type fill_value, distribution dist=distribution())
Constructor.
Definition: distributed_vector.hpp:259
auto end() const
Returns iterator to end.
Definition: distributed_vector.hpp:279
auto begin() const
Returns iterator to beginning.
Definition: distributed_vector.hpp:277
distributed_vector(std::size_t size=0, distribution dist=distribution())
Constructor.
Definition: distributed_vector.hpp:254
Definition: communicator.hpp:242
Definition: distribution.hpp:11