vendor
This commit is contained in:
parent
07ec5529ac
commit
f501abe660
532 changed files with 271781 additions and 0 deletions
157
vendor/github.com/digitalocean/go-libvirt/internal/event/stream.go
generated
vendored
Normal file
157
vendor/github.com/digitalocean/go-libvirt/internal/event/stream.go
generated
vendored
Normal file
|
|
@ -0,0 +1,157 @@
|
|||
// Copyright 2020 The go-libvirt Authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package event
|
||||
|
||||
import (
|
||||
"context"
|
||||
)
|
||||
|
||||
// emptyEvent is used as a zero-value. Clients will never receive one of these;
|
||||
// they are only here to satisfy the compiler. See the comments in process() for
|
||||
// more information.
|
||||
type emptyEvent struct{}
|
||||
|
||||
func (emptyEvent) GetCallbackID() int32 { return 0 }
|
||||
|
||||
// Stream is an unbounded buffered event channel. The implementation
|
||||
// consists of a pair of unbuffered channels and a goroutine to manage them.
|
||||
// Client behavior will not cause incoming events to block.
|
||||
type Stream struct {
|
||||
// Program specifies the source of the events - libvirt or QEMU.
|
||||
Program uint32
|
||||
|
||||
// CallbackID is returned by the event registration call.
|
||||
CallbackID int32
|
||||
|
||||
// manage unbounded channel behavior.
|
||||
queue []Event
|
||||
qlen chan (chan int)
|
||||
in, out chan Event
|
||||
|
||||
// terminates processing
|
||||
shutdown context.CancelFunc
|
||||
}
|
||||
|
||||
// NewStream configures a new Event Stream. Incoming events are appended to a
|
||||
// queue, which is then relayed to the listening client. Client behavior will
|
||||
// not cause incoming events to block. It is the responsibility of the caller
|
||||
// to terminate the Stream via Shutdown() when no longer in use.
|
||||
func NewStream(program uint32, cbID int32) *Stream {
|
||||
s := &Stream{
|
||||
Program: program,
|
||||
CallbackID: cbID,
|
||||
in: make(chan Event),
|
||||
out: make(chan Event),
|
||||
qlen: make(chan (chan int)),
|
||||
}
|
||||
|
||||
// Start the processing loop, which will return a routine we can use to
|
||||
// shut the queue down later.
|
||||
s.shutdown = s.start()
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
// Len will return the current count of events in the internal queue for a
|
||||
// stream. It does this by sending a message to the stream's process() loop,
|
||||
// which will then write the current length to the channel contained in that
|
||||
// message.
|
||||
func (s *Stream) Len() int {
|
||||
// Send a request to the process() loop to get the current length of the
|
||||
// queue
|
||||
ch := make(chan int)
|
||||
s.qlen <- ch
|
||||
return <-ch
|
||||
}
|
||||
|
||||
// Recv returns the next available event from the Stream's queue.
|
||||
func (s *Stream) Recv() chan Event {
|
||||
return s.out
|
||||
}
|
||||
|
||||
// Push appends a new event to the queue.
|
||||
func (s *Stream) Push(e Event) {
|
||||
s.in <- e
|
||||
}
|
||||
|
||||
// Shutdown gracefully terminates Stream processing, releasing all internal
|
||||
// resources. Events which have not yet been received by the client will be
|
||||
// dropped. Subsequent calls to Shutdown() are idempotent.
|
||||
func (s *Stream) Shutdown() {
|
||||
if s.shutdown != nil {
|
||||
s.shutdown()
|
||||
}
|
||||
}
|
||||
|
||||
// start starts the event processing loop, which will continue to run until
|
||||
// terminated by the returned context.CancelFunc.
|
||||
func (s *Stream) start() context.CancelFunc {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
go s.process(ctx)
|
||||
|
||||
return cancel
|
||||
}
|
||||
|
||||
// process manages an Stream's lifecycle until canceled by the provided context.
|
||||
// Incoming events are appended to a queue which is then relayed to the
|
||||
// listening client. New events pushed onto the queue will not block if the
|
||||
// client is not actively polling for them; the stream will buffer them
|
||||
// internally.
|
||||
func (s *Stream) process(ctx context.Context) {
|
||||
// Close the output channel so that clients know this stream is finished.
|
||||
// We don't close s.in to avoid creating a race with the stream's Push()
|
||||
// function.
|
||||
defer close(s.out)
|
||||
|
||||
// This function is used to retrieve the next event from the queue, to be
|
||||
// sent to the client. If there are no more events to send, it returns a nil
|
||||
// channel and a zero-value event.
|
||||
nextEvent := func() (chan Event, Event) {
|
||||
sendCh := chan Event(nil)
|
||||
next := Event(emptyEvent{})
|
||||
if len(s.queue) > 0 {
|
||||
sendCh = s.out
|
||||
next = s.queue[0]
|
||||
}
|
||||
return sendCh, next
|
||||
}
|
||||
|
||||
// The select statement in this loop relies on the fact that a send to a nil
|
||||
// channel will block forever. If we have no entries in the queue, the
|
||||
// sendCh variable will be nil, so the clause that attempts to send an event
|
||||
// to the client will never complete. Clients will never receive an
|
||||
// emptyEvent.
|
||||
for {
|
||||
sendCh, nextEvt := nextEvent()
|
||||
|
||||
select {
|
||||
// new event received, append to queue
|
||||
case e := <-s.in:
|
||||
s.queue = append(s.queue, e)
|
||||
|
||||
case lenCh := <-s.qlen:
|
||||
lenCh <- len(s.queue)
|
||||
|
||||
// client received an event, pop from queue
|
||||
case sendCh <- nextEvt:
|
||||
s.queue = s.queue[1:]
|
||||
|
||||
// shutdown requested
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue