ref:796cf4019db07f046eea9d127a297f47d76143a9

feat: telemetry spans for fetch, atomic push, and filter application

Operational visibility for the new protocol-v2 code paths. Events emitted: * [:ex_git_objectstore, :protocol, :fetch] — span around every UploadPackV2 fetch. Start/stop metadata carries wants, haves, mode (:full / :shallow / :filtered / :shallow_filtered / :wait_for_done), repo_id; stop measurements carry `objects` and `pack_bytes`. * [:ex_git_objectstore, :protocol, :receive_pack, :atomic] — span around the atomic ref-update flow. Stop metadata carries outcome (:committed / :rolled_back), commands, and validation_failures. * [:ex_git_objectstore, :pack, :filter] — single event per filter application. Measurements: objects_in, objects_out. Metadata: spec_kind (:blob_none / :blob_limit / :tree_depth / :object_type / :sparse_oid / :combine), repo_id. * [:ex_git_objectstore, :protocol, :receive_pack, :rollback_failed] — emitted in the rare case a ref rollback fails during atomic abort (already added in the previous error-handling commit; noted here for completeness). Test file `test/ex_git_objectstore/protocol/telemetry_test.exs` attaches a handler and asserts each event fires with the expected payload for clone, shallow fetch, filter, and atomic-commit paths.
SHA: 796cf4019db07f046eea9d127a297f47d76143a9
Author: Cole Christensen <cole.christensen@macmillan.com>
Date: 2026-04-19 01:19
Parents: b77b43f
3 files changed +311 -28
Type
lib/ex_git_objectstore/protocol/receive_pack.ex +29 −8
@@ -534,15 +534,36 @@
# between steps), leaving refs in a partially-applied state. This
# implementation is as atomic as the storage layer allows.
defp do_atomic_ref_updates(state) do
:telemetry.span(
[:ex_git_objectstore, :protocol, :receive_pack, :atomic],
%{repo_id: state.repo.id, commands: length(state.commands)},
fn ->
validations =
Enum.map(state.commands, fn cmd ->
{cmd, validate_ref_command(state.repo, state.update_hook, cmd)}
end)
validation_failures =
Enum.count(validations, fn {_cmd, v} -> match?({:error, _}, v) end)
result =
case Enum.find(validations, fn {_cmd, v} -> match?({:error, _}, v) end) do
nil -> atomic_commit_phase(state)
{_failing_cmd, {:error, reason}} -> atomic_reject_all(state, validations, reason)
end
{_response, final_state} = result
validations =
Enum.map(state.commands, fn cmd ->
{cmd, validate_ref_command(state.repo, state.update_hook, cmd)}
end)
outcome = if match?({:error, _}, final_state.result), do: :rolled_back, else: :committed
case Enum.find(validations, fn {_cmd, v} -> match?({:error, _}, v) end) do
nil -> atomic_commit_phase(state)
{_failing_cmd, {:error, reason}} -> atomic_reject_all(state, validations, reason)
end
{result,
%{
repo_id: state.repo.id,
commands: length(state.commands),
validation_failures: validation_failures,
outcome: outcome
}}
end
)
end
defp atomic_reject_all(state, validations, reason) do
lib/ex_git_objectstore/protocol/upload_pack_v2.ex +81 −20
@@ -348,26 +348,65 @@
"send_packfile=#{send_packfile?}"
)
span_meta = %{
repo_id: repo.id,
wants: length(wants),
haves: length(haves),
mode: fetch_mode(shallow_opts, filter_spec, wait_for_done?)
}
:telemetry.span([:ex_git_objectstore, :protocol, :fetch], span_meta, fn ->
{result, extra} =
do_handle_fetch(
repo,
wants,
haves,
args,
done?,
wait_for_done?,
shallow_opts,
filter_spec,
send_packfile?
)
{result, Map.merge(span_meta, extra)}
end)
end
defp do_handle_fetch(
repo,
wants,
haves,
_args,
_done?,
wait_for_done?,
shallow_opts,
filter_spec,
send_packfile?
) do
ack_section = build_acknowledgments(repo, haves, send_packfile?)
cond do
# `--negotiate-only` flow: client wants the ACKs and nothing else.
wait_for_done? ->
{{ack_section, :done}, %{pack_bytes: 0, objects: 0}}
{ack_section, :done}
# Normal clone / fetch with `done`, OR shallow/deepen request:
# send packfile immediately.
send_packfile? ->
{build_packfile_response(repo, wants, haves, ack_section, shallow_opts, filter_spec),
:done}
{response, stats} =
build_packfile_response(repo, wants, haves, ack_section, shallow_opts, filter_spec)
# Multi-round negotiation: client hasn't committed yet. Emit the
# ACKs and wait for another fetch command on the same session.
{{response, :done}, stats}
true ->
{{ack_section, :command}, %{pack_bytes: 0, objects: 0}}
{ack_section, :command}
end
end
defp fetch_mode(nil, nil, true), do: :wait_for_done
defp fetch_mode(nil, nil, _), do: :full
defp fetch_mode(nil, _filter, _), do: :filtered
defp fetch_mode(_shallow, nil, _), do: :shallow
defp fetch_mode(_, _, _), do: :shallow_filtered
defp build_packfile_response(repo, wants, haves, ack_section, shallow_opts, filter_spec) do
case collect_objects_maybe_shallow(repo, wants, haves, shallow_opts, filter_spec) do
{:ok, %{objects: objects} = walk} ->
@@ -382,20 +421,23 @@
PktLine.encode_sideband(1, pack_data)
|> IO.iodata_to_binary()
response =
IO.iodata_to_binary([
ack_section,
shallow_info,
packfile_header,
sideband_data,
PktLine.flush()
IO.iodata_to_binary([
ack_section,
shallow_info,
packfile_header,
sideband_data,
PktLine.flush()
])
])
{response, %{pack_bytes: byte_size(pack_data), objects: length(objects)}}
{:error, reason} ->
Logger.error(
"UploadPackV2: collect_objects failed for #{length(wants)} wants, " <>
"#{length(haves)} haves: #{inspect(reason)}"
)
{PktLine.flush(), %{pack_bytes: 0, objects: 0, error: reason}}
PktLine.flush()
end
end
@@ -708,10 +750,29 @@
do: compute_tree_metadata(repo, objects),
else: %{depths: %{}, paths: %{}}
Enum.filter(objects, fn entry ->
kept =
Enum.filter(objects, fn entry ->
Filter.include?(spec, filter_ctx_for(entry, meta), repo)
end)
:telemetry.execute(
[:ex_git_objectstore, :pack, :filter],
%{objects_in: length(objects), objects_out: length(kept)},
%{spec_kind: filter_spec_kind(spec), repo_id: repo.id}
)
kept
Filter.include?(spec, filter_ctx_for(entry, meta), repo)
end)
end
# Tag the filter kind for telemetry consumers without exposing the
# full spec structure (which could contain large values like
# sparse:oid's spec blob sha).
defp filter_spec_kind(:blob_none), do: :blob_none
defp filter_spec_kind({:blob_limit, _}), do: :blob_limit
defp filter_spec_kind({:tree_depth, _}), do: :tree_depth
defp filter_spec_kind({:object_type, _}), do: :object_type
defp filter_spec_kind({:sparse_oid, _}), do: :sparse_oid
defp filter_spec_kind({:combine, _}), do: :combine
# `tree:<n>` uses `ctx.tree_depth`; `sparse:oid=<oid>` uses `ctx.path`.
# Everything else decides from type + size, which are already in the