GCC Code Coverage Report
Directory: ../src/ Exec Total Coverage
File: /home/node-core-coverage/node-core-coverage/workdir/node/out/../src/stream_base.cc Lines: 184 205 89.8 %
Date: 2016-12-18 Branches: 102 222 45.9 %

Line Branch Exec Source
1
#include "stream_base.h"
2
#include "stream_base-inl.h"
3
#include "stream_wrap.h"
4
5
#include "node.h"
6
#include "node_buffer.h"
7
#include "env.h"
8
#include "env-inl.h"
9
#include "js_stream.h"
10
#include "string_bytes.h"
11
#include "util.h"
12
#include "util-inl.h"
13
#include "v8.h"
14
15
#include <limits.h>  // INT_MAX
16
17
namespace node {
18
19
using v8::Array;
20
using v8::Context;
21
using v8::FunctionCallbackInfo;
22
using v8::HandleScope;
23
using v8::Integer;
24
using v8::Local;
25
using v8::Number;
26
using v8::Object;
27
using v8::String;
28
using v8::Value;
29
30
template int StreamBase::WriteString<ASCII>(
31
    const FunctionCallbackInfo<Value>& args);
32
template int StreamBase::WriteString<UTF8>(
33
    const FunctionCallbackInfo<Value>& args);
34
template int StreamBase::WriteString<UCS2>(
35
    const FunctionCallbackInfo<Value>& args);
36
template int StreamBase::WriteString<LATIN1>(
37
    const FunctionCallbackInfo<Value>& args);
38
39
40
6781
int StreamBase::ReadStart(const FunctionCallbackInfo<Value>& args) {
41
6781
  return ReadStart();
42
}
43
44
45
246
int StreamBase::ReadStop(const FunctionCallbackInfo<Value>& args) {
46
246
  return ReadStop();
47
}
48
49
50
2998
int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
51
2998
  Environment* env = Environment::GetCurrent(args);
52
53
2998
  CHECK(args[0]->IsObject());
54
5996
  Local<Object> req_wrap_obj = args[0].As<Object>();
55
56
  ShutdownWrap* req_wrap = new ShutdownWrap(env,
57
                                            req_wrap_obj,
58
                                            this,
59
5996
                                            AfterShutdown);
60
61
2998
  int err = DoShutdown(req_wrap);
62
2998
  if (err)
63
    delete req_wrap;
64
2998
  return err;
65
}
66
67
68
2992
void StreamBase::AfterShutdown(ShutdownWrap* req_wrap, int status) {
69
2992
  StreamBase* wrap = req_wrap->wrap();
70
2992
  Environment* env = req_wrap->env();
71
72
  // The wrap and request objects should still be there.
73
5984
  CHECK_EQ(req_wrap->persistent().IsEmpty(), false);
74
75
5984
  HandleScope handle_scope(env->isolate());
76
8976
  Context::Scope context_scope(env->context());
77
78
5984
  Local<Object> req_wrap_obj = req_wrap->object();
79
  Local<Value> argv[3] = {
80
    Integer::New(env->isolate(), status),
81
2992
    wrap->GetObject(),
82
    req_wrap_obj
83
14960
  };
84
85
14960
  if (req_wrap->object()->Has(env->context(),
86
14960
                              env->oncomplete_string()).FromJust()) {
87
5984
    req_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv);
88
  }
89
90
2992
  delete req_wrap;
91
2992
}
92
93
94
11446
int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
95
11446
  Environment* env = Environment::GetCurrent(args);
96
97
11446
  CHECK(args[0]->IsObject());
98
11446
  CHECK(args[1]->IsArray());
99
100
22892
  Local<Object> req_wrap_obj = args[0].As<Object>();
101
22892
  Local<Array> chunks = args[1].As<Array>();
102
103
11446
  size_t count = chunks->Length() >> 1;
104
105
22892
  MaybeStackBuffer<uv_buf_t, 16> bufs(count);
106
107
  // Determine storage size first
108
11446
  size_t storage_size = 0;
