ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

multiplexer

2022-07-28 14:05:14  阅读:194  来源: 互联网

标签:return nil err multiplexer io error addr


package mesh

import (
"fmt"
"io"
"net"
"strconv"
"strings"
)

// Listener is the transport-independent listening endpoint returned by Listen.
// It can be closed, and it hands out inbound streams one at a time.
type Listener interface {
io.Closer
// Accept returns the next inbound stream together with the remote
// address it came from, or an error.
Accept() (io.ReadWriteCloser, net.Addr, error)
}

// tcpListener adapts an embedded net.Listener to the Listener interface.
// Close is promoted from the embedded listener; Accept is overridden to
// return the three-value form.
type tcpListener struct {
net.Listener
}

// Accept waits for the next connection on the embedded listener and returns
// it along with the connection's remote address. A failure of the underlying
// Accept is passed through unchanged with nil stream and address.
func (t *tcpListener) Accept() (io.ReadWriteCloser, net.Addr, error) {
	conn, acceptErr := t.Listener.Accept()
	if acceptErr == nil {
		return conn, conn.RemoteAddr(), nil
	}
	return nil, nil, acceptErr
}


// udpListener adapts one UDP socket to the Listener interface by sorting
// incoming datagrams into a queue per remote address.
//
// The field order must not change: Listen constructs this struct with an
// unkeyed literal.
//
// NOTE(review): m is read and written without any locking, so Accept and
// Close are not safe to call concurrently with each other.
type udpListener struct {
	*net.UDPConn
	// m maps a peer's address string (addr.String()) to the queue of
	// datagrams received from that peer.
	m map[string]chan []byte
}

// udpSession is the stream handed out by udpListener.Accept for one peer.
// Reads drain that peer's datagram queue; writes go out through the shared
// listening socket addressed to the peer.
type udpSession struct {
	conn *net.UDPConn
	peer net.Addr
	in   <-chan []byte
	rest []byte // unread tail of the datagram currently being consumed
}

// Read copies queued payload bytes into p, blocking until a datagram is
// available. It returns io.EOF once the listener has closed the queue.
func (s *udpSession) Read(p []byte) (int, error) {
	for len(s.rest) == 0 {
		d, ok := <-s.in
		if !ok {
			return 0, io.EOF
		}
		s.rest = d
	}
	n := copy(p, s.rest)
	s.rest = s.rest[n:]
	return n, nil
}

// Write sends p to the session's peer as a single datagram.
func (s *udpSession) Write(p []byte) (int, error) {
	return s.conn.WriteTo(p, s.peer)
}

// Close is a no-op: the queue and the socket are owned by the listener and
// are released by udpListener.Close.
func (s *udpSession) Close() error {
	return nil
}

// Close closes every per-peer queue (which ends pending session reads with
// io.EOF) and then closes the UDP socket. Entries are removed as they are
// closed so that a repeated Close does not close a channel twice.
func (u *udpListener) Close() error {
	for k, q := range u.m {
		close(q)
		delete(u.m, k)
	}
	return u.UDPConn.Close()
}

// Accept reads datagrams from the socket until one arrives from a peer that
// has not been seen before, then returns a stream for that peer together with
// its address. Datagrams from already-known peers are appended to that peer's
// queue and the loop continues.
//
// It returns an error if the socket read fails or reports no remote address.
func (u *udpListener) Accept() (io.ReadWriteCloser, net.Addr, error) {
	const (
		maxDatagram = 40960 // read buffer size; larger datagrams are truncated
		queueLen    = 100   // datagrams buffered per peer
	)
	for {
		// A fresh buffer per read: the slice is handed off to the peer's
		// queue and must not be overwritten by the next ReadFrom.
		b := make([]byte, maxDatagram)
		n, addr, err := u.ReadFrom(b)
		if err != nil {
			return nil, nil, err
		}
		if addr == nil {
			return nil, nil, fmt.Errorf("udp: unknown remote addr")
		}
		key := addr.String()
		if q, ok := u.m[key]; ok {
			// Blocks when the peer's queue is full.
			q <- b[:n]
			continue
		}
		// New peer: queue the first datagram on the newly created channel
		// (not on the nil value from the failed lookup) and hand back a
		// usable stream instead of nil.
		q := make(chan []byte, queueLen)
		q <- b[:n]
		u.m[key] = q
		return &udpSession{conn: u.UDPConn, peer: addr, in: q}, addr, nil
	}
}

