git » go-net » unix-separados » tree

[unix-separados] / http2 / writesched.go

// Copyright 2014 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package http2

import "fmt"

// frameWriteMsg is a request to write a frame. It bundles the writer
// that produces the frame with the stream it belongs to (if any) and
// an optional channel on which the outcome of the write is reported.
type frameWriteMsg struct {
	// write is the interface value that does the writing, once the
	// writeScheduler (below) has decided to select this frame
	// to write. The write functions are all defined in write.go.
	write writeFramer

	// stream is the stream this frame belongs to and is used for
	// prioritization. It is nil for frames that are not associated
	// with a stream.
	stream *stream // used for prioritization. nil for non-stream frames.

	// done, if non-nil, must be a buffered channel with space for
	// 1 message and is sent the return value from write (or an
	// earlier error) when the frame has been written.
	done chan error
}

// String formats wm for debugging output only. The result names the
// stream ID (0 when wm has no stream), whether a done channel is
// attached, and a description of the writer: its own String() when it
// implements fmt.Stringer, otherwise its dynamic type.
func (wm frameWriteMsg) String() string {
	id := uint32(0)
	if st := wm.stream; st != nil {
		id = st.id
	}

	var kind string
	switch w := wm.write.(type) {
	case fmt.Stringer:
		kind = w.String()
	default:
		kind = fmt.Sprintf("%T", w)
	}

	hasDone := wm.done != nil
	return fmt.Sprintf("[frameWriteMsg stream=%d, ch=%v, type: %v]", id, hasDone, kind)
}

// writeScheduler tracks pending frames to write, priorities, and decides
// the next one to use. It is not thread-safe.
type writeScheduler struct {
	// zero are frames not associated with a specific stream.
	// They're sent before any stream-specific frames.
	zero writeQueue

	// maxFrameSize is the maximum size of a DATA frame
	// we'll write. Must be non-zero and between 16K-16M.
	maxFrameSize uint32

	// sq contains the stream-specific queues, keyed by stream ID.
	// When a stream is idle, it's deleted from the map.
	sq map[uint32]*writeQueue

	// canSend is a slice of memory that's reused between frame
	// scheduling decisions to hold the list of writeQueues (from sq)
	// which have enough flow control data to send. After canSend is
	// built, the best is selected.
	canSend []*writeQueue

	// queuePool is a pool of empty queues kept for reuse.
	queuePool []*writeQueue
}

// putEmptyQueue hands q back to the scheduler's pool so that a later
// getEmptyQueue can reuse it. It panics if q still holds frames.
func (ws *writeScheduler) putEmptyQueue(q *writeQueue) {
	if pending := len(q.s); pending > 0 {
		panic("queue must be empty")
	}
	pool := append(ws.queuePool, q)
	ws.queuePool = pool
}

// getEmptyQueue returns a queue for the caller to fill. The most
// recently pooled queue is reused when the pool is non-empty;
// otherwise a fresh writeQueue is allocated.
func (ws *writeScheduler) getEmptyQueue() *writeQueue {
	last := len(ws.queuePool) - 1
	if last < 0 {
		return &writeQueue{}
	}
	reused := ws.queuePool[last]
	ws.queuePool = ws.queuePool[:last]
	return reused
}

// empty reports whether the scheduler holds no frames at all: neither
// in the non-stream queue nor in any per-stream queue.
func (ws *writeScheduler) empty() bool {
	if len(ws.sq) != 0 {
		return false
	}
	return ws.zero.empty()
}

// add queues wm for writing. Messages that carry a stream go onto that
// stream's queue (created on demand); all others go onto the
// non-stream queue.
func (ws *writeScheduler) add(wm frameWriteMsg) {
	if wm.stream != nil {
		ws.streamQueue(wm.stream.id).push(wm)
		return
	}
	ws.zero.push(wm)
}

// streamQueue returns the queue registered for streamID. If none
// exists yet, one is obtained from getEmptyQueue and registered in
// ws.sq; the map itself is allocated on first use.
func (ws *writeScheduler) streamQueue(streamID uint32) *writeQueue {
	q, found := ws.sq[streamID]
	if found {
		return q
	}
	if ws.sq == nil {
		ws.sq = make(map[uint32]*writeQueue)
	}
	q = ws.getEmptyQueue()
	ws.sq[streamID] = q
	return q
}

