// This program flattens Airbyte "_raw" tables in ClickHouse: it reads the JSON
// blobs stored in the _airbyte_data column, infers a column schema, and writes
// the rows into typed tables in the corresponding non-raw database. It runs
// either once (-force-once) or as a small webhook server on :8080.
package main

import (
	"context"
	"encoding/json"
	"flag"
	"fmt"
	"io"
	"log"
	"net/http"
	"os"
	"sort"
	"strings"
	"time"

	"github.com/ClickHouse/clickhouse-go/v2"
)

var dropRaw bool // set via the -drop-raw flag

func main() {
	// Define and parse command-line flags.
	forceOnce := flag.Bool("force-once", false, "Run the migration immediately without waiting for a webhook")
	debug := flag.Bool("debug", false, "Enable debug logging")
	dropRawFlag := flag.Bool("drop-raw", false, "Drop raw tables after migration (default: keep raw tables)")
	flag.Parse()

	// Set the global dropRaw value.
	dropRaw = *dropRawFlag

	// If debug is not enabled, discard log output.
	if !*debug {
		log.SetOutput(io.Discard)
	}

	// Read required environment variables (CH_PASS may legitimately be empty).
	chHost := os.Getenv("CH_HOST")
	chPort := os.Getenv("CH_PORT")
	chUser := os.Getenv("CH_USER")
	chPass := os.Getenv("CH_PASS")
	rawDB := os.Getenv("RAW_DB") // e.g. "airbyte_data_raw"
	if chHost == "" || chPort == "" || chUser == "" || rawDB == "" {
		log.Fatal("Missing required environment variables (CH_HOST, CH_PORT, CH_USER, RAW_DB)")
	}

	// Derive the destination database by removing "_raw" from the raw database name.
	destDB := strings.ReplaceAll(rawDB, "_raw", "")

	// Create a ClickHouse connection.
	ctx := context.Background()
	conn, err := clickhouse.Open(&clickhouse.Options{
		Addr: []string{fmt.Sprintf("%s:%s", chHost, chPort)},
		Auth: clickhouse.Auth{
			Database: rawDB, // initial connection database; fully qualified names are used below
			Username: chUser,
			Password: chPass,
		},
		DialTimeout: 5 * time.Second,
		Debug:       *debug, // enable driver debug logging if -debug is set
	})
	if err != nil {
		log.Fatalf("Failed to connect to ClickHouse: %v", err)
	}
	defer conn.Close()

	// Ensure the destination database exists.
	if err := conn.Exec(ctx, fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", destDB)); err != nil {
		log.Fatalf("Failed to create destination database: %v", err)
	}

	// If the force-once flag is set, run the migration immediately and exit.
	if *forceOnce {
		log.Println("Force migration triggered...")
		if err := migrateData(ctx, conn, rawDB, destDB); err != nil {
			log.Fatalf("Migration failed: %v", err)
		}
		log.Println("Migration completed successfully, exiting.")
		return // exit via return (not os.Exit) so the deferred conn.Close runs
	}

	// Otherwise, set up an HTTP server to listen for webhook POSTs.
	http.HandleFunc("/webhook", func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
			return
		}
		if err := migrateData(ctx, conn, rawDB, destDB); err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		w.WriteHeader(http.StatusOK)
		w.Write([]byte("Migration successful"))
	})
	log.Println("Listening on :8080")
	log.Fatal(http.ListenAndServe(":8080", nil))
}
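
// A sketch of how the binary might be invoked — the host/port values and the
// binary name ./migrator below are illustrative assumptions, not part of the
// program:
//
//	CH_HOST=localhost CH_PORT=9000 CH_USER=default \
//	RAW_DB=airbyte_data_raw ./migrator -force-once -drop-raw
//
// In webhook mode (without -force-once), an Airbyte sync notification can be
// pointed at the server, or the migration triggered by hand:
//
//	curl -X POST http://localhost:8080/webhook
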
// migrateData processes all tables in rawDB whose names contain "_raw".
// For each one it reads the JSON documents from the _airbyte_data column,
// infers a schema, drops any existing target table in destDB, creates a fresh
// one, and inserts the data. Finally, if the global dropRaw flag is true, the
// raw table is dropped; otherwise it is left intact.
func migrateData(ctx context.Context, conn clickhouse.Conn, rawDB, destDB string) error {
	// List tables in rawDB.
	rows, err := conn.Query(ctx, "SELECT name FROM system.tables WHERE database = ?", rawDB)
	if err != nil {
		return fmt.Errorf("query system.tables failed: %w", err)
	}
	defer rows.Close()

	var rawTables []string
	for rows.Next() {
		var name string
		if err := rows.Scan(&name); err != nil {
			return fmt.Errorf("failed to scan table name: %w", err)
		}
		// Process any table that has "_raw" in its name.
		if strings.Contains(name, "_raw") {
			rawTables = append(rawTables, name)
		}
	}
	if err := rows.Err(); err != nil {
		return fmt.Errorf("error iterating over system.tables: %w", err)
	}

	// Process each raw table.
	for _, rawTable := range rawTables {
		// Derive the target table name by stripping "_raw".
		var targetTable string
		if strings.HasSuffix(rawTable, "_raw") {
			targetTable = strings.TrimSuffix(rawTable, "_raw")
		} else {
			targetTable = strings.ReplaceAll(rawTable, "_raw", "")
		}
		log.Printf("Processing raw table %q -> target table %q", rawTable, targetTable)

		// Query the _airbyte_data column from the raw table.
		q := fmt.Sprintf("SELECT _airbyte_data FROM %s.%s", rawDB, rawTable)
		dataRows, err := conn.Query(ctx, q)
		if err != nil {
			log.Printf("Failed to query table %s: %v", rawTable, err)
			continue
		}

		// Decode each row's JSON data into a map.
		var jsonRows []map[string]interface{}
		for dataRows.Next() {
			var jsonData string
			if err := dataRows.Scan(&jsonData); err != nil {
				log.Printf("Failed to scan JSON data in table %s: %v", rawTable, err)
				continue
			}
			// Trim a UTF-8 BOM if present.
			jsonData = strings.TrimPrefix(jsonData, "\uFEFF")
			var m map[string]interface{}
			if err := json.Unmarshal([]byte(jsonData), &m); err != nil {
				log.Printf("Failed to unmarshal JSON in table %s: %v", rawTable, err)
				continue
			}
			jsonRows = append(jsonRows, m)
		}
		iterErr := dataRows.Err()
		dataRows.Close() // close explicitly: defer inside a loop would pile up
		if iterErr != nil {
			log.Printf("Error iterating rows in table %s: %v", rawTable, iterErr)
			continue
		}
		if len(jsonRows) == 0 {
			log.Printf("No valid JSON data found in table %s, skipping", rawTable)
			continue
		}

		// Drop the target (non-raw) table if it exists.
		dropTargetQuery := fmt.Sprintf("DROP TABLE IF EXISTS %s.%s", destDB, targetTable)
		if err := conn.Exec(ctx, dropTargetQuery); err != nil {
			log.Printf("Failed to drop target table %s: %v", targetTable, err)
			continue
		}
		log.Printf("Dropped target table %q (if it existed)", targetTable)

		// Infer the schema from the JSON rows.
		schema := inferSchema(jsonRows)

		// Create a fresh target table.
		if err := createTargetTable(ctx, conn, destDB, targetTable, schema); err != nil {
			log.Printf("Failed to create target table %s: %v", targetTable, err)
			continue
		}

		// Insert the JSON data into the target table.
		if err := insertData(ctx, conn, destDB, targetTable, schema, jsonRows); err != nil {
			log.Printf("Failed to insert data into target table %s: %v", targetTable, err)
			continue
		}

		// Drop the raw table only if dropRaw is set; otherwise leave it.
		if dropRaw {
			dropRawQuery := fmt.Sprintf("DROP TABLE IF EXISTS %s.%s", rawDB, rawTable)
			if err := conn.Exec(ctx, dropRawQuery); err != nil {
				log.Printf("Failed to drop raw table %s: %v", rawTable, err)
				continue
			}
			log.Printf("Successfully processed and dropped raw table %s", rawTable)
		} else {
			log.Printf("Raw table %s retained (as per configuration)", rawTable)
		}
	}
	return nil
}
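
// To illustrate the inference below: given a raw row like (hypothetical data)
//
//	{"id": 42, "name": "ada", "active": true, "score": 9.5, "tags": ["a", "b"]}
//
// inferSchema would produce
//
//	{"id": "UInt64", "name": "String", "active": "UInt8", "score": "Float64", "tags": "String"}
//
// since encoding/json decodes every JSON number as float64, whole non-negative
// numbers are treated as UInt64, booleans map to UInt8, and anything
// non-scalar is stored as a JSON string.
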
// inferSchema examines the JSON rows and returns a map of column names to
// ClickHouse data types. The first value seen for a key determines its type;
// rows that later carry an incompatible type for that key will fail to insert.
func inferSchema(rows []map[string]interface{}) map[string]string {
	schema := make(map[string]string)
	for _, row := range rows {
		for key, value := range row {
			if _, exists := schema[key]; exists {
				continue
			}
			schema[key] = inferType(value)
		}
	}
	return schema
}

// inferType returns a ClickHouse column type based on the Go type that
// encoding/json produced for the value.
func inferType(v interface{}) string {
	switch v := v.(type) {
	case bool:
		return "UInt8"
	case float64:
		// JSON numbers decode as float64; only non-negative whole numbers
		// qualify as UInt64 (negative whole numbers must stay Float64).
		if v >= 0 && v == float64(int64(v)) {
			return "UInt64"
		}
		return "Float64"
	case string:
		return "String"
	default:
		// nil, objects, and arrays are stored as (JSON) strings.
		return "String"
	}
}

// createTargetTable constructs and executes a CREATE TABLE statement in the
// destination database. MergeTree is used so that all rows are preserved.
func createTargetTable(ctx context.Context, conn clickhouse.Conn, db, table string, schema map[string]string) error {
	keys := make([]string, 0, len(schema))
	for key := range schema {
		keys = append(keys, key)
	}
	sort.Strings(keys)

	cols := make([]string, 0, len(keys))
	for _, key := range keys {
		cols = append(cols, fmt.Sprintf("`%s` %s", key, schema[key]))
	}
	query := fmt.Sprintf(
		"CREATE TABLE IF NOT EXISTS %s.%s (%s) ENGINE = MergeTree() ORDER BY tuple()",
		db, table, strings.Join(cols, ", "),
	)
	return conn.Exec(ctx, query)
}

// defaultForType returns a zero value of the Go type expected for a given
// ClickHouse column type, so missing fields batch-append cleanly.
func defaultForType(typ string) interface{} {
	switch typ {
	case "UInt8":
		return uint8(0)
	case "UInt64":
		return uint64(0)
	case "Float64":
		return float64(0)
	default:
		return ""
	}
}

// insertData performs a batch insert of the JSON rows into the target table.
// Values are converted to the Go types matching the inferred column types;
// non-scalar values (objects, arrays) are stored as JSON strings.
func insertData(ctx context.Context, conn clickhouse.Conn, db, table string, schema map[string]string, rows []map[string]interface{}) error {
	keys := make([]string, 0, len(schema))
	for key := range schema {
		keys = append(keys, key)
	}
	sort.Strings(keys)

	// Quote the column names for the INSERT query.
	quotedKeys := make([]string, 0, len(keys))
	for _, key := range keys {
		quotedKeys = append(quotedKeys, fmt.Sprintf("`%s`", key))
	}
	query := fmt.Sprintf("INSERT INTO %s.%s (%s) VALUES", db, table, strings.Join(quotedKeys, ", "))
	batch, err := conn.PrepareBatch(ctx, query)
	if err != nil {
		return fmt.Errorf("failed to prepare batch: %w", err)
	}

	// Process each row.
	for _, row := range rows {
		values := make([]interface{}, len(keys))
		for i, key := range keys {
			val, ok := row[key]
			if !ok || val == nil {
				values[i] = defaultForType(schema[key])
				continue
			}
			switch v := val.(type) {
			case bool:
				// Booleans were inferred as UInt8; convert explicitly.
				if v {
					values[i] = uint8(1)
				} else {
					values[i] = uint8(0)
				}
			case float64:
				// Match the Go type to the inferred column type.
				if schema[key] == "UInt64" {
					values[i] = uint64(v)
				} else {
					values[i] = v
				}
			case string:
				values[i] = v
			default:
				// Objects and arrays are marshalled back to JSON strings.
				b, err := json.Marshal(v)
				if err != nil {
					values[i] = ""
				} else {
					values[i] = string(b)
				}
			}
		}
		if err := batch.Append(values...); err != nil {
			return fmt.Errorf("failed to append batch: %w", err)
		}
	}
	if err := batch.Send(); err != nil {
		return fmt.Errorf("failed to send batch: %w", err)
	}
	return nil
}
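
// For the example row sketched above inferSchema, createTargetTable would emit
// DDL of this shape (the database and table names are placeholders):
//
//	CREATE TABLE IF NOT EXISTS airbyte_data.users (
//	    `active` UInt8, `id` UInt64, `name` String, `score` Float64, `tags` String
//	) ENGINE = MergeTree() ORDER BY tuple()
//
// Columns appear in sorted order because both createTargetTable and insertData
// sort the schema keys, which keeps the CREATE and INSERT column lists aligned.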