GCC Code Coverage Report
Directory: ../src/ Exec Total Coverage
File: /home/node-core-coverage/node-core-coverage/workdir/node/src/stream_base.cc Lines: 187 209 89.5 %
Date: 2016-07-12 Branches: 107 228 46.9 %

Line Exec Source
1
#include "stream_base.h"
2
#include "stream_base-inl.h"
3
#include "stream_wrap.h"
4
5
#include "node.h"
6
#include "node_buffer.h"
7
#include "env.h"
8
#include "env-inl.h"
9
#include "js_stream.h"
10
#include "string_bytes.h"
11
#include "util.h"
12
#include "util-inl.h"
13
#include "v8.h"
14
15
#include <limits.h>  // INT_MAX
16
17
namespace node {
18
19
using v8::Array;
20
using v8::Context;
21
using v8::FunctionCallbackInfo;
22
using v8::HandleScope;
23
using v8::Integer;
24
using v8::Local;
25
using v8::Number;
26
using v8::Object;
27
using v8::String;
28
using v8::Value;
29
30
// Explicit instantiations of StreamBase::WriteString<enc>() for the four
// encodings named below. The template body is defined later in this file.
template int StreamBase::WriteString<ASCII>(
    const FunctionCallbackInfo<Value>& args);
template int StreamBase::WriteString<UTF8>(
    const FunctionCallbackInfo<Value>& args);
template int StreamBase::WriteString<UCS2>(
    const FunctionCallbackInfo<Value>& args);
template int StreamBase::WriteString<LATIN1>(
    const FunctionCallbackInfo<Value>& args);
38
39
40
6640
int StreamBase::ReadStart(const FunctionCallbackInfo<Value>& args) {
41
6640
  return ReadStart();
42
}
43
44
45
222
int StreamBase::ReadStop(const FunctionCallbackInfo<Value>& args) {
46
222
  return ReadStop();
47
}
48
49
50
3052
int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
51
3052
  Environment* env = Environment::GetCurrent(args);
52
53
3052
  CHECK(args[0]->IsObject());
54
6104
  Local<Object> req_wrap_obj = args[0].As<Object>();
55
56
  ShutdownWrap* req_wrap = new ShutdownWrap(env,
57
                                            req_wrap_obj,
58
                                            this,
59
6104
                                            AfterShutdown);
60
61
3052
  int err = DoShutdown(req_wrap);
62
3052
  if (err)
63
    delete req_wrap;
64
3052
  return err;
65
}
66
67
68
3046
// Completion callback for a ShutdownWrap created in Shutdown().
//
// Calls the request object's `oncomplete` property (if present) with
// (status, stream object, request object) and then deletes the request.
void StreamBase::AfterShutdown(ShutdownWrap* req_wrap, int status) {
  StreamBase* const stream = req_wrap->wrap();
  Environment* const env = req_wrap->env();

  // The wrap and request objects should still be there.
  CHECK_EQ(req_wrap->persistent().IsEmpty(), false);

  HandleScope handle_scope(env->isolate());
  Context::Scope context_scope(env->context());

  Local<Object> request_obj = req_wrap->object();
  Local<Value> argv[3] = {
    Integer::New(env->isolate(), status),
    stream->GetObject(),
    request_obj
  };

  const bool has_oncomplete =
      req_wrap->object()->Has(env->context(),
                              env->oncomplete_string()).FromJust();
  if (has_oncomplete)
    req_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv);

  delete req_wrap;
}
92
93
94
11256
int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
95
11256
  Environment* env = Environment::GetCurrent(args);
96
97
11256
  CHECK(args[0]->IsObject());
98
11256
  CHECK(args[1]->IsArray());
99
100
22512
  Local<Object> req_wrap_obj = args[0].As<Object>();
101
22512
  Local<Array> chunks = args[1].As<Array>();
102
103
11256
  size_t count = chunks->Length() >> 1;
104
105
  uv_buf_t bufs_[16];
106
11256
  uv_buf_t* bufs = bufs_;
107
108
  // Determine storage size first
109
11256
  size_t storage_size = 0;
