-
Notifications
You must be signed in to change notification settings - Fork 1
/
handler.js
212 lines (187 loc) · 5.9 KB
/
handler.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
'use strict';
var AWS = require("aws-sdk");
var unmarshalItem = require("dynamodb-marshaler").unmarshalItem;
var unmarshal = require("dynamodb-marshaler").unmarshal;
var Papa = require("papaparse");
var fs = require("fs");
var crypto = require('crypto');
// Accumulation state for the CSV dump. It lives at module scope, so in a
// reused process (e.g. a warm Lambda container — assumption) it carries over
// between invocations unless the handler resets it.
let headers = [];            // column names, in first-seen order
let unMarshalledArray = [];  // rows buffered since the last flush
let rowCount = 0;            // number of rows buffered since the last flush
let writeCount = 0;          // number of rows already handed to the output
// Rows to buffer before flushing a CSV chunk. Environment values are strings,
// so parse explicitly; a missing or non-numeric value (which previously became
// NaN in comparisons and disabled flushing) falls back to 5000.
const writeChunk = (function () {
  const parsed = Number.parseInt(process.env.writeChunk, 10);
  return Number.isFinite(parsed) ? parsed : 5000;
}());
const dynamoDB = new AWS.DynamoDB();
const s3 = new AWS.S3();
/**
 * Normalises the requested action to one of the two supported values.
 *
 * @param {*} inAction - requested action (from the event or environment).
 * @returns {'describe'|'dump'} the action itself when it is exactly
 *   'describe' or 'dump'; otherwise 'dump' (after logging a notice).
 */
function unifyInAction(inAction) {
  const isKnown = inAction === 'describe' || inAction === 'dump';
  if (isKnown) {
    return inAction;
  }
  console.log("inAction value is invalid, using default dump");
  return 'dump';
}
/**
 * Returns a random string of `howMany` characters drawn from `chars`.
 *
 * Each random byte (0-255) is scaled into `min(256, chars.length)` equal-width
 * buckets, one per character, so at most the first 256 characters of `chars`
 * are ever used. The default alphabet is alphanumeric without 'v' and 'V'.
 *
 * @param {number} howMany - number of characters to generate.
 * @param {string} [chars] - alphabet; a missing or empty value selects the default.
 * @returns {string}
 */
function random(howMany, chars) {
  const alphabet = chars || 'abcdefghijklmnopqrstuwxyzABCDEFGHIJKLMNOPQRSTUWXYZ0123456789';
  const bytes = crypto.randomBytes(howMany);
  const bucketWidth = 256 / Math.min(256, alphabet.length);
  return Array.from(bytes, (byte) => alphabet[Math.floor(byte / bucketWidth)]).join('');
}
/**
 * Fetches the DynamoDB description of a table and reports it via `callback`.
 *
 * @param {object} query - not used here; accepted for call-site symmetry.
 * @param {string} inTableName - name of the table to describe.
 * @param {object} event - invocation event, echoed back in the response body.
 * @param {Function} callback - called as (err) on failure, or as
 *   (null, {statusCode: 200, body: {message, input, output}}) on success.
 */
function describeTable(query, inTableName, event, callback) {
  console.log('Do describeTable');
  const request = { TableName: inTableName };
  dynamoDB.describeTable(request, (err, description) => {
    if (err) {
      console.log(err, err.stack);
      callback(err);
      return;
    }
    callback(null, {
      statusCode: 200,
      body: {
        message: 'Go Serverless v1.0! Your function executed successfully!',
        input: event,
        output: description,
      },
    });
  });
}
/**
 * Scans the table page by page, buffering unmarshalled rows.
 *
 * While more pages remain, the buffer is flushed through unparseData whenever
 * it holds at least `writeChunk` rows. The last page always triggers a final
 * flush, with "File Written" passed as the key to log.
 *
 * @param {object} query - DynamoDB scan parameters; `ExclusiveStartKey` is
 *   updated in place as pagination advances.
 * @param {object} stream - output stream passed through to unparseData.
 * @param {string} s3Bucket - destination bucket passed through to unparseData.
 * @param {string} s3Key - destination key passed through to unparseData.
 * @param {object} event - invocation event passed through to unparseData.
 * @param {Function} callback - completion callback; receives (err) if a scan fails.
 */
