Skip to the content.

2020的笔记,因为与2021没有很大的出入

1. outline

教授以webcrawler为例,向我们解释了goroutines的相关用法。

1.1 webcrawler

网络爬虫所做的事情就是,若刚开始爬取第一个页面,那它就会将该页面上的所有urls都提取出来;接下里根据这些URLS来获取新的页面,之后再从页面里获取新的urls,如此迭代,直到抓取到所有的页面。

因为有些页面会循环往复,若我们什么都不做的话,会陷入套娃循环中。

1.2 key points

因此爬虫要记录已经爬取过的页面或是我们正在爬取的页面的集合,以确保我们不会第二次爬取任何已经在该集合中的页面。

我们可以将它看作⼀个树形结构,我们的任务是从实际网页的循环路径中找到⼀个树形子集。即避免陷入死循环,避免重复爬取某些页面。

增加并行爬取页面的goroutines的数量,直到每秒获取的吞吐量停止增长为止。(即直至占满网络带宽)

最难的点在于,我们如何知道爬虫何时完成工作?我们需要在代码中实现这些内容。

1.3 关于goroutines的重要知识点

因为初学go,不大熟悉goroutines的机制,这里举一个例子来说明go func()与func()的不同点。

问题:假设crawler()是用go编写的函数,crawler()和go crawler()两种执行方式有什么不同?

两种执行方式有以下三点不同:

举一个极端的例子,如果我这么写:

func main() {
   go crawler()
   go crawler()
}

很有可能我们等不到函数的执行,main()就已经返回了。

总的来说,crawler()go crawler() 两种执行方式都可以用于实现并发处理,但 go crawler() 更适合于处理大量并发任务,能够提高程序的效率和性能。

2. examples

教授给我们讲述了三种方式,下面做记录。这些方法的思想在后续lab中都会有应用。

2.1 Serial Crawler

串行爬虫,它可以在网络路径图中有效地执行深度优先搜索(DFS)。

这种方式传入的fetched一定要是以reference的方式,不能是拷贝的方式。

package main

import (
	"fmt"
	"sync"
)
//
// Serial crawler
//
// 本身并不具备并行的能力,因此我们要在后面使用goroutines让其并行执行
// 使用fetched记录已经爬取的,或正在爬取的url,string->bool的映射
func Serial(url string, fetcher Fetcher, fetched map[string]bool) {
	if fetched[url] {
		return
	}
	fetched[url] = true
	urls, err := fetcher.Fetch(url)
	if err != nil {
		return
	}
	for _, u := range urls {
		Serial(u, fetcher, fetched)
        // go Serial(u, fetcher, fetched)  不能这么干,因为main中很有可能等不及它执行就已经返回了
	}
	return
}

2.2 Concurrent crawler with shared state and Mutex

同样,加锁的区间被称为critical area,在同一时间内只能有一条goroutine访问。

主要涉及到了sync.Mutexsync.WaitGroupdefer(延迟执行语句)。

在下面的例子中,出现了defer sync.WaitGroup().done()的搭配,因为这代表是当函数完成时,将内部的counter–,完美的契合了done()的功能需求,且无需等待。

为什么这里不直接用map?

map在go中是一个指向heap的指针,map传入的内容始终是一个引用,我们不需要在前面放一个*来表示他是一个指针。

每个ConcurrentMutex都在等待他自己衍生的子ConcurrentMutex结束,然后return。

//
// Concurrent crawler with shared state and Mutex
//

