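Preface
A recent project needed a message queue, mainly to make some previously synchronous operations asynchronous. Based on my use case and what I already knew well, I chose NATS Streaming, for three reasons. First, when selecting middleware I prefer components written in a language I am familiar with, which makes troubleshooting and deeper investigation easier. Second, because I work on Kubernetes and other cloud-native technology, I lean toward projects governed by the CNCF, since those projects have considered deployment on Kubernetes from the start. Third, I evaluate whether the component I introduce will still meet our needs as the project grows.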
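Use cases for message queues
To explain why, I need to describe where message queues are useful. I came across an answer on Zhihu that I largely agree with and that explains it vividly, so I am borrowing it here, with thanks to its author.
The defining feature of a message queue is asynchronous processing; its main purposes are to reduce request latency and to decouple components. The main use case is therefore to put operations that are time-consuming and do not need to return a result immediately (synchronously) onto the queue as messages. And because a queue sits in between, as long as the message format stays the same, the sender and the receiver do not need to contact each other or be affected by each other. That is decoupling.
Here is an example:
Suppose a user signs up in your application. After the server receives the registration request, it does the following:
- Validates the username and other details and, if everything checks out, adds a user record to the database
- Sends a confirmation email if the user registered by email, or a text message if the user registered by phone
- Analyzes the user's profile so it can later recommend like-minded people to them, or recommend them to those people
- Sends the user a system notification containing a getting-started guide, and so on
For the user, though, registration really only requires the first step: once the server has stored the account in the database, the user can log in and do what they came to do. Do the other tasks really have to be completed within the same request? Is it worth making the user wait while you handle things that do not matter to them? So once the first step is done, the server can put the remaining operations onto the appropriate message queues and return the result to the user right away; the queued operations are then processed asynchronously.
There is another scenario: a large number of users sign up at the same time, and under high concurrency the registration requests start running into problems. For example, the email API cannot keep up, or the heavy computation involved in analyzing profiles saturates the CPU. User records are written to the database quickly, but requests get stuck sending email or analyzing profiles, so response times grow sharply or even time out, which is a poor trade. The usual answer is again to put these operations on a message queue (the producer-consumer model): consumers work through the queue at their own pace, registration requests complete quickly, and the user's use of other features is unaffected.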
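So in normal feature development there is no need to go looking for places to use a message queue. Instead, when a performance bottleneck appears, check whether the business logic contains time-consuming operations that could be handled asynchronously; if so, introduce a message queue to address them. Otherwise, adopting a message queue blindly may raise development and maintenance costs without delivering a meaningful performance gain, and the cost outweighs the benefit.
To summarize, a message queue serves two purposes:
- Peak shaving. Think of it as a reservoir. For example, Kafka in an ELK log collection pipeline absorbs log spikes during peak periods, keeping the whole system safe at the cost of some real-time freshness.
- Making synchronous flows asynchronous. Among the many steps of what used to be a single synchronous operation, those that do not affect the main path can be handled asynchronously through a message queue. In e-commerce, for example, once an order completes the system returns a purchase confirmation to the customer directly, but it also has to notify the accounts team to charge the customer, notify inventory of the stock change, notify logistics to arrange delivery, and notify other teams to perform tasks such as adding loyalty points.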
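The order example can be sketched as a small publish/subscribe hub: the order service publishes one event and does not need to know which teams consume it. Broker, Subscribe, Publish, and the subject name order.completed are hypothetical names made up for this sketch, and handlers are called inline here, whereas a real broker would deliver asynchronously.

```go
package main

import "fmt"

// Broker is a toy in-process publish/subscribe hub used only for this sketch.
type Broker struct {
	handlers map[string][]func(event string)
}

func NewBroker() *Broker {
	return &Broker{handlers: make(map[string][]func(string))}
}

// Subscribe registers a handler for a subject.
func (b *Broker) Subscribe(subject string, h func(event string)) {
	b.handlers[subject] = append(b.handlers[subject], h)
}

// Publish hands the event to every subscriber of the subject and reports
// how many subscribers there were.
func (b *Broker) Publish(subject, event string) int {
	for _, h := range b.handlers[subject] {
		h(event)
	}
	return len(b.handlers[subject])
}

func main() {
	b := NewBroker()
	for _, team := range []string{"billing", "inventory", "logistics", "loyalty"} {
		team := team // capture the loop variable for the closure
		b.Subscribe("order.completed", func(e string) {
			fmt.Printf("%s handles %s\n", team, e)
		})
	}
	n := b.Publish("order.completed", "order-1001")
	fmt.Println("subscribers notified:", n)
}
```

Adding a new consumer is a new Subscribe call; the publisher's code does not change.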
Introduction to NATS Streaming
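NATS Streaming is a data streaming system powered by NATS and written in the Go programming language. The NATS Streaming server executable is named nats-streaming-server. NATS Streaming embeds, extends, and interoperates seamlessly with the core NATS platform. The NATS Streaming server is provided as open source software under the Apache-2.0 license. Synadia actively maintains and supports the NATS Streaming server.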
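Features
In addition to the features of the core NATS platform, NATS Streaming provides the following:
- Enhanced message protocol
NATS Streaming implements its own enhanced message format using Google Protocol Buffers. These messages are transmitted as binary payloads over the core NATS platform, so no changes to the basic NATS protocol are required. A NATS Streaming message contains the following fields:
- Sequence - a globally ordered sequence number for the subject's channel
- Subject - the NATS Streaming delivery subject
- Reply - the content of the corresponding "reply-to" subject
- Data - the actual message payload
- Timestamp - the time the message was received, in nanoseconds
- Redelivered - a flag indicating whether the server has redelivered this message
- CRC32 - an optional cyclic redundancy check, an error-detection technique used in data storage and data communication to ensure data integrity; the IEEE CRC32 algorithm is used here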
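To make the field list concrete, here is a small Go sketch of a message carrying those fields, with the checksum computed by the standard library's IEEE CRC32 implementation. The struct is written for illustration rather than copied from the server's protobuf definition, and the assumption that the checksum covers only the payload is mine.

```go
package main

import (
	"fmt"
	"hash/crc32"
	"time"
)

// Msg mirrors the fields listed above. It is an illustrative struct,
// not the generated protobuf type.
type Msg struct {
	Sequence    uint64
	Subject     string
	Reply       string
	Data        []byte
	Timestamp   int64 // nanoseconds since the Unix epoch
	Redelivered bool
	CRC32       uint32
}

// NewMsg stamps the message and records an IEEE CRC32 of the payload
// (assumption: the checksum covers Data only).
func NewMsg(seq uint64, subject string, data []byte) Msg {
	return Msg{
		Sequence:  seq,
		Subject:   subject,
		Data:      data,
		Timestamp: time.Now().UnixNano(),
		CRC32:     crc32.ChecksumIEEE(data),
	}
}

// Valid reports whether the payload still matches the stored checksum.
func (m Msg) Valid() bool {
	return crc32.ChecksumIEEE(m.Data) == m.CRC32
}

func main() {
	m := NewMsg(1, "orders", []byte("123456789"))
	fmt.Printf("crc32=%#x valid=%v\n", m.CRC32, m.Valid())

	m.Data[0] = 'X' // simulate corruption in storage or transit
	fmt.Println("valid after corruption:", m.Valid())
}
```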
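- Message/event persistence
NATS Streaming offers configurable message persistence; the store can be memory or files. In addition, the storage subsystem exposes a public interface that lets you develop a custom implementation for persisting messages.
- At-least-once delivery
NATS Streaming provides message acknowledgements between publisher and server (for publish operations) and between subscriber and server (to confirm message delivery). Messages are kept by the server in memory or in secondary storage (or other external storage) so that they can be redelivered to subscribers that need to receive them again.
- Publisher rate limiting
NATS Streaming provides a connection option called MaxPubAcksInFlight, which limits the number of unacknowledged messages a publisher may have in flight at any given time. When this maximum is reached, further asynchronous publish calls block until the number of unacknowledged messages falls below the limit.
- Per-subscriber rate matching/limiting
NATS Streaming lets a subscription set a parameter called MaxInFlight, which specifies the maximum number of messages that have been delivered but not yet acknowledged. When this limit is reached, NATS Streaming suspends delivery to the subscriber until the number of unacknowledged messages drops below the limit.
- Historical message replay by subject
A new subscription can specify a start position in the message stream stored for the subscribed subject's channel. With this option, delivery can begin at:
1. The earliest message stored for the subject
2. The most recently stored message for the subject, before the subscription starts. This is commonly thought of as a "last value" or "initial value" cache
3. A specific date/time, in nanoseconds
4. A historical offset from the current server date/time, for example the last 30 seconds
5. A specific message sequence number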
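- Durable subscriptions
A subscription may also specify a "durable name" so that it survives client restarts. With a durable subscription, the server tracks the last acknowledged message sequence number for the client and durable name. When the client restarts or resubscribes using the same client ID and durable name, the server resumes delivery beginning with the earliest unacknowledged message.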
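The bookkeeping behind durable subscriptions can be modeled in a few lines of Go. This sketch assumes a single channel and keeps everything in memory; Server, Ack, and ResumeFrom are hypothetical names for the illustration and do not correspond to the real server's types.

```go
package main

import "fmt"

// durableKey identifies a durable subscription by client ID and durable name.
type durableKey struct {
	ClientID string
	Durable  string
}

// Server is a toy model of one channel plus per-durable acknowledgement state.
type Server struct {
	lastSeq uint64                         // highest sequence stored so far
	acked   map[durableKey]map[uint64]bool // acknowledged sequences per durable
}

func NewServer() *Server {
	return &Server{acked: make(map[durableKey]map[uint64]bool)}
}

// Publish stores a message and returns its sequence number.
func (s *Server) Publish() uint64 {
	s.lastSeq++
	return s.lastSeq
}

// Ack records that the durable subscription has acknowledged seq.
func (s *Server) Ack(clientID, durable string, seq uint64) {
	k := durableKey{clientID, durable}
	if s.acked[k] == nil {
		s.acked[k] = make(map[uint64]bool)
	}
	s.acked[k][seq] = true
}

// ResumeFrom returns the earliest sequence the durable has not acknowledged,
// or lastSeq+1 when every stored message has been acknowledged.
func (s *Server) ResumeFrom(clientID, durable string) uint64 {
	acks := s.acked[durableKey{clientID, durable}] // a nil map reads as "nothing acked"
	for seq := uint64(1); seq <= s.lastSeq; seq++ {
		if !acks[seq] {
			return seq
		}
	}
	return s.lastSeq + 1
}

func main() {
	s := NewServer()
	for i := 0; i < 5; i++ {
		s.Publish()
	}
	// The client acknowledges 1, 2 and 4, then restarts.
	for _, seq := range []uint64{1, 2, 4} {
		s.Ack("client-1", "mailer", seq)
	}
	fmt.Println("resume at sequence:", s.ResumeFrom("client-1", "mailer"))
}
```

Because the state is keyed by both client ID and durable name, reconnecting with a different durable name starts with no acknowledgement history.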
Running NATS Streaming with Docker
As mentioned above, what NATS Streaming adds over core NATS is persistence, so the demos below focus on that feature.
Run the memory-based store example:
docker run -ti -p 4222:4222 -p 8222:8222 nats-streaming:0.12.0
You will see output like the following:
[1] 2019/02/26 08:13:01.769734 [INF] STREAM: Starting nats-streaming-server[test-cluster] version 0.12.0
[1] 2019/02/26 08:13:01.769811 [INF] STREAM: ServerID: arfYGWPtu7Cn8Ojcb1yko3
[1] 2019/02/26 08:13:01.769826 [INF] STREAM: Go version: go1.11.5
[1] 2019/02/26 08:13:01.770363 [INF] Starting nats-server version 1.4.1
[1] 2019/02/26 08:13:01.770398 [INF] Git commit [not set]
[1] 2019/02/26 08:13:01.770492 [INF] Starting http monitor on 0.0.0.0:8222
[1] 2019/02/26 08:13:01.770555 [INF] Listening for client connections on 0.0.0.0:4222
[1] 2019/02/26 08:13:01.770581 [INF] Server is ready
[1] 2019/02/26 08:13:01.799435 [INF] STREAM: Recovering the state...
[1] 2019/02/26 08:13:01.799461 [INF] STREAM: No recovered state
[1] 2019/02/26 08:13:02.052460 [INF] STREAM: Message store is MEMORY
[1] 2019/02/26 08:13:02.052552 [INF] STREAM: ---------- Store Limits ----------
[1] 2019/02/26 08:13:02.052574 [INF] STREAM: Channels: 100 *
[1] 2019/02/26 08:13:02.052586 [INF] STREAM: --------- Channels Limits --------
[1] 2019/02/26 08:13:02.052601 [INF] STREAM: Subscriptions: 1000 *
[1] 2019/02/26 08:13:02.052613 [INF] STREAM: Messages : 1000000 *
[1] 2019/02/26 08:13:02.052624 [INF] STREAM: Bytes : 976.56 MB *
[1] 2019/02/26 08:13:02.052635 [INF] STREAM: Age : unlimited *
[1] 2019/02/26 08:13:02.052649 [INF] STREAM: Inactivity : unlimited *
[1] 2019/02/26 08:13:02.052697 [INF] STREAM: ----------------------------------
The output shows that the default store is memory-based.
Run the file-based persistence example:
docker run -ti -v /Users/gao/test/mq:/datastore -p 4222:4222 -p 8222:8222 nats-streaming:0.12.0 -store file --dir /datastore -m 8222
You will see output like the following:
[1] 2019/02/26 08:16:07.641972 [INF] STREAM: Starting nats-streaming-server[test-cluster] version 0.12.0
[1] 2019/02/26 08:16:07.642038 [INF] STREAM: ServerID: 9d4H6GAFPibpZv282KY9QM
[1] 2019/02/26 08:16:07.642099 [INF] STREAM: Go version: go1.11.5
[1] 2019/02/26 08:16:07.643733 [INF] Starting nats-server version 1.4.1
[1] 2019/02/26 08:16:07.643762 [INF] Git commit [not set]
[1] 2019/02/26 08:16:07.643894 [INF] Listening for client connections on 0.0.0.0:4222
[1] 2019/02/26 08:16:07.643932 [INF] Server is ready
[1] 2019/02/26 08:16:07.672145 [INF] STREAM: Recovering the state...
[1] 2019/02/26 08:16:07.679327 [INF] STREAM: No recovered state
[1] 2019/02/26 08:16:07.933519 [INF] STREAM: Message store is FILE
[1] 2019/02/26 08:16:07.933570 [INF] STREAM: Store location: /datastore
[1] 2019/02/26 08:16:07.933633 [INF] STREAM: ---------- Store Limits ----------
[1] 2019/02/26 08:16:07.933679 [INF] STREAM: Channels: 100 *
[1] 2019/02/26 08:16:07.933697 [INF] STREAM: --------- Channels Limits --------
[1] 2019/02/26 08:16:07.933711 [INF] STREAM: Subscriptions: 1000 *
[1] 2019/02/26 08:16:07.933749 [INF] STREAM: Messages : 1000000 *
[1] 2019/02/26 08:16:07.933793 [INF] STREAM: Bytes : 976.56 MB *
[1] 2019/02/26 08:16:07.933837 [INF] STREAM: Age : unlimited *
[1] 2019/02/26 08:16:07.933857 [INF] STREAM: Inactivity : unlimited *
[1] 2019/02/26 08:16:07.933885 [INF] STREAM: ----------------------------------
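PS
- When deploying on Kubernetes, you can use file-based persistence and mount a block storage volume, such as AWS EBS or Ceph RBD, to keep the data durable.
- Port 4222 is for client connections; port 8222 is the monitoring port.
Once the server is running, open localhost:8222 to view the monitoring page.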
Startup options
Streaming Server Options:
    -cid, --cluster_id          Cluster ID (default: test-cluster)
    -st, --store                Store type: MEMORY|FILE|SQL (default: MEMORY)
    --dir                       For FILE store type, this is the root directory
    -mc, --max_channels         Max number of channels (0 for unlimited)
    -msu, --max_subs            Max number of subscriptions per channel (0 for unlimited)
    -mm, --max_msgs             Max number of messages per channel (0 for unlimited)
    -mb, --max_bytes            Max messages total size per channel (0 for unlimited)
    -ma, --max_age              Max duration a message can be stored ("0s" for unlimited)
    -mi, --max_inactivity       Max inactivity (no new message, no subscription) after which a channel can be garbage collected (0 for unlimited)
    -ns, --nats_server          Connect to this external NATS Server URL (embedded otherwise)
    -sc, --stan_config          Streaming server configuration file
    -hbi, --hb_interval         Interval at which server sends heartbeat to a client
    -hbt, --hb_timeout          How long server waits for a heartbeat response
    -hbf, --hb_fail_count       Number of failed heartbeats before server closes the client connection
    --ft_group                  Name of the FT Group. A group can be 2 or more servers with a single active server and all sharing the same datastore
    -sl, --signal [= ]          Send signal to nats-streaming-server process (stop, quit, reopen)
    --encrypt                   Specify if server should use encryption at rest
    --encryption_cipher         Cipher to use for encryption. Currently support AES and CHAHA (ChaChaPoly). Defaults to AES
    --encryption_key            Encryption Key. It is recommended to specify it through the NATS_STREAMING_ENCRYPTION_KEY environment variable instead

Streaming Server Clustering Options:
    --clustered                 Run the server in a clustered configuration (default: false)
    --cluster_node_id           ID of the node within the cluster if there is no stored ID (default: random UUID)
    --cluster_bootstrap         Bootstrap the cluster if there is no existing state by electing self as leader (default: false)
    --cluster_peers             List of cluster peer node IDs to bootstrap cluster state.
    --cluster_log_path          Directory to store log replication data
    --cluster_log_cache_size    Number of log entries to cache in memory to reduce disk IO (default: 512)
    --cluster_log_snapshots     Number of log snapshots to retain (default: 2)
    --cluster_trailing_logs     Number of log entries to leave after a snapshot and compaction
    --cluster_sync              Do a file sync after every write to the replication log and message store
    --cluster_raft_logging      Enable logging from the Raft library (disabled by default)

Streaming Server File Store Options:
    --file_compact_enabled      Enable file compaction
    --file_compact_frag         File fragmentation threshold for compaction
    --file_compact_interval     Minimum interval (in seconds) between file compactions
    --file_compact_min_size     Minimum file size for compaction
    --file_buffer_size          File buffer size (in bytes)
    --file_crc                  Enable file CRC-32 checksum
    --file_crc_poly             Polynomial used to make the table used for CRC-32 checksum
    --file_sync                 Enable File.Sync on Flush
    --file_slice_max_msgs       Maximum number of messages per file slice (subject to channel limits)
    --file_slice_max_bytes      Maximum file slice size - including index file (subject to channel limits)
    --file_slice_max_age        Maximum file slice duration starting when the first message is stored (subject to channel limits)
    --file_slice_archive_script Path to script to use if you want to archive a file slice being removed
    --file_fds_limit            Store will try to use no more file descriptors than this given limit
    --file_parallel_recovery    On startup, number of channels that can be recovered in parallel
    --file_truncate_bad_eof     Truncate files for which there is an unexpected EOF on recovery, dataloss may occur

Streaming Server SQL Store Options:
    --sql_driver                Name of the SQL Driver ("mysql" or "postgres")
    --sql_source                Datasource used when opening an SQL connection to the database
    --sql_no_caching            Enable/Disable caching for improved performance
    --sql_max_open_conns        Maximum number of opened connections to the database

Streaming Server TLS Options:
    -secure                     Use a TLS connection to the NATS server without verification; weaker than specifying certificates.
    -tls_client_key             Client key for the streaming server
    -tls_client_cert            Client certificate for the streaming server
    -tls_client_cacert          Client certificate CA for the streaming server

Streaming Server Logging Options:
    -SD, --stan_debug=          Enable STAN debugging output
    -SV, --stan_trace=          Trace the raw STAN protocol
    -SDV                        Debug and trace STAN
    --syslog_name               On Windows, when running several servers as a service, use this name for the event source
    (See additional NATS logging options below)

Embedded NATS Server Options:
    -a, --addr                  Bind to host address (default: 0.0.0.0)
    -p, --port                  Use port for clients (default: 4222)
    -P, --pid                   File to store PID
    -m, --http_port             Use port for http monitoring
    -ms,--https_port            Use port for https monitoring
    -c, --config                Configuration file

Logging Options:
    -l, --log                   File to redirect log output
    -T, --logtime=              Timestamp log entries (default: true)
    -s, --syslog                Enable syslog as log method
    -r, --remote_syslog         Syslog server addr (udp://localhost:514)
    -D, --debug=                Enable debugging output
    -V, --trace=                Trace the raw protocol
    -DV                         Debug and trace

Authorization Options:
    --user                      User required for connections
    --pass                      Password required for connections
    --auth                      Authorization token required for connections

TLS Options:
    --tls=                      Enable TLS, do not verify clients (default: false)
    --tlscert                   Server certificate file
    --tlskey                    Private key for server certificate
    --tlsverify=                Enable TLS, verify client certificates
    --tlscacert                 Client certificate CA for verification

NATS Clustering Options:
    --routes                    Routes to solicit and connect
    --cluster                   Cluster URL for solicited routes

Common Options:
    -h, --help                  Show this message
    -v, --version               Show version
    --help_tls                  TLS help.
A brief source-level look at NATS Streaming persistence
NATS Streaming currently supports the following four persistence modes:
- MEMORY
- FILE
- SQL
- RAFT
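Reading the source shows that persistence in NATS Streaming is built on an interface, so it is easy to extend to additional stores. The interface is as follows: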
// Store is the storage interface for NATS Streaming servers.
//
// If an implementation has a Store constructor with StoreLimits, it should be
// noted that the limits don't apply to any state being recovered, for Store
// implementations supporting recovery.
//
type Store interface {
	// GetExclusiveLock is an advisory lock to prevent concurrent
	// access to the store from multiple instances.
	// This is not to protect individual API calls, instead, it
	// is meant to protect the store for the entire duration the
	// store is being used. This is why there is no `Unlock` API.
	// The lock should be released when the store is closed.
	//
	// If an exclusive lock can be immediately acquired (that is,
	// it should not block waiting for the lock to be acquired),
	// this call will return `true` with no error. Once a store
	// instance has acquired an exclusive lock, calling this
	// function has no effect and `true` with no error will again
	// be returned.
	//
	// If the lock cannot be acquired, this call will return
	// `false` with no error: the caller can try again later.
	//
	// If, however, the lock cannot be acquired due to a fatal
	// error, this call should return `false` and the error.
	//
	// It is important to note that the implementation should
	// make an effort to distinguish error conditions deemed
	// fatal (and therefore trying again would invariably result
	// in the same error) and those deemed transient, in which
	// case no error should be returned to indicate that the
	// caller could try later.
	//
	// Implementations that do not support exclusive locks should
	// return `false` and `ErrNotSupported`.
	GetExclusiveLock() (bool, error)

	// Init can be used to initialize the store with server's information.
	Init(info *spb.ServerInfo) error

	// Name returns the name type of this store (e.g: MEMORY, FILESTORE, etc...).
	Name() string

	// Recover returns the recovered state.
	// Implementations that do not persist state and therefore cannot
	// recover from a previous run MUST return nil, not an error.
	// However, an error must be returned for implementations that are
	// attempting to recover the state but fail to do so.
	Recover() (*RecoveredState, error)

	// SetLimits sets limits for this store. The action is not expected
	// to be retroactive.
	// The store implementation should make a deep copy as to not change
	// the content of the structure passed by the caller.
	// This call may return an error due to limits validation errors.
	SetLimits(limits *StoreLimits) error

	// GetChannelLimits returns the limit for this channel. If the channel
	// does not exist, returns nil.
	GetChannelLimits(name string) *ChannelLimits

	// CreateChannel creates a Channel.
	// Implementations should return ErrAlreadyExists if the channel was
	// already created.
	// Limits defined for this channel in StoreLimits.PeChannel map, if present,
	// will apply. Otherwise, the global limits in StoreLimits will apply.
	CreateChannel(channel string) (*Channel, error)

	// DeleteChannel deletes a Channel.
	// Implementations should make sure that if no error is returned, the
	// channel would not be recovered after a restart, unless CreateChannel()
	// with the same channel is invoked.
	// If processing is expecting to be time consuming, work should be done
	// in the background as long as the above condition is guaranteed.
	// It is also acceptable for an implementation to have CreateChannel()
	// return an error if background deletion is still happening for a
	// channel of the same name.
	DeleteChannel(channel string) error

	// AddClient stores information about the client identified by `clientID`.
	AddClient(info *spb.ClientInfo) (*Client, error)

	// DeleteClient removes the client identified by `clientID` from the store.
	DeleteClient(clientID string) error

	// Close closes this store (including all MsgStore and SubStore).
	// If an exclusive lock was acquired, the lock shall be released.
	Close() error
}
Official schemas are also provided for two SQL databases, MySQL and PostgreSQL:
postgres.db.sql
CREATE TABLE IF NOT EXISTS ServerInfo (uniquerow INTEGER DEFAULT 1, id VARCHAR(1024), proto BYTEA, version INTEGER, PRIMARY KEY (uniquerow));
CREATE TABLE IF NOT EXISTS Clients (id VARCHAR(1024), hbinbox TEXT, PRIMARY KEY (id));
CREATE TABLE IF NOT EXISTS Channels (id INTEGER, name VARCHAR(1024) NOT NULL, maxseq BIGINT DEFAULT 0, maxmsgs INTEGER DEFAULT 0, maxbytes BIGINT DEFAULT 0, maxage BIGINT DEFAULT 0, deleted BOOL DEFAULT FALSE, PRIMARY KEY (id));
CREATE INDEX Idx_ChannelsName ON Channels (name(256));
CREATE TABLE IF NOT EXISTS Messages (id INTEGER, seq BIGINT, timestamp BIGINT, size INTEGER, data BYTEA, CONSTRAINT PK_MsgKey PRIMARY KEY(id, seq));
CREATE INDEX Idx_MsgsTimestamp ON Messages (timestamp);
CREATE TABLE IF NOT EXISTS Subscriptions (id INTEGER, subid BIGINT, lastsent BIGINT DEFAULT 0, proto BYTEA, deleted BOOL DEFAULT FALSE, CONSTRAINT PK_SubKey PRIMARY KEY(id, subid));
CREATE TABLE IF NOT EXISTS SubsPending (subid BIGINT, row BIGINT, seq BIGINT DEFAULT 0, lastsent BIGINT DEFAULT 0, pending BYTEA, acks BYTEA, CONSTRAINT PK_MsgPendingKey PRIMARY KEY(subid, row));
CREATE INDEX Idx_SubsPendingSeq ON SubsPending (seq);
CREATE TABLE IF NOT EXISTS StoreLock (id VARCHAR(30), tick BIGINT DEFAULT 0);
-- Updates for 0.10.0
ALTER TABLE Clients ADD proto BYTEA;
mysql.db.sql
CREATE TABLE IF NOT EXISTS ServerInfo (uniquerow INT DEFAULT 1, id VARCHAR(1024), proto BLOB, version INTEGER, PRIMARY KEY (uniquerow));
CREATE TABLE IF NOT EXISTS Clients (id VARCHAR(1024), hbinbox TEXT, PRIMARY KEY (id(256)));
CREATE TABLE IF NOT EXISTS Channels (id INTEGER, name VARCHAR(1024) NOT NULL, maxseq BIGINT UNSIGNED DEFAULT 0, maxmsgs INTEGER DEFAULT 0, maxbytes BIGINT DEFAULT 0, maxage BIGINT DEFAULT 0, deleted BOOL DEFAULT FALSE, PRIMARY KEY (id), INDEX Idx_ChannelsName (name(256)));
CREATE TABLE IF NOT EXISTS Messages (id INTEGER, seq BIGINT UNSIGNED, timestamp BIGINT, size INTEGER, data BLOB, CONSTRAINT PK_MsgKey PRIMARY KEY(id, seq), INDEX Idx_MsgsTimestamp (timestamp));
CREATE TABLE IF NOT EXISTS Subscriptions (id INTEGER, subid BIGINT UNSIGNED, lastsent BIGINT UNSIGNED DEFAULT 0, proto BLOB, deleted BOOL DEFAULT FALSE, CONSTRAINT PK_SubKey PRIMARY KEY(id, subid));
CREATE TABLE IF NOT EXISTS SubsPending (subid BIGINT UNSIGNED, `row` BIGINT UNSIGNED, seq BIGINT UNSIGNED DEFAULT 0, lastsent BIGINT UNSIGNED DEFAULT 0, pending BLOB, acks BLOB, CONSTRAINT PK_MsgPendingKey PRIMARY KEY(subid, `row`), INDEX Idx_SubsPendingSeq(seq));
CREATE TABLE IF NOT EXISTS StoreLock (id VARCHAR(30), tick BIGINT UNSIGNED DEFAULT 0);
# Updates for 0.10.0
ALTER TABLE Clients ADD proto BLOB;
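Summary
Later posts will walk through the code in detail and cover clustered deployments, including, of course, how to deploy a highly available cluster on Kubernetes.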