
Extract file header from stream instead of physical file #396

Open
kri5t opened this issue Jun 11, 2022 · 2 comments

Comments

kri5t commented Jun 11, 2022

I've been looking through your library and found extractFileHeader, and it works great.
My only issue is that we are running in a cloud environment and dealing with rather large Avro files (48 GB).
Having to download such a file onto the Docker image just to inspect it is rather inefficient.

I've been trying to modify your method to accept a Readable instead of the actual path, but it turns out too many inner methods are used inside extractFileHeader for that to be feasible. I've done something like this:

import { Injectable } from '@nestjs/common';
import * as avro from 'avsc';
import { Readable } from 'stream';

@Injectable()
export class AvroSchemaFileExtractorService {
  private readonly MAGIC_BYTES: Buffer;
  private readonly HEADER_TYPE: avro.Type;

  constructor() {
    this.MAGIC_BYTES = Buffer.from('Obj\x01');
    const OPTS = { namespace: 'org.apache.avro.file' };
    const MAP_BYTES_TYPE = avro.Type.forSchema({ type: 'map', values: 'bytes' }, OPTS);
    this.HEADER_TYPE = avro.Type.forSchema(
      {
        name: 'Header',
        type: 'record',
        fields: [
          { name: 'magic', type: { type: 'fixed', name: 'Magic', size: 4 } },
          { name: 'meta', type: MAP_BYTES_TYPE },
          { name: 'sync', type: { type: 'fixed', name: 'Sync', size: 16 } },
        ],
      },
      OPTS,
    );
  }

  async get(fileStream: Readable, opts: any = {}): Promise<object | null> {
    const size = Math.max(opts.size || 4096, 4);
    // Accumulate chunks until we have at least `size` bytes of the header.
    const chunks: Buffer[] = [];
    let total = 0;
    for await (const chunk of fileStream) {
      chunks.push(chunk);
      total += chunk.length;
      if (total >= size) {
        break;
      }
    }
    const buf = Buffer.concat(chunks);
    try {
      if (buf.length < 4 || !this.MAGIC_BYTES.equals(buf.slice(0, 4))) {
        return null;
      }

      // Here it starts to break down.
      const tap = new (avro as any).utils.Tap(buf);
      let header = null;
      do {
        header = (this.HEADER_TYPE as any)._read(tap);
      } while (!(avro as any).isValid());
      // if (decode !== false) {
      //   const meta = header.meta;
      //   meta['avro.schema'] = JSON.parse(meta['avro.schema'].toString());
      //   if (meta['avro.codec'] !== undefined) {
      //     meta['avro.codec'] = meta['avro.codec'].toString();
      //   }
      // }
      return header;
    } finally {
      if (opts.destroy) {
        fileStream.destroy();
      }
    }
  }
}

But again, the inner methods are not exposed and I cannot access them.
Would it be possible to include a more cloud-friendly version that accepts a stream instead of a path?

kri5t (Author) commented Jun 11, 2022

I found a rather hacky way of doing this:

import { Injectable } from '@nestjs/common';
import * as avro from 'avsc';
import * as crc32 from 'buffer-crc32';
import * as snappy from 'snappy';
import { Readable } from 'stream';

@Injectable()
export class AvroSchemaFileExtractorService {
  snappyDecoder: avro.Codec = (buf, callback) => {
    // The last four bytes of each Snappy block are a CRC32 checksum
    // of the uncompressed data; the rest is the compressed payload.
    const checksum = buf.slice(buf.length - 4);
    const payload = buf.slice(0, buf.length - 4);
    try {
      const inflated = snappy.uncompressSync(payload, {});
      const inflatedBuf = Buffer.isBuffer(inflated) ? inflated : Buffer.from(inflated);
      if (!checksum.equals(crc32(inflatedBuf))) {
        callback(new Error('invalid checksum'));
        return;
      }
      callback(null, inflatedBuf);
    } catch (err) {
      callback(err);
    }
  };

  async get(fileStream: Readable): Promise<object> {
    return new Promise((resolve, reject) => {
      const blockDecoder = new avro.streams.BlockDecoder({
        codecs: { snappy: this.snappyDecoder },
      })
        .on('metadata', (type, codec, header) => {
          // The writer's schema is stored as JSON in the header metadata.
          resolve(JSON.parse(header.meta['avro.schema'].toString()));
          // Stop reading the rest of the (potentially huge) file.
          fileStream.destroy();
        })
        .on('error', reject);
      // Resume the readable side so the decoder actually processes input.
      fileStream.pipe(blockDecoder).resume();
    });
  }
}

I still think it would be great to have a more official way of doing this.

mtth (Owner) commented Jun 25, 2022

Hi @kri5t. Your approach of using a BlockDecoder and listening to the 'metadata' event sounds right. Two quick suggestions:

  • Using events.once could simplify the implementation significantly.
  • You can also destroy the stream afterwards to avoid downloading the entire file.

I'm open to adding this as a helper function if you're interested in submitting a PR.
