"GOT", but the "O" is a cute, smiling pufferfish. Index | Thread | Search

From:
Omar Polo <op@omarpolo.com>
Subject:
Re: process deltas in compressed form
To:
Stefan Sperling <stsp@stsp.name>
Cc:
gameoftrees@openbsd.org
Date:
Mon, 02 May 2022 14:02:44 +0200

Download raw body.

Thread
Stefan Sperling <stsp@stsp.name> wrote:
> During packing we currently decompress reused deltas while reading them
> from the pack file, and store the decompressed data either in memory or
> in the delta cache file.
> The same applies to newly computed deltas; we write them to the delta
> cache file in uncompressed form, and compress the delta data while
> copying it into the generated pack file.
> 
> This approach works, but it is wasteful.
> A 4GB /tmp partition will run out of space during 'gotadmin pack -a'
> in a copy of the OpenBSD src repo because the delta cache file grows
> too large.

i'm happy about the change, but i remember 'gotadmin pack -a' working on
a 4GB /tmp partition:

% df -h | fgrep /tmp
/dev/sd1d      3.9G    6.8M    3.7G     0%    /tmp
% /usr/local/bin/gotadmin -V
gotadmin 0.68
% time /usr/local/bin/gotadmin pack -a
219498 commits colored; 2088729 objects found; 1010361 trees scanned
packing 5 references; 2088729 objects; deltify: 100%; writing pack: 1.2G 100%
Wrote fe8e346aa0f23dc77371a3ad76f5bcea7d5ea071.pack
 1.2G packed; indexing 100%; resolving deltas 100%
Indexed fe8e346aa0f23dc77371a3ad76f5bcea7d5ea071.pack
   23m04.15s real    14m22.31s user     7m40.23s system
 
> With the patch below, we store deltas in compressed form.
> Reused deltas will be copied as-is from their pack file, so we won't
> waste time compressing them again. We still decompress such deltas
> once in order to verify that decompression succeeds. This is a sanity check
> also performed by Git. It is intended to protect against silent bitrot.
> So I decided to do the same. We would need to decompress a delta anyway
> at least in part because we need to read two size values stored at the
> beginning of a delta stream.
> 
> With this patch my system no longer runs out of space in /tmp when
> repacking the OpenBSD src repo.
> 
> I am adding more wrapper functions around libz. I am not happy about that
> as we already have a lot of glue code on top of libz. But the existing
> abstractions do not support incremental writes to a file with compression,
> they only support one-shot compression of an existing file or memory buffer.
> It would be good to tidy this up a bit and shrink the amount of abstractions
> in use here. But that can be done later.
> 
> ok?

it reads fine, ok op@;  small nitpicks inline below.