109
58131
  for (size_t i = 0; i < count; i++) {
110
46685
    storage_size = ROUND_UP(storage_size, WriteWrap::kAlignSize);
111
112
46685
    Local<Value> chunk = chunks->Get(i * 2);
113
114
46685
    if (Buffer::HasInstance(chunk))
115
20712
      continue;
116
      // Buffer chunk, no additional storage required
117
118
    // String chunk
119
51946
    Local<String> string = chunk->ToString(env->isolate());
120
51946
    enum encoding encoding = ParseEncoding(env->isolate(),
121
51946
                                           chunks->Get(i * 2 + 1));
122
    size_t chunk_size;
123

27267
    if (encoding == UTF8 && string->Length() > 65535)
124
      chunk_size = StringBytes::Size(env->isolate(), string, encoding);
125
    else
126
25973
      chunk_size = StringBytes::StorageSize(env->isolate(), string, encoding);
127
128
25973
    storage_size += chunk_size;
129
  }
130
131
11446
  if (storage_size > INT_MAX)
132
    return UV_ENOBUFS;
133
134
  WriteWrap* req_wrap = WriteWrap::New(env,
135
                                       req_wrap_obj,
136
                                       this,
137
                                       AfterWrite,
138
11446
                                       storage_size);
139
140
11446
  uint32_t bytes = 0;
141
11446
  size_t offset = 0;
142
58131
  for (size_t i = 0; i < count; i++) {
143
46685
    Local<Value> chunk = chunks->Get(i * 2);
144
145
    // Write buffer
146
46685
    if (Buffer::HasInstance(chunk)) {
147
20712
      bufs[i].base = Buffer::Data(chunk);
148
20712
      bufs[i].len = Buffer::Length(chunk);
149
20712
      bytes += bufs[i].len;
150
20712
      continue;
151
    }
152
153
    // Write string
154
25973
    offset = ROUND_UP(offset, WriteWrap::kAlignSize);
155
25973
    CHECK_LE(offset, storage_size);
156
25973
    char* str_storage = req_wrap->Extra(offset);
157
25973
    size_t str_size = storage_size - offset;
158
159
51946
    Local<String> string = chunk->ToString(env->isolate());
160
51946
    enum encoding encoding = ParseEncoding(env->isolate(),
161
51946
                                           chunks->Get(i * 2 + 1));
162
51946
    str_size = StringBytes::Write(env->isolate(),
163
                                  str_storage,
164
                                  str_size,
165
                                  string,
166
25973
                                  encoding);
167
25973
    bufs[i].base = str_storage;
168
25973
    bufs[i].len = str_size;
169
25973
    offset += str_size;
170
25973
    bytes += str_size;
171
  }
172
173
11446
  int err = DoWrite(req_wrap, *bufs, count, nullptr);
174
175
68676
  req_wrap->object()->Set(env->async(), True(env->isolate()));
176
68676
  req_wrap->object()->Set(env->bytes_string(),
177
11446
                          Number::New(env->isolate(), bytes));
178
11446
  const char* msg = Error();
179
11446
  if (msg != nullptr) {
180
    req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
181
    ClearError();
182
  }
183
184
11446
  if (err)
185
    req_wrap->Dispose();
186
187
  return err;
188
}
189
190
191
192
193
1345
int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
194
1345
  CHECK(args[0]->IsObject());
195
1345
  CHECK(Buffer::HasInstance(args[1]));
196
1345
  Environment* env = Environment::GetCurrent(args);
197
198
2690
  Local<Object> req_wrap_obj = args[0].As<Object>();
199
1345
  const char* data = Buffer::Data(args[1]);
200
1345
  size_t length = Buffer::Length(args[1]);
201
202
  WriteWrap* req_wrap;
203
  uv_buf_t buf;
204
1345
  buf.base = const_cast<char*>(data);
205
1345
  buf.len = length;
206
207
  // Try writing immediately without allocation
208
1345
  uv_buf_t* bufs = &buf;
209
1345
  size_t count = 1;
210
1345
  int err = DoTryWrite(&bufs, &count);
211
1345
  if (err != 0)
212
    goto done;
213
1345
  if (count == 0)
214
    goto done;
215
393
  CHECK_EQ(count, 1);
216
217
  // Allocate, or write rest
