Skip to content

Commit

Permalink
Merge pull request #234 from Abirdcfly/know
Browse files Browse the repository at this point in the history
fix: knowledgebase should use versioneddataset
  • Loading branch information
nkwangleiGIT authored Nov 20, 2023
2 parents 793d8b8 + 11fe393 commit 3d5d17b
Show file tree
Hide file tree
Showing 15 changed files with 333 additions and 129 deletions.
5 changes: 3 additions & 2 deletions api/v1alpha1/knowledgebase_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@ type KnowledgeBaseSpec struct {
// VectorStore defines the vectorstore to store results
VectorStore *TypedObjectReference `json:"vectorStore,omitempty"`

// FileGroups included files Grouped by Datasource
// FileGroups included files Grouped by VersionedDataset
FileGroups []FileGroup `json:"fileGroups,omitempty"`
}

type FileGroupDetail struct {
// From defines the source which provides these files
// From defines the datasource which provides these files
Source *TypedObjectReference `json:"source,omitempty"`

// FileDetails is the detail files
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ spec:
- name
type: object
fileGroups:
description: FileGroups included files Grouped by Datasource
description: FileGroups included files Grouped by VersionedDataset
items:
properties:
paths:
Expand Down Expand Up @@ -197,7 +197,8 @@ spec:
type: object
type: array
source:
description: From defines the source which provides these files
description: From defines the datasource which provides these
files
properties:
apiGroup:
description: APIGroup is the group for the resource being
Expand Down
14 changes: 14 additions & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,20 @@ rules:
- get
- patch
- update
- apiGroups:
- arcadia.kubeagi.k8s.com.cn
resources:
- versioneddataset
verbs:
- get
- list
- watch
- apiGroups:
- arcadia.kubeagi.k8s.com.cn
resources:
- versioneddataset/status
verbs:
- get
- apiGroups:
- arcadia.kubeagi.k8s.com.cn
resources:
Expand Down
8 changes: 6 additions & 2 deletions config/samples/arcadia_v1alpha1_dataset.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
apiVersion: arcadia.kubeagi.k8s.com.cn/v1alpha1
kind: Dataset
metadata:
name: dataset-sample
name: dataset-playground
namespace: arcadia
spec:
# TODO(user): Add fields here
contentType: text
description: test dataset management
displayName: dataset example
field: finance
9 changes: 3 additions & 6 deletions config/samples/arcadia_v1alpha1_knowledgebase.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,17 @@ spec:
displayName: "测试 KnowledgeBase"
description: "测试 KnowledgeBase"
embedder:
apiGroup: arcadia.kubeagi.k8s.com.cn/v1alpha1
kind: Embedders
name: zhipuai-embedders-sample
namespace: arcadia
vectorStore:
apiGroup: arcadia.kubeagi.k8s.com.cn/v1alpha1
kind: VectorStores
name: chroma-sample
namespace: arcadia
fileGroups:
- source:
apiGroup: arcadia.kubeagi.k8s.com.cn/v1alpha1
kind: Datasources
name: arcadia-local
kind: VersionedDataset
name: dataset-playground-v1
namespace: arcadia
paths:
- example-test/knowledgebase-1.txt
- dataset/dataset-playground/v1/qa.csv
17 changes: 15 additions & 2 deletions config/samples/arcadia_v1alpha1_versioneddataset.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,19 @@
apiVersion: arcadia.kubeagi.k8s.com.cn/v1alpha1
kind: VersionedDataset
metadata:
name: versioneddataset-sample
name: dataset-playground-v1
namespace: arcadia
spec:
# TODO(user): Add fields here
dataset:
kind: Dataset
name: dataset-playground
namespace: arcadia
fileGroups:
- source:
kind: Datasource
name: arcadia-local
namespace: arcadia
paths:
- qa.csv
released: 0
version: v1
142 changes: 61 additions & 81 deletions controllers/knowledgebase_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
arcadiav1alpha1 "github.com/kubeagi/arcadia/api/v1alpha1"
"github.com/kubeagi/arcadia/pkg/config"
"github.com/kubeagi/arcadia/pkg/datasource"
pkgdocumentloaders "github.com/kubeagi/arcadia/pkg/documentloaders"
"github.com/kubeagi/arcadia/pkg/embeddings"
zhipuaiembeddings "github.com/kubeagi/arcadia/pkg/embeddings/zhipuai"
"github.com/kubeagi/arcadia/pkg/llms/zhipuai"
Expand All @@ -54,11 +55,10 @@ const (
)

var (
errNoDataSource = fmt.Errorf("no datasource")
errDataSourceTypeUnkonwn = fmt.Errorf("unknown datasource type")
errDataSourceNotReady = fmt.Errorf("datasource is not ready")
errEmbedderNotReady = fmt.Errorf("embedder is not ready")
errVectorStoreNotReady = fmt.Errorf("vector store is not ready")
errNoSource = fmt.Errorf("no source")
errDataSourceNotReady = fmt.Errorf("datasource is not ready")
errEmbedderNotReady = fmt.Errorf("embedder is not ready")
errVectorStoreNotReady = fmt.Errorf("vector store is not ready")
)

// KnowledgeBaseReconciler reconciles a KnowledgeBase object
Expand All @@ -72,8 +72,8 @@ type KnowledgeBaseReconciler struct {
//+kubebuilder:rbac:groups=arcadia.kubeagi.k8s.com.cn,resources=knowledgebases/finalizers,verbs=update
//+kubebuilder:rbac:groups=arcadia.kubeagi.k8s.com.cn,resources=embedders,verbs=get;list;watch
//+kubebuilder:rbac:groups=arcadia.kubeagi.k8s.com.cn,resources=embedders/status,verbs=get
//+kubebuilder:rbac:groups=arcadia.kubeagi.k8s.com.cn,resources=datasources,verbs=get;list;watch
//+kubebuilder:rbac:groups=arcadia.kubeagi.k8s.com.cn,resources=datasources/status,verbs=get
//+kubebuilder:rbac:groups=arcadia.kubeagi.k8s.com.cn,resources=versioneddataset,verbs=get;list;watch
//+kubebuilder:rbac:groups=arcadia.kubeagi.k8s.com.cn,resources=versioneddataset/status,verbs=get
//+kubebuilder:rbac:groups=arcadia.kubeagi.k8s.com.cn,resources=vectorstores,verbs=get;list;watch
//+kubebuilder:rbac:groups=arcadia.kubeagi.k8s.com.cn,resources=vectorstores/status,verbs=get

Expand Down Expand Up @@ -227,28 +227,39 @@ func (r *KnowledgeBaseReconciler) setCondition(kb *arcadiav1alpha1.KnowledgeBase
func (r *KnowledgeBaseReconciler) reconcileFileGroup(ctx context.Context, log logr.Logger, kb *arcadiav1alpha1.KnowledgeBase, vectorStore *arcadiav1alpha1.VectorStore, embedder *arcadiav1alpha1.Embedder, group arcadiav1alpha1.FileGroup) (err error) {
defer func() {
if err != nil {
err = fmt.Errorf("failed to reconcile FileGroup.DataSource: %s: %w", group.Source.Name, err)
err = fmt.Errorf("failed to reconcile FileGroup.Source: %s: %w", group.Source.Name, err)
}
}()

if group.Source == nil {
return errNoDataSource
return errNoSource
}
dataSource := &arcadiav1alpha1.Datasource{}
versionedDataset := &arcadiav1alpha1.VersionedDataset{}
ns := group.Source.GetNamespace()
if err = r.Get(ctx, types.NamespacedName{Name: group.Source.Name, Namespace: ns}, dataSource); err != nil {
if err = r.Get(ctx, types.NamespacedName{Name: group.Source.Name, Namespace: ns}, versionedDataset); err != nil {
if errors.IsNotFound(err) {
return errNoDataSource
return errNoSource
} else {
return err
}
}
if !dataSource.Status.IsReady() {
if !versionedDataset.Status.IsReady() {
return errDataSourceNotReady
}
if dataSource.Spec.Type() == arcadiav1alpha1.DatasourceTypeUnknown {
return errDataSourceTypeUnkonwn

system, err := config.GetSystemDatasource(ctx, r.Client)
if err != nil {
return err
}
endpoint := system.Spec.Enpoint.DeepCopy()
if endpoint != nil && endpoint.AuthSecret != nil {
endpoint.AuthSecret.WithNameSpace(system.Namespace)
}
ds, err := datasource.NewLocal(ctx, r.Client, endpoint)
if err != nil {
return err
}
info := &arcadiav1alpha1.OSS{Bucket: ns}

if len(kb.Status.FileGroupDetail) == 0 {
kb.Status.FileGroupDetail = make([]arcadiav1alpha1.FileGroupDetail, 1)
Expand All @@ -257,7 +268,7 @@ func (r *KnowledgeBaseReconciler) reconcileFileGroup(ctx context.Context, log lo
var fileGroupDetail *arcadiav1alpha1.FileGroupDetail
pathMap := make(map[string]*arcadiav1alpha1.FileDetails, 1)
for i, detail := range kb.Status.FileGroupDetail {
if detail.Source != nil && detail.Source.Name == dataSource.Name && detail.Source.GetNamespace() == dataSource.GetNamespace() {
if detail.Source != nil && detail.Source.Name == versionedDataset.Name && detail.Source.GetNamespace() == versionedDataset.GetNamespace() {
fileGroupDetail = &kb.Status.FileGroupDetail[i]
for i, detail := range fileGroupDetail.FileDetails {
pathMap[detail.Path] = &fileGroupDetail.FileDetails[i] // FIXME: is this right?
Expand All @@ -271,35 +282,6 @@ func (r *KnowledgeBaseReconciler) reconcileFileGroup(ctx context.Context, log lo
kb.Status.FileGroupDetail = append(kb.Status.FileGroupDetail, *fileGroupDetail)
}

var ds datasource.Datasource
info := &arcadiav1alpha1.OSS{}
switch dataSource.Spec.Type() {
case arcadiav1alpha1.DatasourceTypeLocal:
system, err := config.GetSystemDatasource(ctx, r.Client)
if err != nil {
return err
}
endpoint := system.Spec.Enpoint.DeepCopy()
if endpoint != nil && endpoint.AuthSecret != nil {
endpoint.AuthSecret.WithNameSpace(system.Namespace)
}
ds, err = datasource.NewLocal(ctx, r.Client, endpoint)
if err != nil {
return err
}
info = &arcadiav1alpha1.OSS{Bucket: dataSource.Namespace}
case arcadiav1alpha1.DatasourceTypeOSS:
endpoint := dataSource.Spec.Enpoint.DeepCopy()
// set auth secret's namespace to the datasource's namespace
if endpoint.AuthSecret != nil {
endpoint.AuthSecret.WithNameSpace(dataSource.Namespace)
}
ds, err = datasource.NewOSS(ctx, r.Client, endpoint)
if err != nil {
return err
}
info = dataSource.Spec.OSS.DeepCopy()
}
errs := make([]error, 0)
for _, path := range group.Paths {
fileDatail, ok := pathMap[path]
Expand All @@ -321,42 +303,40 @@ func (r *KnowledgeBaseReconciler) reconcileFileGroup(ctx context.Context, log lo
fileDatail.UpdateErr(err)
continue
}
switch dataSource.Spec.Type() {
case arcadiav1alpha1.DatasourceTypeLocal, arcadiav1alpha1.DatasourceTypeOSS:
objectStat, ok := stat.(minio.ObjectInfo)
log.V(0).Info(fmt.Sprintf("minio StatFile:%#v", objectStat), "path", path)
if !ok {
err = fmt.Errorf("failed to convert stat to minio.ObjectInfo:%s", path)
errs = append(errs, err)
fileDatail.UpdateErr(err)
continue
}
if objectStat.ETag == fileDatail.Checksum {
fileDatail.LastUpdateTime = metav1.Now()
continue
}
fileDatail.Checksum = objectStat.ETag
tags, err := ds.GetTags(ctx, info)
if err != nil {
errs = append(errs, err)
fileDatail.UpdateErr(err)
continue
}
file, err := ds.ReadFile(ctx, info)
if err != nil {
errs = append(errs, err)
fileDatail.UpdateErr(err)
continue
}
defer file.Close()
if err = r.handleFile(ctx, log, file, info.Object, tags, kb, vectorStore, embedder); err != nil {
err = fmt.Errorf("failed to handle file:%s: %w", path, err)
errs = append(errs, err)
fileDatail.UpdateErr(err)
continue
}
fileDatail.UpdateErr(nil)

objectStat, ok := stat.(minio.ObjectInfo)
log.V(0).Info(fmt.Sprintf("minio StatFile:%#v", objectStat), "path", path)
if !ok {
err = fmt.Errorf("failed to convert stat to minio.ObjectInfo:%s", path)
errs = append(errs, err)
fileDatail.UpdateErr(err)
continue
}
if objectStat.ETag == fileDatail.Checksum {
fileDatail.LastUpdateTime = metav1.Now()
continue
}
fileDatail.Checksum = objectStat.ETag
tags, err := ds.GetTags(ctx, info)
if err != nil {
errs = append(errs, err)
fileDatail.UpdateErr(err)
continue
}
file, err := ds.ReadFile(ctx, info)
if err != nil {
errs = append(errs, err)
fileDatail.UpdateErr(err)
continue
}
defer file.Close()
if err = r.handleFile(ctx, log, file, info.Object, tags, kb, vectorStore, embedder); err != nil {
err = fmt.Errorf("failed to handle file:%s: %w", path, err)
errs = append(errs, err)
fileDatail.UpdateErr(err)
continue
}
fileDatail.UpdateErr(nil)
}
return utilerrors.NewAggregate(errs)
}
Expand Down Expand Up @@ -394,7 +374,7 @@ func (r *KnowledgeBaseReconciler) handleFile(ctx context.Context, log logr.Logge
case "txt":
loader = documentloaders.NewText(dataReader)
case "csv":
loader = documentloaders.NewCSV(dataReader)
loader = pkgdocumentloaders.NewQACSV(dataReader, fileName, "q", "a")
case "html", "htm":
loader = documentloaders.NewHTML(dataReader)
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ spec:
- name
type: object
fileGroups:
description: FileGroups included files Grouped by Datasource
description: FileGroups included files Grouped by VersionedDataset
items:
properties:
paths:
Expand Down Expand Up @@ -197,7 +197,8 @@ spec:
type: object
type: array
source:
description: From defines the source which provides these files
description: From defines the datasource which provides these
files
properties:
apiGroup:
description: APIGroup is the group for the resource being
Expand Down
14 changes: 14 additions & 0 deletions deploy/charts/arcadia/templates/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,20 @@ rules:
- get
- patch
- update
- apiGroups:
- arcadia.kubeagi.k8s.com.cn
resources:
- versioneddataset
verbs:
- get
- list
- watch
- apiGroups:
- arcadia.kubeagi.k8s.com.cn
resources:
- versioneddataset/status
verbs:
- get
- apiGroups:
- arcadia.kubeagi.k8s.com.cn
resources:
Expand Down
Loading

0 comments on commit 3d5d17b

Please sign in to comment.