Commit cb825f4

Merge pull request #9 from perebaj/crawler_struct
✨ crawler
perebaj authored Jan 19, 2024
2 parents 1f7366d + 526f17d commit cb825f4
Showing 6 changed files with 159 additions and 91 deletions.
12 changes: 8 additions & 4 deletions README.md
@@ -8,7 +8,11 @@ Some skilled engineers even have a blog site where they push some gold content,

# Roadmap

- Given a list of URLs and a user email, scrape those URLs and associate them with the email
- Save the results of the web scrape into a MongoDB to create the comparison logic
- Create a scheduler mechanism that will activate the scrape and comparison logic
- Trigger an email mechanism to send messages to the users when they favorite author publish something new 🏖️
This program aims to provide the following features:

- Given a list of websites stored in a MongoDB collection, scrape the content of each website and save it in another MongoDB collection.
- After scraping, calculate the similarity between the new content and the previous content of each website, and update the MongoDB collection with this information (a rough sketch of this step follows below).
- Notify every registered user by email, according to the URLs they have registered, whenever there is news on their favorite engineers' websites.

Note: all of these flows will be triggered by a cron job.
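The comparison step above is not implemented in this commit. As a rough illustration only, a word-level Jaccard similarity between the previously stored content and the newly scraped content could look like the sketch below; the `Similarity` function, its name, and the metric itself are assumptions, not part of the codebase:

```go
package newsletter

import "strings"

// Similarity returns a word-level Jaccard similarity between two scraped
// contents: 1.0 means identical word sets, 0.0 means no overlap.
// Sketch only: the metric and the function name are illustrative assumptions.
func Similarity(oldContent, newContent string) float64 {
	oldWords := make(map[string]struct{})
	for _, w := range strings.Fields(oldContent) {
		oldWords[w] = struct{}{}
	}
	newWords := make(map[string]struct{})
	for _, w := range strings.Fields(newContent) {
		newWords[w] = struct{}{}
	}
	if len(oldWords) == 0 && len(newWords) == 0 {
		return 1.0
	}
	intersection := 0
	for w := range newWords {
		if _, ok := oldWords[w]; ok {
			intersection++
		}
	}
	union := len(oldWords) + len(newWords) - intersection
	return float64(intersection) / float64(union)
}
```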
56 changes: 7 additions & 49 deletions cmd/newsletter/main.go
@@ -7,7 +7,6 @@ import (
"log/slog"
"os"
"os/signal"
"sync"
"syscall"
"time"

@@ -17,21 +16,19 @@ import (

// Config is the struct that contains the configuration for the service.
type Config struct {
LogLevel string
LogType string
LoopDurationMinutes time.Duration
Mongo mongodb.Config
LogLevel string
LogType string
Mongo mongodb.Config
}

func main() {

cfg := Config{
LogLevel: getEnvWithDefault("LOG_LEVEL", "INFO"),
LogType: getEnvWithDefault("LOG_TYPE", "json"),
LogLevel: getEnvWithDefault("LOG_LEVEL", ""),
LogType: getEnvWithDefault("LOG_TYPE", ""),
Mongo: mongodb.Config{
URI: getEnvWithDefault("NL_MONGO_URI", ""),
},
LoopDurationMinutes: time.Duration(10) * time.Second,
}

signalCh := make(chan os.Signal, 1)
@@ -74,49 +71,10 @@ func main() {
signalCh <- syscall.SIGTERM
}

URLCh := make(chan string)
fetchResultCh := make(chan string)

var wg sync.WaitGroup
wg.Add(5)

for i := 0; i < 5; i++ {
go newsletter.Worker(&wg, URLCh, fetchResultCh, newsletter.Fetch)
}

go func() {
defer close(URLCh)
for range time.Tick(cfg.LoopDurationMinutes) {
slog.Info("fetching engineers")
gotURLs, err := storage.DistinctEngineerURLs(ctx)
if err != nil {
slog.Error("error getting engineers", "error", err)
signalCh <- syscall.SIGTERM
}

slog.Info("fetched engineers", "engineers", len(gotURLs))
for _, url := range gotURLs {
URLCh <- url.(string)
}
}
}()

go func() {
wg.Wait()
defer close(fetchResultCh)
}()
crawler := newsletter.NewCrawler(5, time.Duration(10)*time.Second, signalCh)

go func() {
for v := range fetchResultCh {
slog.Info("saving fetched sites response", "response", v[:10])
err := storage.SaveSite(ctx, []mongodb.Site{
{Content: v, ScrapeDatetime: time.Now().UTC()},
})
if err != nil {
slog.Error("error saving site result", "error", err)
signalCh <- syscall.SIGTERM
}
}
crawler.Run(ctx, storage, newsletter.Fetch)
}()

<-signalCh
Binary file modified cmd/newsletter/newsletter
Binary file not shown.
2 changes: 2 additions & 0 deletions docker-compose.yml
@@ -11,6 +11,8 @@ services:
GOLANGCI_LINT_VERSION: $GOLANGCI_LINT_VERSION
environment:
NL_MONGO_URI: "mongodb://root:root@mongodb:27017"
LOG_LEVEL: "DEBUG"
LOG_TYPE: "json"
depends_on:
- mongodb
volumes:
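The LOG_LEVEL and LOG_TYPE variables are read in cmd/newsletter/main.go through the getEnvWithDefault helper used in the hunk above; its body is not shown in this diff, so the sketch below is only an assumption about what such a helper typically looks like:

```go
package main

import "os"

// getEnvWithDefault returns the value of the environment variable key,
// falling back to defaultValue when the variable is unset or empty.
// Sketch only: the real helper lives in cmd/newsletter/main.go and may differ.
func getEnvWithDefault(key, defaultValue string) string {
	if v := os.Getenv(key); v != "" {
		return v
	}
	return defaultValue
}
```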
112 changes: 100 additions & 12 deletions scrape.go
@@ -3,12 +3,112 @@ package newsletter

import (
"bytes"
"context"
"fmt"
"log/slog"
"net/http"
"os"
"sync"
"syscall"
"time"

"github.com/perebaj/newsletter/mongodb"
)

// PageContent is the struct that gathers the important information of a website
type PageContent struct {
Content string
URL string
}

// Storage is the interface that wraps the basic methods to save and get data from the database
type Storage interface {
SaveSite(ctx context.Context, site []mongodb.Site) error
DistinctEngineerURLs(ctx context.Context) ([]interface{}, error)
}

// Crawler contains the necessary information to run the crawler
type Crawler struct {
URLch chan string
resultCh chan PageContent
signalCh chan os.Signal
MaxJobs int
wg *sync.WaitGroup
// scheduler is the interval between each fetch round
scheduler time.Duration
}

// NewCrawler initializes a new Crawler
func NewCrawler(maxJobs int, s time.Duration, signalCh chan os.Signal) *Crawler {
return &Crawler{
URLch: make(chan string),
resultCh: make(chan PageContent),
signalCh: signalCh,
wg: &sync.WaitGroup{},
MaxJobs: maxJobs,
scheduler: s,
}
}

// Run starts the crawler, where s is the storage and f is the function used to fetch the content of a website
func (c *Crawler) Run(ctx context.Context, s Storage, f func(string) (string, error)) {
c.wg.Add(c.MaxJobs)
for i := 0; i < c.MaxJobs; i++ {
go c.Worker(f)
}

go func() {
defer close(c.URLch)
for range time.Tick(c.scheduler) {
slog.Debug("fetching engineers")
gotURLs, err := s.DistinctEngineerURLs(ctx)
if err != nil {
slog.Error("error getting engineers", "error", err)
c.signalCh <- syscall.SIGTERM
}

slog.Debug("fetched engineers", "engineers", len(gotURLs))
for _, url := range gotURLs {
c.URLch <- url.(string)
}
}
}()

go func() {
c.wg.Wait()
defer close(c.resultCh)
}()

go func() {
for v := range c.resultCh {
slog.Debug("saving fetched sites response")
err := s.SaveSite(ctx, []mongodb.Site{
{
URL: v.URL,
Content: v.Content,
ScrapeDatetime: time.Now().UTC(),
},
})
if err != nil {
slog.Error("error saving site result", "error", err)
c.signalCh <- syscall.SIGTERM
}
}
}()
}

// Worker uses a worker pool to process jobs and send the results through a channel
func (c *Crawler) Worker(f func(string) (string, error)) {
defer c.wg.Done()
for url := range c.URLch {
content, err := f(url)
if err != nil {
slog.Error(fmt.Sprintf("error getting reference: %s", url), "error", err)
}
c.resultCh <- PageContent{Content: content, URL: url}
}
}

// Fetch returns the content of a url as a string
func Fetch(url string) (string, error) {
resp, err := http.Get(url)
@@ -35,15 +135,3 @@ func Fetch(url string) (string, error) {

return bodyString, nil
}

// Worker use a worker pool to process jobs and send the restuls through a channel
func Worker(wg *sync.WaitGroup, urls <-chan string, result chan<- string, f func(string) (string, error)) {
defer wg.Done()
for url := range urls {
content, err := f(url)
if err != nil {
slog.Error(fmt.Sprintf("error getting reference: %s", url), "error", err)
}
result <- content
}
}
68 changes: 42 additions & 26 deletions scrape_test.go
@@ -1,45 +1,40 @@
package newsletter

import (
"fmt"
"context"
"net/http"
"net/http/httptest"
"sync"
"os"
"testing"
"time"

"github.com/perebaj/newsletter/mongodb"
)

func TestWorker(_ *testing.T) {
urls := make(chan string)
results := make(chan string)
const fakeURL = "http://fakeurl.test"

f := func(s string) (string, error) {
time.Sleep(100 * time.Millisecond)
return fmt.Sprintf("job %s done", s), nil
}
// Even without verifying the result, this test is useful to check that the crawler runs properly, since it
// uses mocks for the Storage and the Fetch function.
func TestCrawlerRun(t *testing.T) {
timeoutCh := time.After(time.Duration(150) * time.Millisecond)
ctx := context.Background()
s := NewStorageMock()

var wg sync.WaitGroup
wg.Add(2)
go Worker(&wg, urls, results, f)
go Worker(&wg, urls, results, f)
f := func(string) (string, error) {
return "Hello, World!", nil
}

go func() {
urls <- "job1"
urls <- "job2"
urls <- "job3"
urls <- "job4"
urls <- "job5"
urls <- "job6"
defer close(urls)
}()
signalCh := make(chan os.Signal, 1)

c := NewCrawler(1, time.Duration(1000)*time.Millisecond, signalCh)
go func() {
wg.Wait()
defer close(results)
c.Run(ctx, s, f)
}()

for i := 0; i < 6; i++ {
<-results
select {
case <-signalCh:
t.Error("unexpected signal error")
case <-timeoutCh:
}
}

@@ -79,3 +74,24 @@ func TestGetReferences_Status500(t *testing.T) {
t.Errorf("expected empty body, got %s", got)
}
}

// TODO: Move the StorageMock to a separate file, preferably in the same package (mongodb)
type StorageMock interface {
SaveSite(ctx context.Context, site []mongodb.Site) error
DistinctEngineerURLs(ctx context.Context) ([]interface{}, error)
}

type StorageMockImpl struct {
}

func NewStorageMock() StorageMock {
return StorageMockImpl{}
}

func (s StorageMockImpl) SaveSite(_ context.Context, _ []mongodb.Site) error {
return nil
}

func (s StorageMockImpl) DistinctEngineerURLs(_ context.Context) ([]interface{}, error) {
return []interface{}{fakeURL}, nil
}
