Skip to content

Commit

Permalink
Merge branch 'v5.4' of github.com:ravendb/ravendb into v6.0
Browse files Browse the repository at this point in the history
  • Loading branch information
arekpalinski committed Oct 20, 2023
2 parents 737f1a7 + 7a8cabb commit eac0069
Show file tree
Hide file tree
Showing 7 changed files with 149 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,15 @@ public override bool UsingEncryptedCommunicationChannel()
switch (BrokerType)
{
case QueueBrokerType.Kafka:
if (Connection.KafkaConnectionSettings.ConnectionOptions.ContainsKey("SecurityProtocol"))
if (Connection.KafkaConnectionSettings.ConnectionOptions.TryGetValue("security.protocol", out string protocol))
{
string protocol = Connection.KafkaConnectionSettings.ConnectionOptions["SecurityProtocol"];
return protocol.ToLower() == "ssl";
return protocol.ToLower().Contains("ssl");
}
break;
case QueueBrokerType.RabbitMq:
return Connection.RabbitMqConnectionSettings.ConnectionString.StartsWith("amqp", StringComparison.OrdinalIgnoreCase);
return Connection.RabbitMqConnectionSettings.ConnectionString.StartsWith("amqps", StringComparison.OrdinalIgnoreCase);
default:
return false;
throw new NotSupportedException($"Unknown broker type: {BrokerType}");
}

return false;
Expand Down
22 changes: 10 additions & 12 deletions src/Raven.Server/Documents/ETL/EtlLoader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -352,24 +352,23 @@ private bool ValidateConfiguration<T>(EtlConfiguration<T> config, HashSet<string

if (_databaseRecord.Encrypted && config.UsingEncryptedCommunicationChannel() == false && config.AllowEtlOnNonEncryptedChannel == false)
{
LogConfigurationError(config,
new List<string>
{
$"{_database.Name} is encrypted, but connection to ETL destination {config.GetDestination()} does not use encryption, so ETL is not allowed. " +
$"You can change this behavior by setting {nameof(config.AllowEtlOnNonEncryptedChannel)} when creating the ETL configuration"
});
return false;
}
if (config.AllowEtlOnNonEncryptedChannel == false)
{
LogConfigurationError(config,
new List<string>
{
$"{_database.Name} is encrypted, but connection to ETL destination {config.GetDestination()} does not use encryption, so ETL is not allowed. " +
$"You can change this behavior by setting {nameof(config.AllowEtlOnNonEncryptedChannel)} when creating the ETL configuration"
});
return false;
}

if (_databaseRecord.Encrypted && config.UsingEncryptedCommunicationChannel() == false && config.AllowEtlOnNonEncryptedChannel)
{
LogConfigurationWarning(config,
new List<string>
{
$"{_database.Name} is encrypted and connection to ETL destination {config.GetDestination()} does not use encryption, " +
$"but {nameof(config.AllowEtlOnNonEncryptedChannel)} is set to true, so ETL is allowed"
});
return true;
}

if (_databaseRecord.Encrypted && config is ElasticSearchEtlConfiguration esConfig && esConfig.Connection.Authentication == null)
Expand All @@ -379,7 +378,6 @@ private bool ValidateConfiguration<T>(EtlConfiguration<T> config, HashSet<string
{
$"{_database.Name} is encrypted and connection to ETL destination {config.GetDestination()} does not use authentication, but ETL is allowed."
});
return true;
}