218
393
  req_wrap = WriteWrap::New(env, req_wrap_obj, this, AfterWrite);
219
220
393
  err = DoWrite(req_wrap, bufs, count, nullptr);
221
1965
  req_wrap_obj->Set(env->async(), True(env->isolate()));
222
1179
  req_wrap_obj->Set(env->buffer_string(), args[1]);
223
224
393
  if (err)
225
    req_wrap->Dispose();
226
227
 done:
228
1345
  const char* msg = Error();
229
1345
  if (msg != nullptr) {
230
    req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
231
    ClearError();
232
  }
233
6725
  req_wrap_obj->Set(env->bytes_string(),
234
1345
                    Integer::NewFromUnsigned(env->isolate(), length));
235
1345
  return err;
236
}
237
238
239
template <enum encoding enc>
240
14073
int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
241
14073
  Environment* env = Environment::GetCurrent(args);
242


14073
  CHECK(args[0]->IsObject());
243


28146
  CHECK(args[1]->IsString());
244
245
28146
  Local<Object> req_wrap_obj = args[0].As<Object>();
246
28146
  Local<String> string = args[1].As<String>();
247
14073
  Local<Object> send_handle_obj;
248


14073
  if (args[2]->IsObject())
249
    send_handle_obj = args[2].As<Object>();
250
251
  int err;
252
253
  // Compute the size of the storage that the string will be flattened into.
254
  // For UTF8 strings that are very long, go ahead and take the hit for
255
  // computing their actual size, rather than tripling the storage.
256
  size_t storage_size;
257
13146
  if (enc == UTF8 && string->Length() > 65535)
258
5
    storage_size = StringBytes::Size(env->isolate(), string, enc);
259
  else
260
14068
    storage_size = StringBytes::StorageSize(env->isolate(), string, enc);
261
262


14073
  if (storage_size > INT_MAX)
263
    return UV_ENOBUFS;
264
265
  // Try writing immediately if write size isn't too big
266
  WriteWrap* req_wrap;
267
  char* data;
268
  char stack_storage[16384];  // 16kb
269
  size_t data_size;
270
  uv_buf_t buf;
271
272
  bool try_write = storage_size <= sizeof(stack_storage) &&
273






14073
                   (!IsIPCPipe() || send_handle_obj.IsEmpty());
274


14073
  if (try_write) {
275
12868
    data_size = StringBytes::Write(env->isolate(),
276
                                   stack_storage,
277
                                   storage_size,
278
                                   string,
279
                                   enc);
280
12868
    buf = uv_buf_init(stack_storage, data_size);
281
282
12868
    uv_buf_t* bufs = &buf;
283
12868
    size_t count = 1;
284
12868
    err = DoTryWrite(&bufs, &count);
285
286
    // Failure
287


12868
    if (err != 0)
288
      goto done;
289
290
    // Success
291


12868
    if (count == 0)
292
      goto done;
293
294
    // Partial write
295


228
    CHECK_EQ(count, 1);
296
  }
297
298
1433
  req_wrap = WriteWrap::New(env, req_wrap_obj, this, AfterWrite, storage_size);
299
300
1433
  data = req_wrap->Extra();
301
302


1433
  if (try_write) {
303
    // Copy partial data
304
456
    memcpy(data, buf.base, buf.len);
305
228
    data_size = buf.len;
306
  } else {
307
    // Write it
308
1205
    data_size = StringBytes::Write(env->isolate(),
309
                                   data,
310
                                   storage_size,
311
                                   string,
312
                                   enc);
313
  }
314
315


1433
  CHECK_LE(data_size, storage_size);
316
317
1433
  buf = uv_buf_init(data, data_size);
318
319


1433
  if (!IsIPCPipe()) {
320
1347
    err = DoWrite(req_wrap, &buf, 1, nullptr);
321
  } else {
322
86
    uv_handle_t* send_handle = nullptr;
323
324


86
    if (!send_handle_obj.IsEmpty()) {
325
      HandleWrap* wrap;
326


83
      ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
327
83
      send_handle = wrap->GetHandle();
328
      // Reference StreamWrap instance to prevent it from being garbage
329
      // collected before `AfterWrite` is called.
330


166
      CHECK_EQ(false, req_wrap->persistent().IsEmpty());
331
332
      req_wrap->object()->Set(env->handle_string(), send_handle_obj);
332
    }
333
334
86
    err = DoWrite(
335
        req_wrap,
336
        &buf,
337
        1,
338
        reinterpret_cast<uv_stream_t*>(send_handle));
339
  }