110
56952
  for (size_t i = 0; i < count; i++) {
111
45696
    storage_size = ROUND_UP(storage_size, WriteWrap::kAlignSize);
112
113
45696
    Local<Value> chunk = chunks->Get(i * 2);
114
115
45696
    if (Buffer::HasInstance(chunk))
116
20033
      continue;
117
      // Buffer chunk, no additional storage required
118
119
    // String chunk
120
51326
    Local<String> string = chunk->ToString(env->isolate());
121
51326
    enum encoding encoding = ParseEncoding(env->isolate(),
122
51326
                                           chunks->Get(i * 2 + 1));
123
    size_t chunk_size;
124
26956
    if (encoding == UTF8 && string->Length() > 65535)
125
      chunk_size = StringBytes::Size(env->isolate(), string, encoding);
126
    else
127
25663
      chunk_size = StringBytes::StorageSize(env->isolate(), string, encoding);
128
129
25663
    storage_size += chunk_size;
130
  }
131
132
11256
  if (storage_size > INT_MAX)
133
    return UV_ENOBUFS;
134
135
11256
  if (arraysize(bufs_) < count)
136
3
    bufs = new uv_buf_t[count];
137
138
  WriteWrap* req_wrap = WriteWrap::New(env,
139
                                       req_wrap_obj,
140
                                       this,
141
                                       AfterWrite,
142
11256
                                       storage_size);
143
144
11256
  uint32_t bytes = 0;
145
11256
  size_t offset = 0;
146
56952
  for (size_t i = 0; i < count; i++) {
147
45696
    Local<Value> chunk = chunks->Get(i * 2);
148
149
    // Write buffer
150
45696
    if (Buffer::HasInstance(chunk)) {
151
20033
      bufs[i].base = Buffer::Data(chunk);
152
20033
      bufs[i].len = Buffer::Length(chunk);
153
20033
      bytes += bufs[i].len;
154
20033
      continue;
155
    }
156
157
    // Write string
158
25663
    offset = ROUND_UP(offset, WriteWrap::kAlignSize);
159
25663
    CHECK_LE(offset, storage_size);
160
25663
    char* str_storage = req_wrap->Extra(offset);
161
25663
    size_t str_size = storage_size - offset;
162
163
51326
    Local<String> string = chunk->ToString(env->isolate());
164
51326
    enum encoding encoding = ParseEncoding(env->isolate(),
165
51326
                                           chunks->Get(i * 2 + 1));
166
51326
    str_size = StringBytes::Write(env->isolate(),
167
                                  str_storage,
168
                                  str_size,
169
                                  string,
170
25663
                                  encoding);
171
25663
    bufs[i].base = str_storage;
172
25663
    bufs[i].len = str_size;
173
25663
    offset += str_size;
174
25663
    bytes += str_size;
175
  }
176
177
11256
  int err = DoWrite(req_wrap, bufs, count, nullptr);
178
179
  // Deallocate space
180
11256
  if (bufs != bufs_)
181
3
    delete[] bufs;
182
183
67536
  req_wrap->object()->Set(env->async(), True(env->isolate()));
184
67536
  req_wrap->object()->Set(env->bytes_string(),
185
11256
                          Number::New(env->isolate(), bytes));
186
11256
  const char* msg = Error();
187
11256
  if (msg != nullptr) {
188
    req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
189
    ClearError();
190
  }
191
192
11256
  if (err)
193
    req_wrap->Dispose();
194
195
  return err;
196
}
197
198
199
200
201
1092
int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
202
1092
  CHECK(args[0]->IsObject());
203
1092
  CHECK(Buffer::HasInstance(args[1]));
204
1092
  Environment* env = Environment::GetCurrent(args);
205
206
2184
  Local<Object> req_wrap_obj = args[0].As<Object>();
207
1092
  const char* data = Buffer::Data(args[1]);
208
1092
  size_t length = Buffer::Length(args[1]);
209
210
  WriteWrap* req_wrap;
211
  uv_buf_t buf;
212
1092
  buf.base = const_cast<char*>(data);
213
1092
  buf.len = length;
214
215
  // Try writing immediately without allocation
216
1092
  uv_buf_t* bufs = &buf;
217
1092
  size_t count = 1;
218
1092
  int err = DoTryWrite(&bufs, &count);
219
1092
  if (err != 0)
220
    goto done;
221
1092
  if (count == 0)
222
    goto done;
223
317
  CHECK_EQ(count, 1);
224
225
  // Allocate, or write rest
226
317
  req_wrap = WriteWrap::New(env, req_wrap_obj, this, AfterWrite);
227
228
317
  err = DoWrite(req_wrap, bufs, count, nullptr);
