我有一些数据是mq定时推送的,现在要在他们基础上做模型,之前都是收到就过了,没存过。
推送过来的数据是protobuf序列化过的,想也没啥必要反序列化了再打log。
方案1:
直接log,
pb1pb2pb3pb4pb5....pbn
最后发现,解析不了,开始想法是美好的,毕竟就那么几个field,读了就好了呀,但是后来发现因为pb可能会处理optional的字段,那就不知道一个message有多长啦。
方案2:
加换行符
pb1\n
pb2\n
pb3\n
...
pbn\n
这里就是自己傻逼了,pb里面可能会包含换行符等乱七八糟的东西(毕竟binary),所以不可行。
方案3:
按照之前tcp发送数据包那样的写法,在头部放个length
length1_pb1 length2_pb2 ... lengthn_pbn
这样我们解析的时候就先读有多少个bytes,然后拿这些bytes来反序列化pb messsage。
并且我们length最要也用pb编码,免得处理不一致
c.log.Write(proto.EncodeVarint(uint64(len(bs))))
c.log.Write(bs)
然后这边我们处理数据用的Rust
protobuf 这个库看用的人最多,并且在他的 CodedInputStream
read_message
正好就是处理我们这种结构的数据的.
pub fn read_message<M : Message>(&mut self) -> ProtobufResult<M> {
let mut r: M = Message::new();
self.merge_message(&mut r)?;
r.check_initialized()?;
Ok(r)
}
Rust处理pb的代码如下
mod message;
mod file;
#[macro_use]
extern crate slog;
extern crate slog_async;
extern crate slog_term;
use slog::Drain;
use clap::{App, Arg};
use message::Message;
use protobuf::CodedInputStream;
use std::fs;
use std::io::BufReader;
fn vist_dir(path: &str) {}
fn main() -> Result<(), Box<std::error::Error>> {
let matches = App::new("pig data process")
.version(env!("CARGO_PKG_VERSION"))
.author(env!("CARGO_PKG_AUTHORS"))
.arg(
Arg::with_name("dir")
.long("dir")
.short("d")
.help("dir of data")
.value_name("DIR")
.takes_value(true),
)
.get_matches();
let decorator = slog_term::TermDecorator::new().build();
let drain = slog_term::FullFormat::new(decorator).build().fuse();
let drain = slog_async::Async::new(drain).build().fuse();
let log = slog::Logger::root(drain, o!());
let dir = matches.value_of("dir").expect("need --dir parameter");
info!(log, "dir = {}", dir);
let mut files = file::read_dir(dir);
files.sort_by(|a, b| {
a.file_name()
.into_string()
.unwrap()
.cmp(&b.file_name().into_string().unwrap())
});
info!(log, "{:?}", files);
let mut cnt = 0;
for file in &files {
let fd = fs::File::open(file.path()).unwrap();
info!(log, "read {}", file.file_name().into_string().unwrap());
let mut buffer = BufReader::new(fd);
let mut is = CodedInputStream::new(&mut buffer);
while let Ok(message) = is.read_message::<Message>(){
// info!(log, "{:?}", message);
cnt+=1;
}
}
info!(log, "total = {}", cnt);
Ok(())
}
Top comments (0)