fastdo  0.6.16
eiennet_async.hpp
浏览该文件的文档.
1 #pragma once
2 
3 namespace eiennet
4 {
5 class AsyncSocket;
6 
7 // IO类型
8 enum IoType
9 {
16 };
17 
18 // IO场景基类
19 struct IoCtx
20 {
23 
24  IoCtx() : startTime(winux::GetUtcTimeMs()), timeoutMs(-1)
25  {
26  }
27 
28  virtual ~IoCtx()
29  {
30  }
31 };
32 
33 // 接受场景
35 {
36  using OkFunction = std::function< bool ( winux::SharedPointer<AsyncSocket> servSock, winux::SharedPointer<AsyncSocket> clientSock, ip::EndPoint const & ep ) >;
37  using TimeoutFunction = std::function< bool ( winux::SharedPointer<AsyncSocket> servSock, IoAcceptCtx * ctx ) >;
38 
41 
43 
45 };
46 
47 // 连接场景
49 {
50  using OkFunction = std::function< void ( winux::SharedPointer<AsyncSocket> sock, winux::uint64 costTimeMs ) >;
51  using TimeoutFunction = std::function< void ( winux::SharedPointer<AsyncSocket> sock, IoConnectCtx * ctx ) >;
52 
55 
57 
58  IoConnectCtx() : costTimeMs(0) { }
59 };
60 
61 // 数据接收场景
62 struct IoRecvCtx : IoCtx
63 {
64  using OkFunction = std::function< void ( winux::SharedPointer<AsyncSocket> sock, winux::Buffer & data, bool cnnAvail ) >;
65  using TimeoutFunction = std::function< void ( winux::SharedPointer<AsyncSocket> sock, IoRecvCtx * ctx ) >;
66 
69 
70  size_t hadBytes;
71  size_t targetBytes;
73  bool cnnAvail;
74 
75  IoRecvCtx() : hadBytes(0), targetBytes(0), cnnAvail(false) { }
76 };
77 
78 // 数据发送场景
79 struct IoSendCtx : IoCtx
80 {
81  using OkFunction = std::function< void ( winux::SharedPointer<AsyncSocket> sock, winux::uint64 costTimeMs, bool cnnAvail ) >;
82  using TimeoutFunction = std::function< void ( winux::SharedPointer<AsyncSocket> sock, IoSendCtx * ctx ) >;
83 
86 
87  size_t hadBytes;
90  bool cnnAvail;
91 
92  IoSendCtx() : hadBytes(0), costTimeMs(0), cnnAvail(false) { }
93 };
94 
95 // 无连接,数据接收场景
97 {
98  using OkFunction = std::function< void ( winux::SharedPointer<AsyncSocket> sock, winux::Buffer & data, EndPoint const & ep ) >;
99  using TimeoutFunction = std::function< void ( winux::SharedPointer<AsyncSocket> sock, IoRecvFromCtx * ctx ) >;
100 
103 
104  size_t hadBytes;
105  size_t targetBytes;
108 
109  IoRecvFromCtx() : hadBytes(0), targetBytes(0) { }
110 };
111 
112 // 无连接,数据发送场景
114 {
115  using OkFunction = std::function< void ( winux::SharedPointer<AsyncSocket> sock, winux::uint64 costTimeMs ) >;
116  using TimeoutFunction = std::function< void ( winux::SharedPointer<AsyncSocket> sock, IoSendToCtx * ctx ) >;
117 
120 
121  size_t hadBytes;
125 
126  IoSendToCtx() : hadBytes(0), costTimeMs(0) { }
127 };
128 
131 {
132 public:
133  using IoMapMap = std::map< winux::SharedPointer<AsyncSocket>, std::map< IoType, winux::SharedPointer<IoCtx> > >;
134  using IoMap = IoMapMap::mapped_type;
135 
140  IoService( int threadCount = 4, double serverWait = 0.002 );
141 
143  virtual ~IoService();
144 
146  bool init( int threadCount = 4, double serverWait = 0.002 );
147 
149  void stop( bool b = true );
150 
152  void postConnect( winux::SharedPointer<AsyncSocket> sock, EndPoint const & ep, IoConnectCtx::OkFunction cbOk, winux::uint64 timeoutMs = -1, IoConnectCtx::TimeoutFunction cbTimeout = nullptr );
153  void postRecv( winux::SharedPointer<AsyncSocket> sock, size_t targetSize, IoRecvCtx::OkFunction cbOk, winux::uint64 timeoutMs = -1, IoRecvCtx::TimeoutFunction cbTimeout = nullptr );
154  void postSend( winux::SharedPointer<AsyncSocket> sock, void const * data, size_t size, IoSendCtx::OkFunction cbOk, winux::uint64 timeoutMs = -1, IoSendCtx::TimeoutFunction cbTimeout = nullptr );
155  void postRecvFrom( winux::SharedPointer<AsyncSocket> sock, size_t targetSize, IoRecvFromCtx::OkFunction cbOk, winux::uint64 timeoutMs = -1, IoRecvFromCtx::TimeoutFunction cbTimeout = nullptr );
156  void postSendTo( winux::SharedPointer<AsyncSocket> sock, EndPoint const & ep, void const * data, size_t size, IoSendToCtx::OkFunction cbOk, winux::uint64 timeoutMs = -1, IoSendToCtx::TimeoutFunction cbTimeout = nullptr );
157 
159  int run();
160 
162  void getSockIoCount( size_t * sockCount, size_t * ioCount ) const;
163 
165  void removeSock( winux::SharedPointer<AsyncSocket> sock );
166 
167  winux::ThreadPool & getPool() { return _pool; }
168  IoMapMap & getIoMaps() { return _ioMaps; }
169  winux::Mutex & getMutex() { return _mtx; }
170 
171 
172 protected:
175 
176  // IO请求加入Select轮询数组前
177  DEFINE_CUSTOM_EVENT( RunBeforeJoin, (), () )
178  // 完成Select.wait()后
179  DEFINE_CUSTOM_EVENT( RunAfterWait, ( int rc ), (rc) )
180 
181 private:
182  winux::ThreadPool _pool;
183  IoMapMap _ioMaps;
184  winux::RecursiveMutex _mtx;
185  winux::Condition _cdt;
186  double _serverWait;
187  int _threadCount;
188  bool _stop;
189 
190  friend class AsyncSocket;
192 };
193 
195 class EIENNET_DLL AsyncSocket : public Socket, public winux::EnableSharedFromThis<AsyncSocket>
196 {
197 protected:
199  explicit AsyncSocket( IoService & ioServ, int sock = -1, bool isNewSock = false ) : Socket( sock, isNewSock ), _ioServ(&ioServ), _data(nullptr)
200  {
201  this->setBlocking(false);
202  }
203 
205  AsyncSocket( IoService & ioServ, AddrFamily af, SockType sockType, Protocol proto ) : Socket( af, sockType, proto ), _ioServ(&ioServ), _data(nullptr)
206  {
207  this->setBlocking(false);
208  }
209 
210 public:
211  static winux::SharedPointer<AsyncSocket> New( IoService & ioServ, int sock = -1, bool isNewSock = false )
212  {
213  return winux::SharedPointer<AsyncSocket>( new AsyncSocket( ioServ, sock, isNewSock ) );
214  }
215 
216  static winux::SharedPointer<AsyncSocket> New( IoService & ioServ, AddrFamily af, SockType sockType, Protocol proto )
217  {
218  return winux::SharedPointer<AsyncSocket>( new AsyncSocket( ioServ, af, sockType, proto ) );
219  }
220 
222  {
223  int sock;
224  return this->Socket::accept( &sock, ep ) ? winux::SharedPointer<AsyncSocket>( new AsyncSocket( *_ioServ, sock, true ) ) : winux::SharedPointer<AsyncSocket>();
225  }
226 
228  void setDataPtr( void * data ) { _data = data; }
230  void * getDataPtr() const { return _data; }
232  template < typename _Ty >
233  _Ty * getDataPtr() const { return reinterpret_cast<_Ty*>(_data); }
234 
236  IoService & getService() const { return *_ioServ; }
238  void acceptAsync( IoAcceptCtx::OkFunction cbOk, winux::uint64 timeoutMs = -1, IoAcceptCtx::TimeoutFunction cbTimeout = nullptr );
240  void connectAsync( EndPoint const & ep, IoConnectCtx::OkFunction cbOk, winux::uint64 timeoutMs = -1, IoConnectCtx::TimeoutFunction cbTimeout = nullptr );
242  void recvUntilSizeAsync( size_t targetSize, IoRecvCtx::OkFunction cbOk, winux::uint64 timeoutMs = -1, IoRecvCtx::TimeoutFunction cbTimeout = nullptr );
245  {
246  this->recvUntilSizeAsync( 0, cbOk, timeoutMs, cbTimeout );
247  }
249  void sendAsync( void const * data, size_t size, IoSendCtx::OkFunction cbOk, winux::uint64 timeoutMs = -1, IoSendCtx::TimeoutFunction cbTimeout = nullptr );
252  {
253  this->sendAsync( data.getBuf(), data.getSize(), cbOk, timeoutMs, cbTimeout );
254  }
256  void recvFromUntilSizeAsync( size_t targetSize, IoRecvFromCtx::OkFunction cbOk, winux::uint64 timeoutMs = -1, IoRecvFromCtx::TimeoutFunction cbTimeout = nullptr );
259  {
260  this->recvFromUntilSizeAsync( 0, cbOk, timeoutMs, cbTimeout );
261  }
263  void sendToAsync( EndPoint const & ep, void const * data, size_t size, IoSendToCtx::OkFunction cbOk, winux::uint64 timeoutMs = -1, IoSendToCtx::TimeoutFunction cbTimeout = nullptr );
265  void sendToAsync( EndPoint const & ep, winux::Buffer const & data, IoSendToCtx::OkFunction cbOk, winux::uint64 timeoutMs = -1, IoSendToCtx::TimeoutFunction cbTimeout = nullptr )
266  {
267  this->sendToAsync( ep, data.getBuf(), data.getSize(), cbOk, timeoutMs, cbTimeout );
268  }
269 
274 
275 private:
276  IoService * _ioServ;
277  void * _data;
278 
279  friend class IoService;
280 };
281 
282 namespace ip
283 {
284 namespace tcp
285 {
287 class EIENNET_DLL AsyncSocket : public eiennet::AsyncSocket
288 {
289 public:
291 
292 protected:
293  AsyncSocket( IoService & ioServ, int sock, bool isNewSock = false ) : BaseClass( ioServ, sock, isNewSock ) { }
294 
295  explicit AsyncSocket( IoService & ioServ ) : BaseClass( ioServ, BaseClass::afInet, BaseClass::sockStream, BaseClass::protoUnspec ) { }
296 
297 public:
298  static winux::SharedPointer<AsyncSocket> New( IoService & ioServ, int sock, bool isNewSock = false )
299  {
300  return winux::SharedPointer<AsyncSocket>( new AsyncSocket( ioServ, sock, isNewSock ) );
301  }
302 
303  static winux::SharedPointer<AsyncSocket> New( IoService & ioServ )
304  {
305  return winux::SharedPointer<AsyncSocket>( new AsyncSocket(ioServ) );
306  }
307 };
308 
309 } // namespace tcp
310 
311 namespace udp
312 {
314 class EIENNET_DLL AsyncSocket : public eiennet::AsyncSocket
315 {
316 public:
318 
319 protected:
320  AsyncSocket( IoService & ioServ, int sock, bool isNewSock = false ) : BaseClass( ioServ, sock, isNewSock ) { }
321 
322  explicit AsyncSocket( IoService & ioServ ) : BaseClass( ioServ, BaseClass::afInet, BaseClass::sockDatagram, BaseClass::protoUnspec ) { }
323 
324 public:
325  static winux::SharedPointer<AsyncSocket> New( IoService & ioServ, int sock, bool isNewSock = false )
326  {
327  return winux::SharedPointer<AsyncSocket>( new AsyncSocket( ioServ, sock, isNewSock ) );
328  }
329 
330  static winux::SharedPointer<AsyncSocket> New( IoService & ioServ )
331  {
332  return winux::SharedPointer<AsyncSocket>( new AsyncSocket(ioServ) );
333  }
334 };
335 
336 } // namespace udp
337 
338 } // namespace ip
339 
340 } // namespace eiennet
341 
eiennet::AsyncSocket BaseClass
_Ty * getDataPtr() const
获取套接字关联数据
std::function< bool(winux::SharedPointer< AsyncSocket > servSock, winux::SharedPointer< AsyncSocket > clientSock, ip::EndPoint const &ep) > OkFunction
IoMapMap & getIoMaps()
static winux::SharedPointer< AsyncSocket > New(IoService &ioServ, int sock=-1, bool isNewSock=false)
void setDataPtr(void *data)
设置套接字关联数据
winux::uint64 costTimeMs
总花费时间
void * getBuf() const
暴露缓冲区指针
Definition: utilities.hpp:993
OkFunction cbOk
成功回调函数
TimeoutFunction cbTimeout
超时回调函数
OkFunction cbOk
成功回调函数
OkFunction cbOk
成功回调函数
TimeoutFunction cbTimeout
超时回调函数
size_t hadBytes
已发送数据量
ip::EndPoint epFrom
来自此EP的数据
winux::uint64 costTimeMs
总花费时间
异步套接字
winux::ThreadPool & getPool()
std::function< void(winux::SharedPointer< AsyncSocket > sock, winux::uint64 costTimeMs) > OkFunction
winux::uint64 timeoutMs
超时时间
void * getDataPtr() const
获取套接字关联数据
winux::uint64 costTimeMs
总花费时间
std::function< void(winux::SharedPointer< AsyncSocket > sock, IoConnectCtx *ctx) > TimeoutFunction
端点基类(套接字地址对象基类)
std::function< void(winux::SharedPointer< AsyncSocket > sock, IoSendCtx *ctx) > TimeoutFunction
size_t hadBytes
已接收数据量
size_t targetBytes
目标数据量
static winux::SharedPointer< AsyncSocket > New(IoService &ioServ)
void sendAsync(winux::Buffer const &data, IoSendCtx::OkFunction cbOk, winux::uint64 timeoutMs=-1, IoSendCtx::TimeoutFunction cbTimeout=nullptr)
发送数据(异步)
TimeoutFunction cbTimeout
超时回调函数
std::function< void(winux::SharedPointer< AsyncSocket > sock, winux::Buffer &data, bool cnnAvail) > OkFunction
void recvAsync(IoRecvCtx::OkFunction cbOk, winux::uint64 timeoutMs=-1, IoRecvCtx::TimeoutFunction cbTimeout=nullptr)
接收数据(异步)
size_t getSize() const
获取数据大小
Definition: utilities.hpp:1025
#define DISABLE_OBJECT_COPY(clsname)
Definition: utilities.hpp:85
static winux::SharedPointer< AsyncSocket > New(IoService &ioServ, int sock, bool isNewSock=false)
bool accept(int *sock, EndPoint *ep=NULL)
接受一个客户连接
互斥量
Definition: threads.hpp:349
ip::EndPoint ep
客户连接IP地址
std::function< void(winux::SharedPointer< AsyncSocket > sock, winux::uint64 costTimeMs, bool cnnAvail) > OkFunction
bool cnnAvail
连接是否有效
std::function< void(winux::SharedPointer< AsyncSocket > sock, IoRecvCtx *ctx) > TimeoutFunction
winux::GrowBuffer data
已接收的数据
std::function< bool(winux::SharedPointer< AsyncSocket > servSock, IoAcceptCtx *ctx) > TimeoutFunction
AsyncSocket(IoService &ioServ, int sock, bool isNewSock=false)
size_t hadBytes
已接收数据量
std::function< void(winux::SharedPointer< AsyncSocket > sock, winux::uint64 costTimeMs) > OkFunction
套接字基础类
缓冲区,表示内存中一块二进制数据(利用malloc/realloc进行内存分配)
Definition: utilities.hpp:906
static winux::SharedPointer< AsyncSocket > New(IoService &ioServ)
#define EIENNET_DLL
eiennet::AsyncSocket BaseClass
winux::uint64 startTime
请求开启的时间
void recvFromAsync(IoRecvFromCtx::OkFunction cbOk, winux::uint64 timeoutMs=-1, IoRecvFromCtx::TimeoutFunction cbTimeout=nullptr)
无连接,接收数据(异步)
winux::SharedPointer< AsyncSocket > accept(EndPoint *ep=nullptr)
size_t hadBytes
已发送数据量
TimeoutFunction cbTimeout
超时回调函数
SockType
套接字类型
线程池,创建一组线程等待着从任务队列中获取任务执行
Definition: threadtask.hpp:159
IoMapMap::mapped_type IoMap
网络通信库
AddrFamily
地址族
高效的可增长缓冲区,1.33倍冗余量
Definition: utilities.hpp:1103
uint64 GetUtcTimeMs(void)
获取UTC时间毫秒数,UTC秒数可以直接除以1000,或者调用CRT的time(NULL)
winux::GrowBuffer data
已接收的数据
std::function< void(winux::SharedPointer< AsyncSocket > sock, IoRecvFromCtx *ctx) > TimeoutFunction
#define DEFINE_CUSTOM_EVENT(evtname, paramtypes, calledparams)
Definition: utilities.hpp:182
winux::Buffer data
待发送的数据
AsyncSocket(IoService &ioServ)
OkFunction cbOk
成功回调函数
static winux::SharedPointer< AsyncSocket > New(IoService &ioServ, int sock, bool isNewSock=false)
TimeoutFunction cbTimeout
超时回调函数
std::function< void(winux::SharedPointer< AsyncSocket > sock, IoSendToCtx *ctx) > TimeoutFunction
std::map< winux::SharedPointer< AsyncSocket >, std::map< IoType, winux::SharedPointer< IoCtx > > > IoMapMap
简单指针
Definition: smartptr.hpp:302
winux::Buffer data
待发送的数据
void sendToAsync(EndPoint const &ep, winux::Buffer const &data, IoSendToCtx::OkFunction cbOk, winux::uint64 timeoutMs=-1, IoSendToCtx::TimeoutFunction cbTimeout=nullptr)
无连接,发送数据(异步)
TimeoutFunction cbTimeout
超时回调函数
AsyncSocket(IoService &ioServ, AddrFamily af, SockType sockType, Protocol proto)
构造函数2
winux::SimplePointer< EndPoint > ep
size_t targetBytes
目标数据量
std::function< void(winux::SharedPointer< AsyncSocket > sock, winux::Buffer &data, EndPoint const &ep) > OkFunction
unsigned __int64 uint64
Definition: utilities.hpp:230
winux::Mutex & getMutex()
static winux::SharedPointer< AsyncSocket > New(IoService &ioServ, AddrFamily af, SockType sockType, Protocol proto)
AsyncSocket(IoService &ioServ, int sock=-1, bool isNewSock=false)
构造函数1
virtual ~IoCtx()
IoService & getService() const
获取IO服务对象
AsyncSocket(IoService &ioServ, int sock, bool isNewSock=false)
跨平台基础功能库
Definition: archives.hpp:7
OkFunction cbOk
成功回调函数
AsyncSocket(IoService &ioServ)
OkFunction cbOk
成功回调函数