ATProto Browser

ATProto Browser

Experimental browser for the Atmosphere

Record data

{
  "uri": "at://did:plc:by3jhwdqgbtrcc7q4tkkv3cf/com.whtwnd.blog.entry/3l7yquwnc7q2t",
  "cid": "bafyreidgiacy5p4w4l3ttrduhew7odrpsybr336kntkq3dfajyousvdkru",
  "value": {
    "$type": "com.whtwnd.blog.entry",
    "theme": "github-light",
    "title": "Sort a 42 GB CSV quickly",
    "content": "# Sort a 42 GB CSV quickly\n\nSay you're, I don't know, building [a website that shows the most popular emojis on Bluesky realtime](https://emojistats.bsky.sh). Further assume that you want to move the MVP architecture that's mostly just Redis to Postgres + [Timescale](https://www.timescale.com/) which unlocks a lot of potential for resilience and features. \n\nOf course, you want to validate and test your architecture, and for that, you create a bunch of mock data, a [hypertable](https://docs.timescale.com/use-timescale/latest/hypertables/) of about 550m emojis, which is about 5x more than what's currently on Bluesky. You gather two days' worth of of data from the firehose, then run\n\n```sql\nBEGIN;\nINSERT INTO emojis\nSELECT * FROM emojis;\nCOMMIT;\n```\n\non repeat until you have enough test data. \n\nAnd for something quick and dirty, this does the job. But oh! You're working with time series data. Bluesky has been around for two years, so it'd be worth stretching all this data out, time-wise, from 2 days to 2 years. \n\nSo you dump your table in a CSV:\n\n```sql\nCOPY (SELECT * from emojis) TO '/path/to/emojis.csv' WITH (FORMAT CSV, DELIMITER ',', HEADER TRUE);\n```\n\nAnd now you have a file that looks something like this:\n\n```csv\n$ head emojis.csv\ndid,rkey,emoji,lang,created_at\ndid:plc:pfgs64jdrbznui4qktarwvvz,3l7aowcsbgj23,🫶,en,2024-01-31T13:47:33Z\ndid:plc:4ojtgthev5iexawd7k63ri43,3l7aowe6e3x2z,💥,en,2024-06-06T21:18:01Z\ndid:plc:4ojtgthev5iexawd7k63ri43,3l7aowe6e3x2z,💥,en,2024-05-10T02:16:11Z\ndid:plc:4ojtgthev5iexawd7k63ri43,3l7aowe6e3x2z,💥,en,2023-06-19T16:46:21Z\ndid:plc:oemwnvl3bdhbglqsu4vtutg4,3l7aowe6tvz2y,⁉️,ja,2024-03-17T23:13:22Z\ndid:plc:enbbvjkhuzg3bzmlxjjdko2a,3l7aowejtcc24,🥶,en,2024-05-16T03:36:28Z\ndid:plc:sdx2znbevuvup2osq5ng24yf,3l7aowd44ts2a,✌️,ja,2022-12-22T10:52:09Z\ndid:plc:yfozqgqtw7shp23rgj2lb57o,3l7aowf2mqv27,🥰,es,2024-10-14T02:14:23Z\ndid:plc:yfcrcqqhqzt7h6aikn4ehypj,3l7aowfa3tk2j,😵‍💫,zh,2023-04-04T10:53:03Z\n```\n\nNow, how do you stretch this out to two years? You ask o1-mini to write a Go program that does it as fast as possible for you.\n\n```go\npackage main\n\nimport (\n\t\"bufio\"\n\t\"encoding/csv\"\n\t\"fmt\"\n\t\"io\"\n\t\"log\"\n\t\"math/rand\"\n\t\"os\"\n\t\"runtime\"\n\t\"strconv\"\n\t\"strings\"\n\t\"sync\"\n\t\"time\"\n)\n\n// Configuration Constants\nconst (\n\tINPUT_CSV         = \"/path/to/emojis_dump.csv\"     // Path to the exported CSV\n\tOUTPUT_CSV        = \"/path/to/emojis_modified.csv\" // Path for the modified CSV\n\tNUM_WORKERS       = 8                                      // Number of concurrent worker goroutines\n\tCHANNEL_BUFFER    = 10000                                  // Buffer size for channels\n\tPROGRESS_INTERVAL = 1000000                                // Log progress every 1,000,000 records\n)\n\n// Record represents a single CSV record\ntype Record struct {\n\tFields []string\n\tLine   int // Line number for logging purposes\n}\n\n// ProcessedRecord represents a processed CSV record ready to be written\ntype ProcessedRecord struct {\n\tFields []string\n\tLine   int // Line number for logging purposes\n}\n\nfunc main() {\n\tstartTime := time.Now()\n\tfmt.Println(\"Starting processing...\")\n\n\t// Ensure output directory exists\n\toutputDir := \"/path/to/tmp/\" // Adjust based on OUTPUT_CSV path\n\tif err := os.MkdirAll(outputDir, os.ModePerm); err != nil {\n\t\tlog.Fatalf(\"Error creating output directory: %v\", err)\n\t}\n\n\t// Open input CSV file\n\tinputFile, err := os.Open(INPUT_CSV)\n\tif err != nil {\n\t\tlog.Fatalf(\"Error opening input CSV file: %v\", err)\n\t}\n\tdefer inputFile.Close()\n\n\t// Create output CSV file\n\toutputFile, err := os.Create(OUTPUT_CSV)\n\tif err != nil {\n\t\tlog.Fatalf(\"Error creating output CSV file: %v\", err)\n\t}\n\tdefer outputFile.Close()\n\n\t// Initialize CSV reader and writer with buffered I/O\n\treader := csv.NewReader(bufio.NewReaderSize(inputFile, 16*1024*1024))  // 10MB buffer\n\twriter := csv.NewWriter(bufio.NewWriterSize(outputFile, 16*1024*1024)) // 10MB buffer\n\n\t// Read header\n\theader, err := reader.Read()\n\tif err != nil {\n\t\tlog.Fatalf(\"Error reading CSV header: %v\", err)\n\t}\n\n\t// Identify the index of 'created_at' column\n\tcreatedAtIdx := -1\n\tfor idx, col := range header {\n\t\tif strings.TrimSpace(col) == \"created_at\" {\n\t\t\tcreatedAtIdx = idx\n\t\t\tbreak\n\t\t}\n\t}\n\tif createdAtIdx == -1 {\n\t\tlog.Fatalf(\"'created_at' column not found in CSV header.\")\n\t}\n\n\t// Write header to output CSV\n\tif err := writer.Write(header); err != nil {\n\t\tlog.Fatalf(\"Error writing header to output CSV: %v\", err)\n\t}\n\twriter.Flush()\n\n\t// Channels for pipeline\n\trecordChan := make(chan Record, CHANNEL_BUFFER)\n\tprocessedChan := make(chan ProcessedRecord, CHANNEL_BUFFER)\n\n\t// WaitGroups to synchronize goroutines\n\tvar wgWorkers sync.WaitGroup\n\tvar wgWriter sync.WaitGroup\n\n\t// Start worker goroutines\n\tfor i := 0; i < NUM_WORKERS; i++ {\n\t\twgWorkers.Add(1)\n\t\tgo worker(&wgWorkers, recordChan, processedChan, createdAtIdx, len(header))\n\t}\n\n\t// Start writer goroutine\n\twgWriter.Add(1)\n\tgo writerGoroutine(&wgWriter, writer, processedChan)\n\n\t// Read and dispatch records\n\tlineNumber := 1 // Starting after header\n\tfor {\n\t\trecordFields, err := reader.Read()\n\t\tif err == io.EOF {\n\t\t\tbreak\n\t\t}\n\t\tif err != nil {\n\t\t\tlog.Printf(\"Error reading record at line %d: %v\", lineNumber+1, err)\n\t\t\tcontinue\n\t\t}\n\n\t\trec := Record{\n\t\t\tFields: recordFields,\n\t\t\tLine:   lineNumber + 1,\n\t\t}\n\n\t\trecordChan <- rec\n\t\tlineNumber++\n\n\t\t// Optional: Implement early termination or error handling if needed\n\t}\n\n\t// Close the record channel to signal workers no more records are coming\n\tclose(recordChan)\n\n\t// Wait for all workers to finish processing\n\twgWorkers.Wait()\n\n\t// Close the processed channel to signal writer no more records are coming\n\tclose(processedChan)\n\n\t// Wait for writer to finish\n\twgWriter.Wait()\n\n\t// Flush any remaining data in the writer buffer\n\twriter.Flush()\n\tif err := writer.Error(); err != nil {\n\t\tlog.Fatalf(\"Error flushing writer: %v\", err)\n\t}\n\n\tendTime := time.Now()\n\telapsed := endTime.Sub(startTime)\n\tfmt.Printf(\"Processing completed in %s.\\n\", elapsed)\n}\n\n// worker processes records: updates 'created_at' with a random timestamp\nfunc worker(wg *sync.WaitGroup, recordChan <-chan Record, processedChan chan<- ProcessedRecord, createdAtIdx int, expectedFields int) {\n\tdefer wg.Done()\n\n\t// Initialize a new rand.Rand instance with a unique seed to avoid race conditions\n\tr := rand.New(rand.NewSource(time.Now().UnixNano() + int64(rand.Intn(1000)))) // Adding randomness to the seed\n\n\t// Calculate time range for random timestamp\n\tnow := time.Now().UTC()\n\tstartTime := now.AddDate(-2, 0, 0).Unix()\n\tendTime := now.Unix()\n\n\t// Initialize local counters\n\tlocalCount := 0\n\n\tfor rec := range recordChan {\n\t\t// Validate field count\n\t\tif len(rec.Fields) != expectedFields {\n\t\t\tlog.Printf(\"Warning: Record at line %d has %d fields; expected %d. Skipping.\", rec.Line, len(rec.Fields), expectedFields)\n\t\t\tcontinue\n\t\t}\n\n\t\t// Parse and replace 'created_at' field\n\t\tnewTimestamp := randomTimestamp(r, startTime, endTime)\n\t\trec.Fields[createdAtIdx] = newTimestamp\n\n\t\t// Send the processed record to the writer\n\t\tprocessedChan <- ProcessedRecord{\n\t\t\tFields: rec.Fields,\n\t\t\tLine:   rec.Line,\n\t\t}\n\n\t\tlocalCount++\n\t\tif localCount%PROGRESS_INTERVAL == 0 {\n\t\t\tlog.Printf(\"[Worker %d] Processed %d records.\", getGID(), localCount)\n\t\t}\n\t}\n}\n\n// writerGoroutine writes processed records to the output CSV\nfunc writerGoroutine(wg *sync.WaitGroup, writer *csv.Writer, processedChan <-chan ProcessedRecord) {\n\tdefer wg.Done()\n\n\ttotalWritten := 0\n\n\tfor procRec := range processedChan {\n\t\tif err := writer.Write(procRec.Fields); err != nil {\n\t\t\tlog.Printf(\"Error writing record at line %d: %v\", procRec.Line, err)\n\t\t\tcontinue\n\t\t}\n\t\ttotalWritten++\n\n\t\t// Flush periodically to ensure data is written to disk\n\t\tif totalWritten%1000000 == 0 {\n\t\t\twriter.Flush()\n\t\t\tif err := writer.Error(); err != nil {\n\t\t\t\tlog.Printf(\"Error flushing writer at record %d: %v\", totalWritten, err)\n\t\t\t}\n\t\t\tlog.Printf(\"[Writer] Written %d records.\", totalWritten)\n\t\t}\n\t}\n\n\t// Final flush\n\twriter.Flush()\n\tif err := writer.Error(); err != nil {\n\t\tlog.Printf(\"Error during final flush: %v\", err)\n\t}\n\tlog.Printf(\"[Writer] Completed writing %d records.\", totalWritten)\n}\n\n// randomTimestamp generates a random ISO8601 timestamp between start and end Unix timestamps\nfunc randomTimestamp(r *rand.Rand, start, end int64) string {\n\trandSec := r.Int63n(end-start+1) + start\n\trandomTime := time.Unix(randSec, 0).UTC()\n\treturn randomTime.Format(time.RFC3339)\n}\n\n// getGID returns the goroutine ID for logging purposes\nfunc getGID() int {\n\t// WARNING: This uses a hack to get goroutine ID and is not recommended for production use\n\t// It's used here solely for logging purposes as per the user's request\n\tvar buf [64]byte\n\tn := runtime.Stack(buf[:], false)\n\tstack := strings.TrimPrefix(string(buf[:n]), \"goroutine \")\n\tidField := strings.Fields(stack)[0]\n\tid, err := strconv.Atoi(idField)\n\tif err != nil {\n\t\treturn 0\n\t}\n\treturn id\n}\n```\n\nNow you have the right data, but when you try to `COPY` it back, it goes very slowly and you realize that this is the worst possible thing you can do to a time-series database: filling it up with completely random data, time-wise. A good way to make this a lot less bad is to sort the whole thing ascending by date, making it optimal for importing.\n\nFor this, we'll ~~ask o1-mini to write another go program that will take several hours to complete haha who would do that the first time not me~~ use GNU sort:\n\n```bash\nLC_ALL=C tail -n +2 emojis_modified.csv | gsort -t',' -k5,5 --parallel=$(nproc) --buffer-size=16G --temporary-directory=/path/to/tmp > sorted_emojis_modified.csv\n```\n\nThis took a little over 49 minutes on my M1 Pro MBP, which is pretty damn good.\n\nAfter which importing was a breeze, somewhere in the ballpark of 20-25 minutes:\n\n```sql\nCOPY emojis_new FROM '/path/to/sorted_emojis_modified.csv' WITH (FORMAT CSV, DELIMITER ',', HEADER FALSE);\n```\n\nAnd there! Two years of test data, ready for benchmarking.",
    "createdAt": "2024-11-02T23:03:46.577Z",
    "visibility": "author"
  }
}