var scanDynamoDB = function (query, stream, s3Bucket, s3Key, event, callback) {
  console.log('Do scanDynamoDB');
  dynamoDB.scan(query, function (err, data) {
    if (err) {
      console.error(err);
      // Report the failure; only logging it left the invocation without a result.
      callback(err);
      return;
    }
    // Append this page's rows to the module-level buffer.
    unMarshalIntoArray(data.Items);
    if (!data.LastEvaluatedKey) {
      // Last page: flush whatever is still buffered.
      console.log('Do scanDynamoDB rowCount ', rowCount, ' ,writeCount ', writeCount);
      unparseData("File Written", stream, s3Bucket, s3Key, event, callback);
      return;
    }
    // Result is incomplete; continue from where this page stopped.
    query.ExclusiveStartKey = data.LastEvaluatedKey;
    if (rowCount >= writeChunk) {
      // Once the designated number of items has been read, write out to stream.
      console.log('Do scanDynamoDB rowCount ', rowCount, ' ,writeCount ', writeCount);
      unparseData(data.LastEvaluatedKey, stream, s3Bucket, s3Key, event, callback);
    }
    scanDynamoDB(query, stream, s3Bucket, s3Key, event, callback);
  });
};
/**
 * Serialises the buffered rows to CSV, hands the text to writeData and resets
 * the row buffer.
 *
 * Only the first flushed chunk keeps its header row. Papa.unparse emits no
 * trailing newline, so each later chunk keeps the "\r\n" that followed its
 * dropped header. That separator stops its first row from being glued onto
 * the previous chunk's last row. With no buffered rows, Papa.unparse returns
 * only the header line (no "\r\n"), and a later chunk then contributes nothing.
 *
 * NOTE(review): the header row is fixed by the first chunk, so attributes first
 * seen in later chunks produce columns that the header does not name.
 *
 * @param {*} lastEvaluatedKey - logged so an interrupted dump can be resumed.
 * @param {object} stream - output stream, passed through to writeData.
 * @param {string} s3Bucket - destination bucket, passed through to writeData.
 * @param {string} s3Key - destination key, passed through to writeData.
 * @param {object} event - invocation event, passed through to writeData.
 * @param {Function} callback - completion callback, passed through to writeData.
 */
var unparseData = function (lastEvaluatedKey, stream, s3Bucket, s3Key, event, callback) {
  let endData = Papa.unparse({
    fields: [...headers],
    data: unMarshalledArray
  });
  if (writeCount > 0) {
    // Drop the header line but keep its line break as the chunk separator.
    const headerEnd = endData.indexOf('\r\n');
    endData = headerEnd === -1 ? '' : endData.slice(headerEnd);
  }
  writeData(endData, stream, s3Bucket, s3Key, event, callback);
  // Print last evaluated key so process can be continued after stop.
  console.log(lastEvaluatedKey);
  // Reset the row buffer to bound memory use.
  unMarshalledArray = [];
  writeCount += rowCount;
  console.log('Do unparseData rowCount ', rowCount, ' ,writeCount ', writeCount);
  rowCount = 0;
};
/**
 * Appends a CSV chunk to the local dump stream, then uploads the dump to S3.
 *
 * This runs once per flushed chunk and every run targets the same S3 key. The
 * upload body is therefore the whole local file (read back from `stream.path`),
 * not just `data`: uploading only `data` left the object holding nothing but the
 * most recent chunk. Streams without a `path` fall back to uploading `data`.
 *
 * @param {string} data - CSV text to append.
 * @param {object} stream - writable stream for the local dump (a file write
 *   stream when called from the handler).
 * @param {string} s3Bucket - destination bucket.
 * @param {string} s3Key - destination object key.
 * @param {object} event - invocation event, echoed in the success response.
 * @param {Function} callback - called as (err) on failure, or as
 *   (null, {statusCode: 200, body}) after each successful upload.
 */
