Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 7c6f74d

Browse files
committedNov 13, 2024·
Merge branch 'parallel-read-torus-from-exo' into 'master'
Parallel torus read See merge request etphipp/genten!53
2 parents a63dca2 + 5c19500 commit 7c6f74d

File tree

7 files changed

+225
-50
lines changed

7 files changed

+225
-50
lines changed
 
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
# example usage:
2+
# decomp --processors [num_procs] --subdir exo_files exo_files/test_larger_kappa_perp.exo
3+
# mpiexec -n [num_procs] python3 decompose_torus.py exo_files/test_larger_kappa_perp.exo z
4+
5+
import os
6+
import sys
7+
import exodus3 as ex
8+
import numpy as np
9+
import pygenten as gt
10+
import pyttb
11+
sys.path.append('..')
12+
import torus_to_tensor
13+
14+
if __name__ == "__main__":
15+
assert len(sys.argv) >= 3, "usage: decompose_torus.py <exodus_base_name> <axis_of_rotation>"
16+
num_procs = gt.num_procs()
17+
base_filename = sys.argv[1]
18+
axis = sys.argv[2]
19+
tol = 1.0e-10
20+
if len(sys.argv) >= 4:
21+
tol = np.double(sys.argv[3])
22+
23+
num_procs_per_poloidal_plane = num_procs
24+
np_tensor, global_blocking, parallel_map = torus_to_tensor.torus_to_tensor(base_filename, axis, num_procs_per_poloidal_plane, tol)
25+
shape = np_tensor.shape
26+
print("Dimensions on proc " + str(gt.proc_rank()) + ":")
27+
print(" Num toroidal: "+str(shape[0]))
28+
print(" Num poloidal: "+str(shape[1]))
29+
print(" Num variables: "+str(shape[2]))
30+
print(" Num times: "+str(shape[3]))
31+
32+
ttb_tensor = pyttb.tensor.from_data(np_tensor)
33+
gt_tensor = gt.make_gt_tensor(ttb_tensor)
34+
35+
gt_dist_tensor, gt_dtc = gt.distribute_tensor(gt_tensor, global_blocking, parallel_map)
36+
del(ttb_tensor)
37+
del(gt_tensor)
38+
u,perf = gt.cp_als(gt_dist_tensor, dtc=gt_dtc, rank=16, maxiters=200, tol=1e-4, seed=12345, dist_guess_method="parallel")
39+
del(u)
40+
del(perf)
41+
del(gt_dist_tensor)
42+
43+
44+
45+

‎python/physics_utils/test_torus_to_tensor.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,8 @@
44
import sys
55
import exodus3 as ex
66
import numpy as np
7-
import _pygenten as gt
7+
import pygenten as gt
88
import _phys_utils as pu
9-
sys.path.append('..')
109
import torus_to_tensor
1110
import unittest
1211

‎python/physics_utils/torus_to_tensor.py

Lines changed: 65 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,15 @@
2424
import sys
2525
import exodus3 as ex
2626
import numpy as np
27-
import _pygenten as gt
28-
import _phys_utils as pu
27+
import pygenten as gt
28+
import pygenten._phys_utils as pu
2929

3030
def torus_to_tensor(base_filename, axis, num_procs_per_poloidal_plane,tol=1.0e-10):
3131
# error check the processor decomposition
3232
num_procs = gt.num_procs()
33-
if num_procs%num_procs_per_poloidal_plane > 0:
34-
raise Exception('Invalid num_procs_per_poloidal_plane (' + str(num_procs_per_poloidal_plane) + '): must divide num MPI ranks (' + str(num_procs) + ') evenly')
33+
if num_procs_per_poloidal_plane > 0:
34+
if num_procs%num_procs_per_poloidal_plane > 0:
35+
raise Exception('Invalid num_procs_per_poloidal_plane (' + str(num_procs_per_poloidal_plane) + '): must divide num MPI ranks (' + str(num_procs) + ') evenly')
3536

