DEV Community

x1957

Protobuf log

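I have some data that an MQ pushes on a schedule, and now I want to build a model on top of it. Until now we just processed each message as it arrived and never stored anything.
The pushed data is already protobuf-serialized, so I figured there was no need to deserialize it before writing it to a log.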

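Option 1:
Log the bytes directly:
pb1pb2pb3pb4pb5....pbn
It turned out this couldn't be parsed back. The idea seemed fine at first: a message only has a handful of fields, so just read them back. But a serialized protobuf message carries no length or end marker, and because optional fields may be omitted, message sizes vary. A reader therefore has no way to tell where one message ends and the next begins.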
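To see why the boundaries vanish, here is a minimal sketch that scans the protobuf wire format, restricted (as a simplifying assumption) to varint fields, wire type 0. The two messages in the usage example are hypothetical, each setting field 1. Protobuf treats concatenated encodings as one message whose fields are merged, so the scanner sees a single stream of fields with nothing marking where the first message ended.

```rust
/// Decode one base-128 varint starting at `pos`; returns (value, next position).
fn read_varint(buf: &[u8], mut pos: usize) -> Option<(u64, usize)> {
    let mut value = 0u64;
    let mut shift = 0u32;
    loop {
        let byte = *buf.get(pos)?;
        pos += 1;
        value |= u64::from(byte & 0x7F) << shift;
        if (byte & 0x80) == 0 {
            return Some((value, pos));
        }
        shift += 7;
        if shift >= 64 {
            return None; // malformed: varint longer than 10 bytes
        }
    }
}

/// List (field number, value) pairs, assuming every field is a varint (wire type 0).
fn scan_varint_fields(buf: &[u8]) -> Option<Vec<(u64, u64)>> {
    let mut fields = Vec::new();
    let mut pos = 0;
    while pos < buf.len() {
        let (key, next) = read_varint(buf, pos)?;
        if (key & 0x7) != 0 {
            return None; // this sketch only handles wire type 0
        }
        let (value, next) = read_varint(buf, next)?;
        fields.push((key >> 3, value)); // field number is the key shifted right by 3
        pos = next;
    }
    Some(fields)
}
```

With `m1 = [0x08, 0x01]` (field 1 = 1) and `m2 = [0x08, 0x02]` (field 1 = 2), scanning `m1` followed by `m2` yields `[(1, 1), (1, 2)]`: one flat list. For a singular field a real parser keeps only the last value, so the first message silently disappears.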

Option 2:
Add a newline after each message:
pb1\n
pb2\n
pb3\n
...
pbn\n
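That was a dumb idea on my part: serialized protobuf is binary, so a message can itself contain newline bytes (or any other byte value). This doesn't work either.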
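A concrete case, assuming a hypothetical message whose field 1 is a string: the key byte for field 1 with wire type 2 (length-delimited) is `(1 << 3) | 2 = 0x0A`, which is exactly `b'\n'`. Such a message *begins* with a newline byte, so splitting the log on newlines cuts it apart. The helper names below are made up for this sketch.

```rust
/// Encode `field` as a length-delimited field (wire type 2) holding `data`.
/// Assumes field number 1..=15 and data shorter than 128 bytes,
/// so the key and the length each fit in a single byte.
fn encode_bytes_field(field: u8, data: &[u8]) -> Vec<u8> {
    assert!(field >= 1 && field < 16 && data.len() < 128);
    let mut out = vec![(field << 3) | 2, data.len() as u8];
    out.extend_from_slice(data);
    out
}

/// The naive "one message per line" log from option 2.
fn newline_log(messages: &[Vec<u8>]) -> Vec<u8> {
    let mut log = Vec::new();
    for m in messages {
        log.extend_from_slice(m);
        log.push(b'\n');
    }
    log
}
```

Writing `encode_bytes_field(1, b"hi")` twice and splitting the result on `b'\n'` yields five pieces instead of two, and none of them is a complete message.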

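Option 3:
Do what we used to do for packets sent over TCP: put a length at the head of each message.
length1_pb1 length2_pb2 ... lengthn_pbn
When parsing, we first read how many bytes the next message occupies, then deserialize the protobuf message from exactly those bytes.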

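It's best to encode the length with protobuf's own varint encoding as well, so the writer and the reader handle it consistently. The (Go) writer does: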

c.log.Write(proto.EncodeVarint(uint64(len(bs))))
c.log.Write(bs)
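For comparison, a minimal std-only Rust sketch of the same writer. It produces the same byte layout as the Go lines above: a protobuf varint (7 bits per byte, least-significant group first, high bit set on every byte except the last) followed by the raw message bytes. The function names are hypothetical, not from any library.

```rust
use std::io::{self, Write};

/// Append `value` as a protobuf varint: 7 bits per byte, low bits first,
/// high bit set on every byte except the last.
fn encode_varint(mut value: u64, out: &mut Vec<u8>) {
    while value >= 0x80 {
        out.push((value as u8 & 0x7F) | 0x80);
        value >>= 7;
    }
    out.push(value as u8);
}

/// Write one record: varint length prefix, then the serialized message bytes.
fn write_delimited<W: Write>(w: &mut W, msg: &[u8]) -> io::Result<()> {
    let mut prefix = Vec::with_capacity(10); // a u64 varint is at most 10 bytes
    encode_varint(msg.len() as u64, &mut prefix);
    w.write_all(&prefix)?;
    w.write_all(msg)
}
```

For example, `encode_varint(300, ...)` emits `[0xAC, 0x02]`, and writing a 3-byte message followed by an empty one produces `[0x03, <3 bytes>, 0x00]`. Lengths under 128 cost a single byte, so the overhead per record is usually tiny.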

On the processing side we use Rust.

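The `protobuf` crate appears to be the most widely used, and its `CodedInputStream::read_message` handles exactly this structure: through `merge_message` it first reads a varint length, then parses a message from that many bytes.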

    pub fn read_message<M : Message>(&mut self) -> ProtobufResult<M> {
        let mut r: M = Message::new();
        self.merge_message(&mut r)?;
        r.check_initialized()?;
        Ok(r)
    }
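Stripped of the protobuf parsing, the framing half of `read_message` amounts to reading a varint length and then exactly that many bytes. Below is a std-only sketch over any `std::io::Read`. The function names are hypothetical and not part of the `protobuf` crate. It also distinguishes a clean end of file from a record cut off mid-write.

```rust
use std::io::{self, ErrorKind, Read};

/// Read one varint; Ok(None) means EOF before the first byte (a clean end of stream).
fn read_varint<R: Read>(r: &mut R) -> io::Result<Option<u64>> {
    let mut value = 0u64;
    for i in 0..10 {
        let mut byte = [0u8; 1];
        if r.read(&mut byte)? == 0 {
            return if i == 0 {
                Ok(None)
            } else {
                Err(io::Error::new(ErrorKind::UnexpectedEof, "truncated varint"))
            };
        }
        value |= u64::from(byte[0] & 0x7F) << (7 * i);
        if (byte[0] & 0x80) == 0 {
            return Ok(Some(value));
        }
    }
    Err(io::Error::new(ErrorKind::InvalidData, "varint longer than 10 bytes"))
}

/// Read one length-prefixed record; Ok(None) at a clean end of stream.
fn read_frame<R: Read>(r: &mut R) -> io::Result<Option<Vec<u8>>> {
    let len = match read_varint(r)? {
        Some(len) => len as usize,
        None => return Ok(None),
    };
    let mut payload = vec![0u8; len];
    // read_exact fails with UnexpectedEof if the file ends mid-record.
    r.read_exact(&mut payload)?;
    Ok(Some(payload))
}
```

Each returned payload is what you would hand to the protobuf parser. One caveat: a corrupt length prefix could make `vec![0u8; len]` request a huge allocation, so a production reader would cap `len`.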

The Rust code that processes the protobuf log:

mod file;
mod message;

#[macro_use]
extern crate slog;
extern crate slog_async;
extern crate slog_term;

use slog::Drain;

use clap::{App, Arg};
use message::Message;
use protobuf::CodedInputStream;
use std::fs;
use std::io::BufReader;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let matches = App::new("pig data process")
        .version(env!("CARGO_PKG_VERSION"))
        .author(env!("CARGO_PKG_AUTHORS"))
        .arg(
            Arg::with_name("dir")
                .long("dir")
                .short("d")
                .help("dir of data")
                .value_name("DIR")
                .takes_value(true),
        )
        .get_matches();

    // Terminal logger; slog_async moves formatting off the main thread.
    let decorator = slog_term::TermDecorator::new().build();
    let drain = slog_term::FullFormat::new(decorator).build().fuse();
    let drain = slog_async::Async::new(drain).build().fuse();
    let log = slog::Logger::root(drain, o!());

    let dir = matches.value_of("dir").expect("need --dir parameter");
    info!(log, "dir = {}", dir);

    // `file::read_dir` is the author's own helper (module not shown).
    let mut files = file::read_dir(dir);
    // Process files in file-name order; OsString is Ord, so no UTF-8 unwrap is needed.
    files.sort_by_key(|f| f.file_name());
    info!(log, "{:?}", files);

    let mut cnt = 0;
    for file in &files {
        let fd = fs::File::open(file.path())?;
        info!(log, "read {:?}", file.file_name());
        let mut buffer = BufReader::new(fd);
        let mut is = CodedInputStream::new(&mut buffer);
        // Stop only at a clean end of file. A truncated or corrupt record
        // becomes an error instead of silently ending the loop.
        while !is.eof()? {
            // Reads the varint length, then parses exactly that many bytes.
            let _message = is.read_message::<Message>()?;
            // info!(log, "{:?}", _message);
            cnt += 1;
        }
    }
    info!(log, "total = {}", cnt);
    Ok(())
}
