-
Notifications
You must be signed in to change notification settings - Fork 5
/
index.js
81 lines (66 loc) · 1.9 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
'use strict';
var PluginError = require('plugin-error');
var through = require('through2');
var Stream = require('stream');
var utils = require('util');
var Readable = Stream.Readable;
var Duplex = Stream.Duplex;
var Transform = Stream.Transform;
module.exports = function (func) {
if (!func || typeof func != 'function') {
throw new PluginError('gulp-flatMap', '`flatMap` must be called with one parameter, a function');
}
var openStreams = [];
var ended = false;
var error = false;
function closeStreamIfNoMoreOpenStreams(stream){
if(openStreams.length == 0){
if(ended && !error){
stream.push(null);
}
}else{
}
}
return through.obj(function(data, enc, done){
if (data.isStream()) {
this.emit('error', new PluginError('gulp-flatMap', 'Streaming not supported'));
return;
}
var self = this;
var notYetRead = true;
var readStream = new Readable({objectMode: true});
readStream._read = function(){
if(notYetRead){
notYetRead = false;
readStream.push(data);
}else{
readStream.push(null);
}
};
var resultStream = func(readStream, data);
if(resultStream
&& typeof resultStream === 'object'
&& 'on' in resultStream
&& typeof resultStream.on === 'function'){
openStreams.push(resultStream);
resultStream.on('end', function(){
openStreams.splice(openStreams.indexOf(resultStream), 1);
closeStreamIfNoMoreOpenStreams(self);
done();
});
resultStream.on('data', function(result){
self.push(result);
});
resultStream.on('error', function(error){
console.error("error!");
done(error);
});
}else{
this.emit('error', new PluginError('gulp-flatMap', 'The function must return a stream'));
return;
}
}, function(){
ended = true;
closeStreamIfNoMoreOpenStreams(this);
});
};