340
341
8598
  req_wrap->object()->Set(env->async(), True(env->isolate()));
342
343


1433
  if (err)
344
    req_wrap->Dispose();
345
346
 done:
347
14073
  const char* msg = Error();
348


14073
  if (msg != nullptr) {
349
    req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
350
    ClearError();
351
  }
352
56292
  req_wrap_obj->Set(env->bytes_string(),
353
                    Integer::NewFromUnsigned(env->isolate(), data_size));
354
14073
  return err;
355
}
356
357
358
13266
void StreamBase::AfterWrite(WriteWrap* req_wrap, int status) {
359
13266
  StreamBase* wrap = req_wrap->wrap();
360
13266
  Environment* env = req_wrap->env();
361
362
26526
  HandleScope handle_scope(env->isolate());
363
39792
  Context::Scope context_scope(env->context());
364
365
  // The wrap and request objects should still be there.
366
26532
  CHECK_EQ(req_wrap->persistent().IsEmpty(), false);
367
368
  // Unref handle property
369
26532
  Local<Object> req_wrap_obj = req_wrap->object();
370
53064
  req_wrap_obj->Delete(env->context(), env->handle_string()).FromJust();
371
26532
  wrap->OnAfterWrite(req_wrap);
372
373
  Local<Value> argv[] = {
374
    Integer::New(env->isolate(), status),
375
13266
    wrap->GetObject(),
376
    req_wrap_obj,
377
    Undefined(env->isolate())
378
92862
  };
379
380
13266
  const char* msg = wrap->Error();
381
13266
  if (msg != nullptr) {
382
    argv[3] = OneByteString(env->isolate(), msg);
383
    wrap->ClearError();
384
  }
385
386
66330
  if (req_wrap->object()->Has(env->context(),
387
66330
                              env->oncomplete_string()).FromJust()) {
388
26532
    req_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv);
389
  }
390
391
13260
  req_wrap->Dispose();
392
13260
}
393
394
395
20832
void StreamBase::EmitData(ssize_t nread,
396
                          Local<Object> buf,
397
                          Local<Object> handle) {
398
20832
  Environment* env = env_;
399
400
  Local<Value> argv[] = {
401
    Integer::New(env->isolate(), nread),
402
    buf,
403
    handle
404
83328
  };
405
406
20832
  if (argv[1].IsEmpty())
407
7140
    argv[1] = Undefined(env->isolate());
408
409
20832
  if (argv[2].IsEmpty())
410
62085
    argv[2] = Undefined(env->isolate());
411
412
20832
  AsyncWrap* async = GetAsyncWrap();
413
20832
  if (async == nullptr) {
414
    node::MakeCallback(env,
415
                       GetObject(),
416
                       env->onread_string(),
417
                       arraysize(argv),
418
                       argv);
419
  } else {
420
41664
    async->MakeCallback(env->onread_string(), arraysize(argv), argv);
421
  }
422
20775
}
423
424
425
4
bool StreamBase::IsIPCPipe() {
426
4
  return false;
427
}
428
429
430
int StreamBase::GetFD() {
431
  return -1;
432
}
433
434
435
AsyncWrap* StreamBase::GetAsyncWrap() {
436
  return nullptr;
437
}
438
439
440
16258
Local<Object> StreamBase::GetObject() {
441
32516
  return GetAsyncWrap()->object();
442
}
443
444
445
555
int StreamResource::DoTryWrite(uv_buf_t** bufs, size_t* count) {
446
  // No TryWrite by default
447
555
  return 0;
448
}
449
450
451
38318
const char* StreamResource::Error() const {
452
38318
  return nullptr;
453
}
454
455
456
void StreamResource::ClearError() {
457
  // No-op
458
}
459
460
}  // namespace node