// Listen opens a listening endpoint for addr, which must have the form
// "scheme://ip:port". Schemes beginning with "tcp" are passed to
// net.ListenTCP and schemes beginning with "udp" to net.ListenUDP; anything
// else yields an "unsupported" error.
//
// On any failure the returned Listener is nil.
func Listen(addr string) (Listener, error) {
	scheme, ip, port, err := parseAddr(addr)
	if err != nil {
		return nil, err
	}
	// Prefix matching instead of scheme[:3]: slicing panics when the scheme
	// is shorter than three bytes.
	switch {
	case strings.HasPrefix(scheme, "tcp"):
		l, err := net.ListenTCP(scheme, &net.TCPAddr{IP: ip, Port: port})
		if err != nil {
			// Do not wrap a nil socket in a non-nil Listener.
			return nil, err
		}
		return &tcpListener{l}, nil
	case strings.HasPrefix(scheme, "udp"):
		l, err := net.ListenUDP(scheme, &net.UDPAddr{IP: ip, Port: port})
		if err != nil {
			return nil, err
		}
		return &udpListener{l, make(map[string]chan []byte)}, nil
	default:
		return nil, fmt.Errorf("unsupported")
	}
}

// Dial connects to addr, which must have the form "scheme://ip:port".
// Schemes beginning with "tcp" are passed to net.DialTCP and schemes
// beginning with "udp" to net.DialUDP; anything else yields an "unsupported"
// error.
//
// On any failure the returned stream is a true nil interface value.
func Dial(addr string) (io.ReadWriteCloser, error) {
	scheme, ip, port, err := parseAddr(addr)
	if err != nil {
		return nil, err
	}
	// Prefix matching instead of scheme[:3]: slicing panics when the scheme
	// is shorter than three bytes.
	switch {
	case strings.HasPrefix(scheme, "tcp"):
		c, err := net.DialTCP(scheme, nil, &net.TCPAddr{IP: ip, Port: port})
		if err != nil {
			// Returning c directly would box a nil *net.TCPConn into a
			// non-nil interface value.
			return nil, err
		}
		return c, nil
	case strings.HasPrefix(scheme, "udp"):
		c, err := net.DialUDP(scheme, nil, &net.UDPAddr{IP: ip, Port: port})
		if err != nil {
			return nil, err
		}
		return c, nil
	default:
		return nil, fmt.Errorf("unsupported")
	}
}


// parseAddr splits an address of the form "scheme://host:port" into its
// scheme, IP and numeric port.
//
// The host must be a literal IP address (it is parsed with net.ParseIP, not
// resolved), and the port must be a decimal integer. The scheme is returned
// as written, without validation. On any failure the zero values are returned
// together with the error.
func parseAddr(addr string) (string, net.IP, int, error) {
	const sep = "://"

	pos := strings.Index(addr, sep)
	if pos < 0 {
		return "", nil, 0, fmt.Errorf("parse: find scheme in %s", addr)
	}
	scheme, hostPort := addr[:pos], addr[pos+len(sep):]

	// SplitHostPort also handles bracketed IPv6 literals such as "[::1]:53".
	host, portText, err := net.SplitHostPort(hostPort)
	if err != nil {
		return "", nil, 0, err
	}

	port, err := strconv.Atoi(portText)
	if err != nil {
		return "", nil, 0, err
	}

	if ip := net.ParseIP(host); ip != nil {
		return scheme, ip, port, nil
	}
	return "", nil, 0, fmt.Errorf("parse: ip %s", host)
}







package mux

import (
"errors"
"io"
)

// cell is one unit of multiplexed traffic: the id of the stream it belongs
// to and its payload. In NewMuxer's dispatcher a cell with empty data is
// treated as a control signal rather than payload (it closes a known stream,
// or opens a dialed stream for an unknown id).
type cell struct {
id uint64
data []byte
}

// Mux is the handle returned by NewMuxer for running several independent
// streams over one underlying io.ReadWriteCloser.
type Mux interface {
io.Closer
// Dial opens a new stream from this side.
Dial() (io.ReadWriteCloser, error)
// Accept returns the next stream that was opened by the arrival of data
// for a previously unknown stream id — presumably streams initiated by
// the remote side; confirm against the wire helpers.
Accept() (io.ReadWriteCloser, error)
}