229
1585
  req_wrap_obj->Set(env->async(), True(env->isolate()));
230
231
317
  if (err)
232
    req_wrap->Dispose();
233
234
 done:
235
1092
  const char* msg = Error();
236
1092
  if (msg != nullptr) {
237
    req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
238
    ClearError();
239
  }
240
5460
  req_wrap_obj->Set(env->bytes_string(),
241
1092
                    Integer::NewFromUnsigned(env->isolate(), length));
242
1092
  return err;
243
}
244
245
246
template <enum encoding enc>
247
14912
int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
248
14912
  Environment* env = Environment::GetCurrent(args);
249
14912
  CHECK(args[0]->IsObject());
250
29824
  CHECK(args[1]->IsString());
251
252
29824
  Local<Object> req_wrap_obj = args[0].As<Object>();
253
29824
  Local<String> string = args[1].As<String>();
254
14912
  Local<Object> send_handle_obj;
255
14912
  if (args[2]->IsObject())
256
    send_handle_obj = args[2].As<Object>();
257
258
  int err;
259
260
  // Compute the size of the storage that the string will be flattened into.
261
  // For UTF8 strings that are very long, go ahead and take the hit for
262
  // computing their actual size, rather than tripling the storage.
263
  size_t storage_size;
264
14005
  if (enc == UTF8 && string->Length() > 65535)
265
5
    storage_size = StringBytes::Size(env->isolate(), string, enc);
266
  else
267
14907
    storage_size = StringBytes::StorageSize(env->isolate(), string, enc);
268
269
14912
  if (storage_size > INT_MAX)
270
    return UV_ENOBUFS;
271
272
  // Try writing immediately if write size isn't too big
273
  WriteWrap* req_wrap;
274
  char* data;
275
  char stack_storage[16384];  // 16kb
276
  size_t data_size;
277
  uv_buf_t buf;
278
279
  bool try_write = storage_size <= sizeof(stack_storage) &&
280
14912
                   (!IsIPCPipe() || send_handle_obj.IsEmpty());
281
14912
  if (try_write) {
282
13707
    data_size = StringBytes::Write(env->isolate(),
283
                                   stack_storage,
284
                                   storage_size,
285
                                   string,
286
                                   enc);
287
13707
    buf = uv_buf_init(stack_storage, data_size);
288
289
13707
    uv_buf_t* bufs = &buf;
290
13707
    size_t count = 1;
291
13707
    err = DoTryWrite(&bufs, &count);
292
293
    // Failure
294
13707
    if (err != 0)
295
      goto done;
296
297
    // Success
298
13707
    if (count == 0)
299
      goto done;
300
301
    // Partial write
302
221
    CHECK_EQ(count, 1);
303
  }
304
305
1426
  req_wrap = WriteWrap::New(env, req_wrap_obj, this, AfterWrite, storage_size);
306
307
1426
  data = req_wrap->Extra();
308
309
1426
  if (try_write) {
310
    // Copy partial data
311
442
    memcpy(data, buf.base, buf.len);
312
221
    data_size = buf.len;
313
  } else {
314
    // Write it
315
1205
    data_size = StringBytes::Write(env->isolate(),
316
                                   data,
317
                                   storage_size,
318
                                   string,
319
                                   enc);
320
  }
321
322
1426
  CHECK_LE(data_size, storage_size);
323
324
1426
  buf = uv_buf_init(data, data_size);
325
326
1426
  if (!IsIPCPipe()) {
327
1338
    err = DoWrite(req_wrap, &buf, 1, nullptr);
328
  } else {
329
88
    uv_handle_t* send_handle = nullptr;
330
331
88
    if (!send_handle_obj.IsEmpty()) {
332
      HandleWrap* wrap;
333
84
      ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
334
84
      send_handle = wrap->GetHandle();
335
      // Reference StreamWrap instance to prevent it from being garbage
336
      // collected before `AfterWrite` is called.
337
168
      CHECK_EQ(false, req_wrap->persistent().IsEmpty());
338
336
      req_wrap->object()->Set(env->handle_string(), send_handle_obj);
339
    }
340
341
88
    err = DoWrite(
342
        req_wrap,
343
        &buf,
344
        1,
345
        reinterpret_cast<uv_stream_t*>(send_handle));
346
  }
347
348
8556
  req_wrap->object()->Set(env->async(), True(env->isolate()));
