curcuma

ref: master

./watcher.go


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package curcuma

import (
	"context"
	"fmt"
	"log"
	"time"
	"mime"
	"strings"
	"path"

	"github.com/radovskyb/watcher"

	"github.com/minio/minio-go/v7"
	"github.com/vaughan0/go-ini"
)

// Watcher bundles the dependencies needed to mirror a local directory to an
// S3-compatible bucket.
type Watcher struct {
	// MinioClient is the client that uploads are sent through.
	MinioClient *minio.Client
	// Config is the parsed INI configuration; the "fs" and "s3" sections are
	// read from it in this file.
	Config ini.File
}

// NewWatcher returns a Watcher that stores mc as its upload client and config
// as its settings. Neither argument is validated here.
func NewWatcher(mc *minio.Client, config ini.File) *Watcher {
	created := new(Watcher)
	created.MinioClient = mc
	created.Config = config
	return created
}

// Observe recursively watches the directory configured under the "path" key
// of the "fs" section and hands every created or written non-directory entry
// to upload, each in its own goroutine.
//
// The object key passed to upload is the event path with the configured
// directory stripped from its front. A missing or empty path setting, any
// error reported by the watcher, and any failure to register or start the
// watch terminate the process through log.Fatalln.
//
// NOTE(review): Start is expected to block until the watcher is closed, which
// would make Observe a blocking call — confirm against radovskyb/watcher.
func (w *Watcher) Observe() {
	dir, ok := w.Config.Get("fs", "path")
	if !ok || dir == "" {
		// An absent key would otherwise yield "", making the prefix stripping
		// below a no-op and leaving the watch target unspecified.
		log.Fatalln(`missing "path" setting in the "fs" configuration section`)
	}

	watchmen := watcher.New()
	watchmen.SetMaxEvents(1)

	// Only file creations and writes are of interest.
	watchmen.FilterOps(watcher.Write, watcher.Create)

	go func() {
		for {
			select {
			case event := <-watchmen.Event:
				fmt.Println(event) // Print the event's info.
				if event.FileInfo.IsDir() {
					continue
				}

				// Strip dir only when it is a leading prefix. Replacing the
				// first occurrence anywhere in the path would cut a piece out
				// of the middle of the key whenever dir is not at the front.
				// NOTE(review): if the watcher reports absolute paths while
				// dir is configured as a relative path, nothing is stripped —
				// confirm and normalise dir if needed.
				s3Path := strings.TrimPrefix(event.Path, dir)

				go upload(w, event.Path, s3Path)
			case err := <-watchmen.Error:
				log.Fatalln(err)
			case <-watchmen.Closed:
				return
			}
		}
	}()

	if err := watchmen.AddRecursive(dir); err != nil {
		log.Fatalln(err)
	}

	// TODO: Make this configurable
	if err := watchmen.Start(time.Millisecond * 100); err != nil {
		log.Fatalln(err)
	}
}

// upload sends the local file at filePath to the bucket named by the "bucket"
// key of the "s3" section, storing it under the object key s3Path.
//
// One leading "/" is removed from s3Path before use, and the object's content
// type is derived from the key's extension (empty when the extension is
// unknown). On success the key and uploaded size are logged.
//
// NOTE(review): an upload error terminates the whole process through
// log.Fatalln, including any other uploads still in flight — confirm this is
// intended for a long-running watcher.
func upload(w *Watcher, filePath string, s3Path string) {
	ctx := context.Background()
	// The presence flag is not checked: a missing bucket setting yields "",
	// which is passed on to FPutObject as-is.
	// NOTE(review): presumably the client rejects an empty bucket name — verify.
	bucketName, _ := w.Config.Get("s3", "bucket")

	s3Path = strings.TrimPrefix(s3Path, "/")

	contentType := mime.TypeByExtension(path.Ext(s3Path))

	info, err := w.MinioClient.FPutObject(ctx, bucketName, s3Path,
		filePath, minio.PutObjectOptions{ContentType: contentType},
	)
	if err != nil {
		log.Fatalln(err)
	}

	// info.Size is an integer, so it needs %d; %s would print "%!s(int64=N)".
	log.Printf("Successfully uploaded file: %s %d", s3Path, info.Size)
}