using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.IO;
using System.Reflection;
using System.Text;
using System.Text.Json;
using System.Text.Json.Nodes;
using System.Threading;
using System.Threading.Tasks;
using Senzing.Sdk;
using Senzing.Sdk.Core;
using Senzing.Snippets.Support; // supporting classes for this example
using static Senzing.Sdk.SzFlags;
#pragma warning disable CA1303 // Do not pass literals as localized parameters (example messages)
// get the senzing repository settings
string? settings = Environment.GetEnvironmentVariable("SENZING_ENGINE_CONFIGURATION_JSON");
if (settings == null)
{
Console.Error.WriteLine("Unable to get settings.");
throw new ArgumentException("Unable to get settings");
}
// create a descriptive instance name (can be anything)
Assembly assembly = Assembly.GetExecutingAssembly();
string? instanceName = assembly.GetName().Name;
// initialize the Senzing environment
SzEnvironment env = SzCoreEnvironment.NewBuilder()
.Settings(settings)
.InstanceName(instanceName)
.VerboseLogging(false)
.Build();
// Create a TaskScheduler using an implementation that restricts
// execution to a specific limited pool of threads. In order to
// improve performance and conserve memory we want to reuse the same
// threads for all Senzing work. The TaskScheduler implementation used
// here is taken directly from Microsoft's TaskScheduler documentation.
TaskScheduler taskScheduler
= new LimitedConcurrencyLevelTaskScheduler(ThreadCount);
// create a TaskFactory and pass it our custom scheduler
TaskFactory factory = new TaskFactory(taskScheduler);
// keep track of the pending tasks and don't backlog too many for memory's sake
IList<(Task, string)> pendingFutures = new List<(Task, string)>(MaximumBacklog);
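// register a shutdown hook (ProcessExit handler) so that any pending
// tasks are drained and the environment is destroyed even when the
// process is terminated via CTRL-C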
AppDomain.CurrentDomain.ProcessExit += (s, e) =>
{
#pragma warning disable CA1031 // Need to catch all exceptions here
try
{
HandlePendingFutures(pendingFutures, true);
}
catch (Exception exception)
{
Console.Error.WriteLine(exception);
}
#pragma warning restore CA1031 // Need to catch all exceptions here
// IMPORTANT: make sure to destroy the environment
env.Destroy();
OutputRedoStatistics();
};
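// continuously fetch and process redo records until the process is terminated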
try
{
// get the engine from the environment
SzEngine engine = env.GetEngine();
while (true)
{
// loop through the redo records and queue them up so long
// as we have more redo records and the backlog is not too large
for (string redo = engine.GetRedoRecord();
redo != null;
redo = engine.GetRedoRecord())
{
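// queue the redo record for asynchronous processing on one of the
// limited pool of worker threads (SzNoFlags since no INFO response is needed)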
Task task = factory.StartNew(() =>
{
engine.ProcessRedoRecord(redo, SzNoFlags);
},
CancellationToken.None,
TaskCreationOptions.None,
taskScheduler);
// add the future to the pending future list
pendingFutures.Add((task, redo));
// handle the pending futures as long as the maximum backlog is exceeded
for (int loop = 0;
pendingFutures.Count >= MaximumBacklog;
loop++)
{
// check if this is NOT our first iteration through the loop
if (loop > 0)
{
// if we still have exceeded the backlog size after the first
// loop iteration then pause briefly before trying again
try
{
Thread.Sleep(HandlePauseTimeout);
}
catch (ThreadInterruptedException)
{
// do nothing
}
}
// handle any pending futures WITHOUT blocking to reduce the backlog
HandlePendingFutures(pendingFutures, false);
}
}
// check if there are no redo records right now
// NOTE: we do NOT want to call CountRedoRecords() in a loop that
// is processing redo records, so we call it here AFTER we believe
// we have processed all pending redos to confirm the count is still zero
if (engine.CountRedoRecords() == 0)
{
OutputRedoStatistics();
Console.WriteLine();
Console.WriteLine(
"No redo records to process. Pausing for "
+ RedoPauseDescription + "....");
Console.WriteLine("Press CTRL-C to exit.");
try
{
Thread.Sleep(RedoPauseTimeout);
}
catch (ThreadInterruptedException)
{
// ignore the exception
}
continue;
}
}
}
catch (Exception e)
{
Console.Error.WriteLine();
Console.Error.WriteLine("*** Terminated due to critical error ***");
Console.Error.WriteLine(e);
Console.Error.Flush();
throw;
}
finally
{
// normally we would call env.Destroy() here, but we have registered
// a ProcessExit handler to do that since termination will typically
// occur via CTRL-C being pressed, and that handler will still run if
// we get an exception
}
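/// <summary>
/// Handles the pending redo tasks, removing handled tasks from the
/// specified list, counting successes and logging and/or tracking failures.
/// </summary>
///
/// <param name="pendingFutures">
/// The list of pending task / redo-record pairs to handle.
/// </param>
/// <param name="blocking">
/// <c>true</c> if incomplete tasks should be waited on, otherwise
/// <c>false</c> to handle only tasks that have already completed.
/// </param>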
static void HandlePendingFutures(IList<(Task, string)> pendingFutures, bool blocking)
{
// loop through the pending futures
for (int index = 0; index < pendingFutures.Count; index++)
{
// get the next pending future
(Task task, string redoRecord) = pendingFutures[index];
// if not blocking and this one is not done then continue
if (!blocking && !task.IsCompleted) continue;
// remove the pending future from the list
pendingFutures.RemoveAt(index--);
try
{
try
{
// wait for completion -- if not blocking then this task is
// already complete and Wait() returns immediately, but it will
// still throw any exception that might have occurred
task.Wait();
// if we get here then increment the success count
redoneCount++;
}
catch (AggregateException e)
when (e.InnerException is TaskCanceledException
|| e.InnerException is ThreadInterruptedException)
{
throw new SzRetryableException(e.InnerException);
}
catch (ThreadInterruptedException e)
{
throw new SzRetryableException(e);
}
catch (AggregateException e)
{
if (e.InnerException != null)
{
// get the inner exception
throw e.InnerException;
}
else
{
throw;
}
}
}
catch (SzRetryableException e)
{
// handle thread interruption and cancellation as retries
LogFailedRedo(Warning, e, redoRecord);
errorCount++; // increment the error count
retryCount++; // increment the retry count
// track the retry record so it can be retried later
TrackRetryRecord(redoRecord);
}
catch (Exception e)
{
// catch any other exception (incl. SzException) here
LogFailedRedo(Critical, e, redoRecord);
errorCount++;
throw; // rethrow since exception is critical
}
}
}
/// <summary>
/// Tracks the specified JSON record definition to be retried
/// in a retry file.
/// </summary>
///
/// <param name="recordJson">
/// The JSON text defining the record to be retried.
/// </param>
static void TrackRetryRecord(string recordJson)
{
// track the retry record so it can be retried later
if (retryFile == null)
{
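// create a new retry file in the temp directory on first use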
retryFile = new FileInfo(
Path.Combine(
Path.GetTempPath(),
RetryPrefix + Path.GetRandomFileName() + RetrySuffix));
retryWriter = new StreamWriter(
new FileStream(retryFile.FullName,
FileMode.Create,
FileAccess.Write),
Encoding.UTF8);
}
if (retryWriter != null)
{
retryWriter.WriteLine(recordJson);
}
}
/// <summary>
/// Example method for logging failed records.
/// </summary>
///
/// <param name="errorType">The error type description.</param>
/// <param name="exception">The exception itself.</param>
/// <param name="redoRecord">The JSON text for the failed record.</param>
static void LogFailedRedo(string errorType,
Exception exception,
string redoRecord)
{
Console.Error.WriteLine();
Console.Error.WriteLine("** " + errorType + " ** FAILED TO PROCESS REDO: ");
Console.Error.WriteLine(redoRecord);
Console.Error.WriteLine(exception);
Console.Error.Flush();
}
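/// <summary>
/// Outputs the redo processing statistics and flushes any retry
/// records that have been tracked to the retry file.
/// </summary>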
static void OutputRedoStatistics()
{
Console.WriteLine();
Console.WriteLine("Redos successfully processed : " + redoneCount);
Console.WriteLine("Total failed records/redos : " + errorCount);
// check on any retry records
if (retryWriter != null)
{
// flush (but do not close) the writer since more retry
// records may be tracked before the process exits
retryWriter.Flush();
}
if (retryCount > 0)
{
Console.WriteLine(
retryCount + " records/redos to be retried in " + retryFile);
}
Console.Out.Flush();
}
public partial class Program
{
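// how long to pause when there are no redo records to process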
private const string RedoPauseDescription = "30 seconds";
private const int RedoPauseTimeout = 30000;
private const string RetryPrefix = "retry-";
private const string RetrySuffix = ".jsonl";
private const string Warning = "WARNING";
private const string Critical = "CRITICAL";
// setup some class-wide variables
private static int errorCount;
private static int redoneCount;
private static int retryCount;
private static FileInfo? retryFile;
private static StreamWriter? retryWriter;
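// concurrency tuning: worker thread count, maximum backlog of pending
// tasks, and the pause (in milliseconds) used while the backlog is full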
private const int ThreadCount = 8;
private const int BacklogFactor = 10;
private const int MaximumBacklog = ThreadCount * BacklogFactor;
private const int HandlePauseTimeout = 100;
}
#pragma warning restore CA1303 // Do not pass literals as localized parameters (example messages)