< 返回版块

2019-03-21 12:01    责任编辑:Mike

本文转载自:http://wayslog.com/2018/11/19/rust-in-server-side/


title: rust in server side date: 2018-11-19 10:50:31 tags:

  • rust
  • linux
  • context switch
  • epoll

本文要讲的事,要从一个非常简单的问题讲起:“如何写出足够快的程序”。 事实上,本着节省服务器资源,节省地球电能的原则,同时满足运维关于服务器利用率的统计学需求,我们被要求对自己的程序进行了一番的优化。这次写下来,就是为了为这次优化行动做个总结。

TLDR:优化服务端程序,大部分情况下只有两个方向:1. 优化流程 2. 优化 syscall。

线程模型

我们首先采用了最简单的线程模型,任意一个线程只会服务一个连接。这是一个非常基础缺有效传统的线程模型,尤其在低连接数情况下,但是,一旦我们面临互联网企业用户的使(瞎)用(J)方(B)式(用),我们的程序很容易变得很慢。

背景分析

首先,我们在做的程序,正是我站内部使用的一个缓存中间件。众所周知嘛,缓存,对于 latency 和 qps 的需求都非常高,也就是说,我们需要这个中间件跑的又快又好。并且,对一个缓存中间件来说,其内在逻辑基本上是固定的,不存在流程优化的可能。因此,优化 syscall 是本次优化的重中之重。

syscall 开销

对网络服务来说,我们常见的 syscall 最多的只有两类: read 和 write 。那么,分析这两类的调用的开销就非常必要了。那么,究竟是哪里影响了 syscall 的性能呢。

内核陷入开销

众所周知,我们在进行任何syscall的时候,都是需要陷入内核调用的。这本身是一个从非特权态到特权态的操作,方便操作系统内核正确的调用网卡等硬件的接口。那么内核陷入开销是瓶颈么?我们可以测试一下:

我特意选择了一个不会失败不会阻塞的syscall: getpid 来测试 syscall 的性能。

#[bench]
fn bench_getpid_syscall(b: &mut test::Bencher) {
    b.iter(|| {
        unsafe {
            libc::syscall(libc::SYS_getpid);
        }
    });
}

(这里需要注意的是不能直接使用 libc 提供的 getpid 函数,因为 pid_t 这个结构体会被缓存)。

测试结果如下

test bench_getpid_syscall ... bench:          57 ns/iter (+/- 0)

可以看到,每次陷入内核调用的时候的开销大概在 50 ns 左右,这个开销在现代CPU来说,其实并不算大。以我的CPU为例,按照 57ns 一个陷入操作,也就意味着系统每秒可以执行的陷入操作能达到 20M 次。如果换算成 write/read,假设每次发送的包大小在 20 byte ,那正常来说系统能发送的数据量上限将是: 20M * 20 byte = 400 MB/s = 32 Gbps ,这种速度足以打满市面上的任何一款万兆网卡。

由此可见,内核陷入调用开销其实并不是我们服务端程序慢的主因。

上下文切换开销

在排除了内核陷入开销过大之后,我把目光聚焦到了上下文切换开销上。

何谓上下文?

这个要从进程的三态转换讲起。我们知道进程有三个基本的运行状态:就绪态,运行态,阻塞态。任何一本操作系统的书上都会有这三态的完整状态转换图。当一个进程的时间片用完或者从运行态转换为阻塞态的时候,要让出其CPU时间片。然后由操作系统寻找一个可运行的线程,将时间片给这个线程。这整个完整的调度过程被称为一次调度。然而,切换线程(注意事项线程而不是进程)是需要开销的。我们在各种书籍上也被反复强调这个开销并不会小。那么是否是因为频繁切换线程导致的呢?为了测试,我们写了如下代码:

const MAX: i32 = 1_000_000;

fn bench_yield(n: i32) {
    let times = MAX / n;
    let ths: Vec<_> = (0..n)
        .into_iter()
        .map(|_| thread::spawn(move || for _ in 0..times {
            unsafe {
                libc::syscall(libc::SYS_sched_yield);
            }
        }))
        .collect();
    for th in ths {
        th.join().unwrap();
    }
}