slightly related: while reading I've stumbled upon csum_output where
we're potentially truncating a size_t into an 'unsigned int' when
calling crc32.  It seems unlikely (maybe even impossible) to end up
calling csum_output with a `len' greater than UINT_MAX, but what about
using crc32_z?

diff 6b7665acf3ac9dd7d0c30372df5a4fa09b1b47fa /tmp/got
blob - 3c97a77f77b3da0548ab67dbcb2685723456ee39
file + lib/deflate.c
--- lib/deflate.c
+++ lib/deflate.c
@@ -83,7 +83,7 @@ static void
 csum_output(struct got_deflate_checksum *csum, const uint8_t *buf, size_t len)
 {
 	if (csum->output_crc)
-		*csum->output_crc = crc32(*csum->output_crc, buf, len);
+		*csum->output_crc = crc32_z(*csum->output_crc, buf, len);
 
 	if (csum->output_sha1)
 		SHA1Update(csum->output_sha1, buf, len);


> diff cf8f868e7c97644d885e9cc2a06debbe9eac72b0 da308c0ea675766ef4047c919ba7eb6d104d842e
> blob - 3c97a77f77b3da0548ab67dbcb2685723456ee39
> blob + 8be992eef33b9ee118e5dc75d039f224e5acb657
> --- lib/deflate.c
> +++ lib/deflate.c
> @@ -136,9 +136,9 @@ got_deflate_read(struct got_deflate_buf *zb, FILE *f, 
>  	return NULL;
>  }
>  
> -const struct got_error *
> -got_deflate_read_mmap(struct got_deflate_buf *zb, uint8_t *map, size_t offset,
> -    size_t len, size_t *outlenp, size_t *consumed)
> +static const struct got_error *
> +deflate_read_mmap(struct got_deflate_buf *zb, uint8_t *map, size_t offset,
> +    size_t len, size_t *outlenp, size_t *consumed, int flush_on_eof)
>  {
>  	z_stream *z = &zb->z;
>  	size_t last_total_out = z->total_out;
> @@ -159,7 +159,8 @@ got_deflate_read_mmap(struct got_deflate_buf *zb, uint
>  				z->avail_in = len - *consumed;
>  			if (z->avail_in == 0) {
>  				/* EOF */
> -				ret = deflate(z, Z_FINISH);
> +				if (flush_on_eof)
> +					ret = deflate(z, Z_FINISH);
>  				break;
>  			}
>  		}
> @@ -179,6 +180,53 @@ got_deflate_read_mmap(struct got_deflate_buf *zb, uint
>  	return NULL;
>  }
>  
> +const struct got_error *
> +got_deflate_read_mmap(struct got_deflate_buf *zb, uint8_t *map, size_t offset,
> +    size_t len, size_t *outlenp, size_t *consumed)
> +{
> +	return deflate_read_mmap(zb, map, offset, len, outlenp, consumed, 1);
> +}
> +
> +const struct got_error *
> +got_deflate_flush(struct got_deflate_buf *zb, FILE *outfile,
> +    struct got_deflate_checksum *csum, off_t *outlenp)
> +{
> +	int ret;
> +	size_t n;
> +	z_stream *z = &zb->z;
> +

nit: can drop the braces here

> +	if (z->avail_in != 0) {
> +		return got_error_msg(GOT_ERR_COMPRESSION,
> +		    "cannot flush zb with pending input data");
> +	}
> +
> +	do {
> +		size_t avail, last_total_out = zb->z.total_out;
> +
> +		z->next_out = zb->outbuf;
> +		z->avail_out = zb->outlen;
> +
> +		ret = deflate(z, Z_FINISH);
> +		if (ret != Z_STREAM_END && ret != Z_OK)
> +			return got_error(GOT_ERR_COMPRESSION);
> +
> +		avail = z->total_out - last_total_out;
> +		if (avail > 0) {
> +			n = fwrite(zb->outbuf, avail, 1, outfile);
> +			if (n != 1)
> +				return got_ferror(outfile, GOT_ERR_IO);
> +			if (csum)
> +				csum_output(csum, zb->outbuf, avail);
> +			if (outlenp)
> +				*outlenp += avail;
> +		}
> +	} while (ret != Z_STREAM_END);
> +
> +	zb->flags &= ~GOT_DEFLATE_F_HAVE_MORE;
> +	return NULL;

nit: empty line

> +
> +}
> +
>  void
>  got_deflate_end(struct got_deflate_buf *zb)
>  {
> @@ -263,3 +311,97 @@ done:
>  	got_deflate_end(&zb);
>  	return err;
>  }
> +
> +const struct got_error *
> +got_deflate_append_to_file_mmap(struct got_deflate_buf *zb, off_t *outlen,
> +    uint8_t *map, size_t offset, size_t len, FILE *outfile,
> +    struct got_deflate_checksum *csum)
> +{
> +	const struct got_error *err;
> +	size_t avail, consumed;
> +
> +	do {
> +		err = deflate_read_mmap(zb, map, offset, len, &avail,
> +		    &consumed, 0);
> +		if (err)
> +			break;
> +		offset += consumed;
> +		len -= consumed;
> +		if (avail > 0) {
> +			size_t n;
> +			n = fwrite(zb->outbuf, avail, 1, outfile);
> +			if (n != 1) {
> +				err = got_ferror(outfile, GOT_ERR_IO);
> +				break;
> +			}
> +			if (csum)
> +				csum_output(csum, zb->outbuf, avail);
> +			if (outlen)
> +				*outlen += avail;
> +		}
> +	} while ((zb->flags & GOT_DEFLATE_F_HAVE_MORE) && len > 0);
> +
> +	return err;
> +}
> +
> +const struct got_error *
> +got_deflate_to_mem_mmap(uint8_t **outbuf, size_t *outlen,
> +    size_t *consumed_total, struct got_deflate_checksum *csum, uint8_t *map,
> +    size_t offset, size_t len)
> +{
> +	const struct got_error *err;
> +	size_t avail, consumed;
> +	struct got_deflate_buf zb;
> +	void *newbuf;

nit: i'd go with a size_t for nbuf

> +	int nbuf = 1;
> +
> +	if (outbuf) {
> +		*outbuf = malloc(GOT_DEFLATE_BUFSIZE);
> +		if (*outbuf == NULL)
> +			return got_error_from_errno("malloc");
> +		err = got_deflate_init(&zb, *outbuf, GOT_DEFLATE_BUFSIZE);
> +		if (err) {
> +			free(*outbuf);
> +			*outbuf = NULL;
> +			return err;
> +		}
> +	} else {
> +		err = got_deflate_init(&zb, NULL, GOT_DEFLATE_BUFSIZE);
> +		if (err)
> +			return err;
> +	}
> +
> +	*outlen = 0;
> +	if (consumed_total)
> +		*consumed_total = 0;
> +	do {
> +		err = got_deflate_read_mmap(&zb, map, offset, len, &avail,
> +		    &consumed);
> +		if (err)
> +			goto done;
> +		offset += consumed;
> +		if (consumed_total)
> +			*consumed_total += consumed;
> +		len -= consumed;
> +		if (avail > 0 && csum)
> +			csum_output(csum, zb.outbuf, avail);
> +		*outlen += avail;
> +		if ((zb.flags & GOT_DEFLATE_F_HAVE_MORE) && outbuf != NULL) {
> +			newbuf = reallocarray(*outbuf, ++nbuf,
> +			    GOT_DEFLATE_BUFSIZE);
> +			if (newbuf == NULL) {
> +				err = got_error_from_errno("reallocarray");
> +				free(*outbuf);
> +				*outbuf = NULL;
> +				*outlen = 0;
> +				goto done;
> +			}
> +			*outbuf = newbuf;
> +			zb.outbuf = newbuf + *outlen;
> +			zb.outlen = (nbuf * GOT_DEFLATE_BUFSIZE) - *outlen;
> +		}
> +	} while (zb.flags & GOT_DEFLATE_F_HAVE_MORE);
> +done:
> +	got_deflate_end(&zb);
> +	return err;
> +}
> blob - 1c429af85a27229451e05798e9511d0a8f474968
> blob + 09a8755cf062db2ffd1f7e2c26deef470dbba250
> --- lib/got_lib_deflate.h
> +++ lib/got_lib_deflate.h
> @@ -39,8 +39,17 @@ const struct got_error *got_deflate_init(struct got_de
>      size_t);
>  const struct got_error *got_deflate_read(struct got_deflate_buf *, FILE *,
>      off_t, size_t *, off_t *);
> +const struct got_error *got_deflate_read_mmap(struct got_deflate_buf *,
> +    uint8_t *, size_t, size_t, size_t *, size_t *);
>  void got_deflate_end(struct got_deflate_buf *);
>  const struct got_error *got_deflate_to_file(off_t *, FILE *, off_t, FILE *,
>      struct got_deflate_checksum *);
>  const struct got_error *got_deflate_to_file_mmap(off_t *, uint8_t *,
>      size_t, size_t, FILE *, struct got_deflate_checksum *);
> +const struct got_error *got_deflate_flush(struct got_deflate_buf *, FILE *,
> +    struct got_deflate_checksum *, off_t *);
> +const struct got_error *got_deflate_append_to_file_mmap(
> +    struct got_deflate_buf *, off_t *, uint8_t *, size_t, size_t, FILE *,
> +    struct got_deflate_checksum *);
> +const struct got_error *got_deflate_to_mem_mmap(uint8_t **, size_t *, size_t *,
> +    struct got_deflate_checksum *, uint8_t *, size_t, size_t);
> blob - 6af8d574c7b345c52d3e0c19759bf3ae6bd62b20
> blob + 4bbe44dda07c97ba6f0ef878da0caadbe6de9741
> --- lib/got_lib_object.h
> +++ lib/got_lib_object.h
> @@ -104,7 +104,7 @@ const struct got_error *got_object_open_from_packfile(
>      struct got_object_id *, struct got_pack *, struct got_packidx *, int,
>      struct got_repository *);
>  const struct got_error *got_object_read_raw_delta(uint64_t *, uint64_t *,
> -    off_t *, off_t *, off_t *, struct got_object_id **, int,
> +    off_t *, off_t *, off_t *, off_t *, struct got_object_id **, int,
>      struct got_packidx *, int, struct got_object_id *, struct got_repository *);
>  const struct got_error *got_object_read_header_privsep(struct got_object **,
>      struct got_object_id *, struct got_repository *, int);
> blob - e8fb373e287ee80486d50ed07964d9d39924308d
> blob + 6a3d3981c9afd96d48ef7746b2d0b1d78793a7ca
> --- lib/got_lib_pack.h
> +++ lib/got_lib_pack.h
> @@ -212,7 +212,7 @@ const struct got_error *got_packfile_extract_object(st
>  const struct got_error *got_packfile_extract_object_to_mem(uint8_t **, size_t *,
>      struct got_object *, struct got_pack *);
>  const struct got_error *got_packfile_extract_raw_delta(uint8_t **, size_t *,
> -    off_t *, off_t *, struct got_object_id *, uint64_t *, uint64_t *,
> +    size_t *, off_t *, off_t *, struct got_object_id *, uint64_t *, uint64_t *,
>      struct got_pack *, struct got_packidx *, int);
>  struct got_pack *got_repo_get_cached_pack(struct got_repository *,
>      const char *);
> blob - 110fe049d86c1a33fb3b33e4fe74ffa8a3dbbfa8
> blob + e57f4dd3f8f4d207324b69c89c54442ae78cd5bb
> --- lib/got_lib_privsep.h
> +++ lib/got_lib_privsep.h
> @@ -284,6 +284,7 @@ struct got_imsg_raw_delta {
>  	uint64_t base_size;
>  	uint64_t result_size;
>  	off_t delta_size;
> +	off_t delta_compressed_size;
>  	off_t delta_offset;
>  	off_t delta_out_offset;
>  
> @@ -662,8 +663,9 @@ const struct got_error *got_privsep_send_raw_delta_req
>      struct got_object_id *);
>  const struct got_error *got_privsep_send_raw_delta_outfd(struct imsgbuf *, int);
>  const struct got_error *got_privsep_send_raw_delta(struct imsgbuf *, uint64_t,
> -    uint64_t,  off_t, off_t, off_t, struct got_object_id *);
> +    uint64_t,  off_t, off_t, off_t, off_t, struct got_object_id *);
>  const struct got_error *got_privsep_recv_raw_delta(uint64_t *, uint64_t *,
> -    off_t *, off_t *, off_t *, struct got_object_id **, struct imsgbuf *);
> +    off_t *, off_t *, off_t *, off_t *, struct got_object_id **,
> +    struct imsgbuf *);
>  
>  void got_privsep_exec_child(int[2], const char *, const char *);
> blob - b87e6eecb828ef98889452c3dd9b205e5eaf3c33
> blob + 4e5facc7f5e3c665aa540bb9caf3299f68626c2d
> --- lib/object.c
> +++ lib/object.c
> @@ -388,8 +388,8 @@ got_object_open_from_packfile(struct got_object **obj,
>  
>  const struct got_error *
>  got_object_read_raw_delta(uint64_t *base_size, uint64_t *result_size,
> -    off_t *delta_size, off_t *delta_offset, off_t *delta_out_offset,
> -    struct got_object_id **base_id, int delta_cache_fd,
> +    off_t *delta_size, off_t *delta_compressed_size, off_t *delta_offset,
> +    off_t *delta_out_offset, struct got_object_id **base_id, int delta_cache_fd,
>      struct got_packidx *packidx, int obj_idx, struct got_object_id *id,
>      struct got_repository *repo)
>  {
> @@ -400,6 +400,7 @@ got_object_read_raw_delta(uint64_t *base_size, uint64_
>  	*base_size = 0;
>  	*result_size = 0;
>  	*delta_size = 0;
> +	*delta_compressed_size = 0;
>  	*delta_offset = 0;
>  	*delta_out_offset = 0;
>  
> @@ -439,7 +440,8 @@ got_object_read_raw_delta(uint64_t *base_size, uint64_
>  		return err;
>  
>  	return got_privsep_recv_raw_delta(base_size, result_size, delta_size,
> -	    delta_offset, delta_out_offset, base_id, pack->privsep_child->ibuf);
> +	    delta_compressed_size, delta_offset, delta_out_offset, base_id,
> +	    pack->privsep_child->ibuf);
>  }
>  
>  static const struct got_error *
> blob - d875046e25b7f0b4172baa4dbd064445a73f18c4
> blob + e901a95d83890c031e00dc5f8a9ee560e51ace41
> --- lib/pack.c
> +++ lib/pack.c
> @@ -902,23 +902,33 @@ got_pack_parse_offset_delta(off_t *base_offset, size_t
>  
>  static const struct got_error *
>  read_delta_data(uint8_t **delta_buf, size_t *delta_len,
> -    size_t delta_data_offset, struct got_pack *pack)
> +    size_t *delta_compressed_len, size_t delta_data_offset,
> +    struct got_pack *pack)
>  {
>  	const struct got_error *err = NULL;
> +	size_t consumed = 0;
>  
>  	if (pack->map) {
>  		if (delta_data_offset >= pack->filesize)
>  			return got_error(GOT_ERR_PACK_OFFSET);
>  		err = got_inflate_to_mem_mmap(delta_buf, delta_len,
> -		    NULL, NULL, pack->map, delta_data_offset,
> +		    &consumed, NULL, pack->map, delta_data_offset,
>  		    pack->filesize - delta_data_offset);
> +		if (err)
> +			return err;
>  	} else {
>  		if (lseek(pack->fd, delta_data_offset, SEEK_SET) == -1)
>  			return got_error_from_errno("lseek");
> -		err = got_inflate_to_mem_fd(delta_buf, delta_len, NULL,
> -		    NULL, 0, pack->fd);
> +		err = got_inflate_to_mem_fd(delta_buf, delta_len,
> +		    &consumed, NULL, 0, pack->fd);
> +		if (err)
> +			return err;
>  	}
> -	return err;
> +
> +	if (delta_compressed_len)
> +		*delta_compressed_len = consumed;
> +
> +	return NULL;
>  }
>  
>  static const struct got_error *
> @@ -1200,7 +1210,7 @@ got_pack_get_delta_chain_max_size(uint64_t *max_size,
>  			if (delta_buf == NULL) {
>  				cached = 0;
>  				err = read_delta_data(&delta_buf, &delta_len,
> -				    delta->data_offset, pack);
> +				    NULL, delta->data_offset, pack);
>  				if (err)
>  					return err;
>  				err = got_delta_cache_add(pack->delta_cache,
> @@ -1336,7 +1346,7 @@ got_pack_dump_delta_chain_to_file(size_t *result_size,
>  		    pack->delta_cache, delta->data_offset);
>  		if (delta_buf == NULL) {
>  			cached = 0;
> -			err = read_delta_data(&delta_buf, &delta_len,
> +			err = read_delta_data(&delta_buf, &delta_len, NULL,
>  			    delta->data_offset, pack);
>  			if (err)
>  				goto done;
> @@ -1482,7 +1492,7 @@ got_pack_dump_delta_chain_to_mem(uint8_t **outbuf, siz
>  		    pack->delta_cache, delta->data_offset);
>  		if (delta_buf == NULL) {
>  			cached = 0;
> -			err = read_delta_data(&delta_buf, &delta_len,
> +			err = read_delta_data(&delta_buf, &delta_len, NULL,
>  			    delta->data_offset, pack);
>  			if (err)
>  				goto done;
> @@ -1601,20 +1611,76 @@ got_packfile_extract_object_to_mem(uint8_t **buf, size
>  	return err;
>  }
>  
> +static const struct got_error *
> +read_raw_delta_data(uint8_t **delta_buf, size_t *delta_len,
> +    size_t *delta_len_compressed, uint64_t *base_size, uint64_t *result_size,
> +    off_t delta_data_offset, struct got_pack *pack, struct got_packidx *packidx)
> +{
> +	const struct got_error *err = NULL;
> +
> +	/* Validate decompression and obtain the decompressed size. */
> +	err = read_delta_data(delta_buf, delta_len, delta_len_compressed,
> +	    delta_data_offset, pack);
> +	if (err)
> +		return err;
> +
> +	/* Read delta base/result sizes from head of delta stream. */
> +	err = got_delta_get_sizes(base_size, result_size,
> +	    *delta_buf, *delta_len);
> +	if (err)
> +		goto done;
> +
> +	/* Discard decompressed delta and read it again in compressed form. */
> +	free(*delta_buf);
> +	*delta_buf = malloc(*delta_len_compressed);
> +	if (*delta_buf == NULL)
> +		return got_error_from_errno("malloc");

goto done?

probably not important, I expect callers to check the error before
looking at the returned data, but the other error conditions don't
return directly.