一文搞懂Go通道( 三 )

接收channel的数据的流程如下:
CASE1:前置channel为nil的场景:
如果block为非阻塞,直接return;
如果block为阻塞,就调用gopark()阻塞当前goroutine,并抛出异常 。

  • 前置场景,block为非阻塞,且channel为非缓冲队列且sender等待队列为空 或者 channel为有缓冲队列但是队列里面元素数量为0,且channel未关闭,这个时候直接return;
  • 调用 lock(&c.lock) 锁住channel的全局锁;
CASE2:channel已经被关闭且channel缓冲中没有数据了,这时直接返回success和空值;
CASE3:sender队列非空,调用 func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int)
函数处理:
1.先取channel缓冲队列的对头元素复制给receiver(也就是ep);
2.将sender队列的对头元素里面的数据复制到channel缓冲队列刚刚弹出的元素的位置,这样缓冲队列就不用移动数据了 。
channel是非缓冲channel,直接调用recvDirect函数直接从sender recv元素到ep对象,这样就只用复制一次;
对于sender队列非空情况下,有缓冲的channel的缓冲队列一定是满的:
释放channel的全局锁;
调用goready函数标记当前goroutine处于ready,可以运行的状态;
CASE4:sender队列为空,缓冲队列非空,直接取队列元素,移动头索引;
CASE5:sender队列为空、缓冲队列也没有元素且不阻塞协程,直接return (false,false);
CASE6:sender队列为空且channel的缓存队列为空,将goroutine加入recv队列,并阻塞 。
2.4.2写入数据/* * generic single channel send/recv * If block is not nil, * then the protocol will not * sleep but return if it could * not complete. * * sleep can wake up with g.param == nil * when a channel involved in the sleep has * been closed.it is easiest to loop and re-run * the operation; we'll see that it's now closed. */func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {if c == nil {if !block {return false}gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)throw("unreachable")}if debugChan {print("chansend: chan=", c, "n")}if raceenabled {racereadpc(c.raceaddr(), callerpc, funcPC(chansend))}// Fast path: check for failed non-blocking operation without acquiring the lock.//// After observing that the channel is not closed, we observe that the channel is// not ready for sending. Each of these observations is a single word-sized read// (first c.closed and second c.recvq.first or c.qcount depending on kind of channel).// Because a closed channel cannot transition from 'ready for sending' to// 'not ready for sending', even if the channel is closed between the two observations,// they imply a moment between the two when the channel was both not yet closed// and not ready for sending. We behave as if we observed the channel at that moment,// and report that the send cannot proceed.//// It is okay if the reads are reordered here: if we observe that the channel is not// ready for sending and then observe that it is not closed, that implies that the// channel wasn't closed during the first observation.if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {return false}var t0 int64if blockprofilerate > 0 {t0 = cputicks()}lock(&c.lock)if c.closed != 0 {unlock(&c.lock)panic(plainError("send on closed channel"))}if sg := c.recvq.dequeue(); sg != nil {// Found a waiting receiver. We pass the value we want to send// directly to the receiver, bypassing the channel buffer (if any).send(c, sg, ep, func() { unlock(&c.lock) }, 3)return true}if c.qcount < c.dataqsiz {// Space is available in the channel buffer. Enqueue the element to send.qp := chanbuf(c, c.sendx)if raceenabled {raceacquire(qp)racerelease(qp)}typedmemmove(c.elemtype, qp, ep)c.sendx++if c.sendx == c.dataqsiz {c.sendx = 0}c.qcount++unlock(&c.lock)return true}if !block {unlock(&c.lock)return false}// Block on the channel. Some receiver will complete our operation for us.gp := getg()mysg := acquireSudog()mysg.releasetime = 0if t0 != 0 {mysg.releasetime = -1}// No stack splits between assigning elem and enqueuing mysg// on gp.waiting where copystack can find it.mysg.elem = epmysg.waitlink = nilmysg.g = gpmysg.isSelect = falsemysg.c = cgp.waiting = mysggp.param = nilc.sendq.enqueue(mysg)goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)// Ensure the value being sent is kept alive until the// receiver copies it out. The sudog has a pointer to the// stack object, but sudogs aren't considered as roots of the// stack tracer.KeepAlive(ep)// someone woke us up.if mysg != gp.waiting {throw("G waiting list is corrupted")}gp.waiting = nilif gp.param == nil {if c.closed == 0 {throw("chansend: spurious wakeup")}panic(plainError("send on closed channel"))}gp.param = nilif mysg.releasetime > 0 {blockevent(mysg.releasetime-t0, 2)}mysg.c = nilreleaseSudog(mysg)return true}复制代码


推荐阅读