#[bench]
fn bench_yield_x1_thread(b: &mut test::Bencher) {
    b.iter(|| bench_yield(1))
}

得出结果可知,每次 yield 的时间大约是 130ns ,需要注意的是,这是一个syscall,当然还有 50ns 的陷入内核调用开销。

远远达不到开销过大的标准。

调度开销

在上面的测试过程中,我们尝试调大 thread 的数量,发现,当 thread 越多的时候,完成 1M 次上下文切换的时间越来越多。也就是说,当同时执行的线程数量越多的时候,其对应的 yield 开销越大。众所周知,linux 现在的调度器名为 CFS,由于其使用的红黑树,每次调度的开销实际上仅与当前活跃线程数量正相关,而且复杂度大致为:O(log(n))。

这正好印证了我们的测试, 当可运行的 thread 越多的时候,上下文切换开销基本不变,但是,调度开销上去了。

具体点,增加的时间就是系统找到下一个可运行的线程的时间。

听起来, O(log(n)) 的开销不大,但是一个系统内的活跃线程数量并不会完全受我们控制,尤其当你的中间件里存在着大量出IO操作的时候(也就是通常说的高并发的情况下)。log(n) 的复杂度会导致线程的响应时间会增加的更多,这样,大量的线程得不到响应,反而会使系统内可运行的线程数量又变多。

这也是所谓线程模型处理高并发的短处,甚至,有好事儿者(挑战者?)还专门发明了一个 C10K 问题。

这里不得不聊一下 Go 语言,我们知道,Go 语言默认会开启一个最大同时运行的并发数,一般等价于系统核心数(超线程)。但是其后台实际的线程数有多少呢,其实我们也无法控制。如果你的系统里存在大量非内置的 syscall(比如上面的getpid) 或者存在大量的 cgo 调用,你很可能的到一个跑着几百几千个线程的怪物程序。

正确的做法

解决 C10K 问题,正确的入手方式是改掉传统的线程模型,也就是着手减少系统 runnable 线程数量。

为了这一目标,linux 引入了多路复用技术。

我们来假设一下,如果一个线程调用了一次 I/O 操作而对应的这个 fd 并没有数据给他或者返回,那么这个线程会立马问自己三个问题:我是谁,我还可以动么,我可以阻塞么。

三个问题直击内核心灵,内核大手一挥,你阻塞吧。

可想而知,这个样的代价是巨大的。一次阻塞态切换,不仅包含这上下文切换的开销,还有可能带来的 syscall resume 之后的大量的调度开销。因此,我们诞生了一个线程用多个fd的技术:select/poll。但是仔细看看,select 和 poll 的模型还是挺傻的,因为你不知道具体是谁发生了 I/O 就绪事件,我只能非常莽的轮询所有 fd 。这显然非常低效。

epoll 改进了上述模型,用两个链表维护了 IO 事件就绪的事件列表,用户在得到 epoll 事件之后,拿到的一定是所有的发生用户期待的 I/O 事件的列表。这样无疑快了很多。

所以 epoll 在本质上,其实是一种简单的就绪通知机制。

epoll 模型

epoll 自从诞生之初,关于怎么有效利用 epoll 的讨论就没有停止过。挣扎了这么久,现在大家大部分会倾向于三种模型:

  1. 1 event loop + N worker thread
  2. N (event loop + worker) + reuseport
  3. coroutine

三种模型各有优劣,一般情况下,我们使用第一种模型。这种模型有着独立的 event loop 线程,同时也就意味着有着独立的 io 线程。比较适合 CPU 计算密集的线程模型,但是如果 IO 事件超过了单线程处理的上限,这个模型很容易有瓶颈。

第二种模型也有着大量的拥趸,包括作者本人也是这种模型的提倡者。写这种程序很简单,只需要按照普通的单线程程序一样去使用和编写,写完之后再最外层进行 pthread_create 就行了。但是,限于 reuseport 本身的特性,这样的服务在刚启动的时候会进入一段时间不稳定(有意者可以看再说socket的SO_REUSEPORT选项)。

Rust 与 epoll