// take returns the most important frame to write and removes it from the scheduler.
// It is illegal to call this if the scheduler is empty or if there are no connection-level
// flow control bytes available.
//
// ok is false when no frame can be written right now. take panics if
// ws.maxFrameSize is zero.
func (ws *writeScheduler) take() (wm frameWriteMsg, ok bool) {
	switch {
	case ws.maxFrameSize == 0:
		panic("internal error: ws.maxFrameSize not initialized or invalid")
	case !ws.zero.empty():
		// If there are any frames not associated with streams,
		// prefer those first. These are usually SETTINGS, etc.
		return ws.zero.shift(), true
	case len(ws.sq) == 0:
		return frameWriteMsg{}, false
	}

	// Next, prioritize frames on streams that aren't DATA frames (no cost).
	for id, q := range ws.sq {
		if q.firstIsNoCost() {
			return ws.takeFrom(id, q)
		}
	}

	// Now, all that remains are DATA frames with non-zero bytes to
	// send. Collect the queues that are able to send at least one byte.
	if len(ws.canSend) != 0 {
		panic("should be empty")
	}
	for _, q := range ws.sq {
		if ws.streamWritableBytes(q) > 0 {
			ws.canSend = append(ws.canSend, q)
		}
	}
	if len(ws.canSend) == 0 {
		return frameWriteMsg{}, false
	}
	// Reset the scratch slice once the frame has been taken.
	defer ws.zeroCanSend()

	// TODO: find the best queue
	best := ws.canSend[0]
	return ws.takeFrom(best.streamID(), best)
}

// zeroCanSend is deferred from take. It clears every entry of the
// canSend scratch slice, so no queue pointer stays referenced from it,
// and truncates it to length zero while keeping its capacity.
func (ws *writeScheduler) zeroCanSend() {
	used := ws.canSend
	for i := 0; i < len(used); i++ {
		used[i] = nil
	}
	ws.canSend = used[:0]
}

// streamWritableBytes returns the number of DATA bytes we could write
// from the given queue's stream, if this stream/queue were
// selected: the stream's available flow-control quota, capped by
// ws.maxFrameSize and by the size of the pending payload.
// It is an error to call this if q's head isn't a *writeData.
func (ws *writeScheduler) streamWritableBytes(q *writeQueue) int32 {
	first := q.head()
	n := first.stream.flow.available() // max we can write
	if n == 0 {
		return 0
	}
	if limit := int32(ws.maxFrameSize); limit < n {
		n = limit
	}
	if n == 0 {
		panic("internal error: ws.maxFrameSize not initialized or invalid")
	}
	data := first.write.(*writeData)
	if pending := len(data.p); pending < int(n) {
		n = int32(pending)
	}
	return n
}

