Switch to ref-per-commit approach

This enables single-commit push, and simpler deletion of old snapshots.
This commit is contained in:
jan 2020-12-24 22:34:49 -08:00
parent 055a7cf581
commit 6558595572

View File

@ -3,7 +3,7 @@
Git snapshotting tool Git snapshotting tool
""" """
from typing import List, Union, Optional from typing import Sequence, Union, Optional
import subprocess import subprocess
import tempfile import tempfile
import datetime import datetime
@ -11,18 +11,25 @@ import argparse
from itertools import chain from itertools import chain
def _run(command: Union[str, List[str]], **kwargs) -> str: def _run(command: Union[str, Sequence[str]], **kwargs) -> str:
""" """
Wrapper for `subprocess.run()`: Wrapper for `subprocess.run()`:
- Accepts args as either a list of strings or space-delimited string - Accepts args as either a list of strings or space-delimited string
- Captures and returns stdout - Captures and returns stdout
Args:
command: A list of strings or a space-delimited string
**kwargs: Passed to `subprocess.run()`
Returns:
Captured stdout
""" """
args: Sequence[str]
if isinstance(command, str): if isinstance(command, str):
args = command.split() args = command.split()
else: else:
args = command args = command
# print('run', ' '.join(args))
result = subprocess.run(args, stdout=subprocess.PIPE, **kwargs) result = subprocess.run(args, stdout=subprocess.PIPE, **kwargs)
return result.stdout.decode().strip() return result.stdout.decode().strip()
@ -33,14 +40,14 @@ def get_latest_commit(short: bool = True, cwd: Optional[str] = None) -> str:
This includes non-lethe commits. This includes non-lethe commits.
""" """
fmt = 'h' if short else 'H' fmt = 'h' if short else 'H'
return _run('git log --all -1 --format=%{}'.format(fmt), cwd=cwd) return _run(f'git log --all -1 --format=%{fmt}', cwd=cwd)
def shorten_hash(sha: str, cwd: Optional[str] = None) -> str: def shorten_hash(sha: str, cwd: Optional[str] = None) -> str:
""" """
Get the short version of a hash Get the short version of a hash
""" """
return _run('git rev-parse --short {}'.format(sha), cwd=cwd) return _run(f'git rev-parse --short {sha}', cwd=cwd)
def get_root(cwd: Optional[str] = None) -> str: def get_root(cwd: Optional[str] = None) -> str:
@ -76,7 +83,7 @@ def get_tree(ref: str, cwd: Optional[str] = None) -> str:
def commit_tree(tree: str, def commit_tree(tree: str,
parents: List[str], parents: Sequence[str],
message: Optional[str] = None, message: Optional[str] = None,
cwd: Optional[str] = None, cwd: Optional[str] = None,
) -> str: ) -> str:
@ -95,6 +102,7 @@ def commit_tree(tree: str,
def update_ref(target_ref: str, def update_ref(target_ref: str,
target_commit: str, target_commit: str,
old_commit: Optional[str] = None, old_commit: Optional[str] = None,
*,
message: str = 'new snapshot', message: str = 'new snapshot',
cwd: Optional[str] = None, cwd: Optional[str] = None,
) -> str: ) -> str:
@ -111,13 +119,23 @@ def update_ref(target_ref: str,
def push_ref(remote: str = 'origin', def push_ref(remote: str = 'origin',
target_ref: str = 'refs/lethe/HEAD', target_ref: str = 'refs/lethe/LATEST',
remote_ref: Optional[str] = None, remote_ref: Optional[str] = None,
*,
cwd: Optional[str] = None, cwd: Optional[str] = None,
) -> str: ) -> str:
""" """
Push `target_ref` to `remote` as `remote_ref`. Push `target_ref` to `remote` as `remote_ref`.
By default, `remote_ref` will be the same as `target_ref`. By default, `remote_ref` will be the same as `target_ref`.
Args:
remote: git remote to push to (default 'origin')
target_ref: ref to push (default 'refs/lethe/LATEST')
remote_ref: ref to push to (default same as `target_ref`)
cwd: Repository directory. Default is current working directory.
Returns:
git command stdout
""" """
if remote_ref is None: if remote_ref is None:
remote_ref = target_ref remote_ref = target_ref
@ -125,29 +143,52 @@ def push_ref(remote: str = 'origin',
def fetch_ref(remote: str = 'origin', def fetch_ref(remote: str = 'origin',
remote_ref: str = 'refs/lethe/HEAD', remote_ref: str = 'refs/lethe/LATEST',
target_ref: Optional[str] = None, target_ref: Optional[str] = None,
*,
cwd: Optional[str] = None, cwd: Optional[str] = None,
) -> str: ) -> str:
""" """
Fetch `remote_ref` from `remote` as `target_ref`. Fetch `remote_ref` from `remote` as `target_ref`.
By default, `target_ref` will be the same as `remote_ref`. By default, `target_ref` will be the same as `remote_ref`.
Args:
remote: git remote to push to (default 'origin')
remote_ref: ref to fetch from (default 'refs/lethe/LATEST')
target_ref: ref to fetch to (default same as `remote_ref`)
cwd: Repository directory. Default is current working directory.
Returns:
git command stdout
""" """
if target_ref is None: if target_ref is None:
target_ref = remote_ref target_ref = remote_ref
return _run(['git', 'fetch', remote, remote_ref + ':' + target_ref], cwd=cwd) return _run(['git', 'fetch', remote, remote_ref + ':' + target_ref], cwd=cwd)
def deref_symref(ref: str, cwd: Optional[str] = None) -> str: def deref_symref(ref: str,
*,
cwd: Optional[str] = None,
) -> str:
""" """
Dereference a symbolic ref Dereference a symbolic ref
""" """
return _run(['git', 'symbolic-ref', '--quiet', ref], cwd=cwd) return _run(['git', 'symbolic-ref', '--quiet', ref], cwd=cwd)
def find_merge_base(commits: List[str], cwd: Optional[str] = None) -> str: def find_merge_base(commits: Sequence[str],
*,
cwd: Optional[str] = None,
) -> str:
""" """
Find the "best common ancestor" commit. Find the "best common ancestor" commit.
Args:
commits: Collection of commits to find the best common ancestor for
cwd: Repository directory. Default is current working directory.
Returns:
Hash of the best common ancestor commit
""" """
if len(commits) == 0: if len(commits) == 0:
raise Exception('Called find_merge_base with no commits!') raise Exception('Called find_merge_base with no commits!')
@ -159,7 +200,7 @@ def find_merge_base(commits: List[str], cwd: Optional[str] = None) -> str:
return base return base
def snap_tree(cwd: Optional[str] = None) -> str: def snap_tree(*, cwd: Optional[str] = None) -> str:
""" """
Create a new tree, consisting of all non-ignored files in the repository. Create a new tree, consisting of all non-ignored files in the repository.
Return the hash of the tree. Return the hash of the tree.
@ -173,9 +214,10 @@ def snap_tree(cwd: Optional[str] = None) -> str:
return tree return tree
def snap_ref(parent_refs: List[str], def snap_ref(parent_refs: Sequence[str],
target_refs: List[str], target_refs: Sequence[str],
message: Optional[str] = None, message: Optional[str] = None,
*,
cwd: Optional[str] = None, cwd: Optional[str] = None,
) -> str: ) -> str:
""" """
@ -185,16 +227,7 @@ def snap_ref(parent_refs: List[str],
parent_commits = [c for c in [get_commit(p, cwd=cwd) for p in parent_refs] if c] parent_commits = [c for c in [get_commit(p, cwd=cwd) for p in parent_refs] if c]
old_commits = [get_commit(t, cwd=cwd) for t in target_refs] old_commits = [get_commit(t, cwd=cwd) for t in target_refs]
extant_old_commits = list(set(c for c in old_commits if c)) commit = commit_tree(new_tree, set(parent_commits), message, cwd=cwd)
new_parents = list(set(p for p in parent_commits
if p != find_merge_base([p] + extant_old_commits, cwd=cwd)))
# if not new_parents:
# tree_unchanged = all(new_tree == get_tree(c, cwd=cwd) for c in old_commits)
# if tree_unchanged:
# return new_tree
commit = commit_tree(new_tree, extant_old_commits + new_parents, message, cwd=cwd)
for target_ref, old_commit in zip(target_refs, old_commits): for target_ref, old_commit in zip(target_refs, old_commits):
# update ref to point to commit, or create new ref # update ref to point to commit, or create new ref
@ -204,41 +237,58 @@ def snap_ref(parent_refs: List[str],
return commit return commit
def snap(parent_refs: Optional[List[str]] = None, def snap(parent_refs: Optional[Sequence[str]] = None,
target_refs: Optional[List[str]] = None, target_refs: Optional[Sequence[str]] = None,
message: Optional[str] = None, message: Optional[str] = None,
*,
cwd: Optional[str] = None, cwd: Optional[str] = None,
) -> str: ) -> str:
""" """
Create a new commit of all non-ignored files in the repository. Create a new commit, containing all non-ignored files.
`parent_refs` default to `['HEAD']`. Args:
If there are any symbolic refs in `parent_refs`, the refs parent_refs: Refs in this list are set as parents of the new commit.
they point to are added to <parent_refs>. If any symbolic refs are included, the underlying refs they point
to are also added to `parent_refs`. Defaults to `['HEAD']`.
All commits pointed to by existing `parent_refs` and `target_refs`, target_refs: Refs in this list will be updated to point to the new commit.
become parents of the newly created commit.
`target_refs` are created/updated to point the commit.
Default is Default is
'refs/lethe/head_name' for each parent ref of the form - 'refs/lethe/LATEST', and
'refs/heads/head_name', and - 'refs/lethe/path/to/ref/LATEST' and
'refs/lethe/path/to/ref' for each parent ref of the form 'refs/lethe/path/to/ref/snap_2020-12-20_18.21.14.281876'
'refs/path/to/ref'. for each parent_ref of the form 'refs/path/to/ref'.
`message` is used as the commit message. Symbolic refs are dereferenced, so a parent_ref of 'master'
will results in the creation/overwrite of
'refs/lethe/heads/master/LATEST'
Note that 'refs/lethe/LATEST' must be specified explicitly if the default
is overridden
message: The commit message. Default is a simple note containing the current
date/time.
cwd: Path to the repository. Default is the current working directory.
Returns:
The hash for the new commit.
""" """
if parent_refs is None: if parent_refs is None:
parent_refs = ['HEAD'] parent_refs = ['HEAD']
else:
parent_refs = list(parent_refs)
parent_refs += [r for r in [deref_symref(s, cwd=cwd) for s in parent_refs] if r] parent_refs += [r for r in [deref_symref(s, cwd=cwd) for s in parent_refs] if r]
if target_refs is None: date_str = str(datetime.datetime.now())
target_refs = [] date_ref = date_str.replace(' ', '_').replace(':', '.')
for p in parent_refs:
if p.startswith('refs/'):
p = p[len('refs/'):]
target_refs.append('refs/lethe/' + p) if target_refs is None:
target_refs = ['refs/lethe/LATEST']
for pp in parent_refs:
if not pp.startswith('refs/'):
continue
target_base = 'refs/lethe/' + pp[len('refs/'):]
target_refs += [target_base + '/LATEST',
target_base + '/' + date_ref]
if message is None:
message = 'snapshot ' + date_str
commit = snap_ref(parent_refs, target_refs, message=message, cwd=cwd) commit = snap_ref(parent_refs, target_refs, message=message, cwd=cwd)
return commit return commit