Skip to content

Commit

Permalink
fix(region,host): manual probe isolated devices only update usb and c…
Browse files Browse the repository at this point in the history
…ustom pci devices
  • Loading branch information
wanyaoqi committed Dec 17, 2024
1 parent c5747b7 commit ea2b3b3
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 18 deletions.
4 changes: 2 additions & 2 deletions pkg/compute/hostdrivers/kvm.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,9 +711,9 @@ func (driver *SKVMHostDriver) RequestProbeIsolatedDevices(ctx context.Context, u
url := fmt.Sprintf("%s/hosts/%s/probe-isolated-devices", host.ManagerUri, host.GetId())
httpClient := httputils.GetDefaultClient()
header := mcclient.GetTokenHeaders(userCred)
_, respBody, err := httputils.JSONRequest(httpClient, ctx, "POST", url, header, input, false)
_, _, err := httputils.JSONRequest(httpClient, ctx, "POST", url, header, input, false)
if err != nil {
return nil, errors.Wrapf(err, "send to host %s", url)
}
return respBody.(*jsonutils.JSONArray), err
return nil, nil
}
20 changes: 6 additions & 14 deletions pkg/compute/models/guest_actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -6273,25 +6273,17 @@ func (self *SGuest) PerformProbeIsolatedDevices(ctx context.Context, userCred mc
if err != nil {
return nil, errors.Wrapf(err, "GetHostDriver")
}
hostDevs, err := driver.RequestProbeIsolatedDevices(ctx, userCred, host, data)
_, err = driver.RequestProbeIsolatedDevices(ctx, userCred, host, data)
if err != nil {
return nil, errors.Wrap(err, "RequestProbeIsolatedDevices")
}
objs, err := hostDevs.GetArray()
hostDevs, err := host.GetIsolateDevices()
if err != nil {
return nil, errors.Wrapf(err, "GetArray from %q", hostDevs)
return nil, errors.Wrapf(err, "GetIsolateDevices")
}
devs := make([]*SIsolatedDevice, 0)
for _, obj := range objs {
id, err := obj.GetString("id")
if err != nil {
return nil, errors.Wrapf(err, "device %s", obj)
}
devObj, err := IsolatedDeviceManager.FetchById(id)
if err != nil {
return nil, errors.Wrapf(err, "FetchById %q", id)
}
dev := devObj.(*SIsolatedDevice)
devs := make([]SIsolatedDevice, 0)
for i := range hostDevs {
dev := hostDevs[i]
if dev.GuestId == "" {
devs = append(devs, dev)
}
Expand Down
48 changes: 47 additions & 1 deletion pkg/compute/models/hosts.go
Original file line number Diff line number Diff line change
Expand Up @@ -7122,12 +7122,58 @@ func (manager *SHostManager) InitializeData() error {
return manager.initHostname()
}

func (hh *SHost) GetCustomIsolatedDevices() ([]SIsolatedDevice, error) {
hidmsq := HostIsolatedDeviceModelManager.Query().Equals("host_id", hh.Id).SubQuery()
idmq := IsolatedDeviceModelManager.Query("dev_type")
idmq = idmq.Join(hidmsq, sqlchemy.Equals(idmq.Field("id"), hidmsq.Field("isolated_device_model_id")))
q := IsolatedDeviceManager.Query().Equals("host_id", hh.Id).In("dev_type", idmq.SubQuery())
ret := []SIsolatedDevice{}
err := db.FetchModelObjects(IsolatedDeviceManager, q, &ret)
if err != nil {
return nil, err
}
return ret, nil
}

func (hh *SHost) GetUsbIsolatedDevices() ([]SIsolatedDevice, error) {
q := IsolatedDeviceManager.Query().Equals("dev_type", api.USB_TYPE)
ret := []SIsolatedDevice{}
err := db.FetchModelObjects(IsolatedDeviceManager, q, &ret)
if err != nil {
return nil, err
}
return ret, nil
}

func (hh *SHost) PerformProbeIsolatedDevices(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, data jsonutils.JSONObject) (jsonutils.JSONObject, error) {
driver, err := hh.GetHostDriver()
if err != nil {
return nil, errors.Wrapf(err, "GetHostDriver")
}
return driver.RequestProbeIsolatedDevices(ctx, userCred, hh, data)
// probe usb and custom pci devices
customDevs, err := hh.GetCustomIsolatedDevices()
if err != nil {
return nil, errors.Wrap(err, "GetCustomIsolatedDevices")
}
usbDevs, err := hh.GetUsbIsolatedDevices()
if err != nil {
return nil, errors.Wrap(err, "GetUsbIsolatedDevices")
}
devs := make([]SIsolatedDevice, 0)
devs = append(devs, customDevs...)
devs = append(devs, usbDevs...)

input := jsonutils.NewDict()
input.Set("registed_devs", jsonutils.Marshal(devs))
_, err = driver.RequestProbeIsolatedDevices(ctx, userCred, hh, input)
if err != nil {
return nil, err
}
newDevs, err := hh.GetIsolateDevices()
if err != nil {
return nil, err
}
return jsonutils.Marshal(newDevs), nil
}

func (hh *SHost) PerformSyncIsolatedDevices(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, data jsonutils.JSONObject) (jsonutils.JSONObject, error) {
Expand Down
56 changes: 55 additions & 1 deletion pkg/hostman/hostinfo/hostinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -1496,7 +1496,61 @@ func (h *SHostInfo) ProbeSyncIsolatedDevices(hostId string, body jsonutils.JSONO
if h.GetHostId() != hostId {
return nil, nil
}
return h.probeSyncIsolatedDevices()
registedCloudDevs := make([]isolated_device.CloudDeviceInfo, 0)
err := body.Unmarshal(&registedCloudDevs, "registed_devs")
if err != nil {
return nil, errors.Wrap(err, "Parse input registed_devs")
}

// remove registedDevs from isolated device mananger first
for _, info := range registedCloudDevs {
h.IsolatedDeviceMan.RemoveDeviceByIdent(info.VendorDeviceId, info.Addr, info.MdevId)
}

devs, err := h.IsolatedDeviceMan.ProbeUsbAndCustomPCIDevs()
if err != nil {
return nil, err
}

var devsNeedUpdate = map[string]bool{}
for _, info := range registedCloudDevs {
dev := h.IsolatedDeviceMan.GetDeviceByIdent(info.VendorDeviceId, info.Addr, info.MdevId)
if dev != nil {
dev.SetDeviceInfo(info)
devsNeedUpdate[dev.GetCloudId()] = h.IsolatedDeviceMan.CheckDevIsNeedUpdate(dev, &info)
} else {
// detach device
h.IsolatedDeviceMan.AppendDetachedDevice(&info)
}
}
h.IsolatedDeviceMan.StartDetachTask()
// no need custom probe
//h.IsolatedDeviceMan.BatchCustomProbe()

// sync each isolated device found
eg := errgroup.Group{}
// limits the number of active goroutines in this group to at most
eg.SetLimit(16)
for i := range devs {
dev := devs[i]
eg.Go(func() error {
needUpdate := false
if need, ok := devsNeedUpdate[dev.GetCloudId()]; !ok || need {
needUpdate = true
}

if _, err := isolated_device.SyncDeviceInfo(h.GetSession(), h.HostId, dev, needUpdate); err != nil {
log.Errorf("Sync deviceInfo %s error: %v", dev.String(), err)
return errors.Wrapf(err, "Sync device %s", dev.String())
} else {
return nil
}
})
}
if err := eg.Wait(); err != nil {
return nil, err
}
return nil, nil
}

func (h *SHostInfo) setHostname(name string) {
Expand Down
63 changes: 63 additions & 0 deletions pkg/hostman/isolated_device/isolated_device.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"regexp"
"strings"
"sync"
"time"

"yunion.io/x/jsonutils"
Expand Down Expand Up @@ -146,32 +147,39 @@ type IDevice interface {
type IsolatedDeviceManager interface {
GetDevices() []IDevice
GetDeviceByIdent(vendorDevId, addr, mdevId string) IDevice
RemoveDeviceByIdent(vendorDevId, addr, mdevId string)
GetDeviceByAddr(addr string) IDevice
ProbePCIDevices(skipGPUs, skipUSBs, skipCustomDevs bool, sriovNics, ovsOffloadNics []HostNic, nvmePciDisks, amdVgpuPFs, nvidiaVgpuPFs []string, enableCudaMps, enableContainerNPU, enableWhitelist bool)
StartDetachTask()
BatchCustomProbe()
AppendDetachedDevice(dev *CloudDeviceInfo)
GetQemuParams(devAddrs []string) *QemuParams
CheckDevIsNeedUpdate(dev IDevice, devInfo *CloudDeviceInfo) bool
ProbeUsbAndCustomPCIDevs() ([]IDevice, error)
}

type isolatedDeviceManager struct {
host IHost
devices []IDevice
DetachedDevices []*CloudDeviceInfo

lock *sync.Mutex
}

func NewManager(host IHost) IsolatedDeviceManager {
man := &isolatedDeviceManager{
host: host,
devices: make([]IDevice, 0),
DetachedDevices: make([]*CloudDeviceInfo, 0),
lock: new(sync.Mutex),
}
// Do probe later - Qiu Jian
return man
}

func (man *isolatedDeviceManager) GetDevices() []IDevice {
man.lock.Lock()
defer man.lock.Unlock()
return man.devices
}

Expand Down Expand Up @@ -328,6 +336,38 @@ func (man *isolatedDeviceManager) probeUSBs(skipUSBs bool) {
}
}

func (man *isolatedDeviceManager) ProbeUsbAndCustomPCIDevs() ([]IDevice, error) {
man.lock.Lock()
defer man.lock.Unlock()

devModels, err := man.getCustomIsolatedDeviceModels()
if err != nil {
return nil, errors.Wrap(err, "getCustomIsolatedDeviceModels")
}

ret := make([]IDevice, 0)
for _, devModel := range devModels {
devs, err := getPassthroughPCIDevs(devModel, GpuClassCodes)
if err != nil {
return nil, errors.Wrap(err, "getPassthroughPCIDevs")
}
for i, dev := range devs {
ret = append(ret, dev)
log.Infof("Add general pci device: %d => %#v", i, dev)
}
}

usbs, err := getPassthroughUSBs()
if err != nil {
return nil, errors.Wrap(err, "getPassthroughUSBs")
}
for idx, usb := range usbs {
ret = append(ret, usb)
log.Infof("Add USB device: %d => %#v", idx, usb)
}
return ret, nil
}

type HostNic struct {
Bridge string
Interface string
Expand Down Expand Up @@ -428,6 +468,9 @@ func (man *isolatedDeviceManager) probeNVIDIAVgpus(nvidiaVgpuPFs []string) {
}

func (man *isolatedDeviceManager) ProbePCIDevices(skipGPUs, skipUSBs, skipCustomDevs bool, sriovNics, ovsOffloadNics []HostNic, nvmePciDisks, amdVgpuPFs, nvidiaVgpuPFs []string, enableCudaMps, enableContainerNPU, enableWhitelist bool) {
man.lock.Lock()
defer man.lock.Unlock()

man.devices = make([]IDevice, 0)
if man.host.IsContainerHost() {
man.probeContainerNvidiaGPUs(enableCudaMps)
Expand Down Expand Up @@ -528,7 +571,25 @@ func (man *isolatedDeviceManager) CheckDevIsNeedUpdate(dev IDevice, devInfo *Clo
return false
}

func (man *isolatedDeviceManager) RemoveDeviceByIdent(vendorDevId, addr, mdevId string) {
man.lock.Lock()
defer man.lock.Unlock()
idx := -1
for i := range man.devices {
dev := man.devices[i]
if dev.GetVendorDeviceId() == vendorDevId && dev.GetAddr() == addr && dev.GetMdevId() == mdevId {
idx = i
break
}
}
if idx >= 0 {
man.devices = append(man.devices[:idx], man.devices[idx+1:]...)
}
}

func (man *isolatedDeviceManager) GetDeviceByIdent(vendorDevId, addr, mdevId string) IDevice {
man.lock.Lock()
defer man.lock.Unlock()
for _, dev := range man.devices {
if dev.GetVendorDeviceId() == vendorDevId && dev.GetAddr() == addr && dev.GetMdevId() == mdevId {
return dev
Expand All @@ -547,6 +608,8 @@ func (man *isolatedDeviceManager) GetDeviceByVendorDevId(vendorDevId string) IDe
}

func (man *isolatedDeviceManager) GetDeviceByAddr(addr string) IDevice {
man.lock.Lock()
defer man.lock.Unlock()
for _, dev := range man.devices {
if dev.GetAddr() == addr {
return dev
Expand Down

0 comments on commit ea2b3b3

Please sign in to comment.