// NewMuxer wraps the connection l in a multiplexer and starts the three
// goroutines that drive it: a writer draining m.send onto l, a dispatcher
// routing cells from m.recv to per-stream channels, and a reader decoding
// cells from l into m.recv.
//
// n is used as the buffer size of the send, recv, accept and dialer channels
// and is stored (as uint32) for sizing per-stream channels.
//
// The wire helpers write_var_u64, writeBytes, read_var_u64 and readBytes are
// defined outside this block; their exact framing is not visible here.
func NewMuxer(l io.ReadWriteCloser, n int) Mux {
// Unkeyed literal: the values follow the field order of multiplexer
// (conn, send, recv, channels, accept, dialer, close, err, n).
mux := &multiplexer{l, make(chan cell, n), make(chan cell, n), make(map[uint64]chan cell),
make(chan io.ReadWriteCloser, n), make(chan io.ReadWriteCloser, n),
make(chan error, 2), make(chan error, 1), uint32(n)}

// Writer: serializes every queued cell as id followed by data.
go func(m *multiplexer) {
for req := range m.send {
id := req.id
err := write_var_u64(m, id)
if err != nil {
// NOTE(review): returning here skips the Close below and stops
// draining m.send, so later senders block once the buffer fills.
return
}
err = writeBytes(m, req.data)
if err != nil {
return
}
}
// m.send was closed by the dispatcher: shut the underlying connection.
_ = m.ReadWriteCloser.Close()
}(mux)

// Dispatcher: the only goroutine in this function that reads or deletes
// entries in m.channels (newExclusive, called from here, adds them).
go func(m *multiplexer) {
closed := false
for {
if !closed {
// Non-blocking poll for shutdown: either the reader goroutine reported
// its result on m.err or Close signaled on m.close. In both cases no
// further streams are handed out.
select {
case <- m.err:
close(m.accept)
close(m.dialer)
closed = true
case <- m.close:
close(m.accept)
close(m.dialer)
closed = true
default:
// break leaves the select only, not the for loop.
break
}
}

// NOTE(review): shutdown is only noticed on the next loop iteration,
// i.e. after another cell arrives on m.recv.
res, ok := <-m.recv
if !ok {
return
}
id := res.id
data := res.data
rsp, ok := m.channels[id]
if ok {
if len(data) > 0 {
// Payload for a known stream: forward to its channel.
rsp <- cell{id, data}
} else {
// Empty payload for a known stream: the stream is finished.
close(rsp)
delete(m.channels, id)
// Last stream gone after shutdown: release the writer (which then
// closes the connection) and stop this goroutine on the next receive.
if len(m.channels) == 0 && closed {
close(m.send)
close(m.recv)
}
}
} else {
if !closed {
// Unknown id: create the stream. Empty data is what Dial enqueues, so
// it goes to the dialer channel; non-empty data goes to accept and is
// kept as the stream's initial read buffer.
channel := m.dialer
if len(data) > 0 {
channel = m.accept
}
channel <- newExclusive(id, data, m)
}
}
}
}(mux)

// Reader: decodes cells from the connection until a read fails or Close is
// signaled, then publishes the result on m.err and closes it.
go func(m *multiplexer) {
m.err <- func() error {
for {
// Shutdown is only checked between cells; a read that is already
// blocked does not observe it.
select {
case <- m.close:
return nil
default:
break
}
id, err := read_var_u64(m)
if err != nil {
return err
}
data, err := readBytes(m)
if err != nil {
return err
}
m.recv <- cell{id, data}
}
}()
close(m.err)
}(mux)
return mux
}

// multiplexer is the Mux implementation built by NewMuxer. NewMuxer fills it
// with an unkeyed literal, so the field order here is significant.
type multiplexer struct {
// Underlying connection; closed by the writer goroutine once send is closed.
io.ReadWriteCloser
// send holds cells waiting to be written to the connection.
send chan cell
// recv feeds the dispatcher goroutine; it receives cells from the reader
// goroutine, from Dial and from exclusive.Close.
recv chan cell
// channels maps a stream id to that stream's inbound channel.
channels map[uint64]chan cell

// accept delivers streams created for unknown ids that arrived with data.
accept chan io.ReadWriteCloser
// dialer delivers streams created for unknown ids that arrived without data.
dialer chan io.ReadWriteCloser
// close carries the shutdown signal sent by Close (buffer of 2).
close chan error
// err carries the reader goroutine's final result (buffer of 1).
err chan error
// n is the buffer size used for each stream's channel in newExclusive.
n uint32
}

