Skip to content

Commit

Permalink
perf: ⚡ increase perf of counts and remove connection to prevent lock…
Browse files Browse the repository at this point in the history
…ing (#30)
  • Loading branch information
HannesOberreiter committed May 12, 2024
1 parent a534d54 commit cbbefb9
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 62 deletions.
17 changes: 6 additions & 11 deletions pkg/gbif/gbif.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
package gbif

import (
"context"
"database/sql"
"encoding/json"
"errors"
Expand All @@ -27,7 +26,7 @@ var (
occurrenceStatus = "occurrenceStatus=PRESENT"
)

const SampleRows = "5"
const SampleRows = "25"

// Response is the response from the GBIF API for the occurrence search
type Response struct {
Expand Down Expand Up @@ -162,26 +161,22 @@ func FetchLatest(taxonID string) []LatestObservation {
// SaveObservation saves the latest observation for each taxon
// It first clears the old observations for each taxon before inserting the new ones
// to improve performance each insert contains alls new observations for this taxa at once
func SaveObservation(observation [][]LatestObservation, conn *sql.Conn, ctx context.Context) {
func SaveObservation(observation [][]LatestObservation, db *sql.DB) {
slog.Info("Updating observations", "taxa", len(observation))
const stmt = "INSERT INTO observations (ObservationID, TaxonID, CountryCode, ObservationDate, ObservationDateOriginal) VALUES"
for _, res := range observation {
var insertString []string
clearOldObservations(conn, ctx, res[0].TaxonID)
clearOldObservations(db, res[0].TaxonID)
slog.Info("Inserting new for taxaId", "observations", len(res), "taxaId", res[0].TaxonID)
for _, obs := range res {
insertString = append(insertString, fmt.Sprintf("('%s', '%s', '%s', '%s', '%s')", obs.ObservationID, obs.TaxonID, obs.CountryCode, obs.ObservationDate, obs.ObservationOriginalDate))
}
query := stmt + strings.Join(insertString, ",") + " ON CONFLICT DO NOTHING;"
_, err := conn.ExecContext(ctx, query)
_, err := db.Exec(query)
if err != nil {
slog.Error("Database error on inserting new observations", err)
}
}
err := conn.Close()
if err != nil {
slog.Error("Failed to close connection", err)
}
}

// Get the synonym id for a taxon id, this is used if fetch is called on a synonym
Expand Down Expand Up @@ -374,8 +369,8 @@ func CleanDate(date string) string {

// We are only interested in the latest observation for each taxon, so we clear the old ones before inserting new ones
// runs in the same transaction as SaveObservation
func clearOldObservations(conn *sql.Conn, ctx context.Context, taxonID string) {
res, err := conn.ExecContext(ctx, "DELETE FROM observations WHERE TaxonID = ?", taxonID)
func clearOldObservations(db *sql.DB, taxonID string) {
res, err := db.Exec("DELETE FROM observations WHERE TaxonID = ?", taxonID)
if err != nil {
slog.Error("Database error on clearing old observations", err)
}
Expand Down
12 changes: 2 additions & 10 deletions pkg/gbif/gbif_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package gbif

import (
"context"
"database/sql"
"log"
"log/slog"
Expand Down Expand Up @@ -60,17 +59,10 @@ func TestSaveObservations(t *testing.T) {
var observations [][]LatestObservation
observations = append(observations, []LatestObservation{observation})

ctx := context.Background()
conn, err := internal.DB.Conn(ctx)
if err != nil {
slog.Error("Failed to create connection", err)
}
defer conn.Close()

SaveObservation(observations, conn, ctx)
SaveObservation(observations, internal.DB)

var count int
err = internal.DB.QueryRow("SELECT COUNT(*) FROM observations WHERE TaxonID = ?", DemoTaxa[0]).Scan(&count)
err := internal.DB.QueryRow("SELECT COUNT(*) FROM observations WHERE TaxonID = ?", DemoTaxa[0]).Scan(&count)
if err != nil {
log.Fatal(err)
}
Expand Down
13 changes: 7 additions & 6 deletions pkg/queries/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ func GetCounts(db *sql.DB, q Query) Counts {
var taxaCount int
var observationCount int

observationQuery := sq.Select("COUNT(DISTINCT(observations.TaxonID, observations.CountryCode))").From("observations").LeftJoin("taxa ON observations.TaxonID = taxa.SynonymID")
observationQuery := sq.Select("COUNT(*)").From("observations").InnerJoin("taxa ON observations.TaxonID = taxa.TaxonID")

createFilterQuery(&observationQuery, q)
err = observationQuery.RunWith(db).QueryRow().Scan(&observationCount)
if err != nil {
Expand All @@ -127,7 +128,7 @@ func GetCounts(db *sql.DB, q Query) Counts {
if q.COUNTRY != "" {
taxaCount = observationCount // There should be only one taxa per observation per country
} else {
taxaQuery := sq.Select("COUNT(DISTINCT taxa.SynonymID)").From("taxa")
taxaQuery := sq.Select("COUNT(*)").From("taxa").Where(sq.Eq{"isSynonym": false})

createFilterQuery(&taxaQuery, q)
err = taxaQuery.RunWith(db).QueryRow().Scan(&taxaCount)
Expand Down Expand Up @@ -173,6 +174,10 @@ func GetTableData(db *sql.DB, q Query, increaseLimit ...bool) []TableRow {
}
}

if !q.SHOW_SYNONYMS {
query = query.Where(sq.Or{sq.Eq{"isSynonym": false}})
}

createFilterQuery(&query, q)

rows, err := query.RunWith(db).Query()
Expand Down Expand Up @@ -275,10 +280,6 @@ func countryCodeToFlag(x string) (country, flag string) {
}

func createFilterQuery(query *sq.SelectBuilder, q Query) {
if !q.SHOW_SYNONYMS {
*query = query.Where(sq.Or{sq.Eq{"isSynonym": false}, sq.Eq{"SynonymID": nil}})
}

if q.SEARCH != "" {
*query = query.Where(sq.ILike{"ScientificName": "%" + q.SEARCH + "%"})
}
Expand Down
14 changes: 1 addition & 13 deletions scripts/cron/cron.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package main

import (
"context"
"log/slog"
"os"
"time"

"github.com/HannesOberreiter/gbif-extinct/internal"
"github.com/HannesOberreiter/gbif-extinct/pkg/gbif"
Expand Down Expand Up @@ -40,15 +38,5 @@ func main() {
return
}

var err error
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
conn, err := internal.DB.Conn(ctx)
if err != nil {
slog.Error("Failed to create connection", err)
return
}
defer conn.Close()

gbif.SaveObservation(results, conn, ctx)
cancel()
gbif.SaveObservation(results, internal.DB)
}
24 changes: 2 additions & 22 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,18 +169,9 @@ func fetch(c echo.Context) error {
return c.String(http.StatusNotFound, "No data found")
}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
conn, err := internal.DB.Conn(ctx)
if err != nil {
slog.Error("Failed to create connection", err)
cancel()
return c.String(http.StatusInternalServerError, "Failed database connection")
}
defer conn.Close()
var results [][]gbif.LatestObservation
results = append(results, res)
gbif.SaveObservation(results, conn, ctx)
cancel()
gbif.SaveObservation(results, internal.DB)

c.Response().Header().Set("HX-Trigger", "filterSubmit")
return c.String(http.StatusOK, "Updated")
Expand Down Expand Up @@ -246,18 +237,7 @@ func cronFetch() {
return
}

var err error
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
conn, err := internal.DB.Conn(ctx)
if err != nil {
slog.Error("Failed to create connection", err)
cancel()
return
}
defer conn.Close()

gbif.SaveObservation(results, conn, ctx)
cancel()
gbif.SaveObservation(results, internal.DB)
}

// Utility function to render a template
Expand Down

0 comments on commit cbbefb9

Please sign in to comment.