在讨论 Rust 之前,我们需要了解一下各大系统上的 I/O 多路复用设施。除了我们常见的 linux 上的 epoll ,还有和 epoll 基本类似的 kqueue,还有 windows 上专属的 iocp。这些是在对应系统上进行高速I/O的必要设施。但是,这些设施的使用姿势各不相同,尤其 iocp 采用了与 epoll/kqueue 风格迥异的 Proactor 模式。而 Rust 作为一门系统语言,必然要考虑同时对这三种基础设施的兼容,于是, mio 应运而生。

mio 是 Carl Lerche 编写的,用一点点的附加开销统一了上述三大基础设施的一款框架。其本身的用法基本类似 epoll。其目的很明确,就是奔着 Rust 的基础设施这一层面去的。事实上 mio 也确实成了既定标准,甚至火狐还给 mio 的开发者奖励了一大笔钱。

那么是不是说,我们现在想用 Rust 编写高性能程序,就一定要用 mio 呢?

当然不是。

mio 本身存在着巨大的使用代价,这个代价不在运行时,而在于开发时。为了利用好 mio ,你需要自己编写一个完整的缓冲区管理程序,同时还有自己维护事件订阅列表以及各个连接的状态转移。最终,你得到了一个巨大无比的状态机。这还只是单线程程序,如果中间涉及到多线程处理,混乱的代码会让你绝望。

不过幸好,Rust 社区意识到了这个问题,哈利波特同学(Alex Crichton)开发了 futures 库,为 rust 提供了 Future/Stream 的抽象。同时,二人合力开发了 tokio ,让使用者的开发难度大大降低。

Future 与 Stream

什么是 Future ?Future 其实是一种抽象,用来表示一种即将完成的动作。比如,一次数据库请求,一个复杂的数值计算,一次 Cache 操作。等等。这些都是一个 Future 。我们来看一下 Future 的部分声明:

pub trait Future {
    type Item;
    type Error;
    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error>;
}

上面是 Future 的主要声明部分,它只有一个 required method 。也就是说,你想要实现一个 Future, 至少要实现上面的 poll 方法。

Future 有三个状态:

  1. Err(Self::Error) 表示发生了错误,并且返回对应的错误类型。
  2. Ok(Async::NotReady) 表示本次计算还没有完成,需要等会儿才能完成。需要由调用方重试。
  3. Ok(Async::Ready(T)) 表示本次计算完成了,并且返回了对应的结果。

需要知道的是,Future 的使用方所有的调用 Future 的动作都是在 poll 函数里进行的,这里就涉及到一个问题:Future 需要自己维护自己的状态。因为 Future 也不知道上次自己怎么样了。只能拿出自己的小本本,查一下自己上次做到哪里了。这也是 Future 区别于传统的 coroutine 最大的区别。

Future 说完了,我们来看另一个 trait 定义:Stream。

pub trait Stream {
    type Item;
    type Error;
    
    fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error>;
}

与 Future 不同的是,Stream 一共有四种状态:

  1. Err(Self::Error) 表示发生了错误,终止此 Stream。
  2. Ok(Async::NotReady) 表示Stream暂时没有数据。
  3. Ok(Async::Ready(Some(Self::Item))) 表示正确取到了数据。
  4. Ok(Async::Ready(None)) 表示没收到数据且本 Stream 已经被关闭,不会再有数据过来了。

我们可以看到,Stream 比 Future 多了一个终结状态,用来表示 Stream 彻底完成的终结状态。

那么 Stream 可以用来表示什么呢?我估计很多人都想到了,Stream 可以被用来表示一个一个的请求,一个一个的连接,或者任何适合流形式表达的东西。

如何进行关联?

上文中我们介绍了 Future/Stream ,但是 Future 系统究竟是怎样与 mio 系统关联起来的呢?

我们知道,一个 Future 实际上表示一个处理过程,那么,我们完全有理由将 mio 的 fd 与 Future 关联起来。

一旦 mio 的某个 fd 发生了事件,那么就来调用对应的 Future 的 poll 函数即可。

解释一下上面的问题。

fd 是什么?file descriptor, 因为 linux 万物皆文件的特性,所以一般我们会把 文件、连接 等资源句柄成为 file descriptor。

