Skip to content

Commit

Permalink
Fixed initialization of failure detector leading to FailureDetection(…
Browse files Browse the repository at this point in the history
…) test failure
  • Loading branch information
sakno committed Aug 8, 2023
1 parent b491a19 commit e29b972
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,6 @@ internal sealed class Replicator : IValueTaskSource<Result<bool>>, ILogEntryCons
private ConfiguredTaskAwaitable<Result<HeartbeatResult>>.ConfiguredTaskAwaiter replicationAwaiter;
private ManualResetValueTaskSourceCore<Result<bool>> completionSource;

// contextual fields
internal IFailureDetector? FailureDetector;

internal Replicator(TMember member, ILogger logger)
{
Debug.Assert(member is not null);
Expand All @@ -45,6 +42,12 @@ internal Replicator(TMember member, ILogger logger)
continuation = Complete;
}

internal IFailureDetector? FailureDetector
{
init;
get;
}

internal void Initialize(
IClusterConfiguration activeConfig,
IClusterConfiguration? proposedConfig,
Expand Down Expand Up @@ -316,6 +319,8 @@ private Task<Result<bool>> QueueReplication(
return workItem.Task;
}

private static Replicator CreateReplicator(ILogger logger, TMember member)
=> new(member, logger);
private Replicator CreateReplicator(TMember member) => new(member, Logger)
{
FailureDetector = detectorFactory is not null ? detectorFactory.Invoke(maxLease, member) : null,
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,7 @@ internal LeaderState(IRaftStateMachine<TMember> stateMachine, bool allowPartitio
replicationEvent = new(initialState: false) { MeasurementTags = stateMachine.MeasurementTags };
replicationQueue = new() { MeasurementTags = stateMachine.MeasurementTags };
context = new();

unsafe
{
replicatorFactory = DelegateHelpers.CreateDelegate<ILogger, TMember, Replicator>(&CreateReplicator, stateMachine.Logger);
}
replicatorFactory = CreateReplicator;
}

internal ILeaderStateMetrics? Metrics
Expand Down Expand Up @@ -142,14 +138,11 @@ private async ValueTask<bool> DoHeartbeats(Timestamp startTime, TaskCompletionPi
private bool ProcessMemberResponse(Timestamp startTime, Task<Result<bool>> response, ref long term, ref int quorum, ref int commitQuorum, ref int leaseRenewalThreshold)
{
var replicator = ReplicationWorkItem.GetReplicator(response);
var memberDetector = replicator.FailureDetector is null && detectorFactory is not null
? replicator.FailureDetector = detectorFactory(maxLease, replicator.Member)
: null;

try
{
var result = response.GetAwaiter().GetResult();
memberDetector?.ReportHeartbeat();
replicator.FailureDetector?.ReportHeartbeat();
term = Math.Max(term, result.Term);
quorum++;

Expand Down Expand Up @@ -193,7 +186,7 @@ private bool ProcessMemberResponse(Timestamp startTime, Task<Result<bool>> respo
}

// report unavailable cluster member
switch (memberDetector)
switch (replicator.FailureDetector)
{
case { IsMonitoring: false }:
Logger.UnknownHealthStatus(replicator.Member.EndPoint);
Expand Down

0 comments on commit e29b972

Please sign in to comment.