Skip to content

Commit

Permalink
Added additional checks for inflight flow status (#3870)
Browse files Browse the repository at this point in the history
The inflight flow status check sends a message to the client asking
about flows that are currently in flight. This PR adds an additional
check to verify whether the flow has already been resolved before
sending the status check. This avoids sending a status check for a flow
that is already resolved, which would put the flow into an error state
even though it has already completed.

Also added a flag to disable this entire feature. This might be
necessary for unusual configurations, because the feature is still new
and might introduce too much load on busy servers.

Also added a toolbar to the hunt clients view.
  • Loading branch information
scudette committed Nov 4, 2024
1 parent 22fce76 commit d2ded4c
Show file tree
Hide file tree
Showing 26 changed files with 366 additions and 228 deletions.
4 changes: 3 additions & 1 deletion accessors/vfs/vfs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,9 @@ func (self *TestSuite) TestVFSAccessor() {

// Wait here until the collection is completed.
vtesting.WaitUntil(time.Second*5, self.T(), func() bool {
flow, err := launcher.GetFlowDetails(self.Ctx, self.ConfigObj, "server", flow_id)
flow, err := launcher.GetFlowDetails(
self.Ctx, self.ConfigObj, services.GetFlowOptions{},
"server", flow_id)
assert.NoError(self.T(), err)

return flow.Context.State == flows_proto.ArtifactCollectorContext_FINISHED
Expand Down
3 changes: 2 additions & 1 deletion api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,8 @@ func (self *ApiServer) GetFlowDetails(
return nil, Status(self.verbose, err)
}
result, err := launcher.GetFlowDetails(
ctx, org_config_obj, in.ClientId, in.FlowId)
ctx, org_config_obj, services.GetFlowOptions{Downloads: true},
in.ClientId, in.FlowId)
if err != nil {
return nil, Status(self.verbose, err)
}
Expand Down
366 changes: 191 additions & 175 deletions config/proto/config.pb.go

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions config/proto/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1250,6 +1250,11 @@ message Defaults {
// clients with the key department and value contains accounting
// in their client metadata.
repeated string indexed_client_metadata = 51;

// If this is set we do not actively check the status of inflight
// collections. This is a new feature and may need to be disabled
// in some large deployments.
bool disable_active_inflight_checks = 52;
}

// Configures crypto preferences
Expand Down
1 change: 0 additions & 1 deletion gui/velociraptor/src/components/hunts/hunt-clients.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ export default class HuntClients extends React.Component {
renderers={renderers}
params={params}
translate_column_headers={true}
no_toolbar={true}
name={"HuntClients" + hunt_id}
/>
);
Expand Down
3 changes: 2 additions & 1 deletion http_comms/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,8 @@ func (self *TestSuite) testScheduleCollection() (closer func()) {
var flow *api_proto.FlowDetails

vtesting.WaitUntil(20*time.Second, self.T(), func() bool {
flow, err = launcher.GetFlowDetails(self.Ctx, self.ConfigObj,
flow, err = launcher.GetFlowDetails(
self.Ctx, self.ConfigObj, services.GetFlowOptions{},
self.client_id, flow_id)
assert.NoError(self.T(), err)

Expand Down
24 changes: 16 additions & 8 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ func (self *ServerTestSuite) TestFlowStates() {
flow_ids = append(flow_ids, flow_id)

flow_details, err := launcher.GetFlowDetails(
self.Ctx, self.ConfigObj, self.client_id, flow_id)
self.Ctx, self.ConfigObj, services.GetFlowOptions{},
self.client_id, flow_id)
assert.NoError(self.T(), err)

// Our initial state is RUNNING
Expand Down Expand Up @@ -204,7 +205,8 @@ func (self *ServerTestSuite) TestFlowStates() {

for in_flight := range client_info.InFlightFlows {
flow_details, err := launcher.GetFlowDetails(
self.Ctx, self.ConfigObj, self.client_id, in_flight)
self.Ctx, self.ConfigObj, services.GetFlowOptions{},
self.client_id, in_flight)
assert.NoError(self.T(), err)

// The flow is in the WAITING state now until the client sends
Expand All @@ -231,7 +233,8 @@ func (self *ServerTestSuite) TestFlowStates() {
runner.Close(self.Ctx)

flow_details, err = launcher.GetFlowDetails(
self.Ctx, self.ConfigObj, self.client_id, in_flight)
self.Ctx, self.ConfigObj, services.GetFlowOptions{},
self.client_id, in_flight)
assert.NoError(self.T(), err)

// The flow is in the IN_PROGRESS state now.
Expand All @@ -245,7 +248,8 @@ func (self *ServerTestSuite) TestFlowStates() {
defer closer()

flow_details, err = launcher.GetFlowDetails(
self.Ctx, self.ConfigObj, self.client_id, in_flight)
self.Ctx, self.ConfigObj, services.GetFlowOptions{},
self.client_id, in_flight)
assert.NoError(self.T(), err)

// The flow is in the UNRESPONSIVE state now.
Expand All @@ -271,7 +275,8 @@ func (self *ServerTestSuite) TestFlowStates() {
runner.Close(self.Ctx)

flow_details, err = launcher.GetFlowDetails(
self.Ctx, self.ConfigObj, self.client_id, in_flight)
self.Ctx, self.ConfigObj, services.GetFlowOptions{},
self.client_id, in_flight)
assert.NoError(self.T(), err)

// The flow is in the FINISHED state now.
Expand Down Expand Up @@ -791,7 +796,8 @@ func (self *ServerTestSuite) TestErrorMessage() {
assert.NoError(self.T(), err)

details, err := launcher.GetFlowDetails(
self.Ctx, self.ConfigObj, self.client_id, flow_id)
self.Ctx, self.ConfigObj, services.GetFlowOptions{},
self.client_id, flow_id)
require.NoError(t, err)

require.Regexp(self.T(), regexp.MustCompile("Error generated"),
Expand Down Expand Up @@ -833,7 +839,8 @@ func (self *ServerTestSuite) TestCompletions() {

vtesting.WaitUntil(5*time.Second, self.T(), func() bool {
details, err := launcher.GetFlowDetails(
self.Ctx, self.ConfigObj, self.client_id, flow_id)
self.Ctx, self.ConfigObj, services.GetFlowOptions{},
self.client_id, flow_id)
require.NoError(t, err)

// Flow not complete yet - still an outstanding request.
Expand All @@ -859,7 +866,8 @@ func (self *ServerTestSuite) TestCompletions() {
vtesting.WaitUntil(5*time.Second, self.T(), func() bool {
// Flow should be complete now that second response arrived.
details, err := launcher.GetFlowDetails(
self.Ctx, self.ConfigObj, self.client_id, flow_id)
self.Ctx, self.ConfigObj, services.GetFlowOptions{},
self.client_id, flow_id)
require.NoError(t, err)

return flows_proto.ArtifactCollectorContext_FINISHED ==
Expand Down
90 changes: 75 additions & 15 deletions services/client_info/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
crypto_proto "www.velocidex.com/golang/velociraptor/crypto/proto"
"www.velocidex.com/golang/velociraptor/datastore"
"www.velocidex.com/golang/velociraptor/file_store/api"
flows_proto "www.velocidex.com/golang/velociraptor/flows/proto"
"www.velocidex.com/golang/velociraptor/paths"
"www.velocidex.com/golang/velociraptor/services"
"www.velocidex.com/golang/velociraptor/utils"
Expand Down Expand Up @@ -264,7 +265,8 @@ var (
)

// Fetch the next number of flow_request tasks off the queue and
// dequeue them.
// dequeue them. NOTE: This function can return more than `number`
// messages, but no more than `number` of them are FlowRequest objects.
func (self *ClientInfoManager) getClientTasks(
ctx context.Context, client_id string, number int) (
[]*crypto_proto.VeloMessage, error) {
Expand Down Expand Up @@ -392,6 +394,12 @@ func (self *ClientInfoManager) GetClientTasks(
// is not the same as len(inflight_notifications)
inflight_requests := 0

inflight_checks_enabled := true
if self.config_obj.Defaults != nil &&
self.config_obj.Defaults.DisableActiveInflightChecks {
inflight_checks_enabled = false
}

now := utils.GetTime().Now().Unix()

err := self.storage.Modify(ctx, self.config_obj, client_id,
Expand All @@ -411,15 +419,18 @@ func (self *ClientInfoManager) GetClientTasks(

// Check up on in flight flows every 60 sec at least
// (could be more depending on poll).
for k, v := range client_info.InFlightFlows {
if now-v > 60 {
inflight_notifications = append(inflight_notifications, k)
if inflight_checks_enabled {
for k, v := range client_info.InFlightFlows {
if now-v > 60 {
inflight_notifications = append(
inflight_notifications, k)
}
}
}

// Update the time to ensure we dont send these too often.
for _, k := range inflight_notifications {
client_info.InFlightFlows[k] = utils.GetTime().Now().Unix()
// Update the time to ensure we don't send these too often.
for _, k := range inflight_notifications {
client_info.InFlightFlows[k] = utils.GetTime().Now().Unix()
}
}

// Reset the HasTasks flag
Expand Down Expand Up @@ -457,23 +468,72 @@ func (self *ClientInfoManager) GetClientTasks(
// Add a notification request to the client asking about the
// status of currently in flight requests.
if len(inflight_notifications) > 0 {
result = append(result, &crypto_proto.VeloMessage{
SessionId: constants.STATUS_CHECK_WELL_KNOWN_FLOW,
FlowStatsRequest: &crypto_proto.FlowStatsRequest{
FlowId: inflight_notifications,
},
})
launcher, err := services.GetLauncher(self.config_obj)
if err != nil {
return nil, err
}

// Ask the launcher whether the flows are really still in flight
// or have already been resolved.
verified := make([]string, 0, len(inflight_notifications))
resolved := make([]string, 0, len(inflight_notifications))
for _, n := range inflight_notifications {
// Only request status for flows that have not actually
// been completed.
flow_obj, err := launcher.GetFlowDetails(ctx, self.config_obj,
services.GetFlowOptions{}, client_id, n)
if err != nil {
continue
}

// If the flow is resolved we ignore it.
switch flow_obj.Context.State {
case flows_proto.ArtifactCollectorContext_FINISHED,
flows_proto.ArtifactCollectorContext_ERROR:
resolved = append(resolved, n)
default:
// For all other flow states it is still unclear what is
// happening with the flow, so we ask the client about it.
verified = append(verified, n)
}
}

if len(resolved) > 0 {
self.storage.Modify(ctx, self.config_obj, client_id,
func(client_info *services.ClientInfo) (*services.ClientInfo, error) {
if client_info == nil ||
client_info.InFlightFlows == nil {
return nil, nil
}

for _, k := range resolved {
delete(client_info.InFlightFlows, k)
}
return client_info, nil
})
}

// Ask the client about those flows
if len(verified) > 0 {
result = append(result, &crypto_proto.VeloMessage{
SessionId: constants.STATUS_CHECK_WELL_KNOWN_FLOW,
FlowStatsRequest: &crypto_proto.FlowStatsRequest{
FlowId: verified,
},
})
}
}

// What new flows were added?
var inflight_flows []string
for _, message := range result {
// Filter out the FlowRequest checks
if message.FlowRequest != nil && message.SessionId != "" {
inflight_flows = append(inflight_flows, message.SessionId)
}
}

if len(inflight_flows) > 0 {
if inflight_checks_enabled && len(inflight_flows) > 0 {

// Add the inflight tags to the client record immediately.
err := self.storage.Modify(ctx, self.config_obj, client_id,
Expand Down
8 changes: 5 additions & 3 deletions services/hunt_dispatcher/flows.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,9 @@ func (self *HuntDispatcher) syncFlowTables(
}

flow, err := launcher.GetFlowDetails(
ctx, config_obj, participation_row.ClientId,
participation_row.FlowId)
ctx, config_obj,
services.GetFlowOptions{},
participation_row.ClientId, participation_row.FlowId)
if err != nil {
continue
}
Expand Down Expand Up @@ -192,7 +193,8 @@ func (self *HuntDispatcher) GetFlows(
// information.
} else {
collection_context, err = launcher.GetFlowDetails(
ctx, config_obj, client_id, flow_id)
ctx, config_obj, services.GetFlowOptions{},
client_id, flow_id)
if err != nil {
continue
}
Expand Down
5 changes: 3 additions & 2 deletions services/hunt_manager/hunt_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,9 @@ func (self *HuntManager) maybeDirectlyAssignFlow(
if err != nil {
return err
}
_, err = launcher.GetFlowDetails(ctx, config_obj, assignment.ClientId,
assignment.FlowId)
_, err = launcher.GetFlowDetails(
ctx, config_obj, services.GetFlowOptions{},
assignment.ClientId, assignment.FlowId)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion services/indexing/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (self *Indexer) Start(
// Avoid doing snapshots too quickly. This is mainly for
// tests where the time is mocked, so the After(delay)
// above does not work.
if utils.GetTime().Now().Sub(last_run) < time.Second {
if utils.GetTime().Now().Sub(last_run) < time.Minute {
utils.SleepWithCtx(ctx, time.Minute)
continue
}
Expand Down
9 changes: 9 additions & 0 deletions services/journal/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,11 +629,20 @@ func (self *ReplicationService) Watch(

// Keep retrying to reconnect in case the
// connection dropped.
last_try := utils.GetTime().Now()

select {
case <-self.ctx.Done():
return

case <-time.After(utils.Jitter(self.RetryDuration())):

// Avoid retrying too quickly. This is mainly for
// tests where the time is mocked, so the After(delay)
// above does not work.
if utils.GetTime().Now().Sub(last_try) < time.Minute {
utils.SleepWithCtx(ctx, time.Minute)
}
}

logger := logging.GetLogger(self.config_obj,
Expand Down
3 changes: 2 additions & 1 deletion services/journal/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ func GetFlowFromQueue(
}

flow_details, err := launcher.GetFlowDetails(
ctx, config_obj, client_id, flow_id)
ctx, config_obj, services.GetFlowOptions{},
client_id, flow_id)
if err != nil ||
flow_details == nil ||
flow_details.Context == nil {
Expand Down
9 changes: 9 additions & 0 deletions services/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,14 @@ func GetLauncher(config_obj *config_proto.Config) (Launcher, error) {
return org_manager.Services(config_obj.OrgId).Launcher()
}

// GetFlowOptions holds the options for the Launcher's GetFlowDetails
// API. Callers set only the fields they need so that we do no more
// work than necessary.
type GetFlowOptions struct {

// Include the flow downloads (ZIP exports of the flow).
Downloads bool
}

type CompilerOptions struct {
// Should names be obfuscated in the resulting VQL?
ObfuscateNames bool
Expand Down Expand Up @@ -223,6 +231,7 @@ type Launcher interface {
GetFlowDetails(
ctx context.Context,
config_obj *config_proto.Config,
opts GetFlowOptions,
client_id string, flow_id string) (*api_proto.FlowDetails, error)

// Actively cancel the collection
Expand Down
3 changes: 2 additions & 1 deletion services/launcher/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ func (self *FlowStorageManager) DeleteFlow(
}

collection_details, err := launcher.GetFlowDetails(
ctx, config_obj, client_id, flow_id)
ctx, config_obj, services.GetFlowOptions{},
client_id, flow_id)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit d2ded4c

Please sign in to comment.