发生了什么事件,联系前面说的底层特性,我们知道,mio 在linux的底层是 epoll,mio 会为对应的 fd 注册可读可写事件,而当对应的 fd 有可读或者可写的事件的时候,mio 会收到 epoll 的通知。

我们其实,只需要保存一下 fd 与 Future 的关联关系即可。

一个简单的案例分析

URl: https://github.com/wayslog/aster/blob/master/src/cluster/mod.rs#L252-L313

整体结构:

let fut = lazy()
  .and_then(...)
  .and_then(...)
  .and_then(...);
current_thread::block_on_all(fut).unwrap();

(题外话:代码折叠真的挺有用的)

我们可以看到,fut 最终被 current_thread 跑了起来,而且会一直 block 在这里除非 fut 完成。但是,我们可以看到最后一个的 and_then 里明明已经有了返回:

current_thread::spawn(amt);
Ok(())

是没执行到这里么?当然不是。。。我们加入一个输出函数来看一看:

## 代码 
current_thread::spawn(amt);
println!("i am baka");
Ok(())

## 输出
DEBUG 2018-11-29T11:31:10Z: libaster::proxy: skip proxy ping feature for cluster test-mc-proxy
 INFO 2018-11-29T11:31:10Z: libaster::proxy: setup ping for cluster test-redis-proxy
i am baka
DEBUG 2018-11-29T11:31:10Z: libaster::cluster::fetcher: trying to execute cmd to 127.0.0.1:7000

可以明显的看到,本函数已经退出了,那这个 Future 究竟等在哪里呢?

答,当 Future 里又有子任务(sub process)被 spawn 的时候, tokio 会等到所有的子任务完成才会退出。而我们发现,上面的子任务,其实是不断的 accept 新连接的任务,可以这么说,只要它还能 accept 新连接,这个 Future 就不会退出,就会被永远的挂在那里。

而且,我们仔细看被 spawn 的函数:

let amt = listen.incoming().for_each(...);

这其中,for_each 其实没干别的,就是将本来的 Stream 转化成了 Future。这里必须要说明一下,以防有人有疑问。

我们仔细想想这个过程,Runtime::block_on 在所有可以跑的 Future 之间来回调用,所以任何一个 Future 一旦陷入了阻塞,其实其所在的线程也就陷入了阻塞,进而阻塞了整个 eventloop。这个当然是不被允许的,因此,传统的 epoll 模型在遇到文件 I/O 这种容易阻塞的操作的时候都是起一个后台线程来接管所有的 I/O 操作的。

高生产力的用法

上面我们分析了一个典型的 Future 启动流程,然后我们就可以开始编写自己的代码了。

对于任意一个 TcpStream , Rust 都为它实现了 AscynRead/AsyncWrite 接口,我们可以通过这两个接口,来实现自己的对 tcp 数据包的操作。但是,这个接口在使用的时候总会让人感觉不是很方便。因为你还要自己管理缓冲区之类的,太困难。于是,tokio 为你提供了更上一层的抽象:tokio-codec。

我们分析一下一个服务器软件的处理流程大致可以化为:

Tcp数据流入 -> decode -> 进行处理 -> 构建返回 -> encode -> Tcp数据写回

而 tokio-codec 为你专门提供了两个trait来进行 encode、decode,没错,他们就是 Decoder Encoder

pub trait Decoder {
    type Item;
    type Error: From<Error>;
    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error>;
}

pub trait Encoder {
    type Item;
    type Error: From<Error>;
    fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error>;
}

这里要稍微提一下 BytesMut,这是 tokio 的标配的 buffer 管理库,据说性能比较高。

这里,tokio 已经帮你写好了 Decoder 和 Encoder 的方法,我们要做的,只是去读取 src 并解析,然后把响应写入 Encoder 的 dst 里即可。

我们来看一下典型用法:

let codec = CmdCodec::default();
let (cmd_tx, resp_rx) = codec.framed(sock).split();

我们通过 CmdCodec 声明了一个 Codec 类型,然后把 sock 进行 framed 化,然后 split 成为上下行的两个结构体。