if (uniqueNames.Add(config.Name) == false)
Expand Down
12 changes: 7 additions & 5 deletions src/Voron/Impl/Journal/WriteAheadJournal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
using Voron.Exceptions;
using Voron.Impl.FileHeaders;
using Voron.Impl.Paging;
using Voron.Impl.Scratch;
using Voron.Util;
using Constants = Voron.Global.Constants;
using NativeMemory = Sparrow.Utils.NativeMemory;
Expand Down Expand Up @@ -1478,17 +1479,18 @@ private void ApplyPagesToDataFileFromScratch(Dictionary<long, PagePosition> page
var scratchNumber = pagePosition.ScratchNumber;
if (scratchPagerStates.TryGetValue(scratchNumber, out var pagerState) == false)
{
pagerState = scratchBufferPool.GetPagerState(scratchNumber);
pagerState.AddRef();

// we're not under write transaction now, we need to acquire the pager state and use it for reading
var scratchBuffer = scratchBufferPool.GetScratchBufferFile(scratchNumber);
pagerState = scratchBuffer.File.Pager.GetPagerStateAndAddRefAtomically();

scratchPagerStates.Add(scratchNumber, pagerState);
}

if (_waj._env.Options.Encryption.IsEnabled == false)
{
using (tempTx) // release any resources, we just wanted to validate things
{
var page = (PageHeader*)scratchBufferPool.AcquirePagePointerWithOverflowHandling(tempTx, scratchNumber, pagePosition.ScratchPage);
var page = (PageHeader*)scratchBufferPool.AcquirePagePointerWithOverflowHandling(tempTx, scratchNumber, pagePosition.ScratchPage, pagerState);
var checksum = StorageEnvironment.CalculatePageChecksum((byte*)page, page->PageNumber, out var expectedChecksum);
if (checksum != expectedChecksum)
ThrowInvalidChecksumOnPageFromScratch(scratchNumber, pagePosition, page, checksum, expectedChecksum);
Expand Down Expand Up @@ -1834,7 +1836,7 @@ private CompressedPagesResult PrepareToWriteToJournal(LowLevelTransaction tx, IP
var pagesEncountered = 0;
foreach (var txPage in txPages)
{
var scratchPage = tx.Environment.ScratchBufferPool.AcquirePagePointerWithOverflowHandling(tx, txPage.ScratchFileNumber, txPage.PositionInScratchBuffer);
var scratchPage = tx.Environment.ScratchBufferPool.AcquirePagePointerWithOverflowHandling(tx, txPage.ScratchFileNumber, txPage.PositionInScratchBuffer, pagerState: null);
var pageHeader = (PageHeader*)scratchPage;

// When encryption is off, we do validation by checksum
Expand Down
4 changes: 2 additions & 2 deletions src/Voron/Impl/Scratch/ScratchBufferFile.cs
Original file line number Diff line number Diff line change
Expand Up @@ -305,9 +305,9 @@ public T ReadPageHeaderForDebug<T>(LowLevelTransaction tx, long p, PagerState pa
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public byte* AcquirePagePointerWithOverflowHandling(IPagerLevelTransactionState tx, long p)
public byte* AcquirePagePointerWithOverflowHandling(IPagerLevelTransactionState tx, long p, PagerState pagerState)
{
return _scratchPager.AcquirePagePointerWithOverflowHandling(tx, p);
return _scratchPager.AcquirePagePointerWithOverflowHandling(tx, p, pagerState);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand Down
10 changes: 2 additions & 8 deletions src/Voron/Impl/Scratch/ScratchBufferPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -207,12 +207,6 @@ private ScratchBufferItem NextFile(long minSize, long? requestedSize)

return item;
}
public PagerState GetPagerState(int scratchNumber)
{
// Not thread-safe but only called by a single writer.
var bufferFile = _scratchBuffers[scratchNumber].File;
return bufferFile.PagerState;
}

public PageFromScratchBuffer Allocate(LowLevelTransaction tx, int numberOfPages)
{
Expand Down Expand Up @@ -415,12 +409,12 @@ public PageFromScratchBuffer ShrinkOverflowPage(PageFromScratchBuffer value, int
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public byte* AcquirePagePointerWithOverflowHandling(IPagerLevelTransactionState tx, int scratchNumber, long p)
public byte* AcquirePagePointerWithOverflowHandling(IPagerLevelTransactionState tx, int scratchNumber, long p, PagerState pagerState)
{
var item = GetScratchBufferFile(scratchNumber);

ScratchBufferFile bufferFile = item.File;
return bufferFile.AcquirePagePointerWithOverflowHandling(tx, p);
return bufferFile.AcquirePagePointerWithOverflowHandling(tx, p, pagerState);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand Down
56 changes: 56 additions & 0 deletions test/SlowTests/Server/Documents/ETL/Queue/RavenDB_21530.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
using System.Collections.Generic;
using Raven.Client.Documents.Operations.ETL.Queue;
using Tests.Infrastructure;
using Tests.Infrastructure.ConnectionString;
using Xunit;
using Xunit.Abstractions;

namespace SlowTests.Server.Documents.ETL.Queue;

public class RavenDB_21530 : QueueEtlTestBase
{
public RavenDB_21530(ITestOutputHelper output) : base(output)
{
}

[RavenFact(RavenTestCategory.Etl)]
public void Can_check_kafka_connection_string_against_secured_channel()
{
var c = new QueueEtlConfiguration
{
BrokerType = QueueBrokerType.Kafka,
Connection = new QueueConnectionString
{
Name = "Test",
BrokerType = QueueBrokerType.Kafka,
KafkaConnectionSettings = new KafkaConnectionSettings()
{
ConnectionOptions = new Dictionary<string, string>()
{
{"security.protocol", "SASL_SSL"}
},
BootstrapServers = "localhost:29290"
}
}
};

Assert.True(c.UsingEncryptedCommunicationChannel());
}

[RavenFact(RavenTestCategory.Etl)]
public void Can_check_rabbitmq_connection_string_against_secured_channel()
{
var c = new QueueEtlConfiguration
{
BrokerType = QueueBrokerType.RabbitMq,
Connection = new QueueConnectionString
{
Name = "Test",
BrokerType = QueueBrokerType.RabbitMq,
RabbitMqConnectionSettings = new RabbitMqConnectionSettings() { ConnectionString = "amqps://guest:guest@localhost:5672/" }
}
};

Assert.True(c.UsingEncryptedCommunicationChannel());
}
}
68 changes: 68 additions & 0 deletions test/SlowTests/Voron/Issues/RavenDB_17997.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
using System;
using FastTests.Voron;
using Tests.Infrastructure;
using Voron;
using Voron.Impl;
using Voron.Impl.Paging;
using Xunit.Abstractions;

namespace SlowTests.Voron.Issues;

public unsafe class RavenDB_17997 : StorageTest
{
public RavenDB_17997(ITestOutputHelper output) : base(output)
{
}

protected override void Configure(StorageEnvironmentOptions options)
{
options.ManualFlushing = true;
options.ManualSyncing = true;
}

[RavenFact(RavenTestCategory.Voron)]
public void MustNotReadFromUnmappedAllocation()
{
var dataPager = Env.Options.DataPager;

using (var tempTx = new TempPagerTransaction())
{
PagerState stateUsedDuringAcquirePagePointer;
byte* ptr;

{
// the below code show the wrong usage that we had in ApplyPagesToDataFileFromScratch() where we're not under write tx
// if we don't pass pager state to AcquirePagePointer() then it will use the one from the pager instance which might get released because of AllocateMorePages() calls

/*
var state = dataPager.PagerState;
state.AddRef();
dataPager.AllocateMorePages(4 * 1024 * 1024);
dataPager.AllocateMorePages(8 * 1024 * 1024);
stateUsedDuringAcquirePagePointer = dataPager.PagerState;
ptr = dataPager.AcquirePagePointer(tempTx, 0);
*/
}

{
// correct usage should be to increment reference count of pager state and use that pager in AcquirePagePointer()
stateUsedDuringAcquirePagePointer = dataPager.GetPagerStateAndAddRefAtomically();

dataPager.AllocateMorePages(4 * 1024 * 1024);
dataPager.AllocateMorePages(8 * 1024 * 1024);

ptr = dataPager.AcquirePagePointer(tempTx, 0, stateUsedDuringAcquirePagePointer);
}

dataPager.AllocateMorePages(16 * 1024 * 1024);

if (ptr == stateUsedDuringAcquirePagePointer.MapBase && stateUsedDuringAcquirePagePointer._released)
{
throw new InvalidOperationException("Cannot read from already unmapped allocation of memory mapped file");
}
}
}
}

0 comments on commit eac0069

Please sign in to comment.