Skip to content

Commit

Permalink
Refine some code logic and add streamlkit data-app support
Browse files Browse the repository at this point in the history
  • Loading branch information
nkwangleiGIT committed Dec 1, 2023
1 parent 7cd9207 commit 067309a
Show file tree
Hide file tree
Showing 21 changed files with 497 additions and 115 deletions.
2 changes: 1 addition & 1 deletion api/base/v1alpha1/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (in *TypedObjectReference) WithNameSpace(namespace string) {

func (in *TypedObjectReference) GetNamespace() string {
if in.Namespace == nil {
return utils.GetSelfNamespace()
return utils.GetCurrentNamespace()
}
return *in.Namespace
}
Expand Down
4 changes: 2 additions & 2 deletions controllers/app-node/chain/llmchain_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type LLMChainReconciler struct {
// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile
func (r *LLMChainReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := ctrl.LoggerFrom(ctx)
log.V(1).Info("Start LLMChain Reconcile")
log.V(5).Info("Start LLMChain Reconcile")
instance := &api.LLMChain{}
if err := r.Get(ctx, req.NamespacedName, instance); err != nil {
// There's no need to requeue if the resource no longer exists.
Expand All @@ -54,7 +54,7 @@ func (r *LLMChainReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
return ctrl.Result{}, client.IgnoreNotFound(err)
}
log = log.WithValues("Generation", instance.GetGeneration(), "ObservedGeneration", instance.Status.ObservedGeneration, "creator", instance.Spec.Creator)
log.V(1).Info("Get LLMChain instance")
log.V(5).Info("Get LLMChain instance")

// Add a finalizer.Then, we can define some operations which should
// occur before the LLMChain to be deleted.
Expand Down
4 changes: 2 additions & 2 deletions controllers/app-node/prompt/prompt_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type PromptReconciler struct {
// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile
func (r *PromptReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := ctrl.LoggerFrom(ctx)
log.V(1).Info("Start Prompt Reconcile")
log.V(5).Info("Start Prompt Reconcile")
instance := &api.Prompt{}
if err := r.Get(ctx, req.NamespacedName, instance); err != nil {
// There's no need to requeue if the resource no longer exists.
Expand All @@ -54,7 +54,7 @@ func (r *PromptReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
return ctrl.Result{}, client.IgnoreNotFound(err)
}
log = log.WithValues("Generation", instance.GetGeneration(), "ObservedGeneration", instance.Status.ObservedGeneration, "creator", instance.Spec.Creator)
log.V(1).Info("Get Prompt instance")
log.V(5).Info("Get Prompt instance")

// Add a finalizer.Then, we can define some operations which should
// occur before the Prompt to be deleted.
Expand Down
4 changes: 2 additions & 2 deletions controllers/application_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type ApplicationReconciler struct {
// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile
func (r *ApplicationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := ctrl.LoggerFrom(ctx)
log.V(1).Info("Start Application Reconcile")
log.V(5).Info("Start Application Reconcile")
app := &arcadiav1alpha1.Application{}
if err := r.Get(ctx, req.NamespacedName, app); err != nil {
// There's no need to requeue if the resource no longer exists.
Expand All @@ -53,7 +53,7 @@ func (r *ApplicationReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, client.IgnoreNotFound(err)
}
log = log.WithValues("Generation", app.GetGeneration(), "ObservedGeneration", app.Status.ObservedGeneration, "creator", app.Spec.Creator)
log.V(1).Info("Get Application instance")
log.V(5).Info("Get Application instance")

// Add a finalizer.Then, we can define some operations which should
// occur before the Application to be deleted.
Expand Down
4 changes: 2 additions & 2 deletions controllers/datasource_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type DatasourceReconciler struct {
// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile
func (r *DatasourceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx)
logger.Info("Starting datasource reconcile")
logger.V(5).Info("Starting datasource reconcile")

instance := &arcadiav1alpha1.Datasource{}
if err := r.Get(ctx, req.NamespacedName, instance); err != nil {
Expand Down Expand Up @@ -152,7 +152,7 @@ func (r *DatasourceReconciler) Initialize(ctx context.Context, logger logr.Logge

// Checkdatasource to update status
func (r *DatasourceReconciler) Checkdatasource(ctx context.Context, logger logr.Logger, instance *arcadiav1alpha1.Datasource) error {
logger.Info("check datasource")
logger.V(5).Info("check datasource")
var err error

// create datasource
Expand Down
30 changes: 23 additions & 7 deletions controllers/embedder_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,20 @@ import (
"context"
"errors"
"fmt"
"reflect"

"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"

arcadiav1alpha1 "github.com/kubeagi/arcadia/api/base/v1alpha1"
"github.com/kubeagi/arcadia/pkg/embeddings"
Expand Down Expand Up @@ -65,13 +69,13 @@ type EmbedderReconciler struct {
// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile
func (r *EmbedderReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx)
logger.Info("Reconciling embedding resource")
logger.V(5).Info("Reconciling embedding resource")

instance := &arcadiav1alpha1.Embedder{}
if err := r.Get(ctx, req.NamespacedName, instance); err != nil {
// There's no need to requeue if the resource no longer exists.
// Otherwise, we'll be requeued implicitly because we return an error.
logger.V(1).Info("Failed to get Embedder")
logger.Error(err, "Failed to get Embedder")
return ctrl.Result{}, client.IgnoreNotFound(err)
}

Expand Down Expand Up @@ -102,23 +106,35 @@ func (r *EmbedderReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
logger.Info("Remove Embedder done")
return ctrl.Result{}, nil
}

if err := r.CheckEmbedder(ctx, logger, instance); err != nil {
return ctrl.Result{RequeueAfter: waitMedium}, err
}

return ctrl.Result{RequeueAfter: waitLonger}, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *EmbedderReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&arcadiav1alpha1.Embedder{}).
For(&arcadiav1alpha1.Embedder{}, builder.WithPredicates(predicate.Funcs{
UpdateFunc: func(ue event.UpdateEvent) bool {
// Avoid to handle the event that it's not spec update or delete
oldEmbedder := ue.ObjectOld.(*arcadiav1alpha1.Embedder)
newEmbedder := ue.ObjectNew.(*arcadiav1alpha1.Embedder)
return !reflect.DeepEqual(oldEmbedder.Spec, newEmbedder.Spec) || newEmbedder.DeletionTimestamp != nil
},
// for other event handler, we must add the function explictly.
CreateFunc: func(event.CreateEvent) bool {
return true
},
DeleteFunc: func(event.DeleteEvent) bool {
return true
},
})).
Complete(r)
}

func (r *EmbedderReconciler) CheckEmbedder(ctx context.Context, logger logr.Logger, instance *arcadiav1alpha1.Embedder) error {
logger.Info("Checking embedding resource")
logger.V(5).Info("Checking embedding resource")

switch instance.Spec.Provider.GetType() {
case arcadiav1alpha1.ProviderType3rdParty:
Expand All @@ -131,7 +147,7 @@ func (r *EmbedderReconciler) CheckEmbedder(ctx context.Context, logger logr.Logg
}

func (r *EmbedderReconciler) check3rdPartyEmbedder(ctx context.Context, logger logr.Logger, instance *arcadiav1alpha1.Embedder) error {
logger.Info("Checking 3rd party embedding resource")
logger.V(5).Info("Checking 3rd party embedding resource")

var err error
var msg string
Expand Down
8 changes: 4 additions & 4 deletions controllers/knowledgebase_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ type KnowledgeBaseReconciler struct {
// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile
func (r *KnowledgeBaseReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, err error) {
log := ctrl.LoggerFrom(ctx)
log.V(1).Info("Start KnowledgeBase Reconcile")
log.V(5).Info("Start KnowledgeBase Reconcile")
kb := &arcadiav1alpha1.KnowledgeBase{}
if err := r.Get(ctx, req.NamespacedName, kb); err != nil {
// There's no need to requeue if the resource no longer exists.
Expand All @@ -98,7 +98,7 @@ func (r *KnowledgeBaseReconciler) Reconcile(ctx context.Context, req ctrl.Reques
return ctrl.Result{}, client.IgnoreNotFound(err)
}
log = log.WithValues("Generation", kb.GetGeneration(), "ObservedGeneration", kb.Status.ObservedGeneration, "creator", kb.Spec.Creator)
log.V(1).Info("Get KnowledgeBase instance")
log.V(5).Info("Get KnowledgeBase instance")

// Add a finalizer.Then, we can define some operations which should
// occur before the KnowledgeBase to be deleted.
Expand Down Expand Up @@ -307,15 +307,15 @@ func (r *KnowledgeBaseReconciler) reconcileFileGroup(ctx context.Context, log lo
}
info.Object = filepath.Join("dataset", versionedDataset.Spec.Dataset.Name, versionedDataset.Spec.Version, path)
stat, err := ds.StatFile(ctx, info)
log.V(0).Info(fmt.Sprintf("raw StatFile:%#v", stat), "path", path)
log.V(5).Info(fmt.Sprintf("raw StatFile:%#v", stat), "path", path)
if err != nil {
errs = append(errs, err)
fileDetail.UpdateErr(err, arcadiav1alpha1.FileProcessPhaseFailed)
continue
}

objectStat, ok := stat.(minio.ObjectInfo)
log.V(0).Info(fmt.Sprintf("minio StatFile:%#v", objectStat), "path", path)
log.V(5).Info(fmt.Sprintf("minio StatFile:%#v", objectStat), "path", path)
if !ok {
err = fmt.Errorf("failed to convert stat to minio.ObjectInfo:%s", path)
errs = append(errs, err)
Expand Down
4 changes: 2 additions & 2 deletions controllers/model_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type ModelReconciler struct {
// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile
func (r *ModelReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx)
logger.Info("Starting model reconcile")
logger.V(5).Info("Starting model reconcile")

instance := &arcadiav1alpha1.Model{}
if err := r.Get(ctx, req.NamespacedName, instance); err != nil {
Expand Down Expand Up @@ -173,7 +173,7 @@ func (r *ModelReconciler) Initialize(ctx context.Context, logger logr.Logger, in

// CheckModel to update status
func (r *ModelReconciler) CheckModel(ctx context.Context, logger logr.Logger, instance *arcadiav1alpha1.Model) error {
logger.Info("check model")
logger.V(5).Info("check model")
var err error

var ds datasource.Datasource
Expand Down
Loading

0 comments on commit 067309a

Please sign in to comment.