type fetchState struct {
	mu      sync.Mutex
	fetched map[string]bool
}
func ConcurrentMutex(url string, fetcher Fetcher, f *fetchState) {
    // 获取url状态需要互斥访问
	f.mu.Lock()
	already := f.fetched[url]
	f.fetched[url] = true
	f.mu.Unlock()

	if already {
		return
	}

	urls, err := fetcher.Fetch(url)
	if err != nil {
		return
	}
	var done sync.WaitGroup
    // 这里for循环做的是将u指向了不同的地址,这就是改变值的原理
	for _, u := range urls {
     	// wait+1
		done.Add(1)
        // u是一个在不断变化的量,我们希望第一次执行的go func看到的是第一次拿到的url,因此用一个临时
        // 变量来保存下来这个状态,防止下面的函数读错url
        u2 := u
        // 内层函数一般可以看到外层的变量,可以直接引用
		go func() {
            // Done.done():当前func退出时,计数器减一
            // defer后面的语句不会马上调用, 而是延迟到函数结束时调用,这样做十分合理
            // Defer延迟执行语句,不必等待他完成,即使该func故障了,也不会影响全局的执行,
            // 若不加defer,遇到故障done.Done()不会执行;若加了defer,则不管怎样,到最后这个语句都会执行
			defer done.Done()
			ConcurrentMutex(u2, fetcher, f)
		}()
        // 此处是作为参数直接进行了拷贝,拥有了那一时刻u的值的一份拷贝
		//go func(u string) {
		//	defer done.Done()
		//	ConcurrentMutex(u, fetcher, f)
		//}(u)
	}
    // 等待当前任务完成才能返回
	done.Wait()
	return
}

func makeState() *fetchState {
	f := &fetchState{}
	f.fetched = make(map[string]bool)
	return f
}

若外部函数已经return了,但是内部函数引用了一个外部函数中的变量,那么会出现什么结果呢?

go内置了race检测工具,比如说对于上述程序:crawler.go,我们在运行的时候加上-race参数就可以检测了,但是只能在运行期间检测,没有动态检测功能,即只能检测到运行部分的代码中存在的race问题,没有运行的代码段是无法检测到问题的,因此建立的测试标准要能让所有代码都运行(即使某段代码发生race概率很低,假设运行10W次只出现几十次这种,他也能马上检测出来存在race):

go run -race crawler.go

可以检测代码中是否存在对共享变量的race。

img

这代表,线程读到的某个共享变量,在之前被改写了:

这里的含义是,比如说t1读到url为false,因此他要对此url进行爬取,接下来t1要把此url设为true,但是在未设置完成之前,又有t2读到了此url,因为未修改完成,因此读到的是false,故会发生重复读取的情况。

若我们想限制goroutines的数量,该怎么做呢?(现实使用中,网站上可能有数十亿url,我们不想让程序创建10亿个goroutines,这样会爆内存)

我们可以创建threads pool,通过复用thread来抓取url,而不是每出现一个url就创建一个线程。

2.3 Concurrent crawler with channels

这种方式不必使用lock,不必使用共享maps(标记url是否已经被爬取了),我们使用的是channels。channels和shared_memory方式是截然不同的,channels不会造成race。虽然本质是channels也是内存,但是workers对channel做的是发送,而master是接收,因此二者不存在race。

2.2例子是每抓取一个URL就会启动一个goroutine(树状路径),本例中是只有一个master会做创建动作。

//
// Concurrent crawler with channels
//

func worker(url string, ch chan []string, fetcher Fetcher) {
	urls, err := fetcher.Fetch(url)
	if err != nil {
		ch <- []string{}
	} else {
		ch <- urls
	}
}

func master(ch chan []string, fetcher Fetcher) {
	n := 1
	fetched := make(map[string]bool)
    // 每当master从channel里读到一批数据时,master就知道有一个worker完成了任务,所以n--
    // 若channels中没有数据,则for会一直block在这里,他会在这里等待
    // 要明白这个是channel,而不是普通的变量
	for urls := range ch {
		for _, u := range urls {
			if fetched[u] == false {
				fetched[u] = true
				n += 1
                // 本质是是每个worker都会往channel里面发送数据,每次发送的就是一批urls
				go worker(u, ch, fetcher)
			}
		}
		n -= 1
        // n为0时也意味着所有worker完成了任务
        // 若无法出发这里的break,那么这个大的for循环永远不会结束
		if n == 0 {
			break
		}
	}
}

func ConcurrentChannel(url string, fetcher Fetcher) {
	ch := make(chan []string)
    // 如果有必要,master中的for循环的channel会等待这里的数据传入
	go func() {
		ch <- []string{url}
	}()
	master(ch, fetcher)
}

back.