Merge pull request #78 from jgehrcke/jp/stargazer-40k-challenge-snapshots

Build up stargazer timeseries beyond 40k gazers using periodic snapshots
jgehrcke committed Sep 30, 2023
2 parents 39be395 + b91dbf4 commit 57fc0c2
Showing 5 changed files with 245 additions and 31 deletions.
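
The approach in brief: the per-stargazer listing API is capped at 40,000 entries, but the repository metadata reports the full live star count, so sampling that count periodically yields a timeseries beyond the cap. A minimal sketch of the sampling step, assuming PyGithub; the token and repo spec below are placeholders:

    from github import Github  # PyGithub

    # The stargazer listing stops at 40k entries; the repository object
    # reports the full, uncapped count. One sample per fetcher run is
    # enough to extend the timeseries beyond the cap.
    gh = Github("<api-token>")  # placeholder token
    repo = gh.get_repo("jgehrcke/github-repo-stats")
    print(repo.stargazers_count)
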
106 changes: 92 additions & 14 deletions analyze.py
@@ -1401,39 +1401,106 @@ def read_stars_over_time_from_csv() -> pd.DataFrame:
         log.info("stargazer_ts_inpath not provided, return empty df")
         return pd.DataFrame()

-    log.info("Parse stargazer time series (raw) CSV: %s", ARGS.stargazer_ts_inpath)
+    log.info("Parse (raw) stargazer time series CSV: %s", ARGS.stargazer_ts_inpath)

-    df = pd.read_csv(  # type: ignore
+    df_40klim = pd.read_csv(  # type: ignore
         ARGS.stargazer_ts_inpath,
         index_col=["time_iso8601"],
         date_parser=lambda col: pd.to_datetime(col, utc=True),
     )

-    # df = df.astype(int)
-    df.index.rename("time", inplace=True)
-    log.info("stars_cumulative, raw data: %s", df["stars_cumulative"])
+    df_40klim.index.rename("time", inplace=True)
+    log.info("stars_cumulative, raw data: %s", df_40klim["stars_cumulative"])

-    if not len(df):
+    if not len(df_40klim):
         log.info("CSV file did not contain data, return empty df")
-        return df
+        return df_40klim

+    raw_ts_latest_datetime = df_40klim.index[-1]
+    # log.info("df_40klim.index: %s", df_40klim.index)
+    # log.info("raw_ts_latest_datetime: %s", raw_ts_latest_datetime)
+
+    # Just to reiterate, this is expected to be the 'raw' API-provided
+    # timeseries, including each individual stargazer event up to 40k. It may
+    # not be reasonable to plot this as-is, depending on density and overall
+    # amount of data points.
+    df_stargazers_complete = df_40klim
+
+    # When ending up here: there is at least one stargazer (fast exit above for
+    # case 0). Note: the existence of the file `stargazer_ts_snapshot_inpath`
+    # does not mean that there are more than 40k stargazers. This makes testing
+    # more credible: execute this code path often.
+    if os.path.exists(ARGS.stargazer_ts_snapshot_inpath):
+        log.info(
+            "Parse (snapshot) stargazer time series CSV: %s",
+            ARGS.stargazer_ts_snapshot_inpath,
+        )
+
+        df_snapshots_beyond40k = pd.read_csv(  # type: ignore
+            ARGS.stargazer_ts_snapshot_inpath,
+            index_col=["time_iso8601"],
+            date_parser=lambda col: pd.to_datetime(col, utc=True),
+        )
+        df_snapshots_beyond40k.index.rename("time", inplace=True)
+
+        # Unsorted input is unlikely, but still.
+        df_snapshots_beyond40k.sort_index(inplace=True)
+
+        log.info("stargazer snapshots timeseries:\n%s", df_snapshots_beyond40k)
+
+        # Defensive: select only those data points that are newer than those in
+        # df_40klim.
+        # log.info("df_snapshots_beyond40k.index: %s", df_snapshots_beyond40k.index)
+        df_snapshots_beyond40k = df_snapshots_beyond40k[
+            df_snapshots_beyond40k.index > raw_ts_latest_datetime
+        ]
+
+        # Is at least one data point left?
+        if len(df_snapshots_beyond40k):
+            # Concatenate with 'raw' timeseries, along the same column.
+            df_snapshots_beyond40k.rename(
+                columns={"stargazers_cumulative_snapshot": "stars_cumulative"},
+                inplace=True,
+            )
+
+            # On purpose: overwrite object defined above.
+            df_stargazers_complete = pd.concat(  # type: ignore
+                [df_stargazers_complete, df_snapshots_beyond40k]
+            )
+            log.info("concat result:\n%s", df_stargazers_complete)
+
+    # Make the stargazer timeseries that is going to be persisted via git
+    # contain data from both, the raw timeseries (obtained from API) as well as
+    # from the snapshots obtained so far; but downsample to at most one data
+    # point per day. Note that this is for external usage, not used for GHRS.
     if ARGS.stargazer_ts_resampled_outpath:
         # The CSV file should contain integers after all (no ".0"), therefore
         # cast to int. There are no NaNs to be expected, i.e. this should work
         # reliably.
-        df_for_csv_file = resample_to_1d_resolution(df, "stars_cumulative").astype(int)
-        log.info("stars_cumulative, for CSV file (resampled): %s", df_for_csv_file)
+        df_for_csv_file = resample_to_1d_resolution(
+            df_stargazers_complete, "stars_cumulative"
+        ).astype(int)
+        log.info(
+            "stars_cumulative, for CSV file (resampled, from raw+snapshots): %s",
+            df_for_csv_file,
+        )
         log.info("write aggregate to %s", ARGS.stargazer_ts_resampled_outpath)

         # Pragmatic strategy against partial write / encoding problems.
         tpath = ARGS.stargazer_ts_resampled_outpath + ".tmp"
         df_for_csv_file.to_csv(tpath, index_label="time_iso8601")
         os.rename(tpath, ARGS.stargazer_ts_resampled_outpath)

+    df_stargazers_for_plot = df_stargazers_complete
+
     # Many data points? Downsample, for plotting.
-    if len(df) > 50:
-        df = downsample_series_to_N_points(df, "stars_cumulative")
+    if len(df_stargazers_complete) > 50:
+        df_stargazers_for_plot = downsample_series_to_N_points(
+            df_stargazers_complete, "stars_cumulative"
+        )

-    return df
+    log.info("df_stargazers_for_plot:\n%s", df_stargazers_for_plot)
+    return df_stargazers_for_plot


 def read_forks_over_time_from_csv() -> pd.DataFrame:
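
Condensed, the new merge logic in the hunk above: drop snapshot rows that are not newer than the last raw data point, rename the snapshot column to match, and concatenate. A self-contained sketch; the helper name is made up:

    import pandas as pd

    def merge_raw_and_snapshots(
        df_raw: pd.DataFrame, df_snaps: pd.DataFrame
    ) -> pd.DataFrame:
        # Keep only snapshots newer than the last per-event data point,
        # align the column name, then append to the raw series.
        df_snaps = df_snaps.sort_index()
        df_snaps = df_snaps[df_snaps.index > df_raw.index[-1]]
        df_snaps = df_snaps.rename(
            columns={"stargazers_cumulative_snapshot": "stars_cumulative"}
        )
        return pd.concat([df_raw, df_snaps])
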
@@ -1477,7 +1544,7 @@ def read_forks_over_time_from_csv() -> pd.DataFrame:


 def downsample_series_to_N_points(df, column):
-    # Choose a bin time width for downsampling. Identify tovered timespan
+    # Choose a bin time width for downsampling. Identify covered timespan
     # first.

     timespan_hours = int(
Expand Down Expand Up @@ -1508,7 +1575,9 @@ def downsample_series_to_N_points(df, column):
# up-sampled data points (so that each data point still reflects an actual
# event or a group of events, but when there was no event within a bin then
# that bin does not appear with a data point in the resulting plot).
s = s.resample(f"{bin_width_hours}h").max().dropna()
# The resample operation might put the last data point into the future,
# Let's correct for that by putting origin="end".
s = s.resample(f"{bin_width_hours}h", origin="end").max().dropna()

log.info("len(series): %s", len(s))

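To see the origin="end" fix in isolation: with the default origin, the bin grid is derived from the start of the data (start of its day), so the final bin's edge can lie beyond the last observation; origin="end" (available since pandas 1.3) anchors the grid at the last timestamp instead. A small example with synthetic data:

    import pandas as pd

    idx = pd.date_range("2023-09-01", periods=200, freq="7h", tz="UTC")
    s = pd.Series(range(200), index=idx)

    # Default anchoring: bin edges derived from the start of the data; the
    # final bin may extend beyond the last observation.
    print(s.resample("24h").max().dropna().tail(2))

    # origin="end": the bin grid is anchored at the last timestamp, so no
    # bin edge lies beyond the end of the series.
    print(s.resample("24h", origin="end").max().dropna().tail(2))
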
@@ -1585,13 +1654,22 @@ def parse_args():
         help="Write resampled stargazer time series to CSV file (at most "
         "one sample per day). No file is created if time series is empty.",
     )

     parser.add_argument(
         "--stargazer-ts-inpath",
         default="",
         metavar="PATH",
         help="Read raw stargazer time series from CSV file. File must exist, may be empty.",
     )

+    parser.add_argument(
+        "--stargazer-ts-snapshot-inpath",
+        default="",
+        metavar="PATH",
+        help="Read snapshot-based stargazer time series from CSV file "
+        "(helps accounting for the 40k limit). File not required to exist.",
+    )
+
     parser.add_argument(
         "--fork-ts-resampled-outpath",
         default="",
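
The persisted CSV is capped at one data point per day via resample_to_1d_resolution; that function's body is not part of this diff. An assumed implementation matching the behavior the comments describe (for a cumulative count, the within-day maximum equals the day's final value):

    import pandas as pd

    def resample_to_1d_resolution(df: pd.DataFrame, column: str) -> pd.Series:
        # At most one data point per day; max() of a cumulative count is
        # the value at the end of the day.
        s = df[column].resample("1d").max().dropna()
        s.index.name = "time"
        return s
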
11 changes: 7 additions & 4 deletions entrypoint.sh
@@ -134,7 +134,6 @@ echo "fetch.py for ${STATS_REPOSPEC}"
 # reduce the likelihood for bad order of log lines in the GH Action log viewer
 # (seen `error: fetch.py returned with code 1 -- exit.` before the last line of
 # the CPython stderr stream was shown.)
-
 export PYTHONUNBUFFERED="on"

 set +e
@@ -145,7 +144,8 @@ set -x
 python "${GHRS_FILES_ROOT_PATH}/fetch.py" "${STATS_REPOSPEC}" \
     --snapshot-directory=newsnapshots \
     --fork-ts-outpath=forks-raw.csv \
-    --stargazer-ts-outpath=stars-raw.csv
+    --stargazer-ts-outpath=stars-raw.csv \
+    --stargazer-ts-snapshots-inoutpath=ghrs-data/stargazer-snapshots.csv
 FETCH_ECODE=$?
 set +x
 set -e
@@ -167,12 +167,14 @@ set -x
 mkdir -p ghrs-data/snapshots
 cp -a newsnapshots/* ghrs-data/snapshots || echo "copy failed, ignore (continue)"

-# New data files: show them from git's point of view.
+# New/updated data files: show them from git's point of view.
 git status --untracked=no --porcelain

-# exit code 0 when nothing added
+# Exit code 0 when nothing added
 git add ghrs-data/snapshots

+git add ghrs-data/stargazer-snapshots.csv || echo "failed, ignore"
+
 # exit code 1 upon 'nothing to commit, working tree clean'
 git commit -m "ghrs: snap ${UPDATE_ID} for ${STATS_REPOSPEC}" || echo "commit failed, ignore (continue)"

@@ -188,6 +190,7 @@ python "${GHRS_FILES_ROOT_PATH}/analyze.py" \
     --output-directory latest-report \
     --outfile-prefix "" \
     --stargazer-ts-inpath "stars-raw.csv" \
+    --stargazer-ts-snapshot-inpath "ghrs-data/stargazer-snapshots.csv" \
     --fork-ts-inpath "forks-raw.csv" \
     --stargazer-ts-resampled-outpath "ghrs-data/stargazers.csv" \
     --fork-ts-resampled-outpath "ghrs-data/forks.csv" \
138 changes: 125 additions & 13 deletions fetch.py
@@ -52,6 +52,7 @@


 # Get tz-aware datetime object corresponding to invocation time.
+# Note: could do `datetime.now(timezone.utc)` instead these days.
 NOW = pytz.timezone("UTC").localize(datetime.utcnow())
 INVOCATION_TIME_STRING = NOW.strftime("%Y-%m-%d_%H%M%S")

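As the added note says, the pytz construction predates modern stdlib support; the equivalent without the pytz dependency:

    from datetime import datetime, timezone

    # Tz-aware "now" with just the standard library (Python >= 3.2):
    now = datetime.now(timezone.utc)
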
@@ -110,22 +111,114 @@ def main() -> None:
         fetch_and_write_fork_ts(repo, args.fork_ts_outpath)

     if args.stargazer_ts_outpath:
-        fetch_and_write_stargazer_ts(repo, args.stargazer_ts_outpath)
+        fetch_and_write_stargazer_ts(repo, args)

     log.info("done!")


-def fetch_and_write_stargazer_ts(repo: Repository.Repository, path: str):
-    dfstarscsv = get_stars_over_time(repo)
+def fetch_and_write_stargazer_ts(repo: Repository.Repository, args):
+    """
+    Fetch the complete stargazer timeseries as provided by the GitHub HTTP API.
+
+    Remarks:
+
+    - Each stargazer is represented ("raw" timeseries); analyze.py downsamples
+      to one datapoint per day (that is the timeseries that is persisted
+      via git, not the "raw" one).
+    - Only the first 40k stargazers are represented; we assemble additional
+      history based on periodically obtained snapshots.
+
+    Idea: fetch both.
+    """
+    # The JSON response to https://api.github.com/repos/<org>/<repo> contains
+    # the current stargazer count, not subject to the 40k limit. Fetching this
+    # periodically allows for building up a stargazer timeseries beyond said
+    # limit. Also see https://github.com/jgehrcke/github-repo-stats/issues/76
+    current_stargazer_count = repo.stargazers_count
+    log.info(
+        "current stargazer count as reported by repo properties: %s",
+        current_stargazer_count,
+    )
+
+    # Prepare current snapshot as pandas DataFrame. Will either be
+    # - appended to existing dataset (CSV file existing)
+    # - used to create a fresh dataset (no CSV file existing)
+    # - dropped (CSV file existing, but stargazer count did not change)
+    current_snapshot_df = pd.DataFrame(
+        data={"stargazers_cumulative_snapshot": [current_stargazer_count]},
+        index=pd.to_datetime([NOW.replace(microsecond=0)]),
+    )
+    current_snapshot_df.index.name = "time"
+
+    updated_sdf = None
+
+    if os.path.exists(args.stargazer_ts_snapshots_inoutpath):
+        log.info("read %s", args.stargazer_ts_snapshots_inoutpath)
+        sdf = pd.read_csv(  # type: ignore
+            args.stargazer_ts_snapshots_inoutpath,
+            index_col=["time_iso8601"],
+            date_parser=lambda col: pd.to_datetime(col, utc=True),
+        )
+        sdf.index.rename("time", inplace=True)
+        log.info(
+            "stargazers_cumulative_snapshot, raw data from %s:\n%s",
+            args.stargazer_ts_snapshots_inoutpath,
+            sdf["stargazers_cumulative_snapshot"],
+        )
+
+        if current_stargazer_count == sdf["stargazers_cumulative_snapshot"].iloc[-1]:
+            log.info("current stargazer count matches last snapshot, skip update")
+            # As an optimization, in this case we also do not need to fetch the
+            # complete stargazer timeseries below, and can simply return from
+            # this function.
+            return
+        else:
+            log.info("stargazer count changed; append snapshot to existing history")
+            updated_sdf = pd.concat([sdf, current_snapshot_df])  # type: ignore
+    else:
+        # Data file does not exist yet (first-time invocation?). Start building
+        # up this timeseries: create this data file, containing precisely one
+        # data point. I hope this is an integer for the special case of 0/zero
+        # stargazers.
+        log.info("does not exist yet: %s", args.stargazer_ts_snapshots_inoutpath)
+        updated_sdf = current_snapshot_df
+
+    if updated_sdf is not None:
+        tmppath = args.stargazer_ts_snapshots_inoutpath + ".tmp"  # todo: rnd string
+        # The idea here is to write the snapshot-based history before the 40k
+        # limit is reached, to not have too divergent code paths between types
+        # of repos.
+        log.info(
+            "write cumulative/snapshot-based stargazer time series to %s, then rename to %s",
+            tmppath,
+            args.stargazer_ts_snapshots_inoutpath,
+        )
+        updated_sdf.to_csv(tmppath, index_label="time_iso8601")
+        os.rename(tmppath, args.stargazer_ts_snapshots_inoutpath)
+
+    if current_stargazer_count > 40000:
+        if os.path.exists(args.stargazer_ts_outpath):
+            log.info("40k limit crossed; skip (re)fetching entire stargazer timeseries")
+            return
+
+        log.info(
+            "40k limit crossed, but %s does not exist yet -- fetch first 40k",
+            args.stargazer_ts_outpath,
+        )
+
+    dfstarscsv = get_stars_over_time_40k_limit(repo)
     log.info("stars_cumulative, for CSV file:\n%s", dfstarscsv)
-    tpath = path + ".tmp"  # todo: rnd string
+    tpath = args.stargazer_ts_outpath + ".tmp"  # todo: rnd string
     log.info(
         "write stargazer time series to %s, then rename to %s",
         tpath,
-        path,
+        args.stargazer_ts_outpath,
     )
     dfstarscsv.to_csv(tpath, index_label="time_iso8601")
-    os.rename(tpath, path)
+    os.rename(tpath, args.stargazer_ts_outpath)


 def fetch_and_write_fork_ts(repo: Repository.Repository, path: str):
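
Both persistence paths above use the same write-then-rename pattern. Isolated as a sketch (the helper name is made up): on a POSIX filesystem the rename is atomic, so an interrupted run leaves at worst a stale .tmp file, never a truncated CSV at the real path:

    import os
    import pandas as pd

    def write_csv_atomically(df: pd.DataFrame, path: str) -> None:
        # Write to a sibling temp file, then rename over the target.
        tmppath = path + ".tmp"
        df.to_csv(tmppath, index_label="time_iso8601")
        os.rename(tmppath, path)
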
@@ -208,6 +301,14 @@ def parse_args():
         help="Fetch stargazer time series and write to this CSV file. Overwrite if file exists.",
     )

+    # TODO: make this required
+    parser.add_argument(
+        "--stargazer-ts-snapshots-inoutpath",
+        default="",
+        metavar="PATH",
+        help="Read/write stargazer time series snapshots; the file is "
+        "rewritten with appended data if it exists.",
+    )
+
     args = parser.parse_args()

     if "/" not in args.repo:
@@ -364,7 +465,12 @@ def get_forks_over_time(repo: Repository.Repository) -> pd.DataFrame:
     return df


-def get_stars_over_time(repo: Repository.Repository) -> pd.DataFrame:
+def get_stars_over_time_40k_limit(repo: Repository.Repository) -> pd.DataFrame:
+    """
+    Fetch stargazers-over-time from the beginning of time. This returns at
+    most the oldest 40,000 stargazers (a GitHub HTTP API limitation, see
+    https://github.com/jgehrcke/github-repo-stats/issues/76).
+    """
     # TODO: for ~10k stars repositories, this operation is too costly for doing
     # it as part of each analyzer invocation. Move this to the fetcher, and
     # persist the data.
@@ -433,13 +539,19 @@ def handle_rate_limit_error(exc):
         log.warning("GitHub abuse mechanism triggered, wait 60 s, retry")
         return True

+    needles_perm_err = [
+        "Resource not accessible by integration",
+        "Must have push access to repository",
+    ]
+
     if "403" in str(exc):
-        if "Resource not accessible by integration" in str(exc):
-            log.error(
-                'this appears to be a permanent error, as in "access denied -- do not retry": %s',
-                str(exc),
-            )
-            sys.exit(1)
+        for needle in needles_perm_err:
+            if needle in str(exc):
+                log.error(
+                    'this appears to be a permanent error, as in "access denied -- do not retry": %s',
+                    str(exc),
+                )
+                sys.exit(1)

         log.warning("Exception contains 403, wait 60 s, retry: %s", str(exc))
         # The request count quota is not necessarily responsible for this
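
The error classification above, extracted as a standalone sketch (the function name is made up): a short list of message needles marks permanent permission errors; any other 403 is treated as transient (rate limiting / abuse detection) and retried:

    import logging
    import sys

    log = logging.getLogger(__name__)

    NEEDLES_PERM_ERR = [
        "Resource not accessible by integration",
        "Must have push access to repository",
    ]

    def transient_403(exc: Exception) -> bool:
        # True: caller should wait and retry. Permanent permission errors
        # terminate the process instead, mirroring the diff's behavior.
        msg = str(exc)
        if "403" not in msg:
            return False
        for needle in NEEDLES_PERM_ERR:
            if needle in msg:
                log.error("permanent error, do not retry: %s", msg)
                sys.exit(1)
        log.warning("transient 403, wait 60 s, retry: %s", msg)
        return True
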
