21021: Merge branch 'main' into 21021-controller-logout
[arvados.git] / services / workbench2 / src / store / process-logs-panel / process-logs-panel-actions.ts
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 import { unionize, ofType, UnionOf } from "common/unionize";
6 import { ProcessLogs } from './process-logs-panel';
7 import { LogEventType } from 'models/log';
8 import { RootState } from 'store/store';
9 import { ServiceRepository } from 'services/services';
10 import { Dispatch } from 'redux';
11 import { LogFragment, LogService, logFileToLogType } from 'services/log-service/log-service';
12 import { Process, getProcess } from 'store/processes/process';
13 import { navigateTo } from 'store/navigation/navigation-action';
14 import { snackbarActions, SnackbarKind } from 'store/snackbar/snackbar-actions';
15 import { CollectionFile, CollectionFileType } from "models/collection-file";
16 import { ContainerRequestResource, ContainerRequestState } from "models/container-request";
17
18 const SNIPLINE = `================ ✀ ================ ✀ ========= Some log(s) were skipped ========= ✀ ================ ✀ ================`;
19 const LOG_TIMESTAMP_PATTERN = /^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}\.[0-9]{9}Z/;
20
21 export const processLogsPanelActions = unionize({
22     RESET_PROCESS_LOGS_PANEL: ofType<{}>(),
23     INIT_PROCESS_LOGS_PANEL: ofType<{ filters: string[], logs: ProcessLogs }>(),
24     SET_PROCESS_LOGS_PANEL_FILTER: ofType<string>(),
25     ADD_PROCESS_LOGS_PANEL_ITEM: ofType<ProcessLogs>(),
26 });
27
28 // Max size of logs to fetch in bytes
29 const maxLogFetchSize: number = 128 * 1000;
30
31 type FileWithProgress = {
32     file: CollectionFile;
33     lastByte: number;
34 }
35
36 export type ProcessLogsPanelAction = UnionOf<typeof processLogsPanelActions>;
37
38 export const setProcessLogsPanelFilter = (filter: string) =>
39     processLogsPanelActions.SET_PROCESS_LOGS_PANEL_FILTER(filter);
40
41 export const initProcessLogsPanel = (processUuid: string) =>
42     async (dispatch: Dispatch, getState: () => RootState, { logService }: ServiceRepository) => {
43         let process: Process | undefined;
44         try {
45             dispatch(processLogsPanelActions.RESET_PROCESS_LOGS_PANEL());
46             process = getProcess(processUuid)(getState().resources);
47             if (process?.containerRequest?.uuid) {
48                 // Get log file size info
49                 const logFiles = await loadContainerLogFileList(process.containerRequest, logService);
50
51                 // Populate lastbyte 0 for each file
52                 const filesWithProgress = logFiles.map((file) => ({ file, lastByte: 0 }));
53
54                 // Fetch array of LogFragments
55                 const logLines = await loadContainerLogFileContents(filesWithProgress, logService, process);
56
57                 // Populate initial state with filters
58                 const initialState = createInitialLogPanelState(logFiles, logLines);
59                 dispatch(processLogsPanelActions.INIT_PROCESS_LOGS_PANEL(initialState));
60             }
61         } catch (e) {
62             // On error, populate empty state to allow polling to start
63             const initialState = createInitialLogPanelState([], []);
64             dispatch(processLogsPanelActions.INIT_PROCESS_LOGS_PANEL(initialState));
65             // Only show toast on errors other than 404 since 404 is expected when logs do not exist yet
66             if (e.status !== 404) {
67                 dispatch(snackbarActions.OPEN_SNACKBAR({ message: 'Error loading process logs', hideDuration: 4000, kind: SnackbarKind.ERROR }));
68             }
69             if (e.status === 404 && process?.containerRequest.state === ContainerRequestState.FINAL) {
70                 dispatch(snackbarActions.OPEN_SNACKBAR({ message: 'Log collection was trashed or deleted.', hideDuration: 4000, kind: SnackbarKind.WARNING }));
71             }
72         }
73     };
74
75 export const pollProcessLogs = (processUuid: string) =>
76     async (dispatch: Dispatch, getState: () => RootState, { logService }: ServiceRepository) => {
77         try {
78             // Get log panel state and process from store
79             const currentState = getState().processLogsPanel;
80             const process = getProcess(processUuid)(getState().resources);
81
82             // Check if container request is present and initial logs state loaded
83             if (process?.containerRequest?.uuid && Object.keys(currentState.logs).length > 0) {
84                 const logFiles = await loadContainerLogFileList(process.containerRequest, logService);
85
86                 // Determine byte to fetch from while filtering unchanged files
87                 const filesToUpdateWithProgress = logFiles.reduce((acc, updatedFile) => {
88                     // Fetch last byte or 0 for new log files
89                     const currentStateLogLastByte = currentState.logs[logFileToLogType(updatedFile)]?.lastByte || 0;
90
91                     const isNew = !Object.keys(currentState.logs).find((currentStateLogName) => (updatedFile.name.startsWith(currentStateLogName)));
92                     const isChanged = !isNew && currentStateLogLastByte < updatedFile.size;
93
94                     if (isNew || isChanged) {
95                         return acc.concat({ file: updatedFile, lastByte: currentStateLogLastByte });
96                     } else {
97                         return acc;
98                     }
99                 }, [] as FileWithProgress[]);
100
101                 // Perform range request(s) for each file
102                 const logFragments = await loadContainerLogFileContents(filesToUpdateWithProgress, logService, process);
103
104                 if (logFragments.length) {
105                     // Convert LogFragments to ProcessLogs with All/Main sorting & line-merging
106                     const groupedLogs = groupLogs(logFiles, logFragments);
107                     await dispatch(processLogsPanelActions.ADD_PROCESS_LOGS_PANEL_ITEM(groupedLogs));
108                 }
109             }
110             return Promise.resolve();
111         } catch (e) {
112             // Remove log when polling error is handled in some way instead of being ignored
113             console.error("Error occurred in pollProcessLogs:", e);
114             return Promise.reject();
115         }
116     };
117
118 const loadContainerLogFileList = async (containerRequest: ContainerRequestResource, logService: LogService) => {
119     const logCollectionContents = await logService.listLogFiles(containerRequest);
120
121     // Filter only root directory files matching log event types which have bytes
122     return logCollectionContents.filter((file): file is CollectionFile => (
123         file.type === CollectionFileType.FILE &&
124         PROCESS_PANEL_LOG_EVENT_TYPES.indexOf(logFileToLogType(file)) > -1 &&
125         file.size > 0
126     ));
127 };
128
129 /**
130  * Loads the contents of each file from each file's lastByte simultaneously
131  *   while respecting the maxLogFetchSize by requesting the start and end
132  *   of the desired block and inserting a snipline.
133  * @param logFilesWithProgress CollectionFiles with the last byte previously loaded
134  * @param logService
135  * @param process
136  * @returns LogFragment[] containing a single LogFragment corresponding to each input file
137  */
138 const loadContainerLogFileContents = async (logFilesWithProgress: FileWithProgress[], logService: LogService, process: Process) => (
139     (await Promise.allSettled(logFilesWithProgress.filter(({ file }) => file.size > 0).map(({ file, lastByte }) => {
140         const requestSize = file.size - lastByte;
141         if (requestSize > maxLogFetchSize) {
142             const chunkSize = Math.floor(maxLogFetchSize / 2);
143             const firstChunkEnd = lastByte + chunkSize - 1;
144             return Promise.all([
145                 logService.getLogFileContents(process.containerRequest, file, lastByte, firstChunkEnd),
146                 logService.getLogFileContents(process.containerRequest, file, file.size - chunkSize, file.size - 1)
147             ] as Promise<(LogFragment)>[]);
148         } else {
149             return Promise.all([logService.getLogFileContents(process.containerRequest, file, lastByte, file.size - 1)]);
150         }
151     })).then((res) => {
152         if (res.length && res.every(promiseResult => (promiseResult.status === 'rejected'))) {
153             // Since allSettled does not pass promise rejection we throw an
154             //   error if every request failed
155             const error = res.find(
156                 (promiseResult): promiseResult is PromiseRejectedResult => promiseResult.status === 'rejected'
157             )?.reason;
158             return Promise.reject(error);
159         }
160         return res.filter((promiseResult): promiseResult is PromiseFulfilledResult<LogFragment[]> => (
161             // Filter out log files with rejected promises
162             //   (Promise.all rejects on any failure)
163             promiseResult.status === 'fulfilled' &&
164             // Filter out files where any fragment is empty
165             //   (prevent incorrect snipline generation or an un-resumable situation)
166             !!promiseResult.value.every(logFragment => logFragment.contents.length)
167         )).map(one => one.value)
168     })).map((logResponseSet) => {
169         // For any multi fragment response set, modify the last line of non-final chunks to include a line break and snip line
170         //   Don't add snip line as a separate line so that sorting won't reorder it
171         for (let i = 1; i < logResponseSet.length; i++) {
172             const fragment = logResponseSet[i - 1];
173             const lastLineIndex = fragment.contents.length - 1;
174             const lastLineContents = fragment.contents[lastLineIndex];
175             const newLastLine = `${lastLineContents}\n${SNIPLINE}`;
176
177             logResponseSet[i - 1].contents[lastLineIndex] = newLastLine;
178         }
179
180         // Merge LogFragment Array (representing multiple log line arrays) into single LogLine[] / LogFragment
181         return logResponseSet.reduce((acc, curr: LogFragment) => ({
182             logType: curr.logType,
183             contents: [...(acc.contents || []), ...curr.contents]
184         }), {} as LogFragment);
185     })
186 );
187
188 const createInitialLogPanelState = (logFiles: CollectionFile[], logFragments: LogFragment[]): { filters: string[], logs: ProcessLogs } => {
189     const logs = groupLogs(logFiles, logFragments);
190     const filters = Object.keys(logs);
191     return { filters, logs };
192 }
193
194 /**
195  * Converts LogFragments into ProcessLogs, grouping and sorting All/Main logs
196  * @param logFiles
197  * @param logFragments
198  * @returns ProcessLogs for the store
199  */
200 const groupLogs = (logFiles: CollectionFile[], logFragments: LogFragment[]): ProcessLogs => {
201     const sortableLogFragments = mergeMultilineLoglines(logFragments);
202
203     const allLogs = mergeSortLogFragments(sortableLogFragments);
204     const mainLogs = mergeSortLogFragments(sortableLogFragments.filter((fragment) => (MAIN_EVENT_TYPES.includes(fragment.logType))));
205
206     const groupedLogs = logFragments.reduce((grouped, fragment) => ({
207         ...grouped,
208         [fragment.logType as string]: { lastByte: fetchLastByteNumber(logFiles, fragment.logType), contents: fragment.contents }
209     }), {});
210
211     return {
212         [MAIN_FILTER_TYPE]: { lastByte: undefined, contents: mainLogs },
213         [ALL_FILTER_TYPE]: { lastByte: undefined, contents: allLogs },
214         ...groupedLogs,
215     }
216 };
217
218 /**
219  * Checks for non-timestamped log lines and merges them with the previous line, assumes they are multi-line logs
220  *   If there is no previous line (first line has no timestamp), the line is deleted.
221  *   Only used for combined logs that need sorting by timestamp after merging
222  * @param logFragments
223  * @returns Modified LogFragment[]
224  */
225 const mergeMultilineLoglines = (logFragments: LogFragment[]) => (
226     logFragments.map((fragment) => {
227         // Avoid altering the original fragment copy
228         let fragmentCopy: LogFragment = {
229             logType: fragment.logType,
230             contents: [...fragment.contents],
231         }
232         // Merge any non-timestamped lines in sortable log types with previous line
233         if (fragmentCopy.contents.length && !NON_SORTED_LOG_TYPES.includes(fragmentCopy.logType)) {
234             for (let i = 0; i < fragmentCopy.contents.length; i++) {
235                 const lineContents = fragmentCopy.contents[i];
236                 if (!lineContents.match(LOG_TIMESTAMP_PATTERN)) {
237                     // Partial line without timestamp detected
238                     if (i > 0) {
239                         // If not first line, copy line to previous line
240                         const previousLineContents = fragmentCopy.contents[i - 1];
241                         const newPreviousLineContents = `${previousLineContents}\n${lineContents}`;
242                         fragmentCopy.contents[i - 1] = newPreviousLineContents;
243                     }
244                     // Delete the current line and prevent iterating
245                     fragmentCopy.contents.splice(i, 1);
246                     i--;
247                 }
248             }
249         }
250         return fragmentCopy;
251     })
252 );
253
254 /**
255  * Merges log lines of different types and sorts types that contain timestamps (are sortable)
256  * @param logFragments
257  * @returns string[] of merged and sorted log lines
258  */
259 const mergeSortLogFragments = (logFragments: LogFragment[]): string[] => {
260     const sortableLines = fragmentsToLines(logFragments
261         .filter((fragment) => (!NON_SORTED_LOG_TYPES.includes(fragment.logType))));
262
263     const nonSortableLines = fragmentsToLines(logFragments
264         .filter((fragment) => (NON_SORTED_LOG_TYPES.includes(fragment.logType)))
265         .sort((a, b) => (a.logType.localeCompare(b.logType))));
266
267     return [...nonSortableLines, ...sortableLines.sort(sortLogLines)]
268 };
269
270 const sortLogLines = (a: string, b: string) => {
271     return a.localeCompare(b);
272 };
273
274 const fragmentsToLines = (fragments: LogFragment[]): string[] => (
275     fragments.reduce((acc, fragment: LogFragment) => (
276         acc.concat(...fragment.contents)
277     ), [] as string[])
278 );
279
280 const fetchLastByteNumber = (logFiles: CollectionFile[], key: string) => {
281     return logFiles.find((file) => (file.name.startsWith(key)))?.size
282 };
283
284 export const navigateToLogCollection = (uuid: string) =>
285     async (dispatch: Dispatch<any>, getState: () => RootState, services: ServiceRepository) => {
286         try {
287             await services.collectionService.get(uuid);
288             dispatch<any>(navigateTo(uuid));
289         } catch {
290             dispatch(snackbarActions.OPEN_SNACKBAR({ message: 'Log collection was trashed or deleted.', hideDuration: 4000, kind: SnackbarKind.WARNING }));
291         }
292     };
293
294 const ALL_FILTER_TYPE = 'All logs';
295
296 const MAIN_FILTER_TYPE = 'Main logs';
297 const MAIN_EVENT_TYPES = [
298     LogEventType.CRUNCH_RUN,
299     LogEventType.STDERR,
300     LogEventType.STDOUT,
301 ];
302
303 const PROCESS_PANEL_LOG_EVENT_TYPES = [
304     LogEventType.ARV_MOUNT,
305     LogEventType.CRUNCH_RUN,
306     LogEventType.CRUNCHSTAT,
307     LogEventType.DISPATCH,
308     LogEventType.HOSTSTAT,
309     LogEventType.NODE_INFO,
310     LogEventType.STDERR,
311     LogEventType.STDOUT,
312     LogEventType.CONTAINER,
313     LogEventType.KEEPSTORE,
314 ];
315
316 const NON_SORTED_LOG_TYPES = [
317     LogEventType.NODE_INFO,
318     LogEventType.CONTAINER,
319 ];