# model_parallel.py
# Model-parallel VGG16 variants that split the network across 2, 3, or 4 GPUs,
# with optional measurement of the inter-GPU transfer overhead.
import torch
import torch.nn as nn
from preprocess import get_pretrained_model, AverageMeter
class ModelParallelVGG16_4GPU(nn.Module):
    '''
    Split VGG16 layers across 4 GPU devices:
      cuda:0 -> first half of the convolutional features
      cuda:1 -> second half of the convolutional features
      cuda:2 -> avgpool + flatten
      cuda:3 -> classifier head

    When ``debugging`` is True, ``forward`` also measures the GPU-to-GPU
    transfer time (milliseconds) with CUDA events and accumulates it in
    ``self.overhead``.
    '''
    def __init__(self, debugging=False):
        super().__init__()
        model = get_pretrained_model('vgg16')
        # Running average of per-forward transfer overhead, in milliseconds.
        self.overhead = AverageMeter()
        self.debugging = debugging
        # Split the feature extractor at its midpoint across the first two GPUs.
        split = len(model.features) // 2
        self.seq1 = model.features[:split].to('cuda:0')
        self.seq2 = model.features[split:].to('cuda:1')
        self.avgpool = model.avgpool.to('cuda:2')
        self.flatten = nn.Flatten().to('cuda:2')
        self.classifier = model.classifier.to('cuda:3')

    def get_overhead(self):
        # Average measured transfer overhead (ms) since the last reset.
        return self.overhead.avg

    def reset_overhead(self):
        self.overhead.reset()

    def forward(self, x):
        # Debug mode times the inter-GPU transfers; normal mode does not.
        if self.debugging:
            return self.forward_debug(x)
        else:
            return self.forward_normal(x)

    def forward_normal(self, x):
        x = x.to('cuda:0')
        out = self.seq1(x)
        out = out.to('cuda:1')
        out = self.seq2(out)
        out = out.to('cuda:2')
        out = self.avgpool(out)
        out = self.flatten(out)
        out = out.to('cuda:3')
        out = self.classifier(out)
        return out

    def _timed_transfer(self, tensor, device, start_event, end_event):
        '''Move ``tensor`` to ``device``; return (moved tensor, elapsed ms).

        CUDA ops are asynchronous, so we synchronize before reading the
        event timings to make sure the queued transfer has completed.
        '''
        start_event.record()
        tensor = tensor.to(device)
        end_event.record()
        torch.cuda.synchronize()
        return tensor, start_event.elapsed_time(end_event)

    def forward_debug(self, x):
        '''Same forward pass as ``forward_normal`` but records the time spent
        on inter-GPU transfers into ``self.overhead`` and returns the output.

        The initial host->cuda:0 copy is NOT timed: it would be paid even by
        the single-GPU baseline, so it is not model-parallel overhead.
        '''
        time_accumulator = 0.0
        start_time = torch.cuda.Event(enable_timing=True)
        end_time = torch.cuda.Event(enable_timing=True)
        x = x.to('cuda:0')
        out = self.seq1(x)
        out, elapsed = self._timed_transfer(out, 'cuda:1', start_time, end_time)
        time_accumulator += elapsed
        out = self.seq2(out)
        out, elapsed = self._timed_transfer(out, 'cuda:2', start_time, end_time)
        time_accumulator += elapsed
        out = self.avgpool(out)
        out = self.flatten(out)
        out, elapsed = self._timed_transfer(out, 'cuda:3', start_time, end_time)
        time_accumulator += elapsed
        self.overhead.update(time_accumulator)
        # Bug fix: previously returned None without running the classifier,
        # so forward() in debug mode produced no usable output.
        out = self.classifier(out)
        return out