3637
# get filename for this proc
3738
rank = gt.proc_rank()
@@ -74,19 +75,21 @@ def torus_to_tensor(base_filename, axis, num_procs_per_poloidal_plane,tol=1.0e-1
7475
total_ref_nodes = pu.global_go_sum(num_ref_nodes)
7576
total_nodes = pu.global_go_sum(num_nodes)
7677
total_thetas = len(unique_thetas)
78+
num_theta_procs = -1
7779
if total_nodes != total_ref_nodes*total_thetas:
7880
msg = 'Toroidal decomposition failure: total_nodes != total_ref_nodes*total_thetas'
7981
msg = msg + ' (' + str(total_nodes) + ' != ' + str(total_ref_nodes) + '*' + str(total_thetas) + ')'
8082
raise Exception(msg)
81-
if num_procs_per_poloidal_plane > total_ref_nodes:
82-
msg = 'Invalid num_procs_per_poloidal_plane (' + str(num_procs_per_poloidal_plane) + '): '
83-
msg = msg + 'must be less than number of nodes per poloidal plane (' + str(total_ref_nodes) +')'
84-
raise Exception(msg)
85-
num_procs_per_theta = num_procs//num_procs_per_poloidal_plane
86-
if num_procs_per_theta > total_thetas:
87-
msg = 'Invalid num_procs_per_theta (num_procs/num_procs_per_poloidal_plane = ' + str(num_procs_per_theta) + '): '
88-
msg = msg + 'must be less than number of poloidal planes in the mesh (' + str(total_thetas) +')'
89-
raise Exception(msg)
83+
if num_procs_per_poloidal_plane:
84+
if num_procs_per_poloidal_plane > total_ref_nodes:
85+
msg = 'Invalid num_procs_per_poloidal_plane (' + str(num_procs_per_poloidal_plane) + '): '
86+
msg = msg + 'must be less than number of nodes per poloidal plane (' + str(total_ref_nodes) +')'
87+
raise Exception(msg)
88+
num_theta_procs = num_procs//num_procs_per_poloidal_plane
89+
if num_theta_procs > total_thetas:
90+
msg = 'Invalid num_theta_procs (num_procs/num_procs_per_poloidal_plane = ' + str(num_theta_procs) + '): '
91+
msg = msg + 'must be less than number of poloidal planes in the mesh (' + str(total_thetas) +')'
92+
raise Exception(msg)
9093

9194
# get the associated gids and r and a values
9295
ref_gids = np.zeros(num_ref_nodes, dtype=np.longlong)
@@ -124,11 +127,20 @@ def torus_to_tensor(base_filename, axis, num_procs_per_poloidal_plane,tol=1.0e-1
124127
cids[i] = tids[i]*total_ref_nodes + rids[i]
125128

126129
# determine which cids should be on which procs based on the user defined decomposition
127-
redistributed_cids = distribute_composite_ids_across_procs(num_procs_per_theta,total_thetas,num_procs_per_poloidal_plane,total_ref_nodes)
130+
redistributed_cids = -1*np.ones(0, dtype=np.longlong)
131+
global_blocking = []
132+
parallel_map = []
133+
if num_procs_per_poloidal_plane > 0:
134+
redistributed_cids, global_blocking = distribute_composite_ids_across_procs(num_theta_procs,total_thetas,num_procs_per_poloidal_plane,total_ref_nodes)
135+
else:
136+
redistributed_cids = distribute_composite_ids_to_root(num_theta_procs,total_thetas,num_procs_per_poloidal_plane,total_ref_nodes)
128137
redistributed_num_nodes = len(redistributed_cids)
129138

130139
# use cids to redistribute data across procs
131140
redistributed_node_data = pu.redistribute_data_across_procs(cids,node_data,redistributed_cids)
141+
tensor = np.zeros((0,0,0,0), dtype=np.double)
142+
if len(redistributed_node_data) == 0:
143+
return tensor
132144

133145
# back out tids and rids from redistributed cids
134146
redistributed_tids = -1*np.ones(redistributed_num_nodes, dtype=np.longlong)
@@ -149,7 +161,18 @@ def torus_to_tensor(base_filename, axis, num_procs_per_poloidal_plane,tol=1.0e-1
149161
for v in range(num_vars):
150162
for t in range(num_times):
151163
tensor[redistributed_tids[i]-start_tid, redistributed_rids[i]-start_rid, v, t] = redistributed_node_data[i,t*num_vars+v]
152-
return tensor
164+
165+
# add var and time dimensions to global blocking
166+
if num_procs_per_poloidal_plane > 0:
167+
vt_blocking = np.zeros((2,num_procs+1),dtype=np.longlong)
168+
vt_blocking[0,1] = num_vars
169+
vt_blocking[1,1] = num_times
170+
global_blocking = np.vstack([global_blocking,vt_blocking])
171+
parallel_map = np.ones((4),dtype=np.longlong)
172+
parallel_map[0] = num_theta_procs
173+
parallel_map[1] = num_procs_per_poloidal_plane
174+
175+
return tensor, global_blocking, parallel_map
153176

154177
def get_cylindrical_coordinates(x, y, z, axis, tol):
155178
"""
@@ -326,6 +349,32 @@ def distribute_composite_ids_across_procs(num_procs_x,total_x,num_procs_y,total_
326349
for i in range(num_target_xids):
327350
for j in range(num_target_yids):
328351
target_cids[i*num_target_yids+j] = (start_xid+i)*total_y + (start_yid+j)
352+
353+
# create the global blocking array needed for dist tensor context
354+
global_blocking = np.zeros((2, num_procs+1), dtype=np.longlong)
355+
for i in range(num_procs_x):
356+
global_blocking[0,i+1] = global_blocking[0,i] + total_x//num_procs_x
357+
if i < total_x%num_procs_x:
358+
global_blocking[0,i+1] += 1
359+
for i in range(num_procs_y):
360+
global_blocking[1,i+1] = global_blocking[1,i] + total_y//num_procs_y
361+
if i < total_y%num_procs_y:
362+
global_blocking[1,i+1] += 1
363+
364+
return target_cids, global_blocking
365+
366+
def distribute_composite_ids_to_root(num_procs_x,total_x,num_procs_y,total_y):
367+
"""
368+
Given how many procs to divide a number of ids in two coordinate directions into,
369+
distribute composite ids on each processor that fit this decomposition
370+
"""
371+
372+
# assign ranks in each coordinate direction
373+
target_cids = -1*np.ones(0, dtype=np.longlong)
374+
rank = gt.proc_rank()
375+
if rank == 0:
376+
total_cids = total_x*total_y
377+
target_cids = np.arange(0, total_cids-1, 1, dtype=np.longlong)
329378
return target_cids
330379

331380
if __name__ == "__main__":
@@ -340,7 +389,7 @@ def distribute_composite_ids_across_procs(num_procs_x,total_x,num_procs_y,total_
340389
tol = 1.0e-10
341390
if len(sys.argv) >= 5:
342391
tol = np.double(sys.argv[4])
343-
tensor = torus_to_tensor(base_filename, axis, num_procs_per_poloidal_plane, tol)
392+
tensor, global_blocking = torus_to_tensor(base_filename, axis, num_procs_per_poloidal_plane, tol)
344393
gt.finalizeGenten()
345394

346395

‎python/pygenten/utils.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import pygenten._pygenten as gt
66
import json
7+
import numpy
78

89
def make_algparams(args):
910
"""
@@ -73,13 +74,13 @@ def read_and_distribute_tensor(filename, file_type=None, format=None, shape=None
7374
return Xs,dtc
7475
return Xd,dtc
7576

76-
def distribute_tensor(X, **kwargs):
77+
def distribute_tensor(X, global_blocking=numpy.empty(0), parallel_map=numpy.empty(0), **kwargs):
7778
"""
7879
Distribute the given tensor in parallel and return the result.
7980
8081
Also returns the distributed tensor context.
8182
"""
8283
a,rem = make_algparams(kwargs)
8384
dtc = gt.DistTensorContext()
84-
XX = dtc.distributeTensor(X, a)
85+
XX = dtc.distributeTensor(X, a, global_blocking, parallel_map)
8586
return XX,dtc

‎python/src/Genten_Pybind11_classes.cpp

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -585,7 +585,7 @@ void pygenten_tensor(py::module &m){
585585
Constructor that returns an empty tensor.)");
586586
cl.def(py::init([](const py::buffer& b, const bool copy=true) {
587587
// Initialize a Genten::Tensor from a numpy array
588-
py::buffer_info info = b.request();
588+
py::buffer_info info = b.request();
589589

590590
if (info.format != py::format_descriptor<ttb_real>::format())
591591
throw std::runtime_error("Incompatible format: expected a ttb_real array!");
@@ -983,11 +983,43 @@ void pygenten_sptensor(py::module &m){
983983
}, R"(
984984
Distribute a given sparse tensor in parallel and return the distributed
985985
tensor.)");
986-
cl.def("distributeTensor", [](Genten::DTC& dtc, const Genten::Tensor& X, const Genten::AlgParams& algParams) {
987-
return dtc.distributeTensor(X, algParams);
986+
cl.def("distributeTensor", [](Genten::DTC& dtc,
987+
const Genten::Tensor& X,
988+
const Genten::AlgParams& algParams,
989+
const py::array_t<ttb_indx>& global_blocking_in,
990+
const py::array_t<ttb_indx>& parallel_map_in) {
991+
if(global_blocking_in.size() == 0)
992+
{
993+
// the tensor hasn't already been distributed
994+
return dtc.distributeTensor(X, algParams);
995+
}
996+
else
997+
{
998+
// the tensor is already distributed and we can pass its global blocking after translating it into a vector of vectors
999+
using Genten::small_vector;
1000+
std::vector<small_vector<ttb_indx>> global_blocking;
1001+
for(py::ssize_t i = 0; i < global_blocking_in.shape(0); i++)
1002+
{
1003+
small_vector<ttb_indx> variable_blocking;
1004+
variable_blocking.push_back(global_blocking_in.at(i,0));
1005+
for(py::ssize_t j = 1; j < global_blocking_in.shape(1); j++)
1006+
if(global_blocking_in.at(i,j) > 0)
1007+
variable_blocking.push_back(global_blocking_in.at(i,j));
1008+
global_blocking.push_back(variable_blocking);
1009+
}
1010+
small_vector<ttb_indx> parallel_map(parallel_map_in.shape(0));
1011+
for(py::ssize_t i = 0; i < parallel_map_in.shape(0); i++)
1012+
{
1013+
parallel_map[i] = parallel_map_in.at(i);
1014+
}
1015+
return dtc.distributeTensor(X, global_blocking, parallel_map, algParams);
1016+
}
9881017
}, R"(
9891018
Distribute a given dense tensor in parallel and return the distributed
990-
tensor.)");
1019+
tensor.)", py::arg("X"),
1020+
py::arg("algParams"),
1021+
py::arg("global_blocking_in") = py::array_t<ttb_indx>(0),
1022+
py::arg("parallel_map_in") = py::array_t<ttb_indx>(0));
9911023
cl.def("importToRoot", [](const Genten::DTC& dtc, const Genten::Sptensor& u) {
9921024
return dtc.importToRoot<Genten::DefaultHostExecutionSpace>(u);
9931025
}, R"(

‎src/Genten_DistTensorContext.cpp

Lines changed: 46 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -607,7 +607,7 @@ distributeTensorImpl(const Sptensor& X, const AlgParams& algParams)
607607
template <typename ExecSpace>
608608
TensorT<ExecSpace>
609609
DistTensorContext<ExecSpace>::
610-
distributeTensorImpl(const Tensor& X, const AlgParams& algParams)
610+
distributeTensorImpl(const Tensor& X, const AlgParams& algParams, const std::vector<small_vector<ttb_indx>>& global_blocking, const small_vector<ttb_indx>& parallel_map)
611611
{
612612
ttb_indx ndims = X.ndims();
613613
auto layout = X.getLayout();
@@ -633,6 +633,10 @@ distributeTensorImpl(const Tensor& X, const AlgParams& algParams)
633633
ndims = max_ndims;
634634
#endif
635635

636+
std::vector<ttb_real> Tvec;
637+
ttb_indx nnz;
638+
ttb_indx offset = 0;
639+
636640
// Check if we have already distributed a tensor, in which case this one
637641
// needs to be of the same size
638642
if (global_dims_.size() > 0) {
@@ -650,44 +654,61 @@ distributeTensorImpl(const Tensor& X, const AlgParams& algParams)
650654
global_dims_[i] = X.ndims() > 0 ? X.size(i) : 0;
651655

652656
#ifdef HAVE_DIST
653-
std::vector<ttb_indx> max_global_dims = global_dims_;
654-
MPI_Allreduce(MPI_IN_PLACE, max_global_dims.data(), ndims,
655-
DistContext::toMpiType<ttb_indx>(), MPI_MAX,
656-
DistContext::commWorld());
657-
if (X.ndims() > 0 && max_global_dims != global_dims_)
658-
Genten::error("Tensor dimensions are not consistent across processors!");
659-
global_dims_ = max_global_dims;
657+
if(global_blocking.size() == 0) {
658+
std::vector<ttb_indx> max_global_dims = global_dims_;
659+
MPI_Allreduce(MPI_IN_PLACE, max_global_dims.data(), ndims,
660+
DistContext::toMpiType<ttb_indx>(), MPI_MAX,
661+
DistContext::commWorld());
662+
if (X.ndims() > 0 && max_global_dims != global_dims_)
663+
Genten::error("Tensor dimensions are not consistent across processors!");
664+
global_dims_ = max_global_dims;
665+
}
660666
#endif
661667

662-
if (algParams.proc_grid.size() > 0) {
663-
gt_assert(algParams.proc_grid.size() == ndims);
664-
small_vector<ttb_indx> grid(ndims);
665-
for (ttb_indx i=0; i<ndims; ++i)
666-
grid[i] = algParams.proc_grid[i];
668+
if(global_blocking.size() > 0) {
669+
global_blocking_ = global_blocking;
667670
pmap_ = std::shared_ptr<ProcessorMap>(new ProcessorMap(global_dims_,
668-
grid,
671+
parallel_map,
669672
dist_method));
673+
DistContext::Barrier();
674+
nnz = pmap_->gridAllReduce(X.nnz(), ProcessorMap::Sum);
675+
Tvec = std::vector<ttb_real>(X.getValues().size());
676+
for(ttb_indx i = 0; i < X.getValues().size(); i++)
677+
Tvec[i] = X.getValues()[i];
670678
}
671679
else
672-
pmap_ = std::shared_ptr<ProcessorMap>(new ProcessorMap(global_dims_,
673-
dist_method));
680+
{
681+
if (algParams.proc_grid.size() > 0) {
682+
gt_assert(algParams.proc_grid.size() == ndims);
683+
small_vector<ttb_indx> grid(ndims);
684+
for (ttb_indx i=0; i<ndims; ++i)
685+
grid[i] = algParams.proc_grid[i];
686+
pmap_ = std::shared_ptr<ProcessorMap>(new ProcessorMap(global_dims_,
687+
grid,
688+
dist_method));
689+
}
690+
else
691+
pmap_ = std::shared_ptr<ProcessorMap>(new ProcessorMap(global_dims_,
692+
dist_method));
674693

675-
detail::printGrids(*pmap_);
694+
global_blocking_ =
695+
detail::generateUniformBlocking(global_dims_, pmap_->gridDims());
676696

677-
global_blocking_ =
678-
detail::generateUniformBlocking(global_dims_, pmap_->gridDims());
697+
DistContext::Barrier();
698+
699+
nnz = pmap_->gridAllReduce(X.nnz(), ProcessorMap::Max);
700+
Tvec = detail::distributeTensorToVectorsDense(
701+
X, nnz, pmap_->gridComm(), pmap_->gridRank(), pmap_->gridSize(), offset);
702+
}
703+
detail::printGrids(*pmap_);
679704

680705
detail::printBlocking(*pmap_, global_blocking_);
681706
DistContext::Barrier();
682707
}
683708

684-
ttb_indx nnz = pmap_->gridAllReduce(X.nnz(), ProcessorMap::Max);
685-
ttb_indx offset = 0;
686-
auto Tvec = detail::distributeTensorToVectorsDense(
687-
X, nnz, pmap_->gridComm(), pmap_->gridRank(), pmap_->gridSize(), offset);
688-
709+
const bool redistribute_needed = global_blocking.size() == 0;
689710
return distributeTensorData(Tvec, nnz, offset, global_dims_, global_blocking_,
690-
layout, *pmap_, algParams);
711+
layout, *pmap_, algParams, redistribute_needed);
691712
}
692713

693714
template <typename ExecSpace>

0 commit comments

Comments
 (0)
Please sign in to comment.