
Commit

swap out and swap in one version of weights when needed
yxyOo committed Jun 8, 2021
1 parent 98a3b2d commit 9e51c51
Showing 12 changed files with 400 additions and 668 deletions.
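
Read as a whole, the diff wires a weight-swap path through the PipeDream runtime: rank 0 stashes one version of its stage weights on a remote rank and pulls the same tensors back before the matching backward pass. Below is a minimal sketch of the per-iteration flow, assuming the names used in main_with_runtime.py further down (r is the runtime object, optimizer the stage optimizer); the forward pass and the per-weight body are assumptions, since those parts of the hunk are collapsed here.

def train_step_with_swap(r, optimizer, rank):
    """Sketch of the per-iteration flow this commit adds (names follow
    main_with_runtime.py; anything not visible in the diff is an assumption)."""
    # Swap out: rank 0 ships version "v1" of its stage weights to the
    # stash rank through the runtime's swap queues.
    if rank == 0:
        stage_weights = optimizer.swap_weights("v1")
        if stage_weights is not None:
            for layer in stage_weights:
                for _, weight in layer.items():
                    # The per-weight body is collapsed in the hunk;
                    # r.swap_out(weight) is assumed here.
                    r.swap_out(weight)

    r.run_forward()  # assumption: the forward pass sits between swap-out and swap-in

    # Swap in: once the optimizer reports that "v1" is needed again, block
    # until the stashed tensors have come back, then restore them.
    if rank == 0 and optimizer.if_swap_in("v1"):
        weights_swap_in = r.swap_in()
        optimizer.recover_weights(weights_swap_in)

    optimizer.load_old_params()
    r.run_backward()
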
Binary file modified runtime/__pycache__/communication.cpython-36.pyc
Binary file modified runtime/__pycache__/optimizer.cpython-36.pyc
Binary file modified runtime/__pycache__/runtime.cpython-36.pyc
Binary file modified runtime/__pycache__/threadsafe_queue.cpython-36.pyc
102 changes: 74 additions & 28 deletions runtime/communication.py
@@ -15,6 +15,7 @@
GLOO='gloo'



class CommunicationHandler(object):
""" Handles communication between stages.
@@ -247,10 +248,7 @@ def setup_queues(self):

# Queues for ack for forward pass-only runs as a clocking mechanism.
self.num_ack_threads = 0
with open("/home/mindspore/yxy/pipedream/runtime/image_classification/pipedream-yxy.log","a+") as f:
f.write(str(self.local_rank)+"\n")
f.write("self.tensor_tags: "+str(self.tensor_tags)+"\n")
f.close()


if "ack" in self.tensor_tags:
self.backward_receive_queues["ack"] = []
@@ -433,17 +431,36 @@ def start_helper_threads(self, num_iterations, forward_only):
for swap_recv_rank_tmp in self.swap_recv_ranks:
for i in range(len(self.swap_recv_ranks[swap_recv_rank_tmp])):
self.start_helper_thread(
self.swap_recv_helper_thread_args,
recv_helper_thread,
[swap_recv_rank_tmp, i, torch.float32,False],
num_iterations_for_recv_threads)
self.swap_recv_helper_thread_args,
recv_helper_thread,
[swap_recv_rank_tmp, i, torch.float32,False],
num_iterations_for_recv_threads)


self.start_helper_thread(
self.swap_send_helper_thread_args,
send_helper_thread,
[swap_recv_rank_tmp, i, True],
num_iterations_for_send_threads)


## start listening for the swap-stash signal
for i in self.swap_recv_ranks["swap_tensor"]:

with open("/home/mindspore/yxy/pipedream/runtime/image_classification/pipedream-yxy.log","a+") as f:
f.write("testrank: "+str(self.local_rank)+"\n")
f.close()




helper_thread = threading.Thread(target=self.response_swap_stash)
helper_thread.start()





# Start helper threads for ack for forward pass-only run as a clocking
# mechanism.
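
The swap path reuses the runtime's existing helper-thread pattern: a dedicated thread blocks on a thread-safe queue and performs the actual transfer, so the main thread only enqueues and dequeues. A self-contained sketch of that pattern, assuming the add()/remove()/len() interface the diff calls on runtime/threadsafe_queue.py (the stand-in below is built on queue.Queue, not the real module):

import queue
import threading

class ThreadsafeQueue:
    """Stand-in for runtime/threadsafe_queue.py with the interface used in the diff."""
    def __init__(self):
        self._q = queue.Queue()

    def add(self, item):
        self._q.put(item)

    def remove(self):
        return self._q.get()  # blocks until an item is available

    def len(self):
        return self._q.qsize()

def send_helper(send_queue, send_fn, num_iterations):
    # One iteration per expected tensor: pull from the queue and hand the
    # item to the transport (dist.send in the real runtime).
    for _ in range(num_iterations):
        tensor = send_queue.remove()
        send_fn(tensor)

# Usage: the main thread enqueues, the helper thread drains.
sends = []
q = ThreadsafeQueue()
t = threading.Thread(target=send_helper, args=(q, sends.append, 3))
t.start()
for i in range(3):
    q.add(i)
t.join()
assert sends == [0, 1, 2]
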
@@ -469,7 +486,8 @@ def start_helper_thread(self, args_func, func, args_func_args, num_iterations):
args = args_func(*args_func_args)
helper_thread = threading.Thread(target=func,
args=args)
helper_thread.start()
helper_thread.start()


def create_process_groups(self):
""" Create process groups in the same order across all ranks.
@@ -760,21 +778,7 @@ def recv(self, tensor_name, forward_minibatch_id,
tensor = tensor.requires_grad_()
return tensor

def swap_out_stash(self, tensor_name, forward_minibatch_id,
backward_minibatch_id, backward=False):
if backward:
index = (backward_minibatch_id + self.rank_in_stage) % \
len(self.backward_receive_queues[tensor_name])
tensor = self.backward_receive_queues[tensor_name][
index].remove()
return tensor
else:
index = 0
tensor = self.swap_recv_queues[tensor_name][
index].remove()
if tensor.dtype == torch.float32:
tensor = tensor.requires_grad_()
return tensor


def send(self, tensor_name, tensor, forward_minibatch_id,
backward_minibatch_id, backward=False):
@@ -790,13 +794,46 @@ def send(self, tensor_name, tensor, forward_minibatch_id,
def swap_out(self, tensor_name, tensor, forward_minibatch_id,
backward_minibatch_id, backward=False):
if backward:
index = self.get_messaging_index(sending=True)
dst_rank = self.swap_send_ranks[tensor_name][index]
self.swap_send_recv_queues[tensor_name][index].add(tensor)
index = 0
dst_rank = self.swap_recv_ranks[tensor_name][index]
self.swap_recv_send_queues[tensor_name][index].add(tensor)
else:
index = (forward_minibatch_id + self.rank_in_stage) % \
len(self.swap_send_ranks[tensor_name])
self.swap_send_queues[tensor_name][index].add(tensor)
self.swap_send_queues[tensor_name][index].add(tensor)

def swap_in(self, tensor_name):
index = 0
tensor = self.swap_send_recv_queues[tensor_name][
index].remove()
if tensor.dtype == torch.float32:
tensor = tensor.requires_grad_()
return tensor
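
Putting the two new methods together: the owning rank stashes with swap_out(..., backward=False), the stash rank echoes each tensor back via swap_out(..., backward=True) inside response_swap_stash, and the owning rank's swap_in blocks until each tensor has returned, re-enabling requires_grad on float32 tensors. A hedged sketch of the owning rank's side (handler is a hypothetical CommunicationHandler instance; the ordering assumption holds because both directions use the single queue at index 0):

import torch

def stash_then_restore(handler, weights):
    """Sketch of the owning rank's side of the round trip; assumes the stash
    rank is running response_swap_stash and echoes tensors back in the order
    they were received."""
    # Stash: enqueue each weight for the send helper thread that targets the
    # rank listed under "swap_send" in the stage-to-rank config.
    for w in weights:
        handler.swap_out("swap_tensor", w,
                         forward_minibatch_id=0,
                         backward_minibatch_id=0,
                         backward=False)

    # Restore: swap_in blocks on the receive queue until each echoed tensor
    # arrives; float32 tensors come back with requires_grad re-enabled.
    restored = [handler.swap_in("swap_tensor") for _ in weights]
    assert all(t.requires_grad for t in restored if t.dtype == torch.float32)
    return restored
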


def response_swap_stash(self):
## yxy: on receiving the swap signal, respond by sending the stashed tensors back
# self.swap_recv_queues["swap_tensor"][0]


with open("/home/mindspore/yxy/pipedream/runtime/image_classification/pipedream-yxy.log","a+") as f:
f.write("test swap_recv_queues"+ str(self.swap_recv_queues["swap_tensor"][0])+"\n")
f.close()
while self.swap_recv_queues["swap_tensor"][0].len() != len(self.weights_shape[0]):
pass  # busy-wait until all stashed tensors have arrived
index = 0
for i in self.weights_shape[0]:
tensor = self.swap_recv_queues["swap_tensor"][index].remove()
if tensor.dtype == torch.float32:
tensor = tensor.requires_grad_()
self.swap_out(
"swap_tensor",
tensor,
forward_minibatch_id=0,
backward_minibatch_id=0,
backward=True)
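
response_swap_stash spins until the stash queue holds one tensor per entry in weights_shape[0] before echoing anything back. Because remove() on these queues already blocks, the same behaviour is possible without the busy-wait; the sketch below is an alternative, not the committed code, and assumes the attributes it touches (swap_recv_queues, weights_shape) keep the layout shown in this diff.

import torch

def response_swap_stash_blocking(handler):
    """Spin-free variant of response_swap_stash: rely on the blocking remove()
    instead of polling the queue length. Assumes the single queue at index 0
    and one entry in weights_shape[0] per stashed tensor, as in the diff."""
    for _ in handler.weights_shape[0]:
        # Blocks until the recv helper thread delivers the next stashed tensor.
        tensor = handler.swap_recv_queues["swap_tensor"][0].remove()
        if tensor.dtype == torch.float32:
            tensor = tensor.requires_grad_()
        # Echo the tensor back toward the owning rank.
        handler.swap_out("swap_tensor", tensor,
                         forward_minibatch_id=0,
                         backward_minibatch_id=0,
                         backward=True)
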



def recv_helper_thread(queue, counter, local_rank, tensor_name,
src_rank, tag, tensor_shape, dtype,
@@ -812,6 +849,14 @@ def recv_helper_thread(queue, counter, local_rank, tensor_name,
sub_process_group=sub_process_group)
queue.add(tensor)
counter.decrement()









def send_helper_thread(queue, counter, local_rank, tensor_name,
src_rank, dst_rank, tag,
@@ -865,6 +910,7 @@ def _recv(tensor_name, src_rank, tensor_shape=None, dtype=torch.float32,

# Receive tensor.
tensor = torch.zeros(received_tensor_shape, dtype=dtype)

dist.recv(tensor=tensor,
src=src_rank,
tag=tag)
34 changes: 18 additions & 16 deletions runtime/image_classification/main_with_runtime.py
@@ -390,7 +390,7 @@ def train(train_loader, r, optimizer, epoch):

# swap out v0 para
if args.rank == 0 :
stage_weights=optimizer.swap_weights("v1",args.rank)
stage_weights=optimizer.swap_weights("v1")
if stage_weights is not None:
for layer in stage_weights:
for layer_name in layer.items():
@@ -407,8 +407,10 @@
# f.close()


if args.rank == 3 and i == 10:
r.get_stash_tensor()
# if args.rank == 3 and i == 10:


# r.get_stash_tensor()



@@ -457,20 +459,20 @@ def train(train_loader, r, optimizer, epoch):
optimizer.zero_grad()


# # swap in v0 para
# swap_once = True
# if args.rank == 0 :
# # test cur verion == v0
# # send signal to swap in
# tensor_swap=optimizer.swap_weights("v1",args.rank)
# if tensor_swap is not None:
# with open("/home/mindspore/yxy/pipedream/runtime/image_classification/pipedream-yxy.log","a+") as f:
# f.write(str(args.rank)+"\n")
# f.write("tensor_swap is cuda: "+str(tensor_swap.is_cuda)+"\n")
# f.close()
# r.swap_out(tensor_swap)


if args.rank == 0 and optimizer.if_swap_in("v1") :
with open("/home/mindspore/yxy/pipedream/runtime/image_classification/pipedream-yxy.log","a+") as f:
f.write("before recover \n")
f.close()


weights_swap_in = r.swap_in()


with open("/home/mindspore/yxy/pipedream/runtime/image_classification/pipedream-yxy.log","a+") as f:
f.write("after recover \n")
f.close()
optimizer.recover_weights(weights_swap_in)

optimizer.load_old_params()
r.run_backward()
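
optimizer.recover_weights(weights_swap_in) is called above but not defined in this diff, so its body is an assumption; a plausible minimal form is an in-place copy of each swapped-in tensor onto the matching live parameter, done under torch.no_grad() so the copy is not tracked by autograd:

import torch

def recover_weights(params, weights_swap_in):
    """Hypothetical restore step: copy each swapped-in tensor back onto the
    corresponding live parameter, in the order the weights were stashed."""
    with torch.no_grad():
        for param, restored in zip(params, weights_swap_in):
            param.data.copy_(restored.to(param.device))
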
@@ -3,5 +3,5 @@
"stage_to_rank_map": {"0": [0], "1": [1], "2": [2], "3": [3]},
"swap_recv": {"0": [], "1": [], "2": [], "3": [0]},
"swap_send": {"0": [3], "1": [], "2": [], "3": []},
"weights_shape": {"0": [[64, 3, 3, 3],[64],[64, 64, 3, 3],[64]], "1": [], "2": [], "3": []}
"weights_shape": {"0": [[64, 3, 3, 3],[64],[64, 64, 3, 3],[64],[1]], "1": [], "2": [], "3": []}
}
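
The config change appends a fifth, single-element shape ([1]) to weights_shape["0"], i.e. one more tensor that stage 0 stashes and the stash rank must expect. On the receiving side these shapes line up with the pre-allocated torch.zeros buffers that _recv hands to dist.recv; a small sketch of that use (the buffer list itself is an assumption, not code from this commit):

import torch

# Shapes for stage 0, as in the updated config (the trailing [1] is the new entry).
weights_shape = {"0": [[64, 3, 3, 3], [64], [64, 64, 3, 3], [64], [1]],
                 "1": [], "2": [], "3": []}

# One zero-filled receive buffer per stashed tensor, mirroring how _recv
# allocates torch.zeros(received_tensor_shape, dtype=dtype) before dist.recv.
recv_buffers = [torch.zeros(shape, dtype=torch.float32) for shape in weights_shape["0"]]
print([tuple(b.shape) for b in recv_buffers])
# -> [(64, 3, 3, 3), (64,), (64, 64, 3, 3), (64,), (1,)]
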
