diff --git a/README.md b/README.md
index 8ecd4cb..52fe6c1 100644
--- a/README.md
+++ b/README.md
@@ -8,7 +8,11 @@ Some skilled engineers even have a blog site where they push some gold content,
 
 # Roadmap
 
-- Given a list of URLs and a user email, scrape those URLs and associate them with the email
-- Save the results of the web scrape into a MongoDB to create the comparison logic
-- Create a scheduler mechanism that will activate the scrape and comparison logic
-- Trigger an email mechanism to send messages to the users when they favorite author publish something new 🏖️
+This program aims to deliver the following features:
+
+- Given a list of websites stored in a MongoDB collection, scrape the content of each website and save it in another MongoDB collection.
+- After scraping, calculate the similarity between the new content and the previous content of each website, and update the MongoDB collection
+with this information.
+- All registered users will receive an email, based on the URLs they registered, notifying them about news on their favorite engineers' websites.
+
+Note: all these flows will be triggered by a cron job.
diff --git a/cmd/newsletter/main.go b/cmd/newsletter/main.go
index 42b23cf..d7bdd5f 100644
--- a/cmd/newsletter/main.go
+++ b/cmd/newsletter/main.go
@@ -7,7 +7,6 @@ import (
 	"log/slog"
 	"os"
 	"os/signal"
-	"sync"
 	"syscall"
 	"time"
 
@@ -17,21 +16,19 @@ import (
 
 // Config is the struct that contains the configuration for the service.
 type Config struct {
-	LogLevel            string
-	LogType             string
-	LoopDurationMinutes time.Duration
-	Mongo               mongodb.Config
+	LogLevel string
+	LogType  string
+	Mongo    mongodb.Config
 }
 
 func main() {
 	cfg := Config{
-		LogLevel: getEnvWithDefault("LOG_LEVEL", "INFO"),
-		LogType:  getEnvWithDefault("LOG_TYPE", "json"),
+		LogLevel: getEnvWithDefault("LOG_LEVEL", ""),
+		LogType:  getEnvWithDefault("LOG_TYPE", ""),
 		Mongo: mongodb.Config{
 			URI: getEnvWithDefault("NL_MONGO_URI", ""),
 		},
-		LoopDurationMinutes: time.Duration(10) * time.Second,
 	}
 
 	signalCh := make(chan os.Signal, 1)
@@ -74,49 +71,10 @@ func main() {
 		signalCh <- syscall.SIGTERM
 	}
 
-	URLCh := make(chan string)
-	fetchResultCh := make(chan string)
-
-	var wg sync.WaitGroup
-	wg.Add(5)
-
-	for i := 0; i < 5; i++ {
-		go newsletter.Worker(&wg, URLCh, fetchResultCh, newsletter.Fetch)
-	}
-
-	go func() {
-		defer close(URLCh)
-		for range time.Tick(cfg.LoopDurationMinutes) {
-			slog.Info("fetching engineers")
-			gotURLs, err := storage.DistinctEngineerURLs(ctx)
-			if err != nil {
-				slog.Error("error getting engineers", "error", err)
-				signalCh <- syscall.SIGTERM
-			}
-
-			slog.Info("fetched engineers", "engineers", len(gotURLs))
-			for _, url := range gotURLs {
-				URLCh <- url.(string)
-			}
-		}
-	}()
-
-	go func() {
-		wg.Wait()
-		defer close(fetchResultCh)
-	}()
+	crawler := newsletter.NewCrawler(5, time.Duration(10)*time.Second, signalCh)
 
 	go func() {
-		for v := range fetchResultCh {
-			slog.Info("saving fetched sites response", "response", v[:10])
-			err := storage.SaveSite(ctx, []mongodb.Site{
-				{Content: v, ScrapeDatetime: time.Now().UTC()},
-			})
-			if err != nil {
-				slog.Error("error saving site result", "error", err)
-				signalCh <- syscall.SIGTERM
-			}
-		}
+		crawler.Run(ctx, storage, newsletter.Fetch)
 	}()
 
 	<-signalCh
diff --git a/cmd/newsletter/newsletter b/cmd/newsletter/newsletter
index 7f83f83..6c984e4 100755
Binary files a/cmd/newsletter/newsletter and b/cmd/newsletter/newsletter differ
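The empty defaults for LOG_LEVEL and LOG_TYPE mean the service now relies entirely on the environment (see the docker-compose change below) to configure logging. The diff does not show how these two values are consumed; the following is a minimal sketch of one way to map them onto a log/slog handler. The setupLogger helper and its fallback behavior are assumptions for illustration, not code from this repository:

```go
package main

import (
	"log/slog"
	"os"
	"strings"
)

// setupLogger is a hypothetical helper (not part of this PR) showing one way
// the LOG_LEVEL and LOG_TYPE env values could be turned into a slog.Logger.
func setupLogger(level, logType string) *slog.Logger {
	var lvl slog.Level
	// slog.Level.UnmarshalText accepts "DEBUG", "INFO", "WARN", "ERROR";
	// unknown or empty values fall back to INFO here (an assumption).
	if err := lvl.UnmarshalText([]byte(strings.ToUpper(level))); err != nil {
		lvl = slog.LevelInfo
	}

	opts := &slog.HandlerOptions{Level: lvl}
	var handler slog.Handler
	if logType == "json" {
		handler = slog.NewJSONHandler(os.Stderr, opts)
	} else {
		handler = slog.NewTextHandler(os.Stderr, opts)
	}
	return slog.New(handler)
}

func main() {
	logger := setupLogger(os.Getenv("LOG_LEVEL"), os.Getenv("LOG_TYPE"))
	slog.SetDefault(logger)
	slog.Debug("logger configured") // visible only when LOG_LEVEL=DEBUG
}
```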
diff --git a/docker-compose.yml b/docker-compose.yml
index 961a3a9..3f8e09d 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -11,6 +11,8 @@ services:
       GOLANGCI_LINT_VERSION: $GOLANGCI_LINT_VERSION
     environment:
       NL_MONGO_URI: "mongodb://root:root@mongodb:27017"
+      LOG_LEVEL: "DEBUG"
+      LOG_TYPE: "json"
     depends_on:
       - mongodb
     volumes:
diff --git a/scrape.go b/scrape.go
index 568c407..1b4b765 100644
--- a/scrape.go
+++ b/scrape.go
@@ -3,12 +3,112 @@ package newsletter
 
 import (
 	"bytes"
+	"context"
 	"fmt"
 	"log/slog"
 	"net/http"
+	"os"
 	"sync"
+	"syscall"
+	"time"
+
+	"github.com/perebaj/newsletter/mongodb"
 )
 
+// PageContent is the struct that gathers the important information about a website
+type PageContent struct {
+	Content string
+	URL     string
+}
+
+// Storage is the interface that wraps the basic methods to save and get data from the database
+type Storage interface {
+	SaveSite(ctx context.Context, site []mongodb.Site) error
+	DistinctEngineerURLs(ctx context.Context) ([]interface{}, error)
+}
+
+// Crawler contains the necessary information to run the crawler
+type Crawler struct {
+	URLch    chan string
+	resultCh chan PageContent
+	signalCh chan os.Signal
+	MaxJobs  int
+	wg       *sync.WaitGroup
+	// scheduler is the interval between fetch rounds
+	scheduler time.Duration
+}
+
+// NewCrawler initializes a new Crawler
+func NewCrawler(maxJobs int, s time.Duration, signalCh chan os.Signal) *Crawler {
+	return &Crawler{
+		URLch:     make(chan string),
+		resultCh:  make(chan PageContent),
+		signalCh:  signalCh,
+		wg:        &sync.WaitGroup{},
+		MaxJobs:   maxJobs,
+		scheduler: s,
+	}
+}
+
+// Run starts the crawler, where s represents the storage and f the function that fetches the content of a website
+func (c *Crawler) Run(ctx context.Context, s Storage, f func(string) (string, error)) {
+	c.wg.Add(c.MaxJobs)
+	for i := 0; i < c.MaxJobs; i++ {
+		go c.Worker(f)
+	}
+
+	go func() {
+		defer close(c.URLch)
+		for range time.Tick(c.scheduler) {
+			slog.Debug("fetching engineers")
+			gotURLs, err := s.DistinctEngineerURLs(ctx)
+			if err != nil {
+				slog.Error("error getting engineers", "error", err)
+				c.signalCh <- syscall.SIGTERM
+			}
+
+			slog.Debug("fetched engineers", "engineers", len(gotURLs))
+			for _, url := range gotURLs {
+				c.URLch <- url.(string)
+			}
+		}
+	}()
+
+	go func() {
+		c.wg.Wait()
+		defer close(c.resultCh)
+	}()
+
+	go func() {
+		for v := range c.resultCh {
+			slog.Debug("saving fetched sites response")
+			err := s.SaveSite(ctx, []mongodb.Site{
+				{
+					URL:            v.URL,
+					Content:        v.Content,
+					ScrapeDatetime: time.Now().UTC(),
+				},
+			})
+			if err != nil {
+				slog.Error("error saving site result", "error", err)
+				c.signalCh <- syscall.SIGTERM
+			}
+		}
+	}()
+}
+
+// Worker uses a worker pool to process jobs and sends the results through a channel
+func (c *Crawler) Worker(f func(string) (string, error)) {
+	defer c.wg.Done()
+	for url := range c.URLch {
+		content, err := f(url)
+		if err != nil {
+			slog.Error(fmt.Sprintf("error getting reference: %s", url), "error", err)
+		}
+		c.resultCh <- PageContent{Content: content, URL: url}
+	}
+}
+
 // Fetch returns the content of a url as a string
 func Fetch(url string) (string, error) {
 	resp, err := http.Get(url)
@@ -35,15 +135,3 @@ func Fetch(url string) (string, error) {
 
 	return bodyString, nil
 }
-
-// Worker use a worker pool to process jobs and send the restuls through a channel
-func Worker(wg *sync.WaitGroup, urls <-chan string, result chan<- string, f func(string) (string, error)) {
-	defer wg.Done()
-	for url := range urls {
-		content, err := f(url)
-		if err != nil {
-			slog.Error(fmt.Sprintf("error getting reference: %s", url), "error", err)
-		}
-		result <- content
-	}
-}
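Crawler.Run wires together a classic fan-out/fan-in pipeline: a ticker goroutine feeds URLs into URLch, MaxJobs workers drain it, and resultCh is closed only after the WaitGroup drains, so the saving goroutine's range loop can terminate cleanly. A self-contained sketch of that pattern in isolation, with illustrative names that are not from this repository:

```go
package main

import (
	"fmt"
	"sync"
)

// Fan-out/fan-in, as used by Crawler.Run: N workers consume one jobs channel,
// and the results channel is closed only after every worker has finished.
func main() {
	jobs := make(chan string)
	results := make(chan string)

	const maxJobs = 3
	var wg sync.WaitGroup
	wg.Add(maxJobs)
	for i := 0; i < maxJobs; i++ {
		go func() {
			defer wg.Done()
			for url := range jobs {
				results <- "fetched " + url
			}
		}()
	}

	// Producer: closing jobs lets each worker's range loop end.
	go func() {
		defer close(jobs)
		for _, u := range []string{"a", "b", "c", "d"} {
			jobs <- u
		}
	}()

	// Closer: close results only after all workers are done,
	// so the consumer below terminates cleanly.
	go func() {
		wg.Wait()
		close(results)
	}()

	for r := range results {
		fmt.Println(r)
	}
}
```

One caveat carried over from the original code: time.Tick fires its first tick only after a full scheduler interval, so the first fetch is delayed by that amount, and the underlying ticker can never be stopped.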
diff --git a/scrape_test.go b/scrape_test.go
index 3aa18c4..fee07b7 100644
--- a/scrape_test.go
+++ b/scrape_test.go
@@ -1,45 +1,40 @@
 package newsletter
 
 import (
-	"fmt"
+	"context"
 	"net/http"
 	"net/http/httptest"
-	"sync"
+	"os"
 	"testing"
 	"time"
+
+	"github.com/perebaj/newsletter/mongodb"
 )
 
-func TestWorker(_ *testing.T) {
-	urls := make(chan string)
-	results := make(chan string)
+const fakeURL = "http://fakeurl.test"
 
-	f := func(s string) (string, error) {
-		time.Sleep(100 * time.Millisecond)
-		return fmt.Sprintf("job %s done", s), nil
-	}
+// Even without verifying the result, this test is useful to check that the crawler runs properly, since it
+// uses mocks for the Storage and the Fetch function.
+func TestCrawlerRun(t *testing.T) {
+	timeoutCh := time.After(time.Duration(150) * time.Millisecond)
+	ctx := context.Background()
+	s := NewStorageMock()
 
-	var wg sync.WaitGroup
-	wg.Add(2)
-	go Worker(&wg, urls, results, f)
-	go Worker(&wg, urls, results, f)
+	f := func(string) (string, error) {
+		return "Hello, World!", nil
+	}
 
-	go func() {
-		urls <- "job1"
-		urls <- "job2"
-		urls <- "job3"
-		urls <- "job4"
-		urls <- "job5"
-		urls <- "job6"
-		defer close(urls)
-	}()
+	signalCh := make(chan os.Signal, 1)
+	c := NewCrawler(1, time.Duration(1000)*time.Millisecond, signalCh)
 
 	go func() {
-		wg.Wait()
-		defer close(results)
+		c.Run(ctx, s, f)
 	}()
 
-	for i := 0; i < 6; i++ {
-		<-results
+	select {
+	case <-signalCh:
+		t.Error("unexpected signal error")
+	case <-timeoutCh:
 	}
 }
 
@@ -79,3 +74,24 @@ func TestGetReferences_Status500(t *testing.T) {
 		t.Errorf("expected empty body, got %s", got)
 	}
 }
+
+// TODO: Move the StorageMock to a separate file, preferably in the same package (mongodb)
+type StorageMock interface {
+	SaveSite(ctx context.Context, site []mongodb.Site) error
+	DistinctEngineerURLs(ctx context.Context) ([]interface{}, error)
+}
+
+type StorageMockImpl struct {
+}
+
+func NewStorageMock() StorageMock {
+	return StorageMockImpl{}
+}
+
+func (s StorageMockImpl) SaveSite(_ context.Context, _ []mongodb.Site) error {
+	return nil
+}
+
+func (s StorageMockImpl) DistinctEngineerURLs(_ context.Context) ([]interface{}, error) {
+	return []interface{}{fakeURL}, nil
+}
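TestCrawlerRun asserts success indirectly: if nothing arrives on signalCh before the timeout fires, the crawler ran through the observation window without signaling an error. A compact sketch of that select-based pattern in isolation; runQuietly is a hypothetical placeholder, not a function from this repository:

```go
package newsletter

import (
	"os"
	"testing"
	"time"
)

// runQuietly stands in for the code under test; on failure it would send a
// signal (e.g. syscall.SIGTERM) on the channel, as Crawler.Run does.
func runQuietly(_ chan os.Signal) {}

// The test passes when the timeout wins the select: no error signal was
// emitted during the observation window.
func TestNoSignalWithinWindow(t *testing.T) {
	signalCh := make(chan os.Signal, 1)
	timeoutCh := time.After(150 * time.Millisecond)

	go runQuietly(signalCh)

	select {
	case <-signalCh:
		t.Error("unexpected signal error")
	case <-timeoutCh:
		// success: nothing went wrong before the deadline
	}
}
```

The trade-off of this style is that a passing run always costs the full timeout, and a regression that takes longer than the window to surface goes undetected; keeping the window short (150ms here) is what makes it tolerable.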