unity-builder/src/model/cloud-runner/providers/k8s/kubernetes-task-runner.ts
Frostebite e73b48fb38
Cloud runner develop - Stabilizes kubernetes provider (#531)
* fixes

* fixes

* fixes

* fixes

* fixes

* check for startup message in workflows

* check for startup message in workflows

* check for startup message in workflows

* check for startup message in workflows

* check for startup message in workflows

* check for startup message in workflows

* Update cloud-runner-ci-pipeline.yml

* Update cloud-runner-ci-pipeline.yml

* no storage class specified

* log file path

* log file path

* log file path

* log file path

* log file path

* log file path

* log file path

* log file path

* updates

* log file path

* latest develop

* log file path

* log file path

* Update package.json

* log file path

* log file path

* log file path

* log file path

* log file path

* log file path

* log file path

* log file path

* log file path

* log file path

* log file path

* log file path

* log file path

* log file path

* stream logs through standard input and new remote client cli command

* stream logs through standard input and new remote client cli command

* stream logs through standard input and new remote client cli command

* stream logs through standard input and new remote client cli command

* stream logs through standard input and new remote client cli command

* stream logs through standard input and new remote client cli command

* stream logs through standard input and new remote client cli command

* stream logs through standard input and new remote client cli command

* stream logs through standard input and new remote client cli command

* stream logs through standard input and new remote client cli command

* stream logs through standard input and new remote client cli command

* stream logs through standard input and new remote client cli command

* stream logs through standard input and new remote client cli command

* stream logs through standard input and new remote client cli command

* stream logs through standard input and new remote client cli command

* update pipeline to use k3s

* version: 'latest'

* fixes

* disable aws pipe for now

* disable aws pipe for now

* disable aws pipe for now

* disable aws pipe for now

* disable aws pipe for now

* disable aws pipe for now

* disable aws pipe for now

* disable aws pipe for now

* disable aws pipe for now

* disable aws pipe for now

* push k8s logs to LOG SERVICE IP

* push k8s logs to LOG SERVICE IP

* push k8s logs to LOG SERVICE IP

* push k8s logs to LOG SERVICE IP

* push k8s logs to LOG SERVICE IP

* push k8s logs to LOG SERVICE IP

* push k8s logs to LOG SERVICE IP

* push k8s logs to LOG SERVICE IP

* tests

* tests

* tests

* tests

* tests

* tests

* tests

* tests

* tests

* tests

* tests

* tests

* tests

* tests

* tests

* tests

* tests

* podname logs for log service

* podname logs for log service

* podname logs for log service

* podname logs for log service

* podname logs for log service

* podname logs for log service

* podname logs for log service

* podname logs for log service

* podname logs for log service

* hashed logs

* hashed logs

* hashed logs

* hashed logs

* hashed logs

* hashed logs

* no wait, just repeat logs

* no wait, just repeat logs

* remove typo - double await

* test fix - kubernetes - name typo in github yaml

* test fix - kubernetes - name typo in github yaml

* check missing log file

* check missing log file

* Push to steam test

* Push to steam test

* Fix path

* k8s reliable log hashing

* k8s reliable log hashing

* k8s reliable log hashing

* hashed logging k8s

* hashed logging k8s

* hashed logging k8s

* hashed logging k8s

* hashed logging k8s

* hashed logging k8s

* Include log chunk when task runner sees log update, clarify if we can pull logs from same line or next line

* Include log chunk when task runner sees log update, clarify if we can pull logs from same line or next line

* Include log chunk when task runner sees log update, clarify if we can pull logs from same line or next line

* Include log chunk when task runner sees log update, clarify if we can pull logs from same line or next line

* Include log chunk when task runner sees log update, clarify if we can pull logs from same line or next line

* Fix exit flow for k8s job

* hash comparison logging for log complete in k8s flow

* Interrupt k8s logs when logs found

* cleanup async parameter

* cleanup async parameter

* cleanup async parameter

* fixes

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix
2024-02-06 23:46:31 +00:00

120 lines
4.0 KiB
TypeScript

import { CoreV1Api, KubeConfig } from '@kubernetes/client-node';
import CloudRunnerLogger from '../../services/core/cloud-runner-logger';
import { waitUntil } from 'async-wait-until';
import { CloudRunnerSystem } from '../../services/core/cloud-runner-system';
import CloudRunner from '../../cloud-runner';
import KubernetesPods from './kubernetes-pods';
import { FollowLogStreamService } from '../../services/core/follow-log-stream-service';
class KubernetesTaskRunner {
static readonly maxRetry: number = 3;
static lastReceivedMessage: string = ``;
static async runTask(
kubeConfig: KubeConfig,
kubeClient: CoreV1Api,
jobName: string,
podName: string,
containerName: string,
namespace: string,
) {
let output = '';
let shouldReadLogs = true;
let shouldCleanup = true;
let retriesAfterFinish = 0;
// eslint-disable-next-line no-constant-condition
while (true) {
await new Promise((resolve) => setTimeout(resolve, 3000));
CloudRunnerLogger.log(
`Streaming logs from pod: ${podName} container: ${containerName} namespace: ${namespace} ${CloudRunner.buildParameters.kubeVolumeSize}/${CloudRunner.buildParameters.containerCpu}/${CloudRunner.buildParameters.containerMemory}`,
);
let extraFlags = ``;
extraFlags += (await KubernetesPods.IsPodRunning(podName, namespace, kubeClient))
? ` -f -c ${containerName}`
: ` --previous`;
const callback = (outputChunk: string) => {
output += outputChunk;
// split output chunk and handle per line
for (const chunk of outputChunk.split(`\n`)) {
({ shouldReadLogs, shouldCleanup, output } = FollowLogStreamService.handleIteration(
chunk,
shouldReadLogs,
shouldCleanup,
output,
));
}
};
try {
await CloudRunnerSystem.Run(`kubectl logs ${podName}${extraFlags}`, false, true, callback);
} catch (error: any) {
await new Promise((resolve) => setTimeout(resolve, 3000));
const continueStreaming = await KubernetesPods.IsPodRunning(podName, namespace, kubeClient);
CloudRunnerLogger.log(`K8s logging error ${error} ${continueStreaming}`);
if (continueStreaming) {
continue;
}
if (retriesAfterFinish < KubernetesTaskRunner.maxRetry) {
retriesAfterFinish++;
continue;
}
throw error;
}
if (FollowLogStreamService.DidReceiveEndOfTransmission) {
CloudRunnerLogger.log('end of log stream');
break;
}
}
return output;
}
static async watchUntilPodRunning(kubeClient: CoreV1Api, podName: string, namespace: string) {
let waitComplete: boolean = false;
let message = ``;
CloudRunnerLogger.log(`Watching ${podName} ${namespace}`);
await waitUntil(
async () => {
const status = await kubeClient.readNamespacedPodStatus(podName, namespace);
const phase = status?.body.status?.phase;
waitComplete = phase !== 'Pending';
message = `Phase:${status.body.status?.phase} \n Reason:${
status.body.status?.conditions?.[0].reason || ''
} \n Message:${status.body.status?.conditions?.[0].message || ''}`;
// CloudRunnerLogger.log(
// JSON.stringify(
// (await kubeClient.listNamespacedEvent(namespace)).body.items
// .map((x) => {
// return {
// message: x.message || ``,
// name: x.metadata.name || ``,
// reason: x.reason || ``,
// };
// })
// .filter((x) => x.name.includes(podName)),
// undefined,
// 4,
// ),
// );
if (waitComplete || phase !== 'Pending') return true;
return false;
},
{
timeout: 2000000,
intervalBetweenAttempts: 15000,
},
);
if (!waitComplete) {
CloudRunnerLogger.log(message);
}
return waitComplete;
}
}
export default KubernetesTaskRunner;