k8s reliable log hashing

This commit is contained in:
Frostebite 2023-09-18 19:21:44 +01:00
parent 2e9e9dfb6a
commit b91e21f29f
8 changed files with 80 additions and 40 deletions

BIN
dist/index.js generated vendored

Binary file not shown.

BIN
dist/index.js.map generated vendored

Binary file not shown.

BIN
dist/licenses.txt generated vendored

Binary file not shown.

View File

@ -40,6 +40,7 @@
"commander": "^9.0.0", "commander": "^9.0.0",
"commander-ts": "^0.2.0", "commander-ts": "^0.2.0",
"kubernetes-client": "^9.0.0", "kubernetes-client": "^9.0.0",
"md5": "^2.3.0",
"nanoid": "^3.3.1", "nanoid": "^3.3.1",
"reflect-metadata": "^0.1.13", "reflect-metadata": "^0.1.13",
"semver": "^7.5.2", "semver": "^7.5.2",

View File

@ -5,11 +5,10 @@ import { CloudRunnerSystem } from '../../services/core/cloud-runner-system';
import CloudRunner from '../../cloud-runner'; import CloudRunner from '../../cloud-runner';
import KubernetesPods from './kubernetes-pods'; import KubernetesPods from './kubernetes-pods';
import { FollowLogStreamService } from '../../services/core/follow-log-stream-service'; import { FollowLogStreamService } from '../../services/core/follow-log-stream-service';
import { RemoteClientLogger } from '../../remote-client/remote-client-logger';
class KubernetesTaskRunner { class KubernetesTaskRunner {
static lastReceivedTimestamp: number = 0;
static readonly maxRetry: number = 3; static readonly maxRetry: number = 3;
static lastReceivedMessage: string = ``;
static async runTask( static async runTask(
kubeConfig: KubeConfig, kubeConfig: KubeConfig,
kubeClient: CoreV1Api, kubeClient: CoreV1Api,
@ -21,30 +20,17 @@ class KubernetesTaskRunner {
let output = ''; let output = '';
let shouldReadLogs = true; let shouldReadLogs = true;
let shouldCleanup = true; let shouldCleanup = true;
let sinceTime = ``;
let retriesAfterFinish = 0; let retriesAfterFinish = 0;
// eslint-disable-next-line no-constant-condition // eslint-disable-next-line no-constant-condition
while (true) { while (true) {
await new Promise((resolve) => setTimeout(resolve, 3000)); await new Promise((resolve) => setTimeout(resolve, 3000));
const lastReceivedMessage =
KubernetesTaskRunner.lastReceivedTimestamp > 0
? `\nLast Log Message "${this.lastReceivedMessage}" ${this.lastReceivedTimestamp}`
: ``;
CloudRunnerLogger.log( CloudRunnerLogger.log(
`Streaming logs from pod: ${podName} container: ${containerName} namespace: ${namespace} ${CloudRunner.buildParameters.kubeVolumeSize}/${CloudRunner.buildParameters.containerCpu}/${CloudRunner.buildParameters.containerMemory}\n${lastReceivedMessage}`, `Streaming logs from pod: ${podName} container: ${containerName} namespace: ${namespace} ${CloudRunner.buildParameters.kubeVolumeSize}/${CloudRunner.buildParameters.containerCpu}/${CloudRunner.buildParameters.containerMemory}`,
); );
if (KubernetesTaskRunner.lastReceivedTimestamp > 0) {
CloudRunnerLogger.log(`Last received timestamp was set, including --since-time parameter`);
const currentDate = new Date(KubernetesTaskRunner.lastReceivedTimestamp);
const dateTimeIsoString = currentDate.toISOString();
sinceTime = ` --since-time="${dateTimeIsoString}"`;
}
let extraFlags = ``; let extraFlags = ``;
extraFlags += (await KubernetesPods.IsPodRunning(podName, namespace, kubeClient)) extraFlags += (await KubernetesPods.IsPodRunning(podName, namespace, kubeClient))
? ` -f -c ${containerName}` ? ` -f -c ${containerName}`
: ` --previous`; : ` --previous`;
let lastMessageSeenIncludedInChunk = false;
let lastMessageSeen = false;
let logs; let logs;
const callback = (outputChunk: string) => { const callback = (outputChunk: string) => {
@ -56,12 +42,7 @@ class KubernetesTaskRunner {
} }
}; };
try { try {
logs = await CloudRunnerSystem.Run( logs = await CloudRunnerSystem.Run(`kubectl logs ${podName}${extraFlags}`, false, true, callback);
`kubectl logs ${podName}${extraFlags} --timestamps${sinceTime}`,
false,
true,
callback,
);
} catch (error: any) { } catch (error: any) {
await new Promise((resolve) => setTimeout(resolve, 3000)); await new Promise((resolve) => setTimeout(resolve, 3000));
const continueStreaming = await KubernetesPods.IsPodRunning(podName, namespace, kubeClient); const continueStreaming = await KubernetesPods.IsPodRunning(podName, namespace, kubeClient);
@ -78,31 +59,14 @@ class KubernetesTaskRunner {
} }
const splitLogs = logs.split(`\n`); const splitLogs = logs.split(`\n`);
for (const chunk of splitLogs) { for (const chunk of splitLogs) {
if (
chunk.replace(/\s/g, ``) === KubernetesTaskRunner.lastReceivedMessage.replace(/\s/g, ``) &&
KubernetesTaskRunner.lastReceivedMessage.replace(/\s/g, ``) !== ``
) {
CloudRunnerLogger.log(`Previous log message found ${chunk}`);
lastMessageSeenIncludedInChunk = true;
}
}
for (const chunk of splitLogs) {
const newDate = Date.parse(`${chunk.toString().split(`Z `)[0]}Z`);
if (chunk.replace(/\s/g, ``) === KubernetesTaskRunner.lastReceivedMessage.replace(/\s/g, ``)) {
lastMessageSeen = true;
}
if (lastMessageSeenIncludedInChunk && !lastMessageSeen) {
continue;
}
const message = CloudRunner.buildParameters.cloudRunnerDebug ? chunk : chunk.split(`Z `)[1]; const message = CloudRunner.buildParameters.cloudRunnerDebug ? chunk : chunk.split(`Z `)[1];
KubernetesTaskRunner.lastReceivedMessage = chunk;
KubernetesTaskRunner.lastReceivedTimestamp = newDate;
({ shouldReadLogs, shouldCleanup, output } = FollowLogStreamService.handleIteration( ({ shouldReadLogs, shouldCleanup, output } = FollowLogStreamService.handleIteration(
message, message,
shouldReadLogs, shouldReadLogs,
shouldCleanup, shouldCleanup,
output, output,
)); ));
FollowLogStreamService.DidReceiveEndOfTransmission = RemoteClientLogger.HandleLogChunkLine(message);
} }
if (FollowLogStreamService.DidReceiveEndOfTransmission) { if (FollowLogStreamService.DidReceiveEndOfTransmission) {
CloudRunnerLogger.log('end of log stream'); CloudRunnerLogger.log('end of log stream');

View File

@ -5,6 +5,7 @@ import CloudRunner from '../cloud-runner';
import CloudRunnerOptions from '../options/cloud-runner-options'; import CloudRunnerOptions from '../options/cloud-runner-options';
import { CloudRunnerSystem } from '../services/core/cloud-runner-system'; import { CloudRunnerSystem } from '../services/core/cloud-runner-system';
import { CloudRunnerFolders } from '../options/cloud-runner-folders'; import { CloudRunnerFolders } from '../options/cloud-runner-folders';
const md5 = require('md5');
export class RemoteClientLogger { export class RemoteClientLogger {
private static get LogFilePath() { private static get LogFilePath() {
@ -71,4 +72,29 @@ export class RemoteClientLogger {
await new Promise((resolve) => setTimeout(resolve, 15000)); await new Promise((resolve) => setTimeout(resolve, 15000));
} }
} }
public static HandleLogChunkLine(message: string): boolean {
if (message.includes('LOGHASH: ')) {
RemoteClientLogger.md5 = message.split(`LOGHASH: `)[1];
CloudRunnerLogger.log(`LOGHASH: ${RemoteClientLogger.md5}`);
} else {
if (RemoteClientLogger.value !== '') {
RemoteClientLogger.value += `\n`;
}
RemoteClientLogger.value += message;
const hashedValue = md5(RemoteClientLogger.value);
CloudRunnerLogger.log(
`LOG ITERATION \n message:${message} \n target hash:${RemoteClientLogger.md5} \n hash latest value:${hashedValue} \n cache value:${RemoteClientLogger.value}`,
);
if (RemoteClientLogger.md5 === hashedValue) {
CloudRunnerLogger.log(`LOG COMPLETE`);
return true;
}
}
return false;
}
static value: string = '';
static md5: any;
} }

View File

@ -0,0 +1,25 @@
import { RemoteClientLogger } from '../../remote-client/remote-client-logger';
import setups from '../cloud-runner-suite.test';
const md5 = require('md5');
describe('Cloud Runner Remote Client', () => {
it('Responds', () => {});
setups();
it('Run one build it using K8s without error', async () => {
const testLogStream = 'Test \n Log \n Stream';
const splitLogStream = testLogStream.split('\n');
RemoteClientLogger.HandleLogChunkLine(`LOGHASH: ${md5(testLogStream)}`);
let completed = false;
for (const element of splitLogStream) {
completed = RemoteClientLogger.HandleLogChunkLine(element);
}
expect(completed).toBeTruthy();
}, 1_000_000_000);
// eslint-disable-next-line unicorn/consistent-function-scoping, no-unused-vars
function CreateLogWatcher(callback: (finalMessage: string) => void) {
return (message: string) => {
callback(message);
};
}
});

View File

@ -1972,6 +1972,11 @@ char-regex@^1.0.2:
resolved "https://registry.npmjs.org/char-regex/-/char-regex-1.0.2.tgz" resolved "https://registry.npmjs.org/char-regex/-/char-regex-1.0.2.tgz"
integrity sha512-kWWXztvZ5SBQV+eRgKFeh8q5sLuZY2+8WUIzlxWVTg+oGwY14qylx1KbKzHd8P6ZYkAg0xyIDU9JMHhyJMZ1jw== integrity sha512-kWWXztvZ5SBQV+eRgKFeh8q5sLuZY2+8WUIzlxWVTg+oGwY14qylx1KbKzHd8P6ZYkAg0xyIDU9JMHhyJMZ1jw==
charenc@0.0.2:
version "0.0.2"
resolved "https://registry.yarnpkg.com/charenc/-/charenc-0.0.2.tgz#c0a1d2f3a7092e03774bfa83f14c0fc5790a8667"
integrity sha512-yrLQ/yVUFXkzg7EDQsPieE/53+0RlaWTs+wBrvW36cyilJ2SaDWfl4Yj7MtLTXleV9uEKefbAGUPv2/iWSooRA==
chownr@^2.0.0: chownr@^2.0.0:
version "2.0.0" version "2.0.0"
resolved "https://registry.npmjs.org/chownr/-/chownr-2.0.0.tgz" resolved "https://registry.npmjs.org/chownr/-/chownr-2.0.0.tgz"
@ -2155,6 +2160,11 @@ cross-spawn@^7.0.1, cross-spawn@^7.0.2, cross-spawn@^7.0.3:
shebang-command "^2.0.0" shebang-command "^2.0.0"
which "^2.0.1" which "^2.0.1"
crypt@0.0.2:
version "0.0.2"
resolved "https://registry.yarnpkg.com/crypt/-/crypt-0.0.2.tgz#88d7ff7ec0dfb86f713dc87bbb42d044d3e6c41b"
integrity sha512-mCxBlsHFYh9C+HVpiEacem8FEBnMXgU9gy4zmNC+SXAZNB/1idgp/aulFJ4FgCi7GPEVbfyng092GqL2k2rmow==
cssom@^0.4.4: cssom@^0.4.4:
version "0.4.4" version "0.4.4"
resolved "https://registry.npmjs.org/cssom/-/cssom-0.4.4.tgz" resolved "https://registry.npmjs.org/cssom/-/cssom-0.4.4.tgz"
@ -3301,6 +3311,11 @@ is-boolean-object@^1.1.0:
dependencies: dependencies:
call-bind "^1.0.0" call-bind "^1.0.0"
is-buffer@~1.1.6:
version "1.1.6"
resolved "https://registry.yarnpkg.com/is-buffer/-/is-buffer-1.1.6.tgz#efaa2ea9daa0d7ab2ea13a97b2b8ad51fefbe8be"
integrity sha512-NcdALwpXkTm5Zvvbk7owOUSvVvBKDgKP5/ewfXEznmQFfs4ZRmanOeKBTjRVjka3QFoN6XJ+9F3USqfHqTaU5w==
is-callable@^1.1.4, is-callable@^1.2.3: is-callable@^1.1.4, is-callable@^1.2.3:
version "1.2.3" version "1.2.3"
resolved "https://registry.npmjs.org/is-callable/-/is-callable-1.2.3.tgz" resolved "https://registry.npmjs.org/is-callable/-/is-callable-1.2.3.tgz"
@ -4224,6 +4239,15 @@ makeerror@1.0.x:
dependencies: dependencies:
tmpl "1.0.x" tmpl "1.0.x"
md5@^2.3.0:
version "2.3.0"
resolved "https://registry.yarnpkg.com/md5/-/md5-2.3.0.tgz#c3da9a6aae3a30b46b7b0c349b87b110dc3bda4f"
integrity sha512-T1GITYmFaKuO91vxyoQMFETst+O71VUPEU3ze5GNzDm0OWdP8v1ziTaAEPUr/3kLsY3Sftgz242A1SetQiDL7g==
dependencies:
charenc "0.0.2"
crypt "0.0.2"
is-buffer "~1.1.6"
merge-stream@^2.0.0: merge-stream@^2.0.0:
version "2.0.0" version "2.0.0"
resolved "https://registry.npmjs.org/merge-stream/-/merge-stream-2.0.0.tgz" resolved "https://registry.npmjs.org/merge-stream/-/merge-stream-2.0.0.tgz"