Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 56 additions & 30 deletions cmd/gcs/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,35 +385,24 @@ func main() {
if err != nil {
logrus.WithError(err).Fatal("failed to initialize new runc runtime")
}
mux := bridge.NewBridgeMux()
b := bridge.Bridge{
Handler: mux,
EnableV4: *v4,
}

publisher := &bridge.Publisher{}
h := hcsv2.NewHost(rtime, tport, initialEnforcer, logWriter)
// Initialize virtual pod support in the host
if err := h.InitializeVirtualPodSupport(virtualPodsControl); err != nil {
logrus.WithError(err).Warn("Virtual pod support initialization failed")
}
b.AssignHandlers(mux, h)

var bridgeIn io.ReadCloser
var bridgeOut io.WriteCloser
if *useInOutErr {
bridgeIn = os.Stdin
bridgeOut = os.Stdout
} else {
const commandPort uint32 = 0x40000000
bridgeCon, err := tport.Dial(commandPort)
if err != nil {
logrus.WithFields(logrus.Fields{
"port": commandPort,
logrus.ErrorKey: err,
}).Fatal("failed to dial host vsock connection")
}
bridgeIn = bridgeCon
bridgeOut = bridgeCon
}
const commandPort uint32 = 0x40000000

// Reconnect loop: on each iteration we create a fresh bridge+mux, dial the
Comment thread
jterry75 marked this conversation as resolved.
// host, and serve until the connection drops. After a live migration the
// vsock connection breaks; we re-dial and continue.
//
// During live migration the VM is frozen and only wakes up when the host
// shim is ready, so the vsock port should be immediately available. We
// use a tight retry interval instead of exponential backoff.
const reconnectInterval = 100 * time.Millisecond

event := cgroups1.MemoryThresholdEvent(*gcsMemLimitBytes, false)
gefd, err := gcsControl.RegisterMemoryEvent(event)
Expand All @@ -430,15 +419,13 @@ func main() {
oomFile := os.NewFile(oom, "cefd")
defer oomFile.Close()

// Setup OOM monitoring for virtual-pods cgroup
virtualPodsOom, err := virtualPodsControl.OOMEventFD()
if err != nil {
logrus.WithError(err).Fatal("failed to retrieve the virtual-pods cgroups oom eventfd")
}
virtualPodsOomFile := os.NewFile(virtualPodsOom, "vp-oomfd")
defer virtualPodsOomFile.Close()

// time synchronization service
if !(*disableTimeSync) {
if err = startTimeSyncService(); err != nil {
logrus.WithError(err).Fatal("failed to start time synchronization service")
Expand All @@ -448,10 +435,49 @@ func main() {
go readMemoryEvents(startTime, gefdFile, "/gcs", int64(*gcsMemLimitBytes), gcsControl)
go readMemoryEvents(startTime, oomFile, "/containers", containersLimit, containersControl)
go readMemoryEvents(startTime, virtualPodsOomFile, "/containers/virtual-pods", containersLimit, virtualPodsControl)
err = b.ListenAndServe(bridgeIn, bridgeOut)
if err != nil {
logrus.WithFields(logrus.Fields{
logrus.ErrorKey: err,
}).Fatal("failed to serve gcs service")

mux := bridge.NewBridgeMux()
b := bridge.Bridge{
Handler: mux,
EnableV4: *v4,
Publisher: publisher,
}
b.AssignHandlers(mux, h)

// Reconnect loop: dial the host, serve until the connection drops, then
// re-dial. During live migration the VM is frozen and only wakes up when
// the destination host shim is ready, so the vsock port should be
// immediately available.
for {
publisher.SetBridge(&b)

var bridgeIn io.ReadCloser
var bridgeOut io.WriteCloser
if *useInOutErr {
bridgeIn = os.Stdin
bridgeOut = os.Stdout
} else {
bridgeCon, dialErr := tport.Dial(commandPort)
if dialErr != nil {
logrus.WithError(dialErr).Warn("failed to dial host, retrying")
time.Sleep(reconnectInterval)
continue
}
bridgeIn = bridgeCon
bridgeOut = bridgeCon
}

logrus.Info("bridge connected, serving")

serveErr := b.ListenAndServe(bridgeIn, bridgeOut)
Comment thread
jterry75 marked this conversation as resolved.
publisher.SetBridge(nil)

if b.ShutdownRequested() {
logrus.Info("bridge shutdown requested, exiting reconnect loop")
break
}

logrus.WithError(serveErr).Warn("bridge connection lost, will reconnect")
time.Sleep(reconnectInterval)
}
}
39 changes: 32 additions & 7 deletions internal/guest/bridge/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,15 @@ type Bridge struct {
hasQuitPending atomic.Bool

protVer prot.ProtocolVersion

// Publisher is a stable notification sink that survives bridge recreation
// during live migration.
Publisher *Publisher
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why exported?

}

// ShutdownRequested reports whether the bridge has been asked to shut
// down (i.e. a quit is pending), letting the reconnect loop distinguish
// a deliberate exit from a dropped connection.
func (b *Bridge) ShutdownRequested() bool {
	quitting := b.hasQuitPending.Load()
	return quitting
}

// AssignHandlers creates and assigns the appropriate bridge
Expand Down Expand Up @@ -226,17 +235,20 @@ func (b *Bridge) AssignHandlers(mux *Mux, host *hcsv2.Host) {
func (b *Bridge) ListenAndServe(bridgeIn io.ReadCloser, bridgeOut io.WriteCloser) error {
requestChan := make(chan *Request)
requestErrChan := make(chan error)
b.responseChan = make(chan bridgeResponse)
b.responseChan = make(chan bridgeResponse, 16)
responseErrChan := make(chan error)
b.quitChan = make(chan bool)

defer close(b.quitChan)
// Close order matters: quitChan must close first so PublishNotification
// and in-flight handlers see it before responseChan becomes invalid.
// responseChan is never explicitly closed — the response writer exits
// when quitChan closes and no more sends are possible.
defer bridgeIn.Close()
defer close(requestErrChan)
defer close(requestChan)
defer bridgeOut.Close()
defer close(responseErrChan)
defer close(b.responseChan)
defer close(requestChan)
defer close(requestErrChan)
defer bridgeIn.Close()
defer close(b.quitChan)

// Receive bridge requests and schedule them to be processed.
go func() {
Expand Down Expand Up @@ -440,7 +452,20 @@ func (b *Bridge) PublishNotification(n *prot.ContainerNotification) {
},
response: n,
}
b.responseChan <- resp
// Check quitChan first to avoid sending to a dead bridge.
select {
case <-b.quitChan:
logrus.WithField("containerID", n.ContainerID).
Warn("bridge quit, dropping notification")
return
default:
}
select {
case b.responseChan <- resp:
case <-b.quitChan:
logrus.WithField("containerID", n.ContainerID).
Warn("bridge quit, dropping notification")
}
}

// setErrorForResponseBase modifies the passed-in MessageResponseBase to
Expand Down
2 changes: 1 addition & 1 deletion internal/guest/bridge/bridge_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (b *Bridge) createContainerV2(r *Request) (_ RequestResponse, err error) {
Result: 0,
ResultInfo: "",
}
b.PublishNotification(notification)
b.Publisher.Publish(notification)
}()

return &prot.ContainerCreateResponse{}, nil
Expand Down
39 changes: 39 additions & 0 deletions internal/guest/bridge/publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
//go:build linux

package bridge

import (
	"sync"

	"github.com/Microsoft/hcsshim/internal/guest/prot"
	"github.com/sirupsen/logrus"
)

// Publisher provides a stable reference for container exit goroutines to
// publish notifications through. It survives bridge recreation during live
// migration: while no bridge is attached, notifications are queued (not
// dropped) and flushed in FIFO order once a new bridge is set. Dropping an
// exit event would leave the host shim permanently unaware of the
// container's final state.
type Publisher struct {
	mu sync.Mutex
	b  *Bridge
	// pending holds notifications that arrived while no bridge was
	// attached; flushed on the next SetBridge/Publish with a live bridge.
	pending []*prot.ContainerNotification
}

// SetBridge attaches or detaches the current bridge.
// Pass nil to detach; notifications published while detached are queued
// and delivered when a non-nil bridge is attached again.
func (p *Publisher) SetBridge(b *Bridge) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.b = b
	p.drainLocked()
}

// Publish sends a container notification to the current bridge. If no
// bridge is connected the notification is queued for later delivery;
// otherwise any queued notifications are flushed first so ordering is
// preserved, then n is published.
func (p *Publisher) Publish(n *prot.ContainerNotification) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.b == nil {
		logrus.WithField("containerID", n.ContainerID).
			Warn("bridge not connected, queueing container notification")
		p.pending = append(p.pending, n)
		return
	}
	p.drainLocked()
	p.b.PublishNotification(n)
}

// drainLocked flushes queued notifications to the attached bridge in FIFO
// order. Callers must hold p.mu. No-op while detached.
func (p *Publisher) drainLocked() {
	if p.b == nil {
		return
	}
	for _, n := range p.pending {
		p.b.PublishNotification(n)
	}
	p.pending = nil
}
26 changes: 26 additions & 0 deletions internal/guest/bridge/publisher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
//go:build linux

package bridge

import (
"testing"

"github.com/Microsoft/hcsshim/internal/guest/prot"
)

// TestPublisher_NilBridge verifies that publishing through a Publisher
// whose bridge was never attached does not panic.
func TestPublisher_NilBridge(t *testing.T) {
	var p Publisher
	n := &prot.ContainerNotification{
		MessageBase: prot.MessageBase{ContainerID: "test"},
	}
	p.Publish(n)
}

// TestPublisher_SetBridgeNil verifies that explicitly detaching the
// bridge (SetBridge(nil)) leaves the Publisher safe to publish through.
func TestPublisher_SetBridgeNil(t *testing.T) {
	var p Publisher
	p.SetBridge(nil)
	n := &prot.ContainerNotification{
		MessageBase: prot.MessageBase{ContainerID: "test"},
	}
	p.Publish(n)
}
Loading