GCC Code Coverage Report
Directory: ../src/ Exec Total Coverage
File: /home/node-core-coverage/node-core-coverage/workdir/node/out/../src/stream_base.h Lines: 43 46 93.5 %
Date: 2016-12-18 Branches: 19 32 59.4 %

Line Branch Exec Source
1
#ifndef SRC_STREAM_BASE_H_
2
#define SRC_STREAM_BASE_H_
3
4
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
5
6
#include "env.h"
7
#include "async-wrap.h"
8
#include "req-wrap.h"
9
#include "req-wrap-inl.h"
10
#include "node.h"
11
#include "util.h"
12
13
#include "v8.h"
14
15
namespace node {
16
17
// Forward declarations
18
class StreamBase;
19
20
template <class Req>
21
class StreamReq {
22
 public:
23
  typedef void (*DoneCb)(Req* req, int status);
24
25
18542
  explicit StreamReq(DoneCb cb) : cb_(cb) {
26
  }
27
28
18514
  inline void Done(int status, const char* error_str = nullptr) {
29
18514
    Req* req = static_cast<Req*>(this);
30
18514
    Environment* env = req->env();
31

18514
    if (error_str != nullptr) {
32
25
      req->object()->Set(env->error_string(),
33
                         OneByteString(env->isolate(), error_str));
34
    }
35
36
18514
    cb_(req, status);
37
18508
  }
38
39
 private:
40
  DoneCb cb_;
41
};
42
43
2992
class ShutdownWrap : public ReqWrap<uv_shutdown_t>,
44
                     public StreamReq<ShutdownWrap> {
45
 public:
46
  ShutdownWrap(Environment* env,
47
               v8::Local<v8::Object> req_wrap_obj,
48
               StreamBase* wrap,
49
               DoneCb cb)
50
      : ReqWrap(env, req_wrap_obj, AsyncWrap::PROVIDER_SHUTDOWNWRAP),
51
        StreamReq<ShutdownWrap>(cb),
52
5996
        wrap_(wrap) {
53
2998
    Wrap(req_wrap_obj, this);
54
  }
55
56
2998
  static void NewShutdownWrap(const v8::FunctionCallbackInfo<v8::Value>& args) {
57
2998
    CHECK(args.IsConstructCall());
58
2998
  }
59
60
  static ShutdownWrap* from_req(uv_shutdown_t* req) {
61
2990
    return ContainerOf(&ShutdownWrap::req_, req);
62
  }
63
64
  inline StreamBase* wrap() const { return wrap_; }
65
  size_t self_size() const override { return sizeof(*this); }
66
67
 private:
68
  StreamBase* const wrap_;
69
};
70
71
15529
class WriteWrap: public ReqWrap<uv_write_t>,
72
                 public StreamReq<WriteWrap> {
73
 public:
74
  static inline WriteWrap* New(Environment* env,
75
                               v8::Local<v8::Object> obj,
76
                               StreamBase* wrap,
77
                               DoneCb cb,
78
                               size_t extra = 0);
79
  inline void Dispose();
80
  inline char* Extra(size_t offset = 0);
81
82
  inline StreamBase* wrap() const { return wrap_; }
83
84
  size_t self_size() const override { return storage_size_; }
85
86
29135
  static void NewWriteWrap(const v8::FunctionCallbackInfo<v8::Value>& args) {
87
29135
    CHECK(args.IsConstructCall());
88
29135
  }
89
90
  static WriteWrap* from_req(uv_write_t* req) {
91
14604
    return ContainerOf(&WriteWrap::req_, req);
92
  }
93
94
  static const size_t kAlignSize = 16;
95
96
 protected:
97
  WriteWrap(Environment* env,
98
            v8::Local<v8::Object> obj,
99
            StreamBase* wrap,
100
            DoneCb cb,
101
            size_t storage_size)
102
      : ReqWrap(env, obj, AsyncWrap::PROVIDER_WRITEWRAP),
103
        StreamReq<WriteWrap>(cb),
104
        wrap_(wrap),
105
31088
        storage_size_(storage_size) {
106
15544
    Wrap(obj, this);
107
  }
108
109
  void* operator new(size_t size) = delete;
110
  void* operator new(size_t size, char* storage) { return storage; }
111
112
  // This is just to keep the compiler happy. It should never be called, since
113
  // we don't use exceptions in node.
114
  void operator delete(void* ptr, char* storage) { UNREACHABLE(); }
115
116
 private:
117
  // People should not be using the non-placement new and delete operator on a
118
  // WriteWrap. Ensure this never happens.
119
  void operator delete(void* ptr) { UNREACHABLE(); }
120
121
  StreamBase* const wrap_;
122
  const size_t storage_size_;
123
};
124
125
class StreamResource {
126
 public:
127
  template <class T>
128
  struct Callback {
129
28719
    Callback() : fn(nullptr), ctx(nullptr) {}
130
    Callback(T fn, void* ctx) : fn(fn), ctx(ctx) {}
131
    Callback(const Callback&) = default;
132
133
    inline bool is_empty() { return fn == nullptr; }
134
    inline void clear() {
135
5068
      fn = nullptr;
136
5068
      ctx = nullptr;
137
    }
138
139
    T fn;
140
    void* ctx;
141
  };
142
143
  typedef void (*AfterWriteCb)(WriteWrap* w, void* ctx);
144
  typedef void (*AllocCb)(size_t size, uv_buf_t* buf, void* ctx);
145
  typedef void (*ReadCb)(ssize_t nread,
146
                         const uv_buf_t* buf,
147
                         uv_handle_type pending,
148
                         void* ctx);
149
150
33364
  StreamResource() : bytes_read_(0) {
151
  }
152
7067
  virtual ~StreamResource() = default;
153
154
  virtual int DoShutdown(ShutdownWrap* req_wrap) = 0;
155
  virtual int DoTryWrite(uv_buf_t** bufs, size_t* count);
156
  virtual int DoWrite(WriteWrap* w,
157
                      uv_buf_t* bufs,
158
                      size_t count,
159
                      uv_stream_t* send_handle) = 0;
160
  virtual const char* Error() const;
161
  virtual void ClearError();
162
163
  // Events
164
  inline void OnAfterWrite(WriteWrap* w) {
165
13277
    if (!after_write_cb_.is_empty())
166
12372
      after_write_cb_.fn(w, after_write_cb_.ctx);
167
  }
168
169
  inline void OnAlloc(size_t size, uv_buf_t* buf) {
170

25819
    if (!alloc_cb_.is_empty())
171
25819
      alloc_cb_.fn(size, buf, alloc_cb_.ctx);
172
  }
173
174
  inline void OnRead(ssize_t nread,
175
                     const uv_buf_t* buf,
176
                     uv_handle_type pending = UV_UNKNOWN_HANDLE) {
177

26175
    if (nread > 0)
178
23794
      bytes_read_ += static_cast<uint64_t>(nread);
179


26397
    if (!read_cb_.is_empty())
180
26397
      read_cb_.fn(nread, buf, pending, read_cb_.ctx);
181
  }
182
183
  inline void set_after_write_cb(Callback<AfterWriteCb> c) {
184
8333
    after_write_cb_ = c;
185
  }
186
187
12521
  inline void set_alloc_cb(Callback<AllocCb> c) { alloc_cb_ = c; }
188
12521
  inline void set_read_cb(Callback<ReadCb> c) { read_cb_ = c; }
189
190
  inline Callback<AfterWriteCb> after_write_cb() { return after_write_cb_; }
191
  inline Callback<AllocCb> alloc_cb() { return alloc_cb_; }
192
  inline Callback<ReadCb> read_cb() { return read_cb_; }
193
194
 private:
195
  Callback<AfterWriteCb> after_write_cb_;
196
  Callback<AllocCb> alloc_cb_;
197
  Callback<ReadCb> read_cb_;
198
  uint64_t bytes_read_;
199
200
  friend class StreamBase;
201
};
202
203
class StreamBase : public StreamResource {
204
 public:
205
  enum Flags {
206
    kFlagNone = 0x0,
207
    kFlagHasWritev = 0x1,
208
    kFlagNoShutdown = 0x2
209
  };
210
211
  template <class Base>
212
  static inline void AddMethods(Environment* env,
213
                                v8::Local<v8::FunctionTemplate> target,
214
                                int flags = kFlagNone);
215
216
  virtual void* Cast() = 0;
217
  virtual bool IsAlive() = 0;
218
  virtual bool IsClosing() = 0;
219
  virtual bool IsIPCPipe();
220
  virtual int GetFD();
221
222
  virtual int ReadStart() = 0;
223
  virtual int ReadStop() = 0;
224
225
  inline void Consume() {
226
3174
    CHECK_EQ(consumed_, false);
227
3174
    consumed_ = true;
228
  }
229
230
  template <class Outer>
231
2256
  inline Outer* Cast() { return static_cast<Outer*>(Cast()); }
232
233
  void EmitData(ssize_t nread,
234
                v8::Local<v8::Object> buf,
235
                v8::Local<v8::Object> handle);
236
237
 protected:
238
16682
  explicit StreamBase(Environment* env) : env_(env), consumed_(false) {
239
  }
240
241
14134
  virtual ~StreamBase() = default;
242
243
  // One of these must be implemented
244
  virtual AsyncWrap* GetAsyncWrap();
245
  virtual v8::Local<v8::Object> GetObject();
246
247
  // Libuv callbacks
248
  static void AfterShutdown(ShutdownWrap* req, int status);
249
  static void AfterWrite(WriteWrap* req, int status);
250
251
  // JS Methods
252
  int ReadStart(const v8::FunctionCallbackInfo<v8::Value>& args);
253
  int ReadStop(const v8::FunctionCallbackInfo<v8::Value>& args);
254
  int Shutdown(const v8::FunctionCallbackInfo<v8::Value>& args);
255
  int Writev(const v8::FunctionCallbackInfo<v8::Value>& args);
256
  int WriteBuffer(const v8::FunctionCallbackInfo<v8::Value>& args);
257
  template <enum encoding enc>
258
  int WriteString(const v8::FunctionCallbackInfo<v8::Value>& args);
259
260
  template <class Base>
261
  static void GetFD(v8::Local<v8::String> key,
262
                    const v8::PropertyCallbackInfo<v8::Value>& args);
263
264
  template <class Base>
265
  static void GetExternal(v8::Local<v8::String> key,
266
                          const v8::PropertyCallbackInfo<v8::Value>& args);
267
268
  template <class Base>
269
  static void GetBytesRead(v8::Local<v8::String> key,
270
                           const v8::PropertyCallbackInfo<v8::Value>& args);
271
272
  template <class Base,
273
            int (StreamBase::*Method)(
274
      const v8::FunctionCallbackInfo<v8::Value>& args)>
275
  static void JSMethod(const v8::FunctionCallbackInfo<v8::Value>& args);
276
277
 private:
278
  Environment* env_;
279
  bool consumed_;
280
};
281
282
}  // namespace node
283
284
#endif  // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
285
286
#endif  // SRC_STREAM_BASE_H_