class ModelParallelVGG16_3GPU(nn.Module):
    '''
    Split VGG16 layers across 3 GPU devices:
      cuda:0 -> convolutional features
      cuda:1 -> avgpool + flatten
      cuda:2 -> classifier head

    When ``debugging`` is True, ``forward`` also measures the GPU-to-GPU
    transfer time (milliseconds) with CUDA events and accumulates it in
    ``self.overhead``.
    '''
    def __init__(self, debugging=False):
        super().__init__()
        model = get_pretrained_model('vgg16')
        # Running average of per-forward transfer overhead, in milliseconds.
        self.overhead = AverageMeter()
        self.debugging = debugging
        self.seq1 = model.features.to('cuda:0')
        self.avgpool = model.avgpool.to('cuda:1')
        self.flatten = nn.Flatten().to('cuda:1')
        self.classifier = model.classifier.to('cuda:2')

    def get_overhead(self):
        # Average measured transfer overhead (ms) since the last reset.
        return self.overhead.avg

    def reset_overhead(self):
        self.overhead.reset()

    def forward(self, x):
        # Debug mode times the inter-GPU transfers; normal mode does not.
        if self.debugging:
            return self.forward_debug(x)
        else:
            return self.forward_normal(x)

    def forward_normal(self, x):
        x = x.to('cuda:0')
        out = self.seq1(x)
        out = out.to('cuda:1')
        out = self.avgpool(out)
        out = self.flatten(out)
        out = out.to('cuda:2')
        out = self.classifier(out)
        return out

    def _timed_transfer(self, tensor, device, start_event, end_event):
        '''Move ``tensor`` to ``device``; return (moved tensor, elapsed ms).

        CUDA ops are asynchronous, so we synchronize before reading the
        event timings to make sure the queued transfer has completed.
        '''
        start_event.record()
        tensor = tensor.to(device)
        end_event.record()
        torch.cuda.synchronize()
        return tensor, start_event.elapsed_time(end_event)

    def forward_debug(self, x):
        '''Same forward pass as ``forward_normal`` but records the time spent
        on inter-GPU transfers into ``self.overhead`` and returns the output.

        The initial host->cuda:0 copy is NOT timed: it would be paid even by
        the single-GPU baseline, so it is not model-parallel overhead.
        '''
        time_accumulator = 0.0
        start_time = torch.cuda.Event(enable_timing=True)
        end_time = torch.cuda.Event(enable_timing=True)
        x = x.to('cuda:0')
        out = self.seq1(x)
        out, elapsed = self._timed_transfer(out, 'cuda:1', start_time, end_time)
        time_accumulator += elapsed
        out = self.avgpool(out)
        out = self.flatten(out)
        out, elapsed = self._timed_transfer(out, 'cuda:2', start_time, end_time)
        time_accumulator += elapsed
        self.overhead.update(time_accumulator)
        # Bug fix: previously returned None without running the classifier,
        # so forward() in debug mode produced no usable output.
        out = self.classifier(out)
        return out
class ModelParallelVGG16_2GPU(nn.Module):
    '''
    Split VGG16 layers across 2 GPU devices:
      cuda:0 -> convolutional features
      cuda:1 -> avgpool + flatten + classifier head

    When ``debugging`` is True, ``forward`` also measures the GPU-to-GPU
    transfer time (milliseconds) with CUDA events and accumulates it in
    ``self.overhead``.
    '''
    def __init__(self, debugging=False):
        super().__init__()
        model = get_pretrained_model('vgg16')
        # Running average of per-forward transfer overhead, in milliseconds.
        self.overhead = AverageMeter()
        self.debugging = debugging
        self.seq1 = model.features.to('cuda:0')
        self.avgpool = model.avgpool.to('cuda:1')
        self.flatten = nn.Flatten().to('cuda:1')
        self.classifier = model.classifier.to('cuda:1')

    def get_overhead(self):
        # Average measured transfer overhead (ms) since the last reset.
        return self.overhead.avg

    def reset_overhead(self):
        self.overhead.reset()

    def forward(self, x):
        # Debug mode times the inter-GPU transfer; normal mode does not.
        if self.debugging:
            return self.forward_debug(x)
        else:
            return self.forward_normal(x)

    def forward_normal(self, x):
        x = x.to('cuda:0')
        out = self.seq1(x)
        out = out.to('cuda:1')
        out = self.avgpool(out)
        out = self.flatten(out)
        out = self.classifier(out)
        return out

    def forward_debug(self, x):
        '''Same forward pass as ``forward_normal`` but records the time spent
        on the single cuda:0 -> cuda:1 transfer into ``self.overhead`` and
        returns the output.

        The initial host->cuda:0 copy is NOT timed: it would be paid even by
        the single-GPU baseline, so it is not model-parallel overhead.
        '''
        start_time = torch.cuda.Event(enable_timing=True)
        end_time = torch.cuda.Event(enable_timing=True)
        x = x.to('cuda:0')
        out = self.seq1(x)
        start_time.record()
        out = out.to('cuda:1')
        end_time.record()
        # CUDA ops are asynchronous; wait for the transfer to finish before
        # reading the event timings.
        torch.cuda.synchronize()
        self.overhead.update(start_time.elapsed_time(end_time))
        # Bug fix: previously returned None after seq1, skipping avgpool,
        # flatten, and the classifier entirely in debug mode.
        out = self.avgpool(out)
        out = self.flatten(out)
        out = self.classifier(out)
        return out