Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added additional checks for inflight flow status #3870

Merged
merged 1 commit into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion accessors/vfs/vfs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,9 @@ func (self *TestSuite) TestVFSAccessor() {

// Wait here until the collection is completed.
vtesting.WaitUntil(time.Second*5, self.T(), func() bool {
flow, err := launcher.GetFlowDetails(self.Ctx, self.ConfigObj, "server", flow_id)
flow, err := launcher.GetFlowDetails(
self.Ctx, self.ConfigObj, services.GetFlowOptions{},
"server", flow_id)
assert.NoError(self.T(), err)

return flow.Context.State == flows_proto.ArtifactCollectorContext_FINISHED
Expand Down
3 changes: 2 additions & 1 deletion api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,8 @@ func (self *ApiServer) GetFlowDetails(
return nil, Status(self.verbose, err)
}
result, err := launcher.GetFlowDetails(
ctx, org_config_obj, in.ClientId, in.FlowId)
ctx, org_config_obj, services.GetFlowOptions{Downloads: true},
in.ClientId, in.FlowId)
if err != nil {
return nil, Status(self.verbose, err)
}
Expand Down
366 changes: 191 additions & 175 deletions config/proto/config.pb.go

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions config/proto/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1250,6 +1250,11 @@ message Defaults {
// clients with the key department and value contains accounting
// in their client metadata.
repeated string indexed_client_metadata = 51;

// If this is set we do not actively check the status of inflight
// collections. This is a new feature and may need to be disabled
// in some large deployments.
bool disable_active_inflight_checks = 52;
}

// Configures crypto preferences
Expand Down
1 change: 0 additions & 1 deletion gui/velociraptor/src/components/hunts/hunt-clients.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ export default class HuntClients extends React.Component {
renderers={renderers}
params={params}
translate_column_headers={true}
no_toolbar={true}
name={"HuntClients" + hunt_id}
/>
);
Expand Down
3 changes: 2 additions & 1 deletion http_comms/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,8 @@ func (self *TestSuite) testScheduleCollection() (closer func()) {
var flow *api_proto.FlowDetails

vtesting.WaitUntil(20*time.Second, self.T(), func() bool {
flow, err = launcher.GetFlowDetails(self.Ctx, self.ConfigObj,
flow, err = launcher.GetFlowDetails(
self.Ctx, self.ConfigObj, services.GetFlowOptions{},
self.client_id, flow_id)
assert.NoError(self.T(), err)

Expand Down
24 changes: 16 additions & 8 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ func (self *ServerTestSuite) TestFlowStates() {
flow_ids = append(flow_ids, flow_id)

flow_details, err := launcher.GetFlowDetails(
self.Ctx, self.ConfigObj, self.client_id, flow_id)
self.Ctx, self.ConfigObj, services.GetFlowOptions{},
self.client_id, flow_id)
assert.NoError(self.T(), err)

// Our initial state is RUNNING
Expand Down Expand Up @@ -204,7 +205,8 @@ func (self *ServerTestSuite) TestFlowStates() {

for in_flight := range client_info.InFlightFlows {
flow_details, err := launcher.GetFlowDetails(
self.Ctx, self.ConfigObj, self.client_id, in_flight)
self.Ctx, self.ConfigObj, services.GetFlowOptions{},
self.client_id, in_flight)
assert.NoError(self.T(), err)

// The flow is in the WAITING state now until the client sends
Expand All @@ -231,7 +233,8 @@ func (self *ServerTestSuite) TestFlowStates() {
runner.Close(self.Ctx)

flow_details, err = launcher.GetFlowDetails(
self.Ctx, self.ConfigObj, self.client_id, in_flight)
self.Ctx, self.ConfigObj, services.GetFlowOptions{},
self.client_id, in_flight)
assert.NoError(self.T(), err)

// The flow is in the IN_PROGRESS state now.
Expand All @@ -245,7 +248,8 @@ func (self *ServerTestSuite) TestFlowStates() {
defer closer()

flow_details, err = launcher.GetFlowDetails(
self.Ctx, self.ConfigObj, self.client_id, in_flight)
self.Ctx, self.ConfigObj, services.GetFlowOptions{},
self.client_id, in_flight)
assert.NoError(self.T(), err)

// The flow is in the UNRESPONSIVE state now.
Expand All @@ -271,7 +275,8 @@ func (self *ServerTestSuite) TestFlowStates() {
runner.Close(self.Ctx)

flow_details, err = launcher.GetFlowDetails(
self.Ctx, self.ConfigObj, self.client_id, in_flight)
self.Ctx, self.ConfigObj, services.GetFlowOptions{},
self.client_id, in_flight)
assert.NoError(self.T(), err)

// The flow is in the FINISHED state now.
Expand Down Expand Up @@ -791,7 +796,8 @@ func (self *ServerTestSuite) TestErrorMessage() {
assert.NoError(self.T(), err)

details, err := launcher.GetFlowDetails(
self.Ctx, self.ConfigObj, self.client_id, flow_id)
self.Ctx, self.ConfigObj, services.GetFlowOptions{},
self.client_id, flow_id)
require.NoError(t, err)

require.Regexp(self.T(), regexp.MustCompile("Error generated"),
Expand Down Expand Up @@ -833,7 +839,8 @@ func (self *ServerTestSuite) TestCompletions() {

vtesting.WaitUntil(5*time.Second, self.T(), func() bool {
details, err := launcher.GetFlowDetails(
self.Ctx, self.ConfigObj, self.client_id, flow_id)
self.Ctx, self.ConfigObj, services.GetFlowOptions{},
self.client_id, flow_id)
require.NoError(t, err)

// Flow not complete yet - still an outstanding request.
Expand All @@ -859,7 +866,8 @@ func (self *ServerTestSuite) TestCompletions() {
vtesting.WaitUntil(5*time.Second, self.T(), func() bool {
// Flow should be complete now that second response arrived.
details, err := launcher.GetFlowDetails(
self.Ctx, self.ConfigObj, self.client_id, flow_id)
self.Ctx, self.ConfigObj, services.GetFlowOptions{},
self.client_id, flow_id)
require.NoError(t, err)

return flows_proto.ArtifactCollectorContext_FINISHED ==
Expand Down
90 changes: 75 additions & 15 deletions services/client_info/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
crypto_proto "www.velocidex.com/golang/velociraptor/crypto/proto"
"www.velocidex.com/golang/velociraptor/datastore"
"www.velocidex.com/golang/velociraptor/file_store/api"
flows_proto "www.velocidex.com/golang/velociraptor/flows/proto"
"www.velocidex.com/golang/velociraptor/paths"
"www.velocidex.com/golang/velociraptor/services"
"www.velocidex.com/golang/velociraptor/utils"
Expand Down Expand Up @@ -264,7 +265,8 @@ var (
)

// Fetch the next number of flow_request tasks off the queue and
// dequeue them.
// dequeue them. NOTE: This function can return more than number
// messages but only number FlowRequest objects.
func (self *ClientInfoManager) getClientTasks(
ctx context.Context, client_id string, number int) (
[]*crypto_proto.VeloMessage, error) {
Expand Down Expand Up @@ -392,6 +394,12 @@ func (self *ClientInfoManager) GetClientTasks(
// is not the same as len(inflight_notifications)
inflight_requests := 0

inflight_checks_enabled := true
if self.config_obj.Defaults != nil &&
self.config_obj.Defaults.DisableActiveInflightChecks {
inflight_checks_enabled = false
}

now := utils.GetTime().Now().Unix()

err := self.storage.Modify(ctx, self.config_obj, client_id,
Expand All @@ -411,15 +419,18 @@ func (self *ClientInfoManager) GetClientTasks(

// Check up on in flight flows every 60 sec at least
// (could be more depending on poll).
for k, v := range client_info.InFlightFlows {
if now-v > 60 {
inflight_notifications = append(inflight_notifications, k)
if inflight_checks_enabled {
for k, v := range client_info.InFlightFlows {
if now-v > 60 {
inflight_notifications = append(
inflight_notifications, k)
}
}
}

// Update the time to ensure we dont send these too often.
for _, k := range inflight_notifications {
client_info.InFlightFlows[k] = utils.GetTime().Now().Unix()
// Update the time to ensure we dont send these too often.
for _, k := range inflight_notifications {
client_info.InFlightFlows[k] = utils.GetTime().Now().Unix()
}
}

// Reset the HasTasks flag
Expand Down Expand Up @@ -457,23 +468,72 @@ func (self *ClientInfoManager) GetClientTasks(
// Add a notification request to the client asking about the
// status of currently in flight requests.
if len(inflight_notifications) > 0 {
result = append(result, &crypto_proto.VeloMessage{
SessionId: constants.STATUS_CHECK_WELL_KNOWN_FLOW,
FlowStatsRequest: &crypto_proto.FlowStatsRequest{
FlowId: inflight_notifications,
},
})
launcher, err := services.GetLauncher(self.config_obj)
if err != nil {
return nil, err
}

// Check the launcher if the flows are really in flight or
// were they already resolved.
verified := make([]string, 0, len(inflight_notifications))
resolved := make([]string, 0, len(inflight_notifications))
for _, n := range inflight_notifications {
// Only request status for flows that have not actually
// been completed.
flow_obj, err := launcher.GetFlowDetails(ctx, self.config_obj,
services.GetFlowOptions{}, client_id, n)
if err != nil {
continue
}

// If the flow is resolved we ignore it.
switch flow_obj.Context.State {
case flows_proto.ArtifactCollectorContext_FINISHED,
flows_proto.ArtifactCollectorContext_ERROR:
resolved = append(resolved, n)
default:
// All other flow states are still unclear what is
// happening with it?
verified = append(verified, n)
}
}

if len(resolved) > 0 {
self.storage.Modify(ctx, self.config_obj, client_id,
func(client_info *services.ClientInfo) (*services.ClientInfo, error) {
if client_info == nil ||
client_info.InFlightFlows == nil {
return nil, nil
}

for _, k := range resolved {
delete(client_info.InFlightFlows, k)
}
return client_info, nil
})
}

// Ask the client about those flows
if len(verified) > 0 {
result = append(result, &crypto_proto.VeloMessage{
SessionId: constants.STATUS_CHECK_WELL_KNOWN_FLOW,
FlowStatsRequest: &crypto_proto.FlowStatsRequest{
FlowId: verified,
},
})
}
}

// What new flows were added?
var inflight_flows []string
for _, message := range result {
// Filter out the FlowRequest checks
if message.FlowRequest != nil && message.SessionId != "" {
inflight_flows = append(inflight_flows, message.SessionId)
}
}

if len(inflight_flows) > 0 {
if inflight_checks_enabled && len(inflight_flows) > 0 {

// Add the inflight tags to the client record immediately.
err := self.storage.Modify(ctx, self.config_obj, client_id,
Expand Down
8 changes: 5 additions & 3 deletions services/hunt_dispatcher/flows.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,9 @@ func (self *HuntDispatcher) syncFlowTables(
}

flow, err := launcher.GetFlowDetails(
ctx, config_obj, participation_row.ClientId,
participation_row.FlowId)
ctx, config_obj,
services.GetFlowOptions{},
participation_row.ClientId, participation_row.FlowId)
if err != nil {
continue
}
Expand Down Expand Up @@ -192,7 +193,8 @@ func (self *HuntDispatcher) GetFlows(
// information.
} else {
collection_context, err = launcher.GetFlowDetails(
ctx, config_obj, client_id, flow_id)
ctx, config_obj, services.GetFlowOptions{},
client_id, flow_id)
if err != nil {
continue
}
Expand Down
5 changes: 3 additions & 2 deletions services/hunt_manager/hunt_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,9 @@ func (self *HuntManager) maybeDirectlyAssignFlow(
if err != nil {
return err
}
_, err = launcher.GetFlowDetails(ctx, config_obj, assignment.ClientId,
assignment.FlowId)
_, err = launcher.GetFlowDetails(
ctx, config_obj, services.GetFlowOptions{},
assignment.ClientId, assignment.FlowId)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion services/indexing/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (self *Indexer) Start(
// Avoid doing snapshots too quickly. This is mainly for
// tests where the time is mocked for the After(delay)
// above does not work.
if utils.GetTime().Now().Sub(last_run) < time.Second {
if utils.GetTime().Now().Sub(last_run) < time.Minute {
utils.SleepWithCtx(ctx, time.Minute)
continue
}
Expand Down
9 changes: 9 additions & 0 deletions services/journal/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,11 +629,20 @@ func (self *ReplicationService) Watch(

// Keep retrying to reconnect in case the
// connection dropped.
last_try := utils.GetTime().Now()

select {
case <-self.ctx.Done():
return

case <-time.After(utils.Jitter(self.RetryDuration())):

// Avoid retrying too quickly. This is mainly for
// tests where the time is mocked for the After(delay)
// above does not work.
if utils.GetTime().Now().Sub(last_try) < time.Minute {
utils.SleepWithCtx(ctx, time.Minute)
}
}

logger := logging.GetLogger(self.config_obj,
Expand Down
3 changes: 2 additions & 1 deletion services/journal/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ func GetFlowFromQueue(
}

flow_details, err := launcher.GetFlowDetails(
ctx, config_obj, client_id, flow_id)
ctx, config_obj, services.GetFlowOptions{},
client_id, flow_id)
if err != nil ||
flow_details == nil ||
flow_details.Context == nil {
Expand Down
9 changes: 9 additions & 0 deletions services/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,14 @@ func GetLauncher(config_obj *config_proto.Config) (Launcher, error) {
return org_manager.Services(config_obj.OrgId).Launcher()
}

// Options for the GetFlowOptions API. This ensures we do no more work
// than necessary.
type GetFlowOptions struct {

// Include the flow downloads (ZIP exports of the flow).
Downloads bool
}

type CompilerOptions struct {
// Should names be obfuscated in the resulting VQL?
ObfuscateNames bool
Expand Down Expand Up @@ -223,6 +231,7 @@ type Launcher interface {
GetFlowDetails(
ctx context.Context,
config_obj *config_proto.Config,
opts GetFlowOptions,
client_id string, flow_id string) (*api_proto.FlowDetails, error)

// Actively cancel the collection
Expand Down
3 changes: 2 additions & 1 deletion services/launcher/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ func (self *FlowStorageManager) DeleteFlow(
}

collection_details, err := launcher.GetFlowDetails(
ctx, config_obj, client_id, flow_id)
ctx, config_obj, services.GetFlowOptions{},
client_id, flow_id)
if err != nil {
return nil, err
}
Expand Down
Loading
Loading