218 lines
6.3 KiB
Go
218 lines
6.3 KiB
Go
package qmp
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/digitalocean/go-qemu/qmp"
|
|
"github.com/samber/mo"
|
|
)
|
|
|
|
type Client struct {
|
|
monitor qmp.Monitor
|
|
}
|
|
|
|
// VMStatus is the decoded result of a QMP "query-status" call.
type VMStatus struct {
	// Running mirrors the "running" member of the reply.
	Running bool

	// Singlestep mirrors the "singlestep" member of the reply.
	Singlestep bool

	// Status mirrors the "status" member of the reply.
	Status string
}
|
|
|
|
func Connect(socketPath string) mo.Result[*Client] {
|
|
monitor, err := qmp.NewSocketMonitor("unix", socketPath, 2*time.Second)
|
|
if err != nil {
|
|
return mo.Err[*Client](fmt.Errorf("failed to create socket monitor: %w", err))
|
|
}
|
|
|
|
if err := monitor.Connect(); err != nil {
|
|
return mo.Err[*Client](fmt.Errorf("failed to connect to QMP socket: %w", err))
|
|
}
|
|
|
|
return mo.Ok(&Client{monitor: monitor})
|
|
}
|
|
|
|
func (c *Client) Status() mo.Result[VMStatus] {
|
|
type statusResult struct {
|
|
ID string `json:"id"`
|
|
Return struct {
|
|
Running bool `json:"running"`
|
|
Singlestep bool `json:"singlestep"`
|
|
Status string `json:"status"`
|
|
} `json:"return"`
|
|
}
|
|
|
|
cmd := []byte(`{"execute":"query-status"}`)
|
|
raw, err := c.monitor.Run(cmd)
|
|
if err != nil {
|
|
return mo.Err[VMStatus](fmt.Errorf("failed to execute query-status: %w", err))
|
|
}
|
|
|
|
var result statusResult
|
|
if err := json.Unmarshal(raw, &result); err != nil {
|
|
return mo.Err[VMStatus](fmt.Errorf("failed to parse status response: %w", err))
|
|
}
|
|
|
|
return mo.Ok(VMStatus{
|
|
Running: result.Return.Running,
|
|
Singlestep: result.Return.Singlestep,
|
|
Status: result.Return.Status,
|
|
})
|
|
}
|
|
|
|
func (c *Client) Shutdown() mo.Result[struct{}] {
|
|
cmd := []byte(`{"execute":"system_powerdown"}`)
|
|
_, err := c.monitor.Run(cmd)
|
|
if err != nil {
|
|
return mo.Err[struct{}](fmt.Errorf("failed to execute system_powerdown: %w", err))
|
|
}
|
|
|
|
return mo.Ok(struct{}{})
|
|
}
|
|
|
|
func (c *Client) Close() error {
|
|
return c.monitor.Disconnect()
|
|
}
|
|
|
|
// AddChardev adds a vhost-user-fs chardev for a virtiofsd socket.
|
|
// This is the first step of hot-mounting a filesystem.
|
|
func (c *Client) AddChardev(id, socketPath string) mo.Result[struct{}] {
|
|
cmd := fmt.Sprintf(`{"execute":"chardev-add","arguments":{"id":"%s","backend":{"type":"socket","data":{"addr":{"type":"unix","data":{"path":"%s"}},"server":false}}}}`, id, socketPath)
|
|
raw, err := c.monitor.Run([]byte(cmd))
|
|
if err != nil {
|
|
return mo.Err[struct{}](fmt.Errorf("failed to add chardev %s: %w", id, err))
|
|
}
|
|
|
|
// Check for error in response
|
|
var resp map[string]interface{}
|
|
if err := json.Unmarshal(raw, &resp); err == nil {
|
|
if errObj, ok := resp["error"]; ok {
|
|
return mo.Err[struct{}](fmt.Errorf("QMP error adding chardev: %v", errObj))
|
|
}
|
|
}
|
|
|
|
return mo.Ok(struct{}{})
|
|
}
|
|
|
|
// AddVhostUserFsDevice adds a vhost-user-fs-pci device connected to a chardev,
|
|
// targeting a hotplug-capable PCIe root port bus.
|
|
func (c *Client) AddVhostUserFsDevice(chardevID, tag, bus string) mo.Result[struct{}] {
|
|
// Device ID must be unique, derive from chardev ID
|
|
deviceID := "dev_" + chardevID
|
|
cmd := fmt.Sprintf(`{"execute":"device_add","arguments":{"driver":"vhost-user-fs-pci","chardev":"%s","tag":"%s","id":"%s","queue-size":1024,"bus":"%s"}}`, chardevID, tag, deviceID, bus)
|
|
raw, err := c.monitor.Run([]byte(cmd))
|
|
if err != nil {
|
|
return mo.Err[struct{}](fmt.Errorf("failed to add vhost-user-fs device for %s: %w", tag, err))
|
|
}
|
|
|
|
// Check for error in response
|
|
var resp map[string]interface{}
|
|
if err := json.Unmarshal(raw, &resp); err == nil {
|
|
if errObj, ok := resp["error"]; ok {
|
|
return mo.Err[struct{}](fmt.Errorf("QMP error adding device: %v", errObj))
|
|
}
|
|
}
|
|
|
|
return mo.Ok(struct{}{})
|
|
}
|
|
|
|
// FindFreeHotplugBus queries PCI devices and returns the first hotplug root port
|
|
// that has no child devices attached. Returns an error if all slots are occupied.
|
|
func (c *Client) FindFreeHotplugBus(busPrefix string, slotCount int) mo.Result[string] {
|
|
// Query all PCI devices to find which buses are occupied
|
|
cmd := []byte(`{"execute":"query-pci"}`)
|
|
raw, err := c.monitor.Run(cmd)
|
|
if err != nil {
|
|
return mo.Err[string](fmt.Errorf("failed to query PCI devices: %w", err))
|
|
}
|
|
|
|
// Parse to find which hotplug buses have devices
|
|
var resp struct {
|
|
Return []struct {
|
|
Devices []struct {
|
|
Bus int `json:"bus"`
|
|
QdevID string `json:"qdev_id"`
|
|
PCIBridge *struct {
|
|
Bus struct {
|
|
Number int `json:"number"`
|
|
} `json:"bus"`
|
|
Devices []json.RawMessage `json:"devices"`
|
|
} `json:"pci_bridge"`
|
|
} `json:"devices"`
|
|
} `json:"return"`
|
|
}
|
|
if err := json.Unmarshal(raw, &resp); err != nil {
|
|
return mo.Err[string](fmt.Errorf("failed to parse PCI response: %w", err))
|
|
}
|
|
|
|
// Build set of occupied hotplug buses
|
|
occupied := make(map[string]bool)
|
|
for _, bus := range resp.Return {
|
|
for _, dev := range bus.Devices {
|
|
if dev.PCIBridge != nil && len(dev.PCIBridge.Devices) > 0 {
|
|
occupied[dev.QdevID] = true
|
|
}
|
|
}
|
|
}
|
|
|
|
// Find first free hotplug slot
|
|
for i := 0; i < slotCount; i++ {
|
|
busID := fmt.Sprintf("%s%d", busPrefix, i)
|
|
if !occupied[busID] {
|
|
return mo.Ok(busID)
|
|
}
|
|
}
|
|
|
|
return mo.Err[string](fmt.Errorf("all %d hotplug slots are occupied", slotCount))
|
|
}
|
|
|
|
// HotMountFilesystem performs a complete hot-mount of a virtiofsd filesystem.
|
|
// Requires the virtiofsd daemon to already be running and listening on socketPath.
|
|
// Finds a free PCIe hotplug root port and attaches the device there.
|
|
func (c *Client) HotMountFilesystem(tag, socketPath, busPrefix string, slotCount int) mo.Result[struct{}] {
|
|
// Find a free hotplug bus
|
|
busResult := c.FindFreeHotplugBus(busPrefix, slotCount)
|
|
if busResult.IsError() {
|
|
return mo.Err[struct{}](busResult.Error())
|
|
}
|
|
bus := busResult.MustGet()
|
|
|
|
// Use tag as chardev ID for simplicity
|
|
chardevID := tag
|
|
|
|
// Step 1: Add chardev
|
|
if result := c.AddChardev(chardevID, socketPath); result.IsError() {
|
|
return result
|
|
}
|
|
|
|
// Step 2: Add device on the hotplug bus
|
|
if result := c.AddVhostUserFsDevice(chardevID, tag, bus); result.IsError() {
|
|
return result
|
|
}
|
|
|
|
return mo.Ok(struct{}{})
|
|
}
|
|
|
|
// QueryChardevs lists all current chardevs to check if one exists.
|
|
func (c *Client) QueryChardevs() mo.Result[[]string] {
|
|
cmd := []byte(`{"execute":"query-chardev"}`)
|
|
raw, err := c.monitor.Run(cmd)
|
|
if err != nil {
|
|
return mo.Err[[]string](fmt.Errorf("failed to query chardevs: %w", err))
|
|
}
|
|
|
|
var resp struct {
|
|
Return []struct {
|
|
Label string `json:"label"`
|
|
} `json:"return"`
|
|
}
|
|
if err := json.Unmarshal(raw, &resp); err != nil {
|
|
return mo.Err[[]string](fmt.Errorf("failed to parse chardev response: %w", err))
|
|
}
|
|
|
|
var labels []string
|
|
for _, c := range resp.Return {
|
|
labels = append(labels, c.Label)
|
|
}
|
|
return mo.Ok(labels)
|
|
}
|