Skip to content

Commit

Permalink
gc-based snapshots
Browse files Browse the repository at this point in the history
  • Loading branch information
icristescu committed Nov 15, 2022
1 parent 9127c0c commit 9fea325
Show file tree
Hide file tree
Showing 16 changed files with 270 additions and 32 deletions.
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
latest gc was called on. (#2110, @icristescu)
- Add `split` to create a new suffix chunk. Subsequent writes will append to
this chunk until `split` is called again. (#2118, @icristescu)
- Add `create_one_commit_store` to create a new store from the existing one,
containing only one commit. (#2125, @icristescu)

### Changed

Expand Down
3 changes: 2 additions & 1 deletion src/irmin-pack/unix/errors.ml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ type base_error =
| `Invalid_read_of_gced_object of string
| `Inconsistent_store
| `Split_forbidden_during_batch
| `Multiple_empty_chunks ]
| `Multiple_empty_chunks
| `Forbidden_during_gc ]
[@@deriving irmin ~pp]
(** [base_error] is the type of most errors that can occur in a [result], except
for errors that have associated exceptions (see below) and backend-specific
Expand Down
85 changes: 66 additions & 19 deletions src/irmin-pack/unix/ext.ml
Original file line number Diff line number Diff line change
Expand Up @@ -243,24 +243,26 @@ module Maker (Config : Conf.S) = struct
cancelled
| None -> false

let start ~unlink ~use_auto_finalisation t commit_key =
let direct_commit_key t key =
let state : _ Pack_key.state = Pack_key.inspect key in
match state with
| Direct _ -> Ok key
| Indexed h -> (
match Commit.CA.index_direct_with_kind t.commit h with
| None ->
Error
(`Commit_key_is_dangling
(Irmin.Type.to_string XKey.t key))
| Some (k, _kind) -> Ok k)

let start ~unlink ~use_auto_finalisation ~new_files_path t commit_key
=
let open Result_syntax in
[%log.info "GC: Starting on %a" pp_key commit_key];
let* () =
if t.during_batch then Error `Gc_forbidden_during_batch else Ok ()
in
let* commit_key =
let state : _ Pack_key.state = Pack_key.inspect commit_key in
match state with
| Direct _ -> Ok commit_key
| Indexed h -> (
match Commit.CA.index_direct_with_kind t.commit h with
| None ->
Error
(`Commit_key_is_dangling
(Irmin.Type.to_string XKey.t commit_key))
| Some (k, _kind) -> Ok k)
in
let* commit_key = direct_commit_key t commit_key in
let root = Conf.root t.config in
let* () =
if not (File_manager.gc_allowed t.fm) then Error `Gc_disallowed
Expand All @@ -271,19 +273,21 @@ module Maker (Config : Conf.S) = struct
let gc =
Gc.v ~root ~generation:next_generation ~unlink
~dispatcher:t.dispatcher ~fm:t.fm ~contents:t.contents
~node:t.node ~commit:t.commit commit_key
~node:t.node ~commit:t.commit ~new_files_path commit_key
in
t.running_gc <- Some { gc; use_auto_finalisation };
Ok ()

let start_exn ?(unlink = true) ~use_auto_finalisation t commit_key =
let start_exn ?(unlink = true) ~use_auto_finalisation ~new_files_path
t commit_key =
match t.running_gc with
| Some _ ->
[%log.info "Repo is alreadying running GC. Skipping."];
Lwt.return false
| None -> (
let result =
start ~unlink ~use_auto_finalisation t commit_key
start ~unlink ~use_auto_finalisation ~new_files_path t
commit_key
in
match result with
| Ok _ -> Lwt.return true
Expand Down Expand Up @@ -346,6 +350,43 @@ module Maker (Config : Conf.S) = struct
Pack_key.v_direct ~offset ~length ~hash:entry.hash
in
Some key)

let create_one_commit_store t commit_key path =
let () =
match Io.classify_path path with
| `Directory -> ()
| `No_such_file_or_directory ->
Io.mkdir path |> Errs.raise_if_error
| _ -> Errs.raise_error `Invalid_layout
in
let commit_key =
direct_commit_key t commit_key |> Errs.raise_if_error
in
let* launched =
start_exn ~use_auto_finalisation:false ~new_files_path:path t
commit_key
in
let () =
if not launched then Errs.raise_error `Forbidden_during_gc
in
let* latest_gc_target_offset, suffix_start_offset =
match t.running_gc with
| None -> assert false
| Some { gc; _ } -> Gc.finalise_without_swap gc
in
let generation = File_manager.generation t.fm + 1 in
let config = Irmin.Backend.Conf.add t.config Conf.Key.root path in
let () =
File_manager.create_one_commit_store t.fm config ~generation
~latest_gc_target_offset ~suffix_start_offset commit_key
|> Errs.raise_if_error
in
let branch_path = Irmin_pack.Layout.V4.branch ~root:path in
let* branch_store =
Branch.v ~fresh:true ~readonly:false branch_path
in
let* () = Branch.close branch_store in
Lwt.return_unit
end

let batch t f =
Expand Down Expand Up @@ -533,6 +574,7 @@ module Maker (Config : Conf.S) = struct
let flush = X.Repo.flush
let fsync = X.Repo.fsync
let split = X.Repo.split_exn
let create_one_commit_store = X.Repo.Gc.create_one_commit_store

module Gc = struct
type msg = [ `Msg of string ]
Expand Down Expand Up @@ -563,13 +605,18 @@ module Maker (Config : Conf.S) = struct
`Msg err_msg

let finalise_exn = X.Repo.Gc.finalise_exn
let start_exn = X.Repo.Gc.start_exn ~use_auto_finalisation:false

let start_exn ?unlink t =
let root = Irmin_pack.Conf.root t.X.Repo.config in
X.Repo.Gc.start_exn ?unlink ~use_auto_finalisation:false
~new_files_path:root t

let start repo commit_key =
let root = Irmin_pack.Conf.root repo.X.Repo.config in
try
let* started =
X.Repo.Gc.start_exn ~unlink:true ~use_auto_finalisation:true repo
commit_key
X.Repo.Gc.start_exn ~unlink:true ~use_auto_finalisation:true
~new_files_path:root repo commit_key
in
Lwt.return_ok started
with exn -> catch_errors "Start GC" exn
Expand Down
58 changes: 58 additions & 0 deletions src/irmin-pack/unix/file_manager.ml
Original file line number Diff line number Diff line change
Expand Up @@ -796,4 +796,62 @@ struct
let chunk_start_idx = pl.chunk_start_idx in
let chunk_num = pl.chunk_num in
cleanup ~root ~generation ~chunk_start_idx ~chunk_num

let create_one_commit_store t config ~generation ~latest_gc_target_offset
~suffix_start_offset commit_key =
let open Result_syntax in
let src_root = t.root in
let dst_root = Irmin_pack.Conf.root config in
(* Step 1. Copy the dict *)
let src_dict = Irmin_pack.Layout.V4.dict ~root:src_root in
let dst_dict = Irmin_pack.Layout.V4.dict ~root:dst_root in
let* () = Io.copy_file ~src:src_dict ~dst:dst_dict in
(* Step 2. Create an empty suffix and close it. *)
let* suffix =
Suffix.create_rw ~root:dst_root ~overwrite:false
~auto_flush_threshold:1_000_000 ~auto_flush_procedure:`Internal
~start_idx:1
in
let* () = Suffix.close suffix in
(* Step 3. Create the control file and close it. *)
let status =
Payload.Gced
{
suffix_start_offset;
generation;
latest_gc_target_offset;
suffix_dead_bytes = Int63.zero;
}
in
let dict_end_poff = Io.size_of_path dst_dict |> Errs.raise_if_error in
let pl =
{
Payload.dict_end_poff;
suffix_end_poff = Int63.zero;
checksum = Int63.zero;
status;
upgraded_from_v3_to_v4 = false;
chunk_num = 1;
chunk_start_idx = 1;
}
in
let path = Irmin_pack.Layout.V4.control ~root:dst_root in
let* control = Control.create_rw ~path ~overwrite:false pl in
let* () = Control.close control in
(* Step 4. Create the index. *)
let* index =
let log_size = Conf.index_log_size config in
let throttle = Conf.merge_throttle config in
Index.v ~fresh:true ~flush_callback:Fun.id ~readonly:false ~throttle
~log_size dst_root
in
(* Step 5. Add the commit to the index, close the index. *)
let () =
match Pack_key.inspect commit_key with
| Pack_key.Direct { hash; offset; length } ->
Index.add index hash (offset, length, Pack_value.Kind.Commit_v2)
| Indexed _ -> assert false
in
let* () = Index.close index in
Ok ()
end
13 changes: 13 additions & 0 deletions src/irmin-pack/unix/file_manager_intf.ml
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,19 @@ module type S = sig
val generation : t -> int
val gc_allowed : t -> bool
val split : t -> (unit, [> Errs.t ]) result

val create_one_commit_store :
t ->
Irmin.Backend.Conf.t ->
generation:int ->
latest_gc_target_offset:int63 ->
suffix_start_offset:int63 ->
Index.key Pack_key.t ->
(unit, [> open_rw_error | close_error ]) result
(** [create_one_commit_store t conf generation new_store_root key] is called
when creating a new store at [new_store_root] from the existing one,
containing only one commit, specified by the [key]. Ths new store will use
configuration options from [conf] and set to [generation]. *)
end

module type Sigs = sig
Expand Down
16 changes: 13 additions & 3 deletions src/irmin-pack/unix/gc.ml
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ module Make (Args : Gc_args.S) = struct
latest_gc_target_offset : int63;
}

let v ~root ~generation ~unlink ~dispatcher ~fm ~contents ~node ~commit
commit_key =
let v ~root ~new_files_path ~generation ~unlink ~dispatcher ~fm ~contents
~node ~commit commit_key =
let new_suffix_start_offset, latest_gc_target_offset =
let state : _ Pack_key.state = Pack_key.inspect commit_key in
match state with
Expand Down Expand Up @@ -84,7 +84,7 @@ module Make (Args : Gc_args.S) = struct
let task =
Async.async (fun () ->
Worker.run_and_output_result root commit_key new_suffix_start_offset
~generation)
~generation ~new_files_path)
in
let partial_stats =
Gc_stats.Main.finish_current_step partial_stats "before finalise"
Expand Down Expand Up @@ -282,6 +282,16 @@ module Make (Args : Gc_args.S) = struct
| `Running -> Lwt.return_ok `Running
| #Async.outcome as status -> go status)

let finalise_without_swap t =
let* status = Async.await t.task in
match status with
| `Success ->
Lwt.return (t.latest_gc_target_offset, t.new_suffix_start_offset)
| _ ->
let gc_output = read_gc_output ~root:t.root ~generation:t.generation in
let r = gc_errors status gc_output |> Errs.raise_if_error in
Lwt.return r

let on_finalise t f =
(* Ignore returned promise since the purpose of this
function is to add asynchronous callbacks to the GC
Expand Down
8 changes: 8 additions & 0 deletions src/irmin-pack/unix/gc.mli
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ module Make (Args : Gc_args.S) : sig

val v :
root:string ->
new_files_path:string ->
generation:int ->
unlink:bool ->
dispatcher:Args.Dispatcher.t ->
Expand All @@ -51,5 +52,12 @@ module Make (Args : Gc_args.S) : sig
finalises. *)

val cancel : t -> bool

val finalise_without_swap : t -> (int63 * int63) Lwt.t
(** Waits for the current gc to finish and returns immediately without
swapping the files and doing the other finalisation steps from [finalise].
It returns the [latest_gc_target_offset] and the
[new_suffix_start_offset]. *)
end
with module Args = Args
19 changes: 12 additions & 7 deletions src/irmin-pack/unix/gc_worker.ml
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ module Make (Args : Gc_args.S) = struct

type gc_output = (gc_results, Args.Errs.t) result [@@deriving irmin]

let run ~generation root commit_key new_suffix_start_offset =
let run ~generation ~new_files_path root commit_key new_suffix_start_offset =
let open Result_syntax in
let config =
Irmin_pack.Conf.init ~fresh:false ~readonly:true ~lru_size:0 root
Expand Down Expand Up @@ -179,7 +179,7 @@ module Make (Args : Gc_args.S) = struct
stats := Gc_stats.Worker.add_file_size !stats "mapping" mapping_size
in
(fun f ->
Mapping_file.create ~report_file_sizes ~root ~generation
Mapping_file.create ~report_file_sizes ~root:new_files_path ~generation
~register_entries:f ()
|> Errs.raise_if_error)
@@ fun ~register_entry ->
Expand Down Expand Up @@ -230,7 +230,9 @@ module Make (Args : Gc_args.S) = struct
(* Step 4. Create the new prefix. *)
stats := Gc_stats.Worker.finish_current_step !stats "prefix: start";
let prefix =
let path = Irmin_pack.Layout.V4.prefix ~root ~generation in
let path =
Irmin_pack.Layout.V4.prefix ~root:new_files_path ~generation
in
Ao.create_rw_exn ~path
in
let () =
Expand Down Expand Up @@ -262,7 +264,9 @@ module Make (Args : Gc_args.S) = struct
Dispatcher.read_exn dispatcher accessor buf
in
let prefix =
let path = Irmin_pack.Layout.V4.prefix ~root ~generation in
let path =
Irmin_pack.Layout.V4.prefix ~root:new_files_path ~generation
in
Io.open_ ~path ~readonly:false |> Errs.raise_if_error
in
Errors.finalise_exn (fun _outcome ->
Expand Down Expand Up @@ -357,11 +361,12 @@ module Make (Args : Gc_args.S) = struct

(* No one catches errors when this function terminates. Write the result in a
file and terminate. *)
let run_and_output_result ~generation root commit_key new_suffix_start_offset
=
let run_and_output_result ~generation ~new_files_path root commit_key
new_suffix_start_offset =
let result =
Errs.catch (fun () ->
run ~generation root commit_key new_suffix_start_offset)
run ~generation ~new_files_path root commit_key
new_suffix_start_offset)
in
let write_result = write_gc_output ~root ~generation result in
write_result |> Errs.log_if_error "writing gc output"
Expand Down
7 changes: 6 additions & 1 deletion src/irmin-pack/unix/gc_worker.mli
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,12 @@ module Make (Args : Gc_args.S) : sig
module Args : Gc_args.S

val run_and_output_result :
generation:int -> string -> Args.key -> int63 -> unit
generation:int ->
new_files_path:string ->
string ->
Args.key ->
int63 ->
unit

type suffix_params = {
start_offset : int63;
Expand Down
6 changes: 6 additions & 0 deletions src/irmin-pack/unix/io.ml
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,12 @@ module Unix = struct
Ok ()
with Sys_error msg -> Error (`Sys_error msg)

let copy_file ~src ~dst =
let cmd = Filename.quote_command "cp" [ "-p"; src; dst ] in
match Sys.command cmd with
| 0 -> Ok ()
| n -> Error (`Sys_error (Int.to_string n))

let mkdir path =
match (classify_path (Filename.dirname path), classify_path path) with
| `Directory, `No_such_file_or_directory -> (
Expand Down
3 changes: 2 additions & 1 deletion src/irmin-pack/unix/io_errors.ml
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ module Make (Io : Io.S) : S with module Io = Io = struct
| `Ro_not_allowed
| `Io_misc of Io.misc_error
| `Split_forbidden_during_batch
| `Multiple_empty_chunks ]
| `Multiple_empty_chunks
| `Forbidden_during_gc ]
[@@deriving irmin]

let raise_error = function
Expand Down
Loading

0 comments on commit 9fea325

Please sign in to comment.