DEV Community

x1957
x1957

Posted on

Protobuf log

我有一些数据是mq定时推送的,现在要在他们基础上做模型,之前都是收到就过了,没存过。
推送过来的数据是protobuf序列化过的,想也没啥必要反序列化了再打log。

方案1:
直接log,
pb1pb2pb3pb4pb5....pbn
最后发现,解析不了,开始想法是美好的,毕竟就那么几个field,读了就好了呀,但是后来发现因为pb可能会处理optional的字段,那就不知道一个message有多长啦。

方案2:
加换行符
pb1\n
pb2\n
pb3\n
...
pbn\n
这里就是自己傻逼了,pb里面可能会包含换行符等乱七八糟的东西(毕竟binary),所以不可行。

方案3:
按照之前tcp发送数据包那样的写法,在头部放个length
length1_pb1 length2_pb2 ... lengthn_pbn
这样我们解析的时候就先读有多少个bytes,然后拿这些bytes来反序列化pb messsage。

并且我们length最要也用pb编码,免得处理不一致

c.log.Write(proto.EncodeVarint(uint64(len(bs))))
c.log.Write(bs)

然后这边我们处理数据用的Rust

protobuf 这个库看用的人最多,并且在他的 CodedInputStream read_message正好就是处理我们这种结构的数据的.

    pub fn read_message<M : Message>(&mut self) -> ProtobufResult<M> {
        let mut r: M = Message::new();
        self.merge_message(&mut r)?;
        r.check_initialized()?;
        Ok(r)
    }

Rust处理pb的代码如下

mod message;
mod file;

#[macro_use]
extern crate slog;
extern crate slog_async;
extern crate slog_term;

use slog::Drain;

use clap::{App, Arg};
use message::Message;
use protobuf::CodedInputStream;
use std::fs;
use std::io::BufReader;

fn vist_dir(path: &str) {}

fn main() -> Result<(), Box<std::error::Error>> {
    let matches = App::new("pig data process")
        .version(env!("CARGO_PKG_VERSION"))
        .author(env!("CARGO_PKG_AUTHORS"))
        .arg(
            Arg::with_name("dir")
                .long("dir")
                .short("d")
                .help("dir of data")
                .value_name("DIR")
                .takes_value(true),
        )
        .get_matches();

    let decorator = slog_term::TermDecorator::new().build();
    let drain = slog_term::FullFormat::new(decorator).build().fuse();
    let drain = slog_async::Async::new(drain).build().fuse();
    let log = slog::Logger::root(drain, o!());

    let dir = matches.value_of("dir").expect("need --dir parameter");
    info!(log, "dir = {}", dir);

    let mut files = file::read_dir(dir);
    files.sort_by(|a, b| {
        a.file_name()
            .into_string()
            .unwrap()
            .cmp(&b.file_name().into_string().unwrap())
    });
    info!(log, "{:?}", files);
    let mut cnt = 0;
    for file in &files {
        let fd = fs::File::open(file.path()).unwrap();
        info!(log, "read {}", file.file_name().into_string().unwrap());
        let mut buffer = BufReader::new(fd);
        let mut is = CodedInputStream::new(&mut buffer);
        while let Ok(message) = is.read_message::<Message>(){
//            info!(log, "{:?}", message);
            cnt+=1;
        }
    }
    info!(log, "total = {}", cnt);
    Ok(())
}

Top comments (0)