diff --git a/internal/cmd/cmd.go b/internal/cmd/cmd.go index cb7c6f4bee..82f859d5b0 100644 --- a/internal/cmd/cmd.go +++ b/internal/cmd/cmd.go @@ -136,6 +136,29 @@ func CommandContext(ctx context.Context, host cow.ProcessHost, name string, arg return cmd } +// Attach wires IO relays to a process the caller has already resolved. +// Counterpart of [Command] / [CommandContext] for the destination side +// of live migration: caller obtains `p` via the host's restore path +// (e.g. gcs.Container.OpenProcessWithIO) and Attach binds the +// process's stdio to the supplied destination streams. +func Attach(ctx context.Context, p cow.Process, stdin io.Reader, stdout, stderr io.Writer) (*Cmd, error) { + cmd := &Cmd{ + Process: p, + Stdin: stdin, + Stdout: stdout, + Stderr: stderr, + Log: log.G(ctx).WithField("pid", p.Pid()), + Context: ctx, + ExitState: &ExitState{}, + allDoneCh: make(chan struct{}), + CopyAfterExitTimeout: time.Second, + } + if err := cmd.startRelay(); err != nil { + return nil, err + } + return cmd, nil +} + // Start starts a command. The caller must ensure that if Start succeeds, // Wait is eventually called to clean up resources. func (c *Cmd) Start() error { @@ -209,7 +232,13 @@ func (c *Cmd) Start() error { c.Log = c.Log.WithField("pid", p.Pid()) } - // Start relaying process IO. + return c.startRelay() +} + +// startRelay wires the IO relay goroutines and the context-cancel +// killer to [Cmd.Process]. +func (c *Cmd) startRelay() error { + p := c.Process stdin, stdout, stderr := p.Stdio() if c.Stdin != nil { // Do not make stdin part of the error group because there is no way for diff --git a/internal/cmd/cmd_test.go b/internal/cmd/cmd_test.go index d576d32778..fe28704f95 100644 --- a/internal/cmd/cmd_test.go +++ b/internal/cmd/cmd_test.go @@ -137,6 +137,11 @@ func (p *localProcess) Pid() int { return p.p.Pid } +// IOPorts always returns zeros: the test fake uses OS pipes, not vsock. +func (p *localProcess) IOPorts() (stdin, stdout, stderr uint32) { + return 0, 0, 0 +} + func (p *localProcess) ResizeConsole(ctx context.Context, x, y uint16) error { return errors.New("not supported") } @@ -280,3 +285,53 @@ func TestCmdStuckIo(t *testing.T) { t.Fatalf("expected: %v; got: %v", errIOTimeOut, err) } } + +// TestCmdAttach verifies that Attach binds a Cmd to a caller-supplied +// process and the resulting Cmd can be Wait'd to completion. Mirrors +// the migration restore path: caller obtains the process via the +// host's restore API (e.g. gcs.OpenProcessWithIO) and Attach wires IO. +func TestCmdAttach(t *testing.T) { + host := &localProcessHost{} + p, err := host.CreateProcess(context.Background(), &hcsschema.ProcessParameters{ + CommandLine: "cmd /c exit /b 0", + }) + if err != nil { + t.Fatal(err) + } + + cmd, err := Attach(context.Background(), p, nil, nil, nil) + if err != nil { + t.Fatalf("Attach: %v", err) + } + if cmd.Process != p { + t.Fatal("Cmd.Process does not match the supplied process") + } + if err := cmd.Wait(); err != nil { + t.Fatalf("Wait: %v", err) + } +} + +// TestCmdAttachIO verifies that Attach's IO relays flow process output +// to caller-supplied destination streams. +func TestCmdAttachIO(t *testing.T) { + host := &localProcessHost{} + p, err := host.CreateProcess(context.Background(), &hcsschema.ProcessParameters{ + CommandLine: "cmd /c echo hello", + CreateStdOutPipe: true, + }) + if err != nil { + t.Fatal(err) + } + + var stdout bytes.Buffer + cmd, err := Attach(context.Background(), p, nil, &stdout, nil) + if err != nil { + t.Fatalf("Attach: %v", err) + } + if err := cmd.Wait(); err != nil { + t.Fatalf("Wait: %v", err) + } + if got := stdout.String(); got != "hello\r\n" { + t.Fatalf("stdout=%q, want %q", got, "hello\r\n") + } +} diff --git a/internal/controller/process/mocks/mock_cow.go b/internal/controller/process/mocks/mock_cow.go index be3c0c63bd..9878fa25d9 100644 --- a/internal/controller/process/mocks/mock_cow.go +++ b/internal/controller/process/mocks/mock_cow.go @@ -115,6 +115,22 @@ func (mr *MockProcessMockRecorder) ExitCode() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ExitCode", reflect.TypeOf((*MockProcess)(nil).ExitCode)) } +// IOPorts mocks base method. +func (m *MockProcess) IOPorts() (uint32, uint32, uint32) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IOPorts") + ret0, _ := ret[0].(uint32) + ret1, _ := ret[1].(uint32) + ret2, _ := ret[2].(uint32) + return ret0, ret1, ret2 +} + +// IOPorts indicates an expected call of IOPorts. +func (mr *MockProcessMockRecorder) IOPorts() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IOPorts", reflect.TypeOf((*MockProcess)(nil).IOPorts)) +} + // Kill mocks base method. func (m *MockProcess) Kill(ctx context.Context) (bool, error) { m.ctrl.T.Helper() diff --git a/internal/cow/cow.go b/internal/cow/cow.go index b60cd383b6..aadc82fa1d 100644 --- a/internal/cow/cow.go +++ b/internal/cow/cow.go @@ -27,6 +27,10 @@ type Process interface { CloseStderr(ctx context.Context) error // Pid returns the process ID. Pid() int + // IOPorts returns the host-side vsock ports allocated for this + // process's stdio relay, or zeros for hosts that don't use vsock + // (WCOW HCS, job containers). Used by the live-migration save path. + IOPorts() (stdin, stdout, stderr uint32) // Stdio returns the stdio streams for a process. These may be nil if a stream // was not requested during CreateProcess. Stdio() (_ io.Writer, _ io.Reader, _ io.Reader) diff --git a/internal/gcs/container.go b/internal/gcs/container.go index 549abd35a2..2bb30b8d56 100644 --- a/internal/gcs/container.go +++ b/internal/gcs/container.go @@ -5,6 +5,7 @@ package gcs import ( "context" "errors" + "fmt" "sync" "time" @@ -67,9 +68,9 @@ func (gc *GuestConnection) CreateContainer(ctx context.Context, cid string, conf return c, nil } -// CloneContainer just creates the wrappers and sets up notification requests for a -// container that is already running inside the UVM (after cloning). -func (gc *GuestConnection) CloneContainer(ctx context.Context, cid string) (_ *Container, err error) { +// OpenContainer attaches a host-side wrapper to a container already +// running inside the UVM. +func (gc *GuestConnection) OpenContainer(_ context.Context, cid string) (_ *Container, err error) { c := &Container{ gc: gc, id: cid, @@ -118,6 +119,66 @@ func (c *Container) CreateProcess(ctx context.Context, config interface{}) (_ co return c.gc.exec(ctx, c.id, config) } +// OpenProcessWithIO is the live-migration restore counterpart of +// [Container.CreateProcess]: it attaches to a process already running +// in this container and re-listens on the supplied vsock ports. +func (c *Container) OpenProcessWithIO(ctx context.Context, pid uint32, stdinPort, stdoutPort, stderrPort uint32) (_ *Process, err error) { + ctx, span := oc.StartSpan(ctx, "gcs::Container::OpenProcessWithIO", oc.WithClientSpanKind) + defer span.End() + defer func() { oc.SetSpanStatus(span, err) }() + span.AddAttributes( + trace.StringAttribute("cid", c.id), + trace.Int64Attribute("pid", int64(pid))) + + p := &Process{ + gc: c.gc, + cid: c.id, + id: pid, + stdinPort: stdinPort, + stdoutPort: stdoutPort, + stderrPort: stderrPort, + } + defer func() { + if err != nil { + p.Close() + } + }() + + listen := func(port uint32) (*ioChannel, error) { + if port == 0 { + return nil, nil + } + l, err := c.gc.ioListenFn(port) + if err != nil { + return nil, fmt.Errorf("listen vsock port %d: %w", port, err) + } + return newIoChannel(l), nil + } + if p.stdin, err = listen(stdinPort); err != nil { + return nil, err + } + if p.stdout, err = listen(stdoutPort); err != nil { + return nil, err + } + if p.stderr, err = listen(stderrPort); err != nil { + return nil, err + } + + // Subscribe to the process exit notification. + waitReq := prot.ContainerWaitForProcess{ + RequestBase: makeRequest(ctx, c.id), + ProcessID: p.id, + TimeoutInMs: 0xffffffff, + } + p.waitCall, err = c.gc.brdg.AsyncRPC(ctx, prot.RPCWaitForProcess, &waitReq, &p.waitResp) + if err != nil { + return nil, fmt.Errorf("failed to wait on existing process pid %d in container %s: %w", pid, c.id, err) + } + go p.waitBackground() + log.G(ctx).WithField("pid", p.id).Debug("opened existing process with IO") + return p, nil +} + // ID returns the container's ID. func (c *Container) ID() string { return c.id diff --git a/internal/gcs/guestconnection.go b/internal/gcs/guestconnection.go index 35e6709d15..23d14e0c57 100644 --- a/internal/gcs/guestconnection.go +++ b/internal/gcs/guestconnection.go @@ -265,6 +265,27 @@ func (gc *GuestConnection) newIoChannel() (*ioChannel, uint32, error) { return newIoChannel(l), port, nil } +// SetNextPort raises the new-process IO port allocator floor. Called +// by the live-migration restore path after [Connect] to skip past +// vsock ports already in use by restored processes. Never goes +// backwards. +func (gc *GuestConnection) SetNextPort(p uint32) { + gc.mu.Lock() + defer gc.mu.Unlock() + if p > gc.nextPort { + gc.nextPort = p + } +} + +// NextPort returns the current allocator floor. Used by the +// live-migration save path to record what [SetNextPort] should be +// seeded with on the destination. +func (gc *GuestConnection) NextPort() uint32 { + gc.mu.Lock() + defer gc.mu.Unlock() + return gc.nextPort +} + func (gc *GuestConnection) requestNotify(cid string, ch chan struct{}) error { gc.mu.Lock() defer gc.mu.Unlock() diff --git a/internal/gcs/process.go b/internal/gcs/process.go index 4c2428a657..8b114fb442 100644 --- a/internal/gcs/process.go +++ b/internal/gcs/process.go @@ -34,6 +34,9 @@ type Process struct { stdin, stdout, stderr *ioChannel stdinCloseWriteOnce sync.Once stdinCloseWriteErr error + // stdinPort, stdoutPort, stderrPort record the vsock ports that + // gc.exec allocated for this process's stdio relay. + stdinPort, stdoutPort, stderrPort uint32 } var _ cow.Process = &Process{} @@ -100,6 +103,9 @@ func (gc *GuestConnection) exec(ctx context.Context, cid string, params interfac g := winio.VsockServiceID(vsockSettings.StdErr) hvsockSettings.StdErr = &g } + // Snapshot the per-stream vsock ports so the live-migration snapshot + // can re-establish the same host-side listeners on the destination. + p.stdinPort, p.stdoutPort, p.stderrPort = vsockSettings.StdIn, vsockSettings.StdOut, vsockSettings.StdErr var resp prot.ContainerExecuteProcessResponse err = gc.brdg.RPC(ctx, prot.RPCExecuteProcess, &req, &resp, false) @@ -131,14 +137,20 @@ func (p *Process) Close() error { trace.StringAttribute("cid", p.cid), trace.Int64Attribute("pid", int64(p.id))) - if err := p.stdin.Close(); err != nil { - log.G(ctx).WithError(err).Warn("close stdin failed") + if p.stdin != nil { + if err := p.stdin.Close(); err != nil { + log.G(ctx).WithError(err).Warn("close stdin failed") + } } - if err := p.stdout.Close(); err != nil { - log.G(ctx).WithError(err).Warn("close stdout failed") + if p.stdout != nil { + if err := p.stdout.Close(); err != nil { + log.G(ctx).WithError(err).Warn("close stdout failed") + } } - if err := p.stderr.Close(); err != nil { - log.G(ctx).WithError(err).Warn("close stderr failed") + if p.stderr != nil { + if err := p.stderr.Close(); err != nil { + log.G(ctx).WithError(err).Warn("close stderr failed") + } } return nil } @@ -211,6 +223,13 @@ func (p *Process) Pid() int { return int(p.id) } +// IOPorts returns the host-side vsock ports allocated for this process's +// stdin/stdout/stderr relay (zero if the corresponding stream was not +// opened). +func (p *Process) IOPorts() (stdin, stdout, stderr uint32) { + return p.stdinPort, p.stdoutPort, p.stderrPort +} + // ResizeConsole requests that the pty associated with the process resize its // window. func (p *Process) ResizeConsole(ctx context.Context, width, height uint16) (err error) { diff --git a/internal/hcs/process.go b/internal/hcs/process.go index fef2bf546c..22e5313045 100644 --- a/internal/hcs/process.go +++ b/internal/hcs/process.go @@ -57,6 +57,12 @@ func (process *Process) Pid() int { return process.processID } +// IOPorts always returns zeros: HCS processes route stdio over named +// pipes, not vsock. Implemented to satisfy [cow.Process]. +func (process *Process) IOPorts() (stdin, stdout, stderr uint32) { + return 0, 0, 0 +} + // SystemID returns the ID of the process's compute system. func (process *Process) SystemID() string { return process.system.ID() diff --git a/internal/jobcontainers/process.go b/internal/jobcontainers/process.go index cf3318f3b7..4819f6a84d 100644 --- a/internal/jobcontainers/process.go +++ b/internal/jobcontainers/process.go @@ -197,6 +197,12 @@ func (p *JobProcess) Pid() int { return p.cmd.Pid() } +// IOPorts always returns zeros: job-container processes route stdio +// over OS pipes, not vsock. Implemented to satisfy [cow.Process]. +func (p *JobProcess) IOPorts() (stdin, stdout, stderr uint32) { + return 0, 0, 0 +} + // Close cleans up any state associated with the process but does not kill it. func (p *JobProcess) Close() error { p.stdioLock.Lock() diff --git a/internal/vm/guestmanager/guest.go b/internal/vm/guestmanager/guest.go index 54e0a56e53..42f67c9d4c 100644 --- a/internal/vm/guestmanager/guest.go +++ b/internal/vm/guestmanager/guest.go @@ -137,3 +137,30 @@ func (gm *Guest) CloseConnection() error { return err } + +// NextPort returns the active GCS connection's IO port allocator +// floor, or 0 if no connection is active. Used by the live-migration +// save path. +func (gm *Guest) NextPort() uint32 { + gm.mu.RLock() + defer gm.mu.RUnlock() + + if gm.gc == nil { + return 0 + } + return gm.gc.NextPort() +} + +// SetNextPort raises the active GCS connection's IO port allocator +// floor. No-op if no connection is active. Used by the live-migration +// restore path to skip past vsock ports already in use by restored +// processes. +func (gm *Guest) SetNextPort(p uint32) { + gm.mu.Lock() + defer gm.mu.Unlock() + + if gm.gc == nil { + return + } + gm.gc.SetNextPort(p) +} diff --git a/internal/vm/guestmanager/manager.go b/internal/vm/guestmanager/manager.go index 573a641399..8d942ed7ba 100644 --- a/internal/vm/guestmanager/manager.go +++ b/internal/vm/guestmanager/manager.go @@ -31,6 +31,21 @@ func (gm *Guest) CreateContainer(ctx context.Context, cid string, config interfa return c, nil } +// OpenContainer attaches a host-side wrapper to a container already +// running inside the UVM. Counterpart of [CreateContainer] for the +// live-migration restore path. +func (gm *Guest) OpenContainer(ctx context.Context, cid string) (*gcs.Container, error) { + gm.mu.Lock() + defer gm.mu.Unlock() + + c, err := gm.gc.OpenContainer(ctx, cid) + if err != nil { + return nil, fmt.Errorf("failed to open container %s: %w", cid, err) + } + + return c, nil +} + // DumpStacks requests a stack dump from the guest and returns it as a string. func (gm *Guest) DumpStacks(ctx context.Context) (string, error) { gm.mu.Lock() diff --git a/internal/vm/vmmanager/lifetime.go b/internal/vm/vmmanager/lifetime.go index f09459d5cf..03743939c4 100644 --- a/internal/vm/vmmanager/lifetime.go +++ b/internal/vm/vmmanager/lifetime.go @@ -87,6 +87,17 @@ func (uvm *UtilityVM) PropertiesV2(ctx context.Context, types ...hcsschema.Prope return props, nil } +// PropertiesV3 returns the properties of the utility VM from HCS using the V2 +// property query schema. +func (uvm *UtilityVM) PropertiesV3(ctx context.Context, query *hcsschema.PropertyQuery) (*hcsschema.Properties, error) { + props, err := uvm.cs.PropertiesV3(ctx, query) + if err != nil { + return nil, fmt.Errorf("failed to get properties from HCS: %w", err) + } + + return props, nil +} + // StartedTime returns the time when the utility VM entered the running state. func (uvm *UtilityVM) StartedTime() time.Time { return uvm.cs.StartedTime() diff --git a/internal/vm/vmmanager/migration.go b/internal/vm/vmmanager/migration.go new file mode 100644 index 0000000000..c2cfa18360 --- /dev/null +++ b/internal/vm/vmmanager/migration.go @@ -0,0 +1,65 @@ +//go:build windows && (lcow || wcow) + +package vmmanager + +import ( + "context" + "fmt" + + "github.com/Microsoft/hcsshim/internal/hcs" + hcsschema "github.com/Microsoft/hcsshim/internal/hcs/schema2" +) + +// StartWithMigrationOptions starts the utility VM as a live migration destination +// using the provided migration configuration. +func (uvm *UtilityVM) StartWithMigrationOptions(ctx context.Context, config *hcs.MigrationConfig) error { + if err := uvm.cs.StartWithMigrationOptions(ctx, config); err != nil { + return fmt.Errorf("failed to start utility VM with migration options: %w", err) + } + return nil +} + +// InitializeLiveMigrationOnSource initializes a live migration on the source side +// of the utility VM with the provided options. +func (uvm *UtilityVM) InitializeLiveMigrationOnSource(ctx context.Context, options *hcsschema.MigrationInitializeOptions) error { + if err := uvm.cs.InitializeLiveMigrationOnSource(ctx, options); err != nil { + return fmt.Errorf("failed to initialize live migration on source: %w", err) + } + return nil +} + +// StartLiveMigrationOnSource starts the live migration on the source side using +// the provided transport socket and session ID. +func (uvm *UtilityVM) StartLiveMigrationOnSource(ctx context.Context, config *hcs.MigrationConfig) error { + if err := uvm.cs.StartLiveMigrationOnSource(ctx, config); err != nil { + return fmt.Errorf("failed to start live migration on source: %w", err) + } + return nil +} + +// StartLiveMigrationTransfer starts the memory transfer phase of a live migration. +func (uvm *UtilityVM) StartLiveMigrationTransfer(ctx context.Context, options *hcsschema.MigrationTransferOptions) error { + if err := uvm.cs.StartLiveMigrationTransfer(ctx, options); err != nil { + return fmt.Errorf("failed to start live migration transfer: %w", err) + } + return nil +} + +// FinalizeLiveMigration completes the live migration workflow. If resume is true +// the utility VM is resumed; otherwise it is stopped. +func (uvm *UtilityVM) FinalizeLiveMigration(ctx context.Context, resume bool) error { + if err := uvm.cs.FinalizeLiveMigration(ctx, resume); err != nil { + return fmt.Errorf("failed to finalize live migration: %w", err) + } + return nil +} + +// MigrationNotifications returns a read-only channel that receives live migration +// event payloads for the utility VM. +func (uvm *UtilityVM) MigrationNotifications() (<-chan hcsschema.OperationSystemMigrationNotificationInfo, error) { + ch, err := uvm.cs.MigrationNotifications() + if err != nil { + return nil, fmt.Errorf("failed to get migration notifications channel: %w", err) + } + return ch, nil +}