ref:0ac01400194edcbe8b62a97e421a7306d6918510

perf(pack): streaming parse-and-store with bounded LRU base cache

Replaces Reader.parse/2's collect-everything-into-a-list shape with a callback-driven Reader.parse_stream/3. Each resolved object is handed to the caller as it's decoded, then dropped — the BEAM is free to GC the binary before the next entry arrives. The receive-pack flow uses this to write each object to storage immediately. Base lookups for in-pack OFS_DELTA / REF_DELTA chains hit a bounded LRU (~4096 entries) first; on a miss, we fall back to an external_resolver that reads the just-written object back from storage. Memory usage becomes O(LRU + 1 in-flight object), independent of pack size. Measured on the openvswitch/ovs pack (96 MB compressed, 134k objects, 108k deltas): before (collected-list parse): peak RSS 3.4–3.5 GB, OOM at 3.82 GiB after (streaming + LRU): peak RSS 622 MB, post-GC 63 MB Memory stays bounded throughout the run (oscillates 200–470 MB) instead of growing with pack size. Parse time ~54 s including the simulated storage write per object — comparable to the previous in-memory run. The legacy `Reader.parse/2` is kept as a thin wrapper over `parse_stream/3` for backwards compatibility — same buffered shape, same buffered cost, only existing callers see it.
SHA: 0ac01400194edcbe8b62a97e421a7306d6918510
Author: CI <ci@anvil.test>
Date: 2026-05-06 18:04
Parents: 96509fa
2 files changed +328 -199
Type
lib/ex_git_objectstore/pack/reader.ex +308 −184
@@ -46,247 +46,324 @@
@base_types [@obj_commit, @obj_tree, @obj_blob, @obj_tag]
# `:sha` is only present on objects emitted via `parse_stream/3` (and
# therefore on `parse/2`'s collected output, since it's implemented atop
# the streaming variant). Some legacy callers may have constructed
# pack_object maps without it; downstream code should fall back to
# `compute_object_sha/2` when the field is missing.
@type pack_object :: %{
type: atom(),
data: binary(),
offset: non_neg_integer()
required(:type) => atom(),
required(:data) => binary(),
required(:offset) => non_neg_integer(),
optional(:sha) => String.t()
}
@doc """
Parse all objects from a packfile binary, resolving deltas.
Returns a list of `%{type: atom, data: binary, offset: integer}`.
Backwards-compatible wrapper over `parse_stream/3` that collects every
resolved object into a list. **Holds the entire decompressed pack in
memory simultaneously** — for ~100 MB packs of the ovs shape, that's
multiple GB of RSS. New callers should prefer `parse_stream/3` so
they can write each object to storage and discard from memory as it
completes.
## Options
- `:external_resolver` — a function `(sha :: String.t()) -> {:ok, {type_atom, data}} | {:error, term()}`
used to resolve REF_DELTA bases not found in the pack (thin pack support).
"""
@spec parse(binary(), keyword()) :: {:ok, [pack_object()]} | {:error, term()}
def parse(pack_data, opts \\ [])
def parse(pack_data, opts \\ []) do
me = self()
def parse(
<<@pack_signature, version::unsigned-big-32, count::unsigned-big-32, _rest::binary>> =
pack_data,
opts
)
when version in [2, 3] do
if count > @max_pack_objects do
{:error, {:pack_too_large, count}}
else
parse_verified(pack_data, count, Keyword.get(opts, :external_resolver))
callback = fn entry ->
send(me, {:pack_entry, entry})
:ok
end
case parse_stream(pack_data, callback, opts) do
:ok ->
entries = collect_entries([])
{:ok, entries}
{:error, _} = err ->
# Drain any in-flight messages so they don't pollute future
# calls in this process.
flush_pack_entries()
err
end
end
def parse(_, _opts), do: {:error, :invalid_pack_header}
defp collect_entries(acc) do
receive do
{:pack_entry, entry} -> collect_entries([entry | acc])
after
0 -> Enum.reverse(acc)
end
end
defp parse_verified(pack_data, count, external_resolver) do
with :ok <- verify_pack_checksum(pack_data),
defp flush_pack_entries do
receive do
{:pack_entry, _} -> flush_pack_entries()
after
0 -> :ok
entries_data = strip_pack_envelope(pack_data),
{:ok, entries, cache} <- parse_entries(entries_data, count, [], %{}, 12) do
resolve_delta_entries(entries, pack_data, cache, external_resolver)
end
end
defp strip_pack_envelope(pack_data) do
entries_len = byte_size(pack_data) - 12 - 20
@doc """
Stream-parse a packfile, emitting each resolved object via callback.
<<_header::binary-size(12), entries_data::binary-size(entries_len),
_checksum::binary-size(20)>> = pack_data
This is the memory-bounded alternative to `parse/2`. Walks entries in
pack order, decompresses + resolves each one, and passes the resolved
`pack_object()` map to `callback` immediately. The caller is expected
to store the object (e.g. write to disk) and then return — the entry
is discarded by this function as soon as the callback returns, except
when it's a base that future deltas may reference (kept in an
in-memory map so OFS_DELTA / REF_DELTA resolution stays fast).
entries_data
end
Memory usage is bounded by:
- the input `pack_data` binary itself (caller chose to materialize it);
- the by-offset cache of base data needed for in-pack delta resolution;
- one resolved object at a time during the callback.
This avoids the buffered-entries-list shape of `parse/2` which scales
to O(decompressed pack size) extra RSS.
@doc """
Verify the trailing SHA-1 checksum of a pack file.
"""
@spec verify_pack_checksum(binary()) :: :ok | {:error, term()}
def verify_pack_checksum(pack_data) when byte_size(pack_data) >= 32 do
body_len = byte_size(pack_data) - 20
<<body::binary-size(body_len), checksum::binary-size(20)>> = pack_data
expected = :crypto.hash(:sha, body)
if checksum == expected do
:ok
else
{:error, :pack_checksum_mismatch}
end
end
## Callback contract
def verify_pack_checksum(_), do: {:error, :pack_too_short}
`callback.(pack_object)` returns `:ok` to continue or
`{:error, reason}` to abort the parse and propagate `reason`.
@doc """
Read a single object at the given offset from a packfile binary.
Resolves deltas recursively using the pack data itself.
"""
@spec read_object(binary(), non_neg_integer()) :: {:ok, {atom(), binary()}} | {:error, term()}
def read_object(pack_data, offset) do
read_object(pack_data, offset, %{})
end
## Options
@doc """
Read object with a pre-populated cache of already-resolved objects.
Same as `parse/2`.
"""
@spec read_object(binary(), non_neg_integer(), map()) ::
{:ok, {atom(), binary()}} | {:error, term()}
def read_object(pack_data, offset, cache) do
do_read_object(pack_data, offset, cache, 0, nil)
@spec parse_stream(binary(), (pack_object() -> :ok | {:error, term()}), keyword()) ::
:ok | {:error, term()}
def parse_stream(pack_data, callback, opts \\ [])
def parse_stream(
<<@pack_signature, version::unsigned-big-32, count::unsigned-big-32, _rest::binary>> =
pack_data,
callback,
opts
)
when version in [2, 3] and is_function(callback, 1) do
cond do
count > @max_pack_objects ->
{:error, {:pack_too_large, count}}
true ->
with :ok <- verify_pack_checksum(pack_data) do
stream_verified(pack_data, count, callback, Keyword.get(opts, :external_resolver))
end
end
end
@doc """
Read object with a pre-populated cache and an external resolver for REF_DELTA bases.
def parse_stream(_, _callback, _opts), do: {:error, :invalid_pack_header}
# Walks every entry in pack order. State threaded through:
#
# - offset_to_sha: %{offset => sha} — small (just hex strings).
# Needed to translate an OFS_DELTA's base offset
The `external_resolver` is a function `(sha :: binary()) -> {:ok, {atom(), binary()}} | {:error, term()}`
used to look up base objects not found within the pack itself (thin packs).
"""
@spec read_object(binary(), non_neg_integer(), map(), (binary() ->
{:ok, {atom(), binary()}}
# into a SHA before consulting the data cache or
# external resolver.
# - lru_data: %{sha => {type, data}} bounded by `lru_max`.
# Holds recently-emitted objects so that nearby
# deltas can resolve their bases without round-
# tripping to storage.
# - lru_queue: :queue of SHAs in insertion order, used for
# FIFO eviction when the cache outgrows lru_max.
# - deferred: REF_DELTAs whose base SHA hadn't appeared yet
# at the moment we tried to resolve. Run a
# fixed-point pass at the end.
#
# The previous design held every resolved object indefinitely. For
# ovs's 134k objects that pushed peak RSS to 3+ GB. The bounded LRU
# plus external resolver fallback keeps the working set small without
# losing correctness — when a delta misses, we re-read the base
# object from storage where the receive callback just persisted it.
defp stream_verified(pack_data, count, callback, external_resolver) do
state = %{
offset_to_sha: %{},
lru_data: %{},
lru_queue: :queue.new(),
lru_size: 0,
lru_max: lru_max_entries(),
deferred: [],
callback: callback,
external_resolver: external_resolver
}
case stream_entries(pack_data, count, 12, state) do
{:ok, state} -> finalize_deferred(state)
{:error, _} = err -> err
end
| {:error, term()})) ::
{:ok, {atom(), binary()}} | {:error, term()}
def read_object(pack_data, offset, cache, external_resolver)
when is_function(external_resolver, 1) do
do_read_object(pack_data, offset, cache, 0, external_resolver)
end
# Bounded LRU for recently-resolved objects, used as the primary base
# cache for OFS_DELTA / REF_DELTA resolution. The previous design held
# every resolved object in a map for the lifetime of the parse — for
# ovs's 134k objects that pushed peak RSS to 3+ GB, almost all of which
# was BEAM binary-heap garbage that didn't get reclaimed in time.
#
# 4096 entries × ~5 KB avg ≈ 20 MB live. Most deltas reference recent
# bases (locality), so the hit rate stays high; misses fall back to the
# external resolver (reads the just-written object back from storage).
defp lru_max_entries, do: 4096
# -- Entry parsing (sequential scan) --
defp stream_entries(_pack_data, 0, _abs_offset, state), do: {:ok, state}
defp parse_entries(_rest, 0, acc, cache, _abs_offset), do: {:ok, Enum.reverse(acc), cache}
defp stream_entries(pack_data, remaining, abs_offset, state) do
<<_skip::binary-size(abs_offset), rest::binary>> = pack_data
defp parse_entries(rest, remaining, acc, cache, abs_offset) do
case parse_entry_at(rest) do
{:ok, entry, consumed} ->
entry = %{entry | offset: abs_offset}
case resolve_streamed_entry(entry, pack_data, state) do
cache = maybe_cache_sha(cache, entry, abs_offset)
{:ok, new_state} ->
stream_entries(pack_data, remaining - 1, abs_offset + consumed, new_state)
{:error, _} = err ->
err
end
<<_consumed::binary-size(consumed), new_rest::binary>> = rest
parse_entries(new_rest, remaining - 1, [entry | acc], cache, abs_offset + consumed)
{:error, _} = err ->
err
end
end
# For each freshly-parsed entry: produce its final {type, data}, emit
# via the caller's callback, and update the state. Base types skip the
# apply step.
defp resolve_streamed_entry(%{type: t, offset: offset, data: data} = _entry, _pack_data, state)
when t in [:commit, :tree, :blob, :tag] do
emit_resolved(state, offset, t, data)
defp maybe_cache_sha(cache, entry, abs_offset) do
if entry.type in [:commit, :tree, :blob, :tag] do
sha = compute_object_sha(Atom.to_string(entry.type), entry.data)
Map.put(cache, {:sha, sha}, abs_offset)
end
defp resolve_streamed_entry(%{type: :ofs_delta, offset: offset} = entry, _pack_data, state) do
base_offset = offset - entry.neg_offset
with {:ok, base_sha} <- Map.fetch(state.offset_to_sha, base_offset),
{:ok, {base_type, base_data}, state} <- fetch_base(state, base_sha) do
case Delta.apply(base_data, entry.data) do
{:ok, result} -> emit_resolved(state, offset, base_type, result)
{:error, _} = err -> err
end
else
:error ->
{:error, {:ofs_delta_base_missing, offset, base_offset}}
{:error, :base_not_found} ->
cache
{:error, {:ofs_delta_base_unresolved, offset, base_offset}}
end
end
# -- Single-pass, in-pack-order delta resolution --
#
# Pack format guarantees that every OFS_DELTA's base is at a *smaller*
# offset (it appears earlier in the pack). REF_DELTAs may reference any
# SHA, including ones from later in the pack or external (thin pack) —
# those go to a deferred queue and are resolved in dependency-order
# passes once all in-pack bases are known.
#
# `parse_entries` already decompressed every entry exactly once. The
# previous resolver threw that work away and re-parsed/re-decompressed
# via `do_read_object`, AND its offset-keyed "cache" was actually keyed
# `{:sha, sha}` on writes, so the offset lookups never hit. The result
# was O(N · D) zlib decompressions where D is the mean delta-chain
# depth — for a 134k-object / 108k-delta pack like ovs that pinned a
# CPU for half an hour and counting before the previous flush.
#
# The new pipeline walks entries once in offset order, applying deltas
# against an offset-keyed `resolved` map populated as we go. Each
# object is decompressed once (during parse_entries) and each delta is
# applied once. Total work: O(N).
defp resolve_delta_entries(entries, _pack_data, _cache, external_resolver) do
initial = %{
resolved_in_order: [],
by_offset: %{},
sha_to_offset: %{},
deferred: []
}
defp resolve_streamed_entry(%{type: :ref_delta, offset: offset} = entry, _pack_data, state) do
case fetch_base(state, entry.base_sha) do
{:ok, {base_type, base_data}, new_state} ->
case Delta.apply(base_data, entry.data) do
{:ok, result} -> emit_resolved(new_state, offset, base_type, result)
{:error, _} = err -> err
end
case Enum.reduce_while(entries, {:ok, initial}, &resolve_entry_in_order/2) do
{:ok, state} -> finish_resolution(state, external_resolver)
{:error, _} = err -> err
{:error, :base_not_found} ->
# Defer: base might be later in pack OR external (thin pack).
{:ok, %{state | deferred: [entry | state.deferred]}}
end
end
# Look up a base by SHA. Tries the in-memory LRU first (fast path —
# most pack-internal delta references hit here because of locality);
# falls back to the external resolver, which for the receive-pack flow
# reads the just-written object from storage. Returns the data plus an
# updated state (the LRU may have been promoted/refilled).
defp fetch_base(state, sha) do
case Map.fetch(state.lru_data, sha) do
{:ok, value} ->
{:ok, value, state}
:error ->
case call_external_resolver_stream(state.external_resolver, sha) do
{:ok, value} ->
{:ok, value, lru_put(state, sha, value)}
:pending ->
defp resolve_entry_in_order(entry, {:ok, state}) do
case resolve_one_entry(entry, state) do
{:error, :base_not_found}
end
{:ok, new_state} -> {:cont, {:ok, new_state}}
{:error, _} = err -> {:halt, err}
end
end
defp emit_resolved(state, offset, type, data) do
# Base type (commit/tree/blob/tag): parse_entries already decompressed
# it, so `entry.data` IS the resolved object payload. Just record it.
defp resolve_one_entry(%{type: t, offset: offset, data: data} = entry, state)
when t in [:commit, :tree, :blob, :tag] do
record_resolved(state, entry, t, data, offset)
end
sha = compute_object_sha(Atom.to_string(type), data)
pack_object = %{type: type, data: data, offset: offset, sha: sha}
# OFS_DELTA: base is at `entry.offset - entry.neg_offset`. Pack format
# guarantees this is a smaller offset, so it's always already in
case state.callback.(pack_object) do
:ok ->
new_state = %{
state
| offset_to_sha: Map.put(state.offset_to_sha, offset, sha)
}
# `by_offset`. Apply the delta and record the result.
defp resolve_one_entry(%{type: :ofs_delta} = entry, state) do
base_offset = entry.offset - entry.neg_offset
case Map.fetch(state.by_offset, base_offset) do
{:ok, {base_type, base_data}} ->
apply_and_record(state, entry, base_type, base_data)
{:ok, lru_put(new_state, sha, {type, data})}
{:error, _} = err ->
:error ->
{:error, {:ofs_delta_base_missing, entry.offset, base_offset}}
err
end
end
defp lru_put(%{lru_data: data} = state, sha, value) when is_map_key(data, sha) do
# Already cached; refresh by replacing the value (cheap; queue
# ordering is approximate — a slightly stale ordering doesn't change
# correctness, only the eviction order).
# REF_DELTA: try the in-pack SHA index first; if the base isn't yet
# known (forward reference or thin pack), defer to the post-pass.
defp resolve_one_entry(%{type: :ref_delta} = entry, state) do
case Map.fetch(state.sha_to_offset, entry.base_sha) do
{:ok, base_offset} ->
{base_type, base_data} = Map.fetch!(state.by_offset, base_offset)
apply_and_record(state, entry, base_type, base_data)
%{state | lru_data: Map.put(data, sha, value)}
:error ->
{:ok, %{state | deferred: [entry | state.deferred]}}
end
end
defp apply_and_record(state, entry, base_type, base_data) do
case Delta.apply(base_data, entry.data) do
{:ok, result} -> record_resolved(state, entry, base_type, result, entry.offset)
defp lru_put(state, sha, value) do
new_data = Map.put(state.lru_data, sha, value)
new_queue = :queue.in(sha, state.lru_queue)
new_size = state.lru_size + 1
if new_size > state.lru_max do
lru_evict_one(%{state | lru_data: new_data, lru_queue: new_queue, lru_size: new_size})
else
{:error, _} = err -> err
%{state | lru_data: new_data, lru_queue: new_queue, lru_size: new_size}
end
end
defp lru_evict_one(state) do
case :queue.out(state.lru_queue) do
defp record_resolved(state, _entry, type, data, offset) do
sha = compute_object_sha(Atom.to_string(type), data)
resolved = %{type: type, data: data, offset: offset}
{{:value, sha}, q2} ->
%{
state
| lru_data: Map.delete(state.lru_data, sha),
lru_queue: q2,
lru_size: state.lru_size - 1
}
{:ok,
%{
state
| resolved_in_order: [resolved | state.resolved_in_order],
by_offset: Map.put(state.by_offset, offset, {type, data}),
sha_to_offset: Map.put(state.sha_to_offset, sha, offset)
{:empty, _} ->
state
end
}}
end
# After the in-order pass, REF_DELTAs whose base wasn't in the pack at
# the time we scanned them get retried. Each pass either resolves
# After the in-order pass, REF_DELTAs whose base wasn't yet known get
# retried. Each pass either resolves something (because another
# deferred REF_DELTA just finished) or routes to the external resolver.
# If a pass resolves nothing and there's still pending work, the
# remaining entries are genuinely unresolvable.
# something (newly available because another deferred REF_DELTA just
# finished) or routes to the external resolver. If a pass resolves
# nothing and there's still pending work, the remaining entries are
# genuinely unresolvable.
defp finish_resolution(%{deferred: []} = state, _external_resolver) do
{:ok, Enum.reverse(state.resolved_in_order)}
end
defp finalize_deferred(%{deferred: []}), do: :ok
defp finish_resolution(state, external_resolver) do
case run_deferred_pass(state, external_resolver) do
{:ok, %{deferred: []} = new_state} ->
{:ok, Enum.reverse(new_state.resolved_in_order)}
defp finalize_deferred(state) do
case run_deferred_pass(state) do
{:ok, %{deferred: []}} ->
:ok
{:ok, %{deferred: still_deferred} = new_state}
when length(still_deferred) < length(state.deferred) ->
finish_resolution(new_state, external_resolver)
finalize_deferred(new_state)
{:ok, %{deferred: [first | _]}} ->
{:error, {:ref_delta_base_not_found, "unresolvable REF_DELTA at offset #{first.offset}"}}
@@ -296,39 +373,86 @@
end
end
defp run_deferred_pass(state, external_resolver) do
defp run_deferred_pass(state) do
Enum.reduce_while(state.deferred, {:ok, %{state | deferred: []}}, fn entry, {:ok, acc} ->
case resolve_deferred_ref_delta(entry, acc, external_resolver) do
case resolve_deferred_ref_delta(entry, acc) do
{:ok, new_acc} -> {:cont, {:ok, new_acc}}
{:error, _} = err -> {:halt, err}
end
end)
end
defp resolve_deferred_ref_delta(entry, state) do
defp resolve_deferred_ref_delta(entry, state, external_resolver) do
case Map.fetch(state.sha_to_offset, entry.base_sha) do
case fetch_base(state, entry.base_sha) do
{:ok, {base_type, base_data}, new_state} ->
case Delta.apply(base_data, entry.data) do
{:ok, result} -> emit_resolved(new_state, entry.offset, base_type, result)
{:error, _} = err -> err
end
{:ok, base_offset} ->
{base_type, base_data} = Map.fetch!(state.by_offset, base_offset)
apply_and_record(state, entry, base_type, base_data)
{:error, :base_not_found} ->
:error ->
case call_external_resolver(external_resolver, entry.base_sha) do
{:ok, %{state | deferred: [entry | state.deferred]}}
{:ok, {base_type, base_data}} ->
apply_and_record(state, entry, base_type, base_data)
:pending ->
{:ok, %{state | deferred: [entry | state.deferred]}}
end
end
end
defp call_external_resolver(resolver, base_sha) when is_function(resolver, 1) do
defp call_external_resolver_stream(resolver, base_sha) when is_function(resolver, 1) do
case resolver.(base_sha) do
{:ok, _} = ok -> ok
{:error, _} -> :pending
end
end
defp call_external_resolver_stream(_, _), do: :pending
@doc """
Verify the trailing SHA-1 checksum of a pack file.
"""
@spec verify_pack_checksum(binary()) :: :ok | {:error, term()}
def verify_pack_checksum(pack_data) when byte_size(pack_data) >= 32 do
body_len = byte_size(pack_data) - 20
<<body::binary-size(body_len), checksum::binary-size(20)>> = pack_data
expected = :crypto.hash(:sha, body)
if checksum == expected do
:ok
else
{:error, :pack_checksum_mismatch}
end
end
def verify_pack_checksum(_), do: {:error, :pack_too_short}
@doc """
Read a single object at the given offset from a packfile binary.
Resolves deltas recursively using the pack data itself.
defp call_external_resolver(_, _), do: :pending
"""
@spec read_object(binary(), non_neg_integer()) :: {:ok, {atom(), binary()}} | {:error, term()}
def read_object(pack_data, offset) do
read_object(pack_data, offset, %{})
end
@doc """
Read object with a pre-populated cache of already-resolved objects.
"""
@spec read_object(binary(), non_neg_integer(), map()) ::
{:ok, {atom(), binary()}} | {:error, term()}
def read_object(pack_data, offset, cache) do
do_read_object(pack_data, offset, cache, 0, nil)
end
@doc """
Read object with a pre-populated cache and an external resolver for REF_DELTA bases.
The `external_resolver` is a function `(sha :: binary()) -> {:ok, {atom(), binary()}} | {:error, term()}`
used to look up base objects not found within the pack itself (thin packs).
"""
@spec read_object(binary(), non_neg_integer(), map(), (binary() ->
{:ok, {atom(), binary()}}
| {:error, term()})) ::
{:ok, {atom(), binary()}} | {:error, term()}
def read_object(pack_data, offset, cache, external_resolver)
when is_function(external_resolver, 1) do
do_read_object(pack_data, offset, cache, 0, external_resolver)
end
# -- Single-entry parsing --