var writeData = function (data, stream, s3Bucket, s3Key, event, callback) {
  // Uploads `body` to the destination key and reports the outcome.
  function upload(body) {
    const params = {
      Bucket: s3Bucket,
      Key: s3Key,
      Body: body
    };
    s3.putObject(params, function (err) {
      if (err) {
        console.log(err, err.stack);
        callback(err);
        return;
      }
      const response = {
        statusCode: 200,
        body: {
          message: 'Upload the DynamoDB dump to S3. Your function executed successfully!',
          input: event
        },
      };
      callback(null, response);
    });
  }

  stream.write(data, function (err) {
    if (err) {
      console.log(err, err.stack);
      callback(err);
      return;
    }
    if (!stream.path) {
      upload(data);
      return;
    }
    // The write callback fires once this chunk has been handled, and earlier
    // chunks went through the same stream first, so the file holds the full
    // dump written so far.
    fs.readFile(stream.path, function (readErr, contents) {
      if (readErr) {
        console.log(readErr, readErr.stack);
        callback(readErr);
        return;
      }
      upload(contents);
    });
  });
};
/**
 * Unmarshals a page of DynamoDB items into flat rows for CSV output.
 *
 * Each attribute name, trimmed, becomes a column. Names not seen before are
 * appended to the module-level `headers`. Each row is pushed onto
 * `unMarshalledArray` and counted in `rowCount`. Values for which `typeof`
 * reports 'object' (arrays, objects, and null) are stored as JSON text so they
 * fit in a single CSV cell.
 *
 * @param {object[]} items - raw DynamoDB items (attribute name -> typed value).
 */
function unMarshalIntoArray(items) {
  if (items.length === 0) return;
  items.forEach(function (item) {
    const row = {};
    Object.keys(item).forEach(function (rawKey) {
      // Store the value under the same (trimmed) name used for the header:
      // Papa.unparse looks row values up by header name, so a mismatch would
      // leave the cell empty for attribute names with surrounding whitespace.
      const key = rawKey.trim();
      if (!headers.includes(key)) {
        console.log( 'putting new key ' + key + ' into headers ' + headers.toString());
        headers.push(key);
      }
      const value = unmarshal(item[rawKey]);
      row[key] = typeof value === "object" ? JSON.stringify(value) : value;
    });
    unMarshalledArray.push(row);
    rowCount++;
  });
}
/**
 * Handler entry point: describes a DynamoDB table or dumps it to CSV.
 *
 * Each setting is read from the event first and falls back to an environment
 * variable:
 *   event.tablename / inTableName  - table to read (required)
 *   event.action    / inAction     - 'describe' or 'dump' (anything else -> 'dump')
 *   event.filename  / outFileName  - dump file name and S3 key (random 32 chars if unset)
 *   event.s3bucket  / outS3Bucket  - S3 bucket that receives the dump
 *
 * A dump writes CSV to a file under /tmp and uploads it to S3.
 *
 * @param {object} event - invocation payload.
 * @param {object} context - invocation context (unused).
 * @param {Function} callback - Node-style completion callback; receives an
 *   Error when no table name is configured.
 */
module.exports.hello = function (event, context, callback) {
  const options = {
    tablename: event.tablename,
    s3bucket: event.s3bucket,
    filename: event.filename,
    action: event.action,
  };
  const inTableName = options.tablename ? options.tablename : process.env.inTableName;
  if (!inTableName) {
    console.log("You must specify a table");
    // Fail this invocation rather than calling process.exit(), which would
    // terminate the whole runtime process without reporting a result.
    callback(new Error("You must specify a table"));
    return;
  }
  const inAction = options.action ? options.action : process.env.inAction;
  const doAction = unifyInAction(inAction);
  const query = {
    TableName: inTableName,
    Limit: 1000
  };
  if (doAction === 'describe') {
    // Describe produces no file, so no output stream is opened for it.
    describeTable(query, inTableName, event, callback);
    return;
  }

  let outFileName = options.filename ? options.filename : process.env.outFileName;
  if (!outFileName) {
    outFileName = random(32);
  }
  // The dump accumulators are module-level and survive between invocations of
  // a reused process; reset them so this dump starts with a header row and
  // zeroed counters.
  headers.length = 0;
  unMarshalledArray = [];
  rowCount = 0;
  writeCount = 0;

  // The S3 key may contain '/' prefixes; flatten them for the local temp file
  // so opening it does not fail on a missing directory. The 'w' flag truncates
  // any file left behind by an earlier invocation that used the same name.
  const tempfile = '/tmp/' + String(outFileName).replace(/[\\/]/g, '_');
  const stream = fs.createWriteStream(tempfile, { flags: 'w' });
  // Without a listener, a stream 'error' event would crash the process.
  stream.on('error', function (err) {
    console.log(err, err.stack);
    callback(err);
  });
  const outS3Bucket = options.s3bucket ? options.s3bucket : process.env.outS3Bucket;
  scanDynamoDB(query, stream, outS3Bucket, outFileName, event, callback);
};