GCC Code Coverage Report
Directory: ../src/ Exec Total Coverage
File: /home/node-core-coverage/node-core-coverage/workdir/node/src/stream_base.h Lines: 41 44 93.2 %
Date: 2016-07-23 Branches: 19 32 59.4 %

Line Exec Source
1
#ifndef SRC_STREAM_BASE_H_
2
#define SRC_STREAM_BASE_H_
3
4
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
5
6
#include "env.h"
7
#include "async-wrap.h"
8
#include "req-wrap.h"
9
#include "req-wrap-inl.h"
10
#include "node.h"
11
12
#include "v8.h"
13
14
namespace node {
15
16
// Forward declarations
17
class StreamBase;
18
19
template <class Req>
20
class StreamReq {
21
 public:
22
  typedef void (*DoneCb)(Req* req, int status);
23
24
18205
  explicit StreamReq(DoneCb cb) : cb_(cb) {
25
  }
26
27
18178
  inline void Done(int status, const char* error_str = nullptr) {
28
18178
    Req* req = static_cast<Req*>(this);
29
18178
    Environment* env = req->env();
30
18178
    if (error_str != nullptr) {
31
15
      req->object()->Set(env->error_string(),
32
                         OneByteString(env->isolate(), error_str));
33
    }
34
35
18178
    cb_(req, status);
36
18174
  }
37
38
 private:
39
  DoneCb cb_;
40
};
41
42
3050
class ShutdownWrap : public ReqWrap<uv_shutdown_t>,
43
                     public StreamReq<ShutdownWrap> {
44
 public:
45
  ShutdownWrap(Environment* env,
46
               v8::Local<v8::Object> req_wrap_obj,
47
               StreamBase* wrap,
48
               DoneCb cb)
49
      : ReqWrap(env, req_wrap_obj, AsyncWrap::PROVIDER_SHUTDOWNWRAP),
50
        StreamReq<ShutdownWrap>(cb),
51
6112
        wrap_(wrap) {
52
3056
    Wrap(req_wrap_obj, this);
53
  }
54
55
3056
  static void NewShutdownWrap(const v8::FunctionCallbackInfo<v8::Value>& args) {
56
3056
    CHECK(args.IsConstructCall());
57
3056
  }
58
59
  inline StreamBase* wrap() const { return wrap_; }
60
  size_t self_size() const override { return sizeof(*this); }
61
62
 private:
63
  StreamBase* const wrap_;
64
};
65
66
15137
class WriteWrap: public ReqWrap<uv_write_t>,
67
                 public StreamReq<WriteWrap> {
68
 public:
69
  static inline WriteWrap* New(Environment* env,
70
                               v8::Local<v8::Object> obj,
71
                               StreamBase* wrap,
72
                               DoneCb cb,
73
                               size_t extra = 0);
74
  inline void Dispose();
75
  inline char* Extra(size_t offset = 0);
76
77
  inline StreamBase* wrap() const { return wrap_; }
78
79
  size_t self_size() const override { return storage_size_; }
80
81
29012
  static void NewWriteWrap(const v8::FunctionCallbackInfo<v8::Value>& args) {
82
29012
    CHECK(args.IsConstructCall());
83
29012
  }
84
85
  static const size_t kAlignSize = 16;
86
87
 protected:
88
  WriteWrap(Environment* env,
89
            v8::Local<v8::Object> obj,
90
            StreamBase* wrap,
91
            DoneCb cb,
92
            size_t storage_size)
93
      : ReqWrap(env, obj, AsyncWrap::PROVIDER_WRITEWRAP),
94
        StreamReq<WriteWrap>(cb),
95
        wrap_(wrap),
96
30298
        storage_size_(storage_size) {
97
15149
    Wrap(obj, this);
98
  }
99
100
  void* operator new(size_t size) = delete;
101
  void* operator new(size_t size, char* storage) { return storage; }
102
103
  // This is just to keep the compiler happy. It should never be called, since
104
  // we don't use exceptions in node.
105
  void operator delete(void* ptr, char* storage) { UNREACHABLE(); }
106
107
 private:
108
  // People should not be using the non-placement new and delete operator on a
109
  // WriteWrap. Ensure this never happens.
110
  void operator delete(void* ptr) { UNREACHABLE(); }
111
112
  StreamBase* const wrap_;
113
  const size_t storage_size_;
114
};
115
116
class StreamResource {
117
 public:
118
  template <class T>
119
  struct Callback {
120
28394
    Callback() : fn(nullptr), ctx(nullptr) {}
121
    Callback(T fn, void* ctx) : fn(fn), ctx(ctx) {}
122
    Callback(const Callback&) = default;
123
124
    inline bool is_empty() { return fn == nullptr; }
125
    inline void clear() {
126
5046
      fn = nullptr;
127
5046
      ctx = nullptr;
128
    }
129
130
    T fn;
131
    void* ctx;
132
  };
133
134
  typedef void (*AfterWriteCb)(WriteWrap* w, void* ctx);
135
  typedef void (*AllocCb)(size_t size, uv_buf_t* buf, void* ctx);
136
  typedef void (*ReadCb)(ssize_t nread,
137
                         const uv_buf_t* buf,
138
                         uv_handle_type pending,
139
                         void* ctx);
140
141
32992
  StreamResource() : bytes_read_(0) {
142
  }
143
7214
  virtual ~StreamResource() = default;
144
145
  virtual int DoShutdown(ShutdownWrap* req_wrap) = 0;
146
  virtual int DoTryWrite(uv_buf_t** bufs, size_t* count);
147
  virtual int DoWrite(WriteWrap* w,
148
                      uv_buf_t* bufs,
149
                      size_t count,
150
                      uv_stream_t* send_handle) = 0;
151
  virtual const char* Error() const;
152
  virtual void ClearError();
153
154
  // Events
155
  inline void OnAfterWrite(WriteWrap* w) {
156
13015
    if (!after_write_cb_.is_empty())
157
12174
      after_write_cb_.fn(w, after_write_cb_.ctx);
158
  }
159
160
  inline void OnAlloc(size_t size, uv_buf_t* buf) {
161
22238
    if (!alloc_cb_.is_empty())
162
22238
      alloc_cb_.fn(size, buf, alloc_cb_.ctx);
163
  }
164
165
  inline void OnRead(ssize_t nread,
166
                     const uv_buf_t* buf,
167
                     uv_handle_type pending = UV_UNKNOWN_HANDLE) {
168
22564
    if (nread > 0)
169
20247
      bytes_read_ += static_cast<uint64_t>(nread);
170
22769
    if (!read_cb_.is_empty())
171
22769
      read_cb_.fn(nread, buf, pending, read_cb_.ctx);
172
  }
173
174
  inline void set_after_write_cb(Callback<AfterWriteCb> c) {
175
8240
    after_write_cb_ = c;
176
  }
177
178
12380
  inline void set_alloc_cb(Callback<AllocCb> c) { alloc_cb_ = c; }
179
12380
  inline void set_read_cb(Callback<ReadCb> c) { read_cb_ = c; }
180
181
  inline Callback<AfterWriteCb> after_write_cb() { return after_write_cb_; }
182
  inline Callback<AllocCb> alloc_cb() { return alloc_cb_; }
183
  inline Callback<ReadCb> read_cb() { return read_cb_; }
184
185
 private:
186
  Callback<AfterWriteCb> after_write_cb_;
187
  Callback<AllocCb> alloc_cb_;
188
  Callback<ReadCb> read_cb_;
189
  uint64_t bytes_read_;
190
191
  friend class StreamBase;
192
};
193
194
class StreamBase : public StreamResource {
195
 public:
196
  enum Flags {
197
    kFlagNone = 0x0,
198
    kFlagHasWritev = 0x1,
199
    kFlagNoShutdown = 0x2
200
  };
201
202
  template <class Base>
203
  static inline void AddMethods(Environment* env,
204
                                v8::Local<v8::FunctionTemplate> target,
205
                                int flags = kFlagNone);
206
207
  virtual void* Cast() = 0;
208
  virtual bool IsAlive() = 0;
209
  virtual bool IsClosing() = 0;
210
  virtual bool IsIPCPipe();
211
  virtual int GetFD();
212
213
  virtual int ReadStart() = 0;
214
  virtual int ReadStop() = 0;
215
216
  inline void Consume() {
217
3126
    CHECK_EQ(consumed_, false);
218
3126
    consumed_ = true;
219
  }
220
221
  template <class Outer>
222
2124
  inline Outer* Cast() { return static_cast<Outer*>(Cast()); }
223
224
  void EmitData(ssize_t nread,
225
                v8::Local<v8::Object> buf,
226
                v8::Local<v8::Object> handle);
227
228
 protected:
229
16496
  explicit StreamBase(Environment* env) : env_(env), consumed_(false) {
230
  }
231
232
14428
  virtual ~StreamBase() = default;
233
234
  // One of these must be implemented
235
  virtual AsyncWrap* GetAsyncWrap();
236
  virtual v8::Local<v8::Object> GetObject();
237
238
  // Libuv callbacks
239
  static void AfterShutdown(ShutdownWrap* req, int status);
240
  static void AfterWrite(WriteWrap* req, int status);
241
242
  // JS Methods
243
  int ReadStart(const v8::FunctionCallbackInfo<v8::Value>& args);
244
  int ReadStop(const v8::FunctionCallbackInfo<v8::Value>& args);
245
  int Shutdown(const v8::FunctionCallbackInfo<v8::Value>& args);
246
  int Writev(const v8::FunctionCallbackInfo<v8::Value>& args);
247
  int WriteBuffer(const v8::FunctionCallbackInfo<v8::Value>& args);
248
  template <enum encoding enc>
249
  int WriteString(const v8::FunctionCallbackInfo<v8::Value>& args);
250
251
  template <class Base>
252
  static void GetFD(v8::Local<v8::String> key,
253
                    const v8::PropertyCallbackInfo<v8::Value>& args);
254
255
  template <class Base>
256
  static void GetExternal(v8::Local<v8::String> key,
257
                          const v8::PropertyCallbackInfo<v8::Value>& args);
258
259
  template <class Base>
260
  static void GetBytesRead(v8::Local<v8::String> key,
261
                           const v8::PropertyCallbackInfo<v8::Value>& args);
262
263
  template <class Base,
264
            int (StreamBase::*Method)(
265
      const v8::FunctionCallbackInfo<v8::Value>& args)>
266
  static void JSMethod(const v8::FunctionCallbackInfo<v8::Value>& args);
267
268
 private:
269
  Environment* env_;
270
  bool consumed_;
271
};
272
273
}  // namespace node
274
275
#endif  // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
276
277
#endif  // SRC_STREAM_BASE_H_