# Source: fangorn/ex_git_objectstore (public repository), ref: main
# Copyright 2026 Cole Christensen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
defmodule ExGitObjectstore.Test.FailingStorage do
  @moduledoc """
  Test-only storage backend that wraps a `Memory` store.

  Every storage callback is forwarded to the wrapped `Memory` store. The
  single exception is `put_ref/5`: a test can arm a one-shot synthetic
  failure for a particular ref name, which is typically used to verify
  that atomic ReceivePack correctly rolls back when a mid-batch ref write
  fails.

  ## Usage

      {:ok, pid} = FailingStorage.start_link()
      repo = Repo.new("r", storage: {FailingStorage, FailingStorage.config(pid)})
      FailingStorage.fail_put_ref_once(pid, "refs/heads/boom")

  The next `put_ref/5` for `"refs/heads/boom"` returns
  `{:error, :injected_failure}`. Writes to other refs, and writes to
  `"refs/heads/boom"` after that first injected failure, behave normally.
  """

  @behaviour ExGitObjectstore.Storage

  alias ExGitObjectstore.Storage.Memory

  # --- setup ---

  @doc """
  Starts the wrapped `Memory` store and an `Agent` that tracks it.

  The agent state holds the memory store's pid under `:mem_pid` and the
  currently armed ref name (or `nil`) under `:fail_put_ref`.
  """
  @spec start_link() :: {:ok, pid()}
  def start_link do
    {:ok, mem_pid} = Memory.start_link()
    initial_state = %{mem_pid: mem_pid, fail_put_ref: nil}
    Agent.start_link(fn -> initial_state end)
  end

  @doc """
  Builds the storage config map for the agent `pid` returned by `start_link/0`.
  """
  @spec config(pid()) :: map()
  def config(pid) do
    %{pid: pid}
  end

  @doc """
  Arms a one-shot failure for `ref`.

  The next `put_ref/5` call whose ref name equals `ref` returns
  `{:error, :injected_failure}`, and the arm is cleared once it fires.
  Arming again replaces any previously armed ref.
  """
  @spec fail_put_ref_once(pid(), String.t()) :: :ok
  def fail_put_ref_once(pid, ref) do
    Agent.update(pid, &%{&1 | fail_put_ref: ref})
  end

  # --- Storage callbacks: objects ---

  @impl true
  def get_object(config, prefix, sha) do
    config |> mem_cfg() |> Memory.get_object(prefix, sha)
  end

  @impl true
  def put_object(config, prefix, sha, data) do
    config |> mem_cfg() |> Memory.put_object(prefix, sha, data)
  end

  @impl true
  def object_exists?(config, prefix, sha) do
    config |> mem_cfg() |> Memory.object_exists?(prefix, sha)
  end

  @impl true
  def list_objects(config, prefix) do
    config |> mem_cfg() |> Memory.list_objects(prefix)
  end

  # --- Storage callbacks: packs ---

  @impl true
  def list_packs(config, prefix) do
    config |> mem_cfg() |> Memory.list_packs(prefix)
  end

  @impl true
  def get_pack(config, prefix, pack_sha) do
    config |> mem_cfg() |> Memory.get_pack(prefix, pack_sha)
  end

  @impl true
  def get_pack_index(config, prefix, pack_sha) do
    config |> mem_cfg() |> Memory.get_pack_index(prefix, pack_sha)
  end

  @impl true
  def put_pack(config, prefix, pack_sha, pack_data, idx_data) do
    config |> mem_cfg() |> Memory.put_pack(prefix, pack_sha, pack_data, idx_data)
  end

  @impl true
  def stream_pack(config, prefix, pack_sha) do
    config |> mem_cfg() |> Memory.stream_pack(prefix, pack_sha)
  end

  # --- Storage callbacks: refs and HEAD ---

  @impl true
  def get_ref(config, prefix, ref) do
    config |> mem_cfg() |> Memory.get_ref(prefix, ref)
  end

  # The only callback with custom behaviour: an armed ref fails exactly once
  # and never reaches the wrapped store; everything else is delegated.
  @impl true
  def put_ref(%{pid: pid} = config, prefix, ref, new_sha, old_sha) do
    if disarm_if_armed(pid, ref) do
      {:error, :injected_failure}
    else
      config |> mem_cfg() |> Memory.put_ref(prefix, ref, new_sha, old_sha)
    end
  end

  @impl true
  def delete_ref(config, prefix, ref) do
    config |> mem_cfg() |> Memory.delete_ref(prefix, ref)
  end

  @impl true
  def list_refs(config, prefix, ref_prefix) do
    config |> mem_cfg() |> Memory.list_refs(prefix, ref_prefix)
  end

  @impl true
  def get_head(config, prefix) do
    config |> mem_cfg() |> Memory.get_head(prefix)
  end

  @impl true
  def put_head(config, prefix, target) do
    config |> mem_cfg() |> Memory.put_head(prefix, target)
  end

  # --- Storage callbacks: blobs ---

  @impl true
  def get_blob(config, prefix, key) do
    config |> mem_cfg() |> Memory.get_blob(prefix, key)
  end

  @impl true
  def put_blob(config, prefix, key, data) do
    config |> mem_cfg() |> Memory.put_blob(prefix, key, data)
  end

  @impl true
  def delete_blob(config, prefix, key) do
    config |> mem_cfg() |> Memory.delete_blob(prefix, key)
  end

  @impl true
  def blob_exists?(config, prefix, key) do
    config |> mem_cfg() |> Memory.blob_exists?(prefix, key)
  end

  # --- helpers ---

  # Returns true (and clears the arm) when `ref` is the armed ref, false
  # otherwise. Check and clear happen in one Agent call so the failure
  # fires at most once even with concurrent writers.
  defp disarm_if_armed(pid, ref) do
    Agent.get_and_update(pid, fn state ->
      case state do
        %{fail_put_ref: ^ref} -> {true, %{state | fail_put_ref: nil}}
        _other -> {false, state}
      end
    end)
  end

  # Looks up the wrapped Memory store's pid and builds its config.
  defp mem_cfg(%{pid: pid}) do
    pid
    |> Agent.get(fn state -> state.mem_pid end)
    |> Memory.config()
  end
end