diff --git a/src/app.go b/src/app.go
index f1ce77e..994511f 100644
--- a/src/app.go
+++ b/src/app.go
@@ -1,6 +1,9 @@
 package main
 
 import (
+	"bufio"
+	"bytes"
+	"compress/gzip"
 	"context"
 	"encoding/json"
 	"fmt"
@@ -75,6 +78,7 @@ func (c *Cache) Status() map[string]interface{} {
 	}
 }
 
+// fetchOnce downloads url and writes the response body to destPath as-is.
 func fetchOnce(url, destPath string) (string, error) {
 	client := &http.Client{
 		Timeout: 30 * time.Second,
@@ -119,12 +123,162 @@ func fetchOnce(url, destPath string) (string, error) {
 	contentType := resp.Header.Get("Content-Type")
 	if contentType == "" {
-		contentType = "text/xml; charset=utf-8"
+		contentType = "application/octet-stream"
 	}
 
 	return contentType, nil
 }
 
+// fetchGunzipOnce downloads a gzipped file, optionally runs the decompressed
+// stream through transformer, re-compresses the result, and atomically
+// renames it into destPath.
+func fetchGunzipOnce(url, destPath, innerName string, transformer func(io.Reader, io.Writer) error) (string, error) {
+	client := &http.Client{
+		Timeout: 320 * time.Second,
+	}
+
+	req, err := http.NewRequest(http.MethodGet, url, nil)
+	if err != nil {
+		return "", fmt.Errorf("creating request: %w", err)
+	}
+
+	resp, err := client.Do(req)
+	if err != nil {
+		return "", fmt.Errorf("performing request: %w", err)
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
+		return "", fmt.Errorf("unexpected status code: %d", resp.StatusCode)
+	}
+
+	// Decompress the source stream.
+	zr, err := gzip.NewReader(resp.Body)
+	if err != nil {
+		return "", fmt.Errorf("creating gzip reader: %w", err)
+	}
+	defer zr.Close()
+
+	tmpPath := destPath + ".tmp"
+	f, err := os.Create(tmpPath)
+	if err != nil {
+		return "", fmt.Errorf("creating temp file: %w", err)
+	}
+
+	// Recompress to gzip while writing.
+	gw := gzip.NewWriter(f)
+	if innerName != "" {
+		gw.Name = innerName
+	}
+
+	var copyErr error
+	if transformer != nil {
+		copyErr = transformer(zr, gw)
+	} else {
+		_, copyErr = io.Copy(gw, zr)
+	}
+	closeErr1 := gw.Close()
+	closeErr2 := f.Close()
+
+	if copyErr != nil {
+		_ = os.Remove(tmpPath)
+		return "", fmt.Errorf("copying/decompressing/compressing body: %w", copyErr)
+	}
+	if closeErr1 != nil {
+		_ = os.Remove(tmpPath)
+		return "", fmt.Errorf("closing gzip writer: %w", closeErr1)
+	}
+	if closeErr2 != nil {
+		_ = os.Remove(tmpPath)
+		return "", fmt.Errorf("closing temp file: %w", closeErr2)
+	}
+
+	if err := os.Rename(tmpPath, destPath); err != nil {
+		_ = os.Remove(tmpPath)
+		return "", fmt.Errorf("renaming temp file: %w", err)
+	}
+
+	contentType := "application/gzip"
+	return contentType, nil
+}
+
+// transformChannelRefs rewrites channel identifiers for the "*1" endpoints,
+// removing every dot except the last one:
+//
+//	<channel id="SBS.6.nl">        -> <channel id="SBS6.nl">
+//	<programme channel="SBS.6.nl"> -> <programme channel="SBS6.nl">
+//
+// It operates line by line, so attributes split across lines are left as-is.
+func transformChannelRefs(r io.Reader, w io.Writer) error {
+	br := bufio.NewReader(r)
+
+	for {
+		line, err := br.ReadBytes('\n')
+		if len(line) > 0 {
+			transformed := transformLineChannelRefs(line)
+			if _, werr := w.Write(transformed); werr != nil {
+				return werr
+			}
+		}
+		if err != nil {
+			if err == io.EOF {
+				return nil
+			}
+			return err
+		}
+	}
+}
+
+// transformLineChannelRefs applies the dot-removal to both channel id="..."
+// and programme channel="..." attributes on a single line.
+func transformLineChannelRefs(line []byte) []byte {
+	line = transformAttr(line, `channel id="`)
+	line = transformAttr(line, `channel="`)
+	return line
+}
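+
+// A minimal usage sketch for the line transform (illustrative input/output,
+// not a fixture from this change):
+//
+//	in := []byte(`<programme channel="SBS.6.nl" start="20240101000000">`)
+//	out := transformLineChannelRefs(in)
+//	// string(out) == `<programme channel="SBS6.nl" start="20240101000000">`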
+
+// transformAttr removes every dot except the last one from the value of each
+// occurrence of the given attribute prefix on the line.
+// Example: SBS.6.nl -> SBS6.nl
+func transformAttr(line []byte, prefix string) []byte {
+	if !bytes.Contains(line, []byte(prefix)) {
+		return line
+	}
+
+	var out []byte
+	start := 0
+
+	for {
+		idx := bytes.Index(line[start:], []byte(prefix))
+		if idx == -1 {
+			out = append(out, line[start:]...)
+			break
+		}
+		idx += start
+
+		out = append(out, line[start:idx+len(prefix)]...)
+
+		rest := line[idx+len(prefix):]
+		end := bytes.IndexByte(rest, '"')
+		if end == -1 {
+			out = append(out, rest...)
+			break
+		}
+
+		value := rest[:end]
+
+		dotCount := bytes.Count(value, []byte("."))
+		if dotCount >= 2 {
+			lastDot := bytes.LastIndex(value, []byte("."))
+			before := value[:lastDot]
+			after := value[lastDot:]
+
+			beforeNoDots := bytes.ReplaceAll(before, []byte("."), nil)
+			value = append(beforeNoDots, after...)
+		}
+
+		out = append(out, value...)
+		out = append(out, '"')
+
+		start = idx + len(prefix) + end + 1
+	}
+
+	return out
+}
+
 func startFetcher(ctx context.Context, cache *Cache, url string, okInterval, failInterval time.Duration) {
 	go func() {
 		for {
@@ -150,6 +304,31 @@ func startFetcher(ctx context.Context, cache *Cache, url string, okInterval, fai
 	}()
 }
 
+// startFetcherGunzip mirrors startFetcher, but fetches and re-compresses via
+// fetchGunzipOnce.
+func startFetcherGunzip(ctx context.Context, cache *Cache, url string, okInterval, failInterval time.Duration, innerName string, transformer func(io.Reader, io.Writer) error) {
+	go func() {
+		for {
+			contentType, err := fetchGunzipOnce(url, cache.filePath, innerName, transformer)
+			if err != nil {
+				log.Printf("error fetching (gunzip) %s: %v", url, err)
+			}
+			cache.Update(contentType, err)
+
+			var nextInterval time.Duration
+			if err != nil || !cache.hasData {
+				nextInterval = failInterval
+			} else {
+				nextInterval = okInterval
+			}
+
+			select {
+			case <-ctx.Done():
+				return
+			case <-time.After(nextInterval):
+			}
+		}
+	}()
+}
+
 func parseEnvDurationHours(key string, defaultHours int) time.Duration {
 	val := os.Getenv(key)
 	if val == "" {
@@ -163,7 +342,27 @@
 	return time.Duration(hours) * time.Hour
 }
 
+// makeGzipHandler returns a handler that serves the cached gzip file with a
+// stable download filename.
+func makeGzipHandler(cache *Cache, downloadName string) http.HandlerFunc {
+	return func(w http.ResponseWriter, r *http.Request) {
+		path, contentType, ok := cache.GetContent()
+		if !ok {
+			http.Error(w, "Cache not yet initialized; no successful fetch so far", http.StatusServiceUnavailable)
+			return
+		}
+
+		if contentType == "" {
+			contentType = "application/gzip"
+		}
+
+		w.Header().Set("Content-Type", contentType)
+		w.Header().Set("Content-Disposition", fmt.Sprintf(`attachment; filename="%s"`, downloadName))
+
+		http.ServeFile(w, r, path)
+	}
+}
+
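+// The per-region wiring in main below is deliberately explicit. A
+// table-driven alternative is sketched here for reference only (hypothetical
+// helper, not wired up by this change):
+//
+//	type region struct{ name, url string }
+//
+//	func startRegionFetchers(ctx context.Context, tmpDir string, regions []region,
+//		okInterval, failInterval time.Duration, mux *http.ServeMux) map[string]*Cache {
+//		caches := make(map[string]*Cache)
+//		for _, reg := range regions {
+//			for i, tr := range []func(io.Reader, io.Writer) error{nil, transformChannelRefs} {
+//				key := fmt.Sprintf("%s%d", reg.name, i) // e.g. "nl0", "nl1"
+//				c := &Cache{filePath: filepath.Join(tmpDir, "cache_"+key+".xml.gz")}
+//				startFetcherGunzip(ctx, c, reg.url, okInterval, failInterval, key+".xml", tr)
+//				mux.HandleFunc("/"+key, makeGzipHandler(c, key+".xml.gz"))
+//				caches[key] = c
+//			}
+//		}
+//		return caches
+//	}
+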
 func main() {
+	// /chc keeps the original CACHE_URL behavior for the configured provider.
 	url := os.Getenv("CACHE_URL")
 	if url == "" {
 		log.Fatal("CACHE_URL env var is required")
 	}
@@ -176,22 +375,117 @@ func main() {
 	if port == "" {
 		port = "8080"
 	}
-
-	cacheFilePath := filepath.Join(os.TempDir(), "cache.xml")
-	cache := &Cache{
-		filePath: cacheFilePath,
-	}
+	tmpDir := os.TempDir()
+	chcCacheFilePath := filepath.Join(tmpDir, "cache_chc.xml")
+
+	// ES
+	es0CacheFilePath := filepath.Join(tmpDir, "cache_es0.xml.gz")
+	es1CacheFilePath := filepath.Join(tmpDir, "cache_es1.xml.gz")
+
+	// BG
+	bg0CacheFilePath := filepath.Join(tmpDir, "cache_bg0.xml.gz")
+	bg1CacheFilePath := filepath.Join(tmpDir, "cache_bg1.xml.gz")
+
+	// DE
+	de0CacheFilePath := filepath.Join(tmpDir, "cache_de0.xml.gz")
+	de1CacheFilePath := filepath.Join(tmpDir, "cache_de1.xml.gz")
+
+	// FR
+	fr0CacheFilePath := filepath.Join(tmpDir, "cache_fr0.xml.gz")
+	fr1CacheFilePath := filepath.Join(tmpDir, "cache_fr1.xml.gz")
+
+	// IT
+	it0CacheFilePath := filepath.Join(tmpDir, "cache_it0.xml.gz")
+	it1CacheFilePath := filepath.Join(tmpDir, "cache_it1.xml.gz")
+
+	// NL
+	nl0CacheFilePath := filepath.Join(tmpDir, "cache_nl0.xml.gz")
+	nl1CacheFilePath := filepath.Join(tmpDir, "cache_nl1.xml.gz")
+
+	// UK
+	uk0CacheFilePath := filepath.Join(tmpDir, "cache_uk0.xml.gz")
+	uk1CacheFilePath := filepath.Join(tmpDir, "cache_uk1.xml.gz")
+
+	chcCache := &Cache{filePath: chcCacheFilePath}
+
+	es0Cache := &Cache{filePath: es0CacheFilePath}
+	es1Cache := &Cache{filePath: es1CacheFilePath}
+
+	bg0Cache := &Cache{filePath: bg0CacheFilePath}
+	bg1Cache := &Cache{filePath: bg1CacheFilePath}
+
+	de0Cache := &Cache{filePath: de0CacheFilePath}
+	de1Cache := &Cache{filePath: de1CacheFilePath}
+
+	fr0Cache := &Cache{filePath: fr0CacheFilePath}
+	fr1Cache := &Cache{filePath: fr1CacheFilePath}
+
+	it0Cache := &Cache{filePath: it0CacheFilePath}
+	it1Cache := &Cache{filePath: it1CacheFilePath}
+
+	nl0Cache := &Cache{filePath: nl0CacheFilePath}
+	nl1Cache := &Cache{filePath: nl1CacheFilePath}
+
+	uk0Cache := &Cache{filePath: uk0CacheFilePath}
+	uk1Cache := &Cache{filePath: uk1CacheFilePath}
 
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
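+	// Resulting cache files under os.TempDir() (the *.gz pairs are written by
+	// the fetchers started below):
+	//
+	//	cache_chc.xml     raw payload from CACHE_URL
+	//	cache_XX0.xml.gz  re-gzipped upstream feed, unmodified
+	//	cache_XX1.xml.gz  same feed with channel refs un-dotted
+	//	(XX is one of es, bg, de, fr, it, nl, uk)
+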
-	startFetcher(ctx, cache, url, okInterval, failInterval)
+	// /chc: same behavior as the old root endpoint.
+	startFetcher(ctx, chcCache, url, okInterval, failInterval)
+
+	// Base URLs for each region.
+	// NOTE: esURL points at the ALL_SOURCES1 feed rather than a dedicated
+	// ES-only feed.
+	const (
+		esURL = "https://epgshare01.online/epgshare01/epg_ripper_ALL_SOURCES1.xml.gz"
+		bgURL = "https://epgshare01.online/epgshare01/epg_ripper_BG1.xml.gz"
+		deURL = "https://epgshare01.online/epgshare01/epg_ripper_DE1.xml.gz"
+		frURL = "https://epgshare01.online/epgshare01/epg_ripper_FR1.xml.gz"
+		itURL = "https://epgshare01.online/epgshare01/epg_ripper_IT1.xml.gz"
+		nlURL = "https://epgshare01.online/epgshare01/epg_ripper_NL1.xml.gz"
+		ukURL = "https://epgshare01.online/epgshare01/epg_ripper_UK1.xml.gz"
+	)
+
+	// ES: es0 (raw), es1 (undotted)
+	startFetcherGunzip(ctx, es0Cache, esURL, okInterval, failInterval, "es0.xml", nil)
+	startFetcherGunzip(ctx, es1Cache, esURL, okInterval, failInterval, "es1.xml", transformChannelRefs)
+
+	// BG: bg0 (raw), bg1 (undotted)
+	startFetcherGunzip(ctx, bg0Cache, bgURL, okInterval, failInterval, "bg0.xml", nil)
+	startFetcherGunzip(ctx, bg1Cache, bgURL, okInterval, failInterval, "bg1.xml", transformChannelRefs)
+
+	// DE: de0 (raw), de1 (undotted)
+	startFetcherGunzip(ctx, de0Cache, deURL, okInterval, failInterval, "de0.xml", nil)
+	startFetcherGunzip(ctx, de1Cache, deURL, okInterval, failInterval, "de1.xml", transformChannelRefs)
+
+	// FR: fr0 (raw), fr1 (undotted)
+	startFetcherGunzip(ctx, fr0Cache, frURL, okInterval, failInterval, "fr0.xml", nil)
+	startFetcherGunzip(ctx, fr1Cache, frURL, okInterval, failInterval, "fr1.xml", transformChannelRefs)
+
+	// IT: it0 (raw), it1 (undotted)
+	startFetcherGunzip(ctx, it0Cache, itURL, okInterval, failInterval, "it0.xml", nil)
+	startFetcherGunzip(ctx, it1Cache, itURL, okInterval, failInterval, "it1.xml", transformChannelRefs)
+
+	// NL: nl0 (raw), nl1 (undotted)
+	startFetcherGunzip(ctx, nl0Cache, nlURL, okInterval, failInterval, "nl0.xml", nil)
+	startFetcherGunzip(ctx, nl1Cache, nlURL, okInterval, failInterval, "nl1.xml", transformChannelRefs)
+
+	// UK: uk0 (raw), uk1 (undotted)
+	startFetcherGunzip(ctx, uk0Cache, ukURL, okInterval, failInterval, "uk0.xml", nil)
+	startFetcherGunzip(ctx, uk1Cache, ukURL, okInterval, failInterval, "uk1.xml", transformChannelRefs)
 
 	mux := http.NewServeMux()
 
+	// Root: always respond 401, to deter scrapers from burning bandwidth.
 	mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
-		path, contentType, ok := cache.GetContent()
+		w.Header().Set("WWW-Authenticate", `Basic realm="restricted"`)
+		http.Error(w, "unauthorized", http.StatusUnauthorized)
+	})
+
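+	// Exercising the endpoints (illustrative; assumes the default port 8080):
+	//
+	//	curl -i localhost:8080/            # -> 401 Unauthorized
+	//	curl -so nl1.xml.gz localhost:8080/nl1 && gunzip -t nl1.xml.gz
+	//	curl -s localhost:8080/status      # -> per-cache JSON
+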
+	// /chc: serves the original payload fetched from CACHE_URL.
+	mux.HandleFunc("/chc", func(w http.ResponseWriter, r *http.Request) {
+		path, contentType, ok := chcCache.GetContent()
 		if !ok {
 			http.Error(w, "Cache not yet initialized; no successful fetch so far", http.StatusServiceUnavailable)
 			return
@@ -203,8 +497,60 @@ func main() {
 		http.ServeFile(w, r, path)
 	})
 
+	// ES endpoints
+	mux.HandleFunc("/es0", makeGzipHandler(es0Cache, "es0.xml.gz"))
+	mux.HandleFunc("/es1", makeGzipHandler(es1Cache, "es1.xml.gz"))
+
+	// BG endpoints
+	mux.HandleFunc("/bg0", makeGzipHandler(bg0Cache, "bg0.xml.gz"))
+	mux.HandleFunc("/bg1", makeGzipHandler(bg1Cache, "bg1.xml.gz"))
+
+	// DE endpoints
+	mux.HandleFunc("/de0", makeGzipHandler(de0Cache, "de0.xml.gz"))
+	mux.HandleFunc("/de1", makeGzipHandler(de1Cache, "de1.xml.gz"))
+
+	// FR endpoints
+	mux.HandleFunc("/fr0", makeGzipHandler(fr0Cache, "fr0.xml.gz"))
+	mux.HandleFunc("/fr1", makeGzipHandler(fr1Cache, "fr1.xml.gz"))
+
+	// IT endpoints
+	mux.HandleFunc("/it0", makeGzipHandler(it0Cache, "it0.xml.gz"))
+	mux.HandleFunc("/it1", makeGzipHandler(it1Cache, "it1.xml.gz"))
+
+	// NL endpoints
+	mux.HandleFunc("/nl0", makeGzipHandler(nl0Cache, "nl0.xml.gz"))
+	mux.HandleFunc("/nl1", makeGzipHandler(nl1Cache, "nl1.xml.gz"))
+
+	// UK endpoints
+	mux.HandleFunc("/uk0", makeGzipHandler(uk0Cache, "uk0.xml.gz"))
+	mux.HandleFunc("/uk1", makeGzipHandler(uk1Cache, "uk1.xml.gz"))
+
+	// Combined status endpoint.
 	mux.HandleFunc("/status", func(w http.ResponseWriter, r *http.Request) {
-		status := cache.Status()
+		status := map[string]interface{}{
+			"chc": chcCache.Status(),
+
+			"es0": es0Cache.Status(),
+			"es1": es1Cache.Status(),
+
+			"bg0": bg0Cache.Status(),
+			"bg1": bg1Cache.Status(),
+
+			"de0": de0Cache.Status(),
+			"de1": de1Cache.Status(),
+
+			"fr0": fr0Cache.Status(),
+			"fr1": fr1Cache.Status(),
+
+			"it0": it0Cache.Status(),
+			"it1": it1Cache.Status(),
+
+			"nl0": nl0Cache.Status(),
+			"nl1": nl1Cache.Status(),
+
+			"uk0": uk0Cache.Status(),
+			"uk1": uk1Cache.Status(),
+		}
 
 		w.Header().Set("Content-Type", "application/json; charset=utf-8")
 		enc := json.NewEncoder(w)
@@ -216,7 +562,9 @@ func main() {
 	})
 
 	addr := ":" + port
-	log.Printf("Starting server on %s, caching %s to %s (ok interval: %s, fail interval: %s)", addr, url, cacheFilePath, okInterval, failInterval)
+	log.Printf("Starting server on %s\n  /chc -> %s\n  EPG base URLs: ES:%s BG:%s DE:%s FR:%s IT:%s NL:%s UK:%s\n  (ok interval: %s, fail interval: %s)",
+		addr, url, esURL, bgURL, deURL, frURL, itURL, nlURL, ukURL, okInterval, failInterval)
+
 	if err := http.ListenAndServe(addr, mux); err != nil {
 		log.Fatalf("server error: %v", err)
 	}
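+
+	// A round-trip check for fetchGunzipOnce; a hypothetical _test.go sketch
+	// (uses net/http/httptest), not part of this change:
+	//
+	//	func TestFetchGunzipOnceRoundTrip(t *testing.T) {
+	//		var buf bytes.Buffer
+	//		zw := gzip.NewWriter(&buf)
+	//		zw.Write([]byte(`<channel id="SBS.6.nl">` + "\n"))
+	//		zw.Close()
+	//		srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+	//			w.Write(buf.Bytes())
+	//		}))
+	//		defer srv.Close()
+	//		dest := filepath.Join(t.TempDir(), "out.xml.gz")
+	//		if _, err := fetchGunzipOnce(srv.URL, dest, "out.xml", transformChannelRefs); err != nil {
+	//			t.Fatal(err)
+	//		}
+	//		// Gunzip dest and assert the payload reads `<channel id="SBS6.nl">`.
+	//	}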