很自然的, cmd_tx 关联了 Readble 事件, 是一个 Stream 类型;

resp_rx 则也响应的关联了 Writable 事件,是一个 Sink 类型。

这样一来,我们的 tokio 代码几乎只需要实现 Codec 即可,其他部分可以说是完全的业务逻辑就是了。

Stream 组合

前面说到,resp_rx 是一个 Sink 类型。那么 Sink 又是什么呢?

照例我们来看定义:

pub trait Sink {
    type SinkItem;
    type SinkError;
    fn start_send(&mut self, item: Self::SinkItem)
        -> Result<AsyncSink<Self::SinkItem>, Self::SinkError>;

    fn poll_complete(&mut self) -> Result<Async<()>, Self::SinkError>;
    fn close(&mut self) -> Result<Async<()>, Self::SinkError> { ... }
}

这里我们知道了, 如果把 Stream 视为一个连续不断读取消息的流,那么 Sink 就是一个不断写入消息的入口。Stream 可以将自己的消息写入对应的 Sinkl 里。同时, Sink 也能主动的读取 Stream 的消息直到读完。

这就是 Stream::forward 和 Sink::send_all 所表达的两种语义,当然,最终的行为是一致的。

与此同时我们发现, Sink 的 trait 中,定义了 start_sendpoll_complete 这两个方法。我们从语义上可以知道,start_send 表示发送你的数据进入队列,而 poll_complete 则表示将这个队列 flush 掉。

为什么要这样设计呢?

因为我们知道,tokio 实质上是一个 Future 之间互相唤醒的模型,在大多数情况下,start_send 仅仅保证把数据写入到指定的内存位置,但是并不保证一定会唤醒处理线程。所以,我们需要再每次确认需要唤醒的时候,人工去唤醒一下对应的处理线程, 这也就是 poll_complete 做的事儿。

Future 之间通信

为了我们方便使用,futures 默认提供了一系列 Future 之间通信组件:unsync、sync 两个包。顾名思义,unsync 被用于单线程内通信,sync则被用于多线程之间通信。因为 unsync 可以被视为 sync 的同步版本,我们可以略过 unsync 包直接讲述 sync 包里的内容。

BiLock

BiLock 是 sync 包里提供的一个同步原语,其表现上和传统的 std::sync::Mutex 上一致,不同点在于:

  1. 只能在两个 Future 之间互相锁定
  2. 实现了 poll 语义

也就是说,这个 lock 本身也是可以提供 poll_lock 接口的。甚至使用上也和正常的 lock 差别不大。

oneshot/mpsc

futures 除了提供了一套 lock 之外,还提供了一个多写单读队列 mpsc ,而 oneshot 则是这个队列的单发版本,不多赘述。

channel 其实是一个 Sender、Receiver 互相唤醒的原子队列,也正是因为这个互相唤醒开销,不能把 channel 和传统的无锁队列进行对比。但是,它本身也足够快,不用担心性能问题。

同时,Sender 实现了 Sink trait,Receiver 实现了 Stream trait, 这样一来, channel 甚至可以无缝嵌入到

在 aster 里也用了很多 mpsc 用来做 Future 之间的交流。我们来看下例子:

# https://github.com/wayslog/aster/blob/master/src/proxy/mod.rs#L248-L277
fn create_conn(node: &str) -> AsResult<Sender<T>> {
    let node_addr = node.to_string();
    let (tx, rx) = channel(1024 * 8);
    let ret_tx = tx.clone();
    let amt = lazy(|| -> Result<(), ()> { Ok(()) })
        .and_then(move |_| {
            node_addr
                .as_str()
                .parse()
                .map_err(|err| error!("fail to parse addr {:?}", err))
        })
        .and_then(|addr| {
            TcpStream::connect(&addr).map_err(|err| error!("fail to connect {:?}", err))
        })
        .and_then(|sock| {
            sock.set_nodelay(true).expect("set nodelay must ok");
            let codec = <T as Request>::node_codec();
            let (sink, stream) = codec.framed(sock).split();
            let arx = rx.map_err(|err| {
                info!("fail to send due to {:?}", err);
                Error::Critical
            });
            let (nd, nr) = spawn_node(arx, sink, stream);
            current_thread::spawn(nd);
            current_thread::spawn(nr);
            Ok(())
        });
    current_thread::spawn(amt);
    Ok(ret_tx)
}

