upload
This commit is contained in:
32
Dockerfile
Normal file
32
Dockerfile
Normal file
@@ -0,0 +1,32 @@
|
|||||||
|
# Stage 1: Build the Go application
FROM golang:1.22.3 AS builder

# Set the working directory in the container
WORKDIR /app

# Copy only the module files first so the dependency download below is cached
# as its own layer and is not repeated every time a source file changes
COPY go.mod go.sum ./

# Download necessary Go modules
RUN go mod download

# Copy the Go source code to the container
COPY . .

# Build the application with CGO disabled for a static binary
RUN CGO_ENABLED=0 GOOS=linux go build -o airbyte-clickhouse-normer .

# Stage 2: Create a lightweight image with the Go binary
# (pinned to a release tag instead of "latest" so rebuilds are reproducible)
FROM alpine:3.20

# Set the working directory in the container
WORKDIR /app

# Copy the binary from the builder stage; the file mode set by "go build" is
# preserved by COPY, so no separate chmod layer is needed
COPY --from=builder /app/airbyte-clickhouse-normer .

# Document the port the webhook server listens on (hard-coded to :8080 in main.go)
EXPOSE 8080

# Set the command to run the application
CMD ["./airbyte-clickhouse-normer"]
|
22
README.md
22
README.md
@@ -1,3 +1,25 @@
|
|||||||
# clickhouse-airbyte-normer
|
# clickhouse-airbyte-normer
|
||||||
|
|
||||||
GoProg to move data from raw tables to normal tables #hack
|
GoProg to move data from raw tables to normal tables #hack
|
||||||
|
|
||||||
|
A quick hack to turn the Airbyte raw tables into normal tables so my colleagues can query the data more easily in ClickHouse.
|
||||||
|
|
||||||
|
Environment variables read by the program at startup (set them when running the container):
|
||||||
|
|
||||||
|
```
|
||||||
|
chHost := os.Getenv("CH_HOST")
|
||||||
|
chPort := os.Getenv("CH_PORT")
|
||||||
|
chUser := os.Getenv("CH_USER")
|
||||||
|
chPass := os.Getenv("CH_PASS")
|
||||||
|
rawDB := os.Getenv("RAW_DB")
|
||||||
|
```
|
||||||
|
|
||||||
|
flags
|
||||||
|
|
||||||
|
```
|
||||||
|
forceOnce := flag.Bool("force-once", false, "Run the migration immediately without waiting for a webhook")
|
||||||
|
debug := flag.Bool("debug", false, "Enable debug logging")
|
||||||
|
dropRawFlag := flag.Bool("drop-raw", false, "Drop raw tables after migration (default: keep raw tables)")
|
||||||
|
```
|
||||||
|
|
||||||
|
It's not production ready but feel free to use as a quick hack to at least make the data more usable.
|
23
go.mod
Normal file
23
go.mod
Normal file
@@ -0,0 +1,23 @@
|
|||||||
|
module airbyte-clickhouse-normer
|
||||||
|
|
||||||
|
go 1.22.3
|
||||||
|
|
||||||
|
require github.com/ClickHouse/clickhouse-go/v2 v2.32.1
|
||||||
|
|
||||||
|
require (
|
||||||
|
github.com/ClickHouse/ch-go v0.65.0 // indirect
|
||||||
|
github.com/andybalholm/brotli v1.1.1 // indirect
|
||||||
|
github.com/go-faster/city v1.0.1 // indirect
|
||||||
|
github.com/go-faster/errors v0.7.1 // indirect
|
||||||
|
github.com/google/uuid v1.6.0 // indirect
|
||||||
|
github.com/klauspost/compress v1.17.11 // indirect
|
||||||
|
github.com/paulmach/orb v0.11.1 // indirect
|
||||||
|
github.com/pierrec/lz4/v4 v4.1.22 // indirect
|
||||||
|
github.com/pkg/errors v0.9.1 // indirect
|
||||||
|
github.com/segmentio/asm v1.2.0 // indirect
|
||||||
|
github.com/shopspring/decimal v1.4.0 // indirect
|
||||||
|
go.opentelemetry.io/otel v1.34.0 // indirect
|
||||||
|
go.opentelemetry.io/otel/trace v1.34.0 // indirect
|
||||||
|
golang.org/x/sys v0.30.0 // indirect
|
||||||
|
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||||
|
)
|
109
go.sum
Normal file
109
go.sum
Normal file
@@ -0,0 +1,109 @@
|
|||||||
|
github.com/ClickHouse/ch-go v0.65.0 h1:vZAXfTQliuNNefqkPDewX3kgRxN6Q4vUENnnY+ynTRY=
|
||||||
|
github.com/ClickHouse/ch-go v0.65.0/go.mod h1:tCM0XEH5oWngoi9Iu/8+tjPBo04I/FxNIffpdjtwx3k=
|
||||||
|
github.com/ClickHouse/clickhouse-go/v2 v2.32.1 h1:RLhkxA6iH/bLTXeDtEj/u4yUx9Q03Y95P+cjHScQK78=
|
||||||
|
github.com/ClickHouse/clickhouse-go/v2 v2.32.1/go.mod h1:YtaiIFlHCGNPbOpAvFGYobtcVnmgYvD/WmzitixxWYc=
|
||||||
|
github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA=
|
||||||
|
github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA=
|
||||||
|
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
|
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||||
|
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
|
github.com/go-faster/city v1.0.1 h1:4WAxSZ3V2Ws4QRDrscLEDcibJY8uf41H6AhXDrNDcGw=
|
||||||
|
github.com/go-faster/city v1.0.1/go.mod h1:jKcUJId49qdW3L1qKHH/3wPeUstCVpVSXTM6vO3VcTw=
|
||||||
|
github.com/go-faster/errors v0.7.1 h1:MkJTnDoEdi9pDabt1dpWf7AA8/BaSYZqibYyhZ20AYg=
|
||||||
|
github.com/go-faster/errors v0.7.1/go.mod h1:5ySTjWFiphBs07IKuiL69nxdfd5+fzh1u7FPGZP2quo=
|
||||||
|
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
|
||||||
|
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
|
||||||
|
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
|
||||||
|
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||||
|
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||||
|
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
|
||||||
|
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||||
|
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||||
|
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||||
|
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
|
||||||
|
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
||||||
|
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
|
||||||
|
github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
|
||||||
|
github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
|
||||||
|
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
|
||||||
|
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
|
||||||
|
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
|
||||||
|
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
|
||||||
|
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
|
||||||
|
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
|
||||||
|
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
|
||||||
|
github.com/paulmach/orb v0.11.1 h1:3koVegMC4X/WeiXYz9iswopaTwMem53NzTJuTF20JzU=
|
||||||
|
github.com/paulmach/orb v0.11.1/go.mod h1:5mULz1xQfs3bmQm63QEJA6lNGujuRafwA5S/EnuLaLU=
|
||||||
|
github.com/paulmach/protoscan v0.2.1/go.mod h1:SpcSwydNLrxUGSDvXvO0P7g7AuhJ7lcKfDlhJCDw2gY=
|
||||||
|
github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU=
|
||||||
|
github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
|
||||||
|
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
|
||||||
|
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||||
|
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||||
|
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||||
|
github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys=
|
||||||
|
github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs=
|
||||||
|
github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k=
|
||||||
|
github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME=
|
||||||
|
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||||
|
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||||
|
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
|
||||||
|
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
||||||
|
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
|
||||||
|
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
|
||||||
|
github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g=
|
||||||
|
github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8=
|
||||||
|
github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU=
|
||||||
|
github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E=
|
||||||
|
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA=
|
||||||
|
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||||
|
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||||
|
go.mongodb.org/mongo-driver v1.11.4/go.mod h1:PTSz5yu21bkT/wXpkS7WR5f0ddqw5quethTUn9WM+2g=
|
||||||
|
go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY=
|
||||||
|
go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI=
|
||||||
|
go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k=
|
||||||
|
go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE=
|
||||||
|
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||||
|
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||||
|
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||||
|
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
|
||||||
|
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
||||||
|
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
||||||
|
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||||
|
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||||
|
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||||
|
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
|
||||||
|
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
|
||||||
|
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||||
|
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||||
|
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||||
|
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||||
|
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||||
|
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||||
|
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||||
|
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||||
|
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||||
|
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
|
golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc=
|
||||||
|
golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||||
|
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||||
|
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||||
|
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||||
|
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||||
|
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
|
||||||
|
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||||
|
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||||
|
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
|
||||||
|
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
|
||||||
|
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||||
|
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||||
|
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||||
|
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||||
|
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
|
||||||
|
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
|
||||||
|
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||||
|
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
|
||||||
|
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||||
|
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
|
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||||
|
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
330
main.go
Normal file
330
main.go
Normal file
@@ -0,0 +1,330 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"flag"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"log"
|
||||||
|
"net/http"
|
||||||
|
"os"
|
||||||
|
"sort"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/ClickHouse/clickhouse-go/v2"
|
||||||
|
)
|
||||||
|
|
||||||
|
// dropRaw controls whether migrateData drops each raw table after it has been
// migrated. main assigns it once from the -drop-raw command-line flag.
var dropRaw bool
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
// Define and parse command-line flags.
|
||||||
|
forceOnce := flag.Bool("force-once", false, "Run the migration immediately without waiting for a webhook")
|
||||||
|
debug := flag.Bool("debug", false, "Enable debug logging")
|
||||||
|
dropRawFlag := flag.Bool("drop-raw", false, "Drop raw tables after migration (default: keep raw tables)")
|
||||||
|
flag.Parse()
|
||||||
|
|
||||||
|
// Set global dropRaw value.
|
||||||
|
dropRaw = *dropRawFlag
|
||||||
|
|
||||||
|
// If debug is not enabled, disable logging output.
|
||||||
|
if !*debug {
|
||||||
|
log.SetOutput(io.Discard)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read required environment variables.
|
||||||
|
chHost := os.Getenv("CH_HOST")
|
||||||
|
chPort := os.Getenv("CH_PORT")
|
||||||
|
chUser := os.Getenv("CH_USER")
|
||||||
|
chPass := os.Getenv("CH_PASS")
|
||||||
|
rawDB := os.Getenv("RAW_DB") // e.g. "airbyte_data_raw"
|
||||||
|
|
||||||
|
if chHost == "" || chPort == "" || chUser == "" || rawDB == "" {
|
||||||
|
log.Fatal("Missing required environment variables (CH_HOST, CH_PORT, CH_USER, RAW_DB)")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Derive the destination database by removing "_raw" from the raw database name.
|
||||||
|
destDB := strings.Replace(rawDB, "_raw", "", -1)
|
||||||
|
|
||||||
|
// Create a ClickHouse connection.
|
||||||
|
ctx := context.Background()
|
||||||
|
conn, err := clickhouse.Open(&clickhouse.Options{
|
||||||
|
Addr: []string{fmt.Sprintf("%s:%s", chHost, chPort)},
|
||||||
|
Auth: clickhouse.Auth{
|
||||||
|
Database: rawDB, // initial connection database; fully qualified names are used below.
|
||||||
|
Username: chUser,
|
||||||
|
Password: chPass,
|
||||||
|
},
|
||||||
|
DialTimeout: 5 * time.Second,
|
||||||
|
Debug: *debug, // enable driver debug logging if -debug is set
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to connect to ClickHouse: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ensure the destination database exists.
|
||||||
|
err = conn.Exec(ctx, fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", destDB))
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to create destination database: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the force-once flag is set, run the migration immediately and exit.
|
||||||
|
if *forceOnce {
|
||||||
|
log.Println("Force migration triggered...")
|
||||||
|
if err := migrateData(ctx, conn, rawDB, destDB); err != nil {
|
||||||
|
log.Fatalf("Migration failed: %v", err)
|
||||||
|
}
|
||||||
|
log.Println("Migration completed successfully, exiting.")
|
||||||
|
os.Exit(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Otherwise, set up an HTTP server to listen for webhook POSTs.
|
||||||
|
http.HandleFunc("/webhook", func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if r.Method != http.MethodPost {
|
||||||
|
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err := migrateData(ctx, conn, rawDB, destDB); err != nil {
|
||||||
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
w.Write([]byte("Migration successful"))
|
||||||
|
})
|
||||||
|
|
||||||
|
log.Println("Listening on :8080")
|
||||||
|
log.Fatal(http.ListenAndServe(":8080", nil))
|
||||||
|
}
|
||||||
|
|
||||||
|
// migrateData processes all tables in rawDB that have "_raw" in their name.
|
||||||
|
// It reads the JSON data from _airbyte_data, infers a schema, drops the corresponding
|
||||||
|
// target table if it exists, creates a fresh target table in destDB, inserts the data.
|
||||||
|
// Finally, if the global dropRaw flag is true, it drops the raw table; otherwise it leaves it intact.
|
||||||
|
func migrateData(ctx context.Context, conn clickhouse.Conn, rawDB, destDB string) error {
|
||||||
|
// List tables in rawDB.
|
||||||
|
query := "SELECT name FROM system.tables WHERE database = ?"
|
||||||
|
rows, err := conn.Query(ctx, query, rawDB)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("query system.tables failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var rawTables []string
|
||||||
|
for rows.Next() {
|
||||||
|
var name string
|
||||||
|
if err := rows.Scan(&name); err != nil {
|
||||||
|
return fmt.Errorf("failed to scan table name: %w", err)
|
||||||
|
}
|
||||||
|
// Process any table that has "_raw" in its name.
|
||||||
|
if strings.Contains(name, "_raw") {
|
||||||
|
rawTables = append(rawTables, name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err := rows.Err(); err != nil {
|
||||||
|
return fmt.Errorf("error iterating over system.tables: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Process each raw table.
|
||||||
|
for _, rawTable := range rawTables {
|
||||||
|
// Derive target table name.
|
||||||
|
var targetTable string
|
||||||
|
if strings.HasSuffix(rawTable, "_raw") {
|
||||||
|
targetTable = strings.TrimSuffix(rawTable, "_raw")
|
||||||
|
} else {
|
||||||
|
targetTable = strings.Replace(rawTable, "_raw", "", -1)
|
||||||
|
}
|
||||||
|
log.Printf("Processing raw table %q -> target table %q", rawTable, targetTable)
|
||||||
|
|
||||||
|
// Query the _airbyte_data column from the raw table.
|
||||||
|
fullRawTable := fmt.Sprintf("%s.%s", rawDB, rawTable)
|
||||||
|
q := fmt.Sprintf("SELECT _airbyte_data FROM %s", fullRawTable)
|
||||||
|
dataRows, err := conn.Query(ctx, q)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Failed to query table %s: %v", rawTable, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Decode each row's JSON data into a map.
|
||||||
|
var jsonRows []map[string]interface{}
|
||||||
|
for dataRows.Next() {
|
||||||
|
var jsonData string
|
||||||
|
if err := dataRows.Scan(&jsonData); err != nil {
|
||||||
|
log.Printf("Failed to scan JSON data in table %s: %v", rawTable, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// Trim BOM if present.
|
||||||
|
jsonData = strings.TrimPrefix(jsonData, "\uFEFF")
|
||||||
|
var m map[string]interface{}
|
||||||
|
if err := json.Unmarshal([]byte(jsonData), &m); err != nil {
|
||||||
|
log.Printf("Failed to unmarshal JSON in table %s: %v", rawTable, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
jsonRows = append(jsonRows, m)
|
||||||
|
}
|
||||||
|
if err := dataRows.Err(); err != nil {
|
||||||
|
log.Printf("Error iterating rows in table %s: %v", rawTable, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if len(jsonRows) == 0 {
|
||||||
|
log.Printf("No valid JSON data found in table %s, skipping", rawTable)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Drop the target (non-raw) table if it exists.
|
||||||
|
dropTargetQuery := fmt.Sprintf("DROP TABLE IF EXISTS %s.%s", destDB, targetTable)
|
||||||
|
if err := conn.Exec(ctx, dropTargetQuery); err != nil {
|
||||||
|
log.Printf("Failed to drop target table %s: %v", targetTable, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
log.Printf("Dropped target table %q (if existed)", targetTable)
|
||||||
|
|
||||||
|
// Infer the schema from the JSON rows.
|
||||||
|
schema := inferSchema(jsonRows)
|
||||||
|
|
||||||
|
// Create a fresh target table.
|
||||||
|
if err := createTargetTable(ctx, conn, destDB, targetTable, schema); err != nil {
|
||||||
|
log.Printf("Failed to create target table %s: %v", targetTable, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Insert the JSON data into the target table.
|
||||||
|
if err := insertData(ctx, conn, destDB, targetTable, schema, jsonRows); err != nil {
|
||||||
|
log.Printf("Failed to insert data into target table %s: %v", targetTable, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Drop the raw table only if dropRaw is set; otherwise leave it.
|
||||||
|
if dropRaw {
|
||||||
|
dropRawQuery := fmt.Sprintf("DROP TABLE IF EXISTS %s.%s", rawDB, rawTable)
|
||||||
|
if err := conn.Exec(ctx, dropRawQuery); err != nil {
|
||||||
|
log.Printf("Failed to drop raw table %s: %v", rawTable, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
log.Printf("Successfully processed and dropped raw table %s", rawTable)
|
||||||
|
} else {
|
||||||
|
log.Printf("Raw table %s retained (as per configuration)", rawTable)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// inferSchema examines the JSON rows and returns a map of column names to ClickHouse data types.
|
||||||
|
func inferSchema(rows []map[string]interface{}) map[string]string {
|
||||||
|
schema := make(map[string]string)
|
||||||
|
for _, row := range rows {
|
||||||
|
for key, value := range row {
|
||||||
|
if _, exists := schema[key]; exists {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
schema[key] = inferType(value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return schema
|
||||||
|
}
|
||||||
|
|
||||||
|
// inferType returns a ClickHouse column type based on the Go type of the value:
//
//	bool                                   -> "UInt8"
//	float64, whole and in [0, 2^64)        -> "UInt64"
//	any other float64                      -> "Float64"
//	string, nil and every other Go type    -> "String"
//
// Negative whole numbers are reported as "Float64" rather than "UInt64"
// because an unsigned column cannot hold them. The range is checked before the
// integer conversion, since converting an out-of-range float to an integer
// gives a platform-dependent result in Go.
func inferType(v interface{}) string {
	// Exclusive upper bound of the uint64 range (2^64), as an untyped constant.
	const uint64Limit = 1 << 64

	switch val := v.(type) {
	case bool:
		return "UInt8"
	case float64:
		if val >= 0 && val < uint64Limit && val == float64(uint64(val)) {
			return "UInt64"
		}
		return "Float64"
	default:
		// Strings, nil, arrays and objects are all stored as text.
		return "String"
	}
}
|
||||||
|
|
||||||
|
// createTargetTable constructs and executes a CREATE TABLE statement in the destination database.
|
||||||
|
// Here we use MergeTree so that all rows are preserved.
|
||||||
|
func createTargetTable(ctx context.Context, conn clickhouse.Conn, db, table string, schema map[string]string) error {
|
||||||
|
var cols []string
|
||||||
|
var keys []string
|
||||||
|
for key := range schema {
|
||||||
|
keys = append(keys, key)
|
||||||
|
}
|
||||||
|
sort.Strings(keys)
|
||||||
|
for _, key := range keys {
|
||||||
|
cols = append(cols, fmt.Sprintf("`%s` %s", key, schema[key]))
|
||||||
|
}
|
||||||
|
|
||||||
|
query := fmt.Sprintf(
|
||||||
|
"CREATE TABLE IF NOT EXISTS %s.%s (%s) ENGINE = MergeTree() ORDER BY tuple()",
|
||||||
|
db, table, strings.Join(cols, ", "),
|
||||||
|
)
|
||||||
|
return conn.Exec(ctx, query)
|
||||||
|
}
|
||||||
|
|
||||||
|
// defaultForType maps a ClickHouse column type name to the placeholder value
// used for a column that has no value in a row: the integer 0 for "UInt8" and
// "UInt64", 0.0 for "Float64", and the empty string for every other type.
func defaultForType(typ string) interface{} {
	if typ == "UInt8" || typ == "UInt64" {
		return 0
	}
	if typ == "Float64" {
		return 0.0
	}
	return ""
}
|
||||||
|
|
||||||
|
// insertData performs a batch insert of JSON rows into the target table.
|
||||||
|
// It builds a list of quoted column names and converts non-basic values to JSON strings.
|
||||||
|
func insertData(ctx context.Context, conn clickhouse.Conn, db, table string, schema map[string]string, rows []map[string]interface{}) error {
|
||||||
|
var keys []string
|
||||||
|
for key := range schema {
|
||||||
|
keys = append(keys, key)
|
||||||
|
}
|
||||||
|
sort.Strings(keys)
|
||||||
|
|
||||||
|
// Quote the column names for the INSERT query.
|
||||||
|
var quotedKeys []string
|
||||||
|
for _, key := range keys {
|
||||||
|
quotedKeys = append(quotedKeys, fmt.Sprintf("`%s`", key))
|
||||||
|
}
|
||||||
|
|
||||||
|
query := fmt.Sprintf("INSERT INTO %s.%s (%s) VALUES", db, table, strings.Join(quotedKeys, ", "))
|
||||||
|
batch, err := conn.PrepareBatch(ctx, query)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to prepare batch: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Process each row.
|
||||||
|
for _, row := range rows {
|
||||||
|
values := make([]interface{}, len(keys))
|
||||||
|
for i, key := range keys {
|
||||||
|
if val, ok := row[key]; ok {
|
||||||
|
// If the value is not a basic type, marshal it to JSON.
|
||||||
|
switch v := val.(type) {
|
||||||
|
case string, bool, float64:
|
||||||
|
values[i] = v
|
||||||
|
default:
|
||||||
|
b, err := json.Marshal(v)
|
||||||
|
if err != nil {
|
||||||
|
values[i] = ""
|
||||||
|
} else {
|
||||||
|
values[i] = string(b)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
values[i] = defaultForType(schema[key])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err := batch.Append(values...); err != nil {
|
||||||
|
return fmt.Errorf("failed to append batch: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := batch.Send(); err != nil {
|
||||||
|
return fmt.Errorf("failed to send batch: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
Reference in New Issue
Block a user