ref:9d4658eab2863b5c423ff69e117e666e9804ed3c

perf: streaming receive-pack — iolist body + throttled SHA + flush API (#27)

Closes anvil#153. ## Summary Three changes that together fix the ovs push outage observed in prod: 1. `pack_buffer` switched from `binary()` to an iolist accumulator. Per-chunk cost is now O(1) iolist cons instead of binary `<>` copy. 2. SHA-1 completeness check is throttled to once per 4 MB of body growth (with auto-bypass for bodies smaller than that — keeps small packs / HTTP one-shot working unchanged). 3. New `flush/1` API for callers with an out-of-band end-of-stream signal. Forces the final verify and surfaces `:incomplete_pack` for truncated buffers rather than leaving the state machine wedged in `:pack` forever. ## Why Live test push of openvswitch/ovs (106 MB / 134k objects) in prod showed: - Server CPU pinned at 100% on a single core - RSS climbed from 200 MB → 1.37 GB at 50 MB transferred - On-disk repo dir stayed at 8 KB the entire time (nothing written until the whole pack is buffered) - Transfer rate degraded 900 → 120 KB/s as the buffer grew - Would have OOM'd before completion against the 3.82 GiB container cap Two compounding bugs in `ReceivePack.feed/2 :pack`: - **O(K²) memory**: `new_buffer = state.pack_buffer <> data` on every chunk forced binary copies for a single growing blob. - **O(N·K) CPU**: `verify_pack_checksum` ran `:crypto.hash(:sha, whole_buffer)` on every feed call. OTP 28 doesn't expose `crypto:hash_copy/1`, so we can't keep an incremental SHA cheaply — instead, throttle the check. ## Test plan - [x] Existing receive-pack tests: 11/11. - [x] Full ex_git_objectstore suite: 928/0. - [x] New `receive_pack_streaming_test.exs` (4 tests): - 5 MB pack in 4 KB chunks completes; every blob lands - Process heap stays under 3× pack size (regression lock) - Small pack finalizes via `feed` alone (HTTP one-shot path) - `flush/1` on truncated input reports `:incomplete_pack` - [x] `mix format --check-formatted` clean. ## Follow-up (anvil-side, separate PR) The throttle has a tail-edge: if a pack ends mid-4MB-window and the SSH client EOFs without sending more bytes, the receive sits in `:pack` waiting for the next boundary that never comes. Anvil's SSH cli.ex needs to call `ProtocolHandler.flush` on `{:eof, channel_id}` — landing in the companion anvil PR alongside the mix.lock bump.
SHA: 9d4658eab2863b5c423ff69e117e666e9804ed3c
Author: Anvil <noreply@anvil.fangorn.io>
Date: 2026-05-06 03:43
Parents: 1f149a1
2 files changed +462 -28
Type
lib/ex_git_objectstore/protocol/receive_pack.ex +189 −28
@@ -61,6 +61,41 @@
] ->
:ok | {:error, term()})
@typedoc """
Streaming pack accumulator.
`body` is an iolist of every byte received *except the final 20-byte
trailer*. Iolist append is O(1) per chunk — no copy — vs. the previous
binary `<>` which forced a copy on every feed and made the receive path
O(N²) on memory traffic alone.
`lookahead` always holds the most recent (up to) 20 bytes. As new data
arrives, the old lookahead bytes graduate into `body` and the newest 20
bytes become the new lookahead. When the pack is complete, the lookahead
IS the trailer.
`bytes_since_check` is what makes the receive path O(N) on CPU instead of
O(N²). The previous implementation re-hashed the entire growing buffer
on every chunk arrival to test for completeness — for a 100 MB pack
delivered in 32 KB chunks, that's 100 MB × 3000 chunks of SHA-1 work,
roughly 5 minutes single-threaded. We instead defer the SHA-1 check
until a non-trivial amount of new data has arrived (`@check_interval`),
bringing total verify work to O(N²/check_interval) which at the chosen
4 MB interval is a fraction of a second. The trade-off is up to
`check_interval` extra bytes buffered after the actual pack end before
we notice — `flush/1` exists for callers that have an out-of-band EOF
signal (e.g. the SSH layer) to force the final check.
`body_size` is `iolist_size(body)` kept incrementally so we can enforce
`@max_pack_size` without traversing the iolist.
"""
@type pack_acc :: %{
body: iolist(),
body_size: non_neg_integer(),
lookahead: binary(),
bytes_since_check: non_neg_integer()
}
@type state :: %__MODULE__{
repo: Repo.t(),
pre_receive_hook: (Repo.t(), [command()] -> :ok | {:error, term()}) | nil,
@@ -70,6 +105,6 @@
commands: [command()],
client_caps: MapSet.t(),
cmd_buffer: binary(),
pack_buffer: binary(),
pack_acc: pack_acc() | nil,
result: nil | :ok | {:error, term()}
}
@@ -83,10 +118,27 @@
commands: [],
client_caps: MapSet.new(),
cmd_buffer: <<>>,
pack_buffer: <<>>,
pack_acc: nil,
result: nil
]
@trailer_size 20
# 4 MB between SHA-1 completeness checks. Empirically this is small enough
# that the worst-case overshoot (pack ends just past a check boundary,
# caller doesn't call flush/1) costs at most one extra hash, while still
# large enough that streaming a 100 MB pack does ~25 hash passes total
# rather than one per 32 KB chunk.
@check_interval 4 * 1024 * 1024
defp empty_pack_acc do
%{
body: [],
body_size: 0,
lookahead: <<>>,
bytes_since_check: 0
}
end
@doc """
Create a new receive-pack state machine and generate the ref advertisement.
Returns `{advertisement_data, state}`.
@@ -125,13 +177,17 @@
case parse_commands(full_data) do
{:ok, commands, client_caps, rest} ->
# SSH can splice the pack's first bytes onto the same packet that
# ended the command list. Seed the streaming accumulator with that
# `rest` so it's hashed/buffered exactly like data arriving via a
# subsequent feed/2.
state = %{
state
| commands: commands,
client_caps: client_caps,
phase: :pack,
cmd_buffer: <<>>,
pack_acc: absorb(empty_pack_acc(), rest)
pack_buffer: rest
}
# Check if the pack is already complete in the buffer
@@ -148,13 +204,15 @@
end
def feed(%__MODULE__{phase: :pack} = state, data) do
new_buffer = state.pack_buffer <> data
acc = state.pack_acc || empty_pack_acc()
new_acc = absorb(acc, data)
new_total = new_acc.body_size + byte_size(new_acc.lookahead)
if byte_size(new_buffer) > @max_pack_size do
if new_total > @max_pack_size do
report = build_error_report(:pack_too_large, state.commands)
{report, %{state | phase: :done, result: {:error, :pack_too_large}}}
else
state = %{state | pack_acc: new_acc}
state = %{state | pack_buffer: new_buffer}
maybe_process_pack(state)
end
end
@@ -164,6 +222,49 @@
end
@doc """
Force a final completeness check on the buffered pack.
`feed/2` only verifies the pack's SHA-1 trailer every `@check_interval`
bytes — that's what keeps a 100 MB push from doing 3000 full-buffer hash
passes. The trade-off is that the *last* `<= @check_interval` bytes
might arrive without crossing a boundary, so the receive sits in
`:pack` phase even after the pack is fully delivered.
Transports that have an out-of-band end-of-stream signal (the SSH layer's
`{:eof, channel_id}` message; an HTTP request body's natural end) call
`flush/1` once they know no more data is coming. It bypasses the
throttle and runs a final hash, transitioning to `:done` if the bytes
buffered so far form a valid pack.
Returns `{response, state}` matching `feed/2`'s shape.
"""
@spec flush(state()) :: {binary(), state()}
def flush(%__MODULE__{phase: :pack, commands: commands} = state) do
all_deletes? = Enum.all?(commands, fn cmd -> cmd.new_sha == @zero_sha end)
cond do
all_deletes? or commands == [] ->
process_ref_updates(state)
true ->
case pack_check(state.pack_acc, :force) do
{:complete, _acc} ->
process_pack_and_refs(state)
{:incomplete, new_acc} ->
# No more data is coming and the buffered bytes don't form a
# valid pack. Surface this rather than sitting in :pack forever.
report = build_error_report(:incomplete_pack, state.commands)
{report,
%{state | pack_acc: new_acc, phase: :done, result: {:error, :incomplete_pack}}}
end
end
end
def flush(%__MODULE__{} = state), do: {<<>>, state}
@doc """
Check if the protocol exchange is complete.
"""
@spec done?(state()) :: boolean()
@@ -172,5 +273,32 @@
# -- Private --
# Append `data` to the streaming accumulator. Bytes that are no longer in
# the trailing 20-byte window graduate into the iolist body; the rest
# stays in `lookahead` until more data arrives. Per-chunk cost: O(1)
# iolist cons + a single 20-byte slice. No SHA work happens here — that
# is throttled by `bytes_since_check` and only fires in pack_check/2.
defp absorb(acc, data) when is_binary(data) do
combined = acc.lookahead <> data
case byte_size(combined) - @trailer_size do
n when n <= 0 ->
# Still under the trailer window — nothing to commit yet.
%{acc | lookahead: combined}
graduate_size ->
<<graduate::binary-size(graduate_size), new_lookahead::binary-size(@trailer_size)>> =
combined
%{
acc
| body: [acc.body, graduate],
body_size: acc.body_size + graduate_size,
lookahead: new_lookahead,
bytes_since_check: acc.bytes_since_check + graduate_size
}
end
end
defp build_advertisement(repo) do
refs = list_all_refs_with_head(repo)
@@ -340,37 +468,58 @@
if all_deletes? or commands == [] do
process_ref_updates(state)
else
# Check if we have a complete packfile
case check_pack_complete(state.pack_buffer) do
:complete ->
case pack_check(state.pack_acc, :throttled) do
{:complete, _acc} ->
process_pack_and_refs(state)
{:incomplete, new_acc} ->
{<<>>, %{state | pack_acc: new_acc}}
:incomplete ->
{<<>>, state}
end
end
end
# A complete pack is: 12-byte header + entries + 20-byte SHA-1 trailer.
defp check_pack_complete(
<<"PACK", _version::32, _count::unsigned-big-32, _rest::binary>> = data
) do
# A complete pack has: header (12 bytes) + entries + 20-byte checksum
# We can't easily know the total size without parsing all entries,
# so we verify the trailing SHA-1 checksum
verify_pack_checksum(data)
end
# The accumulator buffers everything except the trailing 20 bytes; the
# pack is complete iff that trailer equals SHA-1 of everything before
# it.
#
# Mode `:throttled` only attempts the SHA when at least @check_interval
# bytes have arrived since the last check. This is what stops the receive
# path from re-hashing the entire growing buffer on every 32 KB chunk.
# Mode `:force` always attempts the SHA — used by `flush/1` when the
# transport (e.g. SSH) signals that no more bytes are coming and we need
# to make a final determination.
defp pack_check(nil, _mode), do: {:incomplete, nil}
defp check_pack_complete(<<>>), do: :incomplete
defp pack_check(%{lookahead: l} = acc, _mode) when byte_size(l) < @trailer_size,
do: {:incomplete, acc}
defp pack_check(%{body_size: n} = acc, _mode) when n < 12, do: {:incomplete, acc}
defp check_pack_complete(_), do: :incomplete
defp pack_check(%{body_size: n, bytes_since_check: c} = acc, :throttled)
when n > @check_interval and c < @check_interval,
defp verify_pack_checksum(data) when byte_size(data) < 32, do: :incomplete
do: {:incomplete, acc}
# Bodies smaller than the throttle window are cheap to hash and frequent
# in tests/HTTP-one-shot pushes — bypass the throttle for those so callers
defp verify_pack_checksum(data) do
body_len = byte_size(data) - 20
# don't need to know about flush/1 just to make small pushes finalize.
<<body::binary-size(body_len), checksum::binary-size(20)>> = data
expected = :crypto.hash(:sha, body)
if checksum == expected, do: :complete, else: :incomplete
defp pack_check(%{body: body, lookahead: lookahead} = acc, _mode) do
# SHA-1 collision probability against random partial pack data is 1 in
# 2^160, so an opportunistic match means we have the actual trailer.
# The `Pack.Reader.parse/2` downstream pass catches anything that
# somehow slipped past (with a structured error rather than a panic),
# so we don't bother with the prior magic-bytes ("PACK") prefix check
# — verifying the magic would require materializing the iolist for
# 8 bits of additional confidence.
candidate = :crypto.hash(:sha, body)
new_acc = %{acc | bytes_since_check: 0}
if candidate == lookahead do
{:complete, new_acc}
else
{:incomplete, new_acc}
end
end
defp process_pack_and_refs(state) do
@@ -378,7 +527,15 @@
[:ex_git_objectstore, :protocol, :receive_pack],
%{repo_id: state.repo.id, command_count: length(state.commands)},
fn ->
# Materialize the iolist body once for the existing single-shot
# Pack.Reader. This is the only point in the receive path where we
# hold a binary the size of the pack — and only briefly, before the
# parse pass replaces it with parsed entries. A future change can
# convert Pack.Reader to a streaming consumer to drop even this
# peak (see issue #153 for the longer-term plan).
pack_data = pack_acc_to_binary(state.pack_acc)
result =
case store_pack_objects(state.repo, state.pack_buffer) do
case store_pack_objects(state.repo, pack_data) do
:ok ->
process_ref_updates(state)
@@ -391,6 +548,10 @@
{result, %{repo_id: state.repo.id, command_count: length(state.commands)}}
end
)
end
defp pack_acc_to_binary(%{body: body, lookahead: lookahead}) do
:erlang.iolist_to_binary([body, lookahead])
end
defp store_pack_objects(repo, pack_data) do