ref:6994d00e454b0e14bcb069d4e50c308556d5597b

perf(pack): single-pass O(N) delta resolution — fixes the next bottleneck on huge pushes (#29)

Sub-issue under the fangorn/anvil#153 umbrella. The streaming receive-pack fix in #28 made the receive path itself O(N), but exposed Pack.Reader as the next bottleneck — a live ovs push test ran for 27+ minutes pinned at 100% CPU during parse before I aborted.

## Root cause

The offset-keyed cache in the previous resolver was a no-op: writes used `{:sha, sha}` keys, reads used raw `offset` keys, so reads never hit what was written. Every OFS_DELTA went through `do_read_object`, which re-parsed the pack header AND re-decompressed via zlib for every base in the chain. For a pack with N objects and mean chain depth D, that's O(N · D) zlib decompressions. ovs has 134k objects, 108k deltas, chain depth ~5–10. Half a million to a million zlib decompresses, single-threaded.

## Fix

Walk entries in pack order (already produced that way by `parse_entries`), maintaining a real offset-keyed `by_offset` map populated as we go. Pack format guarantees every OFS_DELTA's base is at a smaller offset, so by the time we hit a delta, its base is already resolved — apply the delta and record the result. REF_DELTAs that reference a forward or external base are deferred to a fixed-point post-pass; that pass terminates on either resolution-fixpoint or genuine unresolvability with a structured error. Each object is decompressed once (in `parse_entries`), each delta applied once. **O(N) total.**

Two preserved bits of context that were previously thrown away and re-derived on every chain walk:

- OFS_DELTA `neg_offset` is now stored on the entry map
- REF_DELTA `base_sha` is now stored on the entry map

## What this does NOT fix

Tracked under #153 still:

- **Memory**: parse still collects the entire resolved entries list before `store_entries/2` writes anything. Peak RSS is still O(pack size). Streaming parse-and-store rewrite is the next sub-issue.
- **Point-lookup API** (`Reader.read_object/2,3,4`, used by `ObjectResolver`) still has the broken cache. Different code path; left alone for this PR per scope discipline.

## Test plan

- [x] 928 tests / 0 failures across the existing suite (including 67 pack tests).
- [x] `mix format --check-formatted` clean.
- [x] `mix dialyzer` clean.
- [ ] Live ovs push test against prod once this and the anvil mix.lock bump deploy. Expectation: parse phase drops from 27+ min to single-digit seconds.
SHA: 6994d00e454b0e14bcb069d4e50c308556d5597b
Author: Anvil <noreply@anvil.fangorn.io>
Date: 2026-05-06 13:53
Parents: d5d4b73
1 files changed +129 -129
Type
lib/ex_git_objectstore/pack/reader.ex +129 −129
@@ -173,170 +173,162 @@
end
end
# -- Two-phase delta resolution --
# -- Single-pass, in-pack-order delta resolution --
#
# Pack format guarantees that every OFS_DELTA's base is at a *smaller*
# offset (it appears earlier in the pack). REF_DELTAs may reference any
# SHA, including ones from later in the pack or external (thin pack) —
# Phase 1: Resolve all non-REF_DELTA entries (full objects + OFS_DELTAs).
# OFS_DELTAs can be fully resolved using offset chains within the pack.
# those go to a deferred queue and are resolved in dependency-order
# passes once all in-pack bases are known.
# After this phase, compute SHAs of all resolved objects to build a
# complete SHA→{type, data} map that REF_DELTAs can reference.
#
# `parse_entries` already decompressed every entry exactly once. The
# previous resolver threw that work away and re-parsed/re-decompressed
# via `do_read_object`, AND its offset-keyed "cache" was actually keyed
# `{:sha, sha}` on writes, so the offset lookups never hit. The result
# was O(N · D) zlib decompressions where D is the mean delta-chain
# depth — for a 134k-object / 108k-delta pack like ovs that pinned a
# Phase 2: Resolve REF_DELTA entries using the SHA map built in Phase 1.
# Falls back to external_resolver for thin pack support.
# CPU for half an hour and counting before the test push was aborted.
#
# This two-phase approach handles the case where a REF_DELTA references a
# base object that's stored as an OFS_DELTA in the pack (whose SHA wouldn't
# The new pipeline walks entries once in offset order, applying deltas
# against an offset-keyed `by_offset` map populated as we go. Each
# object is decompressed once (during parse_entries) and each delta is
# applied once. Total work: O(N).
defp resolve_delta_entries(entries, _pack_data, _cache, external_resolver) do
initial = %{
resolved_in_order: [],
by_offset: %{},
sha_to_offset: %{},
deferred: []
}
# appear in the initial non-delta-only SHA index).
defp resolve_delta_entries(entries, pack_data, cache, external_resolver) do
{ref_deltas, others} = Enum.split_with(entries, fn e -> e.type == :ref_delta end)
with {:ok, resolved_others} <- resolve_non_ref_deltas(others, pack_data, cache) do
if ref_deltas == [] do
{:ok, resolved_others}
else
sha_map = build_sha_data_map(resolved_others)
resolve_ref_deltas(ref_deltas, pack_data, cache, sha_map, external_resolver,
resolved: resolved_others
)
end
case Enum.reduce_while(entries, {:ok, initial}, &resolve_entry_in_order/2) do
{:ok, state} -> finish_resolution(state, external_resolver)
{:error, _} = err -> err
end
end
defp resolve_non_ref_deltas(entries, pack_data, cache) do
entries
|> Enum.reduce_while({:ok, []}, &resolve_non_ref_entry(&1, &2, pack_data, cache))
|> finalize_entries()
# Reducer step for `Enum.reduce_while/3`: resolves a single entry against
# the accumulated state. Continues with the updated state on success and
# halts the walk with the error tuple on the first failure.
defp resolve_entry_in_order(entry, {:ok, state}) do
  entry
  |> resolve_one_entry(state)
  |> case do
    {:error, _} = failure -> {:halt, failure}
    {:ok, next_state} -> {:cont, {:ok, next_state}}
  end
end
defp resolve_non_ref_entry(entry, {:ok, acc}, pack_data, cache) do
if entry.type == :ofs_delta do
# Base type (commit/tree/blob/tag): parse_entries already decompressed
# it, so `entry.data` IS the resolved object payload. Just record it.
defp resolve_one_entry(%{type: t, offset: offset, data: data} = entry, state)
when t in [:commit, :tree, :blob, :tag] do
record_resolved(state, entry, t, data, offset)
case do_read_object(pack_data, entry.offset, cache, 0, nil) do
{:ok, {resolved_type, resolved_data}} ->
{:cont,
{:ok, [%{type: resolved_type, data: resolved_data, offset: entry.offset} | acc]}}
{:error, _} = err ->
{:halt, err}
end
else
{:cont, {:ok, [entry | acc]}}
end
end
# Finishes a prepend-built accumulator: an `{:ok, list}` result gets its
# list reversed, while an error tuple is handed back unchanged.
defp finalize_entries({:error, _} = failure), do: failure
defp finalize_entries({:ok, newest_first}), do: {:ok, Enum.reverse(newest_first)}
# OFS_DELTA: base is at `entry.offset - entry.neg_offset`. Pack format
# guarantees this is a smaller offset, so it's always already in
# `by_offset`. Apply the delta and record the result.
defp resolve_one_entry(%{type: :ofs_delta} = entry, state) do
base_offset = entry.offset - entry.neg_offset
defp build_sha_data_map(entries) do
Enum.reduce(entries, %{}, fn entry, acc ->
sha = compute_object_sha(Atom.to_string(entry.type), entry.data)
Map.put(acc, sha, {entry.type, entry.data})
end)
end
case Map.fetch(state.by_offset, base_offset) do
{:ok, {base_type, base_data}} ->
apply_and_record(state, entry, base_type, base_data)
# Multi-pass REF_DELTA resolution: newly resolved REF_DELTAs may be
# bases for other REF_DELTAs.
defp resolve_ref_deltas([], _pack_data, _cache, _sha_map, _ext, resolved: resolved) do
{:ok, resolved}
:error ->
{:error, {:ofs_delta_base_missing, entry.offset, base_offset}}
end
end
defp resolve_ref_deltas(ref_deltas, pack_data, cache, sha_map, ext, resolved: resolved) do
{newly_resolved, still_pending} =
Enum.reduce(ref_deltas, {[], []}, fn entry, {done, pending} ->
case resolve_single_ref_delta(entry, pack_data, cache, sha_map, ext) do
{:ok, resolved_entry} -> {[resolved_entry | done], pending}
:pending -> {done, [entry | pending]}
{:error, _} = err -> throw(err)
end
end)
# REF_DELTA: try the in-pack SHA index first; if the base isn't yet
# known (forward reference or thin pack), defer to the post-pass.
defp resolve_one_entry(%{type: :ref_delta} = entry, state) do
case Map.fetch(state.sha_to_offset, entry.base_sha) do
{:ok, base_offset} ->
{base_type, base_data} = Map.fetch!(state.by_offset, base_offset)
apply_and_record(state, entry, base_type, base_data)
handle_ref_delta_pass(newly_resolved, still_pending, pack_data, cache, sha_map, ext, resolved)
catch
{:error, _} = err -> err
:error ->
{:ok, %{state | deferred: [entry | state.deferred]}}
end
end
defp handle_ref_delta_pass([], [first | _], _pack_data, _cache, _sha_map, _ext, _resolved) do
{:error, {:ref_delta_base_not_found, "unresolvable REF_DELTA at offset #{first.offset}"}}
# Applies the delta payload carried by `entry` on top of `base_data` and,
# on success, records the rebuilt object under the entry's own offset with
# the base object's type. A delta-application error is returned as-is.
defp apply_and_record(state, entry, base_type, base_data) do
  applied = Delta.apply(base_data, entry.data)

  case applied do
    {:error, _} = failure -> failure
    {:ok, rebuilt} -> record_resolved(state, entry, base_type, rebuilt, entry.offset)
  end
end
defp handle_ref_delta_pass(
newly_resolved,
still_pending,
pack_data,
cache,
sha_map,
ext,
defp record_resolved(state, _entry, type, data, offset) do
sha = compute_object_sha(Atom.to_string(type), data)
resolved = %{type: type, data: data, offset: offset}
resolved
) do
new_sha_map =
Enum.reduce(newly_resolved, sha_map, fn entry, acc ->
sha = compute_object_sha(Atom.to_string(entry.type), entry.data)
Map.put(acc, sha, {entry.type, entry.data})
end)
{:ok,
all_resolved = resolved ++ Enum.reverse(newly_resolved)
%{
state
| resolved_in_order: [resolved | state.resolved_in_order],
by_offset: Map.put(state.by_offset, offset, {type, data}),
sha_to_offset: Map.put(state.sha_to_offset, sha, offset)
}}
end
# After the in-order pass, REF_DELTAs whose base wasn't in the pack at
# the time we scanned them get retried. Each pass either resolves
# something (newly available because another deferred REF_DELTA just
# finished) or routes to the external resolver. If a pass resolves
# nothing and there's still pending work, the remaining entries are
# genuinely unresolvable.
defp finish_resolution(%{deferred: []} = state, _external_resolver) do
{:ok, Enum.reverse(state.resolved_in_order)}
resolve_ref_deltas(still_pending, pack_data, cache, new_sha_map, ext, resolved: all_resolved)
end
defp resolve_single_ref_delta(entry, pack_data, _cache, sha_map, ext_resolver) do
<<_::binary-size(entry.offset), data::binary>> = pack_data
defp finish_resolution(state, external_resolver) do
case run_deferred_pass(state, external_resolver) do
{:ok, %{deferred: []} = new_state} ->
{:ok, Enum.reverse(new_state.resolved_in_order)}
case parse_object_header(data) do
{:ok, @obj_ref_delta, _size, header_len, _rest} ->
resolve_ref_delta_at(entry, pack_data, header_len, sha_map, ext_resolver)
{:ok, %{deferred: still_deferred} = new_state}
when length(still_deferred) < length(state.deferred) ->
finish_resolution(new_state, external_resolver)
{:ok, _, _, _, _} ->
{:error, {:unexpected_type_at_ref_delta_offset, entry.offset}}
{:ok, %{deferred: [first | _]}} ->
{:error, {:ref_delta_base_not_found, "unresolvable REF_DELTA at offset #{first.offset}"}}
{:error, _} = err ->
throw(err)
err
end
end
defp resolve_ref_delta_at(entry, pack_data, header_len, sha_map, ext_resolver) do
data_start = entry.offset + header_len
# Runs one retry pass over the deferred REF_DELTAs. The deferred queue is
# emptied up front and each queued entry is handed to
# `resolve_deferred_ref_delta/3`, which returns the next state (possibly
# with the entry queued again). The pass stops at the first error.
defp run_deferred_pass(state, external_resolver) do
  queued = state.deferred
  drained = %{state | deferred: []}

  Enum.reduce_while(queued, {:ok, drained}, fn ref_delta, {:ok, progress} ->
    ref_delta
    |> resolve_deferred_ref_delta(progress, external_resolver)
    |> case do
      {:error, _} = failure -> {:halt, failure}
      {:ok, next_progress} -> {:cont, {:ok, next_progress}}
    end
  end)
end
defp resolve_deferred_ref_delta(entry, state, external_resolver) do
case Map.fetch(state.sha_to_offset, entry.base_sha) do
{:ok, base_offset} ->
<<_::binary-size(data_start), base_sha_bin::binary-size(20), compressed::binary>> = pack_data
base_sha = Base.encode16(base_sha_bin, case: :lower)
case decompress_data(compressed) do
{:ok, delta_data, _} ->
apply_ref_delta(delta_data, base_sha, entry.offset, sha_map, ext_resolver)
{:error, _} = err ->
throw(err)
end
end
{base_type, base_data} = Map.fetch!(state.by_offset, base_offset)
apply_and_record(state, entry, base_type, base_data)
defp apply_ref_delta(delta_data, base_sha, offset, sha_map, ext_resolver) do
case Map.get(sha_map, base_sha) do
{base_type, base_data} ->
apply_delta_to_entry(base_type, base_data, delta_data, offset)
:error ->
case call_external_resolver(external_resolver, entry.base_sha) do
{:ok, {base_type, base_data}} ->
apply_and_record(state, entry, base_type, base_data)
nil ->
try_external_ref_delta(delta_data, base_sha, offset, ext_resolver)
:pending ->
{:ok, %{state | deferred: [entry | state.deferred]}}
end
end
end
defp try_external_ref_delta(delta_data, base_sha, offset, ext_resolver)
when is_function(ext_resolver) do
case ext_resolver.(base_sha) do
{:ok, {base_type, base_data}} ->
apply_delta_to_entry(base_type, base_data, delta_data, offset)
{:error, _} ->
:pending
# Asks the caller-supplied one-argument resolver for `base_sha`. A
# successful lookup is passed through unchanged; an error tuple is mapped
# to `:pending` so the caller can keep the entry deferred.
defp call_external_resolver(resolver, base_sha) when is_function(resolver, 1) do
  lookup = resolver.(base_sha)

  case lookup do
    {:error, _} -> :pending
    {:ok, _} = found -> found
  end
end
# Catch-all clause: whatever the arguments, the REF_DELTA is reported as
# still pending.
defp try_external_ref_delta(_delta_data, _base_sha, _offset, _ext_resolver) do
  :pending
end
# Applies `delta_data` on top of `base_data` and wraps the rebuilt payload
# as a resolved entry that keeps the base object's type and the given
# offset. A failed delta application is thrown, not returned.
defp apply_delta_to_entry(base_type, base_data, delta_data, offset) do
  applied = Delta.apply(base_data, delta_data)

  case applied do
    {:error, _} = failure -> throw(failure)
    {:ok, rebuilt} -> {:ok, %{type: base_type, data: rebuilt, offset: offset}}
  end
end
# Catch-all clause: any arguments yield `:pending`.
defp call_external_resolver(_resolver, _base_sha) do
  :pending
end
# -- Single-entry parsing --
@@ -360,19 +352,27 @@
end
defp parse_entry_body(@obj_ofs_delta, header_len, rest) do
with {:ok, _neg_offset, ofs_len, rest_after_ofs} <- parse_ofs_delta_offset(rest),
with {:ok, neg_offset, ofs_len, rest_after_ofs} <- parse_ofs_delta_offset(rest),
{:ok, delta_data, compressed_len} <- decompress_data(rest_after_ofs) do
{:ok, %{type: :ofs_delta, data: delta_data, offset: 0},
# `neg_offset` is preserved on the entry so the resolution pass can
# locate the base in O(1) via the offset-keyed cache, instead of the
# previous behaviour which re-parsed the pack header to recover it.
{:ok, %{type: :ofs_delta, data: delta_data, offset: 0, neg_offset: neg_offset},
header_len + ofs_len + compressed_len}
end
end
defp parse_entry_body(@obj_ref_delta, header_len, rest) when byte_size(rest) >= 20 do
<<_base_sha::binary-size(20), rest_after_sha::binary>> = rest
<<base_sha_bin::binary-size(20), rest_after_sha::binary>> = rest
case decompress_data(rest_after_sha) do
{:ok, delta_data, compressed_len} ->
{:ok, %{type: :ref_delta, data: delta_data, offset: 0}, header_len + 20 + compressed_len}
# Preserve the base SHA so the resolution pass can index by it
# without re-reading and re-parsing the pack at this offset.
base_sha = Base.encode16(base_sha_bin, case: :lower)
{:ok, %{type: :ref_delta, data: delta_data, offset: 0, base_sha: base_sha},
header_len + 20 + compressed_len}
{:error, _} = err ->
err