Skip to content

Commit

Permalink
Merge pull request #48 from baidu/py3
Browse files Browse the repository at this point in the history
Version 3.2.27
  • Loading branch information
mythmgn authored Jul 21, 2021
2 parents 42c61ff + ede3455 commit 79ab2f3
Show file tree
Hide file tree
Showing 23 changed files with 711 additions and 76 deletions.
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ tags
*.un~
*.tar
*/*.tar.gz
nohup.out

### folders ###
build
Expand Down Expand Up @@ -121,3 +122,7 @@ test.json
百度开源社区专访稿(马冠南).docx
docs/_book.zip
cup_bidu
examples/arrow-runtime/log/agent.stdout.stderr
examples/arrow-runtime/log/master.stdout.stderr
.ycm_extra_conf.py
examples/arrow-runtime/data/TOPDIR
90 changes: 52 additions & 38 deletions cup/net/asyn/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from cup.util import misc
from cup.util import threadpool
from cup.services import executor
from cup.net.asyn import ioloop
from cup.net.asyn import msg as async_msg
from cup.net.asyn import context as sockcontext

Expand Down Expand Up @@ -83,7 +84,8 @@ def __init__(self, ip, bindport, thdpool_param):
self._conns = {}
self._bind_port = bindport
self._bind_ip = ip
self._epoll = select.epoll()
# self._epoll = select.epoll()
self._poller = ioloop.PollerFactory()
self._stopsign = False
self._bind_sock = None
self._fileno2context = {}
Expand Down Expand Up @@ -152,7 +154,7 @@ def _set_sock_params(cls, sock):

@classmethod
def _set_sock_nonblocking(cls, sock):
sock.setblocking(0)
sock.setblocking(False)

@classmethod
def _epoll_write_params(cls):
Expand All @@ -165,6 +167,7 @@ def _epoll_read_params(cls):
def get_needack_dict(self):
"""
get neekack dict
"""
return self._needack_context_dict

Expand All @@ -182,7 +185,7 @@ def bind(self):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._set_sock_params(sock)
sock.bind((self._bind_ip, self._bind_port))

##self._set_sock_nonblocking(sock)
log.info(
'bind port info:(ip:%s, port:%s)' % (
Expand Down Expand Up @@ -253,18 +256,20 @@ def push_msg2sendqueue(self, msg):
self._rwlock.release_writelock()
ret = 0
try:
self._epoll.register(
sock.fileno(), self._epoll_write_params()
)
# self._epoll.register(
# sock.fileno(), self._epoll_write_params()
# )
self._poller.register(sock.fileno(), ioloop.WRITE | ioloop.READ)
except Exception as error: # pylint: disable=W0703
log.warn(
'failed to register the socket fileno, err_msg:%s,'
'perinfo:%s:%s. To epoll modify it' %
(str(error), peer[0], peer[1])
)
self._epoll.modify(
sock.fileno(), self._epoll_write_params()
)
# self._epoll.modify(
# sock.fileno(), self._epoll_write_params()
# )
self._poller.modify(sock.fileno(), ioloop.WRITE | ioloop.READ)
else:
log.error(
'failed to post msg. Connect failed. peer info:{0}.'
Expand Down Expand Up @@ -332,20 +337,21 @@ def _handle_new_conn(self, newsock, peer):
self._peer2context[peer] = context
self._context2fileno_peer[context] = (newsock.fileno(), peer)
self._rwlock.release_writelock()
self._epoll.register(
newsock.fileno(), select.EPOLLIN | select.EPOLLET | select.EPOLLERR
self._poller.register(
newsock.fileno(), ioloop.WRITE | ioloop.READ
)
# self._epoll.register(
# newsock.fileno(), select.EPOLLIN | select.EPOLLET | select.EPOLLERR
# )
log.info('a new connection: {0}, register new fd:{1}'.format(peer, newsock.fileno()))

def cleanup_error_context(self, context):
"""clean up error context"""
def _cleanup_context(send_queue, peerinfo):
"""cleanup context"""
log.debug('to cleanup socket, peer:{0}'.format(peerinfo))
log.debug(
'cleanup: send_queue of socket size:{0}'.format(
send_queue.qsize()
)
'to cleanup socket, peer:{0}, send_queue size {1}'.format(
peerinfo, send_queue.qsize())
)
while True:
try:
Expand All @@ -368,7 +374,8 @@ def _cleanup_context(send_queue, peerinfo):
if fileno_peer is None:
return
try:
self._epoll.unregister(fileno_peer[0])
# self._epoll.unregister(fileno_peer[0])
self._poller.unregister(fileno_peer[0])
except Exception as error: # pylint: disable=W0703
log.warn(
'epoll unregister error:%s, peerinfo:%s' %
Expand Down Expand Up @@ -444,7 +451,7 @@ def poll(self):
misc.check_not_none(self._bind_sock)
self._bind_sock.listen(128)
self._executor.queue_exec(
self.listen_new_connect,
self.listen_new_connect,
urgency=executor.URGENCY_HIGH)
self._executor.delay_exec(
2, # todo set the check_time to ?
Expand All @@ -453,29 +460,33 @@ def poll(self):
)
while not self._stopsign:
try:
events = self._epoll.poll(1)
events = self._poller.poll(1)
# events = self._epoll.poll(1)
except IOError as err:
if err.errno == errno.EINTR:
return
raise err
# log.debug('start to poll')
log.debug('handle events num:%s, bind sock fileno:%s' % (len(events), self._bind_sock.fileno()))
log.debug('poller events num {0}'.format(len(events)))
for fileno, event in events:
# if it comes from the listen port, new conn
#if fileno == self._bind_sock.fileno():
# newsock, addr = self._bind_sock.accept()
# self._handle_new_conn(newsock, addr)
if event & select.EPOLLIN:
# if event & select.EPOLLIN:
if event & ioloop.READ:
log.info('ioloop.READ, fd {0}'.format(fileno))
try:
self._handle_new_recv(self._fileno2context[fileno])
except KeyError:
log.info('fd:%s, socket already closed' % fileno)
elif event & select.EPOLLOUT:
log.info('fd:{0} socket already closed'.format(fileno))
# elif event & select.EPOLLOUT:
elif event & ioloop.WRITE:
log.info('ioloop.WRITE, fd {0}'.format(fileno))
try:
self._handle_new_send(self._fileno2context[fileno])
except KeyError:
log.info('fd:%s, socket already closed' % fileno)
elif (event & select.EPOLLHUP) or (event & select.EPOLLERR):
elif event & ioloop.ERROR:
# FIXME: consider if we need to release net msg resources
if event & select.EPOLLHUP:
log.info('--EPOLLHUP--')
Expand Down Expand Up @@ -521,7 +532,7 @@ def interupt_accept():
self._async_stop(force_stop)
interupt_accept()
log.info('connection manager stopped')

def get_recv_msg_ind(self):
"""
get recv msg ind
Expand Down Expand Up @@ -573,9 +584,11 @@ def _finish_read_callback(self, succ, result):
except KeyError:
pass
else:
self._epoll.modify(
context.get_sock().fileno(), select.EPOLLIN | select.EPOLLET
)
# self._epoll.modify(
# context.get_sock().fileno(), select.EPOLLIN | select.EPOLLET
# )
# TODO : determine if we need delete this
self._poller.modify(context.get_sock().fileno(), ioloop.READ | ioloop.WRITE)
context.release_readlock()

def read(self, context):
Expand All @@ -601,6 +614,7 @@ def read(self, context):
self._finish_read_callback(False, context)

def _do_read(self, context):
log.debug('to read sock {0}'.format(context.get_context_info()))
sock = context.get_sock()
data = None
context.move2recving_msg()
Expand All @@ -611,25 +625,25 @@ def _do_read(self, context):
err = error.args[0]
if err == errno.EAGAIN:
log.debug(
'EAGAIN happend, peer info %s' %
context.get_context_info()
)
'EAGAIN happend peerinfo {0}'.format(
context.get_context_info()
))
return context
elif err == errno.EWOULDBLOCK:
log.info(
'EWOULDBLOCK happend, context info %s' %
'EWOULDBLOCK happend peerinfo {0}'.format(
context.get_context_info()
)
))
return context
else:
log.debug(
'Socket error happend, error:%s, peer info %s' %
(str(error), context.get_context_info())
log.info(
'Socket error:{0}, peer info {1}'.format(
error, context.get_context_info())
)
context.to_destroy()
return context
except Exception as error:
log.critical(
log.error(
'Socket error happend, error:%s, peer info %s' %
(str(error), context.get_context_info())
)
Expand Down Expand Up @@ -665,7 +679,7 @@ def _finish_write_callback(self, succ, result):
)
else:
# log.debug('to epoll modify')
epoll_write_params = self._epoll_write_params()
# epoll_write_params = self._epoll_write_params()
context.release_writelock()

# context hash locked the writing.
Expand Down
Loading

0 comments on commit 79ab2f3

Please sign in to comment.