这里其实只是开启了一个后端连接,然后,返回 Sender<T> 给 Proxy 结构体使用。

关于开发模型

上面说了 futures 的几个关键接口,那么 tokio 是怎么和 Future 关联起来的呢 ?

答案是 runtime 。

tokio 默认提供了两种 runtime, 一个是 tokio::runtime::Runtime ,另一个是 tokio::runtime::current_thread::Runtime。 其中,前者表示一个默认占满所有 thread 的线程池,后者表示在当前线程跑一个event_loop。

这里,就涉及到两个模型了:

  1. 1 eventloop thread + N worker threads
  2. each thread has 1 eventloop + reuseport

第一个模型非常适合 CPU 计算需求比较高的程序,例如某个在线 ML 程序。第二个模型,适合 I/O 超级密集的应用,例如我们的缓存代理。

这中间还有另一个引申出来的问题就是:采用第二种模型,你只需要写单线程代码,不需要考虑复杂的多线程逻辑和代码。

聊聊 coroutine

上面我们已经初步了解了 Rust 的高性能网络程序的写法。但是,人都是不满足的,有人觉得 Future 模型有着其固有的模型缺陷。那就是无法保存上下文状态。 每一个 Future 必须重入式的进行回溯,自己保存自己的状态。这显然增大了编程的难度。

复杂到 aster 甚至用了 4.2 k 行代码才实现了 overlord(Go版本)1W 行实现的功能(作者预期2k行左右能实现相同功能)。

为此,Rust 社区专门引入了 async/await 的功能, 其功能与 python、C# 中的语义一样,甚至实现方式都类似。你可以把 async 的函数,视为一个简单的无栈 coroutine,它带着 resume 的功能,也就意味着,你可以在适合的时候,让出 CPU,让调度器调度寻找可以执行的 coroutine 。

那么这种实现有什么问题呢?

  1. 因为是无栈的 coroutine,因此不能简单的递归自己 (现在这种 closure 实现看起来也真的不能递归自己)
  2. 因为无栈的特性,Rust 需要为这种语法专门配套一个 yield 关键字,来生成一个 generator。但是,这个 generator 注定了是不能被 move 的。也就是说,无栈协程依赖于当前栈,一旦被转移或者当前栈被破坏,则当前协程失效。
  3. 写起来需要注意。因为有栈协程需要关注的是什么时候开新的栈,而无栈协程关注的是什么时候应该放弃CPU。二者的关注点不一样,相应的,无栈协程的入门难度稍微大点。

而相对的,有失必有得,无栈协程比起有栈协程有着难以企及的性能优势。不要以为 copy 栈是一件开销很小的事儿,因为同一时刻在上下文中调度的栈太多太细了,copy 栈的开销将被无限放大。

当然了,这么多年,从 C# 开始的 async/await 模型,在编程难度上已经基本上和 stackfullness corountine 基本持平了。我们不需要考虑太多。

而 Rust,也在支持了 async/await 的同时由船娘(withoutboats)开发了一个实验性的项目 —— romio。下面是它的demo:

#![feature(async_await, await_macro, futures_api)]

use std::io;

use futures::executor;
use futures::io::{AsyncReadExt, AllowStdIo};

use romio::TcpStream;

fn main() -> io::Result<()> {
    executor::block_on(async {
        let mut stream = await!(TcpStream::connect(&"127.0.0.1:7878".parse().unwrap()))?;
        let mut stdout = AllowStdIo::new(io::stdout());
        await!(stream.copy_into(&mut stdout))?;
        Ok(())
    })
}

不过上面的项目还处于试验阶段,现阶段进行高性能服务端程序开发,tokio 还是必不可少的。

结语

目前,使用 Rust 进行高性能网络服务开发是不是那么便利。

但是我们有里有相信,在 async/await 完成之后, Rust 在高性能中间件开发领域,必然会有更加长远的进步和爆发。