DEV Community

x1957

Protobuf log

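I have some data that an MQ pushes to us on a schedule, and now I need to build models on top of it. Until now we just processed each push as it arrived and never stored anything.
The pushed data is already protobuf-serialized, so I saw no point in deserializing it just to write it to a log.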

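Option 1:
Log the messages directly, back to back:
pb1pb2pb3pb4pb5....pbn
In the end this couldn't be parsed. The idea looked great at first: there are only a few fields, so just read them back. But optional fields may be missing from the encoding, so you can't tell how long a message is. In fact the protobuf wire format has no end-of-message marker at all: a parser keeps reading fields until the input runs out, so parsing pb1pb2 yields one merged message rather than two.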
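To make the failure concrete, here is a small std-only Rust sketch. It hand-encodes two messages of a hypothetical schema `message M { uint64 id = 1; }`, concatenates them, and scans the result the way a protobuf parser does: field by field until the input ends. Nothing in the bytes marks where the first message stops, so the parser sees one message with `id` set twice, and for a singular scalar field the last value wins. `read_varint` and `parse_single` are illustrative helpers, not library functions.

```rust
/// Decode one base-128 varint starting at `pos`; returns (value, next position).
fn read_varint(buf: &[u8], mut pos: usize) -> Option<(u64, usize)> {
    let mut value = 0u64;
    let mut shift: u32 = 0;
    loop {
        let byte = *buf.get(pos)?;
        pos += 1;
        value |= u64::from(byte & 0x7F) << shift;
        if byte & 0x80 == 0 {
            return Some((value, pos));
        }
        shift += 7;
        if shift >= 64 {
            return None; // longer than any valid u64 varint
        }
    }
}

/// Parse a whole buffer as ONE message of the hypothetical schema
/// `message M { uint64 id = 1; }`. Like a real parser, it reads fields until
/// the buffer ends; a repeated singular field overwrites the earlier value.
/// Returns (final id, number of id fields seen).
fn parse_single(buf: &[u8]) -> Option<(u64, usize)> {
    let mut pos = 0;
    let mut id = 0;
    let mut fields_seen = 0;
    while pos < buf.len() {
        let (key, next) = read_varint(buf, pos)?;
        // key = (field_number << 3) | wire_type; field 1 as a varint is 0x08.
        if key != 0x08 {
            return None; // this sketch only understands field 1
        }
        let (value, next) = read_varint(buf, next)?;
        id = value; // last one wins
        fields_seen += 1;
        pos = next;
    }
    Some((id, fields_seen))
}

fn main() {
    let pb1 = [0x08u8, 0x96, 0x01]; // id = 150
    let pb2 = [0x08u8, 0x07]; // id = 7
    let log: Vec<u8> = pb1.iter().chain(pb2.iter()).copied().collect();
    let (id, fields) = parse_single(&log).expect("valid encoding");
    // Two messages went in, one comes out: prints "id = 7, fields seen = 2".
    println!("id = {}, fields seen = {}", id, fields);
}
```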

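Option 2:
Add a newline after each message:
pb1\n
pb2\n
pb3\n
...
pbn\n
This one was just me being dumb: pb is binary, so a message can itself contain newline bytes and all sorts of other junk, which rules this out. The byte 0x0A is exactly the key for a length-delimited field 1, so it shows up in any message that sets a string, bytes, or nested-message field 1.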
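A tiny std-only Rust check shows why. Under a hypothetical schema `message M { string name = 1; }`, the message `name = "hi"` encodes to `0A 02 68 69`. The first byte is the key `(1 << 3) | 2 = 10`, which is `b'\n'`, so splitting the log on newlines cuts straight through the message. `split_on_newlines` is an illustrative helper.

```rust
/// Option 2's reader: split a log back into records on newline bytes.
fn split_on_newlines(log: &[u8]) -> Vec<&[u8]> {
    log.split(|&b| b == b'\n').collect()
}

fn main() {
    // Hypothetical schema `message M { string name = 1; }`, value name = "hi":
    // key 0x0A (field 1, wire type 2 = length-delimited), length 0x02, then "hi".
    let pb1: Vec<u8> = vec![0x0A, 0x02, b'h', b'i'];
    assert_eq!((1u8 << 3) | 2, b'\n'); // the field key is itself a newline byte

    // Write it the Option 2 way, then split it back.
    let mut log = pb1;
    log.push(b'\n');
    let pieces = split_on_newlines(&log);

    // One message went in; three pieces come out: [], [0x02, 'h', 'i'], [].
    println!("{} pieces: {:?}", pieces.len(), pieces);
}
```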

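Option 3:
Do what we used to do when sending packets over TCP: put a length in front of each message.
length1_pb1 length2_pb2 ... lengthn_pbn
When parsing, we first read how many bytes the next message takes, then read exactly that many bytes and deserialize them as a pb message.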
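A minimal std-only sketch of that framing, assuming a fixed 4-byte big-endian length header as many TCP protocols use (a varint header follows the same shape). `write_frame` and `read_frames` are hypothetical helper names, and the records are hand-encoded pb bytes.

```rust
/// Append one record: a 4-byte big-endian length header, then the payload.
fn write_frame(out: &mut Vec<u8>, payload: &[u8]) {
    assert!(payload.len() <= u32::MAX as usize, "record too large for a u32 length");
    out.extend_from_slice(&(payload.len() as u32).to_be_bytes());
    out.extend_from_slice(payload);
}

/// Split a framed log back into records.
/// Returns None if the data ends in the middle of a header or a payload.
fn read_frames(mut buf: &[u8]) -> Option<Vec<&[u8]>> {
    let mut frames = Vec::new();
    while !buf.is_empty() {
        if buf.len() < 4 {
            return None; // truncated header
        }
        let len = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]) as usize;
        let rest = &buf[4..];
        if rest.len() < len {
            return None; // truncated payload
        }
        frames.push(&rest[..len]);
        buf = &rest[len..];
    }
    Some(frames)
}

fn main() {
    // Three pb-encoded records; the last is an empty message, which is 0 bytes
    // on the wire and would be invisible without a length header.
    let records: [&[u8]; 3] = [&[0x08, 0x96, 0x01], &[0x0A, 0x02, b'h', b'i'], &[]];
    let mut log = Vec::new();
    for r in records.iter() {
        write_frame(&mut log, r);
    }
    let frames = read_frames(&log).expect("well-formed log");
    assert_eq!(frames, records.to_vec());
    // Prints "3 records, 19 bytes": 3 + 4 + 0 payload bytes plus 3 headers of 4.
    println!("{} records, {} bytes", frames.len(), log.len());
}
```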

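It's also best to encode the length itself with protobuf's own varint encoding, so the writer and the reader can't disagree on the length format.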
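Protobuf varints store 7 bits per byte, least-significant group first, with the high bit set on every byte except the last. A std-only Rust sketch of both directions; `encode_varint` and `decode_varint` are hypothetical names, producing the same byte layout as Go's `proto.EncodeVarint` and consuming it like `proto.DecodeVarint`.

```rust
/// Encode `value` as a protobuf base-128 varint: 7 bits per byte,
/// least-significant group first, high bit set on every byte but the last.
fn encode_varint(mut value: u64) -> Vec<u8> {
    let mut out = Vec::new();
    while value >= 0x80 {
        out.push((value as u8 & 0x7F) | 0x80);
        value >>= 7;
    }
    out.push(value as u8);
    out
}

/// Decode a varint from the front of `buf`.
/// Returns (value, bytes consumed), or None if `buf` ends mid-varint
/// or the varint is longer than the 10-byte maximum for a u64.
fn decode_varint(buf: &[u8]) -> Option<(u64, usize)> {
    let mut value = 0u64;
    for (i, &byte) in buf.iter().enumerate().take(10) {
        value |= u64::from(byte & 0x7F) << (7 * i);
        if byte & 0x80 == 0 {
            return Some((value, i + 1));
        }
    }
    None
}

fn main() {
    // Lengths below 128 cost a single byte; 128 and 300 need two.
    for &n in &[1u64, 127, 128, 150, 300] {
        let bytes = encode_varint(n);
        assert_eq!(decode_varint(&bytes), Some((n, bytes.len())));
        println!("{:>3} -> {:02X?}", n, bytes);
    }
}
```

Since most messages are short, the length header usually costs one or two bytes, less than a fixed 4-byte header.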

// Length prefix first (as a protobuf varint), then the serialized message bytes.
c.log.Write(proto.EncodeVarint(uint64(len(bs))))
c.log.Write(bs)
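For comparison, a std-only Rust sketch of the same two writes against any `std::io::Write`. `append_record` is a hypothetical name, and unlike the Go snippet, which discards the error that `Write` returns, it propagates I/O errors to the caller.

```rust
use std::io::{self, Write};

/// Protobuf base-128 varint encoding (same byte layout as Go's proto.EncodeVarint).
fn encode_varint(mut value: u64) -> Vec<u8> {
    let mut out = Vec::new();
    while value >= 0x80 {
        out.push((value as u8 & 0x7F) | 0x80);
        value >>= 7;
    }
    out.push(value as u8);
    out
}

/// Write one record: varint(len(bs)) followed by bs.
fn append_record<W: Write>(w: &mut W, bs: &[u8]) -> io::Result<()> {
    w.write_all(&encode_varint(bs.len() as u64))?;
    w.write_all(bs)
}

fn main() -> io::Result<()> {
    // A Vec<u8> stands in for the log file; a File or BufWriter<File> works the same way.
    let mut log: Vec<u8> = Vec::new();
    append_record(&mut log, &[0x08, 0x96, 0x01])?;
    append_record(&mut log, &[0x0A, 0x02, b'h', b'i'])?;
    println!("{:02X?}", log);
    Ok(())
}
```

A crash between the two writes can still leave a length header without its body at the end of a file, so the reader should treat a short tail as a truncated record rather than as data.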

On the processing side we use Rust.

The protobuf crate looks like the most widely used one, and its CodedInputStream::read_message handles exactly this kind of data.

    pub fn read_message<M : Message>(&mut self) -> ProtobufResult<M> {
        let mut r: M = Message::new();
        self.merge_message(&mut r)?;
        r.check_initialized()?;
        Ok(r)
    }
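The framing half of this is easy to mirror without the crate: merge_message reads a varint length, pushes a limit of that many bytes onto the stream, and parses the message inside the limit. Below is a std-only Rust sketch of just the framing step over an in-memory byte slice, which also tells a clean end of input apart from a truncated last record. `next_record` and `RecordError` are hypothetical names; decoding the record body is left to the generated Message type.

```rust
/// Why reading the next record failed.
#[derive(Debug, PartialEq)]
enum RecordError {
    /// The length prefix is cut off or is not a valid varint.
    BadLength,
    /// The length prefix promises more bytes than remain.
    TruncatedBody { expected: usize, available: usize },
}

/// Decode a protobuf varint from the front of `buf`: (value, bytes consumed).
fn decode_varint(buf: &[u8]) -> Option<(u64, usize)> {
    let mut value = 0u64;
    for (i, &byte) in buf.iter().enumerate().take(10) {
        value |= u64::from(byte & 0x7F) << (7 * i);
        if byte & 0x80 == 0 {
            return Some((value, i + 1));
        }
    }
    None
}

/// Split the next length-delimited record off the front of `buf`.
/// Ok(None) means a clean end of input: no partial record is left over.
fn next_record<'a>(buf: &mut &'a [u8]) -> Result<Option<&'a [u8]>, RecordError> {
    let data: &'a [u8] = *buf;
    if data.is_empty() {
        return Ok(None);
    }
    let (len, header_len) = decode_varint(data).ok_or(RecordError::BadLength)?;
    let body = &data[header_len..];
    let len = len as usize;
    if body.len() < len {
        return Err(RecordError::TruncatedBody { expected: len, available: body.len() });
    }
    *buf = &body[len..];
    Ok(Some(&body[..len]))
}

fn main() {
    // Two complete records (3 and 2 bytes), then a header promising 5 bytes
    // with only 1 left, as if the writer died between its two writes.
    let mut rest: &[u8] = &[0x03, 0x08, 0x96, 0x01, 0x02, 0x08, 0x07, 0x05, 0x08];
    let mut count = 0;
    loop {
        match next_record(&mut rest) {
            // A real reader would parse `_record` with the generated Message type here.
            Ok(Some(_record)) => count += 1,
            Ok(None) => {
                println!("clean end of input after {} records", count);
                break;
            }
            Err(e) => {
                println!("stopped after {} records: {:?}", count, e);
                break;
            }
        }
    }
}
```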

Here is the Rust code that processes the pb data:

mod message;
mod file;

#[macro_use]
extern crate slog;
extern crate slog_async;
extern crate slog_term;

use slog::Drain;

use clap::{App, Arg};
use message::Message;
use protobuf::CodedInputStream;
use std::fs;
use std::io::BufReader;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let matches = App::new("pig data process")
        .version(env!("CARGO_PKG_VERSION"))
        .author(env!("CARGO_PKG_AUTHORS"))
        .arg(
            Arg::with_name("dir")
                .long("dir")
                .short("d")
                .help("dir of data")
                .value_name("DIR")
                .takes_value(true),
        )
        .get_matches();

    let decorator = slog_term::TermDecorator::new().build();
    let drain = slog_term::FullFormat::new(decorator).build().fuse();
    let drain = slog_async::Async::new(drain).build().fuse();
    let log = slog::Logger::root(drain, o!());

    let dir = matches.value_of("dir").expect("need --dir parameter");
    info!(log, "dir = {}", dir);

    let mut files = file::read_dir(dir);
    files.sort_by(|a, b| {
        a.file_name()
            .into_string()
            .unwrap()
            .cmp(&b.file_name().into_string().unwrap())
    });
    info!(log, "{:?}", files);
    let mut cnt = 0;
    for file in &files {
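        let fd = fs::File::open(file.path())?;
        info!(log, "read {}", file.file_name().into_string().unwrap());
        let mut buffer = BufReader::new(fd);
        let mut is = CodedInputStream::new(&mut buffer);
        // Each record is a varint length followed by that many bytes; read_message
        // consumes exactly one record. Stop at a clean end of file, and return any
        // other error (e.g. a truncated last record) instead of silently stopping.
        while !is.eof()? {
            let _message = is.read_message::<Message>()?;
            // info!(log, "{:?}", _message);
            cnt += 1;
        }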
    }
    info!(log, "total = {}", cnt);
    Ok(())
}