349
350
1426
  if (err)
351
    req_wrap->Dispose();
352
353
 done:
354
14912
  const char* msg = Error();
355
14912
  if (msg != nullptr) {
356
    req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
357
    ClearError();
358
  }
359
59648
  req_wrap_obj->Set(env->bytes_string(),
360
                    Integer::NewFromUnsigned(env->isolate(), data_size));
361
14912
  return err;
362
}
363
364
365
12994
// Completion callback for WriteWrap requests created by the write methods.
//
// Removes the env->handle_string() property from the request object,
// notifies the stream through OnAfterWrite(), and calls the request
// object's `oncomplete` property (if present) with
// (status, stream object, request object, error message or undefined).
// The request is disposed afterwards.
void StreamBase::AfterWrite(WriteWrap* req_wrap, int status) {
  StreamBase* const stream = req_wrap->wrap();
  Environment* const env = req_wrap->env();

  HandleScope handle_scope(env->isolate());
  Context::Scope context_scope(env->context());

  // The wrap and request objects should still be there.
  CHECK_EQ(req_wrap->persistent().IsEmpty(), false);

  // Unref handle property
  Local<Object> request_obj = req_wrap->object();
  request_obj->Delete(env->context(), env->handle_string()).FromJust();
  stream->OnAfterWrite(req_wrap);

  Local<Value> argv[] = {
    Integer::New(env->isolate(), status),
    stream->GetObject(),
    request_obj,
    Undefined(env->isolate())
  };

  // Replace the trailing `undefined` with the stream's pending error text.
  if (const char* msg = stream->Error()) {
    argv[3] = OneByteString(env->isolate(), msg);
    stream->ClearError();
  }

  const bool has_oncomplete =
      req_wrap->object()->Has(env->context(),
                              env->oncomplete_string()).FromJust();
  if (has_oncomplete)
    req_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv);

  req_wrap->Dispose();
}
400
401
402
17138
void StreamBase::EmitData(ssize_t nread,
403
                          Local<Object> buf,
404
                          Local<Object> handle) {
405
17138
  Environment* env = env_;
406
407
  Local<Value> argv[] = {
408
    Integer::New(env->isolate(), nread),
409
    buf,
410
    handle
411
68552
  };
412
413
17138
  if (argv[1].IsEmpty())
414
6957
    argv[1] = Undefined(env->isolate());
415
416
17138
  if (argv[2].IsEmpty())
417
51237
    argv[2] = Undefined(env->isolate());
418
419
17138
  AsyncWrap* async = GetAsyncWrap();
420
17138
  if (async == nullptr) {
421
    node::MakeCallback(env,
422
                       GetObject(),
423
                       env->onread_string(),
424
                       arraysize(argv),
425
                       argv);
426
  } else {
427
34276
    async->MakeCallback(env->onread_string(), arraysize(argv), argv);
428
  }
429
17082
}
430
431
432
4
// This implementation always reports that the stream is not an IPC pipe.
bool StreamBase::IsIPCPipe() {
  return false;
}
435
436
437
// This implementation always returns -1.
// NOTE(review): presumably -1 means "no file descriptor" - confirm against
// the callers.
int StreamBase::GetFD() {
  return -1;
}
440
441
442
// This implementation has no AsyncWrap to offer and returns nullptr.
// EmitData() checks for that; GetObject() below does not.
AsyncWrap* StreamBase::GetAsyncWrap() {
  return nullptr;
}
445
446
447
16040
// Returns the JS object of the AsyncWrap reported by GetAsyncWrap().
// The pointer is dereferenced without a null check, so GetAsyncWrap() must
// return a non-null AsyncWrap when this implementation is used.
Local<Object> StreamBase::GetObject() {
  AsyncWrap* async = GetAsyncWrap();
  return async->object();
}
450
451
452
523
// No TryWrite by default: `bufs` and `count` are left untouched and 0 is
// returned, so the write methods above see the data as still pending and
// continue with DoWrite().
int StreamResource::DoTryWrite(uv_buf_t** bufs, size_t* count) {
  return 0;
}
456
457
458
38581
// This implementation never has an error message to report and returns
// nullptr.
const char* StreamResource::Error() const {
  return nullptr;
}
461
462
463
// Nothing to clear in this implementation: Error() above always returns
// nullptr.
void StreamResource::ClearError() {
  // No-op
}
466
467
}  // namespace node