// Dial requests a new stream by queueing an empty cell with a fresh id on
// m.recv, then waits for the resulting stream on the dialer channel.
// It returns "dial: closed" if the dialer channel has been closed.
func (m *multiplexer) Dial() (io.ReadWriteCloser, error) {
	m.recv <- cell{uid(), nil}
	if stream, open := <-m.dialer; open {
		return stream, nil
	}
	return nil, errors.New("dial: closed")
}

// Accept blocks until a stream is available on the accept channel and
// returns it. It returns "accept: closed" once the accept channel has been
// closed.
func (m *multiplexer) Accept() (io.ReadWriteCloser, error) {
	l, ok := <-m.accept
	// ok is false only when the channel is closed and drained. The check
	// used to be inverted, which failed every successful accept and returned
	// (nil, nil) after shutdown.
	if !ok {
		return nil, errors.New("accept: closed")
	}
	return l, nil
}

// Close signals shutdown by queueing two nil values on m.close (the channel
// is created with a buffer of two in NewMuxer) and then closing it. It always
// returns nil.
//
// NOTE(review): calling Close a second time sends on a closed channel and
// panics.
func (m *multiplexer) Close() error {
	for i := 0; i < 2; i++ {
		m.close <- nil
	}
	close(m.close)
	return nil
}


// exclusive is a single multiplexed stream; it implements io.ReadWriteCloser
// on top of its parent multiplexer.
type exclusive struct {
// id identifies this stream in every cell it sends.
id uint64
// muxer is the parent whose send/recv channels this stream uses.
muxer *multiplexer
// channel receives the cells routed to this stream.
channel chan cell
// buf holds received bytes that Read has not yet returned.
buf []byte
}

// newExclusive creates the stream for id and registers its inbound channel in
// m.channels. buf becomes the stream's initial unread data.
//
// The map write is unsynchronized; in this file the only caller is the
// dispatcher goroutine in NewMuxer.
func newExclusive(id uint64, buf []byte, m *multiplexer) io.ReadWriteCloser {
	inbox := make(chan cell, m.n)
	stream := &exclusive{id: id, muxer: m, channel: inbox, buf: buf}
	m.channels[id] = inbox // critical: must be registered before cells for id can be routed
	return stream
}

// Read copies buffered stream data into p. When nothing is buffered it blocks
// for the next cell on the stream's channel; it returns io.EOF once that
// channel has been closed. Data that does not fit in p is kept for the next
// call.
func (c *exclusive) Read(p []byte) (int, error) {
	// Refill until there is something to hand out.
	for len(c.buf) == 0 {
		next, open := <-c.channel
		if !open {
			return 0, io.EOF
		}
		c.buf = next.data
	}
	copied := copy(p, c.buf)
	c.buf = c.buf[copied:]
	return copied, nil
}

// Write queues p for transmission on this stream and reports len(p) bytes
// written. An empty p is a no-op, because an empty cell is the close signal.
//
// The data is copied before queueing: the cell is written to the connection
// later by another goroutine, and io.Writer implementations must not retain
// the caller's slice.
func (c *exclusive) Write(p []byte) (int, error) {
	if len(p) == 0 {
		return 0, nil
	}
	data := make([]byte, len(p))
	copy(data, p)
	c.muxer.send <- cell{c.id, data}
	return len(p), nil
}

// Close ends the stream by queueing an empty cell for its id twice: once on
// the muxer's send channel (to be written to the connection) and once on its
// recv channel (so the local dispatcher closes and unregisters the stream).
// It always returns nil.
func (c *exclusive) Close() error {
	fin := cell{c.id, nil}
	c.muxer.send <- fin
	c.muxer.recv <- fin
	return nil
}

标签:return,nil,err,multiplexer,io,error,addr
来源: https://www.cnblogs.com/Janly/p/16528375.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非营利性的个人网站,所有内容不会用于牟利,也不会利用任何形式的广告间接获益,纯粹是为广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有