Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(server): reduce importer transaction time #1380

Merged
merged 2 commits into from
Jan 29, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
212 changes: 114 additions & 98 deletions server/internal/usecase/interactor/item.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,77 +365,89 @@ func (i Item) Import(ctx context.Context, param interfaces.ImportItemsParam, ope
return interfaces.ImportItemsResponse{}, interfaces.ErrInvalidOperator
}

return Run1(ctx, operator, i.repos, Usecase().Transaction(), func(ctx context.Context) (interfaces.ImportItemsResponse, error) {
s := param.SP.Schema()
if !operator.IsWritableWorkspace(s.Workspace()) {
return interfaces.ImportItemsResponse{}, interfaces.ErrOperationDenied
}
res := NewImportRes()
s := param.SP.Schema()
if !operator.IsWritableWorkspace(s.Workspace()) {
return interfaces.ImportItemsResponse{}, interfaces.ErrOperationDenied
}
res := NewImportRes()

prj, err := i.repos.Project.FindByID(ctx, s.Project())
if err != nil {
return interfaces.ImportItemsResponse{}, err
}
prj, err := i.repos.Project.FindByID(ctx, s.Project())
if err != nil {
return interfaces.ImportItemsResponse{}, err
}

m, err := i.repos.Model.FindByID(ctx, param.ModelID)
if err != nil {
return interfaces.ImportItemsResponse{}, err
}
m, err := i.repos.Model.FindByID(ctx, param.ModelID)
if err != nil {
return interfaces.ImportItemsResponse{}, err
}

itemsIds := lo.FilterMap(param.Items, func(i interfaces.ImportItemParam, _ int) (item.ID, bool) {
if i.ItemId != nil {
return *i.ItemId, true
}
return item.ID{}, false
})
oldItems, err := i.repos.Item.FindByIDs(ctx, itemsIds, nil)
if err != nil {
return interfaces.ImportItemsResponse{}, err
itemsIds := lo.FilterMap(param.Items, func(i interfaces.ImportItemParam, _ int) (item.ID, bool) {
if i.ItemId != nil {
return *i.ItemId, true
}
return item.ID{}, false
})
oldItems, err := i.repos.Item.FindByIDs(ctx, itemsIds, nil)
if err != nil {
return interfaces.ImportItemsResponse{}, err
}

isMetadata := false
if m.Metadata() != nil && s.ID() == *m.Metadata() {
isMetadata = true
metaItemsIds := lo.FilterMap(param.Items, func(i interfaces.ImportItemParam, _ int) (item.ID, bool) {
if i.MetadataID != nil {
return *i.MetadataID, true
}
return item.ID{}, false
})
oldMetaItems, err := i.repos.Item.FindByIDs(ctx, metaItemsIds, nil)
if err != nil {
return interfaces.ImportItemsResponse{}, err
}

threadsToSave := thread.List{}
itemsToSave := item.List{}

type itemChanges struct {
oldFields item.Fields
action interfaces.ImportStrategyType
}
itemsEvent := map[item.ID]itemChanges{}
isMetadata := false
if m.Metadata() != nil && s.ID() == *m.Metadata() {
isMetadata = true
}

// update schema if needed
if param.MutateSchema && len(param.Fields) > 0 {
for _, fieldParam := range param.Fields {
if fieldParam.Key == "" || s.HasFieldByKey(fieldParam.Key) {
return interfaces.ImportItemsResponse{}, schema.ErrInvalidKey
}
threadsToSave := thread.List{}
itemsToSave := item.List{}

f, err := schema.NewFieldWithDefaultProperty(fieldParam.Type).
NewID().
Unique(fieldParam.Unique).
Multiple(fieldParam.Multiple).
Required(fieldParam.Required).
Name(fieldParam.Name).
Description(lo.FromPtr(fieldParam.Description)).
Key(id.NewKey(fieldParam.Key)).
DefaultValue(fieldParam.DefaultValue).
Build()
if err != nil {
return interfaces.ImportItemsResponse{}, err
}
type itemChanges struct {
oldFields item.Fields
action interfaces.ImportStrategyType
}
itemsEvent := map[item.ID]itemChanges{}

s.AddField(f)
res.FieldAdded(f)
// update schema if needed
if param.MutateSchema && len(param.Fields) > 0 {
for _, fieldParam := range param.Fields {
if fieldParam.Key == "" || s.HasFieldByKey(fieldParam.Key) {
return interfaces.ImportItemsResponse{}, schema.ErrInvalidKey
}
err = i.repos.Schema.Save(ctx, s)

f, err := schema.NewFieldWithDefaultProperty(fieldParam.Type).
NewID().
Unique(fieldParam.Unique).
Multiple(fieldParam.Multiple).
Required(fieldParam.Required).
Name(fieldParam.Name).
Description(lo.FromPtr(fieldParam.Description)).
Key(id.NewKey(fieldParam.Key)).
DefaultValue(fieldParam.DefaultValue).
Build()
if err != nil {
return interfaces.ImportItemsResponse{}, err
}

s.AddField(f)
res.FieldAdded(f)
}
err = i.repos.Schema.Save(ctx, s)
if err != nil {
return interfaces.ImportItemsResponse{}, err
}
}

f := func(ctx context.Context) (interfaces.ImportItemsResponse, error) {

for _, itemParam := range param.Items {

Expand Down Expand Up @@ -517,10 +529,7 @@ func (i Item) Import(ctx context.Context, param interfaces.ImportItemsParam, ope

var mi item.Versioned
if itemParam.MetadataID != nil {
mi, err = i.repos.Item.FindByID(ctx, *itemParam.MetadataID, nil)
if err != nil {
return interfaces.ImportItemsResponse{}, err
}
mi = oldMetaItems.Item(*itemParam.MetadataID)
if m.Metadata() == nil || *m.Metadata() != mi.Value().Schema() {
return interfaces.ImportItemsResponse{}, interfaces.ErrMetadataMismatch
}
Expand Down Expand Up @@ -587,50 +596,57 @@ func (i Item) Import(ctx context.Context, param interfaces.ImportItemsParam, ope
return interfaces.ImportItemsResponse{}, err
}

// TODO: create ItemsImported event
items, err := i.repos.Item.FindByIDs(ctx, lo.Keys(itemsEvent), nil)
if err != nil {
return interfaces.ImportItemsResponse{}, err
}
var events []Event
return res.Into(), nil
}

for k, changes := range itemsEvent {
vi := items.Item(k)
it := vi.Value()
runRes, err := Run1(ctx, operator, i.repos, Usecase().Transaction(), f)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix ineffectual error assignment.

The error assignment on line 602 is ineffectual as it's immediately overwritten.

-runRes, err := Run1(ctx, operator, i.repos, Usecase().Transaction(), f)
+runRes, runErr := Run1(ctx, operator, i.repos, Usecase().Transaction(), f)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
runRes, err := Run1(ctx, operator, i.repos, Usecase().Transaction(), f)
runRes, runErr := Run1(ctx, operator, i.repos, Usecase().Transaction(), f)
🧰 Tools
🪛 golangci-lint (1.62.2)

602-602: ineffectual assignment to err

(ineffassign)

🪛 GitHub Check: ci-server / lint

[failure] 602-602:
ineffectual assignment to err (ineffassign)

if err != nil {
return interfaces.ImportItemsResponse{}, err
}

refItems, err := i.getReferencedItems(ctx, it.Fields())
if err != nil {
return interfaces.ImportItemsResponse{}, err
}
// TODO: create ItemsImported event
items, err := i.repos.Item.FindByIDs(ctx, lo.Keys(itemsEvent), nil)
if err != nil {
return interfaces.ImportItemsResponse{}, err
}
var events []Event

var eType event.Type
if changes.action == interfaces.ImportStrategyTypeInsert {
eType = event.ItemCreate
} else {
eType = event.ItemUpdate
}
events = append(events, Event{
Project: prj,
Workspace: s.Workspace(),
Type: eType,
Object: vi,
WebhookObject: item.ItemModelSchema{
Item: vi.Value(),
Model: m,
Schema: s,
GroupSchemas: param.SP.GroupSchemas(),
ReferencedItems: refItems,
Changes: item.CompareFields(it.Fields(), changes.oldFields),
},
Operator: operator.Operator(),
})
}
if err := i.events(ctx, events); err != nil {
for k, changes := range itemsEvent {
vi := items.Item(k)
it := vi.Value()

refItems, err := i.getReferencedItems(ctx, it.Fields())
if err != nil {
return interfaces.ImportItemsResponse{}, err
}

return res.Into(), nil
})
var eType event.Type
if changes.action == interfaces.ImportStrategyTypeInsert {
eType = event.ItemCreate
} else {
eType = event.ItemUpdate
}
events = append(events, Event{
Project: prj,
Workspace: s.Workspace(),
Type: eType,
Object: vi,
WebhookObject: item.ItemModelSchema{
Item: vi.Value(),
Model: m,
Schema: s,
GroupSchemas: param.SP.GroupSchemas(),
ReferencedItems: refItems,
Changes: item.CompareFields(it.Fields(), changes.oldFields),
},
Operator: operator.Operator(),
})
}
if err := i.events(ctx, events); err != nil {
return interfaces.ImportItemsResponse{}, err
}

return runRes, err
}

func (i Item) Import2(ctx context.Context, aId id.AssetID, mId id.ModelID, format, strategy, geoFieldKey string, mutateSchema bool, operator *usecase.Operator) error {
Expand Down