// takeFrom removes and returns the next frame to write from q, the
// queue registered in ws.sq under stream ID id. q must be non-empty.
//
// If the head of q is a DATA frame with a non-empty payload, it is
// charged against the stream's flow-control quota:
//
//   - With no quota available (zero or negative), nothing is taken and
//     ok is false; q is left untouched.
//   - If the payload exceeds what may be sent right now (the smaller of
//     the available quota and ws.maxFrameSize), only a leading chunk is
//     split off and returned as a new message with a nil done channel;
//     the remainder stays at the head of q.
//   - Otherwise the whole frame is taken.
//
// Frames that cost no flow control are taken unconditionally. Once q
// becomes empty it is returned to the pool and removed from ws.sq.
func (ws *writeScheduler) takeFrom(id uint32, q *writeQueue) (wm frameWriteMsg, ok bool) {
	wm = q.head()
	// If the first item in this queue costs flow control tokens
	// and we don't have enough, write as much as we can.
	if wd, isData := wm.write.(*writeData); isData && len(wd.p) > 0 {
		allowed := wm.stream.flow.available() // max we can write
		if allowed <= 0 {
			// No quota available. Caller can try the next stream.
			//
			// The comparison is <= rather than == because a
			// flow-control window may legitimately be negative
			// (RFC 7540, section 6.9.2). A negative value must not
			// reach the slice expressions below, where it would
			// panic, nor flow.take.
			// NOTE(review): presumes flow.available can report a
			// negative window; confirm against the flow type.
			return frameWriteMsg{}, false
		}
		if int32(ws.maxFrameSize) < allowed {
			allowed = int32(ws.maxFrameSize)
		}
		// TODO: further restrict the allowed size, because even if
		// the peer says it's okay to write 16MB data frames, we might
		// want to write smaller ones to properly weight competing
		// streams' priorities.

		if len(wd.p) > int(allowed) {
			wm.stream.flow.take(allowed)
			chunk := wd.p[:allowed]
			wd.p = wd.p[allowed:]
			// Make up a new write message of a valid size, rather
			// than shifting one off the queue.
			return frameWriteMsg{
				stream: wm.stream,
				write: &writeData{
					streamID: wd.streamID,
					p:        chunk,
					// even if the original had endStream set, there
					// are bytes remaining because len(wd.p) > allowed,
					// so we know endStream is false:
					endStream: false,
				},
				// our caller is blocking on the final DATA frame, not
				// these intermediates, so no need to wait:
				done: nil,
			}, true
		}
		wm.stream.flow.take(int32(len(wd.p)))
	}

	q.shift()
	if q.empty() {
		ws.putEmptyQueue(q)
		delete(ws.sq, id)
	}
	return wm, true
}

// forgetStream drops the queue registered for stream id, discarding
// any frames still pending on it. Unknown IDs are ignored. The queue
// itself is cleared and returned to the pool for later reuse.
func (ws *writeScheduler) forgetStream(id uint32) {
	q, found := ws.sq[id]
	if !found {
		return
	}
	delete(ws.sq, id)

	// Zero every slot so the discarded frames are no longer referenced
	// from the backing array, then recycle the queue.
	pending := q.s
	for i := 0; i < len(pending); i++ {
		pending[i] = frameWriteMsg{}
	}
	q.s = pending[:0]
	ws.putEmptyQueue(q)
}

// writeQueue is a first-in, first-out queue of frameWriteMsg values:
// push appends at the end, head and shift operate on the front.
type writeQueue struct {
	// s holds the queued messages in order; s[0] is the head.
	s []frameWriteMsg
}

// streamID returns the stream ID for a non-empty stream-specific queue,
// taken from the stream of the message at the head.
func (q *writeQueue) streamID() uint32 {
	st := q.s[0].stream
	return st.id
}

// empty reports whether the queue holds no messages.
func (q *writeQueue) empty() bool {
	pending := len(q.s)
	return pending == 0
}

// push appends wm to the end of the queue.
func (q *writeQueue) push(wm frameWriteMsg) {
	grown := append(q.s, wm)
	q.s = grown
}

// head returns the next item that would be removed by shift, without
// removing it. It panics if the queue is empty.
func (q *writeQueue) head() frameWriteMsg {
	if len(q.s) > 0 {
		return q.s[0]
	}
	panic("invalid use of queue")
}

// shift removes the message at the head of the queue and returns it.
// The remaining messages are moved one slot towards the front, and the
// vacated last slot is zeroed so it no longer references the message.
// It panics if the queue is empty.
func (q *writeQueue) shift() frameWriteMsg {
	n := len(q.s)
	if n == 0 {
		panic("invalid use of queue")
	}
	first := q.s[0]
	// TODO: less copy-happy queue.
	copy(q.s, q.s[1:])
	q.s[n-1] = frameWriteMsg{}
	q.s = q.s[:n-1]
	return first
}

// firstIsNoCost reports whether the message at the head of the queue
// costs no flow control: it is either not a *writeData at all, or a
// *writeData with an empty payload. The queue must be non-empty.
func (q *writeQueue) firstIsNoCost() bool {
	data, isData := q.s[0].write.(*writeData)
	return !isData || len(data.p) == 0
}